Posted to commits@flink.apache.org by tz...@apache.org on 2017/02/24 15:05:22 UTC

[1/3] flink git commit: [FLINK-5487] [elasticsearch] At-least-once Elasticsearch Sink

Repository: flink
Updated Branches:
  refs/heads/master 0ba08b444 -> 2437da6e5


[FLINK-5487] [elasticsearch] At-least-once Elasticsearch Sink

This closes #3358.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2437da6e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2437da6e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2437da6e

Branch: refs/heads/master
Commit: 2437da6e54cb48c4e29116b8789fbe4782b17ea7
Parents: 3743e89
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Mon Feb 20 16:50:19 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Feb 24 22:58:40 2017 +0800

----------------------------------------------------------------------
 docs/dev/connectors/elasticsearch.md            |  94 ++-
 .../flink-connector-elasticsearch-base/pom.xml  |   8 +
 .../ActionRequestFailureHandler.java            |  29 +-
 .../elasticsearch/BulkProcessorIndexer.java     |  15 +-
 .../elasticsearch/ElasticsearchSinkBase.java    | 236 +++++---
 .../util/NoOpActionRequestFailureHandler.java   |  37 --
 .../elasticsearch/util/NoOpFailureHandler.java  |  37 ++
 .../RetryRejectedExecutionFailureHandler.java   |  46 ++
 .../ElasticsearchSinkBaseTest.java              | 570 +++++++++++++++++++
 .../elasticsearch/ElasticsearchSink.java        |   6 +-
 .../elasticsearch2/ElasticsearchSink.java       |   4 +-
 .../elasticsearch5/ElasticsearchSink.java       |   4 +-
 .../org/apache/flink/util/ExceptionUtils.java   |  24 +
 .../apache/flink/util/InstantiationUtil.java    |  10 +
 14 files changed, 969 insertions(+), 151 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2437da6e/docs/dev/connectors/elasticsearch.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/elasticsearch.md b/docs/dev/connectors/elasticsearch.md
index 2ca1f9b..3fba7f0 100644
--- a/docs/dev/connectors/elasticsearch.md
+++ b/docs/dev/connectors/elasticsearch.md
@@ -209,6 +209,41 @@ This will buffer elements before sending them in bulk to the cluster. The `BulkP
 executes bulk requests one at a time, i.e. there will be no two concurrent
 flushes of the buffered actions in progress.
 
+### Elasticsearch Sinks and Fault Tolerance
+
+With Flink's checkpointing enabled, the Flink Elasticsearch Sink guarantees
+at-least-once delivery of action requests to Elasticsearch clusters. It does
+so by waiting for all pending action requests in the `BulkProcessor` to be
+acknowledged at the time of a checkpoint. This effectively ensures that all
+requests issued before the checkpoint was triggered have been successfully
+acknowledged by Elasticsearch before the sink processes any further records.
+
+More details on checkpoints and fault tolerance are in the [fault tolerance docs]({{site.baseurl}}/internals/stream_checkpointing.html).
+
+To use fault tolerant Elasticsearch Sinks, checkpointing of the topology needs to be enabled at the execution environment:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+env.enableCheckpointing(5000); // checkpoint every 5000 msecs
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+env.enableCheckpointing(5000) // checkpoint every 5000 msecs
+{% endhighlight %}
+</div>
+</div>
+
+<p style="border-radius: 5px; padding: 5px" class="bg-danger">
+<b>NOTE</b>: Users can disable flushing by calling
+<b>disableFlushOnCheckpoint()</b> on the created <b>ElasticsearchSink</b>. Be aware
+that this essentially means the sink no longer provides any strong
+delivery guarantees, even with checkpointing enabled for the topology.
+</p>
+
 ### Communication using Embedded Node (only for Elasticsearch 1.x)
 
 For Elasticsearch versions 1.x, communication using an embedded node is
@@ -293,19 +328,20 @@ input.addSink(new ElasticsearchSink<>(
     new ElasticsearchSinkFunction<String>() {...},
     new ActionRequestFailureHandler() {
         @Override
-        boolean onFailure(ActionRequest action, Throwable failure, RequestIndexer indexer) {
-            // this example uses Apache Commons to search for nested exceptions
-            
-            if (ExceptionUtils.indexOfThrowable(failure, EsRejectedExecutionException.class) >= 0) {
+        void onFailure(ActionRequest action,
+                Throwable failure,
+                int restStatusCode,
+                RequestIndexer indexer) throws Throwable {
+
+            if (ExceptionUtils.containsThrowable(failure, EsRejectedExecutionException.class)) {
                 // full queue; re-add document for indexing
                 indexer.add(action);
-                return false;
-            } else if (ExceptionUtils.indexOfThrowable(failure, ElasticsearchParseException.class) >= 0) {
+            } else if (ExceptionUtils.containsThrowable(failure, ElasticsearchParseException.class)) {
                 // malformed document; simply drop request without failing sink
-                return false;
             } else {
                 // for all other failures, fail the sink
-                return true;
+                // here the failure is simply rethrown, but users can also choose to throw custom exceptions
+                throw failure;
             }
         }
 }));
@@ -319,19 +355,21 @@ input.addSink(new ElasticsearchSink(
     config, transportAddresses,
     new ElasticsearchSinkFunction[String] {...},
     new ActionRequestFailureHandler {
-        override def onFailure(ActionRequest action, Throwable failure, RequestIndexer indexer) {
-            // this example uses Apache Commons to search for nested exceptions
+        @throws(classOf[Throwable])
+        override def onFailure(action: ActionRequest,
+                failure: Throwable,
+                restStatusCode: Int,
+                indexer: RequestIndexer) {
 
-            if (ExceptionUtils.indexOfThrowable(failure, EsRejectedExecutionException.class) >= 0) {
+            if (ExceptionUtils.containsThrowable(failure, classOf[EsRejectedExecutionException])) {
                 // full queue; re-add document for indexing
                 indexer.add(action)
-                return false
-            } else if (ExceptionUtils.indexOfThrowable(failure, ElasticsearchParseException.class) {
+            } else if (ExceptionUtils.containsThrowable(failure, classOf[ElasticsearchParseException])) {
                 // malformed document; simply drop request without failing sink
-                return false
             } else {
                 // for all other failures, fail the sink
-                return true
+                // here the failure is simply rethrown, but users can also choose to throw custom exceptions
+                throw failure
             }
         }
 }))
@@ -349,7 +387,31 @@ Note that `onFailure` is called for failures that still occur only after the
 By default, the `BulkProcessor` retries to a maximum of 8 attempts with
 an exponential backoff. For more information on the behaviour of the
 internal `BulkProcessor` and how to configure it, please see the following section.
- 
+
+By default, if a failure handler is not provided, the sink uses a
+`NoOpFailureHandler` that simply fails for all kinds of exceptions. The
+connector also provides a `RetryRejectedExecutionFailureHandler` implementation
+that always re-adds requests that have failed due to queue capacity saturation.
+
+<p style="border-radius: 5px; padding: 5px" class="bg-danger">
+<b>IMPORTANT</b>: Re-adding requests back to the internal <b>BulkProcessor</b>
+on failures will lead to longer checkpoints, as the sink will also
+need to wait for the re-added requests to be flushed when checkpointing.
+For example, when using <b>RetryRejectedExecutionFailureHandler</b>, checkpoints
+will need to wait until Elasticsearch node queues have enough capacity for
+all the pending requests. This also means that if re-added requests never
+succeed, the checkpoint will never finish.
+</p>
+
+<p style="border-radius: 5px; padding: 5px" class="bg-warning">
+<b>Failure handling for Elasticsearch 1.x</b>: For Elasticsearch 1.x, it
+is not feasible to match on the type of the failure, because the exact type
+cannot be retrieved through the older version's Java client APIs (thus, the
+types will be general <b>Exception</b>s that only differ in the
+failure message). In this case, it is recommended to match on the
+provided REST status code.
+</p>
+
 ### Configuring the Internal Bulk Processor
 
 The internal `BulkProcessor` can be further configured for its behaviour

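For illustration, a minimal sketch of wiring the provided
RetryRejectedExecutionFailureHandler (added by this commit, see the diff further
below) into the sink; `config`, `transportAddresses`, and the sink function are
assumed to be set up as in the connector documentation above:

    DataStream<String> input = ...;

    // rejected requests are re-added for indexing; all other failures fail the sink
    input.addSink(new ElasticsearchSink<>(
        config,
        transportAddresses,
        new ElasticsearchSinkFunction<String>() {...},
        new RetryRejectedExecutionFailureHandler()));
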
http://git-wip-us.apache.org/repos/asf/flink/blob/2437da6e/flink-connectors/flink-connector-elasticsearch-base/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/pom.xml b/flink-connectors/flink-connector-elasticsearch-base/pom.xml
index 81652c4..32327ff 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/pom.xml
+++ b/flink-connectors/flink-connector-elasticsearch-base/pom.xml
@@ -68,6 +68,14 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-runtime_2.10</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-streaming-java_2.10</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>

http://git-wip-us.apache.org/repos/asf/flink/blob/2437da6e/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ActionRequestFailureHandler.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ActionRequestFailureHandler.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ActionRequestFailureHandler.java
index 45d04fc..abbdd72 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ActionRequestFailureHandler.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ActionRequestFailureHandler.java
@@ -23,7 +23,7 @@ import java.io.Serializable;
 
 /**
  * An implementation of {@link ActionRequestFailureHandler} is provided by the user to define how failed
- * {@link ActionRequest ActionRequests} should be handled, ex. dropping them, reprocessing malformed documents, or
+ * {@link ActionRequest ActionRequests} should be handled, e.g. dropping them, reprocessing malformed documents, or
  * simply requesting them to be sent to Elasticsearch again if the failure is only temporary.
  *
  * <p>
@@ -34,19 +34,16 @@ import java.io.Serializable;
  *	private static class ExampleActionRequestFailureHandler implements ActionRequestFailureHandler {
  *
  *		@Override
- *		boolean onFailure(ActionRequest action, Throwable failure, RequestIndexer indexer) {
- *			// this example uses Apache Commons to search for nested exceptions
- *
- *			if (ExceptionUtils.indexOfThrowable(failure, EsRejectedExecutionException.class) >= 0) {
+ *		void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable {
+ *			if (ExceptionUtils.containsThrowable(failure, EsRejectedExecutionException.class)) {
  *				// full queue; re-add document for indexing
  *				indexer.add(action);
- *				return false;
- *			} else if (ExceptionUtils.indexOfThrowable(failure, ElasticsearchParseException.class) {
+ *			} else if (ExceptionUtils.containsThrowable(failure, ElasticsearchParseException.class)) {
  *				// malformed document; simply drop request without failing sink
- *				return false;
  *			} else {
- *				// for all other failures, fail the sink
- *				return true;
+ *				// for all other failures, fail the sink;
+ *				// here the failure is simply rethrown, but users can also choose to throw custom exceptions
+ *				throw failure;
  *			}
  *		}
  *	}
@@ -56,6 +53,11 @@ import java.io.Serializable;
  * <p>
  * The above example will let the sink re-add requests that failed due to queue capacity saturation and drop requests
  * with malformed documents, without failing the sink. For all other failures, the sink will fail.
+ *
+ * <p>
+ * Note: For Elasticsearch 1.x, it is not feasible to match the type of the failure because the exact type
+ * could not be retrieved through the older version Java client APIs (thus, the types will be general {@link Exception}s
+ * and only differ in the failure message). In this case, it is recommended to match on the provided REST status code.
  */
 public interface ActionRequestFailureHandler extends Serializable {
 
@@ -64,9 +66,12 @@ public interface ActionRequestFailureHandler extends Serializable {
 	 *
 	 * @param action the {@link ActionRequest} that failed due to the failure
 	 * @param failure the cause of failure
+	 * @param restStatusCode the REST status code of the failure (-1 if none can be retrieved)
 	 * @param indexer request indexer to re-add the failed action, if intended to do so
-	 * @return the implementation should return {@code true} if the sink should fail due to this failure, and {@code false} otherwise
+	 *
+	 * @throws Throwable if the sink should fail on this failure, the implementation should rethrow
+	 *                   the exception or a custom one
 	 */
-	boolean onFailure(ActionRequest action, Throwable failure, RequestIndexer indexer);
+	void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable;
 
 }

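Since the interface now also passes the REST status code, a handler can match on
it instead of the exception type (the recommended approach for Elasticsearch 1.x).
A hypothetical sketch, assuming 429 ("Too Many Requests") indicates queue
saturation and that -1 means no status code was retrievable:

	public class StatusCodeFailureHandler implements ActionRequestFailureHandler {

		private static final long serialVersionUID = 1L;

		@Override
		public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable {
			if (restStatusCode == 429) {
				// queue saturation; re-add the request for retry
				indexer.add(action);
			} else {
				// includes restStatusCode == -1 (no status retrievable): fail the sink
				throw failure;
			}
		}
	}
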
http://git-wip-us.apache.org/repos/asf/flink/blob/2437da6e/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/BulkProcessorIndexer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/BulkProcessorIndexer.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/BulkProcessorIndexer.java
index d802550..838865a 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/BulkProcessorIndexer.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/BulkProcessorIndexer.java
@@ -21,6 +21,10 @@ package org.apache.flink.streaming.connectors.elasticsearch;
 import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.bulk.BulkProcessor;
 
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * Implementation of a {@link RequestIndexer}, using a {@link BulkProcessor}.
  * {@link ActionRequest ActionRequests} will be buffered before sending a bulk request to the Elasticsearch cluster.
@@ -30,14 +34,21 @@ class BulkProcessorIndexer implements RequestIndexer {
 	private static final long serialVersionUID = 6841162943062034253L;
 
 	private final BulkProcessor bulkProcessor;
+	private final boolean flushOnCheckpoint;
+	private final AtomicLong numPendingRequestsRef;
 
-	BulkProcessorIndexer(BulkProcessor bulkProcessor) {
-		this.bulkProcessor = bulkProcessor;
+	BulkProcessorIndexer(BulkProcessor bulkProcessor, boolean flushOnCheckpoint, AtomicLong numPendingRequestsRef) {
+		this.bulkProcessor = checkNotNull(bulkProcessor);
+		this.flushOnCheckpoint = flushOnCheckpoint;
+		this.numPendingRequestsRef = checkNotNull(numPendingRequestsRef);
 	}
 
 	@Override
 	public void add(ActionRequest... actionRequests) {
 		for (ActionRequest actionRequest : actionRequests) {
+			if (flushOnCheckpoint) {
+				numPendingRequestsRef.getAndIncrement();
+			}
 			this.bulkProcessor.add(actionRequest);
 		}
 	}

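The AtomicLong handed to this indexer is shared with the BulkProcessor.Listener
in ElasticsearchSinkBase (next diff): the indexer increments it once per added
request, the listener decrements it by the size of each completed bulk, and the
checkpoint flush loop waits for it to drain to zero. A standalone model of that
protocol (not Flink code), for illustration:

	import java.util.concurrent.atomic.AtomicLong;

	class PendingRequestCounterModel {

		private final AtomicLong numPendingRequests = new AtomicLong(0);

		// indexer side: one increment per request handed to the BulkProcessor
		void onRequestAdded() {
			numPendingRequests.getAndIncrement();
		}

		// listener side: a completed bulk accounts for all of its actions at once
		void onBulkCompleted(int numberOfActions) {
			numPendingRequests.getAndAdd(-numberOfActions);
		}

		// snapshot side: flushing must continue until the counter drains to zero
		boolean allAcknowledged() {
			return numPendingRequests.get() == 0;
		}
	}
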
http://git-wip-us.apache.org/repos/asf/flink/blob/2437da6e/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
index 2c29865..f6944b3 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
@@ -17,8 +17,12 @@
 
 package org.apache.flink.streaming.connectors.elasticsearch;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.util.InstantiationUtil;
 import org.elasticsearch.action.ActionRequest;
@@ -30,11 +34,13 @@ import org.elasticsearch.client.Client;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.rest.RestStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -56,7 +62,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  *
  * @param <T> Type of the elements handled by this sink
  */
-public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
+public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> implements CheckpointedFunction {
 
 	private static final long serialVersionUID = -1007596293618451942L;
 
@@ -105,12 +111,12 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
 		}
 
 		public void setMaxRetryCount(int maxRetryCount) {
-			checkArgument(maxRetryCount > 0);
+			checkArgument(maxRetryCount >= 0);
 			this.maxRetryCount = maxRetryCount;
 		}
 
 		public void setDelayMillis(long delayMillis) {
-			checkArgument(delayMillis > 0);
+			checkArgument(delayMillis >= 0);
 			this.delayMillis = delayMillis;
 		}
 	}
@@ -133,6 +139,9 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
 	/** User-provided handler for failed {@link ActionRequest ActionRequests}. */
 	private final ActionRequestFailureHandler failureHandler;
 
+	/** If true, the producer will wait until all outstanding action requests have been sent to Elasticsearch. */
+	private boolean flushOnCheckpoint = true;
+
 	/** Provided to the user via the {@link ElasticsearchSinkFunction} to add {@link ActionRequest ActionRequests}. */
 	private transient BulkProcessorIndexer requestIndexer;
 
@@ -143,6 +152,17 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
 	/** Call bridge for the different version-specific Elasticsearch APIs. */
 	private final ElasticsearchApiCallBridge callBridge;
 
+	/**
+	 * Number of pending action requests not yet acknowledged by Elasticsearch.
+	 * This value is maintained only if {@link ElasticsearchSinkBase#flushOnCheckpoint} is {@code true}.
+	 *
+	 * This is incremented whenever the user adds (or re-adds through the {@link ActionRequestFailureHandler}) requests
+	 * to the {@link RequestIndexer}. It is decremented for each completed request of a bulk request, in
+	 * {@link BulkProcessor.Listener#afterBulk(long, BulkRequest, BulkResponse)} and
+	 * {@link BulkProcessor.Listener#afterBulk(long, BulkRequest, Throwable)}.
+	 */
+	private AtomicLong numPendingRequests = new AtomicLong(0);
+
 	/** Elasticsearch client created using the call bridge. */
 	private transient Client client;
 
@@ -152,7 +172,7 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
 	/**
 	 * This is set from inside the {@link BulkProcessor.Listener} if a {@link Throwable} was thrown in callbacks and
 	 * the user considered it should fail the sink via the
-	 * {@link ActionRequestFailureHandler#onFailure(ActionRequest, Throwable, RequestIndexer)} method.
+	 * {@link ActionRequestFailureHandler#onFailure(ActionRequest, Throwable, int, RequestIndexer)} method.
 	 *
 	 * Errors will be checked and rethrown before processing each input element, and when the sink is closed.
 	 */
@@ -172,21 +192,13 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
 		// otherwise, if they aren't serializable, users will merely get a non-informative error message
 		// "ElasticsearchSinkBase is not serializable"
 
-		try {
-			InstantiationUtil.serializeObject(elasticsearchSinkFunction);
-		} catch (Exception e) {
-			throw new IllegalArgumentException(
-				"The implementation of the provided ElasticsearchSinkFunction is not serializable. " +
-				"The object probably contains or references non serializable fields.");
-		}
+		checkArgument(InstantiationUtil.isSerializable(elasticsearchSinkFunction),
+			"The implementation of the provided ElasticsearchSinkFunction is not serializable. " +
+				"The object probably contains or references non-serializable fields.");
 
-		try {
-			InstantiationUtil.serializeObject(failureHandler);
-		} catch (Exception e) {
-			throw new IllegalArgumentException(
-				"The implementation of the provided ActionRequestFailureHandler is not serializable. " +
-					"The object probably contains or references non serializable fields.");
-		}
+		checkArgument(InstantiationUtil.isSerializable(failureHandler),
+			"The implementation of the provided ActionRequestFailureHandler is not serializable. " +
+				"The object probably contains or references non-serializable fields.");
 
 		// extract and remove bulk processor related configuration from the user-provided config,
 		// so that the resulting user config only contains configuration related to the Elasticsearch client.
@@ -244,47 +256,76 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
 		this.userConfig = userConfig;
 	}
 
+	/**
+	 * Disable flushing on checkpoint. When disabled, the sink will not wait for all
+	 * pending action requests to be acknowledged by Elasticsearch on checkpoints.
+	 *
+	 * NOTE: If flushing on checkpoint is disabled, the Flink Elasticsearch Sink does NOT
+	 * provide any strong guarantees for at-least-once delivery of action requests.
+	 */
+	public void disableFlushOnCheckpoint() {
+		this.flushOnCheckpoint = false;
+	}
+
 	@Override
 	public void open(Configuration parameters) throws Exception {
 		client = callBridge.createClient(userConfig);
+		bulkProcessor = buildBulkProcessor(new BulkProcessorListener());
+		requestIndexer = new BulkProcessorIndexer(bulkProcessor, flushOnCheckpoint, numPendingRequests);
+	}
 
-		BulkProcessor.Builder bulkProcessorBuilder = BulkProcessor.builder(
-			client,
-			new BulkProcessor.Listener() {
-				@Override
-				public void beforeBulk(long executionId, BulkRequest request) { }
-
-				@Override
-				public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
-					if (response.hasFailures()) {
-						BulkItemResponse itemResponse;
-						Throwable failure;
-
-						for (int i = 0; i < response.getItems().length; i++) {
-							itemResponse = response.getItems()[i];
-							failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResponse);
-							if (failure != null) {
-								LOG.error("Failed Elasticsearch item request: {}", itemResponse.getFailureMessage(), failure);
-								if (failureHandler.onFailure(request.requests().get(i), failure, requestIndexer)) {
-									failureThrowable.compareAndSet(null, failure);
-								}
-							}
-						}
-					}
-				}
+	@Override
+	public void invoke(T value) throws Exception {
+		// if bulk processor callbacks have previously reported an error, we rethrow the error and fail the sink
+		checkErrorAndRethrow();
 
-				@Override
-				public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
-					LOG.error("Failed Elasticsearch bulk request: {}", failure.getMessage(), failure.getCause());
+		elasticsearchSinkFunction.process(value, getRuntimeContext(), requestIndexer);
+	}
 
-					// whole bulk request failures are usually just temporary timeouts on
-					// the Elasticsearch side; simply retry all action requests in the bulk
-					for (ActionRequest action : request.requests()) {
-						requestIndexer.add(action);
-					}
-				}
-			}
-		);
+	@Override
+	public void initializeState(FunctionInitializationContext context) throws Exception {
+		// no initialization needed
+	}
+
+	@Override
+	public void snapshotState(FunctionSnapshotContext context) throws Exception {
+		checkErrorAndRethrow();
+
+		if (flushOnCheckpoint) {
+			do {
+				bulkProcessor.flush();
+				checkErrorAndRethrow();
+			} while (numPendingRequests.get() != 0);
+		}
+	}
+
+	@Override
+	public void close() throws Exception {
+		if (bulkProcessor != null) {
+			bulkProcessor.close();
+			bulkProcessor = null;
+		}
+
+		if (client != null) {
+			client.close();
+			client = null;
+		}
+
+		callBridge.cleanup();
+
+		// make sure any errors from callbacks are rethrown
+		checkErrorAndRethrow();
+	}
+
+	/**
+	 * Build the {@link BulkProcessor}.
+	 *
+	 * Note: this is exposed for testing purposes.
+	 */
+	protected BulkProcessor buildBulkProcessor(BulkProcessor.Listener listener) {
+		checkNotNull(listener);
+
+		BulkProcessor.Builder bulkProcessorBuilder = BulkProcessor.builder(client, listener);
 
 		// This makes flush() blocking
 		bulkProcessorBuilder.setConcurrentRequests(0);
@@ -304,40 +345,81 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
 		// if backoff retrying is disabled, bulkProcessorFlushBackoffPolicy will be null
 		callBridge.configureBulkProcessorBackoff(bulkProcessorBuilder, bulkProcessorFlushBackoffPolicy);
 
-		bulkProcessor = bulkProcessorBuilder.build();
-		requestIndexer = new BulkProcessorIndexer(bulkProcessor);
+		return bulkProcessorBuilder.build();
 	}
 
-	@Override
-	public void invoke(T value) throws Exception {
-		// if bulk processor callbacks have previously reported an error, we rethrow the error and fail the sink
-		checkErrorAndRethrow();
-
-		elasticsearchSinkFunction.process(value, getRuntimeContext(), requestIndexer);
+	private void checkErrorAndRethrow() {
+		Throwable cause = failureThrowable.get();
+		if (cause != null) {
+			throw new RuntimeException("An error occurred in ElasticsearchSink.", cause);
+		}
 	}
 
-	@Override
-	public void close() throws Exception {
-		if (bulkProcessor != null) {
-			bulkProcessor.close();
-			bulkProcessor = null;
-		}
+	private class BulkProcessorListener implements BulkProcessor.Listener {
+		@Override
+		public void beforeBulk(long executionId, BulkRequest request) { }
+
+		@Override
+		public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
+			if (response.hasFailures()) {
+				BulkItemResponse itemResponse;
+				Throwable failure;
+				RestStatus restStatus;
+
+				try {
+					for (int i = 0; i < response.getItems().length; i++) {
+						itemResponse = response.getItems()[i];
+						failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResponse);
+						if (failure != null) {
+							LOG.error("Failed Elasticsearch item request: {}", itemResponse.getFailureMessage(), failure);
+
+							restStatus = itemResponse.getFailure().getStatus();
+							if (restStatus == null) {
+								failureHandler.onFailure(request.requests().get(i), failure, -1, requestIndexer);
+							} else {
+								failureHandler.onFailure(request.requests().get(i), failure, restStatus.getStatus(), requestIndexer);
+							}
+						}
+					}
+				} catch (Throwable t) {
+					// fail the sink and skip the rest of the items
+					// if the failure handler decides to throw an exception
+					failureThrowable.compareAndSet(null, t);
+				}
+			}
 
-		if (client != null) {
-			client.close();
-			client = null;
+			if (flushOnCheckpoint) {
+				numPendingRequests.getAndAdd(-request.numberOfActions());
+			}
 		}
 
-		callBridge.cleanup();
+		@Override
+		public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
+			LOG.error("Failed Elasticsearch bulk request: {}", failure.getMessage(), failure.getCause());
 
-		// make sure any errors from callbacks are rethrown
-		checkErrorAndRethrow();
+			try {
+				for (ActionRequest action : request.requests()) {
+					failureHandler.onFailure(action, failure, -1, requestIndexer);
+				}
+			} catch (Throwable t) {
+				// fail the sink and skip the rest of the items
+				// if the failure handler decides to throw an exception
+				failureThrowable.compareAndSet(null, t);
+			}
+
+			if (flushOnCheckpoint) {
+				numPendingRequests.getAndAdd(-request.numberOfActions());
+			}
+		}
 	}
 
-	private void checkErrorAndRethrow() {
-		Throwable cause = failureThrowable.get();
-		if (cause != null) {
-			throw new RuntimeException("An error occured in ElasticsearchSink.", cause);
+	@VisibleForTesting
+	long getNumPendingRequests() {
+		if (flushOnCheckpoint) {
+			return numPendingRequests.get();
+		} else {
+			throw new UnsupportedOperationException(
+				"The number of pending requests is not maintained when flushing on checkpoint is disabled.");
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2437da6e/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/NoOpActionRequestFailureHandler.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/NoOpActionRequestFailureHandler.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/NoOpActionRequestFailureHandler.java
deleted file mode 100644
index 09173a2..0000000
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/NoOpActionRequestFailureHandler.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.elasticsearch.util;
-
-import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
-import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
-import org.elasticsearch.action.ActionRequest;
-
-/**
- * An {@link ActionRequestFailureHandler} that simply fails the sink on any failures.
- */
-public class NoOpActionRequestFailureHandler implements ActionRequestFailureHandler {
-
-	private static final long serialVersionUID = 737941343410827885L;
-
-	@Override
-	public boolean onFailure(ActionRequest action, Throwable failure, RequestIndexer indexer) {
-		// simply fail the sink
-		return true;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2437da6e/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/NoOpFailureHandler.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/NoOpFailureHandler.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/NoOpFailureHandler.java
new file mode 100644
index 0000000..b19ea08
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/NoOpFailureHandler.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.util;
+
+import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.elasticsearch.action.ActionRequest;
+
+/**
+ * An {@link ActionRequestFailureHandler} that simply fails the sink on any failures.
+ */
+public class NoOpFailureHandler implements ActionRequestFailureHandler {
+
+	private static final long serialVersionUID = 737941343410827885L;
+
+	@Override
+	public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable {
+		// simply fail the sink
+		throw failure;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2437da6e/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/RetryRejectedExecutionFailureHandler.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/RetryRejectedExecutionFailureHandler.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/RetryRejectedExecutionFailureHandler.java
new file mode 100644
index 0000000..fabdcbc
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/RetryRejectedExecutionFailureHandler.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.util;
+
+import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.apache.flink.util.ExceptionUtils;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
+
+/**
+ * An {@link ActionRequestFailureHandler} that re-adds requests that failed due to temporary
+ * {@link EsRejectedExecutionException}s (which means that Elasticsearch node queues are currently full),
+ * and fails for all other failures.
+ */
+public class RetryRejectedExecutionFailureHandler implements ActionRequestFailureHandler {
+
+	private static final long serialVersionUID = -7423562912824511906L;
+
+	@Override
+	public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable {
+		if (ExceptionUtils.containsThrowable(failure, EsRejectedExecutionException.class)) {
+			indexer.add(action);
+		} else {
+			// rethrow all other failures
+			throw failure;
+		}
+	}
+
+}

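This handler relies on ExceptionUtils.containsThrowable, which this commit adds
to flink-core (the ExceptionUtils.java diff is not shown above). A plausible
sketch of such a cause-chain check, inferred from its call sites rather than
copied from the commit:

	public static boolean containsThrowable(Throwable throwable, Class<?> searchType) {
		if (throwable == null || searchType == null) {
			return false;
		}

		// walk the cause chain, looking for an instance of the requested type
		Throwable t = throwable;
		while (t != null) {
			if (searchType.isAssignableFrom(t.getClass())) {
				return true;
			}
			t = t.getCause();
		}
		return false;
	}
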
http://git-wip-us.apache.org/repos/asf/flink/blob/2437da6e/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
new file mode 100644
index 0000000..b9df5c6
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
@@ -0,0 +1,570 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.core.testutils.MultiShotLatch;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.Requests;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.List;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Suite of tests for {@link ElasticsearchSinkBase}.
+ */
+public class ElasticsearchSinkBaseTest {
+
+	/** Tests that any item failure in the listener callbacks is rethrown on an immediately following invoke call. */
+	@Test
+	public void testItemFailureRethrownOnInvoke() throws Throwable {
+		final DummyElasticsearchSink<String> sink = new DummyElasticsearchSink<>(
+			new HashMap<String, String>(), new SimpleSinkFunction<String>(), new NoOpFailureHandler());
+
+		final OneInputStreamOperatorTestHarness<String, Object> testHarness =
+			new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink));
+
+		testHarness.open();
+
+		// setup the next bulk request, and its mock item failures
+		sink.setMockItemFailuresListForNextBulkItemResponses(Collections.singletonList(new Exception("artificial failure for record")));
+		testHarness.processElement(new StreamRecord<>("msg"));
+		verify(sink.getMockBulkProcessor(), times(1)).add(any(ActionRequest.class));
+
+		// manually execute the next bulk request
+		sink.manualBulkRequestWithAllPendingRequests();
+
+		try {
+			testHarness.processElement(new StreamRecord<>("next msg"));
+		} catch (Exception e) {
+			// the invoke should have failed with the failure
+			Assert.assertTrue(e.getCause().getMessage().contains("artificial failure for record"));
+
+			// test succeeded
+			return;
+		}
+
+		Assert.fail();
+	}
+
+	/** Tests that any item failure in the listener callbacks is rethrown on an immediately following checkpoint. */
+	@Test
+	public void testItemFailureRethrownOnCheckpoint() throws Throwable {
+		final DummyElasticsearchSink<String> sink = new DummyElasticsearchSink<>(
+			new HashMap<String, String>(), new SimpleSinkFunction<String>(), new NoOpFailureHandler());
+
+		final OneInputStreamOperatorTestHarness<String, Object> testHarness =
+			new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink));
+
+		testHarness.open();
+
+		// setup the next bulk request, and its mock item failures
+		sink.setMockItemFailuresListForNextBulkItemResponses(Collections.singletonList(new Exception("artificial failure for record")));
+		testHarness.processElement(new StreamRecord<>("msg"));
+		verify(sink.getMockBulkProcessor(), times(1)).add(any(ActionRequest.class));
+
+		// manually execute the next bulk request
+		sink.manualBulkRequestWithAllPendingRequests();
+
+		try {
+			testHarness.snapshot(1L, 1000L);
+		} catch (Exception e) {
+			// the snapshot should have failed with the failure
+			Assert.assertTrue(e.getCause().getCause().getMessage().contains("artificial failure for record"));
+
+			// test succeeded
+			return;
+		}
+
+		Assert.fail();
+	}
+
+	/**
+	 * Tests that any item failure in the listener callbacks due to flushing on an immediately following checkpoint
+	 * is rethrown; we set a timeout because the test will not finish if the logic is broken
+	 */
+	@Test(timeout=5000)
+	public void testItemFailureRethrownOnCheckpointAfterFlush() throws Throwable {
+		final DummyElasticsearchSink<String> sink = new DummyElasticsearchSink<>(
+			new HashMap<String, String>(), new SimpleSinkFunction<String>(), new NoOpFailureHandler());
+
+		final OneInputStreamOperatorTestHarness<String, Object> testHarness =
+			new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink));
+
+		testHarness.open();
+
+		// setup the next bulk request, and its mock item failures
+
+		List<Exception> mockResponsesList = new ArrayList<>(2);
+		mockResponsesList.add(null); // the first request in a bulk will succeed
+		mockResponsesList.add(new Exception("artificial failure for record")); // the second request in a bulk will fail
+		sink.setMockItemFailuresListForNextBulkItemResponses(mockResponsesList);
+
+		testHarness.processElement(new StreamRecord<>("msg-1"));
+		verify(sink.getMockBulkProcessor(), times(1)).add(any(ActionRequest.class));
+
+		// manually execute the next bulk request (1 request only, thus should succeed)
+		sink.manualBulkRequestWithAllPendingRequests();
+
+		// setup the requests to be flushed in the snapshot
+		testHarness.processElement(new StreamRecord<>("msg-2"));
+		testHarness.processElement(new StreamRecord<>("msg-3"));
+		verify(sink.getMockBulkProcessor(), times(3)).add(any(ActionRequest.class));
+
+		CheckedThread snapshotThread = new CheckedThread() {
+			@Override
+			public void go() throws Exception {
+				testHarness.snapshot(1L, 1000L);
+			}
+		};
+		snapshotThread.start();
+
+		// the snapshot should eventually be blocked before snapshot triggers flushing
+		while (snapshotThread.getState() != Thread.State.WAITING) {
+			Thread.sleep(10);
+		}
+
+		// let the snapshot-triggered flush continue (2 records in the bulk, so the 2nd one should fail)
+		sink.continueFlush();
+
+		try {
+			snapshotThread.sync();
+		} catch (Exception e) {
+			// the snapshot should have failed with the failure from the 2nd request
+			Assert.assertTrue(e.getCause().getCause().getMessage().contains("artificial failure for record"));
+
+			// test succeeded
+			return;
+		}
+
+		Assert.fail();
+	}
+
+	/** Tests that any bulk failure in the listener callbacks is rethrown on an immediately following invoke call. */
+	@Test
+	public void testBulkFailureRethrownOnInvoke() throws Throwable {
+		final DummyElasticsearchSink<String> sink = new DummyElasticsearchSink<>(
+			new HashMap<String, String>(), new SimpleSinkFunction<String>(), new NoOpFailureHandler());
+
+		final OneInputStreamOperatorTestHarness<String, Object> testHarness =
+			new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink));
+
+		testHarness.open();
+
+		// setup the next bulk request, and let the whole bulk request fail
+		sink.setFailNextBulkRequestCompletely(new Exception("artificial failure for bulk request"));
+		testHarness.processElement(new StreamRecord<>("msg"));
+		verify(sink.getMockBulkProcessor(), times(1)).add(any(ActionRequest.class));
+
+		// manually execute the next bulk request
+		sink.manualBulkRequestWithAllPendingRequests();
+
+		try {
+			testHarness.processElement(new StreamRecord<>("next msg"));
+		} catch (Exception e) {
+			// the invoke should have failed with the bulk request failure
+			Assert.assertTrue(e.getCause().getMessage().contains("artificial failure for bulk request"));
+
+			// test succeeded
+			return;
+		}
+
+		Assert.fail();
+	}
+
+	/** Tests that any bulk failure in the listener callbacks is rethrown on an immediately following checkpoint. */
+	@Test
+	public void testBulkFailureRethrownOnCheckpoint() throws Throwable {
+		final DummyElasticsearchSink<String> sink = new DummyElasticsearchSink<>(
+			new HashMap<String, String>(), new SimpleSinkFunction<String>(), new NoOpFailureHandler());
+
+		final OneInputStreamOperatorTestHarness<String, Object> testHarness =
+			new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink));
+
+		testHarness.open();
+
+		// setup the next bulk request, and let the whole bulk request fail
+		sink.setFailNextBulkRequestCompletely(new Exception("artificial failure for bulk request"));
+		testHarness.processElement(new StreamRecord<>("msg"));
+		verify(sink.getMockBulkProcessor(), times(1)).add(any(ActionRequest.class));
+
+		// manually execute the next bulk request
+		sink.manualBulkRequestWithAllPendingRequests();
+
+		try {
+			testHarness.snapshot(1L, 1000L);
+		} catch (Exception e) {
+			// the snapshot should have failed with the bulk request failure
+			Assert.assertTrue(e.getCause().getCause().getMessage().contains("artificial failure for bulk request"));
+
+			// test succeeded
+			return;
+		}
+
+		Assert.fail();
+	}
+
+	/**
+	 * Tests that any bulk failure in the listener callbacks due to flushing on an immediately following checkpoint
+	 * is rethrown; we set a timeout because the test will not finish if the logic is broken.
+	 */
+	@Test(timeout=5000)
+	public void testBulkFailureRethrownOnCheckpointAfterFlush() throws Throwable {
+		final DummyElasticsearchSink<String> sink = new DummyElasticsearchSink<>(
+			new HashMap<String, String>(), new SimpleSinkFunction<String>(), new NoOpFailureHandler());
+
+		final OneInputStreamOperatorTestHarness<String, Object> testHarness =
+			new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink));
+
+		testHarness.open();
+
+		// setup the next bulk request, and let bulk request succeed
+		sink.setMockItemFailuresListForNextBulkItemResponses(Collections.singletonList((Exception) null));
+		testHarness.processElement(new StreamRecord<>("msg-1"));
+		verify(sink.getMockBulkProcessor(), times(1)).add(any(ActionRequest.class));
+
+		// manually execute the next bulk request
+		sink.manualBulkRequestWithAllPendingRequests();
+
+		// setup the requests to be flushed in the snapshot
+		testHarness.processElement(new StreamRecord<>("msg-2"));
+		testHarness.processElement(new StreamRecord<>("msg-3"));
+		verify(sink.getMockBulkProcessor(), times(3)).add(any(ActionRequest.class));
+
+		CheckedThread snapshotThread = new CheckedThread() {
+			@Override
+			public void go() throws Exception {
+				testHarness.snapshot(1L, 1000L);
+			}
+		};
+		snapshotThread.start();
+
+		// the snapshot should eventually be blocked before snapshot triggers flushing
+		while (snapshotThread.getState() != Thread.State.WAITING) {
+			Thread.sleep(10);
+		}
+
+		// for the snapshot-triggered flush, we let the bulk request fail completely
+		sink.setFailNextBulkRequestCompletely(new Exception("artificial failure for bulk request"));
+
+		// let the snapshot-triggered flush continue (bulk request should fail completely)
+		sink.continueFlush();
+
+		try {
+			snapshotThread.sync();
+		} catch (Exception e) {
+			// the snapshot should have failed with the bulk request failure
+			Assert.assertTrue(e.getCause().getCause().getMessage().contains("artificial failure for bulk request"));
+
+			// test succeeded
+			return;
+		}
+
+		Assert.fail();
+	}
+
+	/**
+	 * Tests that the sink correctly waits for pending requests (including re-added requests) on checkpoints;
+	 * we set a timeout because the test will not finish if the logic is broken
+	 */
+	@Test(timeout=5000)
+	public void testAtLeastOnceSink() throws Throwable {
+		final DummyElasticsearchSink<String> sink = new DummyElasticsearchSink<>(
+				new HashMap<String, String>(),
+				new SimpleSinkFunction<String>(),
+				new DummyRetryFailureHandler()); // use a failure handler that simply re-adds requests
+
+		final OneInputStreamOperatorTestHarness<String, Object> testHarness =
+			new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink));
+
+		testHarness.open();
+
+		// setup the next bulk request, and its mock item failures;
+		// it contains 1 request, which will fail and be re-added to the next bulk request
+		sink.setMockItemFailuresListForNextBulkItemResponses(Collections.singletonList(new Exception("artificial failure for record")));
+		testHarness.processElement(new StreamRecord<>("msg"));
+		verify(sink.getMockBulkProcessor(), times(1)).add(any(ActionRequest.class));
+
+		CheckedThread snapshotThread = new CheckedThread() {
+			@Override
+			public void go() throws Exception {
+				testHarness.snapshot(1L, 1000L);
+			}
+		};
+		snapshotThread.start();
+
+		// the snapshot should eventually be blocked before snapshot triggers flushing
+		while (snapshotThread.getState() != Thread.State.WAITING) {
+			Thread.sleep(10);
+		}
+
+		sink.continueFlush();
+
+		// since the previous flush should have resulted in a request re-add from the failure handler,
+		// we should have flushed again, and eventually be blocked before snapshot triggers the 2nd flush
+		while (snapshotThread.getState() != Thread.State.WAITING) {
+			Thread.sleep(10);
+		}
+
+		// current number of pending request should be 1 due to the re-add
+		Assert.assertEquals(1, sink.getNumPendingRequests());
+
+		// this time, let the bulk request succeed, so no-more requests are re-added
+		sink.setMockItemFailuresListForNextBulkItemResponses(Collections.singletonList((Exception) null));
+
+		sink.continueFlush();
+
+		// the snapshot should finish with no exceptions
+		snapshotThread.sync();
+
+		testHarness.close();
+	}
+
+	/**
+	 * This test is meant to verify that testAtLeastOnceSink is valid, by checking that if flushing is disabled,
+	 * the snapshot method does indeed finish without waiting for pending requests;
+	 * we set a timeout because the test will not finish if the logic is broken
+	 */
+	@Test(timeout=5000)
+	public void testDoesNotWaitForPendingRequestsIfFlushingDisabled() throws Exception {
+		final DummyElasticsearchSink<String> sink = new DummyElasticsearchSink<>(
+			new HashMap<String, String>(), new SimpleSinkFunction<String>(), new DummyRetryFailureHandler());
+		sink.disableFlushOnCheckpoint(); // disable flushing
+
+		final OneInputStreamOperatorTestHarness<String, Object> testHarness =
+			new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink));
+
+		testHarness.open();
+
+		// setup the next bulk request, and its mock item failures
+		sink.setMockItemFailuresListForNextBulkItemResponses(Collections.singletonList(new Exception("artificial failure for record")));
+		testHarness.processElement(new StreamRecord<>("msg-1"));
+		verify(sink.getMockBulkProcessor(), times(1)).add(any(ActionRequest.class));
+
+		// the snapshot should not block even though we haven't flushed the bulk request
+		testHarness.snapshot(1L, 1000L);
+
+		testHarness.close();
+	}
+
+	private static class DummyElasticsearchSink<T> extends ElasticsearchSinkBase<T> {
+
+		private static final long serialVersionUID = 5051907841570096991L;
+
+		private transient BulkProcessor mockBulkProcessor;
+		private transient BulkRequest nextBulkRequest = new BulkRequest();
+		private transient MultiShotLatch flushLatch = new MultiShotLatch();
+
+		private List<? extends Throwable> mockItemFailuresList;
+		private Throwable nextBulkFailure;
+
+		public DummyElasticsearchSink(
+				Map<String, String> userConfig,
+				ElasticsearchSinkFunction<T> sinkFunction,
+				ActionRequestFailureHandler failureHandler) {
+			super(new DummyElasticsearchApiCallBridge(), userConfig, sinkFunction, failureHandler);
+		}
+
+		/**
+		 * This method is used to mimic a scheduled bulk request; we need to do this
+		 * manually because we are mocking the BulkProcessor
+		 */
+		public void manualBulkRequestWithAllPendingRequests() {
+			flushLatch.trigger(); // let the flush go through
+			mockBulkProcessor.flush();
+		}
+
+		/**
+		 * On non-manual flushes, i.e. when flush is called in the snapshot method implementation,
+		 * callers need to explicitly call this to allow the flush to continue. This is useful
+		 * to make sure that specific requests get added to the next bulk request for flushing.
+		 */
+		public void continueFlush() {
+			flushLatch.trigger();
+		}
+
+		/**
+		 * Set the list of mock failures to use for the next bulk of item responses. A {@code null}
+		 * entry means that the corresponding response is successful; otherwise it is a failure.
+		 *
+		 * The list is used in an order corresponding to the requests in the bulk, i.e. the first
+		 * request uses the response at index 0, the second request uses the response at index 1, etc.
+		 */
+		public void setMockItemFailuresListForNextBulkItemResponses(List<? extends Throwable> mockItemFailuresList) {
+			this.mockItemFailuresList = mockItemFailuresList;
+		}
+
+		/**
+		 * Let the next bulk request fail completely with the provided throwable.
+		 * If this is set, the failures list provided with {@code setMockItemFailuresListForNextBulkItemResponses} is not respected.
+		 */
+		public void setFailNextBulkRequestCompletely(Throwable failure) {
+			this.nextBulkFailure = failure;
+		}
+
+		public BulkProcessor getMockBulkProcessor() {
+			return mockBulkProcessor;
+		}
+
+		/**
+		 * Override the bulk processor build process to provide a mock implementation,
+		 * but reuse the listener implementation in our mock to test that the listener
+		 * logic works correctly with the request flushing logic.
+		 */
+		@Override
+		protected BulkProcessor buildBulkProcessor(final BulkProcessor.Listener listener) {
+			this.mockBulkProcessor = mock(BulkProcessor.class);
+
+			when(mockBulkProcessor.add(any(ActionRequest.class))).thenAnswer(new Answer<Object>() {
+				@Override
+				public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+					// intercept the request and add it to our mock bulk request
+					nextBulkRequest.add(invocationOnMock.getArgumentAt(0, ActionRequest.class));
+
+					return null;
+				}
+			});
+
+			doAnswer(new Answer() {
+				@Override
+				public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+					while (nextBulkRequest.numberOfActions() > 0) {
+						// wait until we are allowed to continue with the flushing
+						flushLatch.await();
+
+						// swap out the accumulated requests and start a fresh bulk, so that
+						// re-added requests from the failure handler are included in the next bulk
+						BulkRequest currentBulkRequest = nextBulkRequest;
+						nextBulkRequest = new BulkRequest();
+
+						listener.beforeBulk(123L, currentBulkRequest);
+
+						if (nextBulkFailure == null) {
+							BulkItemResponse[] mockResponses = new BulkItemResponse[currentBulkRequest.requests().size()];
+							for (int i = 0; i < currentBulkRequest.requests().size(); i++) {
+								Throwable mockItemFailure = mockItemFailuresList.get(i);
+
+								if (mockItemFailure == null) {
+									// the mock response for the item is success
+									mockResponses[i] = new BulkItemResponse(i, "opType", mock(ActionResponse.class));
+								} else {
+									// the mock response for the item is failure
+									mockResponses[i] = new BulkItemResponse(i, "opType", new BulkItemResponse.Failure("index", "type", "id", mockItemFailure));
+								}
+							}
+
+							listener.afterBulk(123L, currentBulkRequest, new BulkResponse(mockResponses, 1000L));
+						} else {
+							listener.afterBulk(123L, currentBulkRequest, nextBulkFailure);
+						}
+					}
+
+					return null;
+				}
+			}).when(mockBulkProcessor).flush();
+
+			return mockBulkProcessor;
+		}
+	}
+
+	private static class DummyElasticsearchApiCallBridge implements ElasticsearchApiCallBridge {
+
+		private static final long serialVersionUID = -4272760730959041699L;
+
+		@Override
+		public Client createClient(Map<String, String> clientConfig) {
+			return mock(Client.class);
+		}
+
+		@Nullable
+		@Override
+		public Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse) {
+			if (bulkItemResponse.isFailed()) {
+				return new Exception(bulkItemResponse.getFailure().getMessage());
+			} else {
+				return null;
+			}
+		}
+
+		@Override
+		public void configureBulkProcessorBackoff(BulkProcessor.Builder builder, @Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy flushBackoffPolicy) {
+			// no need for this in the test cases here
+		}
+
+		@Override
+		public void cleanup() {
+			// nothing to cleanup
+		}
+	}
+
+	private static class SimpleSinkFunction<String> implements ElasticsearchSinkFunction<String> {
+
+		private static final long serialVersionUID = -176739293659135148L;
+
+		@Override
+		public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
+			Map<java.lang.String, Object> json = new HashMap<>();
+			json.put("data", element);
+
+			indexer.add(
+				Requests.indexRequest()
+					.index("index")
+					.type("type")
+					.id("id")
+					.source(json)
+			);
+		}
+	}
+
+	private static class DummyRetryFailureHandler implements ActionRequestFailureHandler {
+
+		private static final long serialVersionUID = 5400023700099200745L;
+
+		@Override
+		public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable {
+			indexer.add(action);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2437da6e/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java
index 375d739..2298986 100644
--- a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java
+++ b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.streaming.connectors.elasticsearch;
 
-import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpActionRequestFailureHandler;
+import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler;
 import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.bulk.BulkProcessor;
 import org.elasticsearch.action.index.IndexRequest;
@@ -106,7 +106,7 @@ public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T> {
 	 * @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest} from the incoming element
 	 */
 	public ElasticsearchSink(Map<String, String> userConfig, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
-		this(userConfig, elasticsearchSinkFunction, new NoOpActionRequestFailureHandler());
+		this(userConfig, elasticsearchSinkFunction, new NoOpFailureHandler());
 	}
 
 	/**
@@ -117,7 +117,7 @@ public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T> {
 	 * @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest} from the incoming element
 	 */
 	public ElasticsearchSink(Map<String, String> userConfig, List<TransportAddress> transportAddresses, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
-		this(userConfig, transportAddresses, elasticsearchSinkFunction, new NoOpActionRequestFailureHandler());
+		this(userConfig, transportAddresses, elasticsearchSinkFunction, new NoOpFailureHandler());
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/2437da6e/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
index 2210f63..6d771d4 100644
--- a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
+++ b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
@@ -18,7 +18,7 @@ package org.apache.flink.streaming.connectors.elasticsearch2;
 
 import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
-import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpActionRequestFailureHandler;
+import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler;
 import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.bulk.BulkProcessor;
 import org.elasticsearch.client.transport.TransportClient;
@@ -89,7 +89,7 @@ public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T> {
 		List<InetSocketAddress> transportAddresses,
 		org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
 
-		this(userConfig, transportAddresses, elasticsearchSinkFunction, new NoOpActionRequestFailureHandler());
+		this(userConfig, transportAddresses, elasticsearchSinkFunction, new NoOpFailureHandler());
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/2437da6e/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java
index 175b4fa..61023c2 100644
--- a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java
+++ b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java
@@ -19,7 +19,7 @@ package org.apache.flink.streaming.connectors.elasticsearch5;
 import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
-import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpActionRequestFailureHandler;
+import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler;
 import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.bulk.BulkProcessor;
 import org.elasticsearch.client.transport.TransportClient;
@@ -75,7 +75,7 @@ public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T> {
 		List<InetSocketAddress> transportAddresses,
 		ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
 
-		this(userConfig, transportAddresses, elasticsearchSinkFunction, new NoOpActionRequestFailureHandler());
+		this(userConfig, transportAddresses, elasticsearchSinkFunction, new NoOpFailureHandler());
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/2437da6e/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
index 69c2692..fea25ff 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
@@ -261,6 +261,30 @@ public final class ExceptionUtils {
 		}
 	}
 
+	/**
+	 * Checks whether a throwable chain contains a specific type of exception.
+	 *
+	 * @param throwable the throwable chain to check.
+	 * @param searchType the type of exception to search for in the chain.
+	 * @return True, if the searched type is nested in the throwable, false otherwise.
+	 */
+	public static boolean containsThrowable(Throwable throwable, Class<?> searchType) {
+		if (throwable == null || searchType == null) {
+			return false;
+		}
+
+		Throwable t = throwable;
+		while (t != null) {
+			if (searchType.isAssignableFrom(t.getClass())) {
+				return true;
+			} else {
+				t = t.getCause();
+			}
+		}
+
+		return false;
+	}
+
 	// ------------------------------------------------------------------------
 
 	/** Private constructor to prevent instantiation. */
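
A minimal usage sketch for the new utility (hypothetical values, purely for illustration):

    Throwable chain = new RuntimeException("wrapper", new java.io.IOException("disk full"));
    ExceptionUtils.containsThrowable(chain, java.io.IOException.class);   // true, IOException is in the cause chain
    ExceptionUtils.containsThrowable(chain, IllegalStateException.class); // false, nowhere in the chain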

http://git-wip-us.apache.org/repos/asf/flink/blob/2437da6e/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
index 219bf2a..d4a031c 100644
--- a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
@@ -325,6 +325,16 @@ public final class InstantiationUtil {
 		oos.writeObject(o);
 	}
 
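+	/**
+	 * Checks whether the given object can be serialized with Java serialization.
+	 *
+	 * @param o the object to check for serializability
+	 * @return true if the object could be serialized, false otherwise
+	 */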
+	public static boolean isSerializable(Object o) {
+		try {
+			serializeObject(o);
+		} catch (IOException e) {
+			return false;
+		}
+
+		return true;
+	}
+
 	/**
 	 * Clones the given serializable object using Java serialization.
 	 *

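A minimal usage sketch (illustrative values only):

    InstantiationUtil.isSerializable("a plain string");  // true, String is Serializable
    InstantiationUtil.isSerializable(new Object());      // false, Object is not Serializable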

[2/3] flink git commit: [FLINK-5353] [elasticsearch] User-provided failure handler for ElasticsearchSink

Posted by tz...@apache.org.
[FLINK-5353] [elasticsearch] User-provided failure handler for ElasticsearchSink

This commit fixes both FLINK-5353 and FLINK-5122. It allows users to implement a
failure handler to control how failed action requests are dealt with.

The commit also includes general improvements to FLINK-5122:
1. Use the built-in backoff functionality in the Elasticsearch BulkProcessor (not
available for Elasticsearch 1.x)
2. Migrate the `checkErrorAndRetryBulk` functionality to the new failure handler


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3743e898
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3743e898
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3743e898

Branch: refs/heads/master
Commit: 3743e898104d79a9813d444d38fa9f86617bb5ef
Parents: aaac7c2
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Mon Jan 30 13:55:26 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Feb 24 22:58:40 2017 +0800

----------------------------------------------------------------------
 docs/dev/connectors/elasticsearch.md            | 121 +++++++++++--
 .../ActionRequestFailureHandler.java            |  72 ++++++++
 .../ElasticsearchApiCallBridge.java             |  12 ++
 .../elasticsearch/ElasticsearchSinkBase.java    | 174 ++++++++++++-------
 .../util/NoOpActionRequestFailureHandler.java   |  37 ++++
 .../Elasticsearch1ApiCallBridge.java            |  10 ++
 .../elasticsearch/ElasticsearchSink.java        |  37 +++-
 .../Elasticsearch2ApiCallBridge.java            |  31 ++++
 .../elasticsearch2/ElasticsearchSink.java       |  29 +++-
 .../Elasticsearch5ApiCallBridge.java            |  31 ++++
 .../elasticsearch5/ElasticsearchSink.java       |  31 +++-
 11 files changed, 499 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3743e898/docs/dev/connectors/elasticsearch.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/elasticsearch.md b/docs/dev/connectors/elasticsearch.md
index 4388b9a..2ca1f9b 100644
--- a/docs/dev/connectors/elasticsearch.md
+++ b/docs/dev/connectors/elasticsearch.md
@@ -23,6 +23,9 @@ specific language governing permissions and limitations
 under the License.
 -->
 
+* This will be replaced by the TOC
+{:toc}
+
 This connector provides sinks that can request document actions to an
 [Elasticsearch](https://elastic.co/) Index. To use this connector, add one
 of the following dependencies to your project, depending on the version
@@ -59,14 +62,14 @@ Note that the streaming connectors are currently not part of the binary
 distribution. See [here]({{site.baseurl}}/dev/linking.html) for information
 about how to package the program with the libraries for cluster execution.
 
-#### Installing Elasticsearch
+## Installing Elasticsearch
 
 Instructions for setting up an Elasticsearch cluster can be found
 [here](https://www.elastic.co/guide/en/elasticsearch/reference/current/setup.html).
 Make sure to set and remember a cluster name. This must be set when
 creating an `ElasticsearchSink` for requesting document actions against your cluster.
 
-#### Elasticsearch Sink
+## Elasticsearch Sink
 
 The `ElasticsearchSink` uses a `TransportClient` to communicate with an
 Elasticsearch cluster.
@@ -200,15 +203,13 @@ request for each incoming element. Generally, the `ElasticsearchSinkFunction`
 can be used to perform multiple requests of different types (ex.,
 `DeleteRequest`, `UpdateRequest`, etc.). 
 
-Internally, the sink uses a `BulkProcessor` to send action requests to the cluster.
-This will buffer elements before sending them in bulk to the cluster. The behaviour of the
-`BulkProcessor` can be set using these config keys in the provided `Map` configuration:
- * **bulk.flush.max.actions**: Maximum amount of elements to buffer
- * **bulk.flush.max.size.mb**: Maximum amount of data (in megabytes) to buffer
- * **bulk.flush.interval.ms**: Interval at which to flush data regardless of the other two
-  settings in milliseconds
+Internally, each parallel instance of the Flink Elasticsearch Sink uses
+a `BulkProcessor` to send action requests to the cluster.
+This will buffer elements before sending them in bulk to the cluster. The `BulkProcessor`
+executes bulk requests one at a time, i.e. there will be no two concurrent
+flushes of the buffered actions in progress.
 
-#### Communication using Embedded Node (only for Elasticsearch 1.x)
+### Communication using Embedded Node (only for Elasticsearch 1.x)
 
 For Elasticsearch versions 1.x, communication using an embedded node is
 also supported. See [here](https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/client.html)
@@ -272,14 +273,106 @@ input.addSink(new ElasticsearchSink(config, new ElasticsearchSinkFunction[String
 The difference is that now we do not need to provide a list of addresses
 of Elasticsearch nodes.
 
-Optionally, the sink can try to re-execute the bulk request when the error
-message matches certain patterns indicating a timeout or a overloaded cluster.
-This behaviour is disabled by default and can be enabled by setting `checkErrorAndRetryBulk(true)`.
+### Handling Failing Elasticsearch Requests
+
+Elasticsearch action requests may fail due to a variety of reasons, including
+temporarily saturated node queue capacity or malformed documents to be indexed.
+The Flink Elasticsearch Sink allows the user to specify how request
+failures are handled, by simply implementing an `ActionRequestFailureHandler` and
+providing it to the constructor.
+
+Below is an example:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<String> input = ...;
+
+input.addSink(new ElasticsearchSink<>(
+    config, transportAddresses,
+    new ElasticsearchSinkFunction<String>() {...},
+    new ActionRequestFailureHandler() {
+        @Override
+        public boolean onFailure(ActionRequest action, Throwable failure, RequestIndexer indexer) {
+            // this example uses Apache Commons to search for nested exceptions
+            
+            if (ExceptionUtils.indexOfThrowable(failure, EsRejectedExecutionException.class) >= 0) {
+                // full queue; re-add document for indexing
+                indexer.add(action);
+                return false;
+            } else if (ExceptionUtils.indexOfThrowable(failure, ElasticsearchParseException.class) >= 0) {
+                // malformed document; simply drop request without failing sink
+                return false;
+            } else {
+                // for all other failures, fail the sink
+                return true;
+            }
+        }
+}));
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val input: DataStream[String] = ...
+
+input.addSink(new ElasticsearchSink(
+    config, transportAddresses,
+    new ElasticsearchSinkFunction[String] {...},
+    new ActionRequestFailureHandler {
+        override def onFailure(action: ActionRequest, failure: Throwable, indexer: RequestIndexer): Boolean = {
+            // this example uses Apache Commons to search for nested exceptions
+
+            if (ExceptionUtils.indexOfThrowable(failure, classOf[EsRejectedExecutionException]) >= 0) {
+                // full queue; re-add document for indexing
+                indexer.add(action)
+                false
+            } else if (ExceptionUtils.indexOfThrowable(failure, classOf[ElasticsearchParseException]) >= 0) {
+                // malformed document; simply drop request without failing sink
+                false
+            } else {
+                // for all other failures, fail the sink
+                true
+            }
+        }
+}))
+{% endhighlight %}
+</div>
+</div>
 
+The above example will let the sink re-add requests that failed due to
+queue capacity saturation and drop requests with malformed documents, without
+failing the sink. For all other failures, the sink will fail. If an `ActionRequestFailureHandler`
+is not provided to the constructor, the sink will fail for any kind of error.
+
+Note that `onFailure` is called only for failures that still occur after the
+`BulkProcessor` internally finishes all backoff retry attempts.
+By default, the `BulkProcessor` retries up to a maximum of 8 attempts with
+an exponential backoff. For more information on the behaviour of the
+internal `BulkProcessor` and how to configure it, please see the following section.
+ 
+### Configuring the Internal Bulk Processor
+
+The internal `BulkProcessor` can be further configured for how buffered
+action requests are flushed, by setting the following values in
+the provided `Map<String, String>` (a combined example follows the lists below):
+
+ * **bulk.flush.max.actions**: Maximum number of actions to buffer before flushing.
+ * **bulk.flush.max.size.mb**: Maximum size of data (in megabytes) to buffer before flushing.
+ * **bulk.flush.interval.ms**: Interval at which to flush regardless of the number or size of buffered actions.
+ 
+For versions 2.x and above, configuring how temporary request errors are
+retried is also supported:
+ 
+ * **bulk.flush.backoff.enable**: Whether or not to perform retries with backoff delay for a flush
+ if one or more of its actions failed due to a temporary `EsRejectedExecutionException`.
+ * **bulk.flush.backoff.type**: The type of backoff delay, either `CONSTANT` or `EXPONENTIAL`.
+ * **bulk.flush.backoff.delay**: The amount of delay for backoff. For constant backoff, this
+ is simply the delay between each retry. For exponential backoff, this is the initial base delay.
+ * **bulk.flush.backoff.retries**: The number of backoff retries to attempt.
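+
+As a combined sketch, a configuration that flushes after every 500 buffered actions (or every
+10 seconds) and retries rejected flushes up to 3 times with constant backoff might look as
+follows; the `cluster.name` value and the surrounding sink construction are assumed from the
+earlier examples:
+
+{% highlight java %}
+Map<String, String> config = new HashMap<>();
+config.put("cluster.name", "my-cluster-name");
+
+// BulkProcessor flushing behaviour
+config.put("bulk.flush.max.actions", "500");
+config.put("bulk.flush.interval.ms", "10000");
+
+// backoff retries for temporarily rejected flushes (Elasticsearch 2.x and above)
+config.put("bulk.flush.backoff.enable", "true");
+config.put("bulk.flush.backoff.type", "CONSTANT");
+config.put("bulk.flush.backoff.delay", "100");
+config.put("bulk.flush.backoff.retries", "3");
+{% endhighlight %}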
 
 More information about Elasticsearch can be found [here](https://elastic.co).
 
-#### Packaging the Elasticsearch Connector into an Uber-Jar
+## Packaging the Elasticsearch Connector into an Uber-Jar
 
 For the execution of your Flink program, it is recommended to build a
 so-called uber-jar (executable jar) containing all your dependencies

http://git-wip-us.apache.org/repos/asf/flink/blob/3743e898/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ActionRequestFailureHandler.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ActionRequestFailureHandler.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ActionRequestFailureHandler.java
new file mode 100644
index 0000000..45d04fc
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ActionRequestFailureHandler.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import org.elasticsearch.action.ActionRequest;
+
+import java.io.Serializable;
+
+/**
+ * An implementation of {@link ActionRequestFailureHandler} is provided by the user to define how failed
+ * {@link ActionRequest ActionRequests} should be handled, e.g. dropping them, reprocessing malformed documents, or
+ * simply requesting them to be sent to Elasticsearch again if the failure is only temporary.
+ *
+ * <p>
+ * Example:
+ *
+ * <pre>{@code
+ *
+ *	private static class ExampleActionRequestFailureHandler implements ActionRequestFailureHandler {
+ *
+ *		@Override
+ *		public boolean onFailure(ActionRequest action, Throwable failure, RequestIndexer indexer) {
+ *			// this example uses Apache Commons to search for nested exceptions
+ *
+ *			if (ExceptionUtils.indexOfThrowable(failure, EsRejectedExecutionException.class) >= 0) {
+ *				// full queue; re-add document for indexing
+ *				indexer.add(action);
+ *				return false;
+ *			} else if (ExceptionUtils.indexOfThrowable(failure, ElasticsearchParseException.class) >= 0) {
+ *				// malformed document; simply drop request without failing sink
+ *				return false;
+ *			} else {
+ *				// for all other failures, fail the sink
+ *				return true;
+ *			}
+ *		}
+ *	}
+ *
+ * }</pre>
+ *
+ * <p>
+ * The above example will let the sink re-add requests that failed due to queue capacity saturation and drop requests
+ * with malformed documents, without failing the sink. For all other failures, the sink will fail.
+ */
+public interface ActionRequestFailureHandler extends Serializable {
+
+	/**
+	 * Handle a failed {@link ActionRequest}.
+	 *
+	 * @param action the {@link ActionRequest} that failed due to the failure
+	 * @param failure the cause of failure
+	 * @param indexer request indexer to re-add the failed action, if intended to do so
+	 * @return the implementation should return {@code true} if the sink should fail due to this failure, and {@code false} otherwise
+	 */
+	boolean onFailure(ActionRequest action, Throwable failure, RequestIndexer indexer);
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3743e898/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
index 6298a85..b482432 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.connectors.elasticsearch;
 
 import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkProcessor;
 import org.elasticsearch.client.Client;
 
 import javax.annotation.Nullable;
@@ -53,6 +54,17 @@ public interface ElasticsearchApiCallBridge extends Serializable {
 	@Nullable Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse);
 
 	/**
+	 * Set backoff-related configurations on the provided {@link BulkProcessor.Builder}.
+	 * The builder will later be used to instantiate the actual {@link BulkProcessor}.
+	 *
+	 * @param builder the {@link BulkProcessor.Builder} to configure.
+	 * @param flushBackoffPolicy user-provided backoff retry settings ({@code null} if the user disabled backoff retries).
+	 */
+	void configureBulkProcessorBackoff(
+		BulkProcessor.Builder builder,
+		@Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy flushBackoffPolicy);
+
+	/**
 	 * Perform any necessary state cleanup.
 	 */
 	void cleanup();

http://git-wip-us.apache.org/repos/asf/flink/blob/3743e898/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
index 7977fc0..2c29865 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
@@ -21,24 +21,23 @@ import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.util.InstantiationUtil;
-import org.elasticsearch.ElasticsearchTimeoutException;
 import org.elasticsearch.action.ActionRequest;
-import org.elasticsearch.action.IndicesRequest;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkProcessor;
 import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.client.Client;
-import org.elasticsearch.cluster.block.ClusterBlockException;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.Serializable;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -70,10 +69,56 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
 	public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions";
 	public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb";
 	public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms";
+	public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE = "bulk.flush.backoff.enable";
+	public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE = "bulk.flush.backoff.type";
+	public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES = "bulk.flush.backoff.retries";
+	public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY = "bulk.flush.backoff.delay";
+
+	public enum FlushBackoffType {
+		CONSTANT,
+		EXPONENTIAL
+	}
+
+	public static class BulkFlushBackoffPolicy implements Serializable {
+
+		private static final long serialVersionUID = -6022851996101826049L;
+
+		// the default values follow the Elasticsearch default settings for BulkProcessor
+		private FlushBackoffType backoffType = FlushBackoffType.EXPONENTIAL;
+		private int maxRetryCount = 8;
+		private long delayMillis = 50;
+
+		public FlushBackoffType getBackoffType() {
+			return backoffType;
+		}
+
+		public int getMaxRetryCount() {
+			return maxRetryCount;
+		}
+
+		public long getDelayMillis() {
+			return delayMillis;
+		}
+
+		public void setBackoffType(FlushBackoffType backoffType) {
+			this.backoffType = checkNotNull(backoffType);
+		}
+
+		public void setMaxRetryCount(int maxRetryCount) {
+			checkArgument(maxRetryCount > 0);
+			this.maxRetryCount = maxRetryCount;
+		}
+
+		public void setDelayMillis(long delayMillis) {
+			checkArgument(delayMillis > 0);
+			this.delayMillis = delayMillis;
+		}
+	}
 
 	private final Integer bulkProcessorFlushMaxActions;
 	private final Integer bulkProcessorFlushMaxSizeMb;
 	private final Integer bulkProcessorFlushIntervalMillis;
+	private final BulkFlushBackoffPolicy bulkProcessorFlushBackoffPolicy;
 
 	// ------------------------------------------------------------------------
 	//  User-facing API and configuration
@@ -85,16 +130,12 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
 	/** The function that is used to construct multiple {@link ActionRequest ActionRequests} from each incoming element. */
 	private final ElasticsearchSinkFunction<T> elasticsearchSinkFunction;
 
+	/** User-provided handler for failed {@link ActionRequest ActionRequests}. */
+	private final ActionRequestFailureHandler failureHandler;
+
 	/** Provided to the user via the {@link ElasticsearchSinkFunction} to add {@link ActionRequest ActionRequests}. */
 	private transient BulkProcessorIndexer requestIndexer;
 
-	/**
-	 * When set to <code>true</code> and the bulk action fails, the error message will be checked for
-	 * common patterns like <i>timeout</i>, <i>UnavailableShardsException</i> or a full buffer queue on the node.
-	 * When a matching pattern is found, the bulk will be retried.
-	 */
-	protected boolean checkErrorAndRetryBulk = false;
-
 	// ------------------------------------------------------------------------
 	//  Internals for the Flink Elasticsearch Sink
 	// ------------------------------------------------------------------------
@@ -109,21 +150,28 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
 	private transient BulkProcessor bulkProcessor;
 
 	/**
-	 * This is set from inside the {@link BulkProcessor.Listener} if a {@link Throwable} was thrown in callbacks.
+	 * This is set from inside the {@link BulkProcessor.Listener} if a {@link Throwable} was thrown in callbacks and
+	 * the user-provided failure handler decided it should fail the sink via the
+	 * {@link ActionRequestFailureHandler#onFailure(ActionRequest, Throwable, RequestIndexer)} method.
+	 *
+	 * Errors will be checked and rethrown before processing each input element, and when the sink is closed.
 	 */
 	private final AtomicReference<Throwable> failureThrowable = new AtomicReference<>();
 
 	public ElasticsearchSinkBase(
 		ElasticsearchApiCallBridge callBridge,
 		Map<String, String> userConfig,
-		ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
+		ElasticsearchSinkFunction<T> elasticsearchSinkFunction,
+		ActionRequestFailureHandler failureHandler) {
 
 		this.callBridge = checkNotNull(callBridge);
 		this.elasticsearchSinkFunction = checkNotNull(elasticsearchSinkFunction);
+		this.failureHandler = checkNotNull(failureHandler);
 
-		// we eagerly check if the user-provided sink function is serializable;
-		// otherwise, if it isn't serializable, users will merely get a non-informative error message
+		// we eagerly check if the user-provided sink function and failure handler are serializable;
+		// otherwise, if they aren't serializable, users will merely get a non-informative error message
 		// "ElasticsearchSinkBase is not serializable"
+
 		try {
 			InstantiationUtil.serializeObject(elasticsearchSinkFunction);
 		} catch (Exception e) {
@@ -132,10 +180,19 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
 				"The object probably contains or references non serializable fields.");
 		}
 
-		checkNotNull(userConfig);
+		try {
+			InstantiationUtil.serializeObject(failureHandler);
+		} catch (Exception e) {
+			throw new IllegalArgumentException(
+				"The implementation of the provided ActionRequestFailureHandler is not serializable. " +
+					"The object probably contains or references non serializable fields.");
+		}
 
 		// extract and remove bulk processor related configuration from the user-provided config,
 		// so that the resulting user config only contains configuration related to the Elasticsearch client.
+
+		checkNotNull(userConfig);
+
 		ParameterTool params = ParameterTool.fromMap(userConfig);
 
 		if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS)) {
@@ -159,6 +216,31 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
 			bulkProcessorFlushIntervalMillis = null;
 		}
 
+		boolean bulkProcessorFlushBackoffEnable = params.getBoolean(CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE, true);
+		userConfig.remove(CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE);
+
+		if (bulkProcessorFlushBackoffEnable) {
+			this.bulkProcessorFlushBackoffPolicy = new BulkFlushBackoffPolicy();
+
+			if (params.has(CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE)) {
+				bulkProcessorFlushBackoffPolicy.setBackoffType(FlushBackoffType.valueOf(params.get(CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE)));
+				userConfig.remove(CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE);
+			}
+
+			if (params.has(CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES)) {
+				bulkProcessorFlushBackoffPolicy.setMaxRetryCount(params.getInt(CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES));
+				userConfig.remove(CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES);
+			}
+
+			if (params.has(CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY)) {
+				bulkProcessorFlushBackoffPolicy.setDelayMillis(params.getLong(CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY));
+				userConfig.remove(CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY);
+			}
+
+		} else {
+			bulkProcessorFlushBackoffPolicy = null;
+		}
+
 		this.userConfig = userConfig;
 	}
 
@@ -175,48 +257,30 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
 				@Override
 				public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
 					if (response.hasFailures()) {
-						boolean allRequestsRepeatable = true;
+						BulkItemResponse itemResponse;
+						Throwable failure;
 
-						for (BulkItemResponse itemResp : response.getItems()) {
-							Throwable failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResp);
+						for (int i = 0; i < response.getItems().length; i++) {
+							itemResponse = response.getItems()[i];
+							failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResponse);
 							if (failure != null) {
-								String failureMessageLowercase = itemResp.getFailureMessage().toLowerCase();
-
-								// Check if index request can be retried
-								if (checkErrorAndRetryBulk && (
-									failureMessageLowercase.contains("timeout") ||
-										failureMessageLowercase.contains("timed out") || // Generic timeout errors
-										failureMessageLowercase.contains("UnavailableShardsException".toLowerCase()) || // Shard not available due to rebalancing or node down
-										(failureMessageLowercase.contains("data/write/bulk") && failureMessageLowercase.contains("bulk")))) // Bulk index queue on node full
-								{
-									LOG.debug("Retry bulk: {}", itemResp.getFailureMessage());
-								} else {
-									// Cannot retry action
-									allRequestsRepeatable = false;
-									LOG.error("Failed Elasticsearch item request: {}", failure.getMessage(), failure);
+								LOG.error("Failed Elasticsearch item request: {}", itemResponse.getFailureMessage(), failure);
+								if (failureHandler.onFailure(request.requests().get(i), failure, requestIndexer)) {
 									failureThrowable.compareAndSet(null, failure);
 								}
 							}
 						}
-
-						if (allRequestsRepeatable) {
-							reAddBulkRequest(request);
-						}
 					}
 				}
 
 				@Override
 				public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
-					if (checkErrorAndRetryBulk && (
-						failure instanceof ClusterBlockException // Examples: "no master"
-							|| failure instanceof ElasticsearchTimeoutException) // ElasticsearchTimeoutException sounded good, not seen in stress tests yet
-						)
-					{
-						LOG.debug("Retry bulk on throwable: {}", failure.getMessage());
-						reAddBulkRequest(request);
-					} else {
-						LOG.error("Failed Elasticsearch bulk request: {}", failure.getMessage(), failure);
-						failureThrowable.compareAndSet(null, failure);
+					LOG.error("Failed Elasticsearch bulk request: {}", failure.getMessage(), failure.getCause());
+
+					// whole bulk request failures are usually just temporary timeouts on
+					// the Elasticsearch side; simply retry all action requests in the bulk
+					for (ActionRequest action : request.requests()) {
+						requestIndexer.add(action);
 					}
 				}
 			}
@@ -237,6 +301,9 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
 			bulkProcessorBuilder.setFlushInterval(TimeValue.timeValueMillis(bulkProcessorFlushIntervalMillis));
 		}
 
+		// if backoff retrying is disabled, bulkProcessorFlushBackoffPolicy will be null
+		callBridge.configureBulkProcessorBackoff(bulkProcessorBuilder, bulkProcessorFlushBackoffPolicy);
+
 		bulkProcessor = bulkProcessorBuilder.build();
 		requestIndexer = new BulkProcessorIndexer(bulkProcessor);
 	}
@@ -267,21 +334,6 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
 		checkErrorAndRethrow();
 	}
 
-	/**
-	 * Adds all requests of the bulk to the BulkProcessor. Used when trying again.
-	 * @param bulkRequest
-	 */
-	public void reAddBulkRequest(BulkRequest bulkRequest) {
-		//TODO Check what happens when bulk contains a DeleteAction and IndexActions and the DeleteAction fails because the document already has been deleted. This may not happen in typical Flink jobs.
-
-		for (IndicesRequest req : bulkRequest.subRequests()) {
-			if (req instanceof ActionRequest) {
-				// There is no waiting time between index requests, so this may produce additional pressure on cluster
-				bulkProcessor.add((ActionRequest<?>) req);
-			}
-		}
-	}
-
 	private void checkErrorAndRethrow() {
 		Throwable cause = failureThrowable.get();
 		if (cause != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/3743e898/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/NoOpActionRequestFailureHandler.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/NoOpActionRequestFailureHandler.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/NoOpActionRequestFailureHandler.java
new file mode 100644
index 0000000..09173a2
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/NoOpActionRequestFailureHandler.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.util;
+
+import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.elasticsearch.action.ActionRequest;
+
+/**
+ * An {@link ActionRequestFailureHandler} that simply fails the sink on any failures.
+ */
+public class NoOpActionRequestFailureHandler implements ActionRequestFailureHandler {
+
+	private static final long serialVersionUID = 737941343410827885L;
+
+	@Override
+	public boolean onFailure(ActionRequest action, Throwable failure, RequestIndexer indexer) {
+		// simply fail the sink
+		return true;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3743e898/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java
index 098afa9..8a59da9 100644
--- a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java
+++ b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.connectors.elasticsearch;
 
 import org.apache.flink.util.Preconditions;
 import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkProcessor;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.client.transport.TransportClient;
 import org.elasticsearch.common.settings.Settings;
@@ -27,6 +28,7 @@ import org.elasticsearch.node.Node;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
 import java.util.List;
 import java.util.Map;
 
@@ -119,6 +121,14 @@ public class Elasticsearch1ApiCallBridge implements ElasticsearchApiCallBridge {
 	}
 
 	@Override
+	public void configureBulkProcessorBackoff(
+		BulkProcessor.Builder builder,
+		@Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy flushBackoffPolicy) {
+		// Elasticsearch 1.x does not support backoff retries for failed bulk requests
+		LOG.warn("Elasticsearch 1.x does not support backoff retries.");
+	}
+
+	@Override
 	public void cleanup() {
 		if (node != null && !node.isClosed()) {
 			node.close();

http://git-wip-us.apache.org/repos/asf/flink/blob/3743e898/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java
index c338860..375d739 100644
--- a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java
+++ b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.connectors.elasticsearch;
 
+import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpActionRequestFailureHandler;
 import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.bulk.BulkProcessor;
 import org.elasticsearch.action.index.IndexRequest;
@@ -105,7 +106,7 @@ public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T> {
 	 * @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest} from the incoming element
 	 */
 	public ElasticsearchSink(Map<String, String> userConfig, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
-		super(new Elasticsearch1ApiCallBridge(), userConfig, elasticsearchSinkFunction);
+		this(userConfig, elasticsearchSinkFunction, new NoOpActionRequestFailureHandler());
 	}
 
 	/**
@@ -116,6 +117,38 @@ public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T> {
 	 * @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest} from the incoming element
 	 */
 	public ElasticsearchSink(Map<String, String> userConfig, List<TransportAddress> transportAddresses, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
-		super(new Elasticsearch1ApiCallBridge(transportAddresses), userConfig, elasticsearchSinkFunction);
+		this(userConfig, transportAddresses, elasticsearchSinkFunction, new NoOpActionRequestFailureHandler());
+	}
+
+	/**
+	 * Creates a new {@code ElasticsearchSink} that connects to the cluster using an embedded {@link Node}.
+	 *
+	 * @param userConfig The map of user settings that are used when constructing the embedded {@link Node} and {@link BulkProcessor}
+	 * @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest} from the incoming element
+	 * @param failureHandler This is used to handle failed {@link ActionRequest}
+	 */
+	public ElasticsearchSink(
+		Map<String, String> userConfig,
+		ElasticsearchSinkFunction<T> elasticsearchSinkFunction,
+		ActionRequestFailureHandler failureHandler) {
+
+		super(new Elasticsearch1ApiCallBridge(), userConfig, elasticsearchSinkFunction, failureHandler);
+	}
+
+	/**
+	 * Creates a new {@code ElasticsearchSink} that connects to the cluster using a {@link TransportClient}.
+	 *
+	 * @param userConfig The map of user settings that are used when constructing the {@link TransportClient} and {@link BulkProcessor}
+	 * @param transportAddresses The addresses of Elasticsearch nodes to which to connect using a {@link TransportClient}
+	 * @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest} from the incoming element
+	 * @param failureHandler This is used to handle failed {@link ActionRequest}
+	 */
+	public ElasticsearchSink(
+		Map<String, String> userConfig,
+		List<TransportAddress> transportAddresses,
+		ElasticsearchSinkFunction<T> elasticsearchSinkFunction,
+		ActionRequestFailureHandler failureHandler) {
+
+		super(new Elasticsearch1ApiCallBridge(transportAddresses), userConfig, elasticsearchSinkFunction, failureHandler);
 	}
 }
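
As a usage sketch, wiring a custom failure handler into the new embedded-node constructor
could look as follows; `MySinkFunction` and `MyFailureHandler` are hypothetical
implementations of `ElasticsearchSinkFunction<String>` and `ActionRequestFailureHandler`:

    Map<String, String> config = new HashMap<>();
    config.put("cluster.name", "my-cluster-name");

    DataStream<String> input = ...;
    input.addSink(new ElasticsearchSink<>(config, new MySinkFunction(), new MyFailureHandler()));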

http://git-wip-us.apache.org/repos/asf/flink/blob/3743e898/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java
index 9407d9f..e85daf5 100644
--- a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java
+++ b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java
@@ -18,16 +18,21 @@
 package org.apache.flink.streaming.connectors.elasticsearch2;
 
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchApiCallBridge;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
 import org.apache.flink.streaming.connectors.elasticsearch.util.ElasticsearchUtils;
 import org.apache.flink.util.Preconditions;
+import org.elasticsearch.action.bulk.BackoffPolicy;
 import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkProcessor;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.client.transport.TransportClient;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.TransportAddress;
+import org.elasticsearch.common.unit.TimeValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
 import java.net.InetSocketAddress;
 import java.util.List;
 import java.util.Map;
@@ -84,6 +89,32 @@ public class Elasticsearch2ApiCallBridge implements ElasticsearchApiCallBridge {
 	}
 
 	@Override
+	public void configureBulkProcessorBackoff(
+		BulkProcessor.Builder builder,
+		@Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy flushBackoffPolicy) {
+
+		BackoffPolicy backoffPolicy;
+		if (flushBackoffPolicy != null) {
+			switch (flushBackoffPolicy.getBackoffType()) {
+				case CONSTANT:
+					backoffPolicy = BackoffPolicy.constantBackoff(
+						new TimeValue(flushBackoffPolicy.getDelayMillis()),
+						flushBackoffPolicy.getMaxRetryCount());
+					break;
+				case EXPONENTIAL:
+				default:
+					backoffPolicy = BackoffPolicy.exponentialBackoff(
+						new TimeValue(flushBackoffPolicy.getDelayMillis()),
+						flushBackoffPolicy.getMaxRetryCount());
+			}
+		} else {
+			backoffPolicy = BackoffPolicy.noBackoff();
+		}
+
+		builder.setBackoffPolicy(backoffPolicy);
+	}
+
+	@Override
 	public void cleanup() {
 		// nothing to cleanup
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/3743e898/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
index a0abc51..2210f63 100644
--- a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
+++ b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
@@ -16,7 +16,9 @@
  */
 package org.apache.flink.streaming.connectors.elasticsearch2;
 
+import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpActionRequestFailureHandler;
 import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.bulk.BulkProcessor;
 import org.elasticsearch.client.transport.TransportClient;
@@ -82,9 +84,28 @@ public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T> {
 	 * @param transportAddresses The addresses of Elasticsearch nodes to which to connect using a {@link TransportClient}
 	 * @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest} from the incoming element
 	 */
-	public ElasticsearchSink(Map<String, String> userConfig,
-							List<InetSocketAddress> transportAddresses,
-							org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
-		super(new Elasticsearch2ApiCallBridge(transportAddresses), userConfig, elasticsearchSinkFunction);
+	public ElasticsearchSink(
+		Map<String, String> userConfig,
+		List<InetSocketAddress> transportAddresses,
+		org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
+
+		this(userConfig, transportAddresses, elasticsearchSinkFunction, new NoOpActionRequestFailureHandler());
+	}
+
+	/**
+	 * Creates a new {@code ElasticsearchSink} that connects to the cluster using a {@link TransportClient}.
+	 *
+	 * @param userConfig The map of user settings that are used when constructing the {@link TransportClient} and {@link BulkProcessor}
+	 * @param transportAddresses The addresses of Elasticsearch nodes to which to connect using a {@link TransportClient}
+	 * @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest} from the incoming element
+	 * @param failureHandler This is used to handle failed {@link ActionRequest}
+	 */
+	public ElasticsearchSink(
+		Map<String, String> userConfig,
+		List<InetSocketAddress> transportAddresses,
+		org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction<T> elasticsearchSinkFunction,
+		ActionRequestFailureHandler failureHandler) {
+
+		super(new Elasticsearch2ApiCallBridge(transportAddresses), userConfig, elasticsearchSinkFunction, failureHandler);
 	}
 }
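
For reviewers, a minimal sketch of wiring up the new failure-handler constructor added above. The cluster name, address, index/type names, and the inline sink function are illustrative placeholders, not part of this change; `stream` stands for an existing DataStream<String>.

    Map<String, String> config = new HashMap<>();
    config.put("cluster.name", "my-cluster");  // placeholder cluster name

    List<InetSocketAddress> transportAddresses = new ArrayList<>();
    transportAddresses.add(new InetSocketAddress("127.0.0.1", 9300));  // placeholder address

    stream.addSink(new ElasticsearchSink<>(
        config,
        transportAddresses,
        new ElasticsearchSinkFunction<String>() {
            @Override
            public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
                // index each record as a one-field JSON document (illustrative only)
                indexer.add(Requests.indexRequest()
                    .index("my-index")
                    .type("my-type")
                    .source("{\"data\":\"" + element + "\"}"));
            }
        },
        new NoOpActionRequestFailureHandler()));  // same as the 3-arg constructor; any handler can be substituted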

http://git-wip-us.apache.org/repos/asf/flink/blob/3743e898/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java
index 1389e7d..c7d81f5 100644
--- a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java
+++ b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java
@@ -18,19 +18,24 @@
 package org.apache.flink.streaming.connectors.elasticsearch5;
 
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchApiCallBridge;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
 import org.apache.flink.streaming.connectors.elasticsearch.util.ElasticsearchUtils;
 import org.apache.flink.util.Preconditions;
+import org.elasticsearch.action.bulk.BackoffPolicy;
 import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkProcessor;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.client.transport.TransportClient;
 import org.elasticsearch.common.network.NetworkModule;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.TransportAddress;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.transport.Netty3Plugin;
 import org.elasticsearch.transport.client.PreBuiltTransportClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
 import java.net.InetSocketAddress;
 import java.util.List;
 import java.util.Map;
@@ -90,6 +95,32 @@ public class Elasticsearch5ApiCallBridge implements ElasticsearchApiCallBridge {
 	}
 
 	@Override
+	public void configureBulkProcessorBackoff(
+		BulkProcessor.Builder builder,
+		@Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy flushBackoffPolicy) {
+
+		BackoffPolicy backoffPolicy;
+		if (flushBackoffPolicy != null) {
+			switch (flushBackoffPolicy.getBackoffType()) {
+				case CONSTANT:
+					backoffPolicy = BackoffPolicy.constantBackoff(
+						new TimeValue(flushBackoffPolicy.getDelayMillis()),
+						flushBackoffPolicy.getMaxRetryCount());
+					break;
+				case EXPONENTIAL:
+				default:
+					backoffPolicy = BackoffPolicy.exponentialBackoff(
+						new TimeValue(flushBackoffPolicy.getDelayMillis()),
+						flushBackoffPolicy.getMaxRetryCount());
+			}
+		} else {
+			backoffPolicy = BackoffPolicy.noBackoff();
+		}
+
+		builder.setBackoffPolicy(backoffPolicy);
+	}
+
+	@Override
 	public void cleanup() {
 		// nothing to cleanup
 	}
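
A sketch of how a flush backoff policy would flow through this bridge method. It assumes bean-style setters on BulkFlushBackoffPolicy mirroring the getters used above, and that the enum of backoff types is named FlushBackoffType; `builder` and `transportAddresses` stand for existing objects.

    ElasticsearchSinkBase.BulkFlushBackoffPolicy backoff = new ElasticsearchSinkBase.BulkFlushBackoffPolicy();
    backoff.setBackoffType(ElasticsearchSinkBase.FlushBackoffType.EXPONENTIAL);  // or CONSTANT
    backoff.setDelayMillis(100);   // (initial) delay between retries
    backoff.setMaxRetryCount(5);   // give up after five attempts

    // passing null instead of a policy disables retries entirely, via BackoffPolicy.noBackoff()
    new Elasticsearch5ApiCallBridge(transportAddresses).configureBulkProcessorBackoff(builder, backoff);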

http://git-wip-us.apache.org/repos/asf/flink/blob/3743e898/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java
index 9107d4e..175b4fa 100644
--- a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java
+++ b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java
@@ -16,8 +16,10 @@
  */
 package org.apache.flink.streaming.connectors.elasticsearch5;
 
+import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpActionRequestFailureHandler;
 import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.bulk.BulkProcessor;
 import org.elasticsearch.client.transport.TransportClient;
@@ -64,13 +66,32 @@ public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T> {
 	/**
 	 * Creates a new {@code ElasticsearchSink} that connects to the cluster using a {@link TransportClient}.
 	 *
-	 * @param userConfig The map of user settings that are used when constructing the {@link TransportClient}
+	 * @param userConfig The map of user settings that are used when constructing the {@link TransportClient} and {@link BulkProcessor}
 	 * @param transportAddresses The addresses of Elasticsearch nodes to which to connect using a {@link TransportClient}
 	 * @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest} from the incoming element
 	 */
-	public ElasticsearchSink(Map<String, String> userConfig,
-							List<InetSocketAddress> transportAddresses,
-							ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
-		super(new Elasticsearch5ApiCallBridge(transportAddresses), userConfig, elasticsearchSinkFunction);
+	public ElasticsearchSink(
+		Map<String, String> userConfig,
+		List<InetSocketAddress> transportAddresses,
+		ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
+
+		this(userConfig, transportAddresses, elasticsearchSinkFunction, new NoOpActionRequestFailureHandler());
+	}
+
+	/**
+	 * Creates a new {@code ElasticsearchSink} that connects to the cluster using a {@link TransportClient}.
+	 *
+	 * @param userConfig The map of user settings that are used when constructing the {@link TransportClient} and {@link BulkProcessor}
+	 * @param transportAddresses The addresses of Elasticsearch nodes to which to connect using a {@link TransportClient}
+	 * @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest} from the incoming element
+	 * @param failureHandler This is used to handle failed {@link ActionRequest}
+	 */
+	public ElasticsearchSink(
+		Map<String, String> userConfig,
+		List<InetSocketAddress> transportAddresses,
+		ElasticsearchSinkFunction<T> elasticsearchSinkFunction,
+		ActionRequestFailureHandler failureHandler) {
+
+		super(new Elasticsearch5ApiCallBridge(transportAddresses), userConfig, elasticsearchSinkFunction, failureHandler);
 	}
 }


[3/3] flink git commit: [FLINK-5122] [elasticsearch] Retry temporary Elasticsearch request errors.

Posted by tz...@apache.org.
[FLINK-5122] [elasticsearch] Retry temporary Elasticsearch request errors.

Covered exceptions are: timeouts, no master (ClusterBlockException), UnavailableShardsException, and a full bulk queue on a node.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/aaac7c2e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/aaac7c2e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/aaac7c2e

Branch: refs/heads/master
Commit: aaac7c2e505717dbfc40465cb3656652e1bf5658
Parents: 0ba08b4
Author: Max Kuklinski <ma...@live.de>
Authored: Wed Nov 23 17:54:11 2016 +0100
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Feb 24 22:58:40 2017 +0800

----------------------------------------------------------------------
 docs/dev/connectors/elasticsearch.md            |  5 ++
 .../elasticsearch/ElasticsearchSinkBase.java    | 62 ++++++++++++++++++--
 2 files changed, 63 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/aaac7c2e/docs/dev/connectors/elasticsearch.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/elasticsearch.md b/docs/dev/connectors/elasticsearch.md
index 0f2d025..4388b9a 100644
--- a/docs/dev/connectors/elasticsearch.md
+++ b/docs/dev/connectors/elasticsearch.md
@@ -272,6 +272,11 @@ input.addSink(new ElasticsearchSink(config, new ElasticsearchSinkFunction[String
 The difference is that now we do not need to provide a list of addresses
 of Elasticsearch nodes.
 
+Optionally, the sink can try to re-execute the bulk request when the error
+message matches certain patterns indicating a timeout or an overloaded cluster.
+This behaviour is disabled by default and can be enabled by setting the `checkErrorAndRetryBulk` flag to `true`.
+
+
 More information about Elasticsearch can be found [here](https://elastic.co).
 
 #### Packaging the Elasticsearch Connector into an Uber-Jar
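
Since the patch below exposes `checkErrorAndRetryBulk` as a protected field on ElasticsearchSinkBase rather than a public setter, one illustrative way to opt in from user code is a thin subclass; this sketch is not part of the change itself.

    public class RetryingElasticsearchSink<T> extends ElasticsearchSink<T> {

        public RetryingElasticsearchSink(
                Map<String, String> userConfig,
                List<InetSocketAddress> transportAddresses,
                ElasticsearchSinkFunction<T> sinkFunction) {

            super(userConfig, transportAddresses, sinkFunction);
            this.checkErrorAndRetryBulk = true;  // retry bulk requests that failed with temporary errors
        }
    }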

http://git-wip-us.apache.org/repos/asf/flink/blob/aaac7c2e/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
index 6a2d65f..7977fc0 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
@@ -21,12 +21,15 @@ import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.util.InstantiationUtil;
+import org.elasticsearch.ElasticsearchTimeoutException;
 import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.IndicesRequest;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkProcessor;
 import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.client.Client;
+import org.elasticsearch.cluster.block.ClusterBlockException;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
@@ -85,6 +88,13 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
 	/** Provided to the user via the {@link ElasticsearchSinkFunction} to add {@link ActionRequest ActionRequests}. */
 	private transient BulkProcessorIndexer requestIndexer;
 
+	/**
+	 * When set to <code>true</code> and a bulk action fails, the error message will be checked for
+	 * common patterns like <i>timeout</i>, <i>UnavailableShardsException</i>, or a full bulk queue on the node.
+	 * When all failures of a bulk match such a pattern, the whole bulk will be retried.
+	 */
+	protected boolean checkErrorAndRetryBulk = false;
+
 	// ------------------------------------------------------------------------
 	//  Internals for the Flink Elasticsearch Sink
 	// ------------------------------------------------------------------------
@@ -165,20 +175,49 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
 				@Override
 				public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
 					if (response.hasFailures()) {
+						boolean allRequestsRepeatable = true;
+
 						for (BulkItemResponse itemResp : response.getItems()) {
 							Throwable failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResp);
 							if (failure != null) {
-								LOG.error("Failed Elasticsearch item request: {}", failure.getMessage(), failure);
-								failureThrowable.compareAndSet(null, failure);
+								String failureMessageLowercase = itemResp.getFailureMessage().toLowerCase();
+
+								// Check if index request can be retried
+								if (checkErrorAndRetryBulk && (
+									failureMessageLowercase.contains("timeout") ||
+										failureMessageLowercase.contains("timed out") || // Generic timeout errors
+										failureMessageLowercase.contains("UnavailableShardsException".toLowerCase()) || // Shard not available due to rebalancing or node down
+										(failureMessageLowercase.contains("data/write/bulk") && failureMessageLowercase.contains("bulk")))) // Bulk index queue on node full
+										failureMessageLowercase.contains("data/write/bulk"))) // Bulk index queue on node full
+									LOG.debug("Retry bulk: {}", itemResp.getFailureMessage());
+								} else {
+									// Cannot retry action
+									allRequestsRepeatable = false;
+									LOG.error("Failed Elasticsearch item request: {}", failure.getMessage(), failure);
+									failureThrowable.compareAndSet(null, failure);
+								}
 							}
 						}
+
+						if (allRequestsRepeatable) {
+							reAddBulkRequest(request);
+						}
 					}
 				}
 
 				@Override
 				public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
-					LOG.error("Failed Elasticsearch bulk request: {}", failure.getMessage(), failure);
-					failureThrowable.compareAndSet(null, failure);
+					if (checkErrorAndRetryBulk && (
+						failure instanceof ClusterBlockException // Examples: "no master"
+							|| failure instanceof ElasticsearchTimeoutException) // a plausible transient failure, though not yet observed in stress tests
+						)
+					{
+						LOG.debug("Retry bulk on throwable: {}", failure.getMessage());
+						reAddBulkRequest(request);
+					} else {
+						LOG.error("Failed Elasticsearch bulk request: {}", failure.getMessage(), failure);
+						failureThrowable.compareAndSet(null, failure);
+					}
 				}
 			}
 		);
@@ -228,6 +267,21 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
 		checkErrorAndRethrow();
 	}
 
+	/**
+	 * Re-adds all sub-requests of a bulk request to the {@link BulkProcessor}, so that they are tried again.
+	 * @param bulkRequest the failed bulk request whose sub-requests should be re-added
+	 */
+	public void reAddBulkRequest(BulkRequest bulkRequest) {
+		// TODO: check what happens when the bulk contains a DeleteAction and IndexActions, and the DeleteAction fails because the document has already been deleted. This may not happen in typical Flink jobs.
+
+		for (IndicesRequest req : bulkRequest.subRequests()) {
+			if (req instanceof ActionRequest) {
+				// There is no waiting time between index requests, so this may put additional pressure on the cluster
+				bulkProcessor.add((ActionRequest<?>) req);
+			}
+		}
+	}
+
 	private void checkErrorAndRethrow() {
 		Throwable cause = failureThrowable.get();
 		if (cause != null) {