Posted to commits@spark.apache.org by sr...@apache.org on 2021/02/21 02:58:01 UTC

[spark] branch branch-3.1 updated: [SPARK-20977][CORE] Use a non-final field for the state of CollectionAccumulator

This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 093d4e2  [SPARK-20977][CORE] Use a non-final field for the state of CollectionAccumulator
093d4e2 is described below

commit 093d4e275ed7b39057020e60ac1daa8e89fb9ebe
Author: Gera Shegalov <ge...@apache.org>
AuthorDate: Sat Feb 20 20:57:14 2021 -0600

    [SPARK-20977][CORE] Use a non-final field for the state of CollectionAccumulator
    
    This PR fixes the JLS 17.5.3 violation identified in
    zsxwing's [19/Feb/19 11:47 comment](https://issues.apache.org/jira/browse/SPARK-20977?focusedCommentId=16772277&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16772277) on the JIRA ticket.
    
    ### What changes were proposed in this pull request?
    - Use a var field to hold the state of the collection accumulator
    
    ### Why are the changes needed?
    AccumulatorV2's auto-registration of the accumulator during readObject doesn't work with final fields that are post-processed outside readObject. As it stands, incompletely initialized objects are published to the heartbeat thread, which leads to sporadic exceptions that knock out executors and increase the cost of jobs. We observe such failures on a regular basis: https://github.com/NVIDIA/spark-rapids/issues/1522.
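    
    To make the hazard concrete, here is a minimal sketch (not Spark code; `Registry`, `Base`, and `Holder` are hypothetical names). Java deserialization restores a superclass before its subclasses, so a readObject that publishes `this`, as AccumulatorV2's auto-registration does, runs before the subclass's final field is restored, and JLS 17.5.3 gives a racing reader no guarantee of ever seeing the reflective write that restores it:
    
    ```scala
    import java.io.ObjectInputStream
    import java.util.{ArrayList, Collections, List => JList}
    
    object Registry {
      // Hypothetical stand-in for AccumulatorV2's auto-registration:
      // hands the object to a background (heartbeat-like) thread.
      def register(obj: AnyRef): Unit = { /* publish to another thread */ }
    }
    
    abstract class Base extends java.io.Serializable {
      private def readObject(in: ObjectInputStream): Unit = {
        in.defaultReadObject()
        // Superclass state is restored first, so this publishes `this`
        // before the subclass field below has been restored.
        Registry.register(this)
      }
    }
    
    class Holder extends Base {
      // Backed by a final field in bytecode, but restored by a reflective
      // write *after* Base's readObject has run; a racing reader may
      // observe null here.
      private val state: JList[String] =
        Collections.synchronizedList(new ArrayList[String]())
    }
    ```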
    
    ### Does this PR introduce _any_ user-facing change?
    None
    
    ### How was this patch tested?
    - This is a concurrency bug that is almost impossible to reproduce in a quick unit test.
    - By trial and error I crafted a command (https://github.com/NVIDIA/spark-rapids/pull/1688) that reproduces the issue on my dev box several times per hour, with the first occurrence often within a few minutes. After the patch, these exceptions did not show up in an overnight run of 10+ hours.
    - Existing unit tests in `AccumulatorV2Suite` and `LiveEntitySuite`.
    
    Closes #31540 from gerashegalov/SPARK-20977.
    
    Authored-by: Gera Shegalov <ge...@apache.org>
    Signed-off-by: Sean Owen <sr...@gmail.com>
    (cherry picked from commit fadd0f5d9bff79cbd785631aa2962b9eda644ab8)
    Signed-off-by: Sean Owen <sr...@gmail.com>
---
 .../org/apache/spark/util/AccumulatorV2.scala      | 33 +++++++++++++---------
 1 file changed, 20 insertions(+), 13 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
index d5b3ce3..1453840 100644
--- a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
+++ b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
@@ -19,7 +19,7 @@ package org.apache.spark.util
 
 import java.{lang => jl}
 import java.io.ObjectInputStream
-import java.util.{ArrayList, Collections}
+import java.util.ArrayList
 import java.util.concurrent.ConcurrentHashMap
 import java.util.concurrent.atomic.AtomicLong
 
@@ -449,39 +449,46 @@ class DoubleAccumulator extends AccumulatorV2[jl.Double, jl.Double] {
  * @since 2.0.0
  */
 class CollectionAccumulator[T] extends AccumulatorV2[T, java.util.List[T]] {
-  private val _list: java.util.List[T] = Collections.synchronizedList(new ArrayList[T]())
+  private var _list: java.util.List[T] = _
+
+  private def getOrCreate = {
+    _list = Option(_list).getOrElse(new java.util.ArrayList[T]())
+    _list
+  }
 
   /**
    * Returns false if this accumulator instance has any values in it.
    */
-  override def isZero: Boolean = _list.isEmpty
+  override def isZero: Boolean = this.synchronized(getOrCreate.isEmpty)
 
   override def copyAndReset(): CollectionAccumulator[T] = new CollectionAccumulator
 
   override def copy(): CollectionAccumulator[T] = {
     val newAcc = new CollectionAccumulator[T]
-    _list.synchronized {
-      newAcc._list.addAll(_list)
+    this.synchronized {
+      newAcc.getOrCreate.addAll(getOrCreate)
     }
     newAcc
   }
 
-  override def reset(): Unit = _list.clear()
+  override def reset(): Unit = this.synchronized {
+    _list = null
+  }
 
-  override def add(v: T): Unit = _list.add(v)
+  override def add(v: T): Unit = this.synchronized(getOrCreate.add(v))
 
   override def merge(other: AccumulatorV2[T, java.util.List[T]]): Unit = other match {
-    case o: CollectionAccumulator[T] => _list.addAll(o.value)
+    case o: CollectionAccumulator[T] => this.synchronized(getOrCreate.addAll(o.value))
     case _ => throw new UnsupportedOperationException(
       s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
   }
 
-  override def value: java.util.List[T] = _list.synchronized {
-    java.util.Collections.unmodifiableList(new ArrayList[T](_list))
+  override def value: java.util.List[T] = this.synchronized {
+    java.util.Collections.unmodifiableList(new ArrayList[T](getOrCreate))
   }
 
-  private[spark] def setValue(newValue: java.util.List[T]): Unit = {
-    _list.clear()
-    _list.addAll(newValue)
+  private[spark] def setValue(newValue: java.util.List[T]): Unit = this.synchronized {
+    _list = null
+    getOrCreate.addAll(newValue)
   }
 }
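
The committed change distills to this pattern: the list is held in a plain var, created lazily, and every access happens inside this.synchronized, so the monitor acquire/release supplies the happens-before edge that the reflective final-field write lacked. A reader that takes the lock either sees the fully restored list or builds a fresh empty one; it can no longer observe a half-published accumulator. A standalone sketch of the same idiom (hypothetical class name, not the committed code):

```scala
import java.util.{ArrayList, Collections, List => JList}

class LazyGuardedList[T] {
  // May legitimately be null right after deserialization or reset().
  private var items: JList[T] = _

  // Callers must hold the instance lock; recreates the list on demand.
  private def getOrCreate: JList[T] = {
    if (items == null) items = new ArrayList[T]()
    items
  }

  def add(v: T): Unit = this.synchronized(getOrCreate.add(v))

  def isEmpty: Boolean = this.synchronized(getOrCreate.isEmpty)

  // Hands out an immutable snapshot so callers never touch the live list.
  def snapshot: JList[T] = this.synchronized {
    Collections.unmodifiableList(new ArrayList[T](getOrCreate))
  }

  def reset(): Unit = this.synchronized { items = null }
}
```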


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org