You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2014/10/01 00:18:59 UTC

git commit: SPARK-3744 [STREAMING] FlumeStreamSuite will fail during port contention

Repository: spark
Updated Branches:
  refs/heads/master d3a3840e0 -> 8764fe368


SPARK-3744 [STREAMING] FlumeStreamSuite will fail during port contention

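Since it looked quite easy, I took the liberty of making a quick PR to fix this. Instead of passing hard-coded ports (9998 and 9997) to the two Flume stream tests, the suite now creates the stream through `Utils.startServiceOnPort`, starting at port 9997 and retrying several ports, so the tests avoid failing when a port is already in use. It works locally for me.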

Author: Sean Owen <so...@cloudera.com>

Closes #2601 from srowen/SPARK-3744 and squashes the following commits:

ddc9319 [Sean Owen] Avoid port contention in tests by retrying several ports for Flume stream


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8764fe36
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8764fe36
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8764fe36

Branch: refs/heads/master
Commit: 8764fe368bbd72fe76ed318faad0e97a7279e2fe
Parents: d3a3840
Author: Sean Owen <so...@cloudera.com>
Authored: Tue Sep 30 15:18:51 2014 -0700
Committer: Tathagata Das <ta...@gmail.com>
Committed: Tue Sep 30 15:18:51 2014 -0700

----------------------------------------------------------------------
 .../streaming/flume/FlumeStreamSuite.scala      | 25 ++++++++++++--------
 1 file changed, 15 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8764fe36/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
index 6ee7ac9..33235d1 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
@@ -31,7 +31,7 @@ import org.apache.flume.source.avro.{AvroFlumeEvent, AvroSourceProtocol}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.{TestOutputStream, StreamingContext, TestSuiteBase}
 import org.apache.spark.streaming.util.ManualClock
-import org.apache.spark.streaming.api.java.JavaReceiverInputDStream
+import org.apache.spark.util.Utils
 
 import org.jboss.netty.channel.ChannelPipeline
 import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
@@ -41,21 +41,26 @@ import org.jboss.netty.handler.codec.compression._
 class FlumeStreamSuite extends TestSuiteBase {
 
   test("flume input stream") {
-    runFlumeStreamTest(false, 9998)
+    runFlumeStreamTest(false)
   }
 
   test("flume input compressed stream") {
-    runFlumeStreamTest(true, 9997)
+    runFlumeStreamTest(true)
   }
   
-  def runFlumeStreamTest(enableDecompression: Boolean, testPort: Int) {
+  def runFlumeStreamTest(enableDecompression: Boolean) {
     // Set up the streaming context and input streams
     val ssc = new StreamingContext(conf, batchDuration)
-    val flumeStream: JavaReceiverInputDStream[SparkFlumeEvent] =
-      FlumeUtils.createStream(ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK, enableDecompression)
+    val (flumeStream, testPort) =
+      Utils.startServiceOnPort(9997, (trialPort: Int) => {
+        val dstream = FlumeUtils.createStream(
+          ssc, "localhost", trialPort, StorageLevel.MEMORY_AND_DISK, enableDecompression)
+        (dstream, trialPort)
+      })
+
     val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
       with SynchronizedBuffer[Seq[SparkFlumeEvent]]
-    val outputStream = new TestOutputStream(flumeStream.receiverInputDStream, outputBuffer)
+    val outputStream = new TestOutputStream(flumeStream, outputBuffer)
     outputStream.register()
     ssc.start()
 
@@ -63,13 +68,13 @@ class FlumeStreamSuite extends TestSuiteBase {
     val input = Seq(1, 2, 3, 4, 5)
     Thread.sleep(1000)
     val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", testPort))
-    var client: AvroSourceProtocol = null;
-  
+    var client: AvroSourceProtocol = null
+
     if (enableDecompression) {
       client = SpecificRequestor.getClient(
           classOf[AvroSourceProtocol], 
           new NettyTransceiver(new InetSocketAddress("localhost", testPort), 
-          new CompressionChannelFactory(6)));
+          new CompressionChannelFactory(6)))
     } else {
       client = SpecificRequestor.getClient(
         classOf[AvroSourceProtocol], transceiver)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org