You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/09/26 16:04:48 UTC

[6/9] flink git commit: [FLINK-7393] [kinesis] Move unit tests of KinesisConfigUtil from FlinkKinesisConsumerTest to KinesisConfigUtilTest

[FLINK-7393] [kinesis] Move unit tests of KinesisConfigUtil from FlinkKinesisConsumerTest to KinesisConfigUtilTest

This closes #4708.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ff417e76
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ff417e76
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ff417e76

Branch: refs/heads/master
Commit: ff417e762032b100a74a6e9ee215618f4ac9db3e
Parents: 415e7d0
Author: Bowen Li <bo...@gmail.com>
Authored: Fri Sep 22 16:10:56 2017 -0700
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Tue Sep 26 18:04:07 2017 +0200

----------------------------------------------------------------------
 .../kinesis/FlinkKinesisConsumerTest.java       | 417 ------------------
 .../kinesis/util/KinesisConfigUtilTest.java     | 421 +++++++++++++++++++
 2 files changed, 421 insertions(+), 417 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ff417e76/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
index 4a007d5..78ca160 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
@@ -32,7 +32,6 @@ import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
-import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
 import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
 import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
@@ -73,7 +72,6 @@ import java.util.UUID;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
@@ -90,421 +88,6 @@ public class FlinkKinesisConsumerTest {
 	private ExpectedException exception = ExpectedException.none();
 
 	// ----------------------------------------------------------------------
-	// FlinkKinesisConsumer.validateAwsConfiguration() tests
-	// ----------------------------------------------------------------------
-
-	@Test
-	public void testMissingAwsRegionInConfig() {
-		exception.expect(IllegalArgumentException.class);
-		exception.expectMessage("The AWS region ('" + AWSConfigConstants.AWS_REGION + "') must be set in the config.");
-
-		Properties testConfig = new Properties();
-		testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKey");
-		testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
-
-		KinesisConfigUtil.validateAwsConfiguration(testConfig);
-	}
-
-	@Test
-	public void testUnrecognizableAwsRegionInConfig() {
-		exception.expect(IllegalArgumentException.class);
-		exception.expectMessage("Invalid AWS region");
-
-		Properties testConfig = new Properties();
-		testConfig.setProperty(AWSConfigConstants.AWS_REGION, "wrongRegionId");
-		testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
-		testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
-
-		KinesisConfigUtil.validateAwsConfiguration(testConfig);
-	}
-
-	@Test
-	public void testCredentialProviderTypeSetToBasicButNoCredentialSetInConfig() {
-		exception.expect(IllegalArgumentException.class);
-		exception.expectMessage("Please set values for AWS Access Key ID ('" + AWSConfigConstants.AWS_ACCESS_KEY_ID + "') " +
-			"and Secret Key ('" + AWSConfigConstants.AWS_SECRET_ACCESS_KEY + "') when using the BASIC AWS credential provider type.");
-
-		Properties testConfig = new Properties();
-		testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
-		testConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
-
-		KinesisConfigUtil.validateAwsConfiguration(testConfig);
-	}
-
-	@Test
-	public void testUnrecognizableCredentialProviderTypeInConfig() {
-		exception.expect(IllegalArgumentException.class);
-		exception.expectMessage("Invalid AWS Credential Provider Type");
-
-		Properties testConfig = new Properties();
-		testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
-		testConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "wrongProviderType");
-		testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
-		testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
-
-		KinesisConfigUtil.validateAwsConfiguration(testConfig);
-	}
-
-	// ----------------------------------------------------------------------
-	// FlinkKinesisConsumer.validateConsumerConfiguration() tests
-	// ----------------------------------------------------------------------
-
-	@Test
-	public void testUnrecognizableStreamInitPositionTypeInConfig() {
-		exception.expect(IllegalArgumentException.class);
-		exception.expectMessage("Invalid initial position in stream");
-
-		Properties testConfig = new Properties();
-		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
-		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "wrongInitPosition");
-
-		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
-	}
-
-	@Test
-	public void testStreamInitPositionTypeSetToAtTimestampButNoInitTimestampSetInConfig() {
-		exception.expect(IllegalArgumentException.class);
-		exception.expectMessage("Please set value for initial timestamp ('"
-			+ ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP + "') when using AT_TIMESTAMP initial position.");
-
-		Properties testConfig = new Properties();
-		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
-		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
-
-		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
-	}
-
-	@Test
-	public void testUnparsableDateForInitialTimestampInConfig() {
-		exception.expect(IllegalArgumentException.class);
-		exception.expectMessage("Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream.");
-
-		Properties testConfig = new Properties();
-		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
-		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
-		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "unparsableDate");
-
-		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
-	}
-
-	@Test
-	public void testIllegalValueForInitialTimestampInConfig() {
-		exception.expect(IllegalArgumentException.class);
-		exception.expectMessage("Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream.");
-
-		Properties testConfig = new Properties();
-		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
-		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
-		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "-1.0");
-
-		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
-	}
-
-	@Test
-	public void testDateStringForValidateOptionDateProperty() {
-		String timestamp = "2016-04-04T19:58:46.480-00:00";
-
-		Properties testConfig = new Properties();
-		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
-		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
-		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, timestamp);
-
-		try {
-			KinesisConfigUtil.validateConsumerConfiguration(testConfig);
-		} catch (Exception e) {
-			e.printStackTrace();
-			fail();
-		}
-	}
-
-	@Test
-	public void testUnixTimestampForValidateOptionDateProperty() {
-		String unixTimestamp = "1459799926.480";
-
-		Properties testConfig = new Properties();
-		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
-		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
-		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, unixTimestamp);
-
-		try {
-			KinesisConfigUtil.validateConsumerConfiguration(testConfig);
-		} catch (Exception e) {
-			e.printStackTrace();
-			fail();
-		}
-	}
-
-	@Test
-	public void testInvalidPatternForInitialTimestampInConfig() {
-		exception.expect(IllegalArgumentException.class);
-		exception.expectMessage("Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream.");
-
-		Properties testConfig = new Properties();
-		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
-		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
-		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "2016-03-14");
-		testConfig.setProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT, "InvalidPattern");
-
-		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
-	}
-
-	@Test
-	public void testUnparsableDateForUserDefinedDateFormatForInitialTimestampInConfig() {
-		exception.expect(IllegalArgumentException.class);
-		exception.expectMessage("Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream.");
-
-		Properties testConfig = new Properties();
-		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
-		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
-		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "stillUnparsable");
-		testConfig.setProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT, "yyyy-MM-dd");
-
-		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
-	}
-
-	@Test
-	public void testDateStringForUserDefinedDateFormatForValidateOptionDateProperty() {
-		String unixTimestamp = "2016-04-04";
-		String pattern = "yyyy-MM-dd";
-
-		Properties testConfig = new Properties();
-		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
-		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
-		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, unixTimestamp);
-		testConfig.setProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT, pattern);
-
-		try {
-			KinesisConfigUtil.validateConsumerConfiguration(testConfig);
-		} catch (Exception e) {
-			e.printStackTrace();
-			fail();
-		}
-	}
-
-	@Test
-	public void testUnparsableLongForDescribeStreamBackoffBaseMillisInConfig() {
-		exception.expect(IllegalArgumentException.class);
-		exception.expectMessage("Invalid value given for describe stream operation base backoff milliseconds");
-
-		Properties testConfig = new Properties();
-		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
-		testConfig.setProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE, "unparsableLong");
-
-		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
-	}
-
-	@Test
-	public void testUnparsableLongForDescribeStreamBackoffMaxMillisInConfig() {
-		exception.expect(IllegalArgumentException.class);
-		exception.expectMessage("Invalid value given for describe stream operation max backoff milliseconds");
-
-		Properties testConfig = new Properties();
-		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
-		testConfig.setProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_MAX, "unparsableLong");
-
-		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
-	}
-
-	@Test
-	public void testUnparsableDoubleForDescribeStreamBackoffExponentialConstantInConfig() {
-		exception.expect(IllegalArgumentException.class);
-		exception.expectMessage("Invalid value given for describe stream operation backoff exponential constant");
-
-		Properties testConfig = new Properties();
-		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
-		testConfig.setProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT, "unparsableDouble");
-
-		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
-	}
-
-	@Test
-	public void testUnparsableIntForGetRecordsRetriesInConfig() {
-		exception.expect(IllegalArgumentException.class);
-		exception.expectMessage("Invalid value given for maximum retry attempts for getRecords shard operation");
-
-		Properties testConfig = new Properties();
-		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
-		testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_RETRIES, "unparsableInt");
-
-		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
-	}
-
-	@Test
-	public void testUnparsableIntForGetRecordsMaxCountInConfig() {
-		exception.expect(IllegalArgumentException.class);
-		exception.expectMessage("Invalid value given for maximum records per getRecords shard operation");
-
-		Properties testConfig = new Properties();
-		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
-		testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "unparsableInt");
-
-		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
-	}
-
-	@Test
-	public void testUnparsableLongForGetRecordsBackoffBaseMillisInConfig() {
-		exception.expect(IllegalArgumentException.class);
-		exception.expectMessage("Invalid value given for get records operation base backoff milliseconds");
-
-		Properties testConfig = new Properties();
-		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
-		testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_BASE, "unparsableLong");
-
-		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
-	}
-
-	@Test
-	public void testUnparsableLongForGetRecordsBackoffMaxMillisInConfig() {
-		exception.expect(IllegalArgumentException.class);
-		exception.expectMessage("Invalid value given for get records operation max backoff milliseconds");
-
-		Properties testConfig = new Properties();
-		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
-		testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_MAX, "unparsableLong");
-
-		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
-	}
-
-	@Test
-	public void testUnparsableDoubleForGetRecordsBackoffExponentialConstantInConfig() {
-		exception.expect(IllegalArgumentException.class);
-		exception.expectMessage("Invalid value given for get records operation backoff exponential constant");
-
-		Properties testConfig = new Properties();
-		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
-		testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT, "unparsableDouble");
-
-		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
-	}
-
-	@Test
-	public void testUnparsableLongForGetRecordsIntervalMillisInConfig() {
-		exception.expect(IllegalArgumentException.class);
-		exception.expectMessage("Invalid value given for getRecords sleep interval in milliseconds");
-
-		Properties testConfig = new Properties();
-		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
-		testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "unparsableLong");
-
-		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
-	}
-
-	@Test
-	public void testUnparsableIntForGetShardIteratorRetriesInConfig() {
-		exception.expect(IllegalArgumentException.class);
-		exception.expectMessage("Invalid value given for maximum retry attempts for getShardIterator shard operation");
-
-		Properties testConfig = new Properties();
-		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
-		testConfig.setProperty(ConsumerConfigConstants.SHARD_GETITERATOR_RETRIES, "unparsableInt");
-
-		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
-	}
-
-	@Test
-	public void testUnparsableLongForGetShardIteratorBackoffBaseMillisInConfig() {
-		exception.expect(IllegalArgumentException.class);
-		exception.expectMessage("Invalid value given for get shard iterator operation base backoff milliseconds");
-
-		Properties testConfig = new Properties();
-		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
-		testConfig.setProperty(ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_BASE, "unparsableLong");
-
-		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
-	}
-
-	@Test
-	public void testUnparsableLongForGetShardIteratorBackoffMaxMillisInConfig() {
-		exception.expect(IllegalArgumentException.class);
-		exception.expectMessage("Invalid value given for get shard iterator operation max backoff milliseconds");
-
-		Properties testConfig = new Properties();
-		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
-		testConfig.setProperty(ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_MAX, "unparsableLong");
-
-		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
-	}
-
-	@Test
-	public void testUnparsableDoubleForGetShardIteratorBackoffExponentialConstantInConfig() {
-		exception.expect(IllegalArgumentException.class);
-		exception.expectMessage("Invalid value given for get shard iterator operation backoff exponential constant");
-
-		Properties testConfig = new Properties();
-		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
-		testConfig.setProperty(ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT, "unparsableDouble");
-
-		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
-	}
-
-	@Test
-	public void testUnparsableLongForShardDiscoveryIntervalMillisInConfig() {
-		exception.expect(IllegalArgumentException.class);
-		exception.expectMessage("Invalid value given for shard discovery sleep interval in milliseconds");
-
-		Properties testConfig = new Properties();
-		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
-		testConfig.setProperty(ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS, "unparsableLong");
-
-		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
-	}
-
-	// ----------------------------------------------------------------------
 	// Tests related to state initialization
 	// ----------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ff417e76/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
index a576748..ddb300e 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.connectors.kinesis.util;
 
 import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants;
 
 import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
@@ -32,6 +33,7 @@ import org.powermock.modules.junit4.PowerMockRunner;
 import java.util.Properties;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 
 /**
  * Tests for KinesisConfigUtil.
@@ -42,6 +44,10 @@ public class KinesisConfigUtilTest {
 	@Rule
 	private ExpectedException exception = ExpectedException.none();
 
+	// ----------------------------------------------------------------------
+	// getValidatedProducerConfiguration() tests
+	// ----------------------------------------------------------------------
+
 	@Test
 	public void testUnparsableLongForProducerConfiguration() {
 		exception.expect(IllegalArgumentException.class);
@@ -125,4 +131,419 @@ public class KinesisConfigUtilTest {
 		assertEquals("1", replacedConfig.getProperty(KinesisConfigUtil.AGGREGATION_MAX_COUNT));
 		assertEquals("2", replacedConfig.getProperty(KinesisConfigUtil.COLLECTION_MAX_COUNT));
 	}
+
+	// ----------------------------------------------------------------------
+	// validateAwsConfiguration() tests
+	// ----------------------------------------------------------------------
+
+	@Test
+	public void testMissingAwsRegionInConfig() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("The AWS region ('" + AWSConfigConstants.AWS_REGION + "') must be set in the config.");
+
+		Properties testConfig = new Properties();
+		testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKey");
+		testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+
+		KinesisConfigUtil.validateAwsConfiguration(testConfig);
+	}
+
+	@Test
+	public void testUnrecognizableAwsRegionInConfig() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("Invalid AWS region");
+
+		Properties testConfig = new Properties();
+		testConfig.setProperty(AWSConfigConstants.AWS_REGION, "wrongRegionId");
+		testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+
+		KinesisConfigUtil.validateAwsConfiguration(testConfig);
+	}
+
+	@Test
+	public void testCredentialProviderTypeSetToBasicButNoCredentialSetInConfig() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("Please set values for AWS Access Key ID ('" + AWSConfigConstants.AWS_ACCESS_KEY_ID + "') " +
+				"and Secret Key ('" + AWSConfigConstants.AWS_SECRET_ACCESS_KEY + "') when using the BASIC AWS credential provider type.");
+
+		Properties testConfig = new Properties();
+		testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
+
+		KinesisConfigUtil.validateAwsConfiguration(testConfig);
+	}
+
+	@Test
+	public void testUnrecognizableCredentialProviderTypeInConfig() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("Invalid AWS Credential Provider Type");
+
+		Properties testConfig = new Properties();
+		testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "wrongProviderType");
+		testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+
+		KinesisConfigUtil.validateAwsConfiguration(testConfig);
+	}
+
+	// ----------------------------------------------------------------------
+	// validateConsumerConfiguration() tests
+	// ----------------------------------------------------------------------
+
+	@Test
+	public void testUnrecognizableStreamInitPositionTypeInConfig() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("Invalid initial position in stream");
+
+		Properties testConfig = new Properties();
+		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "wrongInitPosition");
+
+		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+	}
+
+	@Test
+	public void testStreamInitPositionTypeSetToAtTimestampButNoInitTimestampSetInConfig() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("Please set value for initial timestamp ('"
+				+ ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP + "') when using AT_TIMESTAMP initial position.");
+
+		Properties testConfig = new Properties();
+		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
+
+		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+	}
+
+	@Test
+	public void testUnparsableDateForInitialTimestampInConfig() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream.");
+
+		Properties testConfig = new Properties();
+		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
+		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "unparsableDate");
+
+		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+	}
+
+	@Test
+	public void testIllegalValueForInitialTimestampInConfig() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream.");
+
+		Properties testConfig = new Properties();
+		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
+		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "-1.0");
+
+		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+	}
+
+	@Test
+	public void testDateStringForValidateOptionDateProperty() {
+		String timestamp = "2016-04-04T19:58:46.480-00:00";
+
+		Properties testConfig = new Properties();
+		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
+		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, timestamp);
+
+		try {
+			KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+		} catch (Exception e) {
+			e.printStackTrace();
+			fail();
+		}
+	}
+
+	@Test
+	public void testUnixTimestampForValidateOptionDateProperty() {
+		String unixTimestamp = "1459799926.480";
+
+		Properties testConfig = new Properties();
+		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
+		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, unixTimestamp);
+
+		try {
+			KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+		} catch (Exception e) {
+			e.printStackTrace();
+			fail();
+		}
+	}
+
+	@Test
+	public void testInvalidPatternForInitialTimestampInConfig() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream.");
+
+		Properties testConfig = new Properties();
+		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
+		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "2016-03-14");
+		testConfig.setProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT, "InvalidPattern");
+
+		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+	}
+
+	@Test
+	public void testUnparsableDateForUserDefinedDateFormatForInitialTimestampInConfig() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream.");
+
+		Properties testConfig = new Properties();
+		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
+		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "stillUnparsable");
+		testConfig.setProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT, "yyyy-MM-dd");
+
+		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+	}
+
+	@Test
+	public void testDateStringForUserDefinedDateFormatForValidateOptionDateProperty() {
+		String unixTimestamp = "2016-04-04";
+		String pattern = "yyyy-MM-dd";
+
+		Properties testConfig = new Properties();
+		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
+		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, unixTimestamp);
+		testConfig.setProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT, pattern);
+
+		try {
+			KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+		} catch (Exception e) {
+			e.printStackTrace();
+			fail();
+		}
+	}
+
+	@Test
+	public void testUnparsableLongForDescribeStreamBackoffBaseMillisInConfig() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("Invalid value given for describe stream operation base backoff milliseconds");
+
+		Properties testConfig = new Properties();
+		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		testConfig.setProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE, "unparsableLong");
+
+		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+	}
+
+	@Test
+	public void testUnparsableLongForDescribeStreamBackoffMaxMillisInConfig() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("Invalid value given for describe stream operation max backoff milliseconds");
+
+		Properties testConfig = new Properties();
+		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		testConfig.setProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_MAX, "unparsableLong");
+
+		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+	}
+
+	@Test
+	public void testUnparsableDoubleForDescribeStreamBackoffExponentialConstantInConfig() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("Invalid value given for describe stream operation backoff exponential constant");
+
+		Properties testConfig = new Properties();
+		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		testConfig.setProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT, "unparsableDouble");
+
+		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+	}
+
+	@Test
+	public void testUnparsableIntForGetRecordsRetriesInConfig() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("Invalid value given for maximum retry attempts for getRecords shard operation");
+
+		Properties testConfig = new Properties();
+		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_RETRIES, "unparsableInt");
+
+		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+	}
+
+	@Test
+	public void testUnparsableIntForGetRecordsMaxCountInConfig() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("Invalid value given for maximum records per getRecords shard operation");
+
+		Properties testConfig = new Properties();
+		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "unparsableInt");
+
+		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+	}
+
+	@Test
+	public void testUnparsableLongForGetRecordsBackoffBaseMillisInConfig() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("Invalid value given for get records operation base backoff milliseconds");
+
+		Properties testConfig = new Properties();
+		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_BASE, "unparsableLong");
+
+		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+	}
+
+	@Test
+	public void testUnparsableLongForGetRecordsBackoffMaxMillisInConfig() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("Invalid value given for get records operation max backoff milliseconds");
+
+		Properties testConfig = new Properties();
+		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_MAX, "unparsableLong");
+
+		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+	}
+
+	@Test
+	public void testUnparsableDoubleForGetRecordsBackoffExponentialConstantInConfig() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("Invalid value given for get records operation backoff exponential constant");
+
+		Properties testConfig = new Properties();
+		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT, "unparsableDouble");
+
+		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+	}
+
+	@Test
+	public void testUnparsableLongForGetRecordsIntervalMillisInConfig() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("Invalid value given for getRecords sleep interval in milliseconds");
+
+		Properties testConfig = new Properties();
+		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "unparsableLong");
+
+		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+	}
+
+	@Test
+	public void testUnparsableIntForGetShardIteratorRetriesInConfig() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("Invalid value given for maximum retry attempts for getShardIterator shard operation");
+
+		Properties testConfig = new Properties();
+		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		testConfig.setProperty(ConsumerConfigConstants.SHARD_GETITERATOR_RETRIES, "unparsableInt");
+
+		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+	}
+
+	@Test
+	public void testUnparsableLongForGetShardIteratorBackoffBaseMillisInConfig() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("Invalid value given for get shard iterator operation base backoff milliseconds");
+
+		Properties testConfig = new Properties();
+		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		testConfig.setProperty(ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_BASE, "unparsableLong");
+
+		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+	}
+
+	@Test
+	public void testUnparsableLongForGetShardIteratorBackoffMaxMillisInConfig() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("Invalid value given for get shard iterator operation max backoff milliseconds");
+
+		Properties testConfig = new Properties();
+		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		testConfig.setProperty(ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_MAX, "unparsableLong");
+
+		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+	}
+
+	@Test
+	public void testUnparsableDoubleForGetShardIteratorBackoffExponentialConstantInConfig() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("Invalid value given for get shard iterator operation backoff exponential constant");
+
+		Properties testConfig = new Properties();
+		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		testConfig.setProperty(ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT, "unparsableDouble");
+
+		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+	}
+
+	@Test
+	public void testUnparsableLongForShardDiscoveryIntervalMillisInConfig() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("Invalid value given for shard discovery sleep interval in milliseconds");
+
+		Properties testConfig = new Properties();
+		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		testConfig.setProperty(ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS, "unparsableLong");
+
+		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+	}
 }