You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2015/07/31 01:44:05 UTC

spark git commit: [STREAMING] [TEST] [HOTFIX] Fixed Kinesis test to not throw weird errors when Kinesis tests are enabled without AWS keys

Repository: spark
Updated Branches:
  refs/heads/master 04c840910 -> 1afdeb7b4


[STREAMING] [TEST] [HOTFIX] Fixed Kinesis test to not throw weird errors when Kinesis tests are enabled without AWS keys

If Kinesis tests are enabled by env ENABLE_KINESIS_TESTS = 1 but no AWS credentials are found, the desired behavior is the fail the test using with
```
Exception encountered when attempting to run a suite with class name: org.apache.spark.streaming.kinesis.KinesisBackedBlockRDDSuite *** ABORTED *** (3 seconds, 5 milliseconds)
[info]   java.lang.Exception: Kinesis tests enabled, but could get not AWS credentials
```

Instead KinesisStreamSuite fails with

```
[info] - basic operation *** FAILED *** (3 seconds, 35 milliseconds)
[info]   java.lang.IllegalArgumentException: requirement failed: Stream not yet created, call createStream() to create one
[info]   at scala.Predef$.require(Predef.scala:233)
[info]   at org.apache.spark.streaming.kinesis.KinesisTestUtils.streamName(KinesisTestUtils.scala:77)
[info]   at org.apache.spark.streaming.kinesis.KinesisTestUtils$$anonfun$deleteStream$1.apply(KinesisTestUtils.scala:150)
[info]   at org.apache.spark.streaming.kinesis.KinesisTestUtils$$anonfun$deleteStream$1.apply(KinesisTestUtils.scala:150)
[info]   at org.apache.spark.Logging$class.logWarning(Logging.scala:71)
[info]   at org.apache.spark.streaming.kinesis.KinesisTestUtils.logWarning(KinesisTestUtils.scala:39)
[info]   at org.apache.spark.streaming.kinesis.KinesisTestUtils.deleteStream(KinesisTestUtils.scala:150)
[info]   at org.apache.spark.streaming.kinesis.KinesisStreamSuite$$anonfun$3.apply$mcV$sp(KinesisStreamSuite.scala:111)
[info]   at org.apache.spark.streaming.kinesis.KinesisStreamSuite$$anonfun$3.apply(KinesisStreamSuite.scala:86)
[info]   at org.apache.spark.streaming.kinesis.KinesisStreamSuite$$anonfun$3.apply(KinesisStreamSuite.scala:86)
```
This is because attempting to delete a non-existent Kinesis stream throws uncaught exception. This PR fixes it.

Author: Tathagata Das <ta...@gmail.com>

Closes #7809 from tdas/kinesis-test-hotfix and squashes the following commits:

7c372e6 [Tathagata Das] Fixed test


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1afdeb7b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1afdeb7b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1afdeb7b

Branch: refs/heads/master
Commit: 1afdeb7b458f86e2641f062fb9ddc00e9c5c7531
Parents: 04c8409
Author: Tathagata Das <ta...@gmail.com>
Authored: Thu Jul 30 16:44:02 2015 -0700
Committer: Tathagata Das <ta...@gmail.com>
Committed: Thu Jul 30 16:44:02 2015 -0700

----------------------------------------------------------------------
 .../streaming/kinesis/KinesisTestUtils.scala    | 27 ++++++++++----------
 .../streaming/kinesis/KinesisStreamSuite.scala  |  4 +--
 2 files changed, 16 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1afdeb7b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
----------------------------------------------------------------------
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
index 0ff1b7e..ca39358 100644
--- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
@@ -53,6 +53,8 @@ private class KinesisTestUtils(
 
   @volatile
   private var streamCreated = false
+
+  @volatile
   private var _streamName: String = _
 
   private lazy val kinesisClient = {
@@ -115,21 +117,9 @@ private class KinesisTestUtils(
     shardIdToSeqNumbers.toMap
   }
 
-  def describeStream(streamNameToDescribe: String = streamName): Option[StreamDescription] = {
-    try {
-      val describeStreamRequest = new DescribeStreamRequest().withStreamName(streamNameToDescribe)
-      val desc = kinesisClient.describeStream(describeStreamRequest).getStreamDescription()
-      Some(desc)
-    } catch {
-      case rnfe: ResourceNotFoundException =>
-        None
-    }
-  }
-
   def deleteStream(): Unit = {
     try {
-      if (describeStream().nonEmpty) {
-        val deleteStreamRequest = new DeleteStreamRequest()
+      if (streamCreated) {
         kinesisClient.deleteStream(streamName)
       }
     } catch {
@@ -149,6 +139,17 @@ private class KinesisTestUtils(
     }
   }
 
+  private def describeStream(streamNameToDescribe: String): Option[StreamDescription] = {
+    try {
+      val describeStreamRequest = new DescribeStreamRequest().withStreamName(streamNameToDescribe)
+      val desc = kinesisClient.describeStream(describeStreamRequest).getStreamDescription()
+      Some(desc)
+    } catch {
+      case rnfe: ResourceNotFoundException =>
+        None
+    }
+  }
+
   private def findNonExistentStreamName(): String = {
     var testStreamName: String = null
     do {

http://git-wip-us.apache.org/repos/asf/spark/blob/1afdeb7b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
----------------------------------------------------------------------
diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
index f9c952b..b88c9c6 100644
--- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
+++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
@@ -88,11 +88,11 @@ class KinesisStreamSuite extends KinesisFunSuite
     try {
       kinesisTestUtils.createStream()
       ssc = new StreamingContext(sc, Seconds(1))
-      val aWSCredentials = KinesisTestUtils.getAWSCredentials()
+      val awsCredentials = KinesisTestUtils.getAWSCredentials()
       val stream = KinesisUtils.createStream(ssc, kinesisAppName, kinesisTestUtils.streamName,
         kinesisTestUtils.endpointUrl, kinesisTestUtils.regionName, InitialPositionInStream.LATEST,
         Seconds(10), StorageLevel.MEMORY_ONLY,
-        aWSCredentials.getAWSAccessKeyId, aWSCredentials.getAWSSecretKey)
+        awsCredentials.getAWSAccessKeyId, awsCredentials.getAWSSecretKey)
 
       val collected = new mutable.HashSet[Int] with mutable.SynchronizedSet[Int]
       stream.map { bytes => new String(bytes).toInt }.foreachRDD { rdd =>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org