You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/02/07 14:47:57 UTC

[2/4] flink git commit: [FLINK-4988] [elasticsearch] Restructure Elasticsearch connectors

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java
new file mode 100644
index 0000000..098afa9
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import org.apache.flink.util.Preconditions;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.TransportAddress;
+import org.elasticsearch.node.Node;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
+import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
+
+/**
+ * Implementation of {@link ElasticsearchApiCallBridge} for Elasticsearch 1.x.
+ */
+public class Elasticsearch1ApiCallBridge implements ElasticsearchApiCallBridge {
+
+	private static final long serialVersionUID = -2632363720584123682L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch1ApiCallBridge.class);
+
+	/** User-provided transport addresses. This is null if we are using an embedded {@link Node} for communication. */
+	private final List<TransportAddress> transportAddresses;
+
+	/** The embedded {@link Node} used for communication. This is null if we are using a TransportClient. */
+	private transient Node node;
+
+	/**
+	 * Constructor for use of an embedded {@link Node} for communication with the Elasticsearch cluster.
+	 */
+	Elasticsearch1ApiCallBridge() {
+		this.transportAddresses = null;
+	}
+
+	/**
+	 * Constructor for use of a {@link TransportClient} for communication with the Elasticsearch cluster.
+	 */
+	Elasticsearch1ApiCallBridge(List<TransportAddress> transportAddresses) {
+		Preconditions.checkArgument(transportAddresses != null && !transportAddresses.isEmpty());
+		this.transportAddresses = transportAddresses;
+	}
+
+	@Override
+	public Client createClient(Map<String, String> clientConfig) {
+		if (transportAddresses == null) {
+
+			// Make sure that we disable http access to our embedded node
+			Settings settings = settingsBuilder()
+				.put(clientConfig)
+				.put("http.enabled", false)
+				.build();
+
+			node = nodeBuilder()
+				.settings(settings)
+				.client(true)
+				.data(false)
+				.node();
+
+			Client client = node.client();
+
+			if (LOG.isInfoEnabled()) {
+				LOG.info("Created Elasticsearch client from embedded node");
+			}
+
+			return client;
+		} else {
+			Settings settings = settingsBuilder()
+				.put(clientConfig)
+				.build();
+
+			TransportClient transportClient = new TransportClient(settings);
+			for (TransportAddress transport: transportAddresses) {
+				transportClient.addTransportAddress(transport);
+			}
+
+			// verify that we actually are connected to a cluster
+			if (transportClient.connectedNodes().isEmpty()) {
+				throw new RuntimeException("Elasticsearch client is not connected to any Elasticsearch nodes!");
+			}
+
+			if (LOG.isInfoEnabled()) {
+				LOG.info("Created Elasticsearch TransportClient with connected nodes {}", transportClient.connectedNodes());
+			}
+
+			return transportClient;
+		}
+	}
+
+	@Override
+	public Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse) {
+		if (!bulkItemResponse.isFailed()) {
+			return null;
+		} else {
+			return new RuntimeException(bulkItemResponse.getFailureMessage());
+		}
+	}
+
+	@Override
+	public void cleanup() {
+		if (node != null && !node.isClosed()) {
+			node.close();
+			node = null;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java
index ac14ade..c338860 100644
--- a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java
+++ b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java
@@ -17,59 +17,38 @@
 
 package org.apache.flink.streaming.connectors.elasticsearch;
 
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.bulk.BulkProcessor;
-import org.elasticsearch.action.bulk.BulkRequest;
-import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.client.Client;
 import org.elasticsearch.client.transport.TransportClient;
-import org.elasticsearch.cluster.node.DiscoveryNode;
-import org.elasticsearch.common.collect.ImmutableList;
-import org.elasticsearch.common.settings.ImmutableSettings;
-import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.TransportAddress;
-import org.elasticsearch.common.unit.ByteSizeUnit;
-import org.elasticsearch.common.unit.ByteSizeValue;
-import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.node.Node;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
-
 
 /**
- * Sink that emits its input elements to an Elasticsearch cluster.
+ * Elasticsearch 1.x sink that requests multiple {@link ActionRequest ActionRequests}
+ * against a cluster for each incoming element.
  *
  * <p>
- * When using the first constructor {@link #ElasticsearchSink(java.util.Map, IndexRequestBuilder)}
- * the sink will create a local {@link Node} for communicating with the
- * Elasticsearch cluster. When using the second constructor
- * {@link #ElasticsearchSink(java.util.Map, java.util.List, IndexRequestBuilder)} a {@link TransportClient} will
- * be used instead.
+ * When using the first constructor {@link #ElasticsearchSink(java.util.Map, ElasticsearchSinkFunction)}
+ * the sink will create a local {@link Node} for communicating with the Elasticsearch cluster. When using the second
+ * constructor {@link #ElasticsearchSink(java.util.Map, java.util.List, ElasticsearchSinkFunction)} a
+ * {@link TransportClient} will be used instead.
  *
  * <p>
  * <b>Attention: </b> When using the {@code TransportClient} the sink will fail if no cluster
- * can be connected to. With the {@code Node Client} the sink will block and wait for a cluster
+ * can be connected to. When using the local {@code Node} for communicating, the sink will block and wait for a cluster
  * to come online.
  *
  * <p>
- * The {@link Map} passed to the constructor is forwarded to Elasticsearch when creating
- * the {@link Node} or {@link TransportClient}. The config keys can be found in the Elasticsearch
- * documentation. An important setting is {@code cluster.name}, this should be set to the name
- * of the cluster that the sink should emit to.
+ * The {@link Map} passed to the constructor is used to create the {@link Node} or {@link TransportClient}. The config
+ * keys can be found in the <a href="https://www.elastic.io">Elasticsearch documentation</a>. An important setting is
+ * {@code cluster.name}, which should be set to the name of the cluster that the sink should emit to.
  *
  * <p>
- * Internally, the sink will use a {@link BulkProcessor} to send {@link IndexRequest IndexRequests}.
+ * Internally, the sink will use a {@link BulkProcessor} to send {@link ActionRequest ActionRequests}.
  * This will buffer elements before sending a request to the cluster. The behaviour of the
  * {@code BulkProcessor} can be configured using these config keys:
  * <ul>
@@ -80,236 +59,63 @@ import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
  * </ul>
  *
  * <p>
- * You also have to provide an {@link IndexRequestBuilder}. This is used to create an
- * {@link IndexRequest} from an element that needs to be added to Elasticsearch. See
- * {@link org.apache.flink.streaming.connectors.elasticsearch.IndexRequestBuilder} for an example.
+ * You also have to provide an {@link ElasticsearchSinkFunction}. This is used to create multiple
+ * {@link ActionRequest ActionRequests} for each incoming element. See the class level documentation of
+ * {@link ElasticsearchSinkFunction} for an example.
  *
- * @param <T> Type of the elements emitted by this sink
+ * @param <T> Type of the elements handled by this sink
  */
-public class ElasticsearchSink<T> extends RichSinkFunction<T> {
-
-	public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions";
-	public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb";
-	public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms";
+public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T> {
 
 	private static final long serialVersionUID = 1L;
 
-	private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSink.class);
-
-	/**
-	 * The user specified config map that we forward to Elasticsearch when we create the Client.
-	 */
-	private final Map<String, String> userConfig;
-
-	/**
-	 * The list of nodes that the TransportClient should connect to. This is null if we are using
-	 * an embedded Node to get a Client.
-	 */
-	private final List<TransportAddress> transportNodes;
-
-	/**
-	 * The builder that is used to construct an {@link IndexRequest} from the incoming element.
-	 */
-	private final IndexRequestBuilder<T> indexRequestBuilder;
-
-	/**
-	 * The embedded Node that is used to communicate with the Elasticsearch cluster. This is null
-	 * if we are using a TransportClient.
-	 */
-	private transient Node node;
-
 	/**
-	 * The Client that was either retrieved from a Node or is a TransportClient.
-	 */
-	private transient Client client;
-
-	/**
-	 * Bulk processor that was created using the client
-	 */
-	private transient BulkProcessor bulkProcessor;
-
-	/**
-	 * This is set from inside the BulkProcessor listener if there where failures in processing.
-	 */
-	private final AtomicBoolean hasFailure = new AtomicBoolean(false);
-
-	/**
-	 * This is set from inside the BulkProcessor listener if a Throwable was thrown during processing.
-	 */
-	private final AtomicReference<Throwable> failureThrowable = new AtomicReference<>();
-
-	/**
-	 * Creates a new ElasticsearchSink that connects to the cluster using an embedded Node.
+	 * Creates a new {@code ElasticsearchSink} that connects to the cluster using an embedded {@link Node}.
 	 *
-	 * @param userConfig The map of user settings that are passed when constructing the Node and BulkProcessor
+	 * @param userConfig The map of user settings that are used when constructing the {@link Node} and {@link BulkProcessor}
 	 * @param indexRequestBuilder This is used to generate the IndexRequest from the incoming element
+	 *
+	 * @deprecated Deprecated since version 1.2, to be removed at version 2.0.
+	 *             Please use {@link ElasticsearchSink#ElasticsearchSink(Map, ElasticsearchSinkFunction)} instead.
 	 */
+	@Deprecated
 	public ElasticsearchSink(Map<String, String> userConfig, IndexRequestBuilder<T> indexRequestBuilder) {
-		this.userConfig = userConfig;
-		this.indexRequestBuilder = indexRequestBuilder;
-		transportNodes = null;
+		this(userConfig, new IndexRequestBuilderWrapperFunction<>(indexRequestBuilder));
 	}
 
 	/**
-	 * Creates a new ElasticsearchSink that connects to the cluster using a TransportClient.
+	 * Creates a new {@code ElasticsearchSink} that connects to the cluster using a {@link TransportClient}.
 	 *
-	 * @param userConfig The map of user settings that are passed when constructing the TransportClient and BulkProcessor
-	 * @param transportNodes The Elasticsearch Nodes to which to connect using a {@code TransportClient}
-	 * @param indexRequestBuilder This is used to generate the IndexRequest from the incoming element
+	 * @param userConfig The map of user settings that are used when constructing the {@link TransportClient} and {@link BulkProcessor}
+	 * @param transportAddresses The addresses of Elasticsearch nodes to which to connect using a {@link TransportClient}
+	 * @param indexRequestBuilder This is used to generate a {@link IndexRequest} from the incoming element
 	 *
+	 * @deprecated Deprecated since 1.2, to be removed at 2.0.
+	 *             Please use {@link ElasticsearchSink#ElasticsearchSink(Map, List, ElasticsearchSinkFunction)} instead.
 	 */
-	public ElasticsearchSink(Map<String, String> userConfig, List<TransportAddress> transportNodes, IndexRequestBuilder<T> indexRequestBuilder) {
-		this.userConfig = userConfig;
-		this.indexRequestBuilder = indexRequestBuilder;
-		this.transportNodes = transportNodes;
+	@Deprecated
+	public ElasticsearchSink(Map<String, String> userConfig, List<TransportAddress> transportAddresses, IndexRequestBuilder<T> indexRequestBuilder) {
+		this(userConfig, transportAddresses, new IndexRequestBuilderWrapperFunction<>(indexRequestBuilder));
 	}
 
 	/**
-	 * Initializes the connection to Elasticsearch by either creating an embedded
-	 * {@link org.elasticsearch.node.Node} and retrieving the
-	 * {@link org.elasticsearch.client.Client} from it or by creating a
-	 * {@link org.elasticsearch.client.transport.TransportClient}.
+	 * Creates a new {@code ElasticsearchSink} that connects to the cluster using an embedded {@link Node}.
+	 *
+	 * @param userConfig The map of user settings that are used when constructing the embedded {@link Node} and {@link BulkProcessor}
+	 * @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest} from the incoming element
 	 */
-	@Override
-	public void open(Configuration configuration) {
-		if (transportNodes == null) {
-			// Make sure that we disable http access to our embedded node
-			Settings settings =
-					ImmutableSettings.settingsBuilder()
-							.put(userConfig)
-							.put("http.enabled", false)
-							.build();
-
-			node =
-					nodeBuilder()
-							.settings(settings)
-							.client(true)
-							.data(false)
-							.node();
-
-			client = node.client();
-
-			if (LOG.isInfoEnabled()) {
-				LOG.info("Created Elasticsearch Client {} from embedded Node", client);
-			}
-
-		} else {
-			Settings settings = ImmutableSettings.settingsBuilder()
-					.put(userConfig)
-					.build();
-
-			TransportClient transportClient = new TransportClient(settings);
-			for (TransportAddress transport: transportNodes) {
-				transportClient.addTransportAddress(transport);
-			}
-
-			// verify that we actually are connected to a cluster
-			ImmutableList<DiscoveryNode> nodes = transportClient.connectedNodes();
-			if (nodes.isEmpty()) {
-				throw new RuntimeException("Client is not connected to any Elasticsearch nodes!");
-			} else {
-				if (LOG.isDebugEnabled()) {
-					LOG.debug("Connected to nodes: " + nodes.toString());
-				}
-			}
-
-			client = transportClient;
-
-			if (LOG.isInfoEnabled()) {
-				LOG.info("Created Elasticsearch TransportClient {}", client);
-			}
-		}
-
-		BulkProcessor.Builder bulkProcessorBuilder = BulkProcessor.builder(
-				client,
-				new BulkProcessor.Listener() {
-					@Override
-					public void beforeBulk(long executionId,
-							BulkRequest request) {
-
-					}
-
-					@Override
-					public void afterBulk(long executionId,
-							BulkRequest request,
-							BulkResponse response) {
-						if (response.hasFailures()) {
-							for (BulkItemResponse itemResp : response.getItems()) {
-								if (itemResp.isFailed()) {
-									LOG.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage());
-									failureThrowable.compareAndSet(null, new RuntimeException(itemResp.getFailureMessage()));
-								}
-							}
-							hasFailure.set(true);
-						}
-					}
-
-					@Override
-					public void afterBulk(long executionId,
-							BulkRequest request,
-							Throwable failure) {
-						LOG.error(failure.getMessage());
-						failureThrowable.compareAndSet(null, failure);
-						hasFailure.set(true);
-					}
-				});
-
-		// This makes flush() blocking
-		bulkProcessorBuilder.setConcurrentRequests(0);
-
-		ParameterTool params = ParameterTool.fromMap(userConfig);
-
-		if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS)) {
-			bulkProcessorBuilder.setBulkActions(params.getInt(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS));
-		}
-
-		if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB)) {
-			bulkProcessorBuilder.setBulkSize(new ByteSizeValue(params.getInt(
-					CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB), ByteSizeUnit.MB));
-		}
-
-		if (params.has(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)) {
-			bulkProcessorBuilder.setFlushInterval(TimeValue.timeValueMillis(params.getInt(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)));
-		}
-
-		bulkProcessor = bulkProcessorBuilder.build();
-	}
-
-	@Override
-	public void invoke(T element) {
-		IndexRequest indexRequest = indexRequestBuilder.createIndexRequest(element, getRuntimeContext());
-
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("Emitting IndexRequest: {}", indexRequest);
-		}
-
-		bulkProcessor.add(indexRequest);
+	public ElasticsearchSink(Map<String, String> userConfig, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
+		super(new Elasticsearch1ApiCallBridge(), userConfig, elasticsearchSinkFunction);
 	}
 
-	@Override
-	public void close() {
-		if (bulkProcessor != null) {
-			bulkProcessor.close();
-			bulkProcessor = null;
-		}
-
-		if (client != null) {
-			client.close();
-		}
-
-		if (node != null) {
-			node.close();
-		}
-
-		if (hasFailure.get()) {
-			Throwable cause = failureThrowable.get();
-			if (cause != null) {
-				throw new RuntimeException("An error occured in ElasticsearchSink.", cause);
-			} else {
-				throw new RuntimeException("An error occured in ElasticsearchSink.");
-
-			}
-		}
+	/**
+	 * Creates a new {@code ElasticsearchSink} that connects to the cluster using a {@link TransportClient}.
+	 *
+	 * @param userConfig The map of user settings that are used when constructing the {@link TransportClient} and {@link BulkProcessor}
+	 * @param transportAddresses The addresses of Elasticsearch nodes to which to connect using a {@link TransportClient}
+	 * @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest} from the incoming element
+	 */
+	public ElasticsearchSink(Map<String, String> userConfig, List<TransportAddress> transportAddresses, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
+		super(new Elasticsearch1ApiCallBridge(transportAddresses), userConfig, elasticsearchSinkFunction);
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/IndexRequestBuilder.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/IndexRequestBuilder.java b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/IndexRequestBuilder.java
index 04ae40a..18aa11e 100644
--- a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/IndexRequestBuilder.java
+++ b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/IndexRequestBuilder.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -51,7 +51,11 @@ import java.io.Serializable;
  * }</pre>
  *
  * @param <T> The type of the element handled by this {@code IndexRequestBuilder}
+ *
+ * @deprecated Deprecated since version 1.2, to be removed at version 2.0.
+ *             Please create a {@link ElasticsearchSink} using a {@link ElasticsearchSinkFunction} instead.
  */
+@Deprecated
 public interface IndexRequestBuilder<T> extends Function, Serializable {
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/IndexRequestBuilderWrapperFunction.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/IndexRequestBuilderWrapperFunction.java b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/IndexRequestBuilderWrapperFunction.java
new file mode 100644
index 0000000..6f1d138
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/IndexRequestBuilderWrapperFunction.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+
+/**
+ * A dummy {@link ElasticsearchSinkFunction} that wraps a {@link IndexRequestBuilder}.
+ * This serves as a bridge for the usage deprecation of the {@code IndexRequestBuilder} interface.
+ */
+class IndexRequestBuilderWrapperFunction<T> implements ElasticsearchSinkFunction<T> {
+
+	private static final long serialVersionUID = 289876038414250101L;
+
+	private final IndexRequestBuilder<T> indexRequestBuilder;
+
+	IndexRequestBuilderWrapperFunction(IndexRequestBuilder<T> indexRequestBuilder) {
+		this.indexRequestBuilder = indexRequestBuilder;
+	}
+
+	@Override
+	public void process(T element, RuntimeContext ctx, RequestIndexer indexer) {
+		indexer.add(indexRequestBuilder.createIndexRequest(element, ctx));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkITCase.java b/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkITCase.java
index 33a2e47..3a7b113 100644
--- a/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkITCase.java
+++ b/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkITCase.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -18,177 +18,149 @@
 package org.apache.flink.streaming.connectors.elasticsearch;
 
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.elasticsearch.action.get.GetRequest;
-import org.elasticsearch.action.get.GetResponse;
+import org.apache.flink.streaming.connectors.elasticsearch.testutils.SourceSinkDataTestKit;
+import org.apache.flink.streaming.connectors.elasticsearch.util.ElasticsearchUtils;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.client.Requests;
-import org.elasticsearch.common.settings.ImmutableSettings;
 import org.elasticsearch.common.transport.LocalTransportAddress;
 import org.elasticsearch.common.transport.TransportAddress;
-import org.elasticsearch.node.Node;
-import org.junit.Assert;
-import org.junit.ClassRule;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 
-import java.io.File;
+import java.net.InetSocketAddress;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
+public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase {
 
-public class ElasticsearchSinkITCase extends StreamingMultipleProgramsTestBase {
+	@Test
+	public void testTransportClient() throws Exception {
+		runTransportClientTest();
+	}
 
-	private static final int NUM_ELEMENTS = 20;
+	@Test
+	public void testNullTransportClient() throws Exception {
+		runNullTransportClientTest();
+	}
 
-	@ClassRule
-	public static TemporaryFolder tempFolder = new TemporaryFolder();
+	@Test
+	public void testEmptyTransportClient() throws Exception {
+		runEmptyTransportClientTest();
+	}
 
 	@Test
-	public void testNodeClient() throws Exception{
+	public void testTransportClientFails() throws Exception{
+		runTransportClientFailsTest();
+	}
 
-		File dataDir = tempFolder.newFolder();
+	// -- Tests specific to Elasticsearch 1.x --
 
-		Node node = nodeBuilder()
-				.settings(ImmutableSettings.settingsBuilder()
-						.put("http.enabled", false)
-						.put("path.data", dataDir.getAbsolutePath()))
-				// set a custom cluster name to verify that user config works correctly
-				.clusterName("my-node-client-cluster")
-				.local(true)
-				.node();
+	/**
+	 * Tests that the Elasticsearch sink works properly using an embedded node to connect to Elasticsearch.
+	 */
+	@Test
+	public void testEmbeddedNode() throws Exception {
+		final String index = "embedded-node-test-index";
 
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
-		DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction());
+		DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new SourceSinkDataTestKit.TestDataSourceFunction());
 
-		Map<String, String> config = Maps.newHashMap();
+		Map<String, String> userConfig = new HashMap<>();
 		// This instructs the sink to emit after every element, otherwise they would be buffered
-		config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
-		config.put("cluster.name", "my-node-client-cluster");
-
-		// connect to our local node
-		config.put("node.local", "true");
+		userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+		userConfig.put("cluster.name", CLUSTER_NAME);
+		userConfig.put("node.local", "true");
 
-		source.addSink(new ElasticsearchSink<>(config, new TestIndexRequestBuilder()));
-
-		env.execute("Elasticsearch Node Client Test");
+		source.addSink(new ElasticsearchSink<>(
+			userConfig,
+			new SourceSinkDataTestKit.TestElasticsearchSinkFunction(index))
+		);
 
+		env.execute("Elasticsearch Embedded Node Test");
 
 		// verify the results
-		Client client = node.client();
-		for (int i = 0; i < NUM_ELEMENTS; i++) {
-			GetResponse response = client.get(new GetRequest("my-index",
-					"my-type",
-					Integer.toString(i))).actionGet();
-			Assert.assertEquals("message #" + i, response.getSource().get("data"));
-		}
+		Client client = embeddedNodeEnv.getClient();
+		SourceSinkDataTestKit.verifyProducedSinkData(client, index);
 
-		node.close();
+		client.close();
 	}
 
+	/**
+	 * Tests that behaviour of the deprecated {@link IndexRequestBuilder} constructor works properly.
+	 */
 	@Test
-	public void testTransportClient() throws Exception {
-
-		File dataDir = tempFolder.newFolder();
+	public void testDeprecatedIndexRequestBuilderVariant() throws Exception {
+		final String index = "index-req-builder-test-index";
 
-		Node node = nodeBuilder()
-				.settings(ImmutableSettings.settingsBuilder()
-						.put("http.enabled", false)
-						.put("path.data", dataDir.getAbsolutePath()))
-						// set a custom cluster name to verify that user config works correctly
-				.clusterName("my-node-client-cluster")
-				.local(true)
-				.node();
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new SourceSinkDataTestKit.TestDataSourceFunction());
 
-		DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction());
-
-		Map<String, String> config = Maps.newHashMap();
+		Map<String, String> userConfig = new HashMap<>();
 		// This instructs the sink to emit after every element, otherwise they would be buffered
-		config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
-		config.put("cluster.name", "my-node-client-cluster");
-
-		// connect to our local node
-		config.put("node.local", "true");
+		userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+		userConfig.put("cluster.name", CLUSTER_NAME);
+		userConfig.put("node.local", "true");
 
 		List<TransportAddress> transports = Lists.newArrayList();
 		transports.add(new LocalTransportAddress("1"));
 
-		source.addSink(new ElasticsearchSink<>(config, transports, new TestIndexRequestBuilder()));
-
-		env.execute("Elasticsearch TransportClient Test");
+		source.addSink(new ElasticsearchSink<>(
+			userConfig,
+			transports,
+			new TestIndexRequestBuilder(index))
+		);
 
+		env.execute("Elasticsearch Deprecated IndexRequestBuilder Bridge Test");
 
 		// verify the results
-		Client client = node.client();
-		for (int i = 0; i < NUM_ELEMENTS; i++) {
-			GetResponse response = client.get(new GetRequest("my-index",
-					"my-type",
-					Integer.toString(i))).actionGet();
-			Assert.assertEquals("message #" + i, response.getSource().get("data"));
-		}
+		Client client = embeddedNodeEnv.getClient();
+		SourceSinkDataTestKit.verifyProducedSinkData(client, index);
 
-		node.close();
+		client.close();
 	}
 
-	@Test(expected = JobExecutionException.class)
-	public void testTransportClientFails() throws Exception{
-		// this checks whether the TransportClient fails early when there is no cluster to
-		// connect to. We don't hava such as test for the Node Client version since that
-		// one will block and wait for a cluster to come online
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction());
+	@Override
+	protected <T> ElasticsearchSinkBase<T> createElasticsearchSink(Map<String, String> userConfig,
+																List<InetSocketAddress> transportAddresses,
+																ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
+		return new ElasticsearchSink<>(userConfig, ElasticsearchUtils.convertInetSocketAddresses(transportAddresses), elasticsearchSinkFunction);
+	}
 
-		Map<String, String> config = Maps.newHashMap();
-		// This instructs the sink to emit after every element, otherwise they would be buffered
-		config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
-		config.put("cluster.name", "my-node-client-cluster");
+	@Override
+	protected <T> ElasticsearchSinkBase<T> createElasticsearchSinkForEmbeddedNode(
+		Map<String, String> userConfig, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) throws Exception {
 
-		// connect to our local node
-		config.put("node.local", "true");
+		// Elasticsearch 1.x requires this setting when using
+		// LocalTransportAddress to connect to a local embedded node
+		userConfig.put("node.local", "true");
 
 		List<TransportAddress> transports = Lists.newArrayList();
 		transports.add(new LocalTransportAddress("1"));
 
-		source.addSink(new ElasticsearchSink<>(config, transports, new TestIndexRequestBuilder()));
-
-		env.execute("Elasticsearch Node Client Test");
+		return new ElasticsearchSink<>(
+			userConfig,
+			transports,
+			elasticsearchSinkFunction);
 	}
 
-	private static class TestSourceFunction implements SourceFunction<Tuple2<Integer, String>> {
+	/**
+	 * A {@link IndexRequestBuilder} with equivalent functionality to {@link SourceSinkDataTestKit.TestElasticsearchSinkFunction}.
+	 */
+	private static class TestIndexRequestBuilder implements IndexRequestBuilder<Tuple2<Integer, String>> {
 		private static final long serialVersionUID = 1L;
 
-		private volatile boolean running = true;
+		private final String index;
 
-		@Override
-		public void run(SourceContext<Tuple2<Integer, String>> ctx) throws Exception {
-			for (int i = 0; i < NUM_ELEMENTS && running; i++) {
-				ctx.collect(Tuple2.of(i, "message #" + i));
-			}
-		}
-
-		@Override
-		public void cancel() {
-			running = false;
+		public TestIndexRequestBuilder(String index) {
+			this.index = index;
 		}
-	}
-
-	private static class TestIndexRequestBuilder implements IndexRequestBuilder<Tuple2<Integer, String>> {
-		private static final long serialVersionUID = 1L;
 
 		@Override
 		public IndexRequest createIndexRequest(Tuple2<Integer, String> element, RuntimeContext ctx) {
@@ -196,10 +168,10 @@ public class ElasticsearchSinkITCase extends StreamingMultipleProgramsTestBase {
 			json.put("data", element.f1);
 
 			return Requests.indexRequest()
-					.index("my-index")
-					.type("my-type")
-					.id(element.f0.toString())
-					.source(json);
+				.index(index)
+				.type("flink-es-test-type")
+				.id(element.f0.toString())
+				.source(json);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java b/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java
new file mode 100644
index 0000000..a0c809b
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.node.Node;
+
+import java.io.File;
+
+import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
+
+/**
+ * Implementation of {@link EmbeddedElasticsearchNodeEnvironment} for Elasticsearch 1.x.
+ * Will be dynamically loaded in {@link ElasticsearchSinkITCase} for integration tests.
+ */
+public class EmbeddedElasticsearchNodeEnvironmentImpl implements EmbeddedElasticsearchNodeEnvironment {
+
+	private Node node;
+
+	@Override
+	public void start(File tmpDataFolder, String clusterName) throws Exception {
+		if (node == null) {
+			node = nodeBuilder()
+				.settings(ImmutableSettings.settingsBuilder()
+					.put("http.enabled", false)
+					.put("path.data", tmpDataFolder.getAbsolutePath()))
+				.clusterName(clusterName)
+				.local(true)
+				.node();
+
+			node.start();
+		}
+	}
+
+	@Override
+	public void close() throws Exception {
+		if (node != null && !node.isClosed()) {
+			node.close();
+			node = null;
+		}
+	}
+
+	@Override
+	public Client getClient() {
+		if (node != null && !node.isClosed()) {
+			return node.client();
+		} else {
+			return null;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchExample.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchExample.java b/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchExample.java
deleted file mode 100644
index 136ae77..0000000
--- a/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchExample.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.elasticsearch.examples;
-
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink;
-import org.apache.flink.streaming.connectors.elasticsearch.IndexRequestBuilder;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.client.Requests;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * This example shows how to use the Elasticsearch Sink. Before running it you must ensure that
- * you have a cluster named "elasticsearch" running or change the cluster name in the config map.
- */
-public class ElasticsearchExample {
-
-	public static void main(String[] args) throws Exception {
-		
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		DataStreamSource<String> source = env.addSource(new SourceFunction<String>() {
-			private static final long serialVersionUID = 1L;
-
-			private volatile boolean running = true;
-
-			@Override
-			public void run(SourceContext<String> ctx) throws Exception {
-				for (int i = 0; i < 20 && running; i++) {
-					ctx.collect("message #" + i);
-				}
-			}
-
-			@Override
-			public void cancel() {
-				running = false;
-			}
-		});
-
-		Map<String, String> config = new HashMap<>();
-		// This instructs the sink to emit after every element, otherwise they would be buffered
-		config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
-
-		source.addSink(new ElasticsearchSink<>(config, new IndexRequestBuilder<String>() {
-			@Override
-			public IndexRequest createIndexRequest(String element, RuntimeContext ctx) {
-				Map<String, Object> json = new HashMap<>();
-				json.put("data", element);
-
-				return Requests.indexRequest()
-						.index("my-index")
-						.type("my-type")
-						.source(json);
-			}
-		}));
-
-
-		env.execute("Elasticsearch Example");
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchSinkExample.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchSinkExample.java b/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchSinkExample.java
new file mode 100644
index 0000000..d697c3c
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchSinkExample.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.examples;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Requests;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.common.transport.TransportAddress;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This example shows how to use the Elasticsearch Sink. Before running it you must ensure that
+ * you have a cluster named "elasticsearch" running or change the cluster name in the config map.
+ */
+public class ElasticsearchSinkExample {
+
+	public static void main(String[] args) throws Exception {
+		
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStream<String> source = env.generateSequence(0, 20).map(new MapFunction<Long, String>() {
+			@Override
+			public String map(Long value) throws Exception {
+				return "message #" + value;
+			}
+		});
+
+		Map<String, String> userConfig = new HashMap<>();
+		userConfig.put("cluster.name", "elasticsearch");
+		// This instructs the sink to emit after every element, otherwise they would be buffered
+		userConfig.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+
+		List<TransportAddress> transports = new ArrayList<>();
+		transports.add(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300));
+
+		source.addSink(new ElasticsearchSink<>(userConfig, transports, new ElasticsearchSinkFunction<String>() {
+			@Override
+			public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
+				indexer.add(createIndexRequest(element));
+			}
+		}));
+
+
+		env.execute("Elasticsearch Sink Example");
+	}
+
+	private static IndexRequest createIndexRequest(String element) {
+		Map<String, Object> json = new HashMap<>();
+		json.put("data", element);
+
+		return Requests.indexRequest()
+			.index("my-index")
+			.type("my-type")
+			.id(element)
+			.source(json);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch/src/test/resources/log4j-test.properties b/flink-connectors/flink-connector-elasticsearch/src/test/resources/log4j-test.properties
index dc20726..2055184 100644
--- a/flink-connectors/flink-connector-elasticsearch/src/test/resources/log4j-test.properties
+++ b/flink-connectors/flink-connector-elasticsearch/src/test/resources/log4j-test.properties
@@ -16,12 +16,12 @@
 # limitations under the License.
 ################################################################################
 
-log4j.rootLogger=OFF, testlogger
+log4j.rootLogger=INFO, testlogger
 
 log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
-log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.target=System.err
 log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
 log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
 
 # suppress the irrelevant (wrong) warnings from the netty channel handler
-log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
\ No newline at end of file
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch2/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch2/pom.xml b/flink-connectors/flink-connector-elasticsearch2/pom.xml
index 7aba36e..30396dd 100644
--- a/flink-connectors/flink-connector-elasticsearch2/pom.xml
+++ b/flink-connectors/flink-connector-elasticsearch2/pom.xml
@@ -52,17 +52,19 @@ under the License.
 		</dependency>
 
 		<dependency>
-			<groupId>org.elasticsearch</groupId>
-			<artifactId>elasticsearch</artifactId>
-			<version>${elasticsearch.version}</version>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-elasticsearch-base_2.10</artifactId>
+			<version>${project.version}</version>
 		</dependency>
 
+		<!-- Override Elasticsearch version in base from 1.x to 2.x -->
 		<dependency>
-			<groupId>com.fasterxml.jackson.core</groupId>
-			<artifactId>jackson-core</artifactId>
+			<groupId>org.elasticsearch</groupId>
+			<artifactId>elasticsearch</artifactId>
+			<version>${elasticsearch.version}</version>
 		</dependency>
 
-		<!-- core dependencies -->
+		<!-- test dependencies -->
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
@@ -78,6 +80,15 @@ under the License.
 			<scope>test</scope>
 			<type>test-jar</type>
 		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-elasticsearch-base_2.10</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
 	</dependencies>
 
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/BulkProcessorIndexer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/BulkProcessorIndexer.java b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/BulkProcessorIndexer.java
deleted file mode 100644
index 650931f..0000000
--- a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/BulkProcessorIndexer.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.elasticsearch2;
-
-import org.elasticsearch.action.ActionRequest;
-import org.elasticsearch.action.bulk.BulkProcessor;
-
-public class BulkProcessorIndexer implements RequestIndexer {
-	private final BulkProcessor bulkProcessor;
-
-	public BulkProcessorIndexer(BulkProcessor bulkProcessor) {
-		this.bulkProcessor = bulkProcessor;
-	}
-
-	@Override
-	public void add(ActionRequest... actionRequests) {
-		for (ActionRequest actionRequest : actionRequests) {
-			this.bulkProcessor.add(actionRequest);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java
new file mode 100644
index 0000000..9407d9f
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch2;
+
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchApiCallBridge;
+import org.apache.flink.streaming.connectors.elasticsearch.util.ElasticsearchUtils;
+import org.apache.flink.util.Preconditions;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.TransportAddress;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Implementation of {@link ElasticsearchApiCallBridge} for Elasticsearch 2.x.
+ */
+public class Elasticsearch2ApiCallBridge implements ElasticsearchApiCallBridge {
+
+	private static final long serialVersionUID = 2638252694744361079L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch2ApiCallBridge.class);
+
+	/**
+	 * User-provided transport addresses.
+	 *
+	 * We are using {@link InetSocketAddress} because {@link TransportAddress} is not serializable in Elasticsearch 2.x.
+	 */
+	private final List<InetSocketAddress> transportAddresses;
+
+	Elasticsearch2ApiCallBridge(List<InetSocketAddress> transportAddresses) {
+		Preconditions.checkArgument(transportAddresses != null && !transportAddresses.isEmpty());
+		this.transportAddresses = transportAddresses;
+	}
+
+	@Override
+	public Client createClient(Map<String, String> clientConfig) {
+		Settings settings = Settings.settingsBuilder().put(clientConfig).build();
+
+		TransportClient transportClient = TransportClient.builder().settings(settings).build();
+		for (TransportAddress address : ElasticsearchUtils.convertInetSocketAddresses(transportAddresses)) {
+			transportClient.addTransportAddress(address);
+		}
+
+		// verify that we actually are connected to a cluster
+		if (transportClient.connectedNodes().isEmpty()) {
+			throw new RuntimeException("Elasticsearch client is not connected to any Elasticsearch nodes!");
+		}
+
+		if (LOG.isInfoEnabled()) {
+			LOG.info("Created Elasticsearch TransportClient with connected nodes {}", transportClient.connectedNodes());
+		}
+
+		return transportClient;
+	}
+
+	@Override
+	public Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse) {
+		if (!bulkItemResponse.isFailed()) {
+			return null;
+		} else {
+			return bulkItemResponse.getFailure().getCause();
+		}
+	}
+
+	@Override
+	public void cleanup() {
+		// nothing to cleanup
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
index e839589..a0abc51 100644
--- a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
+++ b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
@@ -16,55 +16,30 @@
  */
 package org.apache.flink.streaming.connectors.elasticsearch2;
 
-import com.google.common.collect.ImmutableList;
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.util.Preconditions;
-import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.bulk.BulkProcessor;
-import org.elasticsearch.action.bulk.BulkRequest;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.client.Client;
 import org.elasticsearch.client.transport.TransportClient;
-import org.elasticsearch.cluster.node.DiscoveryNode;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.transport.InetSocketTransportAddress;
-import org.elasticsearch.common.transport.TransportAddress;
-import org.elasticsearch.common.unit.ByteSizeUnit;
-import org.elasticsearch.common.unit.ByteSizeValue;
-import org.elasticsearch.common.unit.TimeValue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.net.InetSocketAddress;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
 
 /**
- * Sink that emits its input elements in bulk to an Elasticsearch cluster.
+ * Elasticsearch 2.x sink that requests multiple {@link ActionRequest ActionRequests}
+ * against a cluster for each incoming element.
  *
  * <p>
- * When using the second constructor
- * {@link #ElasticsearchSink(java.util.Map, java.util.List, ElasticsearchSinkFunction)} a {@link TransportClient} will
- * be used.
+ * The sink internally uses a {@link TransportClient} to communicate with an Elasticsearch cluster.
+ * The sink will fail if no cluster can be connected to using the provided transport addresses passed to the constructor.
  *
  * <p>
- * <b>Attention: </b> When using the {@code TransportClient} the sink will fail if no cluster
- * can be connected to.
+ * The {@link Map} passed to the constructor is used to create the {@code TransportClient}. The config keys can be found
+ * in the <a href="https://www.elastic.io">Elasticsearch documentation</a>. An important setting is {@code cluster.name},
+ * which should be set to the name of the cluster that the sink should emit to.
  *
  * <p>
- * The {@link Map} passed to the constructor is forwarded to Elasticsearch when creating
- * {@link TransportClient}. The config keys can be found in the Elasticsearch
- * documentation. An important setting is {@code cluster.name}, this should be set to the name
- * of the cluster that the sink should emit to.
- *
- * <p>
- * Internally, the sink will use a {@link BulkProcessor} to send {@link IndexRequest IndexRequests}.
+ * Internally, the sink will use a {@link BulkProcessor} to send {@link ActionRequest ActionRequests}.
  * This will buffer elements before sending a request to the cluster. The behaviour of the
  * {@code BulkProcessor} can be configured using these config keys:
  * <ul>
@@ -75,183 +50,41 @@ import java.util.concurrent.atomic.AtomicReference;
  * </ul>
  *
  * <p>
- * You also have to provide an {@link RequestIndexer}. This is used to create an
- * {@link IndexRequest} from an element that needs to be added to Elasticsearch. See
- * {@link RequestIndexer} for an example.
+ * You also have to provide an {@link org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction}.
+ * This is used to create multiple {@link ActionRequest ActionRequests} for each incoming element. See the class level
+ * documentation of {@link org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction} for an example.
  *
- * @param <T> Type of the elements emitted by this sink
+ * @param <T> Type of the elements handled by this sink
  */
-public class ElasticsearchSink<T> extends RichSinkFunction<T>  {
-
-	public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions";
-	public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb";
-	public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms";
+public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T> {
 
 	private static final long serialVersionUID = 1L;
 
-	private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSink.class);
-
-	/**
-	 * The user specified config map that we forward to Elasticsearch when we create the Client.
-	 */
-	private final Map<String, String> userConfig;
-
-	/**
-	 * The list of nodes that the TransportClient should connect to. This is null if we are using
-	 * an embedded Node to get a Client.
-	 */
-	private final List<InetSocketAddress> transportAddresses;
-
-	/**
-	 * The builder that is used to construct an {@link IndexRequest} from the incoming element.
-	 */
-	private final ElasticsearchSinkFunction<T> elasticsearchSinkFunction;
-
-	/**
-	 * The Client that was either retrieved from a Node or is a TransportClient.
-	 */
-	private transient Client client;
-
-	/**
-	 * Bulk processor that was created using the client
-	 */
-	private transient BulkProcessor bulkProcessor;
-
-	/**
-	 * Bulk {@link org.elasticsearch.action.ActionRequest} indexer
-	 */
-	private transient RequestIndexer requestIndexer;
-
-	/**
-	 * This is set from inside the BulkProcessor listener if there where failures in processing.
-	 */
-	private final AtomicBoolean hasFailure = new AtomicBoolean(false);
-
-	/**
-	 * This is set from inside the BulkProcessor listener if a Throwable was thrown during processing.
-	 */
-	private final AtomicReference<Throwable> failureThrowable = new AtomicReference<>();
-
 	/**
-	 * Creates a new ElasticsearchSink that connects to the cluster using a TransportClient.
+	 * Creates a new {@code ElasticsearchSink} that connects to the cluster using a {@link TransportClient}.
 	 *
-	 * @param userConfig The map of user settings that are passed when constructing the TransportClient and BulkProcessor
-	 * @param transportAddresses The Elasticsearch Nodes to which to connect using a {@code TransportClient}
-	 * @param elasticsearchSinkFunction This is used to generate the ActionRequest from the incoming element
+	 * @param userConfig The map of user settings that are used when constructing the {@link TransportClient} and {@link BulkProcessor}
+	 * @param transportAddresses The addresses of Elasticsearch nodes to which to connect using a {@link TransportClient}
+	 * @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest} from the incoming element
 	 *
+	 * @deprecated Deprecated since 1.2, to be removed at 2.0.
+	 *             Please use {@link ElasticsearchSink#ElasticsearchSink(Map, List, org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction)} instead.
 	 */
+	@Deprecated
 	public ElasticsearchSink(Map<String, String> userConfig, List<InetSocketAddress> transportAddresses, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
-		this.userConfig = userConfig;
-		this.elasticsearchSinkFunction = elasticsearchSinkFunction;
-		Preconditions.checkArgument(transportAddresses != null && transportAddresses.size() > 0);
-		this.transportAddresses = transportAddresses;
+		this(userConfig, transportAddresses, new OldNewElasticsearchSinkFunctionBridge<>(elasticsearchSinkFunction));
 	}
 
 	/**
-	 * Initializes the connection to Elasticsearch by creating a
-	 * {@link org.elasticsearch.client.transport.TransportClient}.
+	 * Creates a new {@code ElasticsearchSink} that connects to the cluster using a {@link TransportClient}.
+	 *
+	 * @param userConfig The map of user settings that are passed when constructing the {@link TransportClient} and {@link BulkProcessor}
+	 * @param transportAddresses The addresses of Elasticsearch nodes to which to connect using a {@link TransportClient}
+	 * @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest} from the incoming element
 	 */
-	@Override
-	public void open(Configuration configuration) {
-		List<TransportAddress> transportNodes;
-		transportNodes = new ArrayList<>(transportAddresses.size());
-		for (InetSocketAddress address : transportAddresses) {
-			transportNodes.add(new InetSocketTransportAddress(address));
-		}
-
-		Settings settings = Settings.settingsBuilder().put(userConfig).build();
-
-		TransportClient transportClient = TransportClient.builder().settings(settings).build();
-		for (TransportAddress transport: transportNodes) {
-			transportClient.addTransportAddress(transport);
-		}
-
-		// verify that we actually are connected to a cluster
-		ImmutableList<DiscoveryNode> nodes = ImmutableList.copyOf(transportClient.connectedNodes());
-		if (nodes.isEmpty()) {
-			throw new RuntimeException("Client is not connected to any Elasticsearch nodes!");
-		}
-
-		client = transportClient;
-
-		if (LOG.isInfoEnabled()) {
-			LOG.info("Created Elasticsearch TransportClient {}", client);
-		}
-
-		BulkProcessor.Builder bulkProcessorBuilder = BulkProcessor.builder(client, new BulkProcessor.Listener() {
-			@Override
-			public void beforeBulk(long executionId, BulkRequest request) {
-
-			}
-
-			@Override
-			public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
-				if (response.hasFailures()) {
-					for (BulkItemResponse itemResp : response.getItems()) {
-						if (itemResp.isFailed()) {
-							LOG.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage());
-							failureThrowable.compareAndSet(null, new RuntimeException(itemResp.getFailureMessage()));
-						}
-					}
-					hasFailure.set(true);
-				}
-			}
-
-			@Override
-			public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
-				LOG.error(failure.getMessage());
-				failureThrowable.compareAndSet(null, failure);
-				hasFailure.set(true);
-			}
-		});
-
-		// This makes flush() blocking
-		bulkProcessorBuilder.setConcurrentRequests(0);
-
-		ParameterTool params = ParameterTool.fromMap(userConfig);
-
-		if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS)) {
-			bulkProcessorBuilder.setBulkActions(params.getInt(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS));
-		}
-
-		if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB)) {
-			bulkProcessorBuilder.setBulkSize(new ByteSizeValue(params.getInt(
-					CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB), ByteSizeUnit.MB));
-		}
-
-		if (params.has(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)) {
-			bulkProcessorBuilder.setFlushInterval(TimeValue.timeValueMillis(params.getInt(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)));
-		}
-
-		bulkProcessor = bulkProcessorBuilder.build();
-		requestIndexer = new BulkProcessorIndexer(bulkProcessor);
-	}
-
-	@Override
-	public void invoke(T element) {
-		elasticsearchSinkFunction.process(element, getRuntimeContext(), requestIndexer);
+	public ElasticsearchSink(Map<String, String> userConfig,
+							List<InetSocketAddress> transportAddresses,
+							org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
+		super(new Elasticsearch2ApiCallBridge(transportAddresses), userConfig, elasticsearchSinkFunction);
 	}
-
-	@Override
-	public void close() {
-		if (bulkProcessor != null) {
-			bulkProcessor.close();
-			bulkProcessor = null;
-		}
-
-		if (client != null) {
-			client.close();
-		}
-
-		if (hasFailure.get()) {
-			Throwable cause = failureThrowable.get();
-			if (cause != null) {
-				throw new RuntimeException("An error occured in ElasticsearchSink.", cause);
-			} else {
-				throw new RuntimeException("An error occured in ElasticsearchSink.");
-			}
-		}
-
-	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkFunction.java b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkFunction.java
index 55ba720..c474390 100644
--- a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkFunction.java
+++ b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkFunction.java
@@ -1,13 +1,12 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -23,7 +22,7 @@ import org.apache.flink.api.common.functions.RuntimeContext;
 import java.io.Serializable;
 
 /**
- * Method that creates an {@link org.elasticsearch.action.ActionRequest} from an element in a Stream.
+ * Method that creates multiple {@link org.elasticsearch.action.ActionRequest}s from an element in a Stream.
  *
  * <p>
  * This is used by {@link ElasticsearchSink} to prepare elements for sending them to Elasticsearch.
@@ -54,7 +53,12 @@ import java.io.Serializable;
  * }</pre>
  *
  * @param <T> The type of the element handled by this {@code ElasticsearchSinkFunction}
+ *
+ * @deprecated Deprecated since 1.2, to be removed at 2.0.
+ *             This class has been deprecated due to package relocation.
+ *             Please use {@link org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction} instead.
  */
+@Deprecated
 public interface ElasticsearchSinkFunction<T> extends Serializable, Function {
 	void process(T element, RuntimeContext ctx, RequestIndexer indexer);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/OldNewElasticsearchSinkFunctionBridge.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/OldNewElasticsearchSinkFunctionBridge.java b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/OldNewElasticsearchSinkFunctionBridge.java
new file mode 100644
index 0000000..c95fff5
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/OldNewElasticsearchSinkFunctionBridge.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch2;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+
+/**
+ * A dummy {@link org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction} to bridge
+ * the migration from the deprecated {@link ElasticsearchSinkFunction}.
+ */
+class OldNewElasticsearchSinkFunctionBridge<T> implements org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction<T> {
+
+	private static final long serialVersionUID = 2415651895272659448L;
+
+	private final ElasticsearchSinkFunction<T> deprecated;
+	private OldNewRequestIndexerBridge reusedRequestIndexerBridge;
+
+	OldNewElasticsearchSinkFunctionBridge(ElasticsearchSinkFunction<T> deprecated) {
+		this.deprecated = deprecated;
+	}
+
+	@Override
+	public void process(T element, RuntimeContext ctx, RequestIndexer indexer) {
+		if (reusedRequestIndexerBridge == null) {
+			reusedRequestIndexerBridge = new OldNewRequestIndexerBridge(indexer);
+		}
+		deprecated.process(element, ctx, reusedRequestIndexerBridge);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/OldNewRequestIndexerBridge.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/OldNewRequestIndexerBridge.java b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/OldNewRequestIndexerBridge.java
new file mode 100644
index 0000000..f42fb44
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/OldNewRequestIndexerBridge.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch2;
+
+import org.elasticsearch.action.ActionRequest;
+
+/**
+ * A dummy {@link org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer} to bridge
+ * the migration from the deprecated {@link RequestIndexer}.
+ */
+class OldNewRequestIndexerBridge implements RequestIndexer {
+
+	private static final long serialVersionUID = 4213982619497149416L;
+
+	private final org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer requestIndexer;
+
+	OldNewRequestIndexerBridge(org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer requestIndexer) {
+		this.requestIndexer = requestIndexer;
+	}
+
+	@Override
+	public void add(ActionRequest... actionRequests) {
+		requestIndexer.add(actionRequests);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/RequestIndexer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/RequestIndexer.java b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/RequestIndexer.java
index 144a87b..b2b3de4 100644
--- a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/RequestIndexer.java
+++ b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/RequestIndexer.java
@@ -20,6 +20,12 @@ import org.elasticsearch.action.ActionRequest;
 
 import java.io.Serializable;
 
+/**
+ * @deprecated Deprecated since 1.2, to be removed at 2.0.
+ *             This class has been deprecated due to package relocation.
+ *             Please use {@link org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer} instead.
+ */
+@Deprecated
 public interface RequestIndexer extends Serializable {
 	void add(ActionRequest... actionRequests);
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java b/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java
new file mode 100644
index 0000000..ddf3bd6
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSinkITCase;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.node.Node;
+import org.elasticsearch.node.NodeBuilder;
+
+import java.io.File;
+
+/**
+ * Implementation of {@link EmbeddedElasticsearchNodeEnvironment} for Elasticsearch 2.x.
+ * Will be dynamically loaded in {@link ElasticsearchSinkITCase} for integration tests.
+ */
+public class EmbeddedElasticsearchNodeEnvironmentImpl implements EmbeddedElasticsearchNodeEnvironment {
+
+	private Node node;
+
+	@Override
+	public void start(File tmpDataFolder, String clusterName) throws Exception {
+		if (node == null) {
+			node = NodeBuilder.nodeBuilder().settings(
+				Settings.settingsBuilder()
+					.put("path.home", tmpDataFolder.getParent())
+					.put("http.enabled", false)
+					.put("path.data", tmpDataFolder.getAbsolutePath()))
+				.clusterName(clusterName)
+				.node();
+
+			node.start();
+		}
+	}
+
+	@Override
+	public void close() throws Exception {
+		if (node != null && !node.isClosed()) {
+			node.close();
+			node = null;
+		}
+	}
+
+	@Override
+	public Client getClient() {
+		if (node != null && !node.isClosed()) {
+			return node.client();
+		} else {
+			return null;
+		}
+	}
+
+}