You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2016/03/09 19:27:50 UTC

[3/8] spark git commit: [SPARK-13595][BUILD] Move docker, extras modules into external

http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
----------------------------------------------------------------------
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
deleted file mode 100644
index 6a73bc0..0000000
--- a/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
+++ /dev/null
@@ -1,276 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-// scalastyle:off println
-package org.apache.spark.examples.streaming
-
-import java.nio.ByteBuffer
-
-import scala.util.Random
-
-import com.amazonaws.auth.{BasicAWSCredentials, DefaultAWSCredentialsProviderChain}
-import com.amazonaws.regions.RegionUtils
-import com.amazonaws.services.kinesis.AmazonKinesisClient
-import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
-import com.amazonaws.services.kinesis.model.PutRecordRequest
-import org.apache.log4j.{Level, Logger}
-
-import org.apache.spark.{Logging, SparkConf}
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{Milliseconds, StreamingContext}
-import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions
-import org.apache.spark.streaming.kinesis.KinesisUtils
-
-
-/**
- * Consumes messages from a Amazon Kinesis streams and does wordcount.
- *
- * This example spins up 1 Kinesis Receiver per shard for the given stream.
- * It then starts pulling from the last checkpointed sequence number of the given stream.
- *
- * Usage: KinesisWordCountASL <app-name> <stream-name> <endpoint-url> <region-name>
- *   <app-name> is the name of the consumer app, used to track the read data in DynamoDB
- *   <stream-name> name of the Kinesis stream (ie. mySparkStream)
- *   <endpoint-url> endpoint of the Kinesis service
- *     (e.g. https://kinesis.us-east-1.amazonaws.com)
- *
- *
- * Example:
- *      # export AWS keys if necessary
- *      $ export AWS_ACCESS_KEY_ID=<your-access-key>
- *      $ export AWS_SECRET_KEY=<your-secret-key>
- *
- *      # run the example
- *      $ SPARK_HOME/bin/run-example  streaming.KinesisWordCountASL myAppName  mySparkStream \
- *              https://kinesis.us-east-1.amazonaws.com
- *
- * There is a companion helper class called KinesisWordProducerASL which puts dummy data
- * onto the Kinesis stream.
- *
- * This code uses the DefaultAWSCredentialsProviderChain to find credentials
- * in the following order:
- *    Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY
- *    Java System Properties - aws.accessKeyId and aws.secretKey
- *    Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs
- *    Instance profile credentials - delivered through the Amazon EC2 metadata service
- * For more information, see
- * http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/credentials.html
- *
- * See http://spark.apache.org/docs/latest/streaming-kinesis-integration.html for more details on
- * the Kinesis Spark Streaming integration.
- */
-object KinesisWordCountASL extends Logging {
-  def main(args: Array[String]) {
-    // Check that all required args were passed in.
-    if (args.length != 3) {
-      System.err.println(
-        """
-          |Usage: KinesisWordCountASL <app-name> <stream-name> <endpoint-url> <region-name>
-          |
-          |    <app-name> is the name of the consumer app, used to track the read data in DynamoDB
-          |    <stream-name> is the name of the Kinesis stream
-          |    <endpoint-url> is the endpoint of the Kinesis service
-          |                   (e.g. https://kinesis.us-east-1.amazonaws.com)
-          |
-          |Generate input data for Kinesis stream using the example KinesisWordProducerASL.
-          |See http://spark.apache.org/docs/latest/streaming-kinesis-integration.html for more
-          |details.
-        """.stripMargin)
-      System.exit(1)
-    }
-
-    StreamingExamples.setStreamingLogLevels()
-
-    // Populate the appropriate variables from the given args
-    val Array(appName, streamName, endpointUrl) = args
-
-
-    // Determine the number of shards from the stream using the low-level Kinesis Client
-    // from the AWS Java SDK.
-    val credentials = new DefaultAWSCredentialsProviderChain().getCredentials()
-    require(credentials != null,
-      "No AWS credentials found. Please specify credentials using one of the methods specified " +
-        "in http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/credentials.html")
-    val kinesisClient = new AmazonKinesisClient(credentials)
-    kinesisClient.setEndpoint(endpointUrl)
-    val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards().size
-
-
-    // In this example, we're going to create 1 Kinesis Receiver/input DStream for each shard.
-    // This is not a necessity; if there are less receivers/DStreams than the number of shards,
-    // then the shards will be automatically distributed among the receivers and each receiver
-    // will receive data from multiple shards.
-    val numStreams = numShards
-
-    // Spark Streaming batch interval
-    val batchInterval = Milliseconds(2000)
-
-    // Kinesis checkpoint interval is the interval at which the DynamoDB is updated with information
-    // on sequence number of records that have been received. Same as batchInterval for this
-    // example.
-    val kinesisCheckpointInterval = batchInterval
-
-    // Get the region name from the endpoint URL to save Kinesis Client Library metadata in
-    // DynamoDB of the same region as the Kinesis stream
-    val regionName = RegionUtils.getRegionByEndpoint(endpointUrl).getName()
-
-    // Setup the SparkConfig and StreamingContext
-    val sparkConfig = new SparkConf().setAppName("KinesisWordCountASL")
-    val ssc = new StreamingContext(sparkConfig, batchInterval)
-
-    // Create the Kinesis DStreams
-    val kinesisStreams = (0 until numStreams).map { i =>
-      KinesisUtils.createStream(ssc, appName, streamName, endpointUrl, regionName,
-        InitialPositionInStream.LATEST, kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2)
-    }
-
-    // Union all the streams
-    val unionStreams = ssc.union(kinesisStreams)
-
-    // Convert each line of Array[Byte] to String, and split into words
-    val words = unionStreams.flatMap(byteArray => new String(byteArray).split(" "))
-
-    // Map each word to a (word, 1) tuple so we can reduce by key to count the words
-    val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
-
-    // Print the first 10 wordCounts
-    wordCounts.print()
-
-    // Start the streaming context and await termination
-    ssc.start()
-    ssc.awaitTermination()
-  }
-}
-
-/**
- * Usage: KinesisWordProducerASL <stream-name> <endpoint-url> \
- *   <records-per-sec> <words-per-record>
- *
- *   <stream-name> is the name of the Kinesis stream (ie. mySparkStream)
- *   <endpoint-url> is the endpoint of the Kinesis service
- *     (ie. https://kinesis.us-east-1.amazonaws.com)
- *   <records-per-sec> is the rate of records per second to put onto the stream
- *   <words-per-record> is the rate of records per second to put onto the stream
- *
- * Example:
- *    $ SPARK_HOME/bin/run-example streaming.KinesisWordProducerASL mySparkStream \
- *         https://kinesis.us-east-1.amazonaws.com us-east-1 10 5
- */
-object KinesisWordProducerASL {
-  def main(args: Array[String]) {
-    if (args.length != 4) {
-      System.err.println(
-        """
-          |Usage: KinesisWordProducerASL <stream-name> <endpoint-url> <records-per-sec>
-                                         <words-per-record>
-          |
-          |    <stream-name> is the name of the Kinesis stream
-          |    <endpoint-url> is the endpoint of the Kinesis service
-          |                   (e.g. https://kinesis.us-east-1.amazonaws.com)
-          |    <records-per-sec> is the rate of records per second to put onto the stream
-          |    <words-per-record> is the rate of records per second to put onto the stream
-          |
-        """.stripMargin)
-
-      System.exit(1)
-    }
-
-    // Set default log4j logging level to WARN to hide Spark logs
-    StreamingExamples.setStreamingLogLevels()
-
-    // Populate the appropriate variables from the given args
-    val Array(stream, endpoint, recordsPerSecond, wordsPerRecord) = args
-
-    // Generate the records and return the totals
-    val totals = generate(stream, endpoint, recordsPerSecond.toInt,
-        wordsPerRecord.toInt)
-
-    // Print the array of (word, total) tuples
-    println("Totals for the words sent")
-    totals.foreach(println(_))
-  }
-
-  def generate(stream: String,
-      endpoint: String,
-      recordsPerSecond: Int,
-      wordsPerRecord: Int): Seq[(String, Int)] = {
-
-    val randomWords = List("spark", "you", "are", "my", "father")
-    val totals = scala.collection.mutable.Map[String, Int]()
-
-    // Create the low-level Kinesis Client from the AWS Java SDK.
-    val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain())
-    kinesisClient.setEndpoint(endpoint)
-
-    println(s"Putting records onto stream $stream and endpoint $endpoint at a rate of" +
-        s" $recordsPerSecond records per second and $wordsPerRecord words per record")
-
-    // Iterate and put records onto the stream per the given recordPerSec and wordsPerRecord
-    for (i <- 1 to 10) {
-      // Generate recordsPerSec records to put onto the stream
-      val records = (1 to recordsPerSecond.toInt).foreach { recordNum =>
-        // Randomly generate wordsPerRecord number of words
-        val data = (1 to wordsPerRecord.toInt).map(x => {
-          // Get a random index to a word
-          val randomWordIdx = Random.nextInt(randomWords.size)
-          val randomWord = randomWords(randomWordIdx)
-
-          // Increment total count to compare to server counts later
-          totals(randomWord) = totals.getOrElse(randomWord, 0) + 1
-
-          randomWord
-        }).mkString(" ")
-
-        // Create a partitionKey based on recordNum
-        val partitionKey = s"partitionKey-$recordNum"
-
-        // Create a PutRecordRequest with an Array[Byte] version of the data
-        val putRecordRequest = new PutRecordRequest().withStreamName(stream)
-            .withPartitionKey(partitionKey)
-            .withData(ByteBuffer.wrap(data.getBytes()))
-
-        // Put the record onto the stream and capture the PutRecordResult
-        val putRecordResult = kinesisClient.putRecord(putRecordRequest)
-      }
-
-      // Sleep for a second
-      Thread.sleep(1000)
-      println("Sent " + recordsPerSecond + " records")
-    }
-     // Convert the totals to (index, total) tuple
-    totals.toSeq.sortBy(_._1)
-  }
-}
-
-/**
- *  Utility functions for Spark Streaming examples.
- *  This has been lifted from the examples/ project to remove the circular dependency.
- */
-private[streaming] object StreamingExamples extends Logging {
-  // Set reasonable logging levels for streaming if the user has not configured log4j.
-  def setStreamingLogLevels() {
-    val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
-    if (!log4jInitialized) {
-      // We first log something to initialize Spark's default logging, then we override the
-      // logging level.
-      logInfo("Setting log level to [WARN] for streaming example." +
-        " To override add a custom log4j.properties to the classpath.")
-      Logger.getRootLogger.setLevel(Level.WARN)
-    }
-  }
-}
-// scalastyle:on println

http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
----------------------------------------------------------------------
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
deleted file mode 100644
index 3996f16..0000000
--- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
+++ /dev/null
@@ -1,288 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kinesis
-
-import scala.collection.JavaConverters._
-import scala.reflect.ClassTag
-import scala.util.control.NonFatal
-
-import com.amazonaws.auth.{AWSCredentials, DefaultAWSCredentialsProviderChain}
-import com.amazonaws.services.kinesis.AmazonKinesisClient
-import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord
-import com.amazonaws.services.kinesis.model._
-
-import org.apache.spark._
-import org.apache.spark.rdd.{BlockRDD, BlockRDDPartition}
-import org.apache.spark.storage.BlockId
-import org.apache.spark.util.NextIterator
-
-
-/** Class representing a range of Kinesis sequence numbers. Both sequence numbers are inclusive. */
-private[kinesis]
-case class SequenceNumberRange(
-    streamName: String, shardId: String, fromSeqNumber: String, toSeqNumber: String)
-
-/** Class representing an array of Kinesis sequence number ranges */
-private[kinesis]
-case class SequenceNumberRanges(ranges: Seq[SequenceNumberRange]) {
-  def isEmpty(): Boolean = ranges.isEmpty
-
-  def nonEmpty(): Boolean = ranges.nonEmpty
-
-  override def toString(): String = ranges.mkString("SequenceNumberRanges(", ", ", ")")
-}
-
-private[kinesis]
-object SequenceNumberRanges {
-  def apply(range: SequenceNumberRange): SequenceNumberRanges = {
-    new SequenceNumberRanges(Seq(range))
-  }
-}
-
-
-/** Partition storing the information of the ranges of Kinesis sequence numbers to read */
-private[kinesis]
-class KinesisBackedBlockRDDPartition(
-    idx: Int,
-    blockId: BlockId,
-    val isBlockIdValid: Boolean,
-    val seqNumberRanges: SequenceNumberRanges
-  ) extends BlockRDDPartition(blockId, idx)
-
-/**
- * A BlockRDD where the block data is backed by Kinesis, which can accessed using the
- * sequence numbers of the corresponding blocks.
- */
-private[kinesis]
-class KinesisBackedBlockRDD[T: ClassTag](
-    sc: SparkContext,
-    val regionName: String,
-    val endpointUrl: String,
-    @transient private val _blockIds: Array[BlockId],
-    @transient val arrayOfseqNumberRanges: Array[SequenceNumberRanges],
-    @transient private val isBlockIdValid: Array[Boolean] = Array.empty,
-    val retryTimeoutMs: Int = 10000,
-    val messageHandler: Record => T = KinesisUtils.defaultMessageHandler _,
-    val awsCredentialsOption: Option[SerializableAWSCredentials] = None
-  ) extends BlockRDD[T](sc, _blockIds) {
-
-  require(_blockIds.length == arrayOfseqNumberRanges.length,
-    "Number of blockIds is not equal to the number of sequence number ranges")
-
-  override def isValid(): Boolean = true
-
-  override def getPartitions: Array[Partition] = {
-    Array.tabulate(_blockIds.length) { i =>
-      val isValid = if (isBlockIdValid.length == 0) true else isBlockIdValid(i)
-      new KinesisBackedBlockRDDPartition(i, _blockIds(i), isValid, arrayOfseqNumberRanges(i))
-    }
-  }
-
-  override def compute(split: Partition, context: TaskContext): Iterator[T] = {
-    val blockManager = SparkEnv.get.blockManager
-    val partition = split.asInstanceOf[KinesisBackedBlockRDDPartition]
-    val blockId = partition.blockId
-
-    def getBlockFromBlockManager(): Option[Iterator[T]] = {
-      logDebug(s"Read partition data of $this from block manager, block $blockId")
-      blockManager.get(blockId).map(_.data.asInstanceOf[Iterator[T]])
-    }
-
-    def getBlockFromKinesis(): Iterator[T] = {
-      val credentials = awsCredentialsOption.getOrElse {
-        new DefaultAWSCredentialsProviderChain().getCredentials()
-      }
-      partition.seqNumberRanges.ranges.iterator.flatMap { range =>
-        new KinesisSequenceRangeIterator(credentials, endpointUrl, regionName,
-          range, retryTimeoutMs).map(messageHandler)
-      }
-    }
-    if (partition.isBlockIdValid) {
-      getBlockFromBlockManager().getOrElse { getBlockFromKinesis() }
-    } else {
-      getBlockFromKinesis()
-    }
-  }
-}
-
-
-/**
- * An iterator that return the Kinesis data based on the given range of sequence numbers.
- * Internally, it repeatedly fetches sets of records starting from the fromSequenceNumber,
- * until the endSequenceNumber is reached.
- */
-private[kinesis]
-class KinesisSequenceRangeIterator(
-    credentials: AWSCredentials,
-    endpointUrl: String,
-    regionId: String,
-    range: SequenceNumberRange,
-    retryTimeoutMs: Int) extends NextIterator[Record] with Logging {
-
-  private val client = new AmazonKinesisClient(credentials)
-  private val streamName = range.streamName
-  private val shardId = range.shardId
-
-  private var toSeqNumberReceived = false
-  private var lastSeqNumber: String = null
-  private var internalIterator: Iterator[Record] = null
-
-  client.setEndpoint(endpointUrl, "kinesis", regionId)
-
-  override protected def getNext(): Record = {
-    var nextRecord: Record = null
-    if (toSeqNumberReceived) {
-      finished = true
-    } else {
-
-      if (internalIterator == null) {
-
-        // If the internal iterator has not been initialized,
-        // then fetch records from starting sequence number
-        internalIterator = getRecords(ShardIteratorType.AT_SEQUENCE_NUMBER, range.fromSeqNumber)
-      } else if (!internalIterator.hasNext) {
-
-        // If the internal iterator does not have any more records,
-        // then fetch more records after the last consumed sequence number
-        internalIterator = getRecords(ShardIteratorType.AFTER_SEQUENCE_NUMBER, lastSeqNumber)
-      }
-
-      if (!internalIterator.hasNext) {
-
-        // If the internal iterator still does not have any data, then throw exception
-        // and terminate this iterator
-        finished = true
-        throw new SparkException(
-          s"Could not read until the end sequence number of the range: $range")
-      } else {
-
-        // Get the record, copy the data into a byte array and remember its sequence number
-        nextRecord = internalIterator.next()
-        lastSeqNumber = nextRecord.getSequenceNumber()
-
-        // If the this record's sequence number matches the stopping sequence number, then make sure
-        // the iterator is marked finished next time getNext() is called
-        if (nextRecord.getSequenceNumber == range.toSeqNumber) {
-          toSeqNumberReceived = true
-        }
-      }
-    }
-    nextRecord
-  }
-
-  override protected def close(): Unit = {
-    client.shutdown()
-  }
-
-  /**
-   * Get records starting from or after the given sequence number.
-   */
-  private def getRecords(iteratorType: ShardIteratorType, seqNum: String): Iterator[Record] = {
-    val shardIterator = getKinesisIterator(iteratorType, seqNum)
-    val result = getRecordsAndNextKinesisIterator(shardIterator)
-    result._1
-  }
-
-  /**
-   * Get the records starting from using a Kinesis shard iterator (which is a progress handle
-   * to get records from Kinesis), and get the next shard iterator for next consumption.
-   */
-  private def getRecordsAndNextKinesisIterator(
-      shardIterator: String): (Iterator[Record], String) = {
-    val getRecordsRequest = new GetRecordsRequest
-    getRecordsRequest.setRequestCredentials(credentials)
-    getRecordsRequest.setShardIterator(shardIterator)
-    val getRecordsResult = retryOrTimeout[GetRecordsResult](
-      s"getting records using shard iterator") {
-        client.getRecords(getRecordsRequest)
-      }
-    // De-aggregate records, if KPL was used in producing the records. The KCL automatically
-    // handles de-aggregation during regular operation. This code path is used during recovery
-    val recordIterator = UserRecord.deaggregate(getRecordsResult.getRecords)
-    (recordIterator.iterator().asScala, getRecordsResult.getNextShardIterator)
-  }
-
-  /**
-   * Get the Kinesis shard iterator for getting records starting from or after the given
-   * sequence number.
-   */
-  private def getKinesisIterator(
-      iteratorType: ShardIteratorType,
-      sequenceNumber: String): String = {
-    val getShardIteratorRequest = new GetShardIteratorRequest
-    getShardIteratorRequest.setRequestCredentials(credentials)
-    getShardIteratorRequest.setStreamName(streamName)
-    getShardIteratorRequest.setShardId(shardId)
-    getShardIteratorRequest.setShardIteratorType(iteratorType.toString)
-    getShardIteratorRequest.setStartingSequenceNumber(sequenceNumber)
-    val getShardIteratorResult = retryOrTimeout[GetShardIteratorResult](
-        s"getting shard iterator from sequence number $sequenceNumber") {
-          client.getShardIterator(getShardIteratorRequest)
-        }
-    getShardIteratorResult.getShardIterator
-  }
-
-  /** Helper method to retry Kinesis API request with exponential backoff and timeouts */
-  private def retryOrTimeout[T](message: String)(body: => T): T = {
-    import KinesisSequenceRangeIterator._
-
-    var startTimeMs = System.currentTimeMillis()
-    var retryCount = 0
-    var waitTimeMs = MIN_RETRY_WAIT_TIME_MS
-    var result: Option[T] = None
-    var lastError: Throwable = null
-
-    def isTimedOut = (System.currentTimeMillis() - startTimeMs) >= retryTimeoutMs
-    def isMaxRetryDone = retryCount >= MAX_RETRIES
-
-    while (result.isEmpty && !isTimedOut && !isMaxRetryDone) {
-      if (retryCount > 0) {  // wait only if this is a retry
-        Thread.sleep(waitTimeMs)
-        waitTimeMs *= 2  // if you have waited, then double wait time for next round
-      }
-      try {
-        result = Some(body)
-      } catch {
-        case NonFatal(t) =>
-          lastError = t
-           t match {
-             case ptee: ProvisionedThroughputExceededException =>
-               logWarning(s"Error while $message [attempt = ${retryCount + 1}]", ptee)
-             case e: Throwable =>
-               throw new SparkException(s"Error while $message", e)
-           }
-      }
-      retryCount += 1
-    }
-    result.getOrElse {
-      if (isTimedOut) {
-        throw new SparkException(
-          s"Timed out after $retryTimeoutMs ms while $message, last exception: ", lastError)
-      } else {
-        throw new SparkException(
-          s"Gave up after $retryCount retries while $message, last exception: ", lastError)
-      }
-    }
-  }
-}
-
-private[streaming]
-object KinesisSequenceRangeIterator {
-  val MAX_RETRIES = 3
-  val MIN_RETRY_WAIT_TIME_MS = 100
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointer.scala
----------------------------------------------------------------------
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointer.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointer.scala
deleted file mode 100644
index 1ca6d43..0000000
--- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointer.scala
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.streaming.kinesis
-
-import java.util.concurrent._
-
-import scala.util.control.NonFatal
-
-import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer
-import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason
-
-import org.apache.spark.Logging
-import org.apache.spark.streaming.Duration
-import org.apache.spark.streaming.util.RecurringTimer
-import org.apache.spark.util.{Clock, SystemClock, ThreadUtils}
-
-/**
- * This is a helper class for managing Kinesis checkpointing.
- *
- * @param receiver The receiver that keeps track of which sequence numbers we can checkpoint
- * @param checkpointInterval How frequently we will checkpoint to DynamoDB
- * @param workerId Worker Id of KCL worker for logging purposes
- * @param clock In order to use ManualClocks for the purpose of testing
- */
-private[kinesis] class KinesisCheckpointer(
-    receiver: KinesisReceiver[_],
-    checkpointInterval: Duration,
-    workerId: String,
-    clock: Clock = new SystemClock) extends Logging {
-
-  // a map from shardId's to checkpointers
-  private val checkpointers = new ConcurrentHashMap[String, IRecordProcessorCheckpointer]()
-
-  private val lastCheckpointedSeqNums = new ConcurrentHashMap[String, String]()
-
-  private val checkpointerThread: RecurringTimer = startCheckpointerThread()
-
-  /** Update the checkpointer instance to the most recent one for the given shardId. */
-  def setCheckpointer(shardId: String, checkpointer: IRecordProcessorCheckpointer): Unit = {
-    checkpointers.put(shardId, checkpointer)
-  }
-
-  /**
-   * Stop tracking the specified shardId.
-   *
-   * If a checkpointer is provided, e.g. on IRecordProcessor.shutdown [[ShutdownReason.TERMINATE]],
-   * we will use that to make the final checkpoint. If `null` is provided, we will not make the
-   * checkpoint, e.g. in case of [[ShutdownReason.ZOMBIE]].
-   */
-  def removeCheckpointer(shardId: String, checkpointer: IRecordProcessorCheckpointer): Unit = {
-    synchronized {
-      checkpointers.remove(shardId)
-      checkpoint(shardId, checkpointer)
-    }
-  }
-
-  /** Perform the checkpoint. */
-  private def checkpoint(shardId: String, checkpointer: IRecordProcessorCheckpointer): Unit = {
-    try {
-      if (checkpointer != null) {
-        receiver.getLatestSeqNumToCheckpoint(shardId).foreach { latestSeqNum =>
-          val lastSeqNum = lastCheckpointedSeqNums.get(shardId)
-          // Kinesis sequence numbers are monotonically increasing strings, therefore we can do
-          // safely do the string comparison
-          if (lastSeqNum == null || latestSeqNum > lastSeqNum) {
-            /* Perform the checkpoint */
-            KinesisRecordProcessor.retryRandom(checkpointer.checkpoint(latestSeqNum), 4, 100)
-            logDebug(s"Checkpoint:  WorkerId $workerId completed checkpoint at sequence number" +
-              s" $latestSeqNum for shardId $shardId")
-            lastCheckpointedSeqNums.put(shardId, latestSeqNum)
-          }
-        }
-      } else {
-        logDebug(s"Checkpointing skipped for shardId $shardId. Checkpointer not set.")
-      }
-    } catch {
-      case NonFatal(e) =>
-        logWarning(s"Failed to checkpoint shardId $shardId to DynamoDB.", e)
-    }
-  }
-
-  /** Checkpoint the latest saved sequence numbers for all active shardId's. */
-  private def checkpointAll(): Unit = synchronized {
-    // if this method throws an exception, then the scheduled task will not run again
-    try {
-      val shardIds = checkpointers.keys()
-      while (shardIds.hasMoreElements) {
-        val shardId = shardIds.nextElement()
-        checkpoint(shardId, checkpointers.get(shardId))
-      }
-    } catch {
-      case NonFatal(e) =>
-        logWarning("Failed to checkpoint to DynamoDB.", e)
-    }
-  }
-
-  /**
-   * Start the checkpointer thread with the given checkpoint duration.
-   */
-  private def startCheckpointerThread(): RecurringTimer = {
-    val period = checkpointInterval.milliseconds
-    val threadName = s"Kinesis Checkpointer - Worker $workerId"
-    val timer = new RecurringTimer(clock, period, _ => checkpointAll(), threadName)
-    timer.start()
-    logDebug(s"Started checkpointer thread: $threadName")
-    timer
-  }
-
-  /**
-   * Shutdown the checkpointer. Should be called on the onStop of the Receiver.
-   */
-  def shutdown(): Unit = {
-    // the recurring timer checkpoints for us one last time.
-    checkpointerThread.stop(interruptTimer = false)
-    checkpointers.clear()
-    lastCheckpointedSeqNums.clear()
-    logInfo("Successfully shutdown Kinesis Checkpointer.")
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala
----------------------------------------------------------------------
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala
deleted file mode 100644
index 5223c81..0000000
--- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kinesis
-
-import scala.reflect.ClassTag
-
-import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
-import com.amazonaws.services.kinesis.model.Record
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.storage.{BlockId, StorageLevel}
-import org.apache.spark.streaming.{Duration, StreamingContext, Time}
-import org.apache.spark.streaming.dstream.ReceiverInputDStream
-import org.apache.spark.streaming.receiver.Receiver
-import org.apache.spark.streaming.scheduler.ReceivedBlockInfo
-
-private[kinesis] class KinesisInputDStream[T: ClassTag](
-    _ssc: StreamingContext,
-    streamName: String,
-    endpointUrl: String,
-    regionName: String,
-    initialPositionInStream: InitialPositionInStream,
-    checkpointAppName: String,
-    checkpointInterval: Duration,
-    storageLevel: StorageLevel,
-    messageHandler: Record => T,
-    awsCredentialsOption: Option[SerializableAWSCredentials]
-  ) extends ReceiverInputDStream[T](_ssc) {
-
-  private[streaming]
-  override def createBlockRDD(time: Time, blockInfos: Seq[ReceivedBlockInfo]): RDD[T] = {
-
-    // This returns true even for when blockInfos is empty
-    val allBlocksHaveRanges = blockInfos.map { _.metadataOption }.forall(_.nonEmpty)
-
-    if (allBlocksHaveRanges) {
-      // Create a KinesisBackedBlockRDD, even when there are no blocks
-      val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray
-      val seqNumRanges = blockInfos.map {
-        _.metadataOption.get.asInstanceOf[SequenceNumberRanges] }.toArray
-      val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray
-      logDebug(s"Creating KinesisBackedBlockRDD for $time with ${seqNumRanges.length} " +
-          s"seq number ranges: ${seqNumRanges.mkString(", ")} ")
-      new KinesisBackedBlockRDD(
-        context.sc, regionName, endpointUrl, blockIds, seqNumRanges,
-        isBlockIdValid = isBlockIdValid,
-        retryTimeoutMs = ssc.graph.batchDuration.milliseconds.toInt,
-        messageHandler = messageHandler,
-        awsCredentialsOption = awsCredentialsOption)
-    } else {
-      logWarning("Kinesis sequence number information was not present with some block metadata," +
-        " it may not be possible to recover from failures")
-      super.createBlockRDD(time, blockInfos)
-    }
-  }
-
-  override def getReceiver(): Receiver[T] = {
-    new KinesisReceiver(streamName, endpointUrl, regionName, initialPositionInStream,
-      checkpointAppName, checkpointInterval, storageLevel, messageHandler, awsCredentialsOption)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
----------------------------------------------------------------------
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
deleted file mode 100644
index 48ee2a9..0000000
--- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
+++ /dev/null
@@ -1,361 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.streaming.kinesis
-
-import java.util.UUID
-import java.util.concurrent.ConcurrentHashMap
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-import scala.util.control.NonFatal
-
-import com.amazonaws.auth.{AWSCredentials, AWSCredentialsProvider, DefaultAWSCredentialsProviderChain}
-import com.amazonaws.services.kinesis.clientlibrary.interfaces.{IRecordProcessor, IRecordProcessorCheckpointer, IRecordProcessorFactory}
-import com.amazonaws.services.kinesis.clientlibrary.lib.worker.{InitialPositionInStream, KinesisClientLibConfiguration, Worker}
-import com.amazonaws.services.kinesis.model.Record
-
-import org.apache.spark.storage.{StorageLevel, StreamBlockId}
-import org.apache.spark.streaming.Duration
-import org.apache.spark.streaming.receiver.{BlockGenerator, BlockGeneratorListener, Receiver}
-import org.apache.spark.util.Utils
-import org.apache.spark.Logging
-
-private[kinesis]
-case class SerializableAWSCredentials(accessKeyId: String, secretKey: String)
-  extends AWSCredentials {
-  override def getAWSAccessKeyId: String = accessKeyId
-  override def getAWSSecretKey: String = secretKey
-}
-
-/**
- * Custom AWS Kinesis-specific implementation of Spark Streaming's Receiver.
- * This implementation relies on the Kinesis Client Library (KCL) Worker as described here:
- * https://github.com/awslabs/amazon-kinesis-client
- *
- * The way this Receiver works is as follows:
- *
- *  - The receiver starts a KCL Worker, which is essentially runs a threadpool of multiple
- *    KinesisRecordProcessor
- *  - Each KinesisRecordProcessor receives data from a Kinesis shard in batches. Each batch is
- *    inserted into a Block Generator, and the corresponding range of sequence numbers is recorded.
- *  - When the block generator defines a block, then the recorded sequence number ranges that were
- *    inserted into the block are recorded separately for being used later.
- *  - When the block is ready to be pushed, the block is pushed and the ranges are reported as
- *    metadata of the block. In addition, the ranges are used to find out the latest sequence
- *    number for each shard that can be checkpointed through the DynamoDB.
- *  - Periodically, each KinesisRecordProcessor checkpoints the latest successfully stored sequence
- *    number for it own shard.
- *
- * @param streamName   Kinesis stream name
- * @param endpointUrl  Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
- * @param regionName  Region name used by the Kinesis Client Library for
- *                    DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics)
- * @param initialPositionInStream  In the absence of Kinesis checkpoint info, this is the
- *                                 worker's initial starting position in the stream.
- *                                 The values are either the beginning of the stream
- *                                 per Kinesis' limit of 24 hours
- *                                 (InitialPositionInStream.TRIM_HORIZON) or
- *                                 the tip of the stream (InitialPositionInStream.LATEST).
- * @param checkpointAppName  Kinesis application name. Kinesis Apps are mapped to Kinesis Streams
- *                 by the Kinesis Client Library.  If you change the App name or Stream name,
- *                 the KCL will throw errors.  This usually requires deleting the backing
- *                 DynamoDB table with the same name this Kinesis application.
- * @param checkpointInterval  Checkpoint interval for Kinesis checkpointing.
- *                            See the Kinesis Spark Streaming documentation for more
- *                            details on the different types of checkpoints.
- * @param storageLevel Storage level to use for storing the received objects
- * @param awsCredentialsOption Optional AWS credentials, used when user directly specifies
- *                             the credentials
- */
-private[kinesis] class KinesisReceiver[T](
-    val streamName: String,
-    endpointUrl: String,
-    regionName: String,
-    initialPositionInStream: InitialPositionInStream,
-    checkpointAppName: String,
-    checkpointInterval: Duration,
-    storageLevel: StorageLevel,
-    messageHandler: Record => T,
-    awsCredentialsOption: Option[SerializableAWSCredentials])
-  extends Receiver[T](storageLevel) with Logging { receiver =>
-
-  /*
-   * =================================================================================
-   * The following vars are initialize in the onStart() method which executes in the
-   * Spark worker after this Receiver is serialized and shipped to the worker.
-   * =================================================================================
-   */
-
-  /**
-   * workerId is used by the KCL should be based on the ip address of the actual Spark Worker
-   * where this code runs (not the driver's IP address.)
-   */
-  @volatile private var workerId: String = null
-
-  /**
-   * Worker is the core client abstraction from the Kinesis Client Library (KCL).
-   * A worker can process more than one shards from the given stream.
-   * Each shard is assigned its own IRecordProcessor and the worker run multiple such
-   * processors.
-   */
-  @volatile private var worker: Worker = null
-  @volatile private var workerThread: Thread = null
-
-  /** BlockGenerator used to generates blocks out of Kinesis data */
-  @volatile private var blockGenerator: BlockGenerator = null
-
-  /**
-   * Sequence number ranges added to the current block being generated.
-   * Accessing and updating of this map is synchronized by locks in BlockGenerator.
-   */
-  private val seqNumRangesInCurrentBlock = new mutable.ArrayBuffer[SequenceNumberRange]
-
-  /** Sequence number ranges of data added to each generated block */
-  private val blockIdToSeqNumRanges = new ConcurrentHashMap[StreamBlockId, SequenceNumberRanges]
-
-  /**
-   * The centralized kinesisCheckpointer that checkpoints based on the given checkpointInterval.
-   */
-  @volatile private var kinesisCheckpointer: KinesisCheckpointer = null
-
-  /**
-   * Latest sequence number ranges that have been stored successfully.
-   * This is used for checkpointing through KCL */
-  private val shardIdToLatestStoredSeqNum = new ConcurrentHashMap[String, String]
-
-  /**
-   * This is called when the KinesisReceiver starts and must be non-blocking.
-   * The KCL creates and manages the receiving/processing thread pool through Worker.run().
-   */
-  override def onStart() {
-    blockGenerator = supervisor.createBlockGenerator(new GeneratedBlockHandler)
-
-    workerId = Utils.localHostName() + ":" + UUID.randomUUID()
-
-    kinesisCheckpointer = new KinesisCheckpointer(receiver, checkpointInterval, workerId)
-    // KCL config instance
-    val awsCredProvider = resolveAWSCredentialsProvider()
-    val kinesisClientLibConfiguration =
-      new KinesisClientLibConfiguration(checkpointAppName, streamName, awsCredProvider, workerId)
-      .withKinesisEndpoint(endpointUrl)
-      .withInitialPositionInStream(initialPositionInStream)
-      .withTaskBackoffTimeMillis(500)
-      .withRegionName(regionName)
-
-   /*
-    *  RecordProcessorFactory creates impls of IRecordProcessor.
-    *  IRecordProcessor adapts the KCL to our Spark KinesisReceiver via the
-    *  IRecordProcessor.processRecords() method.
-    *  We're using our custom KinesisRecordProcessor in this case.
-    */
-    val recordProcessorFactory = new IRecordProcessorFactory {
-      override def createProcessor: IRecordProcessor =
-        new KinesisRecordProcessor(receiver, workerId)
-    }
-
-    worker = new Worker(recordProcessorFactory, kinesisClientLibConfiguration)
-    workerThread = new Thread() {
-      override def run(): Unit = {
-        try {
-          worker.run()
-        } catch {
-          case NonFatal(e) =>
-            restart("Error running the KCL worker in Receiver", e)
-        }
-      }
-    }
-
-    blockIdToSeqNumRanges.clear()
-    blockGenerator.start()
-
-    workerThread.setName(s"Kinesis Receiver ${streamId}")
-    workerThread.setDaemon(true)
-    workerThread.start()
-
-    logInfo(s"Started receiver with workerId $workerId")
-  }
-
-  /**
-   * This is called when the KinesisReceiver stops.
-   * The KCL worker.shutdown() method stops the receiving/processing threads.
-   * The KCL will do its best to drain and checkpoint any in-flight records upon shutdown.
-   */
-  override def onStop() {
-    if (workerThread != null) {
-      if (worker != null) {
-        worker.shutdown()
-        worker = null
-      }
-      workerThread.join()
-      workerThread = null
-      logInfo(s"Stopped receiver for workerId $workerId")
-    }
-    workerId = null
-    if (kinesisCheckpointer != null) {
-      kinesisCheckpointer.shutdown()
-      kinesisCheckpointer = null
-    }
-  }
-
-  /** Add records of the given shard to the current block being generated */
-  private[kinesis] def addRecords(shardId: String, records: java.util.List[Record]): Unit = {
-    if (records.size > 0) {
-      val dataIterator = records.iterator().asScala.map(messageHandler)
-      val metadata = SequenceNumberRange(streamName, shardId,
-        records.get(0).getSequenceNumber(), records.get(records.size() - 1).getSequenceNumber())
-      blockGenerator.addMultipleDataWithCallback(dataIterator, metadata)
-    }
-  }
-
-  /** Get the latest sequence number for the given shard that can be checkpointed through KCL */
-  private[kinesis] def getLatestSeqNumToCheckpoint(shardId: String): Option[String] = {
-    Option(shardIdToLatestStoredSeqNum.get(shardId))
-  }
-
-  /**
-   * Set the checkpointer that will be used to checkpoint sequence numbers to DynamoDB for the
-   * given shardId.
-   */
-  def setCheckpointer(shardId: String, checkpointer: IRecordProcessorCheckpointer): Unit = {
-    assert(kinesisCheckpointer != null, "Kinesis Checkpointer not initialized!")
-    kinesisCheckpointer.setCheckpointer(shardId, checkpointer)
-  }
-
-  /**
-   * Remove the checkpointer for the given shardId. The provided checkpointer will be used to
-   * checkpoint one last time for the given shard. If `checkpointer` is `null`, then we will not
-   * checkpoint.
-   */
-  def removeCheckpointer(shardId: String, checkpointer: IRecordProcessorCheckpointer): Unit = {
-    assert(kinesisCheckpointer != null, "Kinesis Checkpointer not initialized!")
-    kinesisCheckpointer.removeCheckpointer(shardId, checkpointer)
-  }
-
-  /**
-   * Remember the range of sequence numbers that was added to the currently active block.
-   * Internally, this is synchronized with `finalizeRangesForCurrentBlock()`.
-   */
-  private def rememberAddedRange(range: SequenceNumberRange): Unit = {
-    seqNumRangesInCurrentBlock += range
-  }
-
-  /**
-   * Finalize the ranges added to the block that was active and prepare the ranges buffer
-   * for next block. Internally, this is synchronized with `rememberAddedRange()`.
-   */
-  private def finalizeRangesForCurrentBlock(blockId: StreamBlockId): Unit = {
-    blockIdToSeqNumRanges.put(blockId, SequenceNumberRanges(seqNumRangesInCurrentBlock.toArray))
-    seqNumRangesInCurrentBlock.clear()
-    logDebug(s"Generated block $blockId has $blockIdToSeqNumRanges")
-  }
-
-  /** Store the block along with its associated ranges */
-  private def storeBlockWithRanges(
-      blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[T]): Unit = {
-    val rangesToReportOption = Option(blockIdToSeqNumRanges.remove(blockId))
-    if (rangesToReportOption.isEmpty) {
-      stop("Error while storing block into Spark, could not find sequence number ranges " +
-        s"for block $blockId")
-      return
-    }
-
-    val rangesToReport = rangesToReportOption.get
-    var attempt = 0
-    var stored = false
-    var throwable: Throwable = null
-    while (!stored && attempt <= 3) {
-      try {
-        store(arrayBuffer, rangesToReport)
-        stored = true
-      } catch {
-        case NonFatal(th) =>
-          attempt += 1
-          throwable = th
-      }
-    }
-    if (!stored) {
-      stop("Error while storing block into Spark", throwable)
-    }
-
-    // Update the latest sequence number that have been successfully stored for each shard
-    // Note that we are doing this sequentially because the array of sequence number ranges
-    // is assumed to be
-    rangesToReport.ranges.foreach { range =>
-      shardIdToLatestStoredSeqNum.put(range.shardId, range.toSeqNumber)
-    }
-  }
-
-  /**
-   * If AWS credential is provided, return a AWSCredentialProvider returning that credential.
-   * Otherwise, return the DefaultAWSCredentialsProviderChain.
-   */
-  private def resolveAWSCredentialsProvider(): AWSCredentialsProvider = {
-    awsCredentialsOption match {
-      case Some(awsCredentials) =>
-        logInfo("Using provided AWS credentials")
-        new AWSCredentialsProvider {
-          override def getCredentials: AWSCredentials = awsCredentials
-          override def refresh(): Unit = { }
-        }
-      case None =>
-        logInfo("Using DefaultAWSCredentialsProviderChain")
-        new DefaultAWSCredentialsProviderChain()
-    }
-  }
-
-
-  /**
-   * Class to handle blocks generated by this receiver's block generator. Specifically, in
-   * the context of the Kinesis Receiver, this handler does the following.
-   *
-   * - When an array of records is added to the current active block in the block generator,
-   *   this handler keeps track of the corresponding sequence number range.
-   * - When the currently active block is ready to sealed (not more records), this handler
-   *   keep track of the list of ranges added into this block in another H
-   */
-  private class GeneratedBlockHandler extends BlockGeneratorListener {
-
-    /**
-     * Callback method called after a data item is added into the BlockGenerator.
-     * The data addition, block generation, and calls to onAddData and onGenerateBlock
-     * are all synchronized through the same lock.
-     */
-    def onAddData(data: Any, metadata: Any): Unit = {
-      rememberAddedRange(metadata.asInstanceOf[SequenceNumberRange])
-    }
-
-    /**
-     * Callback method called after a block has been generated.
-     * The data addition, block generation, and calls to onAddData and onGenerateBlock
-     * are all synchronized through the same lock.
-     */
-    def onGenerateBlock(blockId: StreamBlockId): Unit = {
-      finalizeRangesForCurrentBlock(blockId)
-    }
-
-    /** Callback method called when a block is ready to be pushed / stored. */
-    def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
-      storeBlockWithRanges(blockId,
-        arrayBuffer.asInstanceOf[mutable.ArrayBuffer[T]])
-    }
-
-    /** Callback called in case of any error in internal of the BlockGenerator */
-    def onError(message: String, throwable: Throwable): Unit = {
-      reportError(message, throwable)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
----------------------------------------------------------------------
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
deleted file mode 100644
index b5b76cb..0000000
--- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.streaming.kinesis
-
-import java.util.List
-
-import scala.util.Random
-import scala.util.control.NonFatal
-
-import com.amazonaws.services.kinesis.clientlibrary.exceptions.{InvalidStateException, KinesisClientLibDependencyException, ShutdownException, ThrottlingException}
-import com.amazonaws.services.kinesis.clientlibrary.interfaces.{IRecordProcessor, IRecordProcessorCheckpointer}
-import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason
-import com.amazonaws.services.kinesis.model.Record
-
-import org.apache.spark.Logging
-import org.apache.spark.streaming.Duration
-
-/**
- * Kinesis-specific implementation of the Kinesis Client Library (KCL) IRecordProcessor.
- * This implementation operates on the Array[Byte] from the KinesisReceiver.
- * The Kinesis Worker creates an instance of this KinesisRecordProcessor for each
- * shard in the Kinesis stream upon startup.  This is normally done in separate threads,
- * but the KCLs within the KinesisReceivers will balance themselves out if you create
- * multiple Receivers.
- *
- * @param receiver Kinesis receiver
- * @param workerId for logging purposes
- */
-private[kinesis] class KinesisRecordProcessor[T](receiver: KinesisReceiver[T], workerId: String)
-  extends IRecordProcessor with Logging {
-
-  // shardId populated during initialize()
-  @volatile
-  private var shardId: String = _
-
-  /**
-   * The Kinesis Client Library calls this method during IRecordProcessor initialization.
-   *
-   * @param shardId assigned by the KCL to this particular RecordProcessor.
-   */
-  override def initialize(shardId: String) {
-    this.shardId = shardId
-    logInfo(s"Initialized workerId $workerId with shardId $shardId")
-  }
-
-  /**
-   * This method is called by the KCL when a batch of records is pulled from the Kinesis stream.
-   * This is the record-processing bridge between the KCL's IRecordProcessor.processRecords()
-   * and Spark Streaming's Receiver.store().
-   *
-   * @param batch list of records from the Kinesis stream shard
-   * @param checkpointer used to update Kinesis when this batch has been processed/stored
-   *   in the DStream
-   */
-  override def processRecords(batch: List[Record], checkpointer: IRecordProcessorCheckpointer) {
-    if (!receiver.isStopped()) {
-      try {
-        receiver.addRecords(shardId, batch)
-        logDebug(s"Stored: Worker $workerId stored ${batch.size} records for shardId $shardId")
-        receiver.setCheckpointer(shardId, checkpointer)
-      } catch {
-        case NonFatal(e) => {
-          /*
-           *  If there is a failure within the batch, the batch will not be checkpointed.
-           *  This will potentially cause records since the last checkpoint to be processed
-           *     more than once.
-           */
-          logError(s"Exception:  WorkerId $workerId encountered and exception while storing " +
-              s" or checkpointing a batch for workerId $workerId and shardId $shardId.", e)
-
-          /* Rethrow the exception to the Kinesis Worker that is managing this RecordProcessor. */
-          throw e
-        }
-      }
-    } else {
-      /* RecordProcessor has been stopped. */
-      logInfo(s"Stopped:  KinesisReceiver has stopped for workerId $workerId" +
-          s" and shardId $shardId.  No more records will be processed.")
-    }
-  }
-
-  /**
-   * Kinesis Client Library is shutting down this Worker for 1 of 2 reasons:
-   * 1) the stream is resharding by splitting or merging adjacent shards
-   *     (ShutdownReason.TERMINATE)
-   * 2) the failed or latent Worker has stopped sending heartbeats for whatever reason
-   *     (ShutdownReason.ZOMBIE)
-   *
-   * @param checkpointer used to perform a Kinesis checkpoint for ShutdownReason.TERMINATE
-   * @param reason for shutdown (ShutdownReason.TERMINATE or ShutdownReason.ZOMBIE)
-   */
-  override def shutdown(checkpointer: IRecordProcessorCheckpointer, reason: ShutdownReason) {
-    logInfo(s"Shutdown:  Shutting down workerId $workerId with reason $reason")
-    reason match {
-      /*
-       * TERMINATE Use Case.  Checkpoint.
-       * Checkpoint to indicate that all records from the shard have been drained and processed.
-       * It's now OK to read from the new shards that resulted from a resharding event.
-       */
-      case ShutdownReason.TERMINATE =>
-        receiver.removeCheckpointer(shardId, checkpointer)
-
-      /*
-       * ZOMBIE Use Case or Unknown reason.  NoOp.
-       * No checkpoint because other workers may have taken over and already started processing
-       *    the same records.
-       * This may lead to records being processed more than once.
-       */
-      case _ =>
-        receiver.removeCheckpointer(shardId, null) // return null so that we don't checkpoint
-    }
-
-  }
-}
-
-private[kinesis] object KinesisRecordProcessor extends Logging {
-  /**
-   * Retry the given amount of times with a random backoff time (millis) less than the
-   *   given maxBackOffMillis
-   *
-   * @param expression expression to evalute
-   * @param numRetriesLeft number of retries left
-   * @param maxBackOffMillis: max millis between retries
-   *
-   * @return evaluation of the given expression
-   * @throws Unretryable exception, unexpected exception,
-   *  or any exception that persists after numRetriesLeft reaches 0
-   */
-  @annotation.tailrec
-  def retryRandom[T](expression: => T, numRetriesLeft: Int, maxBackOffMillis: Int): T = {
-    util.Try { expression } match {
-      /* If the function succeeded, evaluate to x. */
-      case util.Success(x) => x
-      /* If the function failed, either retry or throw the exception */
-      case util.Failure(e) => e match {
-        /* Retry:  Throttling or other Retryable exception has occurred */
-        case _: ThrottlingException | _: KinesisClientLibDependencyException if numRetriesLeft > 1
-          => {
-               val backOffMillis = Random.nextInt(maxBackOffMillis)
-               Thread.sleep(backOffMillis)
-               logError(s"Retryable Exception:  Random backOffMillis=${backOffMillis}", e)
-               retryRandom(expression, numRetriesLeft - 1, maxBackOffMillis)
-             }
-        /* Throw:  Shutdown has been requested by the Kinesis Client Library. */
-        case _: ShutdownException => {
-          logError(s"ShutdownException:  Caught shutdown exception, skipping checkpoint.", e)
-          throw e
-        }
-        /* Throw:  Non-retryable exception has occurred with the Kinesis Client Library */
-        case _: InvalidStateException => {
-          logError(s"InvalidStateException:  Cannot save checkpoint to the DynamoDB table used" +
-              s" by the Amazon Kinesis Client Library.  Table likely doesn't exist.", e)
-          throw e
-        }
-        /* Throw:  Unexpected exception has occurred */
-        case _ => {
-          logError(s"Unexpected, non-retryable exception.", e)
-          throw e
-        }
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
----------------------------------------------------------------------
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
deleted file mode 100644
index 0ace453..0000000
--- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
+++ /dev/null
@@ -1,260 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kinesis
-
-import java.nio.ByteBuffer
-import java.util.concurrent.TimeUnit
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
-import scala.util.{Failure, Random, Success, Try}
-
-import com.amazonaws.auth.{AWSCredentials, DefaultAWSCredentialsProviderChain}
-import com.amazonaws.regions.RegionUtils
-import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
-import com.amazonaws.services.dynamodbv2.document.DynamoDB
-import com.amazonaws.services.kinesis.AmazonKinesisClient
-import com.amazonaws.services.kinesis.model._
-
-import org.apache.spark.Logging
-
-/**
- * Shared utility methods for performing Kinesis tests that actually transfer data.
- *
- * PLEASE KEEP THIS FILE UNDER src/main AS PYTHON TESTS NEED ACCESS TO THIS FILE!
- */
-private[kinesis] class KinesisTestUtils extends Logging {
-
-  val endpointUrl = KinesisTestUtils.endpointUrl
-  val regionName = RegionUtils.getRegionByEndpoint(endpointUrl).getName()
-  val streamShardCount = 2
-
-  private val createStreamTimeoutSeconds = 300
-  private val describeStreamPollTimeSeconds = 1
-
-  @volatile
-  private var streamCreated = false
-
-  @volatile
-  private var _streamName: String = _
-
-  protected lazy val kinesisClient = {
-    val client = new AmazonKinesisClient(KinesisTestUtils.getAWSCredentials())
-    client.setEndpoint(endpointUrl)
-    client
-  }
-
-  private lazy val dynamoDB = {
-    val dynamoDBClient = new AmazonDynamoDBClient(new DefaultAWSCredentialsProviderChain())
-    dynamoDBClient.setRegion(RegionUtils.getRegion(regionName))
-    new DynamoDB(dynamoDBClient)
-  }
-
-  protected def getProducer(aggregate: Boolean): KinesisDataGenerator = {
-    if (!aggregate) {
-      new SimpleDataGenerator(kinesisClient)
-    } else {
-      throw new UnsupportedOperationException("Aggregation is not supported through this code path")
-    }
-  }
-
-  def streamName: String = {
-    require(streamCreated, "Stream not yet created, call createStream() to create one")
-    _streamName
-  }
-
-  def createStream(): Unit = {
-    require(!streamCreated, "Stream already created")
-    _streamName = findNonExistentStreamName()
-
-    // Create a stream. The number of shards determines the provisioned throughput.
-    logInfo(s"Creating stream ${_streamName}")
-    val createStreamRequest = new CreateStreamRequest()
-    createStreamRequest.setStreamName(_streamName)
-    createStreamRequest.setShardCount(2)
-    kinesisClient.createStream(createStreamRequest)
-
-    // The stream is now being created. Wait for it to become active.
-    waitForStreamToBeActive(_streamName)
-    streamCreated = true
-    logInfo(s"Created stream ${_streamName}")
-  }
-
-  /**
-   * Push data to Kinesis stream and return a map of
-   * shardId -> seq of (data, seq number) pushed to corresponding shard
-   */
-  def pushData(testData: Seq[Int], aggregate: Boolean): Map[String, Seq[(Int, String)]] = {
-    require(streamCreated, "Stream not yet created, call createStream() to create one")
-    val producer = getProducer(aggregate)
-    val shardIdToSeqNumbers = producer.sendData(streamName, testData)
-    logInfo(s"Pushed $testData:\n\t ${shardIdToSeqNumbers.mkString("\n\t")}")
-    shardIdToSeqNumbers.toMap
-  }
-
-  /**
-   * Expose a Python friendly API.
-   */
-  def pushData(testData: java.util.List[Int]): Unit = {
-    pushData(testData.asScala, aggregate = false)
-  }
-
-  def deleteStream(): Unit = {
-    try {
-      if (streamCreated) {
-        kinesisClient.deleteStream(streamName)
-      }
-    } catch {
-      case e: Exception =>
-        logWarning(s"Could not delete stream $streamName")
-    }
-  }
-
-  def deleteDynamoDBTable(tableName: String): Unit = {
-    try {
-      val table = dynamoDB.getTable(tableName)
-      table.delete()
-      table.waitForDelete()
-    } catch {
-      case e: Exception =>
-        logWarning(s"Could not delete DynamoDB table $tableName")
-    }
-  }
-
-  private def describeStream(streamNameToDescribe: String): Option[StreamDescription] = {
-    try {
-      val describeStreamRequest = new DescribeStreamRequest().withStreamName(streamNameToDescribe)
-      val desc = kinesisClient.describeStream(describeStreamRequest).getStreamDescription()
-      Some(desc)
-    } catch {
-      case rnfe: ResourceNotFoundException =>
-        None
-    }
-  }
-
-  private def findNonExistentStreamName(): String = {
-    var testStreamName: String = null
-    do {
-      Thread.sleep(TimeUnit.SECONDS.toMillis(describeStreamPollTimeSeconds))
-      testStreamName = s"KinesisTestUtils-${math.abs(Random.nextLong())}"
-    } while (describeStream(testStreamName).nonEmpty)
-    testStreamName
-  }
-
-  private def waitForStreamToBeActive(streamNameToWaitFor: String): Unit = {
-    val startTime = System.currentTimeMillis()
-    val endTime = startTime + TimeUnit.SECONDS.toMillis(createStreamTimeoutSeconds)
-    while (System.currentTimeMillis() < endTime) {
-      Thread.sleep(TimeUnit.SECONDS.toMillis(describeStreamPollTimeSeconds))
-      describeStream(streamNameToWaitFor).foreach { description =>
-        val streamStatus = description.getStreamStatus()
-        logDebug(s"\t- current state: $streamStatus\n")
-        if ("ACTIVE".equals(streamStatus)) {
-          return
-        }
-      }
-    }
-    require(false, s"Stream $streamName never became active")
-  }
-}
-
-private[kinesis] object KinesisTestUtils {
-
-  val envVarNameForEnablingTests = "ENABLE_KINESIS_TESTS"
-  val endVarNameForEndpoint = "KINESIS_TEST_ENDPOINT_URL"
-  val defaultEndpointUrl = "https://kinesis.us-west-2.amazonaws.com"
-
-  lazy val shouldRunTests = {
-    val isEnvSet = sys.env.get(envVarNameForEnablingTests) == Some("1")
-    if (isEnvSet) {
-      // scalastyle:off println
-      // Print this so that they are easily visible on the console and not hidden in the log4j logs.
-      println(
-        s"""
-          |Kinesis tests that actually send data has been enabled by setting the environment
-          |variable $envVarNameForEnablingTests to 1. This will create Kinesis Streams and
-          |DynamoDB tables in AWS. Please be aware that this may incur some AWS costs.
-          |By default, the tests use the endpoint URL $defaultEndpointUrl to create Kinesis streams.
-          |To change this endpoint URL to a different region, you can set the environment variable
-          |$endVarNameForEndpoint to the desired endpoint URL
-          |(e.g. $endVarNameForEndpoint="https://kinesis.us-west-2.amazonaws.com").
-        """.stripMargin)
-      // scalastyle:on println
-    }
-    isEnvSet
-  }
-
-  lazy val endpointUrl = {
-    val url = sys.env.getOrElse(endVarNameForEndpoint, defaultEndpointUrl)
-    // scalastyle:off println
-    // Print this so that they are easily visible on the console and not hidden in the log4j logs.
-    println(s"Using endpoint URL $url for creating Kinesis streams for tests.")
-    // scalastyle:on println
-    url
-  }
-
-  def isAWSCredentialsPresent: Boolean = {
-    Try { new DefaultAWSCredentialsProviderChain().getCredentials() }.isSuccess
-  }
-
-  def getAWSCredentials(): AWSCredentials = {
-    assert(shouldRunTests,
-      "Kinesis test not enabled, should not attempt to get AWS credentials")
-    Try { new DefaultAWSCredentialsProviderChain().getCredentials() } match {
-      case Success(cred) => cred
-      case Failure(e) =>
-        throw new Exception(
-          s"""
-             |Kinesis tests enabled using environment variable $envVarNameForEnablingTests
-             |but could not find AWS credentials. Please follow instructions in AWS documentation
-             |to set the credentials in your system such that the DefaultAWSCredentialsProviderChain
-             |can find the credentials.
-           """.stripMargin)
-    }
-  }
-}
-
-/** A wrapper interface that will allow us to consolidate the code for synthetic data generation. */
-private[kinesis] trait KinesisDataGenerator {
-  /** Sends the data to Kinesis and returns the metadata for everything that has been sent. */
-  def sendData(streamName: String, data: Seq[Int]): Map[String, Seq[(Int, String)]]
-}
-
-private[kinesis] class SimpleDataGenerator(
-    client: AmazonKinesisClient) extends KinesisDataGenerator {
-  override def sendData(streamName: String, data: Seq[Int]): Map[String, Seq[(Int, String)]] = {
-    val shardIdToSeqNumbers = new mutable.HashMap[String, ArrayBuffer[(Int, String)]]()
-    data.foreach { num =>
-      val str = num.toString
-      val data = ByteBuffer.wrap(str.getBytes())
-      val putRecordRequest = new PutRecordRequest().withStreamName(streamName)
-        .withData(data)
-        .withPartitionKey(str)
-
-      val putRecordResult = client.putRecord(putRecordRequest)
-      val shardId = putRecordResult.getShardId
-      val seqNumber = putRecordResult.getSequenceNumber()
-      val sentSeqNumbers = shardIdToSeqNumbers.getOrElseUpdate(shardId,
-        new ArrayBuffer[(Int, String)]())
-      sentSeqNumbers += ((num, seqNumber))
-    }
-
-    shardIdToSeqNumbers.toMap
-  }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org