Posted to commits@spark.apache.org by td...@apache.org on 2014/10/14 07:46:54 UTC

git commit: [SPARK-3912][Streaming] Fixed flaky FlumeStreamSuite

Repository: spark
Updated Branches:
  refs/heads/master 9eb49d413 -> 4d26aca77


[SPARK-3912][Streaming] Fixed flaky FlumeStreamSuite

@harishreedharan @pwendell
See the JIRA ticket for a diagnosis of the problem:
https://issues.apache.org/jira/browse/SPARK-3912

The solution was to reimplement the test; a standalone sketch of the key pattern follows the list.
1. Find a free port (by binding and releasing a server socket), and then use that port.
2. Remove the Thread.sleep() calls; instead, repeatedly try to create a sender and send data, checking whether the data was sent. Use eventually() to minimize waiting time.
3. Check whether all the data was received, without caring about batches.
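
For reference, a minimal, self-contained sketch of the pattern behind steps 1 and 2. This is a simplification, not the committed code: it binds port 0 to let the OS pick a free port instead of probing upward from a fixed port via Spark's Utils.startServiceOnPort, the object name FlakyTestPatternSketch is hypothetical, and the eventually() body is a placeholder for the real send-and-verify logic.

import java.net.ServerSocket

import scala.concurrent.duration._
import scala.language.postfixOps

import org.scalatest.concurrent.Eventually._

object FlakyTestPatternSketch {

  // Step 1: bind an ephemeral server socket, record the port the OS
  // assigned, and release it so the receiver under test can bind it next.
  def findFreePort(): Int = {
    val socket = new ServerSocket(0)  // port 0 = let the OS choose
    try socket.getLocalPort finally socket.close()
  }

  def main(args: Array[String]): Unit = {
    val testPort = findFreePort()

    // Step 2: instead of Thread.sleep(), retry the whole block until it
    // succeeds or the timeout expires; a thrown exception or failed
    // assertion simply triggers another attempt after the given interval.
    eventually(timeout(10 seconds), interval(100 milliseconds)) {
      // In the real test: create a transceiver/client against testPort,
      // send the events, and assert that the send succeeded.
      assert(testPort > 0)
    }
  }
}

Because eventually() retries on failures thrown inside the block, the first attempts may run before the receiver is even listening; the test keeps trying until a send goes through, which is what makes the fixed sleeps unnecessary.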

Author: Tathagata Das <ta...@gmail.com>

Closes #2773 from tdas/flume-test-fix and squashes the following commits:

93cd7f6 [Tathagata Das] Reimplemented FlumeStreamSuite to be more robust.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4d26aca7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4d26aca7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4d26aca7

Branch: refs/heads/master
Commit: 4d26aca770f7dd50eee1ed7855e9eda68b5a7ffa
Parents: 9eb49d4
Author: Tathagata Das <ta...@gmail.com>
Authored: Mon Oct 13 22:46:49 2014 -0700
Committer: Tathagata Das <ta...@gmail.com>
Committed: Mon Oct 13 22:46:49 2014 -0700

----------------------------------------------------------------------
 .../streaming/flume/FlumeStreamSuite.scala      | 166 ++++++++++++-------
 1 file changed, 102 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4d26aca7/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
index 33235d1..13943ed 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
@@ -17,103 +17,141 @@
 
 package org.apache.spark.streaming.flume
 
-import scala.collection.JavaConversions._
-import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
-
-import java.net.InetSocketAddress
+import java.net.{InetSocketAddress, ServerSocket}
 import java.nio.ByteBuffer
 import java.nio.charset.Charset
 
+import scala.collection.JavaConversions._
+import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
 import org.apache.avro.ipc.NettyTransceiver
 import org.apache.avro.ipc.specific.SpecificRequestor
+import org.apache.flume.source.avro
 import org.apache.flume.source.avro.{AvroFlumeEvent, AvroSourceProtocol}
+import org.jboss.netty.channel.ChannelPipeline
+import org.jboss.netty.channel.socket.SocketChannel
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
+import org.jboss.netty.handler.codec.compression._
+import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
+import org.scalatest.concurrent.Eventually._
 
+import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{TestOutputStream, StreamingContext, TestSuiteBase}
-import org.apache.spark.streaming.util.ManualClock
+import org.apache.spark.streaming.{Milliseconds, StreamingContext, TestOutputStream}
+import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerReceiverStarted}
 import org.apache.spark.util.Utils
 
-import org.jboss.netty.channel.ChannelPipeline
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
-import org.jboss.netty.channel.socket.SocketChannel
-import org.jboss.netty.handler.codec.compression._
+class FlumeStreamSuite extends FunSuite with BeforeAndAfter with Matchers with Logging {
+  val conf = new SparkConf().setMaster("local[4]").setAppName("FlumeStreamSuite")
+
+  var ssc: StreamingContext = null
+  var transceiver: NettyTransceiver = null
 
-class FlumeStreamSuite extends TestSuiteBase {
+  after {
+    if (ssc != null) {
+      ssc.stop()
+    }
+    if (transceiver != null) {
+      transceiver.close()
+    }
+  }
 
   test("flume input stream") {
-    runFlumeStreamTest(false)
+    testFlumeStream(testCompression = false)
   }
 
   test("flume input compressed stream") {
-    runFlumeStreamTest(true)
+    testFlumeStream(testCompression = true)
+  }
+
+  /** Run test on flume stream */
+  private def testFlumeStream(testCompression: Boolean): Unit = {
+    val input = (1 to 100).map { _.toString }
+    val testPort = findFreePort()
+    val outputBuffer = startContext(testPort, testCompression)
+    writeAndVerify(input, testPort, outputBuffer, testCompression)
+  }
+
+  /** Find a free port */
+  private def findFreePort(): Int = {
+    Utils.startServiceOnPort(23456, (trialPort: Int) => {
+      val socket = new ServerSocket(trialPort)
+      socket.close()
+      (null, trialPort)
+    })._2
   }
-  
-  def runFlumeStreamTest(enableDecompression: Boolean) {
-    // Set up the streaming context and input streams
-    val ssc = new StreamingContext(conf, batchDuration)
-    val (flumeStream, testPort) =
-      Utils.startServiceOnPort(9997, (trialPort: Int) => {
-        val dstream = FlumeUtils.createStream(
-          ssc, "localhost", trialPort, StorageLevel.MEMORY_AND_DISK, enableDecompression)
-        (dstream, trialPort)
-      })
 
+  /** Setup and start the streaming context */
+  private def startContext(
+      testPort: Int, testCompression: Boolean): (ArrayBuffer[Seq[SparkFlumeEvent]]) = {
+    ssc = new StreamingContext(conf, Milliseconds(200))
+    val flumeStream = FlumeUtils.createStream(
+      ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK, testCompression)
     val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
       with SynchronizedBuffer[Seq[SparkFlumeEvent]]
     val outputStream = new TestOutputStream(flumeStream, outputBuffer)
     outputStream.register()
     ssc.start()
+    outputBuffer
+  }
 
-    val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
-    val input = Seq(1, 2, 3, 4, 5)
-    Thread.sleep(1000)
-    val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", testPort))
-    var client: AvroSourceProtocol = null
-
-    if (enableDecompression) {
-      client = SpecificRequestor.getClient(
-          classOf[AvroSourceProtocol], 
-          new NettyTransceiver(new InetSocketAddress("localhost", testPort), 
-          new CompressionChannelFactory(6)))
-    } else {
-      client = SpecificRequestor.getClient(
-        classOf[AvroSourceProtocol], transceiver)
-    }
+  /** Send data to the flume receiver and verify whether the data was received */
+  private def writeAndVerify(
+      input: Seq[String],
+      testPort: Int,
+      outputBuffer: ArrayBuffer[Seq[SparkFlumeEvent]],
+      enableCompression: Boolean
+    ) {
+    val testAddress = new InetSocketAddress("localhost", testPort)
 
-    for (i <- 0 until input.size) {
+    val inputEvents = input.map { item =>
       val event = new AvroFlumeEvent
-      event.setBody(ByteBuffer.wrap(input(i).toString.getBytes("utf-8")))
+      event.setBody(ByteBuffer.wrap(item.getBytes("UTF-8")))
       event.setHeaders(Map[CharSequence, CharSequence]("test" -> "header"))
-      client.append(event)
-      Thread.sleep(500)
-      clock.addToTime(batchDuration.milliseconds)
+      event
     }
 
-    Thread.sleep(1000)
-
-    val startTime = System.currentTimeMillis()
-    while (outputBuffer.size < input.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
-      logInfo("output.size = " + outputBuffer.size + ", input.size = " + input.size)
-      Thread.sleep(100)
+    eventually(timeout(10 seconds), interval(100 milliseconds)) {
+      // if last attempted transceiver had succeeded, close it
+      if (transceiver != null) {
+        transceiver.close()
+        transceiver = null
+      }
+
+      // Create transceiver
+      transceiver = {
+        if (enableCompression) {
+          new NettyTransceiver(testAddress, new CompressionChannelFactory(6))
+        } else {
+          new NettyTransceiver(testAddress)
+        }
+      }
+
+      // Create Avro client with the transceiver
+      val client = SpecificRequestor.getClient(classOf[AvroSourceProtocol], transceiver)
+      client should not be null
+
+      // Send data
+      val status = client.appendBatch(inputEvents.toList)
+      status should be (avro.Status.OK)
     }
-    Thread.sleep(1000)
-    val timeTaken = System.currentTimeMillis() - startTime
-    assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms")
-    logInfo("Stopping context")
-    ssc.stop()
-
-    val decoder = Charset.forName("UTF-8").newDecoder()
-
-    assert(outputBuffer.size === input.length)
-    for (i <- 0 until outputBuffer.size) {
-      assert(outputBuffer(i).size === 1)
-      val str = decoder.decode(outputBuffer(i).head.event.getBody)
-      assert(str.toString === input(i).toString)
-      assert(outputBuffer(i).head.event.getHeaders.get("test") === "header")
+    
+    val decoder = Charset.forName("UTF-8").newDecoder()    
+    eventually(timeout(10 seconds), interval(100 milliseconds)) {
+      val outputEvents = outputBuffer.flatten.map { _.event }
+      outputEvents.foreach {
+        event =>
+          event.getHeaders.get("test") should be("header")
+      }
+      val output = outputEvents.map(event => decoder.decode(event.getBody()).toString)
+      output should be (input)
     }
   }
 
-  class CompressionChannelFactory(compressionLevel: Int) extends NioClientSocketChannelFactory {
+  /** Class to create socket channel with compression */
+  private class CompressionChannelFactory(compressionLevel: Int) extends NioClientSocketChannelFactory {
     override def newChannel(pipeline: ChannelPipeline): SocketChannel = {
       val encoder = new ZlibEncoder(compressionLevel)
       pipeline.addFirst("deflater", encoder)

