You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2016/03/15 00:56:16 UTC

[5/5] spark git commit: [SPARK-13843][STREAMING] Remove streaming-flume, streaming-mqtt, streaming-zeromq, streaming-akka, streaming-twitter to Spark packages

[SPARK-13843][STREAMING] Remove streaming-flume, streaming-mqtt, streaming-zeromq, streaming-akka, streaming-twitter to Spark packages

## What changes were proposed in this pull request?

Currently there are a few sub-projects, each integrating Spark Streaming with a different external source. Now that we have a better way to include external libraries (Spark Packages), and with Spark 2.0 coming up, we can move the following projects out of Spark to https://github.com/spark-packages:

- streaming-flume
- streaming-akka
- streaming-mqtt
- streaming-zeromq
- streaming-twitter

They are ancillary packages, and considering the overhead of maintenance, running tests, and PR failures, it's better to maintain them outside of Spark. In addition, these projects can then have their own release cycles, so we can release them faster.

I have already copied these projects to https://github.com/spark-packages

## How was this patch tested?

Jenkins tests

Author: Shixiong Zhu <sh...@databricks.com>

Closes #11672 from zsxwing/remove-external-pkg.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/06dec374
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/06dec374
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/06dec374

Branch: refs/heads/master
Commit: 06dec37455c3f800897defee6fad0da623f26050
Parents: 8301fad
Author: Shixiong Zhu <sh...@databricks.com>
Authored: Mon Mar 14 16:56:04 2016 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Mon Mar 14 16:56:04 2016 -0700

----------------------------------------------------------------------
 dev/audit-release/audit_release.py              |   3 +-
 dev/run-tests.py                                |   3 -
 dev/sparktestsupport/modules.py                 |  84 -----
 examples/pom.xml                                |  31 --
 .../examples/streaming/JavaActorWordCount.java  | 144 ---------
 .../examples/streaming/JavaFlumeEventCount.java |  75 -----
 .../JavaTwitterHashTagJoinSentiments.java       | 175 -----------
 .../examples/streaming/ActorWordCount.scala     | 175 -----------
 .../examples/streaming/FlumeEventCount.scala    |  70 -----
 .../streaming/FlumePollingEventCount.scala      |  67 ----
 .../examples/streaming/MQTTWordCount.scala      | 119 -------
 .../examples/streaming/TwitterAlgebirdCMS.scala | 116 -------
 .../examples/streaming/TwitterAlgebirdHLL.scala |  94 ------
 .../TwitterHashTagJoinSentiments.scala          |  96 ------
 .../examples/streaming/TwitterPopularTags.scala |  85 -----
 .../examples/streaming/ZeroMQWordCount.scala    | 105 -------
 external/akka/pom.xml                           |  70 -----
 .../spark/streaming/akka/ActorReceiver.scala    | 306 ------------------
 .../apache/spark/streaming/akka/AkkaUtils.scala | 147 ---------
 .../streaming/akka/JavaAkkaUtilsSuite.java      |  68 ----
 .../spark/streaming/akka/AkkaUtilsSuite.scala   |  67 ----
 external/flume-assembly/pom.xml                 | 168 ----------
 external/flume-sink/pom.xml                     | 129 --------
 .../flume-sink/src/main/avro/sparkflume.avdl    |  40 ---
 .../spark/streaming/flume/sink/Logging.scala    | 127 --------
 .../flume/sink/SparkAvroCallbackHandler.scala   | 166 ----------
 .../spark/streaming/flume/sink/SparkSink.scala  | 171 ----------
 .../flume/sink/SparkSinkThreadFactory.scala     |  35 ---
 .../streaming/flume/sink/SparkSinkUtils.scala   |  28 --
 .../flume/sink/TransactionProcessor.scala       | 252 ---------------
 .../src/test/resources/log4j.properties         |  28 --
 .../streaming/flume/sink/SparkSinkSuite.scala   | 218 -------------
 external/flume/pom.xml                          |  78 -----
 .../streaming/flume/EventTransformer.scala      |  72 -----
 .../streaming/flume/FlumeBatchFetcher.scala     | 166 ----------
 .../streaming/flume/FlumeInputDStream.scala     | 205 ------------
 .../flume/FlumePollingInputDStream.scala        | 123 --------
 .../spark/streaming/flume/FlumeTestUtils.scala  | 117 -------
 .../spark/streaming/flume/FlumeUtils.scala      | 311 -------------------
 .../streaming/flume/PollingFlumeTestUtils.scala | 209 -------------
 .../spark/streaming/flume/package-info.java     |  21 --
 .../apache/spark/streaming/flume/package.scala  |  23 --
 .../streaming/LocalJavaStreamingContext.java    |  44 ---
 .../flume/JavaFlumePollingStreamSuite.java      |  44 ---
 .../streaming/flume/JavaFlumeStreamSuite.java   |  36 ---
 .../flume/src/test/resources/log4j.properties   |  28 --
 .../spark/streaming/TestOutputStream.scala      |  48 ---
 .../flume/FlumePollingStreamSuite.scala         | 129 --------
 .../streaming/flume/FlumeStreamSuite.scala      | 102 ------
 external/mqtt-assembly/pom.xml                  | 175 -----------
 external/mqtt/pom.xml                           | 104 -------
 external/mqtt/src/main/assembly/assembly.xml    |  44 ---
 .../spark/streaming/mqtt/MQTTInputDStream.scala | 102 ------
 .../apache/spark/streaming/mqtt/MQTTUtils.scala |  92 ------
 .../spark/streaming/mqtt/package-info.java      |  21 --
 .../apache/spark/streaming/mqtt/package.scala   |  23 --
 .../streaming/LocalJavaStreamingContext.java    |  44 ---
 .../streaming/mqtt/JavaMQTTStreamSuite.java     |  37 ---
 .../mqtt/src/test/resources/log4j.properties    |  28 --
 .../spark/streaming/mqtt/MQTTStreamSuite.scala  |  79 -----
 .../spark/streaming/mqtt/MQTTTestUtils.scala    | 111 -------
 external/twitter/pom.xml                        |  70 -----
 .../streaming/twitter/TwitterInputDStream.scala | 115 -------
 .../spark/streaming/twitter/TwitterUtils.scala  | 132 --------
 .../spark/streaming/twitter/package-info.java   |  21 --
 .../spark/streaming/twitter/package.scala       |  23 --
 .../streaming/LocalJavaStreamingContext.java    |  44 ---
 .../twitter/JavaTwitterStreamSuite.java         |  44 ---
 .../twitter/src/test/resources/log4j.properties |  28 --
 .../streaming/twitter/TwitterStreamSuite.scala  |  59 ----
 external/zeromq/pom.xml                         |  74 -----
 .../spark/streaming/zeromq/ZeroMQReceiver.scala |  55 ----
 .../spark/streaming/zeromq/ZeroMQUtils.scala    | 163 ----------
 .../spark/streaming/zeromq/package-info.java    |  21 --
 .../apache/spark/streaming/zeromq/package.scala |  23 --
 .../streaming/LocalJavaStreamingContext.java    |  44 ---
 .../streaming/zeromq/JavaZeroMQStreamSuite.java |  63 ----
 .../zeromq/src/test/resources/log4j.properties  |  28 --
 .../streaming/zeromq/ZeroMQStreamSuite.scala    |  63 ----
 pom.xml                                         |  86 -----
 project/SparkBuild.scala                        |  37 +--
 python/pyspark/streaming/flume.py               | 140 ---------
 python/pyspark/streaming/mqtt.py                |  70 -----
 python/pyspark/streaming/tests.py               | 266 +---------------
 84 files changed, 13 insertions(+), 7734 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/06dec374/dev/audit-release/audit_release.py
----------------------------------------------------------------------
diff --git a/dev/audit-release/audit_release.py b/dev/audit-release/audit_release.py
index 4dabb51..426b311 100755
--- a/dev/audit-release/audit_release.py
+++ b/dev/audit-release/audit_release.py
@@ -116,8 +116,7 @@ original_dir = os.getcwd()
 # dependencies within those projects.
 modules = [
     "spark-core", "spark-mllib", "spark-streaming", "spark-repl",
-    "spark-graphx", "spark-streaming-flume", "spark-streaming-kafka",
-    "spark-streaming-mqtt", "spark-streaming-twitter", "spark-streaming-zeromq",
+    "spark-graphx", "spark-streaming-kafka",
     "spark-catalyst", "spark-sql", "spark-hive", "spark-streaming-kinesis-asl"
 ]
 modules = map(lambda m: "%s_%s" % (m, SCALA_BINARY_VERSION), modules)

http://git-wip-us.apache.org/repos/asf/spark/blob/06dec374/dev/run-tests.py
----------------------------------------------------------------------
diff --git a/dev/run-tests.py b/dev/run-tests.py
index a1e6f1b..d940cda 100755
--- a/dev/run-tests.py
+++ b/dev/run-tests.py
@@ -337,9 +337,6 @@ def build_spark_sbt(hadoop_version):
     build_profiles = get_hadoop_profiles(hadoop_version) + modules.root.build_profile_flags
     sbt_goals = ["package",
                  "streaming-kafka-assembly/assembly",
-                 "streaming-flume-assembly/assembly",
-                 "streaming-mqtt-assembly/assembly",
-                 "streaming-mqtt/test:assembly",
                  "streaming-kinesis-asl-assembly/assembly"]
     profiles_and_goals = build_profiles + sbt_goals
 

http://git-wip-us.apache.org/repos/asf/spark/blob/06dec374/dev/sparktestsupport/modules.py
----------------------------------------------------------------------
diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index 1781de4..d118488 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -210,43 +210,6 @@ streaming_kinesis_asl = Module(
 )
 
 
-streaming_zeromq = Module(
-    name="streaming-zeromq",
-    dependencies=[streaming],
-    source_file_regexes=[
-        "external/zeromq",
-    ],
-    sbt_test_goals=[
-        "streaming-zeromq/test",
-    ]
-)
-
-
-streaming_twitter = Module(
-    name="streaming-twitter",
-    dependencies=[streaming],
-    source_file_regexes=[
-        "external/twitter",
-    ],
-    sbt_test_goals=[
-        "streaming-twitter/test",
-    ]
-)
-
-
-streaming_mqtt = Module(
-    name="streaming-mqtt",
-    dependencies=[streaming],
-    source_file_regexes=[
-        "external/mqtt",
-        "external/mqtt-assembly",
-    ],
-    sbt_test_goals=[
-        "streaming-mqtt/test",
-    ]
-)
-
-
 streaming_kafka = Module(
     name="streaming-kafka",
     dependencies=[streaming],
@@ -260,51 +223,6 @@ streaming_kafka = Module(
 )
 
 
-streaming_flume_sink = Module(
-    name="streaming-flume-sink",
-    dependencies=[streaming],
-    source_file_regexes=[
-        "external/flume-sink",
-    ],
-    sbt_test_goals=[
-        "streaming-flume-sink/test",
-    ]
-)
-
-
-streaming_akka = Module(
-    name="streaming-akka",
-    dependencies=[streaming],
-    source_file_regexes=[
-        "external/akka",
-    ],
-    sbt_test_goals=[
-        "streaming-akka/test",
-    ]
-)
-
-
-streaming_flume = Module(
-    name="streaming-flume",
-    dependencies=[streaming],
-    source_file_regexes=[
-        "external/flume",
-    ],
-    sbt_test_goals=[
-        "streaming-flume/test",
-    ]
-)
-
-
-streaming_flume_assembly = Module(
-    name="streaming-flume-assembly",
-    dependencies=[streaming_flume, streaming_flume_sink],
-    source_file_regexes=[
-        "external/flume-assembly",
-    ]
-)
-
-
 mllib = Module(
     name="mllib",
     dependencies=[streaming, sql],
@@ -376,8 +294,6 @@ pyspark_streaming = Module(
         pyspark_core,
         streaming,
         streaming_kafka,
-        streaming_flume_assembly,
-        streaming_mqtt,
         streaming_kinesis_asl
     ],
     source_file_regexes=[

http://git-wip-us.apache.org/repos/asf/spark/blob/06dec374/examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/pom.xml b/examples/pom.xml
index 3a3f547..92bb373 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -67,37 +67,6 @@
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-streaming-twitter_${scala.binary.version}</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-streaming-flume_${scala.binary.version}</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-streaming-akka_${scala.binary.version}</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-streaming-mqtt_${scala.binary.version}</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-streaming-zeromq_${scala.binary.version}</artifactId>
-      <version>${project.version}</version>
-      <exclusions>
-        <exclusion>
-          <groupId>org.spark-project.protobuf</groupId>
-          <artifactId>protobuf-java</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
       <artifactId>spark-streaming-kafka_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
     </dependency>

http://git-wip-us.apache.org/repos/asf/spark/blob/06dec374/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java
deleted file mode 100644
index 7884b8c..0000000
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.examples.streaming;
-
-import java.util.Arrays;
-import java.util.Iterator;
-
-import scala.Tuple2;
-
-import akka.actor.ActorSelection;
-import akka.actor.Props;
-
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.streaming.Duration;
-import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.apache.spark.streaming.akka.AkkaUtils;
-import org.apache.spark.streaming.akka.JavaActorReceiver;
-
-/**
- * A sample actor as receiver, is also simplest. This receiver actor
- * goes and subscribe to a typical publisher/feeder actor and receives
- * data.
- *
- * @see [[org.apache.spark.examples.streaming.FeederActor]]
- */
-class JavaSampleActorReceiver<T> extends JavaActorReceiver {
-
-  private final String urlOfPublisher;
-
-  public JavaSampleActorReceiver(String urlOfPublisher) {
-    this.urlOfPublisher = urlOfPublisher;
-  }
-
-  private ActorSelection remotePublisher;
-
-  @Override
-  public void preStart() {
-    remotePublisher = getContext().actorSelection(urlOfPublisher);
-    remotePublisher.tell(new SubscribeReceiver(getSelf()), getSelf());
-  }
-
-  @Override
-  public void onReceive(Object msg) throws Exception {
-    @SuppressWarnings("unchecked")
-    T msgT = (T) msg;
-    store(msgT);
-  }
-
-  @Override
-  public void postStop() {
-    remotePublisher.tell(new UnsubscribeReceiver(getSelf()), getSelf());
-  }
-}
-
-/**
- * A sample word count program demonstrating the use of plugging in
- * Actor as Receiver
- * Usage: JavaActorWordCount <hostname> <port>
- *   <hostname> and <port> describe the AkkaSystem that Spark Sample feeder is running on.
- *
- * To run this example locally, you may run Feeder Actor as
- * <code><pre>
- *     $ bin/run-example org.apache.spark.examples.streaming.FeederActor localhost 9999
- * </pre></code>
- * and then run the example
- * <code><pre>
- *     $ bin/run-example org.apache.spark.examples.streaming.JavaActorWordCount localhost 9999
- * </pre></code>
- */
-public class JavaActorWordCount {
-
-  public static void main(String[] args) {
-    if (args.length < 2) {
-      System.err.println("Usage: JavaActorWordCount <hostname> <port>");
-      System.exit(1);
-    }
-
-    StreamingExamples.setStreamingLogLevels();
-
-    final String host = args[0];
-    final String port = args[1];
-    SparkConf sparkConf = new SparkConf().setAppName("JavaActorWordCount");
-    // Create the context and set the batch size
-    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
-
-    String feederActorURI = "akka.tcp://test@" + host + ":" + port + "/user/FeederActor";
-
-    /*
-     * Following is the use of AkkaUtils.createStream to plug in custom actor as receiver
-     *
-     * An important point to note:
-     * Since Actor may exist outside the spark framework, It is thus user's responsibility
-     * to ensure the type safety, i.e type of data received and InputDstream
-     * should be same.
-     *
-     * For example: Both AkkaUtils.createStream and JavaSampleActorReceiver are parameterized
-     * to same type to ensure type safety.
-     */
-    JavaDStream<String> lines = AkkaUtils.createStream(
-        jssc,
-        Props.create(JavaSampleActorReceiver.class, feederActorURI),
-        "SampleReceiver");
-
-    // compute wordcount
-    lines.flatMap(new FlatMapFunction<String, String>() {
-      @Override
-      public Iterator<String> call(String s) {
-        return Arrays.asList(s.split("\\s+")).iterator();
-      }
-    }).mapToPair(new PairFunction<String, String, Integer>() {
-      @Override
-      public Tuple2<String, Integer> call(String s) {
-        return new Tuple2<>(s, 1);
-      }
-    }).reduceByKey(new Function2<Integer, Integer, Integer>() {
-      @Override
-      public Integer call(Integer i1, Integer i2) {
-        return i1 + i2;
-      }
-    }).print();
-
-    jssc.start();
-    jssc.awaitTermination();
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/06dec374/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java
deleted file mode 100644
index da56637..0000000
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.examples.streaming;
-
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.examples.streaming.StreamingExamples;
-import org.apache.spark.streaming.*;
-import org.apache.spark.streaming.api.java.*;
-import org.apache.spark.streaming.flume.FlumeUtils;
-import org.apache.spark.streaming.flume.SparkFlumeEvent;
-
-/**
- *  Produces a count of events received from Flume.
- *
- *  This should be used in conjunction with an AvroSink in Flume. It will start
- *  an Avro server on at the request host:port address and listen for requests.
- *  Your Flume AvroSink should be pointed to this address.
- *
- *  Usage: JavaFlumeEventCount <host> <port>
- *    <host> is the host the Flume receiver will be started on - a receiver
- *           creates a server and listens for flume events.
- *    <port> is the port the Flume receiver will listen on.
- *
- *  To run this example:
- *     `$ bin/run-example org.apache.spark.examples.streaming.JavaFlumeEventCount <host> <port>`
- */
-public final class JavaFlumeEventCount {
-  private JavaFlumeEventCount() {
-  }
-
-  public static void main(String[] args) {
-    if (args.length != 2) {
-      System.err.println("Usage: JavaFlumeEventCount <host> <port>");
-      System.exit(1);
-    }
-
-    StreamingExamples.setStreamingLogLevels();
-
-    String host = args[0];
-    int port = Integer.parseInt(args[1]);
-
-    Duration batchInterval = new Duration(2000);
-    SparkConf sparkConf = new SparkConf().setAppName("JavaFlumeEventCount");
-    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, batchInterval);
-    JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, host, port);
-
-    flumeStream.count();
-
-    flumeStream.count().map(new Function<Long, String>() {
-      @Override
-      public String call(Long in) {
-        return "Received " + in + " flume events.";
-      }
-    }).print();
-
-    ssc.start();
-    ssc.awaitTermination();
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/06dec374/examples/src/main/java/org/apache/spark/examples/streaming/JavaTwitterHashTagJoinSentiments.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaTwitterHashTagJoinSentiments.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaTwitterHashTagJoinSentiments.java
deleted file mode 100644
index f0ae9a9..0000000
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaTwitterHashTagJoinSentiments.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.examples.streaming;
-
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.api.java.function.VoidFunction;
-import org.apache.spark.streaming.Duration;
-import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.api.java.JavaPairDStream;
-import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.apache.spark.streaming.twitter.TwitterUtils;
-import scala.Tuple2;
-import twitter4j.Status;
-
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-
-/**
- * Displays the most positive hash tags by joining the streaming Twitter data with a static RDD of
- * the AFINN word list (http://neuro.imm.dtu.dk/wiki/AFINN)
- */
-public class JavaTwitterHashTagJoinSentiments {
-
-  public static void main(String[] args) {
-    if (args.length < 4) {
-      System.err.println("Usage: JavaTwitterHashTagJoinSentiments <consumer key> <consumer secret>" +
-        " <access token> <access token secret> [<filters>]");
-      System.exit(1);
-    }
-
-    StreamingExamples.setStreamingLogLevels();
-
-    String consumerKey = args[0];
-    String consumerSecret = args[1];
-    String accessToken = args[2];
-    String accessTokenSecret = args[3];
-    String[] filters = Arrays.copyOfRange(args, 4, args.length);
-
-    // Set the system properties so that Twitter4j library used by Twitter stream
-    // can use them to generate OAuth credentials
-    System.setProperty("twitter4j.oauth.consumerKey", consumerKey);
-    System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret);
-    System.setProperty("twitter4j.oauth.accessToken", accessToken);
-    System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret);
-
-    SparkConf sparkConf = new SparkConf().setAppName("JavaTwitterHashTagJoinSentiments");
-    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
-    JavaReceiverInputDStream<Status> stream = TwitterUtils.createStream(jssc, filters);
-
-    JavaDStream<String> words = stream.flatMap(new FlatMapFunction<Status, String>() {
-      @Override
-      public Iterator<String> call(Status s) {
-        return Arrays.asList(s.getText().split(" ")).iterator();
-      }
-    });
-
-    JavaDStream<String> hashTags = words.filter(new Function<String, Boolean>() {
-      @Override
-      public Boolean call(String word) {
-        return word.startsWith("#");
-      }
-    });
-
-    // Read in the word-sentiment list and create a static RDD from it
-    String wordSentimentFilePath = "data/streaming/AFINN-111.txt";
-    final JavaPairRDD<String, Double> wordSentiments = jssc.sparkContext().textFile(wordSentimentFilePath)
-      .mapToPair(new PairFunction<String, String, Double>(){
-        @Override
-        public Tuple2<String, Double> call(String line) {
-          String[] columns = line.split("\t");
-          return new Tuple2<>(columns[0], Double.parseDouble(columns[1]));
-        }
-      });
-
-    JavaPairDStream<String, Integer> hashTagCount = hashTags.mapToPair(
-      new PairFunction<String, String, Integer>() {
-        @Override
-        public Tuple2<String, Integer> call(String s) {
-          // leave out the # character
-          return new Tuple2<>(s.substring(1), 1);
-        }
-      });
-
-    JavaPairDStream<String, Integer> hashTagTotals = hashTagCount.reduceByKeyAndWindow(
-      new Function2<Integer, Integer, Integer>() {
-        @Override
-        public Integer call(Integer a, Integer b) {
-          return a + b;
-        }
-      }, new Duration(10000));
-
-    // Determine the hash tags with the highest sentiment values by joining the streaming RDD
-    // with the static RDD inside the transform() method and then multiplying
-    // the frequency of the hash tag by its sentiment value
-    JavaPairDStream<String, Tuple2<Double, Integer>> joinedTuples =
-      hashTagTotals.transformToPair(new Function<JavaPairRDD<String, Integer>,
-        JavaPairRDD<String, Tuple2<Double, Integer>>>() {
-        @Override
-        public JavaPairRDD<String, Tuple2<Double, Integer>> call(
-            JavaPairRDD<String, Integer> topicCount) {
-          return wordSentiments.join(topicCount);
-        }
-      });
-
-    JavaPairDStream<String, Double> topicHappiness = joinedTuples.mapToPair(
-      new PairFunction<Tuple2<String, Tuple2<Double, Integer>>, String, Double>() {
-        @Override
-        public Tuple2<String, Double> call(Tuple2<String,
-          Tuple2<Double, Integer>> topicAndTuplePair) {
-          Tuple2<Double, Integer> happinessAndCount = topicAndTuplePair._2();
-          return new Tuple2<>(topicAndTuplePair._1(),
-            happinessAndCount._1() * happinessAndCount._2());
-        }
-      });
-
-    JavaPairDStream<Double, String> happinessTopicPairs = topicHappiness.mapToPair(
-      new PairFunction<Tuple2<String, Double>, Double, String>() {
-        @Override
-        public Tuple2<Double, String> call(Tuple2<String, Double> topicHappiness) {
-          return new Tuple2<>(topicHappiness._2(),
-            topicHappiness._1());
-        }
-      });
-
-    JavaPairDStream<Double, String> happiest10 = happinessTopicPairs.transformToPair(
-      new Function<JavaPairRDD<Double, String>, JavaPairRDD<Double, String>>() {
-        @Override
-        public JavaPairRDD<Double, String> call(
-            JavaPairRDD<Double, String> happinessAndTopics) {
-          return happinessAndTopics.sortByKey(false);
-        }
-      }
-    );
-
-    // Print hash tags with the most positive sentiment values
-    happiest10.foreachRDD(new VoidFunction<JavaPairRDD<Double, String>>() {
-      @Override
-      public void call(JavaPairRDD<Double, String> happinessTopicPairs) {
-        List<Tuple2<Double, String>> topList = happinessTopicPairs.take(10);
-        System.out.println(
-          String.format("\nHappiest topics in last 10 seconds (%s total):",
-            happinessTopicPairs.count()));
-        for (Tuple2<Double, String> pair : topList) {
-          System.out.println(
-            String.format("%s (%s happiness)", pair._2(), pair._1()));
-        }
-      }
-    });
-
-    jssc.start();
-    jssc.awaitTermination();
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/06dec374/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala
deleted file mode 100644
index 844772a..0000000
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-// scalastyle:off println
-package org.apache.spark.examples.streaming
-
-import scala.collection.mutable.LinkedHashSet
-import scala.util.Random
-
-import akka.actor._
-import com.typesafe.config.ConfigFactory
-
-import org.apache.spark.SparkConf
-import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.streaming.akka.{ActorReceiver, AkkaUtils}
-
-case class SubscribeReceiver(receiverActor: ActorRef)
-case class UnsubscribeReceiver(receiverActor: ActorRef)
-
-/**
- * Sends the random content to every receiver subscribed with 1/2
- *  second delay.
- */
-class FeederActor extends Actor {
-
-  val rand = new Random()
-  val receivers = new LinkedHashSet[ActorRef]()
-
-  val strings: Array[String] = Array("words ", "may ", "count ")
-
-  def makeMessage(): String = {
-    val x = rand.nextInt(3)
-    strings(x) + strings(2 - x)
-  }
-
-  /*
-   * A thread to generate random messages
-   */
-  new Thread() {
-    override def run() {
-      while (true) {
-        Thread.sleep(500)
-        receivers.foreach(_ ! makeMessage)
-      }
-    }
-  }.start()
-
-  def receive: Receive = {
-    case SubscribeReceiver(receiverActor: ActorRef) =>
-      println("received subscribe from %s".format(receiverActor.toString))
-      receivers += receiverActor
-
-    case UnsubscribeReceiver(receiverActor: ActorRef) =>
-      println("received unsubscribe from %s".format(receiverActor.toString))
-      receivers -= receiverActor
-  }
-}
-
-/**
- * A sample actor as receiver, is also simplest. This receiver actor
- * goes and subscribe to a typical publisher/feeder actor and receives
- * data.
- *
- * @see [[org.apache.spark.examples.streaming.FeederActor]]
- */
-class SampleActorReceiver[T](urlOfPublisher: String) extends ActorReceiver {
-
-  lazy private val remotePublisher = context.actorSelection(urlOfPublisher)
-
-  override def preStart(): Unit = remotePublisher ! SubscribeReceiver(context.self)
-
-  def receive: PartialFunction[Any, Unit] = {
-    case msg => store(msg.asInstanceOf[T])
-  }
-
-  override def postStop(): Unit = remotePublisher ! UnsubscribeReceiver(context.self)
-
-}
-
-/**
- * A sample feeder actor
- *
- * Usage: FeederActor <hostname> <port>
- *   <hostname> and <port> describe the AkkaSystem that Spark Sample feeder would start on.
- */
-object FeederActor {
-
-  def main(args: Array[String]) {
-    if (args.length < 2) {
-      System.err.println("Usage: FeederActor <hostname> <port>\n")
-      System.exit(1)
-    }
-    val Seq(host, port) = args.toSeq
-
-    val akkaConf = ConfigFactory.parseString(
-      s"""akka.actor.provider = "akka.remote.RemoteActorRefProvider"
-         |akka.remote.enabled-transports = ["akka.remote.netty.tcp"]
-         |akka.remote.netty.tcp.hostname = "$host"
-         |akka.remote.netty.tcp.port = $port
-         |""".stripMargin)
-       val actorSystem = ActorSystem("test", akkaConf)
-    val feeder = actorSystem.actorOf(Props[FeederActor], "FeederActor")
-
-    println("Feeder started as:" + feeder)
-
-    actorSystem.awaitTermination()
-  }
-}
-
-/**
- * A sample word count program demonstrating the use of plugging in
- *
- * Actor as Receiver
- * Usage: ActorWordCount <hostname> <port>
- *   <hostname> and <port> describe the AkkaSystem that Spark Sample feeder is running on.
- *
- * To run this example locally, you may run Feeder Actor as
- *    `$ bin/run-example org.apache.spark.examples.streaming.FeederActor localhost 9999`
- * and then run the example
- *    `$ bin/run-example org.apache.spark.examples.streaming.ActorWordCount localhost 9999`
- */
-object ActorWordCount {
-  def main(args: Array[String]) {
-    if (args.length < 2) {
-      System.err.println(
-        "Usage: ActorWordCount <hostname> <port>")
-      System.exit(1)
-    }
-
-    StreamingExamples.setStreamingLogLevels()
-
-    val Seq(host, port) = args.toSeq
-    val sparkConf = new SparkConf().setAppName("ActorWordCount")
-    // Create the context and set the batch size
-    val ssc = new StreamingContext(sparkConf, Seconds(2))
-
-    /*
-     * Following is the use of AkkaUtils.createStream to plug in custom actor as receiver
-     *
-     * An important point to note:
-     * Since Actor may exist outside the spark framework, It is thus user's responsibility
-     * to ensure the type safety, i.e type of data received and InputDStream
-     * should be same.
-     *
-     * For example: Both AkkaUtils.createStream and SampleActorReceiver are parameterized
-     * to same type to ensure type safety.
-     */
-    val lines = AkkaUtils.createStream[String](
-      ssc,
-      Props(classOf[SampleActorReceiver[String]],
-        "akka.tcp://test@%s:%s/user/FeederActor".format(host, port.toInt)),
-      "SampleReceiver")
-
-    // compute wordcount
-    lines.flatMap(_.split("\\s+")).map(x => (x, 1)).reduceByKey(_ + _).print()
-
-    ssc.start()
-    ssc.awaitTermination()
-  }
-}
-// scalastyle:on println

http://git-wip-us.apache.org/repos/asf/spark/blob/06dec374/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala
deleted file mode 100644
index 91e52e4..0000000
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-// scalastyle:off println
-package org.apache.spark.examples.streaming
-
-import org.apache.spark.SparkConf
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming._
-import org.apache.spark.streaming.flume._
-import org.apache.spark.util.IntParam
-
-/**
- *  Produces a count of events received from Flume.
- *
- *  This should be used in conjunction with an AvroSink in Flume. It will start
- *  an Avro server on at the request host:port address and listen for requests.
- *  Your Flume AvroSink should be pointed to this address.
- *
- *  Usage: FlumeEventCount <host> <port>
- *    <host> is the host the Flume receiver will be started on - a receiver
- *           creates a server and listens for flume events.
- *    <port> is the port the Flume receiver will listen on.
- *
- *  To run this example:
- *    `$ bin/run-example org.apache.spark.examples.streaming.FlumeEventCount <host> <port> `
- */
-object FlumeEventCount {
-  def main(args: Array[String]) {
-    if (args.length < 2) {
-      System.err.println(
-        "Usage: FlumeEventCount <host> <port>")
-      System.exit(1)
-    }
-
-    StreamingExamples.setStreamingLogLevels()
-
-    val Array(host, IntParam(port)) = args
-
-    val batchInterval = Milliseconds(2000)
-
-    // Create the context and set the batch size
-    val sparkConf = new SparkConf().setAppName("FlumeEventCount")
-    val ssc = new StreamingContext(sparkConf, batchInterval)
-
-    // Create a flume stream
-    val stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2)
-
-    // Print out the count of events received from this server in each batch
-    stream.count().map(cnt => "Received " + cnt + " flume events." ).print()
-
-    ssc.start()
-    ssc.awaitTermination()
-  }
-}
-// scalastyle:on println

http://git-wip-us.apache.org/repos/asf/spark/blob/06dec374/examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala
deleted file mode 100644
index dd725d7..0000000
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-// scalastyle:off println
-package org.apache.spark.examples.streaming
-
-import org.apache.spark.SparkConf
-import org.apache.spark.streaming._
-import org.apache.spark.streaming.flume._
-import org.apache.spark.util.IntParam
-
-/**
- *  Produces a count of events received from Flume.
- *
- *  This should be used in conjunction with the Spark Sink running in a Flume agent. See
- *  the Spark Streaming programming guide for more details.
- *
- *  Usage: FlumePollingEventCount <host> <port>
- *    `host` is the host on which the Spark Sink is running.
- *    `port` is the port at which the Spark Sink is listening.
- *
- *  To run this example:
- *    `$ bin/run-example org.apache.spark.examples.streaming.FlumePollingEventCount [host] [port] `
- */
-object FlumePollingEventCount {
-  def main(args: Array[String]) {
-    if (args.length < 2) {
-      System.err.println(
-        "Usage: FlumePollingEventCount <host> <port>")
-      System.exit(1)
-    }
-
-    StreamingExamples.setStreamingLogLevels()
-
-    val Array(host, IntParam(port)) = args
-
-    val batchInterval = Milliseconds(2000)
-
-    // Create the context and set the batch size
-    val sparkConf = new SparkConf().setAppName("FlumePollingEventCount")
-    val ssc = new StreamingContext(sparkConf, batchInterval)
-
-    // Create a flume stream that polls the Spark Sink running in a Flume agent
-    val stream = FlumeUtils.createPollingStream(ssc, host, port)
-
-    // Print out the count of events received from this server in each batch
-    stream.count().map(cnt => "Received " + cnt + " flume events." ).print()
-
-    ssc.start()
-    ssc.awaitTermination()
-  }
-}
-// scalastyle:on println

http://git-wip-us.apache.org/repos/asf/spark/blob/06dec374/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala
deleted file mode 100644
index d772ae3..0000000
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-// scalastyle:off println
-package org.apache.spark.examples.streaming
-
-import org.eclipse.paho.client.mqttv3._
-import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
-
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.streaming.mqtt._
-import org.apache.spark.SparkConf
-
-/**
- * A simple Mqtt publisher for demonstration purposes, repeatedly publishes
- * Space separated String Message "hello mqtt demo for spark streaming"
- */
-object MQTTPublisher {
-
-  def main(args: Array[String]) {
-    if (args.length < 2) {
-      System.err.println("Usage: MQTTPublisher <MqttBrokerUrl> <topic>")
-      System.exit(1)
-    }
-
-    StreamingExamples.setStreamingLogLevels()
-
-    val Seq(brokerUrl, topic) = args.toSeq
-
-    var client: MqttClient = null
-
-    try {
-      val persistence = new MemoryPersistence()
-      client = new MqttClient(brokerUrl, MqttClient.generateClientId(), persistence)
-
-      client.connect()
-
-      val msgtopic = client.getTopic(topic)
-      val msgContent = "hello mqtt demo for spark streaming"
-      val message = new MqttMessage(msgContent.getBytes("utf-8"))
-
-      while (true) {
-        try {
-          msgtopic.publish(message)
-          println(s"Published data. topic: ${msgtopic.getName()}; Message: $message")
-        } catch {
-          case e: MqttException if e.getReasonCode == MqttException.REASON_CODE_MAX_INFLIGHT =>
-            Thread.sleep(10)
-            println("Queue is full, wait for to consume data from the message queue")
-        }
-      }
-    } catch {
-      case e: MqttException => println("Exception Caught: " + e)
-    } finally {
-      if (client != null) {
-        client.disconnect()
-      }
-    }
-  }
-}
-
-/**
- * A sample wordcount with MqttStream stream
- *
- * To work with Mqtt, Mqtt Message broker/server required.
- * Mosquitto (http://mosquitto.org/) is an open source Mqtt Broker
- * In ubuntu mosquitto can be installed using the command  `$ sudo apt-get install mosquitto`
- * Eclipse paho project provides Java library for Mqtt Client http://www.eclipse.org/paho/
- * Example Java code for Mqtt Publisher and Subscriber can be found here
- * https://bitbucket.org/mkjinesh/mqttclient
- * Usage: MQTTWordCount <MqttbrokerUrl> <topic>
- *   <MqttbrokerUrl> and <topic> describe where Mqtt publisher is running.
- *
- * To run this example locally, you may run publisher as
- *    `$ bin/run-example \
- *      org.apache.spark.examples.streaming.MQTTPublisher tcp://localhost:1883 foo`
- * and run the example as
- *    `$ bin/run-example \
- *      org.apache.spark.examples.streaming.MQTTWordCount tcp://localhost:1883 foo`
- */
-object MQTTWordCount {
-
-  def main(args: Array[String]) {
-    if (args.length < 2) {
-      // scalastyle:off println
-      System.err.println(
-        "Usage: MQTTWordCount <MqttbrokerUrl> <topic>")
-      // scalastyle:on println
-      System.exit(1)
-    }
-
-    val Seq(brokerUrl, topic) = args.toSeq
-    val sparkConf = new SparkConf().setAppName("MQTTWordCount")
-    val ssc = new StreamingContext(sparkConf, Seconds(2))
-    val lines = MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_ONLY_SER_2)
-    val words = lines.flatMap(x => x.split(" "))
-    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
-
-    wordCounts.print()
-    ssc.start()
-    ssc.awaitTermination()
-  }
-}
-// scalastyle:on println

http://git-wip-us.apache.org/repos/asf/spark/blob/06dec374/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala
deleted file mode 100644
index 5af82e1..0000000
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-// scalastyle:off println
-package org.apache.spark.examples.streaming
-
-import com.twitter.algebird._
-import com.twitter.algebird.CMSHasherImplicits._
-
-import org.apache.spark.SparkConf
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.streaming.twitter._
-
-// scalastyle:off
-/**
- * Illustrates the use of the Count-Min Sketch, from Twitter's Algebird library, to compute
- * windowed and global Top-K estimates of user IDs occurring in a Twitter stream.
- * <br>
- *   <strong>Note</strong> that since Algebird's implementation currently only supports Long inputs,
- *   the example operates on Long IDs. Once the implementation supports other inputs (such as String),
- *   the same approach could be used for computing popular topics for example.
- * <p>
- * <p>
- *   <a href=
- *   "http://highlyscalable.wordpress.com/2012/05/01/probabilistic-structures-web-analytics-data-mining/">
- *   This blog post</a> has a good overview of the Count-Min Sketch (CMS). The CMS is a data
- *   structure for approximate frequency estimation in data streams (e.g. Top-K elements, frequency
- *   of any given element, etc), that uses space sub-linear in the number of elements in the
- *   stream. Once elements are added to the CMS, the estimated count of an element can be computed,
- *   as well as "heavy-hitters" that occur more than a threshold percentage of the overall total
- *   count.
- * <p><p>
- *   Algebird's implementation is a monoid, so we can succinctly merge two CMS instances in the
- *   reduce operation.
- */
-// scalastyle:on
-object TwitterAlgebirdCMS {
-  def main(args: Array[String]) {
-    StreamingExamples.setStreamingLogLevels()
-
-    // CMS parameters
-    val DELTA = 1E-3
-    val EPS = 0.01
-    val SEED = 1
-    val PERC = 0.001
-    // K highest frequency elements to take
-    val TOPK = 10
-
-    val filters = args
-    val sparkConf = new SparkConf().setAppName("TwitterAlgebirdCMS")
-    val ssc = new StreamingContext(sparkConf, Seconds(10))
-    val stream = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_ONLY_SER_2)
-
-    val users = stream.map(status => status.getUser.getId)
-
-    // val cms = new CountMinSketchMonoid(EPS, DELTA, SEED, PERC)
-    val cms = TopPctCMS.monoid[Long](EPS, DELTA, SEED, PERC)
-    var globalCMS = cms.zero
-    val mm = new MapMonoid[Long, Int]()
-    var globalExact = Map[Long, Int]()
-
-    val approxTopUsers = users.mapPartitions(ids => {
-      ids.map(id => cms.create(id))
-    }).reduce(_ ++ _)
-
-    val exactTopUsers = users.map(id => (id, 1))
-      .reduceByKey((a, b) => a + b)
-
-    approxTopUsers.foreachRDD(rdd => {
-      if (rdd.count() != 0) {
-        val partial = rdd.first()
-        val partialTopK = partial.heavyHitters.map(id =>
-          (id, partial.frequency(id).estimate)).toSeq.sortBy(_._2).reverse.slice(0, TOPK)
-        globalCMS ++= partial
-        val globalTopK = globalCMS.heavyHitters.map(id =>
-          (id, globalCMS.frequency(id).estimate)).toSeq.sortBy(_._2).reverse.slice(0, TOPK)
-        println("Approx heavy hitters at %2.2f%% threshold this batch: %s".format(PERC,
-          partialTopK.mkString("[", ",", "]")))
-        println("Approx heavy hitters at %2.2f%% threshold overall: %s".format(PERC,
-          globalTopK.mkString("[", ",", "]")))
-      }
-    })
-
-    exactTopUsers.foreachRDD(rdd => {
-      if (rdd.count() != 0) {
-        val partialMap = rdd.collect().toMap
-        val partialTopK = rdd.map(
-          {case (id, count) => (count, id)})
-          .sortByKey(ascending = false).take(TOPK)
-        globalExact = mm.plus(globalExact.toMap, partialMap)
-        val globalTopK = globalExact.toSeq.sortBy(_._2).reverse.slice(0, TOPK)
-        println("Exact heavy hitters this batch: %s".format(partialTopK.mkString("[", ",", "]")))
-        println("Exact heavy hitters overall: %s".format(globalTopK.mkString("[", ",", "]")))
-      }
-    })
-
-    ssc.start()
-    ssc.awaitTermination()
-  }
-}
-// scalastyle:on println

http://git-wip-us.apache.org/repos/asf/spark/blob/06dec374/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala
deleted file mode 100644
index 6442b2a..0000000
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-// scalastyle:off println
-package org.apache.spark.examples.streaming
-
-import com.twitter.algebird.HyperLogLog._
-import com.twitter.algebird.HyperLogLogMonoid
-
-import org.apache.spark.SparkConf
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.streaming.twitter._
-
-// scalastyle:off
-/**
- * Illustrates the use of the HyperLogLog algorithm, from Twitter's Algebird library, to compute
- * a windowed and global estimate of the unique user IDs occurring in a Twitter stream.
- * <p>
- * <p>
- *   This <a href="http://highlyscalable.wordpress.com/2012/05/01/probabilistic-structures-web-analytics-data-mining/">
- *   blog post</a> and this
- *   <a href= "http://highscalability.com/blog/2012/4/5/big-data-counting-how-to-count-a-billion-distinct-objects-us.html">
- *     blog post</a>
- *   have good overviews of HyperLogLog (HLL). HLL is a memory-efficient datastructure for
- *   estimating the cardinality of a data stream, i.e. the number of unique elements.
- * <p><p>
- *   Algebird's implementation is a monoid, so we can succinctly merge two HLL instances in the
- *   reduce operation.
- */
-// scalastyle:on
-object TwitterAlgebirdHLL {
-  def main(args: Array[String]) {
-
-    StreamingExamples.setStreamingLogLevels()
-
-    /** Bit size parameter for HyperLogLog, trades off accuracy vs size */
-    val BIT_SIZE = 12
-    val filters = args
-    val sparkConf = new SparkConf().setAppName("TwitterAlgebirdHLL")
-    val ssc = new StreamingContext(sparkConf, Seconds(5))
-    val stream = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_ONLY_SER)
-
-    val users = stream.map(status => status.getUser.getId)
-
-    val hll = new HyperLogLogMonoid(BIT_SIZE)
-    var globalHll = hll.zero
-    var userSet: Set[Long] = Set()
-
-    val approxUsers = users.mapPartitions(ids => {
-      ids.map(id => hll.create(id))
-    }).reduce(_ + _)
-
-    val exactUsers = users.map(id => Set(id)).reduce(_ ++ _)
-
-    approxUsers.foreachRDD(rdd => {
-      if (rdd.count() != 0) {
-        val partial = rdd.first()
-        globalHll += partial
-        println("Approx distinct users this batch: %d".format(partial.estimatedSize.toInt))
-        println("Approx distinct users overall: %d".format(globalHll.estimatedSize.toInt))
-      }
-    })
-
-    exactUsers.foreachRDD(rdd => {
-      if (rdd.count() != 0) {
-        val partial = rdd.first()
-        userSet ++= partial
-        println("Exact distinct users this batch: %d".format(partial.size))
-        println("Exact distinct users overall: %d".format(userSet.size))
-        println("Error rate: %2.5f%%".format(((globalHll.estimatedSize / userSet.size.toDouble) - 1
-          ) * 100))
-      }
-    })
-
-    ssc.start()
-    ssc.awaitTermination()
-  }
-}
-// scalastyle:on println

http://git-wip-us.apache.org/repos/asf/spark/blob/06dec374/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterHashTagJoinSentiments.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterHashTagJoinSentiments.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterHashTagJoinSentiments.scala
deleted file mode 100644
index a8d392c..0000000
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterHashTagJoinSentiments.scala
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-// scalastyle:off println
-package org.apache.spark.examples.streaming
-
-import org.apache.spark.SparkConf
-import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.streaming.twitter.TwitterUtils
-
-/**
- * Displays the most positive hash tags by joining the streaming Twitter data with a static RDD of
- * the AFINN word list (http://neuro.imm.dtu.dk/wiki/AFINN)
- */
-object TwitterHashTagJoinSentiments {
-  def main(args: Array[String]) {
-    if (args.length < 4) {
-      System.err.println("Usage: TwitterHashTagJoinSentiments <consumer key> <consumer secret> " +
-        "<access token> <access token secret> [<filters>]")
-      System.exit(1)
-    }
-
-    StreamingExamples.setStreamingLogLevels()
-
-    val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4)
-    val filters = args.takeRight(args.length - 4)
-
-    // Set the system properties so that Twitter4j library used by Twitter stream
-    // can use them to generate OAuth credentials
-    System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
-    System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
-    System.setProperty("twitter4j.oauth.accessToken", accessToken)
-    System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)
-
-    val sparkConf = new SparkConf().setAppName("TwitterHashTagJoinSentiments")
-    val ssc = new StreamingContext(sparkConf, Seconds(2))
-    val stream = TwitterUtils.createStream(ssc, None, filters)
-
-    val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))
-
-    // Read in the word-sentiment list and create a static RDD from it
-    val wordSentimentFilePath = "data/streaming/AFINN-111.txt"
-    val wordSentiments = ssc.sparkContext.textFile(wordSentimentFilePath).map { line =>
-      val Array(word, happinessValue) = line.split("\t")
-      (word, happinessValue.toInt)
-    }.cache()
-
-    // Determine the hash tags with the highest sentiment values by joining the streaming RDD
-    // with the static RDD inside the transform() method and then multiplying
-    // the frequency of the hash tag by its sentiment value
-    val happiest60 = hashTags.map(hashTag => (hashTag.tail, 1))
-      .reduceByKeyAndWindow(_ + _, Seconds(60))
-      .transform{topicCount => wordSentiments.join(topicCount)}
-      .map{case (topic, tuple) => (topic, tuple._1 * tuple._2)}
-      .map{case (topic, happinessValue) => (happinessValue, topic)}
-      .transform(_.sortByKey(false))
-
-    val happiest10 = hashTags.map(hashTag => (hashTag.tail, 1))
-      .reduceByKeyAndWindow(_ + _, Seconds(10))
-      .transform{topicCount => wordSentiments.join(topicCount)}
-      .map{case (topic, tuple) => (topic, tuple._1 * tuple._2)}
-      .map{case (topic, happinessValue) => (happinessValue, topic)}
-      .transform(_.sortByKey(false))
-
-    // Print hash tags with the most positive sentiment values
-    happiest60.foreachRDD(rdd => {
-      val topList = rdd.take(10)
-      println("\nHappiest topics in last 60 seconds (%s total):".format(rdd.count()))
-      topList.foreach{case (happiness, tag) => println("%s (%s happiness)".format(tag, happiness))}
-    })
-
-    happiest10.foreachRDD(rdd => {
-      val topList = rdd.take(10)
-      println("\nHappiest topics in last 10 seconds (%s total):".format(rdd.count()))
-      topList.foreach{case (happiness, tag) => println("%s (%s happiness)".format(tag, happiness))}
-    })
-
-    ssc.start()
-    ssc.awaitTermination()
-  }
-}
-// scalastyle:on println

http://git-wip-us.apache.org/repos/asf/spark/blob/06dec374/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala
deleted file mode 100644
index 5b69963..0000000
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-// scalastyle:off println
-package org.apache.spark.examples.streaming
-
-import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.streaming.twitter._
-import org.apache.spark.SparkConf
-
-/**
- * Calculates popular hashtags (topics) over sliding 10 and 60 second windows from a Twitter
- * stream. The stream is instantiated with credentials and optionally filters supplied by the
- * command line arguments.
- *
- * Usage: TwitterPopularTags <consumer key> <consumer secret> <access token>
- *   <access token secret> [<filters>]
- */
-object TwitterPopularTags {
-  def main(args: Array[String]): Unit = {
-    if (args.length < 4) {
-      System.err.println("Usage: TwitterPopularTags <consumer key> <consumer secret> " +
-        "<access token> <access token secret> [<filters>]")
-      System.exit(1)
-    }
-
-    StreamingExamples.setStreamingLogLevels()
-
-    val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4)
-    val filters = args.drop(4)
-
-    // Set the system properties so that Twitter4j library used by twitter stream
-    // can use them to generate OAuth credentials
-    System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
-    System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
-    System.setProperty("twitter4j.oauth.accessToken", accessToken)
-    System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)
-
-    val sparkConf = new SparkConf().setAppName("TwitterPopularTags")
-    val ssc = new StreamingContext(sparkConf, Seconds(2))
-    val stream = TwitterUtils.createStream(ssc, None, filters)
-
-    val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))
-
-    val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60))
-                     .map{case (topic, count) => (count, topic)}
-                     .transform(_.sortByKey(false))
-
-    val topCounts10 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(10))
-                     .map{case (topic, count) => (count, topic)}
-                     .transform(_.sortByKey(false))
-
-    // Print popular hashtags: the first ten (count, tag) pairs of each sorted window, plus
-    // the total number of pairs in that window
-    topCounts60.foreachRDD(rdd => {
-      val topList = rdd.take(10)
-      println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count()))
-      topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))}
-    })
-
-    topCounts10.foreachRDD(rdd => {
-      val topList = rdd.take(10)
-      println("\nPopular topics in last 10 seconds (%s total):".format(rdd.count()))
-      topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))}
-    })
-
-    ssc.start()
-    ssc.awaitTermination()
-  }
-}
-// scalastyle:on println

http://git-wip-us.apache.org/repos/asf/spark/blob/06dec374/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala
deleted file mode 100644
index 99b5617..0000000
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-// scalastyle:off println
-package org.apache.spark.examples.streaming
-
-import scala.language.implicitConversions
-
-import akka.actor.ActorSystem
-import akka.actor.actorRef2Scala
-import akka.util.ByteString
-import akka.zeromq._
-import akka.zeromq.Subscribe
-
-import org.apache.spark.SparkConf
-import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.streaming.zeromq._
-
-/**
- * A simple publisher for demonstration purposes: publishes the same three words under the
- * given topic once per second until the process is killed.
- */
-object SimpleZeroMQPublisher {
-
-  def main(args: Array[String]): Unit = {
-    if (args.length < 2) {
-      System.err.println("Usage: SimpleZeroMQPublisher <zeroMQUrl> <topic> ")
-      System.exit(1)
-    }
-
-    val Seq(url, topic) = args.toSeq.take(2) // tolerate extra arguments (no MatchError)
-    val acs: ActorSystem = ActorSystem()
-
-    val pubSocket = ZeroMQExtension(acs).newSocket(SocketType.Pub, Bind(url))
-    implicit def stringToByteString(x: String): ByteString = ByteString(x)
-    val messages: List[ByteString] = List("words ", "may ", "count ")
-    while (true) {
-      Thread.sleep(1000)
-      pubSocket ! ZMQMessage(ByteString(topic) :: messages)
-    }
-    // Not reached: the loop above never exits, so there is nothing to await afterwards.
-  }
-}
-
-// scalastyle:off
-/**
- * A sample wordcount with ZeroMQStream stream
- *
- * To work with zeroMQ, some native libraries have to be installed.
- * Install zeroMQ (release 2.1) core libraries. [ZeroMQ Install guide]
- * (http://www.zeromq.org/intro:get-the-software)
- *
- * Usage: ZeroMQWordCount <zeroMQurl> <topic>
- *   <zeroMQurl> and <topic> describe where zeroMq publisher is running.
- *
- * To run this example locally, you may run publisher as
- *    `$ bin/run-example \
- *      org.apache.spark.examples.streaming.SimpleZeroMQPublisher tcp://127.0.0.1:1234 foo`
- * and run the example as
- *    `$ bin/run-example \
- *      org.apache.spark.examples.streaming.ZeroMQWordCount tcp://127.0.0.1:1234 foo`
- */
-// scalastyle:on
-object ZeroMQWordCount {
-  def main(args: Array[String]): Unit = {
-    if (args.length < 2) {
-      System.err.println("Usage: ZeroMQWordCount <zeroMQurl> <topic>")
-      System.exit(1)
-    }
-    StreamingExamples.setStreamingLogLevels()
-    val Seq(url, topic) = args.toSeq.take(2) // tolerate extra arguments (no MatchError)
-    val sparkConf = new SparkConf().setAppName("ZeroMQWordCount")
-    // Create the context with a 2 second batch interval
-    val ssc = new StreamingContext(sparkConf, Seconds(2))
-    // Decode each received ByteString as a UTF-8 string
-    def bytesToStringIterator(x: Seq[ByteString]): Iterator[String] = x.map(_.utf8String).iterator
-
-    // For this stream, a zeroMQ publisher should be running.
-    val lines = ZeroMQUtils.createStream(
-      ssc,
-      url,
-      Subscribe(topic),
-      bytesToStringIterator _)
-    val words = lines.flatMap(_.split(" "))
-    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
-    wordCounts.print()
-    ssc.start()
-    ssc.awaitTermination()
-  }
-}
-// scalastyle:on println

http://git-wip-us.apache.org/repos/asf/spark/blob/06dec374/external/akka/pom.xml
----------------------------------------------------------------------
diff --git a/external/akka/pom.xml b/external/akka/pom.xml
deleted file mode 100644
index bbe644e..0000000
--- a/external/akka/pom.xml
+++ /dev/null
@@ -1,70 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one or more
-  ~ contributor license agreements.  See the NOTICE file distributed with
-  ~ this work for additional information regarding copyright ownership.
-  ~ The ASF licenses this file to You under the Apache License, Version 2.0
-  ~ (the "License"); you may not use this file except in compliance with
-  ~ the License.  You may obtain a copy of the License at
-  ~
-  ~    http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-  <parent>
-    <groupId>org.apache.spark</groupId>
-    <artifactId>spark-parent_2.11</artifactId>
-    <version>2.0.0-SNAPSHOT</version>
-    <relativePath>../../pom.xml</relativePath>
-  </parent>
-
-  <groupId>org.apache.spark</groupId>
-  <artifactId>spark-streaming-akka_2.11</artifactId>
-  <properties>
-    <sbt.project.name>streaming-akka</sbt.project.name>
-  </properties>
-  <packaging>jar</packaging>
-  <name>Spark Project External Akka</name>
-  <url>http://spark.apache.org/</url>
-
-  <dependencies>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
-      <version>${project.version}</version>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-core_${scala.binary.version}</artifactId>
-      <version>${project.version}</version>
-      <type>test-jar</type>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-test-tags_${scala.binary.version}</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>${akka.group}</groupId>
-      <artifactId>akka-actor_${scala.binary.version}</artifactId>
-      <version>${akka.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>${akka.group}</groupId>
-      <artifactId>akka-remote_${scala.binary.version}</artifactId>
-      <version>${akka.version}</version>
-    </dependency>
-  </dependencies>
-  <build>
-    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
-    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
-  </build>
-</project>

http://git-wip-us.apache.org/repos/asf/spark/blob/06dec374/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala
----------------------------------------------------------------------
diff --git a/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala b/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala
deleted file mode 100644
index 33415c1..0000000
--- a/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala
+++ /dev/null
@@ -1,306 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.akka
-
-import java.nio.ByteBuffer
-import java.util.concurrent.atomic.AtomicInteger
-
-import scala.concurrent.Future
-import scala.concurrent.duration._
-import scala.language.postfixOps
-import scala.reflect.ClassTag
-
-import akka.actor._
-import akka.actor.SupervisorStrategy.{Escalate, Restart}
-import akka.pattern.ask
-import akka.util.Timeout
-import com.typesafe.config.ConfigFactory
-
-import org.apache.spark.{Logging, TaskContext}
-import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.receiver.Receiver
-
-/**
- * :: DeveloperApi ::
- * A helper with a set of defaults for the supervisor strategy and the actor system creator.
- */
-@DeveloperApi
-object ActorReceiver {
-
-  /**
-   * A OneForOneStrategy supervisor strategy with `maxNrOfRetries = 10` and
-   * `withinTimeRange = 15 millis`. For RuntimeException, it will restart the ActorReceiver; for
-   * other Exceptions, it just escalates the failure to the supervisor of the supervisor.
-   */
-  val defaultSupervisorStrategy: OneForOneStrategy = OneForOneStrategy(
-    maxNrOfRetries = 10, withinTimeRange = 15.millis) {
-    case _: RuntimeException => Restart
-    case _: Exception => Escalate
-  }
-
-  /**
-   * A default ActorSystem creator. It starts an ActorSystem that supports remote communication
-   * under a unique name (streaming-actor-system-<spark-task-attempt-id>); the id is read from
-   * the current TaskContext, so the creator is meant to be invoked from within a task.
-   */
-  val defaultActorSystemCreator: () => ActorSystem = () => {
-    val uniqueSystemName = s"streaming-actor-system-${TaskContext.get().taskAttemptId()}"
-    val akkaConf = ConfigFactory.parseString(
-      """akka.actor.provider = "akka.remote.RemoteActorRefProvider"
-        |akka.remote.enabled-transports = ["akka.remote.netty.tcp"]
-        |""".stripMargin)
-    ActorSystem(uniqueSystemName, akkaConf)
-  }
-}
-
-/**
- * :: DeveloperApi ::
- * A base Actor that provides APIs for pushing received data into Spark Streaming for processing.
- *
- * Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html
- *
- * @example {{{
- *  class MyActor extends ActorReceiver {
- *      def receive = {
- *          case anything: String => store(anything)
- *      }
- *  }
- *
- *  AkkaUtils.createStream[String](ssc, Props[MyActor](), "MyActorReceiver")
- *
- * }}}
- *
- * @note Since the Actor may exist outside the Spark framework, it is the user's responsibility
- *       to ensure type safety, i.e. the type of the data passed to `store` and the type
- *       parameter of the InputDStream should be the same.
- */
-@DeveloperApi
-abstract class ActorReceiver extends Actor {
-
-  /** Store an iterator of received data as a data block into Spark's memory. */
-  def store[T](iter: Iterator[T]): Unit = {
-    context.parent ! IteratorData(iter)
-  }
-
-  /**
-   * Store the bytes of received data as a data block into Spark's memory. Note
-   * that the data in the ByteBuffer must be serialized using the same serializer
-   * that Spark is configured to use.
-   */
-  def store(bytes: ByteBuffer): Unit = {
-    context.parent ! ByteBufferData(bytes)
-  }
-
-  /**
-   * Store a single item of received data to Spark's memory asynchronously.
-   * These single items will be aggregated together into data blocks before
-   * being pushed into Spark's memory.
-   */
-  def store[T](item: T): Unit = {
-    context.parent ! SingleItemData(item)
-  }
-
-  /**
-   * Store a single item of received data to Spark's memory and returns a `Future`.
-   * The `Future` will be completed when the operation finishes, or with an
-   * `akka.pattern.AskTimeoutException` after the given timeout has expired.
-   * These single items will be aggregated together into data blocks before
-   * being pushed into Spark's memory.
-   *
-   * This method allows the user to control the flow speed using the returned `Future`.
-   */
-  def store[T](item: T, timeout: Timeout): Future[Unit] = {
-    context.parent.ask(AskStoreSingleItemData(item))(timeout).map(_ => ())(context.dispatcher)
-  }
-}
-
-/**
- * :: DeveloperApi ::
- * A Java UntypedActor that provides APIs for pushing received data into Spark Streaming for
- * processing.
- *
- * Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html
- *
- * @example {{{
- *  class MyActor extends JavaActorReceiver {
- *      @Override
- *      public void onReceive(Object msg) throws Exception {
- *          store((String) msg);
- *      }
- *  }
- *
- *  AkkaUtils.<String>createStream(jssc, Props.create(MyActor.class), "MyActorReceiver");
- *
- * }}}
- *
- * @note Since the Actor may exist outside the Spark framework, it is the user's responsibility
- *       to ensure type safety, i.e. the type of the data passed to `store` and the type
- *       parameter of the InputDStream should be the same.
- */
-@DeveloperApi
-abstract class JavaActorReceiver extends UntypedActor {
-
-  /** Store an iterator of received data as a data block into Spark's memory. */
-  def store[T](iter: Iterator[T]): Unit = {
-    context.parent ! IteratorData(iter)
-  }
-
-  /**
-   * Store the bytes of received data as a data block into Spark's memory. Note
-   * that the data in the ByteBuffer must be serialized using the same serializer
-   * that Spark is configured to use.
-   */
-  def store(bytes: ByteBuffer): Unit = {
-    context.parent ! ByteBufferData(bytes)
-  }
-
-  /**
-   * Store a single item of received data to Spark's memory.
-   * These single items will be aggregated together into data blocks before
-   * being pushed into Spark's memory.
-   */
-  def store[T](item: T): Unit = {
-    context.parent ! SingleItemData(item)
-  }
-
-  /**
-   * Store a single item of received data to Spark's memory and returns a `Future`.
-   * The `Future` will be completed when the operation finishes, or with an
-   * `akka.pattern.AskTimeoutException` after the given timeout has expired.
-   * These single items will be aggregated together into data blocks before
-   * being pushed into Spark's memory.
-   *
-   * This method allows the user to control the flow speed using the returned `Future`.
-   */
-  def store[T](item: T, timeout: Timeout): Future[Unit] = {
-    context.parent.ask(AskStoreSingleItemData(item))(timeout).map(_ => ())(context.dispatcher)
-  }
-}
-
-/**
- * :: DeveloperApi ::
- * Reply sent by [[org.apache.spark.streaming.akka.ActorReceiverSupervisor]] when it is asked
- * for statistics about its workers. Used in conjunction with `AkkaUtils.createStream`.
- * `numberOfMsgs` counts the single items stored, `numberOfHiccups` the `PossiblyHarmful`
- * messages seen, and `otherInfo` lists the current child actors, one per line.
- */
-@DeveloperApi
-case class Statistics(
-    numberOfMsgs: Int, numberOfWorkers: Int,
-    numberOfHiccups: Int, otherInfo: String)
-
-/** Messages exchanged between worker actors and the supervisor that stores their data. */
-private[akka] sealed trait ActorReceiverData
-private[akka] case class SingleItemData[T](item: T) extends ActorReceiverData
-private[akka] case class AskStoreSingleItemData[T](item: T) extends ActorReceiverData
-private[akka] case class IteratorData[T](iterator: Iterator[T]) extends ActorReceiverData
-private[akka] case class ByteBufferData(bytes: ByteBuffer) extends ActorReceiverData
-private[akka] object Ack extends ActorReceiverData
-
-/**
- * Provides Actors as receivers for receiving stream.
- *
- * Actors can be used to receive data from almost any stream source, and a nice set of
- * abstraction(s) for actors as receivers is already provided for a few general cases. It is
- * thus exposed as an API where users may bring their own Actor to run as a receiver for a
- * Spark Streaming input source.
- *
- * This starts a supervisor actor which starts workers and also provides
- * [[http://doc.akka.io/docs/akka/snapshot/scala/fault-tolerance.html fault-tolerance]].
- *
- * Here's a way to start more supervisor/workers as its children.
- *
- * @example {{{
- *  context.parent ! Props(new Supervisor)
- * }}} OR {{{
- *  context.parent ! (Props(new Worker), "Worker")
- * }}}
- */
-private[akka] class ActorReceiverSupervisor[T: ClassTag](
-    actorSystemCreator: () => ActorSystem,
-    props: Props,
-    name: String,
-    storageLevel: StorageLevel,
-    receiverSupervisorStrategy: SupervisorStrategy
-  ) extends Receiver[T](storageLevel) with Logging {
-
-  private lazy val actorSystem = actorSystemCreator()
-  protected lazy val actorSupervisor = actorSystem.actorOf(Props(new Supervisor),
-    "Supervisor" + streamId)
-
-  class Supervisor extends Actor {
-
-    override val supervisorStrategy = receiverSupervisorStrategy
-    private val worker = context.actorOf(props, name)
-    logInfo("Started receiver worker at:" + worker.path)
-
-    private val n: AtomicInteger = new AtomicInteger(0)
-    private val hiccups: AtomicInteger = new AtomicInteger(0)
-
-    override def receive: PartialFunction[Any, Unit] = {
-
-      case IteratorData(iterator) =>
-        logDebug("received iterator")
-        store(iterator.asInstanceOf[Iterator[T]])
-
-      case SingleItemData(msg) =>
-        logDebug("received single")
-        store(msg.asInstanceOf[T])
-        n.incrementAndGet()
-
-      case AskStoreSingleItemData(msg) =>
-        logDebug("received single sync")
-        store(msg.asInstanceOf[T])
-        n.incrementAndGet()
-        sender() ! Ack
-
-      case ByteBufferData(bytes) =>
-        logDebug("received bytes")
-        store(bytes)
-      // Requests to start another worker: the new child's ActorRef is sent back to the sender.
-      case props: Props =>
-        val worker = context.actorOf(props)
-        logInfo("Started receiver worker at:" + worker.path)
-        sender() ! worker
-
-      case (props: Props, name: String) =>
-        val worker = context.actorOf(props, name)
-        logInfo("Started receiver worker at:" + worker.path)
-        sender() ! worker
-
-      case _: PossiblyHarmful => hiccups.incrementAndGet()
-      // Any Statistics instance acts as a query; the reply carries the current counters.
-      case _: Statistics =>
-        val workers = context.children
-        sender() ! Statistics(n.get, workers.size, hiccups.get, workers.mkString("\n"))
-
-    }
-  }
-
-  def onStart(): Unit = {
-    actorSupervisor // touch the lazy val: creates the actor system and the supervisor
-    logInfo("Supervision tree for receivers initialized at:" + actorSupervisor.path)
-  }
-
-  def onStop(): Unit = {
-    actorSupervisor ! PoisonPill
-    actorSystem.shutdown()
-    actorSystem.awaitTermination()
-  }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org