You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2016/03/21 09:01:28 UTC

spark git commit: [SPARK-14028][STREAMING][KINESIS][TESTS] Remove deprecated methods; fix two other warnings

Repository: spark
Updated Branches:
  refs/heads/master 761c2d1b6 -> c35c60fa9


[SPARK-14028][STREAMING][KINESIS][TESTS] Remove deprecated methods; fix two other warnings

## What changes were proposed in this pull request?

- Removed two methods that has been deprecated since 1.4
- Fixed two other compilation warnings

## How was this patch tested?

existing test suits

Author: proflin <pr...@gmail.com>

Closes #11850 from lw-lin/streaming-kinesis-deprecates-warnings.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c35c60fa
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c35c60fa
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c35c60fa

Branch: refs/heads/master
Commit: c35c60fa916e92916442a98f4af123704bb9692e
Parents: 761c2d1
Author: proflin <pr...@gmail.com>
Authored: Mon Mar 21 08:02:06 2016 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Mon Mar 21 08:02:06 2016 +0000

----------------------------------------------------------------------
 .../spark/streaming/kinesis/KinesisUtils.scala  | 86 --------------------
 .../kinesis/JavaKinesisStreamSuite.java         | 11 ++-
 .../streaming/kinesis/KinesisFunSuite.scala     |  2 +-
 .../streaming/kinesis/KinesisStreamSuite.scala  | 12 ++-
 4 files changed, 13 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c35c60fa/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
----------------------------------------------------------------------
diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
index 15ac588..a0007d3 100644
--- a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
+++ b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
@@ -225,51 +225,6 @@ object KinesisUtils {
    * Create an input stream that pulls messages from a Kinesis stream.
    * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis.
    *
-   * Note:
-   *
-   *  - The AWS credentials will be discovered using the DefaultAWSCredentialsProviderChain
-   *    on the workers. See AWS documentation to understand how DefaultAWSCredentialsProviderChain
-   *    gets AWS credentials.
-   *  - The region of the `endpointUrl` will be used for DynamoDB and CloudWatch.
-   *  - The Kinesis application name used by the Kinesis Client Library (KCL) will be the app name
-   *    in [[org.apache.spark.SparkConf]].
-   *
-   * @param ssc StreamingContext object
-   * @param streamName   Kinesis stream name
-   * @param endpointUrl  Endpoint url of Kinesis service
-   *                     (e.g., https://kinesis.us-east-1.amazonaws.com)
-   * @param checkpointInterval  Checkpoint interval for Kinesis checkpointing.
-   *                            See the Kinesis Spark Streaming documentation for more
-   *                            details on the different types of checkpoints.
-   * @param initialPositionInStream  In the absence of Kinesis checkpoint info, this is the
-   *                                 worker's initial starting position in the stream.
-   *                                 The values are either the beginning of the stream
-   *                                 per Kinesis' limit of 24 hours
-   *                                 (InitialPositionInStream.TRIM_HORIZON) or
-   *                                 the tip of the stream (InitialPositionInStream.LATEST).
-   * @param storageLevel Storage level to use for storing the received objects
-   *                     StorageLevel.MEMORY_AND_DISK_2 is recommended.
-   */
-  @deprecated("use other forms of createStream", "1.4.0")
-  def createStream(
-      ssc: StreamingContext,
-      streamName: String,
-      endpointUrl: String,
-      checkpointInterval: Duration,
-      initialPositionInStream: InitialPositionInStream,
-      storageLevel: StorageLevel
-    ): ReceiverInputDStream[Array[Byte]] = {
-    ssc.withNamedScope("kinesis stream") {
-      new KinesisInputDStream[Array[Byte]](ssc, streamName, endpointUrl,
-        getRegionByEndpoint(endpointUrl), initialPositionInStream, ssc.sc.appName,
-        checkpointInterval, storageLevel, defaultMessageHandler, None)
-    }
-  }
-
-  /**
-   * Create an input stream that pulls messages from a Kinesis stream.
-   * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis.
-   *
    * Note: The AWS credentials will be discovered using the DefaultAWSCredentialsProviderChain
    * on the workers. See AWS documentation to understand how DefaultAWSCredentialsProviderChain
    * gets the AWS credentials.
@@ -453,47 +408,6 @@ object KinesisUtils {
       defaultMessageHandler(_), awsAccessKeyId, awsSecretKey)
   }
 
-  /**
-   * Create an input stream that pulls messages from a Kinesis stream.
-   * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis.
-   *
-   * Note:
-   * - The AWS credentials will be discovered using the DefaultAWSCredentialsProviderChain
-   *   on the workers. See AWS documentation to understand how DefaultAWSCredentialsProviderChain
-   *   gets AWS credentials.
-   * - The region of the `endpointUrl` will be used for DynamoDB and CloudWatch.
-   * - The Kinesis application name used by the Kinesis Client Library (KCL) will be the app name in
-   *   [[org.apache.spark.SparkConf]].
-   *
-   * @param jssc Java StreamingContext object
-   * @param streamName   Kinesis stream name
-   * @param endpointUrl  Endpoint url of Kinesis service
-   *                     (e.g., https://kinesis.us-east-1.amazonaws.com)
-   * @param checkpointInterval  Checkpoint interval for Kinesis checkpointing.
-   *                            See the Kinesis Spark Streaming documentation for more
-   *                            details on the different types of checkpoints.
-   * @param initialPositionInStream  In the absence of Kinesis checkpoint info, this is the
-   *                                 worker's initial starting position in the stream.
-   *                                 The values are either the beginning of the stream
-   *                                 per Kinesis' limit of 24 hours
-   *                                 (InitialPositionInStream.TRIM_HORIZON) or
-   *                                 the tip of the stream (InitialPositionInStream.LATEST).
-   * @param storageLevel Storage level to use for storing the received objects
-   *                     StorageLevel.MEMORY_AND_DISK_2 is recommended.
-   */
-  @deprecated("use other forms of createStream", "1.4.0")
-  def createStream(
-      jssc: JavaStreamingContext,
-      streamName: String,
-      endpointUrl: String,
-      checkpointInterval: Duration,
-      initialPositionInStream: InitialPositionInStream,
-      storageLevel: StorageLevel
-    ): JavaReceiverInputDStream[Array[Byte]] = {
-    createStream(
-      jssc.ssc, streamName, endpointUrl, checkpointInterval, initialPositionInStream, storageLevel)
-  }
-
   private def getRegionByEndpoint(endpointUrl: String): String = {
     RegionUtils.getRegionByEndpoint(endpointUrl).getName()
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/c35c60fa/external/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisStreamSuite.java
----------------------------------------------------------------------
diff --git a/external/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisStreamSuite.java b/external/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisStreamSuite.java
index 5c2371c..f078973 100644
--- a/external/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisStreamSuite.java
+++ b/external/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisStreamSuite.java
@@ -17,6 +17,7 @@
 
 package org.apache.spark.streaming.kinesis;
 
+import com.amazonaws.regions.RegionUtils;
 import com.amazonaws.services.kinesis.model.Record;
 import org.junit.Test;
 
@@ -34,11 +35,13 @@ import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionIn
 public class JavaKinesisStreamSuite extends LocalJavaStreamingContext {
   @Test
   public void testKinesisStream() {
-    // Tests the API, does not actually test data receiving
-    JavaDStream<byte[]> kinesisStream = KinesisUtils.createStream(ssc, "mySparkStream",
-        "https://kinesis.us-west-2.amazonaws.com", new Duration(2000),
-        InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2());
+    String dummyEndpointUrl = KinesisTestUtils.defaultEndpointUrl();
+    String dummyRegionName = RegionUtils.getRegionByEndpoint(dummyEndpointUrl).getName();
 
+    // Tests the API, does not actually test data receiving
+    JavaDStream<byte[]> kinesisStream = KinesisUtils.createStream(ssc, "myAppName", "mySparkStream",
+        dummyEndpointUrl, dummyRegionName, InitialPositionInStream.LATEST, new Duration(2000),
+        StorageLevel.MEMORY_AND_DISK_2());
     ssc.stop();
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c35c60fa/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala
----------------------------------------------------------------------
diff --git a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala
index ee428f3..1c81298 100644
--- a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala
+++ b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala
@@ -40,7 +40,7 @@ trait KinesisFunSuite extends SparkFunSuite  {
     if (shouldRunTests) {
       body
     } else {
-      ignore(s"$message [enable by setting env var $envVarNameForEnablingTests=1]")()
+      ignore(s"$message [enable by setting env var $envVarNameForEnablingTests=1]")(())
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/c35c60fa/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
index 4460b6b..0e71bf9 100644
--- a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
+++ b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
@@ -99,14 +99,10 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
   }
 
   test("KinesisUtils API") {
-    // Tests the API, does not actually test data receiving
-    val kinesisStream1 = KinesisUtils.createStream(ssc, "mySparkStream",
-      dummyEndpointUrl, Seconds(2),
-      InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2)
-    val kinesisStream2 = KinesisUtils.createStream(ssc, "myAppNam", "mySparkStream",
+    val kinesisStream1 = KinesisUtils.createStream(ssc, "myAppName", "mySparkStream",
       dummyEndpointUrl, dummyRegionName,
       InitialPositionInStream.LATEST, Seconds(2), StorageLevel.MEMORY_AND_DISK_2)
-    val kinesisStream3 = KinesisUtils.createStream(ssc, "myAppNam", "mySparkStream",
+    val kinesisStream2 = KinesisUtils.createStream(ssc, "myAppName", "mySparkStream",
       dummyEndpointUrl, dummyRegionName,
       InitialPositionInStream.LATEST, Seconds(2), StorageLevel.MEMORY_AND_DISK_2,
       dummyAWSAccessKey, dummyAWSSecretKey)
@@ -154,7 +150,9 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
 
     // Verify that KinesisBackedBlockRDD is generated even when there are no blocks
     val emptyRDD = kinesisStream.createBlockRDD(time, Seq.empty)
-    emptyRDD shouldBe a [KinesisBackedBlockRDD[Array[Byte]]]
+    // Verify it's KinesisBackedBlockRDD[_] rather than KinesisBackedBlockRDD[Array[Byte]], because
+    // the type parameter will be erased at runtime
+    emptyRDD shouldBe a [KinesisBackedBlockRDD[_]]
     emptyRDD.partitions shouldBe empty
 
     // Verify that the KinesisBackedBlockRDD has isBlockValid = false when blocks are invalid


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org