Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/11/01 21:32:23 UTC

[GitHub] tweise closed pull request #6980: [FLINK-5697] [kinesis] Add periodic per-shard watermark support

tweise closed pull request #6980: [FLINK-5697] [kinesis] Add periodic per-shard watermark support
URL: https://github.com/apache/flink/pull/6980
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
index 407a5a95524..f0852584ade 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
@@ -31,6 +31,7 @@
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
@@ -78,6 +79,17 @@
  * A custom assigner implementation can be set via {@link #setShardAssigner(KinesisShardAssigner)} to optimize the
  * hash function or use static overrides to limit skew.
  *
+ * <p>In order for the consumer to emit watermarks, a timestamp assigner needs to be set via {@link
+ * #setPeriodicWatermarkAssigner(AssignerWithPeriodicWatermarks)} and the auto watermark emit
+ * interval configured via {@link
+ * org.apache.flink.api.common.ExecutionConfig#setAutoWatermarkInterval(long)}.
+ *
+ * <p>Watermarks can only advance when all shards of a subtask continuously deliver records. To
+ * prevent an inactive or closed shard from blocking watermark progress, an idle timeout should be
+ * configured via the {@link ConsumerConfigConstants#SHARD_IDLE_INTERVAL_MILLIS} configuration
+ * property. By default, shards are never considered idle and the watermark calculation waits for
+ * newer records to arrive from all shards.
+ *
  * @param <T> the type of data emitted
  */
 @PublicEvolving
@@ -108,6 +120,8 @@
 	 */
 	private KinesisShardAssigner shardAssigner = KinesisDataFetcher.DEFAULT_SHARD_ASSIGNER;
 
+	private AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner;
+
 	// ------------------------------------------------------------------------
 	//  Runtime state
 	// ------------------------------------------------------------------------
@@ -220,6 +234,22 @@ public void setShardAssigner(KinesisShardAssigner shardAssigner) {
 		ClosureCleaner.clean(shardAssigner, true);
 	}
 
+	public AssignerWithPeriodicWatermarks<T> getPeriodicWatermarkAssigner() {
+		return periodicWatermarkAssigner;
+	}
+
+	/**
+	 * Set the assigner that will extract the timestamp from records of type {@code T} and calculate the
+	 * watermark.
+	 *
+	 * @param periodicWatermarkAssigner the assigner used to extract per-shard timestamps and watermarks
+	 */
+	public void setPeriodicWatermarkAssigner(
+		AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner) {
+		this.periodicWatermarkAssigner = periodicWatermarkAssigner;
+		ClosureCleaner.clean(this.periodicWatermarkAssigner, true);
+	}
+
 	// ------------------------------------------------------------------------
 	//  Source life cycle
 	// ------------------------------------------------------------------------
@@ -414,7 +444,7 @@ public void snapshotState(FunctionSnapshotContext context) throws Exception {
 			Properties configProps,
 			KinesisDeserializationSchema<T> deserializationSchema) {
 
-		return new KinesisDataFetcher<>(streams, sourceContext, runtimeContext, configProps, deserializationSchema, shardAssigner);
+		return new KinesisDataFetcher<>(streams, sourceContext, runtimeContext, configProps, deserializationSchema, shardAssigner, periodicWatermarkAssigner);
 	}
 
 	@VisibleForTesting
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
index 443b19ec382..42e2173474b 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
@@ -140,6 +140,8 @@ public SentinelSequenceNumber toSentinelSequenceNumber() {
 	/** The config to turn on adaptive reads from a shard. */
 	public static final String SHARD_USE_ADAPTIVE_READS = "flink.shard.adaptivereads";
 
+	/** The interval after which to consider a shard idle for purposes of watermark generation. */
+	public static final String SHARD_IDLE_INTERVAL_MILLIS = "flink.shard.idle.interval";
 
 	// ------------------------------------------------------------------------
 	//  Default values for consumer configuration
@@ -190,6 +192,8 @@ public SentinelSequenceNumber toSentinelSequenceNumber() {
 
 	public static final boolean DEFAULT_SHARD_USE_ADAPTIVE_READS = false;
 
+	public static final long DEFAULT_SHARD_IDLE_INTERVAL_MILLIS = -1;
+
 	/**
 	 * To avoid shard iterator expires in {@link ShardConsumer}s, the value for the configured
 	 * getRecords interval can not exceed 5 minutes, which is the expire time for retrieved iterators.
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
index 0981b76ce89..60e7201e913 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
@@ -21,7 +21,10 @@
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.connectors.kinesis.KinesisShardAssigner;
 import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.metrics.KinesisConsumerMetricConstants;
@@ -35,7 +38,10 @@
 import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
 import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
 
 import com.amazonaws.services.kinesis.model.HashKeyRange;
 import com.amazonaws.services.kinesis.model.SequenceNumberRange;
@@ -51,6 +57,7 @@
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
@@ -182,6 +189,30 @@
 
 	private volatile boolean running = true;
 
+	private final AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner;
+
+	private PeriodicWatermarkEmitter periodicWatermarkEmitter;
+
+	/**
+	 * The watermark related state for each shard consumer. Entries in this map will be created when shards
+	 * are discovered. After recovery, this shard map will be recreated, possibly with different shard index keys,
+	 * since those are transient and not part of checkpointed state.
+	 */
+	private ConcurrentHashMap<Integer, ShardWatermarkState> shardWatermarks = new ConcurrentHashMap<>();
+
+	/**
+	 * The most recent watermark, calculated from the per-shard watermarks. The initial value will never be emitted;
+	 * this also applies after recovery. The first watermark that is emitted is derived from actually consumed records.
+	 * In case of recovery and replay, the watermark will rewind, consistent with the shard consumer sequence.
+	 */
+	private long lastWatermark = Long.MIN_VALUE;
+
+	/**
+	 * The time span since the last consumed record, after which a shard is considered idle for purposes of watermark
+	 * calculation. A positive value allows the watermark to progress even when some shards don't receive new records.
+	 */
+	private long shardIdleIntervalMillis = ConsumerConfigConstants.DEFAULT_SHARD_IDLE_INTERVAL_MILLIS;
+
 	/**
 	 * Factory to create Kinesis proxy instances used by a fetcher.
 	 */
@@ -203,7 +234,8 @@ public KinesisDataFetcher(List<String> streams,
 							RuntimeContext runtimeContext,
 							Properties configProps,
 							KinesisDeserializationSchema<T> deserializationSchema,
-							KinesisShardAssigner shardAssigner) {
+							KinesisShardAssigner shardAssigner,
+							AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner) {
 		this(streams,
 			sourceContext,
 			sourceContext.getCheckpointLock(),
@@ -211,6 +243,7 @@ public KinesisDataFetcher(List<String> streams,
 			configProps,
 			deserializationSchema,
 			shardAssigner,
+			periodicWatermarkAssigner,
 			new AtomicReference<>(),
 			new ArrayList<>(),
 			createInitialSubscribedStreamsToLastDiscoveredShardsState(streams),
@@ -225,6 +258,7 @@ protected KinesisDataFetcher(List<String> streams,
 								Properties configProps,
 								KinesisDeserializationSchema<T> deserializationSchema,
 								KinesisShardAssigner shardAssigner,
+								AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner,
 								AtomicReference<Throwable> error,
 								List<KinesisStreamShardState> subscribedShardsState,
 								HashMap<String, String> subscribedStreamsToLastDiscoveredShardIds,
@@ -238,6 +272,7 @@ protected KinesisDataFetcher(List<String> streams,
 		this.indexOfThisConsumerSubtask = runtimeContext.getIndexOfThisSubtask();
 		this.deserializationSchema = checkNotNull(deserializationSchema);
 		this.shardAssigner = checkNotNull(shardAssigner);
+		this.periodicWatermarkAssigner = periodicWatermarkAssigner;
 		this.kinesisProxyFactory = checkNotNull(kinesisProxyFactory);
 		this.kinesis = kinesisProxyFactory.create(configProps);
 
@@ -339,6 +374,20 @@ public void runFetcher() throws Exception {
 			}
 		}
 
+		// start periodic watermark emitter, if a watermark assigner was configured
+		if (periodicWatermarkAssigner != null) {
+			long periodicWatermarkIntervalMillis = runtimeContext.getExecutionConfig().getAutoWatermarkInterval();
+			if (periodicWatermarkIntervalMillis > 0) {
+				ProcessingTimeService timerService = ((StreamingRuntimeContext) runtimeContext).getProcessingTimeService();
+				LOG.info("Starting periodic watermark emitter with interval {}", periodicWatermarkIntervalMillis);
+				this.periodicWatermarkEmitter = new PeriodicWatermarkEmitter(timerService, periodicWatermarkIntervalMillis);
+				this.periodicWatermarkEmitter.start();
+			}
+			this.shardIdleIntervalMillis = Long.parseLong(
+				getConsumerConfiguration().getProperty(ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS,
+					Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_IDLE_INTERVAL_MILLIS)));
+		}
+
 		// ------------------------------------------------------------------------
 
 		// finally, start the infinite shard discovery and consumer launching loop;
@@ -546,6 +595,18 @@ protected Properties getConsumerConfiguration() {
 	 * @param lastSequenceNumber the last sequence number value to update
 	 */
 	protected void emitRecordAndUpdateState(T record, long recordTimestamp, int shardStateIndex, SequenceNumber lastSequenceNumber) {
+		// track per shard watermarks and emit timestamps extracted from the record,
+		// when a watermark assigner was configured.
+		if (periodicWatermarkAssigner != null) {
+			ShardWatermarkState sws = shardWatermarks.get(shardStateIndex);
+			Preconditions.checkNotNull(
+				sws, "shard watermark state initialized in registerNewSubscribedShardState");
+			recordTimestamp =
+				sws.periodicWatermarkAssigner.extractTimestamp(record, sws.lastRecordTimestamp);
+			sws.lastRecordTimestamp = recordTimestamp;
+			sws.lastUpdated = getCurrentTimeMillis();
+		}
+
 		synchronized (checkpointLock) {
 			if (record != null) {
 				sourceContext.collectWithTimestamp(record, recordTimestamp);
@@ -609,7 +670,119 @@ public int registerNewSubscribedShardState(KinesisStreamShardState newSubscribed
 				this.numberOfActiveShards.incrementAndGet();
 			}
 
-			return subscribedShardsState.size() - 1;
+			int shardStateIndex = subscribedShardsState.size() - 1;
+
+			// track all discovered shards for watermark determination
+			ShardWatermarkState sws = shardWatermarks.get(shardStateIndex);
+			if (sws == null) {
+				sws = new ShardWatermarkState();
+				try {
+					sws.periodicWatermarkAssigner = InstantiationUtil.clone(periodicWatermarkAssigner);
+				} catch (Exception e) {
+					throw new RuntimeException(e);
+				}
+				sws.lastUpdated = getCurrentTimeMillis();
+				sws.lastRecordTimestamp = Long.MIN_VALUE;
+				shardWatermarks.put(shardStateIndex, sws);
+			}
+
+			return shardStateIndex;
+		}
+	}
+
+	/**
+	 * Return the current system time. Allow tests to override this to simulate progress for watermark
+	 * logic.
+	 *
+	 * @return the current time in milliseconds
+	 */
+	@VisibleForTesting
+	protected long getCurrentTimeMillis() {
+		return System.currentTimeMillis();
+	}
+
+	/**
+	 * Called periodically to emit a watermark. Checks all shards for the current event time
+	 * watermark, and possibly emits the next watermark.
+	 *
+	 * <p>Shards that have not received an update for a certain interval are considered inactive so as
+	 * to not hold back the watermark indefinitely. When all shards are inactive, the subtask will be
+	 * marked as temporarily idle to not block downstream operators.
+	 */
+	@VisibleForTesting
+	protected void emitWatermark() {
+		LOG.debug(
+			"###evaluating watermark for subtask {} time {}",
+			indexOfThisConsumerSubtask,
+			getCurrentTimeMillis());
+		long potentialWatermark = Long.MAX_VALUE;
+		long idleTime =
+			(shardIdleIntervalMillis > 0)
+				? getCurrentTimeMillis() - shardIdleIntervalMillis
+				: Long.MAX_VALUE;
+
+		for (Map.Entry<Integer, ShardWatermarkState> e : shardWatermarks.entrySet()) {
+			// consider only active shards, or those that would advance the watermark
+			Watermark w = e.getValue().periodicWatermarkAssigner.getCurrentWatermark();
+			if (w != null && (e.getValue().lastUpdated >= idleTime || w.getTimestamp() > lastWatermark)) {
+				potentialWatermark = Math.min(potentialWatermark, w.getTimestamp());
+			}
+		}
+
+		// advance watermark if possible (watermarks can only be ascending)
+		if (potentialWatermark == Long.MAX_VALUE) {
+			if (shardWatermarks.isEmpty() || shardIdleIntervalMillis > 0) {
+				LOG.debug(
+					"###No active shard for subtask {}, marking the source idle.",
+					indexOfThisConsumerSubtask);
+				// no active shard, signal downstream operators to not wait for a watermark
+				sourceContext.markAsTemporarilyIdle();
+			}
+		} else if (potentialWatermark > lastWatermark) {
+			LOG.debug(
+				"###emitting watermark {} from subtask {}",
+				potentialWatermark,
+				indexOfThisConsumerSubtask);
+			sourceContext.emitWatermark(new Watermark(potentialWatermark));
+			lastWatermark = potentialWatermark;
+		}
+	}
+
+	/** Per shard tracking of watermark and last activity. */
+	private static class ShardWatermarkState<T> {
+		private AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner;
+		private volatile long lastRecordTimestamp;
+		private volatile long lastUpdated;
+	}
+
+	/**
+	 * The periodic watermark emitter. In its given interval, it checks all shards for the current
+	 * event time watermark, and possibly emits the next watermark.
+	 */
+	private class PeriodicWatermarkEmitter implements ProcessingTimeCallback {
+
+		private final ProcessingTimeService timerService;
+		private final long interval;
+
+		// -------------------------------------------------
+
+		PeriodicWatermarkEmitter(ProcessingTimeService timerService, long autoWatermarkInterval) {
+			this.timerService = checkNotNull(timerService);
+			this.interval = autoWatermarkInterval;
+		}
+
+		// -------------------------------------------------
+
+		public void start() {
+			LOG.debug("registering periodic watermark timer with interval {}", interval);
+			timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this);
+		}
+
+		@Override
+		public void onProcessingTime(long timestamp) {
+			emitWatermark();
+			// schedule the next watermark
+			timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this);
 		}
 	}
 
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
index a99e845f249..9a6d2d66a6f 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
@@ -418,7 +418,7 @@ public TestFetcher(
 				HashMap<StreamShardMetadata, SequenceNumber> testStateSnapshot,
 				List<StreamShardHandle> testInitialDiscoveryShards) {
 
-			super(streams, sourceContext, runtimeContext, configProps, deserializationSchema, DEFAULT_SHARD_ASSIGNER);
+			super(streams, sourceContext, runtimeContext, configProps, deserializationSchema, DEFAULT_SHARD_ASSIGNER, null);
 
 			this.testStateSnapshot = testStateSnapshot;
 			this.testInitialDiscoveryShards = testInitialDiscoveryShards;
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
index ccf39d0e19b..e3e7287f24a 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
@@ -21,6 +21,9 @@
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.core.testutils.CheckedThread;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
 import org.apache.flink.streaming.connectors.kinesis.KinesisShardAssigner;
 import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
@@ -40,9 +43,13 @@
 import com.amazonaws.services.kinesis.model.HashKeyRange;
 import com.amazonaws.services.kinesis.model.SequenceNumberRange;
 import com.amazonaws.services.kinesis.model.Shard;
+import org.apache.commons.lang3.mutable.MutableBoolean;
+import org.apache.commons.lang3.mutable.MutableLong;
+import org.junit.Assert;
 import org.junit.Test;
 import org.powermock.reflect.Whitebox;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -710,4 +717,98 @@ public void testIsThisSubtaskShouldSubscribeTo() {
 		assertFalse(KinesisDataFetcher.isThisSubtaskShouldSubscribeTo(2, 2, 1));
 	}
 
+	private static BoundedOutOfOrdernessTimestampExtractor<String> watermarkAssigner =
+		new BoundedOutOfOrdernessTimestampExtractor<String>(Time.milliseconds(0)) {
+			@Override
+			public long extractTimestamp(String element) {
+				return Long.parseLong(element);
+			}
+		};
+
+	@Test
+	public void testPeriodicWatermark() {
+		final MutableLong clock = new MutableLong();
+		final MutableBoolean isTemporaryIdle = new MutableBoolean();
+		final List<Watermark> watermarks = new ArrayList<>();
+
+		String fakeStream1 = "fakeStream1";
+		StreamShardHandle shardHandle =
+			new StreamShardHandle(
+				fakeStream1,
+				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0)));
+
+		TestSourceContext<String> sourceContext =
+			new TestSourceContext<String>() {
+				@Override
+				public void emitWatermark(Watermark mark) {
+					watermarks.add(mark);
+				}
+
+				@Override
+				public void markAsTemporarilyIdle() {
+					isTemporaryIdle.setTrue();
+				}
+			};
+
+		HashMap<String, String> subscribedStreamsToLastSeenShardIdsUnderTest = new HashMap<>();
+
+		final KinesisDataFetcher<String> fetcher =
+			new TestableKinesisDataFetcher<String>(
+				Collections.singletonList(fakeStream1),
+				sourceContext,
+				new java.util.Properties(),
+				new KinesisDeserializationSchemaWrapper<>(new org.apache.flink.streaming.util.serialization.SimpleStringSchema()),
+				1,
+				1,
+				new AtomicReference<>(),
+				new LinkedList<>(),
+				subscribedStreamsToLastSeenShardIdsUnderTest,
+				FakeKinesisBehavioursFactory.nonReshardedStreamsBehaviour(new HashMap<>())) {
+
+				@Override
+				protected long getCurrentTimeMillis() {
+					return clock.getValue();
+				}
+			};
+		Whitebox.setInternalState(fetcher, "periodicWatermarkAssigner", watermarkAssigner);
+
+		SequenceNumber seq = new SequenceNumber("fakeSequenceNumber");
+		// register shards to subsequently emit records
+		int shardIndex =
+			fetcher.registerNewSubscribedShardState(
+				new KinesisStreamShardState(
+					KinesisDataFetcher.convertToStreamShardMetadata(shardHandle), shardHandle, seq));
+
+		StreamRecord<String> record1 =
+			new StreamRecord<>(String.valueOf(Long.MIN_VALUE), Long.MIN_VALUE);
+		fetcher.emitRecordAndUpdateState(record1.getValue(), record1.getTimestamp(), shardIndex, seq);
+		Assert.assertEquals(record1, sourceContext.getCollectedOutputs().poll());
+
+		fetcher.emitWatermark();
+		Assert.assertTrue("potential watermark equals previous watermark", watermarks.isEmpty());
+
+		StreamRecord<String> record2 = new StreamRecord<>(String.valueOf(1), 1);
+		fetcher.emitRecordAndUpdateState(record2.getValue(), record2.getTimestamp(), shardIndex, seq);
+		Assert.assertEquals(record2, sourceContext.getCollectedOutputs().poll());
+
+		fetcher.emitWatermark();
+		Assert.assertFalse("watermark advanced", watermarks.isEmpty());
+		Assert.assertEquals(new Watermark(record2.getTimestamp()), watermarks.remove(0));
+		Assert.assertFalse("not idle", isTemporaryIdle.booleanValue());
+
+		// test idle timeout
+		long idleTimeout = 10;
+		// advance clock idleTimeout
+		clock.add(idleTimeout + 1);
+		fetcher.emitWatermark();
+		Assert.assertFalse("not idle", isTemporaryIdle.booleanValue());
+		Assert.assertTrue("not idle, no new watermark", watermarks.isEmpty());
+
+		// activate idle timeout
+		Whitebox.setInternalState(fetcher, "shardIdleIntervalMillis", idleTimeout);
+		fetcher.emitWatermark();
+		Assert.assertTrue("idle", isTemporaryIdle.booleanValue());
+		Assert.assertTrue("idle, no watermark", watermarks.isEmpty());
+	}
+
 }
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java
index 21588c9a7a7..a44028766e1 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java
@@ -69,6 +69,7 @@ public TestableKinesisDataFetcher(
 			fakeConfiguration,
 			deserializationSchema,
 			DEFAULT_SHARD_ASSIGNER,
+			null,
 			thrownErrorUnderTest,
 			subscribedShardsStateUnderTest,
 			subscribedStreamsToLastDiscoveredShardIdsStateUnderTest,
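
For reference, below is an illustrative usage sketch (not part of the merged diff) showing how the per-shard watermark support added above could be wired into a job. Only setPeriodicWatermarkAssigner(...) and ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS come from this change; the stream name, AWS region, interval values, and the timestamp extractor are assumptions chosen for the example.

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

import java.util.Properties;

public class KinesisWatermarkExample {

	public static void main(String[] args) throws Exception {
		Properties config = new Properties();
		config.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
		// new property from this PR: shards with no records for 60s no longer hold back the watermark
		config.setProperty(ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS, "60000");

		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
		// the periodic emitter started in KinesisDataFetcher#runFetcher uses this interval
		env.getConfig().setAutoWatermarkInterval(1000L);

		FlinkKinesisConsumer<String> consumer =
			new FlinkKinesisConsumer<>("example-stream", new SimpleStringSchema(), config);
		// new setter from this PR; the extractor below is a hypothetical example that
		// assumes each record is a comma-separated line starting with an epoch-millis timestamp
		consumer.setPeriodicWatermarkAssigner(
			new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(5)) {
				@Override
				public long extractTimestamp(String element) {
					return Long.parseLong(element.split(",")[0]);
				}
			});

		env.addSource(consumer).print();
		env.execute("Kinesis per-shard watermark example");
	}
}

With the idle interval configured, a subtask whose shards all stop delivering records is marked temporarily idle, so downstream operators are not blocked waiting for its watermark.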


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services