Posted to commits@spark.apache.org by td...@apache.org on 2014/07/10 22:15:14 UTC

git commit: [SPARK-1478].3: Upgrade FlumeInputDStream's FlumeReceiver to support FLUME-1915

Repository: spark
Updated Branches:
  refs/heads/master 369aa84e8 -> 40a8fef4e


[SPARK-1478].3: Upgrade FlumeInputDStream's FlumeReceiver to support FLUME-1915

This is a modified version of PR https://github.com/apache/spark/pull/1168 by @tmalaska.
It adds MiMa binary-compatibility check exclusions.
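
As a usage sketch of the new overload (the app name, host, and port below are
illustrative, not part of this commit):

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.flume.FlumeUtils

    val conf = new SparkConf().setAppName("FlumeDecompressionExample")
    val ssc = new StreamingContext(conf, Seconds(1))
    // enableDecompression = true makes the receiver's Netty server inflate
    // zlib-compressed events sent by a compressing Flume avro sink (FLUME-1915).
    val stream = FlumeUtils.createStream(
      ssc, "localhost", 9998, StorageLevel.MEMORY_AND_DISK_SER_2, true)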

Author: tmalaska <te...@cloudera.com>
Author: Tathagata Das <ta...@gmail.com>

Closes #1347 from tdas/FLUME-1915 and squashes the following commits:

96065df [Tathagata Das] Added Mima exclusion for FlumeReceiver.
41d5338 [tmalaska] Address line 57 that was too long
12617e5 [tmalaska] SPARK-1478: Upgrade FlumeInputDStream's Flume...


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/40a8fef4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/40a8fef4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/40a8fef4

Branch: refs/heads/master
Commit: 40a8fef4e6619b4ea10a4ec9026260649ce5ae73
Parents: 369aa84
Author: tmalaska <te...@cloudera.com>
Authored: Thu Jul 10 13:15:02 2014 -0700
Committer: Tathagata Das <ta...@gmail.com>
Committed: Thu Jul 10 13:15:02 2014 -0700

----------------------------------------------------------------------
 .../streaming/flume/FlumeInputDStream.scala     | 76 +++++++++++++++++---
 .../spark/streaming/flume/FlumeUtils.scala      | 41 ++++++++++-
 .../streaming/flume/JavaFlumeStreamSuite.java   |  2 +
 .../streaming/flume/FlumeStreamSuite.scala      | 41 +++++++++--
 project/MimaExcludes.scala                      |  3 +
 5 files changed, 147 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/40a8fef4/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
----------------------------------------------------------------------
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
index ed35e34..07ae88f 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
@@ -20,6 +20,7 @@ package org.apache.spark.streaming.flume
 import java.net.InetSocketAddress
 import java.io.{ObjectInput, ObjectOutput, Externalizable}
 import java.nio.ByteBuffer
+import java.util.concurrent.Executors
 
 import scala.collection.JavaConversions._
 import scala.reflect.ClassTag
@@ -29,24 +30,32 @@ import org.apache.flume.source.avro.AvroFlumeEvent
 import org.apache.flume.source.avro.Status
 import org.apache.avro.ipc.specific.SpecificResponder
 import org.apache.avro.ipc.NettyServer
-
+import org.apache.spark.Logging
 import org.apache.spark.util.Utils
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.dstream._
-import org.apache.spark.Logging
+import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.receiver.Receiver
 
+import org.jboss.netty.channel.ChannelPipelineFactory
+import org.jboss.netty.channel.Channels
+import org.jboss.netty.channel.ChannelPipeline
+import org.jboss.netty.channel.ChannelFactory
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
+import org.jboss.netty.handler.codec.compression._
+import org.jboss.netty.handler.execution.ExecutionHandler
+
 private[streaming]
 class FlumeInputDStream[T: ClassTag](
   @transient ssc_ : StreamingContext,
   host: String,
   port: Int,
-  storageLevel: StorageLevel
+  storageLevel: StorageLevel,
+  enableDecompression: Boolean
 ) extends ReceiverInputDStream[SparkFlumeEvent](ssc_) {
 
   override def getReceiver(): Receiver[SparkFlumeEvent] = {
-    new FlumeReceiver(host, port, storageLevel)
+    new FlumeReceiver(host, port, storageLevel, enableDecompression)
   }
 }
 
@@ -134,22 +143,71 @@ private[streaming]
 class FlumeReceiver(
     host: String,
     port: Int,
-    storageLevel: StorageLevel
+    storageLevel: StorageLevel,
+    enableDecompression: Boolean
   ) extends Receiver[SparkFlumeEvent](storageLevel) with Logging {
 
   lazy val responder = new SpecificResponder(
     classOf[AvroSourceProtocol], new FlumeEventServer(this))
-  lazy val server = new NettyServer(responder, new InetSocketAddress(host, port))
+  var server: NettyServer = null
+
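+  /** Creates the NettyServer, adding a zlib decompression pipeline when enabled. */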
+  private def initServer() = {
+    if (enableDecompression) {
+      val channelFactory = new NioServerSocketChannelFactory(
+        Executors.newCachedThreadPool(), Executors.newCachedThreadPool())
+      val channelPipelineFactory = new CompressionChannelPipelineFactory()
+
+      new NettyServer(
+        responder,
+        new InetSocketAddress(host, port),
+        channelFactory,
+        channelPipelineFactory,
+        null)
+    } else {
+      new NettyServer(responder, new InetSocketAddress(host, port))
+    }
+  }
 
   def onStart() {
-    server.start()
+    synchronized {
+      if (server == null) {
+        server = initServer()
+        server.start()
+      } else {
+        logWarning("Flume receiver being asked to start more then once with out close")
+      }
+    }
     logInfo("Flume receiver started")
   }
 
   def onStop() {
-    server.close()
+    synchronized {
+      if (server != null) {
+        server.close()
+        server = null
+      }
+    }
     logInfo("Flume receiver stopped")
   }
 
   override def preferredLocation = Some(host)
+  
+  /** A Netty pipeline factory that decompresses incoming data from
+    * the Netty client and compresses data going back to the client.
+    *
+    * The compression on the response is required because Flume requires
+    * a successful response to indicate that it can remove the event/batch
+    * from the configured channel.
+    */
+  private[streaming]
+  class CompressionChannelPipelineFactory extends ChannelPipelineFactory {
+
+    def getPipeline() = {
+      val pipeline = Channels.pipeline()
+      val encoder = new ZlibEncoder(6)
+      pipeline.addFirst("deflater", encoder)
+      pipeline.addFirst("inflater", new ZlibDecoder())
+      pipeline
+    }
+  }
 }
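
For context, the sending side this pairs with is a Flume avro sink with
compression turned on (FLUME-1915, shipped in Flume 1.4). A minimal agent
config sketch, with illustrative agent/sink names and port:

    # flume-agent.properties (illustrative)
    agent.sinks.spark.type = avro
    agent.sinks.spark.channel = memoryChannel
    agent.sinks.spark.hostname = localhost
    agent.sinks.spark.port = 9998
    # zlib-compress event batches; a receiver created with
    # enableDecompression = true inflates them on arrival
    agent.sinks.spark.compression-type = deflate
    agent.sinks.spark.compression-level = 6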

http://git-wip-us.apache.org/repos/asf/spark/blob/40a8fef4/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
----------------------------------------------------------------------
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
index 499f356..716db9f 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
@@ -36,7 +36,27 @@ object FlumeUtils {
       port: Int,
       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
     ): ReceiverInputDStream[SparkFlumeEvent] = {
-    val inputStream = new FlumeInputDStream[SparkFlumeEvent](ssc, hostname, port, storageLevel)
+    createStream(ssc, hostname, port, storageLevel, false)
+  }
+
+  /**
+   * Create an input stream from a Flume source.
+   * @param ssc      StreamingContext object
+   * @param hostname Hostname of the slave machine to which the flume data will be sent
+   * @param port     Port of the slave machine to which the flume data will be sent
+   * @param storageLevel  Storage level to use for storing the received objects
+   * @param enableDecompression  Whether the Netty server should decompress the input stream
+   */
+  def createStream(
+      ssc: StreamingContext,
+      hostname: String,
+      port: Int,
+      storageLevel: StorageLevel,
+      enableDecompression: Boolean
+    ): ReceiverInputDStream[SparkFlumeEvent] = {
+    val inputStream = new FlumeInputDStream[SparkFlumeEvent](
+      ssc, hostname, port, storageLevel, enableDecompression)
+
     inputStream
   }
 
@@ -66,6 +86,23 @@ object FlumeUtils {
       port: Int,
       storageLevel: StorageLevel
     ): JavaReceiverInputDStream[SparkFlumeEvent] = {
-    createStream(jssc.ssc, hostname, port, storageLevel)
+    createStream(jssc.ssc, hostname, port, storageLevel, false)
+  }
+
+  /**
+   * Creates an input stream from a Flume source.
+   * @param hostname Hostname of the slave machine to which the flume data will be sent
+   * @param port     Port of the slave machine to which the flume data will be sent
+   * @param storageLevel  Storage level to use for storing the received objects
+   * @param enableDecompression  Whether the Netty server should decompress the input stream
+   */
+  def createStream(
+      jssc: JavaStreamingContext,
+      hostname: String,
+      port: Int,
+      storageLevel: StorageLevel,
+      enableDecompression: Boolean
+    ): JavaReceiverInputDStream[SparkFlumeEvent] = {
+    createStream(jssc.ssc, hostname, port, storageLevel, enableDecompression)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/40a8fef4/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java
----------------------------------------------------------------------
diff --git a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java
index e0ad4f1..3b5e0c7 100644
--- a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java
+++ b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java
@@ -30,5 +30,7 @@ public class JavaFlumeStreamSuite extends LocalJavaStreamingContext {
     JavaReceiverInputDStream<SparkFlumeEvent> test1 = FlumeUtils.createStream(ssc, "localhost", 12345);
     JavaReceiverInputDStream<SparkFlumeEvent> test2 = FlumeUtils.createStream(ssc, "localhost", 12345,
       StorageLevel.MEMORY_AND_DISK_SER_2());
+    JavaReceiverInputDStream<SparkFlumeEvent> test3 = FlumeUtils.createStream(ssc, "localhost", 12345,
+      StorageLevel.MEMORY_AND_DISK_SER_2(), false);
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/40a8fef4/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
index dd287d0..73dffef 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
@@ -33,15 +33,26 @@ import org.apache.spark.streaming.{TestOutputStream, StreamingContext, TestSuite
 import org.apache.spark.streaming.util.ManualClock
 import org.apache.spark.streaming.api.java.JavaReceiverInputDStream
 
-class FlumeStreamSuite extends TestSuiteBase {
+import org.jboss.netty.channel.ChannelPipeline
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
+import org.jboss.netty.channel.socket.SocketChannel
+import org.jboss.netty.handler.codec.compression._
 
-  val testPort = 9999
+class FlumeStreamSuite extends TestSuiteBase {
 
   test("flume input stream") {
+    runFlumeStreamTest(false, 9998)
+  }
+
+  test("flume input compressed stream") {
+    runFlumeStreamTest(true, 9997)
+  }
+
+  def runFlumeStreamTest(enableDecompression: Boolean, testPort: Int) {
     // Set up the streaming context and input streams
     val ssc = new StreamingContext(conf, batchDuration)
     val flumeStream: JavaReceiverInputDStream[SparkFlumeEvent] =
-      FlumeUtils.createStream(ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK)
+      FlumeUtils.createStream(ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK, enableDecompression)
     val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
       with SynchronizedBuffer[Seq[SparkFlumeEvent]]
     val outputStream = new TestOutputStream(flumeStream.receiverInputDStream, outputBuffer)
@@ -52,8 +63,17 @@ class FlumeStreamSuite extends TestSuiteBase {
     val input = Seq(1, 2, 3, 4, 5)
     Thread.sleep(1000)
     val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", testPort))
-    val client = SpecificRequestor.getClient(
-      classOf[AvroSourceProtocol], transceiver)
+    val client: AvroSourceProtocol = if (enableDecompression) {
+      SpecificRequestor.getClient(
+        classOf[AvroSourceProtocol],
+        new NettyTransceiver(new InetSocketAddress("localhost", testPort),
+          new CompressionChannelFactory(6)))
+    } else {
+      SpecificRequestor.getClient(
+        classOf[AvroSourceProtocol], transceiver)
+    }
 
     for (i <- 0 until input.size) {
       val event = new AvroFlumeEvent
@@ -64,6 +84,8 @@ class FlumeStreamSuite extends TestSuiteBase {
       clock.addToTime(batchDuration.milliseconds)
     }
 
+    Thread.sleep(1000)
+
     val startTime = System.currentTimeMillis()
     while (outputBuffer.size < input.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
       logInfo("output.size = " + outputBuffer.size + ", input.size = " + input.size)
@@ -85,4 +107,13 @@ class FlumeStreamSuite extends TestSuiteBase {
       assert(outputBuffer(i).head.event.getHeaders.get("test") === "header")
     }
   }
+
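+  /** Client-side channel factory mirroring the receiver's zlib pipeline,
+    * so the test sends compressed data the decompressing server can inflate. */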
+  class CompressionChannelFactory(compressionLevel: Int) extends NioClientSocketChannelFactory {
+    override def newChannel(pipeline: ChannelPipeline): SocketChannel = {
+      val encoder = new ZlibEncoder(compressionLevel)
+      pipeline.addFirst("deflater", encoder)
+      pipeline.addFirst("inflater", new ZlibDecoder())
+      super.newChannel(pipeline)
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/40a8fef4/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 44bc9dc..3b7b87b 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -64,6 +64,9 @@ object MimaExcludes {
               "org.apache.spark.rdd.PairRDDFunctions.org$apache$spark$rdd$PairRDDFunctions$$"
                 + "createZero$1")
           ) ++
+          Seq(
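+            // FlumeReceiver's constructor gained the enableDecompression
+            // parameter, so MiMa flags the old constructor signature as missing.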
+            ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.FlumeReceiver.this")
+          ) ++
           Seq( // Ignore some private methods in ALS.
             ProblemFilters.exclude[MissingMethodProblem](
               "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$^dateFeatures"),