You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2014/05/07 02:28:00 UTC
[3/4] SPARK-1637: Clean up examples for 1.0
http://git-wip-us.apache.org/repos/asf/spark/blob/a000b5c3/examples/src/main/python/transitive_closure.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/transitive_closure.py b/examples/src/main/python/transitive_closure.py
new file mode 100755
index 0000000..744cce6
--- /dev/null
+++ b/examples/src/main/python/transitive_closure.py
@@ -0,0 +1,66 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import sys
+from random import Random
+
+from pyspark import SparkContext
+
+numEdges = 200
+numVertices = 100
+rand = Random(42)
+
+
+def generateGraph():
+ edges = set()
+ while len(edges) < numEdges:
+ src = rand.randrange(0, numEdges)
+ dst = rand.randrange(0, numEdges)
+ if src != dst:
+ edges.add((src, dst))
+ return edges
+
+
+if __name__ == "__main__":
+ if len(sys.argv) == 1:
+ print >> sys.stderr, "Usage: transitive_closure <master> [<slices>]"
+ exit(-1)
+ sc = SparkContext(sys.argv[1], "PythonTransitiveClosure")
+ slices = int(sys.argv[2]) if len(sys.argv) > 2 else 2
+ tc = sc.parallelize(generateGraph(), slices).cache()
+
+ # Linear transitive closure: each round grows paths by one edge,
+ # by joining the graph's edges with the already-discovered paths.
+ # e.g. join the path (y, z) from the TC with the edge (x, y) from
+ # the graph to obtain the path (x, z).
+
+ # Because join() joins on keys, the edges are stored in reversed order.
+ edges = tc.map(lambda (x, y): (y, x))
+
+ oldCount = 0L
+ nextCount = tc.count()
+ while True:
+ oldCount = nextCount
+ # Perform the join, obtaining an RDD of (y, (z, x)) pairs,
+ # then project the result to obtain the new (x, z) paths.
+ new_edges = tc.join(edges).map(lambda (_, (a, b)): (b, a))
+ tc = tc.union(new_edges).distinct().cache()
+ nextCount = tc.count()
+ if nextCount == oldCount:
+ break
+
+ print "TC has %i edges" % tc.count()
http://git-wip-us.apache.org/repos/asf/spark/blob/a000b5c3/examples/src/main/python/wordcount.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/wordcount.py b/examples/src/main/python/wordcount.py
new file mode 100755
index 0000000..b9139b9
--- /dev/null
+++ b/examples/src/main/python/wordcount.py
@@ -0,0 +1,35 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import sys
+from operator import add
+
+from pyspark import SparkContext
+
+
+if __name__ == "__main__":
+ if len(sys.argv) < 3:
+ print >> sys.stderr, "Usage: wordcount <master> <file>"
+ exit(-1)
+ sc = SparkContext(sys.argv[1], "PythonWordCount")
+ lines = sc.textFile(sys.argv[2], 1)
+ counts = lines.flatMap(lambda x: x.split(' ')) \
+ .map(lambda x: (x, 1)) \
+ .reduceByKey(add)
+ output = counts.collect()
+ for (word, count) in output:
+ print "%s: %i" % (word, count)
http://git-wip-us.apache.org/repos/asf/spark/blob/a000b5c3/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala b/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala
new file mode 100644
index 0000000..ff9254b
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.sql
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.SQLContext
+
+// One method for defining the schema of an RDD is to make a case class with the desired column
+// names and types.
+case class Record(key: Int, value: String)
+
+object RDDRelation {
+ def main(args: Array[String]) {
+ val sc = new SparkContext("local", "RDDRelation")
+ val sqlContext = new SQLContext(sc)
+
+ // Importing the SQL context gives access to all the SQL functions and implicit conversions.
+ import sqlContext._
+
+ val rdd = sc.parallelize((1 to 100).map(i => Record(i, s"val_$i")))
+ // Any RDD containing case classes can be registered as a table. The schema of the table is
+ // automatically inferred using Scala reflection.
+ rdd.registerAsTable("records")
+
+ // Once tables have been registered, you can run SQL queries over them.
+ println("Result of SELECT *:")
+ sql("SELECT * FROM records").collect().foreach(println)
+
+ // Aggregation queries are also supported.
+ val count = sql("SELECT COUNT(*) FROM records").collect().head.getInt(0)
+ println(s"COUNT(*): $count")
+
+ // The results of SQL queries are themselves RDDs and support all normal RDD functions. The
+ // items in the RDD are of type Row, which allows you to access each column by ordinal.
+ val rddFromSql = sql("SELECT key, value FROM records WHERE key < 10")
+
+ println("Result of RDD.map:")
+ rddFromSql.map(row => s"Key: ${row(0)}, Value: ${row(1)}").collect.foreach(println)
+
+ // Queries can also be written using a LINQ-like Scala DSL.
+ rdd.where('key === 1).orderBy('value.asc).select('key).collect().foreach(println)
+
+ // Write out an RDD as a parquet file.
+ rdd.saveAsParquetFile("pair.parquet")
+
+ // Read in the parquet file. Parquet files are self-describing, so the schema is preserved.
+ val parquetFile = sqlContext.parquetFile("pair.parquet")
+
+ // Queries can be run using the DSL on parquet files just like on the original RDD.
+ parquetFile.where('key === 1).select('value as 'a).collect().foreach(println)
+
+ // These files can also be registered as tables.
+ parquetFile.registerAsTable("parquetFile")
+ sql("SELECT * FROM parquetFile").collect().foreach(println)
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/a000b5c3/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala b/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala
new file mode 100644
index 0000000..66ce93a
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.sql.hive
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql._
+import org.apache.spark.sql.hive.LocalHiveContext
+
+object HiveFromSpark {
+  case class Record(key: Int, value: String)
+
+  def main(args: Array[String]) {
+    val sc = new SparkContext("local", "HiveFromSpark")
+
+    // A local hive context creates an instance of the Hive Metastore in process, storing the
+    // warehouse data in the current directory. This location can be overridden by
+    // specifying a second parameter to the constructor.
+    val hiveContext = new LocalHiveContext(sc)
+    import hiveContext._
+
+    hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
+    hql("LOAD DATA LOCAL INPATH 'src/main/resources/kv1.txt' INTO TABLE src")
+
+    // Queries are expressed in HiveQL
+    println("Result of 'SELECT *': ")
+    hql("SELECT * FROM src").collect.foreach(println)
+
+    // Aggregation queries are also supported.
+    val count = hql("SELECT COUNT(*) FROM src").collect().head.getInt(0)
+    println(s"COUNT(*): $count")
+
+    // The results of SQL queries are themselves RDDs and support all normal RDD functions. The
+    // items in the RDD are of type Row, which can be pattern-matched to extract typed columns.
+    val rddFromSql = hql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")
+
+    println("Result of RDD.map:")
+    rddFromSql.map {
+      case Row(key: Int, value: String) => s"Key: $key, Value: $value"
+    }.collect().foreach(println) // actually print the mapped rows announced by the heading
+
+    // You can also register RDDs as temporary tables within a HiveContext.
+    val rdd = sc.parallelize((1 to 100).map(i => Record(i, s"val_$i")))
+    rdd.registerAsTable("records")
+
+    // Queries can then join RDD data with data stored in Hive.
+    println("Result of SELECT *:")
+    hql("SELECT * FROM records r JOIN src s ON r.key = s.key").collect().foreach(println)
+  }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/a000b5c3/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala
new file mode 100644
index 0000000..84cf43d
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming
+
+import scala.collection.mutable.LinkedList
+import scala.reflect.ClassTag
+import scala.util.Random
+
+import akka.actor.{Actor, ActorRef, Props, actorRef2Scala}
+
+import org.apache.spark.{SparkConf, SecurityManager}
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
+import org.apache.spark.util.AkkaUtils
+import org.apache.spark.streaming.receiver.ActorHelper
+
+case class SubscribeReceiver(receiverActor: ActorRef)
+case class UnsubscribeReceiver(receiverActor: ActorRef)
+
+/**
+ * Sends a random two-word message to every subscribed receiver, once
+ * every half second.
+ */
+class FeederActor extends Actor {
+
+  val rand = new Random()
+  var receivers: LinkedList[ActorRef] = new LinkedList[ActorRef]()
+
+  val strings: Array[String] = Array("words ", "may ", "count ")
+
+  def makeMessage(): String = {
+    val x = rand.nextInt(3)
+    strings(x) + strings(2 - x)
+  }
+
+  /*
+   * A thread that sends a freshly generated message to each receiver every 500 ms
+   */
+  new Thread() {
+    override def run() {
+      while (true) {
+        Thread.sleep(500)
+        receivers.foreach(_ ! makeMessage)
+      }
+    }
+  }.start()
+
+  def receive: Receive = {
+    // A new subscriber is prepended to the list of receivers.
+    case SubscribeReceiver(receiverActor: ActorRef) =>
+      println("received subscribe from %s".format(receiverActor.toString))
+      receivers = LinkedList(receiverActor) ++ receivers
+
+    case UnsubscribeReceiver(receiverActor: ActorRef) =>
+      println("received unsubscribe from %s".format(receiverActor.toString))
+      // Remove it wherever it sits in the list, comparing with == rather than reference identity.
+      receivers = receivers.filterNot(_ == receiverActor)
+  }
+}
+
+/**
+ * A minimal actor used as a receiver. When it starts it subscribes itself
+ * to the publisher/feeder actor at `urlOfPublisher`, it stores every message
+ * it is sent, and it unsubscribes again when it stops.
+ *
+ * @see [[org.apache.spark.examples.streaming.FeederActor]]
+ */
+class SampleActorReceiver[T: ClassTag](urlOfPublisher: String)
+  extends Actor with ActorHelper {
+
+  private lazy val publisher = context.actorSelection(urlOfPublisher)
+
+  override def preStart(): Unit = publisher ! SubscribeReceiver(context.self)
+
+  def receive: Receive = {
+    case data => store(data.asInstanceOf[T])
+  }
+
+  override def postStop(): Unit = publisher ! UnsubscribeReceiver(context.self)
+
+}
+
+/**
+ * A sample feeder actor
+ *
+ * Usage: FeederActor <hostname> <port>
+ *   <hostname> and <port> describe the AkkaSystem that Spark Sample feeder would start on.
+ */
+object FeederActor {
+
+  def main(args: Array[String]) {
+    // Require exactly two arguments: the Seq(host, port) pattern below fails on any other count.
+    if (args.length != 2) {
+      System.err.println("Usage: FeederActor <hostname> <port>\n")
+      System.exit(1)
+    }
+    val Seq(host, port) = args.toSeq
+
+    // Start an actor system named "test" on host:port; only the first element of the result is used.
+    val conf = new SparkConf
+    val actorSystem = AkkaUtils.createActorSystem("test", host, port.toInt, conf = conf,
+      securityManager = new SecurityManager(conf))._1
+    val feeder = actorSystem.actorOf(Props[FeederActor], "FeederActor")
+
+    println("Feeder started as:" + feeder)
+
+    actorSystem.awaitTermination()
+  }
+}
+
+/**
+ * A sample word count program demonstrating how to plug in an
+ * Actor as a Receiver
+ * Usage: ActorWordCount <master> <hostname> <port>
+ *   <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
+ *   <hostname> and <port> describe the AkkaSystem that Spark Sample feeder is running on.
+ *
+ * To run this example locally, you may run Feeder Actor as
+ * `$ ./bin/run-example org.apache.spark.examples.streaming.FeederActor 127.0.1.1 9999`
+ * and then run the example
+ * `./bin/run-example org.apache.spark.examples.streaming.ActorWordCount local[2] 127.0.1.1 9999`
+ */
+object ActorWordCount {
+  def main(args: Array[String]) {
+    if (args.length != 3) {
+      System.err.println(
+        "Usage: ActorWordCount <master> <hostname> <port>\n" +
+        "In local mode, <master> should be 'local[n]' with n > 1")
+      System.exit(1)
+    }
+
+    StreamingExamples.setStreamingLogLevels()
+
+    val Seq(master, host, port) = args.toSeq
+
+    // Create the context and set the batch size
+    val ssc = new StreamingContext(master, "ActorWordCount", Seconds(2),
+      System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq)
+
+    /*
+     * The following uses actorStream to plug in a custom actor as the receiver.
+     *
+     * An important point to note:
+     * Since the actor may exist outside the Spark framework, it is the user's
+     * responsibility to ensure type safety, i.e. the type of the data received and
+     * the type of the input DStream should be the same.
+     *
+     * For example: both actorStream and SampleActorReceiver are parameterized
+     * with the same type (String) to ensure type safety.
+     */
+
+    val lines = ssc.actorStream[String](
+      Props(new SampleActorReceiver[String]("akka.tcp://test@%s:%s/user/FeederActor".format(
+        host, port.toInt))), "SampleReceiver")
+
+    // compute wordcount
+    lines.flatMap(_.split("\\s+")).map(x => (x, 1)).reduceByKey(_ + _).print()
+
+    ssc.start()
+    ssc.awaitTermination()
+  }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/a000b5c3/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala
new file mode 100644
index 0000000..5b2a1035
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.flume._
+import org.apache.spark.util.IntParam
+
+/**
+ * Counts the events received from Flume in each batch.
+ *
+ * Intended to be paired with an AvroSink in Flume. The example starts an
+ * Avro server at the requested host:port address and listens for requests,
+ * so the Flume AvroSink should be pointed at that address.
+ *
+ * Usage: FlumeEventCount <master> <host> <port>
+ *
+ * <master> is a Spark master URL
+ * <host> is the host the Flume receiver will be started on - a receiver
+ * creates a server and listens for flume events.
+ * <port> is the port the Flume receiver will listen on.
+ */
+object FlumeEventCount {
+  def main(args: Array[String]) {
+    if (args.length != 3) {
+      System.err.println("Usage: FlumeEventCount <master> <host> <port>")
+      System.exit(1)
+    }
+
+    StreamingExamples.setStreamingLogLevels()
+
+    val Array(master, host, IntParam(port)) = args
+
+    // Build the streaming context with a 2000 ms batch interval.
+    val sparkHome = System.getenv("SPARK_HOME")
+    val jars = StreamingContext.jarOfClass(this.getClass).toSeq
+    val ssc = new StreamingContext(master, "FlumeEventCount", Milliseconds(2000), sparkHome, jars)
+
+    // Receive Flume events on host:port.
+    val flumeStream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2)
+
+    // For every batch, print how many events arrived.
+    val eventCounts = flumeStream.count()
+    eventCounts.map(n => s"Received $n flume events.").print()
+
+    ssc.start()
+    ssc.awaitTermination()
+  }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/a000b5c3/examples/src/main/scala/org/apache/spark/examples/streaming/HdfsWordCount.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/HdfsWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/HdfsWordCount.scala
new file mode 100644
index 0000000..b440956
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/HdfsWordCount.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming
+
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.spark.streaming.StreamingContext._
+
+/**
+ * Word count over the new text files that appear in a given directory.
+ * Usage: HdfsWordCount <master> <directory>
+ *   <master> is the Spark master URL.
+ *   <directory> is the directory that Spark Streaming will use to find and read new text files.
+ *
+ * To try it on your local machine with the directory `localdir`, run
+ *    `$ ./bin/run-example org.apache.spark.examples.streaming.HdfsWordCount local[2] localdir`
+ * and then create a text file in `localdir`; the words in that file will be counted.
+ */
+object HdfsWordCount {
+  def main(args: Array[String]) {
+    if (args.length < 2) {
+      System.err.println("Usage: HdfsWordCount <master> <directory>")
+      System.exit(1)
+    }
+
+    StreamingExamples.setStreamingLogLevels()
+
+    // Set up a streaming context with 2-second batches.
+    val sparkHome = System.getenv("SPARK_HOME")
+    val jars = StreamingContext.jarOfClass(this.getClass).toSeq
+    val ssc = new StreamingContext(args(0), "HdfsWordCount", Seconds(2), sparkHome, jars)
+
+    // Count the words (split on single spaces) of each new text file in the directory.
+    ssc.textFileStream(args(1))
+      .flatMap(line => line.split(" "))
+      .map(word => (word, 1)).reduceByKey(_ + _)
+      .print()
+    ssc.start()
+    ssc.awaitTermination()
+  }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/a000b5c3/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala
new file mode 100644
index 0000000..c3aae5a
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming
+
+import java.util.Properties
+
+import kafka.producer._
+
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.StreamingContext._
+import org.apache.spark.streaming.kafka._
+
+// scalastyle:off
+/**
+ * Consumes messages from one or more topics in Kafka and does wordcount.
+ * Usage: KafkaWordCount <master> <zkQuorum> <group> <topics> <numThreads>
+ * <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
+ * <zkQuorum> is a list of one or more zookeeper servers that make quorum
+ * <group> is the name of kafka consumer group
+ * <topics> is a list of one or more kafka topics to consume from
+ * <numThreads> is the number of threads the kafka consumer should use
+ *
+ * Example:
+ * `./bin/run-example org.apache.spark.examples.streaming.KafkaWordCount local[2] zoo01,zoo02,zoo03 my-consumer-group topic1,topic2 1`
+ */
+// scalastyle:on
+object KafkaWordCount {
+  def main(args: Array[String]) {
+    if (args.length != 5) {
+      System.err.println("Usage: KafkaWordCount <master> <zkQuorum> <group> <topics> <numThreads>")
+      System.exit(1)
+    }
+
+    StreamingExamples.setStreamingLogLevels()
+    // The pattern below binds exactly five arguments, hence the strict count check above.
+    val Array(master, zkQuorum, group, topics, numThreads) = args
+
+    val ssc = new StreamingContext(master, "KafkaWordCount", Seconds(2),
+      System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq)
+    ssc.checkpoint("checkpoint")
+    // Pair every topic of the comma-separated list with the requested number of threads.
+    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
+    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
+    val words = lines.flatMap(_.split(" "))
+    val wordCounts = words.map(x => (x, 1L))
+      .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
+    wordCounts.print()
+
+    ssc.start()
+    ssc.awaitTermination()
+  }
+}
+
+// Produces messages whose words are random digits (0 to 9), messagesPerSec messages per second.
+object KafkaWordCountProducer {
+
+  def main(args: Array[String]) {
+    if (args.length != 4) {
+      System.err.println("Usage: KafkaWordCountProducer <metadataBrokerList> <topic> " +
+        "<messagesPerSec> <wordsPerMessage>")
+      System.exit(1)
+    }
+
+    val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args
+
+    // Kafka producer properties: the broker list and the message serializer
+    val props = new Properties()
+    props.put("metadata.broker.list", brokers)
+    props.put("serializer.class", "kafka.serializer.StringEncoder")
+
+    val config = new ProducerConfig(props)
+    val producer = new Producer[String, String](config)
+
+    // Send some messages
+    while(true) {
+      val messages = (1 to messagesPerSec.toInt).map { messageNum =>
+        val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString)
+          .mkString(" ")
+
+        new KeyedMessage[String, String](topic, str)
+      }.toArray
+      producer.send(messages: _*)
+      // One batch of messagesPerSec messages per second, so wait a full second between batches.
+      Thread.sleep(1000)
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/a000b5c3/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala
new file mode 100644
index 0000000..47bf1e5
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming
+
+import org.eclipse.paho.client.mqttv3.{MqttClient, MqttClientPersistence, MqttException, MqttMessage, MqttTopic}
+import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.spark.streaming.StreamingContext._
+import org.apache.spark.streaming.mqtt._
+
+/**
+ * A simple Mqtt publisher for demonstration purposes. It repeatedly publishes the
+ * space-separated string message "hello mqtt demo for spark streaming" to a topic.
+ */
+object MQTTPublisher {
+
+  var client: MqttClient = _
+
+  def main(args: Array[String]) {
+    if (args.length != 2) {
+      System.err.println("Usage: MQTTPublisher <MqttBrokerUrl> <topic>")
+      System.exit(1)
+    }
+
+    StreamingExamples.setStreamingLogLevels()
+
+    val Seq(brokerUrl, topic) = args.toSeq
+
+    try {
+      val persistence: MqttClientPersistence = new MqttDefaultFilePersistence("/tmp")
+      client = new MqttClient(brokerUrl, MqttClient.generateClientId(), persistence)
+      client.connect()
+
+      val msgtopic: MqttTopic = client.getTopic(topic)
+      val msg: String = "hello mqtt demo for spark streaming"
+      while (true) {
+        val message: MqttMessage = new MqttMessage(msg.getBytes("utf-8"))
+        msgtopic.publish(message)
+        println("Published data. topic: " + msgtopic.getName() + " Message: " + message)
+      }
+    } catch {
+      case e: MqttException => println("Exception Caught: " + e)
+    } finally {
+      // The publish loop never ends normally, so this runs only after a failure.
+      if (client != null && client.isConnected) client.disconnect()
+    }
+  }
+}
+
+// scalastyle:off
+/**
+ * A sample wordcount with MqttStream stream
+ *
+ * To work with Mqtt, an Mqtt message broker/server is required.
+ * Mosquitto (http://mosquitto.org/) is an open source Mqtt Broker
+ * On Ubuntu, mosquitto can be installed using the command `$ sudo apt-get install mosquitto`
+ * Eclipse paho project provides Java library for Mqtt Client http://www.eclipse.org/paho/
+ * Example Java code for Mqtt Publisher and Subscriber can be found here
+ * https://bitbucket.org/mkjinesh/mqttclient
+ * Usage: MQTTWordCount <master> <MqttbrokerUrl> <topic>
+ * In local mode, <master> should be 'local[n]' with n > 1
+ * <MqttbrokerUrl> and <topic> describe where Mqtt publisher is running.
+ *
+ * To run this example locally, you may run publisher as
+ * `$ ./bin/run-example org.apache.spark.examples.streaming.MQTTPublisher tcp://localhost:1883 foo`
+ * and run the example as
+ * `$ ./bin/run-example org.apache.spark.examples.streaming.MQTTWordCount local[2] tcp://localhost:1883 foo`
+ */
+// scalastyle:on
+object MQTTWordCount {
+
+  def main(args: Array[String]) {
+    if (args.length != 3) {
+      System.err.println(
+        "Usage: MQTTWordCount <master> <MqttbrokerUrl> <topic>" +
+          " In local mode, <master> should be 'local[n]' with n > 1")
+      System.exit(1)
+    }
+    // The pattern below binds exactly three arguments, hence the strict count check above.
+    val Seq(master, brokerUrl, topic) = args.toSeq
+
+    val ssc = new StreamingContext(master, "MqttWordCount", Seconds(2), System.getenv("SPARK_HOME"),
+      StreamingContext.jarOfClass(this.getClass).toSeq)
+    val lines = MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_ONLY_SER_2)
+    // Split every received message on single spaces and count each word per batch.
+    val words = lines.flatMap(x => x.toString.split(" "))
+    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
+    wordCounts.print()
+    ssc.start()
+    ssc.awaitTermination()
+  }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/a000b5c3/examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala
new file mode 100644
index 0000000..acfe9a4
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming
+
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.spark.streaming.StreamingContext._
+import org.apache.spark.storage.StorageLevel
+
+// scalastyle:off
+/**
+ * Counts words in text encoded with UTF8 received from the network every second.
+ *
+ * Usage: NetworkWordCount <master> <hostname> <port>
+ * <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
+ * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
+ *
+ * To run this on your local machine, you need to first run a Netcat server
+ * `$ nc -lk 9999`
+ * and then run the example
+ * `$ ./bin/run-example org.apache.spark.examples.streaming.NetworkWordCount local[2] localhost 9999`
+ */
+// scalastyle:on
+object NetworkWordCount {
+ def main(args: Array[String]) {
+ if (args.length < 3) {
+ System.err.println("Usage: NetworkWordCount <master> <hostname> <port>\n" +
+ "In local mode, <master> should be 'local[n]' with n > 1")
+ System.exit(1)
+ }
+
+ StreamingExamples.setStreamingLogLevels()
+
+ // Streaming context that groups the input into one-second batches
+ val jars = StreamingContext.jarOfClass(this.getClass).toSeq
+ val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1),
+ System.getenv("SPARK_HOME"), jars)
+ // Read '\n'-delimited text (e.g. generated by 'nc') from the given host and port,
+ // split every line on spaces and count the occurrences of each word
+ val lines = ssc.socketTextStream(args(1), args(2).toInt, StorageLevel.MEMORY_ONLY_SER)
+ val wordCounts = lines.flatMap(line => line.split(" ")).map(word => (word, 1))
+ .reduceByKey((a, b) => a + b)
+ wordCounts.print()
+ ssc.start()
+ ssc.awaitTermination()
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/a000b5c3/examples/src/main/scala/org/apache/spark/examples/streaming/QueueStream.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/QueueStream.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/QueueStream.scala
new file mode 100644
index 0000000..f92f72f
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/QueueStream.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming
+
+import scala.collection.mutable.SynchronizedQueue
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.spark.streaming.StreamingContext._
+
+object QueueStream {
+
+ def main(args: Array[String]) {
+ if (args.length < 1) {
+ System.err.println("Usage: QueueStream <master>")
+ System.exit(1)
+ }
+
+ StreamingExamples.setStreamingLogLevels()
+
+ // Streaming context with a one-second batch interval
+ val jars = StreamingContext.jarOfClass(this.getClass).toSeq
+ val ssc = new StreamingContext(args(0), "QueueStream", Seconds(1),
+ System.getenv("SPARK_HOME"), jars)
+
+ // Queue through which RDDs are handed to the queue-backed input stream
+ val rddQueue = new SynchronizedQueue[RDD[Int]]()
+
+ // Build the stream from the queue and count the elements per value of (x % 10)
+ val countsByRemainder = ssc.queueStream(rddQueue)
+ .map(x => (x % 10, 1))
+ .reduceByKey((a, b) => a + b)
+ countsByRemainder.print()
+ ssc.start()
+
+ // Push an RDD holding 1 to 1000 into the queue, then sleep one second; repeat 30 times
+ (1 to 30).foreach { _ =>
+ rddQueue += ssc.sparkContext.makeRDD(1 to 1000, 10)
+ Thread.sleep(1000)
+ }
+ ssc.stop()
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/a000b5c3/examples/src/main/scala/org/apache/spark/examples/streaming/RawNetworkGrep.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/RawNetworkGrep.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/RawNetworkGrep.scala
new file mode 100644
index 0000000..1b0319a
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/RawNetworkGrep.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming._
+import org.apache.spark.util.IntParam
+
+/**
+ * Receives text from multiple rawNetworkStreams and counts how many '\n' delimited
+ * lines have the word 'the' in them. This is useful for benchmarking purposes. This
+ * will only work with spark.streaming.util.RawTextSender running on all worker nodes
+ * and with Spark using Kryo serialization (set Java property "spark.serializer" to
+ * "org.apache.spark.serializer.KryoSerializer").
+ * Usage: RawNetworkGrep <master> <numStreams> <host> <port> <batchMillis>
+ * <master> is the Spark master URL
+ * <numStreams> is the number of rawNetworkStreams, which should be the same as the number
+ * of worker nodes in the cluster
+ * <host> is "localhost".
+ * <port> is the port on which RawTextSender is running in the worker nodes.
+ * <batchMillis> is the Spark Streaming batch duration in milliseconds.
+ */
+
+object RawNetworkGrep {
+ def main(args: Array[String]) {
+ if (args.length != 5) {
+ System.err.println("Usage: RawNetworkGrep <master> <numStreams> <host> <port> <batchMillis>")
+ System.exit(1)
+ }
+
+ StreamingExamples.setStreamingLogLevels()
+
+ val Array(master, IntParam(numStreams), host, IntParam(port), IntParam(batchMillis)) = args
+
+ // Streaming context whose batch duration is the requested number of milliseconds
+ val ssc = new StreamingContext(master, "RawNetworkGrep", Milliseconds(batchMillis),
+ System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq)
+ // Merge the raw socket streams and print how many lines of each batch contain "the"
+ val rawStreams = (1 to numStreams).map { _ =>
+ ssc.rawSocketStream[String](host, port, StorageLevel.MEMORY_ONLY_SER_2)
+ }.toArray
+ ssc.union(rawStreams).filter(line => line.contains("the")).count()
+ .foreachRDD(rdd => println("Grep count: " + rdd.collect().mkString))
+ ssc.start()
+ ssc.awaitTermination()
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/a000b5c3/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
new file mode 100644
index 0000000..b0bc31c
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming
+
+import org.apache.spark.streaming.{Time, Seconds, StreamingContext}
+import org.apache.spark.streaming.StreamingContext._
+import org.apache.spark.util.IntParam
+import java.io.File
+import org.apache.spark.rdd.RDD
+import com.google.common.io.Files
+import java.nio.charset.Charset
+
+/**
+ * Counts words in text encoded with UTF8 received from the network every second.
+ *
+ * Usage: RecoverableNetworkWordCount <master> <hostname> <port> <checkpoint-directory>
+ * <output-file>
+ * <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
+ * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive
+ * data. <checkpoint-directory> is a directory in an HDFS-compatible file system used for
+ * checkpoint data. <output-file> is the file to which the word counts will be appended.
+ *
+ * <checkpoint-directory> and <output-file> must be absolute paths
+ *
+ *
+ * To run this on your local machine, you need to first run a Netcat server
+ *
+ * `$ nc -lk 9999`
+ *
+ * and run the example as
+ *
+ * `$ ./bin/run-example org.apache.spark.examples.streaming.RecoverableNetworkWordCount \
+ * local[2] localhost 9999 ~/checkpoint/ ~/out`
+ *
+ * If the directory ~/checkpoint/ does not exist (e.g. running for the first time), it will create
+ * a new StreamingContext (will print "Creating new context" to the console). Otherwise, if
+ * checkpoint data exists in ~/checkpoint/, then it will create StreamingContext from
+ * the checkpoint data.
+ *
+ * To run this example in a local standalone cluster with automatic driver recovery,
+ *
+ * `$ ./spark-class org.apache.spark.deploy.Client -s launch <cluster-url> \
+ * <path-to-examples-jar> \
+ * org.apache.spark.examples.streaming.RecoverableNetworkWordCount <cluster-url> \
+ * localhost 9999 ~/checkpoint ~/out`
+ *
+ * <path-to-examples-jar> would typically be
+ * <spark-dir>/examples/target/scala-XX/spark-examples....jar
+ *
+ * Refer to the online documentation for more details.
+ */
+
+object RecoverableNetworkWordCount {
+ /** Build a new StreamingContext whose per-batch word counts are appended to outputPath. */
+ def createContext(master: String, ip: String, port: Int, outputPath: String) = {
+
+ // If you do not see this printed, that means the StreamingContext has been loaded
+ // from the checkpoint instead of being created here
+ println("Creating new context")
+ val outputFile = new File(outputPath)
+ if (outputFile.exists()) outputFile.delete()
+
+ // Create the context with a 1 second batch size
+ val ssc = new StreamingContext(master, "RecoverableNetworkWordCount", Seconds(1),
+ System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq)
+
+ // Create a NetworkInputDStream on target ip:port and count the
+ // words in input stream of \n delimited text (eg. generated by 'nc')
+ val lines = ssc.socketTextStream(ip, port)
+ val words = lines.flatMap(_.split(" "))
+ val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
+ wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => {
+ val counts = "Counts at time " + time + " " + rdd.collect().mkString("[", ", ", "]")
+ println(counts)
+ println("Appending to " + outputFile.getAbsolutePath)
+ Files.append(counts + "\n", outputFile, Charset.defaultCharset())
+ })
+ ssc
+ }
+ /** Get the context for the checkpoint directory, creating it when needed, and run it. */
+ def main(args: Array[String]) {
+ if (args.length != 5) {
+ System.err.println("Your arguments were " + args.mkString("[", ", ", "]"))
+ System.err.println(
+ """
+ |Usage: RecoverableNetworkWordCount <master> <hostname> <port> <checkpoint-directory>
+ | <output-file> <master> is the Spark master URL. In local mode, <master> should be
+ | 'local[n]' with n > 1. <hostname> and <port> describe the TCP server that Spark
+ | Streaming would connect to receive data. <checkpoint-directory> directory to
+ | HDFS-compatible file system which checkpoint data <output-file> file to which the
+ | word counts will be appended
+ |
+ |In local mode, <master> should be 'local[n]' with n > 1
+ |Both <checkpoint-directory> and <output-file> must be absolute paths
+ """.stripMargin
+ )
+ System.exit(1)
+ }
+ val Array(master, ip, IntParam(port), checkpointDirectory, outputPath) = args
+ val ssc = StreamingContext.getOrCreate(checkpointDirectory,
+ () => {
+ createContext(master, ip, port, outputPath)
+ })
+ ssc.start()
+ ssc.awaitTermination()
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/a000b5c3/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala
new file mode 100644
index 0000000..8001d56
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming
+
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.StreamingContext._
+// scalastyle:off
+/**
+ * Counts words cumulatively in UTF8 encoded, '\n' delimited text received from the network every
+ * second.
+ * Usage: StatefulNetworkWordCount <master> <hostname> <port>
+ * <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
+ * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive
+ * data.
+ *
+ * To run this on your local machine, you need to first run a Netcat server
+ * `$ nc -lk 9999`
+ * and then run the example
+ * `$ ./bin/run-example org.apache.spark.examples.streaming.StatefulNetworkWordCount local[2] localhost 9999`
+ */
+// scalastyle:on
+object StatefulNetworkWordCount {
+ def main(args: Array[String]) {
+ if (args.length < 3) {
+ System.err.println("Usage: StatefulNetworkWordCount <master> <hostname> <port>\n" +
+ "In local mode, <master> should be 'local[n]' with n > 1")
+ System.exit(1)
+ }
+
+ StreamingExamples.setStreamingLogLevels()
+
+ // State update: add this batch's counts to the previous total (0 when there is no state)
+ val updateFunc = (values: Seq[Int], state: Option[Int]) => {
+ val batchTotal = values.sum
+ val runningTotal = state.getOrElse(0)
+ Some(batchTotal + runningTotal)
+ }
+
+ // Streaming context with one-second batches; "." is set as its checkpoint directory
+ val jars = StreamingContext.jarOfClass(this.getClass).toSeq
+ val ssc = new StreamingContext(args(0), "NetworkWordCumulativeCountUpdateStateByKey",
+ Seconds(1), System.getenv("SPARK_HOME"), jars)
+ ssc.checkpoint(".")
+
+ // Read '\n'-delimited text (e.g. generated by 'nc') from the given host and port
+ // and pair every word with a count of 1
+ val lines = ssc.socketTextStream(args(1), args(2).toInt)
+ val words = lines.flatMap(line => line.split(" "))
+ val wordDstream = words.map(word => (word, 1))
+
+ // Fold each batch into the per-word state with updateStateByKey; the resulting
+ // DStream carries that state, i.e. the cumulative count of every word
+ val stateDstream = wordDstream.updateStateByKey[Int](updateFunc)
+ stateDstream.print()
+ ssc.start()
+ ssc.awaitTermination()
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/a000b5c3/examples/src/main/scala/org/apache/spark/examples/streaming/StreamingExamples.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/StreamingExamples.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/StreamingExamples.scala
new file mode 100644
index 0000000..8396e65
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/StreamingExamples.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming
+
+import org.apache.spark.Logging
+
+import org.apache.log4j.{Level, Logger}
+
+/** Utility functions for Spark Streaming examples. */
+object StreamingExamples extends Logging {
+
+ /** Lower the root log level to WARN when the log4j root logger has no appenders. */
+ def setStreamingLogLevels() {
+ val noAppenders = !Logger.getRootLogger.getAllAppenders.hasMoreElements
+ if (noAppenders) {
+ // Logging a message first lets Spark initialize its default logging; only after
+ // that is the root level overridden.
+ logInfo("Setting log level to [WARN] for streaming example." +
+ " To override add a custom log4j.properties to the classpath.")
+ Logger.getRootLogger.setLevel(Level.WARN)
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/a000b5c3/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala
new file mode 100644
index 0000000..b12617d
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming
+
+import com.twitter.algebird._
+
+import org.apache.spark.SparkContext._
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.spark.streaming.StreamingContext._
+import org.apache.spark.streaming.twitter._
+// scalastyle:off
+/**
+ * Illustrates the use of the Count-Min Sketch, from Twitter's Algebird library, to compute
+ * windowed and global Top-K estimates of user IDs occurring in a Twitter stream.
+ * <br>
+ * <strong>Note</strong> that since Algebird's implementation currently only supports Long inputs,
+ * the example operates on Long IDs. Once the implementation supports other inputs (such as String),
+ * the same approach could be used for computing popular topics for example.
+ * <p>
+ * <p>
+ * <a href=
+ * "http://highlyscalable.wordpress.com/2012/05/01/probabilistic-structures-web-analytics-data-mining/">
+ * This blog post</a> has a good overview of the Count-Min Sketch (CMS). The CMS is a data
+ * structure for approximate frequency estimation in data streams (e.g. Top-K elements, frequency
+ * of any given element, etc), that uses space sub-linear in the number of elements in the
+ * stream. Once elements are added to the CMS, the estimated count of an element can be computed,
+ * as well as "heavy-hitters" that occur more than a threshold percentage of the overall total
+ * count.
+ * <p><p>
+ * Algebird's implementation is a monoid, so we can succinctly merge two CMS instances in the
+ * reduce operation.
+ */
+// scalastyle:on
+object TwitterAlgebirdCMS {
+ def main(args: Array[String]) {
+ if (args.length < 1) {
+ System.err.println("Usage: TwitterAlgebirdCMS <master>" +
+ " [filter1] [filter2] ... [filter n]")
+ System.exit(1)
+ }
+
+ StreamingExamples.setStreamingLogLevels()
+
+ // CMS parameters (PERC is the heavy-hitters threshold handed to CountMinSketchMonoid)
+ val DELTA = 1E-3
+ val EPS = 0.01
+ val SEED = 1
+ val PERC = 0.001
+ // K highest frequency elements to take
+ val TOPK = 10
+
+ val (master, filters) = (args.head, args.tail)
+
+ val ssc = new StreamingContext(master, "TwitterAlgebirdCMS", Seconds(10),
+ System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq)
+ val stream = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_ONLY_SER_2)
+
+ val users = stream.map(status => status.getUser.getId)
+ // State carried across batches: the merged CMS and an exact id -> count map
+ val cms = new CountMinSketchMonoid(EPS, DELTA, SEED, PERC)
+ var globalCMS = cms.zero
+ val mm = new MapMonoid[Long, Int]()
+ var globalExact = Map[Long, Int]()
+ // One CMS per user id, merged into a single CMS for every batch
+ val approxTopUsers = users.mapPartitions(ids => {
+ ids.map(id => cms.create(id))
+ }).reduce(_ ++ _)
+ // Exact number of occurrences of every user id in the batch
+ val exactTopUsers = users.map(id => (id, 1))
+ .reduceByKey((a, b) => a + b)
+ // PERC is a fraction of the total count, so multiply it by 100 when printing a percentage
+ approxTopUsers.foreachRDD(rdd => {
+ if (rdd.count() != 0) {
+ val partial = rdd.first()
+ val partialTopK = partial.heavyHitters.map(id =>
+ (id, partial.frequency(id).estimate)).toSeq.sortBy(_._2).reverse.slice(0, TOPK)
+ globalCMS ++= partial
+ val globalTopK = globalCMS.heavyHitters.map(id =>
+ (id, globalCMS.frequency(id).estimate)).toSeq.sortBy(_._2).reverse.slice(0, TOPK)
+ println("Approx heavy hitters at %2.2f%% threshold this batch: %s".format(PERC * 100,
+ partialTopK.mkString("[", ",", "]")))
+ println("Approx heavy hitters at %2.2f%% threshold overall: %s".format(PERC * 100,
+ globalTopK.mkString("[", ",", "]")))
+ }
+ })
+
+ exactTopUsers.foreachRDD(rdd => {
+ if (rdd.count() != 0) {
+ val partialMap = rdd.collect().toMap
+ val partialTopK = rdd.map(
+ {case (id, count) => (count, id)})
+ .sortByKey(ascending = false).take(TOPK)
+ globalExact = mm.plus(globalExact.toMap, partialMap)
+ val globalTopK = globalExact.toSeq.sortBy(_._2).reverse.slice(0, TOPK)
+ println("Exact heavy hitters this batch: %s".format(partialTopK.mkString("[", ",", "]")))
+ println("Exact heavy hitters overall: %s".format(globalTopK.mkString("[", ",", "]")))
+ }
+ })
+
+ ssc.start()
+ ssc.awaitTermination()
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/a000b5c3/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala
new file mode 100644
index 0000000..22f232c
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming
+
+import com.twitter.algebird.HyperLogLogMonoid
+import com.twitter.algebird.HyperLogLog._
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.spark.streaming.twitter._
+// scalastyle:off
+/**
+ * Illustrates the use of the HyperLogLog algorithm, from Twitter's Algebird library, to compute
+ * a windowed and global estimate of the unique user IDs occurring in a Twitter stream.
+ * <p>
+ * <p>
+ * This <a href="http://highlyscalable.wordpress.com/2012/05/01/probabilistic-structures-web-analytics-data-mining/">
+ * blog post</a> and this
+ * <a href= "http://highscalability.com/blog/2012/4/5/big-data-counting-how-to-count-a-billion-distinct-objects-us.html">
+ * blog post</a>
+ * have good overviews of HyperLogLog (HLL). HLL is a memory-efficient datastructure for
+ * estimating the cardinality of a data stream, i.e. the number of unique elements.
+ * <p><p>
+ * Algebird's implementation is a monoid, so we can succinctly merge two HLL instances in the
+ * reduce operation.
+ */
+// scalastyle:on
+object TwitterAlgebirdHLL {
+ def main(args: Array[String]) {
+ if (args.length < 1) {
+ System.err.println("Usage: TwitterAlgebirdHLL <master>" +
+ " [filter1] [filter2] ... [filter n]")
+ System.exit(1)
+ }
+
+ StreamingExamples.setStreamingLogLevels()
+
+ // Bit size parameter for HyperLogLog, trades off accuracy vs size
+ val BIT_SIZE = 12
+ val master = args.head
+ val filters = args.tail
+
+ val jars = StreamingContext.jarOfClass(this.getClass).toSeq
+ val ssc = new StreamingContext(master, "TwitterAlgebirdHLL", Seconds(5),
+ System.getenv("SPARK_HOME"), jars)
+ val stream = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_ONLY_SER)
+ val users = stream.map(status => status.getUser.getId)
+
+ // State carried across batches: the merged HLL and the exact set of ids seen so far
+ val hll = new HyperLogLogMonoid(BIT_SIZE)
+ var globalHll = hll.zero
+ var userSet: Set[Long] = Set()
+
+ // Per batch: one HLL summed over all user ids, and the exact set of those ids
+ val approxUsers = users.mapPartitions(ids => ids.map(id => hll(id))).reduce((a, b) => a + b)
+ val exactUsers = users.map(id => Set(id)).reduce((a, b) => a ++ b)
+
+ approxUsers.foreachRDD { rdd =>
+ if (rdd.count() > 0) {
+ val batchHll = rdd.first()
+ globalHll += batchHll
+ println("Approx distinct users this batch: %d".format(batchHll.estimatedSize.toInt))
+ println("Approx distinct users overall: %d".format(globalHll.estimatedSize.toInt))
+ }
+ }
+
+ exactUsers.foreachRDD { rdd =>
+ if (rdd.count() > 0) {
+ val batchUsers = rdd.first()
+ userSet ++= batchUsers
+ println("Exact distinct users this batch: %d".format(batchUsers.size))
+ println("Exact distinct users overall: %d".format(userSet.size))
+ val ratio = globalHll.estimatedSize / userSet.size.toDouble
+ println("Error rate: %2.5f%%".format((ratio - 1) * 100))
+ }
+ }
+
+ ssc.start()
+ ssc.awaitTermination()
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/a000b5c3/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala
new file mode 100644
index 0000000..5b58e94
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming
+
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+import StreamingContext._
+import org.apache.spark.SparkContext._
+import org.apache.spark.streaming.twitter._
+
+/**
+ * Calculates popular hashtags (topics) over sliding 10 and 60 second windows from a Twitter
+ * stream. The stream is instantiated with optional filters supplied by the command line
+ * arguments; no credentials are read from the arguments (None is passed as the authorization).
+ *
+ */
+object TwitterPopularTags {
+ def main(args: Array[String]) {
+ if (args.length < 1) {
+ System.err.println("Usage: TwitterPopularTags <master>" +
+ " [filter1] [filter2] ... [filter n]")
+ System.exit(1)
+ }
+
+ StreamingExamples.setStreamingLogLevels()
+
+ val master = args.head
+ val filters = args.tail
+ val ssc = new StreamingContext(master, "TwitterPopularTags", Seconds(2),
+ System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq)
+ val stream = TwitterUtils.createStream(ssc, None, filters)
+ // Keep only the space-separated tokens of each status text that start with '#'
+ val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))
+
+ // (count, tag) pairs for the last 60 seconds, sorted by descending count
+ val topCounts60 = hashTags.map(tag => (tag, 1)).reduceByKeyAndWindow(_ + _, Seconds(60))
+ .map(_.swap)
+ .transform(rdd => rdd.sortByKey(ascending = false))
+ // The same ranking for the last 10 seconds
+ val topCounts10 = hashTags.map(tag => (tag, 1)).reduceByKeyAndWindow(_ + _, Seconds(10))
+ .map(_.swap)
+ .transform(rdd => rdd.sortByKey(ascending = false))
+
+ // Print the first five entries of each ranking
+ topCounts60.foreachRDD { rdd =>
+ val top5 = rdd.take(5)
+ println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count()))
+ top5.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count)) }
+ }
+
+ topCounts10.foreachRDD { rdd =>
+ val top5 = rdd.take(5)
+ println("\nPopular topics in last 10 seconds (%s total):".format(rdd.count()))
+ top5.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count)) }
+ }
+
+ ssc.start()
+ ssc.awaitTermination()
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/a000b5c3/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala
new file mode 100644
index 0000000..de46e5f
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming
+
+import akka.actor.ActorSystem
+import akka.actor.actorRef2Scala
+import akka.zeromq._
+import akka.zeromq.Subscribe
+import akka.util.ByteString
+
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.spark.streaming.StreamingContext._
+import org.apache.spark.streaming.zeromq._
+
+import scala.language.implicitConversions
+
+/**
+ * A simple publisher for demonstration purposes: once per second it publishes the same
+ * fixed list of words ("words ", "may ", "count ") under the given topic.
+ *
+ * Usage: SimpleZeroMQPublisher <zeroMQUrl> <topic>
+ */
+object SimpleZeroMQPublisher {
+
+  def main(args: Array[String]): Unit = {
+    if (args.length < 2) {
+      System.err.println("Usage: SimpleZeroMQPublisher <zeroMQUrl> <topic> ")
+      System.exit(1)
+    }
+
+    // Bind only the first two arguments: matching `Seq(url, topic)` against the whole
+    // argument list throws a MatchError as soon as an extra argument is supplied.
+    val Seq(url, topic) = args.toSeq.take(2)
+    val acs: ActorSystem = ActorSystem()
+
+    val pubSocket = ZeroMQExtension(acs).newSocket(SocketType.Pub, Bind(url))
+    // Explicit conversion of the payload words; no implicit String => ByteString needed.
+    val messages: List[ByteString] = List("words ", "may ", "count ").map(ByteString(_))
+
+    // Publish forever. The loop never exits, so no statement placed after it can run;
+    // the process has to be stopped from outside.
+    while (true) {
+      Thread.sleep(1000)
+      pubSocket ! ZMQMessage(ByteString(topic) :: messages)
+    }
+  }
+}
+
+// scalastyle:off
+/**
+ * A sample wordcount with ZeroMQStream stream
+ *
+ * To work with zeroMQ, some native libraries have to be installed.
+ * Install zeroMQ (release 2.1) core libraries. [ZeroMQ Install guide]
+ * (http://www.zeromq.org/intro:get-the-software)
+ *
+ * Usage: ZeroMQWordCount <master> <zeroMQurl> <topic>
+ * In local mode, <master> should be 'local[n]' with n > 1
+ * <zeroMQurl> and <topic> describe where zeroMq publisher is running.
+ *
+ * To run this example locally, you may run publisher as
+ * `$ ./bin/run-example org.apache.spark.examples.streaming.SimpleZeroMQPublisher tcp://127.0.1.1:1234 foo.bar`
+ * and run the example as
+ * `$ ./bin/run-example org.apache.spark.examples.streaming.ZeroMQWordCount local[2] tcp://127.0.1.1:1234 foo`
+ */
+// scalastyle:on
+object ZeroMQWordCount {
+  def main(args: Array[String]) {
+    if (args.length < 3) {
+      // The newline keeps the hint about local mode from running into "<topic>".
+      System.err.println(
+        "Usage: ZeroMQWordCount <master> <zeroMQurl> <topic>\n" +
+        "In local mode, <master> should be 'local[n]' with n > 1")
+      System.exit(1)
+    }
+    StreamingExamples.setStreamingLogLevels()
+    // Bind only the first three arguments: matching the whole argument list against a
+    // three-element pattern throws a MatchError when extra arguments are supplied.
+    val Seq(master, url, topic) = args.toSeq.take(3)
+
+    // Create the context and set the batch size
+    val ssc = new StreamingContext(master, "ZeroMQWordCount", Seconds(2),
+      System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq)
+
+    // Turns the frames of one received message into an iterator over their string form.
+    def bytesToStringIterator(x: Seq[ByteString]) = x.map(_.utf8String).iterator
+
+    // For this stream, a zeroMQ publisher should be running.
+    val lines = ZeroMQUtils.createStream(ssc, url, Subscribe(topic), bytesToStringIterator _)
+    val words = lines.flatMap(_.split(" "))
+    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
+    wordCounts.print()
+    ssc.start()
+    ssc.awaitTermination()
+  }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/a000b5c3/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewGenerator.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewGenerator.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewGenerator.scala
new file mode 100644
index 0000000..97e0cb9
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewGenerator.scala
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming.clickstream
+
+import java.net.ServerSocket
+import java.io.PrintWriter
+import util.Random
+
+/**
+ * A single page view on a website together with its dimension data.
+ * `toString` renders it as one tab-separated record terminated by a newline.
+ */
+class PageView(val url: String, val status: Int, val zipCode: Int, val userID: Int)
+  extends Serializable {
+  override def toString(): String =
+    "%s\t%s\t%s\t%s\n".format(url, status, zipCode, userID)
+}
+
+object PageView extends Serializable {
+  /** Parses a tab-separated record (url, status, zipCode, userID) into a PageView. */
+  def fromString(in: String): PageView = {
+    val fields = in.split("\t")
+    val url = fields(0)
+    val status = fields(1).toInt
+    val zipCode = fields(2).toInt
+    val userID = fields(3).toInt
+    new PageView(url, status, zipCode, userID)
+  }
+}
+
+// scalastyle:off
+/** Generates streaming events to simulate page views on a website.
+ *
+ * This should be used in tandem with PageViewStream.scala. Example:
+ * $ ./bin/run-example org.apache.spark.examples.streaming.clickstream.PageViewGenerator 44444 10
+ * $ ./bin/run-example org.apache.spark.examples.streaming.clickstream.PageViewStream errorRatePerZipCode localhost 44444
+ *
+ * When running this, you may want to set the root logging level to ERROR in
+ * conf/log4j.properties to reduce the verbosity of the output.
+ */
+// scalastyle:on
+object PageViewGenerator {
+  // Each map assigns a selection probability to every possible value of one dimension.
+  val pages = Map("http://foo.com/" -> .7,
+                  "http://foo.com/news" -> 0.2,
+                  "http://foo.com/contact" -> .1)
+  val httpStatus = Map(200 -> .95,
+                       404 -> .05)
+  val userZipCode = Map(94709 -> .5,
+                        94117 -> .5)
+  val userID = Map((1 to 100).map(_ -> .01):_*)
+
+  // One generator shared by every draw instead of a new Random per call. It is used from
+  // all client threads; scala.util.Random wraps java.util.Random, which is thread-safe.
+  private val random = new Random()
+
+  /**
+   * Picks a key of `inputMap` at random, weighted by the probability stored as its value.
+   * Throws NoSuchElementException if `inputMap` is empty.
+   */
+  def pickFromDistribution[T](inputMap : Map[T, Double]) : T = {
+    val rand = random.nextDouble()
+    var total = 0.0
+    for ((item, prob) <- inputMap) {
+      total = total + prob
+      if (total > rand) {
+        return item
+      }
+    }
+    // Only reached when the probabilities add up to less than the drawn value,
+    // e.g. through floating-point rounding; fall back to the first entry.
+    inputMap.head._1
+  }
+
+  /** Draws one random page view and returns its string form (see PageView.toString). */
+  def getNextClickEvent() : String = {
+    val id = pickFromDistribution(userID)
+    val page = pickFromDistribution(pages)
+    val status = pickFromDistribution(httpStatus)
+    val zipCode = pickFromDistribution(userZipCode)
+    new PageView(page, status, zipCode, id).toString()
+  }
+
+  /**
+   * Listens on <port> and streams generated page views to every client that connects,
+   * at roughly <viewsPerSecond> events per second per client.
+   */
+  def main(args : Array[String]) {
+    if (args.length != 2) {
+      System.err.println("Usage: PageViewGenerator <port> <viewsPerSecond>")
+      System.exit(1)
+    }
+    val port = args(0).toInt
+    val viewsPerSecond = args(1).toFloat
+    // A non-positive (or NaN) rate cannot be turned into a meaningful sleep delay.
+    if (!(viewsPerSecond > 0)) {
+      System.err.println("<viewsPerSecond> must be positive, got: " + args(1))
+      System.exit(1)
+    }
+    val sleepDelayMs = (1000.0 / viewsPerSecond).toInt
+    val listener = new ServerSocket(port)
+    println("Listening on port: " + port)
+
+    // Accept clients forever and serve each one on its own thread.
+    while (true) {
+      val socket = listener.accept()
+      new Thread() {
+        override def run(): Unit = {
+          println("Got client connected from: " + socket.getInetAddress)
+          val out = new PrintWriter(socket.getOutputStream(), true)
+          try {
+            // PrintWriter swallows IOExceptions. checkError() flushes the pending event
+            // and reports a failed write, so the loop stops once the client is gone
+            // instead of spinning forever on a dead connection.
+            while (!out.checkError()) {
+              Thread.sleep(sleepDelayMs)
+              out.write(getNextClickEvent())
+            }
+          } finally {
+            socket.close()
+          }
+        }
+      }.start()
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/a000b5c3/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewStream.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewStream.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewStream.scala
new file mode 100644
index 0000000..d30ceff
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewStream.scala
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming.clickstream
+
+import org.apache.spark.SparkContext._
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.spark.streaming.StreamingContext._
+import org.apache.spark.examples.streaming.StreamingExamples
+// scalastyle:off
+/** Analyses a streaming dataset of web page views. This class demonstrates several types of
+ * operators available in Spark streaming.
+ *
+ * This should be used in tandem with PageViewGenerator.scala. Example:
+ * $ ./bin/run-example org.apache.spark.examples.streaming.clickstream.PageViewGenerator 44444 10
+ * $ ./bin/run-example org.apache.spark.examples.streaming.clickstream.PageViewStream errorRatePerZipCode localhost 44444
+ */
+// scalastyle:on
+object PageViewStream {
+  /**
+   * Connects to a page view generator on <host>:<port> and prints the metric selected by
+   * <metric> for as long as the streaming computation runs.
+   */
+  def main(args: Array[String]) {
+    if (args.length != 3) {
+      System.err.println("Usage: PageViewStream <metric> <host> <port>")
+      System.err.println("<metric> must be one of pageCounts, slidingPageCounts," +
+        " errorRatePerZipCode, activeUserCount, popularUsersSeen")
+      System.exit(1)
+    }
+    StreamingExamples.setStreamingLogLevels()
+    val metric = args(0)
+    val host = args(1)
+    val port = args(2).toInt
+
+    // Create the context
+    val ssc = new StreamingContext("local[2]", "PageViewStream", Seconds(1),
+      System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq)
+
+    // Create a NetworkInputDStream on target host:port and convert each line to a PageView
+    val pageViews = ssc.socketTextStream(host, port)
+      .flatMap(_.split("\n"))
+      .map(PageView.fromString(_))
+
+    // Return a count of views per URL seen in each batch
+    val pageCounts = pageViews.map(view => view.url).countByValue()
+
+    // Return a sliding window of page views per URL in the last ten seconds
+    // NOTE(review): countByValueAndWindow may need a checkpoint directory
+    // (ssc.checkpoint) to be set before the context starts -- confirm.
+    val slidingPageCounts = pageViews.map(view => view.url)
+      .countByValueAndWindow(Seconds(10), Seconds(2))
+
+    // Return the rate of error pages (a non 200 status) in each zip code over the last 30 seconds
+    val statusesPerZipCode = pageViews.window(Seconds(30), Seconds(2))
+      .map(view => (view.zipCode, view.status))
+      .groupByKey()
+    val errorRatePerZipCode = statusesPerZipCode.map {
+      case (zip, statuses) =>
+        val errorCount = statuses.count(_ != 200)
+        val errorRatio = errorCount.toFloat / statuses.size
+        // Zip codes whose error ratio exceeds 5% are highlighted with asterisks.
+        if (errorRatio > 0.05) {
+          "%s: **%s**".format(zip, errorRatio)
+        } else {
+          "%s: %s".format(zip, errorRatio)
+        }
+    }
+
+    // Return the number unique users in last 15 seconds
+    val activeUserCount = pageViews.window(Seconds(15), Seconds(2))
+      .map(view => (view.userID, 1))
+      .groupByKey()
+      .count()
+      .map("Unique active users: " + _)
+
+    // An external dataset we want to join to this stream
+    val userList = ssc.sparkContext.parallelize(
+      Map(1 -> "Patrick Wendell", 2 -> "Reynold Xin", 3 -> "Matei Zaharia").toSeq)
+
+    metric match {
+      case "pageCounts" => pageCounts.print()
+      case "slidingPageCounts" => slidingPageCounts.print()
+      case "errorRatePerZipCode" => errorRatePerZipCode.print()
+      case "activeUserCount" => activeUserCount.print()
+      case "popularUsersSeen" =>
+        // Look for users in our existing dataset and print it out if we have a match
+        pageViews.map(view => (view.userID, 1))
+          .foreachRDD((rdd, time) => rdd.join(userList)
+            .map(_._2._2)
+            .take(10)
+            .foreach(u => println("Saw user %s at time %s".format(u, time))))
+      case _ =>
+        // No output operation has been registered for an unknown metric, so there is
+        // nothing to run: report the problem and stop instead of starting the context.
+        println("Invalid metric entered: " + metric)
+        System.exit(1)
+    }
+
+    ssc.start()
+    // Wait for the streaming computation to stop, like the other streaming examples.
+    ssc.awaitTermination()
+  }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/a000b5c3/examples/src/main/scala/org/apache/spark/sql/examples/HiveFromSpark.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/sql/examples/HiveFromSpark.scala b/examples/src/main/scala/org/apache/spark/sql/examples/HiveFromSpark.scala
deleted file mode 100644
index 62329bd..0000000
--- a/examples/src/main/scala/org/apache/spark/sql/examples/HiveFromSpark.scala
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive.examples
-
-import org.apache.spark.SparkContext
-import org.apache.spark.sql._
-import org.apache.spark.sql.hive.LocalHiveContext
-
-object HiveFromSpark {
- case class Record(key: Int, value: String)
-
- def main(args: Array[String]) {
- val sc = new SparkContext("local", "HiveFromSpark")
-
- // A local hive context creates an instance of the Hive Metastore in process, storing the
- // the warehouse data in the current directory. This location can be overridden by
- // specifying a second parameter to the constructor.
- val hiveContext = new LocalHiveContext(sc)
- import hiveContext._
-
- hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
- hql("LOAD DATA LOCAL INPATH 'src/main/resources/kv1.txt' INTO TABLE src")
-
- // Queries are expressed in HiveQL
- println("Result of 'SELECT *': ")
- hql("SELECT * FROM src").collect.foreach(println)
-
- // Aggregation queries are also supported.
- val count = hql("SELECT COUNT(*) FROM src").collect().head.getInt(0)
- println(s"COUNT(*): $count")
-
- // The results of SQL queries are themselves RDDs and support all normal RDD functions. The
- // items in the RDD are of type Row, which allows you to access each column by ordinal.
- val rddFromSql = hql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")
-
- println("Result of RDD.map:")
- val rddAsStrings = rddFromSql.map {
- case Row(key: Int, value: String) => s"Key: $key, Value: $value"
- }
-
- // You can also register RDDs as temporary tables within a HiveContext.
- val rdd = sc.parallelize((1 to 100).map(i => Record(i, s"val_$i")))
- rdd.registerAsTable("records")
-
- // Queries can then join RDD data with data stored in Hive.
- println("Result of SELECT *:")
- hql("SELECT * FROM records r JOIN src s ON r.key = s.key").collect().foreach(println)
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/a000b5c3/examples/src/main/scala/org/apache/spark/sql/examples/RDDRelation.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/sql/examples/RDDRelation.scala b/examples/src/main/scala/org/apache/spark/sql/examples/RDDRelation.scala
deleted file mode 100644
index 8210ad9..0000000
--- a/examples/src/main/scala/org/apache/spark/sql/examples/RDDRelation.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.examples
-
-import org.apache.spark.SparkContext
-import org.apache.spark.sql.SQLContext
-
-// One method for defining the schema of an RDD is to make a case class with the desired column
-// names and types.
-case class Record(key: Int, value: String)
-
-object RDDRelation {
- def main(args: Array[String]) {
- val sc = new SparkContext("local", "RDDRelation")
- val sqlContext = new SQLContext(sc)
-
- // Importing the SQL context gives access to all the SQL functions and implicit conversions.
- import sqlContext._
-
- val rdd = sc.parallelize((1 to 100).map(i => Record(i, s"val_$i")))
- // Any RDD containing case classes can be registered as a table. The schema of the table is
- // automatically inferred using scala reflection.
- rdd.registerAsTable("records")
-
- // Once tables have been registered, you can run SQL queries over them.
- println("Result of SELECT *:")
- sql("SELECT * FROM records").collect().foreach(println)
-
- // Aggregation queries are also supported.
- val count = sql("SELECT COUNT(*) FROM records").collect().head.getInt(0)
- println(s"COUNT(*): $count")
-
- // The results of SQL queries are themselves RDDs and support all normal RDD functions. The
- // items in the RDD are of type Row, which allows you to access each column by ordinal.
- val rddFromSql = sql("SELECT key, value FROM records WHERE key < 10")
-
- println("Result of RDD.map:")
- rddFromSql.map(row => s"Key: ${row(0)}, Value: ${row(1)}").collect.foreach(println)
-
- // Queries can also be written using a LINQ-like Scala DSL.
- rdd.where('key === 1).orderBy('value.asc).select('key).collect().foreach(println)
-
- // Write out an RDD as a parquet file.
- rdd.saveAsParquetFile("pair.parquet")
-
- // Read in parquet file. Parquet files are self-describing so the schmema is preserved.
- val parquetFile = sqlContext.parquetFile("pair.parquet")
-
- // Queries can be run using the DSL on parequet files just like the original RDD.
- parquetFile.where('key === 1).select('value as 'a).collect().foreach(println)
-
- // These files can also be registered as tables.
- parquetFile.registerAsTable("parquetFile")
- sql("SELECT * FROM parquetFile").collect().foreach(println)
- }
-}