Posted to commits@flink.apache.org by tz...@apache.org on 2017/03/31 04:38:30 UTC

[1/2] flink git commit: [FLINK-5625] [kinesis] Configurable date format for timestamp-based start position in FlinkKinesisConsumer

Repository: flink
Updated Branches:
  refs/heads/master ee033c903 -> 193224017


[FLINK-5625] [kinesis] Configurable date format for timestamp-based start position in FlinkKinesisConsumer

This closes #3651.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a119a30d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a119a30d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a119a30d

Branch: refs/heads/master
Commit: a119a30da91bc88b6b0242622adff2189b0a8fa4
Parents: ee033c9
Author: Tony Wei <to...@gmail.com>
Authored: Thu Mar 30 09:48:43 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Mar 31 12:32:19 2017 +0800

----------------------------------------------------------------------
 docs/dev/connectors/kinesis.md                  |  8 ++-
 .../kinesis/config/ConsumerConfigConstants.java |  7 +-
 .../kinesis/internals/ShardConsumer.java        | 12 +++-
 .../kinesis/util/KinesisConfigUtil.java         | 23 +++---
 .../kinesis/FlinkKinesisConsumerTest.java       | 75 ++++++++++++++++----
 5 files changed, 95 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a119a30d/docs/dev/connectors/kinesis.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/kinesis.md b/docs/dev/connectors/kinesis.md
index 59f3d61..ef1afca 100644
--- a/docs/dev/connectors/kinesis.md
+++ b/docs/dev/connectors/kinesis.md
@@ -121,9 +121,11 @@ one of the following values in the provided configuration properties (the naming
 - `LATEST`: read all shards of all streams starting from the latest record.
 - `TRIM_HORIZON`: read all shards of all streams starting from the earliest record possible (data may be trimmed by Kinesis depending on the retention settings).
 - `AT_TIMESTAMP`: read all shards of all streams starting from a specified timestamp. The timestamp must also be specified in the configuration
-properties by providing a value for `ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP`, either in the date pattern
-`yyyy-MM-dd'T'HH:mm:ss.SSSXXX` (for example, `2016-04-04T19:58:46.480-00:00`), or a non-negative double value representing the number of seconds
-that has elapsed since the Unix epoch (for example, `1459799926.480`).
+properties by providing a value for `ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP`, in one of the following formats:
+    - a non-negative double value representing the number of seconds that have elapsed since the Unix epoch (for example, `1459799926.480`).
+    - a value in a user-defined date pattern, which must be a valid `SimpleDateFormat` pattern supplied via `ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT`.
+    If `ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT` is not defined, the default pattern is `yyyy-MM-dd'T'HH:mm:ss.SSSXXX`
+    (for example, a timestamp value of `2016-04-04` with a user-given pattern of `yyyy-MM-dd`, or a timestamp value of `2016-04-04T19:58:46.480-00:00` with no pattern given).
 
 #### Fault Tolerance for Exactly-Once User-Defined State Update Semantics
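
To make the new option concrete, below is a minimal sketch of a consumer configured to start from a timestamp in a user-defined format. The stream name, region, and credential values are placeholders, and the `SimpleStringSchema` import path is assumed for the Flink version of this commit; the property keys are the ones introduced or exercised in this change.

import java.util.Properties;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class AtTimestampExample {
	public static void main(String[] args) throws Exception {
		Properties config = new Properties();
		config.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
		config.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
		config.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");   // placeholder
		config.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); // placeholder

		// start reading at a timestamp given in a user-defined date format
		config.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
		config.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "2016-04-04");
		config.setProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT, "yyyy-MM-dd");

		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.addSource(new FlinkKinesisConsumer<>("myStream", new SimpleStringSchema(), config))
			.print();
		env.execute();
	}
}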
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a119a30d/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
index 4ffe0ad..7c31af4 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
@@ -56,9 +56,12 @@ public class ConsumerConfigConstants extends AWSConfigConstants {
 	/** The initial position to start reading Kinesis streams from (LATEST is used if not set) */
 	public static final String STREAM_INITIAL_POSITION = "flink.stream.initpos";
 
-	/** The initial timestamp to start reading Kinesis stream from (when AT_TIMESTAMP is set for STREAM_INITIAL_POSITION */
+	/** The initial timestamp to start reading Kinesis stream from (when AT_TIMESTAMP is set for STREAM_INITIAL_POSITION) */
 	public static final String STREAM_INITIAL_TIMESTAMP = "flink.stream.initpos.timestamp";
 
+	/** The date format of initial timestamp to start reading Kinesis stream from (when AT_TIMESTAMP is set for STREAM_INITIAL_POSITION) */
+	public static final String STREAM_TIMESTAMP_DATE_FORMAT = "flink.stream.initpos.timestamp.format";
+
 	/** The base backoff time between each describeStream attempt */
 	public static final String STREAM_DESCRIBE_BACKOFF_BASE = "flink.stream.describe.backoff.base";
 
@@ -107,6 +110,8 @@ public class ConsumerConfigConstants extends AWSConfigConstants {
 
 	public static final String DEFAULT_STREAM_INITIAL_POSITION = InitialPosition.LATEST.toString();
 
+	public static final String DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX";
+
 	public static final long DEFAULT_STREAM_DESCRIBE_BACKOFF_BASE = 1000L;
 
 	public static final long DEFAULT_STREAM_DESCRIBE_BACKOFF_MAX = 5000L;

http://git-wip-us.apache.org/repos/asf/flink/blob/a119a30d/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
index f6c53ce..ca85854 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
@@ -30,7 +30,6 @@ import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
 import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
 import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
-import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,6 +37,7 @@ import java.io.IOException;
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
 import java.text.ParseException;
+import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.List;
 import java.util.Properties;
@@ -115,9 +115,15 @@ public class ShardConsumer<T> implements Runnable {
 
 		if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get())) {
 			String timestamp = consumerConfig.getProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP);
+
 			try {
-				this.initTimestamp = KinesisConfigUtil.initTimestampDateFormat.parse(timestamp);
-			} catch (ParseException e) {
+				String format = consumerConfig.getProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT,
+					ConsumerConfigConstants.DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT);
+				SimpleDateFormat customDateFormat = new SimpleDateFormat(format);
+				this.initTimestamp = customDateFormat.parse(timestamp);
+			} catch (IllegalArgumentException | NullPointerException exception) {
+				throw new IllegalArgumentException(exception);
+			} catch (ParseException exception) {
 				this.initTimestamp = new Date((long) (Double.parseDouble(timestamp) * 1000));
 			}
 		} else {
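
Viewed in isolation, the parsing order above amounts to the following fallback chain (a standalone approximation, not the actual ShardConsumer code; `resolveInitTimestamp` is a hypothetical helper name used only for illustration):

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

public class InitTimestampSketch {
	// Try the (possibly user-defined) date pattern first; if the value does not
	// parse as a date, fall back to interpreting it as seconds since the Unix epoch.
	static Date resolveInitTimestamp(String timestamp, String format) {
		try {
			return new SimpleDateFormat(format).parse(timestamp);
		} catch (IllegalArgumentException | NullPointerException e) {
			// an invalid SimpleDateFormat pattern (or null input) is a configuration error
			throw new IllegalArgumentException(e);
		} catch (ParseException e) {
			// epoch seconds with millisecond precision, e.g. "1459799926.480";
			// a value that is neither a date nor a number should have been
			// rejected earlier by KinesisConfigUtil.validateConsumerConfiguration
			return new Date((long) (Double.parseDouble(timestamp) * 1000));
		}
	}

	public static void main(String[] args) {
		System.out.println(resolveInitTimestamp("2016-04-04", "yyyy-MM-dd"));
		System.out.println(resolveInitTimestamp("1459799926.480", "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"));
	}
}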

http://git-wip-us.apache.org/repos/asf/flink/blob/a119a30d/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
index 59b8529..244f5a5 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
@@ -37,8 +37,6 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * Utilities for Flink Kinesis connector configuration.
  */
 public class KinesisConfigUtil {
-	public static SimpleDateFormat initTimestampDateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
-
 	/**
 	 * Validate configuration properties for {@link FlinkKinesisConsumer}.
 	 */
@@ -67,7 +65,9 @@ public class KinesisConfigUtil {
 					throw new IllegalArgumentException("Please set value for initial timestamp ('"
 						+ ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP + "') when using AT_TIMESTAMP initial position.");
 				}
-				validateOptionalDateProperty(config, ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP,
+				validateOptionalDateProperty(config,
+					ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP,
+					config.getProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT, ConsumerConfigConstants.DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT),
 					"Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream. "
 						+ "Must be a valid format: yyyy-MM-dd'T'HH:mm:ss.SSSXXX or non-negative double value. For example, 2016-04-04T19:58:46.480-00:00 or 1459799926.480 .");
 			}
@@ -222,17 +222,20 @@ public class KinesisConfigUtil {
 		}
 	}
 
-	private static void validateOptionalDateProperty(Properties config, String key, String message) {
-		if (config.containsKey(key)) {
+	private static void validateOptionalDateProperty(Properties config, String timestampKey, String format, String message) {
+		if (config.containsKey(timestampKey)) {
 			try {
-				initTimestampDateFormat.parse(config.getProperty(key));
-			} catch (ParseException parseException) {
+				SimpleDateFormat customDateFormat = new SimpleDateFormat(format);
+				customDateFormat.parse(config.getProperty(timestampKey));
+			} catch (IllegalArgumentException | NullPointerException exception) {
+				throw new IllegalArgumentException(message);
+			} catch (ParseException exception) {
 				try {
-					double value = Double.parseDouble(config.getProperty(key));
+					double value = Double.parseDouble(config.getProperty(timestampKey));
 					if (value < 0) {
-						throw new NumberFormatException();
+						throw new IllegalArgumentException(message);
 					}
-				} catch (NumberFormatException numberFormatException){
+				} catch (NumberFormatException numberFormatException) {
 					throw new IllegalArgumentException(message);
 				}
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/a119a30d/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
index 45eb1bd..741f0ca 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
@@ -40,7 +40,7 @@ import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
-import java.text.ParseException;
+import java.text.SimpleDateFormat;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
@@ -183,7 +183,7 @@ public class FlinkKinesisConsumerTest {
 	}
 
 	@Test
-	public void testDateStringInValidateOptionDatePropertyForInitialTimestampInConfig() {
+	public void testDateStringForValidateOptionDateProperty() {
 		String timestamp = "2016-04-04T19:58:46.480-00:00";
 
 		Properties testConfig = new Properties();
@@ -194,18 +194,16 @@ public class FlinkKinesisConsumerTest {
 		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
 		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, timestamp);
 
-		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
-
 		try {
-			KinesisConfigUtil.initTimestampDateFormat.parse(timestamp);
-		} catch (ParseException e){
+			KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+		} catch (Exception e) {
 			e.printStackTrace();
 			fail();
 		}
 	}
 
 	@Test
-	public void testUnixTimestampInValidateOptionDatePropertyForInitialTimestampInConfig() {
+	public void testUnixTimestampForValidateOptionDateProperty() {
 		String unixTimestamp = "1459799926.480";
 
 		Properties testConfig = new Properties();
@@ -216,14 +214,65 @@ public class FlinkKinesisConsumerTest {
 		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
 		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, unixTimestamp);
 
+		try {
+			KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+		} catch (Exception e) {
+			e.printStackTrace();
+			fail();
+		}
+	}
+
+	@Test
+	public void testInvalidPatternForInitialTimestampInConfig() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream.");
+
+		Properties testConfig = new Properties();
+		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
+		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "2016-03-14");
+		testConfig.setProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT, "InvalidPattern");
+
 		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+	}
 
-		try{
-			double value = Double.parseDouble(unixTimestamp);
-			if (value < 0) {
-				throw new NumberFormatException();
-			}
-		} catch (Exception e){
+	@Test
+	public void testUnparsableDateForUserDefinedDateFormatForInitialTimestampInConfig() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream.");
+
+		Properties testConfig = new Properties();
+		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
+		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "stillUnparsable");
+		testConfig.setProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT, "yyyy-MM-dd");
+
+		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+	}
+
+	@Test
+	public void testDateStringForUserDefinedDateFormatForValidateOptionDateProperty() {
+		String unixTimestamp = "2016-04-04";
+		String pattern = "yyyy-MM-dd";
+
+		Properties testConfig = new Properties();
+		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
+		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, unixTimestamp);
+		testConfig.setProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT, pattern);
+
+		try {
+			KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+		} catch (Exception e) {
 			e.printStackTrace();
 			fail();
 		}


[2/2] flink git commit: [FLINK-4577] [kinesis] Transparent reshard handling for FlinkKinesisConsumer

Posted by tz...@apache.org.
[FLINK-4577] [kinesis] Transparent reshard handling for FlinkKinesisConsumer

This closes #3458.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/19322401
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/19322401
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/19322401

Branch: refs/heads/master
Commit: 1932240179189a88273f52d3d93f277e7bf604de
Parents: a119a30
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Thu Mar 2 18:56:39 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Mar 31 12:33:48 2017 +0800

----------------------------------------------------------------------
 docs/dev/connectors/kinesis.md                  | 36 +++++----
 .../kinesis/internals/KinesisDataFetcher.java   | 77 ++++----------------
 2 files changed, 34 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/19322401/docs/dev/connectors/kinesis.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/kinesis.md b/docs/dev/connectors/kinesis.md
index ef1afca..1fcc529 100644
--- a/docs/dev/connectors/kinesis.md
+++ b/docs/dev/connectors/kinesis.md
@@ -23,6 +23,9 @@ specific language governing permissions and limitations
 under the License.
 -->
 
+* This will be replaced by the TOC
+{:toc}
+
 The Kinesis connector provides access to [Amazon AWS Kinesis Streams](http://aws.amazon.com/kinesis/streams/).
 
 To use the connector, add the following Maven dependency to your project:
@@ -53,14 +56,14 @@ mvn clean install -Pinclude-kinesis -DskipTests
 The streaming connectors are not part of the binary distribution. See how to link with them for cluster
 execution [here]({{site.baseurl}}/dev/linking.html).
 
-### Using the Amazon Kinesis Streams Service
+## Using the Amazon Kinesis Streams Service
 Follow the instructions from the [Amazon Kinesis Streams Developer Guide](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html)
 to setup Kinesis streams. Make sure to create the appropriate IAM policy and user to read / write to the Kinesis streams.
 
-### Kinesis Consumer
+## Kinesis Consumer
 
 The `FlinkKinesisConsumer` is an exactly-once parallel streaming data source that subscribes to multiple AWS Kinesis
-streams within the same AWS service region, and can handle resharding of streams. Each subtask of the consumer is
+streams within the same AWS service region, and can transparently handle resharding of streams while the job is running. Each subtask of the consumer is
 responsible for fetching data records from multiple Kinesis shards. The number of shards fetched by each subtask will
 change as shards are closed and created by Kinesis.
 
@@ -107,13 +110,16 @@ to `TRIM_HORIZON`, which lets the consumer start reading the Kinesis stream from
 
 Other optional configuration keys for the consumer can be found in `ConsumerConfigConstants`.
 
-**NOTE:** Currently, resharding can not be handled transparently (i.e., without failing and restarting jobs) if there are idle consumer
-subtasks, which occur when the total number of shards is lower than the configured consumer parallelism. The job must be
-configured to enable checkpointing, so that the new shards due to resharding can be correctly picked up and consumed by the
-Kinesis consumer after the job is restored. This is a temporary limitation that will be resolved in future versions.
-Please see [FLINK-4341](https://issues.apache.org/jira/browse/FLINK-4341) for more detail.
+Note that the configured parallelism of the Flink Kinesis Consumer source
+can be completely independent of the total number of shards in the Kinesis streams.
+When the number of shards is larger than the parallelism of the consumer,
+then each consumer subtask can subscribe to multiple shards; otherwise
+if the number of shards is smaller than the parallelism of the consumer,
+then some consumer subtasks will simply be idle and wait until they are assigned
+new shards (i.e., when the streams are resharded to increase the
+number of shards for higher provisioned Kinesis service throughput).
 
-#### Configuring Starting Position
+### Configuring Starting Position
 
 The Flink Kinesis Consumer currently provides the following options to configure where to start reading Kinesis streams, simply by setting `ConsumerConfigConstants.STREAM_INITIAL_POSITION` to
 one of the following values in the provided configuration properties (the naming of the options identically follows [the namings used by the AWS Kinesis Streams service](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#API_GetShardIterator_RequestSyntax)):
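
With that limitation gone, nothing special is needed when the consumer's parallelism differs from the shard count. A sketch under the same placeholder assumptions as before (the stream name, region, and the parallelism of 10 are arbitrary):

import java.util.Properties;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class ReshardToleranceSketch {
	public static void main(String[] args) throws Exception {
		Properties config = new Properties();
		config.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");

		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// Against, say, a 4-shard stream, 4 of these 10 subtasks read one shard each;
		// the other 6 stay temporarily idle until a reshard assigns them shards,
		// without the deliberate job failure and restart required before this commit.
		env.addSource(new FlinkKinesisConsumer<>("myStream", new SimpleStringSchema(), config))
			.setParallelism(10)
			.print();

		env.execute();
	}
}
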
@@ -127,7 +133,7 @@ properties by providing a value for `ConsumerConfigConstants.STREAM_INITIAL_TIME
     If `ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT` is not defined, the default pattern is `yyyy-MM-dd'T'HH:mm:ss.SSSXXX`
     (for example, a timestamp value of `2016-04-04` with a user-given pattern of `yyyy-MM-dd`, or a timestamp value of `2016-04-04T19:58:46.480-00:00` with no pattern given).
 
-#### Fault Tolerance for Exactly-Once User-Defined State Update Semantics
+### Fault Tolerance for Exactly-Once User-Defined State Update Semantics
 
 With Flink's checkpointing enabled, the Flink Kinesis Consumer will consume records from shards in Kinesis streams and
 periodically checkpoint each shard's progress. In case of a job failure, Flink will restore the streaming program to the
@@ -157,7 +163,7 @@ Also note that Flink can only restart the topology if enough processing slots ar
 Therefore, if the topology fails due to loss of a TaskManager, there must still be enough slots available afterwards.
 Flink on YARN supports automatic restart of lost YARN containers.
 
-#### Event Time for Consumed Records
+### Event Time for Consumed Records
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -201,7 +207,7 @@ kinesis = kinesis.assignTimestampsAndWatermarks(new CustomTimestampAssigner)
 </div>
 </div>
 
-#### Threading Model
+### Threading Model
 
 The Flink Kinesis Consumer uses multiple threads for shard discovery and data consumption.
 
@@ -214,7 +220,7 @@ For data consumption, a single thread will be created to consume each discovered
 shard it is responsible for consuming is closed as a result of stream resharding. In other words, there will always be
 one thread per open shard.
 
-#### Internally Used Kinesis APIs
+### Internally Used Kinesis APIs
 
 The Flink Kinesis Consumer uses the [AWS Java SDK](http://aws.amazon.com/sdk-for-java/) internally to call Kinesis APIs
 for shard discovery and data consumption. Due to Amazon's [service limits for Kinesis Streams](http://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html)
@@ -248,7 +254,7 @@ adjusts the maximum number of records each consuming thread tries to fetch from
 the latter modifies the sleep interval between each fetch (there will be no sleep by default). The retry behaviour of the
 consumer when calling this API can also be modified by using the other keys prefixed by `ConsumerConfigConstants.SHARD_GETRECORDS_*`.
 
-### Kinesis Producer
+## Kinesis Producer
 
 The `FlinkKinesisProducer` is used for putting data from a Flink stream into a Kinesis stream. Note that the producer is not participating in
 Flink's checkpointing and doesn't provide exactly-once processing guarantees.
@@ -305,7 +311,7 @@ Otherwise, the returned stream name is used.
 Other optional configuration keys for the producer can be found in `ProducerConfigConstants`.
 
 
-### Using Non-AWS Kinesis Endpoints for Testing
+## Using Non-AWS Kinesis Endpoints for Testing
 
 It is sometimes desirable to have Flink operate as a consumer or producer against a non-AWS Kinesis endpoint such as
 [Kinesalite](https://github.com/mhart/kinesalite); this is especially useful when performing functional testing of a Flink

http://git-wip-us.apache.org/repos/asf/flink/blob/19322401/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
index a06fdca..46847b3 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
@@ -19,7 +19,6 @@ package org.apache.flink.streaming.connectors.kinesis.internals;
 
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
 import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.InitialPosition;
@@ -325,35 +324,10 @@ public class KinesisDataFetcher<T> {
 				ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS,
 				Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS)));
 
-		// FLINK-4341:
-		// For downstream operators that work on time (ex. window operators), we are required to emit a max value watermark
-		// for subtasks that won't continue to have shards to read from unless resharding happens in the future, otherwise
-		// the downstream watermarks would not advance, leading to unbounded accumulating state.
-		//
-		// The side-effect of this limitation is that on resharding, we must fail hard if the newly discovered shard
-		// is to be subscribed by a subtask that has previously emitted a max value watermark, otherwise the watermarks
-		// will be messed up.
-		//
-		// There are 2 cases were we need to either emit a max value watermark, or deliberately fail hard:
-		//  (a) if this subtask has no more shards to read from unless resharding happens in the future, we emit a max
-		//      value watermark. This case is encountered when 1) all previously read shards by this subtask were closed
-		//      due to resharding, 2) when this subtask was initially only subscribed to closed shards while the consumer
-		//      was told to start from TRIM_HORIZON, or 3) there was initially no shards for this subtask to read on startup.
-		//  (b) this subtask has discovered new shards to read from due to a reshard; if this subtask has already emitted
-		//      a max value watermark, we must deliberately fail hard to avoid messing up the watermarks. The new shards
-		//      will be subscribed by this subtask after restore as initial shards on startup.
-		//
-		// TODO: This is a temporary workaround until a min-watermark information service is available in the JobManager
-		// Please see FLINK-4341 for more detail
-
-		boolean emittedMaxValueWatermark = false;
-
 		if (this.numberOfActiveShards.get() == 0) {
-			// FLINK-4341 workaround case (a) - please see the above for details on this case
-			LOG.info("Subtask {} has no initial shards to read on startup; emitting max value watermark ...",
+			LOG.info("Subtask {} has no active shards to read on startup; marking the subtask as temporarily idle ...",
 				indexOfThisConsumerSubtask);
-			sourceContext.emitWatermark(new Watermark(Long.MAX_VALUE));
-			emittedMaxValueWatermark = true;
+			sourceContext.markAsTemporarilyIdle();
 		}
 
 		while (running) {
@@ -363,41 +337,6 @@ public class KinesisDataFetcher<T> {
 			}
 			List<KinesisStreamShard> newShardsDueToResharding = discoverNewShardsToSubscribe();
 
-			// -- NOTE: Potential race condition between newShardsDueToResharding and numberOfActiveShards --
-			// Since numberOfActiveShards is updated by parallel shard consuming threads in updateState(), there exists
-			// a race condition with the currently queried newShardsDueToResharding. Therefore, numberOfActiveShards
-			// may not correctly reflect the discover result in the below case determination. This may lead to incorrect
-			// case determination on the current discovery attempt, but can still be correctly handled on future attempts.
-			//
-			// Although this can be resolved by wrapping the current shard discovery attempt with the below
-			// case determination within a synchronized block on the checkpoint lock for atomicity, there will be
-			// considerable throughput performance regression as shard discovery is a remote call to AWS. Therefore,
-			// since the case determination is a temporary workaround for FLINK-4341, the race condition is tolerable as
-			// we can still eventually handle max value watermark emitting / deliberately failing on successive
-			// discovery attempts.
-
-			if (newShardsDueToResharding.size() == 0 && this.numberOfActiveShards.get() == 0 && !emittedMaxValueWatermark) {
-				// FLINK-4341 workaround case (a) - please see the above for details on this case
-				LOG.info("Subtask {} has completed reading all shards; emitting max value watermark ...",
-					indexOfThisConsumerSubtask);
-				sourceContext.emitWatermark(new Watermark(Long.MAX_VALUE));
-				emittedMaxValueWatermark = true;
-			} else if (newShardsDueToResharding.size() > 0 && emittedMaxValueWatermark) {
-				// FLINK-4341 workaround case (b) - please see the above for details on this case
-				//
-				// Note that in the case where on resharding this subtask ceased to read all of it's previous shards
-				// but new shards is also to be subscribed by this subtask immediately after, emittedMaxValueWatermark
-				// will be false; this allows the fetcher to continue reading the new shards without failing on such cases.
-				// However, due to the race condition mentioned above, we might still fall into case (a) first, and
-				// then (b) on the next discovery attempt. Although the failure is ideally unnecessary, max value
-				// watermark emitting still remains to be correct.
-
-				LOG.warn("Subtask {} has discovered {} new shards to subscribe, but is failing hard to avoid messing" +
-						" up watermarks; the new shards will be subscribed by this subtask after restore ...",
-					indexOfThisConsumerSubtask, newShardsDueToResharding.size());
-				throw new RuntimeException("Deliberate failure to avoid messing up watermarks");
-			}
-
 			for (KinesisStreamShard shard : newShardsDueToResharding) {
 				// since there may be delay in discovering a new shard, all new shards due to
 				// resharding should be read starting from the earliest record possible
@@ -605,9 +544,19 @@ public class KinesisDataFetcher<T> {
 			// if a shard's state is updated to be SENTINEL_SHARD_ENDING_SEQUENCE_NUM by its consumer thread,
 			// we've finished reading the shard and should determine it to be non-active
 			if (lastSequenceNumber.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())) {
-				this.numberOfActiveShards.decrementAndGet();
 				LOG.info("Subtask {} has reached the end of subscribed shard: {}",
 					indexOfThisConsumerSubtask, subscribedShardsState.get(shardStateIndex).getKinesisStreamShard());
+
+				// check if we need to mark the source as idle;
+				// note that on resharding, if registerNewSubscribedShardState was invoked for newly discovered shards
+				// AFTER the old shards had reached the end, the subtask's status will be automatically toggled back to
+				// be active immediately afterwards as soon as we collect records from the new shards
+				if (this.numberOfActiveShards.decrementAndGet() == 0) {
+					LOG.info("Subtask {} has reached the end of all currently subscribed shards; marking the subtask as temporarily idle ...",
+						indexOfThisConsumerSubtask);
+
+					sourceContext.markAsTemporarilyIdle();
+				}
 			}
 		}
 	}
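
For reference, `markAsTemporarilyIdle()` belongs to Flink's `SourceFunction.SourceContext` interface. A minimal custom source showing the same idiom the fetcher now relies on (a hypothetical class for illustration, not part of this commit; `pollForRecord` is a stand-in for an external fetch):

import org.apache.flink.streaming.api.functions.source.SourceFunction;

// A toy source that marks itself idle while it has no data, so that downstream
// watermarks can advance without waiting on input from this subtask.
public class IdleAwareSource implements SourceFunction<String> {
	private volatile boolean running = true;

	@Override
	public void run(SourceContext<String> ctx) throws Exception {
		while (running) {
			String record = pollForRecord(); // hypothetical fetch, may return null
			if (record == null) {
				// no work at the moment: let the watermark mechanism ignore this subtask
				ctx.markAsTemporarilyIdle();
				Thread.sleep(1000);
			} else {
				// emitting a record automatically toggles the subtask back to active
				synchronized (ctx.getCheckpointLock()) {
					ctx.collect(record);
				}
			}
		}
	}

	@Override
	public void cancel() {
		running = false;
	}

	private String pollForRecord() {
		return null; // placeholder for an external fetch
	}
}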