You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/09/26 16:04:48 UTC
[6/9] flink git commit: [FLINK-7393] [kinesis] Move unit tests of
KinesisConfigUtil from FlinkKinesisConsumerTest to KinesisConfigUtilTest
[FLINK-7393] [kinesis] Move unit tests of KinesisConfigUtil from FlinkKinesisConsumerTest to KinesisConfigUtilTest
This closes #4708.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ff417e76
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ff417e76
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ff417e76
Branch: refs/heads/master
Commit: ff417e762032b100a74a6e9ee215618f4ac9db3e
Parents: 415e7d0
Author: Bowen Li <bo...@gmail.com>
Authored: Fri Sep 22 16:10:56 2017 -0700
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Tue Sep 26 18:04:07 2017 +0200
----------------------------------------------------------------------
.../kinesis/FlinkKinesisConsumerTest.java | 417 ------------------
.../kinesis/util/KinesisConfigUtilTest.java | 421 +++++++++++++++++++
2 files changed, 421 insertions(+), 417 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ff417e76/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
index 4a007d5..78ca160 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
@@ -32,7 +32,6 @@ import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
-import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
@@ -73,7 +72,6 @@ import java.util.UUID;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
@@ -90,421 +88,6 @@ public class FlinkKinesisConsumerTest {
private ExpectedException exception = ExpectedException.none();
// ----------------------------------------------------------------------
- // FlinkKinesisConsumer.validateAwsConfiguration() tests
- // ----------------------------------------------------------------------
-
- @Test
- public void testMissingAwsRegionInConfig() {
- exception.expect(IllegalArgumentException.class);
- exception.expectMessage("The AWS region ('" + AWSConfigConstants.AWS_REGION + "') must be set in the config.");
-
- Properties testConfig = new Properties();
- testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKey");
- testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
-
- KinesisConfigUtil.validateAwsConfiguration(testConfig);
- }
-
- @Test
- public void testUnrecognizableAwsRegionInConfig() {
- exception.expect(IllegalArgumentException.class);
- exception.expectMessage("Invalid AWS region");
-
- Properties testConfig = new Properties();
- testConfig.setProperty(AWSConfigConstants.AWS_REGION, "wrongRegionId");
- testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
- testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
-
- KinesisConfigUtil.validateAwsConfiguration(testConfig);
- }
-
- @Test
- public void testCredentialProviderTypeSetToBasicButNoCredentialSetInConfig() {
- exception.expect(IllegalArgumentException.class);
- exception.expectMessage("Please set values for AWS Access Key ID ('" + AWSConfigConstants.AWS_ACCESS_KEY_ID + "') " +
- "and Secret Key ('" + AWSConfigConstants.AWS_SECRET_ACCESS_KEY + "') when using the BASIC AWS credential provider type.");
-
- Properties testConfig = new Properties();
- testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
- testConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
-
- KinesisConfigUtil.validateAwsConfiguration(testConfig);
- }
-
- @Test
- public void testUnrecognizableCredentialProviderTypeInConfig() {
- exception.expect(IllegalArgumentException.class);
- exception.expectMessage("Invalid AWS Credential Provider Type");
-
- Properties testConfig = new Properties();
- testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
- testConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "wrongProviderType");
- testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
- testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
-
- KinesisConfigUtil.validateAwsConfiguration(testConfig);
- }
-
- // ----------------------------------------------------------------------
- // FlinkKinesisConsumer.validateConsumerConfiguration() tests
- // ----------------------------------------------------------------------
-
- @Test
- public void testUnrecognizableStreamInitPositionTypeInConfig() {
- exception.expect(IllegalArgumentException.class);
- exception.expectMessage("Invalid initial position in stream");
-
- Properties testConfig = new Properties();
- testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
- testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
- testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
- testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
- testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "wrongInitPosition");
-
- KinesisConfigUtil.validateConsumerConfiguration(testConfig);
- }
-
- @Test
- public void testStreamInitPositionTypeSetToAtTimestampButNoInitTimestampSetInConfig() {
- exception.expect(IllegalArgumentException.class);
- exception.expectMessage("Please set value for initial timestamp ('"
- + ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP + "') when using AT_TIMESTAMP initial position.");
-
- Properties testConfig = new Properties();
- testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
- testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
- testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
- testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
- testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
-
- KinesisConfigUtil.validateConsumerConfiguration(testConfig);
- }
-
- @Test
- public void testUnparsableDateForInitialTimestampInConfig() {
- exception.expect(IllegalArgumentException.class);
- exception.expectMessage("Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream.");
-
- Properties testConfig = new Properties();
- testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
- testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
- testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
- testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
- testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
- testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "unparsableDate");
-
- KinesisConfigUtil.validateConsumerConfiguration(testConfig);
- }
-
- @Test
- public void testIllegalValueForInitialTimestampInConfig() {
- exception.expect(IllegalArgumentException.class);
- exception.expectMessage("Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream.");
-
- Properties testConfig = new Properties();
- testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
- testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
- testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
- testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
- testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
- testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "-1.0");
-
- KinesisConfigUtil.validateConsumerConfiguration(testConfig);
- }
-
- @Test
- public void testDateStringForValidateOptionDateProperty() {
- String timestamp = "2016-04-04T19:58:46.480-00:00";
-
- Properties testConfig = new Properties();
- testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
- testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
- testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
- testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
- testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
- testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, timestamp);
-
- try {
- KinesisConfigUtil.validateConsumerConfiguration(testConfig);
- } catch (Exception e) {
- e.printStackTrace();
- fail();
- }
- }
-
- @Test
- public void testUnixTimestampForValidateOptionDateProperty() {
- String unixTimestamp = "1459799926.480";
-
- Properties testConfig = new Properties();
- testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
- testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
- testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
- testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
- testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
- testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, unixTimestamp);
-
- try {
- KinesisConfigUtil.validateConsumerConfiguration(testConfig);
- } catch (Exception e) {
- e.printStackTrace();
- fail();
- }
- }
-
- @Test
- public void testInvalidPatternForInitialTimestampInConfig() {
- exception.expect(IllegalArgumentException.class);
- exception.expectMessage("Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream.");
-
- Properties testConfig = new Properties();
- testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
- testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
- testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
- testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
- testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
- testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "2016-03-14");
- testConfig.setProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT, "InvalidPattern");
-
- KinesisConfigUtil.validateConsumerConfiguration(testConfig);
- }
-
- @Test
- public void testUnparsableDateForUserDefinedDateFormatForInitialTimestampInConfig() {
- exception.expect(IllegalArgumentException.class);
- exception.expectMessage("Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream.");
-
- Properties testConfig = new Properties();
- testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
- testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
- testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
- testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
- testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
- testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "stillUnparsable");
- testConfig.setProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT, "yyyy-MM-dd");
-
- KinesisConfigUtil.validateConsumerConfiguration(testConfig);
- }
-
- @Test
- public void testDateStringForUserDefinedDateFormatForValidateOptionDateProperty() {
- String unixTimestamp = "2016-04-04";
- String pattern = "yyyy-MM-dd";
-
- Properties testConfig = new Properties();
- testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
- testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
- testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
- testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
- testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
- testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, unixTimestamp);
- testConfig.setProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT, pattern);
-
- try {
- KinesisConfigUtil.validateConsumerConfiguration(testConfig);
- } catch (Exception e) {
- e.printStackTrace();
- fail();
- }
- }
-
- @Test
- public void testUnparsableLongForDescribeStreamBackoffBaseMillisInConfig() {
- exception.expect(IllegalArgumentException.class);
- exception.expectMessage("Invalid value given for describe stream operation base backoff milliseconds");
-
- Properties testConfig = new Properties();
- testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
- testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
- testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
- testConfig.setProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE, "unparsableLong");
-
- KinesisConfigUtil.validateConsumerConfiguration(testConfig);
- }
-
- @Test
- public void testUnparsableLongForDescribeStreamBackoffMaxMillisInConfig() {
- exception.expect(IllegalArgumentException.class);
- exception.expectMessage("Invalid value given for describe stream operation max backoff milliseconds");
-
- Properties testConfig = new Properties();
- testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
- testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
- testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
- testConfig.setProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_MAX, "unparsableLong");
-
- KinesisConfigUtil.validateConsumerConfiguration(testConfig);
- }
-
- @Test
- public void testUnparsableDoubleForDescribeStreamBackoffExponentialConstantInConfig() {
- exception.expect(IllegalArgumentException.class);
- exception.expectMessage("Invalid value given for describe stream operation backoff exponential constant");
-
- Properties testConfig = new Properties();
- testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
- testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
- testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
- testConfig.setProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT, "unparsableDouble");
-
- KinesisConfigUtil.validateConsumerConfiguration(testConfig);
- }
-
- @Test
- public void testUnparsableIntForGetRecordsRetriesInConfig() {
- exception.expect(IllegalArgumentException.class);
- exception.expectMessage("Invalid value given for maximum retry attempts for getRecords shard operation");
-
- Properties testConfig = new Properties();
- testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
- testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
- testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
- testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_RETRIES, "unparsableInt");
-
- KinesisConfigUtil.validateConsumerConfiguration(testConfig);
- }
-
- @Test
- public void testUnparsableIntForGetRecordsMaxCountInConfig() {
- exception.expect(IllegalArgumentException.class);
- exception.expectMessage("Invalid value given for maximum records per getRecords shard operation");
-
- Properties testConfig = new Properties();
- testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
- testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
- testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
- testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "unparsableInt");
-
- KinesisConfigUtil.validateConsumerConfiguration(testConfig);
- }
-
- @Test
- public void testUnparsableLongForGetRecordsBackoffBaseMillisInConfig() {
- exception.expect(IllegalArgumentException.class);
- exception.expectMessage("Invalid value given for get records operation base backoff milliseconds");
-
- Properties testConfig = new Properties();
- testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
- testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
- testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
- testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_BASE, "unparsableLong");
-
- KinesisConfigUtil.validateConsumerConfiguration(testConfig);
- }
-
- @Test
- public void testUnparsableLongForGetRecordsBackoffMaxMillisInConfig() {
- exception.expect(IllegalArgumentException.class);
- exception.expectMessage("Invalid value given for get records operation max backoff milliseconds");
-
- Properties testConfig = new Properties();
- testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
- testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
- testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
- testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_MAX, "unparsableLong");
-
- KinesisConfigUtil.validateConsumerConfiguration(testConfig);
- }
-
- @Test
- public void testUnparsableDoubleForGetRecordsBackoffExponentialConstantInConfig() {
- exception.expect(IllegalArgumentException.class);
- exception.expectMessage("Invalid value given for get records operation backoff exponential constant");
-
- Properties testConfig = new Properties();
- testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
- testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
- testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
- testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT, "unparsableDouble");
-
- KinesisConfigUtil.validateConsumerConfiguration(testConfig);
- }
-
- @Test
- public void testUnparsableLongForGetRecordsIntervalMillisInConfig() {
- exception.expect(IllegalArgumentException.class);
- exception.expectMessage("Invalid value given for getRecords sleep interval in milliseconds");
-
- Properties testConfig = new Properties();
- testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
- testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
- testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
- testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "unparsableLong");
-
- KinesisConfigUtil.validateConsumerConfiguration(testConfig);
- }
-
- @Test
- public void testUnparsableIntForGetShardIteratorRetriesInConfig() {
- exception.expect(IllegalArgumentException.class);
- exception.expectMessage("Invalid value given for maximum retry attempts for getShardIterator shard operation");
-
- Properties testConfig = new Properties();
- testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
- testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
- testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
- testConfig.setProperty(ConsumerConfigConstants.SHARD_GETITERATOR_RETRIES, "unparsableInt");
-
- KinesisConfigUtil.validateConsumerConfiguration(testConfig);
- }
-
- @Test
- public void testUnparsableLongForGetShardIteratorBackoffBaseMillisInConfig() {
- exception.expect(IllegalArgumentException.class);
- exception.expectMessage("Invalid value given for get shard iterator operation base backoff milliseconds");
-
- Properties testConfig = new Properties();
- testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
- testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
- testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
- testConfig.setProperty(ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_BASE, "unparsableLong");
-
- KinesisConfigUtil.validateConsumerConfiguration(testConfig);
- }
-
- @Test
- public void testUnparsableLongForGetShardIteratorBackoffMaxMillisInConfig() {
- exception.expect(IllegalArgumentException.class);
- exception.expectMessage("Invalid value given for get shard iterator operation max backoff milliseconds");
-
- Properties testConfig = new Properties();
- testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
- testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
- testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
- testConfig.setProperty(ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_MAX, "unparsableLong");
-
- KinesisConfigUtil.validateConsumerConfiguration(testConfig);
- }
-
- @Test
- public void testUnparsableDoubleForGetShardIteratorBackoffExponentialConstantInConfig() {
- exception.expect(IllegalArgumentException.class);
- exception.expectMessage("Invalid value given for get shard iterator operation backoff exponential constant");
-
- Properties testConfig = new Properties();
- testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
- testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
- testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
- testConfig.setProperty(ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT, "unparsableDouble");
-
- KinesisConfigUtil.validateConsumerConfiguration(testConfig);
- }
-
- @Test
- public void testUnparsableLongForShardDiscoveryIntervalMillisInConfig() {
- exception.expect(IllegalArgumentException.class);
- exception.expectMessage("Invalid value given for shard discovery sleep interval in milliseconds");
-
- Properties testConfig = new Properties();
- testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
- testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
- testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
- testConfig.setProperty(ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS, "unparsableLong");
-
- KinesisConfigUtil.validateConsumerConfiguration(testConfig);
- }
-
- // ----------------------------------------------------------------------
// Tests related to state initialization
// ----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ff417e76/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
index a576748..ddb300e 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.streaming.connectors.kinesis.util;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants;
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
@@ -32,6 +33,7 @@ import org.powermock.modules.junit4.PowerMockRunner;
import java.util.Properties;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
/**
* Tests for KinesisConfigUtil.
@@ -42,6 +44,10 @@ public class KinesisConfigUtilTest {
@Rule
private ExpectedException exception = ExpectedException.none();
+ // ----------------------------------------------------------------------
+ // getValidatedProducerConfiguration() tests
+ // ----------------------------------------------------------------------
+
@Test
public void testUnparsableLongForProducerConfiguration() {
exception.expect(IllegalArgumentException.class);
@@ -125,4 +131,419 @@ public class KinesisConfigUtilTest {
assertEquals("1", replacedConfig.getProperty(KinesisConfigUtil.AGGREGATION_MAX_COUNT));
assertEquals("2", replacedConfig.getProperty(KinesisConfigUtil.COLLECTION_MAX_COUNT));
}
+
+ // ----------------------------------------------------------------------
+ // validateAwsConfiguration() tests
+ // ----------------------------------------------------------------------
+
+ @Test
+ public void testMissingAwsRegionInConfig() {
+ exception.expect(IllegalArgumentException.class);
+ exception.expectMessage("The AWS region ('" + AWSConfigConstants.AWS_REGION + "') must be set in the config.");
+
+ Properties testConfig = new Properties();
+ testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKey");
+ testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+
+ KinesisConfigUtil.validateAwsConfiguration(testConfig);
+ }
+
+ @Test
+ public void testUnrecognizableAwsRegionInConfig() {
+ exception.expect(IllegalArgumentException.class);
+ exception.expectMessage("Invalid AWS region");
+
+ Properties testConfig = new Properties();
+ testConfig.setProperty(AWSConfigConstants.AWS_REGION, "wrongRegionId");
+ testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+ testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+
+ KinesisConfigUtil.validateAwsConfiguration(testConfig);
+ }
+
+ @Test
+ public void testCredentialProviderTypeSetToBasicButNoCredentialSetInConfig() {
+ exception.expect(IllegalArgumentException.class);
+ exception.expectMessage("Please set values for AWS Access Key ID ('" + AWSConfigConstants.AWS_ACCESS_KEY_ID + "') " +
+ "and Secret Key ('" + AWSConfigConstants.AWS_SECRET_ACCESS_KEY + "') when using the BASIC AWS credential provider type.");
+
+ Properties testConfig = new Properties();
+ testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
+ testConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
+
+ KinesisConfigUtil.validateAwsConfiguration(testConfig);
+ }
+
+ @Test
+ public void testUnrecognizableCredentialProviderTypeInConfig() {
+ exception.expect(IllegalArgumentException.class);
+ exception.expectMessage("Invalid AWS Credential Provider Type");
+
+ Properties testConfig = new Properties();
+ testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
+ testConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "wrongProviderType");
+ testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+ testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+
+ KinesisConfigUtil.validateAwsConfiguration(testConfig);
+ }
+
+ // ----------------------------------------------------------------------
+ // validateConsumerConfiguration() tests
+ // ----------------------------------------------------------------------
+
+ @Test
+ public void testUnrecognizableStreamInitPositionTypeInConfig() {
+ exception.expect(IllegalArgumentException.class);
+ exception.expectMessage("Invalid initial position in stream");
+
+ Properties testConfig = new Properties();
+ testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+ testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
+ testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+ testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+ testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "wrongInitPosition");
+
+ KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+ }
+
+ @Test
+ public void testStreamInitPositionTypeSetToAtTimestampButNoInitTimestampSetInConfig() {
+ exception.expect(IllegalArgumentException.class);
+ exception.expectMessage("Please set value for initial timestamp ('"
+ + ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP + "') when using AT_TIMESTAMP initial position.");
+
+ Properties testConfig = new Properties();
+ testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+ testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
+ testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+ testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+ testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
+
+ KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+ }
+
+ @Test
+ public void testUnparsableDateForInitialTimestampInConfig() {
+ exception.expect(IllegalArgumentException.class);
+ exception.expectMessage("Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream.");
+
+ Properties testConfig = new Properties();
+ testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+ testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
+ testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+ testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+ testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
+ testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "unparsableDate");
+
+ KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+ }
+
+ @Test
+ public void testIllegalValueForInitialTimestampInConfig() {
+ exception.expect(IllegalArgumentException.class);
+ exception.expectMessage("Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream.");
+
+ Properties testConfig = new Properties();
+ testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+ testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
+ testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+ testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+ testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
+ testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "-1.0");
+
+ KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+ }
+
+ @Test
+ public void testDateStringForValidateOptionDateProperty() {
+ String timestamp = "2016-04-04T19:58:46.480-00:00";
+
+ Properties testConfig = new Properties();
+ testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+ testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
+ testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+ testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+ testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
+ testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, timestamp);
+
+ try {
+ KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail();
+ }
+ }
+
+ @Test
+ public void testUnixTimestampForValidateOptionDateProperty() {
+ String unixTimestamp = "1459799926.480";
+
+ Properties testConfig = new Properties();
+ testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+ testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
+ testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+ testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+ testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
+ testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, unixTimestamp);
+
+ try {
+ KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail();
+ }
+ }
+
+ @Test
+ public void testInvalidPatternForInitialTimestampInConfig() {
+ exception.expect(IllegalArgumentException.class);
+ exception.expectMessage("Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream.");
+
+ Properties testConfig = new Properties();
+ testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+ testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
+ testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+ testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+ testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
+ testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "2016-03-14");
+ testConfig.setProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT, "InvalidPattern");
+
+ KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+ }
+
+ @Test
+ public void testUnparsableDateForUserDefinedDateFormatForInitialTimestampInConfig() {
+ exception.expect(IllegalArgumentException.class);
+ exception.expectMessage("Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream.");
+
+ Properties testConfig = new Properties();
+ testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+ testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
+ testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+ testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+ testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
+ testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "stillUnparsable");
+ testConfig.setProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT, "yyyy-MM-dd");
+
+ KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+ }
+
+ @Test
+ public void testDateStringForUserDefinedDateFormatForValidateOptionDateProperty() {
+ String unixTimestamp = "2016-04-04";
+ String pattern = "yyyy-MM-dd";
+
+ Properties testConfig = new Properties();
+ testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+ testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
+ testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+ testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+ testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
+ testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, unixTimestamp);
+ testConfig.setProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT, pattern);
+
+ try {
+ KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail();
+ }
+ }
+
+ @Test
+ public void testUnparsableLongForDescribeStreamBackoffBaseMillisInConfig() {
+ exception.expect(IllegalArgumentException.class);
+ exception.expectMessage("Invalid value given for describe stream operation base backoff milliseconds");
+
+ Properties testConfig = new Properties();
+ testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+ testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+ testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+ testConfig.setProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE, "unparsableLong");
+
+ KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+ }
+
+ @Test
+ public void testUnparsableLongForDescribeStreamBackoffMaxMillisInConfig() {
+ exception.expect(IllegalArgumentException.class);
+ exception.expectMessage("Invalid value given for describe stream operation max backoff milliseconds");
+
+ Properties testConfig = new Properties();
+ testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+ testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+ testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+ testConfig.setProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_MAX, "unparsableLong");
+
+ KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+ }
+
+ @Test
+ public void testUnparsableDoubleForDescribeStreamBackoffExponentialConstantInConfig() {
+ exception.expect(IllegalArgumentException.class);
+ exception.expectMessage("Invalid value given for describe stream operation backoff exponential constant");
+
+ Properties testConfig = new Properties();
+ testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+ testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+ testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+ testConfig.setProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT, "unparsableDouble");
+
+ KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+ }
+
+ @Test
+ public void testUnparsableIntForGetRecordsRetriesInConfig() {
+ exception.expect(IllegalArgumentException.class);
+ exception.expectMessage("Invalid value given for maximum retry attempts for getRecords shard operation");
+
+ Properties testConfig = new Properties();
+ testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+ testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+ testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+ testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_RETRIES, "unparsableInt");
+
+ KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+ }
+
+ @Test
+ public void testUnparsableIntForGetRecordsMaxCountInConfig() {
+ exception.expect(IllegalArgumentException.class);
+ exception.expectMessage("Invalid value given for maximum records per getRecords shard operation");
+
+ Properties testConfig = new Properties();
+ testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+ testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+ testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+ testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "unparsableInt");
+
+ KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+ }
+
+ @Test
+ public void testUnparsableLongForGetRecordsBackoffBaseMillisInConfig() {
+ exception.expect(IllegalArgumentException.class);
+ exception.expectMessage("Invalid value given for get records operation base backoff milliseconds");
+
+ Properties testConfig = new Properties();
+ testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+ testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+ testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+ testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_BASE, "unparsableLong");
+
+ KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+ }
+
+ @Test
+ public void testUnparsableLongForGetRecordsBackoffMaxMillisInConfig() {
+ exception.expect(IllegalArgumentException.class);
+ exception.expectMessage("Invalid value given for get records operation max backoff milliseconds");
+
+ Properties testConfig = new Properties();
+ testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+ testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+ testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+ testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_MAX, "unparsableLong");
+
+ KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+ }
+
+ @Test
+ public void testUnparsableDoubleForGetRecordsBackoffExponentialConstantInConfig() {
+ exception.expect(IllegalArgumentException.class);
+ exception.expectMessage("Invalid value given for get records operation backoff exponential constant");
+
+ Properties testConfig = new Properties();
+ testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+ testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+ testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+ testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT, "unparsableDouble");
+
+ KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+ }
+
+ @Test
+ public void testUnparsableLongForGetRecordsIntervalMillisInConfig() {
+ exception.expect(IllegalArgumentException.class);
+ exception.expectMessage("Invalid value given for getRecords sleep interval in milliseconds");
+
+ Properties testConfig = new Properties();
+ testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+ testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+ testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+ testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "unparsableLong");
+
+ KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+ }
+
+ @Test
+ public void testUnparsableIntForGetShardIteratorRetriesInConfig() {
+ exception.expect(IllegalArgumentException.class);
+ exception.expectMessage("Invalid value given for maximum retry attempts for getShardIterator shard operation");
+
+ Properties testConfig = new Properties();
+ testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+ testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+ testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+ testConfig.setProperty(ConsumerConfigConstants.SHARD_GETITERATOR_RETRIES, "unparsableInt");
+
+ KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+ }
+
+ @Test
+ public void testUnparsableLongForGetShardIteratorBackoffBaseMillisInConfig() {
+ exception.expect(IllegalArgumentException.class);
+ exception.expectMessage("Invalid value given for get shard iterator operation base backoff milliseconds");
+
+ Properties testConfig = new Properties();
+ testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+ testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+ testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+ testConfig.setProperty(ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_BASE, "unparsableLong");
+
+ KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+ }
+
+ @Test
+ public void testUnparsableLongForGetShardIteratorBackoffMaxMillisInConfig() {
+ exception.expect(IllegalArgumentException.class);
+ exception.expectMessage("Invalid value given for get shard iterator operation max backoff milliseconds");
+
+ Properties testConfig = new Properties();
+ testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+ testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+ testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+ testConfig.setProperty(ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_MAX, "unparsableLong");
+
+ KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+ }
+
+ @Test
+ public void testUnparsableDoubleForGetShardIteratorBackoffExponentialConstantInConfig() {
+ exception.expect(IllegalArgumentException.class);
+ exception.expectMessage("Invalid value given for get shard iterator operation backoff exponential constant");
+
+ Properties testConfig = new Properties();
+ testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+ testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+ testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+ testConfig.setProperty(ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT, "unparsableDouble");
+
+ KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+ }
+
+ @Test
+ public void testUnparsableLongForShardDiscoveryIntervalMillisInConfig() {
+ exception.expect(IllegalArgumentException.class);
+ exception.expectMessage("Invalid value given for shard discovery sleep interval in milliseconds");
+
+ Properties testConfig = new Properties();
+ testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+ testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+ testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+ testConfig.setProperty(ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS, "unparsableLong");
+
+ KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+ }
}