You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/01/24 07:22:49 UTC
[1/2] flink git commit: [FLINK-4523] [kinesis] Add documentation for
start position configuration
Repository: flink
Updated Branches:
refs/heads/master a8e85a2d5 -> 00d1ad86a
[FLINK-4523] [kinesis] Add documentation for start position configuration
This closes #2916.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/00d1ad86
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/00d1ad86
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/00d1ad86
Branch: refs/heads/master
Commit: 00d1ad86a021911f25b0a0aa6e095267d51af1f4
Parents: 8d8a5ab
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Mon Jan 9 18:04:12 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Tue Jan 24 14:20:07 2017 +0800
----------------------------------------------------------------------
docs/dev/connectors/kinesis.md | 12 ++++++++++++
1 file changed, 12 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/00d1ad86/docs/dev/connectors/kinesis.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/kinesis.md b/docs/dev/connectors/kinesis.md
index d95fe21..59f3d61 100644
--- a/docs/dev/connectors/kinesis.md
+++ b/docs/dev/connectors/kinesis.md
@@ -113,6 +113,18 @@ configured to enable checkpointing, so that the new shards due to resharding can
Kinesis consumer after the job is restored. This is a temporary limitation that will be resolved in future versions.
Please see [FLINK-4341](https://issues.apache.org/jira/browse/FLINK-4341) for more detail.
+#### Configuring Starting Position
+
+The Flink Kinesis Consumer currently provides the following options to configure where to start reading Kinesis streams. Simply set `ConsumerConfigConstants.STREAM_INITIAL_POSITION` to
+one of the following values in the provided configuration properties (the option names are identical to [those used by the AWS Kinesis Streams service](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#API_GetShardIterator_RequestSyntax)):
+
+- `LATEST`: read all shards of all streams starting from the latest record.
+- `TRIM_HORIZON`: read all shards of all streams starting from the earliest record possible (data may be trimmed by Kinesis depending on the retention settings).
+- `AT_TIMESTAMP`: read all shards of all streams starting from a specified timestamp. The timestamp must also be specified in the configuration
+properties by providing a value for `ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP`, either in the date pattern
+`yyyy-MM-dd'T'HH:mm:ss.SSSXXX` (for example, `2016-04-04T19:58:46.480-00:00`), or a non-negative double value representing the number of seconds
+that have elapsed since the Unix epoch (for example, `1459799926.480`).
+
#### Fault Tolerance for Exactly-Once User-Defined State Update Semantics
With Flink's checkpointing enabled, the Flink Kinesis Consumer will consume records from shards in Kinesis streams and
[2/2] flink git commit: [FLINK-4523] Allow Kinesis Consumer to start
from specific timestamp / Date
Posted by tz...@apache.org.
[FLINK-4523] Allow Kinesis Consumer to start from specific timestamp / Date
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8d8a5abf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8d8a5abf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8d8a5abf
Branch: refs/heads/master
Commit: 8d8a5abfcc4d2452a3ae46f18a3223b66588c191
Parents: a8e85a2
Author: Tony Wei <to...@gmail.com>
Authored: Thu Dec 1 11:40:46 2016 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Tue Jan 24 14:20:07 2017 +0800
----------------------------------------------------------------------
.../kinesis/config/ConsumerConfigConstants.java | 8 ++++-
.../kinesis/internals/ShardConsumer.java | 18 ++++++++++
.../kinesis/model/SentinelSequenceNumber.java | 4 +++
.../connectors/kinesis/proxy/KinesisProxy.java | 36 ++++++++++++++++++--
.../kinesis/proxy/KinesisProxyInterface.java | 8 +++--
.../kinesis/util/KinesisConfigUtil.java | 30 +++++++++++++++-
.../kinesis/FlinkKinesisConsumerTest.java | 32 +++++++++++++++++
.../testutils/FakeKinesisBehavioursFactory.java | 8 ++---
8 files changed, 132 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/8d8a5abf/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
index 76c20ed..4ffe0ad 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
@@ -37,7 +37,10 @@ public class ConsumerConfigConstants extends AWSConfigConstants {
TRIM_HORIZON(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM),
/** Start reading from the latest incoming record */
- LATEST(SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM);
+ LATEST(SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM),
+
+ /** Start reading from the record at the specified timestamp */
+ AT_TIMESTAMP(SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM);
private SentinelSequenceNumber sentinelSequenceNumber;
@@ -53,6 +56,9 @@ public class ConsumerConfigConstants extends AWSConfigConstants {
/** The initial position to start reading Kinesis streams from (LATEST is used if not set) */
public static final String STREAM_INITIAL_POSITION = "flink.stream.initpos";
+ /** The initial timestamp to start reading Kinesis stream from (when AT_TIMESTAMP is set for STREAM_INITIAL_POSITION) */
+ public static final String STREAM_INITIAL_TIMESTAMP = "flink.stream.initpos.timestamp";
+
/** The base backoff time between each describeStream attempt */
public static final String STREAM_DESCRIBE_BACKOFF_BASE = "flink.stream.describe.backoff.base";
http://git-wip-us.apache.org/repos/asf/flink/blob/8d8a5abf/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
index 612a4a7..f6c53ce 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
@@ -30,12 +30,15 @@ import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.math.BigInteger;
import java.nio.ByteBuffer;
+import java.text.ParseException;
+import java.util.Date;
import java.util.List;
import java.util.Properties;
@@ -64,6 +67,8 @@ public class ShardConsumer<T> implements Runnable {
private SequenceNumber lastSequenceNum;
+ private Date initTimestamp;
+
/**
* Creates a shard consumer.
*
@@ -107,6 +112,17 @@ public class ShardConsumer<T> implements Runnable {
this.fetchIntervalMillis = Long.valueOf(consumerConfig.getProperty(
ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS,
Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_INTERVAL_MILLIS)));
+
+ if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get())) {
+ String timestamp = consumerConfig.getProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP);
+ try {
+ this.initTimestamp = KinesisConfigUtil.initTimestampDateFormat.parse(timestamp);
+ } catch (ParseException e) {
+ this.initTimestamp = new Date((long) (Double.parseDouble(timestamp) * 1000));
+ }
+ } else {
+ this.initTimestamp = null;
+ }
}
@SuppressWarnings("unchecked")
@@ -128,6 +144,8 @@ public class ShardConsumer<T> implements Runnable {
nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.TRIM_HORIZON.toString(), null);
} else if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())) {
nextShardItr = null;
+ } else if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get())) {
+ nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.AT_TIMESTAMP.toString(), initTimestamp);
} else {
// we will be starting from an actual sequence number (due to restore from failure).
// if the last sequence number refers to an aggregated record, we need to clean up any dangling sub-records
http://git-wip-us.apache.org/repos/asf/flink/blob/8d8a5abf/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SentinelSequenceNumber.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SentinelSequenceNumber.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SentinelSequenceNumber.java
index 8182201..7f9dbbb 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SentinelSequenceNumber.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SentinelSequenceNumber.java
@@ -35,6 +35,10 @@ public enum SentinelSequenceNumber {
* start to be read from the earliest records that haven't expired yet */
SENTINEL_EARLIEST_SEQUENCE_NUM( new SequenceNumber("EARLIEST_SEQUENCE_NUM") ),
+ /** Flag value for shard's sequence numbers to indicate that the shard should
+ * start to be read from the specified timestamp */
+ SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM( new SequenceNumber("AT_TIMESTAMP_SEQUENCE_NUM") ),
+
/** Flag value to indicate that we have already read the last record of this shard
* (Note: Kinesis shards that have been closed due to a split or merge will have an ending data record) */
SENTINEL_SHARD_ENDING_SEQUENCE_NUM( new SequenceNumber("SHARD_ENDING_SEQUENCE_NUM") );
http://git-wip-us.apache.org/repos/asf/flink/blob/8d8a5abf/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
index 0b0fccf..580555f 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
@@ -29,6 +29,8 @@ import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededExcepti
import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
import com.amazonaws.services.kinesis.model.StreamStatus;
import com.amazonaws.services.kinesis.model.Shard;
+import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
@@ -42,6 +44,7 @@ import java.util.List;
import java.util.Properties;
import java.util.Map;
import java.util.Random;
+import java.util.Date;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -234,14 +237,41 @@ public class KinesisProxy implements KinesisProxyInterface {
* {@inheritDoc}
*/
@Override
- public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, @Nullable String startingSeqNum) throws InterruptedException {
+ public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, @Nullable Object startingMarker) throws InterruptedException {
+ GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest()
+ .withStreamName(shard.getStreamName())
+ .withShardId(shard.getShard().getShardId())
+ .withShardIteratorType(shardIteratorType);
+
+ switch (ShardIteratorType.fromValue(shardIteratorType)) {
+ case TRIM_HORIZON:
+ case LATEST:
+ break;
+ case AT_TIMESTAMP:
+ if (startingMarker instanceof Date) {
+ getShardIteratorRequest.setTimestamp((Date) startingMarker);
+ } else {
+ throw new IllegalArgumentException("Invalid object given for GetShardIteratorRequest() when ShardIteratorType is AT_TIMESTAMP. Must be a Date object.");
+ }
+ break;
+ case AT_SEQUENCE_NUMBER:
+ case AFTER_SEQUENCE_NUMBER:
+ if (startingMarker instanceof String) {
+ getShardIteratorRequest.setStartingSequenceNumber((String) startingMarker);
+ } else {
+ throw new IllegalArgumentException("Invalid object given for GetShardIteratorRequest() when ShardIteratorType is AT_SEQUENCE_NUMBER or AFTER_SEQUENCE_NUMBER. Must be a String.");
+ }
+ }
+ return getShardIterator(getShardIteratorRequest);
+ }
+
+ private String getShardIterator(GetShardIteratorRequest getShardIteratorRequest) throws InterruptedException {
GetShardIteratorResult getShardIteratorResult = null;
int attempt = 0;
while (attempt <= getShardIteratorMaxAttempts && getShardIteratorResult == null) {
try {
- getShardIteratorResult =
- kinesisClient.getShardIterator(shard.getStreamName(), shard.getShard().getShardId(), shardIteratorType, startingSeqNum);
+ getShardIteratorResult = kinesisClient.getShardIterator(getShardIteratorRequest);
} catch (AmazonServiceException ex) {
if (isRecoverableException(ex)) {
long backoffMillis = fullJitterBackoff(
http://git-wip-us.apache.org/repos/asf/flink/blob/8d8a5abf/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java
index 39ddc52..9f6d594 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java
@@ -34,14 +34,16 @@ public interface KinesisProxyInterface {
*
* @param shard the shard to get the iterator
* @param shardIteratorType the iterator type, defining how the shard is to be iterated
- * (one of: TRIM_HORIZON, LATEST, AT_SEQUENCE_NUMBER, AFTER_SEQUENCE_NUMBER)
- * @param startingSeqNum sequence number, must be null if shardIteratorType is TRIM_HORIZON or LATEST
+ * (one of: TRIM_HORIZON, LATEST, AT_TIMESTAMP, AT_SEQUENCE_NUMBER, AFTER_SEQUENCE_NUMBER)
+ * @param startingMarker should be {@code null} if shardIteratorType is TRIM_HORIZON or LATEST,
+ * should be a {@code Date} value if shardIteratorType is AT_TIMESTAMP,
+ * should be a {@code String} representing the sequence number if shardIteratorType is AT_SEQUENCE_NUMBER, AFTER_SEQUENCE_NUMBER
* @return shard iterator which can be used to read data from Kinesis
* @throws InterruptedException this method will retry with backoff if AWS Kinesis complains that the
* operation has exceeded the rate limit; this exception will be thrown
* if the backoff is interrupted.
*/
- String getShardIterator(KinesisStreamShard shard, String shardIteratorType, String startingSeqNum) throws InterruptedException;
+ String getShardIterator(KinesisStreamShard shard, String shardIteratorType, Object startingMarker) throws InterruptedException;
/**
* Get the next batch of data records using a specific shard iterator
http://git-wip-us.apache.org/repos/asf/flink/blob/8d8a5abf/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
index d8ea0a2..eb29d78 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
@@ -26,6 +26,8 @@ import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConsta
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.InitialPosition;
import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
import java.util.Properties;
import static org.apache.flink.util.Preconditions.checkArgument;
@@ -35,6 +37,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* Utilities for Flink Kinesis connector configuration.
*/
public class KinesisConfigUtil {
+ public static SimpleDateFormat initTimestampDateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
/**
* Validate configuration properties for {@link FlinkKinesisConsumer}.
@@ -47,7 +50,7 @@ public class KinesisConfigUtil {
if (config.containsKey(ConsumerConfigConstants.STREAM_INITIAL_POSITION)) {
String initPosType = config.getProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION);
- // specified initial position in stream must be either LATEST or TRIM_HORIZON
+ // specified initial position in stream must be either LATEST, TRIM_HORIZON or AT_TIMESTAMP
try {
InitialPosition.valueOf(initPosType);
} catch (IllegalArgumentException e) {
@@ -57,6 +60,17 @@ public class KinesisConfigUtil {
}
throw new IllegalArgumentException("Invalid initial position in stream set in config. Valid values are: " + sb.toString());
}
+
+ // specified initial timestamp in stream when using AT_TIMESTAMP
+ if (InitialPosition.valueOf(initPosType) == InitialPosition.AT_TIMESTAMP) {
+ if (!config.containsKey(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP)) {
+ throw new IllegalArgumentException("Please set value for initial timestamp ('"
+ + ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP + "') when using AT_TIMESTAMP initial position.");
+ }
+ validateOptionalDateProperty(config, ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP,
+ "Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream. "
+ + "Must be a valid format: yyyy-MM-dd'T'HH:mm:ss.SSSXXX or non-negative double value. For example, 2016-04-04T19:58:46.480-00:00 or 1459799926.480 .");
+ }
}
validateOptionalPositiveIntProperty(config, ConsumerConfigConstants.SHARD_GETRECORDS_MAX,
@@ -207,4 +221,18 @@ public class KinesisConfigUtil {
}
}
}
+
+ /**
+  * Validates an optional timestamp property. A value is accepted if it either matches the
+  * pattern of {@code initTimestampDateFormat} or parses as a non-negative double (seconds
+  * since the Unix epoch); the two forms are alternatives, tried in that order.
+  *
+  * @param config the configuration properties to check
+  * @param key the property key holding the timestamp; nothing is checked if the key is absent
+  * @param message the message of the exception thrown for an invalid value
+  * @throws IllegalArgumentException if the value is neither a valid date nor a non-negative double
+  */
+ private static void validateOptionalDateProperty(Properties config, String key, String message) {
+     if (!config.containsKey(key)) {
+         return;
+     }
+     String value = config.getProperty(key);
+     try {
+         initTimestampDateFormat.parse(value);
+         return;
+     } catch (ParseException notADate) {
+         // Not in the date pattern: fall through and try the epoch-seconds form.
+         // Requiring both parses to succeed would reject every possible value.
+     }
+     try {
+         double epochSeconds = Double.parseDouble(value);
+         // NaN compares false against 0, so it has to be rejected explicitly
+         if (Double.isNaN(epochSeconds) || epochSeconds < 0) {
+             throw new IllegalArgumentException(message);
+         }
+     } catch (NumberFormatException e) {
+         throw new IllegalArgumentException(message, e);
+     }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8d8a5abf/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
index a72d8df..2cc0270 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
@@ -133,6 +133,38 @@ public class FlinkKinesisConsumerTest {
}
@Test
+ public void testStreamInitPositionTypeSetToAtTimestampButNoInitTimestampSetInConfig() { // AT_TIMESTAMP without an initial timestamp must be rejected
+ exception.expect(IllegalArgumentException.class);
+ exception.expectMessage("Please set value for initial timestamp ('"
+ + ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP + "') when using AT_TIMESTAMP initial position.");
+
+ Properties testConfig = new Properties();
+ testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+ testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
+ testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+ testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+ testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP"); // STREAM_INITIAL_TIMESTAMP is deliberately left unset
+
+ KinesisConfigUtil.validateConsumerConfiguration(testConfig); // expected to throw the exception declared above
+ }
+
+ @Test
+ public void testUnparsableDateForInitialTimestampInConfig() { // an initial timestamp that is neither a date nor a number must be rejected
+ exception.expect(IllegalArgumentException.class);
+ exception.expectMessage("Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream.");
+
+ Properties testConfig = new Properties();
+ testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+ testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
+ testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+ testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+ testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
+ testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "unparsableDate"); // matches neither the date pattern nor a double
+
+ KinesisConfigUtil.validateConsumerConfiguration(testConfig); // expected to throw the exception declared above
+ }
+
+ @Test
public void testUnparsableLongForDescribeStreamBackoffBaseMillisInConfig() {
exception.expect(IllegalArgumentException.class);
exception.expectMessage("Invalid value given for describe stream operation base backoff milliseconds");
http://git-wip-us.apache.org/repos/asf/flink/blob/8d8a5abf/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
index 65e6d4e..964ee76 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
@@ -54,7 +54,7 @@ public class FakeKinesisBehavioursFactory {
}
@Override
- public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, String startingSeqNum) {
+ public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, Object startingMarker) {
return null;
}
@@ -121,7 +121,7 @@ public class FakeKinesisBehavioursFactory {
}
@Override
- public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, String startingSeqNum) {
+ public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, Object startingMarker) {
if (!expiredOnceAlready) {
// for the first call, just return the iterator of the first batch of records
return "0";
@@ -180,7 +180,7 @@ public class FakeKinesisBehavioursFactory {
}
@Override
- public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, String startingSeqNum) {
+ public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, Object startingMarker) {
// this will be called only one time per ShardConsumer;
// so, simply return the iterator of the first batch of records
return "0";
@@ -250,7 +250,7 @@ public class FakeKinesisBehavioursFactory {
}
@Override
- public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, String startingSeqNum) {
+ public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, Object startingMarker) {
return null;
}