You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by an...@apache.org on 2015/05/23 08:05:58 UTC
spark git commit: [SPARK-7838] [STREAMING] Set scope for kinesis
stream
Repository: spark
Updated Branches:
refs/heads/master 017b3404a -> baa89838c
[SPARK-7838] [STREAMING] Set scope for kinesis stream
Author: Tathagata Das <ta...@gmail.com>
Closes #6369 from tdas/SPARK-7838 and squashes the following commits:
87d1c7f [Tathagata Das] Addressed comment
37775d8 [Tathagata Das] set scope for kinesis stream
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/baa89838
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/baa89838
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/baa89838
Branch: refs/heads/master
Commit: baa89838cca96fa091c9e5ce62be01e1a265d820
Parents: 017b340
Author: Tathagata Das <ta...@gmail.com>
Authored: Fri May 22 23:05:54 2015 -0700
Committer: Andrew Or <an...@databricks.com>
Committed: Fri May 22 23:05:54 2015 -0700
----------------------------------------------------------------------
.../org/apache/spark/streaming/kinesis/KinesisUtils.scala | 9 ++++++---
.../scala/org/apache/spark/streaming/StreamingContext.scala | 2 +-
2 files changed, 7 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/baa89838/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
----------------------------------------------------------------------
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
index b114bcf..2531aeb 100644
--- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
@@ -63,9 +63,12 @@ object KinesisUtils {
checkpointInterval: Duration,
storageLevel: StorageLevel
): ReceiverInputDStream[Array[Byte]] = {
- ssc.receiverStream(
- new KinesisReceiver(kinesisAppName, streamName, endpointUrl, validateRegion(regionName),
- initialPositionInStream, checkpointInterval, storageLevel, None))
+ // Setting scope to override receiver stream's scope of "receiver stream"
+ ssc.withNamedScope("kinesis stream") {
+ ssc.receiverStream(
+ new KinesisReceiver(kinesisAppName, streamName, endpointUrl, validateRegion(regionName),
+ initialPositionInStream, checkpointInterval, storageLevel, None))
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/spark/blob/baa89838/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 7b77d44..5e58ed7 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -262,7 +262,7 @@ class StreamingContext private[streaming] (
*
* Note: Return statements are NOT allowed in the given body.
*/
- private def withNamedScope[U](name: String)(body: => U): U = {
+ private[streaming] def withNamedScope[U](name: String)(body: => U): U = {
RDDOperationScope.withScope(sc, name, allowNesting = false, ignoreParent = false)(body)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org