Posted to commits@flink.apache.org by tz...@apache.org on 2017/02/07 14:47:56 UTC

[1/4] flink git commit: [FLINK-4988] [elasticsearch] Restructure Elasticsearch connectors

Repository: flink
Updated Branches:
  refs/heads/master b452c8bbb -> b5caaef82


http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java b/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java
index bc9bedc..93ac6c8 100644
--- a/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java
+++ b/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java
@@ -1,13 +1,12 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,217 +16,53 @@
  */
 package org.apache.flink.streaming.connectors.elasticsearch2;
 
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.elasticsearch.action.get.GetRequest;
-import org.elasticsearch.action.get.GetResponse;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.client.Requests;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.node.Node;
-import org.elasticsearch.node.NodeBuilder;
-import org.junit.Assert;
-import org.junit.ClassRule;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 
-import java.io.File;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-public class ElasticsearchSinkITCase extends StreamingMultipleProgramsTestBase {
-
-	private static final int NUM_ELEMENTS = 20;
-
-	@ClassRule
-	public static TemporaryFolder tempFolder = new TemporaryFolder();
+public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase {
 
 	@Test
 	public void testTransportClient() throws Exception {
-
-		File dataDir = tempFolder.newFolder();
-
-		Node node = NodeBuilder.nodeBuilder()
-				.settings(Settings.settingsBuilder()
-						.put("path.home", dataDir.getParent())
-						.put("http.enabled", false)
-						.put("path.data", dataDir.getAbsolutePath()))
-				// set a custom cluster name to verify that user config works correctly
-				.clusterName("my-transport-client-cluster")
-				.node();
-
-		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction());
-
-		Map<String, String> config = new HashMap<>();
-		// This instructs the sink to emit after every element, otherwise they would be buffered
-		config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
-		config.put("cluster.name", "my-transport-client-cluster");
-
-		// Can't use {@link TransportAddress} as its not Serializable in Elasticsearch 2.x
-		List<InetSocketAddress> transports = new ArrayList<>();
-		transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
-
-		source.addSink(new ElasticsearchSink<>(config, transports, new TestElasticsearchSinkFunction()));
-
-		env.execute("Elasticsearch TransportClient Test");
-
-		// verify the results
-		Client client = node.client();
-		for (int i = 0; i < NUM_ELEMENTS; i++) {
-			GetResponse response = client.get(new GetRequest("my-index",
-					"my-type", Integer.toString(i))).actionGet();
-			Assert.assertEquals("message #" + i, response.getSource().get("data"));
-		}
-
-		node.close();
+		runTransportClientTest();
 	}
 
- @Test(expected = IllegalArgumentException.class)
- public void testNullTransportClient() throws Exception {
-
-	File dataDir = tempFolder.newFolder();
-
-	Node node = NodeBuilder.nodeBuilder()
-		.settings(Settings.settingsBuilder()
-			.put("path.home", dataDir.getParent())
-			.put("http.enabled", false)
-			.put("path.data", dataDir.getAbsolutePath()))
-		// set a custom cluster name to verify that user config works correctly
-		.clusterName("my-transport-client-cluster")
-		.node();
-
-	final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-	DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction());
-
-	Map<String, String> config = new HashMap<>();
-	// This instructs the sink to emit after every element, otherwise they would be buffered
-	config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
-	config.put("cluster.name", "my-transport-client-cluster");
-
-	source.addSink(new ElasticsearchSink<>(config, null, new TestElasticsearchSinkFunction()));
-
-	env.execute("Elasticsearch TransportClient Test");
-
-	// verify the results
-	Client client = node.client();
-	for (int i = 0; i < NUM_ELEMENTS; i++) {
-	 GetResponse response = client.get(new GetRequest("my-index",
-		 "my-type", Integer.toString(i))).actionGet();
-	 Assert.assertEquals("message #" + i, response.getSource().get("data"));
+	@Test
+	public void testNullTransportClient() throws Exception {
+		runNullTransportClientTest();
 	}
 
-	node.close();
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testEmptyTransportClient() throws Exception {
-
-	File dataDir = tempFolder.newFolder();
-
-	Node node = NodeBuilder.nodeBuilder()
-		.settings(Settings.settingsBuilder()
-			.put("path.home", dataDir.getParent())
-			.put("http.enabled", false)
-			.put("path.data", dataDir.getAbsolutePath()))
-		// set a custom cluster name to verify that user config works correctly
-		.clusterName("my-transport-client-cluster")
-		.node();
-
-	final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-	DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction());
-
-	Map<String, String> config = new HashMap<>();
-	// This instructs the sink to emit after every element, otherwise they would be buffered
-	config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
-	config.put("cluster.name", "my-transport-client-cluster");
-
-	source.addSink(new ElasticsearchSink<>(config, new ArrayList<InetSocketAddress>(), new TestElasticsearchSinkFunction()));
-
-	env.execute("Elasticsearch TransportClient Test");
-
-	// verify the results
-	Client client = node.client();
-	for (int i = 0; i < NUM_ELEMENTS; i++) {
-	 GetResponse response = client.get(new GetRequest("my-index",
-		 "my-type", Integer.toString(i))).actionGet();
-	 Assert.assertEquals("message #" + i, response.getSource().get("data"));
+	@Test
+	public void testEmptyTransportClient() throws Exception {
+		runEmptyTransportClientTest();
 	}
 
-	node.close();
- }
-
-	@Test(expected = JobExecutionException.class)
+	@Test
 	public void testTransportClientFails() throws Exception{
-		// this checks whether the TransportClient fails early when there is no cluster to
-		// connect to. There isn't a similar test for the Node Client version since that
-		// one will block and wait for a cluster to come online
-
-		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction());
-
-		Map<String, String> config = new HashMap<>();
-		// This instructs the sink to emit after every element, otherwise they would be buffered
-		config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
-		config.put("cluster.name", "my-node-client-cluster");
-
-		List<InetSocketAddress> transports = new ArrayList<>();
-		transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
-
-		source.addSink(new ElasticsearchSink<>(config, transports, new TestElasticsearchSinkFunction()));
-
-		env.execute("Elasticsearch Node Client Test");
+		runTransportClientFailsTest();
 	}
 
-	private static class TestSourceFunction implements SourceFunction<Tuple2<Integer, String>> {
-		private static final long serialVersionUID = 1L;
-
-		private volatile boolean running = true;
-
-		@Override
-		public void run(SourceContext<Tuple2<Integer, String>> ctx) throws Exception {
-			for (int i = 0; i < NUM_ELEMENTS && running; i++) {
-				ctx.collect(Tuple2.of(i, "message #" + i));
-			}
-		}
-
-		@Override
-		public void cancel() {
-			running = false;
-		}
+	@Override
+	protected <T> ElasticsearchSinkBase<T> createElasticsearchSink(Map<String, String> userConfig,
+																List<InetSocketAddress> transportAddresses,
+																ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
+		return new ElasticsearchSink<>(userConfig, transportAddresses, elasticsearchSinkFunction);
 	}
 
-	private static class TestElasticsearchSinkFunction implements ElasticsearchSinkFunction<Tuple2<Integer, String>> {
-		private static final long serialVersionUID = 1L;
+	@Override
+	protected <T> ElasticsearchSinkBase<T> createElasticsearchSinkForEmbeddedNode(
+		Map<String, String> userConfig, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) throws Exception {
 
-		public IndexRequest createIndexRequest(Tuple2<Integer, String> element) {
-			Map<String, Object> json = new HashMap<>();
-			json.put("data", element.f1);
-
-			return Requests.indexRequest()
-					.index("my-index")
-					.type("my-type")
-					.id(element.f0.toString())
-					.source(json);
-		}
+		List<InetSocketAddress> transports = new ArrayList<>();
+		transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
 
-		@Override
-		public void process(Tuple2<Integer, String> element, RuntimeContext ctx, RequestIndexer indexer) {
-			indexer.add(createIndexRequest(element));
-		}
+		return new ElasticsearchSink<>(userConfig, transports, elasticsearchSinkFunction);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/examples/ElasticsearchExample.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/examples/ElasticsearchExample.java b/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/examples/ElasticsearchExample.java
deleted file mode 100644
index 05760e8..0000000
--- a/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/examples/ElasticsearchExample.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.elasticsearch2.examples;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink;
-import org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSinkFunction;
-import org.apache.flink.streaming.connectors.elasticsearch2.RequestIndexer;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.client.Requests;
-
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * This example shows how to use the Elasticsearch Sink. Before running it you must ensure that
- * you have a cluster named "elasticsearch" running or change the name of cluster in the config map.
- */
-public class ElasticsearchExample {
-
-	public static void main(String[] args) throws Exception {
-
-		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		SingleOutputStreamOperator<String> source =
-				env.generateSequence(0, 20).map(new MapFunction<Long, String>() {
-					/**
-					 * The mapping method. Takes an element from the input data set and transforms
-					 * it into exactly one element.
-					 *
-					 * @param value The input value.
-					 * @return The transformed value
-					 * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
-					 *                   to fail and may trigger recovery.
-					 */
-					@Override
-					public String map(Long value) throws Exception {
-						return "message #" + value;
-					}
-				});
-
-		Map<String, String> config = new HashMap<>();
-		// This instructs the sink to emit after every element, otherwise they would be buffered
-		config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
-
-		List<InetSocketAddress> transports = new ArrayList<>();
-		transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
-
-		source.addSink(new ElasticsearchSink<>(config, transports, new ElasticsearchSinkFunction<String>(){
-			@Override
-			public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
-				indexer.add(createIndexRequest(element));
-			}
-		}));
-
-		env.execute("Elasticsearch Example");
-	}
-
-	private static IndexRequest createIndexRequest(String element) {
-		Map<String, Object> json = new HashMap<>();
-		json.put("data", element);
-
-		return Requests.indexRequest()
-				.index("my-index")
-				.type("my-type")
-				.id(element)
-				.source(json);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/examples/ElasticsearchSinkExample.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/examples/ElasticsearchSinkExample.java b/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/examples/ElasticsearchSinkExample.java
new file mode 100644
index 0000000..8c50847
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/examples/ElasticsearchSinkExample.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.elasticsearch2.examples;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Requests;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This example shows how to use the Elasticsearch Sink. Before running it you must ensure that
+ * a cluster named "elasticsearch" is running, or change the cluster name in the config map.
+ */
+public class ElasticsearchSinkExample {
+
+	public static void main(String[] args) throws Exception {
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStream<String> source = env.generateSequence(0, 20).map(new MapFunction<Long, String>() {
+			@Override
+			public String map(Long value) throws Exception {
+				return "message #" + value;
+			}
+		});
+
+		Map<String, String> userConfig = new HashMap<>();
+		userConfig.put("cluster.name", "elasticsearch");
+		// This instructs the sink to emit after every element, otherwise they would be buffered
+		userConfig.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+
+		List<InetSocketAddress> transports = new ArrayList<>();
+		transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
+
+		source.addSink(new ElasticsearchSink<>(userConfig, transports, new org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction<String>(){
+			@Override
+			public void process(String element, RuntimeContext ctx, org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer indexer) {
+				indexer.add(createIndexRequest(element));
+			}
+		}));
+
+		env.execute("Elasticsearch Sink Example");
+	}
+
+	private static IndexRequest createIndexRequest(String element) {
+		Map<String, Object> json = new HashMap<>();
+		json.put("data", element);
+
+		return Requests.indexRequest()
+			.index("my-index")
+			.type("my-type")
+			.id(element)
+			.source(json);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch2/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch2/src/test/resources/log4j-test.properties b/flink-connectors/flink-connector-elasticsearch2/src/test/resources/log4j-test.properties
index dc20726..2055184 100644
--- a/flink-connectors/flink-connector-elasticsearch2/src/test/resources/log4j-test.properties
+++ b/flink-connectors/flink-connector-elasticsearch2/src/test/resources/log4j-test.properties
@@ -16,12 +16,12 @@
 # limitations under the License.
 ################################################################################
 
-log4j.rootLogger=OFF, testlogger
+log4j.rootLogger=INFO, testlogger
 
 log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
-log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.target=System.err
 log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
 log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
 
 # suppress the irrelevant (wrong) warnings from the netty channel handler
-log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
\ No newline at end of file
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch5/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch5/pom.xml b/flink-connectors/flink-connector-elasticsearch5/pom.xml
index 8fc5c8b..a0bd328 100644
--- a/flink-connectors/flink-connector-elasticsearch5/pom.xml
+++ b/flink-connectors/flink-connector-elasticsearch5/pom.xml
@@ -26,7 +26,7 @@ under the License.
 	<parent>
 		<groupId>org.apache.flink</groupId>
 		<artifactId>flink-connectors</artifactId>
-		<version>1.2-SNAPSHOT</version>
+		<version>1.3-SNAPSHOT</version>
 		<relativePath>..</relativePath>
 	</parent>
 
@@ -37,7 +37,7 @@ under the License.
 
 	<!-- Allow users to pass custom connector versions -->
 	<properties>
-		<elasticsearch.version>5.0.0</elasticsearch.version>
+		<elasticsearch.version>5.1.2</elasticsearch.version>
 	</properties>
 
 	<dependencies>
@@ -52,27 +52,38 @@ under the License.
 		</dependency>
 
 		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-elasticsearch-base_2.10</artifactId>
+			<version>${project.version}</version>
+			<exclusions>
+				<!-- Elasticsearch Java Client has been moved to a different module in 5.x -->
+				<exclusion>
+					<groupId>org.elasticsearch</groupId>
+					<artifactId>elasticsearch</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+
+		<!-- Dependency for Elasticsearch 5.x Java Client -->
+		<dependency>
 			<groupId>org.elasticsearch.client</groupId>
 			<artifactId>transport</artifactId>
 			<version>${elasticsearch.version}</version>
 		</dependency>
 
+		<!--
+			Elasticsearch 5.x uses Log4j2 and no longer detects logging implementations, making
+			Log4j2 a strict dependency. The following is added so that the Log4j2 API in
+			Elasticsearch 5.x is routed to SLF4J. This way, user projects can remain flexible
+			in their choice of logging implementation.
+		-->
+
 		<dependency>
 			<groupId>org.apache.logging.log4j</groupId>
-			<artifactId>log4j-api</artifactId>
-			<version>2.7</version>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.logging.log4j</groupId>
-			<artifactId>log4j-core</artifactId>
+			<artifactId>log4j-to-slf4j</artifactId>
 			<version>2.7</version>
 		</dependency>
 
-		<dependency>
-			<groupId>com.fasterxml.jackson.core</groupId>
-			<artifactId>jackson-core</artifactId>
-		</dependency>
-
 		<!-- test dependencies -->
 
 		<dependency>
@@ -81,6 +92,7 @@ under the License.
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
+
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-streaming-java_2.10</artifactId>
@@ -88,6 +100,63 @@ under the License.
 			<scope>test</scope>
 			<type>test-jar</type>
 		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-elasticsearch-base_2.10</artifactId>
+			<version>${project.version}</version>
+			<exclusions>
+				<exclusion>
+					<groupId>org.elasticsearch</groupId>
+					<artifactId>elasticsearch</artifactId>
+				</exclusion>
+			</exclusions>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
+		<!--
+			The following Log4j2 test dependencies are required for the
+			embedded Elasticsearch nodes used in tests to run correctly.
+		-->
+
+		<dependency>
+			<groupId>org.apache.logging.log4j</groupId>
+			<artifactId>log4j-api</artifactId>
+			<version>2.7</version>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.logging.log4j</groupId>
+			<artifactId>log4j-core</artifactId>
+			<version>2.7</version>
+			<scope>test</scope>
+		</dependency>
+
 	</dependencies>
 
+	<build>
+		<plugins>
+			<!--
+				For the tests, we need to exclude the Log4j2-to-SLF4J adapter dependency
+				and let Elasticsearch use Log4j2 directly, otherwise the embedded Elasticsearch node
+				used in tests will fail to work.
+
+				In other words, the connector jar routes Elasticsearch 5.x's Log4j2 APIs to SLF4J,
+				but the test builds still use Log4j2 directly.
+			-->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-surefire-plugin</artifactId>
+				<version>2.12.2</version>
+				<configuration>
+					<classpathDependencyExcludes>
+						<classpathDependencyExclude>org.apache.logging.log4j:log4j-to-slf4j</classpathDependencyExclude>
+					</classpathDependencyExcludes>
+				</configuration>
+			</plugin>
+		</plugins>
+	</build>
+
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/BulkProcessorIndexer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/BulkProcessorIndexer.java b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/BulkProcessorIndexer.java
deleted file mode 100644
index f7ca499..0000000
--- a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/BulkProcessorIndexer.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.elasticsearch5;
-
-import org.elasticsearch.action.ActionRequest;
-import org.elasticsearch.action.bulk.BulkProcessor;
-
-public class BulkProcessorIndexer implements RequestIndexer {
-	private final BulkProcessor bulkProcessor;
-
-	public BulkProcessorIndexer(BulkProcessor bulkProcessor) {
-		this.bulkProcessor = bulkProcessor;
-	}
-
-	@Override
-	public void add(ActionRequest... actionRequests) {
-		for (ActionRequest actionRequest : actionRequests) {
-			this.bulkProcessor.add(actionRequest);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java
new file mode 100644
index 0000000..1389e7d
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch5;
+
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchApiCallBridge;
+import org.apache.flink.streaming.connectors.elasticsearch.util.ElasticsearchUtils;
+import org.apache.flink.util.Preconditions;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.network.NetworkModule;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.TransportAddress;
+import org.elasticsearch.transport.Netty3Plugin;
+import org.elasticsearch.transport.client.PreBuiltTransportClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Implementation of {@link ElasticsearchApiCallBridge} for Elasticsearch 5.x.
+ */
+public class Elasticsearch5ApiCallBridge implements ElasticsearchApiCallBridge {
+
+	private static final long serialVersionUID = -5222683870097809633L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch5ApiCallBridge.class);
+
+	/**
+	 * User-provided transport addresses.
+	 *
+	 * We are using {@link InetSocketAddress} because {@link TransportAddress} is not serializable in Elasticsearch 5.x.
+	 */
+	private final List<InetSocketAddress> transportAddresses;
+
+	Elasticsearch5ApiCallBridge(List<InetSocketAddress> transportAddresses) {
+		Preconditions.checkArgument(transportAddresses != null && !transportAddresses.isEmpty());
+		this.transportAddresses = transportAddresses;
+	}
+
+	@Override
+	public Client createClient(Map<String, String> clientConfig) {
+		Settings settings = Settings.builder().put(clientConfig)
+			.put(NetworkModule.HTTP_TYPE_KEY, Netty3Plugin.NETTY_HTTP_TRANSPORT_NAME)
+			.put(NetworkModule.TRANSPORT_TYPE_KEY, Netty3Plugin.NETTY_TRANSPORT_NAME)
+			.build();
+
+		TransportClient transportClient = new PreBuiltTransportClient(settings);
+		for (TransportAddress transport : ElasticsearchUtils.convertInetSocketAddresses(transportAddresses)) {
+			transportClient.addTransportAddress(transport);
+		}
+
+		// verify that we actually are connected to a cluster
+		if (transportClient.connectedNodes().isEmpty()) {
+			throw new RuntimeException("Elasticsearch client is not connected to any Elasticsearch nodes!");
+		}
+
+		if (LOG.isInfoEnabled()) {
+			LOG.info("Created Elasticsearch TransportClient with connected nodes {}", transportClient.connectedNodes());
+		}
+
+		return transportClient;
+	}
+
+	@Override
+	public Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse) {
+		if (!bulkItemResponse.isFailed()) {
+			return null;
+		} else {
+			return bulkItemResponse.getFailure().getCause();
+		}
+	}
+
+	@Override
+	public void cleanup() {
+		// nothing to cleanup
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java
index 29c69c4..9107d4e 100644
--- a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java
+++ b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java
@@ -16,244 +16,61 @@
  */
 package org.apache.flink.streaming.connectors.elasticsearch5;
 
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.util.Preconditions;
-import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.bulk.BulkProcessor;
-import org.elasticsearch.action.bulk.BulkRequest;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.client.Client;
 import org.elasticsearch.client.transport.TransportClient;
-import org.elasticsearch.common.network.NetworkModule;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.transport.InetSocketTransportAddress;
-import org.elasticsearch.common.transport.TransportAddress;
-import org.elasticsearch.common.unit.ByteSizeUnit;
-import org.elasticsearch.common.unit.ByteSizeValue;
-import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.transport.Netty3Plugin;
-import org.elasticsearch.transport.client.PreBuiltTransportClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.net.InetSocketAddress;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
 
 /**
- * Sink that emits its input elements in bulk to an Elasticsearch cluster.
- * <p>
+ * Elasticsearch 5.x sink that issues multiple {@link ActionRequest ActionRequests}
+ * against a cluster for each incoming element.
+ *
  * <p>
- * The first {@link Map} passed to the constructor is forwarded to Elasticsearch when creating
- * {@link TransportClient}. The config keys can be found in the Elasticsearch
- * documentation. An important setting is {@code cluster.name}, this should be set to the name
- * of the cluster that the sink should emit to.
+ * The sink internally uses a {@link TransportClient} to communicate with an Elasticsearch cluster.
+ * The sink will fail if it cannot connect to a cluster using the transport addresses passed to the constructor.
+ *
  * <p>
- * <b>Attention: </b> When using the {@code TransportClient} the sink will fail if no cluster
- * can be connected to.
+ * The {@link Map} passed to the constructor is used to create the {@code TransportClient}. The config keys can be found
+ * in the <a href="https://www.elastic.co">Elasticsearch documentation</a>. An important setting is {@code cluster.name},
+ * which should be set to the name of the cluster that the sink should emit to.
+ *
  * <p>
- * The second {@link Map} is used to configure a {@link BulkProcessor} to send {@link IndexRequest IndexRequests}.
+ * Internally, the sink will use a {@link BulkProcessor} to send {@link ActionRequest ActionRequests}.
  * This will buffer elements before sending a request to the cluster. The behaviour of the
  * {@code BulkProcessor} can be configured using these config keys:
  * <ul>
- * <li> {@code bulk.flush.max.actions}: Maximum amount of elements to buffer
- * <li> {@code bulk.flush.max.size.mb}: Maximum amount of data (in megabytes) to buffer
- * <li> {@code bulk.flush.interval.ms}: Interval at which to flush data regardless of the other two
- * settings in milliseconds
+ *   <li> {@code bulk.flush.max.actions}: Maximum number of elements to buffer
+ *   <li> {@code bulk.flush.max.size.mb}: Maximum amount of data (in megabytes) to buffer
+ *   <li> {@code bulk.flush.interval.ms}: Interval at which to flush data regardless of the other two
+ *   settings in milliseconds
  * </ul>
+ *
  * <p>
- * <p>
- * You also have to provide an {@link RequestIndexer}. This is used to create an
- * {@link IndexRequest} from an element that needs to be added to Elasticsearch. See
- * {@link RequestIndexer} for an example.
+ * You also have to provide an {@link ElasticsearchSinkFunction}. This is used to create multiple
+ * {@link ActionRequest ActionRequests} for each incoming element. See the class level documentation of
+ * {@link ElasticsearchSinkFunction} for an example.
  *
- * @param <T> Type of the elements emitted by this sink
+ * @param <T> Type of the elements handled by this sink
  */
-public class ElasticsearchSink<T> extends RichSinkFunction<T> {
-
-	public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions";
-	public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb";
-	public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms";
+public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T> {
 
 	private static final long serialVersionUID = 1L;
 
-	private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSink.class);
-
-	/**
-	 * The user specified config map that we forward to Elasticsearch when we create the Client.
-	 */
-	private final Map<String, String> esConfig;
-
-	/**
-	 * The user specified config map that we use to configure BulkProcessor.
-	 */
-	private final Map<String, String> sinkConfig;
-
-	/**
-	 * The list of nodes that the TransportClient should connect to. This is null if we are using
-	 * an embedded Node to get a Client.
-	 */
-	private final List<InetSocketAddress> transportAddresses;
-
-	/**
-	 * The builder that is used to construct an {@link IndexRequest} from the incoming element.
-	 */
-	private final ElasticsearchSinkFunction<T> elasticsearchSinkFunction;
-
-	/**
-	 * The Client that was either retrieved from a Node or is a TransportClient.
-	 */
-	private transient Client client;
-
-	/**
-	 * Bulk processor that was created using the client
-	 */
-	private transient BulkProcessor bulkProcessor;
-
-	/**
-	 * Bulk {@link org.elasticsearch.action.ActionRequest} indexer
-	 */
-	private transient RequestIndexer requestIndexer;
-
-	/**
-	 * This is set from inside the BulkProcessor listener if there where failures in processing.
-	 */
-	private final AtomicBoolean hasFailure = new AtomicBoolean(false);
-
-	/**
-	 * This is set from inside the BulkProcessor listener if a Throwable was thrown during processing.
-	 */
-	private final AtomicReference<Throwable> failureThrowable = new AtomicReference<>();
-
 	/**
-	 * Creates a new ElasticsearchSink that connects to the cluster using a TransportClient.
+	 * Creates a new {@code ElasticsearchSink} that connects to the cluster using a {@link TransportClient}.
 	 *
-	 * @param esConfig                  The map of user settings that are passed when constructing the TransportClient
-	 * @param sinkConfig                The map of user settings that are passed when constructing the BulkProcessor
-	 * @param transportAddresses        The Elasticsearch Nodes to which to connect using a {@code TransportClient}
-	 * @param elasticsearchSinkFunction This is used to generate the ActionRequest from the incoming element
+	 * @param userConfig The map of user settings that are used when constructing the {@link TransportClient}
+	 * @param transportAddresses The addresses of Elasticsearch nodes to which to connect using a {@link TransportClient}
+	 * @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest ActionRequests} from the incoming element
 	 */
-	public ElasticsearchSink(Map<String, String> esConfig, Map<String, String> sinkConfig, List<InetSocketAddress> transportAddresses, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
-		this.esConfig = esConfig;
-		this.sinkConfig = sinkConfig;
-		this.elasticsearchSinkFunction = elasticsearchSinkFunction;
-		Preconditions.checkArgument(transportAddresses != null && transportAddresses.size() > 0);
-		this.transportAddresses = transportAddresses;
+	public ElasticsearchSink(Map<String, String> userConfig,
+							List<InetSocketAddress> transportAddresses,
+							ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
+		super(new Elasticsearch5ApiCallBridge(transportAddresses), userConfig, elasticsearchSinkFunction);
 	}
-
-	/**
-	 * Initializes the connection to Elasticsearch by creating a
-	 * {@link org.elasticsearch.client.transport.TransportClient}.
-	 */
-	@Override
-	public void open(Configuration configuration) {
-		List<TransportAddress> transportNodes;
-		transportNodes = new ArrayList<>(transportAddresses.size());
-		for (InetSocketAddress address : transportAddresses) {
-			transportNodes.add(new InetSocketTransportAddress(address));
-		}
-
-		Settings settings = Settings.builder().put(esConfig)
-			.put(NetworkModule.HTTP_TYPE_KEY, Netty3Plugin.NETTY_HTTP_TRANSPORT_NAME)
-			.put(NetworkModule.TRANSPORT_TYPE_KEY, Netty3Plugin.NETTY_TRANSPORT_NAME)
-			.build();
-
-		TransportClient transportClient = new PreBuiltTransportClient(settings);
-		for (TransportAddress transport : transportNodes) {
-			transportClient.addTransportAddress(transport);
-		}
-
-		// verify that we actually are connected to a cluster
-		if (transportClient.connectedNodes().isEmpty()) {
-			throw new RuntimeException("Client is not connected to any Elasticsearch nodes!");
-		}
-
-		client = transportClient;
-
-		if (LOG.isInfoEnabled()) {
-			LOG.info("Created Elasticsearch TransportClient {}", client);
-		}
-
-		BulkProcessor.Builder bulkProcessorBuilder = BulkProcessor.builder(client, new BulkProcessor.Listener() {
-			@Override
-			public void beforeBulk(long executionId, BulkRequest request) {
-
-			}
-
-			@Override
-			public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
-				if (response.hasFailures()) {
-					for (BulkItemResponse itemResp : response.getItems()) {
-						if (itemResp.isFailed()) {
-							LOG.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage());
-							failureThrowable.compareAndSet(null, new RuntimeException(itemResp.getFailureMessage()));
-						}
-					}
-					hasFailure.set(true);
-				}
-			}
-
-			@Override
-			public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
-				LOG.error(failure.getMessage());
-				failureThrowable.compareAndSet(null, failure);
-				hasFailure.set(true);
-			}
-		});
-
-		// This makes flush() blocking
-		bulkProcessorBuilder.setConcurrentRequests(0);
-
-		ParameterTool params = ParameterTool.fromMap(sinkConfig);
-
-		if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS)) {
-			bulkProcessorBuilder.setBulkActions(params.getInt(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS));
-		}
-
-		if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB)) {
-			bulkProcessorBuilder.setBulkSize(new ByteSizeValue(params.getInt(
-				CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB), ByteSizeUnit.MB));
-		}
-
-		if (params.has(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)) {
-			bulkProcessorBuilder.setFlushInterval(TimeValue.timeValueMillis(params.getInt(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)));
-		}
-
-		bulkProcessor = bulkProcessorBuilder.build();
-		requestIndexer = new BulkProcessorIndexer(bulkProcessor);
-	}
-
-	@Override
-	public void invoke(T element) {
-		elasticsearchSinkFunction.process(element, getRuntimeContext(), requestIndexer);
-	}
-
-	@Override
-	public void close() {
-		if (bulkProcessor != null) {
-			bulkProcessor.close();
-			bulkProcessor = null;
-		}
-
-		if (client != null) {
-			client.close();
-		}
-
-		if (hasFailure.get()) {
-			Throwable cause = failureThrowable.get();
-			if (cause != null) {
-				throw new RuntimeException("An error occurred in ElasticsearchSink.", cause);
-			} else {
-				throw new RuntimeException("An error occurred in ElasticsearchSink.");
-			}
-		}
-
-	}
-
 }
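
For reference, a minimal usage sketch of the restructured 5.x sink described in the Javadoc above, mirroring the 2.x example earlier in this commit. It assumes the bulk flush config keys now live on ElasticsearchSinkBase and are inherited by the subclass, and that ElasticsearchSinkFunction and RequestIndexer come from the shared base package as imported above; the index name, type, and id scheme are illustrative:

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink;
import org.elasticsearch.client.Requests;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class Elasticsearch5SinkSketch {

	public static void main(String[] args) throws Exception {
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		DataStream<String> source = env.fromElements("message #1", "message #2");

		Map<String, String> userConfig = new HashMap<>();
		userConfig.put("cluster.name", "elasticsearch");
		// Flush after every element; otherwise requests are buffered by the BulkProcessor.
		// (Assumes CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS is inherited from ElasticsearchSinkBase.)
		userConfig.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");

		List<InetSocketAddress> transports = new ArrayList<>();
		transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));

		source.addSink(new ElasticsearchSink<>(userConfig, transports, new ElasticsearchSinkFunction<String>() {
			@Override
			public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
				Map<String, Object> json = new HashMap<>();
				json.put("data", element);
				indexer.add(Requests.indexRequest()
					.index("my-index")
					.type("my-type")
					.id(element)
					.source(json));
			}
		}));

		env.execute("Elasticsearch 5.x Sink Sketch");
	}
}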

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkFunction.java b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkFunction.java
deleted file mode 100644
index 752a83e..0000000
--- a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkFunction.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.elasticsearch5;
-
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.functions.RuntimeContext;
-
-import java.io.Serializable;
-
-/**
- * Method that creates an {@link org.elasticsearch.action.ActionRequest} from an element in a Stream.
- *
- * <p>
- * This is used by {@link ElasticsearchSink} to prepare elements for sending them to Elasticsearch.
- *
- * <p>
- * Example:
- *
- * <pre>{@code
- *					private static class TestElasticSearchSinkFunction implements
- *						ElasticsearchSinkFunction<Tuple2<Integer, String>> {
- *
- *					public IndexRequest createIndexRequest(Tuple2<Integer, String> element) {
- *						Map<String, Object> json = new HashMap<>();
- *						json.put("data", element.f1);
- *
- *						return Requests.indexRequest()
- *							.index("my-index")
- *							.type("my-type")
- *							.id(element.f0.toString())
- *							.source(json);
- *						}
- *
- *				public void process(Tuple2<Integer, String> element, RuntimeContext ctx, RequestIndexer indexer) {
- *					indexer.add(createIndexRequest(element));
- *				}
- *		}
- *
- * }</pre>
- *
- * @param <T> The type of the element handled by this {@code ElasticsearchSinkFunction}
- */
-public interface ElasticsearchSinkFunction<T> extends Serializable, Function {
-	void process(T element, RuntimeContext ctx, RequestIndexer indexer);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/RequestIndexer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/RequestIndexer.java b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/RequestIndexer.java
deleted file mode 100644
index 170df31..0000000
--- a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/RequestIndexer.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.elasticsearch5;
-
-import org.elasticsearch.action.ActionRequest;
-
-import java.io.Serializable;
-
-public interface RequestIndexer extends Serializable {
-	void add(ActionRequest... actionRequests);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java b/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java
new file mode 100644
index 0000000..f3d8897
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSinkITCase;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.network.NetworkModule;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.node.Node;
+import org.elasticsearch.node.internal.InternalSettingsPreparer;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.transport.Netty3Plugin;
+
+import java.io.File;
+import java.util.Collections;
+
+/**
+ * Implementation of {@link EmbeddedElasticsearchNodeEnvironment} for Elasticsearch 5.x.
+ * Will be dynamically loaded in {@link ElasticsearchSinkITCase} for integration tests.
+ */
+public class EmbeddedElasticsearchNodeEnvironmentImpl implements EmbeddedElasticsearchNodeEnvironment {
+
+	private Node node;
+
+	@Override
+	public void start(File tmpDataFolder, String clusterName) throws Exception {
+		if (node == null) {
+			Settings settings = Settings.builder()
+				.put("cluster.name", clusterName)
+				.put("http.enabled", false)
+				.put("path.home", tmpDataFolder.getParent())
+				.put("path.data", tmpDataFolder.getAbsolutePath())
+				.put(NetworkModule.HTTP_TYPE_KEY, Netty3Plugin.NETTY_HTTP_TRANSPORT_NAME)
+				.put(NetworkModule.TRANSPORT_TYPE_KEY, Netty3Plugin.NETTY_TRANSPORT_NAME)
+				.build();
+
+			node = new PluginNode(settings);
+			node.start();
+		}
+	}
+
+	@Override
+	public void close() throws Exception {
+		if (node != null && !node.isClosed()) {
+			node.close();
+			node = null;
+		}
+	}
+
+	@Override
+	public Client getClient() {
+		if (node != null && !node.isClosed()) {
+			return node.client();
+		} else {
+			return null;
+		}
+	}
+
+	private static class PluginNode extends Node {
+		public PluginNode(Settings settings) {
+			super(InternalSettingsPreparer.prepareEnvironment(settings, null), Collections.<Class<? extends Plugin>>singletonList(Netty3Plugin.class));
+		}
+	}
+
+}
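
For orientation, a minimal sketch of driving this embedded node environment directly, assuming the EmbeddedElasticsearchNodeEnvironment interface exposes exactly the start/close/getClient methods overridden above; the data folder and cluster name are illustrative:

import org.apache.flink.streaming.connectors.elasticsearch.EmbeddedElasticsearchNodeEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.EmbeddedElasticsearchNodeEnvironmentImpl;
import org.elasticsearch.client.Client;

import java.io.File;
import java.nio.file.Files;

public class EmbeddedNodeSketch {

	public static void main(String[] args) throws Exception {
		File tmpDataFolder = Files.createTempDirectory("flink-es-test").toFile();

		EmbeddedElasticsearchNodeEnvironment embeddedNode = new EmbeddedElasticsearchNodeEnvironmentImpl();
		embeddedNode.start(tmpDataFolder, "my-test-cluster");

		Client client = embeddedNode.getClient(); // returns null if the node is not running
		// ... index and query test documents through the client here ...

		embeddedNode.close();
	}
}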

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkITCase.java b/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkITCase.java
index b4a370b..3ebda52 100644
--- a/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkITCase.java
+++ b/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkITCase.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,184 +17,54 @@
  */
 package org.apache.flink.streaming.connectors.elasticsearch5;
 
-import com.google.common.collect.ImmutableMap;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.elasticsearch.action.get.GetResponse;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.client.Requests;
-import org.elasticsearch.common.network.NetworkModule;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.node.Node;
-import org.elasticsearch.node.internal.InternalSettingsPreparer;
-import org.elasticsearch.plugins.Plugin;
-import org.elasticsearch.transport.Netty3Plugin;
-import org.junit.ClassRule;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 
-import java.io.File;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-public class ElasticsearchSinkITCase extends StreamingMultipleProgramsTestBase {
-
-	private static final int NUM_ELEMENTS = 20;
-
-	@ClassRule
-	public static TemporaryFolder tempFolder = new TemporaryFolder();
+public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase {
 
 	@Test
 	public void testTransportClient() throws Exception {
-
-		File dataDir = tempFolder.newFolder();
-
-		Settings settings = Settings.builder()
-			.put("cluster.name", "my-transport-client-cluster")
-			.put("http.enabled", false)
-			.put("path.home", dataDir.getParent())
-			.put("path.data", dataDir.getAbsolutePath())
-			.put(NetworkModule.HTTP_TYPE_KEY, Netty3Plugin.NETTY_HTTP_TRANSPORT_NAME)
-			.put(NetworkModule.TRANSPORT_TYPE_KEY, Netty3Plugin.NETTY_TRANSPORT_NAME)
-			.build();
-
-		Node node = new PluginNode(settings);
-		node.start();
-
-		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction());
-
-		Map<String, String> esConfig = ImmutableMap.of("cluster.name", "my-transport-client-cluster");
-		Map<String, String> sinkConfig = ImmutableMap.of(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
-
-		List<InetSocketAddress> transports = new ArrayList<>();
-		transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
-
-		source.addSink(new ElasticsearchSink<>(esConfig, sinkConfig, transports, new TestElasticsearchSinkFunction()));
-
-		env.execute("Elasticsearch TransportClient Test");
-
-		// verify the results
-		Client client = node.client();
-		for (int i = 0; i < NUM_ELEMENTS; i++) {
-			GetResponse response = client.prepareGet("my-index", "my-type", Integer.toString(i)).get();
-			assertEquals("message #" + i, response.getSource().get("data"));
-		}
-
-		node.close();
+		runTransportClientTest();
 	}
 
-	@Test(expected = IllegalArgumentException.class)
+	@Test
 	public void testNullTransportClient() throws Exception {
-
-		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction());
-
-		Map<String, String> esConfig = ImmutableMap.of("cluster.name", "my-transport-client-cluster");
-		Map<String, String> sinkConfig = ImmutableMap.of(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
-
-		source.addSink(new ElasticsearchSink<>(esConfig, sinkConfig, null, new TestElasticsearchSinkFunction()));
-
-		fail();
+		runNullTransportClientTest();
 	}
 
-	@Test(expected = IllegalArgumentException.class)
+	@Test
 	public void testEmptyTransportClient() throws Exception {
-
-		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction());
-
-		Map<String, String> esConfig = ImmutableMap.of("cluster.name", "my-transport-client-cluster");
-		Map<String, String> sinkConfig = ImmutableMap.of(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
-
-		source.addSink(new ElasticsearchSink<>(esConfig, sinkConfig, new ArrayList<InetSocketAddress>(), new TestElasticsearchSinkFunction()));
-
-		env.execute("Elasticsearch TransportClient Test");
-
-		fail();
+		runEmptyTransportClientTest();
 	}
 
-	@Test(expected = JobExecutionException.class)
+	@Test
 	public void testTransportClientFails() throws Exception {
-		// this checks whether the TransportClient fails early when there is no cluster to
-		// connect to. There isn't a similar test for the Node Client version since that
-		// one will block and wait for a cluster to come online
-
-		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction());
-
-		Map<String, String> esConfig = ImmutableMap.of("cluster.name", "my-transport-client-cluster");
-		Map<String, String> sinkConfig = ImmutableMap.of(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
-
-		List<InetSocketAddress> transports = new ArrayList<>();
-		transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
-
-		source.addSink(new ElasticsearchSink<>(esConfig, sinkConfig, transports, new TestElasticsearchSinkFunction()));
-
-		env.execute("Elasticsearch Node Client Test");
-
-		fail();
+		runTransportClientFailsTest();
 	}
 
-	private static class TestSourceFunction implements SourceFunction<Tuple2<Integer, String>> {
-		private static final long serialVersionUID = 1L;
-
-		private volatile boolean running = true;
-
-		@Override
-		public void run(SourceContext<Tuple2<Integer, String>> ctx) throws Exception {
-			for (int i = 0; i < NUM_ELEMENTS && running; i++) {
-				ctx.collect(Tuple2.of(i, "message #" + i));
-			}
-		}
-
-		@Override
-		public void cancel() {
-			running = false;
-		}
+	@Override
+	protected <T> ElasticsearchSinkBase<T> createElasticsearchSink(Map<String, String> userConfig,
+																List<InetSocketAddress> transportAddresses,
+																ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
+		return new ElasticsearchSink<>(userConfig, transportAddresses, elasticsearchSinkFunction);
 	}
 
-	private static class TestElasticsearchSinkFunction implements ElasticsearchSinkFunction<Tuple2<Integer, String>> {
-		private static final long serialVersionUID = 1L;
-
-		public IndexRequest createIndexRequest(Tuple2<Integer, String> element) {
-			Map<String, Object> json = new HashMap<>();
-			json.put("data", element.f1);
+	@Override
+	protected <T> ElasticsearchSinkBase<T> createElasticsearchSinkForEmbeddedNode(
+		Map<String, String> userConfig, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) throws Exception {
 
-			return Requests.indexRequest()
-				.index("my-index")
-				.type("my-type")
-				.id(element.f0.toString())
-				.source(json);
-		}
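+		// the ES5 connector communicates via TransportClient only, so connect
+		// to the embedded node's default transport port on localhost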
+		List<InetSocketAddress> transports = new ArrayList<>();
+		transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
 
-		@Override
-		public void process(Tuple2<Integer, String> element, RuntimeContext ctx, RequestIndexer indexer) {
-			indexer.add(createIndexRequest(element));
-		}
+		return new ElasticsearchSink<>(userConfig, transports, elasticsearchSinkFunction);
 	}
 
-	private static class PluginNode extends Node {
-		public PluginNode(Settings settings) {
-			super(InternalSettingsPreparer.prepareEnvironment(settings, null), Collections.<Class<? extends Plugin>>singletonList(Netty3Plugin.class));
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/examples/ElasticsearchExample.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/examples/ElasticsearchExample.java b/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/examples/ElasticsearchExample.java
deleted file mode 100644
index 47ce846..0000000
--- a/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/examples/ElasticsearchExample.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.elasticsearch5.examples;
-
-import com.google.common.collect.ImmutableMap;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink;
-import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSinkFunction;
-import org.apache.flink.streaming.connectors.elasticsearch5.RequestIndexer;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.client.Requests;
-
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * This example shows how to use the Elasticsearch Sink. Before running it you must ensure that
- * you have a cluster named "elasticsearch" running or change the name of cluster in the config map.
- */
-public class ElasticsearchExample {
-
-	public static void main(String[] args) throws Exception {
-
-		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		SingleOutputStreamOperator<String> source =
-			env.generateSequence(0, 20).map(new MapFunction<Long, String>() {
-				@Override
-				public String map(Long value) throws Exception {
-					return "message #" + value;
-				}
-			});
-
-		Map<String, String> esConfig = ImmutableMap.of("cluster.name", "elasticsearch");
-
-		// This instructs the sink to emit after every element, otherwise they would be buffered
-		Map<String, String> sinkConfig = ImmutableMap.of(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
-
-		List<InetSocketAddress> transports = new ArrayList<>();
-		transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
-
-		source.addSink(new ElasticsearchSink<>(esConfig, sinkConfig, transports, new ElasticsearchSinkFunction<String>() {
-			@Override
-			public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
-				indexer.add(createIndexRequest(element));
-			}
-		}));
-
-		env.execute("Elasticsearch Example");
-	}
-
-	private static IndexRequest createIndexRequest(String element) {
-		Map<String, Object> json = new HashMap<>();
-		json.put("data", element);
-
-		return Requests.indexRequest()
-			.index("my-index")
-			.type("my-type")
-			.id(element)
-			.source(json);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/examples/ElasticsearchSinkExample.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/examples/ElasticsearchSinkExample.java b/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/examples/ElasticsearchSinkExample.java
new file mode 100644
index 0000000..4135283
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/examples/ElasticsearchSinkExample.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.elasticsearch5.examples;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Requests;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This example shows how to use the Elasticsearch Sink. Before running it you must ensure that
+ * you have a cluster named "elasticsearch" running or change the name of cluster in the config map.
+ */
+public class ElasticsearchSinkExample {
+
+	public static void main(String[] args) throws Exception {
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStream<String> source = env.generateSequence(0, 20).map(new MapFunction<Long, String>() {
+			@Override
+			public String map(Long value) throws Exception {
+				return "message #" + value;
+			}
+		});
+
+		Map<String, String> userConfig = new HashMap<>();
+		userConfig.put("cluster.name", "elasticsearch");
+		// This instructs the sink to emit after every element, otherwise they would be buffered
+		userConfig.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+
+		List<InetSocketAddress> transports = new ArrayList<>();
+		transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
+
+		source.addSink(new ElasticsearchSink<>(userConfig, transports, new ElasticsearchSinkFunction<String>() {
+			@Override
+			public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
+				indexer.add(createIndexRequest(element));
+			}
+		}));
+
+		env.execute("Elasticsearch Sink Example");
+	}
+
+	private static IndexRequest createIndexRequest(String element) {
+		Map<String, Object> json = new HashMap<>();
+		json.put("data", element);
+
+		return Requests.indexRequest()
+			.index("my-index")
+			.type("my-type")
+			.id(element)
+			.source(json);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch5/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch5/src/test/resources/log4j-test.properties b/flink-connectors/flink-connector-elasticsearch5/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..2055184
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch5/src/test/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=INFO, testlogger
+
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target=System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch5/src/test/resources/log4j2.properties
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch5/src/test/resources/log4j2.properties b/flink-connectors/flink-connector-elasticsearch5/src/test/resources/log4j2.properties
deleted file mode 100644
index dc20726..0000000
--- a/flink-connectors/flink-connector-elasticsearch5/src/test/resources/log4j2.properties
+++ /dev/null
@@ -1,27 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-log4j.rootLogger=OFF, testlogger
-
-log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
-log4j.appender.testlogger.target = System.err
-log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
-log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
-
-# suppress the irrelevant (wrong) warnings from the netty channel handler
-log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/pom.xml b/flink-connectors/pom.xml
index e19c77f..5d8ca70 100644
--- a/flink-connectors/pom.xml
+++ b/flink-connectors/pom.xml
@@ -45,9 +45,9 @@ under the License.
 		<module>flink-connector-kafka-0.8</module>
 		<module>flink-connector-kafka-0.9</module>
 		<module>flink-connector-kafka-0.10</module>
+		<module>flink-connector-elasticsearch-base</module>
 		<module>flink-connector-elasticsearch</module>
 		<module>flink-connector-elasticsearch2</module>
-		<module>flink-connector-elasticsearch5</module>
 		<module>flink-connector-rabbitmq</module>
 		<module>flink-connector-twitter</module>
 		<module>flink-connector-nifi</module>
@@ -86,6 +86,20 @@ under the License.
 				<module>flink-connector-kinesis</module>
 			</modules>
 		</profile>
+
+		<!--
+			Since Elasticsearch 5.x requires Java 8 at a minimum, we use this profile
+			to include it as part of Java 8 builds only.
+		-->
+		<profile>
+			<id>include-elasticsearch5</id>
+			<activation>
+				<jdk>1.8</jdk>
+			</activation>
+			<modules>
+				<module>flink-connector-elasticsearch5</module>
+			</modules>
+		</profile>
 	</profiles>
 
 </project>


[3/4] flink git commit: [FLINK-4988] [elasticsearch] Restructure Elasticsearch connectors

Posted by tz...@apache.org.
[FLINK-4988] [elasticsearch] Restructure Elasticsearch connectors

This closes #3112.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b5caaef8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b5caaef8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b5caaef8

Branch: refs/heads/master
Commit: b5caaef82add4a6f424094d526700c77b011724e
Parents: 8699b03
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Thu Jan 12 15:21:56 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Tue Feb 7 22:45:45 2017 +0800

----------------------------------------------------------------------
 docs/dev/connectors/elasticsearch.md            | 271 ++++++++++++-----
 docs/dev/connectors/elasticsearch2.md           | 173 -----------
 docs/dev/connectors/elasticsearch5.md           | 146 ----------
 docs/dev/connectors/filesystem_sink.md          |   2 +-
 docs/dev/connectors/nifi.md                     |   2 +-
 docs/dev/connectors/rabbitmq.md                 |   2 +-
 docs/dev/connectors/twitter.md                  |   2 +-
 docs/redirects/elasticsearch2.md                |   2 +-
 docs/redirects/elasticsearch2_2.md              |  24 ++
 .../flink-connector-elasticsearch-base/pom.xml  |  95 ++++++
 .../elasticsearch/BulkProcessorIndexer.java     |  44 +++
 .../ElasticsearchApiCallBridge.java             |  60 ++++
 .../elasticsearch/ElasticsearchSinkBase.java    | 237 +++++++++++++++
 .../ElasticsearchSinkFunction.java              |  71 +++++
 .../elasticsearch/RequestIndexer.java           |  37 +++
 .../elasticsearch/util/ElasticsearchUtils.java  |  51 ++++
 .../ElasticsearchSinkTestBase.java              | 186 ++++++++++++
 .../EmbeddedElasticsearchNodeEnvironment.java   |  55 ++++
 .../testutils/SourceSinkDataTestKit.java        | 112 +++++++
 .../src/test/resources/log4j-test.properties    |  27 ++
 .../src/test/resources/logback-test.xml         |  30 ++
 .../flink-connector-elasticsearch/pom.xml       |  27 +-
 .../Elasticsearch1ApiCallBridge.java            | 128 ++++++++
 .../elasticsearch/ElasticsearchSink.java        | 290 +++----------------
 .../elasticsearch/IndexRequestBuilder.java      |   8 +-
 .../IndexRequestBuilderWrapperFunction.java     |  41 +++
 .../elasticsearch/ElasticsearchSinkITCase.java  | 204 ++++++-------
 ...mbeddedElasticsearchNodeEnvironmentImpl.java |  68 +++++
 .../examples/ElasticsearchExample.java          |  80 -----
 .../examples/ElasticsearchSinkExample.java      |  84 ++++++
 .../src/test/resources/log4j-test.properties    |   6 +-
 .../flink-connector-elasticsearch2/pom.xml      |  23 +-
 .../elasticsearch2/BulkProcessorIndexer.java    |  35 ---
 .../Elasticsearch2ApiCallBridge.java            |  91 ++++++
 .../elasticsearch2/ElasticsearchSink.java       | 231 ++-------------
 .../ElasticsearchSinkFunction.java              |  24 +-
 .../OldNewElasticsearchSinkFunctionBridge.java  |  45 +++
 .../OldNewRequestIndexerBridge.java             |  41 +++
 .../elasticsearch2/RequestIndexer.java          |   8 +-
 ...mbeddedElasticsearchNodeEnvironmentImpl.java |  68 +++++
 .../elasticsearch2/ElasticsearchSinkITCase.java | 229 ++-------------
 .../examples/ElasticsearchExample.java          |  90 ------
 .../examples/ElasticsearchSinkExample.java      |  79 +++++
 .../src/test/resources/log4j-test.properties    |   6 +-
 .../flink-connector-elasticsearch5/pom.xml      |  95 +++++-
 .../elasticsearch5/BulkProcessorIndexer.java    |  35 ---
 .../Elasticsearch5ApiCallBridge.java            |  97 +++++++
 .../elasticsearch5/ElasticsearchSink.java       | 247 ++--------------
 .../ElasticsearchSinkFunction.java              |  60 ----
 .../elasticsearch5/RequestIndexer.java          |  25 --
 ...mbeddedElasticsearchNodeEnvironmentImpl.java |  81 ++++++
 .../elasticsearch5/ElasticsearchSinkITCase.java | 182 ++----------
 .../examples/ElasticsearchExample.java          |  83 ------
 .../examples/ElasticsearchSinkExample.java      |  81 ++++++
 .../src/test/resources/log4j-test.properties    |  27 ++
 .../src/test/resources/log4j2.properties        |  27 --
 flink-connectors/pom.xml                        |  16 +-
 57 files changed, 2584 insertions(+), 2007 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/docs/dev/connectors/elasticsearch.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/elasticsearch.md b/docs/dev/connectors/elasticsearch.md
index 3e8c68a..a40de68 100644
--- a/docs/dev/connectors/elasticsearch.md
+++ b/docs/dev/connectors/elasticsearch.md
@@ -23,130 +23,226 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-This connector provides a Sink that can write to an
-[Elasticsearch](https://elastic.co/) Index. To use this connector, add the
-following dependency to your project:
-
-{% highlight xml %}
-<dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-connector-elasticsearch{{ site.scala_version_suffix }}</artifactId>
-  <version>{{site.version }}</version>
-</dependency>
-{% endhighlight %}
+This connector provides sinks that can send document action requests to an
+[Elasticsearch](https://elastic.co/) index. To use this connector, add one
+of the following dependencies to your project, depending on the version
+of the Elasticsearch installation:
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left">Maven Dependency</th>
+      <th class="text-left">Supported since</th>
+      <th class="text-left">Elasticsearch version</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+        <td>flink-connector-elasticsearch{{ site.scala_version_suffix }}</td>
+        <td>1.0.0</td>
+        <td>1.x</td>
+    </tr>
+    <tr>
+        <td>flink-connector-elasticsearch2{{ site.scala_version_suffix }}</td>
+        <td>1.0.0</td>
+        <td>2.x</td>
+    </tr>
+    <tr>
+        <td>flink-connector-elasticsearch5{{ site.scala_version_suffix }}</td>
+        <td>1.2.0</td>
+        <td>5.x</td>
+    </tr>
+  </tbody>
+</table>
 
 Note that the streaming connectors are currently not part of the binary
-distribution. See
-[here]({{site.baseurl}}/dev/linking.html)
-for information about how to package the program with the libraries for
-cluster execution.
+distribution. See [here]({{site.baseurl}}/dev/linking.html) for information
+about how to package the program with the libraries for cluster execution.
 
 #### Installing Elasticsearch
 
 Instructions for setting up an Elasticsearch cluster can be found
 [here](https://www.elastic.co/guide/en/elasticsearch/reference/current/setup.html).
 Make sure to set and remember a cluster name. This must be set when
-creating a Sink for writing to your cluster
+creating an `ElasticsearchSink` for requesting document actions against your cluster.
 
 #### Elasticsearch Sink
-The connector provides a Sink that can send data to an Elasticsearch Index.
-
-The sink can use two different methods for communicating with Elasticsearch:
-
-1. An embedded Node
-2. The TransportClient
 
-See [here](https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/client.html)
-for information about the differences between the two modes.
+The `ElasticsearchSink` uses a `TransportClient` to communicate with an
+Elasticsearch cluster.
 
-This code shows how to create a sink that uses an embedded Node for
-communication:
+The example below shows how to configure and create a sink:
 
 <div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
+<div data-lang="java, Elasticsearch 1.x" markdown="1">
 {% highlight java %}
 DataStream<String> input = ...;
 
-Map<String, String> config = Maps.newHashMap();
+Map<String, String> config = new HashMap<>();
+config.put("cluster.name", "my-cluster-name");
 // This instructs the sink to emit after every element, otherwise they would be buffered
 config.put("bulk.flush.max.actions", "1");
-config.put("cluster.name", "my-cluster-name");
 
-input.addSink(new ElasticsearchSink<>(config, new IndexRequestBuilder<String>() {
-    @Override
-    public IndexRequest createIndexRequest(String element, RuntimeContext ctx) {
-        Map<String, Object> json = new HashMap<>();
-        json.put("data", element);
+List<TransportAddress> transportAddresses = new ArrayList<>();
+transportAddresses.add(new InetSocketTransportAddress("127.0.0.1", 9300));
+transportAddresses.add(new InetSocketTransportAddress("10.2.3.1", 9300));
 
+input.addSink(new ElasticsearchSink<>(config, transportAddresses, new ElasticsearchSinkFunction<String>() {
+    public IndexRequest createIndexRequest(String element) {
+        Map<String, String> json = new HashMap<>();
+        json.put("data", element);
+    
         return Requests.indexRequest()
                 .index("my-index")
                 .type("my-type")
                 .source(json);
     }
+    
+    @Override
+    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
+        indexer.add(createIndexRequest(element));
+    }
 }));
 {% endhighlight %}
 </div>
-<div data-lang="scala" markdown="1">
+<div data-lang="java, Elasticsearch 2.x / 5.x" markdown="1">
+{% highlight java %}
+DataStream<String> input = ...;
+
+Map<String, String> config = new HashMap<>();
+config.put("cluster.name", "my-cluster-name");
+// This instructs the sink to emit after every element, otherwise they would be buffered
+config.put("bulk.flush.max.actions", "1");
+
+List<InetSocketAddress> transportAddresses = new ArrayList<>();
+transportAddresses.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
+transportAddresses.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300));
+
+input.addSink(new ElasticsearchSink<>(config, transportAddresses, new ElasticsearchSinkFunction<String>() {
+    public IndexRequest createIndexRequest(String element) {
+        Map<String, String> json = new HashMap<>();
+        json.put("data", element);
+    
+        return Requests.indexRequest()
+                .index("my-index")
+                .type("my-type")
+                .source(json);
+    }
+    
+    @Override
+    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
+        indexer.add(createIndexRequest(element));
+    }
+}));
+{% endhighlight %}
+</div>
+<div data-lang="scala, Elasticsearch 1.x" markdown="1">
 {% highlight scala %}
 val input: DataStream[String] = ...
 
-val config = new util.HashMap[String, String]
+val config = new java.util.HashMap[String, String]
+config.put("cluster.name", "my-cluster-name")
+// This instructs the sink to emit after every element, otherwise they would be buffered
 config.put("bulk.flush.max.actions", "1")
+
+val transportAddresses = new java.util.ArrayList[TransportAddress]
+transportAddresses.add(new InetSocketTransportAddress("127.0.0.1", 9300))
+transportAddresses.add(new InetSocketTransportAddress("10.2.3.1", 9300))
+
+input.addSink(new ElasticsearchSink(config, transportAddresses, new ElasticsearchSinkFunction[String] {
+  def createIndexRequest(element: String): IndexRequest = {
+    val json = new java.util.HashMap[String, String]
+    json.put("data", element)
+    
+    Requests.indexRequest()
+            .index("my-index")
+            .`type`("my-type")
+            .source(json)
+  }
+
+  override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer) {
+    indexer.add(createIndexRequest(element))
+  }
+}))
+{% endhighlight %}
+</div>
+<div data-lang="scala, Elasticsearch 2.x / 5.x" markdown="1">
+{% highlight scala %}
+val input: DataStream[String] = ...
+
+val config = new java.util.HashMap[String, String]
 config.put("cluster.name", "my-cluster-name")
+// This instructs the sink to emit after every element, otherwise they would be buffered
+config.put("bulk.flush.max.actions", "1")
+
+val transportAddresses = new java.util.ArrayList[InetSocketAddress]
+transportAddresses.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300))
+transportAddresses.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300))
 
-text.addSink(new ElasticsearchSink(config, new IndexRequestBuilder[String] {
-  override def createIndexRequest(element: String, ctx: RuntimeContext): IndexRequest = {
-    val json = new util.HashMap[String, AnyRef]
+input.addSink(new ElasticsearchSink(config, transportAddresses, new ElasticsearchSinkFunction[String] {
+  def createIndexRequest(element: String): IndexRequest = {
+    val json = new java.util.HashMap[String, String]
     json.put("data", element)
-    println("SENDING: " + element)
-    Requests.indexRequest.index("my-index").`type`("my-type").source(json)
+    
+    Requests.indexRequest()
+            .index("my-index")
+            .`type`("my-type")
+            .source(json)
+  }
+
+  override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer) {
+    indexer.add(createIndexRequest(element))
   }
 }))
 {% endhighlight %}
 </div>
 </div>
 
-Note how a Map of Strings is used to configure the Sink. The configuration keys
-are documented in the Elasticsearch documentation
+Note how a `Map` of `String`s is used to configure the `ElasticsearchSink`.
+The configuration keys are documented in the Elasticsearch documentation
 [here](https://www.elastic.co/guide/en/elasticsearch/reference/current/index.html).
 Especially important is the `cluster.name` parameter that must correspond to
 the name of your cluster.
 
-Internally, the sink uses a `BulkProcessor` to send index requests to the cluster.
-This will buffer elements before sending a request to the cluster. The behaviour of the
-`BulkProcessor` can be configured using these config keys:
+Also note that the example only demonstrates performing a single index
+request for each incoming element. Generally, the `ElasticsearchSinkFunction`
+can be used to perform multiple requests of different types (e.g.
+`DeleteRequest`, `UpdateRequest`, etc.).
+
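+For example, a sink function could both index the incoming element and delete
+a stale document from another index. The sketch below reuses `config` and
+`transportAddresses` from the example above; the `old-index` name and the use
+of the element as document id are made-up details for illustration:
+
+{% highlight java %}
+input.addSink(new ElasticsearchSink<>(config, transportAddresses, new ElasticsearchSinkFunction<String>() {
+    @Override
+    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
+        Map<String, String> json = new HashMap<>();
+        json.put("data", element);
+
+        // index the element under "my-index" ...
+        indexer.add(Requests.indexRequest()
+                .index("my-index")
+                .type("my-type")
+                .source(json));
+
+        // ... and remove the corresponding document from the hypothetical old index
+        indexer.add(Requests.deleteRequest("old-index")
+                .type("my-type")
+                .id(element));
+    }
+}));
+{% endhighlight %}
+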
+Internally, the sink uses a `BulkProcessor` to send action requests to the cluster.
+This will buffer elements before sending them in bulk to the cluster. The behaviour of the
+`BulkProcessor` can be set using these config keys in the provided `Map`
+configuration (a short configuration sketch follows the list):
  * **bulk.flush.max.actions**: Maximum amount of elements to buffer
  * **bulk.flush.max.size.mb**: Maximum amount of data (in megabytes) to buffer
  * **bulk.flush.interval.ms**: Interval at which to flush data regardless of the other two
   settings in milliseconds
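+
+For example, a configuration sketch (the concrete values here are purely
+illustrative) could look like this:
+
+{% highlight java %}
+Map<String, String> config = new HashMap<>();
+config.put("cluster.name", "my-cluster-name");
+// flush once 500 actions are buffered ...
+config.put("bulk.flush.max.actions", "500");
+// ... or once the buffered data exceeds 5 megabytes ...
+config.put("bulk.flush.max.size.mb", "5");
+// ... or at the latest every 10 seconds
+config.put("bulk.flush.interval.ms", "10000");
+{% endhighlight %}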
 
-This example code does the same, but with a `TransportClient`:
+#### Communication using Embedded Node (only for Elasticsearch 1.x)
+
+For Elasticsearch versions 1.x, communication using an embedded node is
+also supported. See [here](https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/client.html)
+for information about the differences between communicating with Elasticsearch
+with an embedded node and a `TransportClient`.
+
+Below is an example of how to create an `ElasticsearchSink` that uses an
+embedded node instead of a `TransportClient`:
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
 DataStream<String> input = ...;
 
-Map<String, String> config = Maps.newHashMap();
+Map<String, String> config = new HashMap<>();
 // This instructs the sink to emit after every element, otherwise they would be buffered
 config.put("bulk.flush.max.actions", "1");
 config.put("cluster.name", "my-cluster-name");
 
-List<TransportAddress> transports = new ArrayList<String>();
-transports.add(new InetSocketTransportAddress("node-1", 9300));
-transports.add(new InetSocketTransportAddress("node-2", 9300));
-
-input.addSink(new ElasticsearchSink<>(config, transports, new IndexRequestBuilder<String>() {
-    @Override
-    public IndexRequest createIndexRequest(String element, RuntimeContext ctx) {
-        Map<String, Object> json = new HashMap<>();
+input.addSink(new ElasticsearchSink<>(config, new ElasticsearchSinkFunction<String>() {
+    public IndexRequest createIndexRequest(String element) {
+        Map<String, String> json = new HashMap<>();
         json.put("data", element);
-
+    
         return Requests.indexRequest()
                 .index("my-index")
                 .type("my-type")
                 .source(json);
     }
+    
+    @Override
+    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
+        indexer.add(createIndexRequest(element));
+    }
 }));
 {% endhighlight %}
 </div>
@@ -154,27 +250,64 @@ input.addSink(new ElasticsearchSink<>(config, transports, new IndexRequestBuilde
 {% highlight scala %}
 val input: DataStream[String] = ...
 
-val config = new util.HashMap[String, String]
+val config = new java.util.HashMap[String, String]
 config.put("bulk.flush.max.actions", "1")
 config.put("cluster.name", "my-cluster-name")
 
-val transports = new ArrayList[String]
-transports.add(new InetSocketTransportAddress("node-1", 9300))
-transports.add(new InetSocketTransportAddress("node-2", 9300))
-
-text.addSink(new ElasticsearchSink(config, transports, new IndexRequestBuilder[String] {
-  override def createIndexRequest(element: String, ctx: RuntimeContext): IndexRequest = {
-    val json = new util.HashMap[String, AnyRef]
+input.addSink(new ElasticsearchSink(config, new ElasticsearchSinkFunction[String] {
+  def createIndexRequest(element: String): IndexRequest = {
+    val json = new java.util.HashMap[String, String]
     json.put("data", element)
-    println("SENDING: " + element)
-    Requests.indexRequest.index("my-index").`type`("my-type").source(json)
+    
+    Requests.indexRequest()
+            .index("my-index")
+            .`type`("my-type")
+            .source(json)
+  }
+
+  override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer) {
+    indexer.add(createIndexRequest(element))
   }
 }))
 {% endhighlight %}
 </div>
 </div>
 
-The difference is that we now need to provide a list of Elasticsearch Nodes
-to which the sink should connect using a `TransportClient`.
+The difference is that we no longer need to provide a list of Elasticsearch
+node addresses, since the sink connects to an embedded node instead.
 
 More information about Elasticsearch can be found [here](https://elastic.co).
+
+#### Packaging the Elasticsearch Connector into an Uber-Jar
+
+For the execution of your Flink program, it is recommended to build a
+so-called uber-jar (executable jar) containing all your dependencies
+(see [here]({{site.baseurl}}/dev/linking.html) for further information).
+
+However, when an uber-jar containing an Elasticsearch sink is executed,
+an `IllegalArgumentException` may occur, which is caused by conflicting
+files of Elasticsearch and its dependencies in `META-INF/services`:
+
+```
+IllegalArgumentException[An SPI class of type org.apache.lucene.codecs.PostingsFormat with name 'Lucene50' does not exist.  You need to add the corresponding JAR file supporting this SPI to your classpath.  The current classpath supports the following names: [es090, completion090, XBloomFilter]]
+```
+
+If the uber-jar is built using Maven, this issue can be avoided by
+adding the following to the Maven POM file in the plugins section:
+
+{% highlight xml %}
+<plugin>
+    <groupId>org.apache.maven.plugins</groupId>
+    <artifactId>maven-shade-plugin</artifactId>
+    <version>2.4.3</version>
+    <executions>
+        <execution>
+            <phase>package</phase>
+            <goals>
+                <goal>shade</goal>
+            </goals>
+            <configuration>
+                <transformers>
+                    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+                </transformers>
+            </configuration>
+        </execution>
+    </executions>
+</plugin>
+{% endhighlight %}

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/docs/dev/connectors/elasticsearch2.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/elasticsearch2.md b/docs/dev/connectors/elasticsearch2.md
deleted file mode 100644
index af02c84..0000000
--- a/docs/dev/connectors/elasticsearch2.md
+++ /dev/null
@@ -1,173 +0,0 @@
----
-title: "Elasticsearch 2.x Connector"
-nav-title: Elasticsearch 2.x
-nav-parent_id: connectors
-nav-pos: 5
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-This connector provides a Sink that can write to an
-[Elasticsearch 2.x](https://elastic.co/) Index. To use this connector, add the
-following dependency to your project:
-
-{% highlight xml %}
-<dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-connector-elasticsearch2{{ site.scala_version_suffix }}</artifactId>
-  <version>{{site.version }}</version>
-</dependency>
-{% endhighlight %}
-
-Note that the streaming connectors are currently not part of the binary
-distribution. See
-[here]({{site.baseurl}}/dev/linking.html)
-for information about how to package the program with the libraries for
-cluster execution.
-
-#### Installing Elasticsearch 2.x
-
-Instructions for setting up an Elasticsearch cluster can be found
-[here](https://www.elastic.co/guide/en/elasticsearch/reference/current/setup.html).
-Make sure to set and remember a cluster name. This must be set when
-creating a Sink for writing to your cluster
-
-#### Elasticsearch 2.x Sink
-The connector provides a Sink that can send data to an Elasticsearch 2.x Index.
-
-The sink communicates with Elasticsearch via Transport Client
-
-See [here](https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/transport-client.html)
-for information about the Transport Client.
-
-The code below shows how to create a sink that uses a `TransportClient` for communication:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-File dataDir = ....;
-
-DataStream<String> input = ...;
-
-Map<String, String> config = new HashMap<>();
-// This instructs the sink to emit after every element, otherwise they would be buffered
-config.put("bulk.flush.max.actions", "1");
-config.put("cluster.name", "my-cluster-name");
-
-List<InetSocketAddress> transports = new ArrayList<>();
-transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
-transports.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300));
-
-input.addSink(new ElasticsearchSink(config, transports, new ElasticsearchSinkFunction<String>() {
-  public IndexRequest createIndexRequest(String element) {
-    Map<String, String> json = new HashMap<>();
-    json.put("data", element);
-
-    return Requests.indexRequest()
-            .index("my-index")
-            .type("my-type")
-            .source(json);
-  }
-
-  @Override
-  public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
-    indexer.add(createIndexRequest(element));
-  }
-}));
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val dataDir = ....;
-
-val input: DataStream[String] = ...
-
-val config = new util.HashMap[String, String]
-config.put("bulk.flush.max.actions", "1")
-config.put("cluster.name", "my-cluster-name")
-
-val transports = new ArrayList[String]
-transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300))
-transports.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300));
-
-input.addSink(new ElasticsearchSink(config, transports, new ElasticsearchSinkFunction[String] {
-  def createIndexRequest(element: String): IndexRequest = {
-    val json = new util.HashMap[String, AnyRef]
-    json.put("data", element)
-    Requests.indexRequest.index("my-index").`type`("my-type").source(json)
-  }
-
-  override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer) {
-    indexer.add(createIndexRequest(element))
-  }
-}))
-{% endhighlight %}
-</div>
-</div>
-
-A Map of Strings is used to configure the Sink. The configuration keys
-are documented in the Elasticsearch documentation
-[here](https://www.elastic.co/guide/en/elasticsearch/reference/current/index.html).
-Especially important is the `cluster.name`. parameter that must correspond to
-the name of your cluster and with ElasticSearch 2x you also need to specify `path.home`.
-
-Internally, the sink uses a `BulkProcessor` to send Action requests to the cluster.
-This will buffer elements and Action Requests before sending to the cluster. The behaviour of the
-`BulkProcessor` can be configured using these config keys:
- * **bulk.flush.max.actions**: Maximum amount of elements to buffer
- * **bulk.flush.max.size.mb**: Maximum amount of data (in megabytes) to buffer
- * **bulk.flush.interval.ms**: Interval at which to flush data regardless of the other two
-  settings in milliseconds
-
-This now provides a list of Elasticsearch Nodes
-to which the sink should connect via a `TransportClient`.
-
-More information about Elasticsearch can be found [here](https://elastic.co).
-
-
-#### Packaging the Elasticsearch Connector into an Uber-jar
-
-For the execution of your Flink program,
-it is recommended to build a so-called uber-jar (executable jar) containing all your dependencies
-(see [here]({{site.baseurl}}/dev/linking.html) for further information).
-
-However,
-when an uber-jar containing an Elasticsearch sink is executed,
-an `IllegalArgumentException` may occur,
-which is caused by conflicting files of Elasticsearch and it's dependencies
-in `META-INF/services`:
-
-```
-IllegalArgumentException[An SPI class of type org.apache.lucene.codecs.PostingsFormat with name 'Lucene50' does not exist.  You need to add the corresponding JAR file supporting this SPI to your classpath.  The current classpath supports the following names: [es090, completion090, XBloomFilter]]
-```
-
-If the uber-jar is build by means of maven,
-this issue can be avoided by adding the following bits to the pom file:
-
-```
-<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
-    <resource>META-INF/services/org.apache.lucene.codecs.Codec</resource>
-</transformer>
-<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
-    <resource>META-INF/services/org.apache.lucene.codecs.DocValuesFormat</resource>
-</transformer>
-<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
-   <resource>META-INF/services/org.apache.lucene.codecs.PostingsFormat</resource>
-</transformer>
-```

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/docs/dev/connectors/elasticsearch5.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/elasticsearch5.md b/docs/dev/connectors/elasticsearch5.md
deleted file mode 100644
index 2673d86..0000000
--- a/docs/dev/connectors/elasticsearch5.md
+++ /dev/null
@@ -1,146 +0,0 @@
----
-title: "Elasticsearch 5.x Connector"
-nav-title: Elasticsearch 5.x
-nav-parent_id: connectors
-nav-pos: 6
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-This connector provides a Sink that can write to an
-[Elasticsearch 5.x](https://elastic.co/) Index. To use this connector, add the
-following dependency to your project:
-
-{% highlight xml %}
-<dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-connector-elasticsearch5{{ site.scala_version_suffix }}</artifactId>
-  <version>{{site.version }}</version>
-</dependency>
-{% endhighlight %}
-
-Note that the streaming connectors are currently not part of the binary
-distribution. See
-[here]({{site.baseurl}}/dev/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution)
-for information about how to package the program with the libraries for
-cluster execution.
-
-#### Installing Elasticsearch 5.x
-
-Instructions for setting up an Elasticsearch cluster can be found
-    [here](https://www.elastic.co/guide/en/elasticsearch/reference/5.x/setup.html).
-Make sure to set and remember a cluster name. This must be set when
-creating a Sink for writing to your cluster
-
-#### Elasticsearch 5.x Sink
-The connector provides a Sink that can send data to an Elasticsearch 5.x Index.
-
-The sink communicates with Elasticsearch via Transport Client
-
-See [here](https://www.elastic.co/guide/en/elasticsearch/client/java-api/5.x/transport-client.html)
-for information about the Transport Client.
-
-The code below shows how to create a sink that uses a `TransportClient` for communication:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-File dataDir = ....;
-
-DataStream<String> input = ...;
-
-Map<String, String> esConfig = new HashMap<>();
-esConfig.put("cluster.name", "my-cluster-name");
-
-// This instructs the sink to emit after every element, otherwise they would be buffered
-Map<String, String> sinkConfig = new HashMap<>();
-sinkConfig.put("bulk.flush.max.actions", "1");
-
-List<InetSocketAddress> transports = new ArrayList<>();
-transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
-transports.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300));
-
-input.addSink(new ElasticsearchSink(esConfig, sinkConfig, transports, new ElasticsearchSinkFunction<String>() {
-  public IndexRequest createIndexRequest(String element) {
-    Map<String, String> json = new HashMap<>();
-    json.put("data", element);
-
-    return Requests.indexRequest()
-            .index("my-index")
-            .type("my-type")
-            .source(json);
-  }
-
-  @Override
-  public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
-    indexer.add(createIndexRequest(element));
-  }
-}));
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val dataDir = ....;
-
-val input: DataStream[String] = ...
-
-val esConfig = new util.HashMap[String, String]
-esConfig.put("cluster.name", "my-cluster-name")
-
-val sinkConfig = new util.HashMap[String, String]
-sinkConfig.put("bulk.flush.max.actions", "1")
-
-val transports = new ArrayList[String]
-transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300))
-transports.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300));
-
-input.addSink(new ElasticsearchSink(esConfig, sinkConfig, transports, new ElasticsearchSinkFunction[String] {
-  def createIndexRequest(element: String): IndexRequest = {
-    val json = new util.HashMap[String, AnyRef]
-    json.put("data", element)
-    Requests.indexRequest.index("my-index").`type`("my-type").source(json)
-  }
-
-  override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer) {
-    indexer.add(createIndexRequest(element))
-  }
-}))
-{% endhighlight %}
-</div>
-</div>
-
-The first Map of Strings is used to configure the Transport Client. The configuration keys
-are documented in the Elasticsearch documentation
-[here](https://www.elastic.co/guide/en/elasticsearch/reference/5.x/index.html).
-Especially important is the `cluster.name`. parameter that must correspond to
-the name of your cluster.
-
-The second Map of Strings is used to configure a `BulkProcessor` to send Action requests to the cluster.
-This will buffer elements and Action Requests before sending to the cluster. The behaviour of the
-`BulkProcessor` can be configured using these config keys:
- * **bulk.flush.max.actions**: Maximum amount of elements to buffer
- * **bulk.flush.max.size.mb**: Maximum amount of data (in megabytes) to buffer
- * **bulk.flush.interval.ms**: Interval at which to flush data regardless of the other two
-  settings in milliseconds
-
-This now provides a list of Elasticsearch Nodes
-to which the sink should connect via a `TransportClient`.
-
-More information about Elasticsearch can be found [here](https://elastic.co).
-

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/docs/dev/connectors/filesystem_sink.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/filesystem_sink.md b/docs/dev/connectors/filesystem_sink.md
index bcaeb17..67250f0 100644
--- a/docs/dev/connectors/filesystem_sink.md
+++ b/docs/dev/connectors/filesystem_sink.md
@@ -2,7 +2,7 @@
 title: "HDFS Connector"
 nav-title: Rolling File Sink
 nav-parent_id: connectors
-nav-pos: 7
+nav-pos: 5
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/docs/dev/connectors/nifi.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/nifi.md b/docs/dev/connectors/nifi.md
index 8223867..dbc1e8a 100644
--- a/docs/dev/connectors/nifi.md
+++ b/docs/dev/connectors/nifi.md
@@ -2,7 +2,7 @@
 title: "Apache NiFi Connector"
 nav-title: NiFi
 nav-parent_id: connectors
-nav-pos: 9
+nav-pos: 7
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/docs/dev/connectors/rabbitmq.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/rabbitmq.md b/docs/dev/connectors/rabbitmq.md
index b4da248..d3360e4 100644
--- a/docs/dev/connectors/rabbitmq.md
+++ b/docs/dev/connectors/rabbitmq.md
@@ -2,7 +2,7 @@
 title: "RabbitMQ Connector"
 nav-title: RabbitMQ
 nav-parent_id: connectors
-nav-pos: 8
+nav-pos: 6
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/docs/dev/connectors/twitter.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/twitter.md b/docs/dev/connectors/twitter.md
index f1fbbd4..5fb7d68 100644
--- a/docs/dev/connectors/twitter.md
+++ b/docs/dev/connectors/twitter.md
@@ -2,7 +2,7 @@
 title: "Twitter Connector"
 nav-title: Twitter
 nav-parent_id: connectors
-nav-pos: 10
+nav-pos: 8
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/docs/redirects/elasticsearch2.md
----------------------------------------------------------------------
diff --git a/docs/redirects/elasticsearch2.md b/docs/redirects/elasticsearch2.md
index 8442e52..f40199b 100644
--- a/docs/redirects/elasticsearch2.md
+++ b/docs/redirects/elasticsearch2.md
@@ -1,7 +1,7 @@
 ---
 title: "Elasticsearch2 Connector"
 layout: redirect
-redirect: /dev/connectors/elasticsearch2.html
+redirect: /dev/connectors/elasticsearch.html
 permalink: /apis/streaming/connectors/elasticsearch2.html
 ---
 <!--

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/docs/redirects/elasticsearch2_2.md
----------------------------------------------------------------------
diff --git a/docs/redirects/elasticsearch2_2.md b/docs/redirects/elasticsearch2_2.md
new file mode 100644
index 0000000..561ac69
--- /dev/null
+++ b/docs/redirects/elasticsearch2_2.md
@@ -0,0 +1,24 @@
+---
+title: "Elasticsearch2 Connector"
+layout: redirect
+redirect: /dev/connectors/elasticsearch.html
+permalink: /dev/connectors/elasticsearch2.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch-base/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/pom.xml b/flink-connectors/flink-connector-elasticsearch-base/pom.xml
new file mode 100644
index 0000000..81652c4
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch-base/pom.xml
@@ -0,0 +1,95 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+			xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+			xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-connectors</artifactId>
+		<version>1.3-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-connector-elasticsearch-base_2.10</artifactId>
+	<name>flink-connector-elasticsearch-base</name>
+
+	<packaging>jar</packaging>
+
+	<!-- Allow users to pass custom connector versions -->
+	<properties>
+		<elasticsearch.version>1.7.1</elasticsearch.version>
+	</properties>
+
+	<dependencies>
+
+		<!-- core dependencies -->
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.elasticsearch</groupId>
+			<artifactId>elasticsearch</artifactId>
+			<version>${elasticsearch.version}</version>
+		</dependency>
+
+		<!-- test dependencies -->
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
+
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+				<executions>
+					<execution>
+						<goals>
+							<goal>test-jar</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/BulkProcessorIndexer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/BulkProcessorIndexer.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/BulkProcessorIndexer.java
new file mode 100644
index 0000000..d802550
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/BulkProcessorIndexer.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.bulk.BulkProcessor;
+
+/**
+ * Implementation of a {@link RequestIndexer}, using a {@link BulkProcessor}.
+ * {@link ActionRequest ActionRequests} will be buffered before sending a bulk request to the Elasticsearch cluster.
+ */
+class BulkProcessorIndexer implements RequestIndexer {
+
+	private static final long serialVersionUID = 6841162943062034253L;
+
+	private final BulkProcessor bulkProcessor;
+
+	BulkProcessorIndexer(BulkProcessor bulkProcessor) {
+		this.bulkProcessor = bulkProcessor;
+	}
+
+	@Override
+	public void add(ActionRequest... actionRequests) {
+		for (ActionRequest actionRequest : actionRequests) {
+			this.bulkProcessor.add(actionRequest);
+		}
+	}
+}
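
A rough sketch of the indexer in action, mirroring what the base sink (added later in this commit) does internally. Here `bulkProcessor` is an assumed, already-built BulkProcessor, the index/type names are illustrative, and since the constructor is package-private this only compiles from inside the connector package:

	// The indexer simply forwards each request to the BulkProcessor,
	// which buffers it and ships it to the cluster in bulk.
	RequestIndexer indexer = new BulkProcessorIndexer(bulkProcessor);

	Map<String, Object> json = new HashMap<>();
	json.put("data", "example payload");

	indexer.add(Requests.indexRequest()
		.index("my-index")
		.type("my-type")
		.source(json));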

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
new file mode 100644
index 0000000..6298a85
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.client.Client;
+
+import javax.annotation.Nullable;
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * An {@link ElasticsearchApiCallBridge} is used to bridge incompatible Elasticsearch Java API calls across different versions.
+ * This includes calls to create Elasticsearch clients, handle failed item responses, etc. Any incompatible Elasticsearch
+ * Java APIs should be bridged using this interface.
+ *
+ * Implementations are allowed to be stateful. For example, for Elasticsearch 1.x, since connecting via an embedded node
+ * is allowed, the call bridge will hold a reference to the created embedded node. Each instance of the sink will hold
+ * exactly one instance of the call bridge, and state cleanup is performed when the sink is closed.
+ */
+public interface ElasticsearchApiCallBridge extends Serializable {
+
+	/**
+	 * Creates an Elasticsearch {@link Client}.
+	 *
+	 * @param clientConfig The configuration to use when constructing the client.
+	 * @return The created client.
+	 */
+	Client createClient(Map<String, String> clientConfig);
+
+	/**
+	 * Extracts the cause of failure of a bulk item action.
+	 *
+	 * @param bulkItemResponse the bulk item response from which to extract the cause of failure
+	 * @return the extracted {@link Throwable} from the response ({@code null} if the response is successful).
+	 */
+	@Nullable Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse);
+
+	/**
+	 * Perform any necessary state cleanup.
+	 */
+	void cleanup();
+
+}
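
To make the contract concrete, here is a minimal sketch of an implementation, written against the Elasticsearch 1.x APIs that appear later in this commit; the class name is hypothetical and error handling is reduced to the essentials:

	import org.elasticsearch.action.bulk.BulkItemResponse;
	import org.elasticsearch.client.Client;
	import org.elasticsearch.client.transport.TransportClient;
	import org.elasticsearch.common.settings.ImmutableSettings;
	import org.elasticsearch.common.settings.Settings;

	import java.util.Map;

	public class ExampleEs1CallBridge implements ElasticsearchApiCallBridge {

		@Override
		public Client createClient(Map<String, String> clientConfig) {
			// Version-specific client construction (ES 1.x style).
			Settings settings = ImmutableSettings.settingsBuilder().put(clientConfig).build();
			return new TransportClient(settings);
		}

		@Override
		public Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse response) {
			// ES 1.x only exposes a failure message, so wrap it in an exception.
			return response.isFailed() ? new RuntimeException(response.getFailureMessage()) : null;
		}

		@Override
		public void cleanup() {
			// Stateless sketch: nothing to release.
		}
	}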

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
new file mode 100644
index 0000000..6a2d65f
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.util.InstantiationUtil;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.unit.TimeValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Base class for all Flink Elasticsearch Sinks.
+ *
+ * <p>
+ * This class implements the common behaviour across Elasticsearch versions, such as
+ * the use of an internal {@link BulkProcessor} to buffer multiple {@link ActionRequest}s before
+ * sending the requests to the cluster, as well as passing input records to the user-provided
+ * {@link ElasticsearchSinkFunction} for processing.
+ *
+ * <p>
+ * The version-specific API calls for different Elasticsearch versions should be defined by a concrete implementation of
+ * an {@link ElasticsearchApiCallBridge}, which is provided to the constructor of this class. This call bridge is used,
+ * for example, to create an Elasticsearch {@link Client} and handle failed item responses.
+ *
+ * @param <T> Type of the elements handled by this sink
+ */
+public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
+
+	private static final long serialVersionUID = -1007596293618451942L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSinkBase.class);
+
+	// ------------------------------------------------------------------------
+	//  Internal bulk processor configuration
+	// ------------------------------------------------------------------------
+
+	public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions";
+	public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb";
+	public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms";
+
+	private final Integer bulkProcessorFlushMaxActions;
+	private final Integer bulkProcessorFlushMaxSizeMb;
+	private final Integer bulkProcessorFlushIntervalMillis;
+
+	// ------------------------------------------------------------------------
+	//  User-facing API and configuration
+	// ------------------------------------------------------------------------
+
+	/** The user-specified config map that we forward to Elasticsearch when we create the {@link Client}. */
+	private final Map<String, String> userConfig;
+
+	/** The function that is used to construct multiple {@link ActionRequest ActionRequests} from each incoming element. */
+	private final ElasticsearchSinkFunction<T> elasticsearchSinkFunction;
+
+	/** Provided to the user via the {@link ElasticsearchSinkFunction} to add {@link ActionRequest ActionRequests}. */
+	private transient BulkProcessorIndexer requestIndexer;
+
+	// ------------------------------------------------------------------------
+	//  Internals for the Flink Elasticsearch Sink
+	// ------------------------------------------------------------------------
+
+	/** Call bridge for version-specific Elasticsearch APIs. */
+	private final ElasticsearchApiCallBridge callBridge;
+
+	/** Elasticsearch client created using the call bridge. */
+	private transient Client client;
+
+	/** Bulk processor to buffer and send requests to Elasticsearch, created using the client. */
+	private transient BulkProcessor bulkProcessor;
+
+	/**
+	 * This is set from inside the {@link BulkProcessor.Listener} if a {@link Throwable} was thrown in callbacks.
+	 */
+	private final AtomicReference<Throwable> failureThrowable = new AtomicReference<>();
+
+	public ElasticsearchSinkBase(
+		ElasticsearchApiCallBridge callBridge,
+		Map<String, String> userConfig,
+		ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
+
+		this.callBridge = checkNotNull(callBridge);
+		this.elasticsearchSinkFunction = checkNotNull(elasticsearchSinkFunction);
+
+		// we eagerly check if the user-provided sink function is serializable;
+		// otherwise, if it isn't serializable, users will merely get a non-informative error message
+		// "ElasticsearchSinkBase is not serializable"
+		try {
+			InstantiationUtil.serializeObject(elasticsearchSinkFunction);
+		} catch (Exception e) {
+			throw new IllegalArgumentException(
+				"The implementation of the provided ElasticsearchSinkFunction is not serializable. " +
+				"The object probably contains or references non serializable fields.");
+		}
+
+		checkNotNull(userConfig);
+
+		// extract and remove bulk processor related configuration from the user-provided config,
+		// so that the resulting user config only contains configuration related to the Elasticsearch client.
+		ParameterTool params = ParameterTool.fromMap(userConfig);
+
+		if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS)) {
+			bulkProcessorFlushMaxActions = params.getInt(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS);
+			userConfig.remove(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS);
+		} else {
+			bulkProcessorFlushMaxActions = null;
+		}
+
+		if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB)) {
+			bulkProcessorFlushMaxSizeMb = params.getInt(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB);
+			userConfig.remove(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB);
+		} else {
+			bulkProcessorFlushMaxSizeMb = null;
+		}
+
+		if (params.has(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)) {
+			bulkProcessorFlushIntervalMillis = params.getInt(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS);
+			userConfig.remove(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS);
+		} else {
+			bulkProcessorFlushIntervalMillis = null;
+		}
+
+		this.userConfig = userConfig;
+	}
+
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		client = callBridge.createClient(userConfig);
+
+		BulkProcessor.Builder bulkProcessorBuilder = BulkProcessor.builder(
+			client,
+			new BulkProcessor.Listener() {
+				@Override
+				public void beforeBulk(long executionId, BulkRequest request) { }
+
+				@Override
+				public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
+					if (response.hasFailures()) {
+						for (BulkItemResponse itemResp : response.getItems()) {
+							Throwable failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResp);
+							if (failure != null) {
+								LOG.error("Failed Elasticsearch item request: {}", failure.getMessage(), failure);
+								failureThrowable.compareAndSet(null, failure);
+							}
+						}
+					}
+				}
+
+				@Override
+				public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
+					LOG.error("Failed Elasticsearch bulk request: {}", failure.getMessage(), failure);
+					failureThrowable.compareAndSet(null, failure);
+				}
+			}
+		);
+
+		// This makes flush() blocking
+		bulkProcessorBuilder.setConcurrentRequests(0);
+
+		if (bulkProcessorFlushMaxActions != null) {
+			bulkProcessorBuilder.setBulkActions(bulkProcessorFlushMaxActions);
+		}
+
+		if (bulkProcessorFlushMaxSizeMb != null) {
+			bulkProcessorBuilder.setBulkSize(new ByteSizeValue(bulkProcessorFlushMaxSizeMb, ByteSizeUnit.MB));
+		}
+
+		if (bulkProcessorFlushIntervalMillis != null) {
+			bulkProcessorBuilder.setFlushInterval(TimeValue.timeValueMillis(bulkProcessorFlushIntervalMillis));
+		}
+
+		bulkProcessor = bulkProcessorBuilder.build();
+		requestIndexer = new BulkProcessorIndexer(bulkProcessor);
+	}
+
+	@Override
+	public void invoke(T value) throws Exception {
+		// if bulk processor callbacks have previously reported an error, we rethrow the error and fail the sink
+		checkErrorAndRethrow();
+
+		elasticsearchSinkFunction.process(value, getRuntimeContext(), requestIndexer);
+	}
+
+	@Override
+	public void close() throws Exception {
+		if (bulkProcessor != null) {
+			bulkProcessor.close();
+			bulkProcessor = null;
+		}
+
+		if (client != null) {
+			client.close();
+			client = null;
+		}
+
+		callBridge.cleanup();
+
+		// make sure any errors from callbacks are rethrown
+		checkErrorAndRethrow();
+	}
+
+	private void checkErrorAndRethrow() {
+		Throwable cause = failureThrowable.get();
+		if (cause != null) {
+			throw new RuntimeException("An error occurred in ElasticsearchSink.", cause);
+		}
+	}
+}
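
Usage-wise, the three bulk flush keys declared above live in the same map as the Elasticsearch client settings; a short sketch (the values are illustrative):

	Map<String, String> userConfig = new HashMap<>();
	userConfig.put("cluster.name", "my-cluster");  // forwarded to the Elasticsearch client

	// Consumed by the sink itself to configure the internal BulkProcessor:
	userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "100");
	userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB, "5");
	userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_INTERVAL_MS, "1000");

As the constructor above shows, the three bulk.flush.* entries are removed from the map before it is handed to createClient(), so only client-related settings reach Elasticsearch.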

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction.java
new file mode 100644
index 0000000..1e20a0a
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.elasticsearch.action.ActionRequest;
+
+import java.io.Serializable;
+
+/**
+ * Creates multiple {@link ActionRequest ActionRequests} from an element in a stream.
+ *
+ * <p>
+ * This is used by sinks to prepare elements for sending them to Elasticsearch.
+ *
+ * <p>
+ * Example:
+ *
+ * <pre>{@code
+ *	private static class TestElasticsearchSinkFunction
+ *			implements ElasticsearchSinkFunction<Tuple2<Integer, String>> {
+ *
+ *		public IndexRequest createIndexRequest(Tuple2<Integer, String> element) {
+ *			Map<String, Object> json = new HashMap<>();
+ *			json.put("data", element.f1);
+ *
+ *			return Requests.indexRequest()
+ *				.index("my-index")
+ *				.type("my-type")
+ *				.id(element.f0.toString())
+ *				.source(json);
+ *		}
+ *
+ *		public void process(Tuple2<Integer, String> element, RuntimeContext ctx, RequestIndexer indexer) {
+ *			indexer.add(createIndexRequest(element));
+ *		}
+ *	}
+ *
+ * }</pre>
+ *
+ * @param <T> The type of the element handled by this {@code ElasticsearchSinkFunction}
+ */
+public interface ElasticsearchSinkFunction<T> extends Serializable, Function {
+
+	/**
+	 * Processes the incoming element to produce multiple {@link ActionRequest ActionRequests}.
+	 * The produced requests should be added to the provided {@link RequestIndexer}.
+	 *
+	 * @param element incoming element to process
+	 * @param ctx     runtime context containing information about the sink instance
+	 * @param indexer request indexer that the produced {@code ActionRequest}s should be added to
+	 */
+	void process(T element, RuntimeContext ctx, RequestIndexer indexer);
+}
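
A sketch of wiring such a function into a pipeline using the ES 1.x ElasticsearchSink touched by this commit; `env` is an assumed StreamExecutionEnvironment, `userConfig` a prepared settings map, and the index/type names are illustrative:

	DataStream<String> input = env.fromElements("first", "second");

	input.addSink(new ElasticsearchSink<>(userConfig, new ElasticsearchSinkFunction<String>() {
		@Override
		public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
			Map<String, Object> json = new HashMap<>();
			json.put("data", element);

			indexer.add(Requests.indexRequest()
				.index("my-index")
				.type("my-type")
				.source(json));
		}
	}));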

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/RequestIndexer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/RequestIndexer.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/RequestIndexer.java
new file mode 100644
index 0000000..4587a80
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/RequestIndexer.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import org.elasticsearch.action.ActionRequest;
+
+import java.io.Serializable;
+
+/**
+ * Users add multiple {@link ActionRequest ActionRequests} to a {@link RequestIndexer} to prepare
+ * them for sending to an Elasticsearch cluster.
+ */
+public interface RequestIndexer extends Serializable {
+
+	/**
+	 * Adds multiple {@link ActionRequest ActionRequests} to the indexer to prepare them for sending to Elasticsearch.
+	 *
+	 * @param actionRequests The {@link ActionRequest ActionRequests} to add.
+	 */
+	void add(ActionRequest... actionRequests);
+}
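
Because add() is a varargs method, one process() call may emit several requests at once; a small sketch (index, type and id are illustrative, and `json` is an assumed source map):

	indexer.add(
		Requests.indexRequest().index("my-index").type("my-type").source(json),
		Requests.deleteRequest("my-index").type("my-type").id("stale-doc"));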

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/ElasticsearchUtils.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/ElasticsearchUtils.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/ElasticsearchUtils.java
new file mode 100644
index 0000000..9776c4c
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/ElasticsearchUtils.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.util;
+
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.common.transport.TransportAddress;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Suite of utility methods for Elasticsearch.
+ */
+public class ElasticsearchUtils {
+
+	/**
+	 * Utility method to convert a {@link List} of {@link InetSocketAddress} to Elasticsearch {@link TransportAddress}.
+	 *
+	 * @param inetSocketAddresses The list of {@link InetSocketAddress} to convert.
+	 */
+	public static List<TransportAddress> convertInetSocketAddresses(List<InetSocketAddress> inetSocketAddresses) {
+		if (inetSocketAddresses == null) {
+			return null;
+		} else {
+			List<TransportAddress> converted;
+			converted = new ArrayList<>(inetSocketAddresses.size());
+			for (InetSocketAddress address : inetSocketAddresses) {
+				converted.add(new InetSocketTransportAddress(address));
+			}
+			return converted;
+		}
+	}
+
+}
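
For example (the host and the conventional transport port 9300 are illustrative):

	List<InetSocketAddress> addresses = new ArrayList<>();
	addresses.add(new InetSocketAddress("127.0.0.1", 9300));

	List<TransportAddress> transportAddresses =
		ElasticsearchUtils.convertInetSocketAddresses(addresses);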

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java
new file mode 100644
index 0000000..2f9e4c1
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.elasticsearch.testutils.SourceSinkDataTestKit;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.util.InstantiationUtil;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.transport.TransportClient;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.rules.TemporaryFolder;
+
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Environment preparation and suite of tests for version-specific {@link ElasticsearchSinkBase} implementations.
+ */
+public abstract class ElasticsearchSinkTestBase extends StreamingMultipleProgramsTestBase {
+
+	protected final static String CLUSTER_NAME = "test-cluster";
+
+	protected static EmbeddedElasticsearchNodeEnvironment embeddedNodeEnv;
+
+	@ClassRule
+	public static TemporaryFolder tempFolder = new TemporaryFolder();
+
+	@BeforeClass
+	public static void prepare() throws Exception {
+
+		LOG.info("-------------------------------------------------------------------------");
+		LOG.info("    Starting embedded Elasticsearch node ");
+		LOG.info("-------------------------------------------------------------------------");
+
+		// dynamically load version-specific implementation of the Elasticsearch embedded node environment
+		Class<?> clazz = Class.forName(
+			"org.apache.flink.streaming.connectors.elasticsearch.EmbeddedElasticsearchNodeEnvironmentImpl");
+		embeddedNodeEnv = (EmbeddedElasticsearchNodeEnvironment) InstantiationUtil.instantiate(clazz);
+
+		embeddedNodeEnv.start(tempFolder.newFolder(), CLUSTER_NAME);
+
+	}
+
+	@AfterClass
+	public static void shutdown() throws Exception {
+
+		LOG.info("-------------------------------------------------------------------------");
+		LOG.info("    Shutting down embedded Elasticsearch node ");
+		LOG.info("-------------------------------------------------------------------------");
+
+		embeddedNodeEnv.close();
+
+	}
+
+	/**
+	 * Tests that the Elasticsearch sink works properly using a {@link TransportClient}.
+	 */
+	public void runTransportClientTest() throws Exception {
+		final String index = "transport-client-test-index";
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new SourceSinkDataTestKit.TestDataSourceFunction());
+
+		Map<String, String> userConfig = new HashMap<>();
+		// This instructs the sink to emit after every element; otherwise the elements would be buffered
+		userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+		userConfig.put("cluster.name", CLUSTER_NAME);
+
+		source.addSink(createElasticsearchSinkForEmbeddedNode(
+			userConfig, new SourceSinkDataTestKit.TestElasticsearchSinkFunction(index)));
+
+		env.execute("Elasticsearch TransportClient Test");
+
+		// verify the results
+		Client client = embeddedNodeEnv.getClient();
+		SourceSinkDataTestKit.verifyProducedSinkData(client, index);
+
+		client.close();
+	}
+
+	/**
+	 * Tests that the Elasticsearch sink fails eagerly if the provided list of transport addresses is {@code null}.
+	 */
+	public void runNullTransportClientTest() throws Exception {
+		Map<String, String> userConfig = new HashMap<>();
+		userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+		userConfig.put("cluster.name", "my-transport-client-cluster");
+
+		try {
+			createElasticsearchSink(userConfig, null, new SourceSinkDataTestKit.TestElasticsearchSinkFunction("test"));
+		} catch(IllegalArgumentException expectedException) {
+			// test passes
+			return;
+		}
+
+		fail();
+	}
+
+	/**
+	 * Tests that the Elasticsearch sink fails eagerly if the provided list of transport addresses is empty.
+	 */
+	public void runEmptyTransportClientTest() throws Exception {
+		Map<String, String> userConfig = new HashMap<>();
+		userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+		userConfig.put("cluster.name", "my-transport-client-cluster");
+
+		try {
+			createElasticsearchSink(
+				userConfig,
+				Collections.<InetSocketAddress>emptyList(),
+				new SourceSinkDataTestKit.TestElasticsearchSinkFunction("test"));
+		} catch(IllegalArgumentException expectedException) {
+			// test passes
+			return;
+		}
+
+		fail();
+	}
+
+	/**
+	 * Tests whether the Elasticsearch sink fails when there is no cluster to connect to.
+	 */
+	public void runTransportClientFailsTest() throws Exception {
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new SourceSinkDataTestKit.TestDataSourceFunction());
+
+		Map<String, String> userConfig = new HashMap<>();
+		userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+		userConfig.put("cluster.name", "my-transport-client-cluster");
+
+		source.addSink(createElasticsearchSinkForEmbeddedNode(
+			userConfig, new SourceSinkDataTestKit.TestElasticsearchSinkFunction("test")));
+
+		try {
+			env.execute("Elasticsearch Transport Client Test");
+		} catch(JobExecutionException expectedException) {
+			assertTrue(expectedException.getCause().getMessage().contains("not connected to any Elasticsearch nodes"));
+			return;
+		}
+
+		fail();
+	}
+
+	/** Creates a version-specific Elasticsearch sink, using arbitrary transport addresses. */
+	protected abstract <T> ElasticsearchSinkBase<T> createElasticsearchSink(Map<String, String> userConfig,
+																			List<InetSocketAddress> transportAddresses,
+																			ElasticsearchSinkFunction<T> elasticsearchSinkFunction);
+
+	/**
+	 * Creates a version-specific Elasticsearch sink to connect to a local embedded Elasticsearch node.
+	 *
+	 * This case is singled out from {@link ElasticsearchSinkTestBase#createElasticsearchSink(Map, List, ElasticsearchSinkFunction)}
+	 * because the Elasticsearch Java API to do so is incompatible across different versions.
+	 */
+	protected abstract <T> ElasticsearchSinkBase<T> createElasticsearchSinkForEmbeddedNode(
+		Map<String, String> userConfig, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) throws Exception;
+}
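
A sketch of how a version-specific ITCase might plug into this base, delegating to the ES 1.x sink and the address conversion utility added in this commit (class and test names are hypothetical):

	public class ExampleElasticsearchSinkITCase extends ElasticsearchSinkTestBase {

		@Test
		public void testTransportClient() throws Exception {
			runTransportClientTest();
		}

		@Override
		protected <T> ElasticsearchSinkBase<T> createElasticsearchSink(
				Map<String, String> userConfig,
				List<InetSocketAddress> transportAddresses,
				ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
			return new ElasticsearchSink<>(
				userConfig,
				ElasticsearchUtils.convertInetSocketAddresses(transportAddresses),
				elasticsearchSinkFunction);
		}

		@Override
		protected <T> ElasticsearchSinkBase<T> createElasticsearchSinkForEmbeddedNode(
				Map<String, String> userConfig,
				ElasticsearchSinkFunction<T> elasticsearchSinkFunction) throws Exception {
			// The embedded-node constructor needs no transport addresses.
			return new ElasticsearchSink<>(userConfig, elasticsearchSinkFunction);
		}
	}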

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironment.java b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironment.java
new file mode 100644
index 0000000..f59eb03
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironment.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import org.elasticsearch.client.Client;
+
+import java.io.File;
+
+/**
+ * The {@link EmbeddedElasticsearchNodeEnvironment} is used in integration tests to manage Elasticsearch embedded nodes.
+ *
+ * NOTE: In order for {@link ElasticsearchSinkTestBase} to dynamically load version-specific implementations
+ *       for the tests, a concrete implementation must be named {@code EmbeddedElasticsearchNodeEnvironmentImpl}. It must
+ *       also be located in the same package. The intentional package-private accessibility of this interface
+ *       enforces that.
+ */
+interface EmbeddedElasticsearchNodeEnvironment {
+
+	/**
+	 * Start an embedded Elasticsearch node instance.
+	 * Calling this method multiple times consecutively should not restart the embedded node.
+	 *
+	 * @param tmpDataFolder The temporary data folder for the embedded node to use.
+	 * @param clusterName The name of the cluster that the embedded node should be configured with.
+	 */
+	void start(File tmpDataFolder, String clusterName) throws Exception;
+
+	/**
+	 * Close the embedded node, if previously started.
+	 */
+	void close() throws Exception;
+
+	/**
+	 * Returns a client to communicate with the embedded node.
+	 *
+	 * @return Client to communicate with the embedded node.
+	 *         Returns {@code null} if the embedded node wasn't started or is closed.
+	 */
+	Client getClient();
+}
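
One possible ES 1.x implementation sketch, built on the NodeBuilder API used elsewhere in this commit (the settings keys are standard ES 1.x options; treat the whole class as illustrative):

	class EmbeddedElasticsearchNodeEnvironmentImpl implements EmbeddedElasticsearchNodeEnvironment {

		private Node node;

		@Override
		public void start(File tmpDataFolder, String clusterName) throws Exception {
			// Guard so that repeated calls do not restart the node.
			if (node == null) {
				Settings settings = ImmutableSettings.settingsBuilder()
					.put("cluster.name", clusterName)
					.put("path.data", tmpDataFolder.getAbsolutePath())
					.put("http.enabled", false)
					.build();

				node = NodeBuilder.nodeBuilder().settings(settings).local(true).node();
			}
		}

		@Override
		public void close() throws Exception {
			if (node != null && !node.isClosed()) {
				node.close();
			}
		}

		@Override
		public Client getClient() {
			return (node != null && !node.isClosed()) ? node.client() : null;
		}
	}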

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/testutils/SourceSinkDataTestKit.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/testutils/SourceSinkDataTestKit.java b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/testutils/SourceSinkDataTestKit.java
new file mode 100644
index 0000000..55a48fa
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/testutils/SourceSinkDataTestKit.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.testutils;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.elasticsearch.action.get.GetRequest;
+import org.elasticsearch.action.get.GetResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Client;
+import org.junit.Assert;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * This class contains utilities and a pre-defined source function and
+ * Elasticsearch Sink function used to simulate and verify data used in tests.
+ */
+public class SourceSinkDataTestKit {
+
+	private static final int NUM_ELEMENTS = 20;
+
+	private static final String DATA_PREFIX = "message #";
+	private static final String DATA_FIELD_NAME = "data";
+
+	private static final String TYPE_NAME = "flink-es-test-type";
+
+	/**
+	 * A {@link SourceFunction} that generates the elements (id, "message #" + id), with ids ranging from 0 to 19.
+	 */
+	public static class TestDataSourceFunction implements SourceFunction<Tuple2<Integer, String>> {
+		private static final long serialVersionUID = 1L;
+
+		private volatile boolean running = true;
+
+		@Override
+		public void run(SourceFunction.SourceContext<Tuple2<Integer, String>> ctx) throws Exception {
+			for (int i = 0; i < NUM_ELEMENTS && running; i++) {
+				ctx.collect(Tuple2.of(i, DATA_PREFIX + i));
+			}
+		}
+
+		@Override
+		public void cancel() {
+			running = false;
+		}
+	}
+
+	/**
+	 * An {@link ElasticsearchSinkFunction} that indexes each element it receives into a specified Elasticsearch index.
+	 */
+	public static class TestElasticsearchSinkFunction implements ElasticsearchSinkFunction<Tuple2<Integer, String>> {
+		private static final long serialVersionUID = 1L;
+
+		private final String index;
+
+		/**
+		 * Create the sink function, specifying a target Elasticsearch index.
+		 *
+		 * @param index Name of the target Elasticsearch index.
+		 */
+		public TestElasticsearchSinkFunction(String index) {
+			this.index = index;
+		}
+
+		public IndexRequest createIndexRequest(Tuple2<Integer, String> element) {
+			Map<String, Object> json = new HashMap<>();
+			json.put(DATA_FIELD_NAME, element.f1);
+
+			return new IndexRequest(index, TYPE_NAME, element.f0.toString()).source(json);
+		}
+
+		@Override
+		public void process(Tuple2<Integer, String> element, RuntimeContext ctx, RequestIndexer indexer) {
+			indexer.add(createIndexRequest(element));
+		}
+	}
+
+	/**
+	 * Verify the results in an Elasticsearch index. The results must first be produced into the index
+	 * using a {@link TestElasticsearchSinkFunction}.
+	 *
+	 * @param client The client to use to connect to Elasticsearch
+	 * @param index The index to check
+	 */
+	public static void verifyProducedSinkData(Client client, String index) {
+		for (int i = 0; i < NUM_ELEMENTS; i++) {
+			GetResponse response = client.get(new GetRequest(index, TYPE_NAME, Integer.toString(i))).actionGet();
+			Assert.assertEquals(DATA_PREFIX + i, response.getSource().get(DATA_FIELD_NAME));
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch-base/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/test/resources/log4j-test.properties b/flink-connectors/flink-connector-elasticsearch-base/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..2055184
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/test/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=INFO, testlogger
+
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target=System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch-base/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/test/resources/logback-test.xml b/flink-connectors/flink-connector-elasticsearch-base/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..45b3b92
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/test/resources/logback-test.xml
@@ -0,0 +1,30 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<configuration>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <root level="WARN">
+        <appender-ref ref="STDOUT"/>
+    </root>
+    <logger name="org.apache.flink.streaming" level="WARN"/>
+</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch/pom.xml b/flink-connectors/flink-connector-elasticsearch/pom.xml
index b2d7284..14f28d0 100644
--- a/flink-connectors/flink-connector-elasticsearch/pom.xml
+++ b/flink-connectors/flink-connector-elasticsearch/pom.xml
@@ -52,9 +52,9 @@ under the License.
 		</dependency>
 
 		<dependency>
-			<groupId>org.elasticsearch</groupId>
-			<artifactId>elasticsearch</artifactId>
-			<version>${elasticsearch.version}</version>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-elasticsearch-base_2.10</artifactId>
+			<version>${project.version}</version>
 		</dependency>
 
 		<!-- test dependencies -->
@@ -73,18 +73,15 @@ under the License.
 			<scope>test</scope>
 			<type>test-jar</type>
 		</dependency>
-	</dependencies>
 
-	<build>
-		<plugins>
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-surefire-plugin</artifactId>
-				<configuration>
-					<rerunFailingTestsCount>3</rerunFailingTestsCount>
-				</configuration>
-			</plugin>
-		</plugins>
-	</build>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-elasticsearch-base_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
+
+	</dependencies>
 
 </project>


[2/4] flink git commit: [FLINK-4988] [elasticsearch] Restructure Elasticsearch connectors

Posted by tz...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java
new file mode 100644
index 0000000..098afa9
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import org.apache.flink.util.Preconditions;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.TransportAddress;
+import org.elasticsearch.node.Node;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
+import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
+
+/**
+ * Implementation of {@link ElasticsearchApiCallBridge} for Elasticsearch 1.x.
+ */
+public class Elasticsearch1ApiCallBridge implements ElasticsearchApiCallBridge {
+
+	private static final long serialVersionUID = -2632363720584123682L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch1ApiCallBridge.class);
+
+	/** User-provided transport addresses. This is null if we are using an embedded {@link Node} for communication. */
+	private final List<TransportAddress> transportAddresses;
+
+	/** The embedded {@link Node} used for communication. This is null if we are using a TransportClient. */
+	private transient Node node;
+
+	/**
+	 * Constructor for use of an embedded {@link Node} for communication with the Elasticsearch cluster.
+	 */
+	Elasticsearch1ApiCallBridge() {
+		this.transportAddresses = null;
+	}
+
+	/**
+	 * Constructor for use of a {@link TransportClient} for communication with the Elasticsearch cluster.
+	 */
+	Elasticsearch1ApiCallBridge(List<TransportAddress> transportAddresses) {
+		Preconditions.checkArgument(transportAddresses != null && !transportAddresses.isEmpty());
+		this.transportAddresses = transportAddresses;
+	}
+
+	@Override
+	public Client createClient(Map<String, String> clientConfig) {
+		if (transportAddresses == null) {
+
+			// Make sure that we disable http access to our embedded node
+			Settings settings = settingsBuilder()
+				.put(clientConfig)
+				.put("http.enabled", false)
+				.build();
+
+			node = nodeBuilder()
+				.settings(settings)
+				.client(true)
+				.data(false)
+				.node();
+
+			Client client = node.client();
+
+			if (LOG.isInfoEnabled()) {
+				LOG.info("Created Elasticsearch client from embedded node");
+			}
+
+			return client;
+		} else {
+			Settings settings = settingsBuilder()
+				.put(clientConfig)
+				.build();
+
+			TransportClient transportClient = new TransportClient(settings);
+			for (TransportAddress transport: transportAddresses) {
+				transportClient.addTransportAddress(transport);
+			}
+
+			// verify that we actually are connected to a cluster
+			if (transportClient.connectedNodes().isEmpty()) {
+				throw new RuntimeException("Elasticsearch client is not connected to any Elasticsearch nodes!");
+			}
+
+			if (LOG.isInfoEnabled()) {
+				LOG.info("Created Elasticsearch TransportClient with connected nodes {}", transportClient.connectedNodes());
+			}
+
+			return transportClient;
+		}
+	}
+
+	@Override
+	public Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse) {
+		if (!bulkItemResponse.isFailed()) {
+			return null;
+		} else {
+			return new RuntimeException(bulkItemResponse.getFailureMessage());
+		}
+	}
+
+	@Override
+	public void cleanup() {
+		if (node != null && !node.isClosed()) {
+			node.close();
+			node = null;
+		}
+	}
+}
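
Since both constructors are package-private, the bridge is meant to be instantiated by the version-specific sink living in the same package; roughly like this (a sketch with a hypothetical class name, not the actual sink code):

	package org.apache.flink.streaming.connectors.elasticsearch;

	public class ExampleEs1Sink<T> extends ElasticsearchSinkBase<T> {

		/** Embedded-node mode: the no-arg bridge starts a local client Node. */
		public ExampleEs1Sink(Map<String, String> userConfig, ElasticsearchSinkFunction<T> sinkFunction) {
			super(new Elasticsearch1ApiCallBridge(), userConfig, sinkFunction);
		}

		/** Transport-client mode: the bridge connects to the given addresses. */
		public ExampleEs1Sink(
				Map<String, String> userConfig,
				List<TransportAddress> transportAddresses,
				ElasticsearchSinkFunction<T> sinkFunction) {
			super(new Elasticsearch1ApiCallBridge(transportAddresses), userConfig, sinkFunction);
		}
	}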

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java
index ac14ade..c338860 100644
--- a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java
+++ b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java
@@ -17,59 +17,38 @@
 
 package org.apache.flink.streaming.connectors.elasticsearch;
 
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.bulk.BulkProcessor;
-import org.elasticsearch.action.bulk.BulkRequest;
-import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.client.Client;
 import org.elasticsearch.client.transport.TransportClient;
-import org.elasticsearch.cluster.node.DiscoveryNode;
-import org.elasticsearch.common.collect.ImmutableList;
-import org.elasticsearch.common.settings.ImmutableSettings;
-import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.TransportAddress;
-import org.elasticsearch.common.unit.ByteSizeUnit;
-import org.elasticsearch.common.unit.ByteSizeValue;
-import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.node.Node;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
-
 
 /**
- * Sink that emits its input elements to an Elasticsearch cluster.
+ * Elasticsearch 1.x sink that issues multiple {@link ActionRequest ActionRequests}
+ * against a cluster for each incoming element.
  *
  * <p>
- * When using the first constructor {@link #ElasticsearchSink(java.util.Map, IndexRequestBuilder)}
- * the sink will create a local {@link Node} for communicating with the
- * Elasticsearch cluster. When using the second constructor
- * {@link #ElasticsearchSink(java.util.Map, java.util.List, IndexRequestBuilder)} a {@link TransportClient} will
- * be used instead.
+ * When using the first constructor {@link #ElasticsearchSink(java.util.Map, ElasticsearchSinkFunction)}
+ * the sink will create a local {@link Node} for communicating with the Elasticsearch cluster. When using the second
+ * constructor {@link #ElasticsearchSink(java.util.Map, java.util.List, ElasticsearchSinkFunction)} a
+ * {@link TransportClient} will be used instead.
  *
  * <p>
  * <b>Attention: </b> When using the {@code TransportClient} the sink will fail if no cluster
- * can be connected to. With the {@code Node Client} the sink will block and wait for a cluster
+ * can be connected to. When using the local {@code Node} for communication, the sink will block and wait for a cluster
  * to come online.
  *
  * <p>
- * The {@link Map} passed to the constructor is forwarded to Elasticsearch when creating
- * the {@link Node} or {@link TransportClient}. The config keys can be found in the Elasticsearch
- * documentation. An important setting is {@code cluster.name}, this should be set to the name
- * of the cluster that the sink should emit to.
+ * The {@link Map} passed to the constructor is used to create the {@link Node} or {@link TransportClient}. The config
+ * keys can be found in the <a href="https://www.elastic.co">Elasticsearch documentation</a>. An important setting is
+ * {@code cluster.name}, which should be set to the name of the cluster that the sink should emit to.
  *
  * <p>
- * Internally, the sink will use a {@link BulkProcessor} to send {@link IndexRequest IndexRequests}.
+ * Internally, the sink will use a {@link BulkProcessor} to send {@link ActionRequest ActionRequests}.
  * This will buffer elements before sending a request to the cluster. The behaviour of the
  * {@code BulkProcessor} can be configured using these config keys:
  * <ul>
@@ -80,236 +59,63 @@ import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
  * </ul>
  *
  * <p>
- * You also have to provide an {@link IndexRequestBuilder}. This is used to create an
- * {@link IndexRequest} from an element that needs to be added to Elasticsearch. See
- * {@link org.apache.flink.streaming.connectors.elasticsearch.IndexRequestBuilder} for an example.
+ * You also have to provide an {@link ElasticsearchSinkFunction}. This is used to create multiple
+ * {@link ActionRequest ActionRequests} for each incoming element. See the class level documentation of
+ * {@link ElasticsearchSinkFunction} for an example.
  *
- * @param <T> Type of the elements emitted by this sink
+ * @param <T> Type of the elements handled by this sink
  */
-public class ElasticsearchSink<T> extends RichSinkFunction<T> {
-
-	public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions";
-	public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb";
-	public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms";
+public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T> {
 
 	private static final long serialVersionUID = 1L;
 
-	private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSink.class);
-
-	/**
-	 * The user specified config map that we forward to Elasticsearch when we create the Client.
-	 */
-	private final Map<String, String> userConfig;
-
-	/**
-	 * The list of nodes that the TransportClient should connect to. This is null if we are using
-	 * an embedded Node to get a Client.
-	 */
-	private final List<TransportAddress> transportNodes;
-
-	/**
-	 * The builder that is used to construct an {@link IndexRequest} from the incoming element.
-	 */
-	private final IndexRequestBuilder<T> indexRequestBuilder;
-
-	/**
-	 * The embedded Node that is used to communicate with the Elasticsearch cluster. This is null
-	 * if we are using a TransportClient.
-	 */
-	private transient Node node;
-
 	/**
-	 * The Client that was either retrieved from a Node or is a TransportClient.
-	 */
-	private transient Client client;
-
-	/**
-	 * Bulk processor that was created using the client
-	 */
-	private transient BulkProcessor bulkProcessor;
-
-	/**
-	 * This is set from inside the BulkProcessor listener if there where failures in processing.
-	 */
-	private final AtomicBoolean hasFailure = new AtomicBoolean(false);
-
-	/**
-	 * This is set from inside the BulkProcessor listener if a Throwable was thrown during processing.
-	 */
-	private final AtomicReference<Throwable> failureThrowable = new AtomicReference<>();
-
-	/**
-	 * Creates a new ElasticsearchSink that connects to the cluster using an embedded Node.
+	 * Creates a new {@code ElasticsearchSink} that connects to the cluster using an embedded {@link Node}.
 	 *
-	 * @param userConfig The map of user settings that are passed when constructing the Node and BulkProcessor
+	 * @param userConfig The map of user settings that are used when constructing the {@link Node} and {@link BulkProcessor}
 	 * @param indexRequestBuilder This is used to generate the IndexRequest from the incoming element
+	 *
+	 * @deprecated Deprecated since version 1.2, to be removed at version 2.0.
+	 *             Please use {@link ElasticsearchSink#ElasticsearchSink(Map, ElasticsearchSinkFunction)} instead.
 	 */
+	@Deprecated
 	public ElasticsearchSink(Map<String, String> userConfig, IndexRequestBuilder<T> indexRequestBuilder) {
-		this.userConfig = userConfig;
-		this.indexRequestBuilder = indexRequestBuilder;
-		transportNodes = null;
+		this(userConfig, new IndexRequestBuilderWrapperFunction<>(indexRequestBuilder));
 	}
 
 	/**
-	 * Creates a new ElasticsearchSink that connects to the cluster using a TransportClient.
+	 * Creates a new {@code ElasticsearchSink} that connects to the cluster using a {@link TransportClient}.
 	 *
-	 * @param userConfig The map of user settings that are passed when constructing the TransportClient and BulkProcessor
-	 * @param transportNodes The Elasticsearch Nodes to which to connect using a {@code TransportClient}
-	 * @param indexRequestBuilder This is used to generate the IndexRequest from the incoming element
+	 * @param userConfig The map of user settings that are used when constructing the {@link TransportClient} and {@link BulkProcessor}
+	 * @param transportAddresses The addresses of Elasticsearch nodes to which to connect using a {@link TransportClient}
+	 * @param indexRequestBuilder This is used to generate an {@link IndexRequest} from the incoming element
 	 *
+	 * @deprecated Deprecated since 1.2, to be removed at 2.0.
+	 *             Please use {@link ElasticsearchSink#ElasticsearchSink(Map, List, ElasticsearchSinkFunction)} instead.
 	 */
-	public ElasticsearchSink(Map<String, String> userConfig, List<TransportAddress> transportNodes, IndexRequestBuilder<T> indexRequestBuilder) {
-		this.userConfig = userConfig;
-		this.indexRequestBuilder = indexRequestBuilder;
-		this.transportNodes = transportNodes;
+	@Deprecated
+	public ElasticsearchSink(Map<String, String> userConfig, List<TransportAddress> transportAddresses, IndexRequestBuilder<T> indexRequestBuilder) {
+		this(userConfig, transportAddresses, new IndexRequestBuilderWrapperFunction<>(indexRequestBuilder));
 	}
 
 	/**
-	 * Initializes the connection to Elasticsearch by either creating an embedded
-	 * {@link org.elasticsearch.node.Node} and retrieving the
-	 * {@link org.elasticsearch.client.Client} from it or by creating a
-	 * {@link org.elasticsearch.client.transport.TransportClient}.
+	 * Creates a new {@code ElasticsearchSink} that connects to the cluster using an embedded {@link Node}.
+	 *
+	 * @param userConfig The map of user settings that are used when constructing the embedded {@link Node} and {@link BulkProcessor}
+	 * @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest ActionRequests} from the incoming element
 	 */
-	@Override
-	public void open(Configuration configuration) {
-		if (transportNodes == null) {
-			// Make sure that we disable http access to our embedded node
-			Settings settings =
-					ImmutableSettings.settingsBuilder()
-							.put(userConfig)
-							.put("http.enabled", false)
-							.build();
-
-			node =
-					nodeBuilder()
-							.settings(settings)
-							.client(true)
-							.data(false)
-							.node();
-
-			client = node.client();
-
-			if (LOG.isInfoEnabled()) {
-				LOG.info("Created Elasticsearch Client {} from embedded Node", client);
-			}
-
-		} else {
-			Settings settings = ImmutableSettings.settingsBuilder()
-					.put(userConfig)
-					.build();
-
-			TransportClient transportClient = new TransportClient(settings);
-			for (TransportAddress transport: transportNodes) {
-				transportClient.addTransportAddress(transport);
-			}
-
-			// verify that we actually are connected to a cluster
-			ImmutableList<DiscoveryNode> nodes = transportClient.connectedNodes();
-			if (nodes.isEmpty()) {
-				throw new RuntimeException("Client is not connected to any Elasticsearch nodes!");
-			} else {
-				if (LOG.isDebugEnabled()) {
-					LOG.debug("Connected to nodes: " + nodes.toString());
-				}
-			}
-
-			client = transportClient;
-
-			if (LOG.isInfoEnabled()) {
-				LOG.info("Created Elasticsearch TransportClient {}", client);
-			}
-		}
-
-		BulkProcessor.Builder bulkProcessorBuilder = BulkProcessor.builder(
-				client,
-				new BulkProcessor.Listener() {
-					@Override
-					public void beforeBulk(long executionId,
-							BulkRequest request) {
-
-					}
-
-					@Override
-					public void afterBulk(long executionId,
-							BulkRequest request,
-							BulkResponse response) {
-						if (response.hasFailures()) {
-							for (BulkItemResponse itemResp : response.getItems()) {
-								if (itemResp.isFailed()) {
-									LOG.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage());
-									failureThrowable.compareAndSet(null, new RuntimeException(itemResp.getFailureMessage()));
-								}
-							}
-							hasFailure.set(true);
-						}
-					}
-
-					@Override
-					public void afterBulk(long executionId,
-							BulkRequest request,
-							Throwable failure) {
-						LOG.error(failure.getMessage());
-						failureThrowable.compareAndSet(null, failure);
-						hasFailure.set(true);
-					}
-				});
-
-		// This makes flush() blocking
-		bulkProcessorBuilder.setConcurrentRequests(0);
-
-		ParameterTool params = ParameterTool.fromMap(userConfig);
-
-		if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS)) {
-			bulkProcessorBuilder.setBulkActions(params.getInt(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS));
-		}
-
-		if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB)) {
-			bulkProcessorBuilder.setBulkSize(new ByteSizeValue(params.getInt(
-					CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB), ByteSizeUnit.MB));
-		}
-
-		if (params.has(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)) {
-			bulkProcessorBuilder.setFlushInterval(TimeValue.timeValueMillis(params.getInt(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)));
-		}
-
-		bulkProcessor = bulkProcessorBuilder.build();
-	}
-
-	@Override
-	public void invoke(T element) {
-		IndexRequest indexRequest = indexRequestBuilder.createIndexRequest(element, getRuntimeContext());
-
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("Emitting IndexRequest: {}", indexRequest);
-		}
-
-		bulkProcessor.add(indexRequest);
+	public ElasticsearchSink(Map<String, String> userConfig, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
+		super(new Elasticsearch1ApiCallBridge(), userConfig, elasticsearchSinkFunction);
 	}
 
-	@Override
-	public void close() {
-		if (bulkProcessor != null) {
-			bulkProcessor.close();
-			bulkProcessor = null;
-		}
-
-		if (client != null) {
-			client.close();
-		}
-
-		if (node != null) {
-			node.close();
-		}
-
-		if (hasFailure.get()) {
-			Throwable cause = failureThrowable.get();
-			if (cause != null) {
-				throw new RuntimeException("An error occured in ElasticsearchSink.", cause);
-			} else {
-				throw new RuntimeException("An error occured in ElasticsearchSink.");
-
-			}
-		}
+	/**
+	 * Creates a new {@code ElasticsearchSink} that connects to the cluster using a {@link TransportClient}.
+	 *
+	 * @param userConfig The map of user settings that are used when constructing the {@link TransportClient} and {@link BulkProcessor}
+	 * @param transportAddresses The addresses of Elasticsearch nodes to which to connect using a {@link TransportClient}
+	 * @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest ActionRequests} from the incoming element
+	 */
+	public ElasticsearchSink(Map<String, String> userConfig, List<TransportAddress> transportAddresses, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
+		super(new Elasticsearch1ApiCallBridge(transportAddresses), userConfig, elasticsearchSinkFunction);
 	}
-
 }
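
The bulk-flush keys listed in the class Javadoc above are plain string entries in the same
user-config map that configures the client. A minimal sketch with illustrative values;
CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS is referenced on ElasticsearchSinkBase elsewhere in this
commit, and the other two keys are assumed to have moved to the base class alongside it:

    import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;

    import java.util.HashMap;
    import java.util.Map;

    public class BulkFlushConfigSketch {

        public static Map<String, String> bulkFlushConfig() {
            Map<String, String> userConfig = new HashMap<>();
            userConfig.put("cluster.name", "my-cluster"); // illustrative cluster name
            // flush once 500 action requests are buffered (illustrative value)
            userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "500");
            // ... or once the buffered requests reach 5 MB (key assumed on the base class)
            userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB, "5");
            // ... or at the latest every 60 seconds (key assumed on the base class)
            userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_INTERVAL_MS, "60000");
            return userConfig;
        }
    }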

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/IndexRequestBuilder.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/IndexRequestBuilder.java b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/IndexRequestBuilder.java
index 04ae40a..18aa11e 100644
--- a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/IndexRequestBuilder.java
+++ b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/IndexRequestBuilder.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -51,7 +51,11 @@ import java.io.Serializable;
  * }</pre>
  *
  * @param <T> The type of the element handled by this {@code IndexRequestBuilder}
+ *
+ * @deprecated Deprecated since version 1.2, to be removed at version 2.0.
+ *             Please create an {@link ElasticsearchSink} using an {@link ElasticsearchSinkFunction} instead.
  */
+@Deprecated
 public interface IndexRequestBuilder<T> extends Function, Serializable {
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/IndexRequestBuilderWrapperFunction.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/IndexRequestBuilderWrapperFunction.java b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/IndexRequestBuilderWrapperFunction.java
new file mode 100644
index 0000000..6f1d138
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/IndexRequestBuilderWrapperFunction.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+
+/**
+ * A dummy {@link ElasticsearchSinkFunction} that wraps an {@link IndexRequestBuilder}.
+ * This serves as a bridge for backwards compatibility with the deprecated {@code IndexRequestBuilder} interface.
+ */
+class IndexRequestBuilderWrapperFunction<T> implements ElasticsearchSinkFunction<T> {
+
+	private static final long serialVersionUID = 289876038414250101L;
+
+	private final IndexRequestBuilder<T> indexRequestBuilder;
+
+	IndexRequestBuilderWrapperFunction(IndexRequestBuilder<T> indexRequestBuilder) {
+		this.indexRequestBuilder = indexRequestBuilder;
+	}
+
+	@Override
+	public void process(T element, RuntimeContext ctx, RequestIndexer indexer) {
+		indexer.add(indexRequestBuilder.createIndexRequest(element, ctx));
+	}
+}
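
Mechanically, the wrapper just forwards the single request from createIndexRequest() to the
RequestIndexer, which is also the whole migration recipe for user code. A hedged sketch of
the new-style equivalent of a typical IndexRequestBuilder; index and type names are
illustrative:

    import org.apache.flink.api.common.functions.RuntimeContext;
    import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
    import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
    import org.elasticsearch.action.index.IndexRequest;
    import org.elasticsearch.client.Requests;

    import java.util.HashMap;
    import java.util.Map;

    public class MigratedSinkFunction implements ElasticsearchSinkFunction<String> {

        private static final long serialVersionUID = 1L;

        @Override
        public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
            // Unlike the one-request-per-element IndexRequestBuilder contract,
            // any number of requests may be added here per element.
            indexer.add(createIndexRequest(element));
        }

        private static IndexRequest createIndexRequest(String element) {
            Map<String, Object> json = new HashMap<>();
            json.put("data", element);

            return Requests.indexRequest()
                .index("my-index") // illustrative index name
                .type("my-type")   // illustrative type name
                .source(json);
        }
    }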

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkITCase.java b/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkITCase.java
index 33a2e47..3a7b113 100644
--- a/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkITCase.java
+++ b/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkITCase.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -18,177 +18,149 @@
 package org.apache.flink.streaming.connectors.elasticsearch;
 
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.elasticsearch.action.get.GetRequest;
-import org.elasticsearch.action.get.GetResponse;
+import org.apache.flink.streaming.connectors.elasticsearch.testutils.SourceSinkDataTestKit;
+import org.apache.flink.streaming.connectors.elasticsearch.util.ElasticsearchUtils;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.client.Requests;
-import org.elasticsearch.common.settings.ImmutableSettings;
 import org.elasticsearch.common.transport.LocalTransportAddress;
 import org.elasticsearch.common.transport.TransportAddress;
-import org.elasticsearch.node.Node;
-import org.junit.Assert;
-import org.junit.ClassRule;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 
-import java.io.File;
+import java.net.InetSocketAddress;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
+public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase {
 
-public class ElasticsearchSinkITCase extends StreamingMultipleProgramsTestBase {
+	@Test
+	public void testTransportClient() throws Exception {
+		runTransportClientTest();
+	}
 
-	private static final int NUM_ELEMENTS = 20;
+	@Test
+	public void testNullTransportClient() throws Exception {
+		runNullTransportClientTest();
+	}
 
-	@ClassRule
-	public static TemporaryFolder tempFolder = new TemporaryFolder();
+	@Test
+	public void testEmptyTransportClient() throws Exception {
+		runEmptyTransportClientTest();
+	}
 
 	@Test
-	public void testNodeClient() throws Exception{
+	public void testTransportClientFails() throws Exception {
+		runTransportClientFailsTest();
+	}
 
-		File dataDir = tempFolder.newFolder();
+	// -- Tests specific to Elasticsearch 1.x --
 
-		Node node = nodeBuilder()
-				.settings(ImmutableSettings.settingsBuilder()
-						.put("http.enabled", false)
-						.put("path.data", dataDir.getAbsolutePath()))
-				// set a custom cluster name to verify that user config works correctly
-				.clusterName("my-node-client-cluster")
-				.local(true)
-				.node();
+	/**
+	 * Tests that the Elasticsearch sink works properly using an embedded node to connect to Elasticsearch.
+	 */
+	@Test
+	public void testEmbeddedNode() throws Exception {
+		final String index = "embedded-node-test-index";
 
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
-		DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction());
+		DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new SourceSinkDataTestKit.TestDataSourceFunction());
 
-		Map<String, String> config = Maps.newHashMap();
+		Map<String, String> userConfig = new HashMap<>();
 		// This instructs the sink to emit after every element, otherwise they would be buffered
-		config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
-		config.put("cluster.name", "my-node-client-cluster");
-
-		// connect to our local node
-		config.put("node.local", "true");
+		userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+		userConfig.put("cluster.name", CLUSTER_NAME);
+		userConfig.put("node.local", "true");
 
-		source.addSink(new ElasticsearchSink<>(config, new TestIndexRequestBuilder()));
-
-		env.execute("Elasticsearch Node Client Test");
+		source.addSink(new ElasticsearchSink<>(
+			userConfig,
+			new SourceSinkDataTestKit.TestElasticsearchSinkFunction(index))
+		);
 
+		env.execute("Elasticsearch Embedded Node Test");
 
 		// verify the results
-		Client client = node.client();
-		for (int i = 0; i < NUM_ELEMENTS; i++) {
-			GetResponse response = client.get(new GetRequest("my-index",
-					"my-type",
-					Integer.toString(i))).actionGet();
-			Assert.assertEquals("message #" + i, response.getSource().get("data"));
-		}
+		Client client = embeddedNodeEnv.getClient();
+		SourceSinkDataTestKit.verifyProducedSinkData(client, index);
 
-		node.close();
+		client.close();
 	}
 
+	/**
+	 * Tests that the deprecated {@link IndexRequestBuilder} constructor variant still works properly.
+	 */
 	@Test
-	public void testTransportClient() throws Exception {
-
-		File dataDir = tempFolder.newFolder();
+	public void testDeprecatedIndexRequestBuilderVariant() throws Exception {
+		final String index = "index-req-builder-test-index";
 
-		Node node = nodeBuilder()
-				.settings(ImmutableSettings.settingsBuilder()
-						.put("http.enabled", false)
-						.put("path.data", dataDir.getAbsolutePath()))
-						// set a custom cluster name to verify that user config works correctly
-				.clusterName("my-node-client-cluster")
-				.local(true)
-				.node();
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new SourceSinkDataTestKit.TestDataSourceFunction());
 
-		DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction());
-
-		Map<String, String> config = Maps.newHashMap();
+		Map<String, String> userConfig = new HashMap<>();
 		// This instructs the sink to emit after every element, otherwise they would be buffered
-		config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
-		config.put("cluster.name", "my-node-client-cluster");
-
-		// connect to our local node
-		config.put("node.local", "true");
+		userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+		userConfig.put("cluster.name", CLUSTER_NAME);
+		userConfig.put("node.local", "true");
 
 		List<TransportAddress> transports = Lists.newArrayList();
 		transports.add(new LocalTransportAddress("1"));
 
-		source.addSink(new ElasticsearchSink<>(config, transports, new TestIndexRequestBuilder()));
-
-		env.execute("Elasticsearch TransportClient Test");
+		source.addSink(new ElasticsearchSink<>(
+			userConfig,
+			transports,
+			new TestIndexRequestBuilder(index))
+		);
 
+		env.execute("Elasticsearch Deprecated IndexRequestBuilder Bridge Test");
 
 		// verify the results
-		Client client = node.client();
-		for (int i = 0; i < NUM_ELEMENTS; i++) {
-			GetResponse response = client.get(new GetRequest("my-index",
-					"my-type",
-					Integer.toString(i))).actionGet();
-			Assert.assertEquals("message #" + i, response.getSource().get("data"));
-		}
+		Client client = embeddedNodeEnv.getClient();
+		SourceSinkDataTestKit.verifyProducedSinkData(client, index);
 
-		node.close();
+		client.close();
 	}
 
-	@Test(expected = JobExecutionException.class)
-	public void testTransportClientFails() throws Exception{
-		// this checks whether the TransportClient fails early when there is no cluster to
-		// connect to. We don't hava such as test for the Node Client version since that
-		// one will block and wait for a cluster to come online
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction());
+	@Override
+	protected <T> ElasticsearchSinkBase<T> createElasticsearchSink(Map<String, String> userConfig,
+																List<InetSocketAddress> transportAddresses,
+																ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
+		return new ElasticsearchSink<>(userConfig, ElasticsearchUtils.convertInetSocketAddresses(transportAddresses), elasticsearchSinkFunction);
+	}
 
-		Map<String, String> config = Maps.newHashMap();
-		// This instructs the sink to emit after every element, otherwise they would be buffered
-		config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
-		config.put("cluster.name", "my-node-client-cluster");
+	@Override
+	protected <T> ElasticsearchSinkBase<T> createElasticsearchSinkForEmbeddedNode(
+		Map<String, String> userConfig, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) throws Exception {
 
-		// connect to our local node
-		config.put("node.local", "true");
+		// Elasticsearch 1.x requires this setting when using
+		// LocalTransportAddress to connect to a local embedded node
+		userConfig.put("node.local", "true");
 
 		List<TransportAddress> transports = Lists.newArrayList();
 		transports.add(new LocalTransportAddress("1"));
 
-		source.addSink(new ElasticsearchSink<>(config, transports, new TestIndexRequestBuilder()));
-
-		env.execute("Elasticsearch Node Client Test");
+		return new ElasticsearchSink<>(
+			userConfig,
+			transports,
+			elasticsearchSinkFunction);
 	}
 
-	private static class TestSourceFunction implements SourceFunction<Tuple2<Integer, String>> {
+	/**
+	 * An {@link IndexRequestBuilder} with equivalent functionality to {@link SourceSinkDataTestKit.TestElasticsearchSinkFunction}.
+	 */
+	private static class TestIndexRequestBuilder implements IndexRequestBuilder<Tuple2<Integer, String>> {
 		private static final long serialVersionUID = 1L;
 
-		private volatile boolean running = true;
+		private final String index;
 
-		@Override
-		public void run(SourceContext<Tuple2<Integer, String>> ctx) throws Exception {
-			for (int i = 0; i < NUM_ELEMENTS && running; i++) {
-				ctx.collect(Tuple2.of(i, "message #" + i));
-			}
-		}
-
-		@Override
-		public void cancel() {
-			running = false;
+		public TestIndexRequestBuilder(String index) {
+			this.index = index;
 		}
-	}
-
-	private static class TestIndexRequestBuilder implements IndexRequestBuilder<Tuple2<Integer, String>> {
-		private static final long serialVersionUID = 1L;
 
 		@Override
 		public IndexRequest createIndexRequest(Tuple2<Integer, String> element, RuntimeContext ctx) {
@@ -196,10 +168,10 @@ public class ElasticsearchSinkITCase extends StreamingMultipleProgramsTestBase {
 			json.put("data", element.f1);
 
 			return Requests.indexRequest()
-					.index("my-index")
-					.type("my-type")
-					.id(element.f0.toString())
-					.source(json);
+				.index(index)
+				.type("flink-es-test-type")
+				.id(element.f0.toString())
+				.source(json);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java b/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java
new file mode 100644
index 0000000..a0c809b
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.node.Node;
+
+import java.io.File;
+
+import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
+
+/**
+ * Implementation of {@link EmbeddedElasticsearchNodeEnvironment} for Elasticsearch 1.x.
+ * Will be dynamically loaded in {@link ElasticsearchSinkITCase} for integration tests.
+ */
+public class EmbeddedElasticsearchNodeEnvironmentImpl implements EmbeddedElasticsearchNodeEnvironment {
+
+	private Node node;
+
+	@Override
+	public void start(File tmpDataFolder, String clusterName) throws Exception {
+		if (node == null) {
+			node = nodeBuilder()
+				.settings(ImmutableSettings.settingsBuilder()
+					.put("http.enabled", false)
+					.put("path.data", tmpDataFolder.getAbsolutePath()))
+				.clusterName(clusterName)
+				.local(true)
+				.node();
+
+			node.start();
+		}
+	}
+
+	@Override
+	public void close() throws Exception {
+		if (node != null && !node.isClosed()) {
+			node.close();
+			node = null;
+		}
+	}
+
+	@Override
+	public Client getClient() {
+		if (node != null && !node.isClosed()) {
+			return node.client();
+		} else {
+			return null;
+		}
+	}
+}
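
A minimal sketch of how a test might drive this environment directly; the temporary folder
handling and cluster name are illustrative, and in this commit the class is actually loaded
reflectively by the shared test base rather than instantiated like this:

    import org.elasticsearch.client.Client;

    import java.io.File;

    public class EmbeddedNodeUsageSketch {

        public static void sketch(File tmpDataFolder) throws Exception {
            EmbeddedElasticsearchNodeEnvironment env = new EmbeddedElasticsearchNodeEnvironmentImpl();
            env.start(tmpDataFolder, "flink-test-cluster"); // illustrative cluster name

            Client client = env.getClient(); // returns null once the node is closed
            try {
                // ... run requests and assertions against the embedded cluster ...
            } finally {
                client.close();
                env.close();
            }
        }
    }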

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchExample.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchExample.java b/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchExample.java
deleted file mode 100644
index 136ae77..0000000
--- a/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchExample.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.elasticsearch.examples;
-
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink;
-import org.apache.flink.streaming.connectors.elasticsearch.IndexRequestBuilder;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.client.Requests;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * This example shows how to use the Elasticsearch Sink. Before running it you must ensure that
- * you have a cluster named "elasticsearch" running or change the cluster name in the config map.
- */
-public class ElasticsearchExample {
-
-	public static void main(String[] args) throws Exception {
-		
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		DataStreamSource<String> source = env.addSource(new SourceFunction<String>() {
-			private static final long serialVersionUID = 1L;
-
-			private volatile boolean running = true;
-
-			@Override
-			public void run(SourceContext<String> ctx) throws Exception {
-				for (int i = 0; i < 20 && running; i++) {
-					ctx.collect("message #" + i);
-				}
-			}
-
-			@Override
-			public void cancel() {
-				running = false;
-			}
-		});
-
-		Map<String, String> config = new HashMap<>();
-		// This instructs the sink to emit after every element, otherwise they would be buffered
-		config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
-
-		source.addSink(new ElasticsearchSink<>(config, new IndexRequestBuilder<String>() {
-			@Override
-			public IndexRequest createIndexRequest(String element, RuntimeContext ctx) {
-				Map<String, Object> json = new HashMap<>();
-				json.put("data", element);
-
-				return Requests.indexRequest()
-						.index("my-index")
-						.type("my-type")
-						.source(json);
-			}
-		}));
-
-
-		env.execute("Elasticsearch Example");
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchSinkExample.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchSinkExample.java b/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchSinkExample.java
new file mode 100644
index 0000000..d697c3c
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchSinkExample.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.examples;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Requests;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.common.transport.TransportAddress;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This example shows how to use the Elasticsearch Sink. Before running it you must ensure that
+ * you have a cluster named "elasticsearch" running or change the cluster name in the config map.
+ */
+public class ElasticsearchSinkExample {
+
+	public static void main(String[] args) throws Exception {
+		
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStream<String> source = env.generateSequence(0, 20).map(new MapFunction<Long, String>() {
+			@Override
+			public String map(Long value) throws Exception {
+				return "message #" + value;
+			}
+		});
+
+		Map<String, String> userConfig = new HashMap<>();
+		userConfig.put("cluster.name", "elasticsearch");
+		// This instructs the sink to emit after every element, otherwise they would be buffered
+		userConfig.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+
+		List<TransportAddress> transports = new ArrayList<>();
+		transports.add(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300));
+
+		source.addSink(new ElasticsearchSink<>(userConfig, transports, new ElasticsearchSinkFunction<String>() {
+			@Override
+			public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
+				indexer.add(createIndexRequest(element));
+			}
+		}));
+
+
+		env.execute("Elasticsearch Sink Example");
+	}
+
+	private static IndexRequest createIndexRequest(String element) {
+		Map<String, Object> json = new HashMap<>();
+		json.put("data", element);
+
+		return Requests.indexRequest()
+			.index("my-index")
+			.type("my-type")
+			.id(element)
+			.source(json);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch/src/test/resources/log4j-test.properties b/flink-connectors/flink-connector-elasticsearch/src/test/resources/log4j-test.properties
index dc20726..2055184 100644
--- a/flink-connectors/flink-connector-elasticsearch/src/test/resources/log4j-test.properties
+++ b/flink-connectors/flink-connector-elasticsearch/src/test/resources/log4j-test.properties
@@ -16,12 +16,12 @@
 # limitations under the License.
 ################################################################################
 
-log4j.rootLogger=OFF, testlogger
+log4j.rootLogger=INFO, testlogger
 
 log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
-log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.target=System.err
 log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
 log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
 
 # suppress the irrelevant (wrong) warnings from the netty channel handler
-log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
\ No newline at end of file
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch2/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch2/pom.xml b/flink-connectors/flink-connector-elasticsearch2/pom.xml
index 7aba36e..30396dd 100644
--- a/flink-connectors/flink-connector-elasticsearch2/pom.xml
+++ b/flink-connectors/flink-connector-elasticsearch2/pom.xml
@@ -52,17 +52,19 @@ under the License.
 		</dependency>
 
 		<dependency>
-			<groupId>org.elasticsearch</groupId>
-			<artifactId>elasticsearch</artifactId>
-			<version>${elasticsearch.version}</version>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-elasticsearch-base_2.10</artifactId>
+			<version>${project.version}</version>
 		</dependency>
 
+		<!-- Override Elasticsearch version in base from 1.x to 2.x -->
 		<dependency>
-			<groupId>com.fasterxml.jackson.core</groupId>
-			<artifactId>jackson-core</artifactId>
+			<groupId>org.elasticsearch</groupId>
+			<artifactId>elasticsearch</artifactId>
+			<version>${elasticsearch.version}</version>
 		</dependency>
 
-		<!-- core dependencies -->
+		<!-- test dependencies -->
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
@@ -78,6 +80,15 @@ under the License.
 			<scope>test</scope>
 			<type>test-jar</type>
 		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-elasticsearch-base_2.10</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
 	</dependencies>
 
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/BulkProcessorIndexer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/BulkProcessorIndexer.java b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/BulkProcessorIndexer.java
deleted file mode 100644
index 650931f..0000000
--- a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/BulkProcessorIndexer.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.elasticsearch2;
-
-import org.elasticsearch.action.ActionRequest;
-import org.elasticsearch.action.bulk.BulkProcessor;
-
-public class BulkProcessorIndexer implements RequestIndexer {
-	private final BulkProcessor bulkProcessor;
-
-	public BulkProcessorIndexer(BulkProcessor bulkProcessor) {
-		this.bulkProcessor = bulkProcessor;
-	}
-
-	@Override
-	public void add(ActionRequest... actionRequests) {
-		for (ActionRequest actionRequest : actionRequests) {
-			this.bulkProcessor.add(actionRequest);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java
new file mode 100644
index 0000000..9407d9f
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch2;
+
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchApiCallBridge;
+import org.apache.flink.streaming.connectors.elasticsearch.util.ElasticsearchUtils;
+import org.apache.flink.util.Preconditions;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.TransportAddress;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Implementation of {@link ElasticsearchApiCallBridge} for Elasticsearch 2.x.
+ */
+public class Elasticsearch2ApiCallBridge implements ElasticsearchApiCallBridge {
+
+	private static final long serialVersionUID = 2638252694744361079L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch2ApiCallBridge.class);
+
+	/**
+	 * User-provided transport addresses.
+	 *
+	 * We are using {@link InetSocketAddress} because {@link TransportAddress} is not serializable in Elasticsearch 2.x.
+	 */
+	private final List<InetSocketAddress> transportAddresses;
+
+	Elasticsearch2ApiCallBridge(List<InetSocketAddress> transportAddresses) {
+		Preconditions.checkArgument(transportAddresses != null && !transportAddresses.isEmpty());
+		this.transportAddresses = transportAddresses;
+	}
+
+	@Override
+	public Client createClient(Map<String, String> clientConfig) {
+		Settings settings = Settings.settingsBuilder().put(clientConfig).build();
+
+		TransportClient transportClient = TransportClient.builder().settings(settings).build();
+		for (TransportAddress address : ElasticsearchUtils.convertInetSocketAddresses(transportAddresses)) {
+			transportClient.addTransportAddress(address);
+		}
+
+		// verify that we actually are connected to a cluster
+		if (transportClient.connectedNodes().isEmpty()) {
+			throw new RuntimeException("Elasticsearch client is not connected to any Elasticsearch nodes!");
+		}
+
+		if (LOG.isInfoEnabled()) {
+			LOG.info("Created Elasticsearch TransportClient with connected nodes {}", transportClient.connectedNodes());
+		}
+
+		return transportClient;
+	}
+
+	@Override
+	public Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse) {
+		if (!bulkItemResponse.isFailed()) {
+			return null;
+		} else {
+			return bulkItemResponse.getFailure().getCause();
+		}
+	}
+
+	@Override
+	public void cleanup() {
+		// nothing to cleanup
+	}
+
+}
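
Because 2.x transport addresses are not serializable, user code hands the sink plain
InetSocketAddresses and this bridge converts them when the client is created. A minimal
construction sketch, assuming the non-deprecated constructor that takes the base-package
ElasticsearchSinkFunction; the cluster name and address are illustrative:

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
    import org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink;

    import java.net.InetSocketAddress;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class Es2ConstructionSketch {

        public static void wireSink(DataStream<String> stream,
                ElasticsearchSinkFunction<String> sinkFunction) {
            Map<String, String> config = new HashMap<>();
            config.put("cluster.name", "my-cluster"); // illustrative cluster name

            List<InetSocketAddress> addresses = new ArrayList<>();
            addresses.add(new InetSocketAddress("127.0.0.1", 9300));

            stream.addSink(new ElasticsearchSink<>(config, addresses, sinkFunction));
        }
    }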

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
index e839589..a0abc51 100644
--- a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
+++ b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
@@ -16,55 +16,30 @@
  */
 package org.apache.flink.streaming.connectors.elasticsearch2;
 
-import com.google.common.collect.ImmutableList;
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.util.Preconditions;
-import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.bulk.BulkProcessor;
-import org.elasticsearch.action.bulk.BulkRequest;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.client.Client;
 import org.elasticsearch.client.transport.TransportClient;
-import org.elasticsearch.cluster.node.DiscoveryNode;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.transport.InetSocketTransportAddress;
-import org.elasticsearch.common.transport.TransportAddress;
-import org.elasticsearch.common.unit.ByteSizeUnit;
-import org.elasticsearch.common.unit.ByteSizeValue;
-import org.elasticsearch.common.unit.TimeValue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.net.InetSocketAddress;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
 
 /**
- * Sink that emits its input elements in bulk to an Elasticsearch cluster.
+ * Elasticsearch 2.x sink that issues multiple {@link ActionRequest ActionRequests}
+ * against a cluster for each incoming element.
  *
  * <p>
- * When using the second constructor
- * {@link #ElasticsearchSink(java.util.Map, java.util.List, ElasticsearchSinkFunction)} a {@link TransportClient} will
- * be used.
+ * The sink internally uses a {@link TransportClient} to communicate with an Elasticsearch cluster.
+ * The sink will fail if no cluster can be connected to using the transport addresses passed to the constructor.
  *
  * <p>
- * <b>Attention: </b> When using the {@code TransportClient} the sink will fail if no cluster
- * can be connected to.
+ * The {@link Map} passed to the constructor is used to create the {@code TransportClient}. The config keys can be found
+ * in the <a href="https://www.elastic.co">Elasticsearch documentation</a>. An important setting is {@code cluster.name},
+ * which should be set to the name of the cluster that the sink should emit to.
  *
  * <p>
- * The {@link Map} passed to the constructor is forwarded to Elasticsearch when creating
- * {@link TransportClient}. The config keys can be found in the Elasticsearch
- * documentation. An important setting is {@code cluster.name}, this should be set to the name
- * of the cluster that the sink should emit to.
- *
- * <p>
- * Internally, the sink will use a {@link BulkProcessor} to send {@link IndexRequest IndexRequests}.
+ * Internally, the sink will use a {@link BulkProcessor} to send {@link ActionRequest ActionRequests}.
  * This will buffer elements before sending a request to the cluster. The behaviour of the
  * {@code BulkProcessor} can be configured using these config keys:
  * <ul>
@@ -75,183 +50,41 @@ import java.util.concurrent.atomic.AtomicReference;
  * </ul>
  *
  * <p>
- * You also have to provide an {@link RequestIndexer}. This is used to create an
- * {@link IndexRequest} from an element that needs to be added to Elasticsearch. See
- * {@link RequestIndexer} for an example.
+ * You also have to provide an {@link org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction}.
+ * This is used to create multiple {@link ActionRequest ActionRequests} for each incoming element. See the class level
+ * documentation of {@link org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction} for an example.
  *
- * @param <T> Type of the elements emitted by this sink
+ * @param <T> Type of the elements handled by this sink
  */
-public class ElasticsearchSink<T> extends RichSinkFunction<T>  {
-
-	public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions";
-	public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb";
-	public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms";
+public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T> {
 
 	private static final long serialVersionUID = 1L;
 
-	private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSink.class);
-
-	/**
-	 * The user specified config map that we forward to Elasticsearch when we create the Client.
-	 */
-	private final Map<String, String> userConfig;
-
-	/**
-	 * The list of nodes that the TransportClient should connect to. This is null if we are using
-	 * an embedded Node to get a Client.
-	 */
-	private final List<InetSocketAddress> transportAddresses;
-
-	/**
-	 * The builder that is used to construct an {@link IndexRequest} from the incoming element.
-	 */
-	private final ElasticsearchSinkFunction<T> elasticsearchSinkFunction;
-
-	/**
-	 * The Client that was either retrieved from a Node or is a TransportClient.
-	 */
-	private transient Client client;
-
-	/**
-	 * Bulk processor that was created using the client
-	 */
-	private transient BulkProcessor bulkProcessor;
-
-	/**
-	 * Bulk {@link org.elasticsearch.action.ActionRequest} indexer
-	 */
-	private transient RequestIndexer requestIndexer;
-
-	/**
-	 * This is set from inside the BulkProcessor listener if there where failures in processing.
-	 */
-	private final AtomicBoolean hasFailure = new AtomicBoolean(false);
-
-	/**
-	 * This is set from inside the BulkProcessor listener if a Throwable was thrown during processing.
-	 */
-	private final AtomicReference<Throwable> failureThrowable = new AtomicReference<>();
-
 	/**
-	 * Creates a new ElasticsearchSink that connects to the cluster using a TransportClient.
+	 * Creates a new {@code ElasticsearchSink} that connects to the cluster using a {@link TransportClient}.
 	 *
-	 * @param userConfig The map of user settings that are passed when constructing the TransportClient and BulkProcessor
-	 * @param transportAddresses The Elasticsearch Nodes to which to connect using a {@code TransportClient}
-	 * @param elasticsearchSinkFunction This is used to generate the ActionRequest from the incoming element
+	 * @param userConfig The map of user settings that are used when constructing the {@link TransportClient} and {@link BulkProcessor}
+	 * @param transportAddresses The addresses of Elasticsearch nodes to which to connect using a {@link TransportClient}
+	 * @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest ActionRequests} from the incoming element
 	 *
+	 * @deprecated Deprecated since 1.2, to be removed at 2.0.
+	 *             Please use {@link ElasticsearchSink#ElasticsearchSink(Map, List, org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction)} instead.
 	 */
+	@Deprecated
 	public ElasticsearchSink(Map<String, String> userConfig, List<InetSocketAddress> transportAddresses, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
-		this.userConfig = userConfig;
-		this.elasticsearchSinkFunction = elasticsearchSinkFunction;
-		Preconditions.checkArgument(transportAddresses != null && transportAddresses.size() > 0);
-		this.transportAddresses = transportAddresses;
+		this(userConfig, transportAddresses, new OldNewElasticsearchSinkFunctionBridge<>(elasticsearchSinkFunction));
 	}
 
 	/**
-	 * Initializes the connection to Elasticsearch by creating a
-	 * {@link org.elasticsearch.client.transport.TransportClient}.
+	 * Creates a new {@code ElasticsearchSink} that connects to the cluster using a {@link TransportClient}.
+	 *
+	 * @param userConfig The map of user settings that are passed when constructing the {@link TransportClient} and {@link BulkProcessor}
+	 * @param transportAddresses The addresses of Elasticsearch nodes to which to connect using a {@link TransportClient}
+	 * @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest ActionRequests} from the incoming element
 	 */
-	@Override
-	public void open(Configuration configuration) {
-		List<TransportAddress> transportNodes;
-		transportNodes = new ArrayList<>(transportAddresses.size());
-		for (InetSocketAddress address : transportAddresses) {
-			transportNodes.add(new InetSocketTransportAddress(address));
-		}
-
-		Settings settings = Settings.settingsBuilder().put(userConfig).build();
-
-		TransportClient transportClient = TransportClient.builder().settings(settings).build();
-		for (TransportAddress transport: transportNodes) {
-			transportClient.addTransportAddress(transport);
-		}
-
-		// verify that we actually are connected to a cluster
-		ImmutableList<DiscoveryNode> nodes = ImmutableList.copyOf(transportClient.connectedNodes());
-		if (nodes.isEmpty()) {
-			throw new RuntimeException("Client is not connected to any Elasticsearch nodes!");
-		}
-
-		client = transportClient;
-
-		if (LOG.isInfoEnabled()) {
-			LOG.info("Created Elasticsearch TransportClient {}", client);
-		}
-
-		BulkProcessor.Builder bulkProcessorBuilder = BulkProcessor.builder(client, new BulkProcessor.Listener() {
-			@Override
-			public void beforeBulk(long executionId, BulkRequest request) {
-
-			}
-
-			@Override
-			public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
-				if (response.hasFailures()) {
-					for (BulkItemResponse itemResp : response.getItems()) {
-						if (itemResp.isFailed()) {
-							LOG.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage());
-							failureThrowable.compareAndSet(null, new RuntimeException(itemResp.getFailureMessage()));
-						}
-					}
-					hasFailure.set(true);
-				}
-			}
-
-			@Override
-			public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
-				LOG.error(failure.getMessage());
-				failureThrowable.compareAndSet(null, failure);
-				hasFailure.set(true);
-			}
-		});
-
-		// This makes flush() blocking
-		bulkProcessorBuilder.setConcurrentRequests(0);
-
-		ParameterTool params = ParameterTool.fromMap(userConfig);
-
-		if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS)) {
-			bulkProcessorBuilder.setBulkActions(params.getInt(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS));
-		}
-
-		if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB)) {
-			bulkProcessorBuilder.setBulkSize(new ByteSizeValue(params.getInt(
-					CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB), ByteSizeUnit.MB));
-		}
-
-		if (params.has(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)) {
-			bulkProcessorBuilder.setFlushInterval(TimeValue.timeValueMillis(params.getInt(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)));
-		}
-
-		bulkProcessor = bulkProcessorBuilder.build();
-		requestIndexer = new BulkProcessorIndexer(bulkProcessor);
-	}
-
-	@Override
-	public void invoke(T element) {
-		elasticsearchSinkFunction.process(element, getRuntimeContext(), requestIndexer);
+	public ElasticsearchSink(Map<String, String> userConfig,
+							List<InetSocketAddress> transportAddresses,
+							org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
+		super(new Elasticsearch2ApiCallBridge(transportAddresses), userConfig, elasticsearchSinkFunction);
 	}
-
-	@Override
-	public void close() {
-		if (bulkProcessor != null) {
-			bulkProcessor.close();
-			bulkProcessor = null;
-		}
-
-		if (client != null) {
-			client.close();
-		}
-
-		if (hasFailure.get()) {
-			Throwable cause = failureThrowable.get();
-			if (cause != null) {
-				throw new RuntimeException("An error occured in ElasticsearchSink.", cause);
-			} else {
-				throw new RuntimeException("An error occured in ElasticsearchSink.");
-			}
-		}
-
-	}
-
 }

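A minimal sketch of how the relocated classes fit together after this restructuring, in the snippet style of the connector docs (the index and type names are illustrative, and it is assumed here that the bulk flush constants now live on ElasticsearchSinkBase):

Map<String, String> userConfig = new HashMap<>();
userConfig.put("cluster.name", "my-cluster-name");
// assumption: the bulk flush keys are now defined on ElasticsearchSinkBase
userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");

List<InetSocketAddress> transportAddresses = new ArrayList<>();
transportAddresses.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));

DataStream<String> input = ...;

input.addSink(new ElasticsearchSink<>(userConfig, transportAddresses,
	new org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction<String>() {
		@Override
		public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
			Map<String, String> json = new HashMap<>();
			json.put("data", element);
			// a single element may add any number of requests to the indexer
			indexer.add(Requests.indexRequest()
				.index("my-index")
				.type("my-type")
				.source(json));
		}
	}));
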
http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkFunction.java b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkFunction.java
index 55ba720..c474390 100644
--- a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkFunction.java
+++ b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkFunction.java
@@ -1,13 +1,12 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -23,7 +22,7 @@ import org.apache.flink.api.common.functions.RuntimeContext;
 import java.io.Serializable;
 
 /**
- * Method that creates an {@link org.elasticsearch.action.ActionRequest} from an element in a Stream.
+ * Method that creates multiple {@link org.elasticsearch.action.ActionRequest}s from an element in a Stream.
  *
  * <p>
  * This is used by {@link ElasticsearchSink} to prepare elements for sending them to Elasticsearch.
@@ -54,7 +53,12 @@ import java.io.Serializable;
  * }</pre>
  *
  * @param <T> The type of the element handled by this {@code ElasticsearchSinkFunction}
+ *
+ * @deprecated Deprecated since 1.2, to be removed at 2.0.
+ *             This class has been deprecated due to package relocation.
+ *             Please use {@link org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction} instead.
  */
+@Deprecated
 public interface ElasticsearchSinkFunction<T> extends Serializable, Function {
 	void process(T element, RuntimeContext ctx, RequestIndexer indexer);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/OldNewElasticsearchSinkFunctionBridge.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/OldNewElasticsearchSinkFunctionBridge.java b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/OldNewElasticsearchSinkFunctionBridge.java
new file mode 100644
index 0000000..c95fff5
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/OldNewElasticsearchSinkFunctionBridge.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch2;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+
+/**
+ * A dummy {@link org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction} to bridge
+ * the migration from the deprecated {@link ElasticsearchSinkFunction}.
+ */
+class OldNewElasticsearchSinkFunctionBridge<T> implements org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction<T> {
+
+	private static final long serialVersionUID = 2415651895272659448L;
+
+	private final ElasticsearchSinkFunction<T> deprecated;
+	private OldNewRequestIndexerBridge reusedRequestIndexerBridge;
+
+	OldNewElasticsearchSinkFunctionBridge(ElasticsearchSinkFunction<T> deprecated) {
+		this.deprecated = deprecated;
+	}
+
+	@Override
+	public void process(T element, RuntimeContext ctx, RequestIndexer indexer) {
+		if (reusedRequestIndexerBridge == null) {
+			reusedRequestIndexerBridge = new OldNewRequestIndexerBridge(indexer);
+		}
+		deprecated.process(element, ctx, reusedRequestIndexerBridge);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/OldNewRequestIndexerBridge.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/OldNewRequestIndexerBridge.java b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/OldNewRequestIndexerBridge.java
new file mode 100644
index 0000000..f42fb44
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/OldNewRequestIndexerBridge.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch2;
+
+import org.elasticsearch.action.ActionRequest;
+
+/**
+ * A dummy {@link org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer} to bridge
+ * the migration from the deprecated {@link RequestIndexer}.
+ */
+class OldNewRequestIndexerBridge implements RequestIndexer {
+
+	private static final long serialVersionUID = 4213982619497149416L;
+
+	private final org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer requestIndexer;
+
+	OldNewRequestIndexerBridge(org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer requestIndexer) {
+		this.requestIndexer = requestIndexer;
+	}
+
+	@Override
+	public void add(ActionRequest... actionRequests) {
+		requestIndexer.add(actionRequests);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/RequestIndexer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/RequestIndexer.java b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/RequestIndexer.java
index 144a87b..b2b3de4 100644
--- a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/RequestIndexer.java
+++ b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/RequestIndexer.java
@@ -20,6 +20,12 @@ import org.elasticsearch.action.ActionRequest;
 
 import java.io.Serializable;
 
+/**
+ * @deprecated Deprecated since 1.2, to be removed at 2.0.
+ *             This class has been deprecated due to package relocation.
+ *             Please use {@link org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer} instead.
+ */
+@Deprecated
 public interface RequestIndexer extends Serializable {
 	void add(ActionRequest... actionRequests);
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java b/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java
new file mode 100644
index 0000000..ddf3bd6
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSinkITCase;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.node.Node;
+import org.elasticsearch.node.NodeBuilder;
+
+import java.io.File;
+
+/**
+ * Implementation of {@link EmbeddedElasticsearchNodeEnvironment} for Elasticsearch 2.x.
+ * Will be dynamically loaded in {@link ElasticsearchSinkITCase} for integration tests.
+ */
+public class EmbeddedElasticsearchNodeEnvironmentImpl implements EmbeddedElasticsearchNodeEnvironment {
+
+	private Node node;
+
+	@Override
+	public void start(File tmpDataFolder, String clusterName) throws Exception {
+		if (node == null) {
+			node = NodeBuilder.nodeBuilder().settings(
+				Settings.settingsBuilder()
+					.put("path.home", tmpDataFolder.getParent())
+					.put("http.enabled", false)
+					.put("path.data", tmpDataFolder.getAbsolutePath()))
+				.clusterName(clusterName)
+				.node();
+
+			node.start();
+		}
+	}
+
+	@Override
+	public void close() throws Exception {
+		if (node != null && !node.isClosed()) {
+			node.close();
+			node = null;
+		}
+	}
+
+	@Override
+	public Client getClient() {
+		if (node != null && !node.isClosed()) {
+			return node.client();
+		} else {
+			return null;
+		}
+	}
+
+}

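A sketch of how such an implementation might be picked up by the version-agnostic test base, based on the "dynamically loaded" note in the javadoc above (the reflective lookup and the cluster name are assumptions for illustration):

// assumed mechanism: the shared test base instantiates the version-specific
// implementation by its well-known class name at test time
EmbeddedElasticsearchNodeEnvironment embeddedNodeEnv = (EmbeddedElasticsearchNodeEnvironment)
	Class.forName("org.apache.flink.streaming.connectors.elasticsearch.EmbeddedElasticsearchNodeEnvironmentImpl")
		.newInstance();

embeddedNodeEnv.start(tempFolder.newFolder(), "test-cluster");
try {
	Client client = embeddedNodeEnv.getClient(); // run assertions against the embedded node
} finally {
	embeddedNodeEnv.close();
}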

[4/4] flink git commit: [FLINK-4988] Elasticsearch 5.x support

Posted by tz...@apache.org.
[FLINK-4988] Elasticsearch 5.x support


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8699b03d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8699b03d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8699b03d

Branch: refs/heads/master
Commit: 8699b03d79a441ca33d9f62b96490d29a0efaf44
Parents: b452c8b
Author: Mike Dias <mi...@gmail.com>
Authored: Mon Nov 7 18:09:48 2016 -0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Tue Feb 7 22:45:45 2017 +0800

----------------------------------------------------------------------
 docs/dev/connectors/elasticsearch5.md           | 146 +++++++++++
 docs/dev/connectors/filesystem_sink.md          |   2 +-
 docs/dev/connectors/index.md                    |   1 -
 docs/dev/connectors/nifi.md                     |   2 +-
 docs/dev/connectors/rabbitmq.md                 |   2 +-
 docs/dev/connectors/twitter.md                  |   2 +-
 .../flink-connector-elasticsearch5/pom.xml      |  93 +++++++
 .../elasticsearch5/BulkProcessorIndexer.java    |  35 +++
 .../elasticsearch5/ElasticsearchSink.java       | 259 +++++++++++++++++++
 .../ElasticsearchSinkFunction.java              |  60 +++++
 .../elasticsearch5/RequestIndexer.java          |  25 ++
 .../elasticsearch5/ElasticsearchSinkITCase.java | 200 ++++++++++++++
 .../examples/ElasticsearchExample.java          |  83 ++++++
 .../src/test/resources/log4j2.properties        |  27 ++
 .../src/test/resources/logback-test.xml         |  30 +++
 flink-connectors/pom.xml                        |   3 +-
 16 files changed, 964 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8699b03d/docs/dev/connectors/elasticsearch5.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/elasticsearch5.md b/docs/dev/connectors/elasticsearch5.md
new file mode 100644
index 0000000..2673d86
--- /dev/null
+++ b/docs/dev/connectors/elasticsearch5.md
@@ -0,0 +1,146 @@
+---
+title: "Elasticsearch 5.x Connector"
+nav-title: Elasticsearch 5.x
+nav-parent_id: connectors
+nav-pos: 6
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+This connector provides a Sink that can write to an
+[Elasticsearch 5.x](https://elastic.co/) Index. To use this connector, add the
+following dependency to your project:
+
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-elasticsearch5{{ site.scala_version_suffix }}</artifactId>
+  <version>{{site.version }}</version>
+</dependency>
+{% endhighlight %}
+
+Note that the streaming connectors are currently not part of the binary
+distribution. See
+[here]({{site.baseurl}}/dev/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution)
+for information about how to package the program with the libraries for
+cluster execution.
+
+#### Installing Elasticsearch 5.x
+
+Instructions for setting up an Elasticsearch cluster can be found
+[here](https://www.elastic.co/guide/en/elasticsearch/reference/5.x/setup.html).
+Make sure to set and remember a cluster name. This must be set when
+creating a Sink for writing to your cluster.
+#### Elasticsearch 5.x Sink
+The connector provides a Sink that can send data to an Elasticsearch 5.x Index.
+
+The sink communicates with Elasticsearch via the Transport Client.
+
+See [here](https://www.elastic.co/guide/en/elasticsearch/client/java-api/5.x/transport-client.html)
+for information about the Transport Client.
+
+The code below shows how to create a sink that uses a `TransportClient` for communication:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<String> input = ...;
+
+Map<String, String> esConfig = new HashMap<>();
+esConfig.put("cluster.name", "my-cluster-name");
+
+// This instructs the sink to emit after every element, otherwise they would be buffered
+Map<String, String> sinkConfig = new HashMap<>();
+sinkConfig.put("bulk.flush.max.actions", "1");
+
+List<InetSocketAddress> transports = new ArrayList<>();
+transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
+transports.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300));
+
+input.addSink(new ElasticsearchSink(esConfig, sinkConfig, transports, new ElasticsearchSinkFunction<String>() {
+  public IndexRequest createIndexRequest(String element) {
+    Map<String, String> json = new HashMap<>();
+    json.put("data", element);
+
+    return Requests.indexRequest()
+            .index("my-index")
+            .type("my-type")
+            .source(json);
+  }
+
+  @Override
+  public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
+    indexer.add(createIndexRequest(element));
+  }
+}));
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val input: DataStream[String] = ...
+
+val esConfig = new util.HashMap[String, String]
+esConfig.put("cluster.name", "my-cluster-name")
+
+val sinkConfig = new util.HashMap[String, String]
+sinkConfig.put("bulk.flush.max.actions", "1")
+
+val transports = new util.ArrayList[InetSocketAddress]
+transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300))
+transports.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300))
+
+input.addSink(new ElasticsearchSink(esConfig, sinkConfig, transports, new ElasticsearchSinkFunction[String] {
+  def createIndexRequest(element: String): IndexRequest = {
+    val json = new util.HashMap[String, AnyRef]
+    json.put("data", element)
+    Requests.indexRequest.index("my-index").`type`("my-type").source(json)
+  }
+
+  override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer) {
+    indexer.add(createIndexRequest(element))
+  }
+}))
+{% endhighlight %}
+</div>
+</div>
+
+The first Map of Strings is used to configure the Transport Client. The configuration keys
+are documented in the Elasticsearch documentation
+[here](https://www.elastic.co/guide/en/elasticsearch/reference/5.x/index.html).
+Especially important is the `cluster.name` parameter, which must correspond to
+the name of your cluster.
+
+The second Map of Strings is used to configure a `BulkProcessor` that sends the action requests to the cluster.
+It buffers the requests created from incoming elements before sending them to the cluster. The behaviour of the
+`BulkProcessor` can be configured using these config keys (see the combined sketch after this list):
+ * **bulk.flush.max.actions**: Maximum number of actions to buffer
+ * **bulk.flush.max.size.mb**: Maximum amount of data (in megabytes) to buffer
+ * **bulk.flush.interval.ms**: Interval (in milliseconds) at which to flush regardless of the other two
+  settings
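+
+As a sketch, all three keys combined in a single configuration map (the
+threshold values are illustrative, not recommendations):
+
+{% highlight java %}
+Map<String, String> sinkConfig = new HashMap<>();
+sinkConfig.put("bulk.flush.max.actions", "1000");  // flush after 1000 buffered actions, or
+sinkConfig.put("bulk.flush.max.size.mb", "5");     // after 5 MB of buffered data, or
+sinkConfig.put("bulk.flush.interval.ms", "30000"); // every 30 seconds, whichever comes first
+{% endhighlight %}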
+
+The list of `InetSocketAddress`es specifies the Elasticsearch nodes
+to which the sink should connect via a `TransportClient`.
+
+More information about Elasticsearch can be found [here](https://elastic.co).
+

http://git-wip-us.apache.org/repos/asf/flink/blob/8699b03d/docs/dev/connectors/filesystem_sink.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/filesystem_sink.md b/docs/dev/connectors/filesystem_sink.md
index 0fa8bb1..bcaeb17 100644
--- a/docs/dev/connectors/filesystem_sink.md
+++ b/docs/dev/connectors/filesystem_sink.md
@@ -2,7 +2,7 @@
 title: "HDFS Connector"
 nav-title: Rolling File Sink
 nav-parent_id: connectors
-nav-pos: 6
+nav-pos: 7
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one

http://git-wip-us.apache.org/repos/asf/flink/blob/8699b03d/docs/dev/connectors/index.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/index.md b/docs/dev/connectors/index.md
index ec0725a..f5c3eec 100644
--- a/docs/dev/connectors/index.md
+++ b/docs/dev/connectors/index.md
@@ -31,7 +31,6 @@ Currently these systems are supported: (Please select the respective documentati
 
  * [Apache Kafka](https://kafka.apache.org/) (sink/source)
  * [Elasticsearch](https://elastic.co/) (sink)
- * [Elasticsearch 2x](https://elastic.co/) (sink)
  * [Hadoop FileSystem](http://hadoop.apache.org) (sink)
  * [RabbitMQ](http://www.rabbitmq.com/) (sink/source)
  * [Amazon Kinesis Streams](http://aws.amazon.com/kinesis/streams/) (sink/source)

http://git-wip-us.apache.org/repos/asf/flink/blob/8699b03d/docs/dev/connectors/nifi.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/nifi.md b/docs/dev/connectors/nifi.md
index aa9eba2..8223867 100644
--- a/docs/dev/connectors/nifi.md
+++ b/docs/dev/connectors/nifi.md
@@ -2,7 +2,7 @@
 title: "Apache NiFi Connector"
 nav-title: NiFi
 nav-parent_id: connectors
-nav-pos: 8
+nav-pos: 9
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one

http://git-wip-us.apache.org/repos/asf/flink/blob/8699b03d/docs/dev/connectors/rabbitmq.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/rabbitmq.md b/docs/dev/connectors/rabbitmq.md
index c94c99d..b4da248 100644
--- a/docs/dev/connectors/rabbitmq.md
+++ b/docs/dev/connectors/rabbitmq.md
@@ -2,7 +2,7 @@
 title: "RabbitMQ Connector"
 nav-title: RabbitMQ
 nav-parent_id: connectors
-nav-pos: 7
+nav-pos: 8
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one

http://git-wip-us.apache.org/repos/asf/flink/blob/8699b03d/docs/dev/connectors/twitter.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/twitter.md b/docs/dev/connectors/twitter.md
index be15aaf..f1fbbd4 100644
--- a/docs/dev/connectors/twitter.md
+++ b/docs/dev/connectors/twitter.md
@@ -2,7 +2,7 @@
 title: "Twitter Connector"
 nav-title: Twitter
 nav-parent_id: connectors
-nav-pos: 9
+nav-pos: 10
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one

http://git-wip-us.apache.org/repos/asf/flink/blob/8699b03d/flink-connectors/flink-connector-elasticsearch5/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch5/pom.xml b/flink-connectors/flink-connector-elasticsearch5/pom.xml
new file mode 100644
index 0000000..8fc5c8b
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch5/pom.xml
@@ -0,0 +1,93 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+			xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+			xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-connectors</artifactId>
+		<version>1.2-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-connector-elasticsearch5_2.10</artifactId>
+	<name>flink-connector-elasticsearch5</name>
+
+	<packaging>jar</packaging>
+
+	<!-- Allow users to pass custom connector versions -->
+	<properties>
+		<elasticsearch.version>5.0.0</elasticsearch.version>
+	</properties>
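+	<!-- sketch: being a regular Maven property, this can be overridden at build time,
+	     e.g. mvn clean install -Delasticsearch.version=5.1.2 (version number illustrative) -->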
+
+	<dependencies>
+
+		<!-- core dependencies -->
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.elasticsearch.client</groupId>
+			<artifactId>transport</artifactId>
+			<version>${elasticsearch.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.logging.log4j</groupId>
+			<artifactId>log4j-api</artifactId>
+			<version>2.7</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.logging.log4j</groupId>
+			<artifactId>log4j-core</artifactId>
+			<version>2.7</version>
+		</dependency>
+
+		<dependency>
+			<groupId>com.fasterxml.jackson.core</groupId>
+			<artifactId>jackson-core</artifactId>
+		</dependency>
+
+		<!-- test dependencies -->
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
+	</dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/8699b03d/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/BulkProcessorIndexer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/BulkProcessorIndexer.java b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/BulkProcessorIndexer.java
new file mode 100644
index 0000000..f7ca499
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/BulkProcessorIndexer.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.elasticsearch5;
+
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.bulk.BulkProcessor;
+
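+/**
+ * A {@link RequestIndexer} that forwards the supplied {@link ActionRequest ActionRequests}
+ * to a {@link BulkProcessor} for buffered bulk execution.
+ */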
+public class BulkProcessorIndexer implements RequestIndexer {
+	private final BulkProcessor bulkProcessor;
+
+	public BulkProcessorIndexer(BulkProcessor bulkProcessor) {
+		this.bulkProcessor = bulkProcessor;
+	}
+
+	@Override
+	public void add(ActionRequest... actionRequests) {
+		for (ActionRequest actionRequest : actionRequests) {
+			this.bulkProcessor.add(actionRequest);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8699b03d/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java
new file mode 100644
index 0000000..29c69c4
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.elasticsearch5;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.util.Preconditions;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.network.NetworkModule;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.common.transport.TransportAddress;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.transport.Netty3Plugin;
+import org.elasticsearch.transport.client.PreBuiltTransportClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Sink that emits its input elements in bulk to an Elasticsearch cluster.
+ * <p>
+ * The first {@link Map} passed to the constructor is forwarded to Elasticsearch when creating
+ * the {@link TransportClient}. The config keys can be found in the Elasticsearch
+ * documentation. An important setting is {@code cluster.name}, which should be set to the name
+ * of the cluster that the sink should emit to.
+ * <p>
+ * <b>Attention: </b> When using the {@code TransportClient} the sink will fail if no cluster
+ * can be connected to.
+ * <p>
+ * The second {@link Map} is used to configure a {@link BulkProcessor} to send {@link IndexRequest IndexRequests}.
+ * This will buffer elements before sending a request to the cluster. The behaviour of the
+ * {@code BulkProcessor} can be configured using these config keys:
+ * <ul>
+ * <li> {@code bulk.flush.max.actions}: Maximum number of actions to buffer
+ * <li> {@code bulk.flush.max.size.mb}: Maximum amount of data (in megabytes) to buffer
+ * <li> {@code bulk.flush.interval.ms}: Interval (in milliseconds) at which to flush regardless of the other two
+ * settings
+ * </ul>
+ * <p>
+ * You also have to provide an {@link ElasticsearchSinkFunction}. This is used to create an
+ * {@link IndexRequest} from an element that needs to be added to Elasticsearch. See
+ * {@link ElasticsearchSinkFunction} for an example.
+ *
+ * @param <T> Type of the elements emitted by this sink
+ */
+public class ElasticsearchSink<T> extends RichSinkFunction<T> {
+
+	public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions";
+	public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb";
+	public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms";
+
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSink.class);
+
+	/**
+	 * The user specified config map that we forward to Elasticsearch when we create the Client.
+	 */
+	private final Map<String, String> esConfig;
+
+	/**
+	 * The user specified config map that we use to configure BulkProcessor.
+	 */
+	private final Map<String, String> sinkConfig;
+
+	/**
+	 * The list of nodes that the {@link TransportClient} should connect to. The constructor
+	 * checks that this list is neither null nor empty.
+	 */
+	private final List<InetSocketAddress> transportAddresses;
+
+	/**
+	 * The builder that is used to construct an {@link IndexRequest} from the incoming element.
+	 */
+	private final ElasticsearchSinkFunction<T> elasticsearchSinkFunction;
+
+	/**
+	 * The Elasticsearch client, in this sink always a {@link TransportClient}.
+	 */
+	private transient Client client;
+
+	/**
+	 * Bulk processor that was created using the client
+	 */
+	private transient BulkProcessor bulkProcessor;
+
+	/**
+	 * Bulk {@link org.elasticsearch.action.ActionRequest} indexer
+	 */
+	private transient RequestIndexer requestIndexer;
+
+	/**
+	 * This is set from inside the BulkProcessor listener if there where failures in processing.
+	 */
+	private final AtomicBoolean hasFailure = new AtomicBoolean(false);
+
+	/**
+	 * This is set from inside the BulkProcessor listener if a Throwable was thrown during processing.
+	 */
+	private final AtomicReference<Throwable> failureThrowable = new AtomicReference<>();
+
+	/**
+	 * Creates a new ElasticsearchSink that connects to the cluster using a TransportClient.
+	 *
+	 * @param esConfig                  The map of user settings that are passed when constructing the TransportClient
+	 * @param sinkConfig                The map of user settings that are passed when constructing the BulkProcessor
+	 * @param transportAddresses        The Elasticsearch Nodes to which to connect using a {@code TransportClient}
+	 * @param elasticsearchSinkFunction This is used to generate the ActionRequest from the incoming element
+	 */
+	public ElasticsearchSink(Map<String, String> esConfig, Map<String, String> sinkConfig, List<InetSocketAddress> transportAddresses, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
+		this.esConfig = esConfig;
+		this.sinkConfig = sinkConfig;
+		this.elasticsearchSinkFunction = elasticsearchSinkFunction;
+		Preconditions.checkArgument(transportAddresses != null && transportAddresses.size() > 0);
+		this.transportAddresses = transportAddresses;
+	}
+
+	/**
+	 * Initializes the connection to Elasticsearch by creating a
+	 * {@link org.elasticsearch.client.transport.TransportClient}.
+	 */
+	@Override
+	public void open(Configuration configuration) {
+		List<TransportAddress> transportNodes;
+		transportNodes = new ArrayList<>(transportAddresses.size());
+		for (InetSocketAddress address : transportAddresses) {
+			transportNodes.add(new InetSocketTransportAddress(address));
+		}
+
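+		// note (assumption): the ES 5.x PreBuiltTransportClient defaults to the Netty 4
+		// based transport; explicitly selecting the Netty 3 implementations here avoids
+		// conflicts with other Netty versions on the classpath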
+		Settings settings = Settings.builder().put(esConfig)
+			.put(NetworkModule.HTTP_TYPE_KEY, Netty3Plugin.NETTY_HTTP_TRANSPORT_NAME)
+			.put(NetworkModule.TRANSPORT_TYPE_KEY, Netty3Plugin.NETTY_TRANSPORT_NAME)
+			.build();
+
+		TransportClient transportClient = new PreBuiltTransportClient(settings);
+		for (TransportAddress transport : transportNodes) {
+			transportClient.addTransportAddress(transport);
+		}
+
+		// verify that we actually are connected to a cluster
+		if (transportClient.connectedNodes().isEmpty()) {
+			throw new RuntimeException("Client is not connected to any Elasticsearch nodes!");
+		}
+
+		client = transportClient;
+
+		if (LOG.isInfoEnabled()) {
+			LOG.info("Created Elasticsearch TransportClient {}", client);
+		}
+
+		BulkProcessor.Builder bulkProcessorBuilder = BulkProcessor.builder(client, new BulkProcessor.Listener() {
+			@Override
+			public void beforeBulk(long executionId, BulkRequest request) {
+
+			}
+
+			@Override
+			public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
+				if (response.hasFailures()) {
+					for (BulkItemResponse itemResp : response.getItems()) {
+						if (itemResp.isFailed()) {
+							LOG.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage());
+							failureThrowable.compareAndSet(null, new RuntimeException(itemResp.getFailureMessage()));
+						}
+					}
+					hasFailure.set(true);
+				}
+			}
+
+			@Override
+			public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
+				LOG.error(failure.getMessage());
+				failureThrowable.compareAndSet(null, failure);
+				hasFailure.set(true);
+			}
+		});
+
+		// This makes flush() blocking
+		bulkProcessorBuilder.setConcurrentRequests(0);
+
+		ParameterTool params = ParameterTool.fromMap(sinkConfig);
+
+		if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS)) {
+			bulkProcessorBuilder.setBulkActions(params.getInt(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS));
+		}
+
+		if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB)) {
+			bulkProcessorBuilder.setBulkSize(new ByteSizeValue(params.getInt(
+				CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB), ByteSizeUnit.MB));
+		}
+
+		if (params.has(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)) {
+			bulkProcessorBuilder.setFlushInterval(TimeValue.timeValueMillis(params.getInt(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)));
+		}
+
+		bulkProcessor = bulkProcessorBuilder.build();
+		requestIndexer = new BulkProcessorIndexer(bulkProcessor);
+	}
+
+	@Override
+	public void invoke(T element) {
+		elasticsearchSinkFunction.process(element, getRuntimeContext(), requestIndexer);
+	}
+
+	@Override
+	public void close() {
+		if (bulkProcessor != null) {
+			bulkProcessor.close();
+			bulkProcessor = null;
+		}
+
+		if (client != null) {
+			client.close();
+		}
+
+		if (hasFailure.get()) {
+			Throwable cause = failureThrowable.get();
+			if (cause != null) {
+				throw new RuntimeException("An error occurred in ElasticsearchSink.", cause);
+			} else {
+				throw new RuntimeException("An error occurred in ElasticsearchSink.");
+			}
+		}
+
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8699b03d/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkFunction.java b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkFunction.java
new file mode 100644
index 0000000..752a83e
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkFunction.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.elasticsearch5;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.RuntimeContext;
+
+import java.io.Serializable;
+
+/**
+ * Method that creates an {@link org.elasticsearch.action.ActionRequest} from an element in a Stream.
+ *
+ * <p>
+ * This is used by {@link ElasticsearchSink} to prepare elements for sending them to Elasticsearch.
+ *
+ * <p>
+ * Example:
+ *
+ * <pre>{@code
+ *	private static class TestElasticSearchSinkFunction
+ *			implements ElasticsearchSinkFunction<Tuple2<Integer, String>> {
+ *
+ *		public IndexRequest createIndexRequest(Tuple2<Integer, String> element) {
+ *			Map<String, Object> json = new HashMap<>();
+ *			json.put("data", element.f1);
+ *
+ *			return Requests.indexRequest()
+ *				.index("my-index")
+ *				.type("my-type")
+ *				.id(element.f0.toString())
+ *				.source(json);
+ *		}
+ *
+ *		public void process(Tuple2<Integer, String> element, RuntimeContext ctx, RequestIndexer indexer) {
+ *			indexer.add(createIndexRequest(element));
+ *		}
+ *	}
+ *
+ * }</pre>
+ *
+ * @param <T> The type of the element handled by this {@code ElasticsearchSinkFunction}
+ */
+public interface ElasticsearchSinkFunction<T> extends Serializable, Function {
+	void process(T element, RuntimeContext ctx, RequestIndexer indexer);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8699b03d/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/RequestIndexer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/RequestIndexer.java b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/RequestIndexer.java
new file mode 100644
index 0000000..170df31
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/RequestIndexer.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.elasticsearch5;
+
+import org.elasticsearch.action.ActionRequest;
+
+import java.io.Serializable;
+
+public interface RequestIndexer extends Serializable {
+	void add(ActionRequest... actionRequests);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8699b03d/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkITCase.java b/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkITCase.java
new file mode 100644
index 0000000..b4a370b
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkITCase.java
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.elasticsearch5;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.elasticsearch.action.get.GetResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.Requests;
+import org.elasticsearch.common.network.NetworkModule;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.node.Node;
+import org.elasticsearch.node.internal.InternalSettingsPreparer;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.transport.Netty3Plugin;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class ElasticsearchSinkITCase extends StreamingMultipleProgramsTestBase {
+
+	private static final int NUM_ELEMENTS = 20;
+
+	@ClassRule
+	public static TemporaryFolder tempFolder = new TemporaryFolder();
+
+	@Test
+	public void testTransportClient() throws Exception {
+
+		File dataDir = tempFolder.newFolder();
+
+		Settings settings = Settings.builder()
+			.put("cluster.name", "my-transport-client-cluster")
+			.put("http.enabled", false)
+			.put("path.home", dataDir.getParent())
+			.put("path.data", dataDir.getAbsolutePath())
+			.put(NetworkModule.HTTP_TYPE_KEY, Netty3Plugin.NETTY_HTTP_TRANSPORT_NAME)
+			.put(NetworkModule.TRANSPORT_TYPE_KEY, Netty3Plugin.NETTY_TRANSPORT_NAME)
+			.build();
+
+		Node node = new PluginNode(settings);
+		node.start();
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction());
+
+		Map<String, String> esConfig = ImmutableMap.of("cluster.name", "my-transport-client-cluster");
+		Map<String, String> sinkConfig = ImmutableMap.of(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+
+		List<InetSocketAddress> transports = new ArrayList<>();
+		transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
+
+		source.addSink(new ElasticsearchSink<>(esConfig, sinkConfig, transports, new TestElasticsearchSinkFunction()));
+
+		env.execute("Elasticsearch TransportClient Test");
+
+		// verify the results
+		Client client = node.client();
+		for (int i = 0; i < NUM_ELEMENTS; i++) {
+			GetResponse response = client.prepareGet("my-index", "my-type", Integer.toString(i)).get();
+			assertEquals("message #" + i, response.getSource().get("data"));
+		}
+
+		node.close();
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testNullTransportClient() throws Exception {
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction());
+
+		Map<String, String> esConfig = ImmutableMap.of("cluster.name", "my-transport-client-cluster");
+		Map<String, String> sinkConfig = ImmutableMap.of(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+
+		source.addSink(new ElasticsearchSink<>(esConfig, sinkConfig, null, new TestElasticsearchSinkFunction()));
+
+		fail();
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testEmptyTransportClient() throws Exception {
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction());
+
+		Map<String, String> esConfig = ImmutableMap.of("cluster.name", "my-transport-client-cluster");
+		Map<String, String> sinkConfig = ImmutableMap.of(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+
+		source.addSink(new ElasticsearchSink<>(esConfig, sinkConfig, new ArrayList<InetSocketAddress>(), new TestElasticsearchSinkFunction()));
+
+		env.execute("Elasticsearch TransportClient Test");
+
+		fail();
+	}
+
+	@Test(expected = JobExecutionException.class)
+	public void testTransportClientFails() throws Exception {
+		// this checks whether the TransportClient fails early when there is no cluster to
+		// connect to. There isn't a similar test for the Node Client version since that
+		// one will block and wait for a cluster to come online
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction());
+
+		Map<String, String> esConfig = ImmutableMap.of("cluster.name", "my-transport-client-cluster");
+		Map<String, String> sinkConfig = ImmutableMap.of(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+
+		List<InetSocketAddress> transports = new ArrayList<>();
+		transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
+
+		source.addSink(new ElasticsearchSink<>(esConfig, sinkConfig, transports, new TestElasticsearchSinkFunction()));
+
+		env.execute("Elasticsearch Node Client Test");
+
+		fail();
+	}
+
+	private static class TestSourceFunction implements SourceFunction<Tuple2<Integer, String>> {
+		private static final long serialVersionUID = 1L;
+
+		private volatile boolean running = true;
+
+		@Override
+		public void run(SourceContext<Tuple2<Integer, String>> ctx) throws Exception {
+			for (int i = 0; i < NUM_ELEMENTS && running; i++) {
+				ctx.collect(Tuple2.of(i, "message #" + i));
+			}
+		}
+
+		@Override
+		public void cancel() {
+			running = false;
+		}
+	}
+
+	private static class TestElasticsearchSinkFunction implements ElasticsearchSinkFunction<Tuple2<Integer, String>> {
+		private static final long serialVersionUID = 1L;
+
+		public IndexRequest createIndexRequest(Tuple2<Integer, String> element) {
+			Map<String, Object> json = new HashMap<>();
+			json.put("data", element.f1);
+
+			return Requests.indexRequest()
+				.index("my-index")
+				.type("my-type")
+				.id(element.f0.toString())
+				.source(json);
+		}
+
+		@Override
+		public void process(Tuple2<Integer, String> element, RuntimeContext ctx, RequestIndexer indexer) {
+			indexer.add(createIndexRequest(element));
+		}
+	}
+
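+	// The Node constructor that accepts a plugin list is protected in Elasticsearch 5.x;
+	// this subclass exposes it so that the embedded test node loads the Netty3 transport plugin.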
+	private static class PluginNode extends Node {
+		public PluginNode(Settings settings) {
+			super(InternalSettingsPreparer.prepareEnvironment(settings, null), Collections.<Class<? extends Plugin>>singletonList(Netty3Plugin.class));
+		}
+	}
+}

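A note on the integration test above: node.start() returns once the embedded node is
launched, not necessarily once the single-node cluster has elected a master, so the job
can in principle race the cluster bootstrap. A minimal hardening sketch, assuming the
stock Elasticsearch 5.x admin-client API (with `node` as in the test; not something this
commit adds):

    // Block until the embedded single-node cluster reports at least YELLOW
    // health before running the Flink job against it.
    node.client().admin().cluster()
        .prepareHealth()
        .setWaitForYellowStatus()
        .get();
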
http://git-wip-us.apache.org/repos/asf/flink/blob/8699b03d/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/examples/ElasticsearchExample.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/examples/ElasticsearchExample.java b/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/examples/ElasticsearchExample.java
new file mode 100644
index 0000000..47ce846
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/examples/ElasticsearchExample.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.elasticsearch5.examples;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink;
+import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch5.RequestIndexer;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Requests;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This example shows how to use the Elasticsearch Sink. Before running it, make sure that an
+ * Elasticsearch 5.x cluster named "elasticsearch" is reachable on localhost:9300, or adjust the
+ * cluster name and transport addresses in the configuration below.
+ */
+public class ElasticsearchExample {
+
+	public static void main(String[] args) throws Exception {
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		SingleOutputStreamOperator<String> source =
+			env.generateSequence(0, 20).map(new MapFunction<Long, String>() {
+				@Override
+				public String map(Long value) throws Exception {
+					return "message #" + value;
+				}
+			});
+
+		Map<String, String> esConfig = ImmutableMap.of("cluster.name", "elasticsearch");
+
+		// This instructs the sink to flush after every element; otherwise, requests would be buffered.
+		Map<String, String> sinkConfig = ImmutableMap.of(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+
+		List<InetSocketAddress> transports = new ArrayList<>();
+		transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
+
+		source.addSink(new ElasticsearchSink<>(esConfig, sinkConfig, transports, new ElasticsearchSinkFunction<String>() {
+			@Override
+			public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
+				indexer.add(createIndexRequest(element));
+			}
+		}));
+
+		env.execute("Elasticsearch Example");
+	}
+
+	private static IndexRequest createIndexRequest(String element) {
+		Map<String, Object> json = new HashMap<>();
+		json.put("data", element);
+
+		return Requests.indexRequest()
+			.index("my-index")
+			.type("my-type")
+			.id(element)
+			.source(json);
+	}
+}

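Flushing after every element, as above, is only sensible for demos and tests; for
realistic loads the sink should batch requests. A sketch of a more typical configuration,
assuming the bulk-flush keys that ElasticsearchSinkBase exposes alongside
CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS:

    Map<String, String> sinkConfig = new HashMap<>();
    // flush once 500 actions have been buffered...
    sinkConfig.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "500");
    // ...or once the buffered requests reach 5 MB...
    sinkConfig.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB, "5");
    // ...or at the latest after one second
    sinkConfig.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_INTERVAL_MS, "1000");
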
http://git-wip-us.apache.org/repos/asf/flink/blob/8699b03d/flink-connectors/flink-connector-elasticsearch5/src/test/resources/log4j2.properties
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch5/src/test/resources/log4j2.properties b/flink-connectors/flink-connector-elasticsearch5/src/test/resources/log4j2.properties
new file mode 100644
index 0000000..dc20726
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch5/src/test/resources/log4j2.properties
@@ -0,0 +1,27 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+rootLogger.level = OFF
+rootLogger.appenderRef.testlogger.ref = TestLogger
+
+appender.testlogger.type = Console
+appender.testlogger.name = TestLogger
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
+logger.netty.level = ERROR
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/8699b03d/flink-connectors/flink-connector-elasticsearch5/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch5/src/test/resources/logback-test.xml b/flink-connectors/flink-connector-elasticsearch5/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..8893f7c
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch5/src/test/resources/logback-test.xml
@@ -0,0 +1,30 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<configuration>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <root level="WARN">
+        <appender-ref ref="STDOUT"/>
+    </root>
+    <logger name="org.apache.flink.streaming.connectors.elasticsearch5" level="WARN"/>
+</configuration>

http://git-wip-us.apache.org/repos/asf/flink/blob/8699b03d/flink-connectors/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/pom.xml b/flink-connectors/pom.xml
index 91ee6af..e19c77f 100644
--- a/flink-connectors/pom.xml
+++ b/flink-connectors/pom.xml
@@ -47,6 +47,7 @@ under the License.
 		<module>flink-connector-kafka-0.10</module>
 		<module>flink-connector-elasticsearch</module>
 		<module>flink-connector-elasticsearch2</module>
+		<module>flink-connector-elasticsearch5</module>
 		<module>flink-connector-rabbitmq</module>
 		<module>flink-connector-twitter</module>
 		<module>flink-connector-nifi</module>
@@ -86,5 +87,5 @@ under the License.
 			</modules>
 		</profile>
 	</profiles>
-	
+
 </project>