You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2015/07/31 01:44:05 UTC
spark git commit: [STREAMING] [TEST] [HOTFIX] Fixed Kinesis test to
not throw weird errors when Kinesis tests are enabled without AWS keys
Repository: spark
Updated Branches:
refs/heads/master 04c840910 -> 1afdeb7b4
[STREAMING] [TEST] [HOTFIX] Fixed Kinesis test to not throw weird errors when Kinesis tests are enabled without AWS keys
If Kinesis tests are enabled by env ENABLE_KINESIS_TESTS = 1 but no AWS credentials are found, the desired behavior is the fail the test using with
```
Exception encountered when attempting to run a suite with class name: org.apache.spark.streaming.kinesis.KinesisBackedBlockRDDSuite *** ABORTED *** (3 seconds, 5 milliseconds)
[info] java.lang.Exception: Kinesis tests enabled, but could get not AWS credentials
```
Instead KinesisStreamSuite fails with
```
[info] - basic operation *** FAILED *** (3 seconds, 35 milliseconds)
[info] java.lang.IllegalArgumentException: requirement failed: Stream not yet created, call createStream() to create one
[info] at scala.Predef$.require(Predef.scala:233)
[info] at org.apache.spark.streaming.kinesis.KinesisTestUtils.streamName(KinesisTestUtils.scala:77)
[info] at org.apache.spark.streaming.kinesis.KinesisTestUtils$$anonfun$deleteStream$1.apply(KinesisTestUtils.scala:150)
[info] at org.apache.spark.streaming.kinesis.KinesisTestUtils$$anonfun$deleteStream$1.apply(KinesisTestUtils.scala:150)
[info] at org.apache.spark.Logging$class.logWarning(Logging.scala:71)
[info] at org.apache.spark.streaming.kinesis.KinesisTestUtils.logWarning(KinesisTestUtils.scala:39)
[info] at org.apache.spark.streaming.kinesis.KinesisTestUtils.deleteStream(KinesisTestUtils.scala:150)
[info] at org.apache.spark.streaming.kinesis.KinesisStreamSuite$$anonfun$3.apply$mcV$sp(KinesisStreamSuite.scala:111)
[info] at org.apache.spark.streaming.kinesis.KinesisStreamSuite$$anonfun$3.apply(KinesisStreamSuite.scala:86)
[info] at org.apache.spark.streaming.kinesis.KinesisStreamSuite$$anonfun$3.apply(KinesisStreamSuite.scala:86)
```
This is because attempting to delete a non-existent Kinesis stream throws uncaught exception. This PR fixes it.
Author: Tathagata Das <ta...@gmail.com>
Closes #7809 from tdas/kinesis-test-hotfix and squashes the following commits:
7c372e6 [Tathagata Das] Fixed test
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1afdeb7b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1afdeb7b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1afdeb7b
Branch: refs/heads/master
Commit: 1afdeb7b458f86e2641f062fb9ddc00e9c5c7531
Parents: 04c8409
Author: Tathagata Das <ta...@gmail.com>
Authored: Thu Jul 30 16:44:02 2015 -0700
Committer: Tathagata Das <ta...@gmail.com>
Committed: Thu Jul 30 16:44:02 2015 -0700
----------------------------------------------------------------------
.../streaming/kinesis/KinesisTestUtils.scala | 27 ++++++++++----------
.../streaming/kinesis/KinesisStreamSuite.scala | 4 +--
2 files changed, 16 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/1afdeb7b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
----------------------------------------------------------------------
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
index 0ff1b7e..ca39358 100644
--- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
@@ -53,6 +53,8 @@ private class KinesisTestUtils(
@volatile
private var streamCreated = false
+
+ @volatile
private var _streamName: String = _
private lazy val kinesisClient = {
@@ -115,21 +117,9 @@ private class KinesisTestUtils(
shardIdToSeqNumbers.toMap
}
- def describeStream(streamNameToDescribe: String = streamName): Option[StreamDescription] = {
- try {
- val describeStreamRequest = new DescribeStreamRequest().withStreamName(streamNameToDescribe)
- val desc = kinesisClient.describeStream(describeStreamRequest).getStreamDescription()
- Some(desc)
- } catch {
- case rnfe: ResourceNotFoundException =>
- None
- }
- }
-
def deleteStream(): Unit = {
try {
- if (describeStream().nonEmpty) {
- val deleteStreamRequest = new DeleteStreamRequest()
+ if (streamCreated) {
kinesisClient.deleteStream(streamName)
}
} catch {
@@ -149,6 +139,17 @@ private class KinesisTestUtils(
}
}
+ private def describeStream(streamNameToDescribe: String): Option[StreamDescription] = {
+ try {
+ val describeStreamRequest = new DescribeStreamRequest().withStreamName(streamNameToDescribe)
+ val desc = kinesisClient.describeStream(describeStreamRequest).getStreamDescription()
+ Some(desc)
+ } catch {
+ case rnfe: ResourceNotFoundException =>
+ None
+ }
+ }
+
private def findNonExistentStreamName(): String = {
var testStreamName: String = null
do {
http://git-wip-us.apache.org/repos/asf/spark/blob/1afdeb7b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
----------------------------------------------------------------------
diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
index f9c952b..b88c9c6 100644
--- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
+++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
@@ -88,11 +88,11 @@ class KinesisStreamSuite extends KinesisFunSuite
try {
kinesisTestUtils.createStream()
ssc = new StreamingContext(sc, Seconds(1))
- val aWSCredentials = KinesisTestUtils.getAWSCredentials()
+ val awsCredentials = KinesisTestUtils.getAWSCredentials()
val stream = KinesisUtils.createStream(ssc, kinesisAppName, kinesisTestUtils.streamName,
kinesisTestUtils.endpointUrl, kinesisTestUtils.regionName, InitialPositionInStream.LATEST,
Seconds(10), StorageLevel.MEMORY_ONLY,
- aWSCredentials.getAWSAccessKeyId, aWSCredentials.getAWSSecretKey)
+ awsCredentials.getAWSAccessKeyId, awsCredentials.getAWSSecretKey)
val collected = new mutable.HashSet[Int] with mutable.SynchronizedSet[Int]
stream.map { bytes => new String(bytes).toInt }.foreachRDD { rdd =>
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org