You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2018/05/22 08:56:06 UTC
[07/17] flink git commit: [FLINK-8989] [e2eTests] Elasticsearch1&2&5
end to end test
[FLINK-8989] [e2eTests] Elasticsearch1&2&5 end to end test
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a7abfcb2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a7abfcb2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a7abfcb2
Branch: refs/heads/release-1.5
Commit: a7abfcb278d2ef35c3b730c5d238cf32c6094674
Parents: 7f9e4c0
Author: zhangminglei <zm...@163.com>
Authored: Tue May 22 09:23:13 2018 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Tue May 22 16:50:26 2018 +0800
----------------------------------------------------------------------
.../examples/ElasticsearchSinkExample.java | 85 -----------
.../examples/ElasticsearchSinkExample.java | 81 ----------
.../examples/ElasticsearchSinkExample.java | 83 -----------
.../flink-elasticsearch1-test/pom.xml | 117 +++++++++++++++
.../tests/Elasticsearch1SinkExample.java | 93 ++++++++++++
.../flink-elasticsearch2-test/pom.xml | 135 +++++++++++++++++
.../tests/Elasticsearch2SinkExample.java | 92 ++++++++++++
.../flink-elasticsearch5-test/pom.xml | 148 +++++++++++++++++++
.../tests/Elasticsearch5SinkExample.java | 92 ++++++++++++
flink-end-to-end-tests/pom.xml | 3 +
.../test-scripts/elasticsearch-common.sh | 62 ++++++++
.../test_streaming_elasticsearch125.sh | 109 ++++++++++++++
12 files changed, 851 insertions(+), 249 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a7abfcb2/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchSinkExample.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchSinkExample.java b/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchSinkExample.java
deleted file mode 100644
index 8a0321d..0000000
--- a/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchSinkExample.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.elasticsearch.examples;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink;
-import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
-import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
-
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.client.Requests;
-import org.elasticsearch.common.transport.InetSocketTransportAddress;
-import org.elasticsearch.common.transport.TransportAddress;
-
-import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * This example shows how to use the Elasticsearch Sink. Before running it you must ensure that
- * you have a cluster named "elasticsearch" running or change the cluster name in the config map.
- */
-@SuppressWarnings("serial")
-public class ElasticsearchSinkExample {
-
- public static void main(String[] args) throws Exception {
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- DataStream<String> source = env.generateSequence(0, 20).map(new MapFunction<Long, String>() {
- @Override
- public String map(Long value) throws Exception {
- return "message #" + value;
- }
- });
-
- Map<String, String> userConfig = new HashMap<>();
- userConfig.put("cluster.name", "elasticsearch");
- // This instructs the sink to emit after every element, otherwise they would be buffered
- userConfig.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
-
- List<TransportAddress> transports = new ArrayList<>();
- transports.add(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300));
-
- source.addSink(new ElasticsearchSink<>(userConfig, transports, new ElasticsearchSinkFunction<String>() {
- @Override
- public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
- indexer.add(createIndexRequest(element));
- }
- }));
-
- env.execute("Elasticsearch Sink Example");
- }
-
- private static IndexRequest createIndexRequest(String element) {
- Map<String, Object> json = new HashMap<>();
- json.put("data", element);
-
- return Requests.indexRequest()
- .index("my-index")
- .type("my-type")
- .id(element)
- .source(json);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7abfcb2/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/examples/ElasticsearchSinkExample.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/examples/ElasticsearchSinkExample.java b/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/examples/ElasticsearchSinkExample.java
deleted file mode 100644
index c963927..0000000
--- a/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/examples/ElasticsearchSinkExample.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.elasticsearch2.examples;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink;
-
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.client.Requests;
-
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * This example shows how to use the Elasticsearch Sink. Before running it you must ensure that
- * you have a cluster named "elasticsearch" running or change the name of cluster in the config map.
- */
-public class ElasticsearchSinkExample {
-
- public static void main(String[] args) throws Exception {
-
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- DataStream<String> source = env.generateSequence(0, 20).map(new MapFunction<Long, String>() {
- @Override
- public String map(Long value) throws Exception {
- return "message #" + value;
- }
- });
-
- Map<String, String> userConfig = new HashMap<>();
- userConfig.put("cluster.name", "elasticsearch");
- // This instructs the sink to emit after every element, otherwise they would be buffered
- userConfig.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
-
- List<InetSocketAddress> transports = new ArrayList<>();
- transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
-
- source.addSink(new ElasticsearchSink<>(userConfig, transports, new org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction<String>(){
- @Override
- public void process(String element, RuntimeContext ctx, org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer indexer) {
- indexer.add(createIndexRequest(element));
- }
- }));
-
- env.execute("Elasticsearch Sink Example");
- }
-
- private static IndexRequest createIndexRequest(String element) {
- Map<String, Object> json = new HashMap<>();
- json.put("data", element);
-
- return Requests.indexRequest()
- .index("my-index")
- .type("my-type")
- .id(element)
- .source(json);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7abfcb2/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/examples/ElasticsearchSinkExample.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/examples/ElasticsearchSinkExample.java b/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/examples/ElasticsearchSinkExample.java
deleted file mode 100644
index 22c1053..0000000
--- a/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/examples/ElasticsearchSinkExample.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.elasticsearch5.examples;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
-import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
-import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink;
-
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.client.Requests;
-
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * This example shows how to use the Elasticsearch Sink. Before running it you must ensure that
- * you have a cluster named "elasticsearch" running or change the name of cluster in the config map.
- */
-public class ElasticsearchSinkExample {
-
- public static void main(String[] args) throws Exception {
-
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- DataStream<String> source = env.generateSequence(0, 20).map(new MapFunction<Long, String>() {
- @Override
- public String map(Long value) throws Exception {
- return "message #" + value;
- }
- });
-
- Map<String, String> userConfig = new HashMap<>();
- userConfig.put("cluster.name", "elasticsearch");
- // This instructs the sink to emit after every element, otherwise they would be buffered
- userConfig.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
-
- List<InetSocketAddress> transports = new ArrayList<>();
- transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
-
- source.addSink(new ElasticsearchSink<>(userConfig, transports, new ElasticsearchSinkFunction<String>() {
- @Override
- public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
- indexer.add(createIndexRequest(element));
- }
- }));
-
- env.execute("Elasticsearch Sink Example");
- }
-
- private static IndexRequest createIndexRequest(String element) {
- Map<String, Object> json = new HashMap<>();
- json.put("data", element);
-
- return Requests.indexRequest()
- .index("my-index")
- .type("my-type")
- .id(element)
- .source(json);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7abfcb2/flink-end-to-end-tests/flink-elasticsearch1-test/pom.xml
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-elasticsearch1-test/pom.xml b/flink-end-to-end-tests/flink-elasticsearch1-test/pom.xml
new file mode 100644
index 0000000..1960f05
--- /dev/null
+++ b/flink-end-to-end-tests/flink-elasticsearch1-test/pom.xml
@@ -0,0 +1,117 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <parent>
+ <artifactId>flink-end-to-end-tests</artifactId>
+ <groupId>org.apache.flink</groupId>
+ <version>1.5-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>flink-elasticsearch1-test_${scala.binary.version}</artifactId>
+ <name>flink-elasticsearch1-test</name>
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-elasticsearch_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>3.0.0</version>
+ <executions>
+ <!-- Elasticsearch1Sink end to end example -->
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <minimizeJar>true</minimizeJar>
+ <artifactSet>
+ <excludes>
+ <exclude>com.google.code.findbugs:jsr305</exclude>
+ <exclude>org.slf4j:*</exclude>
+ <exclude>log4j:*</exclude>
+ </excludes>
+ </artifactSet>
+ <transformers>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+ <mainClass>org.apache.flink.streaming.tests.Elasticsearch1SinkExample</mainClass>
+ </transformer>
+ </transformers>
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ <!--simplify the name of the testing JAR for referring to it in test_streaming_elasticsearch1.sh scripts-->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-antrun-plugin</artifactId>
+ <version>1.7</version>
+ <executions>
+ <execution>
+ <id>rename</id>
+ <phase>package</phase>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ <configuration>
+ <target>
+ <copy file="${project.basedir}/target/flink-elasticsearch1-test_${scala.binary.version}-${project.version}.jar" tofile="${project.basedir}/target/Elasticsearch1SinkExample.jar" />
+ </target>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/a7abfcb2/flink-end-to-end-tests/flink-elasticsearch1-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch1SinkExample.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-elasticsearch1-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch1SinkExample.java b/flink-end-to-end-tests/flink-elasticsearch1-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch1SinkExample.java
new file mode 100644
index 0000000..bfdb806
--- /dev/null
+++ b/flink-end-to-end-tests/flink-elasticsearch1-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch1SinkExample.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Requests;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.common.transport.TransportAddress;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * End to end test for Elasticsearch1Sink.
+ */
+public class Elasticsearch1SinkExample {
+ public static void main(String[] args) throws Exception {
+
+ final ParameterTool parameterTool = ParameterTool.fromArgs(args);
+
+ if (parameterTool.getNumberOfParameters() < 2) {
+ System.out.println("Missing parameters!\n" +
+ "Usage: --index <index> --type <type>");
+ return;
+ }
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().disableSysoutLogging();
+ env.enableCheckpointing(5000);
+
+ DataStream<String> source = env.generateSequence(0, 20).map(new MapFunction<Long, String>() {
+ @Override
+ public String map(Long value) throws Exception {
+ return "message # " + value;
+ }
+ });
+
+ Map<String, String> userConfig = new HashMap<>();
+ userConfig.put("cluster.name", "elasticsearch");
+ // This instructs the sink to emit after every element, otherwise they would be buffered
+ userConfig.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+
+ List<TransportAddress> transports = new ArrayList<>();
+ transports.add(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300));
+
+ source.addSink(new ElasticsearchSink<>(userConfig, transports, new ElasticsearchSinkFunction<String>() {
+ @Override
+ public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
+ indexer.add(createIndexRequest(element, parameterTool));
+ }
+ }));
+
+ env.execute("Elasticsearch1.x end to end sink test example");
+ }
+
+ private static IndexRequest createIndexRequest(String element, ParameterTool parameterTool) {
+ Map<String, Object> json = new HashMap<>();
+ json.put("data", element);
+
+ return Requests.indexRequest()
+ .index(parameterTool.getRequired("index"))
+ .type(parameterTool.getRequired("type"))
+ .id(element)
+ .source(json);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7abfcb2/flink-end-to-end-tests/flink-elasticsearch2-test/pom.xml
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-elasticsearch2-test/pom.xml b/flink-end-to-end-tests/flink-elasticsearch2-test/pom.xml
new file mode 100644
index 0000000..4fd93de
--- /dev/null
+++ b/flink-end-to-end-tests/flink-elasticsearch2-test/pom.xml
@@ -0,0 +1,135 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>flink-end-to-end-tests</artifactId>
+ <groupId>org.apache.flink</groupId>
+ <version>1.5-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>flink-elasticsearch2-test_${scala.binary.version}</artifactId>
+ <name>flink-elasticsearch2-test</name>
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-elasticsearch2_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-elasticsearch-base_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <!-- Remove elasticsearch1.7.1 -->
+ <exclusions>
+ <exclusion>
+ <groupId>org.elasticsearch</groupId>
+ <artifactId>elasticsearch</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.elasticsearch</groupId>
+ <artifactId>elasticsearch</artifactId>
+ <version>2.3.5</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>3.0.0</version>
+ <executions>
+ <!-- Elasticsearch2Sink end to end example -->
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <minimizeJar>true</minimizeJar>
+ <artifactSet>
+ <excludes>
+ <exclude>com.google.code.findbugs:jsr305</exclude>
+ <exclude>org.slf4j:*</exclude>
+ <exclude>log4j:*</exclude>
+ </excludes>
+ </artifactSet>
+ <transformers>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+ <mainClass>org.apache.flink.streaming.tests.Elasticsearch2SinkExample</mainClass>
+ </transformer>
+ </transformers>
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ <!--simplify the name of the testing JAR for referring to it in test_streaming_elasticsearch2.sh scripts-->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-antrun-plugin</artifactId>
+ <version>1.7</version>
+ <executions>
+ <execution>
+ <id>rename</id>
+ <phase>package</phase>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ <configuration>
+ <target>
+ <copy file="${project.basedir}/target/flink-elasticsearch2-test_${scala.binary.version}-${project.version}.jar" tofile="${project.basedir}/target/Elasticsearch2SinkExample.jar" />
+ </target>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/a7abfcb2/flink-end-to-end-tests/flink-elasticsearch2-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch2SinkExample.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-elasticsearch2-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch2SinkExample.java b/flink-end-to-end-tests/flink-elasticsearch2-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch2SinkExample.java
new file mode 100644
index 0000000..4ec03aa
--- /dev/null
+++ b/flink-end-to-end-tests/flink-elasticsearch2-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch2SinkExample.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink;
+
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Requests;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * End to end test for Elasticsearch2Sink.
+ */
+public class Elasticsearch2SinkExample {
+
+ public static void main(String[] args) throws Exception {
+
+ final ParameterTool parameterTool = ParameterTool.fromArgs(args);
+
+ if (parameterTool.getNumberOfParameters() < 2) {
+ System.out.println("Missing parameters!\n" +
+ "Usage: --index <index> --type <type>");
+ return;
+ }
+
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().disableSysoutLogging();
+ env.enableCheckpointing(5000);
+
+ DataStream<String> source = env.generateSequence(0, 20).map(new MapFunction<Long, String>() {
+ @Override
+ public String map(Long value) throws Exception {
+ return "message #" + value;
+ }
+ });
+
+ Map<String, String> userConfig = new HashMap<>();
+ userConfig.put("cluster.name", "elasticsearch");
+ // This instructs the sink to emit after every element, otherwise they would be buffered
+ userConfig.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+
+ List<InetSocketAddress> transports = new ArrayList<>();
+ transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
+
+ source.addSink(new ElasticsearchSink<>(userConfig, transports, new ElasticsearchSinkFunction<String>(){
+ @Override
+ public void process(String element, RuntimeContext ctx, org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer indexer) {
+ indexer.add(createIndexRequest(element, parameterTool));
+ }
+ }));
+
+ env.execute("Elasticsearch2.x end to end sink test example");
+ }
+
+ private static IndexRequest createIndexRequest(String element, ParameterTool parameterTool) {
+ Map<String, Object> json = new HashMap<>();
+ json.put("data", element);
+
+ return Requests.indexRequest()
+ .index(parameterTool.getRequired("index"))
+ .type(parameterTool.getRequired("type"))
+ .id(element)
+ .source(json);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7abfcb2/flink-end-to-end-tests/flink-elasticsearch5-test/pom.xml
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-elasticsearch5-test/pom.xml b/flink-end-to-end-tests/flink-elasticsearch5-test/pom.xml
new file mode 100644
index 0000000..3a1e734
--- /dev/null
+++ b/flink-end-to-end-tests/flink-elasticsearch5-test/pom.xml
@@ -0,0 +1,148 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>flink-end-to-end-tests</artifactId>
+ <groupId>org.apache.flink</groupId>
+ <version>1.5-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>flink-elasticsearch5-test_${scala.binary.version}</artifactId>
+ <name>flink-elasticsearch5-test</name>
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-elasticsearch5_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-elasticsearch-base_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <!-- Remove elasticsearch1.7.1 -->
+ <exclusion>
+ <groupId>org.elasticsearch</groupId>
+ <artifactId>elasticsearch</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <!-- Dependency for Elasticsearch 5.x Java Client -->
+ <dependency>
+ <groupId>org.elasticsearch.client</groupId>
+ <artifactId>transport</artifactId>
+ <version>5.1.2</version>
+ </dependency>
+
+ <!--
+ Elasticsearch 5.x uses Log4j2 and no longer detects logging implementations, making
+ Log4j2 a strict dependency. The following is added so that the Log4j2 API in
+ Elasticsearch 5.x is routed to SLF4J. This way, user projects can remain flexible
+ in the logging implementation preferred.
+ -->
+
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-to-slf4j</artifactId>
+ <version>2.7</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>3.0.0</version>
+ <executions>
+ <!-- Elasticsearch5Sink end to end example -->
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <minimizeJar>true</minimizeJar>
+ <artifactSet>
+ <excludes>
+ <exclude>com.google.code.findbugs:jsr305</exclude>
+ <exclude>org.slf4j:*</exclude>
+ <exclude>log4j:*</exclude>
+ </excludes>
+ </artifactSet>
+ <transformers>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+ <mainClass>org.apache.flink.streaming.tests.Elasticsearch5SinkExample</mainClass>
+ </transformer>
+ </transformers>
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ <!-- Simplify the name of the testing JAR so the test_streaming_elasticsearch125.sh script can refer to it. -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-antrun-plugin</artifactId>
+ <version>1.7</version>
+ <executions>
+ <execution>
+ <id>rename</id>
+ <phase>package</phase>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ <configuration>
+ <target>
+ <copy file="${project.basedir}/target/flink-elasticsearch5-test_${scala.binary.version}-${project.version}.jar" tofile="${project.basedir}/target/Elasticsearch5SinkExample.jar" />
+ </target>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/a7abfcb2/flink-end-to-end-tests/flink-elasticsearch5-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch5SinkExample.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-elasticsearch5-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch5SinkExample.java b/flink-end-to-end-tests/flink-elasticsearch5-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch5SinkExample.java
new file mode 100644
index 0000000..285f902
--- /dev/null
+++ b/flink-end-to-end-tests/flink-elasticsearch5-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch5SinkExample.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink;
+
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Requests;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * End to end test for Elasticsearch5Sink.
+ */
+public class Elasticsearch5SinkExample {
+ /**
+  * Entry point. Requires {@code --index <index>} and {@code --type <type>};
+  * generates the 21 strings "message #0" .. "message #20" and writes each one
+  * as a document to a local Elasticsearch 5.x node on 127.0.0.1:9300.
+  */
+ public static void main(String[] args) throws Exception {
+
+ final ParameterTool parameterTool = ParameterTool.fromArgs(args);
+
+ if (parameterTool.getNumberOfParameters() < 2) {
+ System.out.println("Missing parameters!\n" +
+ "Usage: --index <index> --type <type>");
+ return;
+ }
+
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().disableSysoutLogging();
+ env.enableCheckpointing(5000);
+
+ // Bounded source: 21 elements, so the verification script can assert an exact count.
+ DataStream<String> source = env.generateSequence(0, 20).map(new MapFunction<Long, String>() {
+ @Override
+ public String map(Long value) throws Exception {
+ return "message #" + value;
+ }
+ });
+
+ Map<String, String> userConfig = new HashMap<>();
+ userConfig.put("cluster.name", "elasticsearch");
+ // This instructs the sink to emit after every element, otherwise they would be buffered
+ userConfig.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+
+ // 9300 is the Elasticsearch transport (not HTTP) port.
+ List<InetSocketAddress> transports = new ArrayList<>();
+ transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
+
+ source.addSink(new ElasticsearchSink<>(userConfig, transports, new ElasticsearchSinkFunction<String>() {
+ @Override
+ public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
+ indexer.add(createIndexRequest(element, parameterTool));
+ }
+ }));
+
+ env.execute("Elasticsearch5.x end to end sink test example");
+ }
+
+ /**
+  * Builds an {@code IndexRequest} storing the element under the "data" field.
+  * The element string doubles as the document id, keeping re-runs idempotent.
+  */
+ private static IndexRequest createIndexRequest(String element, ParameterTool parameterTool) {
+ Map<String, Object> json = new HashMap<>();
+ json.put("data", element);
+
+ return Requests.indexRequest()
+ .index(parameterTool.getRequired("index"))
+ .type(parameterTool.getRequired("type"))
+ .id(element)
+ .source(json);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7abfcb2/flink-end-to-end-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml
index 45b63f0..04b8532 100644
--- a/flink-end-to-end-tests/pom.xml
+++ b/flink-end-to-end-tests/pom.xml
@@ -43,6 +43,9 @@ under the License.
<module>flink-high-parallelism-iterations-test</module>
<module>flink-stream-stateful-job-upgrade-test</module>
<module>flink-local-recovery-and-allocation-test</module>
+ <module>flink-elasticsearch1-test</module>
+ <module>flink-elasticsearch2-test</module>
+ <module>flink-elasticsearch5-test</module>
</modules>
<build>
http://git-wip-us.apache.org/repos/asf/flink/blob/a7abfcb2/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh b/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh
new file mode 100644
index 0000000..3fda344
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh
@@ -0,0 +1,62 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+set -o pipefail
+
+# Guard: common.sh must be sourced first so that TEST_DATA_DIR is defined.
+if [[ -z $TEST_DATA_DIR ]]; then
+ echo "Must run common.sh before elasticsearch-common.sh."
+ exit 1
+fi
+
+# Asserts that a JVM named "Elasticsearch" shows up in jps output.
+# On failure: clears PASS and exits 1 so the surrounding test run fails.
+function verify_elasticsearch_process_exist {
+ ELASTICSEARCH_PROCESS=$(jps | grep Elasticsearch | awk '{print $2}')
+
+ # make sure the elasticsearch node is actually running
+ if [ "$ELASTICSEARCH_PROCESS" != "Elasticsearch" ]; then
+ echo "Elasticsearch node is not running."
+ PASS=""
+ exit 1
+ else
+ echo "Elasticsearch node is running."
+ fi
+}
+
+# Queries the "index" index and checks that all 21 documents written by the
+# sink job (sequence 0..20) are present. The raw search response is left in
+# $TEST_DATA_DIR/output for debugging; on mismatch PASS is cleared and the
+# script exits 1.
+function verify_result {
+ if [ -f "$TEST_DATA_DIR/output" ]; then
+ rm $TEST_DATA_DIR/output
+ fi
+
+ curl 'localhost:9200/index/_search?q=*&pretty&size=21' > $TEST_DATA_DIR/output
+
+ # the search response reports '"total" : 21' once every document has arrived
+ if [ -n "$(grep '\"total\" : 21' $TEST_DATA_DIR/output)" ]; then
+ echo "Elasticsearch end to end test pass."
+ else
+ echo "Elasticsearch end to end test failed."
+ PASS=""
+ exit 1
+ fi
+}
+
+# Sends SIGTERM to the Elasticsearch JVM (located via jps) and then runs the
+# regular test-suite cleanup.
+# NOTE(review): assumes exactly one Elasticsearch process is running — with
+# more than one, $pid holds multiple ids; confirm against the calling script.
+function shutdown_elasticsearch_cluster {
+ pid=$(jps | grep Elasticsearch | awk '{print $1}')
+ kill -SIGTERM $pid
+
+ # make sure to run regular cleanup as well
+ cleanup
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7abfcb2/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch125.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch125.sh b/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch125.sh
new file mode 100755
index 0000000..dea3f13
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch125.sh
@@ -0,0 +1,109 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+source "$(dirname "$0")"/common.sh
+source "$(dirname "$0")"/elasticsearch-common.sh
+
+mkdir -p $TEST_DATA_DIR
+
+ELASTICSEARCH1_URL="https://download.elastic.co/elasticsearch/elasticsearch/elasticsearch-1.7.1.tar.gz"
+ELASTICSEARCH2_URL="https://download.elastic.co/elasticsearch/release/org/elasticsearch/distribution/tar/elasticsearch/2.3.5/elasticsearch-2.3.5.tar.gz"
+ELASTICSEARCH5_URL="https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.1.2.tar.gz"
+
+# ---- Round 1: Elasticsearch 1.7.1 ----
+# start downloading elasticsearch1
+echo "Downloading Elasticsearch1 from $ELASTICSEARCH1_URL"
+curl "$ELASTICSEARCH1_URL" > $TEST_DATA_DIR/elasticsearch1.tar.gz
+
+tar xzf $TEST_DATA_DIR/elasticsearch1.tar.gz -C $TEST_DATA_DIR/
+ELASTICSEARCH1_DIR=$TEST_DATA_DIR/elasticsearch-1.7.1
+
+# start elasticsearch1 cluster
+$ELASTICSEARCH1_DIR/bin/elasticsearch -daemon
+
+verify_elasticsearch_process_exist
+
+start_cluster
+
+# JAR name is fixed by the antrun rename step in the module's pom.xml
+TEST_ES1_JAR=$TEST_DATA_DIR/../../flink-elasticsearch1-test/target/Elasticsearch1SinkExample.jar
+
+# run the Flink job
+$FLINK_DIR/bin/flink run -p 1 $TEST_ES1_JAR \
+ --index index \
+ --type type
+
+verify_result
+
+shutdown_elasticsearch_cluster
+
+# ---- Round 2: Elasticsearch 2.3.5 ----
+# re-create the data dir; presumably the preceding cleanup removed it — TODO confirm
+mkdir -p $TEST_DATA_DIR
+
+# start downloading elasticsearch2
+echo "Downloading Elasticsearch2 from $ELASTICSEARCH2_URL"
+curl "$ELASTICSEARCH2_URL" > $TEST_DATA_DIR/elasticsearch2.tar.gz
+
+tar xzf $TEST_DATA_DIR/elasticsearch2.tar.gz -C $TEST_DATA_DIR/
+ELASTICSEARCH2_DIR=$TEST_DATA_DIR/elasticsearch-2.3.5
+
+# start elasticsearch cluster, different from elasticsearch1 since using -daemon here will hang the shell.
+nohup $ELASTICSEARCH2_DIR/bin/elasticsearch &
+
+verify_elasticsearch_process_exist
+
+start_cluster
+
+TEST_ES2_JAR=$TEST_DATA_DIR/../../flink-elasticsearch2-test/target/Elasticsearch2SinkExample.jar
+
+# run the Flink job
+$FLINK_DIR/bin/flink run -p 1 $TEST_ES2_JAR \
+ --index index \
+ --type type
+
+verify_result
+
+shutdown_elasticsearch_cluster
+
+# ---- Round 3: Elasticsearch 5.1.2 ----
+mkdir -p $TEST_DATA_DIR
+
+# start downloading elasticsearch5
+echo "Downloading Elasticsearch5 from $ELASTICSEARCH5_URL"
+curl "$ELASTICSEARCH5_URL" > $TEST_DATA_DIR/elasticsearch5.tar.gz
+
+tar xzf $TEST_DATA_DIR/elasticsearch5.tar.gz -C $TEST_DATA_DIR/
+ELASTICSEARCH5_DIR=$TEST_DATA_DIR/elasticsearch-5.1.2
+
+# start elasticsearch cluster, different from elasticsearch1 since using -daemon here will hang the shell.
+nohup $ELASTICSEARCH5_DIR/bin/elasticsearch &
+
+verify_elasticsearch_process_exist
+
+start_cluster
+
+TEST_ES5_JAR=$TEST_DATA_DIR/../../flink-elasticsearch5-test/target/Elasticsearch5SinkExample.jar
+
+# run the Flink job
+$FLINK_DIR/bin/flink run -p 1 $TEST_ES5_JAR \
+ --index index \
+ --type type
+
+verify_result
+
+rm -rf $FLINK_DIR/log/* 2> /dev/null
+
+# NOTE(review): the traps are only registered after all three rounds have run,
+# so a failure earlier in the script leaves the current Elasticsearch process
+# behind — consider registering these right after the first node starts.
+trap shutdown_elasticsearch_cluster INT
+trap shutdown_elasticsearch_cluster EXIT