You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by th...@apache.org on 2019/05/31 19:17:50 UTC
[flink] branch release-1.8 updated: [FLINK-10921] [kinesis] Shard
watermark synchronization in Kinesis consumer
This is an automated email from the ASF dual-hosted git repository.
thw pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.8 by this push:
new 184e8e0 [FLINK-10921] [kinesis] Shard watermark synchronization in Kinesis consumer
184e8e0 is described below
commit 184e8e00fd20519a037cf0bf4e3dba7132d75fa1
Author: Thomas Weise <th...@apache.org>
AuthorDate: Wed May 22 21:42:15 2019 -0700
[FLINK-10921] [kinesis] Shard watermark synchronization in Kinesis consumer
---
.../connectors/kinesis/FlinkKinesisConsumer.java | 18 +-
.../kinesis/config/ConsumerConfigConstants.java | 11 +
.../internals/DynamoDBStreamsDataFetcher.java | 1 +
.../kinesis/internals/KinesisDataFetcher.java | 292 +++++++++++++++++++--
.../kinesis/util/JobManagerWatermarkTracker.java | 179 +++++++++++++
.../connectors/kinesis/util/RecordEmitter.java | 269 +++++++++++++++++++
.../connectors/kinesis/util/WatermarkTracker.java | 114 ++++++++
.../kinesis/FlinkKinesisConsumerMigrationTest.java | 2 +-
.../kinesis/FlinkKinesisConsumerTest.java | 185 +++++++++++++
.../kinesis/internals/ShardConsumerTest.java | 9 +-
.../testutils/TestableKinesisDataFetcher.java | 1 +
.../util/JobManagerWatermarkTrackerTest.java | 101 +++++++
.../connectors/kinesis/util/RecordEmitterTest.java | 81 ++++++
.../kinesis/util/WatermarkTrackerTest.java | 108 ++++++++
14 files changed, 1342 insertions(+), 29 deletions(-)
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
index 3c5e3c7..5b24ded 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
@@ -45,6 +45,7 @@ import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper;
import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.apache.flink.streaming.connectors.kinesis.util.WatermarkTracker;
import org.apache.flink.util.InstantiationUtil;
import org.slf4j.Logger;
@@ -126,6 +127,7 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> imple
private KinesisShardAssigner shardAssigner = KinesisDataFetcher.DEFAULT_SHARD_ASSIGNER;
private AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner;
+ private WatermarkTracker watermarkTracker;
// ------------------------------------------------------------------------
// Runtime state
@@ -254,6 +256,20 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> imple
ClosureCleaner.clean(this.periodicWatermarkAssigner, true);
}
+ public WatermarkTracker getWatermarkTracker() {
+ return this.watermarkTracker;
+ }
+
+ /**
+ * Set the global watermark tracker. When set, it will be used by the fetcher
+ * to align the shard consumers by event time.
+ * @param watermarkTracker
+ */
+ public void setWatermarkTracker(WatermarkTracker watermarkTracker) {
+ this.watermarkTracker = watermarkTracker;
+ ClosureCleaner.clean(this.watermarkTracker, true);
+ }
+
// ------------------------------------------------------------------------
// Source life cycle
// ------------------------------------------------------------------------
@@ -448,7 +464,7 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> imple
Properties configProps,
KinesisDeserializationSchema<T> deserializationSchema) {
- return new KinesisDataFetcher<>(streams, sourceContext, runtimeContext, configProps, deserializationSchema, shardAssigner, periodicWatermarkAssigner);
+ return new KinesisDataFetcher<>(streams, sourceContext, runtimeContext, configProps, deserializationSchema, shardAssigner, periodicWatermarkAssigner, watermarkTracker);
}
@VisibleForTesting
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
index 41ac6b8..2f5be97 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
@@ -125,6 +125,15 @@ public class ConsumerConfigConstants extends AWSConfigConstants {
/** The interval after which to consider a shard idle for purposes of watermark generation. */
public static final String SHARD_IDLE_INTERVAL_MILLIS = "flink.shard.idle.interval";
+ /** The interval for periodically synchronizing the shared watermark state. */
+ public static final String WATERMARK_SYNC_MILLIS = "flink.watermark.sync.interval";
+
+ /** The maximum delta allowed for the reader to advance ahead of the shared global watermark. */
+ public static final String WATERMARK_LOOKAHEAD_MILLIS = "flink.watermark.lookahead.millis";
+
+ /** The maximum number of records that will be buffered before suspending consumption of a shard. */
+ public static final String WATERMARK_SYNC_QUEUE_CAPACITY = "flink.watermark.sync.queue.capacity";
+
// ------------------------------------------------------------------------
// Default values for consumer configuration
// ------------------------------------------------------------------------
@@ -173,6 +182,8 @@ public class ConsumerConfigConstants extends AWSConfigConstants {
public static final long DEFAULT_SHARD_IDLE_INTERVAL_MILLIS = -1;
+ public static final long DEFAULT_WATERMARK_SYNC_MILLIS = 30_000;
+
/**
* To avoid shard iterator expires in {@link ShardConsumer}s, the value for the configured
* getRecords interval can not exceed 5 minutes, which is the expire time for retrieved iterators.
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/DynamoDBStreamsDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/DynamoDBStreamsDataFetcher.java
index c2b7be3..5620142 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/DynamoDBStreamsDataFetcher.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/DynamoDBStreamsDataFetcher.java
@@ -64,6 +64,7 @@ public class DynamoDBStreamsDataFetcher<T> extends KinesisDataFetcher<T> {
deserializationSchema,
shardAssigner,
null,
+ null,
new AtomicReference<>(),
new ArrayList<>(),
createInitialSubscribedStreamsToLastDiscoveredShardsState(streams),
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
index 8c8d94a..eae3153 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
@@ -38,6 +38,9 @@ import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
+import org.apache.flink.streaming.connectors.kinesis.util.RecordEmitter;
+import org.apache.flink.streaming.connectors.kinesis.util.WatermarkTracker;
+import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.InstantiationUtil;
@@ -191,6 +194,9 @@ public class KinesisDataFetcher<T> {
private volatile boolean running = true;
private final AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner;
+ private final WatermarkTracker watermarkTracker;
+ private final transient RecordEmitter recordEmitter;
+ private transient boolean isIdle;
/**
* The watermark related state for each shard consumer. Entries in this map will be created when shards
@@ -207,6 +213,14 @@ public class KinesisDataFetcher<T> {
private long lastWatermark = Long.MIN_VALUE;
/**
+ * The next watermark used for synchronization.
+ * For purposes of global watermark calculation, we need to consider the next watermark based
+ * on the buffered records vs. the last emitted watermark to allow for progress.
+ */
+ private long nextWatermark = Long.MIN_VALUE;
+
+
+ /**
* The time span since last consumed record, after which a shard will be considered idle for purpose of watermark
* calculation. A positive value will allow the watermark to progress even when some shards don't receive new records.
*/
@@ -220,6 +234,82 @@ public class KinesisDataFetcher<T> {
}
/**
+ * The wrapper that holds the watermark handling related parameters
+ * of a record produced by the shard consumer thread.
+ *
+ * @param <T>
+ */
+ private static class RecordWrapper<T> extends TimestampedValue<T> {
+ int shardStateIndex;
+ SequenceNumber lastSequenceNumber;
+ long timestamp;
+ Watermark watermark;
+
+ private RecordWrapper(T record, long timestamp) {
+ super(record, timestamp);
+ this.timestamp = timestamp;
+ }
+
+ @Override
+ public long getTimestamp() {
+ return timestamp;
+ }
+ }
+
+ /** Kinesis data fetcher specific, asynchronous record emitter. */
+ private class AsyncKinesisRecordEmitter extends RecordEmitter<RecordWrapper<T>> {
+
+ private AsyncKinesisRecordEmitter() {
+ this(DEFAULT_QUEUE_CAPACITY);
+ }
+
+ private AsyncKinesisRecordEmitter(int queueCapacity) {
+ super(queueCapacity);
+ }
+
+ @Override
+ public void emit(RecordWrapper<T> record, RecordQueue<RecordWrapper<T>> queue) {
+ emitRecordAndUpdateState(record);
+ ShardWatermarkState<T> sws = shardWatermarks.get(queue.getQueueId());
+ sws.lastEmittedRecordWatermark = record.watermark;
+ }
+ }
+
+ /** Synchronous emitter for use w/o watermark synchronization. */
+ private class SyncKinesisRecordEmitter extends AsyncKinesisRecordEmitter {
+ private final ConcurrentHashMap<Integer, RecordQueue<RecordWrapper<T>>> queues =
+ new ConcurrentHashMap<>();
+
+ @Override
+ public RecordQueue<RecordWrapper<T>> getQueue(int producerIndex) {
+ return queues.computeIfAbsent(producerIndex, (key) -> {
+ return new RecordQueue<RecordWrapper<T>>() {
+ @Override
+ public void put(RecordWrapper<T> record) {
+ emit(record, this);
+ }
+
+ @Override
+ public int getQueueId() {
+ return producerIndex;
+ }
+
+ @Override
+ public int getSize() {
+ return 0;
+ }
+
+ @Override
+ public RecordWrapper<T> peek() {
+ return null;
+ }
+
+ };
+ });
+ }
+ }
+
+ /**
* Creates a Kinesis Data Fetcher.
*
* @param streams the streams to subscribe to
@@ -234,7 +324,8 @@ public class KinesisDataFetcher<T> {
Properties configProps,
KinesisDeserializationSchema<T> deserializationSchema,
KinesisShardAssigner shardAssigner,
- AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner) {
+ AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner,
+ WatermarkTracker watermarkTracker) {
this(streams,
sourceContext,
sourceContext.getCheckpointLock(),
@@ -243,6 +334,7 @@ public class KinesisDataFetcher<T> {
deserializationSchema,
shardAssigner,
periodicWatermarkAssigner,
+ watermarkTracker,
new AtomicReference<>(),
new ArrayList<>(),
createInitialSubscribedStreamsToLastDiscoveredShardsState(streams),
@@ -258,6 +350,7 @@ public class KinesisDataFetcher<T> {
KinesisDeserializationSchema<T> deserializationSchema,
KinesisShardAssigner shardAssigner,
AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner,
+ WatermarkTracker watermarkTracker,
AtomicReference<Throwable> error,
List<KinesisStreamShardState> subscribedShardsState,
HashMap<String, String> subscribedStreamsToLastDiscoveredShardIds,
@@ -272,6 +365,7 @@ public class KinesisDataFetcher<T> {
this.deserializationSchema = checkNotNull(deserializationSchema);
this.shardAssigner = checkNotNull(shardAssigner);
this.periodicWatermarkAssigner = periodicWatermarkAssigner;
+ this.watermarkTracker = watermarkTracker;
this.kinesisProxyFactory = checkNotNull(kinesisProxyFactory);
this.kinesis = kinesisProxyFactory.create(configProps);
@@ -284,6 +378,17 @@ public class KinesisDataFetcher<T> {
this.shardConsumersExecutor =
createShardConsumersThreadPool(runtimeContext.getTaskNameWithSubtasks());
+ this.recordEmitter = createRecordEmitter(configProps);
+ }
+
+ private RecordEmitter createRecordEmitter(Properties configProps) {
+ if (periodicWatermarkAssigner != null && watermarkTracker != null) {
+ int queueCapacity = Integer.parseInt(configProps.getProperty(
+ ConsumerConfigConstants.WATERMARK_SYNC_QUEUE_CAPACITY,
+ Integer.toString(AsyncKinesisRecordEmitter.DEFAULT_QUEUE_CAPACITY)));
+ return new AsyncKinesisRecordEmitter(queueCapacity);
+ }
+ return new SyncKinesisRecordEmitter();
}
/**
@@ -380,16 +485,37 @@ public class KinesisDataFetcher<T> {
ProcessingTimeService timerService = ((StreamingRuntimeContext) runtimeContext).getProcessingTimeService();
LOG.info("Starting periodic watermark emitter with interval {}", periodicWatermarkIntervalMillis);
new PeriodicWatermarkEmitter(timerService, periodicWatermarkIntervalMillis).start();
+ if (watermarkTracker != null) {
+ // setup global watermark tracking
+ long watermarkSyncMillis = Long.parseLong(
+ getConsumerConfiguration().getProperty(ConsumerConfigConstants.WATERMARK_SYNC_MILLIS,
+ Long.toString(ConsumerConfigConstants.DEFAULT_WATERMARK_SYNC_MILLIS)));
+ watermarkTracker.setUpdateTimeoutMillis(watermarkSyncMillis * 3); // synchronization latency
+ watermarkTracker.open(runtimeContext);
+ new WatermarkSyncCallback(timerService, watermarkSyncMillis).start();
+ // emit records ahead of watermark to offset synchronization latency
+ long lookaheadMillis = Long.parseLong(
+ getConsumerConfiguration().getProperty(ConsumerConfigConstants.WATERMARK_LOOKAHEAD_MILLIS,
+ Long.toString(0)));
+ recordEmitter.setMaxLookaheadMillis(Math.max(lookaheadMillis, watermarkSyncMillis * 3));
+ }
}
this.shardIdleIntervalMillis = Long.parseLong(
getConsumerConfiguration().getProperty(ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS,
Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_IDLE_INTERVAL_MILLIS)));
+
+ // run record emitter in separate thread since main thread is used for discovery
+ Thread thread = new Thread(this.recordEmitter);
+ thread.setName("recordEmitter-" + runtimeContext.getTaskNameWithSubtasks());
+ thread.setDaemon(true);
+ thread.start();
}
// ------------------------------------------------------------------------
// finally, start the infinite shard discovery and consumer launching loop;
// we will escape from this loop only when shutdownFetcher() or stopWithError() is called
+ // TODO: have this thread emit the records for tracking backpressure
final long discoveryIntervalMillis = Long.valueOf(
configProps.getProperty(
@@ -490,6 +616,11 @@ public class KinesisDataFetcher<T> {
mainThread.interrupt(); // the main thread may be sleeping for the discovery interval
}
+ if (watermarkTracker != null) {
+ watermarkTracker.close();
+ }
+ this.recordEmitter.stop();
+
if (LOG.isInfoEnabled()) {
LOG.info("Shutting down the shard consumer threads of subtask {} ...", indexOfThisConsumerSubtask);
}
@@ -603,28 +734,48 @@ public class KinesisDataFetcher<T> {
* @param lastSequenceNumber the last sequence number value to update
*/
protected void emitRecordAndUpdateState(T record, long recordTimestamp, int shardStateIndex, SequenceNumber lastSequenceNumber) {
- // track per shard watermarks and emit timestamps extracted from the record,
- // when a watermark assigner was configured.
- if (periodicWatermarkAssigner != null) {
- ShardWatermarkState sws = shardWatermarks.get(shardStateIndex);
- Preconditions.checkNotNull(
- sws, "shard watermark state initialized in registerNewSubscribedShardState");
+ ShardWatermarkState sws = shardWatermarks.get(shardStateIndex);
+ Preconditions.checkNotNull(
+ sws, "shard watermark state initialized in registerNewSubscribedShardState");
+ Watermark watermark = null;
+ if (sws.periodicWatermarkAssigner != null) {
recordTimestamp =
sws.periodicWatermarkAssigner.extractTimestamp(record, sws.lastRecordTimestamp);
- sws.lastRecordTimestamp = recordTimestamp;
- sws.lastUpdated = getCurrentTimeMillis();
+ // track watermark per record since extractTimestamp has side effect
+ watermark = sws.periodicWatermarkAssigner.getCurrentWatermark();
}
+ sws.lastRecordTimestamp = recordTimestamp;
+ sws.lastUpdated = getCurrentTimeMillis();
+ RecordWrapper<T> recordWrapper = new RecordWrapper<>(record, recordTimestamp);
+ recordWrapper.shardStateIndex = shardStateIndex;
+ recordWrapper.lastSequenceNumber = lastSequenceNumber;
+ recordWrapper.watermark = watermark;
+ try {
+ sws.emitQueue.put(recordWrapper);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Actual record emission called from the record emitter.
+ *
+ * <p>Responsible for tracking per shard watermarks and emit timestamps extracted from
+ * the record, when a watermark assigner was configured.
+ *
+ * @param rw
+ */
+ private void emitRecordAndUpdateState(RecordWrapper<T> rw) {
synchronized (checkpointLock) {
- if (record != null) {
- sourceContext.collectWithTimestamp(record, recordTimestamp);
+ if (rw.getValue() != null) {
+ sourceContext.collectWithTimestamp(rw.getValue(), rw.timestamp);
} else {
LOG.warn("Skipping non-deserializable record at sequence number {} of shard {}.",
- lastSequenceNumber,
- subscribedShardsState.get(shardStateIndex).getStreamShardHandle());
+ rw.lastSequenceNumber,
+ subscribedShardsState.get(rw.shardStateIndex).getStreamShardHandle());
}
-
- updateState(shardStateIndex, lastSequenceNumber);
+ updateState(rw.shardStateIndex, rw.lastSequenceNumber);
}
}
@@ -689,6 +840,7 @@ public class KinesisDataFetcher<T> {
} catch (Exception e) {
throw new RuntimeException("Failed to instantiate new WatermarkAssigner", e);
}
+ sws.emitQueue = recordEmitter.getQueue(shardStateIndex);
sws.lastUpdated = getCurrentTimeMillis();
sws.lastRecordTimestamp = Long.MIN_VALUE;
shardWatermarks.put(shardStateIndex, sws);
@@ -721,41 +873,57 @@ public class KinesisDataFetcher<T> {
protected void emitWatermark() {
LOG.debug("Evaluating watermark for subtask {} time {}", indexOfThisConsumerSubtask, getCurrentTimeMillis());
long potentialWatermark = Long.MAX_VALUE;
+ long potentialNextWatermark = Long.MAX_VALUE;
long idleTime =
(shardIdleIntervalMillis > 0)
? getCurrentTimeMillis() - shardIdleIntervalMillis
: Long.MAX_VALUE;
for (Map.Entry<Integer, ShardWatermarkState> e : shardWatermarks.entrySet()) {
+ Watermark w = e.getValue().lastEmittedRecordWatermark;
// consider only active shards, or those that would advance the watermark
- Watermark w = e.getValue().periodicWatermarkAssigner.getCurrentWatermark();
- if (w != null && (e.getValue().lastUpdated >= idleTime || w.getTimestamp() > lastWatermark)) {
+ if (w != null && (e.getValue().lastUpdated >= idleTime
+ || e.getValue().emitQueue.getSize() > 0
+ || w.getTimestamp() > lastWatermark)) {
potentialWatermark = Math.min(potentialWatermark, w.getTimestamp());
+ // for sync, use the watermark of the next record, when available
+ // otherwise watermark may stall when record is blocked by synchronization
+ RecordEmitter.RecordQueue<RecordWrapper<T>> q = e.getValue().emitQueue;
+ RecordWrapper<T> nextRecord = q.peek();
+ Watermark nextWatermark = (nextRecord != null) ? nextRecord.watermark : w;
+ potentialNextWatermark = Math.min(potentialNextWatermark, nextWatermark.getTimestamp());
}
}
// advance watermark if possible (watermarks can only be ascending)
if (potentialWatermark == Long.MAX_VALUE) {
if (shardWatermarks.isEmpty() || shardIdleIntervalMillis > 0) {
- LOG.debug("No active shard for subtask {}, marking the source idle.",
+ LOG.info("No active shard for subtask {}, marking the source idle.",
indexOfThisConsumerSubtask);
// no active shard, signal downstream operators to not wait for a watermark
sourceContext.markAsTemporarilyIdle();
+ isIdle = true;
}
- } else if (potentialWatermark > lastWatermark) {
- LOG.debug("Emitting watermark {} from subtask {}",
- potentialWatermark,
- indexOfThisConsumerSubtask);
- sourceContext.emitWatermark(new Watermark(potentialWatermark));
- lastWatermark = potentialWatermark;
+ } else {
+ if (potentialWatermark > lastWatermark) {
+ LOG.debug("Emitting watermark {} from subtask {}",
+ potentialWatermark,
+ indexOfThisConsumerSubtask);
+ sourceContext.emitWatermark(new Watermark(potentialWatermark));
+ lastWatermark = potentialWatermark;
+ isIdle = false;
+ }
+ nextWatermark = potentialNextWatermark;
}
}
/** Per shard tracking of watermark and last activity. */
private static class ShardWatermarkState<T> {
private AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner;
+ private RecordEmitter.RecordQueue<RecordWrapper<T>> emitQueue;
private volatile long lastRecordTimestamp;
private volatile long lastUpdated;
+ private volatile Watermark lastEmittedRecordWatermark;
}
/**
@@ -785,6 +953,82 @@ public class KinesisDataFetcher<T> {
}
}
+ /** Timer task to update shared watermark state. */
+ private class WatermarkSyncCallback implements ProcessingTimeCallback {
+
+ private final ProcessingTimeService timerService;
+ private final long interval;
+ private final MetricGroup shardMetricsGroup;
+ private long lastGlobalWatermark = Long.MIN_VALUE;
+ private long propagatedLocalWatermark = Long.MIN_VALUE;
+ private long logIntervalMillis = 60_000;
+ private int stalledWatermarkIntervalCount = 0;
+ private long lastLogged;
+
+ WatermarkSyncCallback(ProcessingTimeService timerService, long interval) {
+ this.timerService = checkNotNull(timerService);
+ this.interval = interval;
+ this.shardMetricsGroup = consumerMetricGroup.addGroup("subtaskId",
+ String.valueOf(indexOfThisConsumerSubtask));
+ this.shardMetricsGroup.gauge("localWatermark", () -> nextWatermark);
+ this.shardMetricsGroup.gauge("globalWatermark", () -> lastGlobalWatermark);
+ }
+
+ public void start() {
+ LOG.info("Registering watermark tracker with interval {}", interval);
+ timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this);
+ }
+
+ @Override
+ public void onProcessingTime(long timestamp) {
+ if (nextWatermark != Long.MIN_VALUE) {
+ long globalWatermark = lastGlobalWatermark;
+ // TODO: refresh watermark while idle
+ if (!(isIdle && nextWatermark == propagatedLocalWatermark)) {
+ globalWatermark = watermarkTracker.updateWatermark(nextWatermark);
+ propagatedLocalWatermark = nextWatermark;
+ } else {
+ LOG.info("WatermarkSyncCallback subtask: {} is idle", indexOfThisConsumerSubtask);
+ }
+
+ if (timestamp - lastLogged > logIntervalMillis) {
+ lastLogged = System.currentTimeMillis();
+ LOG.info("WatermarkSyncCallback subtask: {} local watermark: {}"
+ + ", global watermark: {}, delta: {} timeouts: {}, emitter: {}",
+ indexOfThisConsumerSubtask,
+ nextWatermark,
+ globalWatermark,
+ nextWatermark - globalWatermark,
+ watermarkTracker.getUpdateTimeoutCount(),
+ recordEmitter.printInfo());
+
+ // Following is for debugging non-reproducible issue with stalled watermark
+ if (globalWatermark == nextWatermark && globalWatermark == lastGlobalWatermark
+ && stalledWatermarkIntervalCount++ > 5) {
+ // subtask blocks watermark, log to aid troubleshooting
+ stalledWatermarkIntervalCount = 0;
+ for (Map.Entry<Integer, ShardWatermarkState> e : shardWatermarks.entrySet()) {
+ RecordEmitter.RecordQueue<RecordWrapper<T>> q = e.getValue().emitQueue;
+ RecordWrapper<T> nextRecord = q.peek();
+ if (nextRecord != null) {
+ LOG.info("stalled watermark {} key {} next watermark {} next timestamp {}",
+ nextWatermark,
+ e.getKey(),
+ nextRecord.watermark,
+ nextRecord.timestamp);
+ }
+ }
+ }
+ }
+
+ lastGlobalWatermark = globalWatermark;
+ recordEmitter.setCurrentWatermark(globalWatermark);
+ }
+ // schedule next callback
+ timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this);
+ }
+ }
+
/**
* Registers a metric group associated with the shard id of the provided {@link KinesisStreamShardState shardState}.
*
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTracker.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTracker.java
new file mode 100644
index 0000000..f150bb0
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTracker.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A {@link WatermarkTracker} that shares state through {@link GlobalAggregateManager}.
+ */
+@PublicEvolving
+public class JobManagerWatermarkTracker extends WatermarkTracker {
+
+ private GlobalAggregateManager aggregateManager;
+ private final String aggregateName;
+ private final WatermarkAggregateFunction aggregateFunction = new WatermarkAggregateFunction();
+ private final long logAccumulatorIntervalMillis;
+ private long updateTimeoutCount;
+
+ public JobManagerWatermarkTracker(String aggregateName) {
+ this(aggregateName, -1);
+ }
+
+ public JobManagerWatermarkTracker(String aggregateName, long logAccumulatorIntervalMillis) {
+ super();
+ this.aggregateName = aggregateName;
+ this.logAccumulatorIntervalMillis = logAccumulatorIntervalMillis;
+ }
+
+ @Override
+ public long updateWatermark(long localWatermark) {
+ WatermarkUpdate update = new WatermarkUpdate();
+ update.id = getSubtaskId();
+ update.watermark = localWatermark;
+ try {
+ byte[] resultBytes = aggregateManager.updateGlobalAggregate(aggregateName,
+ InstantiationUtil.serializeObject(update), aggregateFunction);
+ WatermarkResult result = InstantiationUtil.deserializeObject(resultBytes,
+ this.getClass().getClassLoader());
+ this.updateTimeoutCount += result.updateTimeoutCount;
+ return result.watermark;
+ } catch (ClassNotFoundException | IOException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ @Override
+ public void open(RuntimeContext context) {
+ super.open(context);
+ this.aggregateFunction.updateTimeoutMillis = super.getUpdateTimeoutMillis();
+ this.aggregateFunction.logAccumulatorIntervalMillis = logAccumulatorIntervalMillis;
+ Preconditions.checkArgument(context instanceof StreamingRuntimeContext);
+ StreamingRuntimeContext runtimeContext = (StreamingRuntimeContext) context;
+ this.aggregateManager = runtimeContext.getGlobalAggregateManager();
+ }
+
+ public long getUpdateTimeoutCount() {
+ return updateTimeoutCount;
+ }
+
+ /** Watermark aggregation input. */
+ protected static class WatermarkUpdate implements Serializable {
+ protected long watermark = Long.MIN_VALUE;
+ protected String id;
+ }
+
+ /** Watermark aggregation result. */
+ protected static class WatermarkResult implements Serializable {
+ protected long watermark = Long.MIN_VALUE;
+ protected long updateTimeoutCount = 0;
+ }
+
+ /**
+ * Aggregate function for computing a combined watermark of parallel subtasks.
+ */
+ private static class WatermarkAggregateFunction implements
+ AggregateFunction<byte[], Map<String, WatermarkState>, byte[]> {
+
+ private long updateTimeoutMillis = DEFAULT_UPDATE_TIMEOUT_MILLIS;
+ private long logAccumulatorIntervalMillis = -1;
+
+ // TODO: wrap accumulator
+ static long addCount;
+ static long lastLogged;
+ private static final Logger LOG = LoggerFactory.getLogger(WatermarkAggregateFunction.class);
+
+ @Override
+ public Map<String, WatermarkState> createAccumulator() {
+ return new HashMap<>();
+ }
+
+ @Override
+ public Map<String, WatermarkState> add(byte[] valueBytes, Map<String, WatermarkState> accumulator) {
+ addCount++;
+ final WatermarkUpdate value;
+ try {
+ value = InstantiationUtil.deserializeObject(valueBytes, this.getClass().getClassLoader());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ WatermarkState ws = accumulator.get(value.id);
+ if (ws == null) {
+ accumulator.put(value.id, ws = new WatermarkState());
+ }
+ ws.watermark = value.watermark;
+ ws.lastUpdated = System.currentTimeMillis();
+ return accumulator;
+ }
+
+ @Override
+ public byte[] getResult(Map<String, WatermarkState> accumulator) {
+ long updateTimeoutCount = 0;
+ long currentTime = System.currentTimeMillis();
+ long globalWatermark = Long.MAX_VALUE;
+ for (Map.Entry<String, WatermarkState> e : accumulator.entrySet()) {
+ WatermarkState ws = e.getValue();
+ if (ws.lastUpdated + updateTimeoutMillis < currentTime) {
+ // ignore outdated entry
+ updateTimeoutCount++;
+ continue;
+ }
+ globalWatermark = Math.min(ws.watermark, globalWatermark);
+ }
+
+ WatermarkResult result = new WatermarkResult();
+ result.watermark = globalWatermark == Long.MAX_VALUE ? Long.MIN_VALUE : globalWatermark;
+ result.updateTimeoutCount = updateTimeoutCount;
+
+ if (logAccumulatorIntervalMillis > 0) {
+ if (currentTime - lastLogged > logAccumulatorIntervalMillis) {
+ lastLogged = System.currentTimeMillis();
+ LOG.info("WatermarkAggregateFunction added: {}, timeout: {}, map: {}",
+ addCount, updateTimeoutCount, accumulator);
+ }
+ }
+
+ try {
+ return InstantiationUtil.serializeObject(result);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public Map<String, WatermarkState> merge(Map<String, WatermarkState> accumulatorA, Map<String, WatermarkState> accumulatorB) {
+ // not required
+ throw new UnsupportedOperationException();
+ }
+ }
+
+}
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter.java
new file mode 100644
index 0000000..17344b1
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Emitter that handles event time synchronization between producer threads.
+ *
+ * <p>Records are organized into per producer queues that will block when capacity is exhausted.
+ *
+ * <p>Records are emitted by selecting the oldest available element of all producer queues,
+ * as long as the timestamp does not exceed the current shared watermark plus allowed lookahead interval.
+ *
+ * @param <T> the type of timestamped records buffered and emitted
+ */
+@Internal
+public abstract class RecordEmitter<T extends TimestampedValue> implements Runnable {
+ private static final Logger LOG = LoggerFactory.getLogger(RecordEmitter.class);
+
+ /**
+ * The default capacity of a single queue.
+ *
+ * <p>Larger queue size can lead to higher throughput, but also to
+ * very high heap space consumption, depending on the size of elements.
+ *
+ * <p>Note that this is difficult to tune, because it does not take into account
+ * the size of individual objects.
+ */
+ public static final int DEFAULT_QUEUE_CAPACITY = 100;
+
+ private final int queueCapacity;
+ private final ConcurrentHashMap<Integer, AsyncRecordQueue<T>> queues = new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<AsyncRecordQueue<T>, Boolean> emptyQueues = new ConcurrentHashMap<>();
+ private final PriorityQueue<AsyncRecordQueue<T>> heads = new PriorityQueue<>(this::compareHeadElement);
+ private volatile boolean running = true;
+ private volatile long maxEmitTimestamp = Long.MAX_VALUE;
+ private long maxLookaheadMillis = 60 * 1000; // one minute
+ private long idleSleepMillis = 100;
+ private final Object condition = new Object();
+
+ public RecordEmitter(int queueCapacity) {
+ this.queueCapacity = queueCapacity;
+ }
+
+ private int compareHeadElement(AsyncRecordQueue left, AsyncRecordQueue right) {
+ return Long.compare(left.headTimestamp, right.headTimestamp);
+ }
+
+ /**
+ * Accepts records from readers.
+ *
+ * @param <T> the type of records accepted by the queue
+ */
+ public interface RecordQueue<T> {
+ void put(T record) throws InterruptedException;
+
+ int getQueueId();
+
+ int getSize();
+
+ T peek();
+ }
+
+ private class AsyncRecordQueue<T> implements RecordQueue<T> {
+ private final ArrayBlockingQueue<T> queue;
+ private final int queueId;
+ long headTimestamp;
+
+ private AsyncRecordQueue(int queueId) {
+ super();
+ this.queue = new ArrayBlockingQueue<>(queueCapacity);
+ this.queueId = queueId;
+ this.headTimestamp = Long.MAX_VALUE;
+ }
+
+ public void put(T record) throws InterruptedException {
+ queue.put(record);
+ synchronized (condition) {
+ condition.notify();
+ }
+ }
+
+ public int getQueueId() {
+ return queueId;
+ }
+
+ public int getSize() {
+ return queue.size();
+ }
+
+ public T peek() {
+ return queue.peek();
+ }
+
+ }
+
+ /**
+ * The queue for the given producer (i.e. Kinesis shard consumer thread).
+ *
+ * <p>The producer may hold on to the queue for subsequent records.
+ *
+ * @param producerIndex index identifying the producer (i.e. shard consumer thread)
+ * @return the queue for the given producer, created on first access
+ */
+ public RecordQueue<T> getQueue(int producerIndex) {
+ return queues.computeIfAbsent(producerIndex, (key) -> {
+ AsyncRecordQueue<T> q = new AsyncRecordQueue<>(producerIndex);
+ emptyQueues.put(q, false);
+ return q;
+ });
+ }
+
+ /**
+ * How far ahead of the watermark records should be emitted.
+ *
+ * <p>Needs to account for the latency of obtaining the global watermark.
+ *
+ * @param maxLookaheadMillis maximum interval, in milliseconds, that record timestamps may be ahead of the watermark
+ */
+ public void setMaxLookaheadMillis(long maxLookaheadMillis) {
+ this.maxLookaheadMillis = maxLookaheadMillis;
+ LOG.info("[setMaxLookaheadMillis] Max lookahead millis set to {}", maxLookaheadMillis);
+ }
+
+ /**
+ * Set the current watermark.
+ *
+ * <p>This watermark will be used to control which records to emit from the queues of pending
+ * elements. When an element timestamp is ahead of the watermark by at least {@link
+ * #maxLookaheadMillis}, elements in that queue will wait until the watermark advances.
+ *
+ * @param watermark the new current watermark (epoch milliseconds)
+ */
+ public void setCurrentWatermark(long watermark) {
+ maxEmitTimestamp = watermark + maxLookaheadMillis;
+ synchronized (condition) {
+ condition.notify();
+ }
+ LOG.info(
+ "[setCurrentWatermark] Current watermark set to {}, maxEmitTimestamp = {}",
+ watermark,
+ maxEmitTimestamp);
+ }
+
+ /**
+ * Run loop, does not return unless {@link #stop()} was called.
+ */
+ @Override
+ public void run() {
+ LOG.info("Starting emitter with maxLookaheadMillis: {}", this.maxLookaheadMillis);
+
+ // emit available records, ordered by timestamp
+ AsyncRecordQueue<T> min = heads.poll();
+ runLoop:
+ while (running) {
+ // find a queue to emit from
+ while (min == null) {
+ // check new or previously empty queues
+ if (!emptyQueues.isEmpty()) {
+ for (AsyncRecordQueue<T> queueHead : emptyQueues.keySet()) {
+ if (!queueHead.queue.isEmpty()) {
+ emptyQueues.remove(queueHead);
+ queueHead.headTimestamp = queueHead.queue.peek().getTimestamp();
+ heads.offer(queueHead);
+ }
+ }
+ }
+ min = heads.poll();
+ if (min == null) {
+ synchronized (condition) {
+ // wait for work
+ try {
+ condition.wait(idleSleepMillis);
+ } catch (InterruptedException e) {
+ continue runLoop;
+ }
+ }
+ }
+ }
+
+ // wait until ready to emit min or another queue receives elements
+ while (min.headTimestamp > maxEmitTimestamp) {
+ synchronized (condition) {
+ // wait until ready to emit
+ try {
+ condition.wait(idleSleepMillis);
+ } catch (InterruptedException e) {
+ continue runLoop;
+ }
+ if (min.headTimestamp > maxEmitTimestamp && !emptyQueues.isEmpty()) {
+ // see if another queue can make progress
+ heads.offer(min);
+ min = null;
+ continue runLoop;
+ }
+ }
+ }
+
+ // emit up to queue capacity records
+ // cap on empty queues since older records may arrive
+ AsyncRecordQueue<T> nextQueue = heads.poll();
+ T record;
+ int emitCount = 0;
+ while ((record = min.queue.poll()) != null) {
+ emit(record, min);
+ // track last record emitted, even when queue becomes empty
+ min.headTimestamp = record.getTimestamp();
+ // potentially switch to next queue
+ if ((nextQueue != null && min.headTimestamp > nextQueue.headTimestamp)
+ || (min.headTimestamp > maxEmitTimestamp)
+ || (emitCount++ > queueCapacity && !emptyQueues.isEmpty())) {
+ break;
+ }
+ }
+ if (record == null) {
+ this.emptyQueues.put(min, true);
+ } else {
+ heads.offer(min);
+ }
+ min = nextQueue;
+ }
+ }
+
+ public void stop() {
+ running = false;
+ }
+
+ /** Emit the record. This is specific to a connector, like the Kinesis data fetcher. */
+ public abstract void emit(T record, RecordQueue<T> source);
+
+ /** Summary of emit queues that can be used for logging. */
+ public String printInfo() {
+ StringBuffer sb = new StringBuffer();
+ sb.append(String.format("queues: %d, empty: %d",
+ queues.size(), emptyQueues.size()));
+ for (Map.Entry<Integer, AsyncRecordQueue<T>> e : queues.entrySet()) {
+ AsyncRecordQueue q = e.getValue();
+ sb.append(String.format("\n%d timestamp: %s size: %d",
+ e.getValue().queueId, q.headTimestamp, q.queue.size()));
+ }
+ return sb.toString();
+ }
+
+}
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/WatermarkTracker.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/WatermarkTracker.java
new file mode 100644
index 0000000..f4207c7
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/WatermarkTracker.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+
+import java.io.Closeable;
+import java.io.Serializable;
+
+/**
+ * The watermark tracker is responsible for aggregating watermarks across distributed operators.
+ * <p>It can be used for sub tasks of a single Flink source as well as multiple heterogeneous
+ * sources or other operators.
+ * <p>The class essentially functions like a distributed hash table that enclosing operators can
+ * use to adapt their processing / IO rates.
+ */
+@PublicEvolving
+public abstract class WatermarkTracker implements Closeable, Serializable {
+
+ public static final long DEFAULT_UPDATE_TIMEOUT_MILLIS = 60_000;
+
+ /**
+ * Subtasks that have not provided a watermark update within the configured interval will be
+ * considered idle and excluded from target watermark calculation.
+ */
+ private long updateTimeoutMillis = DEFAULT_UPDATE_TIMEOUT_MILLIS;
+
+ /**
+ * Unique id for the subtask.
+ * Using string (instead of subtask index) so synchronization can span across multiple sources.
+ */
+ private String subtaskId;
+
+ /** Watermark state. */
+ protected static class WatermarkState {
+ protected long watermark = Long.MIN_VALUE;
+ protected long lastUpdated;
+
+ public long getWatermark() {
+ return watermark;
+ }
+
+ @Override
+ public String toString() {
+ return "WatermarkState{watermark=" + watermark + ", lastUpdated=" + lastUpdated + '}';
+ }
+ }
+
+ protected String getSubtaskId() {
+ return this.subtaskId;
+ }
+
+ protected long getUpdateTimeoutMillis() {
+ return this.updateTimeoutMillis;
+ }
+
+ public abstract long getUpdateTimeoutCount();
+
+ /**
+ * Subtasks that have not provided a watermark update within the configured interval will be
+ * considered idle and excluded from target watermark calculation.
+ *
+ * @param updateTimeoutMillis timeout in milliseconds after which a subtask is considered idle
+ */
+ public void setUpdateTimeoutMillis(long updateTimeoutMillis) {
+ this.updateTimeoutMillis = updateTimeoutMillis;
+ }
+
+ /**
+ * Set the current watermark of the owning subtask and return the global low watermark based on
+ * the current state snapshot. Periodically called by the enclosing consumer instance, which is
+ * responsible for any timer management etc.
+ *
+ * @param localWatermark the current watermark of the owning subtask
+ * @return the global low watermark across all participating subtasks
+ */
+ public abstract long updateWatermark(final long localWatermark);
+
+ protected long getCurrentTime() {
+ return System.currentTimeMillis();
+ }
+
+ public void open(RuntimeContext context) {
+ if (context instanceof StreamingRuntimeContext) {
+ this.subtaskId = ((StreamingRuntimeContext) context).getOperatorUniqueID()
+ + "-" + context.getIndexOfThisSubtask();
+ } else {
+ this.subtaskId = context.getTaskNameWithSubtasks();
+ }
+ }
+
+ @Override
+ public void close() {
+ // no work to do here
+ }
+
+}
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
index b38eef1..1ce05d1 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
@@ -418,7 +418,7 @@ public class FlinkKinesisConsumerMigrationTest {
HashMap<StreamShardMetadata, SequenceNumber> testStateSnapshot,
List<StreamShardHandle> testInitialDiscoveryShards) {
- super(streams, sourceContext, runtimeContext, configProps, deserializationSchema, DEFAULT_SHARD_ASSIGNER, null);
+ super(streams, sourceContext, runtimeContext, configProps, deserializationSchema, DEFAULT_SHARD_ASSIGNER, null, null);
this.testStateSnapshot = testStateSnapshot;
this.testInitialDiscoveryShards = testInitialDiscoveryShards;
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
index d36d68a..cbcd8b4 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
@@ -52,6 +52,7 @@ import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGen
import org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils;
import org.apache.flink.streaming.connectors.kinesis.testutils.TestableFlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.apache.flink.streaming.connectors.kinesis.util.WatermarkTracker;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.CollectingSourceContext;
@@ -60,6 +61,7 @@ import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
import com.amazonaws.services.kinesis.model.HashKeyRange;
import com.amazonaws.services.kinesis.model.SequenceNumberRange;
import com.amazonaws.services.kinesis.model.Shard;
+import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
@@ -79,6 +81,7 @@ import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
@@ -737,6 +740,7 @@ public class FlinkKinesisConsumerTest {
deserializationSchema,
getShardAssigner(),
getPeriodicWatermarkAssigner(),
+ null,
new AtomicReference<>(),
new ArrayList<>(),
subscribedStreamsToLastDiscoveredShardIds,
@@ -775,6 +779,10 @@ public class FlinkKinesisConsumerTest {
public void emitWatermark(Watermark mark) {
watermarks.add(mark);
}
+
+ @Override
+ public void markAsTemporarilyIdle() {
+ }
};
new Thread(
@@ -817,6 +825,164 @@ public class FlinkKinesisConsumerTest {
assertThat(watermarks, org.hamcrest.Matchers.contains(new Watermark(-3), new Watermark(5)));
}
+ @Test
+ public void testSourceSynchronization() throws Exception {
+
+ final String streamName = "fakeStreamName";
+ final Time maxOutOfOrderness = Time.milliseconds(5);
+ final long autoWatermarkInterval = 1_000;
+ final long watermarkSyncInterval = autoWatermarkInterval + 1;
+
+ HashMap<String, String> subscribedStreamsToLastDiscoveredShardIds = new HashMap<>();
+ subscribedStreamsToLastDiscoveredShardIds.put(streamName, null);
+
+ final KinesisDeserializationSchema<String> deserializationSchema =
+ new KinesisDeserializationSchemaWrapper<>(new SimpleStringSchema());
+ Properties props = new Properties();
+ props.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+ props.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, Long.toString(10L));
+ props.setProperty(ConsumerConfigConstants.WATERMARK_SYNC_MILLIS,
+ Long.toString(watermarkSyncInterval));
+ props.setProperty(ConsumerConfigConstants.WATERMARK_LOOKAHEAD_MILLIS, Long.toString(5));
+
+ BlockingQueue<String> shard1 = new LinkedBlockingQueue();
+ BlockingQueue<String> shard2 = new LinkedBlockingQueue();
+
+ Map<String, List<BlockingQueue<String>>> streamToQueueMap = new HashMap<>();
+ streamToQueueMap.put(streamName, Lists.newArrayList(shard1, shard2));
+
+ // override createFetcher to mock Kinesis
+ FlinkKinesisConsumer<String> sourceFunc =
+ new FlinkKinesisConsumer<String>(streamName, deserializationSchema, props) {
+ @Override
+ protected KinesisDataFetcher<String> createFetcher(
+ List<String> streams,
+ SourceFunction.SourceContext<String> sourceContext,
+ RuntimeContext runtimeContext,
+ Properties configProps,
+ KinesisDeserializationSchema<String> deserializationSchema) {
+
+ KinesisDataFetcher<String> fetcher =
+ new KinesisDataFetcher<String>(
+ streams,
+ sourceContext,
+ sourceContext.getCheckpointLock(),
+ runtimeContext,
+ configProps,
+ deserializationSchema,
+ getShardAssigner(),
+ getPeriodicWatermarkAssigner(),
+ getWatermarkTracker(),
+ new AtomicReference<>(),
+ new ArrayList<>(),
+ subscribedStreamsToLastDiscoveredShardIds,
+ (props) -> FakeKinesisBehavioursFactory.blockingQueueGetRecords(
+ streamToQueueMap)
+ ) {};
+ return fetcher;
+ }
+ };
+
+ sourceFunc.setShardAssigner(
+ (streamShardHandle, i) -> {
+ // shardId-000000000000
+ return Integer.parseInt(
+ streamShardHandle.getShard().getShardId().substring("shardId-".length()));
+ });
+
+ sourceFunc.setPeriodicWatermarkAssigner(new TestTimestampExtractor(maxOutOfOrderness));
+
+ sourceFunc.setWatermarkTracker(new TestWatermarkTracker());
+
+ // there is currently no test harness specifically for sources,
+ // so we overlay the source thread here
+ AbstractStreamOperatorTestHarness<Object> testHarness =
+ new AbstractStreamOperatorTestHarness<Object>(
+ new StreamSource(sourceFunc), 1, 1, 0);
+ testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime);
+ testHarness.getExecutionConfig().setAutoWatermarkInterval(autoWatermarkInterval);
+
+ testHarness.initializeEmptyState();
+ testHarness.open();
+
+ final ConcurrentLinkedQueue<Object> results = testHarness.getOutput();
+
+ @SuppressWarnings("unchecked")
+ SourceFunction.SourceContext<String> sourceContext = new CollectingSourceContext(
+ testHarness.getCheckpointLock(), results) {
+ @Override
+ public void markAsTemporarilyIdle() {
+ }
+
+ @Override
+ public void emitWatermark(Watermark mark) {
+ results.add(mark);
+ }
+ };
+
+ new Thread(
+ () -> {
+ try {
+ sourceFunc.run(sourceContext);
+ } catch (InterruptedException e) {
+ // expected on cancel
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ })
+ .start();
+
+ ArrayList<Object> expectedResults = new ArrayList<>();
+
+ final long record1 = 1;
+ shard1.put(Long.toString(record1));
+ expectedResults.add(Long.toString(record1));
+ awaitRecordCount(results, expectedResults.size());
+
+ // at this point we know the fetcher was initialized
+ final KinesisDataFetcher fetcher = org.powermock.reflect.Whitebox.getInternalState(sourceFunc, "fetcher");
+
+ // trigger watermark emit
+ testHarness.setProcessingTime(testHarness.getProcessingTime() + autoWatermarkInterval);
+ expectedResults.add(new Watermark(-4));
+ // verify watermark
+ awaitRecordCount(results, expectedResults.size());
+ assertThat(results, org.hamcrest.Matchers.contains(expectedResults.toArray()));
+ assertEquals(0, TestWatermarkTracker.WATERMARK.get());
+
+ // trigger sync
+ testHarness.setProcessingTime(testHarness.getProcessingTime() + 1);
+ TestWatermarkTracker.assertSingleWatermark(-4);
+
+ final long record2 = record1 + (watermarkSyncInterval * 3) + 1;
+ shard1.put(Long.toString(record2));
+
+ // TODO: check for record received instead
+ Thread.sleep(100);
+
+ // Advance the watermark. Since the new record is past global watermark + threshold,
+ // it won't be emitted and the watermark does not advance
+ testHarness.setProcessingTime(testHarness.getProcessingTime() + autoWatermarkInterval);
+ assertThat(results, org.hamcrest.Matchers.contains(expectedResults.toArray()));
+ assertEquals(3000L, (long) org.powermock.reflect.Whitebox.getInternalState(fetcher, "nextWatermark"));
+ TestWatermarkTracker.assertSingleWatermark(-4);
+
+ // Trigger global watermark sync
+ testHarness.setProcessingTime(testHarness.getProcessingTime() + 1);
+ expectedResults.add(Long.toString(record2));
+ awaitRecordCount(results, expectedResults.size());
+ assertThat(results, org.hamcrest.Matchers.contains(expectedResults.toArray()));
+ TestWatermarkTracker.assertSingleWatermark(3000);
+
+ // Trigger watermark update and emit
+ testHarness.setProcessingTime(testHarness.getProcessingTime() + autoWatermarkInterval);
+ expectedResults.add(new Watermark(3000));
+ assertThat(results, org.hamcrest.Matchers.contains(expectedResults.toArray()));
+
+ sourceFunc.cancel();
+ testHarness.close();
+ }
+
private void awaitRecordCount(ConcurrentLinkedQueue<? extends Object> queue, int count) throws Exception {
long timeoutMillis = System.currentTimeMillis() + 10_000;
while (System.currentTimeMillis() < timeoutMillis && queue.size() < count) {
@@ -837,4 +1003,23 @@ public class FlinkKinesisConsumerTest {
}
}
+ private static class TestWatermarkTracker extends WatermarkTracker {
+
+ private static final AtomicLong WATERMARK = new AtomicLong();
+
+ @Override
+ public long getUpdateTimeoutCount() {
+ return 0;
+ }
+
+ @Override
+ public long updateWatermark(long localWatermark) {
+ WATERMARK.set(localWatermark);
+ return localWatermark;
+ }
+
+ static void assertSingleWatermark(long expected) {
+ Assert.assertEquals(expected, WATERMARK.get());
+ }
+ }
}
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
index dbc7118..93886f9 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
@@ -113,9 +113,10 @@ public class ShardConsumerTest {
KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(Collections.singletonList("fakeStream")),
Mockito.mock(KinesisProxyInterface.class));
+ int shardIndex = fetcher.registerNewSubscribedShardState(subscribedShardsStateUnderTest.get(0));
new ShardConsumer<>(
fetcher,
- 0,
+ shardIndex,
subscribedShardsStateUnderTest.get(0).getStreamShardHandle(),
subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum(),
FakeKinesisBehavioursFactory.totalNumOfRecordsAfterNumOfGetRecordsCalls(1000, 9, 500L),
@@ -151,9 +152,10 @@ public class ShardConsumerTest {
KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(Collections.singletonList("fakeStream")),
Mockito.mock(KinesisProxyInterface.class));
+ int shardIndex = fetcher.registerNewSubscribedShardState(subscribedShardsStateUnderTest.get(0));
new ShardConsumer<>(
fetcher,
- 0,
+ shardIndex,
subscribedShardsStateUnderTest.get(0).getStreamShardHandle(),
subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum(),
// Get a total of 1000 records with 9 getRecords() calls,
@@ -195,9 +197,10 @@ public class ShardConsumerTest {
KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(Collections.singletonList("fakeStream")),
Mockito.mock(KinesisProxyInterface.class));
+ int shardIndex = fetcher.registerNewSubscribedShardState(subscribedShardsStateUnderTest.get(0));
new ShardConsumer<>(
fetcher,
- 0,
+ shardIndex,
subscribedShardsStateUnderTest.get(0).getStreamShardHandle(),
subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum(),
// Initial number of records to fetch --> 10
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java
index f1fd069..3bb11bd 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java
@@ -74,6 +74,7 @@ public class TestableKinesisDataFetcher<T> extends KinesisDataFetcher<T> {
deserializationSchema,
DEFAULT_SHARD_ASSIGNER,
null,
+ null,
thrownErrorUnderTest,
subscribedShardsStateUnderTest,
subscribedStreamsToLastDiscoveredShardIdsStateUnderTest,
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTrackerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTrackerTest.java
new file mode 100644
index 0000000..b793b54
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTrackerTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/** Test for {@link JobManagerWatermarkTracker}. */
+public class JobManagerWatermarkTrackerTest {
+
+ private static MiniCluster flink;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ final Configuration config = new Configuration();
+ config.setInteger(RestOptions.PORT, 0);
+
+ final MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder()
+ .setConfiguration(config)
+ .setNumTaskManagers(1)
+ .setNumSlotsPerTaskManager(1)
+ .build();
+
+ flink = new MiniCluster(miniClusterConfiguration);
+
+ flink.start();
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ if (flink != null) {
+ flink.close();
+ }
+ }
+
+ @Test
+ public void testUpdateWatermark() throws Exception {
+ final Configuration clientConfiguration = new Configuration();
+ clientConfiguration.setInteger(RestOptions.RETRY_MAX_ATTEMPTS, 0);
+
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
+ flink.getRestAddress().get().getHost(),
+ flink.getRestAddress().get().getPort(),
+ clientConfiguration);
+
+ env.addSource(new TestSourceFunction(new JobManagerWatermarkTracker("fakeId")))
+ .addSink(new SinkFunction<Integer>() {});
+ env.execute();
+ }
+
+ private static class TestSourceFunction extends RichSourceFunction<Integer> {
+
+ private final JobManagerWatermarkTracker tracker;
+
+ public TestSourceFunction(JobManagerWatermarkTracker tracker) {
+ this.tracker = tracker;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ tracker.open(getRuntimeContext());
+ }
+
+ @Override
+ public void run(SourceContext<Integer> ctx) {
+ Assert.assertEquals(998, tracker.updateWatermark(998));
+ Assert.assertEquals(999, tracker.updateWatermark(999));
+ }
+
+ @Override
+ public void cancel() {
+ }
+ }
+
+}
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitterTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitterTest.java
new file mode 100644
index 0000000..1948237
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitterTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue;
+
+import org.hamcrest.Matchers;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/** Test for {@link RecordEmitter}. */
+public class RecordEmitterTest {
+
+ static List<TimestampedValue> results = Collections.synchronizedList(new ArrayList<>());
+
+ private class TestRecordEmitter extends RecordEmitter<TimestampedValue> {
+
+ private TestRecordEmitter() {
+ super(DEFAULT_QUEUE_CAPACITY);
+ }
+
+ @Override
+ public void emit(TimestampedValue record, RecordQueue<TimestampedValue> queue) {
+ results.add(record);
+ }
+ }
+
+ @Test
+ public void test() throws Exception {
+
+ TestRecordEmitter emitter = new TestRecordEmitter();
+
+ final TimestampedValue<String> one = new TimestampedValue<>("one", 1);
+ final TimestampedValue<String> two = new TimestampedValue<>("two", 2);
+ final TimestampedValue<String> five = new TimestampedValue<>("five", 5);
+ final TimestampedValue<String> ten = new TimestampedValue<>("ten", 10);
+
+ final RecordEmitter.RecordQueue<TimestampedValue> queue0 = emitter.getQueue(0);
+ final RecordEmitter.RecordQueue<TimestampedValue> queue1 = emitter.getQueue(1);
+
+ queue0.put(one);
+ queue0.put(five);
+ queue0.put(ten);
+
+ queue1.put(two);
+
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+ executor.submit(emitter);
+
+ long timeout = System.currentTimeMillis() + 10_000;
+ while (results.size() != 4 && System.currentTimeMillis() < timeout) {
+ Thread.sleep(100);
+ }
+ emitter.stop();
+ executor.shutdownNow();
+
+ Assert.assertThat(results, Matchers.contains(one, five, two, ten));
+ }
+
+}
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/WatermarkTrackerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/WatermarkTrackerTest.java
new file mode 100644
index 0000000..3d59a45
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/WatermarkTrackerTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
+
+import org.apache.commons.lang3.mutable.MutableLong;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Test for {@link WatermarkTracker}. */
+public class WatermarkTrackerTest {
+
+ // Watermark state of a simulated peer subtask injected via refreshWatermarkSnapshot.
+ WatermarkTracker.WatermarkState wm1 = new WatermarkTracker.WatermarkState();
+ // Manual clock so the update-timeout behavior can be exercised deterministically.
+ MutableLong clock = new MutableLong(0);
+
+ /** Tracker variant backed by an in-memory snapshot and the manual test clock. */
+ private class TestWatermarkTracker extends WatermarkTracker {
+ /**
+ * The watermarks of all sub tasks that participate in the synchronization.
+ */
+ private final Map<String, WatermarkState> watermarks = new HashMap<>();
+
+ // Number of times a peer entry was skipped because its update had timed out.
+ private long updateTimeoutCount = 0;
+
+ @Override
+ protected long getCurrentTime() {
+ return clock.longValue();
+ }
+
+ @Override
+ public long updateWatermark(final long localWatermark) {
+ refreshWatermarkSnapshot(this.watermarks);
+
+ long currentTime = getCurrentTime();
+ String subtaskId = this.getSubtaskId();
+
+ // Record this subtask's own watermark (monotonically non-decreasing).
+ WatermarkState ws = watermarks.get(subtaskId);
+ if (ws == null) {
+ watermarks.put(subtaskId, ws = new WatermarkState());
+ }
+ ws.lastUpdated = currentTime;
+ ws.watermark = Math.max(ws.watermark, localWatermark);
+ saveWatermark(subtaskId, ws);
+
+ // Global watermark is the minimum over all subtasks that updated recently.
+ long globalWatermark = ws.watermark;
+ for (Map.Entry<String, WatermarkState> e : watermarks.entrySet()) {
+ ws = e.getValue();
+ if (ws.lastUpdated + getUpdateTimeoutMillis() < currentTime) {
+ // ignore outdated subtask
+ updateTimeoutCount++;
+ continue;
+ }
+ globalWatermark = Math.min(ws.watermark, globalWatermark);
+ }
+ return globalWatermark;
+ }
+
+ protected void refreshWatermarkSnapshot(Map<String, WatermarkState> watermarks) {
+ watermarks.put("wm1", wm1);
+ }
+
+ protected void saveWatermark(String id, WatermarkState ws) {
+ // do nothing
+ }
+
+ public long getUpdateTimeoutCount() {
+ return updateTimeoutCount;
+ }
+ }
+
+ @Test
+ public void test() {
+ long watermark = 0;
+ TestWatermarkTracker ws = new TestWatermarkTracker();
+ ws.open(new MockStreamingRuntimeContext(false, 1, 0));
+ // Fresh state: both this subtask and wm1 are at the initial watermark.
+ Assert.assertEquals(Long.MIN_VALUE, ws.updateWatermark(Long.MIN_VALUE));
+ // wm1 is still current and holds the initial watermark, capping the global value.
+ Assert.assertEquals(Long.MIN_VALUE, ws.updateWatermark(watermark));
+ // timeout wm1
+ clock.add(WatermarkTracker.DEFAULT_UPDATE_TIMEOUT_MILLIS + 1);
+ Assert.assertEquals(watermark, ws.updateWatermark(watermark));
+ Assert.assertEquals(watermark, ws.updateWatermark(watermark - 1));
+ // each of the two calls above skipped the timed-out wm1 entry exactly once
+ Assert.assertEquals(2, ws.getUpdateTimeoutCount());
+
+ // min watermark
+ wm1.watermark = watermark + 1;
+ wm1.lastUpdated = clock.longValue();
+ Assert.assertEquals(watermark, ws.updateWatermark(watermark));
+ Assert.assertEquals(watermark + 1, ws.updateWatermark(watermark + 1));
+ }
+
+}