Posted to commits@spark.apache.org by an...@apache.org on 2015/05/14 01:43:33 UTC

spark git commit: [SPARK-7356] [STREAMING] Fix flaky tests in FlumePollingStreamSuite using SparkSink's batch CountDownLatch.

Repository: spark
Updated Branches:
  refs/heads/master bb6dec3b1 -> 61d1e87c0


[SPARK-7356] [STREAMING] Fix flaky tests in FlumePollingStreamSuite using SparkSink's batch CountDownLatch.

This is meant to make FlumePollingStreamSuite deterministic: instead of sleeping for arbitrary periods of time, we now count the number of batches that have completed and then verify the results.
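
To illustrate the pattern, here is a minimal, self-contained sketch (not the actual test code). StubSink and simulateBatchReceived are hypothetical stand-ins invented for the example; only the countdownWhenBatchReceived name reflects the SparkSink API this commit uses:

import java.util.concurrent.{CountDownLatch, TimeUnit}

// Hypothetical stand-in for a sink that signals batch completion.
class StubSink {
  @volatile private var latch: CountDownLatch = _

  def countdownWhenBatchReceived(l: CountDownLatch): Unit = { latch = l }

  // The real sink would fire this when the receiver acks a batch; here we
  // expose it so the example can simulate batch arrival.
  def simulateBatchReceived(): Unit = if (latch != null) latch.countDown()
}

object LatchPatternSketch extends App {
  val batchCount = 5
  val sinks = Seq(new StubSink, new StubSink)

  // One latch count per expected batch across all sinks.
  val latch = new CountDownLatch(batchCount * sinks.size)
  sinks.foreach(_.countdownWhenBatchReceived(latch))

  // Simulate each sink receiving its batches on a background thread.
  sinks.foreach { sink =>
    new Thread(new Runnable {
      def run(): Unit = (1 to batchCount).foreach(_ => sink.simulateBatchReceived())
    }).start()
  }

  // Deterministic wait: returns as soon as every batch is in, and fails
  // fast after 15 seconds instead of sleeping for a fixed interval.
  assert(latch.await(15, TimeUnit.SECONDS), "Timed out waiting for batches")
  println("All batches received")
}

Compared with polling a buffer size in a sleep loop, the latch makes the wait event-driven: the test proceeds the moment the last batch lands, while a genuine failure still surfaces within the 15-second bound.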

Author: Hari Shreedharan <hs...@apache.org>

Closes #5918 from harishreedharan/flume-test-fix and squashes the following commits:

93f24f3 [Hari Shreedharan] Add an eventually block to ensure that all received data is processed (see the sketch after this list). Refactor the DStream creation and remove redundant code.
1108804 [Hari Shreedharan] [SPARK-7356][STREAMING] Fix flaky tests in FlumePollingStreamSuite using SparkSink's batch CountDownLatch.
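
For reference, a minimal sketch of such an eventually block. The AtomicInteger and the background thread are hypothetical scaffolding invented for the example; the timeout and interval values match the ones used in the patch below:

import java.util.concurrent.atomic.AtomicInteger

import scala.concurrent.duration._
import scala.language.postfixOps

import org.scalatest.Assertions._
import org.scalatest.concurrent.Eventually._

object EventuallySketch extends App {
  val processed = new AtomicInteger(0)

  // Simulate work that completes some time after the data is received.
  new Thread(new Runnable {
    def run(): Unit = { Thread.sleep(500); processed.set(42) }
  }).start()

  // Retry the assertion every 100 ms until it passes or 10 seconds elapse;
  // on timeout the last assertion failure is rethrown, so nothing passes
  // silently.
  eventually(timeout(10 seconds), interval(100 milliseconds)) {
    assert(processed.get() === 42)
  }
  println("All received data processed")
}

The same shape appears in writeAndVerify in the diff below: the latch guarantees the data reached the sinks, clock.advance() triggers the batch, and eventually absorbs the remaining scheduling delay before the assertions run.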


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/61d1e87c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/61d1e87c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/61d1e87c

Branch: refs/heads/master
Commit: 61d1e87c0d3d12dac0b724d1b84436f748227e99
Parents: bb6dec3
Author: Hari Shreedharan <hs...@apache.org>
Authored: Wed May 13 16:43:30 2015 -0700
Committer: Andrew Or <an...@databricks.com>
Committed: Wed May 13 16:43:30 2015 -0700

----------------------------------------------------------------------
 .../flume/FlumePollingStreamSuite.scala         | 110 +++++++++----------
 1 file changed, 51 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/61d1e87c/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
index 43c1b86..93afe50 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
@@ -18,15 +18,18 @@
 package org.apache.spark.streaming.flume
 
 import java.net.InetSocketAddress
-import java.util.concurrent.{Callable, ExecutorCompletionService, Executors}
+import java.util.concurrent._
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
+import scala.concurrent.duration._
+import scala.language.postfixOps
 
 import org.apache.flume.Context
 import org.apache.flume.channel.MemoryChannel
 import org.apache.flume.conf.Configurables
 import org.apache.flume.event.EventBuilder
+import org.scalatest.concurrent.Eventually._
 
 import org.scalatest.{BeforeAndAfter, FunSuite}
 
@@ -57,11 +60,11 @@ class FlumePollingStreamSuite extends FunSuite with BeforeAndAfter with Logging
 
   before(beforeFunction())
 
-  ignore("flume polling test") {
+  test("flume polling test") {
     testMultipleTimes(testFlumePolling)
   }
 
-  ignore("flume polling test multiple hosts") {
+  test("flume polling test multiple hosts") {
     testMultipleTimes(testFlumePollingMultipleHost)
   }
 
@@ -100,18 +103,8 @@ class FlumePollingStreamSuite extends FunSuite with BeforeAndAfter with Logging
     Configurables.configure(sink, context)
     sink.setChannel(channel)
     sink.start()
-    // Set up the streaming context and input streams
-    val ssc = new StreamingContext(conf, batchDuration)
-    val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
-      FlumeUtils.createPollingStream(ssc, Seq(new InetSocketAddress("localhost", sink.getPort())),
-        StorageLevel.MEMORY_AND_DISK, eventsPerBatch, 1)
-    val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
-      with SynchronizedBuffer[Seq[SparkFlumeEvent]]
-    val outputStream = new TestOutputStream(flumeStream, outputBuffer)
-    outputStream.register()
-    ssc.start()
 
-    writeAndVerify(Seq(channel), ssc, outputBuffer)
+    writeAndVerify(Seq(sink), Seq(channel))
     assertChannelIsEmpty(channel)
     sink.stop()
     channel.stop()
@@ -142,10 +135,22 @@ class FlumePollingStreamSuite extends FunSuite with BeforeAndAfter with Logging
     Configurables.configure(sink2, context)
     sink2.setChannel(channel2)
     sink2.start()
+    try {
+      writeAndVerify(Seq(sink, sink2), Seq(channel, channel2))
+      assertChannelIsEmpty(channel)
+      assertChannelIsEmpty(channel2)
+    } finally {
+      sink.stop()
+      sink2.stop()
+      channel.stop()
+      channel2.stop()
+    }
+  }
 
+  def writeAndVerify(sinks: Seq[SparkSink], channels: Seq[MemoryChannel]) {
     // Set up the streaming context and input streams
     val ssc = new StreamingContext(conf, batchDuration)
-    val addresses = Seq(sink.getPort(), sink2.getPort()).map(new InetSocketAddress("localhost", _))
+    val addresses = sinks.map(sink => new InetSocketAddress("localhost", sink.getPort()))
     val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
       FlumeUtils.createPollingStream(ssc, addresses, StorageLevel.MEMORY_AND_DISK,
         eventsPerBatch, 5)
@@ -155,61 +160,49 @@ class FlumePollingStreamSuite extends FunSuite with BeforeAndAfter with Logging
     outputStream.register()
 
     ssc.start()
-    try {
-      writeAndVerify(Seq(channel, channel2), ssc, outputBuffer)
-      assertChannelIsEmpty(channel)
-      assertChannelIsEmpty(channel2)
-    } finally {
-      sink.stop()
-      sink2.stop()
-      channel.stop()
-      channel2.stop()
-    }
-  }
-
-  def writeAndVerify(channels: Seq[MemoryChannel], ssc: StreamingContext,
-    outputBuffer: ArrayBuffer[Seq[SparkFlumeEvent]]) {
     val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
     val executor = Executors.newCachedThreadPool()
     val executorCompletion = new ExecutorCompletionService[Void](executor)
-    channels.map(channel => {
+
+    val latch = new CountDownLatch(batchCount * channels.size)
+    sinks.foreach(_.countdownWhenBatchReceived(latch))
+
+    channels.foreach(channel => {
       executorCompletion.submit(new TxnSubmitter(channel, clock))
     })
+
     for (i <- 0 until channels.size) {
       executorCompletion.take()
     }
-    val startTime = System.currentTimeMillis()
-    while (outputBuffer.size < batchCount * channels.size &&
-      System.currentTimeMillis() - startTime < 15000) {
-      logInfo("output.size = " + outputBuffer.size)
-      Thread.sleep(100)
-    }
-    val timeTaken = System.currentTimeMillis() - startTime
-    assert(timeTaken < 15000, "Operation timed out after " + timeTaken + " ms")
-    logInfo("Stopping context")
-    ssc.stop()
 
-    val flattenedBuffer = outputBuffer.flatten
-    assert(flattenedBuffer.size === totalEventsPerChannel * channels.size)
-    var counter = 0
-    for (k <- 0 until channels.size; i <- 0 until totalEventsPerChannel) {
-      val eventToVerify = EventBuilder.withBody((channels(k).getName + " - " +
-        String.valueOf(i)).getBytes("utf-8"),
-        Map[String, String]("test-" + i.toString -> "header"))
-      var found = false
-      var j = 0
-      while (j < flattenedBuffer.size && !found) {
-        val strToCompare = new String(flattenedBuffer(j).event.getBody.array(), "utf-8")
-        if (new String(eventToVerify.getBody, "utf-8") == strToCompare &&
-          eventToVerify.getHeaders.get("test-" + i.toString)
-            .equals(flattenedBuffer(j).event.getHeaders.get("test-" + i.toString))) {
-          found = true
-          counter += 1
+    latch.await(15, TimeUnit.SECONDS) // Ensure all data has been received.
+    clock.advance(batchDuration.milliseconds)
+
+    // The eventually is required to ensure that all data in the batch has been processed.
+    eventually(timeout(10 seconds), interval(100 milliseconds)) {
+      val flattenedBuffer = outputBuffer.flatten
+      assert(flattenedBuffer.size === totalEventsPerChannel * channels.size)
+      var counter = 0
+      for (k <- 0 until channels.size; i <- 0 until totalEventsPerChannel) {
+        val eventToVerify = EventBuilder.withBody((channels(k).getName + " - " +
+          String.valueOf(i)).getBytes("utf-8"),
+          Map[String, String]("test-" + i.toString -> "header"))
+        var found = false
+        var j = 0
+        while (j < flattenedBuffer.size && !found) {
+          val strToCompare = new String(flattenedBuffer(j).event.getBody.array(), "utf-8")
+          if (new String(eventToVerify.getBody, "utf-8") == strToCompare &&
+            eventToVerify.getHeaders.get("test-" + i.toString)
+              .equals(flattenedBuffer(j).event.getHeaders.get("test-" + i.toString))) {
+            found = true
+            counter += 1
+          }
+          j += 1
         }
-        j += 1
       }
+      assert(counter === totalEventsPerChannel * channels.size)
     }
-    assert(counter === totalEventsPerChannel * channels.size)
+    ssc.stop()
   }
 
   def assertChannelIsEmpty(channel: MemoryChannel): Unit = {
@@ -234,7 +227,6 @@ class FlumePollingStreamSuite extends FunSuite with BeforeAndAfter with Logging
         tx.commit()
         tx.close()
         Thread.sleep(500) // Allow some time for the events to reach
-        clock.advance(batchDuration.milliseconds)
       }
       null
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org