Posted to commits@flink.apache.org by tz...@apache.org on 2017/02/07 14:47:58 UTC

[3/4] flink git commit: [FLINK-4988] [elasticsearch] Restructure Elasticsearch connectors

[FLINK-4988] [elasticsearch] Restructure Elasticsearch connectors

This closes #3112.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b5caaef8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b5caaef8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b5caaef8

Branch: refs/heads/master
Commit: b5caaef82add4a6f424094d526700c77b011724e
Parents: 8699b03
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Thu Jan 12 15:21:56 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Tue Feb 7 22:45:45 2017 +0800

----------------------------------------------------------------------
 docs/dev/connectors/elasticsearch.md            | 271 ++++++++++++-----
 docs/dev/connectors/elasticsearch2.md           | 173 -----------
 docs/dev/connectors/elasticsearch5.md           | 146 ----------
 docs/dev/connectors/filesystem_sink.md          |   2 +-
 docs/dev/connectors/nifi.md                     |   2 +-
 docs/dev/connectors/rabbitmq.md                 |   2 +-
 docs/dev/connectors/twitter.md                  |   2 +-
 docs/redirects/elasticsearch2.md                |   2 +-
 docs/redirects/elasticsearch2_2.md              |  24 ++
 .../flink-connector-elasticsearch-base/pom.xml  |  95 ++++++
 .../elasticsearch/BulkProcessorIndexer.java     |  44 +++
 .../ElasticsearchApiCallBridge.java             |  60 ++++
 .../elasticsearch/ElasticsearchSinkBase.java    | 237 +++++++++++++++
 .../ElasticsearchSinkFunction.java              |  71 +++++
 .../elasticsearch/RequestIndexer.java           |  37 +++
 .../elasticsearch/util/ElasticsearchUtils.java  |  51 ++++
 .../ElasticsearchSinkTestBase.java              | 186 ++++++++++++
 .../EmbeddedElasticsearchNodeEnvironment.java   |  55 ++++
 .../testutils/SourceSinkDataTestKit.java        | 112 +++++++
 .../src/test/resources/log4j-test.properties    |  27 ++
 .../src/test/resources/logback-test.xml         |  30 ++
 .../flink-connector-elasticsearch/pom.xml       |  27 +-
 .../Elasticsearch1ApiCallBridge.java            | 128 ++++++++
 .../elasticsearch/ElasticsearchSink.java        | 290 +++----------------
 .../elasticsearch/IndexRequestBuilder.java      |   8 +-
 .../IndexRequestBuilderWrapperFunction.java     |  41 +++
 .../elasticsearch/ElasticsearchSinkITCase.java  | 204 ++++++-------
 ...mbeddedElasticsearchNodeEnvironmentImpl.java |  68 +++++
 .../examples/ElasticsearchExample.java          |  80 -----
 .../examples/ElasticsearchSinkExample.java      |  84 ++++++
 .../src/test/resources/log4j-test.properties    |   6 +-
 .../flink-connector-elasticsearch2/pom.xml      |  23 +-
 .../elasticsearch2/BulkProcessorIndexer.java    |  35 ---
 .../Elasticsearch2ApiCallBridge.java            |  91 ++++++
 .../elasticsearch2/ElasticsearchSink.java       | 231 ++-------------
 .../ElasticsearchSinkFunction.java              |  24 +-
 .../OldNewElasticsearchSinkFunctionBridge.java  |  45 +++
 .../OldNewRequestIndexerBridge.java             |  41 +++
 .../elasticsearch2/RequestIndexer.java          |   8 +-
 ...mbeddedElasticsearchNodeEnvironmentImpl.java |  68 +++++
 .../elasticsearch2/ElasticsearchSinkITCase.java | 229 ++-------------
 .../examples/ElasticsearchExample.java          |  90 ------
 .../examples/ElasticsearchSinkExample.java      |  79 +++++
 .../src/test/resources/log4j-test.properties    |   6 +-
 .../flink-connector-elasticsearch5/pom.xml      |  95 +++++-
 .../elasticsearch5/BulkProcessorIndexer.java    |  35 ---
 .../Elasticsearch5ApiCallBridge.java            |  97 +++++++
 .../elasticsearch5/ElasticsearchSink.java       | 247 ++--------------
 .../ElasticsearchSinkFunction.java              |  60 ----
 .../elasticsearch5/RequestIndexer.java          |  25 --
 ...mbeddedElasticsearchNodeEnvironmentImpl.java |  81 ++++++
 .../elasticsearch5/ElasticsearchSinkITCase.java | 182 ++----------
 .../examples/ElasticsearchExample.java          |  83 ------
 .../examples/ElasticsearchSinkExample.java      |  81 ++++++
 .../src/test/resources/log4j-test.properties    |  27 ++
 .../src/test/resources/log4j2.properties        |  27 --
 flink-connectors/pom.xml                        |  16 +-
 57 files changed, 2584 insertions(+), 2007 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/docs/dev/connectors/elasticsearch.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/elasticsearch.md b/docs/dev/connectors/elasticsearch.md
index 3e8c68a..a40de68 100644
--- a/docs/dev/connectors/elasticsearch.md
+++ b/docs/dev/connectors/elasticsearch.md
@@ -23,130 +23,226 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-This connector provides a Sink that can write to an
-[Elasticsearch](https://elastic.co/) Index. To use this connector, add the
-following dependency to your project:
-
-{% highlight xml %}
-<dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-connector-elasticsearch{{ site.scala_version_suffix }}</artifactId>
-  <version>{{site.version }}</version>
-</dependency>
-{% endhighlight %}
+This connector provides sinks that can send document actions to an
+[Elasticsearch](https://elastic.co/) Index. To use this connector, add one
+of the following dependencies to your project, depending on the version
+of the Elasticsearch installation:
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left">Maven Dependency</th>
+      <th class="text-left">Supported since</th>
+      <th class="text-left">Elasticsearch version</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+        <td>flink-connector-elasticsearch{{ site.scala_version_suffix }}</td>
+        <td>1.0.0</td>
+        <td>1.x</td>
+    </tr>
+    <tr>
+        <td>flink-connector-elasticsearch2{{ site.scala_version_suffix }}</td>
+        <td>1.0.0</td>
+        <td>2.x</td>
+    </tr>
+    <tr>
+        <td>flink-connector-elasticsearch5{{ site.scala_version_suffix }}</td>
+        <td>1.2.0</td>
+        <td>5.x</td>
+    </tr>
+  </tbody>
+</table>
 
 Note that the streaming connectors are currently not part of the binary
-distribution. See
-[here]({{site.baseurl}}/dev/linking.html)
-for information about how to package the program with the libraries for
-cluster execution.
+distribution. See [here]({{site.baseurl}}/dev/linking.html) for information
+about how to package the program with the libraries for cluster execution.
 
 #### Installing Elasticsearch
 
 Instructions for setting up an Elasticsearch cluster can be found
 [here](https://www.elastic.co/guide/en/elasticsearch/reference/current/setup.html).
 Make sure to set and remember a cluster name. This must be set when
-creating a Sink for writing to your cluster
+creating an `ElasticsearchSink` for sending document actions to your cluster.
 
 #### Elasticsearch Sink
-The connector provides a Sink that can send data to an Elasticsearch Index.
-
-The sink can use two different methods for communicating with Elasticsearch:
-
-1. An embedded Node
-2. The TransportClient
 
-See [here](https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/client.html)
-for information about the differences between the two modes.
+The `ElasticsearchSink` uses a `TransportClient` to communicate with an
+Elasticsearch cluster.
 
-This code shows how to create a sink that uses an embedded Node for
-communication:
+The example below shows how to configure and create a sink:
 
 <div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
+<div data-lang="java, Elasticsearch 1.x" markdown="1">
 {% highlight java %}
 DataStream<String> input = ...;
 
-Map<String, String> config = Maps.newHashMap();
+Map<String, String> config = new HashMap<>();
+config.put("cluster.name", "my-cluster-name");
 // This instructs the sink to emit after every element, otherwise they would be buffered
 config.put("bulk.flush.max.actions", "1");
-config.put("cluster.name", "my-cluster-name");
 
-input.addSink(new ElasticsearchSink<>(config, new IndexRequestBuilder<String>() {
-    @Override
-    public IndexRequest createIndexRequest(String element, RuntimeContext ctx) {
-        Map<String, Object> json = new HashMap<>();
-        json.put("data", element);
+List<TransportAddress> transportAddresses = new ArrayList<>();
+transportAddresses.add(new InetSocketTransportAddress("127.0.0.1", 9300));
+transportAddresses.add(new InetSocketTransportAddress("10.2.3.1", 9300));
 
+input.addSink(new ElasticsearchSink<>(config, transportAddresses, new ElasticsearchSinkFunction<String>() {
+    public IndexRequest createIndexRequest(String element) {
+        Map<String, String> json = new HashMap<>();
+        json.put("data", element);
+    
         return Requests.indexRequest()
                 .index("my-index")
                 .type("my-type")
                 .source(json);
     }
+    
+    @Override
+    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
+        indexer.add(createIndexRequest(element));
+    }
 }));
 {% endhighlight %}
 </div>
-<div data-lang="scala" markdown="1">
+<div data-lang="java, Elasticsearch 2.x / 5.x" markdown="1">
+{% highlight java %}
+DataStream<String> input = ...;
+
+Map<String, String> config = new HashMap<>();
+config.put("cluster.name", "my-cluster-name");
+// This instructs the sink to emit after every element, otherwise they would be buffered
+config.put("bulk.flush.max.actions", "1");
+
+List<InetSocketAddress> transportAddresses = new ArrayList<>();
+transportAddresses.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
+transportAddresses.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300));
+
+input.addSink(new ElasticsearchSink<>(config, transportAddresses, new ElasticsearchSinkFunction<String>() {
+    public IndexRequest createIndexRequest(String element) {
+        Map<String, String> json = new HashMap<>();
+        json.put("data", element);
+    
+        return Requests.indexRequest()
+                .index("my-index")
+                .type("my-type")
+                .source(json);
+    }
+    
+    @Override
+    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
+        indexer.add(createIndexRequest(element));
+    }
+}));
+{% endhighlight %}
+</div>
+<div data-lang="scala, Elasticsearch 1.x" markdown="1">
 {% highlight scala %}
 val input: DataStream[String] = ...
 
-val config = new util.HashMap[String, String]
+val config = new java.util.HashMap[String, String]
+config.put("cluster.name", "my-cluster-name")
+// This instructs the sink to emit after every element, otherwise they would be buffered
 config.put("bulk.flush.max.actions", "1")
+
+val transportAddresses = new java.util.ArrayList[TransportAddress]
+transportAddresses.add(new InetSocketTransportAddress("127.0.0.1", 9300))
+transportAddresses.add(new InetSocketTransportAddress("10.2.3.1", 9300))
+
+input.addSink(new ElasticsearchSink(config, transportAddresses, new ElasticsearchSinkFunction[String] {
+  def createIndexRequest(element: String): IndexRequest = {
+    val json = new java.util.HashMap[String, String]
+    json.put("data", element)
+
+    Requests.indexRequest()
+            .index("my-index")
+            .`type`("my-type")
+            .source(json)
+  }
+
+  override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer) {
+    indexer.add(createIndexRequest(element))
+  }
+}))
+{% endhighlight %}
+</div>
+<div data-lang="scala, Elasticsearch 2.x / 5.x" markdown="1">
+{% highlight scala %}
+val input: DataStream[String] = ...
+
+val config = new java.util.HashMap[String, String]
 config.put("cluster.name", "my-cluster-name")
+// This instructs the sink to emit after every element, otherwise they would be buffered
+config.put("bulk.flush.max.actions", "1")
+
+val transportAddresses = new java.util.ArrayList[InetSocketAddress]
+transportAddresses.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300))
+transportAddresses.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300))
 
-text.addSink(new ElasticsearchSink(config, new IndexRequestBuilder[String] {
-  override def createIndexRequest(element: String, ctx: RuntimeContext): IndexRequest = {
-    val json = new util.HashMap[String, AnyRef]
+input.addSink(new ElasticsearchSink(config, transportAddresses, new ElasticsearchSinkFunction[String] {
+  def createIndexRequest(element: String): IndexRequest = {
+    val json = new java.util.HashMap[String, String]
     json.put("data", element)
-    println("SENDING: " + element)
-    Requests.indexRequest.index("my-index").`type`("my-type").source(json)
+
+    Requests.indexRequest()
+            .index("my-index")
+            .`type`("my-type")
+            .source(json)
   }
+
+  override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer) {
+    indexer.add(createIndexRequest(element))
+  }
 }))
 {% endhighlight %}
 </div>
 </div>
 
-Note how a Map of Strings is used to configure the Sink. The configuration keys
-are documented in the Elasticsearch documentation
+Note how a `Map` of `String`s is used to configure the `ElasticsearchSink`.
+The configuration keys are documented in the Elasticsearch documentation
 [here](https://www.elastic.co/guide/en/elasticsearch/reference/current/index.html).
 Especially important is the `cluster.name` parameter that must correspond to
 the name of your cluster.
 
-Internally, the sink uses a `BulkProcessor` to send index requests to the cluster.
-This will buffer elements before sending a request to the cluster. The behaviour of the
-`BulkProcessor` can be configured using these config keys:
+Also note that the example only demonstrates performing a single index
+request for each incoming element. Generally, the `ElasticsearchSinkFunction`
+can be used to perform multiple requests of different types (e.g. a
+`DeleteRequest` or an `UpdateRequest`).
+
+Internally, the sink uses a `BulkProcessor` to send action requests to the cluster.
+This will buffer elements before sending them in bulk to the cluster. The behaviour of the
+`BulkProcessor` can be set using these config keys in the provided `Map` configuration:
  * **bulk.flush.max.actions**: Maximum amount of elements to buffer
  * **bulk.flush.max.size.mb**: Maximum amount of data (in megabytes) to buffer
  * **bulk.flush.interval.ms**: Interval at which to flush data regardless of the other two
   settings in milliseconds
 
-This example code does the same, but with a `TransportClient`:
+#### Communication using Embedded Node (only for Elasticsearch 1.x)
+
+For Elasticsearch versions 1.x, communication using an embedded node is
+also supported. See [here](https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/client.html)
+for information about the differences between communicating with Elasticsearch
+using an embedded node versus using a `TransportClient`.
+
+Below is an example of how to create an `ElasticsearchSink` that uses an
+embedded node instead of a `TransportClient`:
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
 DataStream<String> input = ...;
 
-Map<String, String> config = Maps.newHashMap();
+Map<String, String> config = new HashMap<>();
 // This instructs the sink to emit after every element, otherwise they would be buffered
 config.put("bulk.flush.max.actions", "1");
 config.put("cluster.name", "my-cluster-name");
 
-List<TransportAddress> transports = new ArrayList<String>();
-transports.add(new InetSocketTransportAddress("node-1", 9300));
-transports.add(new InetSocketTransportAddress("node-2", 9300));
-
-input.addSink(new ElasticsearchSink<>(config, transports, new IndexRequestBuilder<String>() {
-    @Override
-    public IndexRequest createIndexRequest(String element, RuntimeContext ctx) {
-        Map<String, Object> json = new HashMap<>();
+input.addSink(new ElasticsearchSink<>(config, new ElasticsearchSinkFunction<String>() {
+    public IndexRequest createIndexRequest(String element) {
+        Map<String, String> json = new HashMap<>();
         json.put("data", element);
-
+    
         return Requests.indexRequest()
                 .index("my-index")
                 .type("my-type")
                 .source(json);
     }
+    
+    @Override
+    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
+        indexer.add(createIndexRequest(element));
+    }
 }));
 {% endhighlight %}
 </div>
@@ -154,27 +250,64 @@ input.addSink(new ElasticsearchSink<>(config, transports, new IndexRequestBuilde
 {% highlight scala %}
 val input: DataStream[String] = ...
 
-val config = new util.HashMap[String, String]
+val config = new java.util.HashMap[String, String]
 config.put("bulk.flush.max.actions", "1")
 config.put("cluster.name", "my-cluster-name")
 
-val transports = new ArrayList[String]
-transports.add(new InetSocketTransportAddress("node-1", 9300))
-transports.add(new InetSocketTransportAddress("node-2", 9300))
-
-text.addSink(new ElasticsearchSink(config, transports, new IndexRequestBuilder[String] {
-  override def createIndexRequest(element: String, ctx: RuntimeContext): IndexRequest = {
-    val json = new util.HashMap[String, AnyRef]
+input.addSink(new ElasticsearchSink(config, new ElasticsearchSinkFunction[String] {
+  def createIndexRequest(element: String): IndexRequest = {
+    val json = new java.util.HashMap[String, String]
     json.put("data", element)
-    println("SENDING: " + element)
-    Requests.indexRequest.index("my-index").`type`("my-type").source(json)
+
+    Requests.indexRequest()
+            .index("my-index")
+            .`type`("my-type")
+            .source(json)
   }
+
+  override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer) {
+    indexer.add(createIndexRequest(element))
+  }
 }))
 {% endhighlight %}
 </div>
 </div>
 
-The difference is that we now need to provide a list of Elasticsearch Nodes
-to which the sink should connect using a `TransportClient`.
+The difference is that now we do not need to provide a list of addresses
+of Elasticsearch nodes.
 
 More information about Elasticsearch can be found [here](https://elastic.co).
+
+#### Packaging the Elasticsearch Connector into an Uber-Jar
+
+For the execution of your Flink program, it is recommended to build a
+so-called uber-jar (executable jar) containing all your dependencies
+(see [here]({{site.baseurl}}/dev/linking.html) for further information).
+
+However, when an uber-jar containing an Elasticsearch sink is executed,
+an `IllegalArgumentException` may occur, which is caused by conflicting
+files of Elasticsearch and its dependencies in `META-INF/services`:
+
+```
+IllegalArgumentException[An SPI class of type org.apache.lucene.codecs.PostingsFormat with name 'Lucene50' does not exist.  You need to add the corresponding JAR file supporting this SPI to your classpath.  The current classpath supports the following names: [es090, completion090, XBloomFilter]]
+```
+
+If the uber-jar is built using Maven, this issue can be avoided by
+adding the following to the Maven POM file in the plugins section:
+
+~~~xml
+<plugin>
+    <groupId>org.apache.maven.plugins</groupId>
+    <artifactId>maven-shade-plugin</artifactId>
+    <version>2.4.3</version>
+    <executions>
+        <execution>
+            <phase>package</phase>
+            <goals>
+                <goal>shade</goal>
+            </goals>
+            <configuration>
+                <transformers>
+                    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+                </transformers>
+            </configuration>
+        </execution>
+    </executions>
+</plugin>
+~~~
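
The restructured page notes that an `ElasticsearchSinkFunction` may emit several
actions of different types per element, while all of its examples emit a single
`IndexRequest`. Below is a minimal sketch of such a multi-action function,
assuming the Elasticsearch 1.x/2.x `Requests` factory methods; the class name and
the staging index are illustrative and not part of this commit:

```java
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.elasticsearch.client.Requests;

import java.util.Collections;
import java.util.Map;

// Hypothetical sketch: index the new document and delete its stale copy in one pass.
public class MultiActionSinkFunction implements ElasticsearchSinkFunction<Tuple2<String, String>> {

	@Override
	public void process(Tuple2<String, String> element, RuntimeContext ctx, RequestIndexer indexer) {
		Map<String, Object> json = Collections.<String, Object>singletonMap("data", element.f1);

		// index the new version of the document
		indexer.add(Requests.indexRequest()
				.index("my-index")
				.type("my-type")
				.id(element.f0)
				.source(json));

		// and remove the stale copy from a (hypothetical) staging index in the same call
		indexer.add(Requests.deleteRequest("my-staging-index")
				.type("my-type")
				.id(element.f0));
	}
}
```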

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/docs/dev/connectors/elasticsearch2.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/elasticsearch2.md b/docs/dev/connectors/elasticsearch2.md
deleted file mode 100644
index af02c84..0000000
--- a/docs/dev/connectors/elasticsearch2.md
+++ /dev/null
@@ -1,173 +0,0 @@
----
-title: "Elasticsearch 2.x Connector"
-nav-title: Elasticsearch 2.x
-nav-parent_id: connectors
-nav-pos: 5
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-This connector provides a Sink that can write to an
-[Elasticsearch 2.x](https://elastic.co/) Index. To use this connector, add the
-following dependency to your project:
-
-{% highlight xml %}
-<dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-connector-elasticsearch2{{ site.scala_version_suffix }}</artifactId>
-  <version>{{site.version }}</version>
-</dependency>
-{% endhighlight %}
-
-Note that the streaming connectors are currently not part of the binary
-distribution. See
-[here]({{site.baseurl}}/dev/linking.html)
-for information about how to package the program with the libraries for
-cluster execution.
-
-#### Installing Elasticsearch 2.x
-
-Instructions for setting up an Elasticsearch cluster can be found
-[here](https://www.elastic.co/guide/en/elasticsearch/reference/current/setup.html).
-Make sure to set and remember a cluster name. This must be set when
-creating a Sink for writing to your cluster
-
-#### Elasticsearch 2.x Sink
-The connector provides a Sink that can send data to an Elasticsearch 2.x Index.
-
-The sink communicates with Elasticsearch via Transport Client
-
-See [here](https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/transport-client.html)
-for information about the Transport Client.
-
-The code below shows how to create a sink that uses a `TransportClient` for communication:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-File dataDir = ....;
-
-DataStream<String> input = ...;
-
-Map<String, String> config = new HashMap<>();
-// This instructs the sink to emit after every element, otherwise they would be buffered
-config.put("bulk.flush.max.actions", "1");
-config.put("cluster.name", "my-cluster-name");
-
-List<InetSocketAddress> transports = new ArrayList<>();
-transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
-transports.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300));
-
-input.addSink(new ElasticsearchSink(config, transports, new ElasticsearchSinkFunction<String>() {
-  public IndexRequest createIndexRequest(String element) {
-    Map<String, String> json = new HashMap<>();
-    json.put("data", element);
-
-    return Requests.indexRequest()
-            .index("my-index")
-            .type("my-type")
-            .source(json);
-  }
-
-  @Override
-  public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
-    indexer.add(createIndexRequest(element));
-  }
-}));
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val dataDir = ....;
-
-val input: DataStream[String] = ...
-
-val config = new util.HashMap[String, String]
-config.put("bulk.flush.max.actions", "1")
-config.put("cluster.name", "my-cluster-name")
-
-val transports = new ArrayList[String]
-transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300))
-transports.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300));
-
-input.addSink(new ElasticsearchSink(config, transports, new ElasticsearchSinkFunction[String] {
-  def createIndexRequest(element: String): IndexRequest = {
-    val json = new util.HashMap[String, AnyRef]
-    json.put("data", element)
-    Requests.indexRequest.index("my-index").`type`("my-type").source(json)
-  }
-
-  override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer) {
-    indexer.add(createIndexRequest(element))
-  }
-}))
-{% endhighlight %}
-</div>
-</div>
-
-A Map of Strings is used to configure the Sink. The configuration keys
-are documented in the Elasticsearch documentation
-[here](https://www.elastic.co/guide/en/elasticsearch/reference/current/index.html).
-Especially important is the `cluster.name`. parameter that must correspond to
-the name of your cluster and with ElasticSearch 2x you also need to specify `path.home`.
-
-Internally, the sink uses a `BulkProcessor` to send Action requests to the cluster.
-This will buffer elements and Action Requests before sending to the cluster. The behaviour of the
-`BulkProcessor` can be configured using these config keys:
- * **bulk.flush.max.actions**: Maximum amount of elements to buffer
- * **bulk.flush.max.size.mb**: Maximum amount of data (in megabytes) to buffer
- * **bulk.flush.interval.ms**: Interval at which to flush data regardless of the other two
-  settings in milliseconds
-
-This now provides a list of Elasticsearch Nodes
-to which the sink should connect via a `TransportClient`.
-
-More information about Elasticsearch can be found [here](https://elastic.co).
-
-
-#### Packaging the Elasticsearch Connector into an Uber-jar
-
-For the execution of your Flink program,
-it is recommended to build a so-called uber-jar (executable jar) containing all your dependencies
-(see [here]({{site.baseurl}}/dev/linking.html) for further information).
-
-However,
-when an uber-jar containing an Elasticsearch sink is executed,
-an `IllegalArgumentException` may occur,
-which is caused by conflicting files of Elasticsearch and it's dependencies
-in `META-INF/services`:
-
-```
-IllegalArgumentException[An SPI class of type org.apache.lucene.codecs.PostingsFormat with name 'Lucene50' does not exist.  You need to add the corresponding JAR file supporting this SPI to your classpath.  The current classpath supports the following names: [es090, completion090, XBloomFilter]]
-```
-
-If the uber-jar is build by means of maven,
-this issue can be avoided by adding the following bits to the pom file:
-
-```
-<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
-    <resource>META-INF/services/org.apache.lucene.codecs.Codec</resource>
-</transformer>
-<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
-    <resource>META-INF/services/org.apache.lucene.codecs.DocValuesFormat</resource>
-</transformer>
-<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
-   <resource>META-INF/services/org.apache.lucene.codecs.PostingsFormat</resource>
-</transformer>
-```

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/docs/dev/connectors/elasticsearch5.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/elasticsearch5.md b/docs/dev/connectors/elasticsearch5.md
deleted file mode 100644
index 2673d86..0000000
--- a/docs/dev/connectors/elasticsearch5.md
+++ /dev/null
@@ -1,146 +0,0 @@
----
-title: "Elasticsearch 5.x Connector"
-nav-title: Elasticsearch 5.x
-nav-parent_id: connectors
-nav-pos: 6
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-This connector provides a Sink that can write to an
-[Elasticsearch 5.x](https://elastic.co/) Index. To use this connector, add the
-following dependency to your project:
-
-{% highlight xml %}
-<dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-connector-elasticsearch5{{ site.scala_version_suffix }}</artifactId>
-  <version>{{site.version }}</version>
-</dependency>
-{% endhighlight %}
-
-Note that the streaming connectors are currently not part of the binary
-distribution. See
-[here]({{site.baseurl}}/dev/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution)
-for information about how to package the program with the libraries for
-cluster execution.
-
-#### Installing Elasticsearch 5.x
-
-Instructions for setting up an Elasticsearch cluster can be found
-    [here](https://www.elastic.co/guide/en/elasticsearch/reference/5.x/setup.html).
-Make sure to set and remember a cluster name. This must be set when
-creating a Sink for writing to your cluster
-
-#### Elasticsearch 5.x Sink
-The connector provides a Sink that can send data to an Elasticsearch 5.x Index.
-
-The sink communicates with Elasticsearch via Transport Client
-
-See [here](https://www.elastic.co/guide/en/elasticsearch/client/java-api/5.x/transport-client.html)
-for information about the Transport Client.
-
-The code below shows how to create a sink that uses a `TransportClient` for communication:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-File dataDir = ....;
-
-DataStream<String> input = ...;
-
-Map<String, String> esConfig = new HashMap<>();
-esConfig.put("cluster.name", "my-cluster-name");
-
-// This instructs the sink to emit after every element, otherwise they would be buffered
-Map<String, String> sinkConfig = new HashMap<>();
-sinkConfig.put("bulk.flush.max.actions", "1");
-
-List<InetSocketAddress> transports = new ArrayList<>();
-transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
-transports.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300));
-
-input.addSink(new ElasticsearchSink(esConfig, sinkConfig, transports, new ElasticsearchSinkFunction<String>() {
-  public IndexRequest createIndexRequest(String element) {
-    Map<String, String> json = new HashMap<>();
-    json.put("data", element);
-
-    return Requests.indexRequest()
-            .index("my-index")
-            .type("my-type")
-            .source(json);
-  }
-
-  @Override
-  public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
-    indexer.add(createIndexRequest(element));
-  }
-}));
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val dataDir = ....;
-
-val input: DataStream[String] = ...
-
-val esConfig = new util.HashMap[String, String]
-esConfig.put("cluster.name", "my-cluster-name")
-
-val sinkConfig = new util.HashMap[String, String]
-sinkConfig.put("bulk.flush.max.actions", "1")
-
-val transports = new ArrayList[String]
-transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300))
-transports.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300));
-
-input.addSink(new ElasticsearchSink(esConfig, sinkConfig, transports, new ElasticsearchSinkFunction[String] {
-  def createIndexRequest(element: String): IndexRequest = {
-    val json = new util.HashMap[String, AnyRef]
-    json.put("data", element)
-    Requests.indexRequest.index("my-index").`type`("my-type").source(json)
-  }
-
-  override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer) {
-    indexer.add(createIndexRequest(element))
-  }
-}))
-{% endhighlight %}
-</div>
-</div>
-
-The first Map of Strings is used to configure the Transport Client. The configuration keys
-are documented in the Elasticsearch documentation
-[here](https://www.elastic.co/guide/en/elasticsearch/reference/5.x/index.html).
-Especially important is the `cluster.name`. parameter that must correspond to
-the name of your cluster.
-
-The second Map of Strings is used to configure a `BulkProcessor` to send Action requests to the cluster.
-This will buffer elements and Action Requests before sending to the cluster. The behaviour of the
-`BulkProcessor` can be configured using these config keys:
- * **bulk.flush.max.actions**: Maximum amount of elements to buffer
- * **bulk.flush.max.size.mb**: Maximum amount of data (in megabytes) to buffer
- * **bulk.flush.interval.ms**: Interval at which to flush data regardless of the other two
-  settings in milliseconds
-
-This now provides a list of Elasticsearch Nodes
-to which the sink should connect via a `TransportClient`.
-
-More information about Elasticsearch can be found [here](https://elastic.co).
-

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/docs/dev/connectors/filesystem_sink.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/filesystem_sink.md b/docs/dev/connectors/filesystem_sink.md
index bcaeb17..67250f0 100644
--- a/docs/dev/connectors/filesystem_sink.md
+++ b/docs/dev/connectors/filesystem_sink.md
@@ -2,7 +2,7 @@
 title: "HDFS Connector"
 nav-title: Rolling File Sink
 nav-parent_id: connectors
-nav-pos: 7
+nav-pos: 5
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/docs/dev/connectors/nifi.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/nifi.md b/docs/dev/connectors/nifi.md
index 8223867..dbc1e8a 100644
--- a/docs/dev/connectors/nifi.md
+++ b/docs/dev/connectors/nifi.md
@@ -2,7 +2,7 @@
 title: "Apache NiFi Connector"
 nav-title: NiFi
 nav-parent_id: connectors
-nav-pos: 9
+nav-pos: 7
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/docs/dev/connectors/rabbitmq.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/rabbitmq.md b/docs/dev/connectors/rabbitmq.md
index b4da248..d3360e4 100644
--- a/docs/dev/connectors/rabbitmq.md
+++ b/docs/dev/connectors/rabbitmq.md
@@ -2,7 +2,7 @@
 title: "RabbitMQ Connector"
 nav-title: RabbitMQ
 nav-parent_id: connectors
-nav-pos: 8
+nav-pos: 6
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/docs/dev/connectors/twitter.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/twitter.md b/docs/dev/connectors/twitter.md
index f1fbbd4..5fb7d68 100644
--- a/docs/dev/connectors/twitter.md
+++ b/docs/dev/connectors/twitter.md
@@ -2,7 +2,7 @@
 title: "Twitter Connector"
 nav-title: Twitter
 nav-parent_id: connectors
-nav-pos: 10
+nav-pos: 8
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/docs/redirects/elasticsearch2.md
----------------------------------------------------------------------
diff --git a/docs/redirects/elasticsearch2.md b/docs/redirects/elasticsearch2.md
index 8442e52..f40199b 100644
--- a/docs/redirects/elasticsearch2.md
+++ b/docs/redirects/elasticsearch2.md
@@ -1,7 +1,7 @@
 ---
 title: "Elasticsearch2 Connector"
 layout: redirect
-redirect: /dev/connectors/elasticsearch2.html
+redirect: /dev/connectors/elasticsearch.html
 permalink: /apis/streaming/connectors/elasticsearch2.html
 ---
 <!--

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/docs/redirects/elasticsearch2_2.md
----------------------------------------------------------------------
diff --git a/docs/redirects/elasticsearch2_2.md b/docs/redirects/elasticsearch2_2.md
new file mode 100644
index 0000000..561ac69
--- /dev/null
+++ b/docs/redirects/elasticsearch2_2.md
@@ -0,0 +1,24 @@
+---
+title: "Elasticsearch2 Connector"
+layout: redirect
+redirect: /dev/connectors/elasticsearch.html
+permalink: /dev/connectors/elasticsearch2.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch-base/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/pom.xml b/flink-connectors/flink-connector-elasticsearch-base/pom.xml
new file mode 100644
index 0000000..81652c4
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch-base/pom.xml
@@ -0,0 +1,95 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+			xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+			xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-connectors</artifactId>
+		<version>1.3-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-connector-elasticsearch-base_2.10</artifactId>
+	<name>flink-connector-elasticsearch-base</name>
+
+	<packaging>jar</packaging>
+
+	<!-- Allow users to pass custom connector versions -->
+	<properties>
+		<elasticsearch.version>1.7.1</elasticsearch.version>
+	</properties>
+
+	<dependencies>
+
+		<!-- core dependencies -->
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.elasticsearch</groupId>
+			<artifactId>elasticsearch</artifactId>
+			<version>${elasticsearch.version}</version>
+		</dependency>
+
+		<!-- test dependencies -->
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
+
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+				<executions>
+					<execution>
+						<goals>
+							<goal>test-jar</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/BulkProcessorIndexer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/BulkProcessorIndexer.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/BulkProcessorIndexer.java
new file mode 100644
index 0000000..d802550
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/BulkProcessorIndexer.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.bulk.BulkProcessor;
+
+/**
+ * Implementation of a {@link RequestIndexer}, using a {@link BulkProcessor}.
+ * {@link ActionRequest ActionRequests} will be buffered before sending a bulk request to the Elasticsearch cluster.
+ */
+class BulkProcessorIndexer implements RequestIndexer {
+
+	private static final long serialVersionUID = 6841162943062034253L;
+
+	private final BulkProcessor bulkProcessor;
+
+	BulkProcessorIndexer(BulkProcessor bulkProcessor) {
+		this.bulkProcessor = bulkProcessor;
+	}
+
+	@Override
+	public void add(ActionRequest... actionRequests) {
+		for (ActionRequest actionRequest : actionRequests) {
+			this.bulkProcessor.add(actionRequest);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
new file mode 100644
index 0000000..6298a85
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.client.Client;
+
+import javax.annotation.Nullable;
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * An {@link ElasticsearchApiCallBridge} is used to bridge incompatible Elasticsearch Java API calls across different versions.
+ * This includes calls to create Elasticsearch clients, handle failed item responses, etc. Any incompatible Elasticsearch
+ * Java APIs should be bridged using this interface.
+ *
+ * Implementations are allowed to be stateful. For example, for Elasticsearch 1.x, since connecting via an embedded node
+ * is allowed, the call bridge will hold a reference to the created embedded node. Each instance of the sink will hold
+ * exactly one instance of the call bridge, and state cleanup is performed when the sink is closed.
+ */
+public interface ElasticsearchApiCallBridge extends Serializable {
+
+	/**
+	 * Creates an Elasticsearch {@link Client}.
+	 *
+	 * @param clientConfig The configuration to use when constructing the client.
+	 * @return The created client.
+	 */
+	Client createClient(Map<String, String> clientConfig);
+
+	/**
+	 * Extracts the cause of failure of a bulk item action.
+	 *
+	 * @param bulkItemResponse the bulk item response from which to extract the cause of failure
+	 * @return the extracted {@link Throwable} from the response ({@code null} if the response is successful).
+	 */
+	@Nullable Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse);
+
+	/**
+	 * Perform any necessary state cleanup.
+	 */
+	void cleanup();
+
+}
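
The commit's per-version implementations of this interface appear later in the diff
(`Elasticsearch1ApiCallBridge`, `Elasticsearch2ApiCallBridge`, `Elasticsearch5ApiCallBridge`).
As a condensed, hypothetical sketch of the contract (not the committed code), an
Elasticsearch 1.x bridge might look like this, with the transport address hard-coded
for brevity:

```java
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;

import java.util.Map;

// Hypothetical sketch against the Elasticsearch 1.x API; the real bridge also
// supports embedded nodes and receives its transport addresses from the sink.
public class Es1CallBridgeSketch implements ElasticsearchApiCallBridge {

	@Override
	public Client createClient(Map<String, String> clientConfig) {
		Settings settings = ImmutableSettings.settingsBuilder()
				.put(clientConfig)
				.build();
		return new TransportClient(settings)
				.addTransportAddress(new InetSocketTransportAddress("127.0.0.1", 9300));
	}

	@Override
	public Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse) {
		if (!bulkItemResponse.isFailed()) {
			return null;
		}
		// the 1.x API exposes only a failure message, not a Throwable, so wrap it
		return new RuntimeException(bulkItemResponse.getFailureMessage());
	}

	@Override
	public void cleanup() {
		// this sketch keeps no state (e.g. no embedded node), so there is nothing to clean up
	}
}
```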

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
new file mode 100644
index 0000000..6a2d65f
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.util.InstantiationUtil;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.unit.TimeValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Base class for all Flink Elasticsearch Sinks.
+ *
+ * <p>
+ * This class implements the common behaviour across Elasticsearch versions, such as
+ * the use of an internal {@link BulkProcessor} to buffer multiple {@link ActionRequest}s before
+ * sending the requests to the cluster, as well as passing input records to the user provided
+ * {@link ElasticsearchSinkFunction} for processing.
+ *
+ * <p>
+ * The version specific API calls for different Elasticsearch versions should be defined by a concrete implementation of
+ * a {@link ElasticsearchApiCallBridge}, which is provided to the constructor of this class. This call bridge is used,
+ * for example, to create an Elasticsearch {@link Client}, handle failed item responses, etc.
+ *
+ * @param <T> Type of the elements handled by this sink
+ */
+public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
+
+	private static final long serialVersionUID = -1007596293618451942L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSinkBase.class);
+
+	// ------------------------------------------------------------------------
+	//  Internal bulk processor configuration
+	// ------------------------------------------------------------------------
+
+	public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions";
+	public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb";
+	public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms";
+
+	private final Integer bulkProcessorFlushMaxActions;
+	private final Integer bulkProcessorFlushMaxSizeMb;
+	private final Integer bulkProcessorFlushIntervalMillis;
+
+	// ------------------------------------------------------------------------
+	//  User-facing API and configuration
+	// ------------------------------------------------------------------------
+
+	/** The user specified config map that we forward to Elasticsearch when we create the {@link Client}. */
+	private final Map<String, String> userConfig;
+
+	/** The function that is used to construct multiple {@link ActionRequest ActionRequests} from each incoming element. */
+	private final ElasticsearchSinkFunction<T> elasticsearchSinkFunction;
+
+	/** Provided to the user via the {@link ElasticsearchSinkFunction} to add {@link ActionRequest ActionRequests}. */
+	private transient BulkProcessorIndexer requestIndexer;
+
+	// ------------------------------------------------------------------------
+	//  Internals for the Flink Elasticsearch Sink
+	// ------------------------------------------------------------------------
+
+	/** Call bridge for version-specific Elasticsearch API calls. */
+	private final ElasticsearchApiCallBridge callBridge;
+
+	/** Elasticsearch client created using the call bridge. */
+	private transient Client client;
+
+	/** Bulk processor to buffer and send requests to Elasticsearch, created using the client. */
+	private transient BulkProcessor bulkProcessor;
+
+	/**
+	 * This is set from inside the {@link BulkProcessor.Listener} if a {@link Throwable} was thrown in callbacks.
+	 */
+	private final AtomicReference<Throwable> failureThrowable = new AtomicReference<>();
+
+	public ElasticsearchSinkBase(
+		ElasticsearchApiCallBridge callBridge,
+		Map<String, String> userConfig,
+		ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
+
+		this.callBridge = checkNotNull(callBridge);
+		this.elasticsearchSinkFunction = checkNotNull(elasticsearchSinkFunction);
+
+		// we eagerly check if the user-provided sink function is serializable;
+		// otherwise, if it isn't serializable, users will merely get a non-informative error message
+		// "ElasticsearchSinkBase is not serializable"
+		try {
+			InstantiationUtil.serializeObject(elasticsearchSinkFunction);
+		} catch (Exception e) {
+			throw new IllegalArgumentException(
+				"The implementation of the provided ElasticsearchSinkFunction is not serializable. " +
+				"The object probably contains or references non-serializable fields.");
+		}
+
+		checkNotNull(userConfig);
+
+		// extract and remove bulk processor related configuration from the user-provided config,
+		// so that the resulting user config only contains configuration related to the Elasticsearch client.
+		ParameterTool params = ParameterTool.fromMap(userConfig);
+
+		if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS)) {
+			bulkProcessorFlushMaxActions = params.getInt(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS);
+			userConfig.remove(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS);
+		} else {
+			bulkProcessorFlushMaxActions = null;
+		}
+
+		if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB)) {
+			bulkProcessorFlushMaxSizeMb = params.getInt(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB);
+			userConfig.remove(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB);
+		} else {
+			bulkProcessorFlushMaxSizeMb = null;
+		}
+
+		if (params.has(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)) {
+			bulkProcessorFlushIntervalMillis = params.getInt(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS);
+			userConfig.remove(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS);
+		} else {
+			bulkProcessorFlushIntervalMillis = null;
+		}
+
+		this.userConfig = userConfig;
+	}
+
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		client = callBridge.createClient(userConfig);
+
+		BulkProcessor.Builder bulkProcessorBuilder = BulkProcessor.builder(
+			client,
+			new BulkProcessor.Listener() {
+				@Override
+				public void beforeBulk(long executionId, BulkRequest request) { }
+
+				@Override
+				public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
+					if (response.hasFailures()) {
+						for (BulkItemResponse itemResp : response.getItems()) {
+							Throwable failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResp);
+							if (failure != null) {
+								LOG.error("Failed Elasticsearch item request: {}", failure.getMessage(), failure);
+								failureThrowable.compareAndSet(null, failure);
+							}
+						}
+					}
+				}
+
+				@Override
+				public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
+					LOG.error("Failed Elasticsearch bulk request: {}", failure.getMessage(), failure);
+					failureThrowable.compareAndSet(null, failure);
+				}
+			}
+		);
+
+		// This makes flush() blocking
+		bulkProcessorBuilder.setConcurrentRequests(0);
+
+		if (bulkProcessorFlushMaxActions != null) {
+			bulkProcessorBuilder.setBulkActions(bulkProcessorFlushMaxActions);
+		}
+
+		if (bulkProcessorFlushMaxSizeMb != null) {
+			bulkProcessorBuilder.setBulkSize(new ByteSizeValue(bulkProcessorFlushMaxSizeMb, ByteSizeUnit.MB));
+		}
+
+		if (bulkProcessorFlushIntervalMillis != null) {
+			bulkProcessorBuilder.setFlushInterval(TimeValue.timeValueMillis(bulkProcessorFlushIntervalMillis));
+		}
+
+		bulkProcessor = bulkProcessorBuilder.build();
+		requestIndexer = new BulkProcessorIndexer(bulkProcessor);
+	}
+
+	@Override
+	public void invoke(T value) throws Exception {
+		// if bulk processor callbacks have previously reported an error, we rethrow the error and fail the sink
+		checkErrorAndRethrow();
+
+		elasticsearchSinkFunction.process(value, getRuntimeContext(), requestIndexer);
+	}
+
+	@Override
+	public void close() throws Exception {
+		if (bulkProcessor != null) {
+			bulkProcessor.close();
+			bulkProcessor = null;
+		}
+
+		if (client != null) {
+			client.close();
+			client = null;
+		}
+
+		callBridge.cleanup();
+
+		// make sure any errors from callbacks are rethrown
+		checkErrorAndRethrow();
+	}
+
+	private void checkErrorAndRethrow() {
+		Throwable cause = failureThrowable.get();
+		if (cause != null) {
+			throw new RuntimeException("An error occurred in ElasticsearchSink.", cause);
+		}
+	}
+}
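
The constructor above strips the three `bulk.flush.*` keys out of the user-provided
map before that map is handed to the call bridge. A small, hypothetical sketch of
that behaviour, reusing the `Es1CallBridgeSketch` from earlier; since
`ElasticsearchSinkBase` declares no abstract methods, an anonymous subclass suffices:

```java
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;

import java.util.HashMap;
import java.util.Map;

public class SinkConfigSketch {
	public static void main(String[] args) {
		Map<String, String> userConfig = new HashMap<>();
		userConfig.put("cluster.name", "my-cluster-name");
		userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "500");
		userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB, "5");
		userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_INTERVAL_MS, "60000");

		// a no-op sink function, just to satisfy the constructor
		ElasticsearchSinkFunction<String> fn = (element, ctx, indexer) -> { };

		ElasticsearchSinkBase<String> sink =
				new ElasticsearchSinkBase<String>(new Es1CallBridgeSketch(), userConfig, fn) {};

		// the constructor consumed the bulk.flush.* entries; only client config remains
		System.out.println(userConfig); // prints {cluster.name=my-cluster-name}
	}
}
```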

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction.java
new file mode 100644
index 0000000..1e20a0a
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.elasticsearch.action.ActionRequest;
+
+import java.io.Serializable;
+
+/**
+ * Creates multiple {@link ActionRequest ActionRequests} from an element in a stream.
+ *
+ * <p>
+ * This is used by sinks to prepare elements for sending them to Elasticsearch.
+ *
+ * <p>
+ * Example:
+ *
+ * <pre>{@code
+ *	private static class TestElasticSearchSinkFunction
+ *			implements ElasticsearchSinkFunction<Tuple2<Integer, String>> {
+ *
+ *		public IndexRequest createIndexRequest(Tuple2<Integer, String> element) {
+ *			Map<String, Object> json = new HashMap<>();
+ *			json.put("data", element.f1);
+ *
+ *			return Requests.indexRequest()
+ *				.index("my-index")
+ *				.type("my-type")
+ *				.id(element.f0.toString())
+ *				.source(json);
+ *		}
+ *
+ *		public void process(Tuple2<Integer, String> element, RuntimeContext ctx, RequestIndexer indexer) {
+ *			indexer.add(createIndexRequest(element));
+ *		}
+ *	}
+ *
+ * }</pre>
+ *
+ * @param <T> The type of the element handled by this {@code ElasticsearchSinkFunction}
+ */
+public interface ElasticsearchSinkFunction<T> extends Serializable, Function {
+
+	/**
+	 * Process the incoming element to produce multiple {@link ActionRequest ActionRequests}.
+	 * The produced requests should be added to the provided {@link RequestIndexer}.
+	 *
+	 * @param element incoming element to process
+	 * @param ctx     runtime context containing information about the sink instance
+	 * @param indexer request indexer to which the produced {@code ActionRequest}s should be added
+	 */
+	void process(T element, RuntimeContext ctx, RequestIndexer indexer);
+}
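
A hedged wiring sketch for this interface: an anonymous implementation attached to a stream. The
ElasticsearchSink constructor shown is only illustrative of the version-specific sinks added elsewhere in
this commit; userConfig and transportAddresses are assumed to be set up beforehand, and imports from
flink-streaming-java and the Elasticsearch client (e.g. org.elasticsearch.client.Requests) are assumed:

    DataStream<String> input = env.fromElements("foo", "bar");

    input.addSink(new ElasticsearchSink<>(userConfig, transportAddresses,
        new ElasticsearchSinkFunction<String>() {
            @Override
            public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
                Map<String, Object> json = new HashMap<>();
                json.put("data", element);
                // a single element may produce any number of requests
                indexer.add(Requests.indexRequest()
                    .index("my-index")
                    .type("my-type")
                    .source(json));
            }
        }));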

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/RequestIndexer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/RequestIndexer.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/RequestIndexer.java
new file mode 100644
index 0000000..4587a80
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/RequestIndexer.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import org.elasticsearch.action.ActionRequest;
+
+import java.io.Serializable;
+
+/**
+ * Users add multiple {@link ActionRequest ActionRequests} to a {@link RequestIndexer} to prepare
+ * them for sending to an Elasticsearch cluster.
+ */
+public interface RequestIndexer extends Serializable {
+
+	/**
+	 * Adds multiple {@link ActionRequest ActionRequests} to the indexer to prepare them for sending to Elasticsearch.
+	 *
+	 * @param actionRequests The {@link ActionRequest ActionRequests} to add.
+	 */
+	void add(ActionRequest... actionRequests);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/ElasticsearchUtils.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/ElasticsearchUtils.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/ElasticsearchUtils.java
new file mode 100644
index 0000000..9776c4c
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/ElasticsearchUtils.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.util;
+
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.common.transport.TransportAddress;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Suite of utility methods for Elasticsearch.
+ */
+public class ElasticsearchUtils {
+
+	/**
+	 * Utility method to convert a {@link List} of {@link InetSocketAddress} to Elasticsearch {@link TransportAddress}.
+	 *
+	 * @param inetSocketAddresses The list of {@link InetSocketAddress} to convert.
+	 * @return The converted list of {@link TransportAddress}, or {@code null} if the input list was {@code null}.
+	 */
+	public static List<TransportAddress> convertInetSocketAddresses(List<InetSocketAddress> inetSocketAddresses) {
+		if (inetSocketAddresses == null) {
+			return null;
+		} else {
+			List<TransportAddress> converted = new ArrayList<>(inetSocketAddresses.size());
+			for (InetSocketAddress address : inetSocketAddresses) {
+				converted.add(new InetSocketTransportAddress(address));
+			}
+			return converted;
+		}
+	}
+
+}
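
A short usage sketch of the utility (host and port are illustrative; assumes the imports already used by
this class plus java.util.Collections):

    List<InetSocketAddress> sockets =
        Collections.singletonList(new InetSocketAddress("127.0.0.1", 9300));
    // null-safe: a null input yields a null result
    List<TransportAddress> transports = ElasticsearchUtils.convertInetSocketAddresses(sockets);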

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java
new file mode 100644
index 0000000..2f9e4c1
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.elasticsearch.testutils.SourceSinkDataTestKit;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.util.InstantiationUtil;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.transport.TransportClient;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.rules.TemporaryFolder;
+
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Environment preparation and suite of tests for version-specific {@link ElasticsearchSinkBase} implementations.
+ */
+public abstract class ElasticsearchSinkTestBase extends StreamingMultipleProgramsTestBase {
+
+	protected static final String CLUSTER_NAME = "test-cluster";
+
+	protected static EmbeddedElasticsearchNodeEnvironment embeddedNodeEnv;
+
+	@ClassRule
+	public static TemporaryFolder tempFolder = new TemporaryFolder();
+
+	@BeforeClass
+	public static void prepare() throws Exception {
+
+		LOG.info("-------------------------------------------------------------------------");
+		LOG.info("    Starting embedded Elasticsearch node ");
+		LOG.info("-------------------------------------------------------------------------");
+
+		// dynamically load version-specific implementation of the Elasticsearch embedded node environment
+		Class<?> clazz = Class.forName(
+			"org.apache.flink.streaming.connectors.elasticsearch.EmbeddedElasticsearchNodeEnvironmentImpl");
+		embeddedNodeEnv = (EmbeddedElasticsearchNodeEnvironment) InstantiationUtil.instantiate(clazz);
+
+		embeddedNodeEnv.start(tempFolder.newFolder(), CLUSTER_NAME);
+
+	}
+
+	@AfterClass
+	public static void shutdown() throws Exception {
+
+		LOG.info("-------------------------------------------------------------------------");
+		LOG.info("    Shutting down embedded Elasticsearch node ");
+		LOG.info("-------------------------------------------------------------------------");
+
+		embeddedNodeEnv.close();
+
+	}
+
+	/**
+	 * Tests that the Elasticsearch sink works properly using a {@link TransportClient}.
+	 */
+	public void runTransportClientTest() throws Exception {
+		final String index = "transport-client-test-index";
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new SourceSinkDataTestKit.TestDataSourceFunction());
+
+		Map<String, String> userConfig = new HashMap<>();
+		// This instructs the sink to emit after every element, otherwise they would be buffered
+		userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+		userConfig.put("cluster.name", CLUSTER_NAME);
+
+		source.addSink(createElasticsearchSinkForEmbeddedNode(
+			userConfig, new SourceSinkDataTestKit.TestElasticsearchSinkFunction(index)));
+
+		env.execute("Elasticsearch TransportClient Test");
+
+		// verify the results
+		Client client = embeddedNodeEnv.getClient();
+		SourceSinkDataTestKit.verifyProducedSinkData(client, index);
+
+		client.close();
+	}
+
+	/**
+	 * Tests that the Elasticsearch sink fails eagerly if the provided list of transport addresses is {@code null}.
+	 */
+	public void runNullTransportClientTest() throws Exception {
+		Map<String, String> userConfig = new HashMap<>();
+		userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+		userConfig.put("cluster.name", "my-transport-client-cluster");
+
+		try {
+			createElasticsearchSink(userConfig, null, new SourceSinkDataTestKit.TestElasticsearchSinkFunction("test"));
+		} catch (IllegalArgumentException expectedException) {
+			// test passes
+			return;
+		}
+
+		fail();
+	}
+
+	/**
+	 * Tests that the Elasticsearch sink fails eagerly if the provided list of transport addresses is empty.
+	 */
+	public void runEmptyTransportClientTest() throws Exception {
+		Map<String, String> userConfig = new HashMap<>();
+		userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+		userConfig.put("cluster.name", "my-transport-client-cluster");
+
+		try {
+			createElasticsearchSink(
+				userConfig,
+				Collections.<InetSocketAddress>emptyList(),
+				new SourceSinkDataTestKit.TestElasticsearchSinkFunction("test"));
+		} catch (IllegalArgumentException expectedException) {
+			// test passes
+			return;
+		}
+
+		fail();
+	}
+
+	/**
+	 * Tests whether the Elasticsearch sink fails when there is no cluster to connect to.
+	 */
+	public void runTransportClientFailsTest() throws Exception {
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new SourceSinkDataTestKit.TestDataSourceFunction());
+
+		Map<String, String> userConfig = new HashMap<>();
+		userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+		userConfig.put("cluster.name", "my-transport-client-cluster");
+
+		source.addSink(createElasticsearchSinkForEmbeddedNode(
+			userConfig, new SourceSinkDataTestKit.TestElasticsearchSinkFunction("test")));
+
+		try {
+			env.execute("Elasticsearch Transport Client Test");
+		} catch (JobExecutionException expectedException) {
+			assertTrue(expectedException.getCause().getMessage().contains("not connected to any Elasticsearch nodes"));
+			return;
+		}
+
+		fail();
+	}
+
+	/** Creates a version-specific Elasticsearch sink, using arbitrary transport addresses. */
+	protected abstract <T> ElasticsearchSinkBase<T> createElasticsearchSink(Map<String, String> userConfig,
+																			List<InetSocketAddress> transportAddresses,
+																			ElasticsearchSinkFunction<T> elasticsearchSinkFunction);
+
+	/**
+	 * Creates a version-specific Elasticsearch sink to connect to a local embedded Elasticsearch node.
+	 *
+	 * This case is singled out from {@link ElasticsearchSinkTestBase#createElasticsearchSink(Map, List, ElasticsearchSinkFunction)}
+	 * because the Elasticsearch Java API to do so is incompatible across different versions.
+	 */
+	protected abstract <T> ElasticsearchSinkBase<T> createElasticsearchSinkForEmbeddedNode(
+		Map<String, String> userConfig, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) throws Exception;
+}
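
To make the contract concrete: a version-specific subclass would expose JUnit tests that delegate to the
run*() methods above and implement the two factories. The class and constructor names below are a
hypothetical sketch under that assumption, not the actual ITCase implementations added later in this commit:

    import java.net.InetSocketAddress;
    import java.util.Collections;
    import java.util.List;
    import java.util.Map;

    import org.junit.Test;

    public class MyElasticsearchSinkITCase extends ElasticsearchSinkTestBase {

        @Test
        public void testTransportClient() throws Exception {
            runTransportClientTest();
        }

        @Override
        protected <T> ElasticsearchSinkBase<T> createElasticsearchSink(
                Map<String, String> userConfig,
                List<InetSocketAddress> transportAddresses,
                ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
            // assumes the version-specific sink exposes this constructor
            return new ElasticsearchSink<>(userConfig, transportAddresses, elasticsearchSinkFunction);
        }

        @Override
        protected <T> ElasticsearchSinkBase<T> createElasticsearchSinkForEmbeddedNode(
                Map<String, String> userConfig,
                ElasticsearchSinkFunction<T> elasticsearchSinkFunction) throws Exception {
            // "127.0.0.1:9300" is illustrative; real implementations resolve the embedded node's address
            return createElasticsearchSink(
                userConfig,
                Collections.singletonList(new InetSocketAddress("127.0.0.1", 9300)),
                elasticsearchSinkFunction);
        }
    }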

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironment.java b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironment.java
new file mode 100644
index 0000000..f59eb03
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironment.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import org.elasticsearch.client.Client;
+
+import java.io.File;
+
+/**
+ * The {@link EmbeddedElasticsearchNodeEnvironment} is used in integration tests to manage Elasticsearch embedded nodes.
+ *
+ * NOTE: In order for {@link ElasticsearchSinkTestBase} to dynamically load version-specific implementations
+ *       for the tests, concrete implementations must be named {@code EmbeddedElasticsearchNodeEnvironmentImpl}
+ *       and be located in the same package. The intentional package-private accessibility of this interface
+ *       enforces that.
+ */
+interface EmbeddedElasticsearchNodeEnvironment {
+
+	/**
+	 * Start an embedded Elasticsearch node instance.
+	 * Calling this method multiple times consecutively should not restart the embedded node.
+	 *
+	 * @param tmpDataFolder The temporary data folder for the embedded node to use.
+	 * @param clusterName The name of the cluster that the embedded node should be configured with.
+	 */
+	void start(File tmpDataFolder, String clusterName) throws Exception;
+
+	/**
+	 * Close the embedded node, if previously started.
+	 */
+	void close() throws Exception;
+
+	/**
+	 * Returns a client to communicate with the embedded node.
+	 *
+	 * @return Client to communicate with the embedded node.
+	 *         Returns {@code null} if the embedded node wasn't started or is closed.
+	 */
+	Client getClient();
+}
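
For illustration only, a minimal implementation sketch against the Elasticsearch 2.x node API (the actual
per-version EmbeddedElasticsearchNodeEnvironmentImpl classes are added later in this commit; the settings
shown, and the package declaration omitted here, are assumptions):

    import java.io.File;

    import org.elasticsearch.client.Client;
    import org.elasticsearch.common.settings.Settings;
    import org.elasticsearch.node.Node;
    import org.elasticsearch.node.NodeBuilder;

    class EmbeddedElasticsearchNodeEnvironmentImpl implements EmbeddedElasticsearchNodeEnvironment {

        private Node node;

        @Override
        public void start(File tmpDataFolder, String clusterName) throws Exception {
            if (node == null) { // repeated calls must not restart the node
                Settings settings = Settings.settingsBuilder()
                    .put("cluster.name", clusterName)
                    .put("http.enabled", false)
                    .put("path.home", tmpDataFolder.getParent())
                    .put("path.data", tmpDataFolder.getAbsolutePath())
                    .build();
                node = NodeBuilder.nodeBuilder().settings(settings).node();
            }
        }

        @Override
        public void close() throws Exception {
            if (node != null && !node.isClosed()) {
                node.close();
                node = null;
            }
        }

        @Override
        public Client getClient() {
            if (node != null && !node.isClosed()) {
                return node.client();
            }
            return null;
        }
    }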

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/testutils/SourceSinkDataTestKit.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/testutils/SourceSinkDataTestKit.java b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/testutils/SourceSinkDataTestKit.java
new file mode 100644
index 0000000..55a48fa
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/testutils/SourceSinkDataTestKit.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.testutils;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.elasticsearch.action.get.GetRequest;
+import org.elasticsearch.action.get.GetResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Client;
+import org.junit.Assert;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * This class contains utilities, a pre-defined source function, and a pre-defined
+ * Elasticsearch sink function used to simulate and verify data in tests.
+ */
+public class SourceSinkDataTestKit {
+
+	private static final int NUM_ELEMENTS = 20;
+
+	private static final String DATA_PREFIX = "message #";
+	private static final String DATA_FIELD_NAME = "data";
+
+	private static final String TYPE_NAME = "flink-es-test-type";
+
+	/**
+	 * A {@link SourceFunction} that generates the elements (id, "message #" + id) with id ranging from 0 to 19.
+	 */
+	public static class TestDataSourceFunction implements SourceFunction<Tuple2<Integer, String>> {
+		private static final long serialVersionUID = 1L;
+
+		private volatile boolean running = true;
+
+		@Override
+		public void run(SourceFunction.SourceContext<Tuple2<Integer, String>> ctx) throws Exception {
+			for (int i = 0; i < NUM_ELEMENTS && running; i++) {
+				ctx.collect(Tuple2.of(i, DATA_PREFIX + i));
+			}
+		}
+
+		@Override
+		public void cancel() {
+			running = false;
+		}
+	}
+
+	/**
+	 * An {@link ElasticsearchSinkFunction} that indexes each element it receives into a specified Elasticsearch index.
+	 */
+	public static class TestElasticsearchSinkFunction implements ElasticsearchSinkFunction<Tuple2<Integer, String>> {
+		private static final long serialVersionUID = 1L;
+
+		private final String index;
+
+		/**
+		 * Create the sink function, specifying a target Elasticsearch index.
+		 *
+		 * @param index Name of the target Elasticsearch index.
+		 */
+		public TestElasticsearchSinkFunction(String index) {
+			this.index = index;
+		}
+
+		public IndexRequest createIndexRequest(Tuple2<Integer, String> element) {
+			Map<String, Object> json = new HashMap<>();
+			json.put(DATA_FIELD_NAME, element.f1);
+
+			return new IndexRequest(index, TYPE_NAME, element.f0.toString()).source(json);
+		}
+
+		@Override
+		public void process(Tuple2<Integer, String> element, RuntimeContext ctx, RequestIndexer indexer) {
+			indexer.add(createIndexRequest(element));
+		}
+	}
+
+	/**
+	 * Verifies the results in an Elasticsearch index. The results must first be produced into the index
+	 * using a {@link TestElasticsearchSinkFunction}.
+	 *
+	 * @param client The client to use to connect to Elasticsearch.
+	 * @param index The index to check.
+	 */
+	public static void verifyProducedSinkData(Client client, String index) {
+		for (int i = 0; i < NUM_ELEMENTS; i++) {
+			GetResponse response = client.get(new GetRequest(index, TYPE_NAME, Integer.toString(i))).actionGet();
+			Assert.assertEquals(DATA_PREFIX + i, response.getSource().get(DATA_FIELD_NAME));
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch-base/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/test/resources/log4j-test.properties b/flink-connectors/flink-connector-elasticsearch-base/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..2055184
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/test/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
+log4j.rootLogger=INFO, testlogger
+
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target=System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch-base/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/test/resources/logback-test.xml b/flink-connectors/flink-connector-elasticsearch-base/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..45b3b92
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/test/resources/logback-test.xml
@@ -0,0 +1,30 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<configuration>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <root level="WARN">
+        <appender-ref ref="STDOUT"/>
+    </root>
+    <logger name="org.apache.flink.streaming" level="WARN"/>
+</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch/pom.xml b/flink-connectors/flink-connector-elasticsearch/pom.xml
index b2d7284..14f28d0 100644
--- a/flink-connectors/flink-connector-elasticsearch/pom.xml
+++ b/flink-connectors/flink-connector-elasticsearch/pom.xml
@@ -52,9 +52,9 @@ under the License.
 		</dependency>
 
 		<dependency>
-			<groupId>org.elasticsearch</groupId>
-			<artifactId>elasticsearch</artifactId>
-			<version>${elasticsearch.version}</version>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-elasticsearch-base_2.10</artifactId>
+			<version>${project.version}</version>
 		</dependency>
 
 		<!-- test dependencies -->
@@ -73,18 +73,15 @@ under the License.
 			<scope>test</scope>
 			<type>test-jar</type>
 		</dependency>
-	</dependencies>
 
-	<build>
-		<plugins>
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-surefire-plugin</artifactId>
-				<configuration>
-					<rerunFailingTestsCount>3</rerunFailingTestsCount>
-				</configuration>
-			</plugin>
-		</plugins>
-	</build>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-elasticsearch-base_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
+
+	</dependencies>
 
 </project>