You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2018/05/22 08:56:06 UTC

[07/17] flink git commit: [FLINK-8989] [e2eTests] Elasticsearch1&2&5 end to end test

[FLINK-8989] [e2eTests] Elasticsearch1&2&5 end to end test


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a7abfcb2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a7abfcb2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a7abfcb2

Branch: refs/heads/release-1.5
Commit: a7abfcb278d2ef35c3b730c5d238cf32c6094674
Parents: 7f9e4c0
Author: zhangminglei <zm...@163.com>
Authored: Tue May 22 09:23:13 2018 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Tue May 22 16:50:26 2018 +0800

----------------------------------------------------------------------
 .../examples/ElasticsearchSinkExample.java      |  85 -----------
 .../examples/ElasticsearchSinkExample.java      |  81 ----------
 .../examples/ElasticsearchSinkExample.java      |  83 -----------
 .../flink-elasticsearch1-test/pom.xml           | 117 +++++++++++++++
 .../tests/Elasticsearch1SinkExample.java        |  93 ++++++++++++
 .../flink-elasticsearch2-test/pom.xml           | 135 +++++++++++++++++
 .../tests/Elasticsearch2SinkExample.java        |  92 ++++++++++++
 .../flink-elasticsearch5-test/pom.xml           | 148 +++++++++++++++++++
 .../tests/Elasticsearch5SinkExample.java        |  92 ++++++++++++
 flink-end-to-end-tests/pom.xml                  |   3 +
 .../test-scripts/elasticsearch-common.sh        |  62 ++++++++
 .../test_streaming_elasticsearch125.sh          | 109 ++++++++++++++
 12 files changed, 851 insertions(+), 249 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a7abfcb2/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchSinkExample.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchSinkExample.java b/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchSinkExample.java
deleted file mode 100644
index 8a0321d..0000000
--- a/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchSinkExample.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.elasticsearch.examples;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink;
-import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
-import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
-
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.client.Requests;
-import org.elasticsearch.common.transport.InetSocketTransportAddress;
-import org.elasticsearch.common.transport.TransportAddress;
-
-import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * This example shows how to use the Elasticsearch Sink. Before running it you must ensure that
- * you have a cluster named "elasticsearch" running or change the cluster name in the config map.
- */
-@SuppressWarnings("serial")
-public class ElasticsearchSinkExample {
-
-	public static void main(String[] args) throws Exception {
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		DataStream<String> source = env.generateSequence(0, 20).map(new MapFunction<Long, String>() {
-			@Override
-			public String map(Long value) throws Exception {
-				return "message #" + value;
-			}
-		});
-
-		Map<String, String> userConfig = new HashMap<>();
-		userConfig.put("cluster.name", "elasticsearch");
-		// This instructs the sink to emit after every element, otherwise they would be buffered
-		userConfig.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
-
-		List<TransportAddress> transports = new ArrayList<>();
-		transports.add(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300));
-
-		source.addSink(new ElasticsearchSink<>(userConfig, transports, new ElasticsearchSinkFunction<String>() {
-			@Override
-			public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
-				indexer.add(createIndexRequest(element));
-			}
-		}));
-
-		env.execute("Elasticsearch Sink Example");
-	}
-
-	private static IndexRequest createIndexRequest(String element) {
-		Map<String, Object> json = new HashMap<>();
-		json.put("data", element);
-
-		return Requests.indexRequest()
-			.index("my-index")
-			.type("my-type")
-			.id(element)
-			.source(json);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7abfcb2/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/examples/ElasticsearchSinkExample.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/examples/ElasticsearchSinkExample.java b/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/examples/ElasticsearchSinkExample.java
deleted file mode 100644
index c963927..0000000
--- a/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/examples/ElasticsearchSinkExample.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.elasticsearch2.examples;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink;
-
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.client.Requests;
-
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * This example shows how to use the Elasticsearch Sink. Before running it you must ensure that
- * you have a cluster named "elasticsearch" running or change the name of cluster in the config map.
- */
-public class ElasticsearchSinkExample {
-
-	public static void main(String[] args) throws Exception {
-
-		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		DataStream<String> source = env.generateSequence(0, 20).map(new MapFunction<Long, String>() {
-			@Override
-			public String map(Long value) throws Exception {
-				return "message #" + value;
-			}
-		});
-
-		Map<String, String> userConfig = new HashMap<>();
-		userConfig.put("cluster.name", "elasticsearch");
-		// This instructs the sink to emit after every element, otherwise they would be buffered
-		userConfig.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
-
-		List<InetSocketAddress> transports = new ArrayList<>();
-		transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
-
-		source.addSink(new ElasticsearchSink<>(userConfig, transports, new org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction<String>(){
-			@Override
-			public void process(String element, RuntimeContext ctx, org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer indexer) {
-				indexer.add(createIndexRequest(element));
-			}
-		}));
-
-		env.execute("Elasticsearch Sink Example");
-	}
-
-	private static IndexRequest createIndexRequest(String element) {
-		Map<String, Object> json = new HashMap<>();
-		json.put("data", element);
-
-		return Requests.indexRequest()
-			.index("my-index")
-			.type("my-type")
-			.id(element)
-			.source(json);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7abfcb2/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/examples/ElasticsearchSinkExample.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/examples/ElasticsearchSinkExample.java b/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/examples/ElasticsearchSinkExample.java
deleted file mode 100644
index 22c1053..0000000
--- a/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/examples/ElasticsearchSinkExample.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.elasticsearch5.examples;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
-import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
-import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink;
-
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.client.Requests;
-
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * This example shows how to use the Elasticsearch Sink. Before running it you must ensure that
- * you have a cluster named "elasticsearch" running or change the name of cluster in the config map.
- */
-public class ElasticsearchSinkExample {
-
-	public static void main(String[] args) throws Exception {
-
-		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		DataStream<String> source = env.generateSequence(0, 20).map(new MapFunction<Long, String>() {
-			@Override
-			public String map(Long value) throws Exception {
-				return "message #" + value;
-			}
-		});
-
-		Map<String, String> userConfig = new HashMap<>();
-		userConfig.put("cluster.name", "elasticsearch");
-		// This instructs the sink to emit after every element, otherwise they would be buffered
-		userConfig.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
-
-		List<InetSocketAddress> transports = new ArrayList<>();
-		transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
-
-		source.addSink(new ElasticsearchSink<>(userConfig, transports, new ElasticsearchSinkFunction<String>() {
-			@Override
-			public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
-				indexer.add(createIndexRequest(element));
-			}
-		}));
-
-		env.execute("Elasticsearch Sink Example");
-	}
-
-	private static IndexRequest createIndexRequest(String element) {
-		Map<String, Object> json = new HashMap<>();
-		json.put("data", element);
-
-		return Requests.indexRequest()
-			.index("my-index")
-			.type("my-type")
-			.id(element)
-			.source(json);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7abfcb2/flink-end-to-end-tests/flink-elasticsearch1-test/pom.xml
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-elasticsearch1-test/pom.xml b/flink-end-to-end-tests/flink-elasticsearch1-test/pom.xml
new file mode 100644
index 0000000..1960f05
--- /dev/null
+++ b/flink-end-to-end-tests/flink-elasticsearch1-test/pom.xml
@@ -0,0 +1,117 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+	<parent>
+		<artifactId>flink-end-to-end-tests</artifactId>
+		<groupId>org.apache.flink</groupId>
+		<version>1.5-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<artifactId>flink-elasticsearch1-test_${scala.binary.version}</artifactId>
+	<name>flink-elasticsearch1-test</name>
+	<packaging>jar</packaging>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-elasticsearch_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<version>3.0.0</version>
+				<executions>
+					<!-- Elasticsearch1Sink end to end example -->
+					<execution>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<minimizeJar>true</minimizeJar>
+							<artifactSet>
+								<excludes>
+									<exclude>com.google.code.findbugs:jsr305</exclude>
+									<exclude>org.slf4j:*</exclude>
+									<exclude>log4j:*</exclude>
+								</excludes>
+							</artifactSet>
+							<transformers>
+								<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+									<mainClass>org.apache.flink.streaming.tests.Elasticsearch1SinkExample</mainClass>
+								</transformer>
+							</transformers>
+							<filters>
+								<filter>
+									<artifact>*:*</artifact>
+									<excludes>
+										<exclude>META-INF/*.SF</exclude>
+										<exclude>META-INF/*.DSA</exclude>
+										<exclude>META-INF/*.RSA</exclude>
+									</excludes>
+								</filter>
+							</filters>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+
+			<!--simplify the name of the testing JAR for referring to it in test_streaming_elasticsearch1.sh scripts-->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-antrun-plugin</artifactId>
+				<version>1.7</version>
+				<executions>
+					<execution>
+						<id>rename</id>
+						<phase>package</phase>
+						<goals>
+							<goal>run</goal>
+						</goals>
+						<configuration>
+							<target>
+								<copy file="${project.basedir}/target/flink-elasticsearch1-test_${scala.binary.version}-${project.version}.jar" tofile="${project.basedir}/target/Elasticsearch1SinkExample.jar" />
+							</target>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/a7abfcb2/flink-end-to-end-tests/flink-elasticsearch1-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch1SinkExample.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-elasticsearch1-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch1SinkExample.java b/flink-end-to-end-tests/flink-elasticsearch1-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch1SinkExample.java
new file mode 100644
index 0000000..bfdb806
--- /dev/null
+++ b/flink-end-to-end-tests/flink-elasticsearch1-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch1SinkExample.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Requests;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.common.transport.TransportAddress;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * End to end test for Elasticsearch1Sink.
+ */
+public class Elasticsearch1SinkExample {
+	public static void main(String[] args) throws Exception {
+
+		final ParameterTool parameterTool = ParameterTool.fromArgs(args);
+
+		if (parameterTool.getNumberOfParameters() < 2) {
+			System.out.println("Missing parameters!\n" +
+				"Usage: --index <index> --type <type>");
+			return;
+		}
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.getConfig().disableSysoutLogging();
+		env.enableCheckpointing(5000);
+
+		DataStream<String> source = env.generateSequence(0, 20).map(new MapFunction<Long, String>() {
+			@Override
+			public String map(Long value) throws Exception {
+				return "message # " + value;
+			}
+		});
+
+		Map<String, String> userConfig = new HashMap<>();
+		userConfig.put("cluster.name", "elasticsearch");
+		// This instructs the sink to emit after every element, otherwise they would be buffered
+		userConfig.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+
+		List<TransportAddress> transports = new ArrayList<>();
+		transports.add(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300));
+
+		source.addSink(new ElasticsearchSink<>(userConfig, transports, new ElasticsearchSinkFunction<String>() {
+			@Override
+			public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
+				indexer.add(createIndexRequest(element, parameterTool));
+			}
+		}));
+
+		env.execute("Elasticsearch1.x end to end sink test example");
+	}
+
+	private static IndexRequest createIndexRequest(String element, ParameterTool parameterTool) {
+		Map<String, Object> json = new HashMap<>();
+		json.put("data", element);
+
+		return Requests.indexRequest()
+			.index(parameterTool.getRequired("index"))
+			.type(parameterTool.getRequired("type"))
+			.id(element)
+			.source(json);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7abfcb2/flink-end-to-end-tests/flink-elasticsearch2-test/pom.xml
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-elasticsearch2-test/pom.xml b/flink-end-to-end-tests/flink-elasticsearch2-test/pom.xml
new file mode 100644
index 0000000..4fd93de
--- /dev/null
+++ b/flink-end-to-end-tests/flink-elasticsearch2-test/pom.xml
@@ -0,0 +1,135 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<parent>
+		<artifactId>flink-end-to-end-tests</artifactId>
+		<groupId>org.apache.flink</groupId>
+		<version>1.5-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<artifactId>flink-elasticsearch2-test_${scala.binary.version}</artifactId>
+	<name>flink-elasticsearch2-test</name>
+	<packaging>jar</packaging>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-elasticsearch2_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-elasticsearch-base_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<!-- Remove elasticsearch1.7.1 -->
+			<exclusions>
+				<exclusion>
+					<groupId>org.elasticsearch</groupId>
+					<artifactId>elasticsearch</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+
+		<dependency>
+			<groupId>org.elasticsearch</groupId>
+			<artifactId>elasticsearch</artifactId>
+			<version>2.3.5</version>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<version>3.0.0</version>
+				<executions>
+					<!-- Elasticsearch2Sink end to end example -->
+					<execution>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<minimizeJar>true</minimizeJar>
+							<artifactSet>
+								<excludes>
+									<exclude>com.google.code.findbugs:jsr305</exclude>
+									<exclude>org.slf4j:*</exclude>
+									<exclude>log4j:*</exclude>
+								</excludes>
+							</artifactSet>
+							<transformers>
+								<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+									<mainClass>org.apache.flink.streaming.tests.Elasticsearch2SinkExample</mainClass>
+								</transformer>
+							</transformers>
+							<filters>
+								<filter>
+									<artifact>*:*</artifact>
+									<excludes>
+										<exclude>META-INF/*.SF</exclude>
+										<exclude>META-INF/*.DSA</exclude>
+										<exclude>META-INF/*.RSA</exclude>
+									</excludes>
+								</filter>
+							</filters>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+
+			<!--simplify the name of the testing JAR for referring to it in test_streaming_elasticsearch2.sh scripts-->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-antrun-plugin</artifactId>
+				<version>1.7</version>
+				<executions>
+					<execution>
+						<id>rename</id>
+						<phase>package</phase>
+						<goals>
+							<goal>run</goal>
+						</goals>
+						<configuration>
+							<target>
+								<copy file="${project.basedir}/target/flink-elasticsearch2-test_${scala.binary.version}-${project.version}.jar" tofile="${project.basedir}/target/Elasticsearch2SinkExample.jar" />
+							</target>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/a7abfcb2/flink-end-to-end-tests/flink-elasticsearch2-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch2SinkExample.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-elasticsearch2-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch2SinkExample.java b/flink-end-to-end-tests/flink-elasticsearch2-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch2SinkExample.java
new file mode 100644
index 0000000..4ec03aa
--- /dev/null
+++ b/flink-end-to-end-tests/flink-elasticsearch2-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch2SinkExample.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink;
+
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Requests;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * End to end test for Elasticsearch2Sink.
+ */
+public class Elasticsearch2SinkExample {
+
+	public static void main(String[] args) throws Exception {
+
+		final ParameterTool parameterTool = ParameterTool.fromArgs(args);
+
+		if (parameterTool.getNumberOfParameters() < 2) {
+			System.out.println("Missing parameters!\n" +
+				"Usage: --index <index> --type <type>");
+			return;
+		}
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.getConfig().disableSysoutLogging();
+		env.enableCheckpointing(5000);
+
+		DataStream<String> source = env.generateSequence(0, 20).map(new MapFunction<Long, String>() {
+			@Override
+			public String map(Long value) throws Exception {
+				return "message #" + value;
+			}
+		});
+
+		Map<String, String> userConfig = new HashMap<>();
+		userConfig.put("cluster.name", "elasticsearch");
+		// This instructs the sink to emit after every element, otherwise they would be buffered
+		userConfig.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+
+		List<InetSocketAddress> transports = new ArrayList<>();
+		transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
+
+		source.addSink(new ElasticsearchSink<>(userConfig, transports, new ElasticsearchSinkFunction<String>(){
+			@Override
+			public void process(String element, RuntimeContext ctx, org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer indexer) {
+				indexer.add(createIndexRequest(element, parameterTool));
+			}
+		}));
+
+		env.execute("Elasticsearch2.x end to end sink test example");
+	}
+
+	private static IndexRequest createIndexRequest(String element, ParameterTool parameterTool) {
+		Map<String, Object> json = new HashMap<>();
+		json.put("data", element);
+
+		return Requests.indexRequest()
+			.index(parameterTool.getRequired("index"))
+			.type(parameterTool.getRequired("type"))
+			.id(element)
+			.source(json);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7abfcb2/flink-end-to-end-tests/flink-elasticsearch5-test/pom.xml
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-elasticsearch5-test/pom.xml b/flink-end-to-end-tests/flink-elasticsearch5-test/pom.xml
new file mode 100644
index 0000000..3a1e734
--- /dev/null
+++ b/flink-end-to-end-tests/flink-elasticsearch5-test/pom.xml
@@ -0,0 +1,148 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<parent>
+		<artifactId>flink-end-to-end-tests</artifactId>
+		<groupId>org.apache.flink</groupId>
+		<version>1.5-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+	<modelVersion>4.0.0</modelVersion>
+
+	<artifactId>flink-elasticsearch5-test_${scala.binary.version}</artifactId>
+	<name>flink-elasticsearch5-test</name>
+	<packaging>jar</packaging>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-elasticsearch5_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-elasticsearch-base_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<exclusions>
+				<!-- Remove elasticsearch1.7.1 -->
+				<exclusion>
+					<groupId>org.elasticsearch</groupId>
+					<artifactId>elasticsearch</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+
+		<!-- Dependency for Elasticsearch 5.x Java Client -->
+		<dependency>
+			<groupId>org.elasticsearch.client</groupId>
+			<artifactId>transport</artifactId>
+			<version>5.1.2</version>
+		</dependency>
+
+		<!--
+			Elasticsearch 5.x uses Log4j2 and no longer detects logging implementations, making
+			Log4j2 a strict dependency. The following is added so that the Log4j2 API in
+			Elasticsearch 5.x is routed to SLF4J. This way, user projects can remain flexible
+			in the logging implementation preferred.
+		-->
+
+		<dependency>
+			<groupId>org.apache.logging.log4j</groupId>
+			<artifactId>log4j-to-slf4j</artifactId>
+			<version>2.7</version>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<version>3.0.0</version>
+				<executions>
+					<!-- Elasticsearch5Sink end to end example -->
+					<execution>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<minimizeJar>true</minimizeJar>
+							<artifactSet>
+								<excludes>
+									<exclude>com.google.code.findbugs:jsr305</exclude>
+									<exclude>org.slf4j:*</exclude>
+									<exclude>log4j:*</exclude>
+								</excludes>
+							</artifactSet>
+							<transformers>
+								<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+									<mainClass>org.apache.flink.streaming.tests.Elasticsearch5SinkExample</mainClass>
+								</transformer>
+							</transformers>
+							<filters>
+								<filter>
+									<artifact>*:*</artifact>
+									<excludes>
+										<exclude>META-INF/*.SF</exclude>
+										<exclude>META-INF/*.DSA</exclude>
+										<exclude>META-INF/*.RSA</exclude>
+									</excludes>
+								</filter>
+							</filters>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+
+			<!--simplify the name of the testing JAR for referring to it in test_streaming_elasticsearch5.sh scripts-->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-antrun-plugin</artifactId>
+				<version>1.7</version>
+				<executions>
+					<execution>
+						<id>rename</id>
+						<phase>package</phase>
+						<goals>
+							<goal>run</goal>
+						</goals>
+						<configuration>
+							<target>
+								<copy file="${project.basedir}/target/flink-elasticsearch5-test_${scala.binary.version}-${project.version}.jar" tofile="${project.basedir}/target/Elasticsearch5SinkExample.jar" />
+							</target>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/a7abfcb2/flink-end-to-end-tests/flink-elasticsearch5-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch5SinkExample.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-elasticsearch5-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch5SinkExample.java b/flink-end-to-end-tests/flink-elasticsearch5-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch5SinkExample.java
new file mode 100644
index 0000000..285f902
--- /dev/null
+++ b/flink-end-to-end-tests/flink-elasticsearch5-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch5SinkExample.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink;
+
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Requests;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * End to end test for Elasticsearch5Sink.
+ */
+public class Elasticsearch5SinkExample {
+	public static void main(String[] args) throws Exception {
+
+		final ParameterTool parameterTool = ParameterTool.fromArgs(args);
+
+		if (parameterTool.getNumberOfParameters() < 2) {
+			System.out.println("Missing parameters!\n" +
+				"Usage: --index <index> --type <type>");
+			return;
+		}
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.getConfig().disableSysoutLogging();
+		env.enableCheckpointing(5000);
+
+		DataStream<String> source = env.generateSequence(0, 20).map(new MapFunction<Long, String>() {
+			@Override
+			public String map(Long value) throws Exception {
+				return "message #" + value;
+			}
+		});
+
+		Map<String, String> userConfig = new HashMap<>();
+		userConfig.put("cluster.name", "elasticsearch");
+		// This instructs the sink to emit after every element, otherwise they would be buffered
+		userConfig.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+
+		List<InetSocketAddress> transports = new ArrayList<>();
+		transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
+
+		source.addSink(new ElasticsearchSink<>(userConfig, transports, new ElasticsearchSinkFunction<String>() {
+			@Override
+			public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
+				indexer.add(createIndexRequest(element, parameterTool));
+			}
+		}));
+
+		env.execute("Elasticsearch5.x end to end sink test example");
+	}
+
+	private static IndexRequest createIndexRequest(String element, ParameterTool parameterTool) {
+		Map<String, Object> json = new HashMap<>();
+		json.put("data", element);
+
+		return Requests.indexRequest()
+			.index(parameterTool.getRequired("index"))
+			.type(parameterTool.getRequired("type"))
+			.id(element)
+			.source(json);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7abfcb2/flink-end-to-end-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml
index 45b63f0..04b8532 100644
--- a/flink-end-to-end-tests/pom.xml
+++ b/flink-end-to-end-tests/pom.xml
@@ -43,6 +43,9 @@ under the License.
 		<module>flink-high-parallelism-iterations-test</module>
 		<module>flink-stream-stateful-job-upgrade-test</module>
 		<module>flink-local-recovery-and-allocation-test</module>
+		<module>flink-elasticsearch1-test</module>
+		<module>flink-elasticsearch2-test</module>
+		<module>flink-elasticsearch5-test</module>
 	</modules>
 
 	<build>

http://git-wip-us.apache.org/repos/asf/flink/blob/a7abfcb2/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh b/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh
new file mode 100644
index 0000000..3fda344
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh
@@ -0,0 +1,62 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+set -o pipefail
+
+if [[ -z $TEST_DATA_DIR ]]; then
+  echo "Must run common.sh before kafka-common.sh."
+  exit 1
+fi
+
+function verify_elasticsearch_process_exist {
+    ELASTICSEARCH_PROCESS=$(jps | grep Elasticsearch | awk '{print $2}')
+
+    # make sure the elasticsearch node is actually running
+    if [ "$ELASTICSEARCH_PROCESS" != "Elasticsearch" ]; then
+      echo "Elasticsearch node is not running."
+      PASS=""
+      exit 1
+    else
+      echo "Elasticsearch node is running."
+    fi
+}
+
+function verify_result {
+    if [ -f "$TEST_DATA_DIR/output" ]; then
+        rm $TEST_DATA_DIR/output
+    fi
+
+    curl 'localhost:9200/index/_search?q=*&pretty&size=21' > $TEST_DATA_DIR/output
+
+    if [ -n "$(grep '\"total\" : 21' $TEST_DATA_DIR/output)" ]; then
+        echo "Elasticsearch end to end test pass."
+    else
+        echo "Elasticsearch end to end test failed."
+        PASS=""
+        exit 1
+    fi
+}
+
+function shutdown_elasticsearch_cluster {
+   pid=$(jps | grep Elasticsearch | awk '{print $1}')
+   kill -SIGTERM $pid
+
+   # make sure to run regular cleanup as well
+   cleanup
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7abfcb2/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch125.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch125.sh b/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch125.sh
new file mode 100755
index 0000000..dea3f13
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch125.sh
@@ -0,0 +1,109 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+source "$(dirname "$0")"/common.sh
+source "$(dirname "$0")"/elasticsearch-common.sh
+
+mkdir -p $TEST_DATA_DIR
+
+ELASTICSEARCH1_URL="https://download.elastic.co/elasticsearch/elasticsearch/elasticsearch-1.7.1.tar.gz"
+ELASTICSEARCH2_URL="https://download.elastic.co/elasticsearch/release/org/elasticsearch/distribution/tar/elasticsearch/2.3.5/elasticsearch-2.3.5.tar.gz"
+ELASTICSEARCH5_URL="https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.1.2.tar.gz"
+
+# start downloading elasticsearch1
+echo "Downloading Elasticsearch1 from $ELASTICSEARCH1_URL"
+curl "$ELASTICSEARCH1_URL" > $TEST_DATA_DIR/elasticsearch1.tar.gz
+
+tar xzf $TEST_DATA_DIR/elasticsearch1.tar.gz -C $TEST_DATA_DIR/
+ELASTICSEARCH1_DIR=$TEST_DATA_DIR/elasticsearch-1.7.1
+
+# start elasticsearch1 cluster
+$ELASTICSEARCH1_DIR/bin/elasticsearch -daemon
+
+verify_elasticsearch_process_exist
+
+start_cluster
+
+TEST_ES1_JAR=$TEST_DATA_DIR/../../flink-elasticsearch1-test/target/Elasticsearch1SinkExample.jar
+
+# run the Flink job
+$FLINK_DIR/bin/flink run -p 1 $TEST_ES1_JAR \
+  --index index \
+  --type type
+
+verify_result
+
+shutdown_elasticsearch_cluster
+
+mkdir -p $TEST_DATA_DIR
+
+# start downloading elasticsearch2
+echo "Downloading Elasticsearch2 from $ELASTICSEARCH2_URL"
+curl "$ELASTICSEARCH2_URL" > $TEST_DATA_DIR/elasticsearch2.tar.gz
+
+tar xzf $TEST_DATA_DIR/elasticsearch2.tar.gz -C $TEST_DATA_DIR/
+ELASTICSEARCH2_DIR=$TEST_DATA_DIR/elasticsearch-2.3.5
+
+# start elasticsearch cluster, different from elasticsearch1 since using -daemon here will hang the shell.
+nohup $ELASTICSEARCH2_DIR/bin/elasticsearch &
+
+verify_elasticsearch_process_exist
+
+start_cluster
+
+TEST_ES2_JAR=$TEST_DATA_DIR/../../flink-elasticsearch2-test/target/Elasticsearch2SinkExample.jar
+
+# run the Flink job
+$FLINK_DIR/bin/flink run -p 1 $TEST_ES2_JAR \
+  --index index \
+  --type type
+
+verify_result
+
+shutdown_elasticsearch_cluster
+
+mkdir -p $TEST_DATA_DIR
+
+# start downloading elasticsearch5
+echo "Downloading Elasticsearch5 from $ELASTICSEARCH5_URL"
+curl "$ELASTICSEARCH5_URL" > $TEST_DATA_DIR/elasticsearch5.tar.gz
+
+tar xzf $TEST_DATA_DIR/elasticsearch5.tar.gz -C $TEST_DATA_DIR/
+ELASTICSEARCH5_DIR=$TEST_DATA_DIR/elasticsearch-5.1.2
+
+# start elasticsearch cluster, different from elasticsearch1 since using -daemon here will hang the shell.
+nohup $ELASTICSEARCH5_DIR/bin/elasticsearch &
+
+verify_elasticsearch_process_exist
+
+start_cluster
+
+TEST_ES5_JAR=$TEST_DATA_DIR/../../flink-elasticsearch5-test/target/Elasticsearch5SinkExample.jar
+
+# run the Flink job
+$FLINK_DIR/bin/flink run -p 1 $TEST_ES5_JAR \
+  --index index \
+  --type type
+
+verify_result
+
+rm -rf $FLINK_DIR/log/* 2> /dev/null
+
+trap shutdown_elasticsearch_cluster INT
+trap shutdown_elasticsearch_cluster EXIT