Posted to commits@spark.apache.org by ad...@apache.org on 2014/12/09 08:54:27 UTC

spark git commit: [SPARK-3154][STREAMING] Replace ConcurrentHashMap with mutable.HashMap and remove @volatile from 'stopped'

Repository: spark
Updated Branches:
  refs/heads/master 51b1fe142 -> bcb5cdad6


[SPARK-3154][STREAMING] Replace ConcurrentHashMap with mutable.HashMap and remove @volatile from 'stopped'

Since `sequenceNumberToProcessor` and `stopped` are both guarded by synchronizing on `sequenceNumberToProcessor` itself, `ConcurrentHashMap` and `@volatile` are unnecessary. This PR updates them accordingly.
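
For context, the pattern the patch relies on is a plain Scala collection used as its own monitor: every read and write of the map and of `stopped` happens inside `sequenceNumberToProcessor.synchronized`, so the JVM memory model already guarantees visibility and atomicity without a concurrent collection or `@volatile`. A minimal sketch of the idiom, with names simplified from the actual handler (`Tracker`, `register`, etc. are illustrative, not from the patch):

    import scala.collection.mutable

    class Tracker {
      // All access goes through `processors.synchronized`, so a plain
      // HashMap and a plain Boolean flag are safe here.
      private val processors = mutable.HashMap[String, Runnable]()
      private var stopped = false  // also guarded by `processors`

      def register(seq: String, p: Runnable): Boolean = processors.synchronized {
        if (stopped) false else { processors(seq) = p; true }
      }

      def remove(seq: String): Option[Runnable] = processors.synchronized {
        processors.remove(seq)  // mutable.HashMap.remove returns Option[V]
      }

      def shutdown(): Unit = processors.synchronized {
        stopped = true
        processors.values.foreach(_.run())
      }
    }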

Author: zsxwing <zs...@gmail.com>

Closes #3634 from zsxwing/SPARK-3154 and squashes the following commits:

0d087ac [zsxwing] Replace ConcurrentHashMap with mutable.HashMap and remove @volatile from 'stopped'


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bcb5cdad
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bcb5cdad
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bcb5cdad

Branch: refs/heads/master
Commit: bcb5cdad614d4fce43725dfec3ce88172d2f8c11
Parents: 51b1fe1
Author: zsxwing <zs...@gmail.com>
Authored: Mon Dec 8 23:54:15 2014 -0800
Committer: Aaron Davidson <aa...@databricks.com>
Committed: Mon Dec 8 23:54:15 2014 -0800

----------------------------------------------------------------------
 .../flume/sink/SparkAvroCallbackHandler.scala   | 23 ++++++++++----------
 1 file changed, 12 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bcb5cdad/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala
----------------------------------------------------------------------
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala
index 3c656a3..4373be4 100644
--- a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala
+++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala
@@ -16,10 +16,10 @@
  */
 package org.apache.spark.streaming.flume.sink
 
-import java.util.concurrent.{CountDownLatch, ConcurrentHashMap, Executors}
+import java.util.concurrent.{CountDownLatch, Executors}
 import java.util.concurrent.atomic.AtomicLong
 
-import scala.collection.JavaConversions._
+import scala.collection.mutable
 
 import org.apache.flume.Channel
 import org.apache.commons.lang.RandomStringUtils
@@ -47,8 +47,8 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
   val transactionExecutorOpt = Option(Executors.newFixedThreadPool(threads,
     new ThreadFactoryBuilder().setDaemon(true)
       .setNameFormat("Spark Sink Processor Thread - %d").build()))
-  private val sequenceNumberToProcessor =
-    new ConcurrentHashMap[CharSequence, TransactionProcessor]()
+  // Protected by `sequenceNumberToProcessor`
+  private val sequenceNumberToProcessor = mutable.HashMap[CharSequence, TransactionProcessor]()
   // This sink will not persist sequence numbers and reuses them if it gets restarted.
   // So it is possible to commit a transaction which may have been meant for the sink before the
   // restart.
@@ -58,8 +58,8 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
   private val seqBase = RandomStringUtils.randomAlphanumeric(8)
   private val seqCounter = new AtomicLong(0)
 
-
-  @volatile private var stopped = false
+  // Protected by `sequenceNumberToProcessor`
+  private var stopped = false
 
   @volatile private var isTest = false
   private var testLatch: CountDownLatch = null
@@ -131,7 +131,7 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
    * @param success Whether the batch was successful or not.
    */
   private def completeTransaction(sequenceNumber: CharSequence, success: Boolean) {
-    Option(removeAndGetProcessor(sequenceNumber)).foreach(processor => {
+    removeAndGetProcessor(sequenceNumber).foreach(processor => {
       processor.batchProcessed(success)
     })
   }
@@ -139,10 +139,11 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
   /**
    * Helper method to remove the TxnProcessor for a Sequence Number. Can be used to avoid a leak.
    * @param sequenceNumber
-   * @return The transaction processor for the corresponding batch. Note that this instance is no
-   *         longer tracked and the caller is responsible for that txn processor.
+   * @return An `Option` of the transaction processor for the corresponding batch. Note that this
+   *         instance is no longer tracked and the caller is responsible for that txn processor.
    */
-  private[sink] def removeAndGetProcessor(sequenceNumber: CharSequence): TransactionProcessor = {
+  private[sink] def removeAndGetProcessor(sequenceNumber: CharSequence):
+      Option[TransactionProcessor] = {
     sequenceNumberToProcessor.synchronized {
       sequenceNumberToProcessor.remove(sequenceNumber.toString)
     }
@@ -160,7 +161,7 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
     logInfo("Shutting down Spark Avro Callback Handler")
     sequenceNumberToProcessor.synchronized {
       stopped = true
-      sequenceNumberToProcessor.values().foreach(_.shutdown())
+      sequenceNumberToProcessor.values.foreach(_.shutdown())
     }
     transactionExecutorOpt.foreach(_.shutdownNow())
   }
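
One detail the diff highlights: `java.util.concurrent.ConcurrentHashMap#remove` returns the mapped value or `null`, which is why the old code wrapped the result in `Option(...)`, whereas `scala.collection.mutable.HashMap#remove` returns an `Option[V]` directly, so `removeAndGetProcessor` can now expose that `Option` itself. A small standalone illustration of the two remove semantics (not taken from the patch):

    import java.util.concurrent.ConcurrentHashMap
    import scala.collection.mutable

    object RemoveSemantics extends App {
      val jmap = new ConcurrentHashMap[String, String]()
      val smap = mutable.HashMap[String, String]()
      jmap.put("a", "x")
      smap("a") = "x"

      // Java map: value-or-null, so callers wrap in Option(...) for null safety.
      val fromJava: Option[String] = Option(jmap.remove("a"))
      // Scala map: an Option comes back directly, no wrapping needed.
      val fromScala: Option[String] = smap.remove("a")

      println((fromJava, fromScala))                          // (Some(x),Some(x))
      println((Option(jmap.remove("b")), smap.remove("b")))   // (None,None)
    }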


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org