You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2016/06/29 11:37:16 UTC
flink git commit: [FLINK-4080] Guarantee exactly-once for Kinesis
consumer for failures in the middle of aggregated records
Repository: flink
Updated Branches:
refs/heads/master dbe41f487 -> fa42cdabf
[FLINK-4080] Guarantee exactly-once for Kinesis consumer for failures in the middle of aggregated records
This closes #2180
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fa42cdab
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fa42cdab
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fa42cdab
Branch: refs/heads/master
Commit: fa42cdabfdd37fa3c2f86198ca04d37113ddda00
Parents: dbe41f4
Author: Gordon Tai <tz...@gmail.com>
Authored: Wed Jun 29 15:46:35 2016 +0800
Committer: Robert Metzger <rm...@apache.org>
Committed: Wed Jun 29 13:36:54 2016 +0200
----------------------------------------------------------------------
.../kinesis/FlinkKinesisConsumer.java | 23 ++--
.../kinesis/internals/KinesisDataFetcher.java | 11 +-
.../kinesis/internals/ShardConsumerThread.java | 110 +++++++++++++------
.../kinesis/model/SentinelSequenceNumber.java | 18 ++-
.../kinesis/model/SequenceNumber.java | 104 ++++++++++++++++++
.../kinesis/FlinkKinesisConsumerTest.java | 15 +--
.../internals/KinesisDataFetcherTest.java | 3 +-
.../internals/ShardConsumerThreadTest.java | 47 +++++---
8 files changed, 254 insertions(+), 77 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/fa42cdab/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
index 9cbc9d9..19b36e4 100644
--- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
+++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
@@ -28,6 +28,7 @@ import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstan
import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper;
@@ -64,7 +65,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* @param <T> the type of data emitted
*/
public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T>
- implements CheckpointedAsynchronously<HashMap<KinesisStreamShard, String>>, ResultTypeQueryable<T> {
+ implements CheckpointedAsynchronously<HashMap<KinesisStreamShard, SequenceNumber>>, ResultTypeQueryable<T> {
private static final long serialVersionUID = 4724006128720664870L;
@@ -92,10 +93,10 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T>
private transient KinesisDataFetcher fetcher;
/** The sequence numbers of the last fetched data records from Kinesis by this task */
- private transient HashMap<KinesisStreamShard, String> lastSequenceNums;
+ private transient HashMap<KinesisStreamShard, SequenceNumber> lastSequenceNums;
/** The sequence numbers to restore to upon restore from failure */
- private transient HashMap<KinesisStreamShard, String> sequenceNumsToRestore;
+ private transient HashMap<KinesisStreamShard, SequenceNumber> sequenceNumsToRestore;
private volatile boolean hasAssignedShards;
@@ -227,14 +228,14 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T>
LOG.info("Consumer task {} is restoring sequence numbers from previous checkpointed state", thisConsumerTaskIndex);
}
- for (Map.Entry<KinesisStreamShard, String> restoreSequenceNum : sequenceNumsToRestore.entrySet()) {
+ for (Map.Entry<KinesisStreamShard, SequenceNumber> restoreSequenceNum : sequenceNumsToRestore.entrySet()) {
// advance the corresponding shard to the last known sequence number
fetcher.advanceSequenceNumberTo(restoreSequenceNum.getKey(), restoreSequenceNum.getValue());
}
if (LOG.isInfoEnabled()) {
StringBuilder sb = new StringBuilder();
- for (Map.Entry<KinesisStreamShard, String> restoreSequenceNo : sequenceNumsToRestore.entrySet()) {
+ for (Map.Entry<KinesisStreamShard, SequenceNumber> restoreSequenceNo : sequenceNumsToRestore.entrySet()) {
KinesisStreamShard shard = restoreSequenceNo.getKey();
sb.append(shard.getStreamName()).append(":").append(shard.getShardId())
.append(" -> ").append(restoreSequenceNo.getValue()).append(", ");
@@ -265,14 +266,14 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T>
}
for (KinesisStreamShard assignedShard : assignedShards) {
- fetcher.advanceSequenceNumberTo(assignedShard, sentinelSequenceNum.toString());
+ fetcher.advanceSequenceNumberTo(assignedShard, sentinelSequenceNum.get());
}
if (LOG.isInfoEnabled()) {
StringBuilder sb = new StringBuilder();
for (KinesisStreamShard assignedShard : assignedShards) {
sb.append(assignedShard.getStreamName()).append(":").append(assignedShard.getShardId())
- .append(" -> ").append(sentinelSequenceNum.toString()).append(", ");
+ .append(" -> ").append(sentinelSequenceNum.get()).append(", ");
}
LOG.info("Advanced the starting sequence numbers of consumer task {}: {}", thisConsumerTaskIndex, sb.toString());
}
@@ -335,7 +336,7 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T>
// ------------------------------------------------------------------------
@Override
- public HashMap<KinesisStreamShard, String> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+ public HashMap<KinesisStreamShard, SequenceNumber> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
if (lastSequenceNums == null) {
LOG.debug("snapshotState() requested on not yet opened source; returning null.");
return null;
@@ -351,12 +352,14 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T>
}
@SuppressWarnings("unchecked")
- HashMap<KinesisStreamShard, String> currentSequenceNums = (HashMap<KinesisStreamShard, String>) lastSequenceNums.clone();
+ HashMap<KinesisStreamShard, SequenceNumber> currentSequenceNums =
+ (HashMap<KinesisStreamShard, SequenceNumber>) lastSequenceNums.clone();
+
return currentSequenceNums;
}
@Override
- public void restoreState(HashMap<KinesisStreamShard, String> restoredState) throws Exception {
+ public void restoreState(HashMap<KinesisStreamShard, SequenceNumber> restoredState) throws Exception {
sequenceNumsToRestore = restoredState;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fa42cdab/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
index 4ae2c85..58933f6 100644
--- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
+++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.kinesis.internals;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
import org.apache.flink.util.InstantiationUtil;
import org.slf4j.Logger;
@@ -50,7 +51,7 @@ public class KinesisDataFetcher {
private final String taskName;
/** Information of the shards that this fetcher handles, along with the sequence numbers that they should start from */
- private HashMap<KinesisStreamShard, String> assignedShardsWithStartingSequenceNum;
+ private HashMap<KinesisStreamShard, SequenceNumber> assignedShardsWithStartingSequenceNum;
/** Reference to the thread that executed run() */
private volatile Thread mainThread;
@@ -71,7 +72,7 @@ public class KinesisDataFetcher {
this.configProps = checkNotNull(configProps);
this.assignedShardsWithStartingSequenceNum = new HashMap<>();
for (KinesisStreamShard shard : assignedShards) {
- assignedShardsWithStartingSequenceNum.put(shard, SentinelSequenceNumber.SENTINEL_SEQUENCE_NUMBER_NOT_SET.toString());
+ assignedShardsWithStartingSequenceNum.put(shard, SentinelSequenceNumber.SENTINEL_SEQUENCE_NUMBER_NOT_SET.get());
}
this.taskName = taskName;
this.error = new AtomicReference<>();
@@ -83,7 +84,7 @@ public class KinesisDataFetcher {
* @param streamShard the shard to perform the advance on
* @param sequenceNum the sequence number to advance to
*/
- public void advanceSequenceNumberTo(KinesisStreamShard streamShard, String sequenceNum) {
+ public void advanceSequenceNumberTo(KinesisStreamShard streamShard, SequenceNumber sequenceNum) {
if (!assignedShardsWithStartingSequenceNum.containsKey(streamShard)) {
throw new IllegalArgumentException("Can't advance sequence number on a shard we are not going to read.");
}
@@ -92,7 +93,7 @@ public class KinesisDataFetcher {
public <T> void run(SourceFunction.SourceContext<T> sourceContext,
KinesisDeserializationSchema<T> deserializationSchema,
- HashMap<KinesisStreamShard, String> lastSequenceNums) throws Exception {
+ HashMap<KinesisStreamShard, SequenceNumber> lastSequenceNums) throws Exception {
if (assignedShardsWithStartingSequenceNum == null || assignedShardsWithStartingSequenceNum.size() == 0) {
throw new IllegalArgumentException("No shards set to read for this fetcher");
@@ -104,7 +105,7 @@ public class KinesisDataFetcher {
// create a thread for each individual shard
ArrayList<ShardConsumerThread<?>> consumerThreads = new ArrayList<>(assignedShardsWithStartingSequenceNum.size());
- for (Map.Entry<KinesisStreamShard, String> assignedShard : assignedShardsWithStartingSequenceNum.entrySet()) {
+ for (Map.Entry<KinesisStreamShard, SequenceNumber> assignedShard : assignedShardsWithStartingSequenceNum.entrySet()) {
ShardConsumerThread<T> thread = new ShardConsumerThread<>(this, configProps, assignedShard.getKey(),
assignedShard.getValue(), sourceContext, InstantiationUtil.clone(deserializationSchema), lastSequenceNums);
thread.setName(String.format("ShardConsumer - %s - %s/%s",
http://git-wip-us.apache.org/repos/asf/flink/blob/fa42cdab/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerThread.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerThread.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerThread.java
index a350ce8..66452e6 100644
--- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerThread.java
+++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerThread.java
@@ -25,9 +25,11 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
+import java.io.IOException;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.HashMap;
@@ -42,7 +44,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
public class ShardConsumerThread<T> extends Thread {
private final SourceFunction.SourceContext<T> sourceContext;
private final KinesisDeserializationSchema<T> deserializer;
- private final HashMap<KinesisStreamShard, String> seqNoState;
+ private final HashMap<KinesisStreamShard, SequenceNumber> seqNoState;
private final KinesisProxy kinesisProxy;
@@ -52,18 +54,17 @@ public class ShardConsumerThread<T> extends Thread {
private final int maxNumberOfRecordsPerFetch;
- private String lastSequenceNum;
- private String nextShardItr;
+ private SequenceNumber lastSequenceNum;
private volatile boolean running = true;
public ShardConsumerThread(KinesisDataFetcher ownerRef,
Properties props,
KinesisStreamShard assignedShard,
- String lastSequenceNum,
+ SequenceNumber lastSequenceNum,
SourceFunction.SourceContext<T> sourceContext,
KinesisDeserializationSchema<T> deserializer,
- HashMap<KinesisStreamShard, String> seqNumState) {
+ HashMap<KinesisStreamShard, SequenceNumber> seqNumState) {
this.ownerRef = checkNotNull(ownerRef);
this.assignedShard = checkNotNull(assignedShard);
this.lastSequenceNum = checkNotNull(lastSequenceNum);
@@ -79,56 +80,74 @@ public class ShardConsumerThread<T> extends Thread {
@SuppressWarnings("unchecked")
@Override
public void run() {
+ String nextShardItr;
+
try {
- if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM.toString())) {
+ // before infinitely looping, we set the initial nextShardItr appropriately
+
+ if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM.get())) {
// if the shard is already closed, there will be no latest next record to get for this shard
if (assignedShard.isClosed()) {
nextShardItr = null;
} else {
nextShardItr = kinesisProxy.getShardIterator(assignedShard, ShardIteratorType.LATEST.toString(), null);
}
- } else if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.toString())) {
+ } else if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get())) {
nextShardItr = kinesisProxy.getShardIterator(assignedShard, ShardIteratorType.TRIM_HORIZON.toString(), null);
- } else if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.toString())) {
+ } else if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())) {
nextShardItr = null;
} else {
- nextShardItr = kinesisProxy.getShardIterator(assignedShard, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), lastSequenceNum);
+ // we will be starting from an actual sequence number (due to restore from failure).
+ // if the last sequence number refers to an aggregated record, we need to clean up any dangling sub-records
+ // from the last aggregated record; otherwise, we can simply start iterating from the record right after.
+
+ if (lastSequenceNum.isAggregated()) {
+ String itrForLastAggregatedRecord =
+ kinesisProxy.getShardIterator(assignedShard, ShardIteratorType.AT_SEQUENCE_NUMBER.toString(), lastSequenceNum.getSequenceNumber());
+
+ // get only the last aggregated record
+ GetRecordsResult getRecordsResult = kinesisProxy.getRecords(itrForLastAggregatedRecord, 1);
+
+ List<UserRecord> fetchedRecords = deaggregateRecords(
+ getRecordsResult.getRecords(),
+ assignedShard.getStartingHashKey(),
+ assignedShard.getEndingHashKey());
+
+ long lastSubSequenceNum = lastSequenceNum.getSubSequenceNumber();
+ for (UserRecord record : fetchedRecords) {
+ // we have found a dangling sub-record if it has a larger subsequence number
+ // than our last subsequence number; if so, collect the record and update state
+ if (record.getSubSequenceNumber() > lastSubSequenceNum) {
+ collectRecordAndUpdateState(record);
+ }
+ }
+
+ // set the nextShardItr so we can continue iterating in the next while loop
+ nextShardItr = getRecordsResult.getNextShardIterator();
+ } else {
+ // the last record was non-aggregated, so we can simply start from the next record
+ nextShardItr = kinesisProxy.getShardIterator(assignedShard, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), lastSequenceNum.getSequenceNumber());
+ }
}
while(running) {
if (nextShardItr == null) {
- lastSequenceNum = SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.toString();
-
synchronized (sourceContext.getCheckpointLock()) {
- seqNoState.put(assignedShard, lastSequenceNum);
+ seqNoState.put(assignedShard, SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get());
}
break;
} else {
GetRecordsResult getRecordsResult = kinesisProxy.getRecords(nextShardItr, maxNumberOfRecordsPerFetch);
- List<Record> fetchedRecords = getRecordsResult.getRecords();
-
// each of the Kinesis records may be aggregated, so we must deaggregate them before proceeding
- fetchedRecords = deaggregateRecords(fetchedRecords, assignedShard.getStartingHashKey(), assignedShard.getEndingHashKey());
-
- for (Record record : fetchedRecords) {
- ByteBuffer recordData = record.getData();
-
- byte[] dataBytes = new byte[recordData.remaining()];
- recordData.get(dataBytes);
-
- byte[] keyBytes = record.getPartitionKey().getBytes();
+ List<UserRecord> fetchedRecords = deaggregateRecords(
+ getRecordsResult.getRecords(),
+ assignedShard.getStartingHashKey(),
+ assignedShard.getEndingHashKey());
- final T value = deserializer.deserialize(keyBytes, dataBytes,assignedShard.getStreamName(),
- record.getSequenceNumber());
-
- synchronized (sourceContext.getCheckpointLock()) {
- sourceContext.collect(value);
- seqNoState.put(assignedShard, record.getSequenceNumber());
- }
-
- lastSequenceNum = record.getSequenceNumber();
+ for (UserRecord record : fetchedRecords) {
+ collectRecordAndUpdateState(record);
}
nextShardItr = getRecordsResult.getNextShardIterator();
@@ -144,8 +163,31 @@ public class ShardConsumerThread<T> extends Thread {
this.interrupt();
}
+ private void collectRecordAndUpdateState(UserRecord record) throws IOException {
+ ByteBuffer recordData = record.getData();
+
+ byte[] dataBytes = new byte[recordData.remaining()];
+ recordData.get(dataBytes);
+
+ byte[] keyBytes = record.getPartitionKey().getBytes();
+
+ final T value = deserializer.deserialize(keyBytes, dataBytes, assignedShard.getStreamName(),
+ record.getSequenceNumber());
+
+ synchronized (sourceContext.getCheckpointLock()) {
+ sourceContext.collect(value);
+ if (record.isAggregated()) {
+ seqNoState.put(
+ assignedShard,
+ new SequenceNumber(record.getSequenceNumber(), record.getSubSequenceNumber()));
+ } else {
+ seqNoState.put(assignedShard, new SequenceNumber(record.getSequenceNumber()));
+ }
+ }
+ }
+
@SuppressWarnings("unchecked")
- protected static List<Record> deaggregateRecords(List<Record> records, String startingHashKey, String endingHashKey) {
- return (List<Record>) (List<?>) UserRecord.deaggregate(records, new BigInteger(startingHashKey), new BigInteger(endingHashKey));
+ protected static List<UserRecord> deaggregateRecords(List<Record> records, String startingHashKey, String endingHashKey) {
+ return UserRecord.deaggregate(records, new BigInteger(startingHashKey), new BigInteger(endingHashKey));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fa42cdab/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SentinelSequenceNumber.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SentinelSequenceNumber.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SentinelSequenceNumber.java
index f18e664..55752f8 100644
--- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SentinelSequenceNumber.java
+++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SentinelSequenceNumber.java
@@ -29,17 +29,27 @@ public enum SentinelSequenceNumber {
/** Flag value to indicate that the sequence number of a shard is not set. This value is used
* as an initial value in {@link KinesisDataFetcher}'s constructor for all shard's sequence number. */
- SENTINEL_SEQUENCE_NUMBER_NOT_SET,
+ SENTINEL_SEQUENCE_NUMBER_NOT_SET( new SequenceNumber("SEQUENCE_NUMBER_NOT_SET") ),
/** Flag value for shard's sequence numbers to indicate that the
* shard should start to be read from the latest incoming records */
- SENTINEL_LATEST_SEQUENCE_NUM,
+ SENTINEL_LATEST_SEQUENCE_NUM( new SequenceNumber("LATEST_SEQUENCE_NUM") ),
/** Flag value for shard's sequence numbers to indicate that the shard should
* start to be read from the earliest records that haven't expired yet */
- SENTINEL_EARLIEST_SEQUENCE_NUM,
+ SENTINEL_EARLIEST_SEQUENCE_NUM( new SequenceNumber("EARLIEST_SEQUENCE_NUM") ),
/** Flag value to indicate that we have already read the last record of this shard
* (Note: Kinesis shards that have been closed due to a split or merge will have an ending data record) */
- SENTINEL_SHARD_ENDING_SEQUENCE_NUM
+ SENTINEL_SHARD_ENDING_SEQUENCE_NUM( new SequenceNumber("SHARD_ENDING_SEQUENCE_NUM") );
+
+ private SequenceNumber sentinel;
+
+ SentinelSequenceNumber(SequenceNumber sentinel) {
+ this.sentinel = sentinel;
+ }
+
+ public SequenceNumber get() {
+ return sentinel;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fa42cdab/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SequenceNumber.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SequenceNumber.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SequenceNumber.java
new file mode 100644
index 0000000..e5c6def
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SequenceNumber.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.model;
+
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A serializable representation of a Kinesis record's sequence number. It has two fields: the main sequence number
+ * and a subsequence number. If this {@link SequenceNumber} refers to an aggregated Kinesis record, the subsequence
+ * number is a non-negative value giving the order of the sub-record within the aggregation; otherwise it is -1.
+ */
+public class SequenceNumber implements Serializable {
+
+ private static final long serialVersionUID = 876972197938972667L;
+
+ private static final String DELIMITER = "-";
+
+ private final String sequenceNumber;
+ private final long subSequenceNumber;
+
+ private final int cachedHash;
+
+ /**
+ * Create a new instance for a non-aggregated Kinesis record without a subsequence number.
+ * @param sequenceNumber the sequence number
+ */
+ public SequenceNumber(String sequenceNumber) {
+ this(sequenceNumber, -1);
+ }
+
+ /**
+ * Create a new instance, with the specified sequence number and subsequence number.
+ * To represent the sequence number for a non-aggregated Kinesis record, the subsequence number should be -1.
+ * Otherwise, give a non-negative subsequence number to represent an aggregated Kinesis record.
+ *
+ * @param sequenceNumber the sequence number
+ * @param subSequenceNumber the subsequence number (-1 to represent non-aggregated Kinesis records)
+ */
+ public SequenceNumber(String sequenceNumber, long subSequenceNumber) {
+ this.sequenceNumber = checkNotNull(sequenceNumber);
+ this.subSequenceNumber = subSequenceNumber;
+
+ this.cachedHash = 37 * (sequenceNumber.hashCode() + Long.hashCode(subSequenceNumber));
+ }
+
+ public boolean isAggregated() {
+ return subSequenceNumber >= 0;
+ }
+
+ public String getSequenceNumber() {
+ return sequenceNumber;
+ }
+
+ public long getSubSequenceNumber() {
+ return subSequenceNumber;
+ }
+
+ @Override
+ public String toString() {
+ if (isAggregated()) {
+ return sequenceNumber + DELIMITER + subSequenceNumber;
+ } else {
+ return sequenceNumber;
+ }
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof SequenceNumber)) {
+ return false;
+ }
+
+ if (obj == this) {
+ return true;
+ }
+
+ SequenceNumber other = (SequenceNumber) obj;
+
+ return sequenceNumber.equals(other.getSequenceNumber())
+ && (subSequenceNumber == other.getSubSequenceNumber());
+ }
+
+ @Override
+ public int hashCode() {
+ return cachedHash;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/fa42cdab/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
index 9f09161..5ced019 100644
--- a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
+++ b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstan
import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
import org.apache.flink.streaming.connectors.kinesis.testutils.ReferenceKinesisShardTopologies;
import org.apache.flink.streaming.connectors.kinesis.testutils.TestableFlinkKinesisConsumer;
@@ -344,7 +345,7 @@ public class FlinkKinesisConsumerTest {
dummyConsumer.open(new Configuration());
for (KinesisStreamShard shard : fakeAssignedShardsToThisConsumerTask) {
- verify(kinesisDataFetcherMock).advanceSequenceNumberTo(shard, SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM.toString());
+ verify(kinesisDataFetcherMock).advanceSequenceNumberTo(shard, SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM.get());
}
}
@@ -380,7 +381,7 @@ public class FlinkKinesisConsumerTest {
dummyConsumer.open(new Configuration());
for (KinesisStreamShard shard : fakeAssignedShardsToThisConsumerTask) {
- verify(kinesisDataFetcherMock).advanceSequenceNumberTo(shard, SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.toString());
+ verify(kinesisDataFetcherMock).advanceSequenceNumberTo(shard, SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get());
}
}
@@ -414,14 +415,14 @@ public class FlinkKinesisConsumerTest {
null, null, false, false);
// generate random UUIDs as sequence numbers of last checkpointed state for each assigned shard
- ArrayList<String> listOfSeqNumIfAssignedShards = new ArrayList<>(fakeAssignedShardsToThisConsumerTask.size());
+ ArrayList<SequenceNumber> listOfSeqNumOfAssignedShards = new ArrayList<>(fakeAssignedShardsToThisConsumerTask.size());
for (KinesisStreamShard shard : fakeAssignedShardsToThisConsumerTask) {
- listOfSeqNumIfAssignedShards.add(UUID.randomUUID().toString());
+ listOfSeqNumOfAssignedShards.add(new SequenceNumber(UUID.randomUUID().toString()));
}
- HashMap<KinesisStreamShard, String> fakeRestoredState = new HashMap<>();
+ HashMap<KinesisStreamShard, SequenceNumber> fakeRestoredState = new HashMap<>();
for (int i=0; i<fakeAssignedShardsToThisConsumerTask.size(); i++) {
- fakeRestoredState.put(fakeAssignedShardsToThisConsumerTask.get(i), listOfSeqNumIfAssignedShards.get(i));
+ fakeRestoredState.put(fakeAssignedShardsToThisConsumerTask.get(i), listOfSeqNumOfAssignedShards.get(i));
}
dummyConsumer.restoreState(fakeRestoredState);
@@ -430,7 +431,7 @@ public class FlinkKinesisConsumerTest {
for (int i=0; i<fakeAssignedShardsToThisConsumerTask.size(); i++) {
verify(kinesisDataFetcherMock).advanceSequenceNumberTo(
fakeAssignedShardsToThisConsumerTask.get(i),
- listOfSeqNumIfAssignedShards.get(i));
+ listOfSeqNumOfAssignedShards.get(i));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fa42cdab/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
index 7473403..e099a5c 100644
--- a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
+++ b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.streaming.connectors.kinesis.internals;
import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.testutils.ReferenceKinesisShardTopologies;
import org.junit.Rule;
import org.junit.Test;
@@ -42,7 +43,7 @@ public class KinesisDataFetcherTest {
KinesisDataFetcher fetcherUnderTest = new KinesisDataFetcher(assignedShardsToThisFetcher, new Properties(), "fake-task-name");
// advance the fetcher on a shard that it does not own
- fetcherUnderTest.advanceSequenceNumberTo(fakeCompleteListOfShards.get(2), "fake-seq-num");
+ fetcherUnderTest.advanceSequenceNumberTo(fakeCompleteListOfShards.get(2), new SequenceNumber("fake-seq-num"));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fa42cdab/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerThreadTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerThreadTest.java b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerThreadTest.java
index 93467a0..38937ec 100644
--- a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerThreadTest.java
+++ b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerThreadTest.java
@@ -17,12 +17,18 @@
package org.apache.flink.streaming.connectors.kinesis.internals;
-import com.amazonaws.services.kinesis.model.*;
+import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
+
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import com.amazonaws.services.kinesis.model.HashKeyRange;
+import com.amazonaws.services.kinesis.model.Record;
+import com.amazonaws.services.kinesis.model.Shard;
import org.apache.commons.lang.StringUtils;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
@@ -35,11 +41,12 @@ import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.ArrayList;
+import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
-import java.util.HashMap;
import java.util.UUID;
-import java.util.LinkedList;
import static org.junit.Assert.assertEquals;
@@ -111,31 +118,31 @@ public class ShardConsumerThreadTest {
// so we are mocking the static deaggregateRecords() to return the original list of records
PowerMockito.mockStatic(ShardConsumerThread.class);
PowerMockito.when(ShardConsumerThread.deaggregateRecords(Matchers.anyListOf(Record.class), Matchers.anyString(), Matchers.anyString()))
- .thenReturn(getRecordsResultFirst.getRecords())
- .thenReturn(getRecordsResultSecond.getRecords())
- .thenReturn(getRecordsResultThird.getRecords())
- .thenReturn(getRecordsResultFourth.getRecords())
- .thenReturn(getRecordsResultFifth.getRecords())
- .thenReturn(getRecordsResultFinal.getRecords());
+ .thenReturn(convertRecordsToUserRecords(getRecordsResultFirst.getRecords()))
+ .thenReturn(convertRecordsToUserRecords(getRecordsResultSecond.getRecords()))
+ .thenReturn(convertRecordsToUserRecords(getRecordsResultThird.getRecords()))
+ .thenReturn(convertRecordsToUserRecords(getRecordsResultFourth.getRecords()))
+ .thenReturn(convertRecordsToUserRecords(getRecordsResultFifth.getRecords()))
+ .thenReturn(convertRecordsToUserRecords(getRecordsResultFinal.getRecords()));
// ------------------------------------------------------------------------------------------
Properties testConsumerConfig = new Properties();
- HashMap<KinesisStreamShard, String> seqNumState = new HashMap<>();
+ HashMap<KinesisStreamShard, SequenceNumber> seqNumState = new HashMap<>();
DummySourceContext dummySourceContext = new DummySourceContext();
ShardConsumerThread dummyShardConsumerThread = getDummyShardConsumerThreadWithMockedKinesisProxy(
dummySourceContext, kinesisProxyMock, Mockito.mock(KinesisDataFetcher.class),
- testConsumerConfig, assignedShardUnderTest, "fake-last-seq-num", seqNumState);
+ testConsumerConfig, assignedShardUnderTest, new SequenceNumber("fake-last-seq-num"), seqNumState);
dummyShardConsumerThread.run();
// the final sequence number state for the assigned shard to this consumer thread
// should store SENTINEL_SHARD_ENDING_SEQUENCE_NUMBER since the final nextShardItr should be null
- assertEquals(seqNumState.get(assignedShardUnderTest), SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.toString());
+ assertEquals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get(), seqNumState.get(assignedShardUnderTest));
// the number of elements collected should equal the number of records generated by mocked KinesisProxy
- assertEquals(dummySourceContext.getNumOfElementsCollected(), totalRecordCount);
+ assertEquals(totalRecordCount, dummySourceContext.getNumOfElementsCollected());
}
private ShardConsumerThread getDummyShardConsumerThreadWithMockedKinesisProxy(
@@ -144,8 +151,8 @@ public class ShardConsumerThreadTest {
KinesisDataFetcher owningFetcherRefMock,
Properties testConsumerConfig,
KinesisStreamShard assignedShard,
- String lastSequenceNum,
- HashMap<KinesisStreamShard, String> seqNumState) {
+ SequenceNumber lastSequenceNum,
+ HashMap<KinesisStreamShard, SequenceNumber> seqNumState) {
try {
PowerMockito.whenNew(KinesisProxy.class).withArguments(testConsumerConfig).thenReturn(kinesisProxyMock);
@@ -159,7 +166,7 @@ public class ShardConsumerThreadTest {
private List<Record> generateFakeListOfRecordsFromToIncluding(int startingSeq, int endingSeq) {
List<Record> fakeListOfRecords = new LinkedList<>();
- for (int i=0; i <= (endingSeq - startingSeq); i++) {
+ for (int i=startingSeq; i <= endingSeq; i++) {
fakeListOfRecords.add(new Record()
.withData(ByteBuffer.wrap(String.valueOf(i).getBytes()))
.withPartitionKey(UUID.randomUUID().toString()) // the partition key assigned doesn't matter here
@@ -168,6 +175,14 @@ public class ShardConsumerThreadTest {
return fakeListOfRecords;
}
+ private List<UserRecord> convertRecordsToUserRecords(List<Record> records) { // test helper: builds the lists stubbed as return values of the mocked deaggregateRecords()
+ List<UserRecord> converted = new ArrayList<>(records.size()); // presized: exactly one UserRecord is added per input Record
+ for (Record record : records) {
+ converted.add(new UserRecord(record)); // wraps the Record via the UserRecord(Record) constructor; iteration order is preserved
+ }
+ return converted; // a newly created list; the input list itself is never modified
+ }
+
private static class DummySourceContext implements SourceFunction.SourceContext<String> {
private static final Object lock = new Object();