You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2016/03/21 09:01:28 UTC
spark git commit: [SPARK-14028][STREAMING][KINESIS][TESTS] Remove
deprecated methods; fix two other warnings
Repository: spark
Updated Branches:
refs/heads/master 761c2d1b6 -> c35c60fa9
[SPARK-14028][STREAMING][KINESIS][TESTS] Remove deprecated methods; fix two other warnings
## What changes were proposed in this pull request?
- Removed two methods that have been deprecated since 1.4
- Fixed two other compilation warnings
## How was this patch tested?
existing test suites
Author: proflin <pr...@gmail.com>
Closes #11850 from lw-lin/streaming-kinesis-deprecates-warnings.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c35c60fa
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c35c60fa
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c35c60fa
Branch: refs/heads/master
Commit: c35c60fa916e92916442a98f4af123704bb9692e
Parents: 761c2d1
Author: proflin <pr...@gmail.com>
Authored: Mon Mar 21 08:02:06 2016 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Mon Mar 21 08:02:06 2016 +0000
----------------------------------------------------------------------
.../spark/streaming/kinesis/KinesisUtils.scala | 86 --------------------
.../kinesis/JavaKinesisStreamSuite.java | 11 ++-
.../streaming/kinesis/KinesisFunSuite.scala | 2 +-
.../streaming/kinesis/KinesisStreamSuite.scala | 12 ++-
4 files changed, 13 insertions(+), 98 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/c35c60fa/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
----------------------------------------------------------------------
diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
index 15ac588..a0007d3 100644
--- a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
+++ b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
@@ -225,51 +225,6 @@ object KinesisUtils {
* Create an input stream that pulls messages from a Kinesis stream.
* This uses the Kinesis Client Library (KCL) to pull messages from Kinesis.
*
- * Note:
- *
- * - The AWS credentials will be discovered using the DefaultAWSCredentialsProviderChain
- * on the workers. See AWS documentation to understand how DefaultAWSCredentialsProviderChain
- * gets AWS credentials.
- * - The region of the `endpointUrl` will be used for DynamoDB and CloudWatch.
- * - The Kinesis application name used by the Kinesis Client Library (KCL) will be the app name
- * in [[org.apache.spark.SparkConf]].
- *
- * @param ssc StreamingContext object
- * @param streamName Kinesis stream name
- * @param endpointUrl Endpoint url of Kinesis service
- * (e.g., https://kinesis.us-east-1.amazonaws.com)
- * @param checkpointInterval Checkpoint interval for Kinesis checkpointing.
- * See the Kinesis Spark Streaming documentation for more
- * details on the different types of checkpoints.
- * @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the
- * worker's initial starting position in the stream.
- * The values are either the beginning of the stream
- * per Kinesis' limit of 24 hours
- * (InitialPositionInStream.TRIM_HORIZON) or
- * the tip of the stream (InitialPositionInStream.LATEST).
- * @param storageLevel Storage level to use for storing the received objects
- * StorageLevel.MEMORY_AND_DISK_2 is recommended.
- */
- @deprecated("use other forms of createStream", "1.4.0")
- def createStream(
- ssc: StreamingContext,
- streamName: String,
- endpointUrl: String,
- checkpointInterval: Duration,
- initialPositionInStream: InitialPositionInStream,
- storageLevel: StorageLevel
- ): ReceiverInputDStream[Array[Byte]] = {
- ssc.withNamedScope("kinesis stream") {
- new KinesisInputDStream[Array[Byte]](ssc, streamName, endpointUrl,
- getRegionByEndpoint(endpointUrl), initialPositionInStream, ssc.sc.appName,
- checkpointInterval, storageLevel, defaultMessageHandler, None)
- }
- }
-
- /**
- * Create an input stream that pulls messages from a Kinesis stream.
- * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis.
- *
* Note: The AWS credentials will be discovered using the DefaultAWSCredentialsProviderChain
* on the workers. See AWS documentation to understand how DefaultAWSCredentialsProviderChain
* gets the AWS credentials.
@@ -453,47 +408,6 @@ object KinesisUtils {
defaultMessageHandler(_), awsAccessKeyId, awsSecretKey)
}
- /**
- * Create an input stream that pulls messages from a Kinesis stream.
- * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis.
- *
- * Note:
- * - The AWS credentials will be discovered using the DefaultAWSCredentialsProviderChain
- * on the workers. See AWS documentation to understand how DefaultAWSCredentialsProviderChain
- * gets AWS credentials.
- * - The region of the `endpointUrl` will be used for DynamoDB and CloudWatch.
- * - The Kinesis application name used by the Kinesis Client Library (KCL) will be the app name in
- * [[org.apache.spark.SparkConf]].
- *
- * @param jssc Java StreamingContext object
- * @param streamName Kinesis stream name
- * @param endpointUrl Endpoint url of Kinesis service
- * (e.g., https://kinesis.us-east-1.amazonaws.com)
- * @param checkpointInterval Checkpoint interval for Kinesis checkpointing.
- * See the Kinesis Spark Streaming documentation for more
- * details on the different types of checkpoints.
- * @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the
- * worker's initial starting position in the stream.
- * The values are either the beginning of the stream
- * per Kinesis' limit of 24 hours
- * (InitialPositionInStream.TRIM_HORIZON) or
- * the tip of the stream (InitialPositionInStream.LATEST).
- * @param storageLevel Storage level to use for storing the received objects
- * StorageLevel.MEMORY_AND_DISK_2 is recommended.
- */
- @deprecated("use other forms of createStream", "1.4.0")
- def createStream(
- jssc: JavaStreamingContext,
- streamName: String,
- endpointUrl: String,
- checkpointInterval: Duration,
- initialPositionInStream: InitialPositionInStream,
- storageLevel: StorageLevel
- ): JavaReceiverInputDStream[Array[Byte]] = {
- createStream(
- jssc.ssc, streamName, endpointUrl, checkpointInterval, initialPositionInStream, storageLevel)
- }
-
private def getRegionByEndpoint(endpointUrl: String): String = {
RegionUtils.getRegionByEndpoint(endpointUrl).getName()
}
http://git-wip-us.apache.org/repos/asf/spark/blob/c35c60fa/external/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisStreamSuite.java
----------------------------------------------------------------------
diff --git a/external/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisStreamSuite.java b/external/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisStreamSuite.java
index 5c2371c..f078973 100644
--- a/external/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisStreamSuite.java
+++ b/external/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisStreamSuite.java
@@ -17,6 +17,7 @@
package org.apache.spark.streaming.kinesis;
+import com.amazonaws.regions.RegionUtils;
import com.amazonaws.services.kinesis.model.Record;
import org.junit.Test;
@@ -34,11 +35,13 @@ import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionIn
public class JavaKinesisStreamSuite extends LocalJavaStreamingContext {
@Test
public void testKinesisStream() {
- // Tests the API, does not actually test data receiving
- JavaDStream<byte[]> kinesisStream = KinesisUtils.createStream(ssc, "mySparkStream",
- "https://kinesis.us-west-2.amazonaws.com", new Duration(2000),
- InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2());
+ String dummyEndpointUrl = KinesisTestUtils.defaultEndpointUrl();
+ String dummyRegionName = RegionUtils.getRegionByEndpoint(dummyEndpointUrl).getName();
+ // Tests the API, does not actually test data receiving
+ JavaDStream<byte[]> kinesisStream = KinesisUtils.createStream(ssc, "myAppName", "mySparkStream",
+ dummyEndpointUrl, dummyRegionName, InitialPositionInStream.LATEST, new Duration(2000),
+ StorageLevel.MEMORY_AND_DISK_2());
ssc.stop();
}
http://git-wip-us.apache.org/repos/asf/spark/blob/c35c60fa/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala
----------------------------------------------------------------------
diff --git a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala
index ee428f3..1c81298 100644
--- a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala
+++ b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala
@@ -40,7 +40,7 @@ trait KinesisFunSuite extends SparkFunSuite {
if (shouldRunTests) {
body
} else {
- ignore(s"$message [enable by setting env var $envVarNameForEnablingTests=1]")()
+ ignore(s"$message [enable by setting env var $envVarNameForEnablingTests=1]")(())
}
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/c35c60fa/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
index 4460b6b..0e71bf9 100644
--- a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
+++ b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
@@ -99,14 +99,10 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
}
test("KinesisUtils API") {
- // Tests the API, does not actually test data receiving
- val kinesisStream1 = KinesisUtils.createStream(ssc, "mySparkStream",
- dummyEndpointUrl, Seconds(2),
- InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2)
- val kinesisStream2 = KinesisUtils.createStream(ssc, "myAppNam", "mySparkStream",
+ val kinesisStream1 = KinesisUtils.createStream(ssc, "myAppName", "mySparkStream",
dummyEndpointUrl, dummyRegionName,
InitialPositionInStream.LATEST, Seconds(2), StorageLevel.MEMORY_AND_DISK_2)
- val kinesisStream3 = KinesisUtils.createStream(ssc, "myAppNam", "mySparkStream",
+ val kinesisStream2 = KinesisUtils.createStream(ssc, "myAppName", "mySparkStream",
dummyEndpointUrl, dummyRegionName,
InitialPositionInStream.LATEST, Seconds(2), StorageLevel.MEMORY_AND_DISK_2,
dummyAWSAccessKey, dummyAWSSecretKey)
@@ -154,7 +150,9 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
// Verify that KinesisBackedBlockRDD is generated even when there are no blocks
val emptyRDD = kinesisStream.createBlockRDD(time, Seq.empty)
- emptyRDD shouldBe a [KinesisBackedBlockRDD[Array[Byte]]]
+ // Verify it's KinesisBackedBlockRDD[_] rather than KinesisBackedBlockRDD[Array[Byte]], because
+ // the type parameter will be erased at runtime
+ emptyRDD shouldBe a [KinesisBackedBlockRDD[_]]
emptyRDD.partitions shouldBe empty
// Verify that the KinesisBackedBlockRDD has isBlockValid = false when blocks are invalid
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org