Posted to commits@flink.apache.org by tz...@apache.org on 2017/01/24 07:22:49 UTC

[1/2] flink git commit: [FLINK-4523] [kinesis] Add documentation for start position configuration

Repository: flink
Updated Branches:
  refs/heads/master a8e85a2d5 -> 00d1ad86a


[FLINK-4523] [kinesis] Add documentation for start position configuration

This closes #2916.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/00d1ad86
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/00d1ad86
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/00d1ad86

Branch: refs/heads/master
Commit: 00d1ad86a021911f25b0a0aa6e095267d51af1f4
Parents: 8d8a5ab
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Mon Jan 9 18:04:12 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Tue Jan 24 14:20:07 2017 +0800

----------------------------------------------------------------------
 docs/dev/connectors/kinesis.md | 12 ++++++++++++
 1 file changed, 12 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/00d1ad86/docs/dev/connectors/kinesis.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/kinesis.md b/docs/dev/connectors/kinesis.md
index d95fe21..59f3d61 100644
--- a/docs/dev/connectors/kinesis.md
+++ b/docs/dev/connectors/kinesis.md
@@ -113,6 +113,18 @@ configured to enable checkpointing, so that the new shards due to resharding can
 Kinesis consumer after the job is restored. This is a temporary limitation that will be resolved in future versions.
 Please see [FLINK-4341](https://issues.apache.org/jira/browse/FLINK-4341) for more detail.
 
+#### Configuring Starting Position
+
+The Flink Kinesis Consumer currently provides the following options to configure where to start reading Kinesis streams, simply by setting `ConsumerConfigConstants.STREAM_INITIAL_POSITION` to
+one of the following values in the provided configuration properties (the option names follow [the names used by the AWS Kinesis Streams service](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#API_GetShardIterator_RequestSyntax)):
+
+- `LATEST`: read all shards of all streams starting from the latest record.
+- `TRIM_HORIZON`: read all shards of all streams starting from the earliest record possible (data may be trimmed by Kinesis depending on the retention settings).
+- `AT_TIMESTAMP`: read all shards of all streams starting from a specified timestamp. The timestamp must also be specified in the configuration
+properties by providing a value for `ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP`, either in the date pattern
+`yyyy-MM-dd'T'HH:mm:ss.SSSXXX` (for example, `2016-04-04T19:58:46.480-00:00`), or as a non-negative double value representing the number of seconds
+that have elapsed since the Unix epoch (for example, `1459799926.480`).
+
 #### Fault Tolerance for Exactly-Once User-Defined State Update Semantics
 
 With Flink's checkpointing enabled, the Flink Kinesis Consumer will consume records from shards in Kinesis streams and

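For reference, a minimal sketch (not part of this commit) of how the new starting-position options might be used from a Flink job, assuming a placeholder stream name "my-kinesis-stream", placeholder credentials, and Flink's SimpleStringSchema as the deserialization schema:

import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties consumerConfig = new Properties();
consumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
consumerConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
consumerConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");

// start reading at a point in time; the timestamp may also be given as epoch seconds, e.g. "1459799926.480"
consumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
consumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "2016-04-04T19:58:46.480-00:00");

DataStream<String> kinesisRecords = env.addSource(
	new FlinkKinesisConsumer<>("my-kinesis-stream", new SimpleStringSchema(), consumerConfig));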

[2/2] flink git commit: [FLINK-4523] Allow Kinesis Consumer to start from specific timestamp / Date

Posted by tz...@apache.org.
[FLINK-4523] Allow Kinesis Consumer to start from specific timestamp / Date


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8d8a5abf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8d8a5abf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8d8a5abf

Branch: refs/heads/master
Commit: 8d8a5abfcc4d2452a3ae46f18a3223b66588c191
Parents: a8e85a2
Author: Tony Wei <to...@gmail.com>
Authored: Thu Dec 1 11:40:46 2016 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Tue Jan 24 14:20:07 2017 +0800

----------------------------------------------------------------------
 .../kinesis/config/ConsumerConfigConstants.java |  8 ++++-
 .../kinesis/internals/ShardConsumer.java        | 18 ++++++++++
 .../kinesis/model/SentinelSequenceNumber.java   |  4 +++
 .../connectors/kinesis/proxy/KinesisProxy.java  | 36 ++++++++++++++++++--
 .../kinesis/proxy/KinesisProxyInterface.java    |  8 +++--
 .../kinesis/util/KinesisConfigUtil.java         | 30 +++++++++++++++-
 .../kinesis/FlinkKinesisConsumerTest.java       | 32 +++++++++++++++++
 .../testutils/FakeKinesisBehavioursFactory.java |  8 ++---
 8 files changed, 132 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8d8a5abf/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
index 76c20ed..4ffe0ad 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
@@ -37,7 +37,10 @@ public class ConsumerConfigConstants extends AWSConfigConstants {
 		TRIM_HORIZON(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM),
 
 		/** Start reading from the latest incoming record */
-		LATEST(SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM);
+		LATEST(SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM),
+
+		/** Start reading from the record at the specified timestamp */
+		AT_TIMESTAMP(SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM);
 
 		private SentinelSequenceNumber sentinelSequenceNumber;
 
@@ -53,6 +56,9 @@ public class ConsumerConfigConstants extends AWSConfigConstants {
 	/** The initial position to start reading Kinesis streams from (LATEST is used if not set) */
 	public static final String STREAM_INITIAL_POSITION = "flink.stream.initpos";
 
+	/** The initial timestamp to start reading Kinesis streams from (when AT_TIMESTAMP is set for STREAM_INITIAL_POSITION) */
+	public static final String STREAM_INITIAL_TIMESTAMP = "flink.stream.initpos.timestamp";
+
 	/** The base backoff time between each describeStream attempt */
 	public static final String STREAM_DESCRIBE_BACKOFF_BASE = "flink.stream.describe.backoff.base";
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8d8a5abf/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
index 612a4a7..f6c53ce 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
@@ -30,12 +30,15 @@ import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
 import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
 import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
+import java.text.ParseException;
+import java.util.Date;
 import java.util.List;
 import java.util.Properties;
 
@@ -64,6 +67,8 @@ public class ShardConsumer<T> implements Runnable {
 
 	private SequenceNumber lastSequenceNum;
 
+	private Date initTimestamp;
+
 	/**
 	 * Creates a shard consumer.
 	 *
@@ -107,6 +112,17 @@ public class ShardConsumer<T> implements Runnable {
 		this.fetchIntervalMillis = Long.valueOf(consumerConfig.getProperty(
 			ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS,
 			Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_INTERVAL_MILLIS)));
+
+		if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get())) {
+			String timestamp = consumerConfig.getProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP);
+			try {
+				this.initTimestamp = KinesisConfigUtil.initTimestampDateFormat.parse(timestamp);
+			} catch (ParseException e) {
+				this.initTimestamp = new Date((long) (Double.parseDouble(timestamp) * 1000));
+			}
+		} else {
+			this.initTimestamp = null;
+		}
 	}
 
 	@SuppressWarnings("unchecked")
@@ -128,6 +144,8 @@ public class ShardConsumer<T> implements Runnable {
 				nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.TRIM_HORIZON.toString(), null);
 			} else if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())) {
 				nextShardItr = null;
+			} else if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get())) {
+				nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.AT_TIMESTAMP.toString(), initTimestamp);
 			} else {
 				// we will be starting from an actual sequence number (due to restore from failure).
 				// if the last sequence number refers to an aggregated record, we need to clean up any dangling sub-records

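The constructor change above resolves STREAM_INITIAL_TIMESTAMP by first trying the date pattern and, failing that, interpreting the value as fractional seconds since the Unix epoch. A self-contained sketch of that fallback (the class and helper names are illustrative only, not part of the commit):

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

public class InitialTimestampSketch {

	// same pattern as KinesisConfigUtil.initTimestampDateFormat
	private static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");

	// mirrors the fallback order used in the ShardConsumer constructor
	static Date parseInitialTimestamp(String timestamp) {
		try {
			return DATE_FORMAT.parse(timestamp);
		} catch (ParseException e) {
			// treat the value as (possibly fractional) seconds since the Unix epoch
			return new Date((long) (Double.parseDouble(timestamp) * 1000));
		}
	}

	public static void main(String[] args) {
		// the two example values from the documentation denote the same instant
		System.out.println(parseInitialTimestamp("2016-04-04T19:58:46.480-00:00").getTime());
		System.out.println(parseInitialTimestamp("1459799926.480").getTime());
	}
}
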
http://git-wip-us.apache.org/repos/asf/flink/blob/8d8a5abf/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SentinelSequenceNumber.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SentinelSequenceNumber.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SentinelSequenceNumber.java
index 8182201..7f9dbbb 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SentinelSequenceNumber.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SentinelSequenceNumber.java
@@ -35,6 +35,10 @@ public enum SentinelSequenceNumber {
 	 * start to be read from the earliest records that haven't expired yet */
 	SENTINEL_EARLIEST_SEQUENCE_NUM( new SequenceNumber("EARLIEST_SEQUENCE_NUM") ),
 
+	/** Flag value for shard's sequence numbers to indicate that the shard should
+	 * start to be read from the specified timestamp */
+	SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM( new SequenceNumber("AT_TIMESTAMP_SEQUENCE_NUM") ),
+
 	/** Flag value to indicate that we have already read the last record of this shard
 	 * (Note: Kinesis shards that have been closed due to a split or merge will have an ending data record) */
 	SENTINEL_SHARD_ENDING_SEQUENCE_NUM( new SequenceNumber("SHARD_ENDING_SEQUENCE_NUM") );

http://git-wip-us.apache.org/repos/asf/flink/blob/8d8a5abf/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
index 0b0fccf..580555f 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
@@ -29,6 +29,8 @@ import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededExcepti
 import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
 import com.amazonaws.services.kinesis.model.StreamStatus;
 import com.amazonaws.services.kinesis.model.Shard;
+import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
 import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
 import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
@@ -42,6 +44,7 @@ import java.util.List;
 import java.util.Properties;
 import java.util.Map;
 import java.util.Random;
+import java.util.Date;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -234,14 +237,41 @@ public class KinesisProxy implements KinesisProxyInterface {
 	 * {@inheritDoc}
 	 */
 	@Override
-	public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, @Nullable String startingSeqNum) throws InterruptedException {
+	public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, @Nullable Object startingMarker) throws InterruptedException {
+		GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest()
+			.withStreamName(shard.getStreamName())
+			.withShardId(shard.getShard().getShardId())
+			.withShardIteratorType(shardIteratorType);
+
+		switch (ShardIteratorType.fromValue(shardIteratorType)) {
+			case TRIM_HORIZON:
+			case LATEST:
+				break;
+			case AT_TIMESTAMP:
+				if (startingMarker instanceof Date) {
+					getShardIteratorRequest.setTimestamp((Date) startingMarker);
+				} else {
+					throw new IllegalArgumentException("Invalid object given for GetShardIteratorRequest() when ShardIteratorType is AT_TIMESTAMP. Must be a Date object.");
+				}
+				break;
+			case AT_SEQUENCE_NUMBER:
+			case AFTER_SEQUENCE_NUMBER:
+				if (startingMarker instanceof String) {
+					getShardIteratorRequest.setStartingSequenceNumber((String) startingMarker);
+				} else {
+					throw new IllegalArgumentException("Invalid object given for GetShardIteratorRequest() when ShardIteratorType is AT_SEQUENCE_NUMBER or AFTER_SEQUENCE_NUMBER. Must be a String.");
+				}
+		}
+		return getShardIterator(getShardIteratorRequest);
+	}
+
+	private String getShardIterator(GetShardIteratorRequest getShardIteratorRequest) throws InterruptedException {
 		GetShardIteratorResult getShardIteratorResult = null;
 
 		int attempt = 0;
 		while (attempt <= getShardIteratorMaxAttempts && getShardIteratorResult == null) {
 			try {
-				getShardIteratorResult =
-					kinesisClient.getShardIterator(shard.getStreamName(), shard.getShard().getShardId(), shardIteratorType, startingSeqNum);
+					getShardIteratorResult = kinesisClient.getShardIterator(getShardIteratorRequest);
 			} catch (AmazonServiceException ex) {
 				if (isRecoverableException(ex)) {
 					long backoffMillis = fullJitterBackoff(

http://git-wip-us.apache.org/repos/asf/flink/blob/8d8a5abf/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java
index 39ddc52..9f6d594 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java
@@ -34,14 +34,16 @@ public interface KinesisProxyInterface {
 	 *
 	 * @param shard the shard to get the iterator
 	 * @param shardIteratorType the iterator type, defining how the shard is to be iterated
-	 *                          (one of: TRIM_HORIZON, LATEST, AT_SEQUENCE_NUMBER, AFTER_SEQUENCE_NUMBER)
-	 * @param startingSeqNum sequence number, must be null if shardIteratorType is TRIM_HORIZON or LATEST
+	 *                          (one of: TRIM_HORIZON, LATEST, AT_TIMESTAMP, AT_SEQUENCE_NUMBER, AFTER_SEQUENCE_NUMBER)
+	 * @param startingMarker should be {@code null} if shardIteratorType is TRIM_HORIZON or LATEST,
+	 *                       should be a {@code Date} value if shardIteratorType is AT_TIMESTAMP,
+	 *                       should be a {@code String} representing the sequence number if shardIteratorType is AT_SEQUENCE_NUMBER or AFTER_SEQUENCE_NUMBER
 	 * @return shard iterator which can be used to read data from Kinesis
 	 * @throws InterruptedException this method will retry with backoff if AWS Kinesis complains that the
 	 *                              operation has exceeded the rate limit; this exception will be thrown
 	 *                              if the backoff is interrupted.
 	 */
-	String getShardIterator(KinesisStreamShard shard, String shardIteratorType, String startingSeqNum) throws InterruptedException;
+	String getShardIterator(KinesisStreamShard shard, String shardIteratorType, Object startingMarker) throws InterruptedException;
 
 	/**
 	 * Get the next batch of data records using a specific shard iterator

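Given the updated javadoc, the type of startingMarker now depends on the iterator type. A minimal sketch of the three call shapes, assuming `kinesis` is a KinesisProxyInterface, `shard` a subscribed KinesisStreamShard, and the sequence-number literal a placeholder (getShardIterator may throw InterruptedException):

// LATEST or TRIM_HORIZON: no starting marker
String latestItr = kinesis.getShardIterator(shard, ShardIteratorType.LATEST.toString(), null);

// AT_TIMESTAMP: the marker is a java.util.Date
String timestampItr = kinesis.getShardIterator(shard, ShardIteratorType.AT_TIMESTAMP.toString(), new Date(1459799926480L));

// AT_SEQUENCE_NUMBER / AFTER_SEQUENCE_NUMBER: the marker is the sequence number as a String (placeholder value)
String restoredItr = kinesis.getShardIterator(shard, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), "49568742634951949397978645371633731148455541376567967746");
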
http://git-wip-us.apache.org/repos/asf/flink/blob/8d8a5abf/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
index d8ea0a2..eb29d78 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
@@ -26,6 +26,8 @@ import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConsta
 import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.InitialPosition;
 import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants;
 
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
 import java.util.Properties;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -35,6 +37,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * Utilities for Flink Kinesis connector configuration.
  */
 public class KinesisConfigUtil {
+	public static SimpleDateFormat initTimestampDateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
 
 	/**
 	 * Validate configuration properties for {@link FlinkKinesisConsumer}.
@@ -47,7 +50,7 @@ public class KinesisConfigUtil {
 		if (config.containsKey(ConsumerConfigConstants.STREAM_INITIAL_POSITION)) {
 			String initPosType = config.getProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION);
 
-			// specified initial position in stream must be either LATEST or TRIM_HORIZON
+			// specified initial position in stream must be either LATEST, TRIM_HORIZON or AT_TIMESTAMP
 			try {
 				InitialPosition.valueOf(initPosType);
 			} catch (IllegalArgumentException e) {
@@ -57,6 +60,17 @@ public class KinesisConfigUtil {
 				}
 				throw new IllegalArgumentException("Invalid initial position in stream set in config. Valid values are: " + sb.toString());
 			}
+
+			// specified initial timestamp in stream when using AT_TIMESTAMP
+			if (InitialPosition.valueOf(initPosType) == InitialPosition.AT_TIMESTAMP) {
+				if (!config.containsKey(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP)) {
+					throw new IllegalArgumentException("Please set value for initial timestamp ('"
+						+ ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP + "') when using AT_TIMESTAMP initial position.");
+				}
+				validateOptionalDateProperty(config, ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP,
+					"Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream. "
+						+ "Must be a valid format: yyyy-MM-dd'T'HH:mm:ss.SSSXXX or non-negative double value. For example, 2016-04-04T19:58:46.480-00:00 or 1459799926.480 .");
+			}
 		}
 
 		validateOptionalPositiveIntProperty(config, ConsumerConfigConstants.SHARD_GETRECORDS_MAX,
@@ -207,4 +221,18 @@ public class KinesisConfigUtil {
 			}
 		}
 	}
+
+
+	private static void validateOptionalDateProperty(Properties config, String key, String message) {
+		if (config.containsKey(key)) {
+			try {
+				// the value is either in the date pattern used by initTimestampDateFormat...
+				initTimestampDateFormat.parse(config.getProperty(key));
+			} catch (ParseException e) {
+				try {
+					// ...or a non-negative double value of seconds since the Unix epoch
+					if (Double.parseDouble(config.getProperty(key)) < 0) {
+						throw new NumberFormatException();
+					}
+				} catch (NumberFormatException nfe) {
+					throw new IllegalArgumentException(message);
+				}
+			}
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8d8a5abf/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
index a72d8df..2cc0270 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
@@ -133,6 +133,38 @@ public class FlinkKinesisConsumerTest {
 	}
 
 	@Test
+	public void testStreamInitPositionTypeSetToAtTimestampButNoInitTimestampSetInConfig() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("Please set value for initial timestamp ('"
+			+ ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP + "') when using AT_TIMESTAMP initial position.");
+
+		Properties testConfig = new Properties();
+		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
+
+		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+	}
+
+	@Test
+	public void testUnparsableDateForInitialTimestampInConfig() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream.");
+
+		Properties testConfig = new Properties();
+		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
+		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "unparsableDate");
+
+		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+	}
+
+	@Test
 	public void testUnparsableLongForDescribeStreamBackoffBaseMillisInConfig() {
 		exception.expect(IllegalArgumentException.class);
 		exception.expectMessage("Invalid value given for describe stream operation base backoff milliseconds");

http://git-wip-us.apache.org/repos/asf/flink/blob/8d8a5abf/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
index 65e6d4e..964ee76 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
@@ -54,7 +54,7 @@ public class FakeKinesisBehavioursFactory {
 			}
 
 			@Override
-			public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, String startingSeqNum) {
+			public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, Object startingMarker) {
 				return null;
 			}
 
@@ -121,7 +121,7 @@ public class FakeKinesisBehavioursFactory {
 		}
 
 		@Override
-		public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, String startingSeqNum) {
+		public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, Object startingMarker) {
 			if (!expiredOnceAlready) {
 				// for the first call, just return the iterator of the first batch of records
 				return "0";
@@ -180,7 +180,7 @@ public class FakeKinesisBehavioursFactory {
 		}
 
 		@Override
-		public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, String startingSeqNum) {
+		public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, Object startingMarker) {
 			// this will be called only one time per ShardConsumer;
 			// so, simply return the iterator of the first batch of records
 			return "0";
@@ -250,7 +250,7 @@ public class FakeKinesisBehavioursFactory {
 		}
 
 		@Override
-		public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, String startingSeqNum) {
+		public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, Object startingMarker) {
 			return null;
 		}