You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2015/05/19 03:24:34 UTC

spark git commit: [SPARK-7692] Updated Kinesis examples

Repository: spark
Updated Branches:
  refs/heads/master 0a7a94eab -> 3a6003866


[SPARK-7692] Updated Kinesis examples

- Updated Kinesis examples to use stable API
- Cleaned up comments, etc.
- Renamed KinesisWordCountProducerASL to KinesisWordProducerASL

Author: Tathagata Das <ta...@gmail.com>

Closes #6249 from tdas/kinesis-examples and squashes the following commits:

7cc307b [Tathagata Das] More tweaks
f080872 [Tathagata Das] More cleanup
841987f [Tathagata Das] Small update
011cbe2 [Tathagata Das] More fixes
b0d74f9 [Tathagata Das] Updated examples.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3a600386
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3a600386
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3a600386

Branch: refs/heads/master
Commit: 3a6003866ade45974b43a9e785ec35fb76a32b99
Parents: 0a7a94e
Author: Tathagata Das <ta...@gmail.com>
Authored: Mon May 18 18:24:15 2015 -0700
Committer: Tathagata Das <ta...@gmail.com>
Committed: Mon May 18 18:24:15 2015 -0700

----------------------------------------------------------------------
 .../streaming/JavaKinesisWordCountASL.java      | 245 ++++++++---------
 .../streaming/KinesisWordCountASL.scala         | 260 ++++++++++---------
 2 files changed, 268 insertions(+), 237 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3a600386/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java
----------------------------------------------------------------------
diff --git a/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java b/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java
index b0bff27..06e0ff2 100644
--- a/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java
+++ b/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java
@@ -20,6 +20,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.regex.Pattern;
 
+import com.amazonaws.regions.RegionUtils;
 import org.apache.log4j.Logger;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.function.FlatMapFunction;
@@ -40,140 +41,146 @@ import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionIn
 import com.google.common.collect.Lists;
 
 /**
- * Java-friendly Kinesis Spark Streaming WordCount example
+ * Consumes messages from an Amazon Kinesis stream and does wordcount.
  *
- * See http://spark.apache.org/docs/latest/streaming-kinesis.html for more details
- * on the Kinesis Spark Streaming integration.
+ * This example spins up 1 Kinesis Receiver per shard for the given stream.
+ * It then starts pulling from the last checkpointed sequence number of the given stream.
  *
- * This example spins up 1 Kinesis Worker (Spark Streaming Receiver) per shard
- *   for the given stream.
- * It then starts pulling from the last checkpointed sequence number of the given
- *   <stream-name> and <endpoint-url>. 
+ * Usage: JavaKinesisWordCountASL [app-name] [stream-name] [endpoint-url]
+ *   [app-name] is the name of the consumer app, used to track the read data in DynamoDB
+ *   [stream-name] name of the Kinesis stream (ie. mySparkStream)
+ *   [endpoint-url] endpoint of the Kinesis service
+ *     (e.g. https://kinesis.us-east-1.amazonaws.com)
  *
- * Valid endpoint urls:  http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region
- *
- * This code uses the DefaultAWSCredentialsProviderChain and searches for credentials 
- *  in the following order of precedence: 
- *         Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY
- *         Java System Properties - aws.accessKeyId and aws.secretKey
- *         Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs
- *         Instance profile credentials - delivered through the Amazon EC2 metadata service
- *
- * Usage: JavaKinesisWordCountASL <stream-name> <endpoint-url>
- *         <stream-name> is the name of the Kinesis stream (ie. mySparkStream)
- *         <endpoint-url> is the endpoint of the Kinesis service 
- *           (ie. https://kinesis.us-east-1.amazonaws.com)
  *
  * Example:
- *      $ export AWS_ACCESS_KEY_ID=<your-access-key>
+ *      # export AWS keys if necessary
+ *      $ export AWS_ACCESS_KEY_ID=[your-access-key]
  *      $ export AWS_SECRET_KEY=<your-secret-key>
- *      $ $SPARK_HOME/bin/run-example \
- *            org.apache.spark.examples.streaming.JavaKinesisWordCountASL mySparkStream \
- *            https://kinesis.us-east-1.amazonaws.com
  *
- * Note that number of workers/threads should be 1 more than the number of receivers.
- * This leaves one thread available for actually processing the data.
+ *      # run the example
+ *      $ $SPARK_HOME/bin/run-example streaming.JavaKinesisWordCountASL myAppName mySparkStream \
+ *             https://kinesis.us-east-1.amazonaws.com
+ *
+ * There is a companion helper class called KinesisWordProducerASL which puts dummy data
+ * onto the Kinesis stream.
  *
- * There is a companion helper class called KinesisWordCountProducerASL which puts dummy data 
- *   onto the Kinesis stream. 
- * Usage instructions for KinesisWordCountProducerASL are provided in the class definition.
+ * This code uses the DefaultAWSCredentialsProviderChain to find credentials
+ * in the following order:
+ *    Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY
+ *    Java System Properties - aws.accessKeyId and aws.secretKey
+ *    Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs
+ *    Instance profile credentials - delivered through the Amazon EC2 metadata service
+ * For more information, see
+ * http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/credentials.html
+ *
+ * See http://spark.apache.org/docs/latest/streaming-kinesis-integration.html for more details on
+ * the Kinesis Spark Streaming integration.
  */
 public final class JavaKinesisWordCountASL { // needs to be public for access from run-example
-    private static final Pattern WORD_SEPARATOR = Pattern.compile(" ");
-    private static final Logger logger = Logger.getLogger(JavaKinesisWordCountASL.class);
-
-    /* Make the constructor private to enforce singleton */
-    private JavaKinesisWordCountASL() {
+  private static final Pattern WORD_SEPARATOR = Pattern.compile(" ");
+  private static final Logger logger = Logger.getLogger(JavaKinesisWordCountASL.class);
+
+  public static void main(String[] args) {
+    // Check that all required args were passed in.
+    if (args.length != 3) {
+      System.err.println(
+          "Usage: JavaKinesisWordCountASL <stream-name> <endpoint-url>\n\n" +
+          "    <app-name> is the name of the app, used to track the read data in DynamoDB\n" +
+          "    <stream-name> is the name of the Kinesis stream\n" +
+          "    <endpoint-url> is the endpoint of the Kinesis service\n" +
+          "                   (e.g. https://kinesis.us-east-1.amazonaws.com)\n" +
+          "Generate data for the Kinesis stream using the example KinesisWordProducerASL.\n" +
+          "See http://spark.apache.org/docs/latest/streaming-kinesis-integration.html for more\n" +
+          "details.\n"
+      );
+      System.exit(1);
     }
 
-    public static void main(String[] args) {
-        /* Check that all required args were passed in. */
-        if (args.length < 2) {
-          System.err.println(
-              "Usage: JavaKinesisWordCountASL <stream-name> <endpoint-url>\n" +
-              "    <stream-name> is the name of the Kinesis stream\n" +
-              "    <endpoint-url> is the endpoint of the Kinesis service\n" +
-              "                   (e.g. https://kinesis.us-east-1.amazonaws.com)\n");
-          System.exit(1);
-        }
-
-        StreamingExamples.setStreamingLogLevels();
-
-        /* Populate the appropriate variables from the given args */
-        String streamName = args[0];
-        String endpointUrl = args[1];
-        /* Set the batch interval to a fixed 2000 millis (2 seconds) */
-        Duration batchInterval = new Duration(2000);
-
-        /* Create a Kinesis client in order to determine the number of shards for the given stream */
-        AmazonKinesisClient kinesisClient = new AmazonKinesisClient(
-                new DefaultAWSCredentialsProviderChain());
-        kinesisClient.setEndpoint(endpointUrl);
-
-        /* Determine the number of shards from the stream */
-        int numShards = kinesisClient.describeStream(streamName)
-                .getStreamDescription().getShards().size();
-
-        /* In this example, we're going to create 1 Kinesis Worker/Receiver/DStream for each shard */ 
-        int numStreams = numShards;
-
-        /* Setup the Spark config. */
-        SparkConf sparkConfig = new SparkConf().setAppName("KinesisWordCount");
-
-        /* Kinesis checkpoint interval.  Same as batchInterval for this example. */
-        Duration checkpointInterval = batchInterval;
+    // Set default log4j logging level to WARN to hide Spark logs
+    StreamingExamples.setStreamingLogLevels();
+
+    // Populate the appropriate variables from the given args
+    String kinesisAppName = args[0];
+    String streamName = args[1];
+    String endpointUrl = args[2];
+
+    // Create a Kinesis client in order to determine the number of shards for the given stream
+    AmazonKinesisClient kinesisClient =
+        new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain());
+    kinesisClient.setEndpoint(endpointUrl);
+    int numShards =
+        kinesisClient.describeStream(streamName).getStreamDescription().getShards().size();
+
+
+    // In this example, we're going to create 1 Kinesis Receiver/input DStream for each shard.
+    // This is not a necessity; if there are fewer receivers/DStreams than the number of shards,
+    // then the shards will be automatically distributed among the receivers and each receiver
+    // will receive data from multiple shards.
+    int numStreams = numShards;
+
+    // Spark Streaming batch interval
+    Duration batchInterval = new Duration(2000);
+
+    // Kinesis checkpoint interval.  Same as batchInterval for this example.
+    Duration kinesisCheckpointInterval = batchInterval;
+
+    // Get the region name from the endpoint URL to save Kinesis Client Library metadata in
+    // DynamoDB of the same region as the Kinesis stream
+    String regionName = RegionUtils.getRegionByEndpoint(endpointUrl).getName();
+
+    // Setup the Spark config and StreamingContext
+    SparkConf sparkConfig = new SparkConf().setAppName("JavaKinesisWordCountASL");
+    JavaStreamingContext jssc = new JavaStreamingContext(sparkConfig, batchInterval);
+
+    // Create the Kinesis DStreams
+    List<JavaDStream<byte[]>> streamsList = new ArrayList<JavaDStream<byte[]>>(numStreams);
+    for (int i = 0; i < numStreams; i++) {
+      streamsList.add(
+          KinesisUtils.createStream(jssc, kinesisAppName, streamName, endpointUrl, regionName,
+              InitialPositionInStream.LATEST, kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2())
+      );
+    }
 
-        /* Setup the StreamingContext */
-        JavaStreamingContext jssc = new JavaStreamingContext(sparkConfig, batchInterval);
+    // Union all the streams if there is more than 1 stream
+    JavaDStream<byte[]> unionStreams;
+    if (streamsList.size() > 1) {
+      unionStreams = jssc.union(streamsList.get(0), streamsList.subList(1, streamsList.size()));
+    } else {
+      // Otherwise, just use the 1 stream
+      unionStreams = streamsList.get(0);
+    }
 
-        /* Create the same number of Kinesis DStreams/Receivers as Kinesis stream's shards */
-        List<JavaDStream<byte[]>> streamsList = new ArrayList<JavaDStream<byte[]>>(numStreams);
-        for (int i = 0; i < numStreams; i++) {
-          streamsList.add(
-            KinesisUtils.createStream(jssc, streamName, endpointUrl, checkpointInterval, 
-            InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2())
-          );
+    // Convert each line of Array[Byte] to String, and split into words
+    JavaDStream<String> words = unionStreams.flatMap(new FlatMapFunction<byte[], String>() {
+      @Override
+      public Iterable<String> call(byte[] line) {
+        return Lists.newArrayList(WORD_SEPARATOR.split(new String(line)));
+      }
+    });
+
+    // Map each word to a (word, 1) tuple so we can reduce by key to count the words
+    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
+        new PairFunction<String, String, Integer>() {
+          @Override
+          public Tuple2<String, Integer> call(String s) {
+            return new Tuple2<String, Integer>(s, 1);
+          }
         }
-
-        /* Union all the streams if there is more than 1 stream */
-        JavaDStream<byte[]> unionStreams;
-        if (streamsList.size() > 1) {
-            unionStreams = jssc.union(streamsList.get(0), streamsList.subList(1, streamsList.size()));
-        } else {
-            /* Otherwise, just use the 1 stream */
-            unionStreams = streamsList.get(0);
+    ).reduceByKey(
+        new Function2<Integer, Integer, Integer>() {
+          @Override
+          public Integer call(Integer i1, Integer i2) {
+            return i1 + i2;
+          }
         }
+    );
 
-        /*
-         * Split each line of the union'd DStreams into multiple words using flatMap to produce the collection.
-         * Convert lines of byte[] to multiple Strings by first converting to String, then splitting on WORD_SEPARATOR.
-         */
-        JavaDStream<String> words = unionStreams.flatMap(new FlatMapFunction<byte[], String>() {
-                @Override
-                public Iterable<String> call(byte[] line) {
-                    return Lists.newArrayList(WORD_SEPARATOR.split(new String(line)));
-                }
-            });
-
-        /* Map each word to a (word, 1) tuple, then reduce/aggregate by word. */
-        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
-            new PairFunction<String, String, Integer>() {
-                @Override
-                public Tuple2<String, Integer> call(String s) {
-                    return new Tuple2<String, Integer>(s, 1);
-                }
-            }).reduceByKey(new Function2<Integer, Integer, Integer>() {
-                @Override
-                public Integer call(Integer i1, Integer i2) {
-                  return i1 + i2;
-                }
-            });
-
-        /* Print the first 10 wordCounts */
-        wordCounts.print();
-
-        /* Start the streaming context and await termination */
-        jssc.start();
-        jssc.awaitTermination();
-    }
+    // Print the first 10 wordCounts
+    wordCounts.print();
+
+    // Start the streaming context and await termination
+    jssc.start();
+    jssc.awaitTermination();
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/3a600386/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
----------------------------------------------------------------------
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
index 32da085..640ca04 100644
--- a/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
@@ -18,213 +18,238 @@
 package org.apache.spark.examples.streaming
 
 import java.nio.ByteBuffer
+
 import scala.util.Random
-import org.apache.spark.Logging
-import org.apache.spark.SparkConf
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.Milliseconds
-import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
-import org.apache.spark.streaming.kinesis.KinesisUtils
-import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
+
+import com.amazonaws.auth.{DefaultAWSCredentialsProviderChain, BasicAWSCredentials}
+import com.amazonaws.regions.RegionUtils
 import com.amazonaws.services.kinesis.AmazonKinesisClient
 import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
 import com.amazonaws.services.kinesis.model.PutRecordRequest
-import org.apache.log4j.Logger
-import org.apache.log4j.Level
+import org.apache.log4j.{Level, Logger}
+
+import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Milliseconds, StreamingContext}
+import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions
+import org.apache.spark.streaming.kinesis.KinesisUtils
+
 
 /**
- * Kinesis Spark Streaming WordCount example.
+ * Consumes messages from an Amazon Kinesis stream and does wordcount.
  *
- * See http://spark.apache.org/docs/latest/streaming-kinesis.html for more details on
- *   the Kinesis Spark Streaming integration.
+ * This example spins up 1 Kinesis Receiver per shard for the given stream.
+ * It then starts pulling from the last checkpointed sequence number of the given stream.
  *
- * This example spins up 1 Kinesis Worker (Spark Streaming Receiver) per shard 
- *   for the given stream.
- * It then starts pulling from the last checkpointed sequence number of the given 
- *   <stream-name> and <endpoint-url>. 
+ * Usage: KinesisWordCountASL <app-name> <stream-name> <endpoint-url>
+ *   <app-name> is the name of the consumer app, used to track the read data in DynamoDB
+ *   <stream-name> name of the Kinesis stream (ie. mySparkStream)
+ *   <endpoint-url> endpoint of the Kinesis service
+ *     (e.g. https://kinesis.us-east-1.amazonaws.com)
  *
- * Valid endpoint urls:  http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region
- * 
- * This code uses the DefaultAWSCredentialsProviderChain and searches for credentials
- *   in the following order of precedence:
- * Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY
- * Java System Properties - aws.accessKeyId and aws.secretKey
- * Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs
- * Instance profile credentials - delivered through the Amazon EC2 metadata service
- *
- * Usage: KinesisWordCountASL <stream-name> <endpoint-url>
- *   <stream-name> is the name of the Kinesis stream (ie. mySparkStream)
- *   <endpoint-url> is the endpoint of the Kinesis service
- *     (ie. https://kinesis.us-east-1.amazonaws.com)
  *
  * Example:
- *    $ export AWS_ACCESS_KEY_ID=<your-access-key>
- *    $ export AWS_SECRET_KEY=<your-secret-key>
- *    $ $SPARK_HOME/bin/run-example \
- *        org.apache.spark.examples.streaming.KinesisWordCountASL mySparkStream \
- *        https://kinesis.us-east-1.amazonaws.com
+ *      # export AWS keys if necessary
+ *      $ export AWS_ACCESS_KEY_ID=<your-access-key>
+ *      $ export AWS_SECRET_KEY=<your-secret-key>
+ *
+ *      # run the example
+ *      $ $SPARK_HOME/bin/run-example streaming.KinesisWordCountASL myAppName mySparkStream \
+ *              https://kinesis.us-east-1.amazonaws.com
  *
- * 
- * Note that number of workers/threads should be 1 more than the number of receivers.
- * This leaves one thread available for actually processing the data.
+ * There is a companion helper class called KinesisWordProducerASL which puts dummy data
+ * onto the Kinesis stream.
  *
- * There is a companion helper class below called KinesisWordCountProducerASL which puts
- *   dummy data onto the Kinesis stream.
- * Usage instructions for KinesisWordCountProducerASL are provided in that class definition.
+ * This code uses the DefaultAWSCredentialsProviderChain to find credentials
+ * in the following order:
+ *    Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY
+ *    Java System Properties - aws.accessKeyId and aws.secretKey
+ *    Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs
+ *    Instance profile credentials - delivered through the Amazon EC2 metadata service
+ * For more information, see
+ * http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/credentials.html
+ *
+ * See http://spark.apache.org/docs/latest/streaming-kinesis-integration.html for more details on
+ * the Kinesis Spark Streaming integration.
  */
-private object KinesisWordCountASL extends Logging {
+object KinesisWordCountASL extends Logging {
   def main(args: Array[String]) {
-    /* Check that all required args were passed in. */
-    if (args.length < 2) {
+    // Check that all required args were passed in.
+    if (args.length != 3) {
       System.err.println(
         """
-          |Usage: KinesisWordCount <stream-name> <endpoint-url>
+          |Usage: KinesisWordCountASL <app-name> <stream-name> <endpoint-url> <region-name>
+          |
+          |    <app-name> is the name of the consumer app, used to track the read data in DynamoDB
           |    <stream-name> is the name of the Kinesis stream
           |    <endpoint-url> is the endpoint of the Kinesis service
           |                   (e.g. https://kinesis.us-east-1.amazonaws.com)
+          |
+          |Generate input data for Kinesis stream using the example KinesisWordProducerASL.
+          |See http://spark.apache.org/docs/latest/streaming-kinesis-integration.html for more
+          |details.
         """.stripMargin)
       System.exit(1)
     }
 
     StreamingExamples.setStreamingLogLevels()
 
-    /* Populate the appropriate variables from the given args */
-    val Array(streamName, endpointUrl) = args
+    // Populate the appropriate variables from the given args
+    val Array(appName, streamName, endpointUrl) = args
 
-    /* Determine the number of shards from the stream */
-    val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain())
+
+    // Determine the number of shards from the stream using the low-level Kinesis Client
+    // from the AWS Java SDK.
+    val credentials = new DefaultAWSCredentialsProviderChain().getCredentials()
+    require(credentials != null,
+      "No AWS credentials found. Please specify credentials using one of the methods specified " +
+        "in http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/credentials.html")
+    val kinesisClient = new AmazonKinesisClient(credentials)
     kinesisClient.setEndpoint(endpointUrl)
-    val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards()
-      .size()
+    val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards().size
+
 
-    /* In this example, we're going to create 1 Kinesis Worker/Receiver/DStream for each shard. */
+    // In this example, we're going to create 1 Kinesis Receiver/input DStream for each shard.
+    // This is not a necessity; if there are fewer receivers/DStreams than the number of shards,
+    // then the shards will be automatically distributed among the receivers and each receiver
+    // will receive data from multiple shards.
     val numStreams = numShards
 
-    /* Setup the and SparkConfig and StreamingContext */
-    /* Spark Streaming batch interval */
+    // Spark Streaming batch interval
     val batchInterval = Milliseconds(2000)
-    val sparkConfig = new SparkConf().setAppName("KinesisWordCount")
-    val ssc = new StreamingContext(sparkConfig, batchInterval)
 
-    /* Kinesis checkpoint interval.  Same as batchInterval for this example. */
+    // Kinesis checkpoint interval is the interval at which DynamoDB is updated with the
+    // sequence numbers of records that have been received. Same as batchInterval for this example.
     val kinesisCheckpointInterval = batchInterval
 
-    /* Create the same number of Kinesis DStreams/Receivers as Kinesis stream's shards */
+    // Get the region name from the endpoint URL to save Kinesis Client Library metadata in
+    // DynamoDB of the same region as the Kinesis stream
+    val regionName = RegionUtils.getRegionByEndpoint(endpointUrl).getName()
+
+    // Setup the SparkConfig and StreamingContext
+    val sparkConfig = new SparkConf().setAppName("KinesisWordCountASL")
+    val ssc = new StreamingContext(sparkConfig, batchInterval)
+
+    // Create the Kinesis DStreams
     val kinesisStreams = (0 until numStreams).map { i =>
-      KinesisUtils.createStream(ssc, streamName, endpointUrl, kinesisCheckpointInterval,
-          InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2)
+      KinesisUtils.createStream(ssc, appName, streamName, endpointUrl, regionName,
+        InitialPositionInStream.LATEST, kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2)
     }
 
-    /* Union all the streams */
+    // Union all the streams
     val unionStreams = ssc.union(kinesisStreams)
 
-    /* Convert each line of Array[Byte] to String, split into words, and count them */
-    val words = unionStreams.flatMap(byteArray => new String(byteArray)
-      .split(" "))
+    // Convert each line of Array[Byte] to String, and split into words
+    val words = unionStreams.flatMap(byteArray => new String(byteArray).split(" "))
 
-    /* Map each word to a (word, 1) tuple so we can reduce/aggregate by key. */
+    // Map each word to a (word, 1) tuple so we can reduce by key to count the words
     val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
-
-    /* Print the first 10 wordCounts */
+ 
+    // Print the first 10 wordCounts
     wordCounts.print()
 
-    /* Start the streaming context and await termination */
+    // Start the streaming context and await termination
     ssc.start()
     ssc.awaitTermination()
   }
 }
 
 /**
- * Usage: KinesisWordCountProducerASL <stream-name> <kinesis-endpoint-url>
- *     <recordsPerSec> <wordsPerRecord>
+ * Usage: KinesisWordProducerASL <stream-name> <endpoint-url> \
+ *   <records-per-sec> <words-per-record>
+ *
  *   <stream-name> is the name of the Kinesis stream (ie. mySparkStream)
- *   <kinesis-endpoint-url> is the endpoint of the Kinesis service
+ *   <endpoint-url> is the endpoint of the Kinesis service
  *     (ie. https://kinesis.us-east-1.amazonaws.com)
  *   <records-per-sec> is the rate of records per second to put onto the stream
  *   <words-per-record> is the rate of records per second to put onto the stream
  *
  * Example:
- *    $ export AWS_ACCESS_KEY_ID=<your-access-key>
- *    $ export AWS_SECRET_KEY=<your-secret-key>
- *    $ $SPARK_HOME/bin/run-example \
- *         org.apache.spark.examples.streaming.KinesisWordCountProducerASL mySparkStream \
- *         https://kinesis.us-east-1.amazonaws.com 10 5
+ *    $ $SPARK_HOME/bin/run-example streaming.KinesisWordProducerASL mySparkStream \
+ *         https://kinesis.us-east-1.amazonaws.com 10 5
  */
-private object KinesisWordCountProducerASL {
+object KinesisWordProducerASL {
   def main(args: Array[String]) {
-    if (args.length < 4) {
-      System.err.println("Usage: KinesisWordCountProducerASL <stream-name> <endpoint-url>" +
-          " <records-per-sec> <words-per-record>")
+    if (args.length != 4) {
+      System.err.println(
+        """
+          |Usage: KinesisWordProducerASL <stream-name> <endpoint-url> <records-per-sec> <words-per-record>
+          |
+          |    <stream-name> is the name of the Kinesis stream
+          |    <endpoint-url> is the endpoint of the Kinesis service
+          |                   (e.g. https://kinesis.us-east-1.amazonaws.com)
+          |    <records-per-sec> is the rate of records per second to put onto the stream
+          |    <words-per-record> is the rate of records per second to put onto the stream
+          |
+        """.stripMargin)
+
       System.exit(1)
     }
 
+    // Set default log4j logging level to WARN to hide Spark logs
     StreamingExamples.setStreamingLogLevels()
 
-    /* Populate the appropriate variables from the given args */
+    // Populate the appropriate variables from the given args
     val Array(stream, endpoint, recordsPerSecond, wordsPerRecord) = args
 
-    /* Generate the records and return the totals */
-    val totals = generate(stream, endpoint, recordsPerSecond.toInt, wordsPerRecord.toInt)
+    // Generate the records and return the totals
+    val totals = generate(stream, endpoint, recordsPerSecond.toInt,
+        wordsPerRecord.toInt)
 
-    /* Print the array of (index, total) tuples */
-    println("Totals")
-    totals.foreach(total => println(total.toString()))
+    // Print the array of (word, total) tuples
+    println("Totals for the words sent")
+    totals.foreach(println(_))
   }
 
   def generate(stream: String,
       endpoint: String,
       recordsPerSecond: Int,
-      wordsPerRecord: Int): Seq[(Int, Int)] = {
-
-    val MaxRandomInts = 10
+      wordsPerRecord: Int): Seq[(String, Int)] = {
 
-    /* Create the Kinesis client */
+    val randomWords = List("spark","you","are","my","father")
+    val totals = scala.collection.mutable.Map[String, Int]()
+  
+    // Create the low-level Kinesis Client from the AWS Java SDK.
     val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain())
     kinesisClient.setEndpoint(endpoint)
 
     println(s"Putting records onto stream $stream and endpoint $endpoint at a rate of" +
-      s" $recordsPerSecond records per second and $wordsPerRecord words per record");
-
-    val totals = new Array[Int](MaxRandomInts)
-    /* Put String records onto the stream per the given recordPerSec and wordsPerRecord */
-    for (i <- 1 to 5) {
-
-      /* Generate recordsPerSec records to put onto the stream */
-      val records = (1 to recordsPerSecond.toInt).map { recordNum =>
-        /* 
-         *  Randomly generate each wordsPerRec words between 0 (inclusive)
-         *  and MAX_RANDOM_INTS (exclusive) 
-         */
+        s" $recordsPerSecond records per second and $wordsPerRecord words per record")
+  
+    // Iterate and put records onto the stream per the given recordPerSec and wordsPerRecord
+    for (i <- 1 to 10) {
+      // Generate recordsPerSec records to put onto the stream
+      val records = (1 to recordsPerSecond.toInt).foreach { recordNum =>
+        // Randomly generate wordsPerRecord number of words
         val data = (1 to wordsPerRecord.toInt).map(x => {
-          /* Generate the random int */
-          val randomInt = Random.nextInt(MaxRandomInts)
+          // Get a random index to a word
+          val randomWordIdx = Random.nextInt(randomWords.size)
+          val randomWord = randomWords(randomWordIdx)
 
-          /* Keep track of the totals */
-          totals(randomInt) += 1
+          // Increment total count to compare to server counts later
+          totals(randomWord) = totals.getOrElse(randomWord, 0) + 1
 
-          randomInt.toString()
+          randomWord
         }).mkString(" ")
 
-        /* Create a partitionKey based on recordNum */
+        // Create a partitionKey based on recordNum
         val partitionKey = s"partitionKey-$recordNum"
 
-        /* Create a PutRecordRequest with an Array[Byte] version of the data */
+        // Create a PutRecordRequest with an Array[Byte] version of the data
         val putRecordRequest = new PutRecordRequest().withStreamName(stream)
             .withPartitionKey(partitionKey)
-            .withData(ByteBuffer.wrap(data.getBytes()));
+            .withData(ByteBuffer.wrap(data.getBytes()))
 
-        /* Put the record onto the stream and capture the PutRecordResult */
-        val putRecordResult = kinesisClient.putRecord(putRecordRequest);
+        // Put the record onto the stream and capture the PutRecordResult
+        val putRecordResult = kinesisClient.putRecord(putRecordRequest)
       }
 
-      /* Sleep for a second */
+      // Sleep for a second
       Thread.sleep(1000)
       println("Sent " + recordsPerSecond + " records")
     }
-
-    /* Convert the totals to (index, total) tuple */
-    (0 to (MaxRandomInts - 1)).zip(totals)
+    // Convert the totals to (word, total) tuples, sorted by word
+    totals.toSeq.sortBy(_._1)
   }
 }
 
@@ -233,8 +258,7 @@ private object KinesisWordCountProducerASL {
  *  This has been lifted from the examples/ project to remove the circular dependency.
  */
 private[streaming] object StreamingExamples extends Logging {
-
-  /** Set reasonable logging levels for streaming if the user has not configured log4j. */
+  // Set reasonable logging levels for streaming if the user has not configured log4j.
   def setStreamingLogLevels() {
     val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
     if (!log4jInitialized) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org