Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/11/21 10:52:13 UTC

[GitHub] asfgit closed pull request #6980: [FLINK-5697] [kinesis] Add periodic per-shard watermark support

asfgit closed pull request #6980: [FLINK-5697] [kinesis] Add periodic per-shard watermark support
URL: https://github.com/apache/flink/pull/6980

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
index 407a5a95524..3c5e3c7303f 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
@@ -31,6 +31,7 @@
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
@@ -78,6 +79,22 @@
  * A custom assigner implementation can be set via {@link #setShardAssigner(KinesisShardAssigner)} to optimize the
  * hash function or use static overrides to limit skew.
  *
+ * <p>In order for the consumer to emit watermarks, a timestamp assigner needs to be set via {@link
+ * #setPeriodicWatermarkAssigner(AssignerWithPeriodicWatermarks)} and the auto watermark emit
+ * interval configured via {@link
+ * org.apache.flink.api.common.ExecutionConfig#setAutoWatermarkInterval(long)}.
+ *
+ * <p>Watermarks can only advance when all shards of a subtask continuously deliver records. To
+ * prevent an inactive or closed shard from blocking watermark progress, an idle timeout should be
+ * configured via the configuration property {@link
+ * ConsumerConfigConstants#SHARD_IDLE_INTERVAL_MILLIS}. By default, shards won't be considered
+ * idle and watermark calculation will wait for newer records to arrive from all shards.
+ *
+ * <p>Note that re-sharding of the Kinesis stream while an application (that relies on
+ * the Kinesis records for watermarking) is running can lead to records incorrectly treated as late.
+ * This depends on how shards are assigned to subtasks and applies regardless of whether watermarks
+ * are generated in the source or a downstream operator.
+ *
  * @param <T> the type of data emitted
  */
 @PublicEvolving
@@ -108,6 +125,8 @@
 	 */
 	private KinesisShardAssigner shardAssigner = KinesisDataFetcher.DEFAULT_SHARD_ASSIGNER;
 
+	private AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner;
+
 	// ------------------------------------------------------------------------
 	//  Runtime state
 	// ------------------------------------------------------------------------
@@ -213,13 +232,28 @@ public KinesisShardAssigner getShardAssigner() {
 
 	/**
 	 * Provide a custom assigner to influence how shards are distributed over subtasks.
-	 * @param shardAssigner
+	 * @param shardAssigner shard assigner
 	 */
 	public void setShardAssigner(KinesisShardAssigner shardAssigner) {
 		this.shardAssigner = checkNotNull(shardAssigner, "function can not be null");
 		ClosureCleaner.clean(shardAssigner, true);
 	}
 
+	public AssignerWithPeriodicWatermarks<T> getPeriodicWatermarkAssigner() {
+		return periodicWatermarkAssigner;
+	}
+
+	/**
+	 * Set the assigner that will extract the timestamp from {@link T} and calculate the
+	 * watermark.
+	 * @param periodicWatermarkAssigner periodic watermark assigner
+	 */
+	public void setPeriodicWatermarkAssigner(
+		AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner) {
+		this.periodicWatermarkAssigner = periodicWatermarkAssigner;
+		ClosureCleaner.clean(this.periodicWatermarkAssigner, true);
+	}
+
 	// ------------------------------------------------------------------------
 	//  Source life cycle
 	// ------------------------------------------------------------------------
@@ -414,7 +448,7 @@ public void snapshotState(FunctionSnapshotContext context) throws Exception {
 			Properties configProps,
 			KinesisDeserializationSchema<T> deserializationSchema) {
 
-		return new KinesisDataFetcher<>(streams, sourceContext, runtimeContext, configProps, deserializationSchema, shardAssigner);
+		return new KinesisDataFetcher<>(streams, sourceContext, runtimeContext, configProps, deserializationSchema, shardAssigner, periodicWatermarkAssigner);
 	}
 
 	@VisibleForTesting
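
A minimal usage sketch of the application-side wiring implied by the new Javadoc above, assuming records that carry their event timestamp as a parseable long; the stream name, region, and 60-second idle interval are illustrative values, not taken from this PR:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

public class KinesisWatermarkExample {
	public static void main(String[] args) throws Exception {
		Properties config = new Properties();
		config.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
		// allow watermarks to progress past shards that stop delivering records
		config.setProperty(ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS, "60000");

		FlinkKinesisConsumer<String> consumer =
			new FlinkKinesisConsumer<>("myStream", new SimpleStringSchema(), config);
		consumer.setPeriodicWatermarkAssigner(
			new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(5)) {
				@Override
				public long extractTimestamp(String element) {
					return Long.parseLong(element); // assumes the payload is a timestamp
				}
			});

		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
		// periodic emission requires a positive auto watermark interval
		env.getConfig().setAutoWatermarkInterval(1000L);
		env.addSource(consumer).print();
		env.execute("kinesis watermark example");
	}
}

Without the SHARD_IDLE_INTERVAL_MILLIS setting, a shard that stops receiving records would keep the computed watermark from advancing, per the default behavior described in the Javadoc above.
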
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
index 443b19ec382..42e2173474b 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
@@ -140,6 +140,8 @@ public SentinelSequenceNumber toSentinelSequenceNumber() {
 	/** The config to turn on adaptive reads from a shard. */
 	public static final String SHARD_USE_ADAPTIVE_READS = "flink.shard.adaptivereads";
 
+	/** The interval after which to consider a shard idle for purposes of watermark generation. */
+	public static final String SHARD_IDLE_INTERVAL_MILLIS = "flink.shard.idle.interval";
 
 	// ------------------------------------------------------------------------
 	//  Default values for consumer configuration
@@ -190,6 +192,8 @@ public SentinelSequenceNumber toSentinelSequenceNumber() {
 
 	public static final boolean DEFAULT_SHARD_USE_ADAPTIVE_READS = false;
 
+	public static final long DEFAULT_SHARD_IDLE_INTERVAL_MILLIS = -1;
+
 	/**
 	 * To avoid shard iterator expires in {@link ShardConsumer}s, the value for the configured
 	 * getRecords interval can not exceed 5 minutes, which is the expire time for retrieved iterators.
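
The new setting is opt-in: with the -1 default, no shard is ever treated as idle. A short sketch of how the fetcher resolves it from the consumer Properties, mirroring the parsing in runFetcher() in the next file (configProps stands in for the consumer configuration):

// configProps stands in for the Properties handed to the consumer
long shardIdleIntervalMillis = Long.parseLong(
	configProps.getProperty(
		ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS,
		Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_IDLE_INTERVAL_MILLIS)));
// a positive value enables idleness detection; the -1 default means the
// watermark calculation waits for records from every shard
boolean idlenessEnabled = shardIdleIntervalMillis > 0;
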
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
index 0981b76ce89..77ca23c9d37 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
@@ -21,7 +21,10 @@
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.connectors.kinesis.KinesisShardAssigner;
 import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.metrics.KinesisConsumerMetricConstants;
@@ -35,7 +38,10 @@
 import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
 import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
 
 import com.amazonaws.services.kinesis.model.HashKeyRange;
 import com.amazonaws.services.kinesis.model.SequenceNumberRange;
@@ -51,6 +57,7 @@
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
@@ -182,6 +189,28 @@
 
 	private volatile boolean running = true;
 
+	private final AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner;
+
+	/**
+	 * The watermark related state for each shard consumer. Entries in this map will be created when shards
+	 * are discovered. After recovery, this shard map will be recreated, possibly with different shard index keys,
+	 * since those are transient and not part of checkpointed state.
+	 */
+	private ConcurrentHashMap<Integer, ShardWatermarkState> shardWatermarks = new ConcurrentHashMap<>();
+
+	/**
+	 * The most recent watermark, calculated from the per shard watermarks. The initial value will never be emitted;
+	 * this also holds after recovery. The first watermark that will be emitted is derived from actually consumed records.
+	 * In case of recovery and replay, the watermark will rewind, consistent with the shard consumer sequence.
+	 */
+	private long lastWatermark = Long.MIN_VALUE;
+
+	/**
+	 * The time span since the last consumed record, after which a shard will be considered idle for purposes of
+	 * watermark calculation. A positive value will allow the watermark to progress even when some shards don't receive new records.
+	 */
+	private long shardIdleIntervalMillis = ConsumerConfigConstants.DEFAULT_SHARD_IDLE_INTERVAL_MILLIS;
+
 	/**
 	 * Factory to create Kinesis proxy instances used by a fetcher.
 	 */
@@ -203,7 +232,8 @@ public KinesisDataFetcher(List<String> streams,
 							RuntimeContext runtimeContext,
 							Properties configProps,
 							KinesisDeserializationSchema<T> deserializationSchema,
-							KinesisShardAssigner shardAssigner) {
+							KinesisShardAssigner shardAssigner,
+							AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner) {
 		this(streams,
 			sourceContext,
 			sourceContext.getCheckpointLock(),
@@ -211,6 +241,7 @@ public KinesisDataFetcher(List<String> streams,
 			configProps,
 			deserializationSchema,
 			shardAssigner,
+			periodicWatermarkAssigner,
 			new AtomicReference<>(),
 			new ArrayList<>(),
 			createInitialSubscribedStreamsToLastDiscoveredShardsState(streams),
@@ -225,6 +256,7 @@ protected KinesisDataFetcher(List<String> streams,
 								Properties configProps,
 								KinesisDeserializationSchema<T> deserializationSchema,
 								KinesisShardAssigner shardAssigner,
+								AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner,
 								AtomicReference<Throwable> error,
 								List<KinesisStreamShardState> subscribedShardsState,
 								HashMap<String, String> subscribedStreamsToLastDiscoveredShardIds,
@@ -238,6 +270,7 @@ protected KinesisDataFetcher(List<String> streams,
 		this.indexOfThisConsumerSubtask = runtimeContext.getIndexOfThisSubtask();
 		this.deserializationSchema = checkNotNull(deserializationSchema);
 		this.shardAssigner = checkNotNull(shardAssigner);
+		this.periodicWatermarkAssigner = periodicWatermarkAssigner;
 		this.kinesisProxyFactory = checkNotNull(kinesisProxyFactory);
 		this.kinesis = kinesisProxyFactory.create(configProps);
 
@@ -339,6 +372,19 @@ public void runFetcher() throws Exception {
 			}
 		}
 
+		// start periodic watermark emitter, if a watermark assigner was configured
+		if (periodicWatermarkAssigner != null) {
+			long periodicWatermarkIntervalMillis = runtimeContext.getExecutionConfig().getAutoWatermarkInterval();
+			if (periodicWatermarkIntervalMillis > 0) {
+				ProcessingTimeService timerService = ((StreamingRuntimeContext) runtimeContext).getProcessingTimeService();
+				LOG.info("Starting periodic watermark emitter with interval {}", periodicWatermarkIntervalMillis);
+				new PeriodicWatermarkEmitter(timerService, periodicWatermarkIntervalMillis).start();
+			}
+			this.shardIdleIntervalMillis = Long.parseLong(
+				getConsumerConfiguration().getProperty(ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS,
+					Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_IDLE_INTERVAL_MILLIS)));
+		}
+
 		// ------------------------------------------------------------------------
 
 		// finally, start the infinite shard discovery and consumer launching loop;
@@ -546,6 +592,18 @@ protected Properties getConsumerConfiguration() {
 	 * @param lastSequenceNumber the last sequence number value to update
 	 */
 	protected void emitRecordAndUpdateState(T record, long recordTimestamp, int shardStateIndex, SequenceNumber lastSequenceNumber) {
+		// track per shard watermarks and emit timestamps extracted from the record,
+		// when a watermark assigner was configured.
+		if (periodicWatermarkAssigner != null) {
+			ShardWatermarkState sws = shardWatermarks.get(shardStateIndex);
+			Preconditions.checkNotNull(
+				sws, "shard watermark state should have been initialized in registerNewSubscribedShardState");
+			recordTimestamp =
+				sws.periodicWatermarkAssigner.extractTimestamp(record, sws.lastRecordTimestamp);
+			sws.lastRecordTimestamp = recordTimestamp;
+			sws.lastUpdated = getCurrentTimeMillis();
+		}
+
 		synchronized (checkpointLock) {
 			if (record != null) {
 				sourceContext.collectWithTimestamp(record, recordTimestamp);
@@ -609,7 +667,110 @@ public int registerNewSubscribedShardState(KinesisStreamShardState newSubscribed
 				this.numberOfActiveShards.incrementAndGet();
 			}
 
-			return subscribedShardsState.size() - 1;
+			int shardStateIndex = subscribedShardsState.size() - 1;
+
+			// track all discovered shards for watermark determination
+			ShardWatermarkState sws = shardWatermarks.get(shardStateIndex);
+			if (sws == null) {
+				sws = new ShardWatermarkState();
+				try {
+					sws.periodicWatermarkAssigner = InstantiationUtil.clone(periodicWatermarkAssigner);
+				} catch (Exception e) {
+					throw new RuntimeException("Failed to instantiate new WatermarkAssigner", e);
+				}
+				sws.lastUpdated = getCurrentTimeMillis();
+				sws.lastRecordTimestamp = Long.MIN_VALUE;
+				shardWatermarks.put(shardStateIndex, sws);
+			}
+
+			return shardStateIndex;
+		}
+	}
+
+	/**
+	 * Return the current system time. Allow tests to override this to simulate progress for watermark
+	 * logic.
+	 *
+	 * @return current processing time
+	 */
+	@VisibleForTesting
+	protected long getCurrentTimeMillis() {
+		return System.currentTimeMillis();
+	}
+
+	/**
+	 * Called periodically to emit a watermark. Checks all shards for the current event time
+	 * watermark, and possibly emits the next watermark.
+	 *
+	 * <p>Shards that have not received an update for a certain interval are considered inactive so as
+	 * to not hold back the watermark indefinitely. When all shards are inactive, the subtask will be
+	 * marked as temporarily idle to not block downstream operators.
+	 */
+	@VisibleForTesting
+	protected void emitWatermark() {
+		LOG.debug("Evaluating watermark for subtask {} time {}", indexOfThisConsumerSubtask, getCurrentTimeMillis());
+		long potentialWatermark = Long.MAX_VALUE;
+		long idleTime =
+			(shardIdleIntervalMillis > 0)
+				? getCurrentTimeMillis() - shardIdleIntervalMillis
+				: Long.MAX_VALUE;
+
+		for (Map.Entry<Integer, ShardWatermarkState> e : shardWatermarks.entrySet()) {
+			// consider only active shards, or those that would advance the watermark
+			Watermark w = e.getValue().periodicWatermarkAssigner.getCurrentWatermark();
+			if (w != null && (e.getValue().lastUpdated >= idleTime || w.getTimestamp() > lastWatermark)) {
+				potentialWatermark = Math.min(potentialWatermark, w.getTimestamp());
+			}
+		}
+
+		// advance watermark if possible (watermarks can only be ascending)
+		if (potentialWatermark == Long.MAX_VALUE) {
+			if (shardWatermarks.isEmpty() || shardIdleIntervalMillis > 0) {
+				LOG.debug("No active shard for subtask {}, marking the source idle.",
+					indexOfThisConsumerSubtask);
+				// no active shard, signal downstream operators to not wait for a watermark
+				sourceContext.markAsTemporarilyIdle();
+			}
+		} else if (potentialWatermark > lastWatermark) {
+			LOG.debug("Emitting watermark {} from subtask {}",
+				potentialWatermark,
+				indexOfThisConsumerSubtask);
+			sourceContext.emitWatermark(new Watermark(potentialWatermark));
+			lastWatermark = potentialWatermark;
+		}
+	}
+
+	/** Per shard tracking of watermark and last activity. */
+	private static class ShardWatermarkState<T> {
+		private AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner;
+		private volatile long lastRecordTimestamp;
+		private volatile long lastUpdated;
+	}
+
+	/**
+	 * The periodic watermark emitter. In its given interval, it checks all shards for the current
+	 * event time watermark, and possibly emits the next watermark.
+	 */
+	private class PeriodicWatermarkEmitter implements ProcessingTimeCallback {
+
+		private final ProcessingTimeService timerService;
+		private final long interval;
+
+		PeriodicWatermarkEmitter(ProcessingTimeService timerService, long autoWatermarkInterval) {
+			this.timerService = checkNotNull(timerService);
+			this.interval = autoWatermarkInterval;
+		}
+
+		public void start() {
+			LOG.debug("Registering periodic watermark timer with interval {}", interval);
+			timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this);
+		}
+
+		@Override
+		public void onProcessingTime(long timestamp) {
+			emitWatermark();
+			// schedule the next watermark
+			timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this);
 		}
 	}
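
Restated in isolation, emitWatermark() above computes the candidate watermark as the minimum over the per-shard watermarks, considering a shard only while it is active or when its own watermark would advance the last emitted one. A self-contained sketch of that combination rule (ShardState is a simplified stand-in for ShardWatermarkState):

import java.util.Collection;

final class ShardState {
	long watermark;   // current watermark reported by the shard's assigner
	long lastUpdated; // processing time when the shard last consumed a record
}

final class WatermarkCombiner {

	/**
	 * Returns Long.MAX_VALUE when no shard qualifies; the caller then marks the
	 * source temporarily idle (if idleness is enabled) instead of emitting.
	 */
	static long combine(
			Collection<ShardState> shards,
			long nowMillis,
			long idleIntervalMillis,
			long lastWatermark) {
		// processing-time cutoff: shards updated at or after it count as active;
		// with a non-positive interval no shard passes this activity test
		long idleCutoff = (idleIntervalMillis > 0)
			? nowMillis - idleIntervalMillis
			: Long.MAX_VALUE;
		long potential = Long.MAX_VALUE;
		for (ShardState s : shards) {
			// consider only active shards, or those that would advance the watermark
			if (s.lastUpdated >= idleCutoff || s.watermark > lastWatermark) {
				potential = Math.min(potential, s.watermark);
			}
		}
		return potential;
	}
}

The caller emits only when the result is strictly greater than the last watermark, so watermarks stay ascending even as the set of contributing shards changes.
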
 
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
index a99e845f249..9a6d2d66a6f 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
@@ -418,7 +418,7 @@ public TestFetcher(
 				HashMap<StreamShardMetadata, SequenceNumber> testStateSnapshot,
 				List<StreamShardHandle> testInitialDiscoveryShards) {
 
-			super(streams, sourceContext, runtimeContext, configProps, deserializationSchema, DEFAULT_SHARD_ASSIGNER);
+			super(streams, sourceContext, runtimeContext, configProps, deserializationSchema, DEFAULT_SHARD_ASSIGNER, null);
 
 			this.testStateSnapshot = testStateSnapshot;
 			this.testInitialDiscoveryShards = testInitialDiscoveryShards;
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
index c18f4309be4..84e18bdc98c 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
@@ -23,7 +23,6 @@
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.OperatorStateStore;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.runtime.PojoSerializer;
@@ -32,7 +31,13 @@
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
+import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
+import org.apache.flink.streaming.api.operators.StreamSource;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
 import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
 import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
@@ -41,10 +46,16 @@
 import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
 import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
+import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper;
+import org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisBehavioursFactory;
 import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator;
 import org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils;
 import org.apache.flink.streaming.connectors.kinesis.testutils.TestableFlinkKinesisConsumer;
 import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.CollectingSourceContext;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
 
 import com.amazonaws.services.kinesis.model.HashKeyRange;
 import com.amazonaws.services.kinesis.model.SequenceNumberRange;
@@ -59,16 +70,21 @@
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
@@ -243,8 +259,7 @@ public void testListStateChangedAfterSnapshotState() throws Exception {
 	@Test
 	@SuppressWarnings("unchecked")
 	public void testFetcherShouldNotBeRestoringFromFailureIfNotRestoringFromCheckpoint() throws Exception {
-		KinesisDataFetcher mockedFetcher = Mockito.mock(KinesisDataFetcher.class);
-		PowerMockito.whenNew(KinesisDataFetcher.class).withAnyArguments().thenReturn(mockedFetcher);
+		KinesisDataFetcher mockedFetcher = mockKinesisDataFetcher();
 
 		// assume the given config is correct
 		PowerMockito.mockStatic(KinesisConfigUtil.class);
@@ -286,11 +301,10 @@ public void testFetcherShouldBeCorrectlySeededIfRestoringFromCheckpoint() throws
 		// mock fetcher
 		// ----------------------------------------------------------------------
 
-		KinesisDataFetcher mockedFetcher = Mockito.mock(KinesisDataFetcher.class);
+		KinesisDataFetcher mockedFetcher = mockKinesisDataFetcher();
 		List<StreamShardHandle> shards = new ArrayList<>();
 		shards.addAll(fakeRestoredState.keySet());
 		when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(shards);
-		PowerMockito.whenNew(KinesisDataFetcher.class).withAnyArguments().thenReturn(mockedFetcher);
 
 		// assume the given config is correct
 		PowerMockito.mockStatic(KinesisConfigUtil.class);
@@ -348,11 +362,10 @@ public void testFetcherShouldBeCorrectlySeededOnlyItsOwnStates() throws Exceptio
 		// mock fetcher
 		// ----------------------------------------------------------------------
 
-		KinesisDataFetcher mockedFetcher = Mockito.mock(KinesisDataFetcher.class);
+		KinesisDataFetcher mockedFetcher = mockKinesisDataFetcher();
 		List<StreamShardHandle> shards = new ArrayList<>();
 		shards.addAll(fakeRestoredState.keySet());
 		when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(shards);
-		PowerMockito.whenNew(KinesisDataFetcher.class).withAnyArguments().thenReturn(mockedFetcher);
 
 		// assume the given config is correct
 		PowerMockito.mockStatic(KinesisConfigUtil.class);
@@ -441,13 +454,12 @@ public void testFetcherShouldBeCorrectlySeededWithNewDiscoveredKinesisStreamShar
 		// mock fetcher
 		// ----------------------------------------------------------------------
 
-		KinesisDataFetcher mockedFetcher = Mockito.mock(KinesisDataFetcher.class);
+		KinesisDataFetcher mockedFetcher = mockKinesisDataFetcher();
 		List<StreamShardHandle> shards = new ArrayList<>();
 		shards.addAll(fakeRestoredState.keySet());
 		shards.add(new StreamShardHandle("fakeStream2",
 			new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2))));
 		when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(shards);
-		PowerMockito.whenNew(KinesisDataFetcher.class).withAnyArguments().thenReturn(mockedFetcher);
 
 		// assume the given config is correct
 		PowerMockito.mockStatic(KinesisConfigUtil.class);
@@ -553,7 +565,7 @@ public void testFindSequenceNumberToRestoreFromIfTheShardHasBeenClosedSinceTheSt
 		// mock fetcher
 		// ----------------------------------------------------------------------
 
-		KinesisDataFetcher mockedFetcher = Mockito.mock(KinesisDataFetcher.class);
+		KinesisDataFetcher mockedFetcher = mockKinesisDataFetcher();
 		List<StreamShardHandle> shards = new ArrayList<>();
 
 		// create a fake stream shard handle based on the first entry in the restored state
@@ -567,7 +579,6 @@ public void testFindSequenceNumberToRestoreFromIfTheShardHasBeenClosedSinceTheSt
 		shards.add(closedStreamShardHandle);
 
 		when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(shards);
-		PowerMockito.whenNew(KinesisDataFetcher.class).withAnyArguments().thenReturn(mockedFetcher);
 
 		// assume the given config is correct
 		PowerMockito.mockStatic(KinesisConfigUtil.class);
@@ -664,34 +675,166 @@ public void addAll(List<T> values) throws Exception {
 		return fakeRestoredState;
 	}
 
-	/**
-	 * A non-serializable {@link KinesisDeserializationSchema} (because it is a nested class with reference
-	 * to the enclosing class, which is not serializable) used for testing.
-	 */
-	private final class NonSerializableDeserializationSchema implements KinesisDeserializationSchema<String> {
-		@Override
-		public String deserialize(byte[] recordValue, String partitionKey, String seqNum, long approxArrivalTimestamp, String stream, String shardId) throws IOException {
-			return new String(recordValue);
-		}
+	private static KinesisDataFetcher mockKinesisDataFetcher() throws Exception {
+		KinesisDataFetcher mockedFetcher = Mockito.mock(KinesisDataFetcher.class);
 
-		@Override
-		public TypeInformation<String> getProducedType() {
-			return BasicTypeInfo.STRING_TYPE_INFO;
+		java.lang.reflect.Constructor<KinesisDataFetcher> ctor = (java.lang.reflect.Constructor<KinesisDataFetcher>) KinesisDataFetcher.class.getConstructors()[0];
+		Class<?>[] otherParamTypes = new Class<?>[ctor.getParameterTypes().length - 1];
+		System.arraycopy(ctor.getParameterTypes(), 1, otherParamTypes, 0, ctor.getParameterTypes().length - 1);
+
+		Supplier<Object[]> argumentSupplier = () -> {
+			Object[] otherParamArgs = new Object[otherParamTypes.length];
+			for (int i = 0; i < otherParamTypes.length; i++) {
+				otherParamArgs[i] = Mockito.nullable(otherParamTypes[i]);
+			}
+			return otherParamArgs;
+		};
+		PowerMockito.whenNew(ctor).withArguments(Mockito.any(ctor.getParameterTypes()[0]),
+			argumentSupplier.get()).thenReturn(mockedFetcher);
+		return mockedFetcher;
+	}
+
+	@Test
+	public void testPeriodicWatermark() throws Exception {
+
+		String streamName = "fakeStreamName";
+		Time maxOutOfOrderness = Time.milliseconds(5);
+		long autoWatermarkInterval = 1_000;
+
+		HashMap<String, String> subscribedStreamsToLastDiscoveredShardIds = new HashMap<>();
+		subscribedStreamsToLastDiscoveredShardIds.put(streamName, null);
+
+		KinesisDeserializationSchema<String> deserializationSchema = new KinesisDeserializationSchemaWrapper<>(
+			new SimpleStringSchema());
+		Properties props = new Properties();
+		props.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+		props.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, Long.toString(10L));
+
+		BlockingQueue<String> shard1 = new LinkedBlockingQueue();
+		BlockingQueue<String> shard2 = new LinkedBlockingQueue();
+
+		Map<String, List<BlockingQueue<String>>> streamToQueueMap = new HashMap<>();
+		streamToQueueMap.put(streamName, Lists.newArrayList(shard1, shard2));
+
+		// override createFetcher to mock Kinesis
+		FlinkKinesisConsumer<String> sourceFunc =
+			new FlinkKinesisConsumer<String>(streamName, deserializationSchema, props) {
+				@Override
+				protected KinesisDataFetcher<String> createFetcher(
+					List<String> streams,
+					SourceContext<String> sourceContext,
+					RuntimeContext runtimeContext,
+					Properties configProps,
+					KinesisDeserializationSchema<String> deserializationSchema) {
+
+					KinesisDataFetcher<String> fetcher =
+						new KinesisDataFetcher<String>(
+							streams,
+							sourceContext,
+							sourceContext.getCheckpointLock(),
+							runtimeContext,
+							configProps,
+							deserializationSchema,
+							getShardAssigner(),
+							getPeriodicWatermarkAssigner(),
+							new AtomicReference<>(),
+							new ArrayList<>(),
+							subscribedStreamsToLastDiscoveredShardIds,
+							(props) -> FakeKinesisBehavioursFactory.blockingQueueGetRecords(streamToQueueMap)
+							) {};
+					return fetcher;
+				}
+			};
+
+		sourceFunc.setShardAssigner(
+			(streamShardHandle, i) -> {
+				// shardId-000000000000
+				return Integer.parseInt(
+					streamShardHandle.getShard().getShardId().substring("shardId-".length()));
+			});
+
+		sourceFunc.setPeriodicWatermarkAssigner(new TestTimestampExtractor(maxOutOfOrderness));
+
+		// there is currently no test harness specifically for sources,
+		// so we overlay the source thread here
+		AbstractStreamOperatorTestHarness<Object> testHarness =
+			new AbstractStreamOperatorTestHarness<Object>(
+				new StreamSource(sourceFunc), 1, 1, 0);
+		testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime);
+		testHarness.getExecutionConfig().setAutoWatermarkInterval(autoWatermarkInterval);
+
+		testHarness.initializeState(null);
+		testHarness.open();
+
+		ConcurrentLinkedQueue<Watermark> watermarks = new ConcurrentLinkedQueue<>();
+
+		@SuppressWarnings("unchecked")
+		SourceFunction.SourceContext<String> sourceContext = new CollectingSourceContext(
+			testHarness.getCheckpointLock(), testHarness.getOutput()) {
+			@Override
+			public void emitWatermark(Watermark mark) {
+				watermarks.add(mark);
+			}
+		};
+
+		new Thread(
+			() -> {
+				try {
+					sourceFunc.run(sourceContext);
+				} catch (InterruptedException e) {
+					// expected on cancel
+				} catch (Exception e) {
+					throw new RuntimeException(e);
+				}
+			})
+			.start();
+
+		shard1.put("1");
+		shard1.put("2");
+		shard2.put("10");
+		int recordCount = 3;
+		int watermarkCount = 0;
+		awaitRecordCount(testHarness.getOutput(), recordCount);
+
+		// trigger watermark emit
+		testHarness.setProcessingTime(testHarness.getProcessingTime() + autoWatermarkInterval);
+		watermarkCount++;
+
+		// advance watermark
+		shard1.put("10");
+		recordCount++;
+		awaitRecordCount(testHarness.getOutput(), recordCount);
+
+		// trigger watermark emit
+		testHarness.setProcessingTime(testHarness.getProcessingTime() + autoWatermarkInterval);
+		watermarkCount++;
+
+		sourceFunc.cancel();
+		testHarness.close();
+
+		assertEquals("record count", recordCount, testHarness.getOutput().size());
+		assertEquals("watermark count", watermarkCount, watermarks.size());
+		assertThat(watermarks, org.hamcrest.Matchers.contains(new Watermark(-3), new Watermark(5)));
+	}
+
+	private void awaitRecordCount(ConcurrentLinkedQueue<? extends Object> queue, int count) throws Exception {
+		long timeoutMillis = System.currentTimeMillis() + 10_000;
+		while (System.currentTimeMillis() < timeoutMillis && queue.size() < count) {
+			Thread.sleep(10);
 		}
 	}
 
-	/**
-	 * A static, serializable {@link KinesisDeserializationSchema}.
-	 */
-	private static final class SerializableDeserializationSchema implements KinesisDeserializationSchema<String> {
-		@Override
-		public String deserialize(byte[] recordValue, String partitionKey, String seqNum, long approxArrivalTimestamp, String stream, String shardId) throws IOException {
-			return new String(recordValue);
+	private static class TestTimestampExtractor extends BoundedOutOfOrdernessTimestampExtractor<String> {
+		private static final long serialVersionUID = 1L;
+
+		public TestTimestampExtractor(Time maxAllowedLateness) {
+			super(maxAllowedLateness);
 		}
 
 		@Override
-		public TypeInformation<String> getProducedType() {
-			return BasicTypeInfo.STRING_TYPE_INFO;
+		public long extractTimestamp(String element) {
+			return Long.parseLong(element);
 		}
 	}
+
 }
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
index ccf39d0e19b..e3e7287f24a 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
@@ -21,6 +21,9 @@
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.core.testutils.CheckedThread;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
 import org.apache.flink.streaming.connectors.kinesis.KinesisShardAssigner;
 import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
@@ -40,9 +43,13 @@
 import com.amazonaws.services.kinesis.model.HashKeyRange;
 import com.amazonaws.services.kinesis.model.SequenceNumberRange;
 import com.amazonaws.services.kinesis.model.Shard;
+import org.apache.commons.lang3.mutable.MutableBoolean;
+import org.apache.commons.lang3.mutable.MutableLong;
+import org.junit.Assert;
 import org.junit.Test;
 import org.powermock.reflect.Whitebox;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -710,4 +717,98 @@ public void testIsThisSubtaskShouldSubscribeTo() {
 		assertFalse(KinesisDataFetcher.isThisSubtaskShouldSubscribeTo(2, 2, 1));
 	}
 
+	private static BoundedOutOfOrdernessTimestampExtractor<String> watermarkAssigner =
+		new BoundedOutOfOrdernessTimestampExtractor<String>(Time.milliseconds(0)) {
+			@Override
+			public long extractTimestamp(String element) {
+				return Long.parseLong(element);
+			}
+		};
+
+	@Test
+	public void testPeriodicWatermark() {
+		final MutableLong clock = new MutableLong();
+		final MutableBoolean isTemporaryIdle = new MutableBoolean();
+		final List<Watermark> watermarks = new ArrayList<>();
+
+		String fakeStream1 = "fakeStream1";
+		StreamShardHandle shardHandle =
+			new StreamShardHandle(
+				fakeStream1,
+				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0)));
+
+		TestSourceContext<String> sourceContext =
+			new TestSourceContext<String>() {
+				@Override
+				public void emitWatermark(Watermark mark) {
+					watermarks.add(mark);
+				}
+
+				@Override
+				public void markAsTemporarilyIdle() {
+					isTemporaryIdle.setTrue();
+				}
+			};
+
+		HashMap<String, String> subscribedStreamsToLastSeenShardIdsUnderTest = new HashMap<>();
+
+		final KinesisDataFetcher<String> fetcher =
+			new TestableKinesisDataFetcher<String>(
+				Collections.singletonList(fakeStream1),
+				sourceContext,
+				new java.util.Properties(),
+				new KinesisDeserializationSchemaWrapper<>(new org.apache.flink.streaming.util.serialization.SimpleStringSchema()),
+				1,
+				1,
+				new AtomicReference<>(),
+				new LinkedList<>(),
+				subscribedStreamsToLastSeenShardIdsUnderTest,
+				FakeKinesisBehavioursFactory.nonReshardedStreamsBehaviour(new HashMap<>())) {
+
+				@Override
+				protected long getCurrentTimeMillis() {
+					return clock.getValue();
+				}
+			};
+		Whitebox.setInternalState(fetcher, "periodicWatermarkAssigner", watermarkAssigner);
+
+		SequenceNumber seq = new SequenceNumber("fakeSequenceNumber");
+		// register shards to subsequently emit records
+		int shardIndex =
+			fetcher.registerNewSubscribedShardState(
+				new KinesisStreamShardState(
+					KinesisDataFetcher.convertToStreamShardMetadata(shardHandle), shardHandle, seq));
+
+		StreamRecord<String> record1 =
+			new StreamRecord<>(String.valueOf(Long.MIN_VALUE), Long.MIN_VALUE);
+		fetcher.emitRecordAndUpdateState(record1.getValue(), record1.getTimestamp(), shardIndex, seq);
+		Assert.assertEquals(record1, sourceContext.getCollectedOutputs().poll());
+
+		fetcher.emitWatermark();
+		Assert.assertTrue("potential watermark equals previous watermark", watermarks.isEmpty());
+
+		StreamRecord<String> record2 = new StreamRecord<>(String.valueOf(1), 1);
+		fetcher.emitRecordAndUpdateState(record2.getValue(), record2.getTimestamp(), shardIndex, seq);
+		Assert.assertEquals(record2, sourceContext.getCollectedOutputs().poll());
+
+		fetcher.emitWatermark();
+		Assert.assertFalse("watermark advanced", watermarks.isEmpty());
+		Assert.assertEquals(new Watermark(record2.getTimestamp()), watermarks.remove(0));
+		Assert.assertFalse("not idle", isTemporaryIdle.booleanValue());
+
+		// test idle timeout
+		long idleTimeout = 10;
+		// advance clock idleTimeout
+		clock.add(idleTimeout + 1);
+		fetcher.emitWatermark();
+		Assert.assertFalse("not idle", isTemporaryIdle.booleanValue());
+		Assert.assertTrue("not idle, no new watermark", watermarks.isEmpty());
+
+		// activate idle timeout
+		Whitebox.setInternalState(fetcher, "shardIdleIntervalMillis", idleTimeout);
+		fetcher.emitWatermark();
+		Assert.assertTrue("idle", isTemporaryIdle.booleanValue());
+		Assert.assertTrue("idle, no watermark", watermarks.isEmpty());
+	}
+
 }
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
index eb3415572c0..c8e1f0919dc 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
@@ -22,20 +22,25 @@
 import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
 import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult;
 import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
+import org.apache.flink.util.Preconditions;
 
 import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
 import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import com.amazonaws.services.kinesis.model.HashKeyRange;
 import com.amazonaws.services.kinesis.model.Record;
+import com.amazonaws.services.kinesis.model.SequenceNumberRange;
 import com.amazonaws.services.kinesis.model.Shard;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 
@@ -102,6 +107,10 @@ public static KinesisProxyInterface initialNumOfRecordsAfterNumOfGetRecordsCalls
 				millisBehindLatest);
 	}
 
+	public static KinesisProxyInterface blockingQueueGetRecords(Map<String, List<BlockingQueue<String>>> streamsToShardQueues) {
+		return new BlockingQueueKinesis(streamsToShardQueues);
+	}
+
 	private static class SingleShardEmittingFixNumOfRecordsWithExpiredIteratorKinesis extends SingleShardEmittingFixNumOfRecordsKinesis {
 
 		private final long millisBehindLatest;
@@ -387,4 +396,85 @@ public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) {
 			return null;
 		}
 	}
+
+	private static class BlockingQueueKinesis implements KinesisProxyInterface {
+
+		private Map<String, List<StreamShardHandle>> streamsWithListOfShards = new HashMap<>();
+		private Map<String, BlockingQueue<String>> shardIteratorToQueueMap = new HashMap<>();
+
+		private static String getShardIterator(StreamShardHandle shardHandle) {
+			return shardHandle.getStreamName() + "-" + shardHandle.getShard().getShardId();
+		}
+
+		public BlockingQueueKinesis(Map<String, List<BlockingQueue<String>>> streamsToShardCount) {
+			for (Map.Entry<String, List<BlockingQueue<String>>> streamToShardQueues : streamsToShardCount.entrySet()) {
+				String streamName = streamToShardQueues.getKey();
+				int shardCount = streamToShardQueues.getValue().size();
+
+				if (shardCount == 0) {
+					// don't do anything
+				} else {
+					List<StreamShardHandle> shardsOfStream = new ArrayList<>(shardCount);
+					for (int i = 0; i < shardCount; i++) {
+						StreamShardHandle shardHandle = new StreamShardHandle(
+							streamName,
+							new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(i))
+								.withSequenceNumberRange(new SequenceNumberRange().withStartingSequenceNumber("0"))
+								.withHashKeyRange(new HashKeyRange().withStartingHashKey("0").withEndingHashKey("0")));
+						shardsOfStream.add(shardHandle);
+						shardIteratorToQueueMap.put(getShardIterator(shardHandle), streamToShardQueues.getValue().get(i));
+					}
+					streamsWithListOfShards.put(streamName, shardsOfStream);
+				}
+			}
+		}
+
+		@Override
+		public GetShardListResult getShardList(Map<String, String> streamNamesWithLastSeenShardIds) {
+			GetShardListResult result = new GetShardListResult();
+			for (Map.Entry<String, List<StreamShardHandle>> streamsWithShards : streamsWithListOfShards.entrySet()) {
+				String streamName = streamsWithShards.getKey();
+				for (StreamShardHandle shard : streamsWithShards.getValue()) {
+					if (streamNamesWithLastSeenShardIds.get(streamName) == null) {
+						result.addRetrievedShardToStream(streamName, shard);
+					} else {
+						if (StreamShardHandle.compareShardIds(
+							shard.getShard().getShardId(), streamNamesWithLastSeenShardIds.get(streamName)) > 0) {
+							result.addRetrievedShardToStream(streamName, shard);
+						}
+					}
+				}
+			}
+			return result;
+		}
+
+		@Override
+		public String getShardIterator(StreamShardHandle shard, String shardIteratorType, Object startingMarker) {
+			return getShardIterator(shard);
+		}
+
+		@Override
+		public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) {
+			BlockingQueue<String> queue = Preconditions.checkNotNull(this.shardIteratorToQueueMap.get(shardIterator),
+				"no queue for iterator %s", shardIterator);
+			List<Record> records = Collections.emptyList();
+			try {
+				String data = queue.take();
+				Record record = new Record()
+					.withData(
+						ByteBuffer.wrap(String.valueOf(data).getBytes(ConfigConstants.DEFAULT_CHARSET)))
+					.withPartitionKey(UUID.randomUUID().toString())
+					.withApproximateArrivalTimestamp(new Date(System.currentTimeMillis()))
+					.withSequenceNumber(String.valueOf(0));
+				records = Collections.singletonList(record);
+			} catch (InterruptedException e) {
+				shardIterator = null;
+			}
+			return new GetRecordsResult()
+				.withRecords(records)
+				.withMillisBehindLatest(0L)
+				.withNextShardIterator(shardIterator);
+		}
+	}
+
 }
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java
index 21588c9a7a7..a44028766e1 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java
@@ -69,6 +69,7 @@ public TestableKinesisDataFetcher(
 			fakeConfiguration,
 			deserializationSchema,
 			DEFAULT_SHARD_ASSIGNER,
+			null,
 			thrownErrorUnderTest,
 			subscribedShardsStateUnderTest,
 			subscribedStreamsToLastDiscoveredShardIdsStateUnderTest,
diff --git a/pom.xml b/pom.xml
index 2a5dfced140..e6446094d07 100644
--- a/pom.xml
+++ b/pom.xml
@@ -124,7 +124,7 @@ under the License.
 		<avro.version>1.8.2</avro.version>
 		<junit.version>4.12</junit.version>
 		<mockito.version>2.21.0</mockito.version>
-		<powermock.version>2.0.0-beta.5</powermock.version>
+		<powermock.version>2.0.0-RC.4</powermock.version>
 		<hamcrest.version>1.3</hamcrest.version>
 		<japicmp.skip>false</japicmp.skip>
 		<codebase>new</codebase>

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services