You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by th...@apache.org on 2019/05/29 13:21:40 UTC

[flink] branch master updated: [FLINK-10921] [kinesis] Shard watermark synchronization in Kinesis consumer

This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 793a784  [FLINK-10921] [kinesis] Shard watermark synchronization in Kinesis consumer
793a784 is described below

commit 793a78407aa22530448efbf18b714952eac40aba
Author: Thomas Weise <th...@apache.org>
AuthorDate: Wed May 22 21:42:15 2019 -0700

    [FLINK-10921] [kinesis] Shard watermark synchronization in Kinesis consumer
---
 .../connectors/kinesis/FlinkKinesisConsumer.java   |  18 +-
 .../kinesis/config/ConsumerConfigConstants.java    |  11 +
 .../internals/DynamoDBStreamsDataFetcher.java      |   1 +
 .../kinesis/internals/KinesisDataFetcher.java      | 292 +++++++++++++++++++--
 .../kinesis/util/JobManagerWatermarkTracker.java   | 179 +++++++++++++
 .../connectors/kinesis/util/RecordEmitter.java     | 269 +++++++++++++++++++
 .../connectors/kinesis/util/WatermarkTracker.java  | 114 ++++++++
 .../kinesis/FlinkKinesisConsumerMigrationTest.java |   2 +-
 .../kinesis/FlinkKinesisConsumerTest.java          | 185 +++++++++++++
 .../kinesis/internals/ShardConsumerTest.java       |   9 +-
 .../testutils/TestableKinesisDataFetcher.java      |   1 +
 .../util/JobManagerWatermarkTrackerTest.java       | 101 +++++++
 .../connectors/kinesis/util/RecordEmitterTest.java |  81 ++++++
 .../kinesis/util/WatermarkTrackerTest.java         | 108 ++++++++
 14 files changed, 1342 insertions(+), 29 deletions(-)

diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
index 3c5e3c7..5b24ded 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
@@ -45,6 +45,7 @@ import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper;
 import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.apache.flink.streaming.connectors.kinesis.util.WatermarkTracker;
 import org.apache.flink.util.InstantiationUtil;
 
 import org.slf4j.Logger;
@@ -126,6 +127,7 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> imple
 	private KinesisShardAssigner shardAssigner = KinesisDataFetcher.DEFAULT_SHARD_ASSIGNER;
 
 	private AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner;
+	private WatermarkTracker watermarkTracker;
 
 	// ------------------------------------------------------------------------
 	//  Runtime state
@@ -254,6 +256,20 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> imple
 		ClosureCleaner.clean(this.periodicWatermarkAssigner, true);
 	}
 
+	public WatermarkTracker getWatermarkTracker() {
+		return this.watermarkTracker;
+	}
+
+	/**
+	 * Set the global watermark tracker. When set, it will be used by the fetcher
+	 * to align the shard consumers by event time.
+	 * @param watermarkTracker
+	 */
+	public void setWatermarkTracker(WatermarkTracker watermarkTracker) {
+		this.watermarkTracker = watermarkTracker;
+		ClosureCleaner.clean(this.watermarkTracker, true);
+	}
+
 	// ------------------------------------------------------------------------
 	//  Source life cycle
 	// ------------------------------------------------------------------------
@@ -448,7 +464,7 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> imple
 			Properties configProps,
 			KinesisDeserializationSchema<T> deserializationSchema) {
 
-		return new KinesisDataFetcher<>(streams, sourceContext, runtimeContext, configProps, deserializationSchema, shardAssigner, periodicWatermarkAssigner);
+		return new KinesisDataFetcher<>(streams, sourceContext, runtimeContext, configProps, deserializationSchema, shardAssigner, periodicWatermarkAssigner, watermarkTracker);
 	}
 
 	@VisibleForTesting
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
index 41ac6b8..2f5be97 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
@@ -125,6 +125,15 @@ public class ConsumerConfigConstants extends AWSConfigConstants {
 	/** The interval after which to consider a shard idle for purposes of watermark generation. */
 	public static final String SHARD_IDLE_INTERVAL_MILLIS = "flink.shard.idle.interval";
 
+	/** The interval for periodically synchronizing the shared watermark state. */
+	public static final String WATERMARK_SYNC_MILLIS = "flink.watermark.sync.interval";
+
+	/** The maximum delta allowed for the reader to advance ahead of the shared global watermark. */
+	public static final String WATERMARK_LOOKAHEAD_MILLIS = "flink.watermark.lookahead.millis";
+
+	/** The maximum number of records that will be buffered before suspending consumption of a shard. */
+	public static final String WATERMARK_SYNC_QUEUE_CAPACITY = "flink.watermark.sync.queue.capacity";
+
 	// ------------------------------------------------------------------------
 	//  Default values for consumer configuration
 	// ------------------------------------------------------------------------
@@ -173,6 +182,8 @@ public class ConsumerConfigConstants extends AWSConfigConstants {
 
 	public static final long DEFAULT_SHARD_IDLE_INTERVAL_MILLIS = -1;
 
+	public static final long DEFAULT_WATERMARK_SYNC_MILLIS = 30_000;
+
 	/**
 	 * To avoid shard iterator expires in {@link ShardConsumer}s, the value for the configured
 	 * getRecords interval can not exceed 5 minutes, which is the expire time for retrieved iterators.
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/DynamoDBStreamsDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/DynamoDBStreamsDataFetcher.java
index c2b7be3..5620142 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/DynamoDBStreamsDataFetcher.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/DynamoDBStreamsDataFetcher.java
@@ -64,6 +64,7 @@ public class DynamoDBStreamsDataFetcher<T> extends KinesisDataFetcher<T> {
 			deserializationSchema,
 			shardAssigner,
 			null,
+			null,
 			new AtomicReference<>(),
 			new ArrayList<>(),
 			createInitialSubscribedStreamsToLastDiscoveredShardsState(streams),
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
index 8c8d94a..eae3153 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
@@ -38,6 +38,9 @@ import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult;
 import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
 import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
+import org.apache.flink.streaming.connectors.kinesis.util.RecordEmitter;
+import org.apache.flink.streaming.connectors.kinesis.util.WatermarkTracker;
+import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.util.InstantiationUtil;
@@ -191,6 +194,9 @@ public class KinesisDataFetcher<T> {
 	private volatile boolean running = true;
 
 	private final AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner;
+	private final WatermarkTracker watermarkTracker;
+	private final transient RecordEmitter recordEmitter;
+	private transient boolean isIdle;
 
 	/**
 	 * The watermark related state for each shard consumer. Entries in this map will be created when shards
@@ -207,6 +213,14 @@ public class KinesisDataFetcher<T> {
 	private long lastWatermark = Long.MIN_VALUE;
 
 	/**
+	 * The next watermark used for synchronization.
+	 * For purposes of global watermark calculation, we need to consider the next watermark based
+	 * on the buffered records vs. the last emitted watermark to allow for progress.
+	 */
+	private long nextWatermark = Long.MIN_VALUE;
+
+
+	/**
 	 * The time span since last consumed record, after which a shard will be considered idle for purpose of watermark
 	 * calculation. A positive value will allow the watermark to progress even when some shards don't receive new records.
 	 */
@@ -220,6 +234,82 @@ public class KinesisDataFetcher<T> {
 	}
 
 	/**
+	 * The wrapper that holds the watermark handling related parameters
+	 * of a record produced by the shard consumer thread.
+	 *
+	 * @param <T>
+	 */
+	private static class RecordWrapper<T> extends TimestampedValue<T> {
+		int shardStateIndex;
+		SequenceNumber lastSequenceNumber;
+		long timestamp;
+		Watermark watermark;
+
+		private RecordWrapper(T record, long timestamp) {
+			super(record, timestamp);
+			this.timestamp = timestamp;
+		}
+
+		@Override
+		public long getTimestamp() {
+			return timestamp;
+		}
+	}
+
+	/** Kinesis data fetcher specific, asynchronous record emitter. */
+	private class AsyncKinesisRecordEmitter extends RecordEmitter<RecordWrapper<T>> {
+
+		private AsyncKinesisRecordEmitter() {
+			this(DEFAULT_QUEUE_CAPACITY);
+		}
+
+		private AsyncKinesisRecordEmitter(int queueCapacity) {
+			super(queueCapacity);
+		}
+
+		@Override
+		public void emit(RecordWrapper<T> record, RecordQueue<RecordWrapper<T>> queue) {
+			emitRecordAndUpdateState(record);
+			ShardWatermarkState<T> sws = shardWatermarks.get(queue.getQueueId());
+			sws.lastEmittedRecordWatermark = record.watermark;
+		}
+	}
+
+	/** Synchronous emitter for use w/o watermark synchronization. */
+	private class SyncKinesisRecordEmitter extends AsyncKinesisRecordEmitter {
+		private final ConcurrentHashMap<Integer, RecordQueue<RecordWrapper<T>>> queues =
+			new ConcurrentHashMap<>();
+
+		@Override
+		public RecordQueue<RecordWrapper<T>> getQueue(int producerIndex) {
+			return queues.computeIfAbsent(producerIndex, (key) -> {
+				return new RecordQueue<RecordWrapper<T>>() {
+					@Override
+					public void put(RecordWrapper<T> record) {
+						emit(record, this);
+					}
+
+					@Override
+					public int getQueueId() {
+						return producerIndex;
+					}
+
+					@Override
+					public int getSize() {
+						return 0;
+					}
+
+					@Override
+					public RecordWrapper<T> peek() {
+						return null;
+					}
+
+				};
+			});
+		}
+	}
+
+	/**
 	 * Creates a Kinesis Data Fetcher.
 	 *
 	 * @param streams the streams to subscribe to
@@ -234,7 +324,8 @@ public class KinesisDataFetcher<T> {
 							Properties configProps,
 							KinesisDeserializationSchema<T> deserializationSchema,
 							KinesisShardAssigner shardAssigner,
-							AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner) {
+							AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner,
+							WatermarkTracker watermarkTracker) {
 		this(streams,
 			sourceContext,
 			sourceContext.getCheckpointLock(),
@@ -243,6 +334,7 @@ public class KinesisDataFetcher<T> {
 			deserializationSchema,
 			shardAssigner,
 			periodicWatermarkAssigner,
+			watermarkTracker,
 			new AtomicReference<>(),
 			new ArrayList<>(),
 			createInitialSubscribedStreamsToLastDiscoveredShardsState(streams),
@@ -258,6 +350,7 @@ public class KinesisDataFetcher<T> {
 								KinesisDeserializationSchema<T> deserializationSchema,
 								KinesisShardAssigner shardAssigner,
 								AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner,
+								WatermarkTracker watermarkTracker,
 								AtomicReference<Throwable> error,
 								List<KinesisStreamShardState> subscribedShardsState,
 								HashMap<String, String> subscribedStreamsToLastDiscoveredShardIds,
@@ -272,6 +365,7 @@ public class KinesisDataFetcher<T> {
 		this.deserializationSchema = checkNotNull(deserializationSchema);
 		this.shardAssigner = checkNotNull(shardAssigner);
 		this.periodicWatermarkAssigner = periodicWatermarkAssigner;
+		this.watermarkTracker = watermarkTracker;
 		this.kinesisProxyFactory = checkNotNull(kinesisProxyFactory);
 		this.kinesis = kinesisProxyFactory.create(configProps);
 
@@ -284,6 +378,17 @@ public class KinesisDataFetcher<T> {
 
 		this.shardConsumersExecutor =
 			createShardConsumersThreadPool(runtimeContext.getTaskNameWithSubtasks());
+		this.recordEmitter = createRecordEmitter(configProps);
+	}
+
+	private RecordEmitter createRecordEmitter(Properties configProps) {
+		if (periodicWatermarkAssigner != null && watermarkTracker != null) {
+			int queueCapacity = Integer.parseInt(configProps.getProperty(
+				ConsumerConfigConstants.WATERMARK_SYNC_QUEUE_CAPACITY,
+				Integer.toString(AsyncKinesisRecordEmitter.DEFAULT_QUEUE_CAPACITY)));
+			return new AsyncKinesisRecordEmitter(queueCapacity);
+		}
+		return new SyncKinesisRecordEmitter();
 	}
 
 	/**
@@ -380,16 +485,37 @@ public class KinesisDataFetcher<T> {
 				ProcessingTimeService timerService = ((StreamingRuntimeContext) runtimeContext).getProcessingTimeService();
 				LOG.info("Starting periodic watermark emitter with interval {}", periodicWatermarkIntervalMillis);
 				new PeriodicWatermarkEmitter(timerService, periodicWatermarkIntervalMillis).start();
+				if (watermarkTracker != null) {
+					// setup global watermark tracking
+					long watermarkSyncMillis = Long.parseLong(
+						getConsumerConfiguration().getProperty(ConsumerConfigConstants.WATERMARK_SYNC_MILLIS,
+							Long.toString(ConsumerConfigConstants.DEFAULT_WATERMARK_SYNC_MILLIS)));
+					watermarkTracker.setUpdateTimeoutMillis(watermarkSyncMillis * 3); // synchronization latency
+					watermarkTracker.open(runtimeContext);
+					new WatermarkSyncCallback(timerService, watermarkSyncMillis).start();
+					// emit records ahead of watermark to offset synchronization latency
+					long lookaheadMillis = Long.parseLong(
+						getConsumerConfiguration().getProperty(ConsumerConfigConstants.WATERMARK_LOOKAHEAD_MILLIS,
+							Long.toString(0)));
+					recordEmitter.setMaxLookaheadMillis(Math.max(lookaheadMillis, watermarkSyncMillis * 3));
+				}
 			}
 			this.shardIdleIntervalMillis = Long.parseLong(
 				getConsumerConfiguration().getProperty(ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS,
 					Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_IDLE_INTERVAL_MILLIS)));
+
+			// run record emitter in separate thread since main thread is used for discovery
+			Thread thread = new Thread(this.recordEmitter);
+			thread.setName("recordEmitter-" + runtimeContext.getTaskNameWithSubtasks());
+			thread.setDaemon(true);
+			thread.start();
 		}
 
 		// ------------------------------------------------------------------------
 
 		// finally, start the infinite shard discovery and consumer launching loop;
 		// we will escape from this loop only when shutdownFetcher() or stopWithError() is called
+		// TODO: have this thread emit the records for tracking backpressure
 
 		final long discoveryIntervalMillis = Long.valueOf(
 			configProps.getProperty(
@@ -490,6 +616,11 @@ public class KinesisDataFetcher<T> {
 			mainThread.interrupt(); // the main thread may be sleeping for the discovery interval
 		}
 
+		if (watermarkTracker != null) {
+			watermarkTracker.close();
+		}
+		this.recordEmitter.stop();
+
 		if (LOG.isInfoEnabled()) {
 			LOG.info("Shutting down the shard consumer threads of subtask {} ...", indexOfThisConsumerSubtask);
 		}
@@ -603,28 +734,48 @@ public class KinesisDataFetcher<T> {
 	 * @param lastSequenceNumber the last sequence number value to update
 	 */
 	protected void emitRecordAndUpdateState(T record, long recordTimestamp, int shardStateIndex, SequenceNumber lastSequenceNumber) {
-		// track per shard watermarks and emit timestamps extracted from the record,
-		// when a watermark assigner was configured.
-		if (periodicWatermarkAssigner != null) {
-			ShardWatermarkState sws = shardWatermarks.get(shardStateIndex);
-			Preconditions.checkNotNull(
-				sws, "shard watermark state initialized in registerNewSubscribedShardState");
+		ShardWatermarkState sws = shardWatermarks.get(shardStateIndex);
+		Preconditions.checkNotNull(
+			sws, "shard watermark state initialized in registerNewSubscribedShardState");
+		Watermark watermark = null;
+		if (sws.periodicWatermarkAssigner != null) {
 			recordTimestamp =
 				sws.periodicWatermarkAssigner.extractTimestamp(record, sws.lastRecordTimestamp);
-			sws.lastRecordTimestamp = recordTimestamp;
-			sws.lastUpdated = getCurrentTimeMillis();
+			// track watermark per record since extractTimestamp has side effect
+			watermark = sws.periodicWatermarkAssigner.getCurrentWatermark();
 		}
+		sws.lastRecordTimestamp = recordTimestamp;
+		sws.lastUpdated = getCurrentTimeMillis();
 
+		RecordWrapper<T> recordWrapper = new RecordWrapper<>(record, recordTimestamp);
+		recordWrapper.shardStateIndex = shardStateIndex;
+		recordWrapper.lastSequenceNumber = lastSequenceNumber;
+		recordWrapper.watermark = watermark;
+		try {
+			sws.emitQueue.put(recordWrapper);
+		} catch (InterruptedException e) {
+			throw new RuntimeException(e);
+		}
+	}
+
+	/**
+	 * Actual record emission called from the record emitter.
+	 *
+	 * <p>Responsible for tracking per shard watermarks and emit timestamps extracted from
+	 * the record, when a watermark assigner was configured.
+	 *
+	 * @param rw
+	 */
+	private void emitRecordAndUpdateState(RecordWrapper<T> rw) {
 		synchronized (checkpointLock) {
-			if (record != null) {
-				sourceContext.collectWithTimestamp(record, recordTimestamp);
+			if (rw.getValue() != null) {
+				sourceContext.collectWithTimestamp(rw.getValue(), rw.timestamp);
 			} else {
 				LOG.warn("Skipping non-deserializable record at sequence number {} of shard {}.",
-					lastSequenceNumber,
-					subscribedShardsState.get(shardStateIndex).getStreamShardHandle());
+					rw.lastSequenceNumber,
+					subscribedShardsState.get(rw.shardStateIndex).getStreamShardHandle());
 			}
-
-			updateState(shardStateIndex, lastSequenceNumber);
+			updateState(rw.shardStateIndex, rw.lastSequenceNumber);
 		}
 	}
 
@@ -689,6 +840,7 @@ public class KinesisDataFetcher<T> {
 				} catch (Exception e) {
 					throw new RuntimeException("Failed to instantiate new WatermarkAssigner", e);
 				}
+				sws.emitQueue = recordEmitter.getQueue(shardStateIndex);
 				sws.lastUpdated = getCurrentTimeMillis();
 				sws.lastRecordTimestamp = Long.MIN_VALUE;
 				shardWatermarks.put(shardStateIndex, sws);
@@ -721,41 +873,57 @@ public class KinesisDataFetcher<T> {
 	protected void emitWatermark() {
 		LOG.debug("Evaluating watermark for subtask {} time {}", indexOfThisConsumerSubtask, getCurrentTimeMillis());
 		long potentialWatermark = Long.MAX_VALUE;
+		long potentialNextWatermark = Long.MAX_VALUE;
 		long idleTime =
 			(shardIdleIntervalMillis > 0)
 				? getCurrentTimeMillis() - shardIdleIntervalMillis
 				: Long.MAX_VALUE;
 
 		for (Map.Entry<Integer, ShardWatermarkState> e : shardWatermarks.entrySet()) {
+			Watermark w = e.getValue().lastEmittedRecordWatermark;
 			// consider only active shards, or those that would advance the watermark
-			Watermark w = e.getValue().periodicWatermarkAssigner.getCurrentWatermark();
-			if (w != null && (e.getValue().lastUpdated >= idleTime || w.getTimestamp() > lastWatermark)) {
+			if (w != null && (e.getValue().lastUpdated >= idleTime
+				|| e.getValue().emitQueue.getSize() > 0
+				|| w.getTimestamp() > lastWatermark)) {
 				potentialWatermark = Math.min(potentialWatermark, w.getTimestamp());
+				// for sync, use the watermark of the next record, when available
+				// otherwise watermark may stall when record is blocked by synchronization
+				RecordEmitter.RecordQueue<RecordWrapper<T>> q = e.getValue().emitQueue;
+				RecordWrapper<T> nextRecord = q.peek();
+				Watermark nextWatermark = (nextRecord != null) ? nextRecord.watermark : w;
+				potentialNextWatermark = Math.min(potentialNextWatermark, nextWatermark.getTimestamp());
 			}
 		}
 
 		// advance watermark if possible (watermarks can only be ascending)
 		if (potentialWatermark == Long.MAX_VALUE) {
 			if (shardWatermarks.isEmpty() || shardIdleIntervalMillis > 0) {
-				LOG.debug("No active shard for subtask {}, marking the source idle.",
+				LOG.info("No active shard for subtask {}, marking the source idle.",
 					indexOfThisConsumerSubtask);
 				// no active shard, signal downstream operators to not wait for a watermark
 				sourceContext.markAsTemporarilyIdle();
+				isIdle = true;
 			}
-		} else if (potentialWatermark > lastWatermark) {
-			LOG.debug("Emitting watermark {} from subtask {}",
-				potentialWatermark,
-				indexOfThisConsumerSubtask);
-			sourceContext.emitWatermark(new Watermark(potentialWatermark));
-			lastWatermark = potentialWatermark;
+		} else {
+			if (potentialWatermark > lastWatermark) {
+				LOG.debug("Emitting watermark {} from subtask {}",
+					potentialWatermark,
+					indexOfThisConsumerSubtask);
+				sourceContext.emitWatermark(new Watermark(potentialWatermark));
+				lastWatermark = potentialWatermark;
+				isIdle = false;
+			}
+			nextWatermark = potentialNextWatermark;
 		}
 	}
 
 	/** Per shard tracking of watermark and last activity. */
 	private static class ShardWatermarkState<T> {
 		private AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner;
+		private RecordEmitter.RecordQueue<RecordWrapper<T>> emitQueue;
 		private volatile long lastRecordTimestamp;
 		private volatile long lastUpdated;
+		private volatile Watermark lastEmittedRecordWatermark;
 	}
 
 	/**
@@ -785,6 +953,82 @@ public class KinesisDataFetcher<T> {
 		}
 	}
 
+	/** Timer task to update shared watermark state. */
+	private class WatermarkSyncCallback implements ProcessingTimeCallback {
+
+		private final ProcessingTimeService timerService;
+		private final long interval;
+		private final MetricGroup shardMetricsGroup;
+		private long lastGlobalWatermark = Long.MIN_VALUE;
+		private long propagatedLocalWatermark = Long.MIN_VALUE;
+		private long logIntervalMillis = 60_000;
+		private int stalledWatermarkIntervalCount = 0;
+		private long lastLogged;
+
+		WatermarkSyncCallback(ProcessingTimeService timerService, long interval) {
+			this.timerService = checkNotNull(timerService);
+			this.interval = interval;
+			this.shardMetricsGroup = consumerMetricGroup.addGroup("subtaskId",
+				String.valueOf(indexOfThisConsumerSubtask));
+			this.shardMetricsGroup.gauge("localWatermark", () -> nextWatermark);
+			this.shardMetricsGroup.gauge("globalWatermark", () -> lastGlobalWatermark);
+		}
+
+		public void start() {
+			LOG.info("Registering watermark tracker with interval {}", interval);
+			timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this);
+		}
+
+		@Override
+		public void onProcessingTime(long timestamp) {
+			if (nextWatermark != Long.MIN_VALUE) {
+				long globalWatermark = lastGlobalWatermark;
+				// TODO: refresh watermark while idle
+				if (!(isIdle && nextWatermark == propagatedLocalWatermark)) {
+					globalWatermark = watermarkTracker.updateWatermark(nextWatermark);
+					propagatedLocalWatermark = nextWatermark;
+				} else {
+					LOG.info("WatermarkSyncCallback subtask: {} is idle", indexOfThisConsumerSubtask);
+				}
+
+				if (timestamp - lastLogged > logIntervalMillis) {
+					lastLogged = System.currentTimeMillis();
+					LOG.info("WatermarkSyncCallback subtask: {} local watermark: {}"
+							+ ", global watermark: {}, delta: {} timeouts: {}, emitter: {}",
+						indexOfThisConsumerSubtask,
+						nextWatermark,
+						globalWatermark,
+						nextWatermark - globalWatermark,
+						watermarkTracker.getUpdateTimeoutCount(),
+						recordEmitter.printInfo());
+
+					// Following is for debugging non-reproducible issue with stalled watermark
+					if (globalWatermark == nextWatermark && globalWatermark == lastGlobalWatermark
+						&& stalledWatermarkIntervalCount++ > 5) {
+						// subtask blocks watermark, log to aid troubleshooting
+						stalledWatermarkIntervalCount = 0;
+						for (Map.Entry<Integer, ShardWatermarkState> e : shardWatermarks.entrySet()) {
+							RecordEmitter.RecordQueue<RecordWrapper<T>> q = e.getValue().emitQueue;
+							RecordWrapper<T> nextRecord = q.peek();
+							if (nextRecord != null) {
+								LOG.info("stalled watermark {} key {} next watermark {} next timestamp {}",
+									nextWatermark,
+									e.getKey(),
+									nextRecord.watermark,
+									nextRecord.timestamp);
+							}
+						}
+					}
+				}
+
+				lastGlobalWatermark = globalWatermark;
+				recordEmitter.setCurrentWatermark(globalWatermark);
+			}
+			// schedule next callback
+			timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this);
+		}
+	}
+
 	/**
 	 * Registers a metric group associated with the shard id of the provided {@link KinesisStreamShardState shardState}.
 	 *
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTracker.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTracker.java
new file mode 100644
index 0000000..f150bb0
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTracker.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A {@link WatermarkTracker} that shares state through {@link GlobalAggregateManager}.
+ */
+@PublicEvolving
+public class JobManagerWatermarkTracker extends WatermarkTracker {
+
+	private GlobalAggregateManager aggregateManager;
+	private final String aggregateName;
+	private final WatermarkAggregateFunction aggregateFunction = new WatermarkAggregateFunction();
+	private final long logAccumulatorIntervalMillis;
+	private long updateTimeoutCount;
+
+	public JobManagerWatermarkTracker(String aggregateName) {
+		this(aggregateName, -1);
+	}
+
+	public JobManagerWatermarkTracker(String aggregateName, long logAccumulatorIntervalMillis) {
+		super();
+		this.aggregateName = aggregateName;
+		this.logAccumulatorIntervalMillis = logAccumulatorIntervalMillis;
+	}
+
+	@Override
+	public long updateWatermark(long localWatermark) {
+		WatermarkUpdate update = new WatermarkUpdate();
+		update.id = getSubtaskId();
+		update.watermark = localWatermark;
+		try {
+			byte[] resultBytes = aggregateManager.updateGlobalAggregate(aggregateName,
+				InstantiationUtil.serializeObject(update), aggregateFunction);
+			WatermarkResult result = InstantiationUtil.deserializeObject(resultBytes,
+				this.getClass().getClassLoader());
+			this.updateTimeoutCount += result.updateTimeoutCount;
+			return result.watermark;
+		} catch (ClassNotFoundException | IOException ex) {
+			throw new RuntimeException(ex);
+		}
+	}
+
+	@Override
+	public void open(RuntimeContext context) {
+		super.open(context);
+		this.aggregateFunction.updateTimeoutMillis = super.getUpdateTimeoutMillis();
+		this.aggregateFunction.logAccumulatorIntervalMillis = logAccumulatorIntervalMillis;
+		Preconditions.checkArgument(context instanceof StreamingRuntimeContext);
+		StreamingRuntimeContext runtimeContext = (StreamingRuntimeContext) context;
+		this.aggregateManager = runtimeContext.getGlobalAggregateManager();
+	}
+
+	public long getUpdateTimeoutCount() {
+		return updateTimeoutCount;
+	}
+
+	/** Watermark aggregation input. */
+	protected static class WatermarkUpdate implements Serializable {
+		protected long watermark = Long.MIN_VALUE;
+		protected String id;
+	}
+
+	/** Watermark aggregation result. */
+	protected static class WatermarkResult implements Serializable {
+		protected long watermark = Long.MIN_VALUE;
+		protected long updateTimeoutCount = 0;
+	}
+
+	/**
+	 * Aggregate function for computing a combined watermark of parallel subtasks.
+	 */
+	private static class WatermarkAggregateFunction implements
+		AggregateFunction<byte[], Map<String, WatermarkState>, byte[]> {
+
+		private long updateTimeoutMillis = DEFAULT_UPDATE_TIMEOUT_MILLIS;
+		private long logAccumulatorIntervalMillis = -1;
+
+		// TODO: wrap accumulator
+		static long addCount;
+		static long lastLogged;
+		private static final Logger LOG = LoggerFactory.getLogger(WatermarkAggregateFunction.class);
+
+		@Override
+		public Map<String, WatermarkState> createAccumulator() {
+			return new HashMap<>();
+		}
+
+		@Override
+		public Map<String, WatermarkState> add(byte[] valueBytes, Map<String, WatermarkState> accumulator) {
+			addCount++;
+			final WatermarkUpdate value;
+			try {
+				value = InstantiationUtil.deserializeObject(valueBytes, this.getClass().getClassLoader());
+			} catch (Exception e) {
+				throw new RuntimeException(e);
+			}
+			WatermarkState ws = accumulator.get(value.id);
+			if (ws == null) {
+				accumulator.put(value.id, ws = new WatermarkState());
+			}
+			ws.watermark = value.watermark;
+			ws.lastUpdated = System.currentTimeMillis();
+			return accumulator;
+		}
+
+		@Override
+		public byte[] getResult(Map<String, WatermarkState> accumulator) {
+			long updateTimeoutCount = 0;
+			long currentTime = System.currentTimeMillis();
+			long globalWatermark = Long.MAX_VALUE;
+			for (Map.Entry<String, WatermarkState> e : accumulator.entrySet()) {
+				WatermarkState ws = e.getValue();
+				if (ws.lastUpdated + updateTimeoutMillis < currentTime) {
+					// ignore outdated entry
+					updateTimeoutCount++;
+					continue;
+				}
+				globalWatermark = Math.min(ws.watermark, globalWatermark);
+			}
+
+			WatermarkResult result = new WatermarkResult();
+			result.watermark = globalWatermark == Long.MAX_VALUE ? Long.MIN_VALUE : globalWatermark;
+			result.updateTimeoutCount = updateTimeoutCount;
+
+			if (logAccumulatorIntervalMillis > 0) {
+				if (currentTime - lastLogged > logAccumulatorIntervalMillis) {
+					lastLogged = System.currentTimeMillis();
+					LOG.info("WatermarkAggregateFunction added: {}, timeout: {}, map: {}",
+						addCount, updateTimeoutCount, accumulator);
+				}
+			}
+
+			try {
+				return InstantiationUtil.serializeObject(result);
+			} catch (IOException e) {
+				throw new RuntimeException(e);
+			}
+		}
+
+		@Override
+		public Map<String, WatermarkState> merge(Map<String, WatermarkState> accumulatorA, Map<String, WatermarkState> accumulatorB) {
+			// not required
+			throw new UnsupportedOperationException();
+		}
+	}
+
+}
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter.java
new file mode 100644
index 0000000..17344b1
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Emitter that handles event time synchronization between producer threads.
+ *
+ * <p>Records are organized into per producer queues that will block when capacity is exhausted.
+ *
+ * <p>Records are emitted by selecting the oldest available element of all producer queues,
+ * as long as the timestamp does not exceed the current shared watermark plus allowed lookahead interval.
+ *
+ * @param <T>
+ */
+@Internal
+public abstract class RecordEmitter<T extends TimestampedValue> implements Runnable {
+	private static final Logger LOG = LoggerFactory.getLogger(RecordEmitter.class);
+
+	/**
+	 * The default capacity of a single queue.
+	 *
+	 * <p>Larger queue size can lead to higher throughput, but also to
+	 * very high heap space consumption, depending on the size of elements.
+	 *
+	 * <p>Note that this is difficult to tune, because it does not take into account
+	 * the size of individual objects.
+	 */
+	public static final int DEFAULT_QUEUE_CAPACITY = 100;
+
+	private final int queueCapacity;
+	private final ConcurrentHashMap<Integer, AsyncRecordQueue<T>> queues = new ConcurrentHashMap<>();
+	private final ConcurrentHashMap<AsyncRecordQueue<T>, Boolean> emptyQueues = new ConcurrentHashMap<>();
+	private final PriorityQueue<AsyncRecordQueue<T>> heads = new PriorityQueue<>(this::compareHeadElement);
+	private volatile boolean running = true;
+	private volatile long maxEmitTimestamp = Long.MAX_VALUE;
+	private long maxLookaheadMillis = 60 * 1000; // one minute
+	private long idleSleepMillis = 100;
+	private final Object condition = new Object();
+
+	public RecordEmitter(int queueCapacity) {
+		this.queueCapacity = queueCapacity;
+	}
+
+	private int compareHeadElement(AsyncRecordQueue left, AsyncRecordQueue right) {
+		return Long.compare(left.headTimestamp, right.headTimestamp);
+	}
+
+	/**
+	 * Accepts records from readers.
+	 *
+	 * @param <T>
+	 */
+	public interface RecordQueue<T> {
+		void put(T record) throws InterruptedException;
+
+		int getQueueId();
+
+		int getSize();
+
+		T peek();
+	}
+
+	private class AsyncRecordQueue<T> implements RecordQueue<T> {
+		private final ArrayBlockingQueue<T> queue;
+		private final int queueId;
+		long headTimestamp;
+
+		private AsyncRecordQueue(int queueId) {
+			super();
+			this.queue = new ArrayBlockingQueue<>(queueCapacity);
+			this.queueId = queueId;
+			this.headTimestamp = Long.MAX_VALUE;
+		}
+
+		public void put(T record) throws InterruptedException {
+			queue.put(record);
+			synchronized (condition) {
+				condition.notify();
+			}
+		}
+
+		public int getQueueId() {
+			return queueId;
+		}
+
+		public int getSize() {
+			return queue.size();
+		}
+
+		public T peek() {
+			return queue.peek();
+		}
+
+	}
+
+	/**
+	 * The queue for the given producer (i.e. Kinesis shard consumer thread).
+	 *
+	 * <p>The producer may hold on to the queue for subsequent records.
+	 *
+	 * @param producerIndex
+	 * @return
+	 */
+	public RecordQueue<T> getQueue(int producerIndex) {
+		return queues.computeIfAbsent(producerIndex, (key) -> {
+			AsyncRecordQueue<T> q = new AsyncRecordQueue<>(producerIndex);
+			emptyQueues.put(q, false);
+			return q;
+		});
+	}
+
+	/**
+	 * How far ahead of the watermark records should be emitted.
+	 *
+	 * <p>Needs to account for the latency of obtaining the global watermark.
+	 *
+	 * @param maxLookaheadMillis
+	 */
+	public void setMaxLookaheadMillis(long maxLookaheadMillis) {
+		this.maxLookaheadMillis = maxLookaheadMillis;
+		LOG.info("[setMaxLookaheadMillis] Max lookahead millis set to {}", maxLookaheadMillis);
+	}
+
+	/**
+	 * Set the current watermark.
+	 *
+	 * <p>This watermark will be used to control which records to emit from the queues of pending
+	 * elements. When an element timestamp is ahead of the watermark by at least {@link
+	 * #maxLookaheadMillis}, elements in that queue will wait until the watermark advances.
+	 *
+	 * @param watermark
+	 */
+	public void setCurrentWatermark(long watermark) {
+		maxEmitTimestamp = watermark + maxLookaheadMillis;
+		synchronized (condition) {
+			condition.notify();
+		}
+		LOG.info(
+			"[setCurrentWatermark] Current watermark set to {}, maxEmitTimestamp = {}",
+			watermark,
+			maxEmitTimestamp);
+	}
+
+	/**
+	 * Run loop, does not return unless {@link #stop()} was called.
+	 */
+	@Override
+	public void run() {
+		LOG.info("Starting emitter with maxLookaheadMillis: {}", this.maxLookaheadMillis);
+
+		// emit available records, ordered by timestamp
+		AsyncRecordQueue<T> min = heads.poll();
+		runLoop:
+		while (running) {
+			// find a queue to emit from
+			while (min == null) {
+				// check new or previously empty queues
+				if (!emptyQueues.isEmpty()) {
+					for (AsyncRecordQueue<T> queueHead : emptyQueues.keySet()) {
+						if (!queueHead.queue.isEmpty()) {
+							emptyQueues.remove(queueHead);
+							queueHead.headTimestamp = queueHead.queue.peek().getTimestamp();
+							heads.offer(queueHead);
+						}
+					}
+				}
+				min = heads.poll();
+				if (min == null) {
+					synchronized (condition) {
+						// wait for work
+						try {
+							condition.wait(idleSleepMillis);
+						} catch (InterruptedException e) {
+							continue runLoop;
+						}
+					}
+				}
+			}
+
+			// wait until ready to emit min or another queue receives elements
+			while (min.headTimestamp > maxEmitTimestamp) {
+				synchronized (condition) {
+					// wait until ready to emit
+					try {
+						condition.wait(idleSleepMillis);
+					} catch (InterruptedException e) {
+						continue runLoop;
+					}
+					if (min.headTimestamp > maxEmitTimestamp && !emptyQueues.isEmpty()) {
+						// see if another queue can make progress
+						heads.offer(min);
+						min = null;
+						continue runLoop;
+					}
+				}
+			}
+
+			// emit up to queue capacity records
+			// cap on empty queues since older records may arrive
+			AsyncRecordQueue<T> nextQueue = heads.poll();
+			T record;
+			int emitCount = 0;
+			while ((record = min.queue.poll()) != null) {
+				emit(record, min);
+				// track last record emitted, even when queue becomes empty
+				min.headTimestamp = record.getTimestamp();
+				// potentially switch to next queue
+				if ((nextQueue != null && min.headTimestamp > nextQueue.headTimestamp)
+					|| (min.headTimestamp > maxEmitTimestamp)
+					|| (emitCount++ > queueCapacity && !emptyQueues.isEmpty())) {
+					break;
+				}
+			}
+			if (record == null) {
+				this.emptyQueues.put(min, true);
+			} else {
+				heads.offer(min);
+			}
+			min = nextQueue;
+		}
+	}
+
+	public void stop() {
+		running = false;
+	}
+
+	/** Emit the record. This is specific to a connector, like the Kinesis data fetcher. */
+	public abstract void emit(T record, RecordQueue<T> source);
+
+	/** Summary of emit queues that can be used for logging. */
+	public String printInfo() {
+		StringBuffer sb = new StringBuffer();
+		sb.append(String.format("queues: %d, empty: %d",
+			queues.size(), emptyQueues.size()));
+		for (Map.Entry<Integer, AsyncRecordQueue<T>> e : queues.entrySet()) {
+			AsyncRecordQueue q = e.getValue();
+			sb.append(String.format("\n%d timestamp: %s size: %d",
+				e.getValue().queueId, q.headTimestamp, q.queue.size()));
+		}
+		return sb.toString();
+	}
+
+}
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/WatermarkTracker.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/WatermarkTracker.java
new file mode 100644
index 0000000..f4207c7
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/WatermarkTracker.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+
+import java.io.Closeable;
+import java.io.Serializable;
+
+/**
+ * The watermark tracker is responsible for aggregating watermarks across distributed operators.
+ * <p/>It can be used for sub tasks of a single Flink source as well as multiple heterogeneous
+ * sources or other operators.
+ * <p/>The class essentially functions like a distributed hash table that enclosing operators can
+ * use to adopt their processing / IO rates.
+ */
+@PublicEvolving
+public abstract class WatermarkTracker implements Closeable, Serializable {
+
+	public static final long DEFAULT_UPDATE_TIMEOUT_MILLIS = 60_000;
+
+	/**
+	 * Subtasks that have not provided a watermark update within the configured interval will be
+	 * considered idle and excluded from target watermark calculation.
+	 */
+	private long updateTimeoutMillis = DEFAULT_UPDATE_TIMEOUT_MILLIS;
+
+	/**
+	 * Unique id for the subtask.
+	 * Using string (instead of subtask index) so synchronization can spawn across multiple sources.
+	 */
+	private String subtaskId;
+
+	/** Watermark state. */
+	protected static class WatermarkState {
+		protected long watermark = Long.MIN_VALUE;
+		protected long lastUpdated;
+
+		public long getWatermark() {
+			return watermark;
+		}
+
+		@Override
+		public String toString() {
+			return "WatermarkState{watermark=" + watermark + ", lastUpdated=" + lastUpdated + '}';
+		}
+	}
+
+	protected String getSubtaskId() {
+		return this.subtaskId;
+	}
+
+	protected long getUpdateTimeoutMillis() {
+		return this.updateTimeoutMillis;
+	}
+
+	public abstract long getUpdateTimeoutCount();
+
+	/**
+	 * Subtasks that have not provided a watermark update within the configured interval will be
+	 * considered idle and excluded from target watermark calculation.
+	 *
+	 * @param updateTimeoutMillis
+	 */
+	public void setUpdateTimeoutMillis(long updateTimeoutMillis) {
+		this.updateTimeoutMillis = updateTimeoutMillis;
+	}
+
+	/**
+	 * Set the current watermark of the owning subtask and return the global low watermark based on
+	 * the current state snapshot. Periodically called by the enclosing consumer instance, which is
+	 * responsible for any timer management etc.
+	 *
+	 * @param localWatermark
+	 * @return
+	 */
+	public abstract long updateWatermark(final long localWatermark);
+
+	protected long getCurrentTime() {
+		return System.currentTimeMillis();
+	}
+
+	public void open(RuntimeContext context) {
+		if (context instanceof StreamingRuntimeContext) {
+			this.subtaskId = ((StreamingRuntimeContext) context).getOperatorUniqueID()
+				+ "-" + context.getIndexOfThisSubtask();
+		} else {
+			this.subtaskId = context.getTaskNameWithSubtasks();
+		}
+	}
+
+	@Override
+	public void close() {
+		// no work to do here
+	}
+
+}
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
index b38eef1..1ce05d1 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
@@ -418,7 +418,7 @@ public class FlinkKinesisConsumerMigrationTest {
 				HashMap<StreamShardMetadata, SequenceNumber> testStateSnapshot,
 				List<StreamShardHandle> testInitialDiscoveryShards) {
 
-			super(streams, sourceContext, runtimeContext, configProps, deserializationSchema, DEFAULT_SHARD_ASSIGNER, null);
+			super(streams, sourceContext, runtimeContext, configProps, deserializationSchema, DEFAULT_SHARD_ASSIGNER, null, null);
 
 			this.testStateSnapshot = testStateSnapshot;
 			this.testInitialDiscoveryShards = testInitialDiscoveryShards;
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
index d36d68a..cbcd8b4 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
@@ -52,6 +52,7 @@ import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGen
 import org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils;
 import org.apache.flink.streaming.connectors.kinesis.testutils.TestableFlinkKinesisConsumer;
 import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.apache.flink.streaming.connectors.kinesis.util.WatermarkTracker;
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.CollectingSourceContext;
 
@@ -60,6 +61,7 @@ import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
 import com.amazonaws.services.kinesis.model.HashKeyRange;
 import com.amazonaws.services.kinesis.model.SequenceNumberRange;
 import com.amazonaws.services.kinesis.model.Shard;
+import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -79,6 +81,7 @@ import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
 
@@ -737,6 +740,7 @@ public class FlinkKinesisConsumerTest {
 							deserializationSchema,
 							getShardAssigner(),
 							getPeriodicWatermarkAssigner(),
+							null,
 							new AtomicReference<>(),
 							new ArrayList<>(),
 							subscribedStreamsToLastDiscoveredShardIds,
@@ -775,6 +779,10 @@ public class FlinkKinesisConsumerTest {
 			public void emitWatermark(Watermark mark) {
 				watermarks.add(mark);
 			}
+
+			@Override
+			public void markAsTemporarilyIdle() {
+			}
 		};
 
 		new Thread(
@@ -817,6 +825,164 @@ public class FlinkKinesisConsumerTest {
 		assertThat(watermarks, org.hamcrest.Matchers.contains(new Watermark(-3), new Watermark(5)));
 	}
 
+	@Test
+	public void testSourceSynchronization() throws Exception {
+
+		final String streamName = "fakeStreamName";
+		final Time maxOutOfOrderness = Time.milliseconds(5);
+		final long autoWatermarkInterval = 1_000;
+		final long watermarkSyncInterval = autoWatermarkInterval + 1;
+
+		HashMap<String, String> subscribedStreamsToLastDiscoveredShardIds = new HashMap<>();
+		subscribedStreamsToLastDiscoveredShardIds.put(streamName, null);
+
+		final KinesisDeserializationSchema<String> deserializationSchema =
+			new KinesisDeserializationSchemaWrapper<>(new SimpleStringSchema());
+		Properties props = new Properties();
+		props.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+		props.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, Long.toString(10L));
+		props.setProperty(ConsumerConfigConstants.WATERMARK_SYNC_MILLIS,
+			Long.toString(watermarkSyncInterval));
+		props.setProperty(ConsumerConfigConstants.WATERMARK_LOOKAHEAD_MILLIS, Long.toString(5));
+
+		BlockingQueue<String> shard1 = new LinkedBlockingQueue();
+		BlockingQueue<String> shard2 = new LinkedBlockingQueue();
+
+		Map<String, List<BlockingQueue<String>>> streamToQueueMap = new HashMap<>();
+		streamToQueueMap.put(streamName, Lists.newArrayList(shard1, shard2));
+
+		// override createFetcher to mock Kinesis
+		FlinkKinesisConsumer<String> sourceFunc =
+			new FlinkKinesisConsumer<String>(streamName, deserializationSchema, props) {
+				@Override
+				protected KinesisDataFetcher<String> createFetcher(
+					List<String> streams,
+					SourceFunction.SourceContext<String> sourceContext,
+					RuntimeContext runtimeContext,
+					Properties configProps,
+					KinesisDeserializationSchema<String> deserializationSchema) {
+
+					KinesisDataFetcher<String> fetcher =
+						new KinesisDataFetcher<String>(
+							streams,
+							sourceContext,
+							sourceContext.getCheckpointLock(),
+							runtimeContext,
+							configProps,
+							deserializationSchema,
+							getShardAssigner(),
+							getPeriodicWatermarkAssigner(),
+							getWatermarkTracker(),
+							new AtomicReference<>(),
+							new ArrayList<>(),
+							subscribedStreamsToLastDiscoveredShardIds,
+							(props) -> FakeKinesisBehavioursFactory.blockingQueueGetRecords(
+								streamToQueueMap)
+						) {};
+					return fetcher;
+				}
+			};
+
+		sourceFunc.setShardAssigner(
+			(streamShardHandle, i) -> {
+				// shardId-000000000000
+				return Integer.parseInt(
+					streamShardHandle.getShard().getShardId().substring("shardId-".length()));
+			});
+
+		sourceFunc.setPeriodicWatermarkAssigner(new TestTimestampExtractor(maxOutOfOrderness));
+
+		sourceFunc.setWatermarkTracker(new TestWatermarkTracker());
+
+		// there is currently no test harness specifically for sources,
+		// so we overlay the source thread here
+		AbstractStreamOperatorTestHarness<Object> testHarness =
+			new AbstractStreamOperatorTestHarness<Object>(
+				new StreamSource(sourceFunc), 1, 1, 0);
+		testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime);
+		testHarness.getExecutionConfig().setAutoWatermarkInterval(autoWatermarkInterval);
+
+		testHarness.initializeEmptyState();
+		testHarness.open();
+
+		final ConcurrentLinkedQueue<Object> results = testHarness.getOutput();
+
+		@SuppressWarnings("unchecked")
+		SourceFunction.SourceContext<String> sourceContext = new CollectingSourceContext(
+			testHarness.getCheckpointLock(), results) {
+			@Override
+			public void markAsTemporarilyIdle() {
+			}
+
+			@Override
+			public void emitWatermark(Watermark mark) {
+				results.add(mark);
+			}
+		};
+
+		new Thread(
+			() -> {
+				try {
+					sourceFunc.run(sourceContext);
+				} catch (InterruptedException e) {
+					// expected on cancel
+				} catch (Exception e) {
+					throw new RuntimeException(e);
+				}
+			})
+			.start();
+
+		ArrayList<Object> expectedResults = new ArrayList<>();
+
+		final long record1 = 1;
+		shard1.put(Long.toString(record1));
+		expectedResults.add(Long.toString(record1));
+		awaitRecordCount(results, expectedResults.size());
+
+		// at this point we know the fetcher was initialized
+		final KinesisDataFetcher fetcher = org.powermock.reflect.Whitebox.getInternalState(sourceFunc, "fetcher");
+
+		// trigger watermark emit
+		testHarness.setProcessingTime(testHarness.getProcessingTime() + autoWatermarkInterval);
+		expectedResults.add(new Watermark(-4));
+		// verify watermark
+		awaitRecordCount(results, expectedResults.size());
+		assertThat(results, org.hamcrest.Matchers.contains(expectedResults.toArray()));
+		assertEquals(0, TestWatermarkTracker.WATERMARK.get());
+
+		// trigger sync
+		testHarness.setProcessingTime(testHarness.getProcessingTime() + 1);
+		TestWatermarkTracker.assertSingleWatermark(-4);
+
+		final long record2 = record1 + (watermarkSyncInterval * 3) + 1;
+		shard1.put(Long.toString(record2));
+
+		// TODO: check for record received instead
+		Thread.sleep(100);
+
+		// Advance the watermark. Since the new record is past global watermark + threshold,
+		// it won't be emitted and the watermark does not advance
+		testHarness.setProcessingTime(testHarness.getProcessingTime() + autoWatermarkInterval);
+		assertThat(results, org.hamcrest.Matchers.contains(expectedResults.toArray()));
+		assertEquals(3000L, (long) org.powermock.reflect.Whitebox.getInternalState(fetcher, "nextWatermark"));
+		TestWatermarkTracker.assertSingleWatermark(-4);
+
+		// Trigger global watermark sync
+		testHarness.setProcessingTime(testHarness.getProcessingTime() + 1);
+		expectedResults.add(Long.toString(record2));
+		awaitRecordCount(results, expectedResults.size());
+		assertThat(results, org.hamcrest.Matchers.contains(expectedResults.toArray()));
+		TestWatermarkTracker.assertSingleWatermark(3000);
+
+		// Trigger watermark update and emit
+		testHarness.setProcessingTime(testHarness.getProcessingTime() + autoWatermarkInterval);
+		expectedResults.add(new Watermark(3000));
+		assertThat(results, org.hamcrest.Matchers.contains(expectedResults.toArray()));
+
+		sourceFunc.cancel();
+		testHarness.close();
+	}
+
 	private void awaitRecordCount(ConcurrentLinkedQueue<? extends Object> queue, int count) throws Exception {
 		long timeoutMillis = System.currentTimeMillis() + 10_000;
 		while (System.currentTimeMillis() < timeoutMillis && queue.size() < count) {
@@ -837,4 +1003,23 @@ public class FlinkKinesisConsumerTest {
 		}
 	}
 
+	private static class TestWatermarkTracker extends WatermarkTracker {
+
+		private static final AtomicLong WATERMARK = new AtomicLong();
+
+		@Override
+		public long getUpdateTimeoutCount() {
+			return 0;
+		}
+
+		@Override
+		public long updateWatermark(long localWatermark) {
+			WATERMARK.set(localWatermark);
+			return localWatermark;
+		}
+
+		static void assertSingleWatermark(long expected) {
+			Assert.assertEquals(expected, WATERMARK.get());
+		}
+	}
 }
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
index dbc7118..93886f9 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
@@ -113,9 +113,10 @@ public class ShardConsumerTest {
 				KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(Collections.singletonList("fakeStream")),
 				Mockito.mock(KinesisProxyInterface.class));
 
+		int shardIndex = fetcher.registerNewSubscribedShardState(subscribedShardsStateUnderTest.get(0));
 		new ShardConsumer<>(
 			fetcher,
-			0,
+			shardIndex,
 			subscribedShardsStateUnderTest.get(0).getStreamShardHandle(),
 			subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum(),
 			FakeKinesisBehavioursFactory.totalNumOfRecordsAfterNumOfGetRecordsCalls(1000, 9, 500L),
@@ -151,9 +152,10 @@ public class ShardConsumerTest {
 				KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(Collections.singletonList("fakeStream")),
 				Mockito.mock(KinesisProxyInterface.class));
 
+		int shardIndex = fetcher.registerNewSubscribedShardState(subscribedShardsStateUnderTest.get(0));
 		new ShardConsumer<>(
 			fetcher,
-			0,
+			shardIndex,
 			subscribedShardsStateUnderTest.get(0).getStreamShardHandle(),
 			subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum(),
 			// Get a total of 1000 records with 9 getRecords() calls,
@@ -195,9 +197,10 @@ public class ShardConsumerTest {
 				KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(Collections.singletonList("fakeStream")),
 				Mockito.mock(KinesisProxyInterface.class));
 
+		int shardIndex = fetcher.registerNewSubscribedShardState(subscribedShardsStateUnderTest.get(0));
 		new ShardConsumer<>(
 			fetcher,
-			0,
+			shardIndex,
 			subscribedShardsStateUnderTest.get(0).getStreamShardHandle(),
 			subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum(),
 			// Initial number of records to fetch --> 10
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java
index f1fd069..3bb11bd 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java
@@ -74,6 +74,7 @@ public class TestableKinesisDataFetcher<T> extends KinesisDataFetcher<T> {
 			deserializationSchema,
 			DEFAULT_SHARD_ASSIGNER,
 			null,
+			null,
 			thrownErrorUnderTest,
 			subscribedShardsStateUnderTest,
 			subscribedStreamsToLastDiscoveredShardIdsStateUnderTest,
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTrackerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTrackerTest.java
new file mode 100644
index 0000000..b793b54
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTrackerTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/** Test for {@link JobManagerWatermarkTracker}. */
+public class JobManagerWatermarkTrackerTest {
+
+	private static MiniCluster flink;
+
+	@BeforeClass
+	public static void setUp() throws Exception {
+		final Configuration config = new Configuration();
+		config.setInteger(RestOptions.PORT, 0);
+
+		final MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder()
+			.setConfiguration(config)
+			.setNumTaskManagers(1)
+			.setNumSlotsPerTaskManager(1)
+			.build();
+
+		flink = new MiniCluster(miniClusterConfiguration);
+
+		flink.start();
+	}
+
+	@AfterClass
+	public static void tearDown() throws Exception {
+		if (flink != null) {
+			flink.close();
+		}
+	}
+
+	@Test
+	public void testUpateWatermark() throws Exception {
+		final Configuration clientConfiguration = new Configuration();
+		clientConfiguration.setInteger(RestOptions.RETRY_MAX_ATTEMPTS, 0);
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
+			flink.getRestAddress().get().getHost(),
+			flink.getRestAddress().get().getPort(),
+			clientConfiguration);
+
+		env.addSource(new TestSourceFunction(new JobManagerWatermarkTracker("fakeId")))
+			.addSink(new SinkFunction<Integer>() {});
+		env.execute();
+	}
+
+	private static class TestSourceFunction extends RichSourceFunction<Integer> {
+
+		private final JobManagerWatermarkTracker tracker;
+
+		public TestSourceFunction(JobManagerWatermarkTracker tracker) {
+			this.tracker = tracker;
+		}
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			super.open(parameters);
+			tracker.open(getRuntimeContext());
+		}
+
+		@Override
+		public void run(SourceContext<Integer> ctx) {
+			Assert.assertEquals(998, tracker.updateWatermark(998));
+			Assert.assertEquals(999, tracker.updateWatermark(999));
+		}
+
+		@Override
+		public void cancel() {
+		}
+	}
+
+}
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitterTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitterTest.java
new file mode 100644
index 0000000..1948237
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitterTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue;
+
+import org.hamcrest.Matchers;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/** Test for {@link RecordEmitter}. */
+public class RecordEmitterTest {
+
+	static List<TimestampedValue> results = Collections.synchronizedList(new ArrayList<>());
+
+	private class TestRecordEmitter extends RecordEmitter<TimestampedValue> {
+
+		private TestRecordEmitter() {
+			super(DEFAULT_QUEUE_CAPACITY);
+		}
+
+		@Override
+		public void emit(TimestampedValue record, RecordQueue<TimestampedValue> queue) {
+			results.add(record);
+		}
+	}
+
+	@Test
+	public void test() throws Exception {
+
+		TestRecordEmitter emitter = new TestRecordEmitter();
+
+		final TimestampedValue<String> one = new TimestampedValue<>("one", 1);
+		final TimestampedValue<String> two = new TimestampedValue<>("two", 2);
+		final TimestampedValue<String> five = new TimestampedValue<>("five", 5);
+		final TimestampedValue<String> ten = new TimestampedValue<>("ten", 10);
+
+		final RecordEmitter.RecordQueue<TimestampedValue> queue0 = emitter.getQueue(0);
+		final RecordEmitter.RecordQueue<TimestampedValue> queue1 = emitter.getQueue(1);
+
+		queue0.put(one);
+		queue0.put(five);
+		queue0.put(ten);
+
+		queue1.put(two);
+
+		ExecutorService executor = Executors.newSingleThreadExecutor();
+		executor.submit(emitter);
+
+		long timeout = System.currentTimeMillis() + 10_000;
+		while (results.size() != 4 && System.currentTimeMillis() < timeout) {
+			Thread.sleep(100);
+		}
+		emitter.stop();
+		executor.shutdownNow();
+
+		Assert.assertThat(results, Matchers.contains(one, five, two, ten));
+	}
+
+}
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/WatermarkTrackerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/WatermarkTrackerTest.java
new file mode 100644
index 0000000..3d59a45
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/WatermarkTrackerTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
+
+import org.apache.commons.lang3.mutable.MutableLong;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Test for {@link WatermarkTracker}. */
+public class WatermarkTrackerTest {
+
+	WatermarkTracker.WatermarkState wm1 = new WatermarkTracker.WatermarkState();
+	MutableLong clock = new MutableLong(0);
+
+	private class TestWatermarkTracker extends WatermarkTracker {
+		/**
+		 * The watermarks of all sub tasks that participate in the synchronization.
+		 */
+		private final Map<String, WatermarkState> watermarks = new HashMap<>();
+
+		private long updateTimeoutCount = 0;
+
+		@Override
+		protected long getCurrentTime() {
+			return clock.longValue();
+		}
+
+		@Override
+		public long updateWatermark(final long localWatermark) {
+			refreshWatermarkSnapshot(this.watermarks);
+
+			long currentTime = getCurrentTime();
+			String subtaskId = this.getSubtaskId();
+
+			WatermarkState ws = watermarks.get(subtaskId);
+			if (ws == null) {
+				watermarks.put(subtaskId, ws = new WatermarkState());
+			}
+			ws.lastUpdated = currentTime;
+			ws.watermark = Math.max(ws.watermark, localWatermark);
+			saveWatermark(subtaskId, ws);
+
+			long globalWatermark = ws.watermark;
+			for (Map.Entry<String, WatermarkState> e : watermarks.entrySet()) {
+				ws = e.getValue();
+				if (ws.lastUpdated + getUpdateTimeoutMillis() < currentTime) {
+					// ignore outdated subtask
+					updateTimeoutCount++;
+					continue;
+				}
+				globalWatermark = Math.min(ws.watermark, globalWatermark);
+			}
+			return globalWatermark;
+		}
+
+		protected void refreshWatermarkSnapshot(Map<String, WatermarkState> watermarks) {
+			watermarks.put("wm1", wm1);
+		}
+
+		protected void saveWatermark(String id, WatermarkState ws) {
+			// do nothing
+		}
+
+		public long getUpdateTimeoutCount() {
+			return updateTimeoutCount;
+		}
+	}
+
+	@Test
+	public void test() {
+		long watermark = 0;
+		TestWatermarkTracker ws = new TestWatermarkTracker();
+		ws.open(new MockStreamingRuntimeContext(false, 1, 0));
+		Assert.assertEquals(Long.MIN_VALUE, ws.updateWatermark(Long.MIN_VALUE));
+		Assert.assertEquals(Long.MIN_VALUE, ws.updateWatermark(watermark));
+		// timeout wm1
+		clock.add(WatermarkTracker.DEFAULT_UPDATE_TIMEOUT_MILLIS + 1);
+		Assert.assertEquals(watermark, ws.updateWatermark(watermark));
+		Assert.assertEquals(watermark, ws.updateWatermark(watermark - 1));
+
+		// min watermark
+		wm1.watermark = watermark + 1;
+		wm1.lastUpdated = clock.longValue();
+		Assert.assertEquals(watermark, ws.updateWatermark(watermark));
+		Assert.assertEquals(watermark + 1, ws.updateWatermark(watermark + 1));
+	}
+
+}