You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2015/07/01 20:59:28 UTC

spark git commit: [SPARK-8378] [STREAMING] Add the Python API for Flume

Repository: spark
Updated Branches:
  refs/heads/master b8faa3287 -> 75b9fe4c5


[SPARK-8378] [STREAMING] Add the Python API for Flume

Author: zsxwing <zs...@gmail.com>

Closes #6830 from zsxwing/flume-python and squashes the following commits:

78dfdac [zsxwing] Fix the compile error in the test code
f1bf3c0 [zsxwing] Address TD's comments
0449723 [zsxwing] Add sbt goal streaming-flume-assembly/assembly
e93736b [zsxwing] Fix the test case for determine_modules_to_test
9d5821e [zsxwing] Fix pyspark_core dependencies
f9ee681 [zsxwing] Merge branch 'master' into flume-python
7a55837 [zsxwing] Add streaming_flume_assembly to run-tests.py
b96b0de [zsxwing] Merge branch 'master' into flume-python
ce85e83 [zsxwing] Fix incompatible issues for Python 3
01cbb3d [zsxwing] Add import sys
152364c [zsxwing] Fix the issue that StringIO doesn't work in Python 3
14ba0ff [zsxwing] Add flume-assembly for sbt building
b8d5551 [zsxwing] Merge branch 'master' into flume-python
4762c34 [zsxwing] Fix the doc
0336579 [zsxwing] Refactor Flume unit tests and also add tests for Python API
9f33873 [zsxwing] Add the Python API for Flume


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/75b9fe4c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/75b9fe4c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/75b9fe4c

Branch: refs/heads/master
Commit: 75b9fe4c5ff6f206c6fc9100563d625b39f142ba
Parents: b8faa32
Author: zsxwing <zs...@gmail.com>
Authored: Wed Jul 1 11:59:24 2015 -0700
Committer: Tathagata Das <ta...@gmail.com>
Committed: Wed Jul 1 11:59:24 2015 -0700

----------------------------------------------------------------------
 dev/run-tests.py                                |   7 +-
 dev/sparktestsupport/modules.py                 |  15 +-
 docs/streaming-flume-integration.md             |  18 ++
 docs/streaming-programming-guide.md             |   2 +-
 .../main/python/streaming/flume_wordcount.py    |  55 +++++
 external/flume-assembly/pom.xml                 | 135 ++++++++++++
 .../spark/streaming/flume/FlumeTestUtils.scala  | 116 ++++++++++
 .../spark/streaming/flume/FlumeUtils.scala      |  76 ++++++-
 .../streaming/flume/PollingFlumeTestUtils.scala | 209 +++++++++++++++++++
 .../flume/FlumePollingStreamSuite.scala         | 173 +++------------
 .../streaming/flume/FlumeStreamSuite.scala      | 106 ++--------
 pom.xml                                         |   1 +
 project/SparkBuild.scala                        |   6 +-
 python/pyspark/streaming/flume.py               | 147 +++++++++++++
 python/pyspark/streaming/tests.py               | 179 +++++++++++++++-
 15 files changed, 1009 insertions(+), 236 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/75b9fe4c/dev/run-tests.py
----------------------------------------------------------------------
diff --git a/dev/run-tests.py b/dev/run-tests.py
index 4596e07..1f0d218 100755
--- a/dev/run-tests.py
+++ b/dev/run-tests.py
@@ -96,8 +96,8 @@ def determine_modules_to_test(changed_modules):
     ['examples', 'graphx']
     >>> x = sorted(x.name for x in determine_modules_to_test([modules.sql]))
     >>> x # doctest: +NORMALIZE_WHITESPACE
-    ['examples', 'hive-thriftserver', 'mllib', 'pyspark-core', 'pyspark-ml', \
-     'pyspark-mllib', 'pyspark-sql', 'pyspark-streaming', 'sparkr', 'sql']
+    ['examples', 'hive-thriftserver', 'mllib', 'pyspark-ml', \
+     'pyspark-mllib', 'pyspark-sql', 'sparkr', 'sql']
     """
     # If we're going to have to run all of the tests, then we can just short-circuit
     # and return 'root'. No module depends on root, so if it appears then it will be
@@ -293,7 +293,8 @@ def build_spark_sbt(hadoop_version):
     build_profiles = get_hadoop_profiles(hadoop_version) + modules.root.build_profile_flags
     sbt_goals = ["package",
                  "assembly/assembly",
-                 "streaming-kafka-assembly/assembly"]
+                 "streaming-kafka-assembly/assembly",
+                 "streaming-flume-assembly/assembly"]
     profiles_and_goals = build_profiles + sbt_goals
 
     print("[info] Building Spark (w/Hive 0.13.1) using SBT with these arguments: ",

http://git-wip-us.apache.org/repos/asf/spark/blob/75b9fe4c/dev/sparktestsupport/modules.py
----------------------------------------------------------------------
diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index efe3a89..993583e 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -203,7 +203,7 @@ streaming_flume_sink = Module(
 
 
 streaming_flume = Module(
-    name="streaming_flume",
+    name="streaming-flume",
     dependencies=[streaming],
     source_file_regexes=[
         "external/flume",
@@ -214,6 +214,15 @@ streaming_flume = Module(
 )
 
 
+streaming_flume_assembly = Module(
+    name="streaming-flume-assembly",
+    dependencies=[streaming_flume, streaming_flume_sink],
+    source_file_regexes=[
+        "external/flume-assembly",
+    ]
+)
+
+
 mllib = Module(
     name="mllib",
     dependencies=[streaming, sql],
@@ -241,7 +250,7 @@ examples = Module(
 
 pyspark_core = Module(
     name="pyspark-core",
-    dependencies=[mllib, streaming, streaming_kafka],
+    dependencies=[],
     source_file_regexes=[
         "python/(?!pyspark/(ml|mllib|sql|streaming))"
     ],
@@ -281,7 +290,7 @@ pyspark_sql = Module(
 
 pyspark_streaming = Module(
     name="pyspark-streaming",
-    dependencies=[pyspark_core, streaming, streaming_kafka],
+    dependencies=[pyspark_core, streaming, streaming_kafka, streaming_flume_assembly],
     source_file_regexes=[
         "python/pyspark/streaming"
     ],

http://git-wip-us.apache.org/repos/asf/spark/blob/75b9fe4c/docs/streaming-flume-integration.md
----------------------------------------------------------------------
diff --git a/docs/streaming-flume-integration.md b/docs/streaming-flume-integration.md
index 8d6e743..de04610 100644
--- a/docs/streaming-flume-integration.md
+++ b/docs/streaming-flume-integration.md
@@ -58,6 +58,15 @@ configuring Flume agents.
 	See the [API docs](api/java/index.html?org/apache/spark/streaming/flume/FlumeUtils.html)
 	and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java).
 	</div>
+	<div data-lang="python" markdown="1">
+		from pyspark.streaming.flume import FlumeUtils
+
+		flumeStream = FlumeUtils.createStream(streamingContext, [chosen machine's hostname], [chosen port])
+
+	By default, the Python API will decode Flume event body as UTF8 encoded strings. You can specify your custom decoding function to decode the body byte arrays in Flume events to any arbitrary data type. 
+	See the [API docs](api/python/pyspark.streaming.html#pyspark.streaming.flume.FlumeUtils)
+	and the [example]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/python/streaming/flume_wordcount.py).
+	</div>
 	</div>
 
 	Note that the hostname should be the same as the one used by the resource manager in the
@@ -135,6 +144,15 @@ configuring Flume agents.
 		JavaReceiverInputDStream<SparkFlumeEvent>flumeStream =
 			FlumeUtils.createPollingStream(streamingContext, [sink machine hostname], [sink port]);
 	</div>
+	<div data-lang="python" markdown="1">
+		from pyspark.streaming.flume import FlumeUtils
+
+		addresses = [([sink machine hostname 1], [sink port 1]), ([sink machine hostname 2], [sink port 2])]
+		flumeStream = FlumeUtils.createPollingStream(streamingContext, addresses)
+
+	By default, the Python API will decode Flume event body as UTF8 encoded strings. You can specify your custom decoding function to decode the body byte arrays in Flume events to any arbitrary data type.
+	See the [API docs](api/python/pyspark.streaming.html#pyspark.streaming.flume.FlumeUtils).
+	</div>
 	</div>
 
 	See the Scala example [FlumePollingEventCount]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala).

http://git-wip-us.apache.org/repos/asf/spark/blob/75b9fe4c/docs/streaming-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index b784d59..e72d558 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -683,7 +683,7 @@ for Java, and [StreamingContext](api/python/pyspark.streaming.html#pyspark.strea
 {:.no_toc}
 
 <span class="badge" style="background-color: grey">Python API</span> As of Spark {{site.SPARK_VERSION_SHORT}},
-out of these sources, *only* Kafka is available in the Python API. We will add more advanced sources in the Python API in future.
+out of these sources, *only* Kafka and Flume are available in the Python API. We will add more advanced sources in the Python API in future.
 
 This category of sources require interfacing with external non-Spark libraries, some of them with
 complex dependencies (e.g., Kafka and Flume). Hence, to minimize issues related to version conflicts

http://git-wip-us.apache.org/repos/asf/spark/blob/75b9fe4c/examples/src/main/python/streaming/flume_wordcount.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/streaming/flume_wordcount.py b/examples/src/main/python/streaming/flume_wordcount.py
new file mode 100644
index 0000000..091b64d
--- /dev/null
+++ b/examples/src/main/python/streaming/flume_wordcount.py
@@ -0,0 +1,55 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+ Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
+ Usage: flume_wordcount.py <hostname> <port>
+
+ To run this on your local machine, you need to setup Flume first, see
+ https://flume.apache.org/documentation.html
+
+ and then run the example
+    `$ bin/spark-submit --jars external/flume-assembly/target/scala-*/\
+      spark-streaming-flume-assembly-*.jar examples/src/main/python/streaming/flume_wordcount.py \
+      localhost 12345
+"""
+from __future__ import print_function
+
+import sys
+
+from pyspark import SparkContext
+from pyspark.streaming import StreamingContext
+from pyspark.streaming.flume import FlumeUtils
+
+if __name__ == "__main__":
+    if len(sys.argv) != 3:
+        print("Usage: flume_wordcount.py <hostname> <port>", file=sys.stderr)
+        exit(-1)
+
+    sc = SparkContext(appName="PythonStreamingFlumeWordCount")
+    ssc = StreamingContext(sc, 1)
+
+    hostname, port = sys.argv[1:]
+    kvs = FlumeUtils.createStream(ssc, hostname, int(port))
+    lines = kvs.map(lambda x: x[1])
+    counts = lines.flatMap(lambda line: line.split(" ")) \
+        .map(lambda word: (word, 1)) \
+        .reduceByKey(lambda a, b: a+b)
+    counts.pprint()
+
+    ssc.start()
+    ssc.awaitTermination()

http://git-wip-us.apache.org/repos/asf/spark/blob/75b9fe4c/external/flume-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/external/flume-assembly/pom.xml b/external/flume-assembly/pom.xml
new file mode 100644
index 0000000..8565cd8
--- /dev/null
+++ b/external/flume-assembly/pom.xml
@@ -0,0 +1,135 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.spark</groupId>
+    <artifactId>spark-parent_2.10</artifactId>
+    <version>1.5.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <groupId>org.apache.spark</groupId>
+  <artifactId>spark-streaming-flume-assembly_2.10</artifactId>
+  <packaging>jar</packaging>
+  <name>Spark Project External Flume Assembly</name>
+  <url>http://spark.apache.org/</url>
+
+  <properties>
+    <sbt.project.name>streaming-flume-assembly</sbt.project.name>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming-flume_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro</artifactId>
+      <version>${avro.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro-ipc</artifactId>
+      <version>${avro.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+          <artifactId>jetty</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+          <artifactId>jetty-util</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+          <artifactId>servlet-api</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.velocity</groupId>
+          <artifactId>velocity</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+  </dependencies>
+
+  <build>
+  <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+  <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+  <plugins>
+    <plugin>
+      <groupId>org.apache.maven.plugins</groupId>
+      <artifactId>maven-shade-plugin</artifactId>
+      <configuration>
+        <shadedArtifactAttached>false</shadedArtifactAttached>
+        <outputFile>${project.build.directory}/scala-${scala.binary.version}/spark-streaming-flume-assembly-${project.version}.jar</outputFile>
+        <artifactSet>
+          <includes>
+            <include>*:*</include>
+          </includes>
+        </artifactSet>
+        <filters>
+          <filter>
+            <artifact>*:*</artifact>
+            <excludes>
+              <exclude>META-INF/*.SF</exclude>
+              <exclude>META-INF/*.DSA</exclude>
+              <exclude>META-INF/*.RSA</exclude>
+            </excludes>
+          </filter>
+        </filters>
+      </configuration>
+      <executions>
+        <execution>
+          <phase>package</phase>
+          <goals>
+            <goal>shade</goal>
+          </goals>
+          <configuration>
+            <transformers>
+              <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+              <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
+                <resource>reference.conf</resource>
+              </transformer>
+              <transformer implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
+                <resource>log4j.properties</resource>
+              </transformer>
+              <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer"/>
+              <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"/>
+            </transformers>
+          </configuration>
+        </execution>
+      </executions>
+    </plugin>
+  </plugins>
+</build>
+</project>
+

http://git-wip-us.apache.org/repos/asf/spark/blob/75b9fe4c/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala
----------------------------------------------------------------------
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala
new file mode 100644
index 0000000..9d9c3b1
--- /dev/null
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.flume
+
+import java.net.{InetSocketAddress, ServerSocket}
+import java.nio.ByteBuffer
+import java.util.{List => JList}
+
+import scala.collection.JavaConversions._
+
+import com.google.common.base.Charsets.UTF_8
+import org.apache.avro.ipc.NettyTransceiver
+import org.apache.avro.ipc.specific.SpecificRequestor
+import org.apache.commons.lang3.RandomUtils
+import org.apache.flume.source.avro
+import org.apache.flume.source.avro.{AvroSourceProtocol, AvroFlumeEvent}
+import org.jboss.netty.channel.ChannelPipeline
+import org.jboss.netty.channel.socket.SocketChannel
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
+import org.jboss.netty.handler.codec.compression.{ZlibDecoder, ZlibEncoder}
+
+import org.apache.spark.util.Utils
+import org.apache.spark.SparkConf
+
+/**
+ * Share codes for Scala and Python unit tests
+ */
+private[flume] class FlumeTestUtils {
+
+  private var transceiver: NettyTransceiver = null
+
+  private val testPort: Int = findFreePort()
+
+  def getTestPort(): Int = testPort
+
+  /** Find a free port */
+  private def findFreePort(): Int = {
+    val candidatePort = RandomUtils.nextInt(1024, 65536)
+    Utils.startServiceOnPort(candidatePort, (trialPort: Int) => {
+      val socket = new ServerSocket(trialPort)
+      socket.close()
+      (null, trialPort)
+    }, new SparkConf())._2
+  }
+
+  /** Send data to the flume receiver */
+  def writeInput(input: JList[String], enableCompression: Boolean): Unit = {
+    val testAddress = new InetSocketAddress("localhost", testPort)
+
+    val inputEvents = input.map { item =>
+      val event = new AvroFlumeEvent
+      event.setBody(ByteBuffer.wrap(item.getBytes(UTF_8)))
+      event.setHeaders(Map[CharSequence, CharSequence]("test" -> "header"))
+      event
+    }
+
+    // if last attempted transceiver had succeeded, close it
+    close()
+
+    // Create transceiver
+    transceiver = {
+      if (enableCompression) {
+        new NettyTransceiver(testAddress, new CompressionChannelFactory(6))
+      } else {
+        new NettyTransceiver(testAddress)
+      }
+    }
+
+    // Create Avro client with the transceiver
+    val client = SpecificRequestor.getClient(classOf[AvroSourceProtocol], transceiver)
+    if (client == null) {
+      throw new AssertionError("Cannot create client")
+    }
+
+    // Send data
+    val status = client.appendBatch(inputEvents.toList)
+    if (status != avro.Status.OK) {
+      throw new AssertionError("Sent events unsuccessfully")
+    }
+  }
+
+  def close(): Unit = {
+    if (transceiver != null) {
+      transceiver.close()
+      transceiver = null
+    }
+  }
+
+  /** Class to create socket channel with compression */
+  private class CompressionChannelFactory(compressionLevel: Int)
+    extends NioClientSocketChannelFactory {
+
+    override def newChannel(pipeline: ChannelPipeline): SocketChannel = {
+      val encoder = new ZlibEncoder(compressionLevel)
+      pipeline.addFirst("deflater", encoder)
+      pipeline.addFirst("inflater", new ZlibDecoder())
+      super.newChannel(pipeline)
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/75b9fe4c/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
----------------------------------------------------------------------
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
index 44dec45..095bfb0 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
@@ -18,10 +18,16 @@
 package org.apache.spark.streaming.flume
 
 import java.net.InetSocketAddress
+import java.io.{DataOutputStream, ByteArrayOutputStream}
+import java.util.{List => JList, Map => JMap}
 
+import scala.collection.JavaConversions._
+
+import org.apache.spark.api.java.function.PairFunction
+import org.apache.spark.api.python.PythonRDD
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext}
+import org.apache.spark.streaming.api.java.{JavaPairDStream, JavaReceiverInputDStream, JavaStreamingContext}
 import org.apache.spark.streaming.dstream.ReceiverInputDStream
 
 
@@ -236,3 +242,71 @@ object FlumeUtils {
     createPollingStream(jssc.ssc, addresses, storageLevel, maxBatchSize, parallelism)
   }
 }
+
+/**
+ * This is a helper class that wraps the methods in FlumeUtils into more Python-friendly class and
+ * function so that it can be easily instantiated and called from Python's FlumeUtils.
+ */
+private class FlumeUtilsPythonHelper {
+
+  def createStream(
+      jssc: JavaStreamingContext,
+      hostname: String,
+      port: Int,
+      storageLevel: StorageLevel,
+      enableDecompression: Boolean
+    ): JavaPairDStream[Array[Byte], Array[Byte]] = {
+    val dstream = FlumeUtils.createStream(jssc, hostname, port, storageLevel, enableDecompression)
+    FlumeUtilsPythonHelper.toByteArrayPairDStream(dstream)
+  }
+
+  def createPollingStream(
+      jssc: JavaStreamingContext,
+      hosts: JList[String],
+      ports: JList[Int],
+      storageLevel: StorageLevel,
+      maxBatchSize: Int,
+      parallelism: Int
+    ): JavaPairDStream[Array[Byte], Array[Byte]] = {
+    assert(hosts.length == ports.length)
+    val addresses = hosts.zip(ports).map {
+      case (host, port) => new InetSocketAddress(host, port)
+    }
+    val dstream = FlumeUtils.createPollingStream(
+      jssc.ssc, addresses, storageLevel, maxBatchSize, parallelism)
+    FlumeUtilsPythonHelper.toByteArrayPairDStream(dstream)
+  }
+
+}
+
+private object FlumeUtilsPythonHelper {
+
+  private def stringMapToByteArray(map: JMap[CharSequence, CharSequence]): Array[Byte] = {
+    val byteStream = new ByteArrayOutputStream()
+    val output = new DataOutputStream(byteStream)
+    try {
+      output.writeInt(map.size)
+      map.foreach { kv =>
+        PythonRDD.writeUTF(kv._1.toString, output)
+        PythonRDD.writeUTF(kv._2.toString, output)
+      }
+      byteStream.toByteArray
+    }
+    finally {
+      output.close()
+    }
+  }
+
+  private def toByteArrayPairDStream(dstream: JavaReceiverInputDStream[SparkFlumeEvent]):
+    JavaPairDStream[Array[Byte], Array[Byte]] = {
+    dstream.mapToPair(new PairFunction[SparkFlumeEvent, Array[Byte], Array[Byte]] {
+      override def call(sparkEvent: SparkFlumeEvent): (Array[Byte], Array[Byte]) = {
+        val event = sparkEvent.event
+        val byteBuffer = event.getBody
+        val body = new Array[Byte](byteBuffer.remaining())
+        byteBuffer.get(body)
+        (stringMapToByteArray(event.getHeaders), body)
+      }
+    })
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/75b9fe4c/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala
----------------------------------------------------------------------
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala
new file mode 100644
index 0000000..91d63d4
--- /dev/null
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.flume
+
+import java.util.concurrent._
+import java.util.{List => JList, Map => JMap}
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+
+import com.google.common.base.Charsets.UTF_8
+import org.apache.flume.event.EventBuilder
+import org.apache.flume.Context
+import org.apache.flume.channel.MemoryChannel
+import org.apache.flume.conf.Configurables
+
+import org.apache.spark.streaming.flume.sink.{SparkSinkConfig, SparkSink}
+
+/**
+ * Share codes for Scala and Python unit tests
+ */
+private[flume] class PollingFlumeTestUtils {
+
+  private val batchCount = 5
+  val eventsPerBatch = 100
+  private val totalEventsPerChannel = batchCount * eventsPerBatch
+  private val channelCapacity = 5000
+
+  def getTotalEvents: Int = totalEventsPerChannel * channels.size
+
+  private val channels = new ArrayBuffer[MemoryChannel]
+  private val sinks = new ArrayBuffer[SparkSink]
+
+  /**
+   * Start a sink and return the port of this sink
+   */
+  def startSingleSink(): Int = {
+    channels.clear()
+    sinks.clear()
+
+    // Start the channel and sink.
+    val context = new Context()
+    context.put("capacity", channelCapacity.toString)
+    context.put("transactionCapacity", "1000")
+    context.put("keep-alive", "0")
+    val channel = new MemoryChannel()
+    Configurables.configure(channel, context)
+
+    val sink = new SparkSink()
+    context.put(SparkSinkConfig.CONF_HOSTNAME, "localhost")
+    context.put(SparkSinkConfig.CONF_PORT, String.valueOf(0))
+    Configurables.configure(sink, context)
+    sink.setChannel(channel)
+    sink.start()
+
+    channels += (channel)
+    sinks += sink
+
+    sink.getPort()
+  }
+
+  /**
+   * Start 2 sinks and return the ports
+   */
+  def startMultipleSinks(): JList[Int] = {
+    channels.clear()
+    sinks.clear()
+
+    // Start the channel and sink.
+    val context = new Context()
+    context.put("capacity", channelCapacity.toString)
+    context.put("transactionCapacity", "1000")
+    context.put("keep-alive", "0")
+    val channel = new MemoryChannel()
+    Configurables.configure(channel, context)
+
+    val channel2 = new MemoryChannel()
+    Configurables.configure(channel2, context)
+
+    val sink = new SparkSink()
+    context.put(SparkSinkConfig.CONF_HOSTNAME, "localhost")
+    context.put(SparkSinkConfig.CONF_PORT, String.valueOf(0))
+    Configurables.configure(sink, context)
+    sink.setChannel(channel)
+    sink.start()
+
+    val sink2 = new SparkSink()
+    context.put(SparkSinkConfig.CONF_HOSTNAME, "localhost")
+    context.put(SparkSinkConfig.CONF_PORT, String.valueOf(0))
+    Configurables.configure(sink2, context)
+    sink2.setChannel(channel2)
+    sink2.start()
+
+    sinks += sink
+    sinks += sink2
+    channels += channel
+    channels += channel2
+
+    sinks.map(_.getPort())
+  }
+
+  /**
+   * Send data and wait until all data has been received
+   */
+  def sendDatAndEnsureAllDataHasBeenReceived(): Unit = {
+    val executor = Executors.newCachedThreadPool()
+    val executorCompletion = new ExecutorCompletionService[Void](executor)
+
+    val latch = new CountDownLatch(batchCount * channels.size)
+    sinks.foreach(_.countdownWhenBatchReceived(latch))
+
+    channels.foreach(channel => {
+      executorCompletion.submit(new TxnSubmitter(channel))
+    })
+
+    for (i <- 0 until channels.size) {
+      executorCompletion.take()
+    }
+
+    latch.await(15, TimeUnit.SECONDS) // Ensure all data has been received.
+  }
+
+  /**
+   * A Python-friendly method to assert the output
+   */
+  def assertOutput(
+      outputHeaders: JList[JMap[String, String]], outputBodies: JList[String]): Unit = {
+    require(outputHeaders.size == outputBodies.size)
+    val eventSize = outputHeaders.size
+    if (eventSize != totalEventsPerChannel * channels.size) {
+      throw new AssertionError(
+        s"Expected ${totalEventsPerChannel * channels.size} events, but was $eventSize")
+    }
+    var counter = 0
+    for (k <- 0 until channels.size; i <- 0 until totalEventsPerChannel) {
+      val eventBodyToVerify = s"${channels(k).getName}-$i"
+      val eventHeaderToVerify: JMap[String, String] = Map[String, String](s"test-$i" -> "header")
+      var found = false
+      var j = 0
+      while (j < eventSize && !found) {
+        if (eventBodyToVerify == outputBodies.get(j) &&
+          eventHeaderToVerify == outputHeaders.get(j)) {
+          found = true
+          counter += 1
+        }
+        j += 1
+      }
+    }
+    if (counter != totalEventsPerChannel * channels.size) {
+      throw new AssertionError(
+        s"111 Expected ${totalEventsPerChannel * channels.size} events, but was $counter")
+    }
+  }
+
+  def assertChannelsAreEmpty(): Unit = {
+    channels.foreach(assertChannelIsEmpty)
+  }
+
+  private def assertChannelIsEmpty(channel: MemoryChannel): Unit = {
+    val queueRemaining = channel.getClass.getDeclaredField("queueRemaining")
+    queueRemaining.setAccessible(true)
+    val m = queueRemaining.get(channel).getClass.getDeclaredMethod("availablePermits")
+    if (m.invoke(queueRemaining.get(channel)).asInstanceOf[Int] != 5000) {
+      throw new AssertionError(s"Channel ${channel.getName} is not empty")
+    }
+  }
+
+  def close(): Unit = {
+    sinks.foreach(_.stop())
+    sinks.clear()
+    channels.foreach(_.stop())
+    channels.clear()
+  }
+
+  private class TxnSubmitter(channel: MemoryChannel) extends Callable[Void] {
+    override def call(): Void = {
+      var t = 0
+      for (i <- 0 until batchCount) {
+        val tx = channel.getTransaction
+        tx.begin()
+        for (j <- 0 until eventsPerBatch) {
+          channel.put(EventBuilder.withBody(s"${channel.getName}-$t".getBytes(UTF_8),
+            Map[String, String](s"test-$t" -> "header")))
+          t += 1
+        }
+        tx.commit()
+        tx.close()
+        Thread.sleep(500) // Allow some time for the events to reach
+      }
+      null
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/75b9fe4c/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
index d772b9c..d5f9a0a 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
@@ -18,47 +18,33 @@
 package org.apache.spark.streaming.flume
 
 import java.net.InetSocketAddress
-import java.util.concurrent._
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
 import scala.concurrent.duration._
 import scala.language.postfixOps
 
-import org.apache.flume.Context
-import org.apache.flume.channel.MemoryChannel
-import org.apache.flume.conf.Configurables
-import org.apache.flume.event.EventBuilder
-import org.scalatest.concurrent.Eventually._
-
+import com.google.common.base.Charsets.UTF_8
 import org.scalatest.BeforeAndAfter
+import org.scalatest.concurrent.Eventually._
 
 import org.apache.spark.{Logging, SparkConf, SparkFunSuite}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.dstream.ReceiverInputDStream
 import org.apache.spark.streaming.{Seconds, TestOutputStream, StreamingContext}
-import org.apache.spark.streaming.flume.sink._
 import org.apache.spark.util.{ManualClock, Utils}
 
 class FlumePollingStreamSuite extends SparkFunSuite with BeforeAndAfter with Logging {
 
-  val batchCount = 5
-  val eventsPerBatch = 100
-  val totalEventsPerChannel = batchCount * eventsPerBatch
-  val channelCapacity = 5000
   val maxAttempts = 5
   val batchDuration = Seconds(1)
 
   val conf = new SparkConf()
     .setMaster("local[2]")
     .setAppName(this.getClass.getSimpleName)
+    .set("spark.streaming.clock", "org.apache.spark.util.ManualClock")
 
-  def beforeFunction() {
-    logInfo("Using manual clock")
-    conf.set("spark.streaming.clock", "org.apache.spark.util.ManualClock")
-  }
-
-  before(beforeFunction())
+  val utils = new PollingFlumeTestUtils
 
   test("flume polling test") {
     testMultipleTimes(testFlumePolling)
@@ -89,146 +75,55 @@ class FlumePollingStreamSuite extends SparkFunSuite with BeforeAndAfter with Log
   }
 
   private def testFlumePolling(): Unit = {
-    // Start the channel and sink.
-    val context = new Context()
-    context.put("capacity", channelCapacity.toString)
-    context.put("transactionCapacity", "1000")
-    context.put("keep-alive", "0")
-    val channel = new MemoryChannel()
-    Configurables.configure(channel, context)
-
-    val sink = new SparkSink()
-    context.put(SparkSinkConfig.CONF_HOSTNAME, "localhost")
-    context.put(SparkSinkConfig.CONF_PORT, String.valueOf(0))
-    Configurables.configure(sink, context)
-    sink.setChannel(channel)
-    sink.start()
-
-    writeAndVerify(Seq(sink), Seq(channel))
-    assertChannelIsEmpty(channel)
-    sink.stop()
-    channel.stop()
+    try {
+      val port = utils.startSingleSink()
+
+      writeAndVerify(Seq(port))
+      utils.assertChannelsAreEmpty()
+    } finally {
+      utils.close()
+    }
   }
 
   private def testFlumePollingMultipleHost(): Unit = {
-    // Start the channel and sink.
-    val context = new Context()
-    context.put("capacity", channelCapacity.toString)
-    context.put("transactionCapacity", "1000")
-    context.put("keep-alive", "0")
-    val channel = new MemoryChannel()
-    Configurables.configure(channel, context)
-
-    val channel2 = new MemoryChannel()
-    Configurables.configure(channel2, context)
-
-    val sink = new SparkSink()
-    context.put(SparkSinkConfig.CONF_HOSTNAME, "localhost")
-    context.put(SparkSinkConfig.CONF_PORT, String.valueOf(0))
-    Configurables.configure(sink, context)
-    sink.setChannel(channel)
-    sink.start()
-
-    val sink2 = new SparkSink()
-    context.put(SparkSinkConfig.CONF_HOSTNAME, "localhost")
-    context.put(SparkSinkConfig.CONF_PORT, String.valueOf(0))
-    Configurables.configure(sink2, context)
-    sink2.setChannel(channel2)
-    sink2.start()
     try {
-      writeAndVerify(Seq(sink, sink2), Seq(channel, channel2))
-      assertChannelIsEmpty(channel)
-      assertChannelIsEmpty(channel2)
+      val ports = utils.startMultipleSinks()
+      writeAndVerify(ports)
+      utils.assertChannelsAreEmpty()
     } finally {
-      sink.stop()
-      sink2.stop()
-      channel.stop()
-      channel2.stop()
+      utils.close()
     }
   }
 
-  def writeAndVerify(sinks: Seq[SparkSink], channels: Seq[MemoryChannel]) {
+  def writeAndVerify(sinkPorts: Seq[Int]): Unit = {
     // Set up the streaming context and input streams
     val ssc = new StreamingContext(conf, batchDuration)
-    val addresses = sinks.map(sink => new InetSocketAddress("localhost", sink.getPort()))
+    val addresses = sinkPorts.map(port => new InetSocketAddress("localhost", port))
     val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
       FlumeUtils.createPollingStream(ssc, addresses, StorageLevel.MEMORY_AND_DISK,
-        eventsPerBatch, 5)
+        utils.eventsPerBatch, 5)
     val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
       with SynchronizedBuffer[Seq[SparkFlumeEvent]]
     val outputStream = new TestOutputStream(flumeStream, outputBuffer)
     outputStream.register()
 
     ssc.start()
-    val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
-    val executor = Executors.newCachedThreadPool()
-    val executorCompletion = new ExecutorCompletionService[Void](executor)
-
-    val latch = new CountDownLatch(batchCount * channels.size)
-    sinks.foreach(_.countdownWhenBatchReceived(latch))
-
-    channels.foreach(channel => {
-      executorCompletion.submit(new TxnSubmitter(channel, clock))
-    })
-
-    for (i <- 0 until channels.size) {
-      executorCompletion.take()
-    }
-
-    latch.await(15, TimeUnit.SECONDS) // Ensure all data has been received.
-    clock.advance(batchDuration.milliseconds)
-
-    // The eventually is required to ensure that all data in the batch has been processed.
-    eventually(timeout(10 seconds), interval(100 milliseconds)) {
-      val flattenedBuffer = outputBuffer.flatten
-      assert(flattenedBuffer.size === totalEventsPerChannel * channels.size)
-      var counter = 0
-      for (k <- 0 until channels.size; i <- 0 until totalEventsPerChannel) {
-        val eventToVerify = EventBuilder.withBody((channels(k).getName + " - " +
-          String.valueOf(i)).getBytes("utf-8"),
-          Map[String, String]("test-" + i.toString -> "header"))
-        var found = false
-        var j = 0
-        while (j < flattenedBuffer.size && !found) {
-          val strToCompare = new String(flattenedBuffer(j).event.getBody.array(), "utf-8")
-          if (new String(eventToVerify.getBody, "utf-8") == strToCompare &&
-            eventToVerify.getHeaders.get("test-" + i.toString)
-              .equals(flattenedBuffer(j).event.getHeaders.get("test-" + i.toString))) {
-            found = true
-            counter += 1
-          }
-          j += 1
-        }
-      }
-      assert(counter === totalEventsPerChannel * channels.size)
-    }
-    ssc.stop()
-  }
-
-  def assertChannelIsEmpty(channel: MemoryChannel): Unit = {
-    val queueRemaining = channel.getClass.getDeclaredField("queueRemaining")
-    queueRemaining.setAccessible(true)
-    val m = queueRemaining.get(channel).getClass.getDeclaredMethod("availablePermits")
-    assert(m.invoke(queueRemaining.get(channel)).asInstanceOf[Int] === 5000)
-  }
-
-  private class TxnSubmitter(channel: MemoryChannel, clock: ManualClock) extends Callable[Void] {
-    override def call(): Void = {
-      var t = 0
-      for (i <- 0 until batchCount) {
-        val tx = channel.getTransaction
-        tx.begin()
-        for (j <- 0 until eventsPerBatch) {
-          channel.put(EventBuilder.withBody((channel.getName + " - " + String.valueOf(t)).getBytes(
-            "utf-8"),
-            Map[String, String]("test-" + t.toString -> "header")))
-          t += 1
-        }
-        tx.commit()
-        tx.close()
-        Thread.sleep(500) // Allow some time for the events to reach
+    try {
+      utils.sendDatAndEnsureAllDataHasBeenReceived()
+      val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+      clock.advance(batchDuration.milliseconds)
+
+      // The eventually is required to ensure that all data in the batch has been processed.
+      eventually(timeout(10 seconds), interval(100 milliseconds)) {
+        val flattenOutputBuffer = outputBuffer.flatten
+        val headers = flattenOutputBuffer.map(_.event.getHeaders.map {
+          case kv => (kv._1.toString, kv._2.toString)
+        }).map(mapAsJavaMap)
+        val bodies = flattenOutputBuffer.map(e => new String(e.event.getBody.array(), UTF_8))
+        utils.assertOutput(headers, bodies)
       }
-      null
+    } finally {
+      ssc.stop()
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/75b9fe4c/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
index c926359..5bc4cdf 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
@@ -17,20 +17,12 @@
 
 package org.apache.spark.streaming.flume
 
-import java.net.{InetSocketAddress, ServerSocket}
-import java.nio.ByteBuffer
-
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
 import scala.concurrent.duration._
 import scala.language.postfixOps
 
 import com.google.common.base.Charsets
-import org.apache.avro.ipc.NettyTransceiver
-import org.apache.avro.ipc.specific.SpecificRequestor
-import org.apache.commons.lang3.RandomUtils
-import org.apache.flume.source.avro
-import org.apache.flume.source.avro.{AvroFlumeEvent, AvroSourceProtocol}
 import org.jboss.netty.channel.ChannelPipeline
 import org.jboss.netty.channel.socket.SocketChannel
 import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
@@ -41,22 +33,10 @@ import org.scalatest.concurrent.Eventually._
 import org.apache.spark.{Logging, SparkConf, SparkFunSuite}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.{Milliseconds, StreamingContext, TestOutputStream}
-import org.apache.spark.util.Utils
 
 class FlumeStreamSuite extends SparkFunSuite with BeforeAndAfter with Matchers with Logging {
   val conf = new SparkConf().setMaster("local[4]").setAppName("FlumeStreamSuite")
-
   var ssc: StreamingContext = null
-  var transceiver: NettyTransceiver = null
-
-  after {
-    if (ssc != null) {
-      ssc.stop()
-    }
-    if (transceiver != null) {
-      transceiver.close()
-    }
-  }
 
   test("flume input stream") {
     testFlumeStream(testCompression = false)
@@ -69,19 +49,29 @@ class FlumeStreamSuite extends SparkFunSuite with BeforeAndAfter with Matchers w
   /** Run test on flume stream */
   private def testFlumeStream(testCompression: Boolean): Unit = {
     val input = (1 to 100).map { _.toString }
-    val testPort = findFreePort()
-    val outputBuffer = startContext(testPort, testCompression)
-    writeAndVerify(input, testPort, outputBuffer, testCompression)
-  }
+    val utils = new FlumeTestUtils
+    try {
+      val outputBuffer = startContext(utils.getTestPort(), testCompression)
 
-  /** Find a free port */
-  private def findFreePort(): Int = {
-    val candidatePort = RandomUtils.nextInt(1024, 65536)
-    Utils.startServiceOnPort(candidatePort, (trialPort: Int) => {
-      val socket = new ServerSocket(trialPort)
-      socket.close()
-      (null, trialPort)
-    }, conf)._2
+      eventually(timeout(10 seconds), interval(100 milliseconds)) {
+        utils.writeInput(input, testCompression)
+      }
+
+      eventually(timeout(10 seconds), interval(100 milliseconds)) {
+        val outputEvents = outputBuffer.flatten.map { _.event }
+        outputEvents.foreach {
+          event =>
+            event.getHeaders.get("test") should be("header")
+        }
+        val output = outputEvents.map(event => new String(event.getBody.array(), Charsets.UTF_8))
+        output should be (input)
+      }
+    } finally {
+      if (ssc != null) {
+        ssc.stop()
+      }
+      utils.close()
+    }
   }
 
   /** Setup and start the streaming context */
@@ -98,58 +88,6 @@ class FlumeStreamSuite extends SparkFunSuite with BeforeAndAfter with Matchers w
     outputBuffer
   }
 
-  /** Send data to the flume receiver and verify whether the data was received */
-  private def writeAndVerify(
-      input: Seq[String],
-      testPort: Int,
-      outputBuffer: ArrayBuffer[Seq[SparkFlumeEvent]],
-      enableCompression: Boolean
-    ) {
-    val testAddress = new InetSocketAddress("localhost", testPort)
-
-    val inputEvents = input.map { item =>
-      val event = new AvroFlumeEvent
-      event.setBody(ByteBuffer.wrap(item.getBytes(Charsets.UTF_8)))
-      event.setHeaders(Map[CharSequence, CharSequence]("test" -> "header"))
-      event
-    }
-
-    eventually(timeout(10 seconds), interval(100 milliseconds)) {
-      // if last attempted transceiver had succeeded, close it
-      if (transceiver != null) {
-        transceiver.close()
-        transceiver = null
-      }
-
-      // Create transceiver
-      transceiver = {
-        if (enableCompression) {
-          new NettyTransceiver(testAddress, new CompressionChannelFactory(6))
-        } else {
-          new NettyTransceiver(testAddress)
-        }
-      }
-
-      // Create Avro client with the transceiver
-      val client = SpecificRequestor.getClient(classOf[AvroSourceProtocol], transceiver)
-      client should not be null
-
-      // Send data
-      val status = client.appendBatch(inputEvents.toList)
-      status should be (avro.Status.OK)
-    }
-
-    eventually(timeout(10 seconds), interval(100 milliseconds)) {
-      val outputEvents = outputBuffer.flatten.map { _.event }
-      outputEvents.foreach {
-        event =>
-          event.getHeaders.get("test") should be("header")
-      }
-      val output = outputEvents.map(event => new String(event.getBody.array(), Charsets.UTF_8))
-      output should be (input)
-    }
-  }
-
   /** Class to create socket channel with compression */
   private class CompressionChannelFactory(compressionLevel: Int)
     extends NioClientSocketChannelFactory {

http://git-wip-us.apache.org/repos/asf/spark/blob/75b9fe4c/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 94dd512..211da9e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -102,6 +102,7 @@
     <module>external/twitter</module>
     <module>external/flume</module>
     <module>external/flume-sink</module>
+    <module>external/flume-assembly</module>
     <module>external/mqtt</module>
     <module>external/zeromq</module>
     <module>examples</module>

http://git-wip-us.apache.org/repos/asf/spark/blob/75b9fe4c/project/SparkBuild.scala
----------------------------------------------------------------------
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index f5f1c9a..4ef4dc8 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -45,8 +45,8 @@ object BuildCommons {
     sparkKinesisAsl) = Seq("yarn", "yarn-stable", "java8-tests", "ganglia-lgpl",
     "kinesis-asl").map(ProjectRef(buildLocation, _))
 
-  val assemblyProjects@Seq(assembly, examples, networkYarn, streamingKafkaAssembly) =
-    Seq("assembly", "examples", "network-yarn", "streaming-kafka-assembly")
+  val assemblyProjects@Seq(assembly, examples, networkYarn, streamingFlumeAssembly, streamingKafkaAssembly) =
+    Seq("assembly", "examples", "network-yarn", "streaming-flume-assembly", "streaming-kafka-assembly")
       .map(ProjectRef(buildLocation, _))
 
   val tools = ProjectRef(buildLocation, "tools")
@@ -347,7 +347,7 @@ object Assembly {
         .getOrElse(SbtPomKeys.effectivePom.value.getProperties.get("hadoop.version").asInstanceOf[String])
     },
     jarName in assembly <<= (version, moduleName, hadoopVersion) map { (v, mName, hv) =>
-      if (mName.contains("streaming-kafka-assembly")) {
+      if (mName.contains("streaming-flume-assembly") || mName.contains("streaming-kafka-assembly")) {
         // This must match the same name used in maven (see external/kafka-assembly/pom.xml)
         s"${mName}-${v}.jar"
       } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/75b9fe4c/python/pyspark/streaming/flume.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/flume.py b/python/pyspark/streaming/flume.py
new file mode 100644
index 0000000..cbb573f
--- /dev/null
+++ b/python/pyspark/streaming/flume.py
@@ -0,0 +1,147 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import sys
+if sys.version >= "3":
+    from io import BytesIO
+else:
+    from StringIO import StringIO
+from py4j.java_gateway import Py4JJavaError
+
+from pyspark.storagelevel import StorageLevel
+from pyspark.serializers import PairDeserializer, NoOpSerializer, UTF8Deserializer, read_int
+from pyspark.streaming import DStream
+
+__all__ = ['FlumeUtils', 'utf8_decoder']
+
+
+def utf8_decoder(s):
+    """ Decode the unicode as UTF-8 """
+    return s and s.decode('utf-8')
+
+
+class FlumeUtils(object):
+
+    @staticmethod
+    def createStream(ssc, hostname, port,
+                     storageLevel=StorageLevel.MEMORY_AND_DISK_SER_2,
+                     enableDecompression=False,
+                     bodyDecoder=utf8_decoder):
+        """
+        Create an input stream that pulls events from Flume.
+
+        :param ssc:  StreamingContext object
+        :param hostname:  Hostname of the slave machine to which the flume data will be sent
+        :param port:  Port of the slave machine to which the flume data will be sent
+        :param storageLevel:  Storage level to use for storing the received objects
+        :param enableDecompression:  Should netty server decompress input stream
+        :param bodyDecoder:  A function used to decode body (default is utf8_decoder)
+        :return: A DStream object
+        """
+        jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
+
+        try:
+            helperClass = ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader()\
+                .loadClass("org.apache.spark.streaming.flume.FlumeUtilsPythonHelper")
+            helper = helperClass.newInstance()
+            jstream = helper.createStream(ssc._jssc, hostname, port, jlevel, enableDecompression)
+        except Py4JJavaError as e:
+            if 'ClassNotFoundException' in str(e.java_exception):
+                FlumeUtils._printErrorMsg(ssc.sparkContext)
+            raise e
+
+        return FlumeUtils._toPythonDStream(ssc, jstream, bodyDecoder)
+
+    @staticmethod
+    def createPollingStream(ssc, addresses,
+                            storageLevel=StorageLevel.MEMORY_AND_DISK_SER_2,
+                            maxBatchSize=1000,
+                            parallelism=5,
+                            bodyDecoder=utf8_decoder):
+        """
+        Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
+        This stream will poll the sink for data and will pull events as they are available.
+
+        :param ssc:  StreamingContext object
+        :param addresses:  List of (host, port)s on which the Spark Sink is running.
+        :param storageLevel:  Storage level to use for storing the received objects
+        :param maxBatchSize:  The maximum number of events to be pulled from the Spark sink
+                              in a single RPC call
+        :param parallelism:  Number of concurrent requests this stream should send to the sink.
+                             Note that having a higher number of requests concurrently being pulled
+                             will result in this stream using more threads
+        :param bodyDecoder:  A function used to decode body (default is utf8_decoder)
+        :return: A DStream object
+        """
+        jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
+        hosts = []
+        ports = []
+        for (host, port) in addresses:
+            hosts.append(host)
+            ports.append(port)
+
+        try:
+            helperClass = ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
+                .loadClass("org.apache.spark.streaming.flume.FlumeUtilsPythonHelper")
+            helper = helperClass.newInstance()
+            jstream = helper.createPollingStream(
+                ssc._jssc, hosts, ports, jlevel, maxBatchSize, parallelism)
+        except Py4JJavaError as e:
+            if 'ClassNotFoundException' in str(e.java_exception):
+                FlumeUtils._printErrorMsg(ssc.sparkContext)
+            raise e
+
+        return FlumeUtils._toPythonDStream(ssc, jstream, bodyDecoder)
+
+    @staticmethod
+    def _toPythonDStream(ssc, jstream, bodyDecoder):
+        ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
+        stream = DStream(jstream, ssc, ser)
+
+        def func(event):
+            headersBytes = BytesIO(event[0]) if sys.version >= "3" else StringIO(event[0])
+            headers = {}
+            strSer = UTF8Deserializer()
+            for i in range(0, read_int(headersBytes)):
+                key = strSer.loads(headersBytes)
+                value = strSer.loads(headersBytes)
+                headers[key] = value
+            body = bodyDecoder(event[1])
+            return (headers, body)
+        return stream.map(func)
+
+    @staticmethod
+    def _printErrorMsg(sc):
+        print("""
+________________________________________________________________________________________________
+
+  Spark Streaming's Flume libraries not found in class path. Try one of the following.
+
+  1. Include the Flume library and its dependencies with in the
+     spark-submit command as
+
+     $ bin/spark-submit --packages org.apache.spark:spark-streaming-flume:%s ...
+
+  2. Download the JAR of the artifact from Maven Central http://search.maven.org/,
+     Group Id = org.apache.spark, Artifact Id = spark-streaming-flume-assembly, Version = %s.
+     Then, include the jar in the spark-submit command as
+
+     $ bin/spark-submit --jars <spark-streaming-flume-assembly.jar> ...
+
+________________________________________________________________________________________________
+
+""" % (sc.version, sc.version))

http://git-wip-us.apache.org/repos/asf/spark/blob/75b9fe4c/python/pyspark/streaming/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 91ce681..188c8ff 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -38,6 +38,7 @@ else:
 from pyspark.context import SparkConf, SparkContext, RDD
 from pyspark.streaming.context import StreamingContext
 from pyspark.streaming.kafka import Broker, KafkaUtils, OffsetRange, TopicAndPartition
+from pyspark.streaming.flume import FlumeUtils
 
 
 class PySparkStreamingTestCase(unittest.TestCase):
@@ -677,7 +678,156 @@ class KafkaStreamTests(PySparkStreamingTestCase):
         rdd = KafkaUtils.createRDD(self.sc, kafkaParams, offsetRanges, leaders)
         self._validateRddResult(sendData, rdd)
 
-if __name__ == "__main__":
+
+class FlumeStreamTests(PySparkStreamingTestCase):
+    timeout = 20  # seconds
+    duration = 1
+
+    def setUp(self):
+        super(FlumeStreamTests, self).setUp()
+
+        utilsClz = self.ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
+            .loadClass("org.apache.spark.streaming.flume.FlumeTestUtils")
+        self._utils = utilsClz.newInstance()
+
+    def tearDown(self):
+        if self._utils is not None:
+            self._utils.close()
+            self._utils = None
+
+        super(FlumeStreamTests, self).tearDown()
+
+    def _startContext(self, n, compressed):
+        # Start the StreamingContext and also collect the result
+        dstream = FlumeUtils.createStream(self.ssc, "localhost", self._utils.getTestPort(),
+                                          enableDecompression=compressed)
+        result = []
+
+        def get_output(_, rdd):
+            for event in rdd.collect():
+                if len(result) < n:
+                    result.append(event)
+        dstream.foreachRDD(get_output)
+        self.ssc.start()
+        return result
+
+    def _validateResult(self, input, result):
+        # Validate both the header and the body
+        header = {"test": "header"}
+        self.assertEqual(len(input), len(result))
+        for i in range(0, len(input)):
+            self.assertEqual(header, result[i][0])
+            self.assertEqual(input[i], result[i][1])
+
+    def _writeInput(self, input, compressed):
+        # Try to write input to the receiver until success or timeout
+        start_time = time.time()
+        while True:
+            try:
+                self._utils.writeInput(input, compressed)
+                break
+            except:
+                if time.time() - start_time < self.timeout:
+                    time.sleep(0.01)
+                else:
+                    raise
+
+    def test_flume_stream(self):
+        input = [str(i) for i in range(1, 101)]
+        result = self._startContext(len(input), False)
+        self._writeInput(input, False)
+        self.wait_for(result, len(input))
+        self._validateResult(input, result)
+
+    def test_compressed_flume_stream(self):
+        input = [str(i) for i in range(1, 101)]
+        result = self._startContext(len(input), True)
+        self._writeInput(input, True)
+        self.wait_for(result, len(input))
+        self._validateResult(input, result)
+
+
+class FlumePollingStreamTests(PySparkStreamingTestCase):
+    timeout = 20  # seconds
+    duration = 1
+    maxAttempts = 5
+
+    def setUp(self):
+        utilsClz = \
+            self.sc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
+                .loadClass("org.apache.spark.streaming.flume.PollingFlumeTestUtils")
+        self._utils = utilsClz.newInstance()
+
+    def tearDown(self):
+        if self._utils is not None:
+            self._utils.close()
+            self._utils = None
+
+    def _writeAndVerify(self, ports):
+        # Set up the streaming context and input streams
+        ssc = StreamingContext(self.sc, self.duration)
+        try:
+            addresses = [("localhost", port) for port in ports]
+            dstream = FlumeUtils.createPollingStream(
+                ssc,
+                addresses,
+                maxBatchSize=self._utils.eventsPerBatch(),
+                parallelism=5)
+            outputBuffer = []
+
+            def get_output(_, rdd):
+                for e in rdd.collect():
+                    outputBuffer.append(e)
+
+            dstream.foreachRDD(get_output)
+            ssc.start()
+            self._utils.sendDatAndEnsureAllDataHasBeenReceived()
+
+            self.wait_for(outputBuffer, self._utils.getTotalEvents())
+            outputHeaders = [event[0] for event in outputBuffer]
+            outputBodies = [event[1] for event in outputBuffer]
+            self._utils.assertOutput(outputHeaders, outputBodies)
+        finally:
+            ssc.stop(False)
+
+    def _testMultipleTimes(self, f):
+        attempt = 0
+        while True:
+            try:
+                f()
+                break
+            except:
+                attempt += 1
+                if attempt >= self.maxAttempts:
+                    raise
+                else:
+                    import traceback
+                    traceback.print_exc()
+
+    def _testFlumePolling(self):
+        try:
+            port = self._utils.startSingleSink()
+            self._writeAndVerify([port])
+            self._utils.assertChannelsAreEmpty()
+        finally:
+            self._utils.close()
+
+    def _testFlumePollingMultipleHosts(self):
+        try:
+            port = self._utils.startSingleSink()
+            self._writeAndVerify([port])
+            self._utils.assertChannelsAreEmpty()
+        finally:
+            self._utils.close()
+
+    def test_flume_polling(self):
+        self._testMultipleTimes(self._testFlumePolling)
+
+    def test_flume_polling_multiple_hosts(self):
+        self._testMultipleTimes(self._testFlumePollingMultipleHosts)
+
+
+def search_kafka_assembly_jar():
     SPARK_HOME = os.environ["SPARK_HOME"]
     kafka_assembly_dir = os.path.join(SPARK_HOME, "external/kafka-assembly")
     jars = glob.glob(
@@ -692,5 +842,30 @@ if __name__ == "__main__":
         raise Exception(("Found multiple Spark Streaming Kafka assembly JARs in %s; please "
                          "remove all but one") % kafka_assembly_dir)
     else:
-        os.environ["PYSPARK_SUBMIT_ARGS"] = "--jars %s pyspark-shell" % jars[0]
+        return jars[0]
+
+
+def search_flume_assembly_jar():
+    SPARK_HOME = os.environ["SPARK_HOME"]
+    flume_assembly_dir = os.path.join(SPARK_HOME, "external/flume-assembly")
+    jars = glob.glob(
+        os.path.join(flume_assembly_dir, "target/scala-*/spark-streaming-flume-assembly-*.jar"))
+    if not jars:
+        raise Exception(
+            ("Failed to find Spark Streaming Flume assembly jar in %s. " % flume_assembly_dir) +
+            "You need to build Spark with "
+            "'build/sbt assembly/assembly streaming-flume-assembly/assembly' or "
+            "'build/mvn package' before running this test")
+    elif len(jars) > 1:
+        raise Exception(("Found multiple Spark Streaming Flume assembly JARs in %s; please "
+                         "remove all but one") % flume_assembly_dir)
+    else:
+        return jars[0]
+
+if __name__ == "__main__":
+    kafka_assembly_jar = search_kafka_assembly_jar()
+    flume_assembly_jar = search_flume_assembly_jar()
+    jars = "%s,%s" % (kafka_assembly_jar, flume_assembly_jar)
+
+    os.environ["PYSPARK_SUBMIT_ARGS"] = "--jars %s pyspark-shell" % jars
     unittest.main()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org