Posted to commits@flink.apache.org by tz...@apache.org on 2017/12/18 07:25:23 UTC

[1/5] flink git commit: [hotfix] [javadoc] Fix typo in StreamExecutionEnvironment javadoc

Repository: flink
Updated Branches:
  refs/heads/master 3b1448ecb -> 7f99a0df6


[hotfix] [javadoc] Fix typo in StreamExecutionEnvironment javadoc

This closes #5164.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d12158e8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d12158e8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d12158e8

Branch: refs/heads/master
Commit: d12158e8c996aeb3a4900e4f57347a5b6f8fa1cb
Parents: 3b1448e
Author: Matrix42 <93...@qq.com>
Authored: Wed Dec 13 23:59:07 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Sun Dec 17 20:04:39 2017 -0800

----------------------------------------------------------------------
 .../streaming/api/environment/StreamExecutionEnvironment.java      | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d12158e8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index cc45ddc..355d277 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -439,7 +439,7 @@ public abstract class StreamExecutionEnvironment {
 	 * from operations on {@link org.apache.flink.streaming.api.datastream.KeyedStream}) is maintained
 	 * (heap, managed memory, externally), and where state snapshots/checkpoints are stored, both for
 	 * the key/value state, and for checkpointed functions (implementing the interface
-	 * {@link org.apache.flink.streaming.api.checkpoint.Checkpointed}).
+	 * {@link org.apache.flink.streaming.api.checkpoint.CheckpointedFunction}).
 	 *
 	 * <p>The {@link org.apache.flink.runtime.state.memory.MemoryStateBackend} for example
 	 * maintains the state in heap memory, as objects. It is lightweight without extra dependencies,


[3/5] flink git commit: [FLINK-8218] [kinesis] Move flink-connector-kinesis examples from /src to /test

Posted by tz...@apache.org.
[FLINK-8218] [kinesis] Move flink-connector-kinesis examples from /src to /test

This closes #5131.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a7465f04
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a7465f04
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a7465f04

Branch: refs/heads/master
Commit: a7465f04ff2afa3774d7e3f746faadf9a5500fed
Parents: 3397bd6
Author: Bowen Li <bo...@gmail.com>
Authored: Thu Dec 7 00:04:12 2017 -0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Sun Dec 17 20:39:38 2017 -0800

----------------------------------------------------------------------
 .../kinesis/examples/ConsumeFromKinesis.java    | 55 -------------
 .../kinesis/examples/ProduceIntoKinesis.java    | 82 --------------------
 .../kinesis/examples/ConsumeFromKinesis.java    | 55 +++++++++++++
 .../kinesis/examples/ProduceIntoKinesis.java    | 82 ++++++++++++++++++++
 4 files changed, 137 insertions(+), 137 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a7465f04/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ConsumeFromKinesis.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ConsumeFromKinesis.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ConsumeFromKinesis.java
deleted file mode 100644
index 29f631d..0000000
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ConsumeFromKinesis.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kinesis.examples;
-
-import org.apache.flink.api.common.serialization.SimpleStringSchema;
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
-import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
-
-import java.util.Properties;
-
-/**
- * This is an example on how to consume data from Kinesis.
- */
-public class ConsumeFromKinesis {
-
-	public static void main(String[] args) throws Exception {
-		ParameterTool pt = ParameterTool.fromArgs(args);
-
-		StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
-		see.setParallelism(1);
-
-		Properties kinesisConsumerConfig = new Properties();
-		kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, pt.getRequired("region"));
-		kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, pt.getRequired("accesskey"));
-		kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, pt.getRequired("secretkey"));
-
-		DataStream<String> kinesis = see.addSource(new FlinkKinesisConsumer<>(
-			"flink-test",
-			new SimpleStringSchema(),
-			kinesisConsumerConfig));
-
-		kinesis.print();
-
-		see.execute();
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7465f04/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ProduceIntoKinesis.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ProduceIntoKinesis.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ProduceIntoKinesis.java
deleted file mode 100644
index 6846018..0000000
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ProduceIntoKinesis.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kinesis.examples;
-
-import org.apache.flink.api.common.serialization.SimpleStringSchema;
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
-import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
-
-import org.apache.commons.lang3.RandomStringUtils;
-
-import java.util.Properties;
-
-/**
- * This is an example on how to produce data into Kinesis.
- */
-public class ProduceIntoKinesis {
-
-	public static void main(String[] args) throws Exception {
-		ParameterTool pt = ParameterTool.fromArgs(args);
-
-		StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
-		see.setParallelism(1);
-
-		DataStream<String> simpleStringStream = see.addSource(new EventsGenerator());
-
-		Properties kinesisProducerConfig = new Properties();
-		kinesisProducerConfig.setProperty(AWSConfigConstants.AWS_REGION, pt.getRequired("region"));
-		kinesisProducerConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, pt.getRequired("accessKey"));
-		kinesisProducerConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, pt.getRequired("secretKey"));
-
-		FlinkKinesisProducer<String> kinesis = new FlinkKinesisProducer<>(
-				new SimpleStringSchema(), kinesisProducerConfig);
-
-		kinesis.setFailOnError(true);
-		kinesis.setDefaultStream("flink-test");
-		kinesis.setDefaultPartition("0");
-
-		simpleStringStream.addSink(kinesis);
-
-		see.execute();
-	}
-
-	/**
-	 * Data generator that creates strings starting with a sequence number followed by a dash and 12 random characters.
-	 */
-	public static class EventsGenerator implements SourceFunction<String> {
-		private boolean running = true;
-
-		@Override
-		public void run(SourceContext<String> ctx) throws Exception {
-			long seq = 0;
-			while (running) {
-				Thread.sleep(10);
-				ctx.collect((seq++) + "-" + RandomStringUtils.randomAlphabetic(12));
-			}
-		}
-
-		@Override
-		public void cancel() {
-			running = false;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7465f04/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/examples/ConsumeFromKinesis.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/examples/ConsumeFromKinesis.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/examples/ConsumeFromKinesis.java
new file mode 100644
index 0000000..29f631d
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/examples/ConsumeFromKinesis.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.examples;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+
+import java.util.Properties;
+
+/**
+ * This is an example on how to consume data from Kinesis.
+ */
+public class ConsumeFromKinesis {
+
+	public static void main(String[] args) throws Exception {
+		ParameterTool pt = ParameterTool.fromArgs(args);
+
+		StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
+		see.setParallelism(1);
+
+		Properties kinesisConsumerConfig = new Properties();
+		kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, pt.getRequired("region"));
+		kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, pt.getRequired("accesskey"));
+		kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, pt.getRequired("secretkey"));
+
+		DataStream<String> kinesis = see.addSource(new FlinkKinesisConsumer<>(
+			"flink-test",
+			new SimpleStringSchema(),
+			kinesisConsumerConfig));
+
+		kinesis.print();
+
+		see.execute();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7465f04/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/examples/ProduceIntoKinesis.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/examples/ProduceIntoKinesis.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/examples/ProduceIntoKinesis.java
new file mode 100644
index 0000000..6846018
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/examples/ProduceIntoKinesis.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.examples;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
+import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
+
+import org.apache.commons.lang3.RandomStringUtils;
+
+import java.util.Properties;
+
+/**
+ * This is an example on how to produce data into Kinesis.
+ */
+public class ProduceIntoKinesis {
+
+	public static void main(String[] args) throws Exception {
+		ParameterTool pt = ParameterTool.fromArgs(args);
+
+		StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
+		see.setParallelism(1);
+
+		DataStream<String> simpleStringStream = see.addSource(new EventsGenerator());
+
+		Properties kinesisProducerConfig = new Properties();
+		kinesisProducerConfig.setProperty(AWSConfigConstants.AWS_REGION, pt.getRequired("region"));
+		kinesisProducerConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, pt.getRequired("accessKey"));
+		kinesisProducerConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, pt.getRequired("secretKey"));
+
+		FlinkKinesisProducer<String> kinesis = new FlinkKinesisProducer<>(
+				new SimpleStringSchema(), kinesisProducerConfig);
+
+		kinesis.setFailOnError(true);
+		kinesis.setDefaultStream("flink-test");
+		kinesis.setDefaultPartition("0");
+
+		simpleStringStream.addSink(kinesis);
+
+		see.execute();
+	}
+
+	/**
+	 * Data generator that creates strings starting with a sequence number followed by a dash and 12 random characters.
+	 */
+	public static class EventsGenerator implements SourceFunction<String> {
+		private boolean running = true;
+
+		@Override
+		public void run(SourceContext<String> ctx) throws Exception {
+			long seq = 0;
+			while (running) {
+				Thread.sleep(10);
+				ctx.collect((seq++) + "-" + RandomStringUtils.randomAlphabetic(12));
+			}
+		}
+
+		@Override
+		public void cancel() {
+			running = false;
+		}
+	}
+}
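
For reference, the moved examples read their connection settings from the command line via ParameterTool, so they can still be launched after the move to /test. An illustrative invocation (the jar path is a placeholder and assumes the test classes are packaged into a runnable jar; the parameter names come from ConsumeFromKinesis above):

    flink run -c org.apache.flink.streaming.connectors.kinesis.examples.ConsumeFromKinesis \
        <path-to-flink-connector-kinesis-test-jar> \
        --region us-east-1 --accesskey <accessKeyId> --secretkey <secretAccessKey>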


[2/5] flink git commit: [FLINK-8249] [kinesis] Fix setting region on KinesisProducerConfiguration

Posted by tz...@apache.org.
[FLINK-8249] [kinesis] Fix setting region on KinesisProducerConfiguration

This closes #5160.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3397bd66
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3397bd66
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3397bd66

Branch: refs/heads/master
Commit: 3397bd66abdc529a2d5940a09f9feee035fd0b90
Parents: d12158e
Author: eskabetxe <bo...@boto.pro>
Authored: Wed Dec 13 12:43:27 2017 +0100
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Sun Dec 17 20:27:24 2017 -0800

----------------------------------------------------------------------
 .../connectors/kinesis/util/KinesisConfigUtil.java        |  1 +
 .../connectors/kinesis/util/KinesisConfigUtilTest.java    | 10 ++++++++++
 2 files changed, 11 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3397bd66/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
index dae7f52..a6b0f04 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
@@ -189,6 +189,7 @@ public class KinesisConfigUtil {
 		validateAwsConfiguration(config);
 
 		KinesisProducerConfiguration kpc = KinesisProducerConfiguration.fromProperties(config);
+		kpc.setRegion(config.getProperty(AWSConfigConstants.AWS_REGION));
 
 		kpc.setCredentialsProvider(AWSUtil.getCredentialsProvider(config));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3397bd66/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
index 792d3a7..dab6ea2 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
@@ -114,6 +114,16 @@ public class KinesisConfigUtilTest {
 		assertEquals("2", replacedConfig.getProperty(KinesisConfigUtil.COLLECTION_MAX_COUNT));
 	}
 
+	@Test
+	public void testCorrectlySetRegionInProducerConfiguration() {
+		String region = "us-east-1";
+		Properties testConfig = new Properties();
+		testConfig.setProperty(AWSConfigConstants.AWS_REGION, region);
+		KinesisProducerConfiguration kpc = KinesisConfigUtil.getValidatedProducerConfiguration(testConfig);
+
+		assertEquals("incorrect region", region, kpc.getRegion());
+	}
+
 	// ----------------------------------------------------------------------
 	// validateAwsConfiguration() tests
 	// ----------------------------------------------------------------------
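
The added line copies the Flink-style aws.region property onto the KinesisProducerConfiguration explicitly, so a region configured on the producer properties is actually honoured. A hedged sketch of the user-facing effect, mirroring the new test above (constant and method names as in the diff; the wrapper class is hypothetical):

    import java.util.Properties;

    import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
    import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
    import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;

    public class RegionConfigSketch {
        public static void main(String[] args) {
            Properties config = new Properties();
            config.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");

            KinesisProducerConfiguration kpc =
                KinesisConfigUtil.getValidatedProducerConfiguration(config);

            // with this fix, the region set via AWSConfigConstants.AWS_REGION is
            // propagated to the underlying KPL configuration
            System.out.println(kpc.getRegion()); // prints us-east-1
        }
    }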


[4/5] flink git commit: [FLINK-8216] [kinesis] Unify test utils in flink-connector-kinesis

Posted by tz...@apache.org.
[FLINK-8216] [kinesis] Unify test utils in flink-connector-kinesis

This closes #5130.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c57e56f1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c57e56f1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c57e56f1

Branch: refs/heads/master
Commit: c57e56f183bd923e6947c70f533a2919c888565b
Parents: a7465f0
Author: Bowen Li <bo...@gmail.com>
Authored: Wed Dec 6 23:22:50 2017 -0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Sun Dec 17 20:44:09 2017 -0800

----------------------------------------------------------------------
 .../FlinkKinesisConsumerMigrationTest.java      |  9 +--
 .../kinesis/FlinkKinesisConsumerTest.java       | 12 +---
 .../kinesis/FlinkKinesisProducerTest.java       | 24 +++-----
 .../connectors/kinesis/testutils/TestUtils.java | 39 +++++++++++++
 .../kinesis/util/KinesisConfigUtilTest.java     | 58 +++++++++-----------
 5 files changed, 76 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c57e56f1/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
index 364560c..ab9826e 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
@@ -22,12 +22,12 @@ import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.operators.StreamSource;
-import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
 import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
 import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
 import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator;
+import org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils;
 import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OperatorSnapshotUtil;
@@ -213,12 +213,7 @@ public class FlinkKinesisConsumerMigrationTest {
 
 		private KinesisDataFetcher<T> mockFetcher;
 
-		private static Properties dummyConfig = new Properties();
-		static {
-			dummyConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
-			dummyConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
-			dummyConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
-		}
+		private static Properties dummyConfig = TestUtils.getStandardProperties();
 
 		DummyFlinkKinesisConsumer(KinesisDataFetcher<T> mockFetcher) {
 			super("dummy-topic", mock(KinesisDeserializationSchema.class), dummyConfig);

http://git-wip-us.apache.org/repos/asf/flink/blob/c57e56f1/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
index a354bb3..ea63476 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
@@ -32,7 +32,6 @@ import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
 import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
 import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
@@ -42,6 +41,7 @@ import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
 import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
 import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator;
+import org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils;
 import org.apache.flink.streaming.connectors.kinesis.testutils.TestableFlinkKinesisConsumer;
 import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
 
@@ -92,10 +92,7 @@ public class FlinkKinesisConsumerTest {
 
 	@Test
 	public void testUseRestoredStateForSnapshotIfFetcherNotInitialized() throws Exception {
-		Properties config = new Properties();
-		config.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
-		config.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
-		config.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		Properties config = TestUtils.getStandardProperties();
 
 		List<Tuple2<StreamShardMetadata, SequenceNumber>> globalUnionState = new ArrayList<>(4);
 		globalUnionState.add(Tuple2.of(
@@ -155,10 +152,7 @@ public class FlinkKinesisConsumerTest {
 		// ----------------------------------------------------------------------
 		// setup config, initial state and expected state snapshot
 		// ----------------------------------------------------------------------
-		Properties config = new Properties();
-		config.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
-		config.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
-		config.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		Properties config = TestUtils.getStandardProperties();
 
 		ArrayList<Tuple2<StreamShardMetadata, SequenceNumber>> initialState = new ArrayList<>(1);
 		initialState.add(Tuple2.of(

http://git-wip-us.apache.org/repos/asf/flink/blob/c57e56f1/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java
index 8351f8e..07c9cd7 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java
@@ -24,8 +24,8 @@ import org.apache.flink.core.testutils.CheckedThread;
 import org.apache.flink.core.testutils.MultiShotLatch;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.operators.StreamSink;
-import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema;
+import org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.util.ExceptionUtils;
@@ -45,7 +45,6 @@ import org.mockito.stubbing.Answer;
 import java.nio.ByteBuffer;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Properties;
 
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
@@ -71,12 +70,12 @@ public class FlinkKinesisProducerTest {
 		exception.expect(IllegalArgumentException.class);
 		exception.expectMessage("The provided serialization schema is not serializable");
 
-		new FlinkKinesisProducer<>(new NonSerializableSerializationSchema(), getStandardProperties());
+		new FlinkKinesisProducer<>(new NonSerializableSerializationSchema(), TestUtils.getStandardProperties());
 	}
 
 	@Test
 	public void testCreateWithSerializableDeserializer() {
-		new FlinkKinesisProducer<>(new SerializableSerializationSchema(), getStandardProperties());
+		new FlinkKinesisProducer<>(new SerializableSerializationSchema(), TestUtils.getStandardProperties());
 	}
 
 	@Test
@@ -84,19 +83,19 @@ public class FlinkKinesisProducerTest {
 		exception.expect(IllegalArgumentException.class);
 		exception.expectMessage("The provided custom partitioner is not serializable");
 
-		new FlinkKinesisProducer<>(new SimpleStringSchema(), getStandardProperties())
+		new FlinkKinesisProducer<>(new SimpleStringSchema(), TestUtils.getStandardProperties())
 			.setCustomPartitioner(new NonSerializableCustomPartitioner());
 	}
 
 	@Test
 	public void testConfigureWithSerializableCustomPartitioner() {
-		new FlinkKinesisProducer<>(new SimpleStringSchema(), getStandardProperties())
+		new FlinkKinesisProducer<>(new SimpleStringSchema(), TestUtils.getStandardProperties())
 			.setCustomPartitioner(new SerializableCustomPartitioner());
 	}
 
 	@Test
 	public void testProducerIsSerializable() {
-		FlinkKinesisProducer<String> consumer = new FlinkKinesisProducer<>(new SimpleStringSchema(), getStandardProperties());
+		FlinkKinesisProducer<String> consumer = new FlinkKinesisProducer<>(new SimpleStringSchema(), TestUtils.getStandardProperties());
 		assertTrue(InstantiationUtil.isSerializable(consumer));
 	}
 
@@ -350,7 +349,7 @@ public class FlinkKinesisProducerTest {
 		private boolean isFlushed;
 
 		DummyFlinkKinesisProducer(SerializationSchema<T> schema) {
-			super(schema, getStandardProperties());
+			super(schema, TestUtils.getStandardProperties());
 
 			setDefaultStream(DUMMY_STREAM);
 			setDefaultPartition(DUMMY_PARTITION);
@@ -440,13 +439,4 @@ public class FlinkKinesisProducerTest {
 			return numPending;
 		}
 	}
-
-	private static Properties getStandardProperties() {
-		Properties standardProps = new Properties();
-		standardProps.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
-		standardProps.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
-		standardProps.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
-
-		return standardProps;
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c57e56f1/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestUtils.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestUtils.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestUtils.java
new file mode 100644
index 0000000..f6d0a44
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestUtils.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.testutils;
+
+import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
+
+import java.util.Properties;
+
+/**
+ * General test utils.
+ */
+public class TestUtils {
+	/**
+	 * Get standard Kinesis-related config properties.
+	 */
+	public static Properties getStandardProperties() {
+		Properties config = new Properties();
+		config.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
+		config.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		config.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+
+		return config;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c57e56f1/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
index dab6ea2..074b676 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.kinesis.util;
 import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils;
 
 import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
 import org.junit.Rule;
@@ -171,7 +172,7 @@ public class KinesisConfigUtilTest {
 		exception.expect(IllegalArgumentException.class);
 		exception.expectMessage("Invalid AWS Credential Provider Type");
 
-		Properties testConfig = getPropertiesWithRequiredFields();
+		Properties testConfig = TestUtils.getStandardProperties();
 		testConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "wrongProviderType");
 
 		KinesisConfigUtil.validateAwsConfiguration(testConfig);
@@ -186,7 +187,7 @@ public class KinesisConfigUtilTest {
 		exception.expect(IllegalArgumentException.class);
 		exception.expectMessage("Invalid initial position in stream");
 
-		Properties testConfig = getPropertiesWithRequiredFields();
+		Properties testConfig = TestUtils.getStandardProperties();
 		testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
 		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "wrongInitPosition");
 
@@ -199,7 +200,7 @@ public class KinesisConfigUtilTest {
 		exception.expectMessage("Please set value for initial timestamp ('"
 				+ ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP + "') when using AT_TIMESTAMP initial position.");
 
-		Properties testConfig = getPropertiesWithRequiredFields();
+		Properties testConfig = TestUtils.getStandardProperties();
 		testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
 		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
 
@@ -211,7 +212,7 @@ public class KinesisConfigUtilTest {
 		exception.expect(IllegalArgumentException.class);
 		exception.expectMessage("Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream.");
 
-		Properties testConfig = getPropertiesWithRequiredFields();
+		Properties testConfig = TestUtils.getStandardProperties();
 		testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
 		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
 		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "unparsableDate");
@@ -224,7 +225,7 @@ public class KinesisConfigUtilTest {
 		exception.expect(IllegalArgumentException.class);
 		exception.expectMessage("Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream.");
 
-		Properties testConfig = getPropertiesWithRequiredFields();
+		Properties testConfig = TestUtils.getStandardProperties();
 		testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
 		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
 		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "-1.0");
@@ -236,7 +237,7 @@ public class KinesisConfigUtilTest {
 	public void testDateStringForValidateOptionDateProperty() {
 		String timestamp = "2016-04-04T19:58:46.480-00:00";
 
-		Properties testConfig = getPropertiesWithRequiredFields();
+		Properties testConfig = TestUtils.getStandardProperties();
 		testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
 		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
 		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, timestamp);
@@ -253,7 +254,7 @@ public class KinesisConfigUtilTest {
 	public void testUnixTimestampForValidateOptionDateProperty() {
 		String unixTimestamp = "1459799926.480";
 
-		Properties testConfig = getPropertiesWithRequiredFields();
+		Properties testConfig = TestUtils.getStandardProperties();
 		testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
 		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
 		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, unixTimestamp);
@@ -271,7 +272,7 @@ public class KinesisConfigUtilTest {
 		exception.expect(IllegalArgumentException.class);
 		exception.expectMessage("Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream.");
 
-		Properties testConfig = getPropertiesWithRequiredFields();
+		Properties testConfig = TestUtils.getStandardProperties();
 		testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
 		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
 		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "2016-03-14");
@@ -285,7 +286,7 @@ public class KinesisConfigUtilTest {
 		exception.expect(IllegalArgumentException.class);
 		exception.expectMessage("Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream.");
 
-		Properties testConfig = getPropertiesWithRequiredFields();
+		Properties testConfig = TestUtils.getStandardProperties();
 		testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
 		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
 		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "stillUnparsable");
@@ -299,7 +300,7 @@ public class KinesisConfigUtilTest {
 		String unixTimestamp = "2016-04-04";
 		String pattern = "yyyy-MM-dd";
 
-		Properties testConfig = getPropertiesWithRequiredFields();
+		Properties testConfig = TestUtils.getStandardProperties();
 		testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
 		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
 		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, unixTimestamp);
@@ -318,7 +319,7 @@ public class KinesisConfigUtilTest {
 		exception.expect(IllegalArgumentException.class);
 		exception.expectMessage("Invalid value given for describe stream operation base backoff milliseconds");
 
-		Properties testConfig = getPropertiesWithRequiredFields();
+		Properties testConfig = TestUtils.getStandardProperties();
 		testConfig.setProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE, "unparsableLong");
 
 		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
@@ -329,7 +330,7 @@ public class KinesisConfigUtilTest {
 		exception.expect(IllegalArgumentException.class);
 		exception.expectMessage("Invalid value given for describe stream operation max backoff milliseconds");
 
-		Properties testConfig = getPropertiesWithRequiredFields();
+		Properties testConfig = TestUtils.getStandardProperties();
 		testConfig.setProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_MAX, "unparsableLong");
 
 		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
@@ -340,7 +341,7 @@ public class KinesisConfigUtilTest {
 		exception.expect(IllegalArgumentException.class);
 		exception.expectMessage("Invalid value given for describe stream operation backoff exponential constant");
 
-		Properties testConfig = getPropertiesWithRequiredFields();
+		Properties testConfig = TestUtils.getStandardProperties();
 		testConfig.setProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT, "unparsableDouble");
 
 		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
@@ -351,7 +352,7 @@ public class KinesisConfigUtilTest {
 		exception.expect(IllegalArgumentException.class);
 		exception.expectMessage("Invalid value given for maximum retry attempts for getRecords shard operation");
 
-		Properties testConfig = getPropertiesWithRequiredFields();
+		Properties testConfig = TestUtils.getStandardProperties();
 		testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_RETRIES, "unparsableInt");
 
 		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
@@ -362,7 +363,7 @@ public class KinesisConfigUtilTest {
 		exception.expect(IllegalArgumentException.class);
 		exception.expectMessage("Invalid value given for maximum records per getRecords shard operation");
 
-		Properties testConfig = getPropertiesWithRequiredFields();
+		Properties testConfig = TestUtils.getStandardProperties();
 		testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "unparsableInt");
 
 		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
@@ -373,7 +374,7 @@ public class KinesisConfigUtilTest {
 		exception.expect(IllegalArgumentException.class);
 		exception.expectMessage("Invalid value given for get records operation base backoff milliseconds");
 
-		Properties testConfig = getPropertiesWithRequiredFields();
+		Properties testConfig = TestUtils.getStandardProperties();
 		testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_BASE, "unparsableLong");
 
 		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
@@ -384,7 +385,7 @@ public class KinesisConfigUtilTest {
 		exception.expect(IllegalArgumentException.class);
 		exception.expectMessage("Invalid value given for get records operation max backoff milliseconds");
 
-		Properties testConfig = getPropertiesWithRequiredFields();
+		Properties testConfig = TestUtils.getStandardProperties();
 		testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_MAX, "unparsableLong");
 
 		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
@@ -395,7 +396,7 @@ public class KinesisConfigUtilTest {
 		exception.expect(IllegalArgumentException.class);
 		exception.expectMessage("Invalid value given for get records operation backoff exponential constant");
 
-		Properties testConfig = getPropertiesWithRequiredFields();
+		Properties testConfig = TestUtils.getStandardProperties();
 		testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT, "unparsableDouble");
 
 		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
@@ -406,7 +407,7 @@ public class KinesisConfigUtilTest {
 		exception.expect(IllegalArgumentException.class);
 		exception.expectMessage("Invalid value given for getRecords sleep interval in milliseconds");
 
-		Properties testConfig = getPropertiesWithRequiredFields();
+		Properties testConfig = TestUtils.getStandardProperties();
 		testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "unparsableLong");
 
 		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
@@ -417,7 +418,7 @@ public class KinesisConfigUtilTest {
 		exception.expect(IllegalArgumentException.class);
 		exception.expectMessage("Invalid value given for maximum retry attempts for getShardIterator shard operation");
 
-		Properties testConfig = getPropertiesWithRequiredFields();
+		Properties testConfig = TestUtils.getStandardProperties();
 		testConfig.setProperty(ConsumerConfigConstants.SHARD_GETITERATOR_RETRIES, "unparsableInt");
 
 		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
@@ -428,7 +429,7 @@ public class KinesisConfigUtilTest {
 		exception.expect(IllegalArgumentException.class);
 		exception.expectMessage("Invalid value given for get shard iterator operation base backoff milliseconds");
 
-		Properties testConfig = getPropertiesWithRequiredFields();
+		Properties testConfig = TestUtils.getStandardProperties();
 		testConfig.setProperty(ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_BASE, "unparsableLong");
 
 		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
@@ -439,7 +440,7 @@ public class KinesisConfigUtilTest {
 		exception.expect(IllegalArgumentException.class);
 		exception.expectMessage("Invalid value given for get shard iterator operation max backoff milliseconds");
 
-		Properties testConfig = getPropertiesWithRequiredFields();
+		Properties testConfig = TestUtils.getStandardProperties();
 		testConfig.setProperty(ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_MAX, "unparsableLong");
 
 		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
@@ -450,7 +451,7 @@ public class KinesisConfigUtilTest {
 		exception.expect(IllegalArgumentException.class);
 		exception.expectMessage("Invalid value given for get shard iterator operation backoff exponential constant");
 
-		Properties testConfig = getPropertiesWithRequiredFields();
+		Properties testConfig = TestUtils.getStandardProperties();
 		testConfig.setProperty(ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT, "unparsableDouble");
 
 		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
@@ -461,18 +462,9 @@ public class KinesisConfigUtilTest {
 		exception.expect(IllegalArgumentException.class);
 		exception.expectMessage("Invalid value given for shard discovery sleep interval in milliseconds");
 
-		Properties testConfig = getPropertiesWithRequiredFields();
+		Properties testConfig = TestUtils.getStandardProperties();
 		testConfig.setProperty(ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS, "unparsableLong");
 
 		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
 	}
-
-	private Properties getPropertiesWithRequiredFields() {
-		Properties config = new Properties();
-		config.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
-		config.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
-		config.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
-
-		return config;
-	}
 }
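
With the shared helper in place, new tests can reuse it instead of re-declaring the AWS properties. An illustrative test method in the style of KinesisConfigUtilTest (not part of the commit):

    @Test
    public void testValidConsumerConfiguration() {
        Properties testConfig = TestUtils.getStandardProperties();
        testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

        // valid region, credentials and initial position: should not throw
        KinesisConfigUtil.validateConsumerConfiguration(testConfig);
    }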


[5/5] flink git commit: [hotfix] [doc] Fix typo in TaskManager and EnvironmentInformation doc

Posted by tz...@apache.org.
[hotfix] [doc] Fix typo in TaskManager and EnvironmentInformation doc

This closes #5135.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7f99a0df
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7f99a0df
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7f99a0df

Branch: refs/heads/master
Commit: 7f99a0df669dc73c983913c505c7f72dab3c0a4d
Parents: c57e56f
Author: Cristian <me...@cristian.io>
Authored: Thu Dec 7 11:00:00 2017 -0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Sun Dec 17 20:52:09 2017 -0800

----------------------------------------------------------------------
 .../org/apache/flink/runtime/util/EnvironmentInformation.java  | 2 +-
 .../org/apache/flink/runtime/taskmanager/TaskManager.scala     | 6 +++---
 2 files changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7f99a0df/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java
index a2a64cf..e700256 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java
@@ -259,7 +259,7 @@ public class EnvironmentInformation {
 	}
 	
 	/**
-	 * Logs a information about the environment, like code revision, current user, java version,
+	 * Logs information about the environment, like code revision, current user, Java version,
 	 * and JVM parameters.
 	 *
 	 * @param log The logger to log the information to.

http://git-wip-us.apache.org/repos/asf/flink/blob/7f99a0df/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index f948df4..9979618 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -585,8 +585,8 @@ class TaskManager(
               config.getMaxRegistrationPause().toMilliseconds,
               TimeUnit.MILLISECONDS))
 
-            // schedule (with our timeout s delay) a check triggers a new registration
-            // attempt, if we are not registered by then
+            // schedule a check to trigger a new registration attempt if not registered
+            // by the timeout
             scheduledTaskManagerRegistration = Option(context.system.scheduler.scheduleOnce(
               timeout,
               self,
@@ -1898,7 +1898,7 @@ object TaskManager {
   }
 
   /**
-    * Starts and runs the TaskManager. with all its components trying to bind to
+    * Starts and runs the TaskManager with all its components trying to bind to
     * a port in the specified range.
     *
     * @param taskManagerHostname The hostname/address of the interface where the actor system