Posted to commits@spark.apache.org by sr...@apache.org on 2020/02/25 02:18:12 UTC

[spark] branch branch-3.0 updated: [SPARK-30901][DOCS] Fix doc exemple with deprecated codes

This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 1b9dbcf  [SPARK-30901][DOCS] Fix doc exemple with deprecated codes
1b9dbcf is described below

commit 1b9dbcf697c0d49534a200b729ffd1d088407fc6
Author: XU Duo <Du...@canal-plus.com>
AuthorDate: Mon Feb 24 20:16:00 2020 -0600

    [SPARK-30901][DOCS] Fix doc exemple with deprecated codes
    
    ### What changes were proposed in this pull request?
    
    The previous example given for spark-streaming-kinesis was valid for Apache Spark < 2.3.0. Since then, the method used in the example has been deprecated:
    deprecated("use initialPosition(initialPosition: KinesisInitialPosition)", "2.3.0")
    def initialPositionInStream(initialPosition: InitialPositionInStream)
    
    This PR updates the doc by rewriting the examples in Scala/Java (the Python example remains unchanged) to match Apache Spark 2.4.0+ releases.
    
    ### Why are the changes needed?
    
    The deprecated example causes confusion for developers testing their spark-streaming-kinesis examples.
    
    ### Does this PR introduce any user-facing change?
    
    No
    
    ### How was this patch tested?
    
    In my opinion, the change only affects documentation, so I did not add any special test.
    
    Closes #27652 from supaggregator/SPARK-30901.
    
    Authored-by: XU Duo <Du...@canal-plus.com>
    Signed-off-by: Sean Owen <sr...@gmail.com>
    (cherry picked from commit 10fa71321f607cb35c33d9b574595b27efcb7633)
    Signed-off-by: Sean Owen <sr...@gmail.com>
---
 docs/streaming-kinesis-integration.md | 24 ++++++++++++------------
 1 file changed, 12 insertions(+), 12 deletions(-)

diff --git a/docs/streaming-kinesis-integration.md b/docs/streaming-kinesis-integration.md
index c815971..e68d513 100644
--- a/docs/streaming-kinesis-integration.md
+++ b/docs/streaming-kinesis-integration.md
@@ -46,14 +46,14 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m
             import org.apache.spark.storage.StorageLevel
             import org.apache.spark.streaming.kinesis.KinesisInputDStream
             import org.apache.spark.streaming.{Seconds, StreamingContext}
-            import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+            import org.apache.spark.streaming.kinesis.KinesisInitialPositions
 
             val kinesisStream = KinesisInputDStream.builder
                 .streamingContext(streamingContext)
                 .endpointUrl([endpoint URL])
                 .regionName([region name])
                 .streamName([streamName])
-                .initialPositionInStream([initial position])
+                .initialPosition([initial position])
                 .checkpointAppName([Kinesis app name])
                 .checkpointInterval([checkpoint interval])
                 .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
@@ -68,14 +68,14 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m
             import org.apache.spark.streaming.kinesis.KinesisInputDStream;
             import org.apache.spark.streaming.Seconds;
             import org.apache.spark.streaming.StreamingContext;
-            import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
+            import org.apache.spark.streaming.kinesis.KinesisInitialPositions;
 
             KinesisInputDStream<byte[]> kinesisStream = KinesisInputDStream.builder()
                 .streamingContext(streamingContext)
                 .endpointUrl([endpoint URL])
                 .regionName([region name])
                 .streamName([streamName])
-                .initialPositionInStream([initial position])
+                .initialPosition([initial position])
                 .checkpointAppName([Kinesis app name])
                 .checkpointInterval([checkpoint interval])
                 .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
@@ -110,7 +110,7 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m
                 import org.apache.spark.storage.StorageLevel
                 import org.apache.spark.streaming.kinesis.KinesisInputDStream
                 import org.apache.spark.streaming.{Seconds, StreamingContext}
-                import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+                import org.apache.spark.streaming.kinesis.KinesisInitialPositions
                 import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
                 import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
 
@@ -119,7 +119,7 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m
                     .endpointUrl([endpoint URL])
                     .regionName([region name])
                     .streamName([streamName])
-                    .initialPositionInStream([initial position])
+                    .initialPosition([initial position])
                     .checkpointAppName([Kinesis app name])
                     .checkpointInterval([checkpoint interval])
                     .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
@@ -133,7 +133,7 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m
                 import org.apache.spark.streaming.kinesis.KinesisInputDStream;
                 import org.apache.spark.streaming.Seconds;
                 import org.apache.spark.streaming.StreamingContext;
-                import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
+                import org.apache.spark.streaming.kinesis.KinesisInitialPositions;
                 import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
                 import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
                 import scala.collection.JavaConverters;
@@ -143,7 +143,7 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m
                     .endpointUrl([endpoint URL])
                     .regionName([region name])
                     .streamName([streamName])
-                    .initialPositionInStream([initial position])
+                    .initialPosition([initial position])
                     .checkpointAppName([Kinesis app name])
                     .checkpointInterval([checkpoint interval])
                     .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
@@ -170,7 +170,7 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m
 
 	- `[checkpoint interval]`: The interval (e.g., Duration(2000) = 2 seconds) at which the Kinesis Client Library saves its position in the stream.  For starters, set it to the same as the batch interval of the streaming application.
 
-	- `[initial position]`: Can be either `InitialPositionInStream.TRIM_HORIZON` or `InitialPositionInStream.LATEST` (see [`Kinesis Checkpointing`](#kinesis-checkpointing) section and [`Amazon Kinesis API documentation`](http://docs.aws.amazon.com/streams/latest/dev/developing-consumers-with-sdk.html) for more details).
+	- `[initial position]`: Can be either `KinesisInitialPositions.TrimHorizon` or `KinesisInitialPositions.Latest` or `KinesisInitialPositions.AtTimestamp` (see [`Kinesis Checkpointing`](#kinesis-checkpointing) section and [`Amazon Kinesis API documentation`](http://docs.aws.amazon.com/streams/latest/dev/developing-consumers-with-sdk.html) for more details).
 
 	- `[message handler]`: A function that takes a Kinesis `Record` and outputs generic `T`.
 
@@ -272,9 +272,9 @@ de-aggregate records during consumption.
 
 - Checkpointing too frequently will cause excess load on the AWS checkpoint storage layer and may lead to AWS throttling.  The provided example handles this throttling with a random-backoff-retry strategy.
 
-- If no Kinesis checkpoint info exists when the input DStream starts, it will start either from the oldest record available (`InitialPositionInStream.TRIM_HORIZON`) or from the latest tip (`InitialPositionInStream.LATEST`).  This is configurable.
-  - `InitialPositionInStream.LATEST` could lead to missed records if data is added to the stream while no input DStreams are running (and no checkpoint info is being stored).
-  - `InitialPositionInStream.TRIM_HORIZON` may lead to duplicate processing of records where the impact is dependent on checkpoint frequency and processing idempotency.
+- If no Kinesis checkpoint info exists when the input DStream starts, it will start either from the oldest record available (`KinesisInitialPositions.TrimHorizon`), or from the latest tip (`KinesisInitialPositions.Latest`), or (except Python) from the position denoted by the provided UTC timestamp (`KinesisInitialPositions.AtTimestamp(Date timestamp)`).  This is configurable.
+  - `KinesisInitialPositions.Latest` could lead to missed records if data is added to the stream while no input DStreams are running (and no checkpoint info is being stored).
+  - `KinesisInitialPositions.TrimHorizon` may lead to duplicate processing of records where the impact is dependent on checkpoint frequency and processing idempotency.
 
 #### Kinesis retry configuration
  - `spark.streaming.kinesis.retry.waitTime` : Wait time between Kinesis retries as a duration string. When reading from Amazon Kinesis, users may hit `ProvisionedThroughputExceededException`'s, when consuming faster than 5 transactions/second or, exceeding the maximum read rate of 2 MiB/second. This configuration can be tweaked to increase the sleep between fetches when a fetch fails to reduce these exceptions. Default is "100ms".
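
The updated doc text mentions `KinesisInitialPositions.AtTimestamp(Date timestamp)` with a UTC timestamp but does not show how that `Date` might be built. A minimal sketch follows, assuming the start point is given as an ISO-8601 UTC string. The class name `InitialPositionTimestamp` and the helper `utcTimestamp` are hypothetical and not part of Spark; the Spark call itself is shown only in a comment because it needs the spark-streaming-kinesis-asl artifact on the classpath. As far as I can tell from the Spark source, the positions are nested classes, so they would be instantiated with `new`.

```java
import java.time.Instant;
import java.util.Date;

class InitialPositionTimestamp {
    // Hypothetical helper (not part of Spark): converts an ISO-8601 UTC instant
    // such as "2020-02-25T02:18:12Z" into a java.util.Date, the type that
    // KinesisInitialPositions.AtTimestamp(Date timestamp) takes.
    public static Date utcTimestamp(String isoInstant) {
        return Date.from(Instant.parse(isoInstant));
    }

    public static void main(String[] args) {
        Date startAt = utcTimestamp("2020-02-25T02:18:12Z");
        // Assumption: with the Kinesis integration on the classpath, the value
        // would be handed to the builder roughly like this:
        //   .initialPosition(new KinesisInitialPositions.AtTimestamp(startAt))
        System.out.println(startAt.toInstant());
    }
}
```

A `java.util.Date` stores milliseconds since the epoch and carries no time zone, so parsing an explicit `Z`-suffixed instant avoids depending on the JVM's default zone.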

