You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/12/18 07:25:26 UTC
[4/5] flink git commit: [FLINK-8216] [kinesis] Unify test utils in
flink-connector-kinesis
[FLINK-8216] [kinesis] Unify test utils in flink-connector-kinesis
This closes #5130.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c57e56f1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c57e56f1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c57e56f1
Branch: refs/heads/master
Commit: c57e56f183bd923e6947c70f533a2919c888565b
Parents: a7465f0
Author: Bowen Li <bo...@gmail.com>
Authored: Wed Dec 6 23:22:50 2017 -0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Sun Dec 17 20:44:09 2017 -0800
----------------------------------------------------------------------
.../FlinkKinesisConsumerMigrationTest.java | 9 +--
.../kinesis/FlinkKinesisConsumerTest.java | 12 +---
.../kinesis/FlinkKinesisProducerTest.java | 24 +++-----
.../connectors/kinesis/testutils/TestUtils.java | 39 +++++++++++++
.../kinesis/util/KinesisConfigUtilTest.java | 58 +++++++++-----------
5 files changed, 76 insertions(+), 66 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c57e56f1/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
index 364560c..ab9826e 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
@@ -22,12 +22,12 @@ import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamSource;
-import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator;
+import org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils;
import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OperatorSnapshotUtil;
@@ -213,12 +213,7 @@ public class FlinkKinesisConsumerMigrationTest {
private KinesisDataFetcher<T> mockFetcher;
- private static Properties dummyConfig = new Properties();
- static {
- dummyConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
- dummyConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
- dummyConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
- }
+ private static Properties dummyConfig = TestUtils.getStandardProperties();
DummyFlinkKinesisConsumer(KinesisDataFetcher<T> mockFetcher) {
super("dummy-topic", mock(KinesisDeserializationSchema.class), dummyConfig);
http://git-wip-us.apache.org/repos/asf/flink/blob/c57e56f1/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
index a354bb3..ea63476 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
@@ -32,7 +32,6 @@ import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
@@ -42,6 +41,7 @@ import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator;
+import org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils;
import org.apache.flink.streaming.connectors.kinesis.testutils.TestableFlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
@@ -92,10 +92,7 @@ public class FlinkKinesisConsumerTest {
@Test
public void testUseRestoredStateForSnapshotIfFetcherNotInitialized() throws Exception {
- Properties config = new Properties();
- config.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
- config.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
- config.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+ Properties config = TestUtils.getStandardProperties();
List<Tuple2<StreamShardMetadata, SequenceNumber>> globalUnionState = new ArrayList<>(4);
globalUnionState.add(Tuple2.of(
@@ -155,10 +152,7 @@ public class FlinkKinesisConsumerTest {
// ----------------------------------------------------------------------
// setup config, initial state and expected state snapshot
// ----------------------------------------------------------------------
- Properties config = new Properties();
- config.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
- config.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
- config.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+ Properties config = TestUtils.getStandardProperties();
ArrayList<Tuple2<StreamShardMetadata, SequenceNumber>> initialState = new ArrayList<>(1);
initialState.add(Tuple2.of(
http://git-wip-us.apache.org/repos/asf/flink/blob/c57e56f1/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java
index 8351f8e..07c9cd7 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java
@@ -24,8 +24,8 @@ import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.core.testutils.MultiShotLatch;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.operators.StreamSink;
-import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema;
+import org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.util.ExceptionUtils;
@@ -45,7 +45,6 @@ import org.mockito.stubbing.Answer;
import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.List;
-import java.util.Properties;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
@@ -71,12 +70,12 @@ public class FlinkKinesisProducerTest {
exception.expect(IllegalArgumentException.class);
exception.expectMessage("The provided serialization schema is not serializable");
- new FlinkKinesisProducer<>(new NonSerializableSerializationSchema(), getStandardProperties());
+ new FlinkKinesisProducer<>(new NonSerializableSerializationSchema(), TestUtils.getStandardProperties());
}
@Test
public void testCreateWithSerializableDeserializer() {
- new FlinkKinesisProducer<>(new SerializableSerializationSchema(), getStandardProperties());
+ new FlinkKinesisProducer<>(new SerializableSerializationSchema(), TestUtils.getStandardProperties());
}
@Test
@@ -84,19 +83,19 @@ public class FlinkKinesisProducerTest {
exception.expect(IllegalArgumentException.class);
exception.expectMessage("The provided custom partitioner is not serializable");
- new FlinkKinesisProducer<>(new SimpleStringSchema(), getStandardProperties())
+ new FlinkKinesisProducer<>(new SimpleStringSchema(), TestUtils.getStandardProperties())
.setCustomPartitioner(new NonSerializableCustomPartitioner());
}
@Test
public void testConfigureWithSerializableCustomPartitioner() {
- new FlinkKinesisProducer<>(new SimpleStringSchema(), getStandardProperties())
+ new FlinkKinesisProducer<>(new SimpleStringSchema(), TestUtils.getStandardProperties())
.setCustomPartitioner(new SerializableCustomPartitioner());
}
@Test
public void testProducerIsSerializable() {
- FlinkKinesisProducer<String> consumer = new FlinkKinesisProducer<>(new SimpleStringSchema(), getStandardProperties());
+ FlinkKinesisProducer<String> consumer = new FlinkKinesisProducer<>(new SimpleStringSchema(), TestUtils.getStandardProperties());
assertTrue(InstantiationUtil.isSerializable(consumer));
}
@@ -350,7 +349,7 @@ public class FlinkKinesisProducerTest {
private boolean isFlushed;
DummyFlinkKinesisProducer(SerializationSchema<T> schema) {
- super(schema, getStandardProperties());
+ super(schema, TestUtils.getStandardProperties());
setDefaultStream(DUMMY_STREAM);
setDefaultPartition(DUMMY_PARTITION);
@@ -440,13 +439,4 @@ public class FlinkKinesisProducerTest {
return numPending;
}
}
-
- private static Properties getStandardProperties() {
- Properties standardProps = new Properties();
- standardProps.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
- standardProps.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
- standardProps.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
-
- return standardProps;
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c57e56f1/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestUtils.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestUtils.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestUtils.java
new file mode 100644
index 0000000..f6d0a44
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestUtils.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.testutils;
+
+import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
+
+import java.util.Properties;
+
+/**
+ * General test utils.
+ */
+public class TestUtils {
+ /**
+ * Get standard Kinesis-related config properties.
+ */
+ public static Properties getStandardProperties() {
+ Properties config = new Properties();
+ config.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
+ config.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+ config.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+
+ return config;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c57e56f1/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
index dab6ea2..074b676 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.kinesis.util;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils;
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
import org.junit.Rule;
@@ -171,7 +172,7 @@ public class KinesisConfigUtilTest {
exception.expect(IllegalArgumentException.class);
exception.expectMessage("Invalid AWS Credential Provider Type");
- Properties testConfig = getPropertiesWithRequiredFields();
+ Properties testConfig = TestUtils.getStandardProperties();
testConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "wrongProviderType");
KinesisConfigUtil.validateAwsConfiguration(testConfig);
@@ -186,7 +187,7 @@ public class KinesisConfigUtilTest {
exception.expect(IllegalArgumentException.class);
exception.expectMessage("Invalid initial position in stream");
- Properties testConfig = getPropertiesWithRequiredFields();
+ Properties testConfig = TestUtils.getStandardProperties();
testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "wrongInitPosition");
@@ -199,7 +200,7 @@ public class KinesisConfigUtilTest {
exception.expectMessage("Please set value for initial timestamp ('"
+ ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP + "') when using AT_TIMESTAMP initial position.");
- Properties testConfig = getPropertiesWithRequiredFields();
+ Properties testConfig = TestUtils.getStandardProperties();
testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
@@ -211,7 +212,7 @@ public class KinesisConfigUtilTest {
exception.expect(IllegalArgumentException.class);
exception.expectMessage("Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream.");
- Properties testConfig = getPropertiesWithRequiredFields();
+ Properties testConfig = TestUtils.getStandardProperties();
testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "unparsableDate");
@@ -224,7 +225,7 @@ public class KinesisConfigUtilTest {
exception.expect(IllegalArgumentException.class);
exception.expectMessage("Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream.");
- Properties testConfig = getPropertiesWithRequiredFields();
+ Properties testConfig = TestUtils.getStandardProperties();
testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "-1.0");
@@ -236,7 +237,7 @@ public class KinesisConfigUtilTest {
public void testDateStringForValidateOptionDateProperty() {
String timestamp = "2016-04-04T19:58:46.480-00:00";
- Properties testConfig = getPropertiesWithRequiredFields();
+ Properties testConfig = TestUtils.getStandardProperties();
testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, timestamp);
@@ -253,7 +254,7 @@ public class KinesisConfigUtilTest {
public void testUnixTimestampForValidateOptionDateProperty() {
String unixTimestamp = "1459799926.480";
- Properties testConfig = getPropertiesWithRequiredFields();
+ Properties testConfig = TestUtils.getStandardProperties();
testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, unixTimestamp);
@@ -271,7 +272,7 @@ public class KinesisConfigUtilTest {
exception.expect(IllegalArgumentException.class);
exception.expectMessage("Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream.");
- Properties testConfig = getPropertiesWithRequiredFields();
+ Properties testConfig = TestUtils.getStandardProperties();
testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "2016-03-14");
@@ -285,7 +286,7 @@ public class KinesisConfigUtilTest {
exception.expect(IllegalArgumentException.class);
exception.expectMessage("Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream.");
- Properties testConfig = getPropertiesWithRequiredFields();
+ Properties testConfig = TestUtils.getStandardProperties();
testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "stillUnparsable");
@@ -299,7 +300,7 @@ public class KinesisConfigUtilTest {
String unixTimestamp = "2016-04-04";
String pattern = "yyyy-MM-dd";
- Properties testConfig = getPropertiesWithRequiredFields();
+ Properties testConfig = TestUtils.getStandardProperties();
testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, unixTimestamp);
@@ -318,7 +319,7 @@ public class KinesisConfigUtilTest {
exception.expect(IllegalArgumentException.class);
exception.expectMessage("Invalid value given for describe stream operation base backoff milliseconds");
- Properties testConfig = getPropertiesWithRequiredFields();
+ Properties testConfig = TestUtils.getStandardProperties();
testConfig.setProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE, "unparsableLong");
KinesisConfigUtil.validateConsumerConfiguration(testConfig);
@@ -329,7 +330,7 @@ public class KinesisConfigUtilTest {
exception.expect(IllegalArgumentException.class);
exception.expectMessage("Invalid value given for describe stream operation max backoff milliseconds");
- Properties testConfig = getPropertiesWithRequiredFields();
+ Properties testConfig = TestUtils.getStandardProperties();
testConfig.setProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_MAX, "unparsableLong");
KinesisConfigUtil.validateConsumerConfiguration(testConfig);
@@ -340,7 +341,7 @@ public class KinesisConfigUtilTest {
exception.expect(IllegalArgumentException.class);
exception.expectMessage("Invalid value given for describe stream operation backoff exponential constant");
- Properties testConfig = getPropertiesWithRequiredFields();
+ Properties testConfig = TestUtils.getStandardProperties();
testConfig.setProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT, "unparsableDouble");
KinesisConfigUtil.validateConsumerConfiguration(testConfig);
@@ -351,7 +352,7 @@ public class KinesisConfigUtilTest {
exception.expect(IllegalArgumentException.class);
exception.expectMessage("Invalid value given for maximum retry attempts for getRecords shard operation");
- Properties testConfig = getPropertiesWithRequiredFields();
+ Properties testConfig = TestUtils.getStandardProperties();
testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_RETRIES, "unparsableInt");
KinesisConfigUtil.validateConsumerConfiguration(testConfig);
@@ -362,7 +363,7 @@ public class KinesisConfigUtilTest {
exception.expect(IllegalArgumentException.class);
exception.expectMessage("Invalid value given for maximum records per getRecords shard operation");
- Properties testConfig = getPropertiesWithRequiredFields();
+ Properties testConfig = TestUtils.getStandardProperties();
testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "unparsableInt");
KinesisConfigUtil.validateConsumerConfiguration(testConfig);
@@ -373,7 +374,7 @@ public class KinesisConfigUtilTest {
exception.expect(IllegalArgumentException.class);
exception.expectMessage("Invalid value given for get records operation base backoff milliseconds");
- Properties testConfig = getPropertiesWithRequiredFields();
+ Properties testConfig = TestUtils.getStandardProperties();
testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_BASE, "unparsableLong");
KinesisConfigUtil.validateConsumerConfiguration(testConfig);
@@ -384,7 +385,7 @@ public class KinesisConfigUtilTest {
exception.expect(IllegalArgumentException.class);
exception.expectMessage("Invalid value given for get records operation max backoff milliseconds");
- Properties testConfig = getPropertiesWithRequiredFields();
+ Properties testConfig = TestUtils.getStandardProperties();
testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_MAX, "unparsableLong");
KinesisConfigUtil.validateConsumerConfiguration(testConfig);
@@ -395,7 +396,7 @@ public class KinesisConfigUtilTest {
exception.expect(IllegalArgumentException.class);
exception.expectMessage("Invalid value given for get records operation backoff exponential constant");
- Properties testConfig = getPropertiesWithRequiredFields();
+ Properties testConfig = TestUtils.getStandardProperties();
testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT, "unparsableDouble");
KinesisConfigUtil.validateConsumerConfiguration(testConfig);
@@ -406,7 +407,7 @@ public class KinesisConfigUtilTest {
exception.expect(IllegalArgumentException.class);
exception.expectMessage("Invalid value given for getRecords sleep interval in milliseconds");
- Properties testConfig = getPropertiesWithRequiredFields();
+ Properties testConfig = TestUtils.getStandardProperties();
testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "unparsableLong");
KinesisConfigUtil.validateConsumerConfiguration(testConfig);
@@ -417,7 +418,7 @@ public class KinesisConfigUtilTest {
exception.expect(IllegalArgumentException.class);
exception.expectMessage("Invalid value given for maximum retry attempts for getShardIterator shard operation");
- Properties testConfig = getPropertiesWithRequiredFields();
+ Properties testConfig = TestUtils.getStandardProperties();
testConfig.setProperty(ConsumerConfigConstants.SHARD_GETITERATOR_RETRIES, "unparsableInt");
KinesisConfigUtil.validateConsumerConfiguration(testConfig);
@@ -428,7 +429,7 @@ public class KinesisConfigUtilTest {
exception.expect(IllegalArgumentException.class);
exception.expectMessage("Invalid value given for get shard iterator operation base backoff milliseconds");
- Properties testConfig = getPropertiesWithRequiredFields();
+ Properties testConfig = TestUtils.getStandardProperties();
testConfig.setProperty(ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_BASE, "unparsableLong");
KinesisConfigUtil.validateConsumerConfiguration(testConfig);
@@ -439,7 +440,7 @@ public class KinesisConfigUtilTest {
exception.expect(IllegalArgumentException.class);
exception.expectMessage("Invalid value given for get shard iterator operation max backoff milliseconds");
- Properties testConfig = getPropertiesWithRequiredFields();
+ Properties testConfig = TestUtils.getStandardProperties();
testConfig.setProperty(ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_MAX, "unparsableLong");
KinesisConfigUtil.validateConsumerConfiguration(testConfig);
@@ -450,7 +451,7 @@ public class KinesisConfigUtilTest {
exception.expect(IllegalArgumentException.class);
exception.expectMessage("Invalid value given for get shard iterator operation backoff exponential constant");
- Properties testConfig = getPropertiesWithRequiredFields();
+ Properties testConfig = TestUtils.getStandardProperties();
testConfig.setProperty(ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT, "unparsableDouble");
KinesisConfigUtil.validateConsumerConfiguration(testConfig);
@@ -461,18 +462,9 @@ public class KinesisConfigUtilTest {
exception.expect(IllegalArgumentException.class);
exception.expectMessage("Invalid value given for shard discovery sleep interval in milliseconds");
- Properties testConfig = getPropertiesWithRequiredFields();
+ Properties testConfig = TestUtils.getStandardProperties();
testConfig.setProperty(ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS, "unparsableLong");
KinesisConfigUtil.validateConsumerConfiguration(testConfig);
}
-
- private Properties getPropertiesWithRequiredFields() {
- Properties config = new Properties();
- config.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
- config.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
- config.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
-
- return config;
- }
}