Posted to commits@spark.apache.org by td...@apache.org on 2015/12/04 21:08:51 UTC

spark git commit: [SPARK-12058][STREAMING][KINESIS][TESTS] fix Kinesis python tests

Repository: spark
Updated Branches:
  refs/heads/master d0d822277 -> 302d68de8


[SPARK-12058][STREAMING][KINESIS][TESTS] fix Kinesis python tests

Python tests require access to the `KinesisTestUtils` file. When this file lives under src/test, Python can't access it, since it is not included in the assembly jar.

However, moving KinesisTestUtils to src/main as-is would require adding the Kinesis Producer Library (KPL) as a compile-time dependency. To avoid this, I moved the KPL-free parts of KinesisTestUtils to src/main and extended it with KPLBasedKinesisTestUtils under src/test, which adds support for the KPL.
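
In condensed form, the new layout looks like the sketch below (stub types stand in for the real AWS-backed logic; the full sources are in the diff that follows). The base class in src/main ships in the assembly jar, so Python can reach it, but it only knows how to build the plain-client data generator; the KPL-backed generator is wired in by a subclass under src/test, the only place that needs the KPL on the classpath:

    // Condensed sketch of the split; stubs replace the real AWS-backed logic.
    trait KinesisDataGenerator {
      def sendData(streamName: String, data: Seq[Int]): Map[String, Seq[(Int, String)]]
    }

    class SimpleDataGenerator extends KinesisDataGenerator {        // src/main: plain client
      def sendData(streamName: String, data: Seq[Int]): Map[String, Seq[(Int, String)]] =
        Map.empty  // real version calls AmazonKinesisClient.putRecord per element
    }

    class KPLDataGenerator(regionName: String) extends KinesisDataGenerator { // src/test: KPL
      def sendData(streamName: String, data: Seq[Int]): Map[String, Seq[(Int, String)]] =
        Map.empty  // real version buffers through the KinesisProducer and flushes
    }

    class KinesisTestUtils {                    // src/main: KPL-free, visible to Python
      protected def getProducer(aggregate: Boolean): KinesisDataGenerator =
        if (!aggregate) new SimpleDataGenerator
        else throw new UnsupportedOperationException("Aggregation requires the KPL")
    }

    class KPLBasedKinesisTestUtils extends KinesisTestUtils {       // src/test: adds KPL path
      override protected def getProducer(aggregate: Boolean): KinesisDataGenerator =
        if (aggregate) new KPLDataGenerator("us-west-2") else super.getProducer(aggregate)
    }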

cc zsxwing tdas

Author: Burak Yavuz <br...@gmail.com>

Closes #10050 from brkyvz/kinesis-py.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/302d68de
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/302d68de
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/302d68de

Branch: refs/heads/master
Commit: 302d68de87dbaf1974accf49de26fc01fc0eb089
Parents: d0d8222
Author: Burak Yavuz <br...@gmail.com>
Authored: Fri Dec 4 12:08:42 2015 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Fri Dec 4 12:08:42 2015 -0800

----------------------------------------------------------------------
 .../streaming/kinesis/KinesisTestUtils.scala    | 260 ++++++++++++++++++
 .../kinesis/KPLBasedKinesisTestUtils.scala      |  72 +++++
 .../kinesis/KinesisBackedBlockRDDSuite.scala    |   2 +-
 .../streaming/kinesis/KinesisStreamSuite.scala  |   2 +-
 .../streaming/kinesis/KinesisTestUtils.scala    | 266 -------------------
 python/pyspark/streaming/tests.py               |   1 -
 6 files changed, 334 insertions(+), 269 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/302d68de/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
----------------------------------------------------------------------
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
new file mode 100644
index 0000000..0ace453
--- /dev/null
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kinesis
+
+import java.nio.ByteBuffer
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.util.{Failure, Random, Success, Try}
+
+import com.amazonaws.auth.{AWSCredentials, DefaultAWSCredentialsProviderChain}
+import com.amazonaws.regions.RegionUtils
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
+import com.amazonaws.services.dynamodbv2.document.DynamoDB
+import com.amazonaws.services.kinesis.AmazonKinesisClient
+import com.amazonaws.services.kinesis.model._
+
+import org.apache.spark.Logging
+
+/**
+ * Shared utility methods for performing Kinesis tests that actually transfer data.
+ *
+ * PLEASE KEEP THIS FILE UNDER src/main AS PYTHON TESTS NEED ACCESS TO THIS FILE!
+ */
+private[kinesis] class KinesisTestUtils extends Logging {
+
+  val endpointUrl = KinesisTestUtils.endpointUrl
+  val regionName = RegionUtils.getRegionByEndpoint(endpointUrl).getName()
+  val streamShardCount = 2
+
+  private val createStreamTimeoutSeconds = 300
+  private val describeStreamPollTimeSeconds = 1
+
+  @volatile
+  private var streamCreated = false
+
+  @volatile
+  private var _streamName: String = _
+
+  protected lazy val kinesisClient = {
+    val client = new AmazonKinesisClient(KinesisTestUtils.getAWSCredentials())
+    client.setEndpoint(endpointUrl)
+    client
+  }
+
+  private lazy val dynamoDB = {
+    val dynamoDBClient = new AmazonDynamoDBClient(new DefaultAWSCredentialsProviderChain())
+    dynamoDBClient.setRegion(RegionUtils.getRegion(regionName))
+    new DynamoDB(dynamoDBClient)
+  }
+
+  protected def getProducer(aggregate: Boolean): KinesisDataGenerator = {
+    if (!aggregate) {
+      new SimpleDataGenerator(kinesisClient)
+    } else {
+      throw new UnsupportedOperationException("Aggregation is not supported through this code path")
+    }
+  }
+
+  def streamName: String = {
+    require(streamCreated, "Stream not yet created, call createStream() to create one")
+    _streamName
+  }
+
+  def createStream(): Unit = {
+    require(!streamCreated, "Stream already created")
+    _streamName = findNonExistentStreamName()
+
+    // Create a stream. The number of shards determines the provisioned throughput.
+    logInfo(s"Creating stream ${_streamName}")
+    val createStreamRequest = new CreateStreamRequest()
+    createStreamRequest.setStreamName(_streamName)
+    createStreamRequest.setShardCount(2)
+    kinesisClient.createStream(createStreamRequest)
+
+    // The stream is now being created. Wait for it to become active.
+    waitForStreamToBeActive(_streamName)
+    streamCreated = true
+    logInfo(s"Created stream ${_streamName}")
+  }
+
+  /**
+   * Push data to Kinesis stream and return a map of
+   * shardId -> seq of (data, seq number) pushed to corresponding shard
+   */
+  def pushData(testData: Seq[Int], aggregate: Boolean): Map[String, Seq[(Int, String)]] = {
+    require(streamCreated, "Stream not yet created, call createStream() to create one")
+    val producer = getProducer(aggregate)
+    val shardIdToSeqNumbers = producer.sendData(streamName, testData)
+    logInfo(s"Pushed $testData:\n\t ${shardIdToSeqNumbers.mkString("\n\t")}")
+    shardIdToSeqNumbers.toMap
+  }
+
+  /**
+   * Expose a Python friendly API.
+   */
+  def pushData(testData: java.util.List[Int]): Unit = {
+    pushData(testData.asScala, aggregate = false)
+  }
+
+  def deleteStream(): Unit = {
+    try {
+      if (streamCreated) {
+        kinesisClient.deleteStream(streamName)
+      }
+    } catch {
+      case e: Exception =>
+        logWarning(s"Could not delete stream $streamName")
+    }
+  }
+
+  def deleteDynamoDBTable(tableName: String): Unit = {
+    try {
+      val table = dynamoDB.getTable(tableName)
+      table.delete()
+      table.waitForDelete()
+    } catch {
+      case e: Exception =>
+        logWarning(s"Could not delete DynamoDB table $tableName")
+    }
+  }
+
+  private def describeStream(streamNameToDescribe: String): Option[StreamDescription] = {
+    try {
+      val describeStreamRequest = new DescribeStreamRequest().withStreamName(streamNameToDescribe)
+      val desc = kinesisClient.describeStream(describeStreamRequest).getStreamDescription()
+      Some(desc)
+    } catch {
+      case rnfe: ResourceNotFoundException =>
+        None
+    }
+  }
+
+  private def findNonExistentStreamName(): String = {
+    var testStreamName: String = null
+    do {
+      Thread.sleep(TimeUnit.SECONDS.toMillis(describeStreamPollTimeSeconds))
+      testStreamName = s"KinesisTestUtils-${math.abs(Random.nextLong())}"
+    } while (describeStream(testStreamName).nonEmpty)
+    testStreamName
+  }
+
+  private def waitForStreamToBeActive(streamNameToWaitFor: String): Unit = {
+    val startTime = System.currentTimeMillis()
+    val endTime = startTime + TimeUnit.SECONDS.toMillis(createStreamTimeoutSeconds)
+    while (System.currentTimeMillis() < endTime) {
+      Thread.sleep(TimeUnit.SECONDS.toMillis(describeStreamPollTimeSeconds))
+      describeStream(streamNameToWaitFor).foreach { description =>
+        val streamStatus = description.getStreamStatus()
+        logDebug(s"\t- current state: $streamStatus\n")
+        if ("ACTIVE".equals(streamStatus)) {
+          return
+        }
+      }
+    }
+    require(false, s"Stream $streamNameToWaitFor never became active")
+  }
+}
+
+private[kinesis] object KinesisTestUtils {
+
+  val envVarNameForEnablingTests = "ENABLE_KINESIS_TESTS"
+  val endVarNameForEndpoint = "KINESIS_TEST_ENDPOINT_URL"
+  val defaultEndpointUrl = "https://kinesis.us-west-2.amazonaws.com"
+
+  lazy val shouldRunTests = {
+    val isEnvSet = sys.env.get(envVarNameForEnablingTests) == Some("1")
+    if (isEnvSet) {
+      // scalastyle:off println
+      // Print this so that they are easily visible on the console and not hidden in the log4j logs.
+      println(
+        s"""
+          |Kinesis tests that actually send data have been enabled by setting the environment
+          |variable $envVarNameForEnablingTests to 1. This will create Kinesis Streams and
+          |DynamoDB tables in AWS. Please be aware that this may incur some AWS costs.
+          |By default, the tests use the endpoint URL $defaultEndpointUrl to create Kinesis streams.
+          |To change this endpoint URL to a different region, you can set the environment variable
+          |$endVarNameForEndpoint to the desired endpoint URL
+          |(e.g. $endVarNameForEndpoint="https://kinesis.us-west-2.amazonaws.com").
+        """.stripMargin)
+      // scalastyle:on println
+    }
+    isEnvSet
+  }
+
+  lazy val endpointUrl = {
+    val url = sys.env.getOrElse(endVarNameForEndpoint, defaultEndpointUrl)
+    // scalastyle:off println
+    // Print this so that they are easily visible on the console and not hidden in the log4j logs.
+    println(s"Using endpoint URL $url for creating Kinesis streams for tests.")
+    // scalastyle:on println
+    url
+  }
+
+  def isAWSCredentialsPresent: Boolean = {
+    Try { new DefaultAWSCredentialsProviderChain().getCredentials() }.isSuccess
+  }
+
+  def getAWSCredentials(): AWSCredentials = {
+    assert(shouldRunTests,
+      "Kinesis test not enabled, should not attempt to get AWS credentials")
+    Try { new DefaultAWSCredentialsProviderChain().getCredentials() } match {
+      case Success(cred) => cred
+      case Failure(e) =>
+        throw new Exception(
+          s"""
+             |Kinesis tests enabled using environment variable $envVarNameForEnablingTests
+             |but could not find AWS credentials. Please follow instructions in AWS documentation
+             |to set the credentials in your system such that the DefaultAWSCredentialsProviderChain
+             |can find the credentials.
+           """.stripMargin)
+    }
+  }
+}
+
+/** A wrapper interface that will allow us to consolidate the code for synthetic data generation. */
+private[kinesis] trait KinesisDataGenerator {
+  /** Sends the data to Kinesis and returns the metadata for everything that has been sent. */
+  def sendData(streamName: String, data: Seq[Int]): Map[String, Seq[(Int, String)]]
+}
+
+private[kinesis] class SimpleDataGenerator(
+    client: AmazonKinesisClient) extends KinesisDataGenerator {
+  override def sendData(streamName: String, data: Seq[Int]): Map[String, Seq[(Int, String)]] = {
+    val shardIdToSeqNumbers = new mutable.HashMap[String, ArrayBuffer[(Int, String)]]()
+    data.foreach { num =>
+      val str = num.toString
+      val data = ByteBuffer.wrap(str.getBytes())
+      val putRecordRequest = new PutRecordRequest().withStreamName(streamName)
+        .withData(data)
+        .withPartitionKey(str)
+
+      val putRecordResult = client.putRecord(putRecordRequest)
+      val shardId = putRecordResult.getShardId
+      val seqNumber = putRecordResult.getSequenceNumber()
+      val sentSeqNumbers = shardIdToSeqNumbers.getOrElseUpdate(shardId,
+        new ArrayBuffer[(Int, String)]())
+      sentSeqNumbers += ((num, seqNumber))
+    }
+
+    shardIdToSeqNumbers.toMap
+  }
+}
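
For reference, a Scala test would drive the utilities above roughly as follows. This is a hypothetical usage skeleton written for illustration, not code from this commit; only the method names and signatures (createStream, pushData, deleteStream) are taken from the file above.

    // Hypothetical usage skeleton for KinesisTestUtils (not part of this commit):
    val testUtils = new KinesisTestUtils()
    try {
      testUtils.createStream()                       // blocks until the stream is ACTIVE
      val shardMap = testUtils.pushData(Seq(1, 2, 3), aggregate = false)
      shardMap.foreach { case (shardId, records) =>  // shardId -> Seq((datum, seqNumber))
        println(s"shard $shardId got ${records.size} record(s)")
      }
    } finally {
      testUtils.deleteStream()                       // avoid leaking AWS resources
    }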

http://git-wip-us.apache.org/repos/asf/spark/blob/302d68de/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala
----------------------------------------------------------------------
diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala
new file mode 100644
index 0000000..fdb270e
--- /dev/null
+++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import java.nio.ByteBuffer
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import com.amazonaws.services.kinesis.producer.{KinesisProducer => KPLProducer, KinesisProducerConfiguration, UserRecordResult}
+import com.google.common.util.concurrent.{FutureCallback, Futures}
+
+private[kinesis] class KPLBasedKinesisTestUtils extends KinesisTestUtils {
+  override protected def getProducer(aggregate: Boolean): KinesisDataGenerator = {
+    if (!aggregate) {
+      new SimpleDataGenerator(kinesisClient)
+    } else {
+      new KPLDataGenerator(regionName)
+    }
+  }
+}
+
+/** A wrapper for the KinesisProducer provided in the KPL. */
+private[kinesis] class KPLDataGenerator(regionName: String) extends KinesisDataGenerator {
+
+  private lazy val producer: KPLProducer = {
+    val conf = new KinesisProducerConfiguration()
+      .setRecordMaxBufferedTime(1000)
+      .setMaxConnections(1)
+      .setRegion(regionName)
+      .setMetricsLevel("none")
+
+    new KPLProducer(conf)
+  }
+
+  override def sendData(streamName: String, data: Seq[Int]): Map[String, Seq[(Int, String)]] = {
+    val shardIdToSeqNumbers = new mutable.HashMap[String, ArrayBuffer[(Int, String)]]()
+    data.foreach { num =>
+      val str = num.toString
+      val data = ByteBuffer.wrap(str.getBytes())
+      val future = producer.addUserRecord(streamName, str, data)
+      val kinesisCallBack = new FutureCallback[UserRecordResult]() {
+        override def onFailure(t: Throwable): Unit = {} // do nothing
+
+        override def onSuccess(result: UserRecordResult): Unit = {
+          val shardId = result.getShardId
+          val seqNumber = result.getSequenceNumber()
+          val sentSeqNumbers = shardIdToSeqNumbers.getOrElseUpdate(shardId,
+            new ArrayBuffer[(Int, String)]())
+          sentSeqNumbers += ((num, seqNumber))
+        }
+      }
+      Futures.addCallback(future, kinesisCallBack)
+    }
+    producer.flushSync()
+    shardIdToSeqNumbers.toMap
+  }
+}
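
Compared with the monolithic version deleted further below, flushSync() now runs once after the send loop rather than once per record, so all buffered user records are flushed in a single blocking call before the result map is read. With this subclass in place, a test reaches the aggregated path simply by asking for it; a hypothetical invocation, for illustration only:

    // Hypothetical: exercising KPL aggregation through the test-only subclass.
    val testUtils = new KPLBasedKinesisTestUtils()
    testUtils.createStream()
    val shardMap = testUtils.pushData(Seq(5, 6, 7), aggregate = true) // via KPLDataGenerator
    testUtils.deleteStream()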

http://git-wip-us.apache.org/repos/asf/spark/blob/302d68de/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala
----------------------------------------------------------------------
diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala
index 52c61df..d85b4cd 100644
--- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala
+++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala
@@ -40,7 +40,7 @@ abstract class KinesisBackedBlockRDDTests(aggregateTestData: Boolean)
 
   override def beforeAll(): Unit = {
     runIfTestsEnabled("Prepare KinesisTestUtils") {
-      testUtils = new KinesisTestUtils()
+      testUtils = new KPLBasedKinesisTestUtils()
       testUtils.createStream()
 
       shardIdToDataAndSeqNumbers = testUtils.pushData(testData, aggregate = aggregateTestData)

http://git-wip-us.apache.org/repos/asf/spark/blob/302d68de/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
----------------------------------------------------------------------
diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
index dee3044..78cec02 100644
--- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
+++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
@@ -63,7 +63,7 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
     sc = new SparkContext(conf)
 
     runIfTestsEnabled("Prepare KinesisTestUtils") {
-      testUtils = new KinesisTestUtils()
+      testUtils = new KPLBasedKinesisTestUtils()
       testUtils.createStream()
     }
   }
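
Both suites wrap their setup in runIfTestsEnabled, which this commit does not touch; per the hunk headers above it comes from the shared KinesisFunSuite base. Presumably it gates its body on KinesisTestUtils.shouldRunTests; the sketch below is an assumption about its shape, not the actual source:

    // Assumed shape of runIfTestsEnabled (defined in KinesisFunSuite, not shown
    // in this diff) -- an illustration, not the real implementation:
    def runIfTestsEnabled(message: String)(body: => Unit): Unit = {
      if (KinesisTestUtils.shouldRunTests) {
        body
      } else {
        println(s"$message skipped; set ENABLE_KINESIS_TESTS=1 to run")  // hypothetical
      }
    }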

http://git-wip-us.apache.org/repos/asf/spark/blob/302d68de/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
----------------------------------------------------------------------
diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
deleted file mode 100644
index 7487aa1..0000000
--- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
+++ /dev/null
@@ -1,266 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kinesis
-
-import java.nio.ByteBuffer
-import java.util.concurrent.TimeUnit
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
-import scala.util.{Failure, Random, Success, Try}
-
-import com.amazonaws.auth.{AWSCredentials, DefaultAWSCredentialsProviderChain}
-import com.amazonaws.regions.RegionUtils
-import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
-import com.amazonaws.services.dynamodbv2.document.DynamoDB
-import com.amazonaws.services.kinesis.AmazonKinesisClient
-import com.amazonaws.services.kinesis.model._
-import com.amazonaws.services.kinesis.producer.{KinesisProducer, KinesisProducerConfiguration, UserRecordResult}
-import com.google.common.util.concurrent.{FutureCallback, Futures}
-
-import org.apache.spark.Logging
-
-/**
- * Shared utility methods for performing Kinesis tests that actually transfer data
- */
-private[kinesis] class KinesisTestUtils extends Logging {
-
-  val endpointUrl = KinesisTestUtils.endpointUrl
-  val regionName = RegionUtils.getRegionByEndpoint(endpointUrl).getName()
-  val streamShardCount = 2
-
-  private val createStreamTimeoutSeconds = 300
-  private val describeStreamPollTimeSeconds = 1
-
-  @volatile
-  private var streamCreated = false
-
-  @volatile
-  private var _streamName: String = _
-
-  private lazy val kinesisClient = {
-    val client = new AmazonKinesisClient(KinesisTestUtils.getAWSCredentials())
-    client.setEndpoint(endpointUrl)
-    client
-  }
-
-  private lazy val dynamoDB = {
-    val dynamoDBClient = new AmazonDynamoDBClient(new DefaultAWSCredentialsProviderChain())
-    dynamoDBClient.setRegion(RegionUtils.getRegion(regionName))
-    new DynamoDB(dynamoDBClient)
-  }
-
-  private lazy val kinesisProducer: KinesisProducer = {
-    val conf = new KinesisProducerConfiguration()
-      .setRecordMaxBufferedTime(1000)
-      .setMaxConnections(1)
-      .setRegion(regionName)
-      .setMetricsLevel("none")
-
-    new KinesisProducer(conf)
-  }
-
-  def streamName: String = {
-    require(streamCreated, "Stream not yet created, call createStream() to create one")
-    _streamName
-  }
-
-  def createStream(): Unit = {
-    require(!streamCreated, "Stream already created")
-    _streamName = findNonExistentStreamName()
-
-    // Create a stream. The number of shards determines the provisioned throughput.
-    logInfo(s"Creating stream ${_streamName}")
-    val createStreamRequest = new CreateStreamRequest()
-    createStreamRequest.setStreamName(_streamName)
-    createStreamRequest.setShardCount(2)
-    kinesisClient.createStream(createStreamRequest)
-
-    // The stream is now being created. Wait for it to become active.
-    waitForStreamToBeActive(_streamName)
-    streamCreated = true
-    logInfo(s"Created stream ${_streamName}")
-  }
-
-  /**
-   * Push data to Kinesis stream and return a map of
-   * shardId -> seq of (data, seq number) pushed to corresponding shard
-   */
-  def pushData(testData: Seq[Int], aggregate: Boolean): Map[String, Seq[(Int, String)]] = {
-    require(streamCreated, "Stream not yet created, call createStream() to create one")
-    val shardIdToSeqNumbers = new mutable.HashMap[String, ArrayBuffer[(Int, String)]]()
-
-    testData.foreach { num =>
-      val str = num.toString
-      val data = ByteBuffer.wrap(str.getBytes())
-      if (aggregate) {
-        val future = kinesisProducer.addUserRecord(streamName, str, data)
-        val kinesisCallBack = new FutureCallback[UserRecordResult]() {
-          override def onFailure(t: Throwable): Unit = {} // do nothing
-
-          override def onSuccess(result: UserRecordResult): Unit = {
-            val shardId = result.getShardId
-            val seqNumber = result.getSequenceNumber()
-            val sentSeqNumbers = shardIdToSeqNumbers.getOrElseUpdate(shardId,
-              new ArrayBuffer[(Int, String)]())
-            sentSeqNumbers += ((num, seqNumber))
-          }
-        }
-
-        Futures.addCallback(future, kinesisCallBack)
-        kinesisProducer.flushSync() // make sure we send all data before returning the map
-      } else {
-        val putRecordRequest = new PutRecordRequest().withStreamName(streamName)
-          .withData(data)
-          .withPartitionKey(str)
-
-        val putRecordResult = kinesisClient.putRecord(putRecordRequest)
-        val shardId = putRecordResult.getShardId
-        val seqNumber = putRecordResult.getSequenceNumber()
-        val sentSeqNumbers = shardIdToSeqNumbers.getOrElseUpdate(shardId,
-          new ArrayBuffer[(Int, String)]())
-        sentSeqNumbers += ((num, seqNumber))
-      }
-    }
-
-    logInfo(s"Pushed $testData:\n\t ${shardIdToSeqNumbers.mkString("\n\t")}")
-    shardIdToSeqNumbers.toMap
-  }
-
-  /**
-   * Expose a Python friendly API.
-   */
-  def pushData(testData: java.util.List[Int]): Unit = {
-    pushData(testData.asScala, aggregate = false)
-  }
-
-  def deleteStream(): Unit = {
-    try {
-      if (streamCreated) {
-        kinesisClient.deleteStream(streamName)
-      }
-    } catch {
-      case e: Exception =>
-        logWarning(s"Could not delete stream $streamName")
-    }
-  }
-
-  def deleteDynamoDBTable(tableName: String): Unit = {
-    try {
-      val table = dynamoDB.getTable(tableName)
-      table.delete()
-      table.waitForDelete()
-    } catch {
-      case e: Exception =>
-        logWarning(s"Could not delete DynamoDB table $tableName")
-    }
-  }
-
-  private def describeStream(streamNameToDescribe: String): Option[StreamDescription] = {
-    try {
-      val describeStreamRequest = new DescribeStreamRequest().withStreamName(streamNameToDescribe)
-      val desc = kinesisClient.describeStream(describeStreamRequest).getStreamDescription()
-      Some(desc)
-    } catch {
-      case rnfe: ResourceNotFoundException =>
-        None
-    }
-  }
-
-  private def findNonExistentStreamName(): String = {
-    var testStreamName: String = null
-    do {
-      Thread.sleep(TimeUnit.SECONDS.toMillis(describeStreamPollTimeSeconds))
-      testStreamName = s"KinesisTestUtils-${math.abs(Random.nextLong())}"
-    } while (describeStream(testStreamName).nonEmpty)
-    testStreamName
-  }
-
-  private def waitForStreamToBeActive(streamNameToWaitFor: String): Unit = {
-    val startTime = System.currentTimeMillis()
-    val endTime = startTime + TimeUnit.SECONDS.toMillis(createStreamTimeoutSeconds)
-    while (System.currentTimeMillis() < endTime) {
-      Thread.sleep(TimeUnit.SECONDS.toMillis(describeStreamPollTimeSeconds))
-      describeStream(streamNameToWaitFor).foreach { description =>
-        val streamStatus = description.getStreamStatus()
-        logDebug(s"\t- current state: $streamStatus\n")
-        if ("ACTIVE".equals(streamStatus)) {
-          return
-        }
-      }
-    }
-    require(false, s"Stream $streamName never became active")
-  }
-}
-
-private[kinesis] object KinesisTestUtils {
-
-  val envVarNameForEnablingTests = "ENABLE_KINESIS_TESTS"
-  val endVarNameForEndpoint = "KINESIS_TEST_ENDPOINT_URL"
-  val defaultEndpointUrl = "https://kinesis.us-west-2.amazonaws.com"
-
-  lazy val shouldRunTests = {
-    val isEnvSet = sys.env.get(envVarNameForEnablingTests) == Some("1")
-    if (isEnvSet) {
-      // scalastyle:off println
-      // Print this so that they are easily visible on the console and not hidden in the log4j logs.
-      println(
-        s"""
-          |Kinesis tests that actually send data has been enabled by setting the environment
-          |variable $envVarNameForEnablingTests to 1. This will create Kinesis Streams and
-          |DynamoDB tables in AWS. Please be aware that this may incur some AWS costs.
-          |By default, the tests use the endpoint URL $defaultEndpointUrl to create Kinesis streams.
-          |To change this endpoint URL to a different region, you can set the environment variable
-          |$endVarNameForEndpoint to the desired endpoint URL
-          |(e.g. $endVarNameForEndpoint="https://kinesis.us-west-2.amazonaws.com").
-        """.stripMargin)
-      // scalastyle:on println
-    }
-    isEnvSet
-  }
-
-  lazy val endpointUrl = {
-    val url = sys.env.getOrElse(endVarNameForEndpoint, defaultEndpointUrl)
-    // scalastyle:off println
-    // Print this so that they are easily visible on the console and not hidden in the log4j logs.
-    println(s"Using endpoint URL $url for creating Kinesis streams for tests.")
-    // scalastyle:on println
-    url
-  }
-
-  def isAWSCredentialsPresent: Boolean = {
-    Try { new DefaultAWSCredentialsProviderChain().getCredentials() }.isSuccess
-  }
-
-  def getAWSCredentials(): AWSCredentials = {
-    assert(shouldRunTests,
-      "Kinesis test not enabled, should not attempt to get AWS credentials")
-    Try { new DefaultAWSCredentialsProviderChain().getCredentials() } match {
-      case Success(cred) => cred
-      case Failure(e) =>
-        throw new Exception(
-          s"""
-             |Kinesis tests enabled using environment variable $envVarNameForEnablingTests
-             |but could not find AWS credentials. Please follow instructions in AWS documentation
-             |to set the credentials in your system such that the DefaultAWSCredentialsProviderChain
-             |can find the credentials.
-           """.stripMargin)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/302d68de/python/pyspark/streaming/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index d50c6b8..a2bfd79 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -1458,7 +1458,6 @@ class KinesisStreamTests(PySparkStreamingTestCase):
             InitialPositionInStream.LATEST, 2, StorageLevel.MEMORY_AND_DISK_2,
             "awsAccessKey", "awsSecretKey")
 
-    @unittest.skip("Enable it when we fix SPAKR-12058")
     def test_kinesis_stream(self):
         if not are_kinesis_tests_enabled:
             sys.stderr.write(

