You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/02/24 15:05:23 UTC

[2/3] flink git commit: [FLINK-5353] [elasticsearch] User-provided failure handler for ElasticsearchSink

[FLINK-5353] [elasticsearch] User-provided failure handler for ElasticsearchSink

This commit fixes both FLINK-5353 and FLINK-5122. It allows users to implement a
failure handler to control how failed action requests are dealt with.

The commit also includes general improvements to FLINK-5122:
1. Use the built-in backoff functionality in the Elasticsearch BulkProcessor (not
available for Elasticsearch 1.x)
2. Migrate the `checkErrorAndRetryBulk` functionality to the new failure handler


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3743e898
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3743e898
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3743e898

Branch: refs/heads/master
Commit: 3743e898104d79a9813d444d38fa9f86617bb5ef
Parents: aaac7c2
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Mon Jan 30 13:55:26 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Feb 24 22:58:40 2017 +0800

----------------------------------------------------------------------
 docs/dev/connectors/elasticsearch.md            | 121 +++++++++++--
 .../ActionRequestFailureHandler.java            |  72 ++++++++
 .../ElasticsearchApiCallBridge.java             |  12 ++
 .../elasticsearch/ElasticsearchSinkBase.java    | 174 ++++++++++++-------
 .../util/NoOpActionRequestFailureHandler.java   |  37 ++++
 .../Elasticsearch1ApiCallBridge.java            |  10 ++
 .../elasticsearch/ElasticsearchSink.java        |  37 +++-
 .../Elasticsearch2ApiCallBridge.java            |  31 ++++
 .../elasticsearch2/ElasticsearchSink.java       |  29 +++-
 .../Elasticsearch5ApiCallBridge.java            |  31 ++++
 .../elasticsearch5/ElasticsearchSink.java       |  31 +++-
 11 files changed, 499 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3743e898/docs/dev/connectors/elasticsearch.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/elasticsearch.md b/docs/dev/connectors/elasticsearch.md
index 4388b9a..2ca1f9b 100644
--- a/docs/dev/connectors/elasticsearch.md
+++ b/docs/dev/connectors/elasticsearch.md
@@ -23,6 +23,9 @@ specific language governing permissions and limitations
 under the License.
 -->
 
+* This will be replaced by the TOC
+{:toc}
+
 This connector provides sinks that can request document actions to an
 [Elasticsearch](https://elastic.co/) Index. To use this connector, add one
 of the following dependencies to your project, depending on the version
@@ -59,14 +62,14 @@ Note that the streaming connectors are currently not part of the binary
 distribution. See [here]({{site.baseurl}}/dev/linking.html) for information
 about how to package the program with the libraries for cluster execution.
 
-#### Installing Elasticsearch
+## Installing Elasticsearch
 
 Instructions for setting up an Elasticsearch cluster can be found
 [here](https://www.elastic.co/guide/en/elasticsearch/reference/current/setup.html).
 Make sure to set and remember a cluster name. This must be set when
 creating an `ElasticsearchSink` for requesting document actions against your cluster.
 
-#### Elasticsearch Sink
+## Elasticsearch Sink
 
 The `ElasticsearchSink` uses a `TransportClient` to communicate with an
 Elasticsearch cluster.
@@ -200,15 +203,13 @@ request for each incoming element. Generally, the `ElasticsearchSinkFunction`
 can be used to perform multiple requests of different types (ex.,
 `DeleteRequest`, `UpdateRequest`, etc.). 
 
-Internally, the sink uses a `BulkProcessor` to send action requests to the cluster.
-This will buffer elements before sending them in bulk to the cluster. The behaviour of the
-`BulkProcessor` can be set using these config keys in the provided `Map` configuration:
- * **bulk.flush.max.actions**: Maximum amount of elements to buffer
- * **bulk.flush.max.size.mb**: Maximum amount of data (in megabytes) to buffer
- * **bulk.flush.interval.ms**: Interval at which to flush data regardless of the other two
-  settings in milliseconds
+Internally, each parallel instance of the Flink Elasticsearch Sink uses
+a `BulkProcessor` to send action requests to the cluster.
+This will buffer elements before sending them in bulk to the cluster. The `BulkProcessor`
+executes bulk requests one at a time, i.e. there will be no two concurrent
+flushes of the buffered actions in progress.
 
-#### Communication using Embedded Node (only for Elasticsearch 1.x)
+### Communication using Embedded Node (only for Elasticsearch 1.x)
 
 For Elasticsearch versions 1.x, communication using an embedded node is
 also supported. See [here](https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/client.html)
@@ -272,14 +273,106 @@ input.addSink(new ElasticsearchSink(config, new ElasticsearchSinkFunction[String
 The difference is that now we do not need to provide a list of addresses
 of Elasticsearch nodes.
 
-Optionally, the sink can try to re-execute the bulk request when the error
-message matches certain patterns indicating a timeout or a overloaded cluster.
-This behaviour is disabled by default and can be enabled by setting `checkErrorAndRetryBulk(true)`.
+### Handling Failing Elasticsearch Requests
+
+Elasticsearch action requests may fail due to a variety of reasons, including
+temporarily saturated node queue capacity or malformed documents to be indexed.
+The Flink Elasticsearch Sink allows the user to specify how request
+failures are handled, by simply implementing an `ActionRequestFailureHandler` and
+providing it to the constructor.
+
+Below is an example:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<String> input = ...;
+
+input.addSink(new ElasticsearchSink<>(
+    config, transportAddresses,
+    new ElasticsearchSinkFunction<String>() {...},
+    new ActionRequestFailureHandler() {
+        @Override
+        boolean onFailure(ActionRequest action, Throwable failure, RequestIndexer indexer) {
+            // this example uses Apache Commons to search for nested exceptions
+            
+            if (ExceptionUtils.indexOfThrowable(failure, EsRejectedExecutionException.class) >= 0) {
+                // full queue; re-add document for indexing
+                indexer.add(action);
+                return false;
+            } else if (ExceptionUtils.indexOfThrowable(failure, ElasticsearchParseException.class) >= 0) {
+                // malformed document; simply drop request without failing sink
+                return false;
+            } else {
+                // for all other failures, fail the sink
+                return true;
+            }
+        }
+}));
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val input: DataStream[String] = ...
+
+input.addSink(new ElasticsearchSink(
+    config, transportAddresses,
+    new ElasticsearchSinkFunction[String] {...},
+    new ActionRequestFailureHandler {
+        override def onFailure(ActionRequest action, Throwable failure, RequestIndexer indexer) {
+            // this example uses Apache Commons to search for nested exceptions
+
+            if (ExceptionUtils.indexOfThrowable(failure, EsRejectedExecutionException.class) >= 0) {
+                // full queue; re-add document for indexing
+                indexer.add(action)
+                return false
+            } else if (ExceptionUtils.indexOfThrowable(failure, ElasticsearchParseException.class) {
+                // malformed document; simply drop request without failing sink
+                return false
+            } else {
+                // for all other failures, fail the sink
+                return true
+            }
+        }
+}))
+{% endhighlight %}
+</div>
+</div>
 
+The above example will let the sink re-add requests that failed due to
+queue capacity saturation and drop requests with malformed documents, without
+failing the sink. For all other failures, the sink will fail. If a `ActionRequestFailureHandler`
+is not provided to the constructor, the sink will fail for any kind of error.
+
+Note that `onFailure` is called for failures that still occur only after the
+`BulkProcessor` internally finishes all backoff retry attempts.
+By default, the `BulkProcessor` retries to a maximum of 8 attempts with
+an exponential backoff. For more information on the behaviour of the
+internal `BulkProcessor` and how to configure it, please see the following section.
+ 
+### Configuring the Internal Bulk Processor
+
+The internal `BulkProcessor` can be further configured for its behaviour
+on how buffered action requests are flushed, by setting the following values in
+the provided `Map<String, String>`:
+
+ * **bulk.flush.max.actions**: Maximum amount of actions to buffer before flushing.
+ * **bulk.flush.max.size.mb**: Maximum size of data (in megabytes) to buffer before flushing.
+ * **bulk.flush.interval.ms**: Interval at which to flush regardless of the amount or size of buffered actions.
+ 
+For versions 2.x and above, configuring how temporary request errors are
+retried is also supported:
+ 
+ * **bulk.flush.backoff.enable**: Whether or not to perform retries with backoff delay for a flush
+ if one or more of its actions failed due to a temporary `EsRejectedExecutionException`.
+ * **bulk.flush.backoff.type**: The type of backoff delay, either `CONSTANT` or `EXPONENTIAL`
+ * **bulk.flush.backoff.delay**: The amount of delay for backoff. For constant backoff, this
+ is simply the delay between each retry. For exponential backoff, this is the initial base delay.
+ * **bulk.flush.backoff.retries**: The amount of backoff retries to attempt.
 
 More information about Elasticsearch can be found [here](https://elastic.co).
 
-#### Packaging the Elasticsearch Connector into an Uber-Jar
+## Packaging the Elasticsearch Connector into an Uber-Jar
 
 For the execution of your Flink program, it is recommended to build a
 so-called uber-jar (executable jar) containing all your dependencies

http://git-wip-us.apache.org/repos/asf/flink/blob/3743e898/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ActionRequestFailureHandler.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ActionRequestFailureHandler.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ActionRequestFailureHandler.java
new file mode 100644
index 0000000..45d04fc
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ActionRequestFailureHandler.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import org.elasticsearch.action.ActionRequest;
+
+import java.io.Serializable;
+
+/**
+ * An implementation of {@link ActionRequestFailureHandler} is provided by the user to define how failed
+ * {@link ActionRequest ActionRequests} should be handled, ex. dropping them, reprocessing malformed documents, or
+ * simply requesting them to be sent to Elasticsearch again if the failure is only temporary.
+ *
+ * <p>
+ * Example:
+ *
+ * <pre>{@code
+ *
+ *	private static class ExampleActionRequestFailureHandler implements ActionRequestFailureHandler {
+ *
+ *		@Override
+ *		boolean onFailure(ActionRequest action, Throwable failure, RequestIndexer indexer) {
+ *			// this example uses Apache Commons to search for nested exceptions
+ *
+ *			if (ExceptionUtils.indexOfThrowable(failure, EsRejectedExecutionException.class) >= 0) {
+ *				// full queue; re-add document for indexing
+ *				indexer.add(action);
+ *				return false;
+ *			} else if (ExceptionUtils.indexOfThrowable(failure, ElasticsearchParseException.class) {
+ *				// malformed document; simply drop request without failing sink
+ *				return false;
+ *			} else {
+ *				// for all other failures, fail the sink
+ *				return true;
+ *			}
+ *		}
+ *	}
+ *
+ * }</pre>
+ *
+ * <p>
+ * The above example will let the sink re-add requests that failed due to queue capacity saturation and drop requests
+ * with malformed documents, without failing the sink. For all other failures, the sink will fail.
+ */
+public interface ActionRequestFailureHandler extends Serializable {
+
+	/**
+	 * Handle a failed {@link ActionRequest}.
+	 *
+	 * @param action the {@link ActionRequest} that failed due to the failure
+	 * @param failure the cause of failure
+	 * @param indexer request indexer to re-add the failed action, if intended to do so
+	 * @return the implementation should return {@code true} if the sink should fail due to this failure, and {@code false} otherwise
+	 */
+	boolean onFailure(ActionRequest action, Throwable failure, RequestIndexer indexer);
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3743e898/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
index 6298a85..b482432 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.connectors.elasticsearch;
 
 import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkProcessor;
 import org.elasticsearch.client.Client;
 
 import javax.annotation.Nullable;
@@ -53,6 +54,17 @@ public interface ElasticsearchApiCallBridge extends Serializable {
 	@Nullable Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse);
 
 	/**
+	 * Set backoff-related configurations on the provided {@link BulkProcessor.Builder}.
+	 * The builder will be later on used to instantiate the actual {@link BulkProcessor}.
+	 *
+	 * @param builder the {@link BulkProcessor.Builder} to configure.
+	 * @param flushBackoffPolicy user-provided backoff retry settings ({@code null} if the user disabled backoff retries).
+	 */
+	void configureBulkProcessorBackoff(
+		BulkProcessor.Builder builder,
+		@Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy flushBackoffPolicy);
+
+	/**
 	 * Perform any necessary state cleanup.
 	 */
 	void cleanup();

http://git-wip-us.apache.org/repos/asf/flink/blob/3743e898/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
index 7977fc0..2c29865 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
@@ -21,24 +21,23 @@ import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.util.InstantiationUtil;
-import org.elasticsearch.ElasticsearchTimeoutException;
 import org.elasticsearch.action.ActionRequest;
-import org.elasticsearch.action.IndicesRequest;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkProcessor;
 import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.client.Client;
-import org.elasticsearch.cluster.block.ClusterBlockException;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.Serializable;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -70,10 +69,56 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
 	public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions";
 	public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb";
 	public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms";
+	public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE = "bulk.flush.backoff.enable";
+	public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE = "bulk.flush.backoff.type";
+	public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES = "bulk.flush.backoff.retries";
+	public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY = "bulk.flush.backoff.delay";
+
+	public enum FlushBackoffType {
+		CONSTANT,
+		EXPONENTIAL
+	}
+
+	public class BulkFlushBackoffPolicy implements Serializable {
+
+		private static final long serialVersionUID = -6022851996101826049L;
+
+		// the default values follow the Elasticsearch default settings for BulkProcessor
+		private FlushBackoffType backoffType = FlushBackoffType.EXPONENTIAL;
+		private int maxRetryCount = 8;
+		private long delayMillis = 50;
+
+		public FlushBackoffType getBackoffType() {
+			return backoffType;
+		}
+
+		public int getMaxRetryCount() {
+			return maxRetryCount;
+		}
+
+		public long getDelayMillis() {
+			return delayMillis;
+		}
+
+		public void setBackoffType(FlushBackoffType backoffType) {
+			this.backoffType = checkNotNull(backoffType);
+		}
+
+		public void setMaxRetryCount(int maxRetryCount) {
+			checkArgument(maxRetryCount > 0);
+			this.maxRetryCount = maxRetryCount;
+		}
+
+		public void setDelayMillis(long delayMillis) {
+			checkArgument(delayMillis > 0);
+			this.delayMillis = delayMillis;
+		}
+	}
 
 	private final Integer bulkProcessorFlushMaxActions;
 	private final Integer bulkProcessorFlushMaxSizeMb;
 	private final Integer bulkProcessorFlushIntervalMillis;
+	private final BulkFlushBackoffPolicy bulkProcessorFlushBackoffPolicy;
 
 	// ------------------------------------------------------------------------
 	//  User-facing API and configuration
@@ -85,16 +130,12 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
 	/** The function that is used to construct mulitple {@link ActionRequest ActionRequests} from each incoming element. */
 	private final ElasticsearchSinkFunction<T> elasticsearchSinkFunction;
 
+	/** User-provided handler for failed {@link ActionRequest ActionRequests}. */
+	private final ActionRequestFailureHandler failureHandler;
+
 	/** Provided to the user via the {@link ElasticsearchSinkFunction} to add {@link ActionRequest ActionRequests}. */
 	private transient BulkProcessorIndexer requestIndexer;
 
-	/**
-	 * When set to <code>true</code> and the bulk action fails, the error message will be checked for
-	 * common patterns like <i>timeout</i>, <i>UnavailableShardsException</i> or a full buffer queue on the node.
-	 * When a matching pattern is found, the bulk will be retried.
-	 */
-	protected boolean checkErrorAndRetryBulk = false;
-
 	// ------------------------------------------------------------------------
 	//  Internals for the Flink Elasticsearch Sink
 	// ------------------------------------------------------------------------
@@ -109,21 +150,28 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
 	private transient BulkProcessor bulkProcessor;
 
 	/**
-	 * This is set from inside the {@link BulkProcessor.Listener} if a {@link Throwable} was thrown in callbacks.
+	 * This is set from inside the {@link BulkProcessor.Listener} if a {@link Throwable} was thrown in callbacks and
+	 * the user considered it should fail the sink via the
+	 * {@link ActionRequestFailureHandler#onFailure(ActionRequest, Throwable, RequestIndexer)} method.
+	 *
+	 * Errors will be checked and rethrown before processing each input element, and when the sink is closed.
 	 */
 	private final AtomicReference<Throwable> failureThrowable = new AtomicReference<>();
 
 	public ElasticsearchSinkBase(
 		ElasticsearchApiCallBridge callBridge,
 		Map<String, String> userConfig,
-		ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
+		ElasticsearchSinkFunction<T> elasticsearchSinkFunction,
+		ActionRequestFailureHandler failureHandler) {
 
 		this.callBridge = checkNotNull(callBridge);
 		this.elasticsearchSinkFunction = checkNotNull(elasticsearchSinkFunction);
+		this.failureHandler = checkNotNull(failureHandler);
 
-		// we eagerly check if the user-provided sink function is serializable;
-		// otherwise, if it isn't serializable, users will merely get a non-informative error message
+		// we eagerly check if the user-provided sink function and failure handler is serializable;
+		// otherwise, if they aren't serializable, users will merely get a non-informative error message
 		// "ElasticsearchSinkBase is not serializable"
+
 		try {
 			InstantiationUtil.serializeObject(elasticsearchSinkFunction);
 		} catch (Exception e) {
@@ -132,10 +180,19 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
 				"The object probably contains or references non serializable fields.");
 		}
 
-		checkNotNull(userConfig);
+		try {
+			InstantiationUtil.serializeObject(failureHandler);
+		} catch (Exception e) {
+			throw new IllegalArgumentException(
+				"The implementation of the provided ActionRequestFailureHandler is not serializable. " +
+					"The object probably contains or references non serializable fields.");
+		}
 
 		// extract and remove bulk processor related configuration from the user-provided config,
 		// so that the resulting user config only contains configuration related to the Elasticsearch client.
+
+		checkNotNull(userConfig);
+
 		ParameterTool params = ParameterTool.fromMap(userConfig);
 
 		if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS)) {
@@ -159,6 +216,31 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
 			bulkProcessorFlushIntervalMillis = null;
 		}
 
+		boolean bulkProcessorFlushBackoffEnable = params.getBoolean(CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE, true);
+		userConfig.remove(CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE);
+
+		if (bulkProcessorFlushBackoffEnable) {
+			this.bulkProcessorFlushBackoffPolicy = new BulkFlushBackoffPolicy();
+
+			if (params.has(CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE)) {
+				bulkProcessorFlushBackoffPolicy.setBackoffType(FlushBackoffType.valueOf(params.get(CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE)));
+				userConfig.remove(CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE);
+			}
+
+			if (params.has(CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES)) {
+				bulkProcessorFlushBackoffPolicy.setMaxRetryCount(params.getInt(CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES));
+				userConfig.remove(CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES);
+			}
+
+			if (params.has(CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY)) {
+				bulkProcessorFlushBackoffPolicy.setDelayMillis(params.getLong(CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY));
+				userConfig.remove(CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY);
+			}
+
+		} else {
+			bulkProcessorFlushBackoffPolicy = null;
+		}
+
 		this.userConfig = userConfig;
 	}
 
@@ -175,48 +257,30 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
 				@Override
 				public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
 					if (response.hasFailures()) {
-						boolean allRequestsRepeatable = true;
+						BulkItemResponse itemResponse;
+						Throwable failure;
 
-						for (BulkItemResponse itemResp : response.getItems()) {
-							Throwable failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResp);
+						for (int i = 0; i < response.getItems().length; i++) {
+							itemResponse = response.getItems()[i];
+							failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResponse);
 							if (failure != null) {
-								String failureMessageLowercase = itemResp.getFailureMessage().toLowerCase();
-
-								// Check if index request can be retried
-								if (checkErrorAndRetryBulk && (
-									failureMessageLowercase.contains("timeout") ||
-										failureMessageLowercase.contains("timed out") || // Generic timeout errors
-										failureMessageLowercase.contains("UnavailableShardsException".toLowerCase()) || // Shard not available due to rebalancing or node down
-										(failureMessageLowercase.contains("data/write/bulk") && failureMessageLowercase.contains("bulk")))) // Bulk index queue on node full
-								{
-									LOG.debug("Retry bulk: {}", itemResp.getFailureMessage());
-								} else {
-									// Cannot retry action
-									allRequestsRepeatable = false;
-									LOG.error("Failed Elasticsearch item request: {}", failure.getMessage(), failure);
+								LOG.error("Failed Elasticsearch item request: {}", itemResponse.getFailureMessage(), failure);
+								if (failureHandler.onFailure(request.requests().get(i), failure, requestIndexer)) {
 									failureThrowable.compareAndSet(null, failure);
 								}
 							}
 						}
-
-						if (allRequestsRepeatable) {
-							reAddBulkRequest(request);
-						}
 					}
 				}
 
 				@Override
 				public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
-					if (checkErrorAndRetryBulk && (
-						failure instanceof ClusterBlockException // Examples: "no master"
-							|| failure instanceof ElasticsearchTimeoutException) // ElasticsearchTimeoutException sounded good, not seen in stress tests yet
-						)
-					{
-						LOG.debug("Retry bulk on throwable: {}", failure.getMessage());
-						reAddBulkRequest(request);
-					} else {
-						LOG.error("Failed Elasticsearch bulk request: {}", failure.getMessage(), failure);
-						failureThrowable.compareAndSet(null, failure);
+					LOG.error("Failed Elasticsearch bulk request: {}", failure.getMessage(), failure.getCause());
+
+					// whole bulk request failures are usually just temporary timeouts on
+					// the Elasticsearch side; simply retry all action requests in the bulk
+					for (ActionRequest action : request.requests()) {
+						requestIndexer.add(action);
 					}
 				}
 			}
@@ -237,6 +301,9 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
 			bulkProcessorBuilder.setFlushInterval(TimeValue.timeValueMillis(bulkProcessorFlushIntervalMillis));
 		}
 
+		// if backoff retrying is disabled, bulkProcessorFlushBackoffPolicy will be null
+		callBridge.configureBulkProcessorBackoff(bulkProcessorBuilder, bulkProcessorFlushBackoffPolicy);
+
 		bulkProcessor = bulkProcessorBuilder.build();
 		requestIndexer = new BulkProcessorIndexer(bulkProcessor);
 	}
@@ -267,21 +334,6 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
 		checkErrorAndRethrow();
 	}
 
-	/**
-	 * Adds all requests of the bulk to the BulkProcessor. Used when trying again.
-	 * @param bulkRequest
-	 */
-	public void reAddBulkRequest(BulkRequest bulkRequest) {
-		//TODO Check what happens when bulk contains a DeleteAction and IndexActions and the DeleteAction fails because the document already has been deleted. This may not happen in typical Flink jobs.
-
-		for (IndicesRequest req : bulkRequest.subRequests()) {
-			if (req instanceof ActionRequest) {
-				// There is no waiting time between index requests, so this may produce additional pressure on cluster
-				bulkProcessor.add((ActionRequest<?>) req);
-			}
-		}
-	}
-
 	private void checkErrorAndRethrow() {
 		Throwable cause = failureThrowable.get();
 		if (cause != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/3743e898/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/NoOpActionRequestFailureHandler.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/NoOpActionRequestFailureHandler.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/NoOpActionRequestFailureHandler.java
new file mode 100644
index 0000000..09173a2
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/NoOpActionRequestFailureHandler.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.util;
+
+import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.elasticsearch.action.ActionRequest;
+
+/**
+ * An {@link ActionRequestFailureHandler} that simply fails the sink on any failures.
+ */
+public class NoOpActionRequestFailureHandler implements ActionRequestFailureHandler {
+
+	private static final long serialVersionUID = 737941343410827885L;
+
+	@Override
+	public boolean onFailure(ActionRequest action, Throwable failure, RequestIndexer indexer) {
+		// simply fail the sink
+		return true;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3743e898/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java
index 098afa9..8a59da9 100644
--- a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java
+++ b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.connectors.elasticsearch;
 
 import org.apache.flink.util.Preconditions;
 import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkProcessor;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.client.transport.TransportClient;
 import org.elasticsearch.common.settings.Settings;
@@ -27,6 +28,7 @@ import org.elasticsearch.node.Node;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
 import java.util.List;
 import java.util.Map;
 
@@ -119,6 +121,14 @@ public class Elasticsearch1ApiCallBridge implements ElasticsearchApiCallBridge {
 	}
 
 	@Override
+	public void configureBulkProcessorBackoff(
+		BulkProcessor.Builder builder,
+		@Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy flushBackoffPolicy) {
+		// Elasticsearch 1.x does not support backoff retries for failed bulk requests
+		LOG.warn("Elasticsearch 1.x does not support backoff retries.");
+	}
+
+	@Override
 	public void cleanup() {
 		if (node != null && !node.isClosed()) {
 			node.close();

http://git-wip-us.apache.org/repos/asf/flink/blob/3743e898/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java
index c338860..375d739 100644
--- a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java
+++ b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.connectors.elasticsearch;
 
+import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpActionRequestFailureHandler;
 import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.bulk.BulkProcessor;
 import org.elasticsearch.action.index.IndexRequest;
@@ -105,7 +106,7 @@ public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T> {
 	 * @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest} from the incoming element
 	 */
 	public ElasticsearchSink(Map<String, String> userConfig, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
-		super(new Elasticsearch1ApiCallBridge(), userConfig, elasticsearchSinkFunction);
+		this(userConfig, elasticsearchSinkFunction, new NoOpActionRequestFailureHandler());
 	}
 
 	/**
@@ -116,6 +117,38 @@ public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T> {
 	 * @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest} from the incoming element
 	 */
 	public ElasticsearchSink(Map<String, String> userConfig, List<TransportAddress> transportAddresses, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
-		super(new Elasticsearch1ApiCallBridge(transportAddresses), userConfig, elasticsearchSinkFunction);
+		this(userConfig, transportAddresses, elasticsearchSinkFunction, new NoOpActionRequestFailureHandler());
+	}
+
+	/**
+	 * Creates a new {@code ElasticsearchSink} that connects to the cluster using an embedded {@link Node}.
+	 *
+	 * @param userConfig The map of user settings that are used when constructing the embedded {@link Node} and {@link BulkProcessor}
+	 * @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest} from the incoming element
+	 * @param failureHandler This is used to handle failed {@link ActionRequest}
+	 */
+	public ElasticsearchSink(
+		Map<String, String> userConfig,
+		ElasticsearchSinkFunction<T> elasticsearchSinkFunction,
+		ActionRequestFailureHandler failureHandler) {
+
+		super(new Elasticsearch1ApiCallBridge(), userConfig, elasticsearchSinkFunction, failureHandler);
+	}
+
+	/**
+	 * Creates a new {@code ElasticsearchSink} that connects to the cluster using a {@link TransportClient}.
+	 *
+	 * @param userConfig The map of user settings that are used when constructing the {@link TransportClient} and {@link BulkProcessor}
+	 * @param transportAddresses The addresses of Elasticsearch nodes to which to connect using a {@link TransportClient}
+	 * @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest} from the incoming element
+	 * @param failureHandler This is used to handle failed {@link ActionRequest}
+	 */
+	public ElasticsearchSink(
+		Map<String, String> userConfig,
+		List<TransportAddress> transportAddresses,
+		ElasticsearchSinkFunction<T> elasticsearchSinkFunction,
+		ActionRequestFailureHandler failureHandler) {
+
+		super(new Elasticsearch1ApiCallBridge(transportAddresses), userConfig, elasticsearchSinkFunction, failureHandler);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3743e898/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java
index 9407d9f..e85daf5 100644
--- a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java
+++ b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java
@@ -18,16 +18,21 @@
 package org.apache.flink.streaming.connectors.elasticsearch2;
 
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchApiCallBridge;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
 import org.apache.flink.streaming.connectors.elasticsearch.util.ElasticsearchUtils;
 import org.apache.flink.util.Preconditions;
+import org.elasticsearch.action.bulk.BackoffPolicy;
 import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkProcessor;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.client.transport.TransportClient;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.TransportAddress;
+import org.elasticsearch.common.unit.TimeValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
 import java.net.InetSocketAddress;
 import java.util.List;
 import java.util.Map;
@@ -84,6 +89,32 @@ public class Elasticsearch2ApiCallBridge implements ElasticsearchApiCallBridge {
 	}
 
 	@Override
+	public void configureBulkProcessorBackoff(
+		BulkProcessor.Builder builder,
+		@Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy flushBackoffPolicy) {
+
+		BackoffPolicy backoffPolicy;
+		if (flushBackoffPolicy != null) {
+			switch (flushBackoffPolicy.getBackoffType()) {
+				case CONSTANT:
+					backoffPolicy = BackoffPolicy.constantBackoff(
+						new TimeValue(flushBackoffPolicy.getDelayMillis()),
+						flushBackoffPolicy.getMaxRetryCount());
+					break;
+				case EXPONENTIAL:
+				default:
+					backoffPolicy = BackoffPolicy.exponentialBackoff(
+						new TimeValue(flushBackoffPolicy.getDelayMillis()),
+						flushBackoffPolicy.getMaxRetryCount());
+			}
+		} else {
+			backoffPolicy = BackoffPolicy.noBackoff();
+		}
+
+		builder.setBackoffPolicy(backoffPolicy);
+	}
+
+	@Override
 	public void cleanup() {
 		// nothing to cleanup
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/3743e898/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
index a0abc51..2210f63 100644
--- a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
+++ b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
@@ -16,7 +16,9 @@
  */
 package org.apache.flink.streaming.connectors.elasticsearch2;
 
+import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpActionRequestFailureHandler;
 import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.bulk.BulkProcessor;
 import org.elasticsearch.client.transport.TransportClient;
@@ -82,9 +84,28 @@ public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T> {
 	 * @param transportAddresses The addresses of Elasticsearch nodes to which to connect using a {@link TransportClient}
 	 * @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest} from the incoming element
 	 */
-	public ElasticsearchSink(Map<String, String> userConfig,
-							List<InetSocketAddress> transportAddresses,
-							org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
-		super(new Elasticsearch2ApiCallBridge(transportAddresses), userConfig, elasticsearchSinkFunction);
+	public ElasticsearchSink(
+		Map<String, String> userConfig,
+		List<InetSocketAddress> transportAddresses,
+		org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
+
+		this(userConfig, transportAddresses, elasticsearchSinkFunction, new NoOpActionRequestFailureHandler());
+	}
+
+	/**
+	 * Creates a new {@code ElasticsearchSink} that connects to the cluster using a {@link TransportClient}.
+	 *
+	 * @param userConfig The map of user settings that are used when constructing the {@link TransportClient} and {@link BulkProcessor}
+	 * @param transportAddresses The addresses of Elasticsearch nodes to which to connect using a {@link TransportClient}
+	 * @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest} from the incoming element
+	 * @param failureHandler This is used to handle failed {@link ActionRequest}
+	 */
+	public ElasticsearchSink(
+		Map<String, String> userConfig,
+		List<InetSocketAddress> transportAddresses,
+		org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction<T> elasticsearchSinkFunction,
+		ActionRequestFailureHandler failureHandler) {
+
+		super(new Elasticsearch2ApiCallBridge(transportAddresses), userConfig, elasticsearchSinkFunction, failureHandler);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3743e898/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java
index 1389e7d..c7d81f5 100644
--- a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java
+++ b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java
@@ -18,19 +18,24 @@
 package org.apache.flink.streaming.connectors.elasticsearch5;
 
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchApiCallBridge;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
 import org.apache.flink.streaming.connectors.elasticsearch.util.ElasticsearchUtils;
 import org.apache.flink.util.Preconditions;
+import org.elasticsearch.action.bulk.BackoffPolicy;
 import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkProcessor;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.client.transport.TransportClient;
 import org.elasticsearch.common.network.NetworkModule;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.TransportAddress;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.transport.Netty3Plugin;
 import org.elasticsearch.transport.client.PreBuiltTransportClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
 import java.net.InetSocketAddress;
 import java.util.List;
 import java.util.Map;
@@ -90,6 +95,32 @@ public class Elasticsearch5ApiCallBridge implements ElasticsearchApiCallBridge {
 	}
 
 	@Override
+	public void configureBulkProcessorBackoff(
+		BulkProcessor.Builder builder,
+		@Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy flushBackoffPolicy) {
+
+		BackoffPolicy backoffPolicy;
+		if (flushBackoffPolicy != null) {
+			switch (flushBackoffPolicy.getBackoffType()) {
+				case CONSTANT:
+					backoffPolicy = BackoffPolicy.constantBackoff(
+						new TimeValue(flushBackoffPolicy.getDelayMillis()),
+						flushBackoffPolicy.getMaxRetryCount());
+					break;
+				case EXPONENTIAL:
+				default:
+					backoffPolicy = BackoffPolicy.exponentialBackoff(
+						new TimeValue(flushBackoffPolicy.getDelayMillis()),
+						flushBackoffPolicy.getMaxRetryCount());
+			}
+		} else {
+			backoffPolicy = BackoffPolicy.noBackoff();
+		}
+
+		builder.setBackoffPolicy(backoffPolicy);
+	}
+
+	@Override
 	public void cleanup() {
 		// nothing to cleanup
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/3743e898/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java
index 9107d4e..175b4fa 100644
--- a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java
+++ b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java
@@ -16,8 +16,10 @@
  */
 package org.apache.flink.streaming.connectors.elasticsearch5;
 
+import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpActionRequestFailureHandler;
 import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.bulk.BulkProcessor;
 import org.elasticsearch.client.transport.TransportClient;
@@ -64,13 +66,32 @@ public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T> {
 	/**
 	 * Creates a new {@code ElasticsearchSink} that connects to the cluster using a {@link TransportClient}.
 	 *
-	 * @param userConfig The map of user settings that are used when constructing the {@link TransportClient}
+	 * @param userConfig The map of user settings that are used when constructing the {@link TransportClient} and {@link BulkProcessor}
 	 * @param transportAddresses The addresses of Elasticsearch nodes to which to connect using a {@link TransportClient}
 	 * @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest} from the incoming element
 	 */
-	public ElasticsearchSink(Map<String, String> userConfig,
-							List<InetSocketAddress> transportAddresses,
-							ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
-		super(new Elasticsearch5ApiCallBridge(transportAddresses), userConfig, elasticsearchSinkFunction);
+	public ElasticsearchSink(
+		Map<String, String> userConfig,
+		List<InetSocketAddress> transportAddresses,
+		ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
+
+		this(userConfig, transportAddresses, elasticsearchSinkFunction, new NoOpActionRequestFailureHandler());
+	}
+
+	/**
+	 * Creates a new {@code ElasticsearchSink} that connects to the cluster using a {@link TransportClient}.
+	 *
+	 * @param userConfig The map of user settings that are used when constructing the {@link TransportClient} and {@link BulkProcessor}
+	 * @param transportAddresses The addresses of Elasticsearch nodes to which to connect using a {@link TransportClient}
+	 * @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest} from the incoming element
+	 * @param failureHandler This is used to handle failed {@link ActionRequest}
+	 */
+	public ElasticsearchSink(
+		Map<String, String> userConfig,
+		List<InetSocketAddress> transportAddresses,
+		ElasticsearchSinkFunction<T> elasticsearchSinkFunction,
+		ActionRequestFailureHandler failureHandler) {
+
+		super(new Elasticsearch5ApiCallBridge(transportAddresses), userConfig, elasticsearchSinkFunction, failureHandler);
 	}
 }