You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/12/18 07:25:26 UTC

[4/5] flink git commit: [FLINK-8216] [kinesis] Unify test utils in flink-connector-kinesis

[FLINK-8216] [kinesis] Unify test utils in flink-connector-kinesis

This closes #5130.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c57e56f1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c57e56f1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c57e56f1

Branch: refs/heads/master
Commit: c57e56f183bd923e6947c70f533a2919c888565b
Parents: a7465f0
Author: Bowen Li <bo...@gmail.com>
Authored: Wed Dec 6 23:22:50 2017 -0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Sun Dec 17 20:44:09 2017 -0800

----------------------------------------------------------------------
 .../FlinkKinesisConsumerMigrationTest.java      |  9 +--
 .../kinesis/FlinkKinesisConsumerTest.java       | 12 +---
 .../kinesis/FlinkKinesisProducerTest.java       | 24 +++-----
 .../connectors/kinesis/testutils/TestUtils.java | 39 +++++++++++++
 .../kinesis/util/KinesisConfigUtilTest.java     | 58 +++++++++-----------
 5 files changed, 76 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c57e56f1/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
index 364560c..ab9826e 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
@@ -22,12 +22,12 @@ import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.operators.StreamSource;
-import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
 import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
 import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
 import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator;
+import org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils;
 import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OperatorSnapshotUtil;
@@ -213,12 +213,7 @@ public class FlinkKinesisConsumerMigrationTest {
 
 		private KinesisDataFetcher<T> mockFetcher;
 
-		private static Properties dummyConfig = new Properties();
-		static {
-			dummyConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
-			dummyConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
-			dummyConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
-		}
+		private static Properties dummyConfig = TestUtils.getStandardProperties();
 
 		DummyFlinkKinesisConsumer(KinesisDataFetcher<T> mockFetcher) {
 			super("dummy-topic", mock(KinesisDeserializationSchema.class), dummyConfig);

http://git-wip-us.apache.org/repos/asf/flink/blob/c57e56f1/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
index a354bb3..ea63476 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
@@ -32,7 +32,6 @@ import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
 import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
 import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
@@ -42,6 +41,7 @@ import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
 import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
 import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator;
+import org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils;
 import org.apache.flink.streaming.connectors.kinesis.testutils.TestableFlinkKinesisConsumer;
 import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
 
@@ -92,10 +92,7 @@ public class FlinkKinesisConsumerTest {
 
 	@Test
 	public void testUseRestoredStateForSnapshotIfFetcherNotInitialized() throws Exception {
-		Properties config = new Properties();
-		config.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
-		config.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
-		config.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		Properties config = TestUtils.getStandardProperties();
 
 		List<Tuple2<StreamShardMetadata, SequenceNumber>> globalUnionState = new ArrayList<>(4);
 		globalUnionState.add(Tuple2.of(
@@ -155,10 +152,7 @@ public class FlinkKinesisConsumerTest {
 		// ----------------------------------------------------------------------
 		// setup config, initial state and expected state snapshot
 		// ----------------------------------------------------------------------
-		Properties config = new Properties();
-		config.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
-		config.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
-		config.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		Properties config = TestUtils.getStandardProperties();
 
 		ArrayList<Tuple2<StreamShardMetadata, SequenceNumber>> initialState = new ArrayList<>(1);
 		initialState.add(Tuple2.of(

http://git-wip-us.apache.org/repos/asf/flink/blob/c57e56f1/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java
index 8351f8e..07c9cd7 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java
@@ -24,8 +24,8 @@ import org.apache.flink.core.testutils.CheckedThread;
 import org.apache.flink.core.testutils.MultiShotLatch;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.operators.StreamSink;
-import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema;
+import org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.util.ExceptionUtils;
@@ -45,7 +45,6 @@ import org.mockito.stubbing.Answer;
 import java.nio.ByteBuffer;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Properties;
 
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
@@ -71,12 +70,12 @@ public class FlinkKinesisProducerTest {
 		exception.expect(IllegalArgumentException.class);
 		exception.expectMessage("The provided serialization schema is not serializable");
 
-		new FlinkKinesisProducer<>(new NonSerializableSerializationSchema(), getStandardProperties());
+		new FlinkKinesisProducer<>(new NonSerializableSerializationSchema(), TestUtils.getStandardProperties());
 	}
 
 	@Test
 	public void testCreateWithSerializableDeserializer() {
-		new FlinkKinesisProducer<>(new SerializableSerializationSchema(), getStandardProperties());
+		new FlinkKinesisProducer<>(new SerializableSerializationSchema(), TestUtils.getStandardProperties());
 	}
 
 	@Test
@@ -84,19 +83,19 @@ public class FlinkKinesisProducerTest {
 		exception.expect(IllegalArgumentException.class);
 		exception.expectMessage("The provided custom partitioner is not serializable");
 
-		new FlinkKinesisProducer<>(new SimpleStringSchema(), getStandardProperties())
+		new FlinkKinesisProducer<>(new SimpleStringSchema(), TestUtils.getStandardProperties())
 			.setCustomPartitioner(new NonSerializableCustomPartitioner());
 	}
 
 	@Test
 	public void testConfigureWithSerializableCustomPartitioner() {
-		new FlinkKinesisProducer<>(new SimpleStringSchema(), getStandardProperties())
+		new FlinkKinesisProducer<>(new SimpleStringSchema(), TestUtils.getStandardProperties())
 			.setCustomPartitioner(new SerializableCustomPartitioner());
 	}
 
 	@Test
 	public void testProducerIsSerializable() {
-		FlinkKinesisProducer<String> consumer = new FlinkKinesisProducer<>(new SimpleStringSchema(), getStandardProperties());
+		FlinkKinesisProducer<String> consumer = new FlinkKinesisProducer<>(new SimpleStringSchema(), TestUtils.getStandardProperties());
 		assertTrue(InstantiationUtil.isSerializable(consumer));
 	}
 
@@ -350,7 +349,7 @@ public class FlinkKinesisProducerTest {
 		private boolean isFlushed;
 
 		DummyFlinkKinesisProducer(SerializationSchema<T> schema) {
-			super(schema, getStandardProperties());
+			super(schema, TestUtils.getStandardProperties());
 
 			setDefaultStream(DUMMY_STREAM);
 			setDefaultPartition(DUMMY_PARTITION);
@@ -440,13 +439,4 @@ public class FlinkKinesisProducerTest {
 			return numPending;
 		}
 	}
-
-	private static Properties getStandardProperties() {
-		Properties standardProps = new Properties();
-		standardProps.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
-		standardProps.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
-		standardProps.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
-
-		return standardProps;
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c57e56f1/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestUtils.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestUtils.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestUtils.java
new file mode 100644
index 0000000..f6d0a44
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestUtils.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.testutils;
+
+import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
+
+import java.util.Properties;
+
+/**
+ * General test utils.
+ */
+public class TestUtils {
+	/**
+	 * Get standard Kinesis-related config properties.
+	 */
+	public static Properties getStandardProperties() {
+		Properties config = new Properties();
+		config.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
+		config.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		config.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+
+		return config;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c57e56f1/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
index dab6ea2..074b676 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.kinesis.util;
 import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils;
 
 import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
 import org.junit.Rule;
@@ -171,7 +172,7 @@ public class KinesisConfigUtilTest {
 		exception.expect(IllegalArgumentException.class);
 		exception.expectMessage("Invalid AWS Credential Provider Type");
 
-		Properties testConfig = getPropertiesWithRequiredFields();
+		Properties testConfig = TestUtils.getStandardProperties();
 		testConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "wrongProviderType");
 
 		KinesisConfigUtil.validateAwsConfiguration(testConfig);
@@ -186,7 +187,7 @@ public class KinesisConfigUtilTest {
 		exception.expect(IllegalArgumentException.class);
 		exception.expectMessage("Invalid initial position in stream");
 
-		Properties testConfig = getPropertiesWithRequiredFields();
+		Properties testConfig = TestUtils.getStandardProperties();
 		testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
 		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "wrongInitPosition");
 
@@ -199,7 +200,7 @@ public class KinesisConfigUtilTest {
 		exception.expectMessage("Please set value for initial timestamp ('"
 				+ ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP + "') when using AT_TIMESTAMP initial position.");
 
-		Properties testConfig = getPropertiesWithRequiredFields();
+		Properties testConfig = TestUtils.getStandardProperties();
 		testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
 		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
 
@@ -211,7 +212,7 @@ public class KinesisConfigUtilTest {
 		exception.expect(IllegalArgumentException.class);
 		exception.expectMessage("Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream.");
 
-		Properties testConfig = getPropertiesWithRequiredFields();
+		Properties testConfig = TestUtils.getStandardProperties();
 		testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
 		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
 		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "unparsableDate");
@@ -224,7 +225,7 @@ public class KinesisConfigUtilTest {
 		exception.expect(IllegalArgumentException.class);
 		exception.expectMessage("Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream.");
 
-		Properties testConfig = getPropertiesWithRequiredFields();
+		Properties testConfig = TestUtils.getStandardProperties();
 		testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
 		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
 		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "-1.0");
@@ -236,7 +237,7 @@ public class KinesisConfigUtilTest {
 	public void testDateStringForValidateOptionDateProperty() {
 		String timestamp = "2016-04-04T19:58:46.480-00:00";
 
-		Properties testConfig = getPropertiesWithRequiredFields();
+		Properties testConfig = TestUtils.getStandardProperties();
 		testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
 		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
 		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, timestamp);
@@ -253,7 +254,7 @@ public class KinesisConfigUtilTest {
 	public void testUnixTimestampForValidateOptionDateProperty() {
 		String unixTimestamp = "1459799926.480";
 
-		Properties testConfig = getPropertiesWithRequiredFields();
+		Properties testConfig = TestUtils.getStandardProperties();
 		testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
 		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
 		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, unixTimestamp);
@@ -271,7 +272,7 @@ public class KinesisConfigUtilTest {
 		exception.expect(IllegalArgumentException.class);
 		exception.expectMessage("Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream.");
 
-		Properties testConfig = getPropertiesWithRequiredFields();
+		Properties testConfig = TestUtils.getStandardProperties();
 		testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
 		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
 		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "2016-03-14");
@@ -285,7 +286,7 @@ public class KinesisConfigUtilTest {
 		exception.expect(IllegalArgumentException.class);
 		exception.expectMessage("Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream.");
 
-		Properties testConfig = getPropertiesWithRequiredFields();
+		Properties testConfig = TestUtils.getStandardProperties();
 		testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
 		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
 		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "stillUnparsable");
@@ -299,7 +300,7 @@ public class KinesisConfigUtilTest {
 		String unixTimestamp = "2016-04-04";
 		String pattern = "yyyy-MM-dd";
 
-		Properties testConfig = getPropertiesWithRequiredFields();
+		Properties testConfig = TestUtils.getStandardProperties();
 		testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
 		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
 		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, unixTimestamp);
@@ -318,7 +319,7 @@ public class KinesisConfigUtilTest {
 		exception.expect(IllegalArgumentException.class);
 		exception.expectMessage("Invalid value given for describe stream operation base backoff milliseconds");
 
-		Properties testConfig = getPropertiesWithRequiredFields();
+		Properties testConfig = TestUtils.getStandardProperties();
 		testConfig.setProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE, "unparsableLong");
 
 		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
@@ -329,7 +330,7 @@ public class KinesisConfigUtilTest {
 		exception.expect(IllegalArgumentException.class);
 		exception.expectMessage("Invalid value given for describe stream operation max backoff milliseconds");
 
-		Properties testConfig = getPropertiesWithRequiredFields();
+		Properties testConfig = TestUtils.getStandardProperties();
 		testConfig.setProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_MAX, "unparsableLong");
 
 		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
@@ -340,7 +341,7 @@ public class KinesisConfigUtilTest {
 		exception.expect(IllegalArgumentException.class);
 		exception.expectMessage("Invalid value given for describe stream operation backoff exponential constant");
 
-		Properties testConfig = getPropertiesWithRequiredFields();
+		Properties testConfig = TestUtils.getStandardProperties();
 		testConfig.setProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT, "unparsableDouble");
 
 		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
@@ -351,7 +352,7 @@ public class KinesisConfigUtilTest {
 		exception.expect(IllegalArgumentException.class);
 		exception.expectMessage("Invalid value given for maximum retry attempts for getRecords shard operation");
 
-		Properties testConfig = getPropertiesWithRequiredFields();
+		Properties testConfig = TestUtils.getStandardProperties();
 		testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_RETRIES, "unparsableInt");
 
 		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
@@ -362,7 +363,7 @@ public class KinesisConfigUtilTest {
 		exception.expect(IllegalArgumentException.class);
 		exception.expectMessage("Invalid value given for maximum records per getRecords shard operation");
 
-		Properties testConfig = getPropertiesWithRequiredFields();
+		Properties testConfig = TestUtils.getStandardProperties();
 		testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "unparsableInt");
 
 		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
@@ -373,7 +374,7 @@ public class KinesisConfigUtilTest {
 		exception.expect(IllegalArgumentException.class);
 		exception.expectMessage("Invalid value given for get records operation base backoff milliseconds");
 
-		Properties testConfig = getPropertiesWithRequiredFields();
+		Properties testConfig = TestUtils.getStandardProperties();
 		testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_BASE, "unparsableLong");
 
 		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
@@ -384,7 +385,7 @@ public class KinesisConfigUtilTest {
 		exception.expect(IllegalArgumentException.class);
 		exception.expectMessage("Invalid value given for get records operation max backoff milliseconds");
 
-		Properties testConfig = getPropertiesWithRequiredFields();
+		Properties testConfig = TestUtils.getStandardProperties();
 		testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_MAX, "unparsableLong");
 
 		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
@@ -395,7 +396,7 @@ public class KinesisConfigUtilTest {
 		exception.expect(IllegalArgumentException.class);
 		exception.expectMessage("Invalid value given for get records operation backoff exponential constant");
 
-		Properties testConfig = getPropertiesWithRequiredFields();
+		Properties testConfig = TestUtils.getStandardProperties();
 		testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT, "unparsableDouble");
 
 		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
@@ -406,7 +407,7 @@ public class KinesisConfigUtilTest {
 		exception.expect(IllegalArgumentException.class);
 		exception.expectMessage("Invalid value given for getRecords sleep interval in milliseconds");
 
-		Properties testConfig = getPropertiesWithRequiredFields();
+		Properties testConfig = TestUtils.getStandardProperties();
 		testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "unparsableLong");
 
 		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
@@ -417,7 +418,7 @@ public class KinesisConfigUtilTest {
 		exception.expect(IllegalArgumentException.class);
 		exception.expectMessage("Invalid value given for maximum retry attempts for getShardIterator shard operation");
 
-		Properties testConfig = getPropertiesWithRequiredFields();
+		Properties testConfig = TestUtils.getStandardProperties();
 		testConfig.setProperty(ConsumerConfigConstants.SHARD_GETITERATOR_RETRIES, "unparsableInt");
 
 		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
@@ -428,7 +429,7 @@ public class KinesisConfigUtilTest {
 		exception.expect(IllegalArgumentException.class);
 		exception.expectMessage("Invalid value given for get shard iterator operation base backoff milliseconds");
 
-		Properties testConfig = getPropertiesWithRequiredFields();
+		Properties testConfig = TestUtils.getStandardProperties();
 		testConfig.setProperty(ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_BASE, "unparsableLong");
 
 		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
@@ -439,7 +440,7 @@ public class KinesisConfigUtilTest {
 		exception.expect(IllegalArgumentException.class);
 		exception.expectMessage("Invalid value given for get shard iterator operation max backoff milliseconds");
 
-		Properties testConfig = getPropertiesWithRequiredFields();
+		Properties testConfig = TestUtils.getStandardProperties();
 		testConfig.setProperty(ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_MAX, "unparsableLong");
 
 		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
@@ -450,7 +451,7 @@ public class KinesisConfigUtilTest {
 		exception.expect(IllegalArgumentException.class);
 		exception.expectMessage("Invalid value given for get shard iterator operation backoff exponential constant");
 
-		Properties testConfig = getPropertiesWithRequiredFields();
+		Properties testConfig = TestUtils.getStandardProperties();
 		testConfig.setProperty(ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT, "unparsableDouble");
 
 		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
@@ -461,18 +462,9 @@ public class KinesisConfigUtilTest {
 		exception.expect(IllegalArgumentException.class);
 		exception.expectMessage("Invalid value given for shard discovery sleep interval in milliseconds");
 
-		Properties testConfig = getPropertiesWithRequiredFields();
+		Properties testConfig = TestUtils.getStandardProperties();
 		testConfig.setProperty(ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS, "unparsableLong");
 
 		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
 	}
-
-	private Properties getPropertiesWithRequiredFields() {
-		Properties config = new Properties();
-		config.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
-		config.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
-		config.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
-
-		return config;
-	}
 }