Posted to commits@spark.apache.org by sr...@apache.org on 2018/07/12 17:04:51 UTC

spark git commit: [SPARK-20168][STREAMING KINESIS] Setting the timestamp directly would cause exception on …

Repository: spark
Updated Branches:
  refs/heads/master e6c6f90a5 -> 9fa4a1ed3


[SPARK-20168][STREAMING KINESIS] Setting the timestamp directly would cause exception on …

Setting the timestamp directly would cause an exception when reading the stream; the initial position can be set directly only if the mode is not AT_TIMESTAMP.

## What changes were proposed in this pull request?

The last patch to the Kinesis streaming receiver sets the timestamp for the AT_TIMESTAMP mode, but it also passes every position, including AT_TIMESTAMP, to `.withInitialPositionInStream()`. The AT_TIMESTAMP mode can only be set via `baseClientLibConfiguration.withTimestampAtInitialPositionInStream()` and can't be set directly using `.withInitialPositionInStream()`.

This patch fixes the issue by calling `.withInitialPositionInStream()` only when the initial position is not AT_TIMESTAMP.
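The rule can be illustrated with a small, self-contained Scala model. It is a sketch, not the Kinesis Client Library: `PositionMode`, `InitialPosition`, `ClientConfig`, `configure`, and `configureUnconditionally` are hypothetical stand-ins, and the model encodes only the constraint stated in this commit message (AT_TIMESTAMP must go through `withTimestampAtInitialPositionInStream()`, never `withInitialPositionInStream()`). For simplicity the stand-in configuration is an immutable case class rather than the real `baseClientLibConfiguration` object.

```scala
import java.util.Date

// Hypothetical model of the position modes named in the commit message.
sealed trait PositionMode
case object LATEST extends PositionMode
case object TRIM_HORIZON extends PositionMode
case object AT_TIMESTAMP extends PositionMode

// Hypothetical model of the receiver's initial position (only AtTimestamp carries a timestamp).
sealed trait InitialPosition { def getPosition: PositionMode }
case object Latest extends InitialPosition { val getPosition: PositionMode = LATEST }
case object TrimHorizon extends InitialPosition { val getPosition: PositionMode = TRIM_HORIZON }
final case class AtTimestamp(getTimestamp: Date) extends InitialPosition {
  val getPosition: PositionMode = AT_TIMESTAMP
}

// Hypothetical, immutable stand-in for the KCL configuration. It mimics only
// the rule from this commit: AT_TIMESTAMP is rejected by withInitialPositionInStream.
final case class ClientConfig(mode: PositionMode = LATEST, timestamp: Option[Date] = None) {
  def withInitialPositionInStream(m: PositionMode): ClientConfig = {
    // require throws IllegalArgumentException when the condition is false
    require(m != AT_TIMESTAMP, "AT_TIMESTAMP must be set via withTimestampAtInitialPositionInStream")
    copy(mode = m, timestamp = None)
  }
  def withTimestampAtInitialPositionInStream(ts: Date): ClientConfig =
    copy(mode = AT_TIMESTAMP, timestamp = Some(ts))
}

// Post-patch shape: only non-AT_TIMESTAMP positions reach withInitialPositionInStream.
def configure(base: ClientConfig, initialPosition: InitialPosition): ClientConfig =
  initialPosition match {
    case ts: AtTimestamp => base.withTimestampAtInitialPositionInStream(ts.getTimestamp)
    case _               => base.withInitialPositionInStream(initialPosition.getPosition)
  }

// Pre-patch shape: the position is set unconditionally first, which fails for AT_TIMESTAMP.
def configureUnconditionally(base: ClientConfig, initialPosition: InitialPosition): ClientConfig = {
  val withPosition = base.withInitialPositionInStream(initialPosition.getPosition)
  initialPosition match {
    case ts: AtTimestamp => withPosition.withTimestampAtInitialPositionInStream(ts.getTimestamp)
    case _               => withPosition
  }
}

// Usage
println(configure(ClientConfig(), TrimHorizon).mode) // prints TRIM_HORIZON
println(scala.util.Try(configureUnconditionally(ClientConfig(), AtTimestamp(new Date(0L)))).isFailure) // prints true
```

In this model, the pre-patch ordering fails as soon as an AT_TIMESTAMP position is passed, while the post-patch match routes each position through the only setter that accepts it, which mirrors the change in the diff.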

## How was this patch tested?
KinesisReceiver doesn't expose its internal state, so I couldn't find the right way to test this change. Seeking tips from other contributors here.

Author: Yash Sharma <ys...@atlassian.com>

Closes #21541 from yashs360/ysharma/fix_kinesis_bug.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9fa4a1ed
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9fa4a1ed
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9fa4a1ed

Branch: refs/heads/master
Commit: 9fa4a1ed38713e2d18a3320d3fc56f9f6db07b06
Parents: e6c6f90
Author: Yash Sharma <ys...@atlassian.com>
Authored: Thu Jul 12 10:04:47 2018 -0700
Committer: Sean Owen <sr...@gmail.com>
Committed: Thu Jul 12 10:04:47 2018 -0700

----------------------------------------------------------------------
 .../org/apache/spark/streaming/kinesis/KinesisReceiver.scala     | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9fa4a1ed/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
----------------------------------------------------------------------
diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
index fa0de62..69c5236 100644
--- a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
+++ b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
@@ -160,7 +160,6 @@ private[kinesis] class KinesisReceiver[T](
         cloudWatchCreds.map(_.provider).getOrElse(kinesisProvider),
         workerId)
         .withKinesisEndpoint(endpointUrl)
-        .withInitialPositionInStream(initialPosition.getPosition)
         .withTaskBackoffTimeMillis(500)
         .withRegionName(regionName)
 
@@ -169,7 +168,8 @@ private[kinesis] class KinesisReceiver[T](
       initialPosition match {
         case ts: AtTimestamp =>
           baseClientLibConfiguration.withTimestampAtInitialPositionInStream(ts.getTimestamp)
-        case _ => baseClientLibConfiguration
+        case _ =>
+          baseClientLibConfiguration.withInitialPositionInStream(initialPosition.getPosition)
       }
     }
 

