You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2017/04/13 07:49:24 UTC

spark git commit: [SPARK-20189][DSTREAM] Fix spark kinesis testcases to remove deprecated createStream and use Builders

Repository: spark
Updated Branches:
  refs/heads/master c5f1cc370 -> ec68d8f8c


[SPARK-20189][DSTREAM] Fix spark kinesis testcases to remove deprecated createStream and use Builders

## What changes were proposed in this pull request?

The spark-kinesis testcases use KinesisUtils.createStream, which is deprecated now. Modify the testcases to use the recommended KinesisInputDStream.builder instead.
This change will also enable the testcases to automatically use the session tokens.

## How was this patch tested?

All the existing testcases work fine as expected with the changes.

https://issues.apache.org/jira/browse/SPARK-20189

Author: Yash Sharma <ys...@atlassian.com>

Closes #17506 from yssharma/ysharma/cleanup_kinesis_testcases.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ec68d8f8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ec68d8f8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ec68d8f8

Branch: refs/heads/master
Commit: ec68d8f8cfdede8a0de1d56476205158544cc4eb
Parents: c5f1cc3
Author: Yash Sharma <ys...@atlassian.com>
Authored: Thu Apr 13 08:49:19 2017 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Thu Apr 13 08:49:19 2017 +0100

----------------------------------------------------------------------
 .../streaming/kinesis/KinesisInputDStream.scala |  2 +-
 .../streaming/kinesis/KinesisStreamSuite.scala  | 58 +++++++++++++-------
 2 files changed, 38 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ec68d8f8/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala
----------------------------------------------------------------------
diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala
index 8970ad2..7755341 100644
--- a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala
+++ b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala
@@ -267,7 +267,7 @@ object KinesisInputDStream {
         getRequiredParam(checkpointAppName, "checkpointAppName"),
         checkpointInterval.getOrElse(ssc.graph.batchDuration),
         storageLevel.getOrElse(DEFAULT_STORAGE_LEVEL),
-        handler,
+        ssc.sc.clean(handler),
         kinesisCredsProvider.getOrElse(DefaultCredentials),
         dynamoDBCredsProvider,
         cloudWatchCredsProvider)

http://git-wip-us.apache.org/repos/asf/spark/blob/ec68d8f8/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
index ed7e358..341a689 100644
--- a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
+++ b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
@@ -22,7 +22,6 @@ import scala.concurrent.duration._
 import scala.language.postfixOps
 import scala.util.Random
 
-import com.amazonaws.regions.RegionUtils
 import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
 import com.amazonaws.services.kinesis.model.Record
 import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
@@ -173,11 +172,15 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
    * and you have to set the system environment variable RUN_KINESIS_TESTS=1 .
    */
   testIfEnabled("basic operation") {
-    val awsCredentials = KinesisTestUtils.getAWSCredentials()
-    val stream = KinesisUtils.createStream(ssc, appName, testUtils.streamName,
-      testUtils.endpointUrl, testUtils.regionName, InitialPositionInStream.LATEST,
-      Seconds(10), StorageLevel.MEMORY_ONLY,
-      awsCredentials.getAWSAccessKeyId, awsCredentials.getAWSSecretKey)
+    val stream = KinesisInputDStream.builder.streamingContext(ssc)
+      .checkpointAppName(appName)
+      .streamName(testUtils.streamName)
+      .endpointUrl(testUtils.endpointUrl)
+      .regionName(testUtils.regionName)
+      .initialPositionInStream(InitialPositionInStream.LATEST)
+      .checkpointInterval(Seconds(10))
+      .storageLevel(StorageLevel.MEMORY_ONLY)
+      .build()
 
     val collected = new mutable.HashSet[Int]
     stream.map { bytes => new String(bytes).toInt }.foreachRDD { rdd =>
@@ -198,12 +201,17 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
   }
 
   testIfEnabled("custom message handling") {
-    val awsCredentials = KinesisTestUtils.getAWSCredentials()
     def addFive(r: Record): Int = JavaUtils.bytesToString(r.getData).toInt + 5
-    val stream = KinesisUtils.createStream(ssc, appName, testUtils.streamName,
-      testUtils.endpointUrl, testUtils.regionName, InitialPositionInStream.LATEST,
-      Seconds(10), StorageLevel.MEMORY_ONLY, addFive(_),
-      awsCredentials.getAWSAccessKeyId, awsCredentials.getAWSSecretKey)
+
+    val stream = KinesisInputDStream.builder.streamingContext(ssc)
+      .checkpointAppName(appName)
+      .streamName(testUtils.streamName)
+      .endpointUrl(testUtils.endpointUrl)
+      .regionName(testUtils.regionName)
+      .initialPositionInStream(InitialPositionInStream.LATEST)
+      .checkpointInterval(Seconds(10))
+      .storageLevel(StorageLevel.MEMORY_ONLY)
+      .buildWithMessageHandler(addFive(_))
 
     stream shouldBe a [ReceiverInputDStream[_]]
 
@@ -233,11 +241,15 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
     val localTestUtils = new KPLBasedKinesisTestUtils(1)
     localTestUtils.createStream()
     try {
-      val awsCredentials = KinesisTestUtils.getAWSCredentials()
-      val stream = KinesisUtils.createStream(ssc, localAppName, localTestUtils.streamName,
-        localTestUtils.endpointUrl, localTestUtils.regionName, InitialPositionInStream.LATEST,
-        Seconds(10), StorageLevel.MEMORY_ONLY,
-        awsCredentials.getAWSAccessKeyId, awsCredentials.getAWSSecretKey)
+      val stream = KinesisInputDStream.builder.streamingContext(ssc)
+        .checkpointAppName(localAppName)
+        .streamName(localTestUtils.streamName)
+        .endpointUrl(localTestUtils.endpointUrl)
+        .regionName(localTestUtils.regionName)
+        .initialPositionInStream(InitialPositionInStream.LATEST)
+        .checkpointInterval(Seconds(10))
+        .storageLevel(StorageLevel.MEMORY_ONLY)
+        .build()
 
       val collected = new mutable.HashSet[Int]
       stream.map { bytes => new String(bytes).toInt }.foreachRDD { rdd =>
@@ -303,13 +315,17 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
     ssc = new StreamingContext(sc, Milliseconds(1000))
     ssc.checkpoint(checkpointDir)
 
-    val awsCredentials = KinesisTestUtils.getAWSCredentials()
     val collectedData = new mutable.HashMap[Time, (Array[SequenceNumberRanges], Seq[Int])]
 
-    val kinesisStream = KinesisUtils.createStream(ssc, appName, testUtils.streamName,
-      testUtils.endpointUrl, testUtils.regionName, InitialPositionInStream.LATEST,
-      Seconds(10), StorageLevel.MEMORY_ONLY,
-      awsCredentials.getAWSAccessKeyId, awsCredentials.getAWSSecretKey)
+    val kinesisStream = KinesisInputDStream.builder.streamingContext(ssc)
+      .checkpointAppName(appName)
+      .streamName(testUtils.streamName)
+      .endpointUrl(testUtils.endpointUrl)
+      .regionName(testUtils.regionName)
+      .initialPositionInStream(InitialPositionInStream.LATEST)
+      .checkpointInterval(Seconds(10))
+      .storageLevel(StorageLevel.MEMORY_ONLY)
+      .build()
 
     // Verify that the generated RDDs are KinesisBackedBlockRDDs, and collect the data in each batch
     kinesisStream.foreachRDD((rdd: RDD[Array[Byte]], time: Time) => {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org