You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2015/07/31 21:09:52 UTC

spark git commit: [SPARK-8564] [STREAMING] Add the Python API for Kinesis

Repository: spark
Updated Branches:
  refs/heads/master 39ab199a3 -> 3afc1de89


[SPARK-8564] [STREAMING] Add the Python API for Kinesis

This PR adds the Python API for Kinesis, including a Python example and a simple unit test.

Author: zsxwing <zs...@gmail.com>

Closes #6955 from zsxwing/kinesis-python and squashes the following commits:

e42e471 [zsxwing] Merge branch 'master' into kinesis-python
455f7ea [zsxwing] Remove streaming_kinesis_asl_assembly module and simply add the source folder to streaming_kinesis_asl module
32e6451 [zsxwing] Merge remote-tracking branch 'origin/master' into kinesis-python
5082d28 [zsxwing] Fix the syntax error for Python 2.6
fca416b [zsxwing] Fix wrong comparison
96670ff [zsxwing] Fix the compilation error after merging master
756a128 [zsxwing] Merge branch 'master' into kinesis-python
6c37395 [zsxwing] Print stack trace for debug
7c5cfb0 [zsxwing] RUN_KINESIS_TESTS -> ENABLE_KINESIS_TESTS
cc9d071 [zsxwing] Fix the python test errors
466b425 [zsxwing] Add python tests for Kinesis
e33d505 [zsxwing] Merge remote-tracking branch 'origin/master' into kinesis-python
3da2601 [zsxwing] Fix the kinesis folder
687446b [zsxwing] Fix the error message and the maven output path
add2beb [zsxwing] Merge branch 'master' into kinesis-python
4957c0b [zsxwing] Add the Python API for Kinesis


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3afc1de8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3afc1de8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3afc1de8

Branch: refs/heads/master
Commit: 3afc1de89cb4de9f8ea74003dd1e6b5b006d06f0
Parents: 39ab199
Author: zsxwing <zs...@gmail.com>
Authored: Fri Jul 31 12:09:48 2015 -0700
Committer: Tathagata Das <ta...@gmail.com>
Committed: Fri Jul 31 12:09:48 2015 -0700

----------------------------------------------------------------------
 dev/run-tests.py                                |   3 +-
 dev/sparktestsupport/modules.py                 |   9 +-
 docs/streaming-kinesis-integration.md           |  19 ++++
 extras/kinesis-asl-assembly/pom.xml             | 103 +++++++++++++++++
 .../examples/streaming/kinesis_wordcount_asl.py |  81 ++++++++++++++
 .../streaming/kinesis/KinesisTestUtils.scala    |  19 +++-
 .../spark/streaming/kinesis/KinesisUtils.scala  |  78 ++++++++++---
 pom.xml                                         |   1 +
 project/SparkBuild.scala                        |   6 +-
 python/pyspark/streaming/kinesis.py             | 112 +++++++++++++++++++
 python/pyspark/streaming/tests.py               |  86 +++++++++++++-
 11 files changed, 492 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3afc1de8/dev/run-tests.py
----------------------------------------------------------------------
diff --git a/dev/run-tests.py b/dev/run-tests.py
index 29420da..b6d1814 100755
--- a/dev/run-tests.py
+++ b/dev/run-tests.py
@@ -301,7 +301,8 @@ def build_spark_sbt(hadoop_version):
     sbt_goals = ["package",
                  "assembly/assembly",
                  "streaming-kafka-assembly/assembly",
-                 "streaming-flume-assembly/assembly"]
+                 "streaming-flume-assembly/assembly",
+                 "streaming-kinesis-asl-assembly/assembly"]
     profiles_and_goals = build_profiles + sbt_goals
 
     print("[info] Building Spark (w/Hive 0.13.1) using SBT with these arguments: ",

http://git-wip-us.apache.org/repos/asf/spark/blob/3afc1de8/dev/sparktestsupport/modules.py
----------------------------------------------------------------------
diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index 44600cb..956dc81 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -138,6 +138,7 @@ streaming_kinesis_asl = Module(
     dependencies=[],
     source_file_regexes=[
         "extras/kinesis-asl/",
+        "extras/kinesis-asl-assembly/",
     ],
     build_profile_flags=[
         "-Pkinesis-asl",
@@ -300,7 +301,13 @@ pyspark_sql = Module(
 
 pyspark_streaming = Module(
     name="pyspark-streaming",
-    dependencies=[pyspark_core, streaming, streaming_kafka, streaming_flume_assembly],
+    dependencies=[
+        pyspark_core,
+        streaming,
+        streaming_kafka,
+        streaming_flume_assembly,
+        streaming_kinesis_asl
+    ],
     source_file_regexes=[
         "python/pyspark/streaming"
     ],

http://git-wip-us.apache.org/repos/asf/spark/blob/3afc1de8/docs/streaming-kinesis-integration.md
----------------------------------------------------------------------
diff --git a/docs/streaming-kinesis-integration.md b/docs/streaming-kinesis-integration.md
index aa9749a..a7bcaec 100644
--- a/docs/streaming-kinesis-integration.md
+++ b/docs/streaming-kinesis-integration.md
@@ -52,6 +52,17 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m
 	and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java). Refer to the next subsection for instructions to run the example.
 
 	</div>
+	<div data-lang="python" markdown="1">
+		from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
+
+		kinesisStream = KinesisUtils.createStream(
+			streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL],
+			[region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2)
+
+	See the [API docs](api/python/pyspark.streaming.html#pyspark.streaming.kinesis.KinesisUtils)
+	and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/extras/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py). Refer to the next subsection for instructions to run the example.
+
+	</div>
 	</div>
 
     - `streamingContext`: StreamingContext containg an application name used by Kinesis to tie this Kinesis application to the Kinesis stream
@@ -136,6 +147,14 @@ To run the example,
         bin/run-example streaming.JavaKinesisWordCountASL [Kinesis app name] [Kinesis stream name] [endpoint URL]
 
 	</div>
+	<div data-lang="python" markdown="1">
+
+        bin/spark-submit --jars extras/kinesis-asl-assembly/target/scala-*/\
+            spark-streaming-kinesis-asl-assembly-*.jar \
+            extras/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py \
+            [Kinesis app name] [Kinesis stream name] [endpoint URL] [region name]
+
+	</div>
 	</div>
 
     This will wait for data to be received from the Kinesis stream.

http://git-wip-us.apache.org/repos/asf/spark/blob/3afc1de8/extras/kinesis-asl-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/extras/kinesis-asl-assembly/pom.xml b/extras/kinesis-asl-assembly/pom.xml
new file mode 100644
index 0000000..70d2c9c
--- /dev/null
+++ b/extras/kinesis-asl-assembly/pom.xml
@@ -0,0 +1,103 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.spark</groupId>
+    <artifactId>spark-parent_2.10</artifactId>
+    <version>1.5.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <groupId>org.apache.spark</groupId>
+  <artifactId>spark-streaming-kinesis-asl-assembly_2.10</artifactId>
+  <packaging>jar</packaging>
+  <name>Spark Project Kinesis Assembly</name>
+  <url>http://spark.apache.org/</url>
+
+  <properties>
+    <sbt.project.name>streaming-kinesis-asl-assembly</sbt.project.name>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming-kinesis-asl_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+  <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+  <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+  <plugins>
+    <plugin>
+      <groupId>org.apache.maven.plugins</groupId>
+      <artifactId>maven-shade-plugin</artifactId>
+      <configuration>
+        <shadedArtifactAttached>false</shadedArtifactAttached>
+        <outputFile>${project.build.directory}/scala-${scala.binary.version}/spark-streaming-kinesis-asl-assembly-${project.version}.jar</outputFile>
+        <artifactSet>
+          <includes>
+            <include>*:*</include>
+          </includes>
+        </artifactSet>
+        <filters>
+          <filter>
+            <artifact>*:*</artifact>
+            <excludes>
+              <exclude>META-INF/*.SF</exclude>
+              <exclude>META-INF/*.DSA</exclude>
+              <exclude>META-INF/*.RSA</exclude>
+            </excludes>
+          </filter>
+        </filters>
+      </configuration>
+      <executions>
+        <execution>
+          <phase>package</phase>
+          <goals>
+            <goal>shade</goal>
+          </goals>
+          <configuration>
+            <transformers>
+              <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+              <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
+                <resource>reference.conf</resource>
+              </transformer>
+              <transformer implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
+                <resource>log4j.properties</resource>
+              </transformer>
+              <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer"/>
+              <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"/>
+            </transformers>
+          </configuration>
+        </execution>
+      </executions>
+    </plugin>
+  </plugins>
+</build>
+</project>
+

http://git-wip-us.apache.org/repos/asf/spark/blob/3afc1de8/extras/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py
----------------------------------------------------------------------
diff --git a/extras/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py b/extras/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py
new file mode 100644
index 0000000..f428f64
--- /dev/null
+++ b/extras/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py
@@ -0,0 +1,81 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+  Consumes messages from an Amazon Kinesis stream and does wordcount.
+
+  This example spins up 1 Kinesis Receiver per shard for the given stream.
+  It then starts pulling from the last checkpointed sequence number of the given stream.
+
+  Usage: kinesis_wordcount_asl.py <app-name> <stream-name> <endpoint-url> <region-name>
+    <app-name> is the name of the consumer app, used to track the read data in DynamoDB
+    <stream-name> name of the Kinesis stream (e.g. mySparkStream)
+    <endpoint-url> endpoint of the Kinesis service
+      (e.g. https://kinesis.us-east-1.amazonaws.com)
+    <region-name> region name of the Kinesis endpoint (e.g. us-east-1)
+
+  Example:
+      # export AWS keys if necessary
+      $ export AWS_ACCESS_KEY_ID=<your-access-key>
+      $ export AWS_SECRET_KEY=<your-secret-key>
+
+      # run the example
+      $ bin/spark-submit --jars extras/kinesis-asl-assembly/target/scala-*/\
+        spark-streaming-kinesis-asl-assembly-*.jar \
+        extras/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py \
+        myAppName mySparkStream https://kinesis.us-east-1.amazonaws.com us-east-1
+
+  There is a companion helper class called KinesisWordProducerASL which puts dummy data
+  onto the Kinesis stream.
+
+  This code uses the DefaultAWSCredentialsProviderChain to find credentials
+  in the following order:
+      Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY
+      Java System Properties - aws.accessKeyId and aws.secretKey
+      Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs
+      Instance profile credentials - delivered through the Amazon EC2 metadata service
+  For more information, see
+      http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/credentials.html
+
+  See http://spark.apache.org/docs/latest/streaming-kinesis-integration.html for more details on
+  the Kinesis Spark Streaming integration.
+"""
+import sys
+
+from pyspark import SparkContext
+from pyspark.streaming import StreamingContext
+from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
+
+if __name__ == "__main__":
+    if len(sys.argv) != 5:
+        print(
+            "Usage: kinesis_wordcount_asl.py <app-name> <stream-name> <endpoint-url> <region-name>",
+            file=sys.stderr)
+        sys.exit(-1)
+
+    sc = SparkContext(appName="PythonStreamingKinesisWordCountAsl")
+    ssc = StreamingContext(sc, 1)
+    appName, streamName, endpointUrl, regionName = sys.argv[1:]
+    lines = KinesisUtils.createStream(
+        ssc, appName, streamName, endpointUrl, regionName, InitialPositionInStream.LATEST, 2)
+    counts = lines.flatMap(lambda line: line.split(" ")) \
+        .map(lambda word: (word, 1)) \
+        .reduceByKey(lambda a, b: a+b)
+    counts.pprint()
+
+    ssc.start()
+    ssc.awaitTermination()

http://git-wip-us.apache.org/repos/asf/spark/blob/3afc1de8/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
----------------------------------------------------------------------
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
index ca39358..255ac27 100644
--- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
@@ -36,9 +36,15 @@ import org.apache.spark.Logging
 /**
  * Shared utility methods for performing Kinesis tests that actually transfer data
  */
-private class KinesisTestUtils(
-    val endpointUrl: String = "https://kinesis.us-west-2.amazonaws.com",
-    _regionName: String = "") extends Logging {
+private class KinesisTestUtils(val endpointUrl: String, _regionName: String) extends Logging {
+
+  def this() {
+    this("https://kinesis.us-west-2.amazonaws.com", "")
+  }
+
+  def this(endpointUrl: String) {
+    this(endpointUrl, "")
+  }
 
   val regionName = if (_regionName.length == 0) {
     RegionUtils.getRegionByEndpoint(endpointUrl).getName()
@@ -117,6 +123,13 @@ private class KinesisTestUtils(
     shardIdToSeqNumbers.toMap
   }
 
+  /**
+   * Expose a Python friendly API.
+   */
+  def pushData(testData: java.util.List[Int]): Unit = {
+    pushData(scala.collection.JavaConversions.asScalaBuffer(testData))
+  }
+
   def deleteStream(): Unit = {
     try {
       if (streamCreated) {

http://git-wip-us.apache.org/repos/asf/spark/blob/3afc1de8/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
----------------------------------------------------------------------
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
index e5acab5..7dab17e 100644
--- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
@@ -86,19 +86,19 @@ object KinesisUtils {
    * @param endpointUrl  Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
    * @param regionName   Name of region used by the Kinesis Client Library (KCL) to update
    *                     DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics)
-   * @param awsAccessKeyId  AWS AccessKeyId (if null, will use DefaultAWSCredentialsProviderChain)
-   * @param awsSecretKey  AWS SecretKey (if null, will use DefaultAWSCredentialsProviderChain)
-   * @param checkpointInterval  Checkpoint interval for Kinesis checkpointing.
-   *                            See the Kinesis Spark Streaming documentation for more
-   *                            details on the different types of checkpoints.
    * @param initialPositionInStream  In the absence of Kinesis checkpoint info, this is the
    *                                 worker's initial starting position in the stream.
    *                                 The values are either the beginning of the stream
    *                                 per Kinesis' limit of 24 hours
    *                                 (InitialPositionInStream.TRIM_HORIZON) or
    *                                 the tip of the stream (InitialPositionInStream.LATEST).
+   * @param checkpointInterval  Checkpoint interval for Kinesis checkpointing.
+   *                            See the Kinesis Spark Streaming documentation for more
+   *                            details on the different types of checkpoints.
    * @param storageLevel Storage level to use for storing the received objects.
    *                     StorageLevel.MEMORY_AND_DISK_2 is recommended.
+   * @param awsAccessKeyId  AWS AccessKeyId (if null, will use DefaultAWSCredentialsProviderChain)
+   * @param awsSecretKey  AWS SecretKey (if null, will use DefaultAWSCredentialsProviderChain)
    */
   def createStream(
       ssc: StreamingContext,
@@ -130,7 +130,7 @@ object KinesisUtils {
    * - The Kinesis application name used by the Kinesis Client Library (KCL) will be the app name in
    *   [[org.apache.spark.SparkConf]].
    *
-   * @param ssc Java StreamingContext object
+   * @param ssc StreamingContext object
    * @param streamName   Kinesis stream name
    * @param endpointUrl  Endpoint url of Kinesis service
    *                     (e.g., https://kinesis.us-east-1.amazonaws.com)
@@ -175,15 +175,15 @@ object KinesisUtils {
    * @param endpointUrl  Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
    * @param regionName   Name of region used by the Kinesis Client Library (KCL) to update
    *                     DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics)
-   * @param checkpointInterval  Checkpoint interval for Kinesis checkpointing.
-   *                            See the Kinesis Spark Streaming documentation for more
-   *                            details on the different types of checkpoints.
    * @param initialPositionInStream  In the absence of Kinesis checkpoint info, this is the
    *                                 worker's initial starting position in the stream.
    *                                 The values are either the beginning of the stream
    *                                 per Kinesis' limit of 24 hours
    *                                 (InitialPositionInStream.TRIM_HORIZON) or
    *                                 the tip of the stream (InitialPositionInStream.LATEST).
+   * @param checkpointInterval  Checkpoint interval for Kinesis checkpointing.
+   *                            See the Kinesis Spark Streaming documentation for more
+   *                            details on the different types of checkpoints.
    * @param storageLevel Storage level to use for storing the received objects.
    *                     StorageLevel.MEMORY_AND_DISK_2 is recommended.
    */
@@ -206,8 +206,8 @@ object KinesisUtils {
    * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis.
    *
    * Note:
-   *  The given AWS credentials will get saved in DStream checkpoints if checkpointing
-   *  is enabled. Make sure that your checkpoint directory is secure.
+   * The given AWS credentials will get saved in DStream checkpoints if checkpointing
+   * is enabled. Make sure that your checkpoint directory is secure.
    *
    * @param jssc Java StreamingContext object
    * @param kinesisAppName  Kinesis application name used by the Kinesis Client Library
@@ -216,19 +216,19 @@ object KinesisUtils {
    * @param endpointUrl  Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
    * @param regionName   Name of region used by the Kinesis Client Library (KCL) to update
    *                     DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics)
-   * @param awsAccessKeyId  AWS AccessKeyId (if null, will use DefaultAWSCredentialsProviderChain)
-   * @param awsSecretKey  AWS SecretKey (if null, will use DefaultAWSCredentialsProviderChain)
-   * @param checkpointInterval  Checkpoint interval for Kinesis checkpointing.
-   *                            See the Kinesis Spark Streaming documentation for more
-   *                            details on the different types of checkpoints.
    * @param initialPositionInStream  In the absence of Kinesis checkpoint info, this is the
    *                                 worker's initial starting position in the stream.
    *                                 The values are either the beginning of the stream
    *                                 per Kinesis' limit of 24 hours
    *                                 (InitialPositionInStream.TRIM_HORIZON) or
    *                                 the tip of the stream (InitialPositionInStream.LATEST).
+   * @param checkpointInterval  Checkpoint interval for Kinesis checkpointing.
+   *                            See the Kinesis Spark Streaming documentation for more
+   *                            details on the different types of checkpoints.
    * @param storageLevel Storage level to use for storing the received objects.
    *                     StorageLevel.MEMORY_AND_DISK_2 is recommended.
+   * @param awsAccessKeyId  AWS AccessKeyId (if null, will use DefaultAWSCredentialsProviderChain)
+   * @param awsSecretKey  AWS SecretKey (if null, will use DefaultAWSCredentialsProviderChain)
    */
   def createStream(
       jssc: JavaStreamingContext,
@@ -297,3 +297,49 @@ object KinesisUtils {
     }
   }
 }
+
+/**
+ * This is a helper class that wraps the methods in KinesisUtils into a more Python-friendly
+ * class and functions so that it can be easily instantiated and called from Python's KinesisUtils.
+ */
+private class KinesisUtilsPythonHelper {
+
+  def getInitialPositionInStream(initialPositionInStream: Int): InitialPositionInStream = {
+    initialPositionInStream match {
+      case 0 => InitialPositionInStream.LATEST
+      case 1 => InitialPositionInStream.TRIM_HORIZON
+      case _ => throw new IllegalArgumentException(
+        "Illegal InitialPositionInStream. Please use " +
+          "InitialPositionInStream.LATEST or InitialPositionInStream.TRIM_HORIZON")
+    }
+  }
+
+  def createStream(
+      jssc: JavaStreamingContext,
+      kinesisAppName: String,
+      streamName: String,
+      endpointUrl: String,
+      regionName: String,
+      initialPositionInStream: Int,
+      checkpointInterval: Duration,
+      storageLevel: StorageLevel,
+      awsAccessKeyId: String,
+      awsSecretKey: String
+      ): JavaReceiverInputDStream[Array[Byte]] = {
+    if (awsAccessKeyId == null && awsSecretKey != null) {
+      throw new IllegalArgumentException("awsSecretKey is set but awsAccessKeyId is null")
+    }
+    if (awsAccessKeyId != null && awsSecretKey == null) {
+      throw new IllegalArgumentException("awsAccessKeyId is set but awsSecretKey is null")
+    }
+    if (awsAccessKeyId == null && awsSecretKey == null) {
+      KinesisUtils.createStream(jssc, kinesisAppName, streamName, endpointUrl, regionName,
+        getInitialPositionInStream(initialPositionInStream), checkpointInterval, storageLevel)
+    } else {
+      KinesisUtils.createStream(jssc, kinesisAppName, streamName, endpointUrl, regionName,
+        getInitialPositionInStream(initialPositionInStream), checkpointInterval, storageLevel,
+        awsAccessKeyId, awsSecretKey)
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/3afc1de8/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 35fc8c4..e351c7c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1642,6 +1642,7 @@
       <id>kinesis-asl</id>
       <modules>
         <module>extras/kinesis-asl</module>
+        <module>extras/kinesis-asl-assembly</module>
       </modules>
     </profile>
 

http://git-wip-us.apache.org/repos/asf/spark/blob/3afc1de8/project/SparkBuild.scala
----------------------------------------------------------------------
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 61a05d3..9a33baa 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -45,8 +45,8 @@ object BuildCommons {
     sparkKinesisAsl) = Seq("yarn", "yarn-stable", "java8-tests", "ganglia-lgpl",
     "kinesis-asl").map(ProjectRef(buildLocation, _))
 
-  val assemblyProjects@Seq(assembly, examples, networkYarn, streamingFlumeAssembly, streamingKafkaAssembly) =
-    Seq("assembly", "examples", "network-yarn", "streaming-flume-assembly", "streaming-kafka-assembly")
+  val assemblyProjects@Seq(assembly, examples, networkYarn, streamingFlumeAssembly, streamingKafkaAssembly, streamingKinesisAslAssembly) =
+    Seq("assembly", "examples", "network-yarn", "streaming-flume-assembly", "streaming-kafka-assembly", "streaming-kinesis-asl-assembly")
       .map(ProjectRef(buildLocation, _))
 
   val tools = ProjectRef(buildLocation, "tools")
@@ -382,7 +382,7 @@ object Assembly {
         .getOrElse(SbtPomKeys.effectivePom.value.getProperties.get("hadoop.version").asInstanceOf[String])
     },
     jarName in assembly <<= (version, moduleName, hadoopVersion) map { (v, mName, hv) =>
-      if (mName.contains("streaming-flume-assembly") || mName.contains("streaming-kafka-assembly")) {
+      if (mName.contains("streaming-flume-assembly") || mName.contains("streaming-kafka-assembly") || mName.contains("streaming-kinesis-asl-assembly")) {
         // This must match the same name used in maven (see external/kafka-assembly/pom.xml)
         s"${mName}-${v}.jar"
       } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/3afc1de8/python/pyspark/streaming/kinesis.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/kinesis.py b/python/pyspark/streaming/kinesis.py
new file mode 100644
index 0000000..bcfe270
--- /dev/null
+++ b/python/pyspark/streaming/kinesis.py
@@ -0,0 +1,112 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from py4j.java_gateway import Py4JJavaError
+
+from pyspark.serializers import PairDeserializer, NoOpSerializer
+from pyspark.storagelevel import StorageLevel
+from pyspark.streaming import DStream
+
+__all__ = ['KinesisUtils', 'InitialPositionInStream', 'utf8_decoder']
+
+
+def utf8_decoder(s):
+    """ Decode UTF-8 encoded bytes into a unicode string; falsy input is returned unchanged """
+    return s and s.decode('utf-8')
+
+
+class KinesisUtils(object):
+
+    @staticmethod
+    def createStream(ssc, kinesisAppName, streamName, endpointUrl, regionName,
+                     initialPositionInStream, checkpointInterval,
+                     storageLevel=StorageLevel.MEMORY_AND_DISK_2,
+                     awsAccessKeyId=None, awsSecretKey=None, decoder=utf8_decoder):
+        """
+        Create an input stream that pulls messages from a Kinesis stream. This uses the
+        Kinesis Client Library (KCL) to pull messages from Kinesis.
+
+        Note: The given AWS credentials will get saved in DStream checkpoints if checkpointing is
+        enabled. Make sure that your checkpoint directory is secure.
+
+        :param ssc:  StreamingContext object
+        :param kinesisAppName:  Kinesis application name used by the Kinesis Client Library (KCL) to
+                                update DynamoDB
+        :param streamName:  Kinesis stream name
+        :param endpointUrl:  Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
+        :param regionName:  Name of region used by the Kinesis Client Library (KCL) to update
+                            DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics)
+        :param initialPositionInStream:  In the absence of Kinesis checkpoint info, this is the
+                                         worker's initial starting position in the stream. The
+                                         values are either the beginning of the stream per Kinesis'
+                                         limit of 24 hours (InitialPositionInStream.TRIM_HORIZON) or
+                                         the tip of the stream (InitialPositionInStream.LATEST).
+        :param checkpointInterval:  Checkpoint interval for Kinesis checkpointing. See the Kinesis
+                                    Spark Streaming documentation for more details on the different
+                                    types of checkpoints.
+        :param storageLevel:  Storage level to use for storing the received objects (default is
+                              StorageLevel.MEMORY_AND_DISK_2)
+        :param awsAccessKeyId:  AWS AccessKeyId (default is None. If None, will use
+                                DefaultAWSCredentialsProviderChain)
+        :param awsSecretKey:  AWS SecretKey (default is None. If None, will use
+                              DefaultAWSCredentialsProviderChain)
+        :param decoder:  A function used to decode value (default is utf8_decoder)
+        :return: A DStream object
+        """
+        jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
+        jduration = ssc._jduration(checkpointInterval)
+
+        try:
+            # Use KinesisUtilsPythonHelper to access Scala's KinesisUtils
+            helperClass = ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader()\
+                .loadClass("org.apache.spark.streaming.kinesis.KinesisUtilsPythonHelper")
+            helper = helperClass.newInstance()
+            jstream = helper.createStream(ssc._jssc, kinesisAppName, streamName, endpointUrl,
+                                          regionName, initialPositionInStream, jduration, jlevel,
+                                          awsAccessKeyId, awsSecretKey)
+        except Py4JJavaError as e:
+            if 'ClassNotFoundException' in str(e.java_exception):
+                KinesisUtils._printErrorMsg(ssc.sparkContext)
+            raise e
+        stream = DStream(jstream, ssc, NoOpSerializer())
+        return stream.map(lambda v: decoder(v))
+
+    @staticmethod
+    def _printErrorMsg(sc):
+        print("""
+________________________________________________________________________________________________
+
+  Spark Streaming's Kinesis libraries not found in class path. Try one of the following.
+
+  1. Include the Kinesis library and its dependencies with in the
+     spark-submit command as
+
+     $ bin/spark-submit --packages org.apache.spark:spark-streaming-kinesis-asl:%s ...
+
+  2. Download the JAR of the artifact from Maven Central http://search.maven.org/,
+     Group Id = org.apache.spark, Artifact Id = spark-streaming-kinesis-asl-assembly, Version = %s.
+     Then, include the jar in the spark-submit command as
+
+     $ bin/spark-submit --jars <spark-streaming-kinesis-asl-assembly.jar> ...
+
+________________________________________________________________________________________________
+
+""" % (sc.version, sc.version))
+
+
+class InitialPositionInStream(object):
+    LATEST, TRIM_HORIZON = (0, 1)

http://git-wip-us.apache.org/repos/asf/spark/blob/3afc1de8/python/pyspark/streaming/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 4ecae1e..5cd544b 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -36,9 +36,11 @@ else:
     import unittest
 
 from pyspark.context import SparkConf, SparkContext, RDD
+from pyspark.storagelevel import StorageLevel
 from pyspark.streaming.context import StreamingContext
 from pyspark.streaming.kafka import Broker, KafkaUtils, OffsetRange, TopicAndPartition
 from pyspark.streaming.flume import FlumeUtils
+from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
 
 
 class PySparkStreamingTestCase(unittest.TestCase):
@@ -891,6 +893,67 @@ class FlumePollingStreamTests(PySparkStreamingTestCase):
         self._testMultipleTimes(self._testFlumePollingMultipleHosts)
 
 
+class KinesisStreamTests(PySparkStreamingTestCase):
+
+    def test_kinesis_stream_api(self):
+        # Don't start the StreamingContext because we cannot test it in Jenkins
+        kinesisStream1 = KinesisUtils.createStream(
+            self.ssc, "myAppNam", "mySparkStream",
+            "https://kinesis.us-west-2.amazonaws.com", "us-west-2",
+            InitialPositionInStream.LATEST, 2, StorageLevel.MEMORY_AND_DISK_2)
+        kinesisStream2 = KinesisUtils.createStream(
+            self.ssc, "myAppNam", "mySparkStream",
+            "https://kinesis.us-west-2.amazonaws.com", "us-west-2",
+            InitialPositionInStream.LATEST, 2, StorageLevel.MEMORY_AND_DISK_2,
+            "awsAccessKey", "awsSecretKey")
+
+    def test_kinesis_stream(self):
+        if os.environ.get('ENABLE_KINESIS_TESTS') != '1':
+            print("Skip test_kinesis_stream")
+            return
+
+        import random
+        kinesisAppName = ("KinesisStreamTests-%d" % abs(random.randint(0, 10000000)))
+        kinesisTestUtilsClz = \
+            self.sc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
+                .loadClass("org.apache.spark.streaming.kinesis.KinesisTestUtils")
+        kinesisTestUtils = kinesisTestUtilsClz.newInstance()
+        try:
+            kinesisTestUtils.createStream()
+            aWSCredentials = kinesisTestUtils.getAWSCredentials()
+            stream = KinesisUtils.createStream(
+                self.ssc, kinesisAppName, kinesisTestUtils.streamName(),
+                kinesisTestUtils.endpointUrl(), kinesisTestUtils.regionName(),
+                InitialPositionInStream.LATEST, 10, StorageLevel.MEMORY_ONLY,
+                aWSCredentials.getAWSAccessKeyId(), aWSCredentials.getAWSSecretKey())
+
+            outputBuffer = []
+
+            def get_output(_, rdd):
+                for e in rdd.collect():
+                    outputBuffer.append(e)
+
+            stream.foreachRDD(get_output)
+            self.ssc.start()
+
+            testData = [i for i in range(1, 11)]
+            expectedOutput = set([str(i) for i in testData])
+            start_time = time.time()
+            while time.time() - start_time < 120:
+                kinesisTestUtils.pushData(testData)
+                if expectedOutput == set(outputBuffer):
+                    break
+                time.sleep(10)
+            self.assertEqual(expectedOutput, set(outputBuffer))
+        except:
+            import traceback
+            traceback.print_exc()
+            raise
+        finally:
+            kinesisTestUtils.deleteStream()
+            kinesisTestUtils.deleteDynamoDBTable(kinesisAppName)
+
+
 def search_kafka_assembly_jar():
     SPARK_HOME = os.environ["SPARK_HOME"]
     kafka_assembly_dir = os.path.join(SPARK_HOME, "external/kafka-assembly")
@@ -926,10 +989,31 @@ def search_flume_assembly_jar():
     else:
         return jars[0]
 
+
+def search_kinesis_asl_assembly_jar():
+    SPARK_HOME = os.environ["SPARK_HOME"]
+    kinesis_asl_assembly_dir = os.path.join(SPARK_HOME, "extras/kinesis-asl-assembly")
+    jars = glob.glob(
+        os.path.join(kinesis_asl_assembly_dir,
+                     "target/scala-*/spark-streaming-kinesis-asl-assembly-*.jar"))
+    if not jars:
+        raise Exception(
+            ("Failed to find Spark Streaming Kinesis ASL assembly jar in %s. " %
+             kinesis_asl_assembly_dir) + "You need to build Spark with "
+            "'build/sbt -Pkinesis-asl assembly/assembly streaming-kinesis-asl-assembly/assembly' "
+            "or 'build/mvn -Pkinesis-asl package' before running this test")
+    elif len(jars) > 1:
+        raise Exception(("Found multiple Spark Streaming Kinesis ASL assembly JARs in %s; please "
+                         "remove all but one") % kinesis_asl_assembly_dir)
+    else:
+        return jars[0]
+
+
 if __name__ == "__main__":
     kafka_assembly_jar = search_kafka_assembly_jar()
     flume_assembly_jar = search_flume_assembly_jar()
-    jars = "%s,%s" % (kafka_assembly_jar, flume_assembly_jar)
+    kinesis_asl_assembly_jar = search_kinesis_asl_assembly_jar()
+    jars = "%s,%s,%s" % (kafka_assembly_jar, flume_assembly_jar, kinesis_asl_assembly_jar)
 
     os.environ["PYSPARK_SUBMIT_ARGS"] = "--jars %s pyspark-shell" % jars
     unittest.main()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org