Posted to commits@spark.apache.org by pw...@apache.org on 2014/09/26 07:56:47 UTC

git commit: [SPARK-3686][STREAMING] Wait for sink to commit the channel before check...

Repository: spark
Updated Branches:
  refs/heads/master 86bce7649 -> b235e0136


[SPARK-3686][STREAMING] Wait for sink to commit the channel before checking for the channel size.

Author: Hari Shreedharan <hs...@apache.org>

Closes #2531 from harishreedharan/sparksinksuite-fix and squashes the following commits:

30393c1 [Hari Shreedharan] Use more deterministic method to figure out when batches come in.
6ce9d8b [Hari Shreedharan] [SPARK-3686][STREAMING] Wait for sink to commit the channel before checking for the channel size.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b235e013
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b235e013
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b235e013

Branch: refs/heads/master
Commit: b235e013638685758885842dc3268e9800af3678
Parents: 86bce76
Author: Hari Shreedharan <hs...@apache.org>
Authored: Thu Sep 25 22:56:43 2014 -0700
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Thu Sep 25 22:56:43 2014 -0700

----------------------------------------------------------------------
 .../flume/sink/SparkAvroCallbackHandler.scala   | 14 ++++++++++++-
 .../spark/streaming/flume/sink/SparkSink.scala  | 10 +++++++++
 .../flume/sink/TransactionProcessor.scala       | 12 +++++++++++
 .../streaming/flume/sink/SparkSinkSuite.scala   | 22 ++++++++++++--------
 4 files changed, 48 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b235e013/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala
----------------------------------------------------------------------
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala
index e77cf7b..3c656a3 100644
--- a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala
+++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.spark.streaming.flume.sink
 
-import java.util.concurrent.{ConcurrentHashMap, Executors}
+import java.util.concurrent.{CountDownLatch, ConcurrentHashMap, Executors}
 import java.util.concurrent.atomic.AtomicLong
 
 import scala.collection.JavaConversions._
@@ -58,8 +58,12 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
   private val seqBase = RandomStringUtils.randomAlphanumeric(8)
   private val seqCounter = new AtomicLong(0)
 
+
   @volatile private var stopped = false
 
+  @volatile private var isTest = false
+  private var testLatch: CountDownLatch = null
+
   /**
    * Returns a bunch of events to Spark over Avro RPC.
    * @param n Maximum number of events to return in a batch
@@ -90,6 +94,9 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
         val processor = new TransactionProcessor(
           channel, seq, n, transactionTimeout, backOffInterval, this)
         sequenceNumberToProcessor.put(seq, processor)
+        if (isTest) {
+          processor.countDownWhenBatchAcked(testLatch)
+        }
         Some(processor)
       } else {
         None
@@ -141,6 +148,11 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
     }
   }
 
+  private[sink] def countDownWhenBatchAcked(latch: CountDownLatch) {
+    testLatch = latch
+    isTest = true
+  }
+
   /**
    * Shuts down the executor used to process transactions.
    */
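
The hunk above stores the test latch on the handler and hands it to every TransactionProcessor
the handler creates, so a single latch can observe all batches. A minimal self-contained sketch
of that register-then-propagate shape (hypothetical Dispatcher/Worker classes, not the patch's
types):

    import java.util.concurrent.CountDownLatch

    class Worker {
      @volatile private var isTest = false
      private var testLatch: CountDownLatch = null

      def countDownWhenDone(latch: CountDownLatch): Unit = {
        testLatch = latch
        isTest = true
      }

      def run(): Unit = {
        // ... real work elided ...
        if (isTest) testLatch.countDown() // release a test awaiting completion
      }
    }

    class Dispatcher {
      @volatile private var isTest = false
      @volatile private var testLatch: CountDownLatch = null

      def countDownWhenDone(latch: CountDownLatch): Unit = {
        testLatch = latch
        isTest = true
      }

      // Workers created after registration share the latch, so a single
      // CountDownLatch(n) can wait for n batches across workers.
      def newWorker(): Worker = {
        val w = new Worker
        if (isTest) w.countDownWhenDone(testLatch)
        w
      }
    }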

http://git-wip-us.apache.org/repos/asf/spark/blob/b235e013/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala
----------------------------------------------------------------------
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala
index 98ae7d7..14dffb1 100644
--- a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala
+++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala
@@ -138,6 +138,16 @@ class SparkSink extends AbstractSink with Logging with Configurable {
         throw new RuntimeException("Server was not started!")
       )
   }
+
+  /**
+   * Pass in a [[CountDownLatch]] for testing purposes. This latch is counted down each time a
+   * batch is received. The test can simply call await on this latch until the expected number
+   * of batches has been received.
+   * @param latch the latch to count down once per received batch
+   */
+  private[flume] def countdownWhenBatchReceived(latch: CountDownLatch) {
+    handler.foreach(_.countDownWhenBatchAcked(latch))
+  }
 }
 
 /**
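
Since the handler only exists once the sink has started, the hook above delegates through the
handler's Option with foreach, so registering a latch on an unstarted sink is a silent no-op.
The intended wiring from a test looks roughly like this fragment (assumes a started sink, as in
the suite further below):

    import java.util.concurrent.{CountDownLatch, TimeUnit}

    val latch = new CountDownLatch(1)      // expect a single batch
    sink.countdownWhenBatchReceived(latch) // no-op if the handler is unset
    // ... send a batch and ack it ...
    latch.await(1, TimeUnit.SECONDS)       // released once the sink commits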

http://git-wip-us.apache.org/repos/asf/spark/blob/b235e013/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala
----------------------------------------------------------------------
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala
index 13f3aa9..ea45b14 100644
--- a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala
+++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala
@@ -62,6 +62,10 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
 
   @volatile private var stopped = false
 
+  @volatile private var isTest = false
+
+  private var testLatch: CountDownLatch = null
+
   // The transaction that this processor would handle
   var txOpt: Option[Transaction] = None
 
@@ -182,6 +186,9 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
             rollbackAndClose(tx, close = false) // tx will be closed later anyway
         } finally {
           tx.close()
+          if (isTest) {
+            testLatch.countDown()
+          }
         }
       } else {
         logWarning("Spark could not commit transaction, NACK received. Rolling back transaction.")
@@ -237,4 +244,9 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
     processAckOrNack()
     null
   }
+
+  private[sink] def countDownWhenBatchAcked(latch: CountDownLatch) {
+    testLatch = latch
+    isTest = true
+  }
 }
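
Note where the countdown sits: inside the finally, after tx.close(). A test awaiting the latch
is therefore released only once the transaction has been fully closed, whether or not the
commit itself succeeded. The idiom in isolation (illustrative helper, not patch code):

    import java.util.concurrent.CountDownLatch

    object CommitSignal {
      // Run `commit`, then count the latch down whether or not the commit
      // succeeded, so a waiting test always observes "transaction finished".
      def commitAndSignal(commit: () => Unit, latch: CountDownLatch, isTest: Boolean): Unit = {
        try {
          commit() // exception handling and rollback elided for brevity
        } finally {
          if (isTest) latch.countDown()
        }
      }
    }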

http://git-wip-us.apache.org/repos/asf/spark/blob/b235e013/external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala
----------------------------------------------------------------------
diff --git a/external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala b/external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala
index 75a6668..a2b2cc6 100644
--- a/external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala
+++ b/external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala
@@ -38,7 +38,7 @@ class SparkSinkSuite extends FunSuite {
   val channelCapacity = 5000
 
   test("Success with ack") {
-    val (channel, sink) = initializeChannelAndSink()
+    val (channel, sink, latch) = initializeChannelAndSink()
     channel.start()
     sink.start()
 
@@ -51,6 +51,7 @@ class SparkSinkSuite extends FunSuite {
     val events = client.getEventBatch(1000)
     client.ack(events.getSequenceNumber)
     assert(events.getEvents.size() === 1000)
+    latch.await(1, TimeUnit.SECONDS)
     assertChannelIsEmpty(channel)
     sink.stop()
     channel.stop()
@@ -58,7 +59,7 @@ class SparkSinkSuite extends FunSuite {
   }
 
   test("Failure with nack") {
-    val (channel, sink) = initializeChannelAndSink()
+    val (channel, sink, latch) = initializeChannelAndSink()
     channel.start()
     sink.start()
     putEvents(channel, eventsPerBatch)
@@ -70,6 +71,7 @@ class SparkSinkSuite extends FunSuite {
     val events = client.getEventBatch(1000)
     assert(events.getEvents.size() === 1000)
     client.nack(events.getSequenceNumber)
+    latch.await(1, TimeUnit.SECONDS)
     assert(availableChannelSlots(channel) === 4000)
     sink.stop()
     channel.stop()
@@ -77,7 +79,7 @@ class SparkSinkSuite extends FunSuite {
   }
 
   test("Failure with timeout") {
-    val (channel, sink) = initializeChannelAndSink(Map(SparkSinkConfig
+    val (channel, sink, latch) = initializeChannelAndSink(Map(SparkSinkConfig
       .CONF_TRANSACTION_TIMEOUT -> 1.toString))
     channel.start()
     sink.start()
@@ -88,7 +90,7 @@ class SparkSinkSuite extends FunSuite {
     val (transceiver, client) = getTransceiverAndClient(address, 1)(0)
     val events = client.getEventBatch(1000)
     assert(events.getEvents.size() === 1000)
-    Thread.sleep(1000)
+    latch.await(1, TimeUnit.SECONDS)
     assert(availableChannelSlots(channel) === 4000)
     sink.stop()
     channel.stop()
@@ -106,7 +108,7 @@ class SparkSinkSuite extends FunSuite {
   def testMultipleConsumers(failSome: Boolean): Unit = {
     implicit val executorContext = ExecutionContext
       .fromExecutorService(Executors.newFixedThreadPool(5))
-    val (channel, sink) = initializeChannelAndSink()
+    val (channel, sink, latch) = initializeChannelAndSink(Map.empty, 5)
     channel.start()
     sink.start()
     (1 to 5).foreach(_ => putEvents(channel, eventsPerBatch))
@@ -136,7 +138,7 @@ class SparkSinkSuite extends FunSuite {
       }
     })
     batchCounter.await()
-    TimeUnit.SECONDS.sleep(1) // Allow the sink to commit the transactions.
+    latch.await(1, TimeUnit.SECONDS)
     executorContext.shutdown()
     if(failSome) {
       assert(availableChannelSlots(channel) === 3000)
@@ -148,8 +150,8 @@ class SparkSinkSuite extends FunSuite {
     transceiversAndClients.foreach(x => x._1.close())
   }
 
-  private def initializeChannelAndSink(overrides: Map[String, String] = Map.empty): (MemoryChannel,
-    SparkSink) = {
+  private def initializeChannelAndSink(overrides: Map[String, String] = Map.empty,
+    batchCounter: Int = 1): (MemoryChannel, SparkSink, CountDownLatch) = {
     val channel = new MemoryChannel()
     val channelContext = new Context()
 
@@ -165,7 +167,9 @@ class SparkSinkSuite extends FunSuite {
     sinkContext.put(SparkSinkConfig.CONF_PORT, 0.toString)
     sink.configure(sinkContext)
     sink.setChannel(channel)
-    (channel, sink)
+    val latch = new CountDownLatch(batchCounter)
+    sink.countdownWhenBatchReceived(latch)
+    (channel, sink, latch)
   }
 
   private def putEvents(ch: MemoryChannel, count: Int): Unit = {
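
One caveat on the new waits: await(1, TimeUnit.SECONDS) returns true only if the count reached
zero, and the TransactionProcessor change counts the latch down only on the commit path, so in
the nack and timeout tests the call simply bounds the wait at one second. For the ack test a
stricter check could assert on the result, roughly (reuses the suite's fixtures, not part of
the patch):

    val committed = latch.await(1, TimeUnit.SECONDS)
    assert(committed, "sink did not commit the batch within 1 second")
    assertChannelIsEmpty(channel)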

