Posted to commits@spark.apache.org by rx...@apache.org on 2016/03/26 01:37:20 UTC

[2/2] spark git commit: [SPARK-14073][STREAMING][TEST-MAVEN] Move flume back to Spark

[SPARK-14073][STREAMING][TEST-MAVEN] Move flume back to Spark

## What changes were proposed in this pull request?

This PR moves Flume back to Spark, per the discussion on the dev mailing list.

## How was this patch tested?

Existing Jenkins tests.

Author: Shixiong Zhu <sh...@databricks.com>

Closes #11895 from zsxwing/move-flume-back.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/24587ce4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/24587ce4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/24587ce4

Branch: refs/heads/master
Commit: 24587ce433aa30f30a5d1ed6566365f24c222a27
Parents: 54d13be
Author: Shixiong Zhu <sh...@databricks.com>
Authored: Fri Mar 25 17:37:16 2016 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Fri Mar 25 17:37:16 2016 -0700

----------------------------------------------------------------------
 dev/audit-release/audit_release.py              |   2 +-
 dev/run-tests.py                                |   1 +
 dev/sparktestsupport/modules.py                 |  34 ++
 examples/pom.xml                                |   5 +
 .../examples/streaming/JavaFlumeEventCount.java |  75 +++++
 .../examples/streaming/FlumeEventCount.scala    |  70 +++++
 .../streaming/FlumePollingEventCount.scala      |  67 ++++
 external/flume-assembly/pom.xml                 | 168 ++++++++++
 external/flume-sink/pom.xml                     | 129 ++++++++
 .../flume-sink/src/main/avro/sparkflume.avdl    |  40 +++
 .../spark/streaming/flume/sink/Logging.scala    | 127 ++++++++
 .../flume/sink/SparkAvroCallbackHandler.scala   | 166 ++++++++++
 .../spark/streaming/flume/sink/SparkSink.scala  | 171 ++++++++++
 .../flume/sink/SparkSinkThreadFactory.scala     |  35 +++
 .../streaming/flume/sink/SparkSinkUtils.scala   |  28 ++
 .../flume/sink/TransactionProcessor.scala       | 252 +++++++++++++++
 .../src/test/resources/log4j.properties         |  28 ++
 .../streaming/flume/sink/SparkSinkSuite.scala   | 218 +++++++++++++
 external/flume/pom.xml                          |  78 +++++
 .../streaming/flume/EventTransformer.scala      |  72 +++++
 .../streaming/flume/FlumeBatchFetcher.scala     | 166 ++++++++++
 .../streaming/flume/FlumeInputDStream.scala     | 205 ++++++++++++
 .../flume/FlumePollingInputDStream.scala        | 123 ++++++++
 .../spark/streaming/flume/FlumeTestUtils.scala  | 117 +++++++
 .../spark/streaming/flume/FlumeUtils.scala      | 311 +++++++++++++++++++
 .../streaming/flume/PollingFlumeTestUtils.scala | 209 +++++++++++++
 .../spark/streaming/flume/package-info.java     |  21 ++
 .../apache/spark/streaming/flume/package.scala  |  23 ++
 .../streaming/LocalJavaStreamingContext.java    |  44 +++
 .../flume/JavaFlumePollingStreamSuite.java      |  44 +++
 .../streaming/flume/JavaFlumeStreamSuite.java   |  36 +++
 .../flume/src/test/resources/log4j.properties   |  28 ++
 .../spark/streaming/TestOutputStream.scala      |  48 +++
 .../flume/FlumePollingStreamSuite.scala         | 130 ++++++++
 .../streaming/flume/FlumeStreamSuite.scala      | 103 ++++++
 pom.xml                                         |  48 +++
 project/SparkBuild.scala                        |  22 +-
 python/pyspark/streaming/flume.py               | 140 +++++++++
 python/pyspark/streaming/tests.py               | 168 +++++++++-
 .../apache/spark/sql/JavaDataFrameSuite.java    |  33 +-
 40 files changed, 3765 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/24587ce4/dev/audit-release/audit_release.py
----------------------------------------------------------------------
diff --git a/dev/audit-release/audit_release.py b/dev/audit-release/audit_release.py
index 426b311..ee72da4 100755
--- a/dev/audit-release/audit_release.py
+++ b/dev/audit-release/audit_release.py
@@ -116,7 +116,7 @@ original_dir = os.getcwd()
 # dependencies within those projects.
 modules = [
     "spark-core", "spark-mllib", "spark-streaming", "spark-repl",
-    "spark-graphx", "spark-streaming-kafka",
+    "spark-graphx", "spark-streaming-flume", "spark-streaming-kafka",
     "spark-catalyst", "spark-sql", "spark-hive", "spark-streaming-kinesis-asl"
 ]
 modules = map(lambda m: "%s_%s" % (m, SCALA_BINARY_VERSION), modules)

http://git-wip-us.apache.org/repos/asf/spark/blob/24587ce4/dev/run-tests.py
----------------------------------------------------------------------
diff --git a/dev/run-tests.py b/dev/run-tests.py
index d940cda..c294474 100755
--- a/dev/run-tests.py
+++ b/dev/run-tests.py
@@ -337,6 +337,7 @@ def build_spark_sbt(hadoop_version):
     build_profiles = get_hadoop_profiles(hadoop_version) + modules.root.build_profile_flags
     sbt_goals = ["package",
                  "streaming-kafka-assembly/assembly",
+                 "streaming-flume-assembly/assembly",
                  "streaming-kinesis-asl-assembly/assembly"]
     profiles_and_goals = build_profiles + sbt_goals
 

http://git-wip-us.apache.org/repos/asf/spark/blob/24587ce4/dev/sparktestsupport/modules.py
----------------------------------------------------------------------
diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index d118488..bb04ec6 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -223,6 +223,39 @@ streaming_kafka = Module(
 )
 
 
+streaming_flume_sink = Module(
+    name="streaming-flume-sink",
+    dependencies=[streaming],
+    source_file_regexes=[
+        "external/flume-sink",
+    ],
+    sbt_test_goals=[
+        "streaming-flume-sink/test",
+    ]
+)
+
+
+streaming_flume = Module(
+    name="streaming-flume",
+    dependencies=[streaming],
+    source_file_regexes=[
+        "external/flume",
+    ],
+    sbt_test_goals=[
+        "streaming-flume/test",
+    ]
+)
+
+
+streaming_flume_assembly = Module(
+    name="streaming-flume-assembly",
+    dependencies=[streaming_flume, streaming_flume_sink],
+    source_file_regexes=[
+        "external/flume-assembly",
+    ]
+)
+
+
 mllib = Module(
     name="mllib",
     dependencies=[streaming, sql],
@@ -294,6 +327,7 @@ pyspark_streaming = Module(
         pyspark_core,
         streaming,
         streaming_kafka,
+        streaming_flume_assembly,
         streaming_kinesis_asl
     ],
     source_file_regexes=[

http://git-wip-us.apache.org/repos/asf/spark/blob/24587ce4/examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/pom.xml b/examples/pom.xml
index 1aa730c..b7f3797 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -67,6 +67,11 @@
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming-flume_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
       <artifactId>spark-streaming-kafka_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
     </dependency>
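
For applications outside the Spark build, the same artifact added above would be declared as a normal dependency. A minimal sbt sketch, assuming a published Spark version (the version string below is illustrative, not part of this patch):

    // build.sbt -- illustrative only; substitute the Spark version you actually build against
    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-streaming"       % "2.0.0" % "provided",
      "org.apache.spark" %% "spark-streaming-flume" % "2.0.0"
    )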

http://git-wip-us.apache.org/repos/asf/spark/blob/24587ce4/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java
new file mode 100644
index 0000000..da56637
--- /dev/null
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.examples.streaming.StreamingExamples;
+import org.apache.spark.streaming.*;
+import org.apache.spark.streaming.api.java.*;
+import org.apache.spark.streaming.flume.FlumeUtils;
+import org.apache.spark.streaming.flume.SparkFlumeEvent;
+
+/**
+ *  Produces a count of events received from Flume.
+ *
+ *  This should be used in conjunction with an AvroSink in Flume. It will start
+ *  an Avro server at the given host:port address and listen for requests.
+ *  Your Flume AvroSink should be pointed to this address.
+ *
+ *  Usage: JavaFlumeEventCount <host> <port>
+ *    <host> is the host the Flume receiver will be started on - a receiver
+ *           creates a server and listens for flume events.
+ *    <port> is the port the Flume receiver will listen on.
+ *
+ *  To run this example:
+ *     `$ bin/run-example org.apache.spark.examples.streaming.JavaFlumeEventCount <host> <port>`
+ */
+public final class JavaFlumeEventCount {
+  private JavaFlumeEventCount() {
+  }
+
+  public static void main(String[] args) {
+    if (args.length != 2) {
+      System.err.println("Usage: JavaFlumeEventCount <host> <port>");
+      System.exit(1);
+    }
+
+    StreamingExamples.setStreamingLogLevels();
+
+    String host = args[0];
+    int port = Integer.parseInt(args[1]);
+
+    Duration batchInterval = new Duration(2000);
+    SparkConf sparkConf = new SparkConf().setAppName("JavaFlumeEventCount");
+    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, batchInterval);
+    JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, host, port);
+
+    flumeStream.count();
+
+    flumeStream.count().map(new Function<Long, String>() {
+      @Override
+      public String call(Long in) {
+        return "Received " + in + " flume events.";
+      }
+    }).print();
+
+    ssc.start();
+    ssc.awaitTermination();
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/24587ce4/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala
new file mode 100644
index 0000000..91e52e4
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// scalastyle:off println
+package org.apache.spark.examples.streaming
+
+import org.apache.spark.SparkConf
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.flume._
+import org.apache.spark.util.IntParam
+
+/**
+ *  Produces a count of events received from Flume.
+ *
+ *  This should be used in conjunction with an AvroSink in Flume. It will start
+ *  an Avro server at the given host:port address and listen for requests.
+ *  Your Flume AvroSink should be pointed to this address.
+ *
+ *  Usage: FlumeEventCount <host> <port>
+ *    <host> is the host the Flume receiver will be started on - a receiver
+ *           creates a server and listens for flume events.
+ *    <port> is the port the Flume receiver will listen on.
+ *
+ *  To run this example:
+ *    `$ bin/run-example org.apache.spark.examples.streaming.FlumeEventCount <host> <port> `
+ */
+object FlumeEventCount {
+  def main(args: Array[String]) {
+    if (args.length < 2) {
+      System.err.println(
+        "Usage: FlumeEventCount <host> <port>")
+      System.exit(1)
+    }
+
+    StreamingExamples.setStreamingLogLevels()
+
+    val Array(host, IntParam(port)) = args
+
+    val batchInterval = Milliseconds(2000)
+
+    // Create the context and set the batch size
+    val sparkConf = new SparkConf().setAppName("FlumeEventCount")
+    val ssc = new StreamingContext(sparkConf, batchInterval)
+
+    // Create a flume stream
+    val stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2)
+
+    // Print out the count of events received from this server in each batch
+    stream.count().map(cnt => "Received " + cnt + " flume events." ).print()
+
+    ssc.start()
+    ssc.awaitTermination()
+  }
+}
+// scalastyle:on println

http://git-wip-us.apache.org/repos/asf/spark/blob/24587ce4/examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala
new file mode 100644
index 0000000..dd725d7
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// scalastyle:off println
+package org.apache.spark.examples.streaming
+
+import org.apache.spark.SparkConf
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.flume._
+import org.apache.spark.util.IntParam
+
+/**
+ *  Produces a count of events received from Flume.
+ *
+ *  This should be used in conjunction with the Spark Sink running in a Flume agent. See
+ *  the Spark Streaming programming guide for more details.
+ *
+ *  Usage: FlumePollingEventCount <host> <port>
+ *    `host` is the host on which the Spark Sink is running.
+ *    `port` is the port at which the Spark Sink is listening.
+ *
+ *  To run this example:
+ *    `$ bin/run-example org.apache.spark.examples.streaming.FlumePollingEventCount [host] [port] `
+ */
+object FlumePollingEventCount {
+  def main(args: Array[String]) {
+    if (args.length < 2) {
+      System.err.println(
+        "Usage: FlumePollingEventCount <host> <port>")
+      System.exit(1)
+    }
+
+    StreamingExamples.setStreamingLogLevels()
+
+    val Array(host, IntParam(port)) = args
+
+    val batchInterval = Milliseconds(2000)
+
+    // Create the context and set the batch size
+    val sparkConf = new SparkConf().setAppName("FlumePollingEventCount")
+    val ssc = new StreamingContext(sparkConf, batchInterval)
+
+    // Create a flume stream that polls the Spark Sink running in a Flume agent
+    val stream = FlumeUtils.createPollingStream(ssc, host, port)
+
+    // Print out the count of events received from this server in each batch
+    stream.count().map(cnt => "Received " + cnt + " flume events." ).print()
+
+    ssc.start()
+    ssc.awaitTermination()
+  }
+}
+// scalastyle:on println

http://git-wip-us.apache.org/repos/asf/spark/blob/24587ce4/external/flume-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/external/flume-assembly/pom.xml b/external/flume-assembly/pom.xml
new file mode 100644
index 0000000..ac15b93
--- /dev/null
+++ b/external/flume-assembly/pom.xml
@@ -0,0 +1,168 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.spark</groupId>
+    <artifactId>spark-parent_2.11</artifactId>
+    <version>2.0.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <groupId>org.apache.spark</groupId>
+  <artifactId>spark-streaming-flume-assembly_2.11</artifactId>
+  <packaging>jar</packaging>
+  <name>Spark Project External Flume Assembly</name>
+  <url>http://spark.apache.org/</url>
+
+  <properties>
+    <hadoop.deps.scope>provided</hadoop.deps.scope>
+    <sbt.project.name>streaming-flume-assembly</sbt.project.name>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming-flume_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+          <artifactId>jetty</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+          <artifactId>jetty-util</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <!--
+      Demote dependencies that are already included in the Spark assembly. These are transitive
+      dependencies of Flume or spark-streaming-flume, and they need to be explicitly included even
+      though the parent pom may declare them with ${hadoop.deps.scope}.
+    -->
+    <dependency>
+      <groupId>commons-codec</groupId>
+      <artifactId>commons-codec</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>commons-lang</groupId>
+      <artifactId>commons-lang</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>commons-net</groupId>
+      <artifactId>commons-net</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro-ipc</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro-mapred</artifactId>
+      <classifier>${avro.mapred.classifier}</classifier>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-library</artifactId>
+      <scope>provided</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <configuration>
+          <shadedArtifactAttached>false</shadedArtifactAttached>
+          <artifactSet>
+            <includes>
+              <include>*:*</include>
+            </includes>
+          </artifactSet>
+          <filters>
+            <filter>
+              <artifact>*:*</artifact>
+              <excludes>
+                <exclude>META-INF/*.SF</exclude>
+                <exclude>META-INF/*.DSA</exclude>
+                <exclude>META-INF/*.RSA</exclude>
+              </excludes>
+            </filter>
+          </filters>
+        </configuration>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+            <configuration>
+              <transformers>
+                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
+                  <resource>reference.conf</resource>
+                </transformer>
+                <transformer implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
+                  <resource>log4j.properties</resource>
+                </transformer>
+                <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer"/>
+                <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"/>
+              </transformers>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+  <profiles>
+    <profile>
+      <id>flume-provided</id>
+      <properties>
+        <flume.deps.scope>provided</flume.deps.scope>
+      </properties>
+    </profile>
+  </profiles>
+</project>
+

http://git-wip-us.apache.org/repos/asf/spark/blob/24587ce4/external/flume-sink/pom.xml
----------------------------------------------------------------------
diff --git a/external/flume-sink/pom.xml b/external/flume-sink/pom.xml
new file mode 100644
index 0000000..e4effe1
--- /dev/null
+++ b/external/flume-sink/pom.xml
@@ -0,0 +1,129 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.spark</groupId>
+    <artifactId>spark-parent_2.11</artifactId>
+    <version>2.0.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <groupId>org.apache.spark</groupId>
+  <artifactId>spark-streaming-flume-sink_2.11</artifactId>
+  <properties>
+    <sbt.project.name>streaming-flume-sink</sbt.project.name>
+  </properties>
+  <packaging>jar</packaging>
+  <name>Spark Project External Flume Sink</name>
+  <url>http://spark.apache.org/</url>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-sdk</artifactId>
+      <exclusions>
+        <!-- Guava is excluded to avoid its use in this module. -->
+        <exclusion>
+          <groupId>com.google.guava</groupId>
+          <artifactId>guava</artifactId>
+        </exclusion>
+        <!--
+          Exclude libthrift since the flume poms seem to confuse sbt, which fails to find the
+          dependency.
+        -->
+        <exclusion>
+          <groupId>org.apache.thrift</groupId>
+          <artifactId>libthrift</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-core</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>com.google.guava</groupId>
+          <artifactId>guava</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.thrift</groupId>
+          <artifactId>libthrift</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-library</artifactId>
+    </dependency>
+    <dependency>
+      <!-- Add Guava in test scope since flume actually needs it. -->
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <!--
+        Netty explicitly added in test as it has been excluded from
+        Flume dependency (to avoid runtime problems when running with
+        Spark) but unit tests need it. The version of Netty on which
+        Flume 1.4.0 depends is "3.4.0.Final".
+      -->
+      <groupId>io.netty</groupId>
+      <artifactId>netty</artifactId>
+      <version>3.4.0.Final</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-test-tags_${scala.binary.version}</artifactId>
+    </dependency>
+  </dependencies>
+  <build>
+    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.avro</groupId>
+        <artifactId>avro-maven-plugin</artifactId>
+        <version>${avro.version}</version>
+        <configuration>
+          <!-- Generate the output in the same directory as the sbt-avro-plugin -->
+          <outputDirectory>${project.basedir}/target/scala-${scala.binary.version}/src_managed/main/compiled_avro</outputDirectory>
+        </configuration>
+        <executions>
+          <execution>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>idl-protocol</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <configuration>
+          <!-- Disable all relocations defined in the parent pom. -->
+          <relocations combine.self="override" />
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+</project>

http://git-wip-us.apache.org/repos/asf/spark/blob/24587ce4/external/flume-sink/src/main/avro/sparkflume.avdl
----------------------------------------------------------------------
diff --git a/external/flume-sink/src/main/avro/sparkflume.avdl b/external/flume-sink/src/main/avro/sparkflume.avdl
new file mode 100644
index 0000000..8806e86
--- /dev/null
+++ b/external/flume-sink/src/main/avro/sparkflume.avdl
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+@namespace("org.apache.spark.streaming.flume.sink")
+
+protocol SparkFlumeProtocol {
+
+  record SparkSinkEvent {
+    map<string> headers;
+    bytes body;
+  }
+
+  record EventBatch {
+    string errorMsg = ""; // If this is empty it is a valid message, else it represents an error
+    string sequenceNumber;
+    array<SparkSinkEvent> events;
+  }
+
+  EventBatch getEventBatch (int n);
+
+  void ack (string sequenceNumber);
+
+  void nack (string sequenceNumber);
+}
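
The IDL above defines the RPC contract between the Spark receiver and the sink: getEventBatch returns an EventBatch tagged with a sequence number, which the client then acks or nacks. A rough Scala sketch of how a client built from the generated classes might poll a running SparkSink once (host, port, and batch size are placeholders, and error handling is minimal; the actual receiver logic lives in FlumePollingInputDStream and FlumeBatchFetcher in this patch):

    import java.net.InetSocketAddress

    import org.apache.avro.ipc.NettyTransceiver
    import org.apache.avro.ipc.specific.SpecificRequestor

    import org.apache.spark.streaming.flume.sink.{EventBatch, SparkFlumeProtocol}

    object PollSinkOnce {
      def main(args: Array[String]): Unit = {
        // Connect to the Avro server started by a SparkSink (placeholder host/port).
        val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", 9999))
        val client = SpecificRequestor.getClient(classOf[SparkFlumeProtocol.Callback], transceiver)
        try {
          // Ask the sink for a batch of at most 100 events.
          val batch: EventBatch = client.getEventBatch(100)
          if (batch.getErrorMsg.toString.nonEmpty) {
            // A non-empty errorMsg marks an error batch; there is nothing to ack.
            println("Sink returned an error batch: " + batch.getErrorMsg)
          } else {
            println("Received " + batch.getEvents.size() + " events")
            // Acking lets the sink commit the Flume transaction for this sequence number.
            client.ack(batch.getSequenceNumber)
          }
        } finally {
          transceiver.close()
        }
      }
    }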

http://git-wip-us.apache.org/repos/asf/spark/blob/24587ce4/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/Logging.scala
----------------------------------------------------------------------
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/Logging.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/Logging.scala
new file mode 100644
index 0000000..09d3fe9
--- /dev/null
+++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/Logging.scala
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.flume.sink
+
+import org.slf4j.{Logger, LoggerFactory}
+
+/**
+ * Copy of org.apache.spark.Logging for use in the Spark Sink.
+ * org.apache.spark.Logging is not used directly so that the sink does not bring
+ * in all of Spark as a dependency.
+ */
+private[sink] trait Logging {
+  // Make the log field transient so that objects with Logging can
+  // be serialized and used on another machine
+  @transient private var _log: Logger = null
+
+  // Method to get or create the logger for this object
+  protected def log: Logger = {
+    if (_log == null) {
+      initializeIfNecessary()
+      var className = this.getClass.getName
+      // Ignore trailing $'s in the class names for Scala objects
+      if (className.endsWith("$")) {
+        className = className.substring(0, className.length - 1)
+      }
+      _log = LoggerFactory.getLogger(className)
+    }
+    _log
+  }
+
+  // Log methods that take only a String
+  protected def logInfo(msg: => String) {
+    if (log.isInfoEnabled) log.info(msg)
+  }
+
+  protected def logDebug(msg: => String) {
+    if (log.isDebugEnabled) log.debug(msg)
+  }
+
+  protected def logTrace(msg: => String) {
+    if (log.isTraceEnabled) log.trace(msg)
+  }
+
+  protected def logWarning(msg: => String) {
+    if (log.isWarnEnabled) log.warn(msg)
+  }
+
+  protected def logError(msg: => String) {
+    if (log.isErrorEnabled) log.error(msg)
+  }
+
+  // Log methods that take Throwables (Exceptions/Errors) too
+  protected def logInfo(msg: => String, throwable: Throwable) {
+    if (log.isInfoEnabled) log.info(msg, throwable)
+  }
+
+  protected def logDebug(msg: => String, throwable: Throwable) {
+    if (log.isDebugEnabled) log.debug(msg, throwable)
+  }
+
+  protected def logTrace(msg: => String, throwable: Throwable) {
+    if (log.isTraceEnabled) log.trace(msg, throwable)
+  }
+
+  protected def logWarning(msg: => String, throwable: Throwable) {
+    if (log.isWarnEnabled) log.warn(msg, throwable)
+  }
+
+  protected def logError(msg: => String, throwable: Throwable) {
+    if (log.isErrorEnabled) log.error(msg, throwable)
+  }
+
+  protected def isTraceEnabled(): Boolean = {
+    log.isTraceEnabled
+  }
+
+  private def initializeIfNecessary() {
+    if (!Logging.initialized) {
+      Logging.initLock.synchronized {
+        if (!Logging.initialized) {
+          initializeLogging()
+        }
+      }
+    }
+  }
+
+  private def initializeLogging() {
+    Logging.initialized = true
+
+    // Force a call into slf4j to initialize it. Avoids this happening from multiple threads
+    // and triggering this: http://mailman.qos.ch/pipermail/slf4j-dev/2010-April/002956.html
+    log
+  }
+}
+
+private[sink] object Logging {
+  @volatile private var initialized = false
+  val initLock = new Object()
+  try {
+    // We use reflection here to handle the case where users remove the
+    // slf4j-to-jul bridge in order to route their logs to JUL.
+    // scalastyle:off classforname
+    val bridgeClass = Class.forName("org.slf4j.bridge.SLF4JBridgeHandler")
+    // scalastyle:on classforname
+    bridgeClass.getMethod("removeHandlersForRootLogger").invoke(null)
+    val installed = bridgeClass.getMethod("isInstalled").invoke(null).asInstanceOf[Boolean]
+    if (!installed) {
+      bridgeClass.getMethod("install").invoke(null)
+    }
+  } catch {
+    case e: ClassNotFoundException => // can't log anything yet so just fail silently
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/24587ce4/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala
----------------------------------------------------------------------
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala
new file mode 100644
index 0000000..719fca0
--- /dev/null
+++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.flume.sink
+
+import java.util.UUID
+import java.util.concurrent.{CountDownLatch, Executors}
+import java.util.concurrent.atomic.AtomicLong
+
+import scala.collection.mutable
+
+import org.apache.flume.Channel
+
+/**
+ * Class that implements the SparkFlumeProtocol, which is used by the Avro Netty Server to process
+ * requests. Each getEventBatch, ack and nack call is forwarded to an instance of this class.
+ * @param threads Number of threads to use to process requests.
+ * @param channel The channel that the sink pulls events from
+ * @param transactionTimeout Timeout in seconds after which the transaction is rolled back if it is
+ *                           not acked by Spark.
+ */
+// Flume forces transactions to be thread-local. So each transaction *must* be committed, or
+// rolled back from the thread it was originally created in. So each getEvents call from Spark
+// creates a TransactionProcessor which runs in a new thread, in which the transaction is created
+// and events are pulled off the channel. Once the events are sent to Spark,
+// that thread is blocked and the TransactionProcessor is saved in a map,
+// until an ACK or NACK comes back or the transaction times out (after the specified timeout).
+// When the response comes or a timeout is hit, the TransactionProcessor is retrieved and then
+// unblocked, at which point the transaction is committed or rolled back.
+
+private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Channel,
+  val transactionTimeout: Int, val backOffInterval: Int) extends SparkFlumeProtocol with Logging {
+  val transactionExecutorOpt = Option(Executors.newFixedThreadPool(threads,
+    new SparkSinkThreadFactory("Spark Sink Processor Thread - %d")))
+  // Protected by `sequenceNumberToProcessor`
+  private val sequenceNumberToProcessor = mutable.HashMap[CharSequence, TransactionProcessor]()
+  // This sink will not persist sequence numbers and reuses them if it gets restarted.
+  // So it is possible to commit a transaction which may have been meant for the sink before the
+  // restart.
+  // Since the new txn may not have the same sequence number we must guard against accidentally
+  // committing a new transaction. To reduce the probability of that happening, a random string is
+  // prepended to the sequence number. It does not change for the life of the sink.
+  private val seqBase = UUID.randomUUID().toString.substring(0, 8)
+  private val seqCounter = new AtomicLong(0)
+
+  // Protected by `sequenceNumberToProcessor`
+  private var stopped = false
+
+  @volatile private var isTest = false
+  private var testLatch: CountDownLatch = null
+
+  /**
+   * Returns a batch of events to Spark over Avro RPC.
+   * @param n Maximum number of events to return in a batch
+   * @return [[EventBatch]] instance that has a sequence number and an array of at most n events
+   */
+  override def getEventBatch(n: Int): EventBatch = {
+    logDebug("Got getEventBatch call from Spark.")
+    val sequenceNumber = seqBase + seqCounter.incrementAndGet()
+    createProcessor(sequenceNumber, n) match {
+      case Some(processor) =>
+        transactionExecutorOpt.foreach(_.submit(processor))
+        // Wait until a batch is available - will be an error if error message is non-empty
+        val batch = processor.getEventBatch
+        if (SparkSinkUtils.isErrorBatch(batch)) {
+          // Remove the processor if it is an error batch since no ACK is sent.
+          removeAndGetProcessor(sequenceNumber)
+          logWarning("Received an error batch - no events were received from channel! ")
+        }
+        batch
+      case None =>
+        new EventBatch("Spark sink has been stopped!", "", java.util.Collections.emptyList())
+    }
+  }
+
+  private def createProcessor(seq: String, n: Int): Option[TransactionProcessor] = {
+    sequenceNumberToProcessor.synchronized {
+      if (!stopped) {
+        val processor = new TransactionProcessor(
+          channel, seq, n, transactionTimeout, backOffInterval, this)
+        sequenceNumberToProcessor.put(seq, processor)
+        if (isTest) {
+          processor.countDownWhenBatchAcked(testLatch)
+        }
+        Some(processor)
+      } else {
+        None
+      }
+    }
+  }
+
+  /**
+   * Called by Spark to indicate successful commit of a batch
+   * @param sequenceNumber The sequence number of the event batch that was successful
+   */
+  override def ack(sequenceNumber: CharSequence): Void = {
+    logDebug("Received Ack for batch with sequence number: " + sequenceNumber)
+    completeTransaction(sequenceNumber, success = true)
+    null
+  }
+
+  /**
+   * Called by Spark to indicate failed commit of a batch
+   * @param sequenceNumber The sequence number of the event batch that failed
+   * @return
+   */
+  override def nack(sequenceNumber: CharSequence): Void = {
+    completeTransaction(sequenceNumber, success = false)
+    logInfo("Spark failed to commit transaction. Will reattempt events.")
+    null
+  }
+
+  /**
+   * Helper method to commit or rollback a transaction.
+   * @param sequenceNumber The sequence number of the batch that was completed
+   * @param success Whether the batch was successful or not.
+   */
+  private def completeTransaction(sequenceNumber: CharSequence, success: Boolean) {
+    removeAndGetProcessor(sequenceNumber).foreach(processor => {
+      processor.batchProcessed(success)
+    })
+  }
+
+  /**
+   * Helper method to remove the TxnProcessor for a Sequence Number. Can be used to avoid a leak.
+   * @param sequenceNumber
+   * @return An `Option` of the transaction processor for the corresponding batch. Note that this
+   *         instance is no longer tracked and the caller is responsible for that txn processor.
+   */
+  private[sink] def removeAndGetProcessor(sequenceNumber: CharSequence):
+      Option[TransactionProcessor] = {
+    sequenceNumberToProcessor.synchronized {
+      sequenceNumberToProcessor.remove(sequenceNumber.toString)
+    }
+  }
+
+  private[sink] def countDownWhenBatchAcked(latch: CountDownLatch) {
+    testLatch = latch
+    isTest = true
+  }
+
+  /**
+   * Shuts down the executor used to process transactions.
+   */
+  def shutdown() {
+    logInfo("Shutting down Spark Avro Callback Handler")
+    sequenceNumberToProcessor.synchronized {
+      stopped = true
+      sequenceNumberToProcessor.values.foreach(_.shutdown())
+    }
+    transactionExecutorOpt.foreach(_.shutdownNow())
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/24587ce4/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala
----------------------------------------------------------------------
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala
new file mode 100644
index 0000000..14dffb1
--- /dev/null
+++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.flume.sink
+
+import java.net.InetSocketAddress
+import java.util.concurrent._
+
+import org.apache.avro.ipc.NettyServer
+import org.apache.avro.ipc.specific.SpecificResponder
+import org.apache.flume.Context
+import org.apache.flume.Sink.Status
+import org.apache.flume.conf.{Configurable, ConfigurationException}
+import org.apache.flume.sink.AbstractSink
+
+/**
+ * A sink that uses Avro RPC to run a server that can be polled by Spark's
+ * FlumePollingInputDStream. This sink has the following configuration parameters:
+ *
+ * hostname - The hostname to bind to. Default: 0.0.0.0
+ * port - The port to bind to. (No default - mandatory)
+ * timeout - Time in seconds after which a transaction is rolled back,
+ * if an ACK is not received from Spark within that time
+ * threads - Number of threads to use to receive requests from Spark (Default: 10)
+ *
+ * This sink is unlike other Flume sinks in the sense that it does not push data;
+ * instead, the process method in this sink simply blocks the SinkRunner the first time it is
+ * called. This sink starts up an Avro IPC server that uses the SparkFlumeProtocol.
+ *
+ * Each time a getEventBatch call comes in, the sink creates a transaction and reads events
+ * from the channel. When enough events are read, the events are sent to the Spark receiver and
+ * the thread itself is blocked and a reference to it is saved off.
+ *
+ * When the ack for that batch is received,
+ * the thread which created the transaction is retrieved and it commits the transaction with the
+ * channel from the same thread it was originally created in (since Flume transactions are
+ * thread local). If a nack is received instead, the sink rolls back the transaction. If no ack
+ * is received within the specified timeout, the transaction is rolled back too. If an ack comes
+ * after that, it is simply ignored and the events get re-sent.
+ *
+ */
+
+class SparkSink extends AbstractSink with Logging with Configurable {
+
+  // Size of the pool to use for holding transaction processors.
+  private var poolSize: Integer = SparkSinkConfig.DEFAULT_THREADS
+
+  // Timeout for each transaction. If spark does not respond in this much time,
+  // rollback the transaction
+  private var transactionTimeout = SparkSinkConfig.DEFAULT_TRANSACTION_TIMEOUT
+
+  // Address info to bind on
+  private var hostname: String = SparkSinkConfig.DEFAULT_HOSTNAME
+  private var port: Int = 0
+
+  private var backOffInterval: Int = 200
+
+  // Handle to the server
+  private var serverOpt: Option[NettyServer] = None
+
+  // The handler that handles the callback from Avro
+  private var handler: Option[SparkAvroCallbackHandler] = None
+
+  // Latch that blocks off the Flume framework from wasting 1 thread.
+  private val blockingLatch = new CountDownLatch(1)
+
+  override def start() {
+    logInfo("Starting Spark Sink: " + getName + " on port: " + port + " and interface: " +
+      hostname + " with " + "pool size: " + poolSize + " and transaction timeout: " +
+      transactionTimeout + ".")
+    handler = Option(new SparkAvroCallbackHandler(poolSize, getChannel, transactionTimeout,
+      backOffInterval))
+    val responder = new SpecificResponder(classOf[SparkFlumeProtocol], handler.get)
+    // Using the constructor that takes specific thread-pools requires bringing in netty
+    // dependencies which are being excluded in the build. In practice,
+    // Netty dependencies are already available on the JVM as Flume would have pulled them in.
+    serverOpt = Option(new NettyServer(responder, new InetSocketAddress(hostname, port)))
+    serverOpt.foreach(server => {
+      logInfo("Starting Avro server for sink: " + getName)
+      server.start()
+    })
+    super.start()
+  }
+
+  override def stop() {
+    logInfo("Stopping Spark Sink: " + getName)
+    handler.foreach(callbackHandler => {
+      callbackHandler.shutdown()
+    })
+    serverOpt.foreach(server => {
+      logInfo("Stopping Avro Server for sink: " + getName)
+      server.close()
+      server.join()
+    })
+    blockingLatch.countDown()
+    super.stop()
+  }
+
+  override def configure(ctx: Context) {
+    import SparkSinkConfig._
+    hostname = ctx.getString(CONF_HOSTNAME, DEFAULT_HOSTNAME)
+    port = Option(ctx.getInteger(CONF_PORT)).
+      getOrElse(throw new ConfigurationException("The port to bind to must be specified"))
+    poolSize = ctx.getInteger(THREADS, DEFAULT_THREADS)
+    transactionTimeout = ctx.getInteger(CONF_TRANSACTION_TIMEOUT, DEFAULT_TRANSACTION_TIMEOUT)
+    backOffInterval = ctx.getInteger(CONF_BACKOFF_INTERVAL, DEFAULT_BACKOFF_INTERVAL)
+    logInfo("Configured Spark Sink with hostname: " + hostname + ", port: " + port + ", " +
+      "poolSize: " + poolSize + ", transactionTimeout: " + transactionTimeout + ", " +
+      "backoffInterval: " + backOffInterval)
+  }
+
+  override def process(): Status = {
+    // This method is called in a loop by the Flume framework - block it until the sink is
+    // stopped to save CPU resources. The sink runner will interrupt this thread when the sink is
+    // being shut down.
+    logInfo("Blocking Sink Runner, sink will continue to run..")
+    blockingLatch.await()
+    Status.BACKOFF
+  }
+
+  private[flume] def getPort(): Int = {
+    serverOpt
+      .map(_.getPort)
+      .getOrElse(
+        throw new RuntimeException("Server was not started!")
+      )
+  }
+
+  /**
+   * Pass in a [[CountDownLatch]] for testing purposes. This latch is counted down when each
+   * batch is received. The test can simply call await on this latch till the expected number of
+   * batches are received.
+   * @param latch
+   */
+  private[flume] def countdownWhenBatchReceived(latch: CountDownLatch) {
+    handler.foreach(_.countDownWhenBatchAcked(latch))
+  }
+}
+
+/**
+ * Configuration parameters and their defaults.
+ */
+private[flume]
+object SparkSinkConfig {
+  val THREADS = "threads"
+  val DEFAULT_THREADS = 10
+
+  val CONF_TRANSACTION_TIMEOUT = "timeout"
+  val DEFAULT_TRANSACTION_TIMEOUT = 60
+
+  val CONF_HOSTNAME = "hostname"
+  val DEFAULT_HOSTNAME = "0.0.0.0"
+
+  val CONF_PORT = "port"
+
+  val CONF_BACKOFF_INTERVAL = "backoffInterval"
+  val DEFAULT_BACKOFF_INTERVAL = 200
+}
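
The keys in SparkSinkConfig above map directly to the Flume agent configuration. A rough sketch of an agent configuration that wires a channel to this sink (agent and channel names, capacity, and port are placeholders; see the Spark Streaming + Flume integration guide for the documented setup):

    # Illustrative Flume agent configuration (names and values are placeholders)
    agent1.channels = memoryChannel
    agent1.sinks = sparkSink

    agent1.channels.memoryChannel.type = memory
    agent1.channels.memoryChannel.capacity = 10000

    agent1.sinks.sparkSink.type = org.apache.spark.streaming.flume.sink.SparkSink
    agent1.sinks.sparkSink.hostname = 0.0.0.0
    agent1.sinks.sparkSink.port = 9999
    agent1.sinks.sparkSink.threads = 10
    agent1.sinks.sparkSink.timeout = 60
    agent1.sinks.sparkSink.channel = memoryChannel

A FlumeUtils.createPollingStream(ssc, hostname, port) receiver, as in FlumePollingEventCount above, would then poll this sink.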

http://git-wip-us.apache.org/repos/asf/spark/blob/24587ce4/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSinkThreadFactory.scala
----------------------------------------------------------------------
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSinkThreadFactory.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSinkThreadFactory.scala
new file mode 100644
index 0000000..845fc8d
--- /dev/null
+++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSinkThreadFactory.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.flume.sink
+
+import java.util.concurrent.ThreadFactory
+import java.util.concurrent.atomic.AtomicLong
+
+/**
+ * Thread factory that generates daemon threads with a specified name format.
+ */
+private[sink] class SparkSinkThreadFactory(nameFormat: String) extends ThreadFactory {
+
+  private val threadId = new AtomicLong()
+
+  override def newThread(r: Runnable): Thread = {
+    val t = new Thread(r, nameFormat.format(threadId.incrementAndGet()))
+    t.setDaemon(true)
+    t
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/24587ce4/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSinkUtils.scala
----------------------------------------------------------------------
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSinkUtils.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSinkUtils.scala
new file mode 100644
index 0000000..47c0e29
--- /dev/null
+++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSinkUtils.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.flume.sink
+
+private[flume] object SparkSinkUtils {
+  /**
+   * This method determines if this batch represents an error or not.
+   * @param batch - The batch to check
+   * @return - true if the batch represents an error
+   */
+  def isErrorBatch(batch: EventBatch): Boolean = {
+    !batch.getErrorMsg.toString.equals("") // If there is an error message, it is an error batch.
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/24587ce4/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala
----------------------------------------------------------------------
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala
new file mode 100644
index 0000000..b15c209
--- /dev/null
+++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.flume.sink
+
+import java.nio.ByteBuffer
+import java.util
+import java.util.concurrent.{Callable, CountDownLatch, TimeUnit}
+
+import scala.util.control.Breaks
+
+import org.apache.flume.{Channel, Transaction}
+
+// Flume forces transactions to be thread-local (horrible, I know!)
+// So the sink basically spawns a new thread to pull the events out within a transaction.
+// The thread fills in the event batch object that is set before the thread is scheduled.
+// After filling it in, the thread waits on a condition - which is released only
+// when the success message comes back for the specific sequence number for that event batch.
+/**
+ * This class represents a transaction on the Flume channel. It runs on a separate thread that
+ * owns the transaction; the thread is blocked until an ACK or NACK for that transaction
+ * comes back from Spark.
+ * @param channel The channel from which to pull events
+ * @param seqNum The sequence number to use for the transaction. Must be unique
+ * @param maxBatchSize The maximum number of events to process per batch
+ * @param transactionTimeout Time in seconds after which a transaction must be rolled back
+ *                           without waiting for an ACK from Spark
+ * @param backOffInterval Time in milliseconds to back off before polling the channel again when
+ *                        no events were available
+ * @param parent The parent [[SparkAvroCallbackHandler]] instance, for reporting timeouts
+ */
+private class TransactionProcessor(val channel: Channel, val seqNum: String,
+  var maxBatchSize: Int, val transactionTimeout: Int, val backOffInterval: Int,
+  val parent: SparkAvroCallbackHandler) extends Callable[Void] with Logging {
+
+  // If a real batch is not returned, we always have to return an error batch.
+  @volatile private var eventBatch: EventBatch = new EventBatch("Unknown Error", "",
+    util.Collections.emptyList())
+
+  // Synchronization primitives
+  val batchGeneratedLatch = new CountDownLatch(1)
+  val batchAckLatch = new CountDownLatch(1)
+
+  // Sanity check to ensure we don't loop like crazy
+  val totalAttemptsToRemoveFromChannel = Int.MaxValue / 2
+
+  // OK to use volatile: the only change ever applied to this flag is false -> true (we never
+  // negate it), and a value of true means the transaction succeeded.
+  @volatile private var batchSuccess = false
+
+  @volatile private var stopped = false
+
+  @volatile private var isTest = false
+
+  private var testLatch: CountDownLatch = null
+
+  // The transaction that this processor would handle
+  var txOpt: Option[Transaction] = None
+
+  /**
+   * Get an event batch from the channel. This method will block until a batch of events is
+   * available from the channel. If no events are available after a large number of attempts to
+   * poll the channel, this method will return an [[EventBatch]] with a non-empty error message.
+   *
+   * @return An [[EventBatch]] instance with sequence number set to seqNum, filled with a
+   *         maximum of maxBatchSize events
+   */
+  def getEventBatch: EventBatch = {
+    batchGeneratedLatch.await()
+    eventBatch
+  }
+
+  /**
+   * This method is to be called by the sink when it receives an ACK or NACK from Spark. This
+   * method is a no-op if it is called after transactionTimeout has expired since
+   * getEventBatch returned a batch of events.
+   * @param success True if an ACK was received and the transaction should be committed, else false.
+   */
+  def batchProcessed(success: Boolean) {
+    logDebug("Batch processed for sequence number: " + seqNum)
+    batchSuccess = success
+    batchAckLatch.countDown()
+  }
+
+  private[flume] def shutdown(): Unit = {
+    logDebug("Shutting down transaction processor")
+    stopped = true
+  }
+
+  /**
+   * Populates events into the event batch. If the batch cannot be populated,
+   * this method leaves the events unset and instead sets an error message on the batch.
+   */
+  private def populateEvents() {
+    try {
+      txOpt = Option(channel.getTransaction)
+      if (txOpt.isEmpty) {
+        eventBatch.setErrorMsg("Something went wrong. Channel was " +
+          "unable to create a transaction!")
+      }
+      txOpt.foreach(tx => {
+        tx.begin()
+        val events = new util.ArrayList[SparkSinkEvent](maxBatchSize)
+        val loop = new Breaks
+        var gotEventsInThisTxn = false
+        var loopCounter: Int = 0
+        loop.breakable {
+          while (!stopped && events.size() < maxBatchSize
+            && loopCounter < totalAttemptsToRemoveFromChannel) {
+            loopCounter += 1
+            Option(channel.take()) match {
+              case Some(event) =>
+                events.add(new SparkSinkEvent(toCharSequenceMap(event.getHeaders),
+                  ByteBuffer.wrap(event.getBody)))
+                gotEventsInThisTxn = true
+              case None =>
+                if (!gotEventsInThisTxn && !stopped) {
+                  logDebug("Sleeping for " + backOffInterval + " millis as no events were read in" +
+                    " the current transaction")
+                  TimeUnit.MILLISECONDS.sleep(backOffInterval)
+                } else {
+                  loop.break()
+                }
+            }
+          }
+        }
+        if (!gotEventsInThisTxn && !stopped) {
+          val msg = "Tried several times, " +
+            "but did not get any events from the channel!"
+          logWarning(msg)
+          eventBatch.setErrorMsg(msg)
+        } else {
+          // At this point, the events are available, so fill them into the event batch
+          eventBatch = new EventBatch("", seqNum, events)
+        }
+      })
+    } catch {
+      case interrupted: InterruptedException =>
+        // Don't pollute logs if the InterruptedException came from this being stopped
+        if (!stopped) {
+          logWarning("Error while processing transaction.", interrupted)
+        }
+      case e: Exception =>
+        logWarning("Error while processing transaction.", e)
+        eventBatch.setErrorMsg(e.getMessage)
+        try {
+          txOpt.foreach(tx => {
+            rollbackAndClose(tx, close = true)
+          })
+        } finally {
+          txOpt = None
+        }
+    } finally {
+      batchGeneratedLatch.countDown()
+    }
+  }
+
+  /**
+   * Waits for up to transactionTimeout seconds for an ACK. If an ACK comes in,
+   * this method commits the transaction with the channel. If the ACK does not come in within
+   * that time or a NACK comes in, this method rolls back the transaction.
+   */
+  private def processAckOrNack() {
+    batchAckLatch.await(transactionTimeout, TimeUnit.SECONDS)
+    txOpt.foreach(tx => {
+      if (batchSuccess) {
+        try {
+          logDebug("Committing transaction")
+          tx.commit()
+        } catch {
+          case e: Exception =>
+            logWarning("Error while attempting to commit transaction. Transaction will be rolled " +
+              "back", e)
+            rollbackAndClose(tx, close = false) // tx will be closed later anyway
+        } finally {
+          tx.close()
+          if (isTest) {
+            testLatch.countDown()
+          }
+        }
+      } else {
+        logWarning("Spark could not commit transaction, NACK received. Rolling back transaction.")
+        rollbackAndClose(tx, close = true)
+        // This might have been due to timeout or a NACK. Either way the following call does not
+        // cause issues. This is required to ensure the TransactionProcessor instance is not leaked
+        parent.removeAndGetProcessor(seqNum)
+      }
+    })
+  }
+
+  /**
+   * Helper method to rollback and optionally close a transaction
+   * @param tx The transaction to rollback
+   * @param close Whether the transaction should be closed or not after rolling back
+   */
+  private def rollbackAndClose(tx: Transaction, close: Boolean) {
+    try {
+      logWarning("Spark was unable to successfully process the events. Transaction is being " +
+        "rolled back.")
+      tx.rollback()
+    } catch {
+      case e: Exception =>
+        logError("Error rolling back transaction. Rollback may have failed!", e)
+    } finally {
+      if (close) {
+        tx.close()
+      }
+    }
+  }
+
+  /**
+   * Helper method to convert a Map[String, String] to Map[CharSequence, CharSequence]
+   * @param inMap The map to be converted
+   * @return The converted map
+   */
+  private def toCharSequenceMap(inMap: java.util.Map[String, String]): java.util.Map[CharSequence,
+    CharSequence] = {
+    val charSeqMap = new util.HashMap[CharSequence, CharSequence](inMap.size())
+    charSeqMap.putAll(inMap)
+    charSeqMap
+  }
+
+  /**
+   * When the thread is started, it fills the eventBatch with up to maxBatchSize events (fewer
+   * if not enough events are available) and lets any threads waiting on [[getEventBatch]]
+   * proceed. It then waits for an ACK or NACK to come in, or for the transaction timeout to
+   * expire, and commits or rolls back the transaction accordingly.
+   */
+  override def call(): Void = {
+    populateEvents()
+    processAckOrNack()
+    null
+  }
+
+  private[sink] def countDownWhenBatchAcked(latch: CountDownLatch) {
+    testLatch = latch
+    isTest = true
+  }
+}
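
For reference, a minimal sketch of how the sink side drives a TransactionProcessor, based on the
Callable contract above (the real driver is SparkAvroCallbackHandler). It is illustrative only
and assumes code in the sink package with an existing `channel` and `handler`; the sequence
number, batch size, timeout and back-off values are arbitrary:

    import java.util.concurrent.Executors

    val executor = Executors.newSingleThreadExecutor(
      new SparkSinkThreadFactory("Spark Sink Processor Thread - %d"))
    // channel: org.apache.flume.Channel, handler: SparkAvroCallbackHandler (assumed to exist)
    val processor = new TransactionProcessor(channel, "seq-1", 1000, 60, 200, handler)
    executor.submit(processor)           // runs populateEvents(), then processAckOrNack()
    val batch = processor.getEventBatch  // blocks until the batch (or an error batch) is ready
    if (!SparkSinkUtils.isErrorBatch(batch)) {
      // ... ship the batch to Spark over Avro; once Spark replies:
      processor.batchProcessed(success = true) // true for an ACK, false for a NACK
    }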

http://git-wip-us.apache.org/repos/asf/spark/blob/24587ce4/external/flume-sink/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/external/flume-sink/src/test/resources/log4j.properties b/external/flume-sink/src/test/resources/log4j.properties
new file mode 100644
index 0000000..42df879
--- /dev/null
+++ b/external/flume-sink/src/test/resources/log4j.properties
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the file streaming/target/unit-tests.log
+log4j.rootCategory=INFO, file
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=true
+log4j.appender.file.file=target/unit-tests.log
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
+
+# Ignore messages below warning level from Jetty, because it's a bit verbose
+log4j.logger.org.spark-project.jetty=WARN
+

http://git-wip-us.apache.org/repos/asf/spark/blob/24587ce4/external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala
----------------------------------------------------------------------
diff --git a/external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala b/external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala
new file mode 100644
index 0000000..e8ca1e7
--- /dev/null
+++ b/external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.flume.sink
+
+import java.net.InetSocketAddress
+import java.nio.charset.StandardCharsets
+import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.JavaConverters._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.{Failure, Success}
+
+import org.apache.avro.ipc.NettyTransceiver
+import org.apache.avro.ipc.specific.SpecificRequestor
+import org.apache.flume.Context
+import org.apache.flume.channel.MemoryChannel
+import org.apache.flume.event.EventBuilder
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
+
+// Due to MNG-1378, there is no way to include test dependencies transitively.
+// We cannot include Spark core tests as a dependency here because it depends on
+// Spark core main, which has too many dependencies to require here manually.
+// For this reason, we continue to use FunSuite and ignore the scalastyle checks
+// that fail if this is detected.
+// scalastyle:off
+import org.scalatest.FunSuite
+
+class SparkSinkSuite extends FunSuite {
+// scalastyle:on
+
+  val eventsPerBatch = 1000
+  val channelCapacity = 5000
+
+  test("Success with ack") {
+    val (channel, sink, latch) = initializeChannelAndSink()
+    channel.start()
+    sink.start()
+
+    putEvents(channel, eventsPerBatch)
+
+    val port = sink.getPort
+    val address = new InetSocketAddress("0.0.0.0", port)
+
+    val (transceiver, client) = getTransceiverAndClient(address, 1)(0)
+    val events = client.getEventBatch(1000)
+    client.ack(events.getSequenceNumber)
+    assert(events.getEvents.size() === 1000)
+    latch.await(1, TimeUnit.SECONDS)
+    assertChannelIsEmpty(channel)
+    sink.stop()
+    channel.stop()
+    transceiver.close()
+  }
+
+  test("Failure with nack") {
+    val (channel, sink, latch) = initializeChannelAndSink()
+    channel.start()
+    sink.start()
+    putEvents(channel, eventsPerBatch)
+
+    val port = sink.getPort
+    val address = new InetSocketAddress("0.0.0.0", port)
+
+    val (transceiver, client) = getTransceiverAndClient(address, 1)(0)
+    val events = client.getEventBatch(1000)
+    assert(events.getEvents.size() === 1000)
+    client.nack(events.getSequenceNumber)
+    latch.await(1, TimeUnit.SECONDS)
+    assert(availableChannelSlots(channel) === 4000)
+    sink.stop()
+    channel.stop()
+    transceiver.close()
+  }
+
+  test("Failure with timeout") {
+    val (channel, sink, latch) = initializeChannelAndSink(Map(SparkSinkConfig
+      .CONF_TRANSACTION_TIMEOUT -> 1.toString))
+    channel.start()
+    sink.start()
+    putEvents(channel, eventsPerBatch)
+    val port = sink.getPort
+    val address = new InetSocketAddress("0.0.0.0", port)
+
+    val (transceiver, client) = getTransceiverAndClient(address, 1)(0)
+    val events = client.getEventBatch(1000)
+    assert(events.getEvents.size() === 1000)
+    latch.await(1, TimeUnit.SECONDS)
+    assert(availableChannelSlots(channel) === 4000)
+    sink.stop()
+    channel.stop()
+    transceiver.close()
+  }
+
+  test("Multiple consumers") {
+    testMultipleConsumers(failSome = false)
+  }
+
+  test("Multiple consumers with some failures") {
+    testMultipleConsumers(failSome = true)
+  }
+
+  def testMultipleConsumers(failSome: Boolean): Unit = {
+    implicit val executorContext = ExecutionContext
+      .fromExecutorService(Executors.newFixedThreadPool(5))
+    val (channel, sink, latch) = initializeChannelAndSink(Map.empty, 5)
+    channel.start()
+    sink.start()
+    (1 to 5).foreach(_ => putEvents(channel, eventsPerBatch))
+    val port = sink.getPort
+    val address = new InetSocketAddress("0.0.0.0", port)
+    val transceiversAndClients = getTransceiverAndClient(address, 5)
+    val batchCounter = new CountDownLatch(5)
+    val counter = new AtomicInteger(0)
+    transceiversAndClients.foreach(x => {
+      Future {
+        val client = x._2
+        val events = client.getEventBatch(1000)
+        if (!failSome || counter.getAndIncrement() % 2 == 0) {
+          client.ack(events.getSequenceNumber)
+        } else {
+          client.nack(events.getSequenceNumber)
+          throw new RuntimeException("Sending NACK for failure!")
+        }
+        events
+      }.onComplete {
+        case Success(events) =>
+          assert(events.getEvents.size() === 1000)
+          batchCounter.countDown()
+        case Failure(t) =>
+          // Don't re-throw the exception, causes a nasty unnecessary stack trace on stdout
+          batchCounter.countDown()
+      }
+    })
+    batchCounter.await()
+    latch.await(1, TimeUnit.SECONDS)
+    executorContext.shutdown()
+    if (failSome) {
+      assert(availableChannelSlots(channel) === 3000)
+    } else {
+      assertChannelIsEmpty(channel)
+    }
+    sink.stop()
+    channel.stop()
+    transceiversAndClients.foreach(x => x._1.close())
+  }
+
+  private def initializeChannelAndSink(overrides: Map[String, String] = Map.empty,
+    batchCounter: Int = 1): (MemoryChannel, SparkSink, CountDownLatch) = {
+    val channel = new MemoryChannel()
+    val channelContext = new Context()
+
+    channelContext.put("capacity", channelCapacity.toString)
+    channelContext.put("transactionCapacity", 1000.toString)
+    channelContext.put("keep-alive", 0.toString)
+    channelContext.putAll(overrides.asJava)
+    channel.setName(scala.util.Random.nextString(10))
+    channel.configure(channelContext)
+
+    val sink = new SparkSink()
+    val sinkContext = new Context()
+    sinkContext.put(SparkSinkConfig.CONF_HOSTNAME, "0.0.0.0")
+    sinkContext.put(SparkSinkConfig.CONF_PORT, 0.toString)
+    sink.configure(sinkContext)
+    sink.setChannel(channel)
+    val latch = new CountDownLatch(batchCounter)
+    sink.countdownWhenBatchReceived(latch)
+    (channel, sink, latch)
+  }
+
+  private def putEvents(ch: MemoryChannel, count: Int): Unit = {
+    val tx = ch.getTransaction
+    tx.begin()
+    (1 to count).foreach(x =>
+      ch.put(EventBuilder.withBody(x.toString.getBytes(StandardCharsets.UTF_8))))
+    tx.commit()
+    tx.close()
+  }
+
+  private def getTransceiverAndClient(address: InetSocketAddress,
+    count: Int): Seq[(NettyTransceiver, SparkFlumeProtocol.Callback)] = {
+
+    (1 to count).map(_ => {
+      lazy val channelFactoryExecutor = Executors.newCachedThreadPool(
+        new SparkSinkThreadFactory("Flume Receiver Channel Thread - %d"))
+      lazy val channelFactory =
+        new NioClientSocketChannelFactory(channelFactoryExecutor, channelFactoryExecutor)
+      val transceiver = new NettyTransceiver(address, channelFactory)
+      val client = SpecificRequestor.getClient(classOf[SparkFlumeProtocol.Callback], transceiver)
+      (transceiver, client)
+    })
+  }
+
+  private def assertChannelIsEmpty(channel: MemoryChannel): Unit = {
+    assert(availableChannelSlots(channel) === channelCapacity)
+  }
+
+  private def availableChannelSlots(channel: MemoryChannel): Int = {
+    val queueRemaining = channel.getClass.getDeclaredField("queueRemaining")
+    queueRemaining.setAccessible(true)
+    val m = queueRemaining.get(channel).getClass.getDeclaredMethod("availablePermits")
+    m.invoke(queueRemaining.get(channel)).asInstanceOf[Int]
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/24587ce4/external/flume/pom.xml
----------------------------------------------------------------------
diff --git a/external/flume/pom.xml b/external/flume/pom.xml
new file mode 100644
index 0000000..d650dd0
--- /dev/null
+++ b/external/flume/pom.xml
@@ -0,0 +1,78 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.spark</groupId>
+    <artifactId>spark-parent_2.11</artifactId>
+    <version>2.0.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <groupId>org.apache.spark</groupId>
+  <artifactId>spark-streaming-flume_2.11</artifactId>
+  <properties>
+    <sbt.project.name>streaming-flume</sbt.project.name>
+  </properties>
+  <packaging>jar</packaging>
+  <name>Spark Project External Flume</name>
+  <url>http://spark.apache.org/</url>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming-flume-sink_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-sdk</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.scalacheck</groupId>
+      <artifactId>scalacheck_${scala.binary.version}</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-test-tags_${scala.binary.version}</artifactId>
+    </dependency>
+  </dependencies>
+  <build>
+    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+  </build>
+</project>
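
For an application that links against this module, the coordinates follow directly from the pom
above. A minimal sbt sketch (the version shown is this pom's snapshot and would normally be a
released Spark version):

    // build.sbt (illustrative)
    libraryDependencies += "org.apache.spark" %% "spark-streaming-flume" % "2.0.0-SNAPSHOT"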

http://git-wip-us.apache.org/repos/asf/spark/blob/24587ce4/external/flume/src/main/scala/org/apache/spark/streaming/flume/EventTransformer.scala
----------------------------------------------------------------------
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/EventTransformer.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/EventTransformer.scala
new file mode 100644
index 0000000..07c5286
--- /dev/null
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/EventTransformer.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.flume
+
+import java.io.{ObjectInput, ObjectOutput}
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * A simple object that provides the implementation of readExternal and writeExternal for both
+ * the wrapper classes for Flume-style Events.
+ */
+private[streaming] object EventTransformer extends Logging {
+  def readExternal(in: ObjectInput): (java.util.HashMap[CharSequence, CharSequence],
+    Array[Byte]) = {
+    val bodyLength = in.readInt()
+    val bodyBuff = new Array[Byte](bodyLength)
+    in.readFully(bodyBuff)
+
+    val numHeaders = in.readInt()
+    val headers = new java.util.HashMap[CharSequence, CharSequence]
+
+    for (i <- 0 until numHeaders) {
+      val keyLength = in.readInt()
+      val keyBuff = new Array[Byte](keyLength)
+      in.readFully(keyBuff)
+      val key: String = Utils.deserialize(keyBuff)
+
+      val valLength = in.readInt()
+      val valBuff = new Array[Byte](valLength)
+      in.readFully(valBuff)
+      val value: String = Utils.deserialize(valBuff)
+
+      headers.put(key, value)
+    }
+    (headers, bodyBuff)
+  }
+
+  def writeExternal(out: ObjectOutput, headers: java.util.Map[CharSequence, CharSequence],
+    body: Array[Byte]) {
+    out.writeInt(body.length)
+    out.write(body)
+    val numHeaders = headers.size()
+    out.writeInt(numHeaders)
+    for ((k, v) <- headers.asScala) {
+      val keyBuff = Utils.serialize(k.toString)
+      out.writeInt(keyBuff.length)
+      out.write(keyBuff)
+      val valBuff = Utils.serialize(v.toString)
+      out.writeInt(valBuff.length)
+      out.write(valBuff)
+    }
+  }
+}
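
For reference, a minimal round-trip sketch of the format that writeExternal and readExternal
implement (body length and bytes, then a header count followed by serialized key/value pairs).
It is illustrative only and assumes code under the org.apache.spark.streaming package, since
EventTransformer is private[streaming]:

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
    import java.io.{ObjectInputStream, ObjectOutputStream}
    import java.nio.charset.StandardCharsets

    val headers = new java.util.HashMap[CharSequence, CharSequence]()
    headers.put("host", "example-host")
    val body = "payload".getBytes(StandardCharsets.UTF_8)

    // Write the headers and body out ...
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    EventTransformer.writeExternal(out, headers, body)
    out.flush()

    // ... and read them back.
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    val (readHeaders, readBody) = EventTransformer.readExternal(in)
    // readHeaders and readBody now mirror the original headers and body.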

http://git-wip-us.apache.org/repos/asf/spark/blob/24587ce4/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeBatchFetcher.scala
----------------------------------------------------------------------
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeBatchFetcher.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeBatchFetcher.scala
new file mode 100644
index 0000000..5f234b1
--- /dev/null
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeBatchFetcher.scala
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.flume
+
+import scala.collection.mutable.ArrayBuffer
+
+import com.google.common.base.Throwables
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.streaming.flume.sink._
+
+/**
+ * This class implements the core functionality of [[FlumePollingReceiver]]. When started, it
+ * pulls data from Flume, stores it in Spark and then sends an ACK or NACK. This class should be
+ * run via a [[java.util.concurrent.Executor]] as it implements [[Runnable]].
+ *
+ * @param receiver The receiver that owns this instance.
+ */
+private[flume] class FlumeBatchFetcher(receiver: FlumePollingReceiver) extends Runnable with
+  Logging {
+
+  def run(): Unit = {
+    while (!receiver.isStopped()) {
+      val connection = receiver.getConnections.poll()
+      val client = connection.client
+      var batchReceived = false
+      var seq: CharSequence = null
+      try {
+        getBatch(client) match {
+          case Some(eventBatch) =>
+            batchReceived = true
+            seq = eventBatch.getSequenceNumber
+            val events = toSparkFlumeEvents(eventBatch.getEvents)
+            if (store(events)) {
+              sendAck(client, seq)
+            } else {
+              sendNack(batchReceived, client, seq)
+            }
+          case None =>
+        }
+      } catch {
+        case e: Exception =>
+          Throwables.getRootCause(e) match {
+            // If the cause was an InterruptedException, check whether the receiver is stopped -
+            // if so, just fall out of the loop; otherwise log a warning and send a NACK.
+            // In the unlikely case that the cause was not an Exception, it is simply
+            // thrown out and the thread exits.
+            case interrupted: InterruptedException =>
+              if (!receiver.isStopped()) {
+                logWarning("Interrupted while receiving data from Flume", interrupted)
+                sendNack(batchReceived, client, seq)
+              }
+            case exception: Exception =>
+              logWarning("Error while receiving data from Flume", exception)
+              sendNack(batchReceived, client, seq)
+          }
+      } finally {
+        receiver.getConnections.add(connection)
+      }
+    }
+  }
+
+  /**
+   * Gets a batch of events from the specified client. This method does not handle exceptions;
+   * any that are thrown are propagated to the caller.
+   * @param client Client to get events from
+   * @return [[Some]] which contains the event batch if Flume sent any events back, else [[None]]
+   */
+  private def getBatch(client: SparkFlumeProtocol.Callback): Option[EventBatch] = {
+    val eventBatch = client.getEventBatch(receiver.getMaxBatchSize)
+    if (!SparkSinkUtils.isErrorBatch(eventBatch)) {
+      // No error, proceed with processing data
+      logDebug(s"Received batch of ${eventBatch.getEvents.size} events with sequence " +
+        s"number: ${eventBatch.getSequenceNumber}")
+      Some(eventBatch)
+    } else {
+      logWarning("Did not receive events from Flume agent due to error on the Flume agent: " +
+        eventBatch.getErrorMsg)
+      None
+    }
+  }
+
+  /**
+   * Store the events in the buffer with Spark. Exceptions are caught and reported by returning
+   * false; anything that is not an Exception (such as an Error) is allowed to propagate.
+   * @param buffer The buffer to store
+   * @return true if the data was stored without any exception being thrown, else false
+   */
+  private def store(buffer: ArrayBuffer[SparkFlumeEvent]): Boolean = {
+    try {
+      receiver.store(buffer)
+      true
+    } catch {
+      case e: Exception =>
+        logWarning("Error while attempting to store data received from Flume", e)
+        false
+    }
+  }
+
+  /**
+   * Send an ack to the client for the sequence number. This method does not handle exceptions;
+   * any that are thrown are propagated to the caller.
+   * @param client client to send the ack to
+   * @param seq sequence number of the batch to be ack-ed.
+   */
+  private def sendAck(client: SparkFlumeProtocol.Callback, seq: CharSequence): Unit = {
+    logDebug("Sending ack for sequence number: " + seq)
+    client.ack(seq)
+    logDebug("Ack sent for sequence number: " + seq)
+  }
+
+  /**
+   * Sends a NACK to the client for the given sequence number, if a batch was received. Any
+   * exception thrown by the RPC call is propagated as-is; no effort is made to handle it.
+   * @param batchReceived true if a batch was received. If this is false, no nack is sent
+   * @param client The client to which the nack should be sent
+   * @param seq The sequence number of the batch that is being nack-ed.
+   */
+  private def sendNack(batchReceived: Boolean, client: SparkFlumeProtocol.Callback,
+    seq: CharSequence): Unit = {
+    if (batchReceived) {
+      // Let Flume know that the events need to be pushed back into the channel.
+      logDebug("Sending nack for sequence number: " + seq)
+      client.nack(seq) // If the agent is down, even this could fail and throw
+      logDebug("Nack sent for sequence number: " + seq)
+    }
+  }
+
+  /**
+   * Utility method to convert [[SparkSinkEvent]]s to [[SparkFlumeEvent]]s
+   * @param events - Events to convert to SparkFlumeEvents
+   * @return - The SparkFlumeEvents generated from the SparkSinkEvents
+   */
+  private def toSparkFlumeEvents(events: java.util.List[SparkSinkEvent]):
+    ArrayBuffer[SparkFlumeEvent] = {
+    // Convert each Flume event to a serializable SparkFlumeEvent
+    val buffer = new ArrayBuffer[SparkFlumeEvent](events.size())
+    var j = 0
+    while (j < events.size()) {
+      val event = events.get(j)
+      val sparkFlumeEvent = new SparkFlumeEvent()
+      sparkFlumeEvent.event.setBody(event.getBody)
+      sparkFlumeEvent.event.setHeaders(event.getHeaders)
+      buffer += sparkFlumeEvent
+      j += 1
+    }
+    buffer
+  }
+}
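
For reference, a minimal sketch of how these fetchers are meant to be run, per the class comment
above: the receiver submits one FlumeBatchFetcher per polling thread to an executor. It is
illustrative only; `receiver` is assumed to be an already-started FlumePollingReceiver and the
thread count is arbitrary:

    import java.util.concurrent.Executors

    val numFetchers = 3 // arbitrary
    val fetcherPool = Executors.newFixedThreadPool(numFetchers)
    (1 to numFetchers).foreach { _ =>
      fetcherPool.submit(new FlumeBatchFetcher(receiver)) // receiver assumed to exist
    }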

