Posted to issues@spark.apache.org by "Wenchen Fan (Jira)" <ji...@apache.org> on 2020/02/17 12:19:00 UTC
[jira] [Assigned] (SPARK-30346) Improve logging when events dropped
[ https://issues.apache.org/jira/browse/SPARK-30346?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Wenchen Fan reassigned SPARK-30346:
-----------------------------------
Assignee: liupengcheng
> Improve logging when events dropped
> -----------------------------------
>
> Key: SPARK-30346
> URL: https://issues.apache.org/jira/browse/SPARK-30346
> Project: Spark
> Issue Type: Improvement
> Components: Spark Core
> Affects Versions: 2.1.0, 2.3.2, 3.0.0
> Reporter: liupengcheng
> Assignee: liupengcheng
> Priority: Major
>
> Currently, Spark logs the number of dropped events at most once every 60 seconds while events are being dropped. However, we noticed that this does not work as expected in our production environment.
> We looked into the code and found that concurrent updates of `droppedEventsCounter` can cause the logging logic to be skipped, delaying the report for a very long time.
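To make the race concrete, here is a deterministic, single-threaded Scala replay of one unlucky interleaving of the read-then-`compareAndSet` pattern used in the `post` method quoted in this issue. `StaleCasReplay` is a hypothetical name, and the two "threads" are simulated by ordering the calls by hand rather than by starting real threads.

```scala
import java.util.concurrent.atomic.AtomicLong

// Hypothetical replay of one interleaving of the get-then-CAS pattern from post().
// "Thread A" and "thread B" are only labels for the steps; no threads are started.
object StaleCasReplay {
  def replay(): (Boolean, Long) = {
    val droppedEventsCounter = new AtomicLong(0)

    droppedEventsCounter.incrementAndGet()       // thread A drops an event
    val droppedCount = droppedEventsCounter.get  // thread A reads 1

    droppedEventsCounter.incrementAndGet()       // thread B drops an event before A's CAS

    // Thread A still expects 1, but the counter is now 2, so the CAS fails:
    // A skips the warning and does not advance lastReportTimestamp.
    val reported = droppedEventsCounter.compareAndSet(droppedCount, 0)
    (reported, droppedEventsCounter.get)
  }
}
```

`replay()` returns `(false, 2)`. With several producers dropping events continuously, the same failure can repeat on every attempt, which is consistent with the long reporting delays described in the issue.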
>
>
> {code:java}
> def post(event: SparkListenerEvent): Unit = {
>   if (stopped.get()) {
>     return
>   }
>   eventCount.incrementAndGet()
>   if (eventQueue.offer(event)) {
>     return
>   }
>   eventCount.decrementAndGet()
>   droppedEvents.inc()
>   droppedEventsCounter.incrementAndGet()
>   if (logDroppedEvent.compareAndSet(false, true)) {
>     // Only log the following message once to avoid duplicated annoying logs.
>     logError(s"Dropping event from queue $name. " +
>       "This likely means one of the listeners is too slow and cannot keep up with " +
>       "the rate at which tasks are being started by the scheduler.")
>   }
>   logTrace(s"Dropping event $event")
>   val droppedCount = droppedEventsCounter.get
>   if (droppedCount > 0) {
>     // Don't log too frequently
>     if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) {
>       // There may be multiple threads trying to decrease droppedEventsCounter.
>       // Use "compareAndSet" to make sure only one thread can win.
>       // And if another thread is increasing droppedEventsCounter, "compareAndSet" will fail and
>       // then that thread will update it.
>       if (droppedEventsCounter.compareAndSet(droppedCount, 0)) {
>         val prevLastReportTimestamp = lastReportTimestamp
>         lastReportTimestamp = System.currentTimeMillis()
>         val previous = new java.util.Date(prevLastReportTimestamp)
>         logWarning(s"Dropped $droppedCount events from $name since " +
>           s"${if (prevLastReportTimestamp == 0) "the application started" else s"$previous"}.")
>       }
>     }
>   }
> }
> {code}
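One possible way to avoid the failed-CAS problem, sketched here under assumptions and not necessarily what was merged for SPARK-30346: elect the single reporting thread with `compareAndSet` on the report timestamp (held in an `AtomicLong`) instead of on the drop counter, and report the difference between the running total and the total at the previous report. Concurrent increments of the counter then cannot make the election fail. `DropReporter`, `onDrop`, `lastReportedCount` and the injectable `clock` are hypothetical names; the clock parameter exists only so the throttling can be exercised without waiting 60 seconds.

```scala
import java.util.concurrent.atomic.AtomicLong

// Hypothetical sketch, not Spark's code: the reporter is elected by a CAS on the
// report timestamp, so concurrent increments of the drop counter cannot make the
// election fail the way compareAndSet(droppedCount, 0) can.
final class DropReporter(name: String, intervalMs: Long, clock: () => Long) {
  private val droppedEventsCounter = new AtomicLong(0) // running total, never reset
  private val lastReportTimestamp = new AtomicLong(0)  // 0 means "never reported"
  // Written only by the CAS winner; assumes one report finishes well within intervalMs.
  @volatile private var lastReportedCount = 0L

  /** Records one dropped event; returns the warning text if this call won the report. */
  def onDrop(): Option[String] = {
    droppedEventsCounter.incrementAndGet()
    val lastReport = lastReportTimestamp.get
    val now = clock()
    // Don't log too frequently; at most one caller can move the timestamp from lastReport to now.
    if (now - lastReport >= intervalMs && lastReportTimestamp.compareAndSet(lastReport, now)) {
      val total = droppedEventsCounter.get
      val delta = total - lastReportedCount
      lastReportedCount = total
      val since =
        if (lastReport == 0) "the application started" else new java.util.Date(lastReport).toString
      Some(s"Dropped $delta events from $name since $since.")
    } else {
      None
    }
  }
}
```

For example, with `intervalMs = 60000`, a queue named `appStatus` and a clock reading 100000, the first `onDrop()` returns `Some("Dropped 1 events from appStatus since the application started.")`; drops during the following 60 seconds return `None`, and the first drop after that reports every event dropped in the meantime.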
> What's more, I think we could also improve this logic to log a thread dump of the dispatcher thread, which would greatly help in debugging the performance issues that may cause events to be dropped.
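The thread-dump suggestion could look roughly like the following sketch, which formats the current stack of a given thread using the standard `Thread.getStackTrace`. `DispatcherDump.stackOf` is a hypothetical helper, and passing it the queue's dispatcher thread is an assumption about how it would be wired into `post`; `java.lang.management.ThreadMXBean` would be an alternative that can also report held locks.

```scala
// Hypothetical helper, not a Spark API: renders one thread's current stack in a
// jstack-like layout so it could be appended to the dropped-events warning.
object DispatcherDump {
  def stackOf(thread: Thread): String = {
    // getStackTrace returns an empty array if the thread has not started or has exited.
    val frames = thread.getStackTrace
    val header = s"\"${thread.getName}\" state=${thread.getState}"
    (header +: frames.toSeq.map(frame => s"\tat $frame")).mkString("\n")
  }
}
```

Attaching this dump only to the rate-limited warning, rather than to every dropped event, keeps its cost bounded to once per reporting interval.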
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)