Posted to commits@flink.apache.org by tz...@apache.org on 2018/05/22 08:56:00 UTC

[01/17] flink git commit: [hotfix] [e2e-tests] Make SequenceGeneratorSource usable for 0-size key ranges

Repository: flink
Updated Branches:
  refs/heads/release-1.5 2b4137ce1 -> 9d0cd5848


[hotfix] [e2e-tests] Make SequenceGeneratorSource usable for 0-size key ranges


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0d5d086f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0d5d086f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0d5d086f

Branch: refs/heads/release-1.5
Commit: 0d5d086f5a1e929965ed8b00cc48de041c30124f
Parents: 2b4137c
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Mon Apr 30 18:04:43 2018 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Tue May 22 16:49:00 2018 +0800

----------------------------------------------------------------------
 .../tests/SequenceGeneratorSource.java          | 28 ++++++++++++++++++++
 1 file changed, 28 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0d5d086f/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SequenceGeneratorSource.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SequenceGeneratorSource.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SequenceGeneratorSource.java
index e641551..40c0db5 100644
--- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SequenceGeneratorSource.java
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SequenceGeneratorSource.java
@@ -91,7 +91,14 @@ public class SequenceGeneratorSource extends RichParallelSourceFunction<Event> i
 
 	@Override
 	public void run(SourceContext<Event> ctx) throws Exception {
+		if (keyRanges.size() > 0) {
+			runActive(ctx);
+		} else {
+			runIdle(ctx);
+		}
+	}
 
+	private void runActive(SourceContext<Event> ctx) throws Exception {
 		Random random = new Random();
 
 		// this holds the current event time; generated event timestamps can deviate from it by up to +/- maxOutOfOrder.
@@ -133,6 +140,27 @@ public class SequenceGeneratorSource extends RichParallelSourceFunction<Event> i
 		}
 	}
 
+	private void runIdle(SourceContext<Event> ctx) throws Exception {
+		ctx.markAsTemporarilyIdle();
+
+		// just wait until this source is canceled
+		final Object waitLock = new Object();
+		while (running) {
+			try {
+				//noinspection SynchronizationOnLocalVariableOrMethodParameter
+				synchronized (waitLock) {
+					waitLock.wait();
+				}
+			}
+			catch (InterruptedException e) {
+				if (!running) {
+					// restore the interrupted state, and fall through the loop
+					Thread.currentThread().interrupt();
+				}
+			}
+		}
+	}
+
 	private long generateEventTimeWithOutOfOrderness(Random random, long correctTime) {
 		return correctTime - maxOutOfOrder + ((random.nextLong() & Long.MAX_VALUE) % (2 * maxOutOfOrder));
 	}
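
For context, the pattern this hotfix applies can be shown in isolation. Below is a minimal, hypothetical sketch (the class name and structure are illustrative; it only assumes Flink's SourceFunction API): a source subtask with nothing to emit marks itself temporarily idle, so that downstream watermark progress is not held back, and then blocks until it is canceled.

import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class IdleWhenEmptySource implements SourceFunction<Long> {

	private final boolean hasWork; // e.g. false when this subtask's key range is empty
	private volatile boolean running = true;

	public IdleWhenEmptySource(boolean hasWork) {
		this.hasWork = hasWork;
	}

	@Override
	public void run(SourceContext<Long> ctx) throws Exception {
		if (!hasWork) {
			// signal that this subtask emits no records for now, so downstream
			// operators do not wait for watermarks from this source
			ctx.markAsTemporarilyIdle();

			// block until canceled; cancellation interrupts the task thread
			final Object lock = new Object();
			synchronized (lock) {
				while (running) {
					try {
						lock.wait();
					} catch (InterruptedException e) {
						// re-check the running flag after an interrupt
					}
				}
			}
			return;
		}
		// ... emit records for a non-empty key range ...
	}

	@Override
	public void cancel() {
		running = false;
	}
}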


[10/17] flink git commit: [FLINK-9008] [e2e] Reuse flink-elasticsearch5-test job code as quickstart e2e test's modified job

Posted by tz...@apache.org.
[FLINK-9008] [e2e] Reuse flink-elasticsearch5-test job code as quickstart e2e test's modified job

Previously, the modified job used in the `test_quickstarts.sh` test
script was maintained as a separate Maven module. This is overkill, since all
we are doing is replacing the quickstart's contained job with something
more complex that has additional dependencies.

This commit changes this by simply reusing the job code in
flink-elasticsearch5-test as the modified job,
which is copied into the quickstart project.

This closes #5823.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6d718fff
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6d718fff
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6d718fff

Branch: refs/heads/release-1.5
Commit: 6d718fff633260026eaea56ab29e92d7f1faf5d9
Parents: a814a9e
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Fri May 18 18:45:57 2018 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Tue May 22 16:53:38 2018 +0800

----------------------------------------------------------------------
 .../flink-quickstart-test/pom.xml               | 49 -----------
 .../quickstart/ElasticsearchStreamingJob.java   | 89 --------------------
 flink-end-to-end-tests/pom.xml                  |  1 -
 .../test-scripts/test_quickstarts.sh            | 69 ++++++---------
 4 files changed, 24 insertions(+), 184 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6d718fff/flink-end-to-end-tests/flink-quickstart-test/pom.xml
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-quickstart-test/pom.xml b/flink-end-to-end-tests/flink-quickstart-test/pom.xml
deleted file mode 100644
index f06bd7b..0000000
--- a/flink-end-to-end-tests/flink-quickstart-test/pom.xml
+++ /dev/null
@@ -1,49 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
-    specific language governing permissions and limitations
-    under the License.
-    -->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-	<parent>
-		<artifactId>flink-end-to-end-tests</artifactId>
-		<groupId>org.apache.flink</groupId>
-		<version>1.5-SNAPSHOT</version>
-		<relativePath>..</relativePath>
-	</parent>
-
-	<modelVersion>4.0.0</modelVersion>
-
-	<artifactId>flink-quickstart-test</artifactId>
-	<name>flink-quickstart-test</name>
-
-	<dependencies>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
-			<version>${project.version}</version>
-			<scope>provided</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-connector-elasticsearch2_${scala.binary.version}</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-	</dependencies>
-
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/6d718fff/flink-end-to-end-tests/flink-quickstart-test/src/main/java/org/apache/flink/quickstart/ElasticsearchStreamingJob.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-quickstart-test/src/main/java/org/apache/flink/quickstart/ElasticsearchStreamingJob.java b/flink-end-to-end-tests/flink-quickstart-test/src/main/java/org/apache/flink/quickstart/ElasticsearchStreamingJob.java
deleted file mode 100644
index e20e045..0000000
--- a/flink-end-to-end-tests/flink-quickstart-test/src/main/java/org/apache/flink/quickstart/ElasticsearchStreamingJob.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.quickstart;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
-import org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink;
-
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.client.Requests;
-
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Elasticsearch example for Flink Streaming Job.
- *
- * <p>In this streaming job, we generate a sequence of numbers, apply a map operator to
- * convert them to strings, and then use Elasticsearch as the sink to store the data.
- *
- * <p>Run test_quickstarts.sh to verify this program: it packages this class into a jar,
- * verifies the jar, and then deploys it on a Flink cluster.
- */
-public class ElasticsearchStreamingJob {
-
-	public static void main(String[] args) throws Exception {
-		// set up the streaming execution environment
-		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		DataStream<String> source = env.generateSequence(0, 20)
-			.map(new MapFunction<Long, String>() {
-				@Override
-				public String map(Long value) throws Exception {
-					return value.toString();
-				}});
-
-		Map<String, String> userConfig = new HashMap<>();
-		userConfig.put("cluster.name", "elasticsearch");
-		// This instructs the sink to emit after every element, otherwise they would be buffered
-		userConfig.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
-
-		List<InetSocketAddress> transports = new ArrayList<>();
-		transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
-
-		source.addSink(new ElasticsearchSink<>(userConfig, transports, new ElasticsearchSinkFunction<String>(){
-			@Override
-			public void process(String element, RuntimeContext ctx, org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer indexer) {
-				indexer.add(createIndexRequest(element));
-			}
-		}));
-
-		// execute program
-		env.execute("Flink Streaming Job of writing data to elasticsearch");
-	}
-
-	private static IndexRequest createIndexRequest(String element) {
-		Map<String, Object> json = new HashMap<>();
-		json.put("data", element);
-
-		return Requests.indexRequest()
-			.index("my-index")
-			.type("my-type")
-			.id(element)
-			.source(json);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6d718fff/flink-end-to-end-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml
index ba011d7..04b8532 100644
--- a/flink-end-to-end-tests/pom.xml
+++ b/flink-end-to-end-tests/pom.xml
@@ -46,7 +46,6 @@ under the License.
 		<module>flink-elasticsearch1-test</module>
 		<module>flink-elasticsearch2-test</module>
 		<module>flink-elasticsearch5-test</module>
-		<module>flink-quickstart-test</module>
 	</modules>
 
 	<build>

http://git-wip-us.apache.org/repos/asf/flink/blob/6d718fff/flink-end-to-end-tests/test-scripts/test_quickstarts.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_quickstarts.sh b/flink-end-to-end-tests/test-scripts/test_quickstarts.sh
index 8f7aaab..3f63226 100755
--- a/flink-end-to-end-tests/test-scripts/test_quickstarts.sh
+++ b/flink-end-to-end-tests/test-scripts/test_quickstarts.sh
@@ -20,6 +20,7 @@
 # End-to-end test for the quickstarts.
 
 source "$(dirname "$0")"/common.sh
+source "$(dirname "$0")"/elasticsearch-common.sh
 
 mkdir -p $TEST_DATA_DIR
 
@@ -37,18 +38,20 @@ mvn archetype:generate                             \
 
 cd flink-java-project
 
-cp $TEST_DATA_DIR/../../flink-quickstart-test/src/main/java/org/apache/flink/quickstart/ElasticsearchStreamingJob.java $TEST_DATA_DIR/flink-java-project/src/main/java/org/apache/flink/quickstart/
+# use the Flink Elasticsearch sink example job code in flink-elasticsearch5-test to simulate modifications to the contained job
+cp $TEST_INFRA_DIR/../flink-elasticsearch5-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch5SinkExample.java $TEST_DATA_DIR/flink-java-project/src/main/java/org/apache/flink/quickstart/
+sed -i -e 's/package org.apache.flink.streaming.tests;/package org.apache.flink.quickstart;/' $TEST_DATA_DIR/flink-java-project/src/main/java/org/apache/flink/quickstart/Elasticsearch5SinkExample.java
 
 position=$(awk '/<dependencies>/ {print NR}' pom.xml | head -1)
 
 sed -i -e ''"$(($position + 1))"'i\
 <dependency>\
 <groupId>org.apache.flink</groupId>\
-<artifactId>flink-connector-elasticsearch2_${scala.binary.version}</artifactId>\
+<artifactId>flink-connector-elasticsearch5_${scala.binary.version}</artifactId>\
 <version>${flink.version}</version>\
 </dependency>' pom.xml
 
-sed -i -e "s/org.apache.flink.quickstart.StreamingJob/org.apache.flink.quickstart.ElasticsearchStreamingJob/" pom.xml
+sed -i -e "s/org.apache.flink.quickstart.StreamingJob/org.apache.flink.streaming.tests.Elasticsearch5SinkExample/" pom.xml
 
 mvn clean package -nsu
 
@@ -69,59 +72,35 @@ else
 fi
 
 if [[ `grep -c "org/apache/flink/quickstart/StreamingJob.class" contentsInJar.txt` -eq '0' && \
-      `grep -c "org/apache/flink/quickstart/ElasticsearchStreamingJob.class" contentsInJar.txt` -eq '0' && \
-      `grep -c "org/apache/flink/streaming/connectors/elasticsearch2" contentsInJar.txt` -eq '0' ]]; then
+      `grep -c "org/apache/flink/quickstart/Elasticsearch5SinkExample.class" contentsInJar.txt` -eq '0' && \
+      `grep -c "org/apache/flink/streaming/connectors/elasticsearch5" contentsInJar.txt` -eq '0' ]]; then
 
-    echo "Failure: Since ElasticsearchStreamingJob.class and other user classes are not included in the jar. "
+    echo "Failure: Since Elasticsearch5SinkExample.class and other user classes are not included in the jar. "
     PASS=""
     exit 1
 else
-    echo "Success: ElasticsearchStreamingJob.class and other user classes are included in the jar."
+    echo "Success: Elasticsearch5SinkExample.class and other user classes are included in the jar."
 fi
 
-ELASTICSEARCH_URL="https://download.elastic.co/elasticsearch/release/org/elasticsearch/distribution/tar/elasticsearch/2.3.5/elasticsearch-2.3.5.tar.gz"
-
-curl "$ELASTICSEARCH_URL" > $TEST_DATA_DIR/elasticsearch.tar.gz
-tar xzf $TEST_DATA_DIR/elasticsearch.tar.gz -C $TEST_DATA_DIR/
-ELASTICSEARCH_DIR=$TEST_DATA_DIR/elasticsearch-2.3.5
-
-nohup $ELASTICSEARCH_DIR/bin/elasticsearch &
-
-ELASTICSEARCH_PROCESS=$(jps | grep Elasticsearch | awk '{print $2}')
-
-# make sure the elasticsearch node is actually running
-if [ "$ELASTICSEARCH_PROCESS" != "Elasticsearch" ]; then
-  echo "Elasticsearch node is not running."
-  PASS=""
-  exit 1
-else
-  echo "Elasticsearch node is running."
-fi
-
-TEST_PROGRAM_JAR=$TEST_DATA_DIR/flink-java-project/target/flink-java-project-0.1.jar
-
-start_cluster
-
-# run the Flink job
-$FLINK_DIR/bin/flink run -c org.apache.flink.quickstart.ElasticsearchStreamingJob $TEST_PROGRAM_JAR
-
-curl 'localhost:9200/my-index/_search?q=*&pretty&size=21' > $TEST_DATA_DIR/output
-
-if [ -n "$(grep '\"total\" : 21' $TEST_DATA_DIR/output)" ]; then
-    echo "Quickstarts end to end test pass."
-else
-    echo "Quickstarts end to end test failed."
-    PASS=""
-    exit 1
-fi
+setup_elasticsearch "https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.1.2.tar.gz"
+verify_elasticsearch_process_exist
 
 function shutdownAndCleanup {
-    pid=$(jps | grep Elasticsearch | awk '{print $1}')
-    kill -SIGTERM $pid
+    shutdown_elasticsearch_cluster
 
     # make sure to run regular cleanup as well
     cleanup
 }
-
 trap shutdownAndCleanup INT
 trap shutdownAndCleanup EXIT
+
+TEST_PROGRAM_JAR=$TEST_DATA_DIR/flink-java-project/target/flink-java-project-0.1.jar
+
+start_cluster
+
+$FLINK_DIR/bin/flink run -c org.apache.flink.quickstart.Elasticsearch5SinkExample $TEST_PROGRAM_JAR \
+  --numRecords 20 \
+  --index index \
+  --type type
+
+verify_result 20


[09/17] flink git commit: [FLINK-9008] [e2e] Implements quickstarts end to end test

Posted by tz...@apache.org.
[FLINK-9008] [e2e] Implements quickstarts end to end test


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a814a9e9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a814a9e9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a814a9e9

Branch: refs/heads/release-1.5
Commit: a814a9e9836ef401d95602129aaf7eb71f59f84b
Parents: 3e434ee
Author: zhangminglei <zm...@163.com>
Authored: Thu May 17 21:05:10 2018 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Tue May 22 16:52:32 2018 +0800

----------------------------------------------------------------------
 .../flink-quickstart-test/pom.xml               |  49 +++++++
 .../quickstart/ElasticsearchStreamingJob.java   |  89 +++++++++++++
 flink-end-to-end-tests/pom.xml                  |   1 +
 flink-end-to-end-tests/run-nightly-tests.sh     |   8 ++
 .../test-scripts/test_quickstarts.sh            | 127 +++++++++++++++++++
 5 files changed, 274 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a814a9e9/flink-end-to-end-tests/flink-quickstart-test/pom.xml
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-quickstart-test/pom.xml b/flink-end-to-end-tests/flink-quickstart-test/pom.xml
new file mode 100644
index 0000000..f06bd7b
--- /dev/null
+++ b/flink-end-to-end-tests/flink-quickstart-test/pom.xml
@@ -0,0 +1,49 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+    -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<parent>
+		<artifactId>flink-end-to-end-tests</artifactId>
+		<groupId>org.apache.flink</groupId>
+		<version>1.5-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<artifactId>flink-quickstart-test</artifactId>
+	<name>flink-quickstart-test</name>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-elasticsearch2_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+	</dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/a814a9e9/flink-end-to-end-tests/flink-quickstart-test/src/main/java/org/apache/flink/quickstart/ElasticsearchStreamingJob.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-quickstart-test/src/main/java/org/apache/flink/quickstart/ElasticsearchStreamingJob.java b/flink-end-to-end-tests/flink-quickstart-test/src/main/java/org/apache/flink/quickstart/ElasticsearchStreamingJob.java
new file mode 100644
index 0000000..e20e045
--- /dev/null
+++ b/flink-end-to-end-tests/flink-quickstart-test/src/main/java/org/apache/flink/quickstart/ElasticsearchStreamingJob.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.quickstart;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink;
+
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Requests;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Elasticsearch example for Flink Streaming Job.
+ *
+ * <p>In this streaming job, we generate a sequence of numbers, apply a map operator to
+ * convert them to strings, and then use Elasticsearch as the sink to store the data.
+ *
+ * <p>Run test_quickstarts.sh to verify this program: it packages this class into a jar,
+ * verifies the jar, and then deploys it on a Flink cluster.
+ */
+public class ElasticsearchStreamingJob {
+
+	public static void main(String[] args) throws Exception {
+		// set up the streaming execution environment
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStream<String> source = env.generateSequence(0, 20)
+			.map(new MapFunction<Long, String>() {
+				@Override
+				public String map(Long value) throws Exception {
+					return value.toString();
+				}});
+
+		Map<String, String> userConfig = new HashMap<>();
+		userConfig.put("cluster.name", "elasticsearch");
+		// This instructs the sink to emit after every element, otherwise they would be buffered
+		userConfig.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+
+		List<InetSocketAddress> transports = new ArrayList<>();
+		transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
+
+		source.addSink(new ElasticsearchSink<>(userConfig, transports, new ElasticsearchSinkFunction<String>(){
+			@Override
+			public void process(String element, RuntimeContext ctx, org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer indexer) {
+				indexer.add(createIndexRequest(element));
+			}
+		}));
+
+		// execute program
+		env.execute("Flink Streaming Job of writing data to elasticsearch");
+	}
+
+	private static IndexRequest createIndexRequest(String element) {
+		Map<String, Object> json = new HashMap<>();
+		json.put("data", element);
+
+		return Requests.indexRequest()
+			.index("my-index")
+			.type("my-type")
+			.id(element)
+			.source(json);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a814a9e9/flink-end-to-end-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml
index 04b8532..ba011d7 100644
--- a/flink-end-to-end-tests/pom.xml
+++ b/flink-end-to-end-tests/pom.xml
@@ -46,6 +46,7 @@ under the License.
 		<module>flink-elasticsearch1-test</module>
 		<module>flink-elasticsearch2-test</module>
 		<module>flink-elasticsearch5-test</module>
+		<module>flink-quickstart-test</module>
 	</modules>
 
 	<build>

http://git-wip-us.apache.org/repos/asf/flink/blob/a814a9e9/flink-end-to-end-tests/run-nightly-tests.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index 0ec3492..2a15e6d 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -191,5 +191,13 @@ if [ $EXIT_CODE == 0 ]; then
   EXIT_CODE=$?
 fi
 
+if [ $EXIT_CODE == 0 ]; then
+  printf "\n==============================================================================\n"
+  printf "Running Quickstarts nightly end-to-end test\n"
+  printf "==============================================================================\n"
+  $END_TO_END_DIR/test-scripts/test_quickstarts.sh
+  EXIT_CODE=$?
+fi
+
 # Exit code for Travis build success/failure
 exit $EXIT_CODE

http://git-wip-us.apache.org/repos/asf/flink/blob/a814a9e9/flink-end-to-end-tests/test-scripts/test_quickstarts.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_quickstarts.sh b/flink-end-to-end-tests/test-scripts/test_quickstarts.sh
new file mode 100755
index 0000000..8f7aaab
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_quickstarts.sh
@@ -0,0 +1,127 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# End-to-end test for the quickstarts.
+
+source "$(dirname "$0")"/common.sh
+
+mkdir -p $TEST_DATA_DIR
+
+cd $TEST_DATA_DIR
+
+mvn archetype:generate                             \
+    -DarchetypeGroupId=org.apache.flink            \
+    -DarchetypeArtifactId=flink-quickstart-java    \
+    -DarchetypeVersion=1.6-SNAPSHOT                \
+    -DgroupId=org.apache.flink.quickstart          \
+    -DartifactId=flink-java-project                \
+    -Dversion=0.1                                  \
+    -Dpackage=org.apache.flink.quickstart          \
+    -DinteractiveMode=false
+
+cd flink-java-project
+
+cp $TEST_DATA_DIR/../../flink-quickstart-test/src/main/java/org/apache/flink/quickstart/ElasticsearchStreamingJob.java $TEST_DATA_DIR/flink-java-project/src/main/java/org/apache/flink/quickstart/
+
+position=$(awk '/<dependencies>/ {print NR}' pom.xml | head -1)
+
+sed -i -e ''"$(($position + 1))"'i\
+<dependency>\
+<groupId>org.apache.flink</groupId>\
+<artifactId>flink-connector-elasticsearch2_${scala.binary.version}</artifactId>\
+<version>${flink.version}</version>\
+</dependency>' pom.xml
+
+sed -i -e "s/org.apache.flink.quickstart.StreamingJob/org.apache.flink.quickstart.ElasticsearchStreamingJob/" pom.xml
+
+mvn clean package -nsu
+
+cd target
+jar tvf flink-java-project-0.1.jar > contentsInJar.txt
+
+if [[ `grep -c "org/apache/flink/api/java" contentsInJar.txt` -eq '0' && \
+      `grep -c "org/apache/flink/streaming/api" contentsInJar.txt` -eq '0' && \
+      `grep -c "org/apache/flink/streaming/experimental" contentsInJar.txt` -eq '0' && \
+      `grep -c "org/apache/flink/streaming/runtime" contentsInJar.txt` -eq '0' && \
+      `grep -c "org/apache/flink/streaming/util" contentsInJar.txt` -eq '0' ]]; then
+
+    echo "Success: There are no flink core classes are contained in the jar."
+else
+    echo "Failure: There are flink core classes are contained in the jar."
+    PASS=""
+    exit 1
+fi
+
+if [[ `grep -c "org/apache/flink/quickstart/StreamingJob.class" contentsInJar.txt` -eq '0' && \
+      `grep -c "org/apache/flink/quickstart/ElasticsearchStreamingJob.class" contentsInJar.txt` -eq '0' && \
+      `grep -c "org/apache/flink/streaming/connectors/elasticsearch2" contentsInJar.txt` -eq '0' ]]; then
+
+    echo "Failure: Since ElasticsearchStreamingJob.class and other user classes are not included in the jar. "
+    PASS=""
+    exit 1
+else
+    echo "Success: ElasticsearchStreamingJob.class and other user classes are included in the jar."
+fi
+
+ELASTICSEARCH_URL="https://download.elastic.co/elasticsearch/release/org/elasticsearch/distribution/tar/elasticsearch/2.3.5/elasticsearch-2.3.5.tar.gz"
+
+curl "$ELASTICSEARCH_URL" > $TEST_DATA_DIR/elasticsearch.tar.gz
+tar xzf $TEST_DATA_DIR/elasticsearch.tar.gz -C $TEST_DATA_DIR/
+ELASTICSEARCH_DIR=$TEST_DATA_DIR/elasticsearch-2.3.5
+
+nohup $ELASTICSEARCH_DIR/bin/elasticsearch &
+
+ELASTICSEARCH_PROCESS=$(jps | grep Elasticsearch | awk '{print $2}')
+
+# make sure the elasticsearch node is actually running
+if [ "$ELASTICSEARCH_PROCESS" != "Elasticsearch" ]; then
+  echo "Elasticsearch node is not running."
+  PASS=""
+  exit 1
+else
+  echo "Elasticsearch node is running."
+fi
+
+TEST_PROGRAM_JAR=$TEST_DATA_DIR/flink-java-project/target/flink-java-project-0.1.jar
+
+start_cluster
+
+# run the Flink job
+$FLINK_DIR/bin/flink run -c org.apache.flink.quickstart.ElasticsearchStreamingJob $TEST_PROGRAM_JAR
+
+curl 'localhost:9200/my-index/_search?q=*&pretty&size=21' > $TEST_DATA_DIR/output
+
+if [ -n "$(grep '\"total\" : 21' $TEST_DATA_DIR/output)" ]; then
+    echo "Quickstarts end to end test pass."
+else
+    echo "Quickstarts end to end test failed."
+    PASS=""
+    exit 1
+fi
+
+function shutdownAndCleanup {
+    pid=$(jps | grep Elasticsearch | awk '{print $1}')
+    kill -SIGTERM $pid
+
+    # make sure to run regular cleanup as well
+    cleanup
+}
+
+trap shutdownAndCleanup INT
+trap shutdownAndCleanup EXIT


[02/17] flink git commit: [FLINK-8971] [e2e-tests] Include broadcast / union state in general purpose DataStream job

Posted by tz...@apache.org.
[FLINK-8971] [e2e-tests] Include broadcast / union state in general purpose DataStream job

This closes #5941.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ef6e40f0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ef6e40f0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ef6e40f0

Branch: refs/heads/release-1.5
Commit: ef6e40f008e9a25eb2ebbe86ef256cd4bf254663
Parents: 0d5d086
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Mon Apr 30 18:05:46 2018 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Tue May 22 16:49:05 2018 +0800

----------------------------------------------------------------------
 .../tests/DataStreamAllroundTestJobFactory.java |   7 +
 .../tests/DataStreamAllroundTestProgram.java    |  28 +++-
 .../ArtificalOperatorStateMapper.java           | 159 +++++++++++++++++++
 .../test-scripts/test_resume_savepoint.sh       |  19 +++
 4 files changed, 205 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ef6e40f0/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
index 2577460..c2e4cf5 100644
--- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
@@ -35,6 +35,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
 import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.tests.artificialstate.ArtificalOperatorStateMapper;
 import org.apache.flink.streaming.tests.artificialstate.ArtificialKeyedStateMapper;
 import org.apache.flink.streaming.tests.artificialstate.builder.ArtificialListStateBuilder;
 import org.apache.flink.streaming.tests.artificialstate.builder.ArtificialStateBuilder;
@@ -285,6 +286,12 @@ class DataStreamAllroundTestJobFactory {
 		return new ArtificialKeyedStateMapper<>(mapFunction, artificialStateBuilders);
 	}
 
+	static <IN, OUT> ArtificalOperatorStateMapper<IN, OUT> createArtificialOperatorStateMapper(
+		MapFunction<IN, OUT> mapFunction) {
+
+		return new ArtificalOperatorStateMapper<>(mapFunction);
+	}
+
 	static <IN, STATE> ArtificialStateBuilder<IN> createValueStateBuilder(
 		JoinFunction<IN, STATE, STATE> inputAndOldStateToNewState,
 		TypeSerializer<STATE> typeSerializer) {

http://git-wip-us.apache.org/repos/asf/flink/blob/ef6e40f0/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
index 75c14e5..5ae1d16 100644
--- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
 import org.apache.flink.streaming.tests.artificialstate.ComplexPayload;
@@ -29,6 +30,7 @@ import org.apache.flink.streaming.tests.artificialstate.ComplexPayload;
 import java.util.Collections;
 
 import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createArtificialKeyedStateMapper;
+import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createArtificialOperatorStateMapper;
 import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createEventSource;
 import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createSemanticsCheckMapper;
 import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createTimestampExtractor;
@@ -43,13 +45,17 @@ import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.
  *     <li>A generic Kryo input type.</li>
  *     <li>A state type for which we register a {@link KryoSerializer}.</li>
  *     <li>Operators with {@link ValueState}.</li>
+ *     <li>Operators with union state.</li>
+ *     <li>Operators with broadcast state.</li>
  * </ul>
  *
  * <p>The cli job configuration options are described in {@link DataStreamAllroundTestJobFactory}.
  *
  */
 public class DataStreamAllroundTestProgram {
-	private static final String STATE_OPER_NAME = "ArtificalKeyedStateMapper";
+	private static final String KEYED_STATE_OPER_NAME = "ArtificalKeyedStateMapper";
+	private static final String OPERATOR_STATE_OPER_NAME = "ArtificalOperatorStateMapper";
+	private static final String SEMANTICS_CHECK_MAPPER_NAME = "SemanticsCheckMapper";
 
 	public static void main(String[] args) throws Exception {
 		final ParameterTool pt = ParameterTool.fromArgs(args);
@@ -58,7 +64,7 @@ public class DataStreamAllroundTestProgram {
 
 		setupEnvironment(env, pt);
 
-		env.addSource(createEventSource(pt))
+		DataStream<Event> eventStream = env.addSource(createEventSource(pt))
 			.assignTimestampsAndWatermarks(createTimestampExtractor(pt))
 			.keyBy(Event::getKey)
 			.map(createArtificialKeyedStateMapper(
@@ -66,20 +72,26 @@ public class DataStreamAllroundTestProgram {
 					(MapFunction<Event, Event>) in -> in,
 					// state is verified and updated per event as a wrapped ComplexPayload state object
 					(Event first, ComplexPayload second) -> {
-							if (second != null && !second.getStrPayload().equals(STATE_OPER_NAME)) {
+							if (second != null && !second.getStrPayload().equals(KEYED_STATE_OPER_NAME)) {
 								System.out.println("State is set or restored incorrectly");
 							}
-							return new ComplexPayload(first, STATE_OPER_NAME);
+							return new ComplexPayload(first, KEYED_STATE_OPER_NAME);
 						},
 					Collections.singletonList(
 						new KryoSerializer<>(ComplexPayload.class, env.getConfig()))
 				)
 			)
-			.name(STATE_OPER_NAME)
-			.returns(Event.class)
-			.keyBy(Event::getKey)
+			.name(KEYED_STATE_OPER_NAME)
+			.returns(Event.class);
+
+		DataStream<Event> eventStream2 = eventStream
+			.map(createArtificialOperatorStateMapper((MapFunction<Event, Event>) in -> in))
+			.name(OPERATOR_STATE_OPER_NAME)
+			.returns(Event.class);
+
+		eventStream2.keyBy(Event::getKey)
 			.flatMap(createSemanticsCheckMapper(pt))
-			.name("SemanticsCheckMapper")
+			.name(SEMANTICS_CHECK_MAPPER_NAME)
 			.addSink(new PrintSinkFunction<>());
 
 		env.execute("General purpose test job");

http://git-wip-us.apache.org/repos/asf/flink/blob/ef6e40f0/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/ArtificalOperatorStateMapper.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/ArtificalOperatorStateMapper.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/ArtificalOperatorStateMapper.java
new file mode 100644
index 0000000..61501ea
--- /dev/null
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/ArtificalOperatorStateMapper.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.artificialstate;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.state.BroadcastState;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.util.Preconditions;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A self-verifiable {@link RichMapFunction} used to verify checkpointing and restore semantics for various
+ * kinds of operator state.
+ *
+ * <p>For verifying broadcast state, each subtask stores as broadcast state a map of (Integer, String) entries,
+ * key being the subtask index, and value being a String that corresponds to the subtask index. The total number
+ * of subtasks is also stored as broadcast state. On restore, each subtask should be restored with exactly the same
+ * broadcast state, with one entry for each subtask in the previous run.
+ *
+ * <p>For verifying union state, each subtask of this operator stores its own subtask index as a subset of the whole
+ * union state. On restore, each subtask's restored union state should have one entry for each subtask in the previous
+ * run.
+ *
+ * <p>All input elements to the operator are simply passed through a user-provided map function and emitted.
+ */
+public class ArtificalOperatorStateMapper<IN, OUT> extends RichMapFunction<IN, OUT> implements CheckpointedFunction {
+
+	private static final long serialVersionUID = -1741298597425077761L;
+
+	// ============================================================================
+	//  State names
+	// ============================================================================
+
+	private static final String LAST_NUM_SUBTASKS_STATE_NAME = "lastNumSubtasksState";
+	private static final String BROADCAST_STATE_NAME = "broadcastState";
+	private static final String UNION_STATE_NAME = "unionState";
+
+	// ============================================================================
+	//  Keys used in broadcast states
+	// ============================================================================
+
+	private static final String LAST_NUM_SUBTASKS_STATE_KEY = "lastNumSubtasks";
+	private static final String BROADCAST_STATE_ENTRY_VALUE_PREFIX = "broadcastStateEntry-";
+
+	private final MapFunction<IN, OUT> mapFunction;
+
+	private transient BroadcastState<String, Integer> lastNumSubtasksBroadcastState;
+
+	private transient BroadcastState<Integer, String> broadcastElementsState;
+	private transient ListState<Integer> unionElementsState;
+
+	public ArtificalOperatorStateMapper(MapFunction<IN, OUT> mapFunction) {
+		this.mapFunction = Preconditions.checkNotNull(mapFunction);
+	}
+
+	@Override
+	public OUT map(IN value) throws Exception {
+		return mapFunction.map(value);
+	}
+
+	@Override
+	public void initializeState(FunctionInitializationContext context) throws Exception {
+		this.lastNumSubtasksBroadcastState = context.getOperatorStateStore()
+			.getBroadcastState(new MapStateDescriptor<>(LAST_NUM_SUBTASKS_STATE_NAME, StringSerializer.INSTANCE, IntSerializer.INSTANCE));
+
+		this.broadcastElementsState = context.getOperatorStateStore()
+			.getBroadcastState(new MapStateDescriptor<>(BROADCAST_STATE_NAME, IntSerializer.INSTANCE, StringSerializer.INSTANCE));
+
+		this.unionElementsState = context.getOperatorStateStore()
+			.getUnionListState(new ListStateDescriptor<>(UNION_STATE_NAME, IntSerializer.INSTANCE));
+
+		if (context.isRestored()) {
+			Integer lastNumSubtasks = lastNumSubtasksBroadcastState.get(LAST_NUM_SUBTASKS_STATE_KEY);
+			Preconditions.checkState(lastNumSubtasks != null);
+
+			// -- verification for broadcast state --
+			Set<Integer> expected = new HashSet<>();
+			for (int i = 0; i < lastNumSubtasks; i++) {
+				expected.add(i);
+			}
+
+			for (Map.Entry<Integer, String> broadcastElementEntry : broadcastElementsState.entries()) {
+				int key = broadcastElementEntry.getKey();
+				Preconditions.checkState(expected.remove(key), "Unexpected keys in restored broadcast state.");
+				Preconditions.checkState(broadcastElementEntry.getValue().equals(getBroadcastStateEntryValue(key)), "Incorrect value in restored broadcast state.");
+			}
+
+			Preconditions.checkState(expected.size() == 0, "Missing keys in restored broadcast state.");
+
+			// -- verification for union state --
+			for (int i = 0; i < lastNumSubtasks; i++) {
+				expected.add(i);
+			}
+
+			for (Integer subtaskIndex : unionElementsState.get()) {
+				Preconditions.checkState(expected.remove(subtaskIndex), "Unexpected element in restored union state.");
+			}
+			Preconditions.checkState(expected.size() == 0, "Missing elements in restored union state.");
+		} else {
+			// verify that the broadcast / union state is actually empty if this is not a restored run, as told by the state context;
+			// this catches incorrect logic with the context.isRestored() when using broadcast state / union state.
+
+			Preconditions.checkState(!lastNumSubtasksBroadcastState.iterator().hasNext());
+			Preconditions.checkState(!broadcastElementsState.iterator().hasNext());
+			Preconditions.checkState(!unionElementsState.get().iterator().hasNext());
+		}
+	}
+
+	@Override
+	public void snapshotState(FunctionSnapshotContext context) throws Exception {
+		final int numSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
+		final int thisSubtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
+
+		// store total number of subtasks as broadcast state
+		lastNumSubtasksBroadcastState.clear();
+		lastNumSubtasksBroadcastState.put(LAST_NUM_SUBTASKS_STATE_KEY, numSubtasks);
+
+		// populate broadcast state (identical across all subtasks)
+		broadcastElementsState.clear();
+		for (int i = 0; i < numSubtasks; i++) {
+			broadcastElementsState.put(i, getBroadcastStateEntryValue(i));
+		}
+
+		// each subtask only stores its own subtask index as a subset of the union set
+		unionElementsState.clear();
+		unionElementsState.add(thisSubtaskIndex);
+	}
+
+	private String getBroadcastStateEntryValue(int thisSubtaskIndex) {
+		return BROADCAST_STATE_ENTRY_VALUE_PREFIX + thisSubtaskIndex;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ef6e40f0/flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh b/flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh
index b19aa61..2060a87 100755
--- a/flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh
+++ b/flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh
@@ -17,6 +17,25 @@
 # limitations under the License.
 ################################################################################
 
+################################################################################
+# This end-to-end test verifies that manually taking a savepoint of a running
+# job and resuming from it works properly. It allows resuming the job with
+# a different parallelism than the original execution.
+#
+# Using the general purpose DataStream job, the test covers savepointing and
+# resuming when using different state backends (file, RocksDB), as well as the
+# following types of states:
+#  - Operator re-partitionable list state
+#  - Broadcast state
+#  - Union state
+#  - Keyed state (ValueState)
+#
+# The general purpose DataStream job is self-verifiable, such that if any
+# unexpected error occurs during savepoints or restores, exceptions will be
+# thrown; if exactly-once is violated, alerts will be sent to output (and
+# caught by the test script to fail the job).
+################################################################################
+
 if [ -z $1 ] || [ -z $2 ]; then
   echo "Usage: ./test_resume_savepoint.sh <original_dop> <new_dop> <state_backend_setting> <state_backend_file_async_setting>"
   exit 1
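
As an aside on the state types listed in the script header above: "operator re-partitionable list state" and "union state" differ only in how entries are redistributed on restore. A minimal, hypothetical sketch of the distinction (class and state names are illustrative; it only assumes Flink's OperatorStateStore API):

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

public class OperatorStateFlavors implements CheckpointedFunction {

	private transient ListState<Integer> repartitionable;
	private transient ListState<Integer> union;

	@Override
	public void initializeState(FunctionInitializationContext context) throws Exception {
		// round-robin redistributed on restore: each subtask gets a disjoint subset
		repartitionable = context.getOperatorStateStore()
			.getListState(new ListStateDescriptor<>("splitState", IntSerializer.INSTANCE));

		// union-redistributed on restore: every subtask gets the complete set of entries
		union = context.getOperatorStateStore()
			.getUnionListState(new ListStateDescriptor<>("unionState", IntSerializer.INSTANCE));
	}

	@Override
	public void snapshotState(FunctionSnapshotContext context) throws Exception {
		// entries written to the two states here are redistributed on restore
		// according to the semantics chosen in initializeState()
	}
}

The union semantics are what let the ArtificalOperatorStateMapper from the commit above have each subtask verify that it sees exactly one entry per subtask of the previous run.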


[05/17] flink git commit: [FLINK-8977] [e2e] Allow configuring restart strategy for general purpose DataStream job

Posted by tz...@apache.org.
[FLINK-8977] [e2e] Allow configuring restart strategy for general purpose DataStream job


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/22e400dd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/22e400dd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/22e400dd

Branch: refs/heads/release-1.5
Commit: 22e400dded26ea4b0cfebf2245572f3ca7b480a9
Parents: d7ec5a9
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Mon May 14 11:56:07 2018 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Tue May 22 16:49:26 2018 +0800

----------------------------------------------------------------------
 .../tests/DataStreamAllroundTestJobFactory.java | 42 ++++++++++++++++----
 1 file changed, 35 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/22e400dd/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
index 05bbc77..4710100 100644
--- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
@@ -66,7 +66,9 @@ import java.util.List;
  *     <li>environment.externalize_checkpoint.cleanup (String, default - 'retain'): Configures the cleanup mode for externalized checkpoints. Can be 'retain' or 'delete'.</li>
  *     <li>environment.parallelism (int, default - 1): parallelism to use for the job.</li>
  *     <li>environment.max_parallelism (int, default - 128): max parallelism to use for the job</li>
- *     <li>environment.restart_strategy.delay (long, default - 0): delay between restart attempts, in milliseconds.</li>
+ *     <li>environment.restart_strategy (String, default - 'fixed_delay'): The failure restart strategy to use. Can be 'fixed_delay' or 'no_restart'.</li>
+ *     <li>environment.restart_strategy.fixed_delay.attempts (Integer, default - Integer.MAX_VALUE): The number of allowed attempts to restart the job, when using 'fixed_delay' restart.</li>
+ *     <li>environment.restart_strategy.fixed_delay.delay (long, default - 0): delay between restart attempts, in milliseconds, when using 'fixed_delay' restart.</li>
  *     <li>state_backend (String, default - 'file'): Supported values are 'file' for FsStateBackend and 'rocks' for RocksDBStateBackend.</li>
  *     <li>state_backend.checkpoint_directory (String): The checkpoint directory.</li>
  *     <li>state_backend.rocks.incremental (boolean, default - false): Activate or deactivate incremental snapshots if RocksDBStateBackend is selected.</li>
@@ -124,9 +126,17 @@ class DataStreamAllroundTestJobFactory {
 		.key("environment.max_parallelism")
 		.defaultValue(128);
 
-	private static final ConfigOption<Integer> ENVIRONMENT_RESTART_DELAY = ConfigOptions
-		.key("environment.restart_strategy.delay")
-		.defaultValue(0);
+	private static final ConfigOption<String> ENVIRONMENT_RESTART_STRATEGY = ConfigOptions
+		.key("environment.restart_strategy")
+		.defaultValue("fixed_delay");
+
+	private static final ConfigOption<Integer> ENVIRONMENT_RESTART_STRATEGY_FIXED_ATTEMPTS = ConfigOptions
+		.key("environment.restart_strategy.fixed_delay.attempts")
+		.defaultValue(Integer.MAX_VALUE);
+
+	private static final ConfigOption<Long> ENVIRONMENT_RESTART_STRATEGY_FIXED_DELAY = ConfigOptions
+		.key("environment.restart_strategy.fixed.delay")
+		.defaultValue(0L);
 
 	private static final ConfigOption<Boolean> ENVIRONMENT_EXTERNALIZE_CHECKPOINT = ConfigOptions
 		.key("environment.externalize_checkpoint")
@@ -199,9 +209,27 @@ class DataStreamAllroundTestJobFactory {
 		env.setMaxParallelism(pt.getInt(ENVIRONMENT_MAX_PARALLELISM.key(), ENVIRONMENT_MAX_PARALLELISM.defaultValue()));
 
 		// restart strategy
-		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
-			Integer.MAX_VALUE,
-			pt.getInt(ENVIRONMENT_RESTART_DELAY.key(), ENVIRONMENT_RESTART_DELAY.defaultValue())));
+		String restartStrategyConfig = pt.get(ENVIRONMENT_RESTART_STRATEGY.key());
+		if (restartStrategyConfig != null) {
+			RestartStrategies.RestartStrategyConfiguration restartStrategy;
+			switch (restartStrategyConfig) {
+				case "fixed_delay":
+					restartStrategy = RestartStrategies.fixedDelayRestart(
+						pt.getInt(
+							ENVIRONMENT_RESTART_STRATEGY_FIXED_ATTEMPTS.key(),
+							ENVIRONMENT_RESTART_STRATEGY_FIXED_ATTEMPTS.defaultValue()),
+						pt.getLong(
+							ENVIRONMENT_RESTART_STRATEGY_FIXED_DELAY.key(),
+							ENVIRONMENT_RESTART_STRATEGY_FIXED_DELAY.defaultValue()));
+					break;
+				case "no_restart":
+					restartStrategy = RestartStrategies.noRestart();
+					break;
+				default:
+					throw new IllegalArgumentException("Unknown restart strategy: " + restartStrategyConfig);
+			}
+			env.setRestartStrategy(restartStrategy);
+		}
 
 		// state backend
 		final String stateBackend = pt.get(
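
For reference, a usage sketch of the reworked options (a hypothetical standalone example; the keys follow the documentation above, the argument values are made up): the job reads its settings through ParameterTool, so the options are supplied as --key value pairs on the command line.

import org.apache.flink.api.java.utils.ParameterTool;

public class RestartStrategyOptionsDemo {

	public static void main(String[] args) {
		// hypothetical invocation:
		//   --environment.restart_strategy fixed_delay
		//   --environment.restart_strategy.fixed_delay.attempts 3
		//   --environment.restart_strategy.fixed_delay.delay 1000
		ParameterTool pt = ParameterTool.fromArgs(args);

		String strategy = pt.get("environment.restart_strategy", "fixed_delay");
		int attempts = pt.getInt("environment.restart_strategy.fixed_delay.attempts", Integer.MAX_VALUE);
		long delayMs = pt.getLong("environment.restart_strategy.fixed_delay.delay", 0L);

		System.out.println(strategy + " with " + attempts + " attempts, " + delayMs + " ms delay");
	}
}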


[11/17] flink git commit: [hotfix] [e2e] Properly backup Flink config in externalized checkpoint e2e test

Posted by tz...@apache.org.
[hotfix] [e2e] Properly backup Flink config in externalized checkpoint e2e test


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8224a7b8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8224a7b8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8224a7b8

Branch: refs/heads/release-1.5
Commit: 8224a7b824cbabb57da60ede10d8eafafae188ad
Parents: 6d718ff
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Tue May 22 16:41:13 2018 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Tue May 22 16:54:03 2018 +0800

----------------------------------------------------------------------
 .../test-scripts/test_resume_externalized_checkpoints.sh            | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8224a7b8/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh b/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
index 3994e30..f9ce9d6 100755
--- a/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
+++ b/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
@@ -23,6 +23,7 @@ STATE_BACKEND_TYPE=${1:-file}
 STATE_BACKEND_FILE_ASYNC=${2:-true}
 SIMULATE_FAILURE=${3:-false}
 
+backup_config
 setup_flink_slf4j_metric_reporter
 start_cluster
 


[08/17] flink git commit: [FLINK-8989] [e2e] Cleanup / improve Elasticsearch e2e tests

Posted by tz...@apache.org.
[FLINK-8989] [e2e] Cleanup / improve Elasticsearch e2e tests

- Rework e2e test job modules to have correct Maven POM
- Parameterize num of records to write to Elasticsearch
- Parameterize Elasticsearch download URL and version in test script
- Improve robustness of test
- Move more Elasticsearch functionality to elasticsearch-common.sh

This closes #5761.
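
Per the parameterization above, a minimal sketch of the resulting
single-script invocation (version/URL pair as wired into run-nightly-tests.sh
below):

    ./test_streaming_elasticsearch.sh 5 https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.1.2.tar.gz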


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3e434eeb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3e434eeb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3e434eeb

Branch: refs/heads/release-1.5
Commit: 3e434eeb1dd66f568d859b999360ebc6769cade1
Parents: a7abfcb
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Tue May 22 15:10:32 2018 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Tue May 22 16:51:38 2018 +0800

----------------------------------------------------------------------
 .../flink-elasticsearch1-test/pom.xml           |  45 ++------
 .../tests/Elasticsearch1SinkExample.java        |  18 +--
 .../flink-elasticsearch2-test/pom.xml           |  65 ++---------
 .../tests/Elasticsearch2SinkExample.java        |  17 +--
 .../flink-elasticsearch5-test/pom.xml           |  78 ++-----------
 .../tests/Elasticsearch5SinkExample.java        |  18 +--
 flink-end-to-end-tests/run-nightly-tests.sh     |  23 +++-
 .../test-scripts/elasticsearch-common.sh        |  48 +++++---
 .../test_streaming_elasticsearch.sh             |  51 +++++++++
 .../test_streaming_elasticsearch125.sh          | 109 -------------------
 10 files changed, 167 insertions(+), 305 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3e434eeb/flink-end-to-end-tests/flink-elasticsearch1-test/pom.xml
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-elasticsearch1-test/pom.xml b/flink-end-to-end-tests/flink-elasticsearch1-test/pom.xml
index 1960f05..6ac8e71 100644
--- a/flink-end-to-end-tests/flink-elasticsearch1-test/pom.xml
+++ b/flink-end-to-end-tests/flink-elasticsearch1-test/pom.xml
@@ -21,16 +21,16 @@ under the License.
 		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 
+	<modelVersion>4.0.0</modelVersion>
+
 	<parent>
-		<artifactId>flink-end-to-end-tests</artifactId>
 		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-end-to-end-tests</artifactId>
 		<version>1.5-SNAPSHOT</version>
 		<relativePath>..</relativePath>
 	</parent>
 
-	<modelVersion>4.0.0</modelVersion>
-
-	<artifactId>flink-elasticsearch1-test_${scala.binary.version}</artifactId>
+	<artifactId>flink-elasticsearch1-test</artifactId>
 	<name>flink-elasticsearch1-test</name>
 	<packaging>jar</packaging>
 
@@ -41,7 +41,6 @@ under the License.
 			<version>${project.version}</version>
 			<scope>provided</scope>
 		</dependency>
-
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-connector-elasticsearch_${scala.binary.version}</artifactId>
@@ -56,26 +55,18 @@ under the License.
 				<artifactId>maven-shade-plugin</artifactId>
 				<version>3.0.0</version>
 				<executions>
-					<!-- Elasticsearch1Sink end to end example -->
 					<execution>
 						<phase>package</phase>
 						<goals>
 							<goal>shade</goal>
 						</goals>
 						<configuration>
-							<minimizeJar>true</minimizeJar>
+							<finalName>Elasticsearch1SinkExample</finalName>
 							<artifactSet>
 								<excludes>
 									<exclude>com.google.code.findbugs:jsr305</exclude>
-									<exclude>org.slf4j:*</exclude>
-									<exclude>log4j:*</exclude>
 								</excludes>
 							</artifactSet>
-							<transformers>
-								<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
-									<mainClass>org.apache.flink.streaming.tests.Elasticsearch1SinkExample</mainClass>
-								</transformer>
-							</transformers>
 							<filters>
 								<filter>
 									<artifact>*:*</artifact>
@@ -86,27 +77,11 @@ under the License.
 									</excludes>
 								</filter>
 							</filters>
-						</configuration>
-					</execution>
-				</executions>
-			</plugin>
-
-			<!--simplify the name of the testing JAR for referring to it in test_streaming_elasticsearch1.sh scripts-->
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-antrun-plugin</artifactId>
-				<version>1.7</version>
-				<executions>
-					<execution>
-						<id>rename</id>
-						<phase>package</phase>
-						<goals>
-							<goal>run</goal>
-						</goals>
-						<configuration>
-							<target>
-								<copy file="${project.basedir}/target/flink-elasticsearch1-test_${scala.binary.version}-${project.version}.jar" tofile="${project.basedir}/target/Elasticsearch1SinkExample.jar" />
-							</target>
+							<transformers>
+								<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+									<mainClass>org.apache.flink.streaming.tests.Elasticsearch1SinkExample</mainClass>
+								</transformer>
+							</transformers>
 						</configuration>
 					</execution>
 				</executions>

http://git-wip-us.apache.org/repos/asf/flink/blob/3e434eeb/flink-end-to-end-tests/flink-elasticsearch1-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch1SinkExample.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-elasticsearch1-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch1SinkExample.java b/flink-end-to-end-tests/flink-elasticsearch1-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch1SinkExample.java
index bfdb806..18fa05a 100644
--- a/flink-end-to-end-tests/flink-elasticsearch1-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch1SinkExample.java
+++ b/flink-end-to-end-tests/flink-elasticsearch1-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch1SinkExample.java
@@ -41,13 +41,14 @@ import java.util.Map;
  * End to end test for Elasticsearch1Sink.
  */
 public class Elasticsearch1SinkExample {
+
 	public static void main(String[] args) throws Exception {
 
 		final ParameterTool parameterTool = ParameterTool.fromArgs(args);
 
-		if (parameterTool.getNumberOfParameters() < 2) {
+		if (parameterTool.getNumberOfParameters() < 3) {
 			System.out.println("Missing parameters!\n" +
-				"Usage: --index <index> --type <type>");
+				"Usage: --numRecords <numRecords> --index <index> --type <type>");
 			return;
 		}
 
@@ -55,12 +56,13 @@ public class Elasticsearch1SinkExample {
 		env.getConfig().disableSysoutLogging();
 		env.enableCheckpointing(5000);
 
-		DataStream<String> source = env.generateSequence(0, 20).map(new MapFunction<Long, String>() {
-			@Override
-			public String map(Long value) throws Exception {
-				return "message # " + value;
-			}
-		});
+		DataStream<String> source = env.generateSequence(0, parameterTool.getInt("numRecords") - 1)
+			.map(new MapFunction<Long, String>() {
+				@Override
+				public String map(Long value) throws Exception {
+					return "message # " + value;
+				}
+			});
 
 		Map<String, String> userConfig = new HashMap<>();
 		userConfig.put("cluster.name", "elasticsearch");

http://git-wip-us.apache.org/repos/asf/flink/blob/3e434eeb/flink-end-to-end-tests/flink-elasticsearch2-test/pom.xml
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-elasticsearch2-test/pom.xml b/flink-end-to-end-tests/flink-elasticsearch2-test/pom.xml
index 4fd93de..d6a1abf 100644
--- a/flink-end-to-end-tests/flink-elasticsearch2-test/pom.xml
+++ b/flink-end-to-end-tests/flink-elasticsearch2-test/pom.xml
@@ -20,16 +20,17 @@ under the License.
 <project xmlns="http://maven.apache.org/POM/4.0.0"
 		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
 	<parent>
-		<artifactId>flink-end-to-end-tests</artifactId>
 		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-end-to-end-tests</artifactId>
 		<version>1.5-SNAPSHOT</version>
 		<relativePath>..</relativePath>
 	</parent>
 
-	<modelVersion>4.0.0</modelVersion>
-
-	<artifactId>flink-elasticsearch2-test_${scala.binary.version}</artifactId>
+	<artifactId>flink-elasticsearch2-test</artifactId>
 	<name>flink-elasticsearch2-test</name>
 	<packaging>jar</packaging>
 
@@ -40,31 +41,11 @@ under the License.
 			<version>${project.version}</version>
 			<scope>provided</scope>
 		</dependency>
-
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-connector-elasticsearch2_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-connector-elasticsearch-base_${scala.binary.version}</artifactId>
-			<version>${project.version}</version>
-			<!-- Remove elasticsearch1.7.1 -->
-			<exclusions>
-				<exclusion>
-					<groupId>org.elasticsearch</groupId>
-					<artifactId>elasticsearch</artifactId>
-				</exclusion>
-			</exclusions>
-		</dependency>
-
-		<dependency>
-			<groupId>org.elasticsearch</groupId>
-			<artifactId>elasticsearch</artifactId>
-			<version>2.3.5</version>
-		</dependency>
 	</dependencies>
 
 	<build>
@@ -74,26 +55,18 @@ under the License.
 				<artifactId>maven-shade-plugin</artifactId>
 				<version>3.0.0</version>
 				<executions>
-					<!-- Elasticsearch2Sink end to end example -->
 					<execution>
 						<phase>package</phase>
 						<goals>
 							<goal>shade</goal>
 						</goals>
 						<configuration>
-							<minimizeJar>true</minimizeJar>
+							<finalName>Elasticsearch2SinkExample</finalName>
 							<artifactSet>
 								<excludes>
 									<exclude>com.google.code.findbugs:jsr305</exclude>
-									<exclude>org.slf4j:*</exclude>
-									<exclude>log4j:*</exclude>
 								</excludes>
 							</artifactSet>
-							<transformers>
-								<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
-									<mainClass>org.apache.flink.streaming.tests.Elasticsearch2SinkExample</mainClass>
-								</transformer>
-							</transformers>
 							<filters>
 								<filter>
 									<artifact>*:*</artifact>
@@ -104,27 +77,11 @@ under the License.
 									</excludes>
 								</filter>
 							</filters>
-						</configuration>
-					</execution>
-				</executions>
-			</plugin>
-
-			<!--simplify the name of the testing JAR for referring to it in test_streaming_elasticsearch2.sh scripts-->
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-antrun-plugin</artifactId>
-				<version>1.7</version>
-				<executions>
-					<execution>
-						<id>rename</id>
-						<phase>package</phase>
-						<goals>
-							<goal>run</goal>
-						</goals>
-						<configuration>
-							<target>
-								<copy file="${project.basedir}/target/flink-elasticsearch2-test_${scala.binary.version}-${project.version}.jar" tofile="${project.basedir}/target/Elasticsearch2SinkExample.jar" />
-							</target>
+							<transformers>
+								<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+									<mainClass>org.apache.flink.streaming.tests.Elasticsearch2SinkExample</mainClass>
+								</transformer>
+							</transformers>
 						</configuration>
 					</execution>
 				</executions>

http://git-wip-us.apache.org/repos/asf/flink/blob/3e434eeb/flink-end-to-end-tests/flink-elasticsearch2-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch2SinkExample.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-elasticsearch2-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch2SinkExample.java b/flink-end-to-end-tests/flink-elasticsearch2-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch2SinkExample.java
index 4ec03aa..f7532b1 100644
--- a/flink-end-to-end-tests/flink-elasticsearch2-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch2SinkExample.java
+++ b/flink-end-to-end-tests/flink-elasticsearch2-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch2SinkExample.java
@@ -44,9 +44,9 @@ public class Elasticsearch2SinkExample {
 
 		final ParameterTool parameterTool = ParameterTool.fromArgs(args);
 
-		if (parameterTool.getNumberOfParameters() < 2) {
+		if (parameterTool.getNumberOfParameters() < 3) {
 			System.out.println("Missing parameters!\n" +
-				"Usage: --index <index> --type <type>");
+				"Usage: --numRecords <numRecords> --index <index> --type <type>");
 			return;
 		}
 
@@ -54,12 +54,13 @@ public class Elasticsearch2SinkExample {
 		env.getConfig().disableSysoutLogging();
 		env.enableCheckpointing(5000);
 
-		DataStream<String> source = env.generateSequence(0, 20).map(new MapFunction<Long, String>() {
-			@Override
-			public String map(Long value) throws Exception {
-				return "message #" + value;
-			}
-		});
+		DataStream<String> source = env.generateSequence(0, parameterTool.getInt("numRecords") - 1)
+			.map(new MapFunction<Long, String>() {
+				@Override
+				public String map(Long value) throws Exception {
+					return "message #" + value;
+				}
+			});
 
 		Map<String, String> userConfig = new HashMap<>();
 		userConfig.put("cluster.name", "elasticsearch");

http://git-wip-us.apache.org/repos/asf/flink/blob/3e434eeb/flink-end-to-end-tests/flink-elasticsearch5-test/pom.xml
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-elasticsearch5-test/pom.xml b/flink-end-to-end-tests/flink-elasticsearch5-test/pom.xml
index 3a1e734..33241b0 100644
--- a/flink-end-to-end-tests/flink-elasticsearch5-test/pom.xml
+++ b/flink-end-to-end-tests/flink-elasticsearch5-test/pom.xml
@@ -20,15 +20,17 @@ under the License.
 <project xmlns="http://maven.apache.org/POM/4.0.0"
 		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
 	<parent>
-		<artifactId>flink-end-to-end-tests</artifactId>
 		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-end-to-end-tests</artifactId>
 		<version>1.5-SNAPSHOT</version>
 		<relativePath>..</relativePath>
 	</parent>
-	<modelVersion>4.0.0</modelVersion>
 
-	<artifactId>flink-elasticsearch5-test_${scala.binary.version}</artifactId>
+	<artifactId>flink-elasticsearch5-test</artifactId>
 	<name>flink-elasticsearch5-test</name>
 	<packaging>jar</packaging>
 
@@ -39,45 +41,11 @@ under the License.
 			<version>${project.version}</version>
 			<scope>provided</scope>
 		</dependency>
-
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-connector-elasticsearch5_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-connector-elasticsearch-base_${scala.binary.version}</artifactId>
-			<version>${project.version}</version>
-			<exclusions>
-				<!-- Remove elasticsearch1.7.1 -->
-				<exclusion>
-					<groupId>org.elasticsearch</groupId>
-					<artifactId>elasticsearch</artifactId>
-				</exclusion>
-			</exclusions>
-		</dependency>
-
-		<!-- Dependency for Elasticsearch 5.x Java Client -->
-		<dependency>
-			<groupId>org.elasticsearch.client</groupId>
-			<artifactId>transport</artifactId>
-			<version>5.1.2</version>
-		</dependency>
-
-		<!--
-			Elasticsearch 5.x uses Log4j2 and no longer detects logging implementations, making
-			Log4j2 a strict dependency. The following is added so that the Log4j2 API in
-			Elasticsearch 5.x is routed to SLF4J. This way, user projects can remain flexible
-			in the logging implementation preferred.
-		-->
-
-		<dependency>
-			<groupId>org.apache.logging.log4j</groupId>
-			<artifactId>log4j-to-slf4j</artifactId>
-			<version>2.7</version>
-		</dependency>
 	</dependencies>
 
 	<build>
@@ -87,26 +55,18 @@ under the License.
 				<artifactId>maven-shade-plugin</artifactId>
 				<version>3.0.0</version>
 				<executions>
-					<!-- Elasticsearch5Sink end to end example -->
 					<execution>
 						<phase>package</phase>
 						<goals>
 							<goal>shade</goal>
 						</goals>
 						<configuration>
-							<minimizeJar>true</minimizeJar>
+							<finalName>Elasticsearch5SinkExample</finalName>
 							<artifactSet>
 								<excludes>
 									<exclude>com.google.code.findbugs:jsr305</exclude>
-									<exclude>org.slf4j:*</exclude>
-									<exclude>log4j:*</exclude>
 								</excludes>
 							</artifactSet>
-							<transformers>
-								<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
-									<mainClass>org.apache.flink.streaming.tests.Elasticsearch5SinkExample</mainClass>
-								</transformer>
-							</transformers>
 							<filters>
 								<filter>
 									<artifact>*:*</artifact>
@@ -117,27 +77,11 @@ under the License.
 									</excludes>
 								</filter>
 							</filters>
-						</configuration>
-					</execution>
-				</executions>
-			</plugin>
-
-			<!--simplify the name of the testing JAR for referring to it in test_streaming_elasticsearch5.sh scripts-->
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-antrun-plugin</artifactId>
-				<version>1.7</version>
-				<executions>
-					<execution>
-						<id>rename</id>
-						<phase>package</phase>
-						<goals>
-							<goal>run</goal>
-						</goals>
-						<configuration>
-							<target>
-								<copy file="${project.basedir}/target/flink-elasticsearch5-test_${scala.binary.version}-${project.version}.jar" tofile="${project.basedir}/target/Elasticsearch5SinkExample.jar" />
-							</target>
+							<transformers>
+								<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+									<mainClass>org.apache.flink.streaming.tests.Elasticsearch5SinkExample</mainClass>
+								</transformer>
+							</transformers>
 						</configuration>
 					</execution>
 				</executions>

http://git-wip-us.apache.org/repos/asf/flink/blob/3e434eeb/flink-end-to-end-tests/flink-elasticsearch5-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch5SinkExample.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-elasticsearch5-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch5SinkExample.java b/flink-end-to-end-tests/flink-elasticsearch5-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch5SinkExample.java
index 285f902..39808f6 100644
--- a/flink-end-to-end-tests/flink-elasticsearch5-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch5SinkExample.java
+++ b/flink-end-to-end-tests/flink-elasticsearch5-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch5SinkExample.java
@@ -40,13 +40,14 @@ import java.util.Map;
  * End to end test for Elasticsearch5Sink.
  */
 public class Elasticsearch5SinkExample {
+
 	public static void main(String[] args) throws Exception {
 
 		final ParameterTool parameterTool = ParameterTool.fromArgs(args);
 
-		if (parameterTool.getNumberOfParameters() < 2) {
+		if (parameterTool.getNumberOfParameters() < 3) {
 			System.out.println("Missing parameters!\n" +
-				"Usage: --index <index> --type <type>");
+				"Usage: --numRecords <numRecords> --index <index> --type <type>");
 			return;
 		}
 
@@ -54,12 +55,13 @@ public class Elasticsearch5SinkExample {
 		env.getConfig().disableSysoutLogging();
 		env.enableCheckpointing(5000);
 
-		DataStream<String> source = env.generateSequence(0, 20).map(new MapFunction<Long, String>() {
-			@Override
-			public String map(Long value) throws Exception {
-				return "message #" + value;
-			}
-		});
+		DataStream<String> source = env.generateSequence(0, parameterTool.getInt("numRecords") - 1)
+			.map(new MapFunction<Long, String>() {
+				@Override
+				public String map(Long value) throws Exception {
+					return "message #" + value;
+				}
+			});
 
 		Map<String, String> userConfig = new HashMap<>();
 		userConfig.put("cluster.name", "elasticsearch");

http://git-wip-us.apache.org/repos/asf/flink/blob/3e434eeb/flink-end-to-end-tests/run-nightly-tests.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index 2898682..0ec3492 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -158,7 +158,28 @@ if [ $EXIT_CODE == 0 ]; then
 fi
 
 if [ $EXIT_CODE == 0 ]; then
-  run_test "stateful stream job upgrade end-to-end test" "$END_TO_END_DIR/test-scripts/test_stateful_stream_job_upgrade.sh 2 4"
+  run_test "Stateful stream job upgrade end-to-end test" "$END_TO_END_DIR/test-scripts/test_stateful_stream_job_upgrade.sh 2 4"
+  EXIT_CODE=$?
+fi
+
+if [ $EXIT_CODE == 0 ]; then
+  run_test \
+    "Elasticsearch (v1.7.1) sink end-to-end test" \
+    "$END_TO_END_DIR/test-scripts/test_streaming_elasticsearch.sh 1 https://download.elastic.co/elasticsearch/elasticsearch/elasticsearch-1.7.1.tar.gz"
+  EXIT_CODE=$?
+fi
+
+if [ $EXIT_CODE == 0 ]; then
+  run_test \
+    "Elasticsearch (v2.3.5) sink end-to-end test" \
+    "$END_TO_END_DIR/test-scripts/test_streaming_elasticsearch.sh 2 https://download.elastic.co/elasticsearch/release/org/elasticsearch/distribution/tar/elasticsearch/2.3.5/elasticsearch-2.3.5.tar.gz"
+  EXIT_CODE=$?
+fi
+
+if [ $EXIT_CODE == 0 ]; then
+  run_test \
+    "Elasticsearch (v5.1.2) sink end-to-end test" \
+    "$END_TO_END_DIR/test-scripts/test_streaming_elasticsearch.sh 5 https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.1.2.tar.gz"
   EXIT_CODE=$?
 fi
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3e434eeb/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh b/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh
index 3fda344..0ef6d55 100644
--- a/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh
+++ b/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh
@@ -20,15 +20,32 @@
 set -o pipefail
 
 if [[ -z $TEST_DATA_DIR ]]; then
-  echo "Must run common.sh before kafka-common.sh."
+  echo "Must run common.sh before elasticsearch-common.sh."
   exit 1
 fi
 
+function setup_elasticsearch {
+    mkdir -p $TEST_DATA_DIR
+
+    local downloadUrl=$1
+
+    # start downloading Elasticsearch
+    echo "Downloading Elasticsearch from $downloadUrl ..."
+    curl "$downloadUrl" > $TEST_DATA_DIR/elasticsearch.tar.gz
+
+    local elasticsearchDir=$TEST_DATA_DIR/elasticsearch
+    mkdir -p $elasticsearchDir
+    tar xzf $TEST_DATA_DIR/elasticsearch.tar.gz -C $elasticsearchDir --strip-components=1
+
+    # start Elasticsearch cluster
+    $elasticsearchDir/bin/elasticsearch &
+}
+
 function verify_elasticsearch_process_exist {
-    ELASTICSEARCH_PROCESS=$(jps | grep Elasticsearch | awk '{print $2}')
+    local elasticsearchProcess=$(jps | grep Elasticsearch | awk '{print $2}')
 
     # make sure the elasticsearch node is actually running
-    if [ "$ELASTICSEARCH_PROCESS" != "Elasticsearch" ]; then
+    if [ "$elasticsearchProcess" != "Elasticsearch" ]; then
       echo "Elasticsearch node is not running."
       PASS=""
       exit 1
@@ -38,25 +55,26 @@ function verify_elasticsearch_process_exist {
 }
 
 function verify_result {
+    local numRecords=$1
+
     if [ -f "$TEST_DATA_DIR/output" ]; then
         rm $TEST_DATA_DIR/output
     fi
 
-    curl 'localhost:9200/index/_search?q=*&pretty&size=21' > $TEST_DATA_DIR/output
+    while : ; do
+      curl 'localhost:9200/index/_search?q=*&pretty&size=21' > $TEST_DATA_DIR/output
 
-    if [ -n "$(grep '\"total\" : 21' $TEST_DATA_DIR/output)" ]; then
-        echo "Elasticsearch end to end test pass."
-    else
-        echo "Elasticsearch end to end test failed."
-        PASS=""
-        exit 1
-    fi
+      if [ -n "$(grep "\"total\" : $numRecords" $TEST_DATA_DIR/output)" ]; then
+          echo "Elasticsearch end to end test pass."
+          break
+      else
+          echo "Waiting for Elasticsearch records ..."
+          sleep 1
+      fi
+    done
 }
 
 function shutdown_elasticsearch_cluster {
    pid=$(jps | grep Elasticsearch | awk '{print $1}')
-   kill -SIGTERM $pid
-
-   # make sure to run regular cleanup as well
-   cleanup
+   kill -9 $pid
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3e434eeb/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch.sh b/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch.sh
new file mode 100755
index 0000000..78ea283
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch.sh
@@ -0,0 +1,51 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+source "$(dirname "$0")"/common.sh
+source "$(dirname "$0")"/elasticsearch-common.sh
+
+ELASTICSEARCH_VERSION=$1
+DOWNLOAD_URL=$2
+
+mkdir -p $TEST_DATA_DIR
+
+setup_elasticsearch $DOWNLOAD_URL
+verify_elasticsearch_process_exist
+
+start_cluster
+
+function test_cleanup {
+  shutdown_elasticsearch_cluster
+
+  # make sure to run regular cleanup as well
+  cleanup
+}
+
+trap test_cleanup INT
+trap test_cleanup EXIT
+
+TEST_ES_JAR=$TEST_DATA_DIR/../../flink-elasticsearch${ELASTICSEARCH_VERSION}-test/target/Elasticsearch${ELASTICSEARCH_VERSION}SinkExample.jar
+
+# run the Flink job
+$FLINK_DIR/bin/flink run -p 1 $TEST_ES_JAR \
+  --numRecords 20 \
+  --index index \
+  --type type
+
+verify_result 20

http://git-wip-us.apache.org/repos/asf/flink/blob/3e434eeb/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch125.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch125.sh b/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch125.sh
deleted file mode 100755
index dea3f13..0000000
--- a/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch125.sh
+++ /dev/null
@@ -1,109 +0,0 @@
-#!/usr/bin/env bash
-################################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-source "$(dirname "$0")"/common.sh
-source "$(dirname "$0")"/elasticsearch-common.sh
-
-mkdir -p $TEST_DATA_DIR
-
-ELASTICSEARCH1_URL="https://download.elastic.co/elasticsearch/elasticsearch/elasticsearch-1.7.1.tar.gz"
-ELASTICSEARCH2_URL="https://download.elastic.co/elasticsearch/release/org/elasticsearch/distribution/tar/elasticsearch/2.3.5/elasticsearch-2.3.5.tar.gz"
-ELASTICSEARCH5_URL="https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.1.2.tar.gz"
-
-# start downloading elasticsearch1
-echo "Downloading Elasticsearch1 from $ELASTICSEARCH1_URL"
-curl "$ELASTICSEARCH1_URL" > $TEST_DATA_DIR/elasticsearch1.tar.gz
-
-tar xzf $TEST_DATA_DIR/elasticsearch1.tar.gz -C $TEST_DATA_DIR/
-ELASTICSEARCH1_DIR=$TEST_DATA_DIR/elasticsearch-1.7.1
-
-# start elasticsearch1 cluster
-$ELASTICSEARCH1_DIR/bin/elasticsearch -daemon
-
-verify_elasticsearch_process_exist
-
-start_cluster
-
-TEST_ES1_JAR=$TEST_DATA_DIR/../../flink-elasticsearch1-test/target/Elasticsearch1SinkExample.jar
-
-# run the Flink job
-$FLINK_DIR/bin/flink run -p 1 $TEST_ES1_JAR \
-  --index index \
-  --type type
-
-verify_result
-
-shutdown_elasticsearch_cluster
-
-mkdir -p $TEST_DATA_DIR
-
-# start downloading elasticsearch2
-echo "Downloading Elasticsearch2 from $ELASTICSEARCH2_URL"
-curl "$ELASTICSEARCH2_URL" > $TEST_DATA_DIR/elasticsearch2.tar.gz
-
-tar xzf $TEST_DATA_DIR/elasticsearch2.tar.gz -C $TEST_DATA_DIR/
-ELASTICSEARCH2_DIR=$TEST_DATA_DIR/elasticsearch-2.3.5
-
-# start elasticsearch cluster, different from elasticsearch1 since using -daemon here will hang the shell.
-nohup $ELASTICSEARCH2_DIR/bin/elasticsearch &
-
-verify_elasticsearch_process_exist
-
-start_cluster
-
-TEST_ES2_JAR=$TEST_DATA_DIR/../../flink-elasticsearch2-test/target/Elasticsearch2SinkExample.jar
-
-# run the Flink job
-$FLINK_DIR/bin/flink run -p 1 $TEST_ES2_JAR \
-  --index index \
-  --type type
-
-verify_result
-
-shutdown_elasticsearch_cluster
-
-mkdir -p $TEST_DATA_DIR
-
-# start downloading elasticsearch5
-echo "Downloading Elasticsearch5 from $ELASTICSEARCH5_URL"
-curl "$ELASTICSEARCH5_URL" > $TEST_DATA_DIR/elasticsearch5.tar.gz
-
-tar xzf $TEST_DATA_DIR/elasticsearch5.tar.gz -C $TEST_DATA_DIR/
-ELASTICSEARCH5_DIR=$TEST_DATA_DIR/elasticsearch-5.1.2
-
-# start elasticsearch cluster, different from elasticsearch1 since using -daemon here will hang the shell.
-nohup $ELASTICSEARCH5_DIR/bin/elasticsearch &
-
-verify_elasticsearch_process_exist
-
-start_cluster
-
-TEST_ES5_JAR=$TEST_DATA_DIR/../../flink-elasticsearch5-test/target/Elasticsearch5SinkExample.jar
-
-# run the Flink job
-$FLINK_DIR/bin/flink run -p 1 $TEST_ES5_JAR \
-  --index index \
-  --type type
-
-verify_result
-
-rm -rf $FLINK_DIR/log/* 2> /dev/null
-
-trap shutdown_elasticsearch_cluster INT
-trap shutdown_elasticsearch_cluster EXIT


[06/17] flink git commit: [FLINK-8977] [e2e] Extend externalized checkpoint e2e to simulate job failures

Posted by tz...@apache.org.
[FLINK-8977] [e2e] Extend externalized checkpoint e2e to simulate job failures

This closes #6004.
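
A minimal sketch of the extended script interface (positional arguments:
state backend type, file-backend async flag, simulate-failure flag; see the
script diff below):

    ./test_resume_externalized_checkpoints.sh file true true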


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7f9e4c08
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7f9e4c08
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7f9e4c08

Branch: refs/heads/release-1.5
Commit: 7f9e4c0870ff85669fd857757d973227d446eef5
Parents: 22e400d
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Mon May 14 11:56:41 2018 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Tue May 22 16:49:34 2018 +0800

----------------------------------------------------------------------
 flink-end-to-end-tests/run-nightly-tests.sh     | 21 +++++++--
 flink-end-to-end-tests/test-scripts/common.sh   | 17 +++++++
 .../test_resume_externalized_checkpoints.sh     | 48 +++++++++++++-------
 3 files changed, 66 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7f9e4c08/flink-end-to-end-tests/run-nightly-tests.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index 4cfd778..2898682 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -113,17 +113,32 @@ if [ $EXIT_CODE == 0 ]; then
 fi
 
 if [ $EXIT_CODE == 0 ]; then
-  run_test "Resuming Externalized Checkpoint (file, async) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh file true"
+  run_test "Resuming Externalized Checkpoint (file, async) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh file true false"
   EXIT_CODE=$?
 fi
 
 if [ $EXIT_CODE == 0 ]; then
-  run_test "Resuming Externalized Checkpoint (file, sync) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh file false"
+  run_test "Resuming Externalized Checkpoint (file, sync) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh file false false"
   EXIT_CODE=$?
 fi
 
 if [ $EXIT_CODE == 0 ]; then
-  run_test "Resuming Externalized Checkpoint (rocks) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh rocks"
+  run_test "Resuming Externalized Checkpoint (rocks) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh rocks false"
+  EXIT_CODE=$?
+fi
+
+if [ $EXIT_CODE == 0 ]; then
+  run_test "Resuming Externalized Checkpoint after terminal failure (file, async) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh file true true"
+  EXIT_CODE=$?
+fi
+
+if [ $EXIT_CODE == 0 ]; then
+  run_test "Resuming Externalized Checkpoint after terminal failure (file, sync) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh file false true"
+  EXIT_CODE=$?
+fi
+
+if [ $EXIT_CODE == 0 ]; then
+  run_test "Resuming Externalized Checkpoint after terminal failure (rocks) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh rocks true true"
   EXIT_CODE=$?
 fi
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7f9e4c08/flink-end-to-end-tests/test-scripts/common.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/common.sh b/flink-end-to-end-tests/test-scripts/common.sh
index 2d0f13e..9a004ef 100644
--- a/flink-end-to-end-tests/test-scripts/common.sh
+++ b/flink-end-to-end-tests/test-scripts/common.sh
@@ -237,6 +237,23 @@ function wait_job_running {
   done
 }
 
+function wait_job_terminal_state {
+  local job=$1
+  local terminal_state=$2
+
+  echo "Waiting for job ($job) to reach terminal state $terminal_state ..."
+
+  while : ; do
+    N=$(grep -o "Job $job reached globally terminal state $terminal_state" $FLINK_DIR/log/*standalonesession*.log | tail -1)
+
+    if [[ -z $N ]]; then
+      sleep 1
+    else
+      break
+    fi
+  done
+}
+
 function take_savepoint {
   "$FLINK_DIR"/bin/flink savepoint $1 $2
 }
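
A short usage sketch of the new helper, assuming the job id of a detached
submission was captured as in the externalized checkpoints script below:

    wait_job_terminal_state $DATASTREAM_JOB FAILED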

http://git-wip-us.apache.org/repos/asf/flink/blob/7f9e4c08/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh b/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
index 3dc9909..3994e30 100755
--- a/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
+++ b/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
@@ -21,6 +21,7 @@ source "$(dirname "$0")"/common.sh
 
 STATE_BACKEND_TYPE=${1:-file}
 STATE_BACKEND_FILE_ASYNC=${2:-true}
+SIMULATE_FAILURE=${3:-false}
 
 setup_flink_slf4j_metric_reporter
 start_cluster
@@ -43,8 +44,11 @@ CHECKPOINT_DIR="$TEST_DATA_DIR/externalized-chckpt-e2e-backend-dir"
 CHECKPOINT_DIR_URI="file://$CHECKPOINT_DIR"
 
 # run the DataStream allround job
+
+echo "Running externalized checkpoints test, with STATE_BACKEND_TYPE=$STATE_BACKEND_TYPE STATE_BACKEND_FILE_ASYNC=$STATE_BACKEND_FILE_ASYNC SIMULATE_FAILURE=$SIMULATE_FAILURE ..."
+
 TEST_PROGRAM_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-datastream-allround-test/target/DataStreamAllroundTestProgram.jar
-DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -d $TEST_PROGRAM_JAR \
+BASE_JOB_CMD="$FLINK_DIR/bin/flink run -d $TEST_PROGRAM_JAR \
   --test.semantics exactly-once \
   --environment.externalize_checkpoint true \
   --environment.externalize_checkpoint.cleanup retain \
@@ -52,15 +56,35 @@ DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -d $TEST_PROGRAM_JAR \
   --state_backend.checkpoint_directory $CHECKPOINT_DIR_URI \
   --state_backend.file.async $STATE_BACKEND_FILE_ASYNC \
   --sequence_generator_source.sleep_time 15 \
-  --sequence_generator_source.sleep_after_elements 1 \
-  | grep "Job has been submitted with JobID" | sed 's/.* //g')
+  --sequence_generator_source.sleep_after_elements 1"
+
+JOB_CMD=""
+if [[ $SIMULATE_FAILURE == "true" ]]; then
+  # the submitted job should fail after at least 1 complete checkpoint.
+  # When simulating failures with the general purpose DataStream job,
+  # we disable restarting because we want to manually do that after the job fails.
+  JOB_CMD="$BASE_JOB_CMD \
+    --test.simulate_failure true \
+    --test.simulate_failure.num_records 200 \
+    --test.simulate_failure.num_checkpoints 1 \
+    --test.simulate_failure.max_failures 1 \
+    --environment.restart_strategy no_restart"
+else
+  JOB_CMD=$BASE_JOB_CMD
+fi
+
+DATASTREAM_JOB=$($JOB_CMD | grep "Job has been submitted with JobID" | sed 's/.* //g')
 
 wait_job_running $DATASTREAM_JOB
 
-wait_num_checkpoints $DATASTREAM_JOB 1
-wait_oper_metric_num_in_records ArtificalKeyedStateMapper.0 200
+if [[ $SIMULATE_FAILURE == "true" ]]; then
+  wait_job_terminal_state $DATASTREAM_JOB FAILED
+else
+  wait_num_checkpoints $DATASTREAM_JOB 1
+  wait_oper_metric_num_in_records ArtificalKeyedStateMapper.0 200
 
-cancel_job $DATASTREAM_JOB
+  cancel_job $DATASTREAM_JOB
+fi
 
 CHECKPOINT_PATH=$(ls -d $CHECKPOINT_DIR/$DATASTREAM_JOB/chk-[1-9]*)
 
@@ -78,19 +102,9 @@ if (( $NUM_CHECKPOINTS > 1 )); then
 fi
 
 echo "Restoring job with externalized checkpoint at $CHECKPOINT_PATH ..."
-DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -s $CHECKPOINT_PATH -d $TEST_PROGRAM_JAR \
-  --test.semantics exactly-once \
-  --environment.externalize_checkpoint true \
-  --environment.externalize_checkpoint.cleanup retain \
-  --state_backend $STATE_BACKEND_TYPE \
-  --state_backend.checkpoint_directory $CHECKPOINT_DIR_URI \
-  --state_backend.file.async $STATE_BACKEND_FILE_ASYNC \
-  --sequence_generator_source.sleep_time 15 \
-  --sequence_generator_source.sleep_after_elements 1 \
-  | grep "Job has been submitted with JobID" | sed 's/.* //g')
+DATASTREAM_JOB=$($BASE_JOB_CMD | grep "Job has been submitted with JobID" | sed 's/.* //g')
 
 wait_job_running $DATASTREAM_JOB
-
 wait_oper_metric_num_in_records ArtificalKeyedStateMapper.0 200
 
 # if state is erroneous and the general purpose DataStream job produces alerting messages,


[14/17] flink git commit: [hotfix][tests] Introduce MockEnvironmentBuilder to deduplicate MockEnvironment constructors

Posted by tz...@apache.org.
[hotfix][tests] Introduce MockEnvironmentBuilder to deduplicate MockEnvironment constructors


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e1f64d87
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e1f64d87
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e1f64d87

Branch: refs/heads/release-1.5
Commit: e1f64d8701535e265f3151d1a9838d01031023ea
Parents: 7b680c9
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Wed May 9 11:12:18 2018 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Tue May 22 16:54:23 2018 +0800

----------------------------------------------------------------------
 .../kafka/FlinkKafkaConsumerBaseTest.java       |  13 +-
 .../kinesis/testutils/TestRuntimeContext.java   |  13 +-
 .../operators/testutils/MockEnvironment.java    | 102 +--------------
 .../testutils/MockEnvironmentBuilder.java       | 125 +++++++++++++++++++
 .../operators/testutils/TaskTestBase.java       |   8 +-
 .../source/InputFormatSourceFunctionTest.java   |  13 +-
 .../StreamOperatorSnapshotRestoreTest.java      |  29 ++---
 .../operators/async/AsyncWaitOperatorTest.java  |  13 +-
 .../operators/StreamOperatorChainingTest.java   |  14 +--
 .../streaming/runtime/tasks/StreamTaskTest.java |  35 +++---
 .../util/AbstractStreamOperatorTestHarness.java |  21 ++--
 .../streaming/util/SourceFunctionUtil.java      |  15 +--
 .../PojoSerializerUpgradeTest.java              |  25 ++--
 13 files changed, 224 insertions(+), 202 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e1f64d87/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index b226ff1..4605015 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -33,10 +33,9 @@ import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
-import org.apache.flink.runtime.state.TestTaskStateManager;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
@@ -834,12 +833,10 @@ public class FlinkKafkaConsumerBaseTest {
 
 			super(
 				new MockStreamOperator(),
-				new MockEnvironment(
-					"mockTask",
-					4 * MemoryManager.DEFAULT_PAGE_SIZE,
-					null,
-					16,
-					new TestTaskStateManager()),
+				new MockEnvironmentBuilder()
+					.setTaskName("mockTask")
+					.setMemorySize(4 * MemoryManager.DEFAULT_PAGE_SIZE)
+					.build(),
 				Collections.emptyMap());
 
 			this.isCheckpointingEnabled = isCheckpointingEnabled;

http://git-wip-us.apache.org/repos/asf/flink/blob/e1f64d87/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestRuntimeContext.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestRuntimeContext.java
index ce0bd97..740d2f2 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestRuntimeContext.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestRuntimeContext.java
@@ -21,8 +21,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.testutils.MockEnvironment;
-import org.apache.flink.runtime.state.TestTaskStateManager;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 
@@ -45,12 +44,10 @@ public class TestRuntimeContext extends StreamingRuntimeContext {
 
 		super(
 			new TestStreamOperator(),
-			new MockEnvironment(
-				"mockTask",
-				4 * MemoryManager.DEFAULT_PAGE_SIZE,
-				null,
-				16,
-				new TestTaskStateManager()),
+			new MockEnvironmentBuilder()
+				.setTaskName("mockTask")
+				.setMemorySize(4 * MemoryManager.DEFAULT_PAGE_SIZE)
+				.build(),
 			Collections.emptyMap());
 
 		this.isCheckpointingEnabled = isCheckpointingEnabled;

http://git-wip-us.apache.org/repos/asf/flink/blob/e1f64d87/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index ce19a5e..4bf94e9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -45,7 +45,6 @@ import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.TaskStateManager;
-import org.apache.flink.runtime.state.TestTaskStateManager;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
 import org.apache.flink.types.Record;
@@ -109,106 +108,11 @@ public class MockEnvironment implements Environment, AutoCloseable {
 
 	private Optional<Throwable> actualExternalFailureCause = Optional.empty();
 
-	public MockEnvironment() {
-		this(
-			"mock-task",
-			1024 * MemoryManager.DEFAULT_PAGE_SIZE,
-			null,
-			16,
-			new TestTaskStateManager());
+	public static MockEnvironmentBuilder builder() {
+		return new MockEnvironmentBuilder();
 	}
 
-	public MockEnvironment(
-		String taskName,
-		long memorySize,
-		MockInputSplitProvider inputSplitProvider,
-		int bufferSize,
-		TaskStateManager taskStateManager) {
-		this(
-			taskName,
-			memorySize,
-			inputSplitProvider,
-			bufferSize,
-			new Configuration(),
-			new ExecutionConfig(),
-			taskStateManager);
-	}
-
-	public MockEnvironment(
-		String taskName,
-		long memorySize,
-		MockInputSplitProvider inputSplitProvider,
-		int bufferSize, Configuration taskConfiguration,
-		ExecutionConfig executionConfig,
-		TaskStateManager taskStateManager) {
-		this(
-			taskName,
-			memorySize,
-			inputSplitProvider,
-			bufferSize,
-			taskConfiguration,
-			executionConfig,
-			taskStateManager,
-			1,
-			1,
-			0);
-	}
-
-	public MockEnvironment(
-			String taskName,
-			long memorySize,
-			MockInputSplitProvider inputSplitProvider,
-			int bufferSize,
-			Configuration taskConfiguration,
-			ExecutionConfig executionConfig,
-			TaskStateManager taskStateManager,
-			int maxParallelism,
-			int parallelism,
-			int subtaskIndex) {
-		this(
-			taskName,
-			memorySize,
-			inputSplitProvider,
-			bufferSize,
-			taskConfiguration,
-			executionConfig,
-			taskStateManager,
-			maxParallelism,
-			parallelism,
-			subtaskIndex,
-			Thread.currentThread().getContextClassLoader());
-
-	}
-
-	public MockEnvironment(
-			String taskName,
-			long memorySize,
-			MockInputSplitProvider inputSplitProvider,
-			int bufferSize,
-			Configuration taskConfiguration,
-			ExecutionConfig executionConfig,
-			TaskStateManager taskStateManager,
-			int maxParallelism,
-			int parallelism,
-			int subtaskIndex,
-			ClassLoader userCodeClassLoader) {
-		this(
-			new JobID(),
-			new JobVertexID(),
-			taskName,
-			memorySize,
-			inputSplitProvider,
-			bufferSize,
-			taskConfiguration,
-			executionConfig,
-			taskStateManager,
-			maxParallelism,
-			parallelism,
-			subtaskIndex,
-			userCodeClassLoader);
-	}
-
-	public MockEnvironment(
+	protected MockEnvironment(
 		JobID jobID,
 		JobVertexID jobVertexID,
 		String taskName,

http://git-wip-us.apache.org/repos/asf/flink/blob/e1f64d87/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java
new file mode 100644
index 0000000..dfb10d4
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.testutils;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.state.TaskStateManager;
+import org.apache.flink.runtime.state.TestTaskStateManager;
+
+public class MockEnvironmentBuilder {
+	private String taskName = "mock-task";
+	private long memorySize = 1024 * MemoryManager.DEFAULT_PAGE_SIZE;
+	private MockInputSplitProvider inputSplitProvider = null;
+	private int bufferSize = 16;
+	private TaskStateManager taskStateManager = new TestTaskStateManager();
+	private Configuration taskConfiguration = new Configuration();
+	private ExecutionConfig executionConfig = new ExecutionConfig();
+	private int maxParallelism = 1;
+	private int parallelism = 1;
+	private int subtaskIndex = 0;
+	private ClassLoader userCodeClassLoader = Thread.currentThread().getContextClassLoader();
+	private JobID jobID = new JobID();
+	private JobVertexID jobVertexID = new JobVertexID();
+
+	public MockEnvironmentBuilder setTaskName(String taskName) {
+		this.taskName = taskName;
+		return this;
+	}
+
+	public MockEnvironmentBuilder setMemorySize(long memorySize) {
+		this.memorySize = memorySize;
+		return this;
+	}
+
+	public MockEnvironmentBuilder setInputSplitProvider(MockInputSplitProvider inputSplitProvider) {
+		this.inputSplitProvider = inputSplitProvider;
+		return this;
+	}
+
+	public MockEnvironmentBuilder setBufferSize(int bufferSize) {
+		this.bufferSize = bufferSize;
+		return this;
+	}
+
+	public MockEnvironmentBuilder setTaskStateManager(TaskStateManager taskStateManager) {
+		this.taskStateManager = taskStateManager;
+		return this;
+	}
+
+	public MockEnvironmentBuilder setTaskConfiguration(Configuration taskConfiguration) {
+		this.taskConfiguration = taskConfiguration;
+		return this;
+	}
+
+	public MockEnvironmentBuilder setExecutionConfig(ExecutionConfig executionConfig) {
+		this.executionConfig = executionConfig;
+		return this;
+	}
+
+	public MockEnvironmentBuilder setMaxParallelism(int maxParallelism) {
+		this.maxParallelism = maxParallelism;
+		return this;
+	}
+
+	public MockEnvironmentBuilder setParallelism(int parallelism) {
+		this.parallelism = parallelism;
+		return this;
+	}
+
+	public MockEnvironmentBuilder setSubtaskIndex(int subtaskIndex) {
+		this.subtaskIndex = subtaskIndex;
+		return this;
+	}
+
+	public MockEnvironmentBuilder setUserCodeClassLoader(ClassLoader userCodeClassLoader) {
+		this.userCodeClassLoader = userCodeClassLoader;
+		return this;
+	}
+
+	public MockEnvironmentBuilder setJobID(JobID jobID) {
+		this.jobID = jobID;
+		return this;
+	}
+
+	public MockEnvironmentBuilder setJobVertexID(JobVertexID jobVertexID) {
+		this.jobVertexID = jobVertexID;
+		return this;
+	}
+
+	public MockEnvironment build() {
+		return new MockEnvironment(
+			jobID,
+			jobVertexID,
+			taskName,
+			memorySize,
+			inputSplitProvider,
+			bufferSize,
+			taskConfiguration,
+			executionConfig,
+			taskStateManager,
+			maxParallelism,
+			parallelism,
+			subtaskIndex,
+			userCodeClassLoader);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e1f64d87/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
index a40992c..16485ca 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
@@ -54,8 +54,12 @@ public abstract class TaskTestBase extends TestLogger {
 	public void initEnvironment(long memorySize, int bufferSize) {
 		this.memorySize = memorySize;
 		this.inputSplitProvider = new MockInputSplitProvider();
-		TestTaskStateManager taskStateManager = new TestTaskStateManager();
-		this.mockEnv = new MockEnvironment("mock task", this.memorySize, this.inputSplitProvider, bufferSize, taskStateManager);
+		this.mockEnv = new MockEnvironmentBuilder()
+			.setTaskName("mock task")
+			.setMemorySize(this.memorySize)
+			.setInputSplitProvider(this.inputSplitProvider)
+			.setBufferSize(bufferSize)
+			.build();
 	}
 
 	public IteratorWrappingTestSingleInputGate<Record> addInput(MutableObjectIterator<Record> input, int groupId) {

http://git-wip-us.apache.org/repos/asf/flink/blob/e1f64d87/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java
index 18c8ac5..84a45d8 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java
@@ -32,7 +32,7 @@ import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
-import org.apache.flink.runtime.state.TestTaskStateManager;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.api.watermark.Watermark;
@@ -66,12 +66,11 @@ public class InputFormatSourceFunctionTest {
 		final LifeCycleTestInputFormat format = new LifeCycleTestInputFormat();
 		final InputFormatSourceFunction<Integer> reader = new InputFormatSourceFunction<>(format, TypeInformation.of(Integer.class));
 
-		try (MockEnvironment environment = new MockEnvironment(
-			"no",
-			4 * MemoryManager.DEFAULT_PAGE_SIZE,
-			null,
-			16,
-			new TestTaskStateManager())) {
+		try (MockEnvironment environment =
+				new MockEnvironmentBuilder()
+					.setTaskName("no")
+					.setMemorySize(4 * MemoryManager.DEFAULT_PAGE_SIZE)
+					.build()) {
 
 			reader.setRuntimeContext(new MockRuntimeContext(format, noOfSplits, environment));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e1f64d87/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
index 6d011a3..a38ffa6 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.streaming.api.operators;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
@@ -27,7 +26,6 @@ import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputView;
@@ -36,6 +34,7 @@ import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
@@ -148,20 +147,18 @@ public class StreamOperatorSnapshotRestoreTest extends TestLogger {
 		LocalRecoveryConfig localRecoveryConfig =
 			new LocalRecoveryConfig(mode != ONLY_JM_RECOVERY, directoryProvider);
 
-		MockEnvironment mockEnvironment = new MockEnvironment(
-			jobID,
-			jobVertexID,
-			"test",
-			1024L * 1024L,
-			new MockInputSplitProvider(),
-			1024 * 1024,
-			new Configuration(),
-			new ExecutionConfig(),
-			new TestTaskStateManager(localRecoveryConfig),
-			MAX_PARALLELISM,
-			1,
-			subtaskIdx,
-			getClass().getClassLoader());
+		MockEnvironment mockEnvironment = new MockEnvironmentBuilder()
+			.setJobID(jobID)
+			.setJobVertexID(jobVertexID)
+			.setTaskName("test")
+			.setMemorySize(1024L * 1024L)
+			.setInputSplitProvider(new MockInputSplitProvider())
+			.setBufferSize(1024 * 1024)
+			.setTaskStateManager(new TestTaskStateManager(localRecoveryConfig))
+			.setMaxParallelism(MAX_PARALLELISM)
+			.setSubtaskIndex(subtaskIdx)
+			.setUserCodeClassLoader(getClass().getClassLoader())
+			.build();
 
 		KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness =
 			new KeyedOneInputStreamOperatorTestHarness<>(

http://git-wip-us.apache.org/repos/asf/flink/blob/e1f64d87/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
index 35e2fbd..cdd77d3 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
 import org.apache.flink.runtime.state.TestTaskStateManager;
 import org.apache.flink.streaming.api.datastream.AsyncDataStream;
@@ -650,12 +651,12 @@ public class AsyncWaitOperatorTest extends TestLogger {
 
 	@Nonnull
 	private MockEnvironment createMockEnvironment() {
-		return new MockEnvironment(
-			"foobarTask",
-			1024 * 1024L,
-			new MockInputSplitProvider(),
-			4 * 1024,
-			new TestTaskStateManager());
+		return new MockEnvironmentBuilder()
+			.setTaskName("foobarTask")
+			.setMemorySize(1024 * 1024L)
+			.setInputSplitProvider(new MockInputSplitProvider())
+			.setBufferSize(4 * 1024)
+			.build();
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/e1f64d87/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java
index e980ab7..fd6a953 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java
@@ -25,8 +25,8 @@ import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
-import org.apache.flink.runtime.state.TestTaskStateManager;
 import org.apache.flink.streaming.api.collector.selector.OutputSelector;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SplitStream;
@@ -170,12 +170,12 @@ public class StreamOperatorChainingTest {
 	}
 
 	private MockEnvironment createMockEnvironment(String taskName) {
-		return new MockEnvironment(
-			taskName,
-			3 * 1024 * 1024,
-			new MockInputSplitProvider(),
-			1024,
-			new TestTaskStateManager());
+		return new MockEnvironmentBuilder()
+			.setTaskName(taskName)
+			.setMemorySize(3 * 1024 * 1024)
+			.setInputSplitProvider(new MockInputSplitProvider())
+			.setBufferSize(1024)
+			.build();
 	}
 
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/e1f64d87/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index caea662..cb31970 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -61,6 +61,7 @@ import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
@@ -332,7 +333,7 @@ public class StreamTaskTest extends TestLogger {
 		TaskInfo mockTaskInfo = mock(TaskInfo.class);
 		when(mockTaskInfo.getTaskNameWithSubtasks()).thenReturn("foobar");
 		when(mockTaskInfo.getIndexOfThisSubtask()).thenReturn(0);
-		Environment mockEnvironment = new MockEnvironment();
+		Environment mockEnvironment = new MockEnvironmentBuilder().build();
 
 		StreamTask<?, ?> streamTask = new EmptyStreamTask(mockEnvironment);
 		CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp);
@@ -402,7 +403,7 @@ public class StreamTaskTest extends TestLogger {
 		final long checkpointId = 42L;
 		final long timestamp = 1L;
 
-		MockEnvironment mockEnvironment = new MockEnvironment();
+		MockEnvironment mockEnvironment = new MockEnvironmentBuilder().build();
 		StreamTask<?, ?> streamTask = spy(new EmptyStreamTask(mockEnvironment));
 		CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp);
 
@@ -505,12 +506,10 @@ public class StreamTaskTest extends TestLogger {
 			null,
 			checkpointResponder);
 
-		MockEnvironment mockEnvironment = new MockEnvironment(
-			"mock-task",
-			1024 * MemoryManager.DEFAULT_PAGE_SIZE,
-			null,
-			16,
-			taskStateManager);
+		MockEnvironment mockEnvironment = new MockEnvironmentBuilder()
+			.setTaskName("mock-task")
+			.setTaskStateManager(taskStateManager)
+			.build();
 
 		StreamTask<?, ?> streamTask = new EmptyStreamTask(mockEnvironment);
 		CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp);
@@ -603,7 +602,7 @@ public class StreamTaskTest extends TestLogger {
 		final OneShotLatch createSubtask = new OneShotLatch();
 		final OneShotLatch completeSubtask = new OneShotLatch();
 
-		Environment mockEnvironment = spy(new MockEnvironment());
+		Environment mockEnvironment = spy(new MockEnvironmentBuilder().build());
 
 		whenNew(OperatorSnapshotFinalizer.class).
 			withAnyArguments().
@@ -689,7 +688,7 @@ public class StreamTaskTest extends TestLogger {
 		final long checkpointId = 42L;
 		final long timestamp = 1L;
 
-		Environment mockEnvironment = spy(new MockEnvironment());
+		Environment mockEnvironment = spy(new MockEnvironmentBuilder().build());
 
 		// latch blocks until the async checkpoint thread acknowledges
 		final OneShotLatch checkpointCompletedLatch = new OneShotLatch();
@@ -769,14 +768,14 @@ public class StreamTaskTest extends TestLogger {
 		streamConfig.setStreamOperator(new BlockingCloseStreamOperator());
 		streamConfig.setOperatorID(new OperatorID());
 
-		try (MockEnvironment mockEnvironment = new MockEnvironment(
-				"Test Task",
-				32L * 1024L,
-				new MockInputSplitProvider(),
-				1,
-				taskConfiguration,
-				new ExecutionConfig(),
-				new TestTaskStateManager())) {
+		try (MockEnvironment mockEnvironment =
+				new MockEnvironmentBuilder()
+					.setTaskName("Test Task")
+					.setMemorySize(32L * 1024L)
+					.setInputSplitProvider(new MockInputSplitProvider())
+					.setBufferSize(1)
+					.setTaskConfiguration(taskConfiguration)
+					.build()) {
 			StreamTask<Void, BlockingCloseStreamOperator> streamTask = new NoOpStreamTask<>(mockEnvironment);
 			final AtomicReference<Throwable> atomicThrowable = new AtomicReference<>(null);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e1f64d87/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index ed2da18..26ad3ab 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
 import org.apache.flink.runtime.state.CheckpointStorage;
 import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
@@ -137,17 +138,15 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable {
 			int subtaskIndex) throws Exception {
 		this(
 			operator,
-			new MockEnvironment(
-				"MockTask",
-				3 * 1024 * 1024,
-				new MockInputSplitProvider(),
-				1024,
-				new Configuration(),
-				new ExecutionConfig(),
-				new TestTaskStateManager(),
-				maxParallelism,
-				parallelism,
-				subtaskIndex),
+			new MockEnvironmentBuilder()
+				.setTaskName("MockTask")
+				.setMemorySize(3 * 1024 * 1024)
+				.setInputSplitProvider(new MockInputSplitProvider())
+				.setBufferSize(1024)
+				.setMaxParallelism(maxParallelism)
+				.setParallelism(parallelism)
+				.setSubtaskIndex(subtaskIndex)
+				.build(),
 			true);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e1f64d87/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
index 3f54081..660a333 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
@@ -22,8 +22,8 @@ import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
-import org.apache.flink.runtime.state.TestTaskStateManager;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
@@ -51,12 +51,13 @@ public class SourceFunctionUtil {
 	}
 
 	private static <T extends Serializable> List<T> runRichSourceFunction(SourceFunction<T> sourceFunction) throws Exception {
-		try (MockEnvironment environment = new MockEnvironment(
-			"MockTask",
-			3 * 1024 * 1024,
-			new MockInputSplitProvider(),
-			1024,
-			new TestTaskStateManager())) {
+		try (MockEnvironment environment =
+				new MockEnvironmentBuilder()
+					.setTaskName("MockTask")
+					.setMemorySize(3 * 1024 * 1024)
+					.setInputSplitProvider(new MockInputSplitProvider())
+					.setBufferSize(1024)
+					.build()) {
 
 			AbstractStreamOperator<?> operator = mock(AbstractStreamOperator.class);
 			when(operator.getExecutionConfig()).thenReturn(new ExecutionConfig());

http://git-wip-us.apache.org/repos/asf/flink/blob/e1f64d87/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java b/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java
index aaa96fb..fe56782 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java
@@ -37,12 +37,12 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.state.StateBackendLoader;
-import org.apache.flink.runtime.state.TestTaskStateManager;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamMap;
@@ -351,18 +351,17 @@ public class PojoSerializerUpgradeTest extends TestLogger {
 			OperatorSubtaskState operatorSubtaskState,
 			Iterable<Long> input) throws Exception {
 
-		try (final MockEnvironment environment = new MockEnvironment(
-			"test task",
-			32 * 1024,
-			new MockInputSplitProvider(),
-			256,
-			taskConfiguration,
-			executionConfig,
-			new TestTaskStateManager(),
-			16,
-			1,
-			0,
-			classLoader)) {
+		try (final MockEnvironment environment =
+				new MockEnvironmentBuilder()
+					.setTaskName("test task")
+					.setMemorySize(32 * 1024)
+					.setInputSplitProvider(new MockInputSplitProvider())
+					.setBufferSize(256)
+					.setTaskConfiguration(taskConfiguration)
+					.setExecutionConfig(executionConfig)
+					.setMaxParallelism(16)
+					.setUserCodeClassLoader(classLoader)
+					.build()) {
 
 			OneInputStreamOperatorTestHarness<Long, Long> harness = null;
 			try {


[15/17] flink git commit: [hotfix][tests] Reduce mockito usage in tests

Posted by tz...@apache.org.
[hotfix][tests] Reduce mockito usage in tests


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9ef10e65
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9ef10e65
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9ef10e65

Branch: refs/heads/release-1.5
Commit: 9ef10e659979d251d0e0bf1783e9322171f90f01
Parents: e1f64d8
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Wed May 9 11:29:18 2018 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Tue May 22 16:54:29 2018 +0800

----------------------------------------------------------------------
 .../operators/StreamingRuntimeContextTest.java  | 10 +++---
 .../operators/async/AsyncWaitOperatorTest.java  |  9 +++---
 .../flink/streaming/util/MockStreamConfig.java  | 33 ++++++++++++++++++++
 3 files changed, 42 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9ef10e65/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
index 1072eec..87667b2 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
@@ -42,6 +42,7 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.DefaultKeyedStateStore;
@@ -58,7 +59,6 @@ import org.mockito.stubbing.Answer;
 
 import java.util.Collections;
 import java.util.Map;
-import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.junit.Assert.assertFalse;
@@ -373,10 +373,8 @@ public class StreamingRuntimeContextTest {
 	}
 
 	private static Environment createMockEnvironment() {
-		Environment env = mock(Environment.class);
-		when(env.getUserClassLoader()).thenReturn(StreamingRuntimeContextTest.class.getClassLoader());
-		when(env.getDistributedCacheEntries()).thenReturn(Collections.<String, Future<Path>>emptyMap());
-		when(env.getTaskInfo()).thenReturn(new TaskInfo("test task", 1, 0, 1, 1));
-		return env;
+		return MockEnvironment.builder()
+			.setTaskName("test task")
+			.build();
 	}
 }

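The hunk above replaces a Mockito mock and three stubbed answers with the real test double; a minimal sketch of the behavior this buys, with an illustrative assertion that is not part of the commit (imports as in the test file above):

    Environment env = MockEnvironment.builder()
        .setTaskName("test task")
        .build();
    // a real Environment answers consistently without per-method stubbing
    assertEquals("test task", env.getTaskInfo().getTaskName());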
http://git-wip-us.apache.org/repos/asf/flink/blob/9ef10e65/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
index cdd77d3..17d654e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
@@ -58,6 +58,7 @@ import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.apache.flink.streaming.util.MockStreamConfig;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.TestHarnessUtil;
 import org.apache.flink.util.ExceptionUtils;
@@ -680,8 +681,8 @@ public class AsyncWaitOperatorTest extends TestLogger {
 		when(containingTask.getCheckpointLock()).thenReturn(lock);
 		when(containingTask.getProcessingTimeService()).thenReturn(new TestProcessingTimeService());
 
-		StreamConfig streamConfig = mock(StreamConfig.class);
-		doReturn(IntSerializer.INSTANCE).when(streamConfig).getTypeSerializerIn1(any(ClassLoader.class));
+		StreamConfig streamConfig = new MockStreamConfig();
+		streamConfig.setTypeSerializerIn1(IntSerializer.INSTANCE);
 
 		final OneShotLatch closingLatch = new OneShotLatch();
 		final OneShotLatch outputLatch = new OneShotLatch();
@@ -783,8 +784,8 @@ public class AsyncWaitOperatorTest extends TestLogger {
 		when(containingTask.getCheckpointLock()).thenReturn(lock);
 		when(containingTask.getProcessingTimeService()).thenReturn(processingTimeService);
 
-		StreamConfig streamConfig = mock(StreamConfig.class);
-		doReturn(IntSerializer.INSTANCE).when(streamConfig).getTypeSerializerIn1(any(ClassLoader.class));
+		StreamConfig streamConfig = new MockStreamConfig();
+		streamConfig.setTypeSerializerIn1(IntSerializer.INSTANCE);
 
 		Output<StreamRecord<Integer>> output = mock(Output.class);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9ef10e65/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamConfig.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamConfig.java
new file mode 100644
index 0000000..598da48
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamConfig.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+
+/**
+ * Handy mock for {@link StreamConfig}.
+ */
+public class MockStreamConfig extends StreamConfig {
+	public MockStreamConfig() {
+		super(new Configuration());
+
+		setOperatorID(new OperatorID());
+	}
+}

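Because MockStreamConfig is a real StreamConfig backed by an empty Configuration, setters and getters simply round-trip; a short sketch of the pattern used in the AsyncWaitOperatorTest hunks above, where the read-back line is illustrative and imports match the test files shown in the diffs:

    StreamConfig streamConfig = new MockStreamConfig();
    streamConfig.setTypeSerializerIn1(IntSerializer.INSTANCE);
    // reading the serializer back goes through the regular, un-mocked API
    TypeSerializer<?> serializer =
        streamConfig.getTypeSerializerIn1(MockStreamConfig.class.getClassLoader());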

[12/17] flink git commit: [FLINK-9372] [elasticsearch] Typo on Elasticsearch website link

Posted by tz...@apache.org.
[FLINK-9372] [elasticsearch] Typo on Elasticsearch website link

This closes #6018.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7c6d5af0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7c6d5af0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7c6d5af0

Branch: refs/heads/release-1.5
Commit: 7c6d5af09780cb51f94a778be0e0d4b559b4256e
Parents: 8224a7b
Author: Yadan.JS <y_...@yahoo.com>
Authored: Tue May 15 18:03:22 2018 -0400
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Tue May 22 16:54:11 2018 +0800

----------------------------------------------------------------------
 .../streaming/connectors/elasticsearch/ElasticsearchSink.java      | 2 +-
 .../streaming/connectors/elasticsearch2/ElasticsearchSink.java     | 2 +-
 .../streaming/connectors/elasticsearch5/ElasticsearchSink.java     | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7c6d5af0/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java
index 9dd8209..e8eccd9 100644
--- a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java
+++ b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java
@@ -44,7 +44,7 @@ import java.util.Map;
  * to come online.
  *
  * <p>The {@link Map} passed to the constructor is used to create the {@link Node} or {@link TransportClient}. The config
- * keys can be found in the <a href="https://www.elastic.io">Elasticsearch documentation</a>. An important setting is
+ * keys can be found in the <a href="https://www.elastic.co">Elasticsearch documentation</a>. An important setting is
  * {@code cluster.name}, which should be set to the name of the cluster that the sink should emit to.
  *
  * <p>Internally, the sink will use a {@link BulkProcessor} to send {@link ActionRequest ActionRequests}.

http://git-wip-us.apache.org/repos/asf/flink/blob/7c6d5af0/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
index a17b4d8..ffccacf 100644
--- a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
+++ b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
@@ -38,7 +38,7 @@ import java.util.Map;
  * The sink will fail if no cluster can be connected to using the provided transport addresses passed to the constructor.
  *
  * <p>The {@link Map} passed to the constructor is used to create the {@code TransportClient}. The config keys can be found
- * in the <a href="https://www.elastic.io">Elasticsearch documentation</a>. An important setting is {@code cluster.name},
+ * in the <a href="https://www.elastic.co">Elasticsearch documentation</a>. An important setting is {@code cluster.name},
  * which should be set to the name of the cluster that the sink should emit to.
  *
  * <p>Internally, the sink will use a {@link BulkProcessor} to send {@link ActionRequest ActionRequests}.

http://git-wip-us.apache.org/repos/asf/flink/blob/7c6d5af0/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java
index 3307b2c..6c09337 100644
--- a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java
+++ b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java
@@ -39,7 +39,7 @@ import java.util.Map;
  * The sink will fail if no cluster can be connected to using the provided transport addresses passed to the constructor.
  *
  * <p>The {@link Map} passed to the constructor is used to create the {@code TransportClient}. The config keys can be found
- * in the <a href="https://www.elastic.io">Elasticsearch documentation</a>. An important setting is {@code cluster.name},
+ * in the <a href="https://www.elastic.co">Elasticsearch documentation</a>. An important setting is {@code cluster.name},
  * which should be set to the name of the cluster that the sink should emit to.
  *
  * <p>Internally, the sink will use a {@link BulkProcessor} to send {@link ActionRequest ActionRequests}.


[07/17] flink git commit: [FLINK-8989] [e2eTests] Elasticsearch1&2&5 end to end test

Posted by tz...@apache.org.
[FLINK-8989] [e2eTests] Elasticsearch1&2&5 end to end test


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a7abfcb2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a7abfcb2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a7abfcb2

Branch: refs/heads/release-1.5
Commit: a7abfcb278d2ef35c3b730c5d238cf32c6094674
Parents: 7f9e4c0
Author: zhangminglei <zm...@163.com>
Authored: Tue May 22 09:23:13 2018 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Tue May 22 16:50:26 2018 +0800

----------------------------------------------------------------------
 .../examples/ElasticsearchSinkExample.java      |  85 -----------
 .../examples/ElasticsearchSinkExample.java      |  81 ----------
 .../examples/ElasticsearchSinkExample.java      |  83 -----------
 .../flink-elasticsearch1-test/pom.xml           | 117 +++++++++++++++
 .../tests/Elasticsearch1SinkExample.java        |  93 ++++++++++++
 .../flink-elasticsearch2-test/pom.xml           | 135 +++++++++++++++++
 .../tests/Elasticsearch2SinkExample.java        |  92 ++++++++++++
 .../flink-elasticsearch5-test/pom.xml           | 148 +++++++++++++++++++
 .../tests/Elasticsearch5SinkExample.java        |  92 ++++++++++++
 flink-end-to-end-tests/pom.xml                  |   3 +
 .../test-scripts/elasticsearch-common.sh        |  62 ++++++++
 .../test_streaming_elasticsearch125.sh          | 109 ++++++++++++++
 12 files changed, 851 insertions(+), 249 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a7abfcb2/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchSinkExample.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchSinkExample.java b/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchSinkExample.java
deleted file mode 100644
index 8a0321d..0000000
--- a/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchSinkExample.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.elasticsearch.examples;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink;
-import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
-import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
-
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.client.Requests;
-import org.elasticsearch.common.transport.InetSocketTransportAddress;
-import org.elasticsearch.common.transport.TransportAddress;
-
-import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * This example shows how to use the Elasticsearch Sink. Before running it you must ensure that
- * you have a cluster named "elasticsearch" running or change the cluster name in the config map.
- */
-@SuppressWarnings("serial")
-public class ElasticsearchSinkExample {
-
-	public static void main(String[] args) throws Exception {
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		DataStream<String> source = env.generateSequence(0, 20).map(new MapFunction<Long, String>() {
-			@Override
-			public String map(Long value) throws Exception {
-				return "message #" + value;
-			}
-		});
-
-		Map<String, String> userConfig = new HashMap<>();
-		userConfig.put("cluster.name", "elasticsearch");
-		// This instructs the sink to emit after every element, otherwise they would be buffered
-		userConfig.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
-
-		List<TransportAddress> transports = new ArrayList<>();
-		transports.add(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300));
-
-		source.addSink(new ElasticsearchSink<>(userConfig, transports, new ElasticsearchSinkFunction<String>() {
-			@Override
-			public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
-				indexer.add(createIndexRequest(element));
-			}
-		}));
-
-		env.execute("Elasticsearch Sink Example");
-	}
-
-	private static IndexRequest createIndexRequest(String element) {
-		Map<String, Object> json = new HashMap<>();
-		json.put("data", element);
-
-		return Requests.indexRequest()
-			.index("my-index")
-			.type("my-type")
-			.id(element)
-			.source(json);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7abfcb2/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/examples/ElasticsearchSinkExample.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/examples/ElasticsearchSinkExample.java b/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/examples/ElasticsearchSinkExample.java
deleted file mode 100644
index c963927..0000000
--- a/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/examples/ElasticsearchSinkExample.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.elasticsearch2.examples;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink;
-
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.client.Requests;
-
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * This example shows how to use the Elasticsearch Sink. Before running it you must ensure that
- * you have a cluster named "elasticsearch" running or change the name of cluster in the config map.
- */
-public class ElasticsearchSinkExample {
-
-	public static void main(String[] args) throws Exception {
-
-		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		DataStream<String> source = env.generateSequence(0, 20).map(new MapFunction<Long, String>() {
-			@Override
-			public String map(Long value) throws Exception {
-				return "message #" + value;
-			}
-		});
-
-		Map<String, String> userConfig = new HashMap<>();
-		userConfig.put("cluster.name", "elasticsearch");
-		// This instructs the sink to emit after every element, otherwise they would be buffered
-		userConfig.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
-
-		List<InetSocketAddress> transports = new ArrayList<>();
-		transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
-
-		source.addSink(new ElasticsearchSink<>(userConfig, transports, new org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction<String>(){
-			@Override
-			public void process(String element, RuntimeContext ctx, org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer indexer) {
-				indexer.add(createIndexRequest(element));
-			}
-		}));
-
-		env.execute("Elasticsearch Sink Example");
-	}
-
-	private static IndexRequest createIndexRequest(String element) {
-		Map<String, Object> json = new HashMap<>();
-		json.put("data", element);
-
-		return Requests.indexRequest()
-			.index("my-index")
-			.type("my-type")
-			.id(element)
-			.source(json);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7abfcb2/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/examples/ElasticsearchSinkExample.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/examples/ElasticsearchSinkExample.java b/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/examples/ElasticsearchSinkExample.java
deleted file mode 100644
index 22c1053..0000000
--- a/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/examples/ElasticsearchSinkExample.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.elasticsearch5.examples;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
-import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
-import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink;
-
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.client.Requests;
-
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * This example shows how to use the Elasticsearch Sink. Before running it you must ensure that
- * you have a cluster named "elasticsearch" running or change the name of cluster in the config map.
- */
-public class ElasticsearchSinkExample {
-
-	public static void main(String[] args) throws Exception {
-
-		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		DataStream<String> source = env.generateSequence(0, 20).map(new MapFunction<Long, String>() {
-			@Override
-			public String map(Long value) throws Exception {
-				return "message #" + value;
-			}
-		});
-
-		Map<String, String> userConfig = new HashMap<>();
-		userConfig.put("cluster.name", "elasticsearch");
-		// This instructs the sink to emit after every element, otherwise they would be buffered
-		userConfig.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
-
-		List<InetSocketAddress> transports = new ArrayList<>();
-		transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
-
-		source.addSink(new ElasticsearchSink<>(userConfig, transports, new ElasticsearchSinkFunction<String>() {
-			@Override
-			public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
-				indexer.add(createIndexRequest(element));
-			}
-		}));
-
-		env.execute("Elasticsearch Sink Example");
-	}
-
-	private static IndexRequest createIndexRequest(String element) {
-		Map<String, Object> json = new HashMap<>();
-		json.put("data", element);
-
-		return Requests.indexRequest()
-			.index("my-index")
-			.type("my-type")
-			.id(element)
-			.source(json);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7abfcb2/flink-end-to-end-tests/flink-elasticsearch1-test/pom.xml
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-elasticsearch1-test/pom.xml b/flink-end-to-end-tests/flink-elasticsearch1-test/pom.xml
new file mode 100644
index 0000000..1960f05
--- /dev/null
+++ b/flink-end-to-end-tests/flink-elasticsearch1-test/pom.xml
@@ -0,0 +1,117 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+	<parent>
+		<artifactId>flink-end-to-end-tests</artifactId>
+		<groupId>org.apache.flink</groupId>
+		<version>1.5-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<artifactId>flink-elasticsearch1-test_${scala.binary.version}</artifactId>
+	<name>flink-elasticsearch1-test</name>
+	<packaging>jar</packaging>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-elasticsearch_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<version>3.0.0</version>
+				<executions>
+					<!-- Elasticsearch1Sink end to end example -->
+					<execution>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<minimizeJar>true</minimizeJar>
+							<artifactSet>
+								<excludes>
+									<exclude>com.google.code.findbugs:jsr305</exclude>
+									<exclude>org.slf4j:*</exclude>
+									<exclude>log4j:*</exclude>
+								</excludes>
+							</artifactSet>
+							<transformers>
+								<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+									<mainClass>org.apache.flink.streaming.tests.Elasticsearch1SinkExample</mainClass>
+								</transformer>
+							</transformers>
+							<filters>
+								<filter>
+									<artifact>*:*</artifact>
+									<excludes>
+										<exclude>META-INF/*.SF</exclude>
+										<exclude>META-INF/*.DSA</exclude>
+										<exclude>META-INF/*.RSA</exclude>
+									</excludes>
+								</filter>
+							</filters>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+
+			<!-- Simplify the name of the testing JAR so it can be referenced from the test_streaming_elasticsearch125.sh test script -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-antrun-plugin</artifactId>
+				<version>1.7</version>
+				<executions>
+					<execution>
+						<id>rename</id>
+						<phase>package</phase>
+						<goals>
+							<goal>run</goal>
+						</goals>
+						<configuration>
+							<target>
+								<copy file="${project.basedir}/target/flink-elasticsearch1-test_${scala.binary.version}-${project.version}.jar" tofile="${project.basedir}/target/Elasticsearch1SinkExample.jar" />
+							</target>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/a7abfcb2/flink-end-to-end-tests/flink-elasticsearch1-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch1SinkExample.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-elasticsearch1-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch1SinkExample.java b/flink-end-to-end-tests/flink-elasticsearch1-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch1SinkExample.java
new file mode 100644
index 0000000..bfdb806
--- /dev/null
+++ b/flink-end-to-end-tests/flink-elasticsearch1-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch1SinkExample.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Requests;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.common.transport.TransportAddress;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * End-to-end test job for the Elasticsearch 1.x sink.
+ */
+public class Elasticsearch1SinkExample {
+	public static void main(String[] args) throws Exception {
+
+		final ParameterTool parameterTool = ParameterTool.fromArgs(args);
+
+		if (parameterTool.getNumberOfParameters() < 2) {
+			System.out.println("Missing parameters!\n" +
+				"Usage: --index <index> --type <type>");
+			return;
+		}
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.getConfig().disableSysoutLogging();
+		env.enableCheckpointing(5000);
+
+		DataStream<String> source = env.generateSequence(0, 20).map(new MapFunction<Long, String>() {
+			@Override
+			public String map(Long value) throws Exception {
+				return "message # " + value;
+			}
+		});
+
+		Map<String, String> userConfig = new HashMap<>();
+		userConfig.put("cluster.name", "elasticsearch");
+		// This instructs the sink to emit after every element, otherwise they would be buffered
+		userConfig.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+
+		List<TransportAddress> transports = new ArrayList<>();
+		transports.add(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300));
+
+		source.addSink(new ElasticsearchSink<>(userConfig, transports, new ElasticsearchSinkFunction<String>() {
+			@Override
+			public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
+				indexer.add(createIndexRequest(element, parameterTool));
+			}
+		}));
+
+		env.execute("Elasticsearch1.x end to end sink test example");
+	}
+
+	private static IndexRequest createIndexRequest(String element, ParameterTool parameterTool) {
+		Map<String, Object> json = new HashMap<>();
+		json.put("data", element);
+
+		return Requests.indexRequest()
+			.index(parameterTool.getRequired("index"))
+			.type(parameterTool.getRequired("type"))
+			.id(element)
+			.source(json);
+	}
+}

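For completeness, a hedged sketch of invoking this job directly; the index and type values are placeholders, and the actual arguments used by test_streaming_elasticsearch125.sh are not part of this diff:

    // requires a local Elasticsearch 1.x cluster named "elasticsearch" on port 9300,
    // as hard-coded in the job above
    Elasticsearch1SinkExample.main(new String[] {"--index", "my-index", "--type", "my-type"});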
http://git-wip-us.apache.org/repos/asf/flink/blob/a7abfcb2/flink-end-to-end-tests/flink-elasticsearch2-test/pom.xml
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-elasticsearch2-test/pom.xml b/flink-end-to-end-tests/flink-elasticsearch2-test/pom.xml
new file mode 100644
index 0000000..4fd93de
--- /dev/null
+++ b/flink-end-to-end-tests/flink-elasticsearch2-test/pom.xml
@@ -0,0 +1,135 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<parent>
+		<artifactId>flink-end-to-end-tests</artifactId>
+		<groupId>org.apache.flink</groupId>
+		<version>1.5-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<artifactId>flink-elasticsearch2-test_${scala.binary.version}</artifactId>
+	<name>flink-elasticsearch2-test</name>
+	<packaging>jar</packaging>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-elasticsearch2_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-elasticsearch-base_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<!-- Remove elasticsearch1.7.1 -->
+			<exclusions>
+				<exclusion>
+					<groupId>org.elasticsearch</groupId>
+					<artifactId>elasticsearch</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+
+		<dependency>
+			<groupId>org.elasticsearch</groupId>
+			<artifactId>elasticsearch</artifactId>
+			<version>2.3.5</version>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<version>3.0.0</version>
+				<executions>
+					<!-- Elasticsearch2Sink end to end example -->
+					<execution>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<minimizeJar>true</minimizeJar>
+							<artifactSet>
+								<excludes>
+									<exclude>com.google.code.findbugs:jsr305</exclude>
+									<exclude>org.slf4j:*</exclude>
+									<exclude>log4j:*</exclude>
+								</excludes>
+							</artifactSet>
+							<transformers>
+								<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+									<mainClass>org.apache.flink.streaming.tests.Elasticsearch2SinkExample</mainClass>
+								</transformer>
+							</transformers>
+							<filters>
+								<filter>
+									<artifact>*:*</artifact>
+									<excludes>
+										<exclude>META-INF/*.SF</exclude>
+										<exclude>META-INF/*.DSA</exclude>
+										<exclude>META-INF/*.RSA</exclude>
+									</excludes>
+								</filter>
+							</filters>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+
+			<!--simplify the name of the testing JAR for referring to it in the test_streaming_elasticsearch125.sh script-->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-antrun-plugin</artifactId>
+				<version>1.7</version>
+				<executions>
+					<execution>
+						<id>rename</id>
+						<phase>package</phase>
+						<goals>
+							<goal>run</goal>
+						</goals>
+						<configuration>
+							<target>
+								<copy file="${project.basedir}/target/flink-elasticsearch2-test_${scala.binary.version}-${project.version}.jar" tofile="${project.basedir}/target/Elasticsearch2SinkExample.jar" />
+							</target>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/a7abfcb2/flink-end-to-end-tests/flink-elasticsearch2-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch2SinkExample.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-elasticsearch2-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch2SinkExample.java b/flink-end-to-end-tests/flink-elasticsearch2-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch2SinkExample.java
new file mode 100644
index 0000000..4ec03aa
--- /dev/null
+++ b/flink-end-to-end-tests/flink-elasticsearch2-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch2SinkExample.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink;
+
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Requests;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * End to end test for Elasticsearch2Sink.
+ */
+public class Elasticsearch2SinkExample {
+
+	public static void main(String[] args) throws Exception {
+
+		final ParameterTool parameterTool = ParameterTool.fromArgs(args);
+
+		if (parameterTool.getNumberOfParameters() < 2) {
+			System.out.println("Missing parameters!\n" +
+				"Usage: --index <index> --type <type>");
+			return;
+		}
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.getConfig().disableSysoutLogging();
+		env.enableCheckpointing(5000);
+
+		DataStream<String> source = env.generateSequence(0, 20).map(new MapFunction<Long, String>() {
+			@Override
+			public String map(Long value) throws Exception {
+				return "message #" + value;
+			}
+		});
+
+		Map<String, String> userConfig = new HashMap<>();
+		userConfig.put("cluster.name", "elasticsearch");
+		// This instructs the sink to emit after every element, otherwise they would be buffered
+		userConfig.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+
+		List<InetSocketAddress> transports = new ArrayList<>();
+		transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
+
+		source.addSink(new ElasticsearchSink<>(userConfig, transports, new ElasticsearchSinkFunction<String>() {
+			@Override
+			public void process(String element, RuntimeContext ctx, org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer indexer) {
+				indexer.add(createIndexRequest(element, parameterTool));
+			}
+		}));
+
+		env.execute("Elasticsearch2.x end to end sink test example");
+	}
+
+	private static IndexRequest createIndexRequest(String element, ParameterTool parameterTool) {
+		Map<String, Object> json = new HashMap<>();
+		json.put("data", element);
+
+		return Requests.indexRequest()
+			.index(parameterTool.getRequired("index"))
+			.type(parameterTool.getRequired("type"))
+			.id(element)
+			.source(json);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7abfcb2/flink-end-to-end-tests/flink-elasticsearch5-test/pom.xml
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-elasticsearch5-test/pom.xml b/flink-end-to-end-tests/flink-elasticsearch5-test/pom.xml
new file mode 100644
index 0000000..3a1e734
--- /dev/null
+++ b/flink-end-to-end-tests/flink-elasticsearch5-test/pom.xml
@@ -0,0 +1,148 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<parent>
+		<artifactId>flink-end-to-end-tests</artifactId>
+		<groupId>org.apache.flink</groupId>
+		<version>1.5-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+	<modelVersion>4.0.0</modelVersion>
+
+	<artifactId>flink-elasticsearch5-test_${scala.binary.version}</artifactId>
+	<name>flink-elasticsearch5-test</name>
+	<packaging>jar</packaging>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-elasticsearch5_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-elasticsearch-base_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<exclusions>
+				<!-- Remove elasticsearch1.7.1 -->
+				<exclusion>
+					<groupId>org.elasticsearch</groupId>
+					<artifactId>elasticsearch</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+
+		<!-- Dependency for Elasticsearch 5.x Java Client -->
+		<dependency>
+			<groupId>org.elasticsearch.client</groupId>
+			<artifactId>transport</artifactId>
+			<version>5.1.2</version>
+		</dependency>
+
+		<!--
+			Elasticsearch 5.x uses Log4j2 and no longer detects logging implementations, making
+			Log4j2 a strict dependency. The following is added so that the Log4j2 API in
+			Elasticsearch 5.x is routed to SLF4J. This way, user projects can remain flexible
+			in the logging implementation preferred.
+		-->
+
+		<dependency>
+			<groupId>org.apache.logging.log4j</groupId>
+			<artifactId>log4j-to-slf4j</artifactId>
+			<version>2.7</version>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<version>3.0.0</version>
+				<executions>
+					<!-- Elasticsearch5Sink end to end example -->
+					<execution>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<minimizeJar>true</minimizeJar>
+							<artifactSet>
+								<excludes>
+									<exclude>com.google.code.findbugs:jsr305</exclude>
+									<exclude>org.slf4j:*</exclude>
+									<exclude>log4j:*</exclude>
+								</excludes>
+							</artifactSet>
+							<transformers>
+								<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+									<mainClass>org.apache.flink.streaming.tests.Elasticsearch5SinkExample</mainClass>
+								</transformer>
+							</transformers>
+							<filters>
+								<filter>
+									<artifact>*:*</artifact>
+									<excludes>
+										<exclude>META-INF/*.SF</exclude>
+										<exclude>META-INF/*.DSA</exclude>
+										<exclude>META-INF/*.RSA</exclude>
+									</excludes>
+								</filter>
+							</filters>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+
+			<!--simplify the name of the testing JAR for referring to it in the test_streaming_elasticsearch125.sh script-->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-antrun-plugin</artifactId>
+				<version>1.7</version>
+				<executions>
+					<execution>
+						<id>rename</id>
+						<phase>package</phase>
+						<goals>
+							<goal>run</goal>
+						</goals>
+						<configuration>
+							<target>
+								<copy file="${project.basedir}/target/flink-elasticsearch5-test_${scala.binary.version}-${project.version}.jar" tofile="${project.basedir}/target/Elasticsearch5SinkExample.jar" />
+							</target>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/a7abfcb2/flink-end-to-end-tests/flink-elasticsearch5-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch5SinkExample.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-elasticsearch5-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch5SinkExample.java b/flink-end-to-end-tests/flink-elasticsearch5-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch5SinkExample.java
new file mode 100644
index 0000000..285f902
--- /dev/null
+++ b/flink-end-to-end-tests/flink-elasticsearch5-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch5SinkExample.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink;
+
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Requests;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * End to end test for Elasticsearch5Sink.
+ */
+public class Elasticsearch5SinkExample {
+	public static void main(String[] args) throws Exception {
+
+		final ParameterTool parameterTool = ParameterTool.fromArgs(args);
+
+		if (parameterTool.getNumberOfParameters() < 2) {
+			System.out.println("Missing parameters!\n" +
+				"Usage: --index <index> --type <type>");
+			return;
+		}
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.getConfig().disableSysoutLogging();
+		env.enableCheckpointing(5000);
+
+		DataStream<String> source = env.generateSequence(0, 20).map(new MapFunction<Long, String>() {
+			@Override
+			public String map(Long value) throws Exception {
+				return "message #" + value;
+			}
+		});
+
+		Map<String, String> userConfig = new HashMap<>();
+		userConfig.put("cluster.name", "elasticsearch");
+		// This instructs the sink to emit after every element, otherwise they would be buffered
+		userConfig.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+
+		List<InetSocketAddress> transports = new ArrayList<>();
+		transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
+
+		source.addSink(new ElasticsearchSink<>(userConfig, transports, new ElasticsearchSinkFunction<String>() {
+			@Override
+			public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
+				indexer.add(createIndexRequest(element, parameterTool));
+			}
+		}));
+
+		env.execute("Elasticsearch5.x end to end sink test example");
+	}
+
+	private static IndexRequest createIndexRequest(String element, ParameterTool parameterTool) {
+		Map<String, Object> json = new HashMap<>();
+		json.put("data", element);
+
+		return Requests.indexRequest()
+			.index(parameterTool.getRequired("index"))
+			.type(parameterTool.getRequired("type"))
+			.id(element)
+			.source(json);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7abfcb2/flink-end-to-end-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml
index 45b63f0..04b8532 100644
--- a/flink-end-to-end-tests/pom.xml
+++ b/flink-end-to-end-tests/pom.xml
@@ -43,6 +43,9 @@ under the License.
 		<module>flink-high-parallelism-iterations-test</module>
 		<module>flink-stream-stateful-job-upgrade-test</module>
 		<module>flink-local-recovery-and-allocation-test</module>
+		<module>flink-elasticsearch1-test</module>
+		<module>flink-elasticsearch2-test</module>
+		<module>flink-elasticsearch5-test</module>
 	</modules>
 
 	<build>

http://git-wip-us.apache.org/repos/asf/flink/blob/a7abfcb2/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh b/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh
new file mode 100644
index 0000000..3fda344
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh
@@ -0,0 +1,62 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+set -o pipefail
+
+if [[ -z $TEST_DATA_DIR ]]; then
+  echo "Must run common.sh before kafka-common.sh."
+  exit 1
+fi
+
+function verify_elasticsearch_process_exist {
+    ELASTICSEARCH_PROCESS=$(jps | grep Elasticsearch | awk '{print $2}')
+
+    # make sure the elasticsearch node is actually running
+    if [ "$ELASTICSEARCH_PROCESS" != "Elasticsearch" ]; then
+      echo "Elasticsearch node is not running."
+      PASS=""
+      exit 1
+    else
+      echo "Elasticsearch node is running."
+    fi
+}
+
+function verify_result {
+    if [ -f "$TEST_DATA_DIR/output" ]; then
+        rm $TEST_DATA_DIR/output
+    fi
+
+    curl 'localhost:9200/index/_search?q=*&pretty&size=21' > $TEST_DATA_DIR/output
+
+    if [ -n "$(grep '\"total\" : 21' $TEST_DATA_DIR/output)" ]; then
+        echo "Elasticsearch end to end test pass."
+    else
+        echo "Elasticsearch end to end test failed."
+        PASS=""
+        exit 1
+    fi
+}
+
+function shutdown_elasticsearch_cluster {
+   pid=$(jps | grep Elasticsearch | awk '{print $1}')
+   kill -SIGTERM $pid
+
+   # make sure to run regular cleanup as well
+   cleanup
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7abfcb2/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch125.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch125.sh b/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch125.sh
new file mode 100755
index 0000000..dea3f13
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch125.sh
@@ -0,0 +1,109 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+source "$(dirname "$0")"/common.sh
+source "$(dirname "$0")"/elasticsearch-common.sh
+
+mkdir -p $TEST_DATA_DIR
+
+ELASTICSEARCH1_URL="https://download.elastic.co/elasticsearch/elasticsearch/elasticsearch-1.7.1.tar.gz"
+ELASTICSEARCH2_URL="https://download.elastic.co/elasticsearch/release/org/elasticsearch/distribution/tar/elasticsearch/2.3.5/elasticsearch-2.3.5.tar.gz"
+ELASTICSEARCH5_URL="https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.1.2.tar.gz"
+
+# start downloading elasticsearch1
+echo "Downloading Elasticsearch1 from $ELASTICSEARCH1_URL"
+curl "$ELASTICSEARCH1_URL" > $TEST_DATA_DIR/elasticsearch1.tar.gz
+
+tar xzf $TEST_DATA_DIR/elasticsearch1.tar.gz -C $TEST_DATA_DIR/
+ELASTICSEARCH1_DIR=$TEST_DATA_DIR/elasticsearch-1.7.1
+
+# start elasticsearch1 cluster
+$ELASTICSEARCH1_DIR/bin/elasticsearch -daemon
+
+verify_elasticsearch_process_exist
+
+start_cluster
+
+TEST_ES1_JAR=$TEST_DATA_DIR/../../flink-elasticsearch1-test/target/Elasticsearch1SinkExample.jar
+
+# run the Flink job
+$FLINK_DIR/bin/flink run -p 1 $TEST_ES1_JAR \
+  --index index \
+  --type type
+
+verify_result
+
+shutdown_elasticsearch_cluster
+
+mkdir -p $TEST_DATA_DIR
+
+# start downloading elasticsearch2
+echo "Downloading Elasticsearch2 from $ELASTICSEARCH2_URL"
+curl "$ELASTICSEARCH2_URL" > $TEST_DATA_DIR/elasticsearch2.tar.gz
+
+tar xzf $TEST_DATA_DIR/elasticsearch2.tar.gz -C $TEST_DATA_DIR/
+ELASTICSEARCH2_DIR=$TEST_DATA_DIR/elasticsearch-2.3.5
+
+# start the elasticsearch2 cluster; unlike elasticsearch1, using -daemon here would hang the shell.
+nohup $ELASTICSEARCH2_DIR/bin/elasticsearch &
+
+verify_elasticsearch_process_exist
+
+start_cluster
+
+TEST_ES2_JAR=$TEST_DATA_DIR/../../flink-elasticsearch2-test/target/Elasticsearch2SinkExample.jar
+
+# run the Flink job
+$FLINK_DIR/bin/flink run -p 1 $TEST_ES2_JAR \
+  --index index \
+  --type type
+
+verify_result
+
+shutdown_elasticsearch_cluster
+
+mkdir -p $TEST_DATA_DIR
+
+# start downloading elasticsearch5
+echo "Downloading Elasticsearch5 from $ELASTICSEARCH5_URL"
+curl "$ELASTICSEARCH5_URL" > $TEST_DATA_DIR/elasticsearch5.tar.gz
+
+tar xzf $TEST_DATA_DIR/elasticsearch5.tar.gz -C $TEST_DATA_DIR/
+ELASTICSEARCH5_DIR=$TEST_DATA_DIR/elasticsearch-5.1.2
+
+# start the elasticsearch5 cluster; unlike elasticsearch1, using -daemon here would hang the shell.
+nohup $ELASTICSEARCH5_DIR/bin/elasticsearch &
+
+verify_elasticsearch_process_exist
+
+start_cluster
+
+TEST_ES5_JAR=$TEST_DATA_DIR/../../flink-elasticsearch5-test/target/Elasticsearch5SinkExample.jar
+
+# run the Flink job
+$FLINK_DIR/bin/flink run -p 1 $TEST_ES5_JAR \
+  --index index \
+  --type type
+
+verify_result
+
+rm -rf $FLINK_DIR/log/* 2> /dev/null
+
+trap shutdown_elasticsearch_cluster INT
+trap shutdown_elasticsearch_cluster EXIT


[03/17] flink git commit: [FLINK-9322] [e2e] Add failure simulation to the general purpose DataStream job

Posted by tz...@apache.org.
[FLINK-9322] [e2e] Add failure simulation to the general purpose DataStream job


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/aec4496e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/aec4496e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/aec4496e

Branch: refs/heads/release-1.5
Commit: aec4496ebe1679f74b76e9eaa2f31d00d8c82447
Parents: ef6e40f
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Fri May 11 11:51:12 2018 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Tue May 22 16:49:11 2018 +0800

----------------------------------------------------------------------
 .../tests/DataStreamAllroundTestJobFactory.java | 52 +++++++++++++
 .../tests/DataStreamAllroundTestProgram.java    | 10 +++
 .../tests/ExceptionThrowingFailureMapper.java   | 79 ++++++++++++++++++++
 3 files changed, 141 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/aec4496e/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
index c2e4cf5..05bbc77 100644
--- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
@@ -54,6 +54,13 @@ import java.util.List;
  * <p>Program parameters:
  * <ul>
  *     <li>test.semantics (String, default - 'exactly-once'): This configures the semantics to test. Can be 'exactly-once' or 'at-least-once'.</li>
+ *     <li>test.simulate_failure (boolean, default - false): This configures whether or not to simulate failures by throwing exceptions within the job.</li>
+ *     <li>test.simulate_failure.num_records (long, default - 100L): The number of records to process before throwing an exception, per job execution attempt.
+ *         Only relevant if configured to simulate failures.</li>
+ *     <li>test.simulate_failure.num_checkpoints (long, default - 1L): The number of complete checkpoints before throwing an exception, per job execution attempt.
+ *         Only relevant if configured to simulate failures.</li>
+ *     <li>test.simulate_failure.max_failures (int, default - 1): The maximum number of times to fail the job. This also takes into account failures that
+ *         were not triggered by the job's own failure simulation, e.g. TaskManager or JobManager failures. Only relevant if configured to simulate failures.</li>
  *     <li>environment.checkpoint_interval (long, default - 1000): the checkpoint interval.</li>
  *     <li>environment.externalize_checkpoint (boolean, default - false): whether or not checkpoints should be externalized.</li>
  *     <li>environment.externalize_checkpoint.cleanup (String, default - 'retain'): Configures the cleanup mode for externalized checkpoints. Can be 'retain' or 'delete'.</li>
@@ -78,6 +85,33 @@ class DataStreamAllroundTestJobFactory {
 		.defaultValue("exactly-once")
 		.withDescription("This configures the semantics to test. Can be 'exactly-once' or 'at-least-once'");
 
+	private static final ConfigOption<Boolean> TEST_SIMULATE_FAILURE = ConfigOptions
+		.key("test.simulate_failure")
+		.defaultValue(false)
+		.withDescription("This configures whether or not to simulate failures by throwing exceptions within the job.");
+
+	private static final ConfigOption<Long> TEST_SIMULATE_FAILURE_NUM_RECORDS = ConfigOptions
+		.key("test.simulate_failure.num_records")
+		.defaultValue(100L)
+		.withDescription(
+			"The number of records to process before throwing an exception, per job execution attempt." +
+				" Only relevant if configured to simulate failures.");
+
+	private static final ConfigOption<Long> TEST_SIMULATE_FAILURE_NUM_CHECKPOINTS = ConfigOptions
+		.key("test.simulate_failure.num_checkpoints")
+		.defaultValue(1L)
+		.withDescription(
+			"The number of complete checkpoints before throwing an exception, per job execution attempt." +
+				" Only relevant if configured to simulate failures.");
+
+	private static final ConfigOption<Integer> TEST_SIMULATE_FAILURE_MAX_FAILURES = ConfigOptions
+		.key("test.simulate_failure.max_failures")
+		.defaultValue(1)
+		.withDescription(
+			"The maximum number of times to fail the job. This also takes into account failures that were not triggered" +
+				" by the job's own failure simulation, e.g. TaskManager or JobManager failures." +
+				" Only relevant if configured to simulate failures.");
+
 	private static final ConfigOption<Long> ENVIRONMENT_CHECKPOINT_INTERVAL = ConfigOptions
 		.key("environment.checkpoint_interval")
 		.defaultValue(1000L);
@@ -273,6 +307,24 @@ class DataStreamAllroundTestJobFactory {
 		return new SemanticsCheckMapper(validatorFunction);
 	}
 
+	static boolean isSimulateFailures(ParameterTool pt) {
+		return pt.getBoolean(TEST_SIMULATE_FAILURE.key(), TEST_SIMULATE_FAILURE.defaultValue());
+	}
+
+	static MapFunction<Event, Event> createExceptionThrowingFailureMapper(ParameterTool pt) {
+		return new ExceptionThrowingFailureMapper<>(
+			pt.getLong(
+				TEST_SIMULATE_FAILURE_NUM_RECORDS.key(),
+				TEST_SIMULATE_FAILURE_NUM_RECORDS.defaultValue()),
+			pt.getLong(
+				TEST_SIMULATE_FAILURE_NUM_CHECKPOINTS.key(),
+				TEST_SIMULATE_FAILURE_NUM_CHECKPOINTS.defaultValue()),
+			pt.getInt(
+				TEST_SIMULATE_FAILURE_MAX_FAILURES.key(),
+				TEST_SIMULATE_FAILURE_MAX_FAILURES.defaultValue()));
+
+	}
+
 	static <IN, OUT, STATE> ArtificialKeyedStateMapper<IN, OUT> createArtificialKeyedStateMapper(
 		MapFunction<IN, OUT> mapFunction,
 		JoinFunction<IN, STATE, STATE> inputAndOldStateToNewState,

http://git-wip-us.apache.org/repos/asf/flink/blob/aec4496e/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
index 5ae1d16..afbc01a 100644
--- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
@@ -32,8 +32,10 @@ import java.util.Collections;
 import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createArtificialKeyedStateMapper;
 import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createArtificialOperatorStateMapper;
 import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createEventSource;
+import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createExceptionThrowingFailureMapper;
 import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createSemanticsCheckMapper;
 import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createTimestampExtractor;
+import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.isSimulateFailures;
 import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.setupEnvironment;
 
 /**
@@ -56,6 +58,7 @@ public class DataStreamAllroundTestProgram {
 	private static final String KEYED_STATE_OPER_NAME = "ArtificalKeyedStateMapper";
 	private static final String OPERATOR_STATE_OPER_NAME = "ArtificalOperatorStateMapper";
 	private static final String SEMANTICS_CHECK_MAPPER_NAME = "SemanticsCheckMapper";
+	private static final String FAILURE_MAPPER_NAME = "ExceptionThrowingFailureMapper";
 
 	public static void main(String[] args) throws Exception {
 		final ParameterTool pt = ParameterTool.fromArgs(args);
@@ -89,6 +92,13 @@ public class DataStreamAllroundTestProgram {
 			.name(OPERATOR_STATE_OPER_NAME)
 			.returns(Event.class);
 
+		if (isSimulateFailures(pt)) {
+			eventStream2 = eventStream2
+				.map(createExceptionThrowingFailureMapper(pt))
+				.setParallelism(1)
+				.name(FAILURE_MAPPER_NAME);
+		}
+
 		eventStream2.keyBy(Event::getKey)
 			.flatMap(createSemanticsCheckMapper(pt))
 			.name(SEMANTICS_CHECK_MAPPER_NAME)
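
As a hedged sketch of how the failure simulation is switched on (the jar path is
hypothetical; the option keys are the ConfigOptions defined in the factory above):

    $FLINK_DIR/bin/flink run DataStreamAllroundTestProgram.jar \
      --test.simulate_failure true \
      --test.simulate_failure.num_records 200 \
      --test.simulate_failure.num_checkpoints 1 \
      --test.simulate_failure.max_failures 2

With these values the mapper throws its artificial exception only after at least 200
records have been processed and one checkpoint has completed, and it stops failing
once the execution attempt number reaches 2.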

http://git-wip-us.apache.org/repos/asf/flink/blob/aec4496e/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/ExceptionThrowingFailureMapper.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/ExceptionThrowingFailureMapper.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/ExceptionThrowingFailureMapper.java
new file mode 100644
index 0000000..d758ef5
--- /dev/null
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/ExceptionThrowingFailureMapper.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.runtime.state.CheckpointListener;
+
+/**
+ * This mapper simulates failures by throwing exceptions. The timing of the
+ * exception is configured via the number of records to process and the number
+ * of complete checkpoints to be acknowledged before the exception is thrown.
+ *
+ * <p>The total number of times a failure is simulated across multiple execution
+ * attempts of the operator can also be configured. Note that this also takes into
+ * account failures that were not triggered by this mapper, e.g. TaskManager failures.
+ */
+public class ExceptionThrowingFailureMapper<T> extends RichMapFunction<T, T> implements CheckpointListener {
+
+	private static final long serialVersionUID = -5286927943454740016L;
+
+	private final long numProcessedRecordsFailureThreshold;
+	private final long numCompleteCheckpointsFailureThreshold;
+	private final int maxNumFailures;
+
+	private long numProcessedRecords;
+	private long numCompleteCheckpoints;
+
+	public ExceptionThrowingFailureMapper(
+			long numProcessedRecordsFailureThreshold,
+			long numCompleteCheckpointsFailureThreshold,
+			int maxNumFailures) {
+
+		this.numProcessedRecordsFailureThreshold = numProcessedRecordsFailureThreshold;
+		this.numCompleteCheckpointsFailureThreshold = numCompleteCheckpointsFailureThreshold;
+		this.maxNumFailures = maxNumFailures;
+	}
+
+	@Override
+	public T map(T value) throws Exception {
+		numProcessedRecords++;
+
+		if (isReachedFailureThreshold()) {
+			throw new Exception("Artificial failure.");
+		}
+
+		return value;
+	}
+
+	@Override
+	public void notifyCheckpointComplete(long checkpointId) throws Exception {
+		numCompleteCheckpoints++;
+
+		if (isReachedFailureThreshold()) {
+			throw new Exception("Artificial failure.");
+		}
+	}
+
+	private boolean isReachedFailureThreshold() {
+		return numProcessedRecords >= numProcessedRecordsFailureThreshold
+			&& numCompleteCheckpoints >= numCompleteCheckpointsFailureThreshold
+			&& getRuntimeContext().getAttemptNumber() < maxNumFailures;
+	}
+}
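
A minimal sketch of wiring the mapper into a job (the restart strategy, source, and
threshold values are illustrative assumptions, not part of this commit). Because
isReachedFailureThreshold() compares getAttemptNumber() against maxNumFailures, a
bounded restart strategy lets the job make progress again after the simulated failure:

	import org.apache.flink.api.common.restartstrategy.RestartStrategies;
	import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

	public class FailureMapperSketch {
		public static void main(String[] args) throws Exception {
			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
			env.enableCheckpointing(1000L);
			// the mapper itself stops throwing once getAttemptNumber() reaches
			// maxNumFailures (1 here); allow a few restarts around that
			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 1000L));

			env.generateSequence(0, Long.MAX_VALUE)
				// throw after >= 100 records and >= 1 complete checkpoint, at most once
				.map(new ExceptionThrowingFailureMapper<>(100L, 1L, 1))
				.print();

			env.execute("failure simulation sketch");
		}
	}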


[16/17] flink git commit: [FLINK-9316][streaming] Expose operator's unique ID in DataStream programs

Posted by tz...@apache.org.
[FLINK-9316][streaming] Expose operator's unique ID in DataStream programs

This allows operators to be identified uniquely and stably across multiple job
submissions. Previously, two different operators that were executed by tasks with
the same name were indistinguishable.
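
A minimal sketch of reading this ID from a user function (grounded in the
StreamingRuntimeContext change below; the class name is illustrative):

	import org.apache.flink.api.common.functions.RichMapFunction;
	import org.apache.flink.configuration.Configuration;
	import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;

	public class OperatorIdAwareMapper extends RichMapFunction<String, String> {
		private transient String operatorUniqueID;

		@Override
		public void open(Configuration parameters) throws Exception {
			// only available in DataStream programs, where the runtime
			// context is a StreamingRuntimeContext
			operatorUniqueID =
				((StreamingRuntimeContext) getRuntimeContext()).getOperatorUniqueID();
		}

		@Override
		public String map(String value) {
			return operatorUniqueID + ": " + value;
		}
	}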


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f04dfb59
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f04dfb59
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f04dfb59

Branch: refs/heads/release-1.5
Commit: f04dfb59036aa9096b07a7e859bf70949aa12e37
Parents: 9ef10e6
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Tue May 8 17:46:29 2018 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Tue May 22 16:54:34 2018 +0800

----------------------------------------------------------------------
 .../kafka/FlinkKafkaConsumerBaseTest.java       |  6 ++
 .../kinesis/testutils/TestRuntimeContext.java   |  6 ++
 flink-contrib/flink-storm/pom.xml               |  8 +++
 .../flink/storm/wrappers/BoltWrapperTest.java   | 18 ++---
 .../api/operators/StreamingRuntimeContext.java  | 15 ++++
 .../source/InputFormatSourceFunctionTest.java   |  6 ++
 .../api/operators/GetOperatorUniqueIDTest.java  | 75 ++++++++++++++++++++
 .../operators/StreamingRuntimeContextTest.java  |  4 ++
 8 files changed, 129 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f04dfb59/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index 4605015..c9b5241 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
@@ -873,6 +874,11 @@ public class FlinkKafkaConsumerBaseTest {
 			public ExecutionConfig getExecutionConfig() {
 				return new ExecutionConfig();
 			}
+
+			@Override
+			public OperatorID getOperatorID() {
+				return new OperatorID();
+			}
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f04dfb59/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestRuntimeContext.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestRuntimeContext.java
index 740d2f2..9a3ad72 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestRuntimeContext.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestRuntimeContext.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.kinesis.testutils;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
@@ -83,5 +84,10 @@ public class TestRuntimeContext extends StreamingRuntimeContext {
 		public ExecutionConfig getExecutionConfig() {
 			return new ExecutionConfig();
 		}
+
+		@Override
+		public OperatorID getOperatorID() {
+			return new OperatorID(42, 44);
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f04dfb59/flink-contrib/flink-storm/pom.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/pom.xml b/flink-contrib/flink-storm/pom.xml
index 7abe054..8f25de4 100644
--- a/flink-contrib/flink-storm/pom.xml
+++ b/flink-contrib/flink-storm/pom.xml
@@ -180,6 +180,14 @@ under the License.
 			<scope>test</scope>
 		</dependency>
 
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
+
 	</dependencies>
 
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/f04dfb59/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
index 430e4d8..d405a45 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
@@ -32,12 +32,12 @@ import org.apache.flink.storm.util.AbstractTest;
 import org.apache.flink.storm.util.SplitStreamType;
 import org.apache.flink.storm.util.StormConfig;
 import org.apache.flink.storm.util.TestDummyBolt;
-import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.util.MockStreamConfig;
 
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
@@ -159,7 +159,7 @@ public class BoltWrapperTest extends AbstractTest {
 		PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
 
 		final BoltWrapper wrapper = new BoltWrapper(bolt, (Fields) null);
-		wrapper.setup(createMockStreamTask(), new StreamConfig(new Configuration()), mock(Output.class));
+		wrapper.setup(createMockStreamTask(), new MockStreamConfig(), mock(Output.class));
 		wrapper.open();
 
 		wrapper.processElement(record);
@@ -195,7 +195,7 @@ public class BoltWrapperTest extends AbstractTest {
 		}
 
 		final BoltWrapper wrapper = new BoltWrapper(bolt, null, raw);
-		wrapper.setup(createMockStreamTask(), new StreamConfig(new Configuration()), output);
+		wrapper.setup(createMockStreamTask(), new MockStreamConfig(), output);
 		wrapper.open();
 
 		final SplitStreamType splitRecord = new SplitStreamType<Integer>();
@@ -248,7 +248,7 @@ public class BoltWrapperTest extends AbstractTest {
 
 			final IRichBolt bolt = mock(IRichBolt.class);
 			BoltWrapper<Object, Object> wrapper = new BoltWrapper<Object, Object>(bolt);
-			wrapper.setup(createMockStreamTask(execConfig), new StreamConfig(new Configuration()), mock(Output.class));
+			wrapper.setup(createMockStreamTask(execConfig), new MockStreamConfig(), mock(Output.class));
 
 			wrapper.open();
 			verify(bolt).prepare(any(Map.class), any(TopologyContext.class), any(OutputCollector.class));
@@ -261,7 +261,7 @@ public class BoltWrapperTest extends AbstractTest {
 
 			final IRichBolt bolt = mock(IRichBolt.class);
 			BoltWrapper<Object, Object> wrapper = new BoltWrapper<Object, Object>(bolt);
-			wrapper.setup(createMockStreamTask(execConfig), new StreamConfig(new Configuration()), mock(Output.class));
+			wrapper.setup(createMockStreamTask(execConfig), new MockStreamConfig(), mock(Output.class));
 
 			wrapper.open();
 			verify(bolt).prepare(same(stormConfig), any(TopologyContext.class), any(OutputCollector.class));
@@ -278,7 +278,7 @@ public class BoltWrapperTest extends AbstractTest {
 
 			TestDummyBolt testBolt = new TestDummyBolt();
 			BoltWrapper<Object, Object> wrapper = new BoltWrapper<Object, Object>(testBolt);
-			wrapper.setup(createMockStreamTask(execConfig), new StreamConfig(new Configuration()), mock(Output.class));
+			wrapper.setup(createMockStreamTask(execConfig), new MockStreamConfig(), mock(Output.class));
 
 			wrapper.open();
 			for (Entry<String, String> entry : cfg.toMap().entrySet()) {
@@ -305,7 +305,7 @@ public class BoltWrapperTest extends AbstractTest {
 		final IRichBolt bolt = mock(IRichBolt.class);
 		BoltWrapper<Object, Object> wrapper = new BoltWrapper<Object, Object>(bolt);
 
-		wrapper.setup(createMockStreamTask(), new StreamConfig(new Configuration()), mock(Output.class));
+		wrapper.setup(createMockStreamTask(), new MockStreamConfig(), mock(Output.class));
 		wrapper.open();
 
 		verify(bolt).prepare(any(Map.class), any(TopologyContext.class), isNotNull(OutputCollector.class));
@@ -322,7 +322,7 @@ public class BoltWrapperTest extends AbstractTest {
 
 		final BoltWrapper<Object, Object> wrapper = new BoltWrapper<Object, Object>(bolt);
 
-		wrapper.setup(createMockStreamTask(), new StreamConfig(new Configuration()), mock(Output.class));
+		wrapper.setup(createMockStreamTask(), new MockStreamConfig(), mock(Output.class));
 
 		wrapper.close();
 		wrapper.dispose();
@@ -379,7 +379,7 @@ public class BoltWrapperTest extends AbstractTest {
 		final CloseableRegistry closeableRegistry = new CloseableRegistry();
 		StreamTask<?, ?> mockTask = mock(StreamTask.class);
 		when(mockTask.getCheckpointLock()).thenReturn(new Object());
-		when(mockTask.getConfiguration()).thenReturn(new StreamConfig(new Configuration()));
+		when(mockTask.getConfiguration()).thenReturn(new MockStreamConfig());
 		when(mockTask.getEnvironment()).thenReturn(env);
 		when(mockTask.getExecutionConfig()).thenReturn(execConfig);
 		when(mockTask.getCancelables()).thenReturn(closeableRegistry);

http://git-wip-us.apache.org/repos/asf/flink/blob/f04dfb59/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
index 1f42ccf..89c038f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
@@ -61,6 +61,8 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext {
 
 	private final StreamConfig streamConfig;
 
+	private final String operatorUniqueID;
+
 	public StreamingRuntimeContext(AbstractStreamOperator<?> operator,
 									Environment env, Map<String, Accumulator<?, ?>> accumulators) {
 		super(env.getTaskInfo(),
@@ -73,6 +75,7 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext {
 		this.operator = operator;
 		this.taskEnvironment = env;
 		this.streamConfig = new StreamConfig(env.getTaskConfiguration());
+		this.operatorUniqueID = operator.getOperatorID().toString();
 	}
 
 	// ------------------------------------------------------------------------
@@ -90,6 +93,18 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext {
 		return operator.getProcessingTimeService();
 	}
 
+	/**
+	 * The returned value is guaranteed to be unique between operators within the same job and to
+	 * be stable across job submissions.
+	 *
+	 * <p>This operation is currently only supported in Streaming (DataStream) contexts.
+	 *
+	 * @return String representation of the operator's unique id.
+	 */
+	public String getOperatorUniqueID() {
+		return operatorUniqueID;
+	}
+
 	// ------------------------------------------------------------------------
 	//  broadcast variables
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/f04dfb59/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java
index 84a45d8..cad3df8 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
@@ -308,6 +309,11 @@ public class InputFormatSourceFunctionTest {
 			public ExecutionConfig getExecutionConfig() {
 				return new ExecutionConfig();
 			}
+
+			@Override
+			public OperatorID getOperatorID() {
+				return new OperatorID();
+			}
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f04dfb59/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/GetOperatorUniqueIDTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/GetOperatorUniqueIDTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/GetOperatorUniqueIDTest.java
new file mode 100644
index 0000000..9693e42
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/GetOperatorUniqueIDTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the uid translation to {@link org.apache.flink.runtime.jobgraph.OperatorID}.
+ */
+@SuppressWarnings("serial")
+public class GetOperatorUniqueIDTest extends TestLogger {
+
+	/**
+	 * If the expected values ever change, double-check that the change is not breaking the contract of
+	 * {@link StreamingRuntimeContext#getOperatorUniqueID()} being stable between job submissions.
+	 */
+	@Test
+	public void testGetOperatorUniqueID() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
+
+		env.fromElements(1, 2, 3)
+			.map(new VerifyOperatorIDMapFunction("6c4f323f22da8fb6e34f80c61be7a689")).uid("42")
+			.map(new VerifyOperatorIDMapFunction("3e129e83691e7737fbf876b47452acbc")).uid("44");
+
+		env.execute();
+	}
+
+	private static class VerifyOperatorIDMapFunction extends AbstractRichFunction implements MapFunction<Integer, Integer> {
+		private static final long serialVersionUID = 6584823409744624276L;
+
+		private final String expectedOperatorUniqueID;
+
+		public VerifyOperatorIDMapFunction(String expectedOperatorUniqueID) {
+			this.expectedOperatorUniqueID = checkNotNull(expectedOperatorUniqueID);
+		}
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			super.open(parameters);
+
+			assertEquals(expectedOperatorUniqueID, ((StreamingRuntimeContext) getRuntimeContext()).getOperatorUniqueID());
+		}
+
+		@Override
+		public Integer map(Integer value) throws Exception {
+			return value;
+		}
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f04dfb59/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
index 87667b2..e04cedd 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
@@ -41,6 +41,7 @@ import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
 import org.apache.flink.runtime.query.KvStateRegistry;
@@ -296,6 +297,7 @@ public class StreamingRuntimeContextTest {
 		}).when(keyedStateBackend).getPartitionedState(Matchers.any(), any(TypeSerializer.class), any(StateDescriptor.class));
 
 		when(operatorMock.getKeyedStateStore()).thenReturn(keyedStateStore);
+		when(operatorMock.getOperatorID()).thenReturn(new OperatorID());
 
 		return operatorMock;
 	}
@@ -333,6 +335,7 @@ public class StreamingRuntimeContextTest {
 		}).when(keyedStateBackend).getPartitionedState(Matchers.any(), any(TypeSerializer.class), any(ListStateDescriptor.class));
 
 		when(operatorMock.getKeyedStateStore()).thenReturn(keyedStateStore);
+		when(operatorMock.getOperatorID()).thenReturn(new OperatorID());
 		return operatorMock;
 	}
 
@@ -369,6 +372,7 @@ public class StreamingRuntimeContextTest {
 		}).when(keyedStateBackend).getPartitionedState(Matchers.any(), any(TypeSerializer.class), any(MapStateDescriptor.class));
 
 		when(operatorMock.getKeyedStateStore()).thenReturn(keyedStateStore);
+		when(operatorMock.getOperatorID()).thenReturn(new OperatorID());
 		return operatorMock;
 	}
 


[17/17] flink git commit: [FLINK-9295][kafka] Fix transactional.id collisions for FlinkKafkaProducer011

Posted by tz...@apache.org.
[FLINK-9295][kafka] Fix transactional.id collisions for FlinkKafkaProducer011

Previously, if there were two completely independent FlinkKafkaProducer011 data sinks
in the job graph, their transactional.ids would collide with one another. The fix is to
use the operator's unique ID along with the task name and subtask index.

This change is backward compatible for recovering from older savepoints,
since transactional.ids generated by the old generator will still be used
after restoring from state.
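
As a minimal sketch of the collision and the fix (plain strings stand in for
the Flink runtime context; the exact id layout lives in TransactionalIdsGenerator
and is not reproduced here):

    public class TransactionalIdPrefixSketch {

        // Before: the prefix was the task name alone, so two independent
        // producers whose tasks share the same name derived colliding ids.
        static String oldPrefix(String taskName) {
            return taskName;
        }

        // After: the operator's unique ID (stable per uid across submissions)
        // disambiguates otherwise identical sinks.
        static String newPrefix(String taskName, String operatorUniqueId) {
            return taskName + "-" + operatorUniqueId;
        }

        public static void main(String[] args) {
            String a = newPrefix("Sink: kafka", "6c4f323f22da8fb6e34f80c61be7a689");
            String b = newPrefix("Sink: kafka", "3e129e83691e7737fbf876b47452acbc");
            System.out.println(a.equals(b)); // false -- the ids no longer collide
        }
    }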

This closes #5977.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9d0cd584
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9d0cd584
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9d0cd584

Branch: refs/heads/release-1.5
Commit: 9d0cd5848d2eb620263dbb65c8ceb611fa440875
Parents: f04dfb5
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Tue May 8 17:49:31 2018 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Tue May 22 16:54:40 2018 +0800

----------------------------------------------------------------------
 .../connectors/kafka/FlinkKafkaProducer011.java |  2 +-
 .../kafka/FlinkKafkaProducer011ITCase.java      |  4 +-
 .../Kafka011ProducerExactlyOnceITCase.java      |  6 +++
 .../connectors/kafka/KafkaProducerTestBase.java | 55 +++++++++++---------
 .../util/AbstractStreamOperatorTestHarness.java | 31 +++++++++--
 .../util/OneInputStreamOperatorTestHarness.java | 17 ++++--
 6 files changed, 81 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9d0cd584/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
index 0ae5e03b..8497372 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
@@ -837,7 +837,7 @@ public class FlinkKafkaProducer011<IN>
 		nextTransactionalIdHintState = context.getOperatorStateStore().getUnionListState(
 			NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR);
 		transactionalIdsGenerator = new TransactionalIdsGenerator(
-			getRuntimeContext().getTaskName(),
+			getRuntimeContext().getTaskName() + "-" + ((StreamingRuntimeContext) getRuntimeContext()).getOperatorUniqueID(),
 			getRuntimeContext().getIndexOfThisSubtask(),
 			getRuntimeContext().getNumberOfParallelSubtasks(),
 			kafkaProducersPoolSize,

http://git-wip-us.apache.org/repos/asf/flink/blob/9d0cd584/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
index 36cb362..74c58ad 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.checkpoint.StateObjectCollection;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.streaming.api.operators.StreamSink;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
@@ -627,7 +628,8 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBase {
 			maxParallelism,
 			parallelism,
 			subtaskIndex,
-			IntSerializer.INSTANCE);
+			IntSerializer.INSTANCE,
+			new OperatorID(42, 44));
 	}
 
 	private Properties createProperties() {

http://git-wip-us.apache.org/repos/asf/flink/blob/9d0cd584/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ProducerExactlyOnceITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ProducerExactlyOnceITCase.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ProducerExactlyOnceITCase.java
index 1167238..5038b7f 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ProducerExactlyOnceITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ProducerExactlyOnceITCase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.junit.BeforeClass;
+import org.junit.Test;
 
 /**
  * IT cases for the {@link FlinkKafkaProducer011}.
@@ -48,4 +49,9 @@ public class Kafka011ProducerExactlyOnceITCase extends KafkaProducerTestBase {
 		// that it doesn't work well with some weird network failures, or the NetworkFailureProxy is a broken design
 	// and this test should be reimplemented in a completely different way...
 	}
+
+	@Test
+	public void testMultipleSinkOperators() throws Exception {
+		testExactlyOnce(false, 2);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9d0cd584/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
index 5023a7e..0807eb4 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
@@ -303,7 +303,7 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase {
 	 */
 	@Test
 	public void testExactlyOnceRegularSink() throws Exception {
-		testExactlyOnce(true);
+		testExactlyOnce(true, 1);
 	}
 
 	/**
@@ -311,20 +311,22 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase {
 	 */
 	@Test
 	public void testExactlyOnceCustomOperator() throws Exception {
-		testExactlyOnce(false);
+		testExactlyOnce(false, 1);
 	}
 
 	/**
 	 * This test sets the KafkaProducer so that it will automatically flush the data
 	 * and fails the broker to check whether flushed records since the last checkpoint were not duplicated.
 	 */
-	protected void testExactlyOnce(boolean regularSink) throws Exception {
-		final String topic = regularSink ? "exactlyOnceTopicRegularSink" : "exactlyTopicCustomOperator";
+	protected void testExactlyOnce(boolean regularSink, int sinksCount) throws Exception {
+		final String topic = (regularSink ? "exactlyOnceTopicRegularSink" : "exactlyTopicCustomOperator") + sinksCount;
 		final int partition = 0;
 		final int numElements = 1000;
 		final int failAfterElements = 333;
 
-		createTestTopic(topic, 1, 1);
+		for (int i = 0; i < sinksCount; i++) {
+			createTestTopic(topic + i, 1, 1);
+		}
 
 		TypeInformationSerializationSchema<Integer> schema = new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
 		KeyedSerializationSchema<Integer> keyedSerializationSchema = new KeyedSerializationSchemaWrapper(schema);
@@ -346,32 +348,35 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase {
 			.addSource(new IntegerSource(numElements))
 			.map(new FailingIdentityMapper<Integer>(failAfterElements));
 
-		FlinkKafkaPartitioner<Integer> partitioner = new FlinkKafkaPartitioner<Integer>() {
-			@Override
-			public int partition(Integer record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
-				return partition;
+		for (int i = 0; i < sinksCount; i++) {
+			FlinkKafkaPartitioner<Integer> partitioner = new FlinkKafkaPartitioner<Integer>() {
+				@Override
+				public int partition(Integer record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
+					return partition;
+				}
+			};
+
+			if (regularSink) {
+				StreamSink<Integer> kafkaSink = kafkaServer.getProducerSink(topic + i, keyedSerializationSchema, properties, partitioner);
+				inputStream.addSink(kafkaSink.getUserFunction());
+			} else {
+				kafkaServer.produceIntoKafka(inputStream, topic + i, keyedSerializationSchema, properties, partitioner);
 			}
-		};
-		if (regularSink) {
-			StreamSink<Integer> kafkaSink = kafkaServer.getProducerSink(topic, keyedSerializationSchema, properties, partitioner);
-			inputStream.addSink(kafkaSink.getUserFunction());
-		}
-		else {
-			kafkaServer.produceIntoKafka(inputStream, topic, keyedSerializationSchema, properties, partitioner);
 		}
 
 		FailingIdentityMapper.failedBefore = false;
 		TestUtils.tryExecute(env, "Exactly once test");
 
-		// assert that before failure we successfully snapshot/flushed all expected elements
-		assertExactlyOnceForTopic(
-			properties,
-			topic,
-			partition,
-			expectedElements,
-			KAFKA_READ_TIMEOUT);
-
-		deleteTestTopic(topic);
+		for (int i = 0; i < sinksCount; i++) {
+			// assert that before failure we successfully snapshot/flushed all expected elements
+			assertExactlyOnceForTopic(
+				properties,
+				topic + i,
+				partition,
+				expectedElements,
+				KAFKA_READ_TIMEOUT);
+			deleteTestTopic(topic + i);
+		}
 	}
 
 	private List<Integer> getIntegersSequence(int size) {

http://git-wip-us.apache.org/repos/asf/flink/blob/9d0cd584/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index 26ad3ab..0c4ecc0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -147,19 +147,42 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable {
 				.setParallelism(parallelism)
 				.setSubtaskIndex(subtaskIndex)
 				.build(),
-			true);
+			true,
+			new OperatorID());
+	}
+
+	public AbstractStreamOperatorTestHarness(
+			StreamOperator<OUT> operator,
+			int maxParallelism,
+			int parallelism,
+			int subtaskIndex,
+			OperatorID operatorID) throws Exception {
+		this(
+			operator,
+			new MockEnvironmentBuilder()
+				.setTaskName("MockTask")
+				.setMemorySize(3 * 1024 * 1024)
+				.setInputSplitProvider(new MockInputSplitProvider())
+				.setBufferSize(1024)
+				.setMaxParallelism(maxParallelism)
+				.setParallelism(parallelism)
+				.setSubtaskIndex(subtaskIndex)
+				.build(),
+			true,
+			operatorID);
 	}
 
 	public AbstractStreamOperatorTestHarness(
 			StreamOperator<OUT> operator,
 			MockEnvironment env) throws Exception {
-		this(operator, env, false);
+		this(operator, env, false, new OperatorID());
 	}
 
 	private AbstractStreamOperatorTestHarness(
 			StreamOperator<OUT> operator,
 			MockEnvironment env,
-			boolean environmentIsInternal) throws Exception {
+			boolean environmentIsInternal,
+			OperatorID operatorID) throws Exception {
 		this.operator = operator;
 		this.outputList = new ConcurrentLinkedQueue<>();
 		this.sideOutputLists = new HashMap<>();
@@ -167,7 +190,7 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable {
 		Configuration underlyingConfig = env.getTaskConfiguration();
 		this.config = new StreamConfig(underlyingConfig);
 		this.config.setCheckpointingEnabled(true);
-		this.config.setOperatorID(new OperatorID());
+		this.config.setOperatorID(operatorID);
 		this.executionConfig = env.getExecutionConfig();
 		this.closableRegistry = new CloseableRegistry();
 		this.checkpointLock = new Object();

http://git-wip-us.apache.org/repos/asf/flink/blob/9d0cd584/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index 66d2f69..0155198 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.util;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
@@ -54,8 +55,9 @@ public class OneInputStreamOperatorTestHarness<IN, OUT>
 		int maxParallelism,
 		int parallelism,
 		int subtaskIndex,
-		TypeSerializer<IN> typeSerializerIn) throws Exception {
-		this(operator, maxParallelism, parallelism, subtaskIndex);
+		TypeSerializer<IN> typeSerializerIn,
+		OperatorID operatorID) throws Exception {
+		this(operator, maxParallelism, parallelism, subtaskIndex, operatorID);
 
 		config.setTypeSerializerIn1(Preconditions.checkNotNull(typeSerializerIn));
 	}
@@ -78,7 +80,16 @@ public class OneInputStreamOperatorTestHarness<IN, OUT>
 			int maxParallelism,
 			int parallelism,
 			int subtaskIndex) throws Exception {
-		super(operator, maxParallelism, parallelism, subtaskIndex);
+		this(operator, maxParallelism, parallelism, subtaskIndex, new OperatorID());
+	}
+
+	public OneInputStreamOperatorTestHarness(
+			OneInputStreamOperator<IN, OUT> operator,
+			int maxParallelism,
+			int parallelism,
+			int subtaskIndex,
+			OperatorID operatorID) throws Exception {
+		super(operator, maxParallelism, parallelism, subtaskIndex, operatorID);
 
 		this.oneInputOperator = operator;
 	}
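
With the OperatorID-aware constructors in place, a test can pin the operator ID
so that anything derived from it (such as the producer's transactional.ids) is
reproducible across runs. A sketch of the call site, mirroring the
FlinkKafkaProducer011ITCase change above (the sink operator and the parallelism
values are placeholders):

    OneInputStreamOperatorTestHarness<Integer, Object> testHarness =
        new OneInputStreamOperatorTestHarness<>(
            new StreamSink<>(kafkaProducer), // operator under test (placeholder)
            maxParallelism,
            parallelism,
            subtaskIndex,
            IntSerializer.INSTANCE,
            new OperatorID(42, 44));         // fixed ID -> deterministic harness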


[04/17] flink git commit: [FLINK-9320] [e2e] Update test_ha e2e to use general purpose DataStream job

Posted by tz...@apache.org.
[FLINK-9320] [e2e] Update test_ha e2e to use general purpose DataStream job

This closes #5990.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d7ec5a93
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d7ec5a93
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d7ec5a93

Branch: refs/heads/release-1.5
Commit: d7ec5a9394713c5de1a2846ce96bd013ed23c53a
Parents: aec4496
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Fri May 11 15:09:00 2018 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Tue May 22 16:49:17 2018 +0800

----------------------------------------------------------------------
 flink-end-to-end-tests/run-nightly-tests.sh    | 17 ++++++++-
 flink-end-to-end-tests/test-scripts/common.sh  |  1 -
 flink-end-to-end-tests/test-scripts/test_ha.sh | 40 +++++++++++++--------
 3 files changed, 41 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d7ec5a93/flink-end-to-end-tests/run-nightly-tests.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index bd91bb2..4cfd778 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -48,7 +48,22 @@ EXIT_CODE=0
 
 
 if [ $EXIT_CODE == 0 ]; then
-  run_test "HA end-to-end test" "$END_TO_END_DIR/test-scripts/test_ha.sh"
+  run_test "Running HA (file, async) end-to-end test" "$END_TO_END_DIR/test-scripts/test_ha.sh file true false"
+  EXIT_CODE=$?
+fi
+
+if [ $EXIT_CODE == 0 ]; then
+  run_test "Running HA (file, sync) end-to-end test" "$END_TO_END_DIR/test-scripts/test_ha.sh file false false"
+  EXIT_CODE=$?
+fi
+
+if [ $EXIT_CODE == 0 ]; then
+  run_test "Running HA (rocks, non-incremental) end-to-end test" "$END_TO_END_DIR/test-scripts/test_ha.sh rocks true false"
+  EXIT_CODE=$?
+fi
+
+if [ $EXIT_CODE == 0 ]; then
+  run_test "Running HA (rocks, incremental) end-to-end test" "$END_TO_END_DIR/test-scripts/test_ha.sh rocks true true"
   EXIT_CODE=$?
 fi
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d7ec5a93/flink-end-to-end-tests/test-scripts/common.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/common.sh b/flink-end-to-end-tests/test-scripts/common.sh
index 1db5dd2..2d0f13e 100644
--- a/flink-end-to-end-tests/test-scripts/common.sh
+++ b/flink-end-to-end-tests/test-scripts/common.sh
@@ -97,7 +97,6 @@ function create_ha_config() {
     jobmanager.heap.mb: 1024
     taskmanager.heap.mb: 1024
     taskmanager.numberOfTaskSlots: 4
-    parallelism.default: 1
 
     #==============================================================================
     # High Availability

http://git-wip-us.apache.org/repos/asf/flink/blob/d7ec5a93/flink-end-to-end-tests/test-scripts/test_ha.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_ha.sh b/flink-end-to-end-tests/test-scripts/test_ha.sh
index 2e65504..6d94c03 100755
--- a/flink-end-to-end-tests/test-scripts/test_ha.sh
+++ b/flink-end-to-end-tests/test-scripts/test_ha.sh
@@ -19,7 +19,7 @@
 
 source "$(dirname "$0")"/common.sh
 
-TEST_PROGRAM_JAR=$FLINK_DIR/examples/streaming/StateMachineExample.jar\ --error-rate\ 0.0\ --sleep\ 2
+TEST_PROGRAM_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-datastream-allround-test/target/DataStreamAllroundTestProgram.jar
 
 JM_WATCHDOG_PID=0
 TM_WATCHDOG_PID=0
@@ -48,12 +48,12 @@ function stop_cluster_and_watchdog() {
 }
 
 function verify_logs() {
-    local OUTPUT=$1
-    local JM_FAILURES=$2
+    local OUTPUT=$FLINK_DIR/log/*.out
+    local JM_FAILURES=$1
 
     # verify that we have no alerts
     if ! [ `cat ${OUTPUT} | wc -l` -eq 0 ]; then
-        echo "FAILURE: Alerts found at the StateMachineExample with 0.0 error rate."
+        echo "FAILURE: Alerts found at the general purpose DataStream job."
         PASS=""
     fi
 
@@ -151,7 +151,6 @@ function run_ha_test() {
     local BACKEND=$2
     local ASYNC=$3
     local INCREM=$4
-    local OUTPUT=$5
 
     local JM_KILLS=3
     local CHECKPOINT_DIR="${TEST_DATA_DIR}/checkpoints/"
@@ -166,11 +165,19 @@ function run_ha_test() {
     # submit a job in detached mode and let it run
     local JOB_ID=$($FLINK_DIR/bin/flink run -d -p ${PARALLELISM} \
      $TEST_PROGRAM_JAR \
-        --backend ${BACKEND} \
-        --checkpoint-dir "file://${CHECKPOINT_DIR}" \
-        --async-checkpoints ${ASYNC} \
-        --incremental-checkpoints ${INCREM} \
-        --output ${OUTPUT} | grep "Job has been submitted with JobID" | sed 's/.* //g')
+        --environment.parallelism ${PARALLELISM} \
+        --test.semantics exactly-once \
+        --test.simulate_failure true \
+        --test.simulate_failure.num_records 200 \
+        --test.simulate_failure.num_checkpoints 1 \
+        --test.simulate_failure.max_failures 20 \
+        --state_backend ${BACKEND} \
+        --state_backend.checkpoint_directory "file://${CHECKPOINT_DIR}" \
+        --state_backend.file.async ${ASYNC} \
+        --state_backend.rocks.incremental ${INCREM} \
+        --sequence_generator_source.sleep_time 15 \
+        --sequence_generator_source.sleep_after_elements 1 \
+        | grep "Job has been submitted with JobID" | sed 's/.* //g')
 
     wait_job_running ${JOB_ID}
 
@@ -196,14 +203,17 @@ function run_ha_test() {
         sleep 60
     done
 
-    verify_logs ${OUTPUT} ${JM_KILLS}
+    verify_logs ${JM_KILLS}
 
     # kill the cluster and zookeeper
     stop_cluster_and_watchdog
 }
 
+trap stop_cluster_and_watchdog INT
 trap stop_cluster_and_watchdog EXIT
-run_ha_test 4 "file" "false" "false" "${TEST_DATA_DIR}/output.txt"
-run_ha_test 4 "rocks" "false" "false" "${TEST_DATA_DIR}/output.txt"
-run_ha_test 4 "file" "true" "false" "${TEST_DATA_DIR}/output.txt"
-run_ha_test 4 "rocks" "false" "true" "${TEST_DATA_DIR}/output.txt"
+
+STATE_BACKEND_TYPE=${1:-file}
+STATE_BACKEND_FILE_ASYNC=${2:-true}
+STATE_BACKEND_ROCKS_INCREMENTAL=${3:-false}
+
+run_ha_test 4 ${STATE_BACKEND_TYPE} ${STATE_BACKEND_FILE_ASYNC} ${STATE_BACKEND_ROCKS_INCREMENTAL}


[13/17] flink git commit: [hotfix] [e2e] Properly use run_test utility for local recovery and quickstart tests

Posted by tz...@apache.org.
[hotfix] [e2e] Properly use run_test utility for local recovery and quickstart tests


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7b680c9a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7b680c9a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7b680c9a

Branch: refs/heads/release-1.5
Commit: 7b680c9a8bcf0a30d08647b926cb066f8143663a
Parents: 7c6d5af
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Tue May 22 15:58:48 2018 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Tue May 22 16:54:17 2018 +0800

----------------------------------------------------------------------
 flink-end-to-end-tests/run-nightly-tests.sh | 10 ++--------
 1 file changed, 2 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7b680c9a/flink-end-to-end-tests/run-nightly-tests.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index 2a15e6d..a79754c 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -184,18 +184,12 @@ if [ $EXIT_CODE == 0 ]; then
 fi
 
 if [ $EXIT_CODE == 0 ]; then
-  printf "\n==============================================================================\n"
-  printf "Running local recovery and sticky scheduling nightly end-to-end test\n"
-  printf "==============================================================================\n"
-  $END_TO_END_DIR/test-scripts/test_local_recovery_and_scheduling.sh
+  run_test "Local recovery and sticky scheduling nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_local_recovery_and_scheduling.sh"
   EXIT_CODE=$?
 fi
 
 if [ $EXIT_CODE == 0 ]; then
-  printf "\n==============================================================================\n"
-  printf "Running Quickstarts nightly end-to-end test\n"
-  printf "==============================================================================\n"
-  $END_TO_END_DIR/test-scripts/test_quickstarts.sh
+  run_test "Quickstarts nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_quickstarts.sh"
   EXIT_CODE=$?
 fi