Posted to commits@spark.apache.org by rx...@apache.org on 2016/03/15 00:56:15 UTC

[4/5] spark git commit: [SPARK-13843][STREAMING] Remove streaming-flume, streaming-mqtt, streaming-zeromq, streaming-akka, streaming-twitter to Spark packages

http://git-wip-us.apache.org/repos/asf/spark/blob/06dec374/external/akka/src/main/scala/org/apache/spark/streaming/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/external/akka/src/main/scala/org/apache/spark/streaming/akka/AkkaUtils.scala b/external/akka/src/main/scala/org/apache/spark/streaming/akka/AkkaUtils.scala
deleted file mode 100644
index 38c35c5..0000000
--- a/external/akka/src/main/scala/org/apache/spark/streaming/akka/AkkaUtils.scala
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.akka
-
-import scala.reflect.ClassTag
-
-import akka.actor.{ActorSystem, Props, SupervisorStrategy}
-
-import org.apache.spark.api.java.function.{Function0 => JFunction0}
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext}
-import org.apache.spark.streaming.dstream.ReceiverInputDStream
-
-object AkkaUtils {
-
-  /**
-   * Create an input stream with a user-defined actor. See [[ActorReceiver]] for more details.
-   *
-   * @param ssc The StreamingContext instance
-   * @param propsForActor Props object defining creation of the actor
-   * @param actorName Name of the actor
-   * @param storageLevel RDD storage level (default: StorageLevel.MEMORY_AND_DISK_SER_2)
-   * @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will
-   *                           be shut down when the receiver is stopping (default:
-   *                           ActorReceiver.defaultActorSystemCreator)
-   * @param supervisorStrategy the supervisor strategy (default: ActorReceiver.defaultStrategy)
-   *
-   * @note An important point to note:
-   *       Since the actor may exist outside the Spark framework, it is the user's responsibility
-   *       to ensure type safety, i.e. that the parametrized type of the data received and of
-   *       createStream are the same.
-   */
-  def createStream[T: ClassTag](
-      ssc: StreamingContext,
-      propsForActor: Props,
-      actorName: String,
-      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
-      actorSystemCreator: () => ActorSystem = ActorReceiver.defaultActorSystemCreator,
-      supervisorStrategy: SupervisorStrategy = ActorReceiver.defaultSupervisorStrategy
-    ): ReceiverInputDStream[T] = ssc.withNamedScope("actor stream") {
-    val cleanF = ssc.sc.clean(actorSystemCreator)
-    ssc.receiverStream(new ActorReceiverSupervisor[T](
-      cleanF,
-      propsForActor,
-      actorName,
-      storageLevel,
-      supervisorStrategy))
-  }
-
-  /**
-   * Create an input stream with a user-defined actor. See [[JavaActorReceiver]] for more details.
-   *
-   * @param jssc The StreamingContext instance
-   * @param propsForActor Props object defining creation of the actor
-   * @param actorName Name of the actor
-   * @param storageLevel Storage level to use for storing the received objects
-   * @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will
-   *                           be shut down when the receiver is stopping.
-   * @param supervisorStrategy the supervisor strategy
-   *
-   * @note An important point to note:
-   *       Since the actor may exist outside the Spark framework, it is the user's responsibility
-   *       to ensure type safety, i.e. that the parametrized type of the data received and of
-   *       createStream are the same.
-   */
-  def createStream[T](
-      jssc: JavaStreamingContext,
-      propsForActor: Props,
-      actorName: String,
-      storageLevel: StorageLevel,
-      actorSystemCreator: JFunction0[ActorSystem],
-      supervisorStrategy: SupervisorStrategy
-    ): JavaReceiverInputDStream[T] = {
-    implicit val cm: ClassTag[T] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
-    createStream[T](
-      jssc.ssc,
-      propsForActor,
-      actorName,
-      storageLevel,
-      () => actorSystemCreator.call(),
-      supervisorStrategy)
-  }
-
-  /**
-   * Create an input stream with a user-defined actor. See [[JavaActorReceiver]] for more details.
-   *
-   * @param jssc The StreamingContext instance
-   * @param propsForActor Props object defining creation of the actor
-   * @param actorName Name of the actor
-   * @param storageLevel Storage level to use for storing the received objects
-   *
-   * @note An important point to note:
-   *       Since the actor may exist outside the Spark framework, it is the user's responsibility
-   *       to ensure type safety, i.e. that the parametrized type of the data received and of
-   *       createStream are the same.
-   */
-  def createStream[T](
-      jssc: JavaStreamingContext,
-      propsForActor: Props,
-      actorName: String,
-      storageLevel: StorageLevel
-    ): JavaReceiverInputDStream[T] = {
-    implicit val cm: ClassTag[T] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
-    createStream[T](jssc.ssc, propsForActor, actorName, storageLevel)
-  }
-
-  /**
-   * Create an input stream with a user-defined actor. Storage level of the data will be the default
-   * StorageLevel.MEMORY_AND_DISK_SER_2. See [[JavaActorReceiver]] for more details.
-   *
-   * @param jssc The StreamingContext instance
-   * @param propsForActor Props object defining creation of the actor
-   * @param actorName Name of the actor
-   *
-   * @note An important point to note:
-   *       Since the actor may exist outside the Spark framework, it is the user's responsibility
-   *       to ensure type safety, i.e. that the parametrized type of the data received and of
-   *       createStream are the same.
-   */
-  def createStream[T](
-      jssc: JavaStreamingContext,
-      propsForActor: Props,
-      actorName: String
-    ): JavaReceiverInputDStream[T] = {
-    implicit val cm: ClassTag[T] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
-    createStream[T](jssc.ssc, propsForActor, actorName)
-  }
-}
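
For context, a minimal sketch (not part of this commit) of how the removed AkkaUtils.createStream API was typically driven from Scala; the WordReceiver actor and the application wiring below are illustrative only:

import akka.actor.Props
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.akka.{ActorReceiver, AkkaUtils}

// Illustrative receiver actor: stores every String message it receives into Spark.
class WordReceiver extends ActorReceiver {
  override def receive: Receive = {
    case s: String => store(s)
  }
}

object AkkaStreamExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("akka-stream-example").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(1))
    // Storage level, ActorSystem creator and supervisor strategy fall back to the
    // defaults documented in the scaladoc above.
    val lines = AkkaUtils.createStream[String](ssc, Props[WordReceiver](), "word-receiver")
    lines.print()
    ssc.start()
    ssc.awaitTermination()
  }
}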

http://git-wip-us.apache.org/repos/asf/spark/blob/06dec374/external/akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java
----------------------------------------------------------------------
diff --git a/external/akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java b/external/akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java
deleted file mode 100644
index ac5ef31..0000000
--- a/external/akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.akka;
-
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import akka.actor.SupervisorStrategy;
-import akka.util.Timeout;
-import org.apache.spark.streaming.Duration;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.junit.Test;
-
-import org.apache.spark.api.java.function.Function0;
-import org.apache.spark.storage.StorageLevel;
-import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
-
-public class JavaAkkaUtilsSuite {
-
-  @Test // tests the API, does not actually test data receiving
-  public void testAkkaUtils() {
-    JavaStreamingContext jsc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
-    try {
-      JavaReceiverInputDStream<String> test1 = AkkaUtils.<String>createStream(
-        jsc, Props.create(JavaTestActor.class), "test");
-      JavaReceiverInputDStream<String> test2 = AkkaUtils.<String>createStream(
-        jsc, Props.create(JavaTestActor.class), "test", StorageLevel.MEMORY_AND_DISK_SER_2());
-      JavaReceiverInputDStream<String> test3 = AkkaUtils.<String>createStream(
-        jsc,
-        Props.create(JavaTestActor.class),
-        "test", StorageLevel.MEMORY_AND_DISK_SER_2(),
-        new ActorSystemCreatorForTest(),
-        SupervisorStrategy.defaultStrategy());
-    } finally {
-      jsc.stop();
-    }
-  }
-}
-
-class ActorSystemCreatorForTest implements Function0<ActorSystem> {
-  @Override
-  public ActorSystem call() {
-    return null;
-  }
-}
-
-
-class JavaTestActor extends JavaActorReceiver {
-  @Override
-  public void onReceive(Object message) throws Exception {
-    store((String) message);
-    store((String) message, new Timeout(1000));
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/06dec374/external/akka/src/test/scala/org/apache/spark/streaming/akka/AkkaUtilsSuite.scala
----------------------------------------------------------------------
diff --git a/external/akka/src/test/scala/org/apache/spark/streaming/akka/AkkaUtilsSuite.scala b/external/akka/src/test/scala/org/apache/spark/streaming/akka/AkkaUtilsSuite.scala
deleted file mode 100644
index ce95d9d..0000000
--- a/external/akka/src/test/scala/org/apache/spark/streaming/akka/AkkaUtilsSuite.scala
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.akka
-
-import scala.concurrent.duration._
-
-import akka.actor.{Props, SupervisorStrategy}
-
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.streaming.dstream.ReceiverInputDStream
-
-class AkkaUtilsSuite extends SparkFunSuite {
-
-  test("createStream") {
-    val ssc: StreamingContext = new StreamingContext("local[2]", "test", Seconds(1000))
-    try {
-      // tests the API, does not actually test data receiving
-      val test1: ReceiverInputDStream[String] = AkkaUtils.createStream(
-        ssc, Props[TestActor](), "test")
-      val test2: ReceiverInputDStream[String] = AkkaUtils.createStream(
-        ssc, Props[TestActor](), "test", StorageLevel.MEMORY_AND_DISK_SER_2)
-      val test3: ReceiverInputDStream[String] = AkkaUtils.createStream(
-        ssc,
-        Props[TestActor](),
-        "test",
-        StorageLevel.MEMORY_AND_DISK_SER_2,
-        supervisorStrategy = SupervisorStrategy.defaultStrategy)
-      val test4: ReceiverInputDStream[String] = AkkaUtils.createStream(
-        ssc, Props[TestActor](), "test", StorageLevel.MEMORY_AND_DISK_SER_2, () => null)
-      val test5: ReceiverInputDStream[String] = AkkaUtils.createStream(
-        ssc, Props[TestActor](), "test", StorageLevel.MEMORY_AND_DISK_SER_2, () => null)
-      val test6: ReceiverInputDStream[String] = AkkaUtils.createStream(
-        ssc,
-        Props[TestActor](),
-        "test",
-        StorageLevel.MEMORY_AND_DISK_SER_2,
-        () => null,
-        SupervisorStrategy.defaultStrategy)
-    } finally {
-      ssc.stop()
-    }
-  }
-}
-
-class TestActor extends ActorReceiver {
-  override def receive: Receive = {
-    case m: String => store(m)
-    case m => store(m, 10.seconds)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/06dec374/external/flume-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/external/flume-assembly/pom.xml b/external/flume-assembly/pom.xml
deleted file mode 100644
index ac15b93..0000000
--- a/external/flume-assembly/pom.xml
+++ /dev/null
@@ -1,168 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one or more
-  ~ contributor license agreements.  See the NOTICE file distributed with
-  ~ this work for additional information regarding copyright ownership.
-  ~ The ASF licenses this file to You under the Apache License, Version 2.0
-  ~ (the "License"); you may not use this file except in compliance with
-  ~ the License.  You may obtain a copy of the License at
-  ~
-  ~    http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-  <parent>
-    <groupId>org.apache.spark</groupId>
-    <artifactId>spark-parent_2.11</artifactId>
-    <version>2.0.0-SNAPSHOT</version>
-    <relativePath>../../pom.xml</relativePath>
-  </parent>
-
-  <groupId>org.apache.spark</groupId>
-  <artifactId>spark-streaming-flume-assembly_2.11</artifactId>
-  <packaging>jar</packaging>
-  <name>Spark Project External Flume Assembly</name>
-  <url>http://spark.apache.org/</url>
-
-  <properties>
-    <hadoop.deps.scope>provided</hadoop.deps.scope>
-    <sbt.project.name>streaming-flume-assembly</sbt.project.name>
-  </properties>
-
-  <dependencies>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-streaming-flume_${scala.binary.version}</artifactId>
-      <version>${project.version}</version>
-      <exclusions>
-        <exclusion>
-          <groupId>org.mortbay.jetty</groupId>
-          <artifactId>jetty</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.mortbay.jetty</groupId>
-          <artifactId>jetty-util</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
-      <version>${project.version}</version>
-      <scope>provided</scope>
-    </dependency>
-    <!--
-      These dependencies are already included in the Spark assembly, so demote them to provided
-      scope. They are transitive dependencies of flume or spark-streaming-flume, and they need to
-      be explicitly included even though the parent pom may declare them with ${hadoop.deps.scope}.
-    -->
-    <dependency>
-      <groupId>commons-codec</groupId>
-      <artifactId>commons-codec</artifactId>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>commons-lang</groupId>
-      <artifactId>commons-lang</artifactId>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>commons-net</groupId>
-      <artifactId>commons-net</artifactId>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>com.google.protobuf</groupId>
-      <artifactId>protobuf-java</artifactId>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.avro</groupId>
-      <artifactId>avro</artifactId>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.avro</groupId>
-      <artifactId>avro-ipc</artifactId>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.avro</groupId>
-      <artifactId>avro-mapred</artifactId>
-      <classifier>${avro.mapred.classifier}</classifier>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.scala-lang</groupId>
-      <artifactId>scala-library</artifactId>
-      <scope>provided</scope>
-    </dependency>
-  </dependencies>
-
-  <build>
-    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
-    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
-    <plugins>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-shade-plugin</artifactId>
-        <configuration>
-          <shadedArtifactAttached>false</shadedArtifactAttached>
-          <artifactSet>
-            <includes>
-              <include>*:*</include>
-            </includes>
-          </artifactSet>
-          <filters>
-            <filter>
-              <artifact>*:*</artifact>
-              <excludes>
-                <exclude>META-INF/*.SF</exclude>
-                <exclude>META-INF/*.DSA</exclude>
-                <exclude>META-INF/*.RSA</exclude>
-              </excludes>
-            </filter>
-          </filters>
-        </configuration>
-        <executions>
-          <execution>
-            <phase>package</phase>
-            <goals>
-              <goal>shade</goal>
-            </goals>
-            <configuration>
-              <transformers>
-                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
-                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
-                  <resource>reference.conf</resource>
-                </transformer>
-                <transformer implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
-                  <resource>log4j.properties</resource>
-                </transformer>
-                <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer"/>
-                <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"/>
-              </transformers>
-            </configuration>
-          </execution>
-        </executions>
-      </plugin>
-    </plugins>
-  </build>
-
-  <profiles>
-    <profile>
-      <id>flume-provided</id>
-      <properties>
-        <flume.deps.scope>provided</flume.deps.scope>
-      </properties>
-    </profile>
-  </profiles>
-</project>
-

http://git-wip-us.apache.org/repos/asf/spark/blob/06dec374/external/flume-sink/pom.xml
----------------------------------------------------------------------
diff --git a/external/flume-sink/pom.xml b/external/flume-sink/pom.xml
deleted file mode 100644
index e4effe1..0000000
--- a/external/flume-sink/pom.xml
+++ /dev/null
@@ -1,129 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one or more
-  ~ contributor license agreements.  See the NOTICE file distributed with
-  ~ this work for additional information regarding copyright ownership.
-  ~ The ASF licenses this file to You under the Apache License, Version 2.0
-  ~ (the "License"); you may not use this file except in compliance with
-  ~ the License.  You may obtain a copy of the License at
-  ~
-  ~    http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-  <parent>
-    <groupId>org.apache.spark</groupId>
-    <artifactId>spark-parent_2.11</artifactId>
-    <version>2.0.0-SNAPSHOT</version>
-    <relativePath>../../pom.xml</relativePath>
-  </parent>
-
-  <groupId>org.apache.spark</groupId>
-  <artifactId>spark-streaming-flume-sink_2.11</artifactId>
-  <properties>
-    <sbt.project.name>streaming-flume-sink</sbt.project.name>
-  </properties>
-  <packaging>jar</packaging>
-  <name>Spark Project External Flume Sink</name>
-  <url>http://spark.apache.org/</url>
-
-  <dependencies>
-    <dependency>
-      <groupId>org.apache.flume</groupId>
-      <artifactId>flume-ng-sdk</artifactId>
-      <exclusions>
-        <!-- Guava is excluded to avoid its use in this module. -->
-        <exclusion>
-          <groupId>com.google.guava</groupId>
-          <artifactId>guava</artifactId>
-        </exclusion>
-        <!--
-          Exclude libthrift since the flume poms seem to confuse sbt, which fails to find the
-          dependency.
-        -->
-        <exclusion>
-          <groupId>org.apache.thrift</groupId>
-          <artifactId>libthrift</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.flume</groupId>
-      <artifactId>flume-ng-core</artifactId>
-      <exclusions>
-        <exclusion>
-          <groupId>com.google.guava</groupId>
-          <artifactId>guava</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.thrift</groupId>
-          <artifactId>libthrift</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-    <dependency>
-      <groupId>org.scala-lang</groupId>
-      <artifactId>scala-library</artifactId>
-    </dependency>
-    <dependency>
-      <!-- Add Guava in test scope since flume actually needs it. -->
-      <groupId>com.google.guava</groupId>
-      <artifactId>guava</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <!--
-        Netty explicitly added in test as it has been excluded from
-        Flume dependency (to avoid runtime problems when running with
-        Spark) but unit tests need it. The version of Netty on which
-        Flume 1.4.0 depends is "3.4.0.Final".
-      -->
-      <groupId>io.netty</groupId>
-      <artifactId>netty</artifactId>
-      <version>3.4.0.Final</version>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-test-tags_${scala.binary.version}</artifactId>
-    </dependency>
-  </dependencies>
-  <build>
-    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
-    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
-    <plugins>
-      <plugin>
-        <groupId>org.apache.avro</groupId>
-        <artifactId>avro-maven-plugin</artifactId>
-        <version>${avro.version}</version>
-        <configuration>
-          <!-- Generate the output in the same directory as the sbt-avro-plugin -->
-          <outputDirectory>${project.basedir}/target/scala-${scala.binary.version}/src_managed/main/compiled_avro</outputDirectory>
-        </configuration>
-        <executions>
-          <execution>
-            <phase>generate-sources</phase>
-            <goals>
-              <goal>idl-protocol</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-shade-plugin</artifactId>
-        <configuration>
-          <!-- Disable all relocations defined in the parent pom. -->
-          <relocations combine.self="override" />
-        </configuration>
-      </plugin>
-    </plugins>
-  </build>
-</project>

http://git-wip-us.apache.org/repos/asf/spark/blob/06dec374/external/flume-sink/src/main/avro/sparkflume.avdl
----------------------------------------------------------------------
diff --git a/external/flume-sink/src/main/avro/sparkflume.avdl b/external/flume-sink/src/main/avro/sparkflume.avdl
deleted file mode 100644
index 8806e86..0000000
--- a/external/flume-sink/src/main/avro/sparkflume.avdl
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-@namespace("org.apache.spark.streaming.flume.sink")
-
-protocol SparkFlumeProtocol {
-
-  record SparkSinkEvent {
-    map<string> headers;
-    bytes body;
-  }
-
-  record EventBatch {
-    string errorMsg = ""; // If this is empty it is a valid message, else it represents an error
-    string sequenceNumber;
-    array<SparkSinkEvent> events;
-  }
-
-  EventBatch getEventBatch (int n);
-
-  void ack (string sequenceNumber);
-
-  void nack (string sequenceNumber);
-}
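
To make the calling convention of this protocol concrete, here is a short, hypothetical Scala sketch of the poll/ack cycle a client performs against it. The `protocol` argument is assumed to be an already-connected Avro RPC proxy for SparkFlumeProtocol; nothing below is defined in this commit:

import org.apache.spark.streaming.flume.sink.{EventBatch, SparkFlumeProtocol}

object FlumePollingSketch {
  // One polling round against the sink: fetch a batch, process it, then ack (commit)
  // or nack (roll back) using the batch's sequence number.
  def pollOnce(protocol: SparkFlumeProtocol, maxEvents: Int): Unit = {
    val batch: EventBatch = protocol.getEventBatch(maxEvents)
    // An empty errorMsg marks a valid batch (see the EventBatch record above).
    if (batch.getErrorMsg.toString.isEmpty) {
      try {
        // ... hand batch.getEvents over to the receiving side here ...
        protocol.ack(batch.getSequenceNumber)
      } catch {
        case _: Exception => protocol.nack(batch.getSequenceNumber)
      }
    }
  }
}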

http://git-wip-us.apache.org/repos/asf/spark/blob/06dec374/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/Logging.scala
----------------------------------------------------------------------
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/Logging.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/Logging.scala
deleted file mode 100644
index 09d3fe9..0000000
--- a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/Logging.scala
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.streaming.flume.sink
-
-import org.slf4j.{Logger, LoggerFactory}
-
-/**
- * Copy of org.apache.spark.Logging, for use in the Spark Sink.
- * org.apache.spark.Logging itself is not used so that all of Spark is not
- * pulled in as a dependency.
- */
-private[sink] trait Logging {
-  // Make the log field transient so that objects with Logging can
-  // be serialized and used on another machine
-  @transient private var _log: Logger = null
-
-  // Method to get or create the logger for this object
-  protected def log: Logger = {
-    if (_log == null) {
-      initializeIfNecessary()
-      var className = this.getClass.getName
-      // Ignore trailing $'s in the class names for Scala objects
-      if (className.endsWith("$")) {
-        className = className.substring(0, className.length - 1)
-      }
-      _log = LoggerFactory.getLogger(className)
-    }
-    _log
-  }
-
-  // Log methods that take only a String
-  protected def logInfo(msg: => String) {
-    if (log.isInfoEnabled) log.info(msg)
-  }
-
-  protected def logDebug(msg: => String) {
-    if (log.isDebugEnabled) log.debug(msg)
-  }
-
-  protected def logTrace(msg: => String) {
-    if (log.isTraceEnabled) log.trace(msg)
-  }
-
-  protected def logWarning(msg: => String) {
-    if (log.isWarnEnabled) log.warn(msg)
-  }
-
-  protected def logError(msg: => String) {
-    if (log.isErrorEnabled) log.error(msg)
-  }
-
-  // Log methods that take Throwables (Exceptions/Errors) too
-  protected def logInfo(msg: => String, throwable: Throwable) {
-    if (log.isInfoEnabled) log.info(msg, throwable)
-  }
-
-  protected def logDebug(msg: => String, throwable: Throwable) {
-    if (log.isDebugEnabled) log.debug(msg, throwable)
-  }
-
-  protected def logTrace(msg: => String, throwable: Throwable) {
-    if (log.isTraceEnabled) log.trace(msg, throwable)
-  }
-
-  protected def logWarning(msg: => String, throwable: Throwable) {
-    if (log.isWarnEnabled) log.warn(msg, throwable)
-  }
-
-  protected def logError(msg: => String, throwable: Throwable) {
-    if (log.isErrorEnabled) log.error(msg, throwable)
-  }
-
-  protected def isTraceEnabled(): Boolean = {
-    log.isTraceEnabled
-  }
-
-  private def initializeIfNecessary() {
-    if (!Logging.initialized) {
-      Logging.initLock.synchronized {
-        if (!Logging.initialized) {
-          initializeLogging()
-        }
-      }
-    }
-  }
-
-  private def initializeLogging() {
-    Logging.initialized = true
-
-    // Force a call into slf4j to initialize it. Avoids this happening from multiple threads
-    // and triggering this: http://mailman.qos.ch/pipermail/slf4j-dev/2010-April/002956.html
-    log
-  }
-}
-
-private[sink] object Logging {
-  @volatile private var initialized = false
-  val initLock = new Object()
-  try {
-    // We use reflection here to handle the case where users remove the
-    // slf4j-to-jul bridge in order to route their logs to JUL.
-    // scalastyle:off classforname
-    val bridgeClass = Class.forName("org.slf4j.bridge.SLF4JBridgeHandler")
-    // scalastyle:on classforname
-    bridgeClass.getMethod("removeHandlersForRootLogger").invoke(null)
-    val installed = bridgeClass.getMethod("isInstalled").invoke(null).asInstanceOf[Boolean]
-    if (!installed) {
-      bridgeClass.getMethod("install").invoke(null)
-    }
-  } catch {
-    case e: ClassNotFoundException => // can't log anything yet so just fail silently
-  }
-}
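
As a brief illustration (not part of the original sources), a sink-side class would mix this trait in and call the lazy logging helpers; the BatchCounter class below is hypothetical:

package org.apache.spark.streaming.flume.sink

// Hypothetical helper that mixes in the Logging trait defined above.
private[sink] class BatchCounter extends Logging {
  private var count = 0L

  def record(n: Int): Unit = {
    count += n
    // The by-name message is only built if INFO logging is enabled.
    logInfo(s"Processed $count events so far")
  }

  def fail(e: Throwable): Unit = {
    logError("Failed while counting events", e)
  }
}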

http://git-wip-us.apache.org/repos/asf/spark/blob/06dec374/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala
----------------------------------------------------------------------
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala
deleted file mode 100644
index 719fca0..0000000
--- a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.streaming.flume.sink
-
-import java.util.UUID
-import java.util.concurrent.{CountDownLatch, Executors}
-import java.util.concurrent.atomic.AtomicLong
-
-import scala.collection.mutable
-
-import org.apache.flume.Channel
-
-/**
- * Class that implements the SparkFlumeProtocol, which is used by the Avro Netty Server to process
- * requests. Each getEventBatch, ack and nack call is forwarded to an instance of this class.
- * @param threads Number of threads to use to process requests.
- * @param channel The channel that the sink pulls events from
- * @param transactionTimeout Timeout in seconds after which the transaction is rolled back if it
- *                           is not acked by Spark.
- */
-// Flume forces transactions to be thread-local. So each transaction *must* be committed, or
-// rolled back from the thread it was originally created in. So each getEventBatch call from Spark
-// creates a TransactionProcessor which runs in a new thread, in which the transaction is created
-// and events are pulled off the channel. Once the events are sent to Spark,
-// that thread is blocked and the TransactionProcessor is saved in a map,
-// until an ACK or NACK comes back or the transaction times out (after the specified timeout).
-// When the response comes or a timeout is hit, the TransactionProcessor is retrieved and then
-// unblocked, at which point the transaction is committed or rolled back.
-
-private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Channel,
-  val transactionTimeout: Int, val backOffInterval: Int) extends SparkFlumeProtocol with Logging {
-  val transactionExecutorOpt = Option(Executors.newFixedThreadPool(threads,
-    new SparkSinkThreadFactory("Spark Sink Processor Thread - %d")))
-  // Protected by `sequenceNumberToProcessor`
-  private val sequenceNumberToProcessor = mutable.HashMap[CharSequence, TransactionProcessor]()
-  // This sink will not persist sequence numbers and reuses them if it gets restarted.
-  // So it is possible to commit a transaction which may have been meant for the sink before the
-  // restart.
-  // Since the new txn may not have the same sequence number, we must guard against accidentally
-  // committing a new transaction. To reduce the probability of that happening, a random string is
-  // prepended to the sequence number. This prefix does not change for the life of the sink.
-  private val seqBase = UUID.randomUUID().toString.substring(0, 8)
-  private val seqCounter = new AtomicLong(0)
-
-  // Protected by `sequenceNumberToProcessor`
-  private var stopped = false
-
-  @volatile private var isTest = false
-  private var testLatch: CountDownLatch = null
-
-  /**
-   * Returns a bunch of events to Spark over Avro RPC.
-   * @param n Maximum number of events to return in a batch
-   * @return [[EventBatch]] instance that has a sequence number and an array of at most n events
-   */
-  override def getEventBatch(n: Int): EventBatch = {
-    logDebug("Got getEventBatch call from Spark.")
-    val sequenceNumber = seqBase + seqCounter.incrementAndGet()
-    createProcessor(sequenceNumber, n) match {
-      case Some(processor) =>
-        transactionExecutorOpt.foreach(_.submit(processor))
-        // Wait until a batch is available - will be an error if error message is non-empty
-        val batch = processor.getEventBatch
-        if (SparkSinkUtils.isErrorBatch(batch)) {
-          // Remove the processor if it is an error batch since no ACK is sent.
-          removeAndGetProcessor(sequenceNumber)
-          logWarning("Received an error batch - no events were received from channel! ")
-        }
-        batch
-      case None =>
-        new EventBatch("Spark sink has been stopped!", "", java.util.Collections.emptyList())
-    }
-  }
-
-  private def createProcessor(seq: String, n: Int): Option[TransactionProcessor] = {
-    sequenceNumberToProcessor.synchronized {
-      if (!stopped) {
-        val processor = new TransactionProcessor(
-          channel, seq, n, transactionTimeout, backOffInterval, this)
-        sequenceNumberToProcessor.put(seq, processor)
-        if (isTest) {
-          processor.countDownWhenBatchAcked(testLatch)
-        }
-        Some(processor)
-      } else {
-        None
-      }
-    }
-  }
-
-  /**
-   * Called by Spark to indicate successful commit of a batch
-   * @param sequenceNumber The sequence number of the event batch that was successful
-   */
-  override def ack(sequenceNumber: CharSequence): Void = {
-    logDebug("Received Ack for batch with sequence number: " + sequenceNumber)
-    completeTransaction(sequenceNumber, success = true)
-    null
-  }
-
-  /**
-   * Called by Spark to indicate failed commit of a batch
-   * @param sequenceNumber The sequence number of the event batch that failed
-   * @return
-   */
-  override def nack(sequenceNumber: CharSequence): Void = {
-    completeTransaction(sequenceNumber, success = false)
-    logInfo("Spark failed to commit transaction. Will reattempt events.")
-    null
-  }
-
-  /**
-   * Helper method to commit or rollback a transaction.
-   * @param sequenceNumber The sequence number of the batch that was completed
-   * @param success Whether the batch was successful or not.
-   */
-  private def completeTransaction(sequenceNumber: CharSequence, success: Boolean) {
-    removeAndGetProcessor(sequenceNumber).foreach(processor => {
-      processor.batchProcessed(success)
-    })
-  }
-
-  /**
-   * Helper method to remove the TxnProcessor for a sequence number. Can be used to avoid a leak.
-   * @param sequenceNumber The sequence number of the batch whose processor should be removed
-   * @return An `Option` of the transaction processor for the corresponding batch. Note that this
-   *         instance is no longer tracked and the caller is responsible for that txn processor.
-   */
-  private[sink] def removeAndGetProcessor(sequenceNumber: CharSequence):
-      Option[TransactionProcessor] = {
-    sequenceNumberToProcessor.synchronized {
-      sequenceNumberToProcessor.remove(sequenceNumber.toString)
-    }
-  }
-
-  private[sink] def countDownWhenBatchAcked(latch: CountDownLatch) {
-    testLatch = latch
-    isTest = true
-  }
-
-  /**
-   * Shuts down the executor used to process transactions.
-   */
-  def shutdown() {
-    logInfo("Shutting down Spark Avro Callback Handler")
-    sequenceNumberToProcessor.synchronized {
-      stopped = true
-      sequenceNumberToProcessor.values.foreach(_.shutdown())
-    }
-    transactionExecutorOpt.foreach(_.shutdownNow())
-  }
-}
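
The sequence-number scheme described in the comments above (a random per-instance prefix plus a monotonically increasing counter) can be illustrated in isolation; the following standalone sketch is not part of the commit:

import java.util.UUID
import java.util.concurrent.atomic.AtomicLong

object SequenceNumberSketch {
  // Random 8-character prefix chosen once per sink instance, so sequence numbers issued
  // after a restart are very unlikely to collide with numbers issued before it.
  private val seqBase = UUID.randomUUID().toString.substring(0, 8)
  private val seqCounter = new AtomicLong(0)

  def next(): String = seqBase + seqCounter.incrementAndGet()

  def main(args: Array[String]): Unit = {
    println(next()) // e.g. 6f9c2a1b1
    println(next()) // e.g. 6f9c2a1b2
  }
}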

http://git-wip-us.apache.org/repos/asf/spark/blob/06dec374/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala
----------------------------------------------------------------------
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala
deleted file mode 100644
index 14dffb1..0000000
--- a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.streaming.flume.sink
-
-import java.net.InetSocketAddress
-import java.util.concurrent._
-
-import org.apache.avro.ipc.NettyServer
-import org.apache.avro.ipc.specific.SpecificResponder
-import org.apache.flume.Context
-import org.apache.flume.Sink.Status
-import org.apache.flume.conf.{Configurable, ConfigurationException}
-import org.apache.flume.sink.AbstractSink
-
-/**
- * A sink that uses Avro RPC to run a server that can be polled by Spark's
- * FlumePollingInputDStream. This sink has the following configuration parameters:
- *
- * hostname - The hostname to bind to. Default: 0.0.0.0
- * port - The port to bind to. (No default - mandatory)
- * timeout - Time in seconds after which a transaction is rolled back,
- * if an ACK is not received from Spark within that time
- * threads - Number of threads to use to receive requests from Spark (Default: 10)
- *
- * This sink is unlike other Flume sinks in the sense that it does not push data;
- * instead, the process method in this sink simply blocks the SinkRunner the first time it is
- * called. This sink starts up an Avro IPC server that uses the SparkFlumeProtocol.
- *
- * Each time a getEventBatch call comes in, the sink creates a transaction and reads events
- * from the channel. When enough events are read, the events are sent to the Spark receiver and
- * the thread itself is blocked and a reference to it saved off.
- *
- * When the ack for that batch is received,
- * the thread which created the transaction is retrieved, and it commits the transaction with the
- * channel from the same thread it was originally created in (since Flume transactions are
- * thread local). If a nack is received instead, the sink rolls back the transaction. If no ack
- * is received within the specified timeout, the transaction is rolled back too. If an ack comes
- * after that, it is simply ignored and the events get re-sent.
- *
- */
-
-class SparkSink extends AbstractSink with Logging with Configurable {
-
-  // Size of the pool to use for holding transaction processors.
-  private var poolSize: Integer = SparkSinkConfig.DEFAULT_THREADS
-
-  // Timeout for each transaction. If spark does not respond in this much time,
-  // rollback the transaction
-  private var transactionTimeout = SparkSinkConfig.DEFAULT_TRANSACTION_TIMEOUT
-
-  // Address info to bind on
-  private var hostname: String = SparkSinkConfig.DEFAULT_HOSTNAME
-  private var port: Int = 0
-
-  private var backOffInterval: Int = 200
-
-  // Handle to the server
-  private var serverOpt: Option[NettyServer] = None
-
-  // The handler that handles the callback from Avro
-  private var handler: Option[SparkAvroCallbackHandler] = None
-
-  // Latch that parks the Flume framework's SinkRunner thread in process() until the sink stops.
-  private val blockingLatch = new CountDownLatch(1)
-
-  override def start() {
-    logInfo("Starting Spark Sink: " + getName + " on port: " + port + " and interface: " +
-      hostname + " with " + "pool size: " + poolSize + " and transaction timeout: " +
-      transactionTimeout + ".")
-    handler = Option(new SparkAvroCallbackHandler(poolSize, getChannel, transactionTimeout,
-      backOffInterval))
-    val responder = new SpecificResponder(classOf[SparkFlumeProtocol], handler.get)
-    // Using the constructor that takes specific thread-pools requires bringing in netty
-    // dependencies which are being excluded in the build. In practice,
-    // Netty dependencies are already available on the JVM as Flume would have pulled them in.
-    serverOpt = Option(new NettyServer(responder, new InetSocketAddress(hostname, port)))
-    serverOpt.foreach(server => {
-      logInfo("Starting Avro server for sink: " + getName)
-      server.start()
-    })
-    super.start()
-  }
-
-  override def stop() {
-    logInfo("Stopping Spark Sink: " + getName)
-    handler.foreach(callbackHandler => {
-      callbackHandler.shutdown()
-    })
-    serverOpt.foreach(server => {
-      logInfo("Stopping Avro Server for sink: " + getName)
-      server.close()
-      server.join()
-    })
-    blockingLatch.countDown()
-    super.stop()
-  }
-
-  override def configure(ctx: Context) {
-    import SparkSinkConfig._
-    hostname = ctx.getString(CONF_HOSTNAME, DEFAULT_HOSTNAME)
-    port = Option(ctx.getInteger(CONF_PORT)).
-      getOrElse(throw new ConfigurationException("The port to bind to must be specified"))
-    poolSize = ctx.getInteger(THREADS, DEFAULT_THREADS)
-    transactionTimeout = ctx.getInteger(CONF_TRANSACTION_TIMEOUT, DEFAULT_TRANSACTION_TIMEOUT)
-    backOffInterval = ctx.getInteger(CONF_BACKOFF_INTERVAL, DEFAULT_BACKOFF_INTERVAL)
-    logInfo("Configured Spark Sink with hostname: " + hostname + ", port: " + port + ", " +
-      "poolSize: " + poolSize + ", transactionTimeout: " + transactionTimeout + ", " +
-      "backoffInterval: " + backOffInterval)
-  }
-
-  override def process(): Status = {
-    // This method is called in a loop by the Flume framework - block it until the sink is
-    // stopped to save CPU resources. The sink runner will interrupt this thread when the sink is
-    // being shut down.
-    logInfo("Blocking Sink Runner, sink will continue to run..")
-    blockingLatch.await()
-    Status.BACKOFF
-  }
-
-  private[flume] def getPort(): Int = {
-    serverOpt
-      .map(_.getPort)
-      .getOrElse(
-        throw new RuntimeException("Server was not started!")
-      )
-  }
-
-  /**
-   * Pass in a [[CountDownLatch]] for testing purposes. This latch is counted down when each
-   * batch is received. The test can simply call await on this latch until the expected number of
-   * batches has been received.
-   * @param latch The latch to count down when a batch is received
-   */
-  private[flume] def countdownWhenBatchReceived(latch: CountDownLatch) {
-    handler.foreach(_.countDownWhenBatchAcked(latch))
-  }
-}
-
-/**
- * Configuration parameters and their defaults.
- */
-private[flume]
-object SparkSinkConfig {
-  val THREADS = "threads"
-  val DEFAULT_THREADS = 10
-
-  val CONF_TRANSACTION_TIMEOUT = "timeout"
-  val DEFAULT_TRANSACTION_TIMEOUT = 60
-
-  val CONF_HOSTNAME = "hostname"
-  val DEFAULT_HOSTNAME = "0.0.0.0"
-
-  val CONF_PORT = "port"
-
-  val CONF_BACKOFF_INTERVAL = "backoffInterval"
-  val DEFAULT_BACKOFF_INTERVAL = 200
-}
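
To show how the configuration keys above map onto an actual sink instance, here is a hypothetical standalone wiring sketch in Scala, roughly what a test harness might do; a production deployment would instead set the same keys on the sink in the Flume agent's configuration:

import org.apache.flume.Context
import org.apache.flume.channel.MemoryChannel
import org.apache.spark.streaming.flume.sink.SparkSink

object SparkSinkWiringSketch {
  def main(args: Array[String]): Unit = {
    val channel = new MemoryChannel()
    channel.configure(new Context())
    channel.start()

    val sink = new SparkSink()
    val ctx = new Context()
    ctx.put("hostname", "0.0.0.0")  // CONF_HOSTNAME, optional (defaults to 0.0.0.0)
    ctx.put("port", "9999")         // CONF_PORT, mandatory
    ctx.put("threads", "4")         // THREADS, optional (defaults to 10)
    sink.configure(ctx)
    sink.setChannel(channel)
    sink.start()                    // starts the Avro IPC server described above
  }
}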

http://git-wip-us.apache.org/repos/asf/spark/blob/06dec374/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSinkThreadFactory.scala
----------------------------------------------------------------------
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSinkThreadFactory.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSinkThreadFactory.scala
deleted file mode 100644
index 845fc8d..0000000
--- a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSinkThreadFactory.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.streaming.flume.sink
-
-import java.util.concurrent.ThreadFactory
-import java.util.concurrent.atomic.AtomicLong
-
-/**
- * Thread factory that generates daemon threads with a specified name format.
- */
-private[sink] class SparkSinkThreadFactory(nameFormat: String) extends ThreadFactory {
-
-  private val threadId = new AtomicLong()
-
-  override def newThread(r: Runnable): Thread = {
-    val t = new Thread(r, nameFormat.format(threadId.incrementAndGet()))
-    t.setDaemon(true)
-    t
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/06dec374/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSinkUtils.scala
----------------------------------------------------------------------
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSinkUtils.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSinkUtils.scala
deleted file mode 100644
index 47c0e29..0000000
--- a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSinkUtils.scala
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.streaming.flume.sink
-
-private[flume] object SparkSinkUtils {
-  /**
-   * This method determines if this batch represents an error or not.
-   * @param batch - The batch to check
-   * @return - true if the batch represents an error
-   */
-  def isErrorBatch(batch: EventBatch): Boolean = {
-    !batch.getErrorMsg.toString.equals("") // If there is an error message, it is an error batch.
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/06dec374/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala
----------------------------------------------------------------------
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala
deleted file mode 100644
index b15c209..0000000
--- a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala
+++ /dev/null
@@ -1,252 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.streaming.flume.sink
-
-import java.nio.ByteBuffer
-import java.util
-import java.util.concurrent.{Callable, CountDownLatch, TimeUnit}
-
-import scala.util.control.Breaks
-
-import org.apache.flume.{Channel, Transaction}
-
-// Flume forces transactions to be thread-local (horrible, I know!)
-// So the sink basically spawns a new thread to pull the events out within a transaction.
-// The thread fills in the event batch object that is set before the thread is scheduled.
-// After filling it in, the thread waits on a condition - which is released only
-// when the success message comes back for the specific sequence number for that event batch.
-/**
- * This class represents a transaction on the Flume channel. This class runs a separate thread
- * which owns the transaction. The thread is blocked until the success call for that transaction
- * comes back with an ACK or NACK.
- * @param channel The channel from which to pull events
- * @param seqNum The sequence number to use for the transaction. Must be unique
- * @param maxBatchSize The maximum number of events to process per batch
- * @param transactionTimeout Time in seconds after which a transaction must be rolled back
- *                           without waiting for an ACK from Spark
- * @param parent The parent [[SparkAvroCallbackHandler]] instance, for reporting timeouts
- */
-private class TransactionProcessor(val channel: Channel, val seqNum: String,
-  var maxBatchSize: Int, val transactionTimeout: Int, val backOffInterval: Int,
-  val parent: SparkAvroCallbackHandler) extends Callable[Void] with Logging {
-
-  // If a real batch is not returned, we always have to return an error batch.
-  @volatile private var eventBatch: EventBatch = new EventBatch("Unknown Error", "",
-    util.Collections.emptyList())
-
-  // Synchronization primitives
-  val batchGeneratedLatch = new CountDownLatch(1)
-  val batchAckLatch = new CountDownLatch(1)
-
-  // Sanity check to ensure we don't loop like crazy
-  val totalAttemptsToRemoveFromChannel = Int.MaxValue / 2
-
-  // OK to use volatile here: the flag only ever transitions from false to true
-  // (we never apply a negation operation to it), and a true value means the
-  // transaction succeeded.
-  @volatile private var batchSuccess = false
-
-  @volatile private var stopped = false
-
-  @volatile private var isTest = false
-
-  private var testLatch: CountDownLatch = null
-
-  // The transaction that this processor would handle
-  var txOpt: Option[Transaction] = None
-
-  /**
-   * Get an event batch from the channel. This method will block until a batch of events is
-   * available from the channel. If no events are available after a large number of attempts at
-   * polling the channel, this method will return an [[EventBatch]] with a non-empty error message.
-   *
-   * @return An [[EventBatch]] instance with sequence number set to seqNum, filled with a
-   *         maximum of maxBatchSize events
-   */
-  def getEventBatch: EventBatch = {
-    batchGeneratedLatch.await()
-    eventBatch
-  }
-
-  /**
-   * This method is to be called by the sink when it receives an ACK or NACK from Spark. This
-   * method is a no-op if it is called after transactionTimeout has expired since
-   * getEventBatch returned a batch of events.
-   * @param success True if an ACK was received and the transaction should be committed, else false.
-   */
-  def batchProcessed(success: Boolean) {
-    logDebug("Batch processed for sequence number: " + seqNum)
-    batchSuccess = success
-    batchAckLatch.countDown()
-  }
-
-  private[flume] def shutdown(): Unit = {
-    logDebug("Shutting down transaction processor")
-    stopped = true
-  }
-
-  /**
-   * Populates events into the event batch. If the batch cannot be populated,
-   * this method will not set the events into the event batch, but it sets an error message.
-   */
-  private def populateEvents() {
-    try {
-      txOpt = Option(channel.getTransaction)
-      if (txOpt.isEmpty) {
-        eventBatch.setErrorMsg("Something went wrong. Channel was " +
-          "unable to create a transaction!")
-      }
-      txOpt.foreach(tx => {
-        tx.begin()
-        val events = new util.ArrayList[SparkSinkEvent](maxBatchSize)
-        val loop = new Breaks
-        var gotEventsInThisTxn = false
-        var loopCounter: Int = 0
-        loop.breakable {
-          while (!stopped && events.size() < maxBatchSize
-            && loopCounter < totalAttemptsToRemoveFromChannel) {
-            loopCounter += 1
-            Option(channel.take()) match {
-              case Some(event) =>
-                events.add(new SparkSinkEvent(toCharSequenceMap(event.getHeaders),
-                  ByteBuffer.wrap(event.getBody)))
-                gotEventsInThisTxn = true
-              case None =>
-                if (!gotEventsInThisTxn && !stopped) {
-                  logDebug("Sleeping for " + backOffInterval + " millis as no events were read in" +
-                    " the current transaction")
-                  TimeUnit.MILLISECONDS.sleep(backOffInterval)
-                } else {
-                  loop.break()
-                }
-            }
-          }
-        }
-        if (!gotEventsInThisTxn && !stopped) {
-          val msg = "Tried several times, " +
-            "but did not get any events from the channel!"
-          logWarning(msg)
-          eventBatch.setErrorMsg(msg)
-        } else {
-          // At this point, the events are available, so fill them into the event batch
-          eventBatch = new EventBatch("", seqNum, events)
-        }
-      })
-    } catch {
-      case interrupted: InterruptedException =>
-        // Don't pollute logs if the InterruptedException came from this being stopped
-        if (!stopped) {
-          logWarning("Error while processing transaction.", interrupted)
-        }
-      case e: Exception =>
-        logWarning("Error while processing transaction.", e)
-        eventBatch.setErrorMsg(e.getMessage)
-        try {
-          txOpt.foreach(tx => {
-            rollbackAndClose(tx, close = true)
-          })
-        } finally {
-          txOpt = None
-        }
-    } finally {
-      batchGeneratedLatch.countDown()
-    }
-  }
-
-  /**
-   * Waits for up to transactionTimeout seconds for an ACK. If an ACK comes in,
-   * this method commits the transaction with the channel. If the ACK does not come in within
-   * that time, or a NACK comes in, this method rolls back the transaction.
-   */
-  private def processAckOrNack() {
-    batchAckLatch.await(transactionTimeout, TimeUnit.SECONDS)
-    txOpt.foreach(tx => {
-      if (batchSuccess) {
-        try {
-          logDebug("Committing transaction")
-          tx.commit()
-        } catch {
-          case e: Exception =>
-            logWarning("Error while attempting to commit transaction. Transaction will be rolled " +
-              "back", e)
-            rollbackAndClose(tx, close = false) // tx will be closed later anyway
-        } finally {
-          tx.close()
-          if (isTest) {
-            testLatch.countDown()
-          }
-        }
-      } else {
-        logWarning("Spark could not commit transaction, NACK received. Rolling back transaction.")
-        rollbackAndClose(tx, close = true)
-        // This might have been due to timeout or a NACK. Either way the following call does not
-        // cause issues. This is required to ensure the TransactionProcessor instance is not leaked
-        parent.removeAndGetProcessor(seqNum)
-      }
-    })
-  }
-
-  /**
-   * Helper method to roll back and optionally close a transaction
-   * @param tx The transaction to roll back
-   * @param close Whether the transaction should be closed or not after rolling back
-   */
-  private def rollbackAndClose(tx: Transaction, close: Boolean) {
-    try {
-      logWarning("Spark was unable to successfully process the events. Transaction is being " +
-        "rolled back.")
-      tx.rollback()
-    } catch {
-      case e: Exception =>
-        logError("Error rolling back transaction. Rollback may have failed!", e)
-    } finally {
-      if (close) {
-        tx.close()
-      }
-    }
-  }
-
-  /**
-   * Helper method to convert a Map[String, String] to Map[CharSequence, CharSequence]
-   * @param inMap The map to be converted
-   * @return The converted map
-   */
-  private def toCharSequenceMap(inMap: java.util.Map[String, String]): java.util.Map[CharSequence,
-    CharSequence] = {
-    val charSeqMap = new util.HashMap[CharSequence, CharSequence](inMap.size())
-    charSeqMap.putAll(inMap)
-    charSeqMap
-  }
-
-  /**
-   * When the thread is started, it fills the eventBatch object with up to maxBatchSize events
-   * (fewer if not enough are available) and lets any threads waiting on [[getEventBatch]]
-   * proceed. It then waits for an ACK or NACK to come in, or for the transaction timeout to
-   * expire, and commits or rolls back the transaction accordingly.
-   * @return Always null; this Callable is used only for its side effects.
-   */
-  override def call(): Void = {
-    populateEvents()
-    processAckOrNack()
-    null
-  }
-
-  private[sink] def countDownWhenBatchAcked(latch: CountDownLatch) {
-    testLatch = latch
-    isTest = true
-  }
-}
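
A minimal sketch of how the sink side drives this class: the processor is submitted to an
executor, getEventBatch blocks until events have been pulled inside a Flume transaction, and
batchProcessed is invoked once Spark's ACK/NACK arrives. The `channel` and `handler` values and
the numeric settings below are placeholders, not values taken from this diff.

    import java.util.concurrent.Executors

    // Hypothetical wiring -- `channel` (a Flume Channel) and `handler`
    // (a SparkAvroCallbackHandler) are assumed to already exist.
    val executor = Executors.newSingleThreadExecutor()
    val processor = new TransactionProcessor(
      channel, "seq-42", 1000 /* maxBatchSize */, 60 /* transactionTimeout, seconds */,
      200 /* backOffInterval, millis */, handler)
    executor.submit(processor)               // call() pulls events inside a Flume transaction
    val batch = processor.getEventBatch      // blocks until a real batch or an error batch is ready
    // ... ship `batch` to Spark over Avro; when the ACK/NACK RPC comes back:
    processor.batchProcessed(success = true) // true commits the transaction, false rolls it back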

http://git-wip-us.apache.org/repos/asf/spark/blob/06dec374/external/flume-sink/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/external/flume-sink/src/test/resources/log4j.properties b/external/flume-sink/src/test/resources/log4j.properties
deleted file mode 100644
index 42df879..0000000
--- a/external/flume-sink/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,28 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# Set everything to be logged to the file streaming/target/unit-tests.log
-log4j.rootCategory=INFO, file
-log4j.appender.file=org.apache.log4j.FileAppender
-log4j.appender.file.append=true
-log4j.appender.file.file=target/unit-tests.log
-log4j.appender.file.layout=org.apache.log4j.PatternLayout
-log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
-
-# Ignore messages below warning level from Jetty, because it's a bit verbose
-log4j.logger.org.spark-project.jetty=WARN
-

http://git-wip-us.apache.org/repos/asf/spark/blob/06dec374/external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala
----------------------------------------------------------------------
diff --git a/external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala b/external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala
deleted file mode 100644
index e8ca1e7..0000000
--- a/external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.streaming.flume.sink
-
-import java.net.InetSocketAddress
-import java.nio.charset.StandardCharsets
-import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
-import java.util.concurrent.atomic.AtomicInteger
-
-import scala.collection.JavaConverters._
-import scala.concurrent.{ExecutionContext, Future}
-import scala.util.{Failure, Success}
-
-import org.apache.avro.ipc.NettyTransceiver
-import org.apache.avro.ipc.specific.SpecificRequestor
-import org.apache.flume.Context
-import org.apache.flume.channel.MemoryChannel
-import org.apache.flume.event.EventBuilder
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
-
-// Due to MNG-1378, there is not a way to include test dependencies transitively.
-// We cannot include Spark core tests as a dependency here because it depends on
-// Spark core main, which has too many dependencies to require here manually.
-// For this reason, we continue to use FunSuite and ignore the scalastyle checks
-// that fail if this is detected.
-// scalastyle:off
-import org.scalatest.FunSuite
-
-class SparkSinkSuite extends FunSuite {
-// scalastyle:on
-
-  val eventsPerBatch = 1000
-  val channelCapacity = 5000
-
-  test("Success with ack") {
-    val (channel, sink, latch) = initializeChannelAndSink()
-    channel.start()
-    sink.start()
-
-    putEvents(channel, eventsPerBatch)
-
-    val port = sink.getPort
-    val address = new InetSocketAddress("0.0.0.0", port)
-
-    val (transceiver, client) = getTransceiverAndClient(address, 1)(0)
-    val events = client.getEventBatch(1000)
-    client.ack(events.getSequenceNumber)
-    assert(events.getEvents.size() === 1000)
-    latch.await(1, TimeUnit.SECONDS)
-    assertChannelIsEmpty(channel)
-    sink.stop()
-    channel.stop()
-    transceiver.close()
-  }
-
-  test("Failure with nack") {
-    val (channel, sink, latch) = initializeChannelAndSink()
-    channel.start()
-    sink.start()
-    putEvents(channel, eventsPerBatch)
-
-    val port = sink.getPort
-    val address = new InetSocketAddress("0.0.0.0", port)
-
-    val (transceiver, client) = getTransceiverAndClient(address, 1)(0)
-    val events = client.getEventBatch(1000)
-    assert(events.getEvents.size() === 1000)
-    client.nack(events.getSequenceNumber)
-    latch.await(1, TimeUnit.SECONDS)
-    assert(availableChannelSlots(channel) === 4000)
-    sink.stop()
-    channel.stop()
-    transceiver.close()
-  }
-
-  test("Failure with timeout") {
-    val (channel, sink, latch) = initializeChannelAndSink(Map(SparkSinkConfig
-      .CONF_TRANSACTION_TIMEOUT -> 1.toString))
-    channel.start()
-    sink.start()
-    putEvents(channel, eventsPerBatch)
-    val port = sink.getPort
-    val address = new InetSocketAddress("0.0.0.0", port)
-
-    val (transceiver, client) = getTransceiverAndClient(address, 1)(0)
-    val events = client.getEventBatch(1000)
-    assert(events.getEvents.size() === 1000)
-    latch.await(1, TimeUnit.SECONDS)
-    assert(availableChannelSlots(channel) === 4000)
-    sink.stop()
-    channel.stop()
-    transceiver.close()
-  }
-
-  test("Multiple consumers") {
-    testMultipleConsumers(failSome = false)
-  }
-
-  test("Multiple consumers with some failures") {
-    testMultipleConsumers(failSome = true)
-  }
-
-  def testMultipleConsumers(failSome: Boolean): Unit = {
-    implicit val executorContext = ExecutionContext
-      .fromExecutorService(Executors.newFixedThreadPool(5))
-    val (channel, sink, latch) = initializeChannelAndSink(Map.empty, 5)
-    channel.start()
-    sink.start()
-    (1 to 5).foreach(_ => putEvents(channel, eventsPerBatch))
-    val port = sink.getPort
-    val address = new InetSocketAddress("0.0.0.0", port)
-    val transceiversAndClients = getTransceiverAndClient(address, 5)
-    val batchCounter = new CountDownLatch(5)
-    val counter = new AtomicInteger(0)
-    transceiversAndClients.foreach(x => {
-      Future {
-        val client = x._2
-        val events = client.getEventBatch(1000)
-        if (!failSome || counter.getAndIncrement() % 2 == 0) {
-          client.ack(events.getSequenceNumber)
-        } else {
-          client.nack(events.getSequenceNumber)
-          throw new RuntimeException("Sending NACK for failure!")
-        }
-        events
-      }.onComplete {
-        case Success(events) =>
-          assert(events.getEvents.size() === 1000)
-          batchCounter.countDown()
-        case Failure(t) =>
-          // Don't re-throw the exception, causes a nasty unnecessary stack trace on stdout
-          batchCounter.countDown()
-      }
-    })
-    batchCounter.await()
-    latch.await(1, TimeUnit.SECONDS)
-    executorContext.shutdown()
-    if (failSome) {
-      assert(availableChannelSlots(channel) === 3000)
-    } else {
-      assertChannelIsEmpty(channel)
-    }
-    sink.stop()
-    channel.stop()
-    transceiversAndClients.foreach(x => x._1.close())
-  }
-
-  private def initializeChannelAndSink(overrides: Map[String, String] = Map.empty,
-    batchCounter: Int = 1): (MemoryChannel, SparkSink, CountDownLatch) = {
-    val channel = new MemoryChannel()
-    val channelContext = new Context()
-
-    channelContext.put("capacity", channelCapacity.toString)
-    channelContext.put("transactionCapacity", 1000.toString)
-    channelContext.put("keep-alive", 0.toString)
-    channelContext.putAll(overrides.asJava)
-    channel.setName(scala.util.Random.nextString(10))
-    channel.configure(channelContext)
-
-    val sink = new SparkSink()
-    val sinkContext = new Context()
-    sinkContext.put(SparkSinkConfig.CONF_HOSTNAME, "0.0.0.0")
-    sinkContext.put(SparkSinkConfig.CONF_PORT, 0.toString)
-    sink.configure(sinkContext)
-    sink.setChannel(channel)
-    val latch = new CountDownLatch(batchCounter)
-    sink.countdownWhenBatchReceived(latch)
-    (channel, sink, latch)
-  }
-
-  private def putEvents(ch: MemoryChannel, count: Int): Unit = {
-    val tx = ch.getTransaction
-    tx.begin()
-    (1 to count).foreach(x =>
-      ch.put(EventBuilder.withBody(x.toString.getBytes(StandardCharsets.UTF_8))))
-    tx.commit()
-    tx.close()
-  }
-
-  private def getTransceiverAndClient(address: InetSocketAddress,
-    count: Int): Seq[(NettyTransceiver, SparkFlumeProtocol.Callback)] = {
-
-    (1 to count).map(_ => {
-      lazy val channelFactoryExecutor = Executors.newCachedThreadPool(
-        new SparkSinkThreadFactory("Flume Receiver Channel Thread - %d"))
-      lazy val channelFactory =
-        new NioClientSocketChannelFactory(channelFactoryExecutor, channelFactoryExecutor)
-      val transceiver = new NettyTransceiver(address, channelFactory)
-      val client = SpecificRequestor.getClient(classOf[SparkFlumeProtocol.Callback], transceiver)
-      (transceiver, client)
-    })
-  }
-
-  private def assertChannelIsEmpty(channel: MemoryChannel): Unit = {
-    assert(availableChannelSlots(channel) === channelCapacity)
-  }
-
-  private def availableChannelSlots(channel: MemoryChannel): Int = {
-    val queueRemaining = channel.getClass.getDeclaredField("queueRemaining")
-    queueRemaining.setAccessible(true)
-    val m = queueRemaining.get(channel).getClass.getDeclaredMethod("availablePermits")
-    m.invoke(queueRemaining.get(channel)).asInstanceOf[Int]
-  }
-}
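
Distilled from the suite above, the client side of the protocol looks roughly like the sketch
below; the host, port and batch size are placeholders for whatever the sink actually binds to.

    import java.net.InetSocketAddress
    import org.apache.avro.ipc.NettyTransceiver
    import org.apache.avro.ipc.specific.SpecificRequestor

    // Assumes a running SparkSink listening on sink-host:4242 (placeholders).
    val transceiver = new NettyTransceiver(new InetSocketAddress("sink-host", 4242))
    val client = SpecificRequestor.getClient(classOf[SparkFlumeProtocol.Callback], transceiver)
    val batch = client.getEventBatch(1000)
    if (!SparkSinkUtils.isErrorBatch(batch)) {
      try {
        // ... store batch.getEvents durably here ...
        client.ack(batch.getSequenceNumber)                        // sink commits the transaction
      } catch {
        case _: Exception => client.nack(batch.getSequenceNumber)  // sink rolls the events back
      }
    }
    transceiver.close()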

http://git-wip-us.apache.org/repos/asf/spark/blob/06dec374/external/flume/pom.xml
----------------------------------------------------------------------
diff --git a/external/flume/pom.xml b/external/flume/pom.xml
deleted file mode 100644
index d650dd0..0000000
--- a/external/flume/pom.xml
+++ /dev/null
@@ -1,78 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one or more
-  ~ contributor license agreements.  See the NOTICE file distributed with
-  ~ this work for additional information regarding copyright ownership.
-  ~ The ASF licenses this file to You under the Apache License, Version 2.0
-  ~ (the "License"); you may not use this file except in compliance with
-  ~ the License.  You may obtain a copy of the License at
-  ~
-  ~    http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-  <parent>
-    <groupId>org.apache.spark</groupId>
-    <artifactId>spark-parent_2.11</artifactId>
-    <version>2.0.0-SNAPSHOT</version>
-    <relativePath>../../pom.xml</relativePath>
-  </parent>
-
-  <groupId>org.apache.spark</groupId>
-  <artifactId>spark-streaming-flume_2.11</artifactId>
-  <properties>
-    <sbt.project.name>streaming-flume</sbt.project.name>
-  </properties>
-  <packaging>jar</packaging>
-  <name>Spark Project External Flume</name>
-  <url>http://spark.apache.org/</url>
-
-  <dependencies>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
-      <version>${project.version}</version>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-core_${scala.binary.version}</artifactId>
-      <version>${project.version}</version>
-      <type>test-jar</type>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-streaming-flume-sink_${scala.binary.version}</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.flume</groupId>
-      <artifactId>flume-ng-core</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.flume</groupId>
-      <artifactId>flume-ng-sdk</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.scalacheck</groupId>
-      <artifactId>scalacheck_${scala.binary.version}</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-test-tags_${scala.binary.version}</artifactId>
-    </dependency>
-  </dependencies>
-  <build>
-    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
-    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
-  </build>
-</project>
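
For reference, applications consumed the artifact defined by this pom with a build dependency
along the following lines (sbt shown; the snapshot version mirrors the parent pom above and is a
placeholder for whatever release was actually in use):

    libraryDependencies += "org.apache.spark" %% "spark-streaming-flume" % "2.0.0-SNAPSHOT"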

http://git-wip-us.apache.org/repos/asf/spark/blob/06dec374/external/flume/src/main/scala/org/apache/spark/streaming/flume/EventTransformer.scala
----------------------------------------------------------------------
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/EventTransformer.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/EventTransformer.scala
deleted file mode 100644
index 5c773d4..0000000
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/EventTransformer.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.flume
-
-import java.io.{ObjectInput, ObjectOutput}
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.Logging
-import org.apache.spark.util.Utils
-
-/**
- * A simple object that provides the implementation of readExternal and writeExternal for both
- * the wrapper classes for Flume-style Events.
- */
-private[streaming] object EventTransformer extends Logging {
-  def readExternal(in: ObjectInput): (java.util.HashMap[CharSequence, CharSequence],
-    Array[Byte]) = {
-    val bodyLength = in.readInt()
-    val bodyBuff = new Array[Byte](bodyLength)
-    in.readFully(bodyBuff)
-
-    val numHeaders = in.readInt()
-    val headers = new java.util.HashMap[CharSequence, CharSequence]
-
-    for (i <- 0 until numHeaders) {
-      val keyLength = in.readInt()
-      val keyBuff = new Array[Byte](keyLength)
-      in.readFully(keyBuff)
-      val key: String = Utils.deserialize(keyBuff)
-
-      val valLength = in.readInt()
-      val valBuff = new Array[Byte](valLength)
-      in.readFully(valBuff)
-      val value: String = Utils.deserialize(valBuff)
-
-      headers.put(key, value)
-    }
-    (headers, bodyBuff)
-  }
-
-  def writeExternal(out: ObjectOutput, headers: java.util.Map[CharSequence, CharSequence],
-    body: Array[Byte]) {
-    out.writeInt(body.length)
-    out.write(body)
-    val numHeaders = headers.size()
-    out.writeInt(numHeaders)
-    for ((k, v) <- headers.asScala) {
-      val keyBuff = Utils.serialize(k.toString)
-      out.writeInt(keyBuff.length)
-      out.write(keyBuff)
-      val valBuff = Utils.serialize(v.toString)
-      out.writeInt(valBuff.length)
-      out.write(valBuff)
-    }
-  }
-}
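
The wire format above is: body length, body bytes, header count, then for each header the
Java-serialized key and value, each prefixed by its length. A minimal round-trip sketch, assuming
it is compiled inside the same package (the object is private[streaming]) and using made-up
header/body values:

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

    val headers = new java.util.HashMap[CharSequence, CharSequence]()
    headers.put("host", "agent-1")            // placeholder header
    val body = "hello".getBytes("UTF-8")      // placeholder body

    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    EventTransformer.writeExternal(out, headers, body)
    out.flush()

    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    val (readHeaders, readBody) = EventTransformer.readExternal(in)
    assert(new String(readBody, "UTF-8") == "hello")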

http://git-wip-us.apache.org/repos/asf/spark/blob/06dec374/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeBatchFetcher.scala
----------------------------------------------------------------------
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeBatchFetcher.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeBatchFetcher.scala
deleted file mode 100644
index 3555fa6..0000000
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeBatchFetcher.scala
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.streaming.flume
-
-import scala.collection.mutable.ArrayBuffer
-
-import com.google.common.base.Throwables
-
-import org.apache.spark.Logging
-import org.apache.spark.streaming.flume.sink._
-
-/**
- * This class implements the core functionality of [[FlumePollingReceiver]]. When started, it
- * pulls data from Flume, stores it in Spark and then sends an ACK or NACK. This class should be
- * run via a [[java.util.concurrent.Executor]] as it implements [[Runnable]].
- *
- * @param receiver The receiver that owns this instance.
- */
-private[flume] class FlumeBatchFetcher(receiver: FlumePollingReceiver) extends Runnable with
-  Logging {
-
-  def run(): Unit = {
-    while (!receiver.isStopped()) {
-      val connection = receiver.getConnections.poll()
-      val client = connection.client
-      var batchReceived = false
-      var seq: CharSequence = null
-      try {
-        getBatch(client) match {
-          case Some(eventBatch) =>
-            batchReceived = true
-            seq = eventBatch.getSequenceNumber
-            val events = toSparkFlumeEvents(eventBatch.getEvents)
-            if (store(events)) {
-              sendAck(client, seq)
-            } else {
-              sendNack(batchReceived, client, seq)
-            }
-          case None =>
-        }
-      } catch {
-        case e: Exception =>
-          Throwables.getRootCause(e) match {
-            // If the cause was an InterruptedException, then check if the receiver is stopped -
-            // if yes, just break out of the loop. Else send a Nack and log a warning.
-            // In the unlikely case, the cause was not an Exception,
-            // then just throw it out and exit.
-            case interrupted: InterruptedException =>
-              if (!receiver.isStopped()) {
-                logWarning("Interrupted while receiving data from Flume", interrupted)
-                sendNack(batchReceived, client, seq)
-              }
-            case exception: Exception =>
-              logWarning("Error while receiving data from Flume", exception)
-              sendNack(batchReceived, client, seq)
-          }
-      } finally {
-        receiver.getConnections.add(connection)
-      }
-    }
-  }
-
-  /**
-   * Gets a batch of events from the specified client. This method does not handle exceptions;
-   * they are propagated to the caller.
-   * @param client Client to get events from
-   * @return [[Some]] containing the event batch if Flume sent any events back, else [[None]]
-   */
-  private def getBatch(client: SparkFlumeProtocol.Callback): Option[EventBatch] = {
-    val eventBatch = client.getEventBatch(receiver.getMaxBatchSize)
-    if (!SparkSinkUtils.isErrorBatch(eventBatch)) {
-      // No error, proceed with processing data
-      logDebug(s"Received batch of ${eventBatch.getEvents.size} events with sequence " +
-        s"number: ${eventBatch.getSequenceNumber}")
-      Some(eventBatch)
-    } else {
-      logWarning("Did not receive events from Flume agent due to error on the Flume agent: " +
-        eventBatch.getErrorMsg)
-      None
-    }
-  }
-
-  /**
-   * Store the events in the buffer into Spark. Exceptions are caught and logged (returning
-   * false), but any Errors are allowed to propagate.
-   * @param buffer The buffer to store
-   * @return true if the data was stored without any exception being thrown, else false
-   */
-  private def store(buffer: ArrayBuffer[SparkFlumeEvent]): Boolean = {
-    try {
-      receiver.store(buffer)
-      true
-    } catch {
-      case e: Exception =>
-        logWarning("Error while attempting to store data received from Flume", e)
-        false
-    }
-  }
-
-  /**
-   * Send an ACK to the client for the sequence number. This method does not handle exceptions;
-   * they are propagated to the caller.
-   * @param client client to send the ack to
-   * @param seq sequence number of the batch to be ack-ed.
-   */
-  private def sendAck(client: SparkFlumeProtocol.Callback, seq: CharSequence): Unit = {
-    logDebug("Sending ack for sequence number: " + seq)
-    client.ack(seq)
-    logDebug("Ack sent for sequence number: " + seq)
-  }
-
-  /**
-   * This method sends a NACK to the client for the given sequence number if a batch was
-   * received. Any exceptions thrown by the RPC call are propagated as-is; no effort is made
-   * to handle them.
-   * @param batchReceived true if a batch was received. If this is false, no nack is sent
-   * @param client The client to which the nack should be sent
-   * @param seq The sequence number of the batch that is being nack-ed.
-   */
-  private def sendNack(batchReceived: Boolean, client: SparkFlumeProtocol.Callback,
-    seq: CharSequence): Unit = {
-    if (batchReceived) {
-      // Let Flume know that the events need to be pushed back into the channel.
-      logDebug("Sending nack for sequence number: " + seq)
-      client.nack(seq) // If the agent is down, even this could fail and throw
-      logDebug("Nack sent for sequence number: " + seq)
-    }
-  }
-
-  /**
-   * Utility method to convert [[SparkSinkEvent]]s to [[SparkFlumeEvent]]s
-   * @param events Events to convert to SparkFlumeEvents
-   * @return The SparkFlumeEvents generated from the SparkSinkEvents
-   */
-  private def toSparkFlumeEvents(events: java.util.List[SparkSinkEvent]):
-    ArrayBuffer[SparkFlumeEvent] = {
-    // Convert each Flume event to a serializable SparkFlumeEvent
-    val buffer = new ArrayBuffer[SparkFlumeEvent](events.size())
-    var j = 0
-    while (j < events.size()) {
-      val event = events.get(j)
-      val sparkFlumeEvent = new SparkFlumeEvent()
-      sparkFlumeEvent.event.setBody(event.getBody)
-      sparkFlumeEvent.event.setHeaders(event.getHeaders)
-      buffer += sparkFlumeEvent
-      j += 1
-    }
-    buffer
-  }
-}
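
As the class comment notes, this fetcher is meant to be run on an executor. A minimal sketch of
that wiring, where `receiver` and the pool size stand in for what FlumePollingReceiver actually
sets up:

    import java.util.concurrent.Executors

    // Hypothetical: `receiver` is a started FlumePollingReceiver; run several fetchers in parallel.
    val numFetchers = 5
    val exec = Executors.newFixedThreadPool(numFetchers)
    (1 to numFetchers).foreach(_ => exec.submit(new FlumeBatchFetcher(receiver)))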


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org