You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2015/12/04 21:08:51 UTC
spark git commit: [SPARK-12058][STREAMING][KINESIS][TESTS] fix
Kinesis python tests
Repository: spark
Updated Branches:
refs/heads/master d0d822277 -> 302d68de8
[SPARK-12058][STREAMING][KINESIS][TESTS] fix Kinesis python tests
Python tests require access to the `KinesisTestUtils` class. While it lives under src/test, Python cannot access it, because test classes are not packaged in the assembly jar.
However, moving KinesisTestUtils to src/main as-is would require adding the Kinesis Producer Library (KPL) as a dependency. To avoid this, I moved KinesisTestUtils to src/main without its KPL code, and extended it with KPLBasedKinesisTestUtils, which lives under src/test and adds support for the KPL.
cc zsxwing tdas
Author: Burak Yavuz <br...@gmail.com>
Closes #10050 from brkyvz/kinesis-py.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/302d68de
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/302d68de
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/302d68de
Branch: refs/heads/master
Commit: 302d68de87dbaf1974accf49de26fc01fc0eb089
Parents: d0d8222
Author: Burak Yavuz <br...@gmail.com>
Authored: Fri Dec 4 12:08:42 2015 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Fri Dec 4 12:08:42 2015 -0800
----------------------------------------------------------------------
.../streaming/kinesis/KinesisTestUtils.scala | 260 ++++++++++++++++++
.../kinesis/KPLBasedKinesisTestUtils.scala | 72 +++++
.../kinesis/KinesisBackedBlockRDDSuite.scala | 2 +-
.../streaming/kinesis/KinesisStreamSuite.scala | 2 +-
.../streaming/kinesis/KinesisTestUtils.scala | 266 -------------------
python/pyspark/streaming/tests.py | 1 -
6 files changed, 334 insertions(+), 269 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/302d68de/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
----------------------------------------------------------------------
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
new file mode 100644
index 0000000..0ace453
--- /dev/null
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kinesis
+
+import java.nio.ByteBuffer
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.util.{Failure, Random, Success, Try}
+
+import com.amazonaws.auth.{AWSCredentials, DefaultAWSCredentialsProviderChain}
+import com.amazonaws.regions.RegionUtils
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
+import com.amazonaws.services.dynamodbv2.document.DynamoDB
+import com.amazonaws.services.kinesis.AmazonKinesisClient
+import com.amazonaws.services.kinesis.model._
+
+import org.apache.spark.Logging
+
+/**
+ * Shared utility methods for performing Kinesis tests that actually transfer data.
+ *
+ * Creates a uniquely named Kinesis stream, pushes integer test records into it, and removes
+ * the stream and DynamoDB tables afterwards. Subclasses can override [[getProducer]] to
+ * support additional ways of sending data (for example aggregated records).
+ *
+ * PLEASE KEEP THIS FILE UNDER src/main AS PYTHON TESTS NEED ACCESS TO THIS FILE!
+ */
+private[kinesis] class KinesisTestUtils extends Logging {
+
+  val endpointUrl = KinesisTestUtils.endpointUrl
+  val regionName = RegionUtils.getRegionByEndpoint(endpointUrl).getName()
+  /** Number of shards the test stream is created with. */
+  val streamShardCount = 2
+
+  private val createStreamTimeoutSeconds = 300
+  private val describeStreamPollTimeSeconds = 1
+
+  @volatile
+  private var streamCreated = false
+
+  @volatile
+  private var _streamName: String = _
+
+  /** Kinesis client, built on first use from [[KinesisTestUtils.getAWSCredentials]]. */
+  protected lazy val kinesisClient = {
+    val client = new AmazonKinesisClient(KinesisTestUtils.getAWSCredentials())
+    client.setEndpoint(endpointUrl)
+    client
+  }
+
+  private lazy val dynamoDB = {
+    val dynamoDBClient = new AmazonDynamoDBClient(new DefaultAWSCredentialsProviderChain())
+    dynamoDBClient.setRegion(RegionUtils.getRegion(regionName))
+    new DynamoDB(dynamoDBClient)
+  }
+
+  /**
+   * Returns the generator used to send test data. This base implementation only supports
+   * non-aggregated records sent through the plain Kinesis client.
+   *
+   * @throws UnsupportedOperationException if `aggregate` is true
+   */
+  protected def getProducer(aggregate: Boolean): KinesisDataGenerator = {
+    if (!aggregate) {
+      new SimpleDataGenerator(kinesisClient)
+    } else {
+      throw new UnsupportedOperationException("Aggregation is not supported through this code path")
+    }
+  }
+
+  /** Name of the test stream; only available after [[createStream]] has completed. */
+  def streamName: String = {
+    require(streamCreated, "Stream not yet created, call createStream() to create one")
+    _streamName
+  }
+
+  /**
+   * Creates a stream with a random, currently unused name and `streamShardCount` shards,
+   * then blocks until the stream reports ACTIVE (or the creation timeout expires).
+   */
+  def createStream(): Unit = {
+    require(!streamCreated, "Stream already created")
+    _streamName = findNonExistentStreamName()
+
+    // Create a stream. The number of shards determines the provisioned throughput.
+    logInfo(s"Creating stream ${_streamName}")
+    val createStreamRequest = new CreateStreamRequest()
+    createStreamRequest.setStreamName(_streamName)
+    // Use the advertised shard count rather than a duplicated literal so they cannot diverge.
+    createStreamRequest.setShardCount(streamShardCount)
+    kinesisClient.createStream(createStreamRequest)
+
+    // The stream is now being created. Wait for it to become active.
+    waitForStreamToBeActive(_streamName)
+    streamCreated = true
+    logInfo(s"Created stream ${_streamName}")
+  }
+
+  /**
+   * Push data to Kinesis stream and return a map of
+   * shardId -> seq of (data, seq number) pushed to corresponding shard
+   */
+  def pushData(testData: Seq[Int], aggregate: Boolean): Map[String, Seq[(Int, String)]] = {
+    require(streamCreated, "Stream not yet created, call createStream() to create one")
+    val producer = getProducer(aggregate)
+    val shardIdToSeqNumbers = producer.sendData(streamName, testData)
+    logInfo(s"Pushed $testData:\n\t ${shardIdToSeqNumbers.mkString("\n\t")}")
+    shardIdToSeqNumbers.toMap
+  }
+
+  /**
+   * Expose a Python friendly API: pushes non-aggregated data and discards the result map.
+   */
+  def pushData(testData: java.util.List[Int]): Unit = {
+    pushData(testData.asScala, aggregate = false)
+  }
+
+  /** Deletes the test stream if one was created; failures are logged, not rethrown. */
+  def deleteStream(): Unit = {
+    try {
+      if (streamCreated) {
+        kinesisClient.deleteStream(streamName)
+      }
+    } catch {
+      case e: Exception =>
+        logWarning(s"Could not delete stream $streamName", e)
+    }
+  }
+
+  /** Deletes `tableName` and waits for the deletion; failures are logged, not rethrown. */
+  def deleteDynamoDBTable(tableName: String): Unit = {
+    try {
+      val table = dynamoDB.getTable(tableName)
+      table.delete()
+      table.waitForDelete()
+    } catch {
+      case e: Exception =>
+        logWarning(s"Could not delete DynamoDB table $tableName", e)
+    }
+  }
+
+  /** Describes the given stream, or returns None if Kinesis reports it does not exist. */
+  private def describeStream(streamNameToDescribe: String): Option[StreamDescription] = {
+    try {
+      val describeStreamRequest = new DescribeStreamRequest().withStreamName(streamNameToDescribe)
+      val desc = kinesisClient.describeStream(describeStreamRequest).getStreamDescription()
+      Some(desc)
+    } catch {
+      case rnfe: ResourceNotFoundException =>
+        None
+    }
+  }
+
+  /** Generates random stream names until one is found that does not already exist. */
+  private def findNonExistentStreamName(): String = {
+    var testStreamName: String = null
+    do {
+      Thread.sleep(TimeUnit.SECONDS.toMillis(describeStreamPollTimeSeconds))
+      testStreamName = s"KinesisTestUtils-${math.abs(Random.nextLong())}"
+    } while (describeStream(testStreamName).nonEmpty)
+    testStreamName
+  }
+
+  /**
+   * Polls the stream's status until it is ACTIVE.
+   *
+   * @throws IllegalArgumentException if the stream is not ACTIVE within the creation timeout
+   */
+  private def waitForStreamToBeActive(streamNameToWaitFor: String): Unit = {
+    val endTime =
+      System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(createStreamTimeoutSeconds)
+    var isActive = false
+    while (!isActive && System.currentTimeMillis() < endTime) {
+      Thread.sleep(TimeUnit.SECONDS.toMillis(describeStreamPollTimeSeconds))
+      isActive = describeStream(streamNameToWaitFor).exists { description =>
+        val streamStatus = description.getStreamStatus()
+        logDebug(s"\t- current state: $streamStatus\n")
+        "ACTIVE".equals(streamStatus)
+      }
+    }
+    // Use the parameter, not `streamName`: that accessor requires streamCreated, which is
+    // still false here, and would mask the timeout with a misleading error.
+    require(isActive, s"Stream $streamNameToWaitFor never became active")
+  }
+}
+
+/** Environment-driven configuration and AWS credential helpers for the Kinesis tests. */
+private[kinesis] object KinesisTestUtils {
+
+  val envVarNameForEnablingTests = "ENABLE_KINESIS_TESTS"
+  val endVarNameForEndpoint = "KINESIS_TEST_ENDPOINT_URL"
+  val defaultEndpointUrl = "https://kinesis.us-west-2.amazonaws.com"
+
+  /**
+   * True only when `ENABLE_KINESIS_TESTS` is set to "1". When enabled, a notice about AWS
+   * usage is printed to the console once (the first time this is evaluated).
+   */
+  lazy val shouldRunTests = {
+    val isEnvSet = sys.env.get(envVarNameForEnablingTests) == Some("1")
+    if (isEnvSet) {
+      // scalastyle:off println
+      // Print this so that they are easily visible on the console and not hidden in the log4j logs.
+      println(
+        s"""
+           |Kinesis tests that actually send data has been enabled by setting the environment
+           |variable $envVarNameForEnablingTests to 1. This will create Kinesis Streams and
+           |DynamoDB tables in AWS. Please be aware that this may incur some AWS costs.
+           |By default, the tests use the endpoint URL $defaultEndpointUrl to create Kinesis streams.
+           |To change this endpoint URL to a different region, you can set the environment variable
+           |$endVarNameForEndpoint to the desired endpoint URL
+           |(e.g. $endVarNameForEndpoint="https://kinesis.us-west-2.amazonaws.com").
+        """.stripMargin)
+      // scalastyle:on println
+    }
+    isEnvSet
+  }
+
+  /** Endpoint URL from `KINESIS_TEST_ENDPOINT_URL`, falling back to [[defaultEndpointUrl]]. */
+  lazy val endpointUrl = {
+    val url = sys.env.getOrElse(endVarNameForEndpoint, defaultEndpointUrl)
+    // scalastyle:off println
+    // Print this so that they are easily visible on the console and not hidden in the log4j logs.
+    println(s"Using endpoint URL $url for creating Kinesis streams for tests.")
+    // scalastyle:on println
+    url
+  }
+
+  /** Whether the default AWS credentials provider chain can currently resolve credentials. */
+  def isAWSCredentialsPresent: Boolean = {
+    lookupCredentials().isSuccess
+  }
+
+  /**
+   * Returns credentials from the default AWS provider chain.
+   *
+   * @throws AssertionError if the Kinesis tests are not enabled
+   * @throws Exception if no credentials can be found; the provider chain's failure is
+   *                   attached as the cause so the root problem is not lost
+   */
+  def getAWSCredentials(): AWSCredentials = {
+    assert(shouldRunTests,
+      "Kinesis test not enabled, should not attempt to get AWS credentials")
+    lookupCredentials() match {
+      case Success(cred) => cred
+      case Failure(e) =>
+        throw new Exception(
+          s"""
+             |Kinesis tests enabled using environment variable $envVarNameForEnablingTests
+             |but could not find AWS credentials. Please follow instructions in AWS documentation
+             |to set the credentials in your system such that the DefaultAWSCredentialsProviderChain
+             |can find the credentials.
+          """.stripMargin, e)
+    }
+  }
+
+  /** Resolves credentials through the default provider chain, capturing any failure. */
+  private def lookupCredentials(): Try[AWSCredentials] = {
+    Try { new DefaultAWSCredentialsProviderChain().getCredentials() }
+  }
+}
+
+/**
+ * Common interface for the different ways synthetic test data can be written to Kinesis,
+ * so the data-generation code can be shared between them.
+ */
+private[kinesis] trait KinesisDataGenerator {
+
+  /**
+   * Writes every number in `data` to the stream named `streamName`.
+   *
+   * @return for each shard id that received data, the (number, sequence number) pairs sent to it
+   */
+  def sendData(
+      streamName: String,
+      data: Seq[Int]): Map[String, Seq[(Int, String)]]
+}
+
+/**
+ * A [[KinesisDataGenerator]] that writes each number as its own Kinesis record, using one
+ * synchronous `putRecord` call per element through the plain Kinesis client.
+ *
+ * @param client Kinesis client used to put the records.
+ */
+private[kinesis] class SimpleDataGenerator(
+    client: AmazonKinesisClient) extends KinesisDataGenerator {
+  override def sendData(streamName: String, data: Seq[Int]): Map[String, Seq[(Int, String)]] = {
+    val shardIdToSeqNumbers = new mutable.HashMap[String, ArrayBuffer[(Int, String)]]()
+    data.foreach { num =>
+      // Both the payload and the partition key are the decimal string form of the number.
+      val str = num.toString
+      // Named `payload` so it does not shadow the `data` parameter; encode explicitly as UTF-8
+      // instead of relying on the platform default charset.
+      val payload = ByteBuffer.wrap(str.getBytes(java.nio.charset.StandardCharsets.UTF_8))
+      val putRecordRequest = new PutRecordRequest().withStreamName(streamName)
+        .withData(payload)
+        .withPartitionKey(str)
+
+      val putRecordResult = client.putRecord(putRecordRequest)
+      val shardId = putRecordResult.getShardId
+      val seqNumber = putRecordResult.getSequenceNumber()
+      val sentSeqNumbers = shardIdToSeqNumbers.getOrElseUpdate(shardId,
+        new ArrayBuffer[(Int, String)]())
+      sentSeqNumbers += ((num, seqNumber))
+    }
+
+    shardIdToSeqNumbers.toMap
+  }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/302d68de/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala
----------------------------------------------------------------------
diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala
new file mode 100644
index 0000000..fdb270e
--- /dev/null
+++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import java.nio.ByteBuffer
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import com.amazonaws.services.kinesis.producer.{KinesisProducer => KPLProducer, KinesisProducerConfiguration, UserRecordResult}
+import com.google.common.util.concurrent.{FutureCallback, Futures}
+
+/**
+ * A [[KinesisTestUtils]] that can also push aggregated data through the Kinesis Producer
+ * Library. It lives under src/test so that src/main does not need the KPL dependency.
+ */
+private[kinesis] class KPLBasedKinesisTestUtils extends KinesisTestUtils {
+  override protected def getProducer(aggregate: Boolean): KinesisDataGenerator = {
+    if (aggregate) {
+      new KPLDataGenerator(regionName)
+    } else {
+      // Reuse the base class's plain-client path instead of duplicating it here.
+      super.getProducer(aggregate)
+    }
+  }
+}
+
+/**
+ * A [[KinesisDataGenerator]] that sends data through the KinesisProducer provided in the KPL,
+ * which may aggregate several user records into a single Kinesis record.
+ *
+ * @param regionName AWS region the producer writes to.
+ */
+private[kinesis] class KPLDataGenerator(regionName: String) extends KinesisDataGenerator {
+
+  /** Builds a new KPL producer for `regionName`; the caller must `destroy()` it when done. */
+  private def createProducer(): KPLProducer = {
+    val conf = new KinesisProducerConfiguration()
+      .setRecordMaxBufferedTime(1000)
+      .setMaxConnections(1)
+      .setRegion(regionName)
+      .setMetricsLevel("none")
+
+    new KPLProducer(conf)
+  }
+
+  /**
+   * Sends every number through a fresh KPL producer and returns, per shard, the
+   * (number, sequence number) pairs that were written. Records whose send failed are
+   * left out of the result (best effort).
+   */
+  override def sendData(streamName: String, data: Seq[Int]): Map[String, Seq[(Int, String)]] = {
+    val producer = createProducer()
+    try {
+      // Queue all records first so the KPL can aggregate them, keeping each result future.
+      // toVector forces the whole input even if `data` is a lazy collection.
+      val pending = data.toVector.map { num =>
+        val str = num.toString
+        val payload = ByteBuffer.wrap(str.getBytes(java.nio.charset.StandardCharsets.UTF_8))
+        (num, producer.addUserRecord(streamName, str, payload))
+      }
+      producer.flushSync()
+
+      // Read results on this thread by waiting on each future. Completion callbacks may run on
+      // other threads and may not have finished when flushSync() returns, so relying on them
+      // to fill a shared (non-thread-safe) map is racy.
+      val shardIdToSeqNumbers = new mutable.HashMap[String, ArrayBuffer[(Int, String)]]()
+      pending.foreach { case (num, future) =>
+        // A failed send surfaces as an exception from get(); skip that record.
+        scala.util.Try(future.get()).foreach { result =>
+          val sentSeqNumbers = shardIdToSeqNumbers.getOrElseUpdate(result.getShardId,
+            new ArrayBuffer[(Int, String)]())
+          sentSeqNumbers += ((num, result.getSequenceNumber()))
+        }
+      }
+      shardIdToSeqNumbers.toMap
+    } finally {
+      // The producer is created per call, so release it here instead of leaking it.
+      producer.destroy()
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/302d68de/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala
----------------------------------------------------------------------
diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala
index 52c61df..d85b4cd 100644
--- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala
+++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala
@@ -40,7 +40,7 @@ abstract class KinesisBackedBlockRDDTests(aggregateTestData: Boolean)
override def beforeAll(): Unit = {
runIfTestsEnabled("Prepare KinesisTestUtils") {
- testUtils = new KinesisTestUtils()
+ testUtils = new KPLBasedKinesisTestUtils()
testUtils.createStream()
shardIdToDataAndSeqNumbers = testUtils.pushData(testData, aggregate = aggregateTestData)
http://git-wip-us.apache.org/repos/asf/spark/blob/302d68de/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
----------------------------------------------------------------------
diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
index dee3044..78cec02 100644
--- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
+++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
@@ -63,7 +63,7 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
sc = new SparkContext(conf)
runIfTestsEnabled("Prepare KinesisTestUtils") {
- testUtils = new KinesisTestUtils()
+ testUtils = new KPLBasedKinesisTestUtils()
testUtils.createStream()
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/302d68de/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
----------------------------------------------------------------------
diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
deleted file mode 100644
index 7487aa1..0000000
--- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
+++ /dev/null
@@ -1,266 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kinesis
-
-import java.nio.ByteBuffer
-import java.util.concurrent.TimeUnit
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
-import scala.util.{Failure, Random, Success, Try}
-
-import com.amazonaws.auth.{AWSCredentials, DefaultAWSCredentialsProviderChain}
-import com.amazonaws.regions.RegionUtils
-import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
-import com.amazonaws.services.dynamodbv2.document.DynamoDB
-import com.amazonaws.services.kinesis.AmazonKinesisClient
-import com.amazonaws.services.kinesis.model._
-import com.amazonaws.services.kinesis.producer.{KinesisProducer, KinesisProducerConfiguration, UserRecordResult}
-import com.google.common.util.concurrent.{FutureCallback, Futures}
-
-import org.apache.spark.Logging
-
-/**
- * Shared utility methods for performing Kinesis tests that actually transfer data
- */
-private[kinesis] class KinesisTestUtils extends Logging {
-
- val endpointUrl = KinesisTestUtils.endpointUrl
- val regionName = RegionUtils.getRegionByEndpoint(endpointUrl).getName()
- val streamShardCount = 2
-
- private val createStreamTimeoutSeconds = 300
- private val describeStreamPollTimeSeconds = 1
-
- @volatile
- private var streamCreated = false
-
- @volatile
- private var _streamName: String = _
-
- private lazy val kinesisClient = {
- val client = new AmazonKinesisClient(KinesisTestUtils.getAWSCredentials())
- client.setEndpoint(endpointUrl)
- client
- }
-
- private lazy val dynamoDB = {
- val dynamoDBClient = new AmazonDynamoDBClient(new DefaultAWSCredentialsProviderChain())
- dynamoDBClient.setRegion(RegionUtils.getRegion(regionName))
- new DynamoDB(dynamoDBClient)
- }
-
- private lazy val kinesisProducer: KinesisProducer = {
- val conf = new KinesisProducerConfiguration()
- .setRecordMaxBufferedTime(1000)
- .setMaxConnections(1)
- .setRegion(regionName)
- .setMetricsLevel("none")
-
- new KinesisProducer(conf)
- }
-
- def streamName: String = {
- require(streamCreated, "Stream not yet created, call createStream() to create one")
- _streamName
- }
-
- def createStream(): Unit = {
- require(!streamCreated, "Stream already created")
- _streamName = findNonExistentStreamName()
-
- // Create a stream. The number of shards determines the provisioned throughput.
- logInfo(s"Creating stream ${_streamName}")
- val createStreamRequest = new CreateStreamRequest()
- createStreamRequest.setStreamName(_streamName)
- createStreamRequest.setShardCount(2)
- kinesisClient.createStream(createStreamRequest)
-
- // The stream is now being created. Wait for it to become active.
- waitForStreamToBeActive(_streamName)
- streamCreated = true
- logInfo(s"Created stream ${_streamName}")
- }
-
- /**
- * Push data to Kinesis stream and return a map of
- * shardId -> seq of (data, seq number) pushed to corresponding shard
- */
- def pushData(testData: Seq[Int], aggregate: Boolean): Map[String, Seq[(Int, String)]] = {
- require(streamCreated, "Stream not yet created, call createStream() to create one")
- val shardIdToSeqNumbers = new mutable.HashMap[String, ArrayBuffer[(Int, String)]]()
-
- testData.foreach { num =>
- val str = num.toString
- val data = ByteBuffer.wrap(str.getBytes())
- if (aggregate) {
- val future = kinesisProducer.addUserRecord(streamName, str, data)
- val kinesisCallBack = new FutureCallback[UserRecordResult]() {
- override def onFailure(t: Throwable): Unit = {} // do nothing
-
- override def onSuccess(result: UserRecordResult): Unit = {
- val shardId = result.getShardId
- val seqNumber = result.getSequenceNumber()
- val sentSeqNumbers = shardIdToSeqNumbers.getOrElseUpdate(shardId,
- new ArrayBuffer[(Int, String)]())
- sentSeqNumbers += ((num, seqNumber))
- }
- }
-
- Futures.addCallback(future, kinesisCallBack)
- kinesisProducer.flushSync() // make sure we send all data before returning the map
- } else {
- val putRecordRequest = new PutRecordRequest().withStreamName(streamName)
- .withData(data)
- .withPartitionKey(str)
-
- val putRecordResult = kinesisClient.putRecord(putRecordRequest)
- val shardId = putRecordResult.getShardId
- val seqNumber = putRecordResult.getSequenceNumber()
- val sentSeqNumbers = shardIdToSeqNumbers.getOrElseUpdate(shardId,
- new ArrayBuffer[(Int, String)]())
- sentSeqNumbers += ((num, seqNumber))
- }
- }
-
- logInfo(s"Pushed $testData:\n\t ${shardIdToSeqNumbers.mkString("\n\t")}")
- shardIdToSeqNumbers.toMap
- }
-
- /**
- * Expose a Python friendly API.
- */
- def pushData(testData: java.util.List[Int]): Unit = {
- pushData(testData.asScala, aggregate = false)
- }
-
- def deleteStream(): Unit = {
- try {
- if (streamCreated) {
- kinesisClient.deleteStream(streamName)
- }
- } catch {
- case e: Exception =>
- logWarning(s"Could not delete stream $streamName")
- }
- }
-
- def deleteDynamoDBTable(tableName: String): Unit = {
- try {
- val table = dynamoDB.getTable(tableName)
- table.delete()
- table.waitForDelete()
- } catch {
- case e: Exception =>
- logWarning(s"Could not delete DynamoDB table $tableName")
- }
- }
-
- private def describeStream(streamNameToDescribe: String): Option[StreamDescription] = {
- try {
- val describeStreamRequest = new DescribeStreamRequest().withStreamName(streamNameToDescribe)
- val desc = kinesisClient.describeStream(describeStreamRequest).getStreamDescription()
- Some(desc)
- } catch {
- case rnfe: ResourceNotFoundException =>
- None
- }
- }
-
- private def findNonExistentStreamName(): String = {
- var testStreamName: String = null
- do {
- Thread.sleep(TimeUnit.SECONDS.toMillis(describeStreamPollTimeSeconds))
- testStreamName = s"KinesisTestUtils-${math.abs(Random.nextLong())}"
- } while (describeStream(testStreamName).nonEmpty)
- testStreamName
- }
-
- private def waitForStreamToBeActive(streamNameToWaitFor: String): Unit = {
- val startTime = System.currentTimeMillis()
- val endTime = startTime + TimeUnit.SECONDS.toMillis(createStreamTimeoutSeconds)
- while (System.currentTimeMillis() < endTime) {
- Thread.sleep(TimeUnit.SECONDS.toMillis(describeStreamPollTimeSeconds))
- describeStream(streamNameToWaitFor).foreach { description =>
- val streamStatus = description.getStreamStatus()
- logDebug(s"\t- current state: $streamStatus\n")
- if ("ACTIVE".equals(streamStatus)) {
- return
- }
- }
- }
- require(false, s"Stream $streamName never became active")
- }
-}
-
-private[kinesis] object KinesisTestUtils {
-
- val envVarNameForEnablingTests = "ENABLE_KINESIS_TESTS"
- val endVarNameForEndpoint = "KINESIS_TEST_ENDPOINT_URL"
- val defaultEndpointUrl = "https://kinesis.us-west-2.amazonaws.com"
-
- lazy val shouldRunTests = {
- val isEnvSet = sys.env.get(envVarNameForEnablingTests) == Some("1")
- if (isEnvSet) {
- // scalastyle:off println
- // Print this so that they are easily visible on the console and not hidden in the log4j logs.
- println(
- s"""
- |Kinesis tests that actually send data has been enabled by setting the environment
- |variable $envVarNameForEnablingTests to 1. This will create Kinesis Streams and
- |DynamoDB tables in AWS. Please be aware that this may incur some AWS costs.
- |By default, the tests use the endpoint URL $defaultEndpointUrl to create Kinesis streams.
- |To change this endpoint URL to a different region, you can set the environment variable
- |$endVarNameForEndpoint to the desired endpoint URL
- |(e.g. $endVarNameForEndpoint="https://kinesis.us-west-2.amazonaws.com").
- """.stripMargin)
- // scalastyle:on println
- }
- isEnvSet
- }
-
- lazy val endpointUrl = {
- val url = sys.env.getOrElse(endVarNameForEndpoint, defaultEndpointUrl)
- // scalastyle:off println
- // Print this so that they are easily visible on the console and not hidden in the log4j logs.
- println(s"Using endpoint URL $url for creating Kinesis streams for tests.")
- // scalastyle:on println
- url
- }
-
- def isAWSCredentialsPresent: Boolean = {
- Try { new DefaultAWSCredentialsProviderChain().getCredentials() }.isSuccess
- }
-
- def getAWSCredentials(): AWSCredentials = {
- assert(shouldRunTests,
- "Kinesis test not enabled, should not attempt to get AWS credentials")
- Try { new DefaultAWSCredentialsProviderChain().getCredentials() } match {
- case Success(cred) => cred
- case Failure(e) =>
- throw new Exception(
- s"""
- |Kinesis tests enabled using environment variable $envVarNameForEnablingTests
- |but could not find AWS credentials. Please follow instructions in AWS documentation
- |to set the credentials in your system such that the DefaultAWSCredentialsProviderChain
- |can find the credentials.
- """.stripMargin)
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/302d68de/python/pyspark/streaming/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index d50c6b8..a2bfd79 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -1458,7 +1458,6 @@ class KinesisStreamTests(PySparkStreamingTestCase):
InitialPositionInStream.LATEST, 2, StorageLevel.MEMORY_AND_DISK_2,
"awsAccessKey", "awsSecretKey")
- @unittest.skip("Enable it when we fix SPAKR-12058")
def test_kinesis_stream(self):
if not are_kinesis_tests_enabled:
sys.stderr.write(
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org