You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2016/03/09 19:27:49 UTC

[2/8] spark git commit: [SPARK-13595][BUILD] Move docker, extras modules into external

http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
----------------------------------------------------------------------
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
deleted file mode 100644
index 15ac588..0000000
--- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
+++ /dev/null
@@ -1,560 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.streaming.kinesis
-
-import scala.reflect.ClassTag
-
-import com.amazonaws.regions.RegionUtils
-import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
-import com.amazonaws.services.kinesis.model.Record
-
-import org.apache.spark.api.java.function.{Function => JFunction}
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{Duration, StreamingContext}
-import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext}
-import org.apache.spark.streaming.dstream.ReceiverInputDStream
-
-object KinesisUtils {
-  /**
-   * Create an input stream that pulls messages from a Kinesis stream.
-   * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis.
-   *
-   * Note: The AWS credentials will be discovered using the DefaultAWSCredentialsProviderChain
-   * on the workers. See AWS documentation to understand how DefaultAWSCredentialsProviderChain
-   * gets the AWS credentials.
-   *
-   * @param ssc StreamingContext object
-   * @param kinesisAppName  Kinesis application name used by the Kinesis Client Library
-   *                        (KCL) to update DynamoDB
-   * @param streamName   Kinesis stream name
-   * @param endpointUrl  Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
-   * @param regionName   Name of region used by the Kinesis Client Library (KCL) to update
-   *                     DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics)
-   * @param initialPositionInStream  In the absence of Kinesis checkpoint info, this is the
-   *                                 worker's initial starting position in the stream.
-   *                                 The values are either the beginning of the stream
-   *                                 per Kinesis' limit of 24 hours
-   *                                 (InitialPositionInStream.TRIM_HORIZON) or
-   *                                 the tip of the stream (InitialPositionInStream.LATEST).
-   * @param checkpointInterval  Checkpoint interval for Kinesis checkpointing.
-   *                            See the Kinesis Spark Streaming documentation for more
-   *                            details on the different types of checkpoints.
-   * @param storageLevel Storage level to use for storing the received objects.
-   *                     StorageLevel.MEMORY_AND_DISK_2 is recommended.
-   * @param messageHandler A custom message handler that can generate a generic output from a
-   *                       Kinesis `Record`, which contains both message data, and metadata.
-   */
-  def createStream[T: ClassTag](
-      ssc: StreamingContext,
-      kinesisAppName: String,
-      streamName: String,
-      endpointUrl: String,
-      regionName: String,
-      initialPositionInStream: InitialPositionInStream,
-      checkpointInterval: Duration,
-      storageLevel: StorageLevel,
-      messageHandler: Record => T): ReceiverInputDStream[T] = {
-    // Clean the user-supplied handler up front, before the stream is constructed.
-    val handler = ssc.sc.clean(messageHandler)
-    // Run under the "kinesis stream" scope name rather than the default "receiver stream" one.
-    ssc.withNamedScope("kinesis stream") {
-      // The region name is validated first; no explicit credentials (None) are supplied.
-      val region = validateRegion(regionName)
-      new KinesisInputDStream[T](
-        ssc, streamName, endpointUrl, region, initialPositionInStream, kinesisAppName,
-        checkpointInterval, storageLevel, handler, None)
-    }
-  }
-
-  /**
-   * Create an input stream that pulls messages from a Kinesis stream.
-   * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis.
-   *
-   * Note:
-   *  The given AWS credentials will get saved in DStream checkpoints if checkpointing
-   *  is enabled. Make sure that your checkpoint directory is secure.
-   *
-   * @param ssc StreamingContext object
-   * @param kinesisAppName  Kinesis application name used by the Kinesis Client Library
-   *                        (KCL) to update DynamoDB
-   * @param streamName   Kinesis stream name
-   * @param endpointUrl  Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
-   * @param regionName   Name of region used by the Kinesis Client Library (KCL) to update
-   *                     DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics)
-   * @param initialPositionInStream  In the absence of Kinesis checkpoint info, this is the
-   *                                 worker's initial starting position in the stream.
-   *                                 The values are either the beginning of the stream
-   *                                 per Kinesis' limit of 24 hours
-   *                                 (InitialPositionInStream.TRIM_HORIZON) or
-   *                                 the tip of the stream (InitialPositionInStream.LATEST).
-   * @param checkpointInterval  Checkpoint interval for Kinesis checkpointing.
-   *                            See the Kinesis Spark Streaming documentation for more
-   *                            details on the different types of checkpoints.
-   * @param storageLevel Storage level to use for storing the received objects.
-   *                     StorageLevel.MEMORY_AND_DISK_2 is recommended.
-   * @param messageHandler A custom message handler that can generate a generic output from a
-   *                       Kinesis `Record`, which contains both message data, and metadata.
-   * @param awsAccessKeyId  AWS AccessKeyId (if null, will use DefaultAWSCredentialsProviderChain)
-   * @param awsSecretKey  AWS SecretKey (if null, will use DefaultAWSCredentialsProviderChain)
-   */
-  // scalastyle:off
-  def createStream[T: ClassTag](
-      ssc: StreamingContext,
-      kinesisAppName: String,
-      streamName: String,
-      endpointUrl: String,
-      regionName: String,
-      initialPositionInStream: InitialPositionInStream,
-      checkpointInterval: Duration,
-      storageLevel: StorageLevel,
-      messageHandler: Record => T,
-      awsAccessKeyId: String,
-      awsSecretKey: String): ReceiverInputDStream[T] = {
-    // scalastyle:on
-    val cleanedHandler = ssc.sc.clean(messageHandler)
-    // The parameter docs promise that null keys fall back to DefaultAWSCredentialsProviderChain.
-    // When both keys are null, pass None -- exactly what the credential-less overloads pass --
-    // instead of wrapping two nulls in SerializableAWSCredentials. A lone null key is forwarded
-    // unchanged, as before.
-    val awsCredentials =
-      if (awsAccessKeyId == null && awsSecretKey == null) {
-        None
-      } else {
-        Some(SerializableAWSCredentials(awsAccessKeyId, awsSecretKey))
-      }
-    // Setting scope to override receiver stream's scope of "receiver stream"
-    ssc.withNamedScope("kinesis stream") {
-      new KinesisInputDStream[T](ssc, streamName, endpointUrl, validateRegion(regionName),
-        initialPositionInStream, kinesisAppName, checkpointInterval, storageLevel,
-        cleanedHandler, awsCredentials)
-    }
-  }
-
-  /**
-   * Create an input stream that pulls messages from a Kinesis stream.
-   * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis.
-   *
-   * Note: The AWS credentials will be discovered using the DefaultAWSCredentialsProviderChain
-   * on the workers. See AWS documentation to understand how DefaultAWSCredentialsProviderChain
-   * gets the AWS credentials.
-   *
-   * @param ssc StreamingContext object
-   * @param kinesisAppName  Kinesis application name used by the Kinesis Client Library
-   *                        (KCL) to update DynamoDB
-   * @param streamName   Kinesis stream name
-   * @param endpointUrl  Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
-   * @param regionName   Name of region used by the Kinesis Client Library (KCL) to update
-   *                     DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics)
-   * @param initialPositionInStream  In the absence of Kinesis checkpoint info, this is the
-   *                                 worker's initial starting position in the stream.
-   *                                 The values are either the beginning of the stream
-   *                                 per Kinesis' limit of 24 hours
-   *                                 (InitialPositionInStream.TRIM_HORIZON) or
-   *                                 the tip of the stream (InitialPositionInStream.LATEST).
-   * @param checkpointInterval  Checkpoint interval for Kinesis checkpointing.
-   *                            See the Kinesis Spark Streaming documentation for more
-   *                            details on the different types of checkpoints.
-   * @param storageLevel Storage level to use for storing the received objects.
-   *                     StorageLevel.MEMORY_AND_DISK_2 is recommended.
-   */
-  def createStream(
-      ssc: StreamingContext,
-      kinesisAppName: String,
-      streamName: String,
-      endpointUrl: String,
-      regionName: String,
-      initialPositionInStream: InitialPositionInStream,
-      checkpointInterval: Duration,
-      storageLevel: StorageLevel): ReceiverInputDStream[Array[Byte]] = {
-    // Use "kinesis stream" as the scope name in place of the default "receiver stream".
-    ssc.withNamedScope("kinesis stream") {
-      // Records become raw byte arrays via defaultMessageHandler; credentials are left as None.
-      val region = validateRegion(regionName)
-      new KinesisInputDStream[Array[Byte]](
-        ssc, streamName, endpointUrl, region, initialPositionInStream, kinesisAppName,
-        checkpointInterval, storageLevel, defaultMessageHandler, None)
-    }
-  }
-
-  /**
-   * Create an input stream that pulls messages from a Kinesis stream.
-   * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis.
-   *
-   * Note:
-   *  The given AWS credentials will get saved in DStream checkpoints if checkpointing
-   *  is enabled. Make sure that your checkpoint directory is secure.
-   *
-   * @param ssc StreamingContext object
-   * @param kinesisAppName  Kinesis application name used by the Kinesis Client Library
-   *                        (KCL) to update DynamoDB
-   * @param streamName   Kinesis stream name
-   * @param endpointUrl  Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
-   * @param regionName   Name of region used by the Kinesis Client Library (KCL) to update
-   *                     DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics)
-   * @param initialPositionInStream  In the absence of Kinesis checkpoint info, this is the
-   *                                 worker's initial starting position in the stream.
-   *                                 The values are either the beginning of the stream
-   *                                 per Kinesis' limit of 24 hours
-   *                                 (InitialPositionInStream.TRIM_HORIZON) or
-   *                                 the tip of the stream (InitialPositionInStream.LATEST).
-   * @param checkpointInterval  Checkpoint interval for Kinesis checkpointing.
-   *                            See the Kinesis Spark Streaming documentation for more
-   *                            details on the different types of checkpoints.
-   * @param storageLevel Storage level to use for storing the received objects.
-   *                     StorageLevel.MEMORY_AND_DISK_2 is recommended.
-   * @param awsAccessKeyId  AWS AccessKeyId (if null, will use DefaultAWSCredentialsProviderChain)
-   * @param awsSecretKey  AWS SecretKey (if null, will use DefaultAWSCredentialsProviderChain)
-   */
-  def createStream(
-      ssc: StreamingContext,
-      kinesisAppName: String,
-      streamName: String,
-      endpointUrl: String,
-      regionName: String,
-      initialPositionInStream: InitialPositionInStream,
-      checkpointInterval: Duration,
-      storageLevel: StorageLevel,
-      awsAccessKeyId: String,
-      awsSecretKey: String): ReceiverInputDStream[Array[Byte]] = {
-    // The parameter docs promise that null keys fall back to DefaultAWSCredentialsProviderChain.
-    // When both keys are null, pass None -- exactly what the credential-less overloads pass --
-    // instead of wrapping two nulls in SerializableAWSCredentials. A lone null key is forwarded
-    // unchanged, as before.
-    val awsCredentials =
-      if (awsAccessKeyId == null && awsSecretKey == null) {
-        None
-      } else {
-        Some(SerializableAWSCredentials(awsAccessKeyId, awsSecretKey))
-      }
-    // Setting scope to override receiver stream's scope of "receiver stream"
-    ssc.withNamedScope("kinesis stream") {
-      new KinesisInputDStream[Array[Byte]](ssc, streamName, endpointUrl, validateRegion(regionName),
-        initialPositionInStream, kinesisAppName, checkpointInterval, storageLevel,
-        defaultMessageHandler, awsCredentials)
-    }
-  }
-
-  /**
-   * Create an input stream that pulls messages from a Kinesis stream.
-   * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis.
-   *
-   * Note:
-   *
-   *  - The AWS credentials will be discovered using the DefaultAWSCredentialsProviderChain
-   *    on the workers. See AWS documentation to understand how DefaultAWSCredentialsProviderChain
-   *    gets AWS credentials.
-   *  - The region of the `endpointUrl` will be used for DynamoDB and CloudWatch.
-   *  - The Kinesis application name used by the Kinesis Client Library (KCL) will be the app name
-   *    in [[org.apache.spark.SparkConf]].
-   *
-   * @param ssc StreamingContext object
-   * @param streamName   Kinesis stream name
-   * @param endpointUrl  Endpoint url of Kinesis service
-   *                     (e.g., https://kinesis.us-east-1.amazonaws.com)
-   * @param checkpointInterval  Checkpoint interval for Kinesis checkpointing.
-   *                            See the Kinesis Spark Streaming documentation for more
-   *                            details on the different types of checkpoints.
-   * @param initialPositionInStream  In the absence of Kinesis checkpoint info, this is the
-   *                                 worker's initial starting position in the stream.
-   *                                 The values are either the beginning of the stream
-   *                                 per Kinesis' limit of 24 hours
-   *                                 (InitialPositionInStream.TRIM_HORIZON) or
-   *                                 the tip of the stream (InitialPositionInStream.LATEST).
-   * @param storageLevel Storage level to use for storing the received objects.
-   *                     StorageLevel.MEMORY_AND_DISK_2 is recommended.
-   */
-  @deprecated("use other forms of createStream", "1.4.0")
-  def createStream(
-      ssc: StreamingContext,
-      streamName: String,
-      endpointUrl: String,
-      checkpointInterval: Duration,
-      initialPositionInStream: InitialPositionInStream,
-      storageLevel: StorageLevel): ReceiverInputDStream[Array[Byte]] = {
-    ssc.withNamedScope("kinesis stream") {
-      // Region is derived from the endpoint URL; the KCL application name is the Spark app name.
-      val region = getRegionByEndpoint(endpointUrl)
-      val appName = ssc.sc.appName
-      new KinesisInputDStream[Array[Byte]](
-        ssc, streamName, endpointUrl, region, initialPositionInStream, appName,
-        checkpointInterval, storageLevel, defaultMessageHandler, None)
-    }
-  }
-
-  /**
-   * Create an input stream that pulls messages from a Kinesis stream.
-   * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis.
-   *
-   * Note: The AWS credentials will be discovered using the DefaultAWSCredentialsProviderChain
-   * on the workers. See AWS documentation to understand how DefaultAWSCredentialsProviderChain
-   * gets the AWS credentials.
-   *
-   * @param jssc Java StreamingContext object
-   * @param kinesisAppName  Kinesis application name used by the Kinesis Client Library
-   *                        (KCL) to update DynamoDB
-   * @param streamName   Kinesis stream name
-   * @param endpointUrl  Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
-   * @param regionName   Name of region used by the Kinesis Client Library (KCL) to update
-   *                     DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics)
-   * @param initialPositionInStream  In the absence of Kinesis checkpoint info, this is the
-   *                                 worker's initial starting position in the stream.
-   *                                 The values are either the beginning of the stream
-   *                                 per Kinesis' limit of 24 hours
-   *                                 (InitialPositionInStream.TRIM_HORIZON) or
-   *                                 the tip of the stream (InitialPositionInStream.LATEST).
-   * @param checkpointInterval  Checkpoint interval for Kinesis checkpointing.
-   *                            See the Kinesis Spark Streaming documentation for more
-   *                            details on the different types of checkpoints.
-   * @param storageLevel Storage level to use for storing the received objects.
-   *                     StorageLevel.MEMORY_AND_DISK_2 is recommended.
-   * @param messageHandler A custom message handler that can generate a generic output from a
-   *                       Kinesis `Record`, which contains both message data, and metadata.
-   * @param recordClass Class of the records in DStream
-   */
-  def createStream[T](
-      jssc: JavaStreamingContext,
-      kinesisAppName: String,
-      streamName: String,
-      endpointUrl: String,
-      regionName: String,
-      initialPositionInStream: InitialPositionInStream,
-      checkpointInterval: Duration,
-      storageLevel: StorageLevel,
-      messageHandler: JFunction[Record, T],
-      recordClass: Class[T]): JavaReceiverInputDStream[T] = {
-    // Build the ClassTag required by the Scala overload from the Java class token.
-    implicit val recordTag: ClassTag[T] = ClassTag(recordClass)
-    // Adapt the Java function to a Scala function and clean it before handing it over.
-    val scalaHandler = jssc.sparkContext.clean((record: Record) => messageHandler.call(record))
-    createStream[T](
-      jssc.ssc, kinesisAppName, streamName, endpointUrl, regionName, initialPositionInStream,
-      checkpointInterval, storageLevel, scalaHandler)
-  }
-
-  /**
-   * Create an input stream that pulls messages from a Kinesis stream.
-   * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis.
-   *
-   * Note:
-   * The given AWS credentials will get saved in DStream checkpoints if checkpointing
-   * is enabled. Make sure that your checkpoint directory is secure.
-   *
-   * @param jssc Java StreamingContext object
-   * @param kinesisAppName  Kinesis application name used by the Kinesis Client Library
-   *                        (KCL) to update DynamoDB
-   * @param streamName   Kinesis stream name
-   * @param endpointUrl  Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
-   * @param regionName   Name of region used by the Kinesis Client Library (KCL) to update
-   *                     DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics)
-   * @param initialPositionInStream  In the absence of Kinesis checkpoint info, this is the
-   *                                 worker's initial starting position in the stream.
-   *                                 The values are either the beginning of the stream
-   *                                 per Kinesis' limit of 24 hours
-   *                                 (InitialPositionInStream.TRIM_HORIZON) or
-   *                                 the tip of the stream (InitialPositionInStream.LATEST).
-   * @param checkpointInterval  Checkpoint interval for Kinesis checkpointing.
-   *                            See the Kinesis Spark Streaming documentation for more
-   *                            details on the different types of checkpoints.
-   * @param storageLevel Storage level to use for storing the received objects.
-   *                     StorageLevel.MEMORY_AND_DISK_2 is recommended.
-   * @param messageHandler A custom message handler that can generate a generic output from a
-   *                       Kinesis `Record`, which contains both message data, and metadata.
-   * @param recordClass Class of the records in DStream
-   * @param awsAccessKeyId  AWS AccessKeyId (if null, will use DefaultAWSCredentialsProviderChain)
-   * @param awsSecretKey  AWS SecretKey (if null, will use DefaultAWSCredentialsProviderChain)
-   */
-  // scalastyle:off
-  def createStream[T](
-      jssc: JavaStreamingContext,
-      kinesisAppName: String,
-      streamName: String,
-      endpointUrl: String,
-      regionName: String,
-      initialPositionInStream: InitialPositionInStream,
-      checkpointInterval: Duration,
-      storageLevel: StorageLevel,
-      messageHandler: JFunction[Record, T],
-      recordClass: Class[T],
-      awsAccessKeyId: String,
-      awsSecretKey: String): JavaReceiverInputDStream[T] = {
-    // scalastyle:on
-    // Build the ClassTag required by the Scala overload from the Java class token.
-    implicit val recordTag: ClassTag[T] = ClassTag(recordClass)
-    // Adapt the Java function to a Scala function and clean it before handing it over.
-    val scalaHandler = jssc.sparkContext.clean((record: Record) => messageHandler.call(record))
-    // The access key and secret key are forwarded untouched to the Scala overload.
-    createStream[T](
-      jssc.ssc, kinesisAppName, streamName, endpointUrl, regionName, initialPositionInStream,
-      checkpointInterval, storageLevel, scalaHandler, awsAccessKeyId, awsSecretKey)
-  }
-
-  /**
-   * Create an input stream that pulls messages from a Kinesis stream.
-   * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis.
-   *
-   * Note: The AWS credentials will be discovered using the DefaultAWSCredentialsProviderChain
-   * on the workers. See AWS documentation to understand how DefaultAWSCredentialsProviderChain
-   * gets the AWS credentials.
-   *
-   * @param jssc Java StreamingContext object
-   * @param kinesisAppName  Kinesis application name used by the Kinesis Client Library
-   *                        (KCL) to update DynamoDB
-   * @param streamName   Kinesis stream name
-   * @param endpointUrl  Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
-   * @param regionName   Name of region used by the Kinesis Client Library (KCL) to update
-   *                     DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics)
-   * @param initialPositionInStream  In the absence of Kinesis checkpoint info, this is the
-   *                                 worker's initial starting position in the stream.
-   *                                 The values are either the beginning of the stream
-   *                                 per Kinesis' limit of 24 hours
-   *                                 (InitialPositionInStream.TRIM_HORIZON) or
-   *                                 the tip of the stream (InitialPositionInStream.LATEST).
-   * @param checkpointInterval  Checkpoint interval for Kinesis checkpointing.
-   *                            See the Kinesis Spark Streaming documentation for more
-   *                            details on the different types of checkpoints.
-   * @param storageLevel Storage level to use for storing the received objects.
-   *                     StorageLevel.MEMORY_AND_DISK_2 is recommended.
-   */
-  def createStream(
-      jssc: JavaStreamingContext,
-      kinesisAppName: String,
-      streamName: String,
-      endpointUrl: String,
-      regionName: String,
-      initialPositionInStream: InitialPositionInStream,
-      checkpointInterval: Duration,
-      storageLevel: StorageLevel): JavaReceiverInputDStream[Array[Byte]] = {
-    // Delegate to the generic Scala overload, using the default Record-to-bytes handler.
-    val toBytes: Record => Array[Byte] = defaultMessageHandler(_)
-    createStream[Array[Byte]](
-      jssc.ssc, kinesisAppName, streamName, endpointUrl, regionName, initialPositionInStream,
-      checkpointInterval, storageLevel, toBytes)
-  }
-
-  /**
-   * Create an input stream that pulls messages from a Kinesis stream.
-   * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis.
-   *
-   * Note:
-   * The given AWS credentials will get saved in DStream checkpoints if checkpointing
-   * is enabled. Make sure that your checkpoint directory is secure.
-   *
-   * @param jssc Java StreamingContext object
-   * @param kinesisAppName  Kinesis application name used by the Kinesis Client Library
-   *                        (KCL) to update DynamoDB
-   * @param streamName   Kinesis stream name
-   * @param endpointUrl  Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
-   * @param regionName   Name of region used by the Kinesis Client Library (KCL) to update
-   *                     DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics)
-   * @param initialPositionInStream  In the absence of Kinesis checkpoint info, this is the
-   *                                 worker's initial starting position in the stream.
-   *                                 The values are either the beginning of the stream
-   *                                 per Kinesis' limit of 24 hours
-   *                                 (InitialPositionInStream.TRIM_HORIZON) or
-   *                                 the tip of the stream (InitialPositionInStream.LATEST).
-   * @param checkpointInterval  Checkpoint interval for Kinesis checkpointing.
-   *                            See the Kinesis Spark Streaming documentation for more
-   *                            details on the different types of checkpoints.
-   * @param storageLevel Storage level to use for storing the received objects.
-   *                     StorageLevel.MEMORY_AND_DISK_2 is recommended.
-   * @param awsAccessKeyId  AWS AccessKeyId (if null, will use DefaultAWSCredentialsProviderChain)
-   * @param awsSecretKey  AWS SecretKey (if null, will use DefaultAWSCredentialsProviderChain)
-   */
-  def createStream(
-      jssc: JavaStreamingContext,
-      kinesisAppName: String,
-      streamName: String,
-      endpointUrl: String,
-      regionName: String,
-      initialPositionInStream: InitialPositionInStream,
-      checkpointInterval: Duration,
-      storageLevel: StorageLevel,
-      awsAccessKeyId: String,
-      awsSecretKey: String): JavaReceiverInputDStream[Array[Byte]] = {
-    // Delegate to the generic Scala overload, using the default Record-to-bytes handler and
-    // forwarding the supplied access key and secret key untouched.
-    val toBytes: Record => Array[Byte] = defaultMessageHandler(_)
-    createStream[Array[Byte]](
-      jssc.ssc, kinesisAppName, streamName, endpointUrl, regionName, initialPositionInStream,
-      checkpointInterval, storageLevel, toBytes, awsAccessKeyId, awsSecretKey)
-  }
-
-  /**
-   * Create an input stream that pulls messages from a Kinesis stream.
-   * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis.
-   *
-   * Note:
-   * - The AWS credentials will be discovered using the DefaultAWSCredentialsProviderChain
-   *   on the workers. See AWS documentation to understand how DefaultAWSCredentialsProviderChain
-   *   gets AWS credentials.
-   * - The region of the `endpointUrl` will be used for DynamoDB and CloudWatch.
-   * - The Kinesis application name used by the Kinesis Client Library (KCL) will be the app name in
-   *   [[org.apache.spark.SparkConf]].
-   *
-   * @param jssc Java StreamingContext object
-   * @param streamName   Kinesis stream name
-   * @param endpointUrl  Endpoint url of Kinesis service
-   *                     (e.g., https://kinesis.us-east-1.amazonaws.com)
-   * @param checkpointInterval  Checkpoint interval for Kinesis checkpointing.
-   *                            See the Kinesis Spark Streaming documentation for more
-   *                            details on the different types of checkpoints.
-   * @param initialPositionInStream  In the absence of Kinesis checkpoint info, this is the
-   *                                 worker's initial starting position in the stream.
-   *                                 The values are either the beginning of the stream
-   *                                 per Kinesis' limit of 24 hours
-   *                                 (InitialPositionInStream.TRIM_HORIZON) or
-   *                                 the tip of the stream (InitialPositionInStream.LATEST).
-   * @param storageLevel Storage level to use for storing the received objects.
-   *                     StorageLevel.MEMORY_AND_DISK_2 is recommended.
-   */
-  @deprecated("use other forms of createStream", "1.4.0")
-  def createStream(
-      jssc: JavaStreamingContext,
-      streamName: String,
-      endpointUrl: String,
-      checkpointInterval: Duration,
-      initialPositionInStream: InitialPositionInStream,
-      storageLevel: StorageLevel): JavaReceiverInputDStream[Array[Byte]] = {
-    // Unwrap the Scala context and forward to the matching deprecated Scala overload.
-    val scalaContext = jssc.ssc
-    createStream(
-      scalaContext, streamName, endpointUrl, checkpointInterval, initialPositionInStream,
-      storageLevel)
-  }
-
-  private def getRegionByEndpoint(endpointUrl: String): String = {
-    // Look the endpoint up through RegionUtils and return the resulting region's name.
-    val region = RegionUtils.getRegionByEndpoint(endpointUrl)
-    region.getName()
-  }
-
-  private def validateRegion(regionName: String): String = {
-    // A null result from RegionUtils.getRegion is treated as an invalid region name.
-    Option(RegionUtils.getRegion(regionName)) match {
-      case Some(region) =>
-        region.getName
-      case None =>
-        throw new IllegalArgumentException(s"Region name '$regionName' is not valid")
-    }
-  }
-
-  private[kinesis] def defaultMessageHandler(record: Record): Array[Byte] = {
-    // A null record maps to null; otherwise the bytes remaining in the record's data buffer
-    // are copied into a freshly allocated array.
-    if (record == null) {
-      null
-    } else {
-      val payload = record.getData()
-      val bytes = new Array[Byte](payload.remaining())
-      payload.get(bytes)
-      bytes
-    }
-  }
-}
-
-/**
- * This is a helper class that wraps the methods in KinesisUtils into more Python-friendly class and
- * function so that it can be easily instantiated and called from Python's KinesisUtils.
- */
-private class KinesisUtilsPythonHelper {
-
-  // Maps the integer code to the KCL enum: 0 is LATEST, 1 is TRIM_HORIZON, and any other
-  // value is rejected with an IllegalArgumentException.
-  def getInitialPositionInStream(initialPositionInStream: Int): InitialPositionInStream = {
-    if (initialPositionInStream == 0) {
-      InitialPositionInStream.LATEST
-    } else if (initialPositionInStream == 1) {
-      InitialPositionInStream.TRIM_HORIZON
-    } else {
-      throw new IllegalArgumentException(
-        "Illegal InitialPositionInStream. Please use " +
-          "InitialPositionInStream.LATEST or InitialPositionInStream.TRIM_HORIZON")
-    }
-  }
-
-  // Picks the KinesisUtils.createStream overload based on which credentials are present:
-  // both null uses the credential-less overload, both set uses the explicit-credential
-  // overload, and exactly one null is an error.
-  def createStream(
-      jssc: JavaStreamingContext,
-      kinesisAppName: String,
-      streamName: String,
-      endpointUrl: String,
-      regionName: String,
-      initialPositionInStream: Int,
-      checkpointInterval: Duration,
-      storageLevel: StorageLevel,
-      awsAccessKeyId: String,
-      awsSecretKey: String): JavaReceiverInputDStream[Array[Byte]] = {
-    (Option(awsAccessKeyId), Option(awsSecretKey)) match {
-      case (None, Some(_)) =>
-        throw new IllegalArgumentException("awsSecretKey is set but awsAccessKeyId is null")
-      case (Some(_), None) =>
-        throw new IllegalArgumentException("awsAccessKeyId is set but awsSecretKey is null")
-      case (None, None) =>
-        KinesisUtils.createStream(jssc, kinesisAppName, streamName, endpointUrl, regionName,
-          getInitialPositionInStream(initialPositionInStream), checkpointInterval, storageLevel)
-      case (Some(_), Some(_)) =>
-        KinesisUtils.createStream(jssc, kinesisAppName, streamName, endpointUrl, regionName,
-          getInitialPositionInStream(initialPositionInStream), checkpointInterval, storageLevel,
-          awsAccessKeyId, awsSecretKey)
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/extras/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisStreamSuite.java
----------------------------------------------------------------------
diff --git a/extras/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisStreamSuite.java b/extras/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisStreamSuite.java
deleted file mode 100644
index 5c2371c..0000000
--- a/extras/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisStreamSuite.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kinesis;
-
-import com.amazonaws.services.kinesis.model.Record;
-import org.junit.Test;
-
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.storage.StorageLevel;
-import org.apache.spark.streaming.Duration;
-import org.apache.spark.streaming.LocalJavaStreamingContext;
-import org.apache.spark.streaming.api.java.JavaDStream;
-
-import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
-
-/**
- * Demonstrate the use of the KinesisUtils Java API
- */
-public class JavaKinesisStreamSuite extends LocalJavaStreamingContext {
-  @Test
-  public void testKinesisStream() {
-    // Tests the API, does not actually test data receiving
-    JavaDStream<byte[]> kinesisStream = KinesisUtils.createStream(ssc, "mySparkStream",
-        "https://kinesis.us-west-2.amazonaws.com", new Duration(2000),
-        InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2());
-
-    ssc.stop();
-  }
-
-
-  private static Function<Record, String> handler = new Function<Record, String>() {
-    @Override
-    public String call(Record record) {
-      return record.getPartitionKey() + "-" + record.getSequenceNumber();
-    }
-  };
-
-  @Test
-  public void testCustomHandler() {
-    // Tests the API, does not actually test data receiving
-    JavaDStream<String> kinesisStream = KinesisUtils.createStream(ssc, "testApp", "mySparkStream",
-        "https://kinesis.us-west-2.amazonaws.com", "us-west-2", InitialPositionInStream.LATEST,
-        new Duration(2000), StorageLevel.MEMORY_AND_DISK_2(), handler, String.class);
-
-    ssc.stop();
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/extras/kinesis-asl/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/extras/kinesis-asl/src/test/resources/log4j.properties b/extras/kinesis-asl/src/test/resources/log4j.properties
deleted file mode 100644
index edbecda..0000000
--- a/extras/kinesis-asl/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,27 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# Set everything to be logged to the file target/unit-tests.log
-log4j.rootCategory=INFO, file
-log4j.appender.file=org.apache.log4j.FileAppender
-log4j.appender.file.append=true
-log4j.appender.file.file=target/unit-tests.log
-log4j.appender.file.layout=org.apache.log4j.PatternLayout
-log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
-
-# Ignore messages below warning level from Jetty, because it's a bit verbose
-log4j.logger.org.spark-project.jetty=WARN

http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala
----------------------------------------------------------------------
diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala
deleted file mode 100644
index fdb270e..0000000
--- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.streaming.kinesis
-
-import java.nio.ByteBuffer
-
-import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
-
-import com.amazonaws.services.kinesis.producer.{KinesisProducer => KPLProducer, KinesisProducerConfiguration, UserRecordResult}
-import com.google.common.util.concurrent.{FutureCallback, Futures}
-
-private[kinesis] class KPLBasedKinesisTestUtils extends KinesisTestUtils {
-  override protected def getProducer(aggregate: Boolean): KinesisDataGenerator = {
-    if (!aggregate) {
-      new SimpleDataGenerator(kinesisClient)
-    } else {
-      new KPLDataGenerator(regionName)
-    }
-  }
-}
-
-/** A wrapper for the KinesisProducer provided in the KPL. */
-private[kinesis] class KPLDataGenerator(regionName: String) extends KinesisDataGenerator {
-
-  private lazy val producer: KPLProducer = {
-    val conf = new KinesisProducerConfiguration()
-      .setRecordMaxBufferedTime(1000)
-      .setMaxConnections(1)
-      .setRegion(regionName)
-      .setMetricsLevel("none")
-
-    new KPLProducer(conf)
-  }
-
-  override def sendData(streamName: String, data: Seq[Int]): Map[String, Seq[(Int, String)]] = {
-    val shardIdToSeqNumbers = new mutable.HashMap[String, ArrayBuffer[(Int, String)]]()
-    data.foreach { num =>
-      val str = num.toString
-      val data = ByteBuffer.wrap(str.getBytes())
-      val future = producer.addUserRecord(streamName, str, data)
-      val kinesisCallBack = new FutureCallback[UserRecordResult]() {
-        override def onFailure(t: Throwable): Unit = {} // do nothing
-
-        override def onSuccess(result: UserRecordResult): Unit = {
-          val shardId = result.getShardId
-          val seqNumber = result.getSequenceNumber()
-          val sentSeqNumbers = shardIdToSeqNumbers.getOrElseUpdate(shardId,
-            new ArrayBuffer[(Int, String)]())
-          sentSeqNumbers += ((num, seqNumber))
-        }
-      }
-      Futures.addCallback(future, kinesisCallBack)
-    }
-    producer.flushSync()
-    shardIdToSeqNumbers.toMap
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala
----------------------------------------------------------------------
diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala
deleted file mode 100644
index 2555332..0000000
--- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala
+++ /dev/null
@@ -1,259 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kinesis
-
-import org.scalatest.BeforeAndAfterEach
-
-import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException}
-import org.apache.spark.storage.{BlockId, BlockManager, StorageLevel, StreamBlockId}
-
-abstract class KinesisBackedBlockRDDTests(aggregateTestData: Boolean)
-  extends KinesisFunSuite with BeforeAndAfterEach with LocalSparkContext {
-
-  private val testData = 1 to 8
-
-  private var testUtils: KinesisTestUtils = null
-  private var shardIds: Seq[String] = null
-  private var shardIdToData: Map[String, Seq[Int]] = null
-  private var shardIdToSeqNumbers: Map[String, Seq[String]] = null
-  private var shardIdToDataAndSeqNumbers: Map[String, Seq[(Int, String)]] = null
-  private var shardIdToRange: Map[String, SequenceNumberRange] = null
-  private var allRanges: Seq[SequenceNumberRange] = null
-
-  private var blockManager: BlockManager = null
-
-  override def beforeAll(): Unit = {
-    super.beforeAll()
-    runIfTestsEnabled("Prepare KinesisTestUtils") {
-      testUtils = new KPLBasedKinesisTestUtils()
-      testUtils.createStream()
-
-      shardIdToDataAndSeqNumbers = testUtils.pushData(testData, aggregate = aggregateTestData)
-      require(shardIdToDataAndSeqNumbers.size > 1, "Need data to be sent to multiple shards")
-
-      shardIds = shardIdToDataAndSeqNumbers.keySet.toSeq
-      shardIdToData = shardIdToDataAndSeqNumbers.mapValues { _.map { _._1 }}
-      shardIdToSeqNumbers = shardIdToDataAndSeqNumbers.mapValues { _.map { _._2 }}
-      shardIdToRange = shardIdToSeqNumbers.map { case (shardId, seqNumbers) =>
-        val seqNumRange = SequenceNumberRange(
-          testUtils.streamName, shardId, seqNumbers.head, seqNumbers.last)
-        (shardId, seqNumRange)
-      }
-      allRanges = shardIdToRange.values.toSeq
-    }
-  }
-
-  override def beforeEach(): Unit = {
-    super.beforeEach()
-    val conf = new SparkConf().setMaster("local[4]").setAppName("KinesisBackedBlockRDDSuite")
-    sc = new SparkContext(conf)
-    blockManager = sc.env.blockManager
-  }
-
-  override def afterAll(): Unit = {
-    try {
-      if (testUtils != null) {
-        testUtils.deleteStream()
-      }
-    } finally {
-      super.afterAll()
-    }
-  }
-
-  testIfEnabled("Basic reading from Kinesis") {
-    // Verify all data using multiple ranges in a single RDD partition
-    val receivedData1 = new KinesisBackedBlockRDD[Array[Byte]](sc, testUtils.regionName,
-      testUtils.endpointUrl, fakeBlockIds(1),
-      Array(SequenceNumberRanges(allRanges.toArray))
-    ).map { bytes => new String(bytes).toInt }.collect()
-    assert(receivedData1.toSet === testData.toSet)
-
-    // Verify all data using one range in each of the multiple RDD partitions
-    val receivedData2 = new KinesisBackedBlockRDD[Array[Byte]](sc, testUtils.regionName,
-      testUtils.endpointUrl, fakeBlockIds(allRanges.size),
-      allRanges.map { range => SequenceNumberRanges(Array(range)) }.toArray
-    ).map { bytes => new String(bytes).toInt }.collect()
-    assert(receivedData2.toSet === testData.toSet)
-
-    // Verify ordering within each partition
-    val receivedData3 = new KinesisBackedBlockRDD[Array[Byte]](sc, testUtils.regionName,
-      testUtils.endpointUrl, fakeBlockIds(allRanges.size),
-      allRanges.map { range => SequenceNumberRanges(Array(range)) }.toArray
-    ).map { bytes => new String(bytes).toInt }.collectPartitions()
-    assert(receivedData3.length === allRanges.size)
-    for (i <- 0 until allRanges.size) {
-      assert(receivedData3(i).toSeq === shardIdToData(allRanges(i).shardId))
-    }
-  }
-
-  testIfEnabled("Read data available in both block manager and Kinesis") {
-    testRDD(numPartitions = 2, numPartitionsInBM = 2, numPartitionsInKinesis = 2)
-  }
-
-  testIfEnabled("Read data available only in block manager, not in Kinesis") {
-    testRDD(numPartitions = 2, numPartitionsInBM = 2, numPartitionsInKinesis = 0)
-  }
-
-  testIfEnabled("Read data available only in Kinesis, not in block manager") {
-    testRDD(numPartitions = 2, numPartitionsInBM = 0, numPartitionsInKinesis = 2)
-  }
-
-  testIfEnabled("Read data available partially in block manager, rest in Kinesis") {
-    testRDD(numPartitions = 2, numPartitionsInBM = 1, numPartitionsInKinesis = 1)
-  }
-
-  testIfEnabled("Test isBlockValid skips block fetching from block manager") {
-    testRDD(numPartitions = 2, numPartitionsInBM = 2, numPartitionsInKinesis = 0,
-      testIsBlockValid = true)
-  }
-
-  testIfEnabled("Test whether RDD is valid after removing blocks from block anager") {
-    testRDD(numPartitions = 2, numPartitionsInBM = 2, numPartitionsInKinesis = 2,
-      testBlockRemove = true)
-  }
-
-  /**
-   * Test the WriteAheadLogBackedRDD, by writing some partitions of the data to block manager
-   * and the rest to a write ahead log, and then reading reading it all back using the RDD.
-   * It can also test if the partitions that were read from the log were again stored in
-   * block manager.
-   *
-   *
-   *
-   * @param numPartitions Number of partitions in RDD
-   * @param numPartitionsInBM Number of partitions to write to the BlockManager.
-   *                          Partitions 0 to (numPartitionsInBM-1) will be written to BlockManager
-   * @param numPartitionsInKinesis Number of partitions to write to the Kinesis.
-   *                           Partitions (numPartitions - 1 - numPartitionsInKinesis) to
-   *                           (numPartitions - 1) will be written to Kinesis
-   * @param testIsBlockValid Test whether setting isBlockValid to false skips block fetching
-   * @param testBlockRemove Test whether calling rdd.removeBlock() makes the RDD still usable with
-   *                        reads falling back to the WAL
-   * Example with numPartitions = 5, numPartitionsInBM = 3, and numPartitionsInWAL = 4
-   *
-   *   numPartitionsInBM = 3
-   *   |------------------|
-   *   |                  |
-   *    0       1       2       3       4
-   *           |                         |
-   *           |-------------------------|
-   *              numPartitionsInKinesis = 4
-   */
-  private def testRDD(
-      numPartitions: Int,
-      numPartitionsInBM: Int,
-      numPartitionsInKinesis: Int,
-      testIsBlockValid: Boolean = false,
-      testBlockRemove: Boolean = false
-    ): Unit = {
-    require(shardIds.size > 1, "Need at least 2 shards to test")
-    require(numPartitionsInBM <= shardIds.size,
-      "Number of partitions in BlockManager cannot be more than the Kinesis test shards available")
-    require(numPartitionsInKinesis <= shardIds.size,
-      "Number of partitions in Kinesis cannot be more than the Kinesis test shards available")
-    require(numPartitionsInBM <= numPartitions,
-      "Number of partitions in BlockManager cannot be more than that in RDD")
-    require(numPartitionsInKinesis <= numPartitions,
-      "Number of partitions in Kinesis cannot be more than that in RDD")
-
-    // Put necessary blocks in the block manager
-    val blockIds = fakeBlockIds(numPartitions)
-    blockIds.foreach(blockManager.removeBlock(_))
-    (0 until numPartitionsInBM).foreach { i =>
-      val blockData = shardIdToData(shardIds(i)).iterator.map { _.toString.getBytes() }
-      blockManager.putIterator(blockIds(i), blockData, StorageLevel.MEMORY_ONLY)
-    }
-
-    // Create the necessary ranges to use in the RDD
-    val fakeRanges = Array.fill(numPartitions - numPartitionsInKinesis)(
-      SequenceNumberRanges(SequenceNumberRange("fakeStream", "fakeShardId", "xxx", "yyy")))
-    val realRanges = Array.tabulate(numPartitionsInKinesis) { i =>
-      val range = shardIdToRange(shardIds(i + (numPartitions - numPartitionsInKinesis)))
-      SequenceNumberRanges(Array(range))
-    }
-    val ranges = (fakeRanges ++ realRanges)
-
-
-    // Make sure that the left `numPartitionsInBM` blocks are in block manager, and others are not
-    require(
-      blockIds.take(numPartitionsInBM).forall(blockManager.get(_).nonEmpty),
-      "Expected blocks not in BlockManager"
-    )
-
-    require(
-      blockIds.drop(numPartitionsInBM).forall(blockManager.get(_).isEmpty),
-      "Unexpected blocks in BlockManager"
-    )
-
-    // Make sure that the right sequence `numPartitionsInKinesis` are configured, and others are not
-    require(
-      ranges.takeRight(numPartitionsInKinesis).forall {
-        _.ranges.forall { _.streamName == testUtils.streamName }
-      }, "Incorrect configuration of RDD, expected ranges not set: "
-    )
-
-    require(
-      ranges.dropRight(numPartitionsInKinesis).forall {
-        _.ranges.forall { _.streamName != testUtils.streamName }
-      }, "Incorrect configuration of RDD, unexpected ranges set"
-    )
-
-    val rdd = new KinesisBackedBlockRDD[Array[Byte]](
-      sc, testUtils.regionName, testUtils.endpointUrl, blockIds, ranges)
-    val collectedData = rdd.map { bytes =>
-      new String(bytes).toInt
-    }.collect()
-    assert(collectedData.toSet === testData.toSet)
-
-    // Verify that the block fetching is skipped when isBlockValid is set to false.
-    // This is done by using a RDD whose data is only in memory but is set to skip block fetching
-    // Using that RDD will throw exception, as it skips block fetching even if the blocks are in
-    // in BlockManager.
-    if (testIsBlockValid) {
-      require(numPartitionsInBM === numPartitions, "All partitions must be in BlockManager")
-      require(numPartitionsInKinesis === 0, "No partitions must be in Kinesis")
-      val rdd2 = new KinesisBackedBlockRDD[Array[Byte]](
-        sc, testUtils.regionName, testUtils.endpointUrl, blockIds.toArray, ranges,
-        isBlockIdValid = Array.fill(blockIds.length)(false))
-      intercept[SparkException] {
-        rdd2.collect()
-      }
-    }
-
-    // Verify that the RDD is not invalid after the blocks are removed and can still read data
-    // from write ahead log
-    if (testBlockRemove) {
-      require(numPartitions === numPartitionsInKinesis,
-        "All partitions must be in WAL for this test")
-      require(numPartitionsInBM > 0, "Some partitions must be in BlockManager for this test")
-      rdd.removeBlocks()
-      assert(rdd.map { bytes => new String(bytes).toInt }.collect().toSet === testData.toSet)
-    }
-  }
-
-  /** Generate fake block ids */
-  private def fakeBlockIds(num: Int): Array[BlockId] = {
-    Array.tabulate(num) { i => new StreamBlockId(0, i) }
-  }
-}
-
-class WithAggregationKinesisBackedBlockRDDSuite
-  extends KinesisBackedBlockRDDTests(aggregateTestData = true)
-
-class WithoutAggregationKinesisBackedBlockRDDSuite
-  extends KinesisBackedBlockRDDTests(aggregateTestData = false)

http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointerSuite.scala
----------------------------------------------------------------------
diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointerSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointerSuite.scala
deleted file mode 100644
index e1499a8..0000000
--- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointerSuite.scala
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kinesis
-
-import java.util.concurrent.{ExecutorService, TimeoutException}
-
-import scala.concurrent.{Await, ExecutionContext, Future}
-import scala.concurrent.duration._
-import scala.language.postfixOps
-
-import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer
-import org.mockito.Matchers._
-import org.mockito.Mockito._
-import org.mockito.invocation.InvocationOnMock
-import org.mockito.stubbing.Answer
-import org.scalatest.{BeforeAndAfterEach, PrivateMethodTester}
-import org.scalatest.concurrent.Eventually
-import org.scalatest.concurrent.Eventually._
-import org.scalatest.mock.MockitoSugar
-
-import org.apache.spark.streaming.{Duration, TestSuiteBase}
-import org.apache.spark.util.ManualClock
-
-class KinesisCheckpointerSuite extends TestSuiteBase
-  with MockitoSugar
-  with BeforeAndAfterEach
-  with PrivateMethodTester
-  with Eventually {
-
-  private val workerId = "dummyWorkerId"
-  private val shardId = "dummyShardId"
-  private val seqNum = "123"
-  private val otherSeqNum = "245"
-  private val checkpointInterval = Duration(10)
-  private val someSeqNum = Some(seqNum)
-  private val someOtherSeqNum = Some(otherSeqNum)
-
-  private var receiverMock: KinesisReceiver[Array[Byte]] = _
-  private var checkpointerMock: IRecordProcessorCheckpointer = _
-  private var kinesisCheckpointer: KinesisCheckpointer = _
-  private var clock: ManualClock = _
-
-  private val checkpoint = PrivateMethod[Unit]('checkpoint)
-
-  override def beforeEach(): Unit = {
-    receiverMock = mock[KinesisReceiver[Array[Byte]]]
-    checkpointerMock = mock[IRecordProcessorCheckpointer]
-    clock = new ManualClock()
-    kinesisCheckpointer = new KinesisCheckpointer(receiverMock, checkpointInterval, workerId, clock)
-  }
-
-  test("checkpoint is not called twice for the same sequence number") {
-    when(receiverMock.getLatestSeqNumToCheckpoint(shardId)).thenReturn(someSeqNum)
-    kinesisCheckpointer.invokePrivate(checkpoint(shardId, checkpointerMock))
-    kinesisCheckpointer.invokePrivate(checkpoint(shardId, checkpointerMock))
-
-    verify(checkpointerMock, times(1)).checkpoint(anyString())
-  }
-
-  test("checkpoint is called after sequence number increases") {
-    when(receiverMock.getLatestSeqNumToCheckpoint(shardId))
-      .thenReturn(someSeqNum).thenReturn(someOtherSeqNum)
-    kinesisCheckpointer.invokePrivate(checkpoint(shardId, checkpointerMock))
-    kinesisCheckpointer.invokePrivate(checkpoint(shardId, checkpointerMock))
-
-    verify(checkpointerMock, times(1)).checkpoint(seqNum)
-    verify(checkpointerMock, times(1)).checkpoint(otherSeqNum)
-  }
-
-  test("should checkpoint if we have exceeded the checkpoint interval") {
-    when(receiverMock.getLatestSeqNumToCheckpoint(shardId))
-      .thenReturn(someSeqNum).thenReturn(someOtherSeqNum)
-
-    kinesisCheckpointer.setCheckpointer(shardId, checkpointerMock)
-    clock.advance(5 * checkpointInterval.milliseconds)
-
-    eventually(timeout(1 second)) {
-      verify(checkpointerMock, times(1)).checkpoint(seqNum)
-      verify(checkpointerMock, times(1)).checkpoint(otherSeqNum)
-    }
-  }
-
-  test("shouldn't checkpoint if we have not exceeded the checkpoint interval") {
-    when(receiverMock.getLatestSeqNumToCheckpoint(shardId)).thenReturn(someSeqNum)
-
-    kinesisCheckpointer.setCheckpointer(shardId, checkpointerMock)
-    clock.advance(checkpointInterval.milliseconds / 2)
-
-    verify(checkpointerMock, never()).checkpoint(anyString())
-  }
-
-  test("should not checkpoint for the same sequence number") {
-    when(receiverMock.getLatestSeqNumToCheckpoint(shardId)).thenReturn(someSeqNum)
-
-    kinesisCheckpointer.setCheckpointer(shardId, checkpointerMock)
-
-    clock.advance(checkpointInterval.milliseconds * 5)
-    eventually(timeout(1 second)) {
-      verify(checkpointerMock, atMost(1)).checkpoint(anyString())
-    }
-  }
-
-  test("removing checkpointer checkpoints one last time") {
-    when(receiverMock.getLatestSeqNumToCheckpoint(shardId)).thenReturn(someSeqNum)
-
-    kinesisCheckpointer.removeCheckpointer(shardId, checkpointerMock)
-    verify(checkpointerMock, times(1)).checkpoint(anyString())
-  }
-
-  test("if checkpointing is going on, wait until finished before removing and checkpointing") {
-    when(receiverMock.getLatestSeqNumToCheckpoint(shardId))
-      .thenReturn(someSeqNum).thenReturn(someOtherSeqNum)
-    when(checkpointerMock.checkpoint(anyString)).thenAnswer(new Answer[Unit] {
-      override def answer(invocations: InvocationOnMock): Unit = {
-        clock.waitTillTime(clock.getTimeMillis() + checkpointInterval.milliseconds / 2)
-      }
-    })
-
-    kinesisCheckpointer.setCheckpointer(shardId, checkpointerMock)
-    clock.advance(checkpointInterval.milliseconds)
-    eventually(timeout(1 second)) {
-      verify(checkpointerMock, times(1)).checkpoint(anyString())
-    }
-    // don't block test thread
-    val f = Future(kinesisCheckpointer.removeCheckpointer(shardId, checkpointerMock))(
-      ExecutionContext.global)
-
-    intercept[TimeoutException] {
-      Await.ready(f, 50 millis)
-    }
-
-    clock.advance(checkpointInterval.milliseconds / 2)
-    eventually(timeout(1 second)) {
-      verify(checkpointerMock, times(2)).checkpoint(anyString())
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala
----------------------------------------------------------------------
diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala
deleted file mode 100644
index ee428f3..0000000
--- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kinesis
-
-import org.apache.spark.SparkFunSuite
-
-/**
- * Helper class that runs Kinesis real data transfer tests or
- * ignores them based on env variable is set or not.
- */
-trait KinesisFunSuite extends SparkFunSuite  {
-  import KinesisTestUtils._
-
-  /** Run the test if environment variable is set or ignore the test */
-  def testIfEnabled(testName: String)(testBody: => Unit) {
-    if (shouldRunTests) {
-      test(testName)(testBody)
-    } else {
-      ignore(s"$testName [enable by setting env var $envVarNameForEnablingTests=1]")(testBody)
-    }
-  }
-
-  /** Run the give body of code only if Kinesis tests are enabled */
-  def runIfTestsEnabled(message: String)(body: => Unit): Unit = {
-    if (shouldRunTests) {
-      body
-    } else {
-      ignore(s"$message [enable by setting env var $envVarNameForEnablingTests=1]")()
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
----------------------------------------------------------------------
diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
deleted file mode 100644
index fd15b6c..0000000
--- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.streaming.kinesis
-
-import java.nio.ByteBuffer
-import java.nio.charset.StandardCharsets
-import java.util.Arrays
-
-import com.amazonaws.services.kinesis.clientlibrary.exceptions._
-import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer
-import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason
-import com.amazonaws.services.kinesis.model.Record
-import org.mockito.Matchers._
-import org.mockito.Matchers.{eq => meq}
-import org.mockito.Mockito._
-import org.scalatest.{BeforeAndAfter, Matchers}
-import org.scalatest.mock.MockitoSugar
-
-import org.apache.spark.streaming.{Duration, TestSuiteBase}
-import org.apache.spark.util.Utils
-
-/**
- * Suite of Kinesis streaming receiver tests focusing mostly on the KinesisRecordProcessor
- */
-class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAfter
-    with MockitoSugar {
-
-  val app = "TestKinesisReceiver"
-  val stream = "mySparkStream"
-  val endpoint = "endpoint-url"
-  val workerId = "dummyWorkerId"
-  val shardId = "dummyShardId"
-  val seqNum = "dummySeqNum"
-  val checkpointInterval = Duration(10)
-  val someSeqNum = Some(seqNum)
-
-  val record1 = new Record()
-  record1.setData(ByteBuffer.wrap("Spark In Action".getBytes(StandardCharsets.UTF_8)))
-  val record2 = new Record()
-  record2.setData(ByteBuffer.wrap("Learning Spark".getBytes(StandardCharsets.UTF_8)))
-  val batch = Arrays.asList(record1, record2)
-
-  var receiverMock: KinesisReceiver[Array[Byte]] = _
-  var checkpointerMock: IRecordProcessorCheckpointer = _
-
-  override def beforeFunction(): Unit = {
-    receiverMock = mock[KinesisReceiver[Array[Byte]]]
-    checkpointerMock = mock[IRecordProcessorCheckpointer]
-  }
-
-  test("check serializability of SerializableAWSCredentials") {
-    Utils.deserialize[SerializableAWSCredentials](
-      Utils.serialize(new SerializableAWSCredentials("x", "y")))
-  }
-
-  test("process records including store and set checkpointer") {
-    when(receiverMock.isStopped()).thenReturn(false)
-
-    val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId)
-    recordProcessor.initialize(shardId)
-    recordProcessor.processRecords(batch, checkpointerMock)
-
-    verify(receiverMock, times(1)).isStopped()
-    verify(receiverMock, times(1)).addRecords(shardId, batch)
-    verify(receiverMock, times(1)).setCheckpointer(shardId, checkpointerMock)
-  }
-
-  test("shouldn't store and update checkpointer when receiver is stopped") {
-    when(receiverMock.isStopped()).thenReturn(true)
-
-    val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId)
-    recordProcessor.processRecords(batch, checkpointerMock)
-
-    verify(receiverMock, times(1)).isStopped()
-    verify(receiverMock, never).addRecords(anyString, anyListOf(classOf[Record]))
-    verify(receiverMock, never).setCheckpointer(anyString, meq(checkpointerMock))
-  }
-
-  test("shouldn't update checkpointer when exception occurs during store") {
-    when(receiverMock.isStopped()).thenReturn(false)
-    when(
-      receiverMock.addRecords(anyString, anyListOf(classOf[Record]))
-    ).thenThrow(new RuntimeException())
-
-    intercept[RuntimeException] {
-      val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId)
-      recordProcessor.initialize(shardId)
-      recordProcessor.processRecords(batch, checkpointerMock)
-    }
-
-    verify(receiverMock, times(1)).isStopped()
-    verify(receiverMock, times(1)).addRecords(shardId, batch)
-    verify(receiverMock, never).setCheckpointer(anyString, meq(checkpointerMock))
-  }
-
-  test("shutdown should checkpoint if the reason is TERMINATE") {
-    when(receiverMock.getLatestSeqNumToCheckpoint(shardId)).thenReturn(someSeqNum)
-
-    val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId)
-    recordProcessor.initialize(shardId)
-    recordProcessor.shutdown(checkpointerMock, ShutdownReason.TERMINATE)
-
-    verify(receiverMock, times(1)).removeCheckpointer(meq(shardId), meq(checkpointerMock))
-  }
-
-
-  test("shutdown should not checkpoint if the reason is something other than TERMINATE") {
-    when(receiverMock.getLatestSeqNumToCheckpoint(shardId)).thenReturn(someSeqNum)
-
-    val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId)
-    recordProcessor.initialize(shardId)
-    recordProcessor.shutdown(checkpointerMock, ShutdownReason.ZOMBIE)
-    recordProcessor.shutdown(checkpointerMock, null)
-
-    verify(receiverMock, times(2)).removeCheckpointer(meq(shardId),
-      meq[IRecordProcessorCheckpointer](null))
-  }
-
-  test("retry success on first attempt") {
-    val expectedIsStopped = false
-    when(receiverMock.isStopped()).thenReturn(expectedIsStopped)
-
-    val actualVal = KinesisRecordProcessor.retryRandom(receiverMock.isStopped(), 2, 100)
-    assert(actualVal == expectedIsStopped)
-
-    verify(receiverMock, times(1)).isStopped()
-  }
-
-  test("retry success on second attempt after a Kinesis throttling exception") {
-    val expectedIsStopped = false
-    when(receiverMock.isStopped())
-        .thenThrow(new ThrottlingException("error message"))
-        .thenReturn(expectedIsStopped)
-
-    val actualVal = KinesisRecordProcessor.retryRandom(receiverMock.isStopped(), 2, 100)
-    assert(actualVal == expectedIsStopped)
-
-    verify(receiverMock, times(2)).isStopped()
-  }
-
-  test("retry success on second attempt after a Kinesis dependency exception") {
-    val expectedIsStopped = false
-    when(receiverMock.isStopped())
-        .thenThrow(new KinesisClientLibDependencyException("error message"))
-        .thenReturn(expectedIsStopped)
-
-    val actualVal = KinesisRecordProcessor.retryRandom(receiverMock.isStopped(), 2, 100)
-    assert(actualVal == expectedIsStopped)
-
-    verify(receiverMock, times(2)).isStopped()
-  }
-
-  test("retry failed after a shutdown exception") {
-    when(checkpointerMock.checkpoint()).thenThrow(new ShutdownException("error message"))
-
-    intercept[ShutdownException] {
-      KinesisRecordProcessor.retryRandom(checkpointerMock.checkpoint(), 2, 100)
-    }
-
-    verify(checkpointerMock, times(1)).checkpoint()
-  }
-
-  test("retry failed after an invalid state exception") {
-    when(checkpointerMock.checkpoint()).thenThrow(new InvalidStateException("error message"))
-
-    intercept[InvalidStateException] {
-      KinesisRecordProcessor.retryRandom(checkpointerMock.checkpoint(), 2, 100)
-    }
-
-    verify(checkpointerMock, times(1)).checkpoint()
-  }
-
-  test("retry failed after unexpected exception") {
-    when(checkpointerMock.checkpoint()).thenThrow(new RuntimeException("error message"))
-
-    intercept[RuntimeException] {
-      KinesisRecordProcessor.retryRandom(checkpointerMock.checkpoint(), 2, 100)
-    }
-
-    verify(checkpointerMock, times(1)).checkpoint()
-  }
-
-  test("retry failed after exhausing all retries") {
-    val expectedErrorMessage = "final try error message"
-    when(checkpointerMock.checkpoint())
-        .thenThrow(new ThrottlingException("error message"))
-        .thenThrow(new ThrottlingException(expectedErrorMessage))
-
-    val exception = intercept[RuntimeException] {
-      KinesisRecordProcessor.retryRandom(checkpointerMock.checkpoint(), 2, 100)
-    }
-    exception.getMessage().shouldBe(expectedErrorMessage)
-
-    verify(checkpointerMock, times(2)).checkpoint()
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
----------------------------------------------------------------------
diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
deleted file mode 100644
index ca5d13d..0000000
--- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
+++ /dev/null
@@ -1,297 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kinesis
-
-import scala.collection.mutable
-import scala.concurrent.duration._
-import scala.language.postfixOps
-import scala.util.Random
-
-import com.amazonaws.regions.RegionUtils
-import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
-import com.amazonaws.services.kinesis.model.Record
-import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
-import org.scalatest.Matchers._
-import org.scalatest.concurrent.Eventually
-
-import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.network.util.JavaUtils
-import org.apache.spark.rdd.RDD
-import org.apache.spark.storage.{StorageLevel, StreamBlockId}
-import org.apache.spark.streaming._
-import org.apache.spark.streaming.dstream.ReceiverInputDStream
-import org.apache.spark.streaming.kinesis.KinesisTestUtils._
-import org.apache.spark.streaming.receiver.BlockManagerBasedStoreResult
-import org.apache.spark.streaming.scheduler.ReceivedBlockInfo
-import org.apache.spark.util.Utils
-
-abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFunSuite
-  with Eventually with BeforeAndAfter with BeforeAndAfterAll {
-
-  // This is the name that KCL will use to save metadata to DynamoDB
-  private val appName = s"KinesisStreamSuite-${math.abs(Random.nextLong())}"
-  private val batchDuration = Seconds(1)
-
-  // Dummy parameters for API testing
-  private val dummyEndpointUrl = defaultEndpointUrl
-  private val dummyRegionName = RegionUtils.getRegionByEndpoint(dummyEndpointUrl).getName()
-  private val dummyAWSAccessKey = "dummyAccessKey"
-  private val dummyAWSSecretKey = "dummySecretKey"
-
-  private var testUtils: KinesisTestUtils = null
-  private var ssc: StreamingContext = null
-  private var sc: SparkContext = null
-
-  override def beforeAll(): Unit = {
-    val conf = new SparkConf()
-      .setMaster("local[4]")
-      .setAppName("KinesisStreamSuite") // Setting Spark app name to Kinesis app name
-    sc = new SparkContext(conf)
-
-    runIfTestsEnabled("Prepare KinesisTestUtils") {
-      testUtils = new KPLBasedKinesisTestUtils()
-      testUtils.createStream()
-    }
-  }
-
-  override def afterAll(): Unit = {
-    if (ssc != null) {
-      ssc.stop()
-    }
-    if (sc != null) {
-      sc.stop()
-    }
-    if (testUtils != null) {
-      // Delete the Kinesis stream as well as the DynamoDB table generated by
-      // Kinesis Client Library when consuming the stream
-      testUtils.deleteStream()
-      testUtils.deleteDynamoDBTable(appName)
-    }
-  }
-
-  before {
-    ssc = new StreamingContext(sc, batchDuration)
-  }
-
-  after {
-    if (ssc != null) {
-      ssc.stop(stopSparkContext = false)
-      ssc = null
-    }
-    if (testUtils != null) {
-      testUtils.deleteDynamoDBTable(appName)
-    }
-  }
-
-  test("KinesisUtils API") {
-    // Tests the API, does not actually test data receiving
-    val kinesisStream1 = KinesisUtils.createStream(ssc, "mySparkStream",
-      dummyEndpointUrl, Seconds(2),
-      InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2)
-    val kinesisStream2 = KinesisUtils.createStream(ssc, "myAppNam", "mySparkStream",
-      dummyEndpointUrl, dummyRegionName,
-      InitialPositionInStream.LATEST, Seconds(2), StorageLevel.MEMORY_AND_DISK_2)
-    val kinesisStream3 = KinesisUtils.createStream(ssc, "myAppNam", "mySparkStream",
-      dummyEndpointUrl, dummyRegionName,
-      InitialPositionInStream.LATEST, Seconds(2), StorageLevel.MEMORY_AND_DISK_2,
-      dummyAWSAccessKey, dummyAWSSecretKey)
-  }
-
-  test("RDD generation") {
-    val inputStream = KinesisUtils.createStream(ssc, appName, "dummyStream",
-      dummyEndpointUrl, dummyRegionName, InitialPositionInStream.LATEST, Seconds(2),
-      StorageLevel.MEMORY_AND_DISK_2, dummyAWSAccessKey, dummyAWSSecretKey)
-    assert(inputStream.isInstanceOf[KinesisInputDStream[Array[Byte]]])
-
-    val kinesisStream = inputStream.asInstanceOf[KinesisInputDStream[Array[Byte]]]
-    val time = Time(1000)
-
-    // Generate block info data for testing
-    val seqNumRanges1 = SequenceNumberRanges(
-      SequenceNumberRange("fakeStream", "fakeShardId", "xxx", "yyy"))
-    val blockId1 = StreamBlockId(kinesisStream.id, 123)
-    val blockInfo1 = ReceivedBlockInfo(
-      0, None, Some(seqNumRanges1), new BlockManagerBasedStoreResult(blockId1, None))
-
-    val seqNumRanges2 = SequenceNumberRanges(
-      SequenceNumberRange("fakeStream", "fakeShardId", "aaa", "bbb"))
-    val blockId2 = StreamBlockId(kinesisStream.id, 345)
-    val blockInfo2 = ReceivedBlockInfo(
-      0, None, Some(seqNumRanges2), new BlockManagerBasedStoreResult(blockId2, None))
-
-    // Verify that the generated KinesisBackedBlockRDD has the all the right information
-    val blockInfos = Seq(blockInfo1, blockInfo2)
-    val nonEmptyRDD = kinesisStream.createBlockRDD(time, blockInfos)
-    nonEmptyRDD shouldBe a [KinesisBackedBlockRDD[_]]
-    val kinesisRDD = nonEmptyRDD.asInstanceOf[KinesisBackedBlockRDD[_]]
-    assert(kinesisRDD.regionName === dummyRegionName)
-    assert(kinesisRDD.endpointUrl === dummyEndpointUrl)
-    assert(kinesisRDD.retryTimeoutMs === batchDuration.milliseconds)
-    assert(kinesisRDD.awsCredentialsOption ===
-      Some(SerializableAWSCredentials(dummyAWSAccessKey, dummyAWSSecretKey)))
-    assert(nonEmptyRDD.partitions.size === blockInfos.size)
-    nonEmptyRDD.partitions.foreach { _ shouldBe a [KinesisBackedBlockRDDPartition] }
-    val partitions = nonEmptyRDD.partitions.map {
-      _.asInstanceOf[KinesisBackedBlockRDDPartition] }.toSeq
-    assert(partitions.map { _.seqNumberRanges } === Seq(seqNumRanges1, seqNumRanges2))
-    assert(partitions.map { _.blockId } === Seq(blockId1, blockId2))
-    assert(partitions.forall { _.isBlockIdValid === true })
-
-    // Verify that KinesisBackedBlockRDD is generated even when there are no blocks
-    val emptyRDD = kinesisStream.createBlockRDD(time, Seq.empty)
-    emptyRDD shouldBe a [KinesisBackedBlockRDD[Array[Byte]]]
-    emptyRDD.partitions shouldBe empty
-
-    // Verify that the KinesisBackedBlockRDD has isBlockValid = false when blocks are invalid
-    blockInfos.foreach { _.setBlockIdInvalid() }
-    kinesisStream.createBlockRDD(time, blockInfos).partitions.foreach { partition =>
-      assert(partition.asInstanceOf[KinesisBackedBlockRDDPartition].isBlockIdValid === false)
-    }
-  }
-
-
-  /**
-   * Test the stream by sending data to a Kinesis stream and receiving from it.
-   * This test is not run by default as it requires AWS credentials that the test
-   * environment may not have. Even if there is AWS credentials available, the user
-   * may not want to run these tests to avoid the Kinesis costs. To enable this test,
-   * you must have AWS credentials available through the default AWS provider chain,
-   * and you have to set the system environment variable RUN_KINESIS_TESTS=1 .
-   */
-  testIfEnabled("basic operation") {
-    val awsCredentials = KinesisTestUtils.getAWSCredentials()
-    val stream = KinesisUtils.createStream(ssc, appName, testUtils.streamName,
-      testUtils.endpointUrl, testUtils.regionName, InitialPositionInStream.LATEST,
-      Seconds(10), StorageLevel.MEMORY_ONLY,
-      awsCredentials.getAWSAccessKeyId, awsCredentials.getAWSSecretKey)
-
-    val collected = new mutable.HashSet[Int] with mutable.SynchronizedSet[Int]
-    stream.map { bytes => new String(bytes).toInt }.foreachRDD { rdd =>
-      collected ++= rdd.collect()
-      logInfo("Collected = " + collected.mkString(", "))
-    }
-    ssc.start()
-
-    val testData = 1 to 10
-    eventually(timeout(120 seconds), interval(10 second)) {
-      testUtils.pushData(testData, aggregateTestData)
-      assert(collected === testData.toSet, "\nData received does not match data sent")
-    }
-    ssc.stop(stopSparkContext = false)
-  }
-
-  testIfEnabled("custom message handling") {
-    val awsCredentials = KinesisTestUtils.getAWSCredentials()
-    def addFive(r: Record): Int = JavaUtils.bytesToString(r.getData).toInt + 5
-    val stream = KinesisUtils.createStream(ssc, appName, testUtils.streamName,
-      testUtils.endpointUrl, testUtils.regionName, InitialPositionInStream.LATEST,
-      Seconds(10), StorageLevel.MEMORY_ONLY, addFive,
-      awsCredentials.getAWSAccessKeyId, awsCredentials.getAWSSecretKey)
-
-    stream shouldBe a [ReceiverInputDStream[_]]
-
-    val collected = new mutable.HashSet[Int] with mutable.SynchronizedSet[Int]
-    stream.foreachRDD { rdd =>
-      collected ++= rdd.collect()
-      logInfo("Collected = " + collected.mkString(", "))
-    }
-    ssc.start()
-
-    val testData = 1 to 10
-    eventually(timeout(120 seconds), interval(10 second)) {
-      testUtils.pushData(testData, aggregateTestData)
-      val modData = testData.map(_ + 5)
-      assert(collected === modData.toSet, "\nData received does not match data sent")
-    }
-    ssc.stop(stopSparkContext = false)
-  }
-
-  testIfEnabled("failure recovery") {
-    val sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName)
-    val checkpointDir = Utils.createTempDir().getAbsolutePath
-
-    ssc = new StreamingContext(sc, Milliseconds(1000))
-    ssc.checkpoint(checkpointDir)
-
-    val awsCredentials = KinesisTestUtils.getAWSCredentials()
-    val collectedData = new mutable.HashMap[Time, (Array[SequenceNumberRanges], Seq[Int])]
-
-    val kinesisStream = KinesisUtils.createStream(ssc, appName, testUtils.streamName,
-      testUtils.endpointUrl, testUtils.regionName, InitialPositionInStream.LATEST,
-      Seconds(10), StorageLevel.MEMORY_ONLY,
-      awsCredentials.getAWSAccessKeyId, awsCredentials.getAWSSecretKey)
-
-    // Verify that the generated RDDs are KinesisBackedBlockRDDs, and collect the data in each batch
-    kinesisStream.foreachRDD((rdd: RDD[Array[Byte]], time: Time) => {
-      val kRdd = rdd.asInstanceOf[KinesisBackedBlockRDD[Array[Byte]]]
-      val data = rdd.map { bytes => new String(bytes).toInt }.collect().toSeq
-      collectedData.synchronized {
-        collectedData(time) = (kRdd.arrayOfseqNumberRanges, data)
-      }
-    })
-
-    ssc.remember(Minutes(60)) // remember all the batches so that they are all saved in checkpoint
-    ssc.start()
-
-    def numBatchesWithData: Int =
-      collectedData.synchronized { collectedData.count(_._2._2.nonEmpty) }
-
-    def isCheckpointPresent: Boolean = Checkpoint.getCheckpointFiles(checkpointDir).nonEmpty
-
-    // Run until there are at least 10 batches with some data in them
-    // If this times out because numBatchesWithData is empty, then its likely that foreachRDD
-    // function failed with exceptions, and nothing got added to `collectedData`
-    eventually(timeout(2 minutes), interval(1 seconds)) {
-      testUtils.pushData(1 to 5, aggregateTestData)
-      assert(isCheckpointPresent && numBatchesWithData > 10)
-    }
-    ssc.stop(stopSparkContext = true)  // stop the SparkContext so that the blocks are not reused
-
-    // Restart the context from checkpoint and verify whether the
-    logInfo("Restarting from checkpoint")
-    ssc = new StreamingContext(checkpointDir)
-    ssc.start()
-    val recoveredKinesisStream = ssc.graph.getInputStreams().head
-
-    // Verify that the recomputed RDDs are KinesisBackedBlockRDDs with the same sequence ranges
-    // and return the same data
-    collectedData.synchronized {
-      val times = collectedData.keySet
-      times.foreach { time =>
-        val (arrayOfSeqNumRanges, data) = collectedData(time)
-        val rdd = recoveredKinesisStream.getOrCompute(time).get.asInstanceOf[RDD[Array[Byte]]]
-        rdd shouldBe a[KinesisBackedBlockRDD[_]]
-
-        // Verify the recovered sequence ranges
-        val kRdd = rdd.asInstanceOf[KinesisBackedBlockRDD[Array[Byte]]]
-        assert(kRdd.arrayOfseqNumberRanges.size === arrayOfSeqNumRanges.size)
-        arrayOfSeqNumRanges.zip(kRdd.arrayOfseqNumberRanges).foreach { case (expected, found) =>
-          assert(expected.ranges.toSeq === found.ranges.toSeq)
-        }
-
-        // Verify the recovered data
-        assert(rdd.map { bytes => new String(bytes).toInt }.collect().toSeq === data)
-      }
-    }
-    ssc.stop()
-  }
-}
-
-class WithAggregationKinesisStreamSuite extends KinesisStreamTests(aggregateTestData = true)
-
-class WithoutAggregationKinesisStreamSuite extends KinesisStreamTests(aggregateTestData = false)

http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/extras/spark-ganglia-lgpl/pom.xml
----------------------------------------------------------------------
diff --git a/extras/spark-ganglia-lgpl/pom.xml b/extras/spark-ganglia-lgpl/pom.xml
deleted file mode 100644
index bfb9279..0000000
--- a/extras/spark-ganglia-lgpl/pom.xml
+++ /dev/null
@@ -1,49 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-~ Licensed to the Apache Software Foundation (ASF) under one or more
-~ contributor license agreements.  See the NOTICE file distributed with
-~ this work for additional information regarding copyright ownership.
-~ The ASF licenses this file to You under the Apache License, Version 2.0
-~ (the "License"); you may not use this file except in compliance with
-~ the License.  You may obtain a copy of the License at
-~
-~    http://www.apache.org/licenses/LICENSE-2.0
-~
-~ Unless required by applicable law or agreed to in writing, software
-~ distributed under the License is distributed on an "AS IS" BASIS,
-~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-~ See the License for the specific language governing permissions and
-~ limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-  <parent>
-    <groupId>org.apache.spark</groupId>
-    <artifactId>spark-parent_2.11</artifactId>
-    <version>2.0.0-SNAPSHOT</version>
-    <relativePath>../../pom.xml</relativePath>
-  </parent>
-
-  <!-- Ganglia integration is not included by default due to LGPL-licensed code -->
-  <groupId>org.apache.spark</groupId>
-  <artifactId>spark-ganglia-lgpl_2.11</artifactId>
-  <packaging>jar</packaging>
-  <name>Spark Ganglia Integration</name>
-
-  <properties>
-    <sbt.project.name>ganglia-lgpl</sbt.project.name>
-  </properties>
-
-  <dependencies>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-core_${scala.binary.version}</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-
-    <dependency>
-      <groupId>io.dropwizard.metrics</groupId>
-      <artifactId>metrics-ganglia</artifactId>
-    </dependency>
-  </dependencies>
-</project>

http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/extras/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala
----------------------------------------------------------------------
diff --git a/extras/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala b/extras/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala
deleted file mode 100644
index 3b1880e..0000000
--- a/extras/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.metrics.sink
-
-import java.util.Properties
-import java.util.concurrent.TimeUnit
-
-import com.codahale.metrics.MetricRegistry
-import com.codahale.metrics.ganglia.GangliaReporter
-import info.ganglia.gmetric4j.gmetric.GMetric
-import info.ganglia.gmetric4j.gmetric.GMetric.UDPAddressingMode
-
-import org.apache.spark.SecurityManager
-import org.apache.spark.metrics.MetricsSystem
-
-class GangliaSink(val property: Properties, val registry: MetricRegistry,
-    securityMgr: SecurityManager) extends Sink {
-  val GANGLIA_KEY_PERIOD = "period"
-  val GANGLIA_DEFAULT_PERIOD = 10
-
-  val GANGLIA_KEY_UNIT = "unit"
-  val GANGLIA_DEFAULT_UNIT: TimeUnit = TimeUnit.SECONDS
-
-  val GANGLIA_KEY_MODE = "mode"
-  val GANGLIA_DEFAULT_MODE: UDPAddressingMode = GMetric.UDPAddressingMode.MULTICAST
-
-  // TTL for multicast messages. If listeners are X hops away in network, must be at least X.
-  val GANGLIA_KEY_TTL = "ttl"
-  val GANGLIA_DEFAULT_TTL = 1
-
-  val GANGLIA_KEY_HOST = "host"
-  val GANGLIA_KEY_PORT = "port"
-
-  def propertyToOption(prop: String): Option[String] = Option(property.getProperty(prop))
-
-  if (!propertyToOption(GANGLIA_KEY_HOST).isDefined) {
-    throw new Exception("Ganglia sink requires 'host' property.")
-  }
-
-  if (!propertyToOption(GANGLIA_KEY_PORT).isDefined) {
-    throw new Exception("Ganglia sink requires 'port' property.")
-  }
-
-  val host = propertyToOption(GANGLIA_KEY_HOST).get
-  val port = propertyToOption(GANGLIA_KEY_PORT).get.toInt
-  val ttl = propertyToOption(GANGLIA_KEY_TTL).map(_.toInt).getOrElse(GANGLIA_DEFAULT_TTL)
-  val mode: UDPAddressingMode = propertyToOption(GANGLIA_KEY_MODE)
-    .map(u => GMetric.UDPAddressingMode.valueOf(u.toUpperCase)).getOrElse(GANGLIA_DEFAULT_MODE)
-  val pollPeriod = propertyToOption(GANGLIA_KEY_PERIOD).map(_.toInt)
-    .getOrElse(GANGLIA_DEFAULT_PERIOD)
-  val pollUnit: TimeUnit = propertyToOption(GANGLIA_KEY_UNIT)
-    .map(u => TimeUnit.valueOf(u.toUpperCase))
-    .getOrElse(GANGLIA_DEFAULT_UNIT)
-
-  MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)
-
-  val ganglia = new GMetric(host, port, mode, ttl)
-  val reporter: GangliaReporter = GangliaReporter.forRegistry(registry)
-      .convertDurationsTo(TimeUnit.MILLISECONDS)
-      .convertRatesTo(TimeUnit.SECONDS)
-      .build(ganglia)
-
-  override def start() {
-    reporter.start(pollPeriod, pollUnit)
-  }
-
-  override def stop() {
-    reporter.stop()
-  }
-
-  override def report() {
-    reporter.report()
-  }
-}
-


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org