You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/02/24 15:05:22 UTC
[1/3] flink git commit: [FLINK-5487] [elasticsearch] At-least-once
Elasticsearch Sink
Repository: flink
Updated Branches:
refs/heads/master 0ba08b444 -> 2437da6e5
[FLINK-5487] [elasticsearch] At-least-once Elasticsearch Sink
This closes #3358.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2437da6e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2437da6e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2437da6e
Branch: refs/heads/master
Commit: 2437da6e54cb48c4e29116b8789fbe4782b17ea7
Parents: 3743e89
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Mon Feb 20 16:50:19 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Feb 24 22:58:40 2017 +0800
----------------------------------------------------------------------
docs/dev/connectors/elasticsearch.md | 94 ++-
.../flink-connector-elasticsearch-base/pom.xml | 8 +
.../ActionRequestFailureHandler.java | 29 +-
.../elasticsearch/BulkProcessorIndexer.java | 15 +-
.../elasticsearch/ElasticsearchSinkBase.java | 236 +++++---
.../util/NoOpActionRequestFailureHandler.java | 37 --
.../elasticsearch/util/NoOpFailureHandler.java | 37 ++
.../RetryRejectedExecutionFailureHandler.java | 46 ++
.../ElasticsearchSinkBaseTest.java | 570 +++++++++++++++++++
.../elasticsearch/ElasticsearchSink.java | 6 +-
.../elasticsearch2/ElasticsearchSink.java | 4 +-
.../elasticsearch5/ElasticsearchSink.java | 4 +-
.../org/apache/flink/util/ExceptionUtils.java | 24 +
.../apache/flink/util/InstantiationUtil.java | 10 +
14 files changed, 969 insertions(+), 151 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2437da6e/docs/dev/connectors/elasticsearch.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/elasticsearch.md b/docs/dev/connectors/elasticsearch.md
index 2ca1f9b..3fba7f0 100644
--- a/docs/dev/connectors/elasticsearch.md
+++ b/docs/dev/connectors/elasticsearch.md
@@ -209,6 +209,41 @@ This will buffer elements before sending them in bulk to the cluster. The `BulkP
executes bulk requests one at a time, i.e. there will be no two concurrent
flushes of the buffered actions in progress.
+### Elasticsearch Sinks and Fault Tolerance
+
+With Flink's checkpointing enabled, the Flink Elasticsearch Sink guarantees
+at-least-once delivery of action requests to Elasticsearch clusters. It does
+so by waiting for all pending action requests in the `BulkProcessor` at the
+time of checkpoints. This effectively ensures that all requests issued before the
+checkpoint was triggered have been successfully acknowledged by Elasticsearch before
+proceeding to process more records sent to the sink.
+
+More details on checkpoints and fault tolerance are in the [fault tolerance docs]({{site.baseurl}}/internals/stream_checkpointing.html).
+
+To use fault tolerant Elasticsearch Sinks, checkpointing of the topology needs to be enabled at the execution environment:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+env.enableCheckpointing(5000); // checkpoint every 5000 msecs
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+env.enableCheckpointing(5000) // checkpoint every 5000 msecs
+{% endhighlight %}
+</div>
+</div>
+
+<p style="border-radius: 5px; padding: 5px" class="bg-danger">
+<b>NOTE</b>: Users can disable flushing if they wish to do so, by calling
+<b>disableFlushOnCheckpoint()</b> on the created <b>ElasticsearchSink</b>. Be aware
+that this essentially means the sink will not provide any strong
+delivery guarantees anymore, even with checkpointing for the topology enabled.
+</p>
+
### Communication using Embedded Node (only for Elasticsearch 1.x)
For Elasticsearch versions 1.x, communication using an embedded node is
@@ -293,19 +328,20 @@ input.addSink(new ElasticsearchSink<>(
new ElasticsearchSinkFunction<String>() {...},
new ActionRequestFailureHandler() {
@Override
- boolean onFailure(ActionRequest action, Throwable failure, RequestIndexer indexer) {
- // this example uses Apache Commons to search for nested exceptions
-
- if (ExceptionUtils.indexOfThrowable(failure, EsRejectedExecutionException.class) >= 0) {
+ public void onFailure(ActionRequest action,
+ Throwable failure,
+ int restStatusCode,
+ RequestIndexer indexer) throws Throwable {
+
+ if (ExceptionUtils.containsThrowable(failure, EsRejectedExecutionException.class)) {
// full queue; re-add document for indexing
indexer.add(action);
- return false;
- } else if (ExceptionUtils.indexOfThrowable(failure, ElasticsearchParseException.class) >= 0) {
+ } else if (ExceptionUtils.containsThrowable(failure, ElasticsearchParseException.class)) {
// malformed document; simply drop request without failing sink
- return false;
} else {
// for all other failures, fail the sink
- return true;
+ // here the failure is simply rethrown, but users can also choose to throw custom exceptions
+ throw failure;
}
}
}));
@@ -319,19 +355,21 @@ input.addSink(new ElasticsearchSink(
config, transportAddresses,
new ElasticsearchSinkFunction[String] {...},
new ActionRequestFailureHandler {
- override def onFailure(ActionRequest action, Throwable failure, RequestIndexer indexer) {
- // this example uses Apache Commons to search for nested exceptions
+ @throws(classOf[Throwable])
+ override def onFailure(action: ActionRequest,
+ failure: Throwable,
+ restStatusCode: Int,
+ indexer: RequestIndexer) {
- if (ExceptionUtils.indexOfThrowable(failure, EsRejectedExecutionException.class) >= 0) {
+ if (ExceptionUtils.containsThrowable(failure, classOf[EsRejectedExecutionException])) {
// full queue; re-add document for indexing
indexer.add(action)
- return false
- } else if (ExceptionUtils.indexOfThrowable(failure, ElasticsearchParseException.class) {
+ } else if (ExceptionUtils.containsThrowable(failure, classOf[ElasticsearchParseException])) {
// malformed document; simply drop request without failing sink
- return false
} else {
// for all other failures, fail the sink
- return true
+ // here the failure is simply rethrown, but users can also choose to throw custom exceptions
+ throw failure
}
}
}))
@@ -349,7 +387,31 @@ Note that `onFailure` is called for failures that still occur only after the
By default, the `BulkProcessor` retries to a maximum of 8 attempts with
an exponential backoff. For more information on the behaviour of the
internal `BulkProcessor` and how to configure it, please see the following section.
-
+
+By default, if a failure handler is not provided, the sink uses a
+`NoOpFailureHandler` that simply fails for all kinds of exceptions. The
+connector also provides a `RetryRejectedExecutionFailureHandler` implementation
+that always re-adds requests that have failed due to queue capacity saturation.
+
+<p style="border-radius: 5px; padding: 5px" class="bg-danger">
+<b>IMPORTANT</b>: Re-adding requests back to the internal <b>BulkProcessor</b>
+on failures will lead to longer checkpoints, as the sink will also
+need to wait for the re-added requests to be flushed when checkpointing.
+For example, when using <b>RetryRejectedExecutionFailureHandler</b>, checkpoints
+will need to wait until Elasticsearch node queues have enough capacity for
+all the pending requests. This also means that if re-added requests never
+succeed, the checkpoint will never finish.
+</p>
+
+<p style="border-radius: 5px; padding: 5px" class="bg-warning">
+<b>Failure handling for Elasticsearch 1.x</b>: For Elasticsearch 1.x, it
+is not feasible to match the type of the failure because the exact type
+could not be retrieved through the older version Java client APIs (thus,
+the types will be general <b>Exception</b>s and only differ in the
+failure message). In this case, it is recommended to match on the
+provided REST status code.
+</p>
+
### Configuring the Internal Bulk Processor
The internal `BulkProcessor` can be further configured for its behaviour
http://git-wip-us.apache.org/repos/asf/flink/blob/2437da6e/flink-connectors/flink-connector-elasticsearch-base/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/pom.xml b/flink-connectors/flink-connector-elasticsearch-base/pom.xml
index 81652c4..32327ff 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/pom.xml
+++ b/flink-connectors/flink-connector-elasticsearch-base/pom.xml
@@ -68,6 +68,14 @@ under the License.
<dependency>
<groupId>org.apache.flink</groupId>
+ <artifactId>flink-runtime_2.10</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.10</artifactId>
<version>${project.version}</version>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/flink/blob/2437da6e/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ActionRequestFailureHandler.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ActionRequestFailureHandler.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ActionRequestFailureHandler.java
index 45d04fc..abbdd72 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ActionRequestFailureHandler.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ActionRequestFailureHandler.java
@@ -23,7 +23,7 @@ import java.io.Serializable;
/**
* An implementation of {@link ActionRequestFailureHandler} is provided by the user to define how failed
- * {@link ActionRequest ActionRequests} should be handled, ex. dropping them, reprocessing malformed documents, or
+ * {@link ActionRequest ActionRequests} should be handled, e.g. dropping them, reprocessing malformed documents, or
* simply requesting them to be sent to Elasticsearch again if the failure is only temporary.
*
* <p>
@@ -34,19 +34,16 @@ import java.io.Serializable;
* private static class ExampleActionRequestFailureHandler implements ActionRequestFailureHandler {
*
* @Override
- * boolean onFailure(ActionRequest action, Throwable failure, RequestIndexer indexer) {
- * // this example uses Apache Commons to search for nested exceptions
- *
- * if (ExceptionUtils.indexOfThrowable(failure, EsRejectedExecutionException.class) >= 0) {
+ * public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable {
+ * if (ExceptionUtils.containsThrowable(failure, EsRejectedExecutionException.class)) {
* // full queue; re-add document for indexing
* indexer.add(action);
- * return false;
- * } else if (ExceptionUtils.indexOfThrowable(failure, ElasticsearchParseException.class) {
+ * } else if (ExceptionUtils.containsThrowable(failure, ElasticsearchParseException.class)) {
* // malformed document; simply drop request without failing sink
- * return false;
* } else {
- * // for all other failures, fail the sink
- * return true;
+ * // for all other failures, fail the sink;
+ * // here the failure is simply rethrown, but users can also choose to throw custom exceptions
+ * throw failure;
* }
* }
* }
@@ -56,6 +53,11 @@ import java.io.Serializable;
* <p>
* The above example will let the sink re-add requests that failed due to queue capacity saturation and drop requests
* with malformed documents, without failing the sink. For all other failures, the sink will fail.
+ *
+ * <p>
+ * Note: For Elasticsearch 1.x, it is not feasible to match the type of the failure because the exact type
+ * could not be retrieved through the older version Java client APIs (thus, the types will be general {@link Exception}s
+ * and only differ in the failure message). In this case, it is recommended to match on the provided REST status code.
*/
public interface ActionRequestFailureHandler extends Serializable {
@@ -64,9 +66,12 @@ public interface ActionRequestFailureHandler extends Serializable {
*
* @param action the {@link ActionRequest} that failed due to the failure
* @param failure the cause of failure
+ * @param restStatusCode the REST status code of the failure (-1 if none can be retrieved)
* @param indexer request indexer to re-add the failed action, if intended to do so
- * @return the implementation should return {@code true} if the sink should fail due to this failure, and {@code false} otherwise
+ *
+ * @throws Throwable if the sink should fail on this failure, the implementation should rethrow
+ * the exception or a custom one
*/
- boolean onFailure(ActionRequest action, Throwable failure, RequestIndexer indexer);
+ void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2437da6e/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/BulkProcessorIndexer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/BulkProcessorIndexer.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/BulkProcessorIndexer.java
index d802550..838865a 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/BulkProcessorIndexer.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/BulkProcessorIndexer.java
@@ -21,6 +21,10 @@ package org.apache.flink.streaming.connectors.elasticsearch;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.bulk.BulkProcessor;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
* Implementation of a {@link RequestIndexer}, using a {@link BulkProcessor}.
* {@link ActionRequest ActionRequests} will be buffered before sending a bulk request to the Elasticsearch cluster.
@@ -30,14 +34,21 @@ class BulkProcessorIndexer implements RequestIndexer {
private static final long serialVersionUID = 6841162943062034253L;
private final BulkProcessor bulkProcessor;
+ private final boolean flushOnCheckpoint;
+ private final AtomicLong numPendingRequestsRef;
- BulkProcessorIndexer(BulkProcessor bulkProcessor) {
- this.bulkProcessor = bulkProcessor;
+ BulkProcessorIndexer(BulkProcessor bulkProcessor, boolean flushOnCheckpoint, AtomicLong numPendingRequestsRef) {
+ this.bulkProcessor = checkNotNull(bulkProcessor);
+ this.flushOnCheckpoint = flushOnCheckpoint;
+ this.numPendingRequestsRef = checkNotNull(numPendingRequestsRef);
}
@Override
public void add(ActionRequest... actionRequests) {
for (ActionRequest actionRequest : actionRequests) {
+ if (flushOnCheckpoint) {
+ numPendingRequestsRef.getAndIncrement();
+ }
this.bulkProcessor.add(actionRequest);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2437da6e/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
index 2c29865..f6944b3 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
@@ -17,8 +17,12 @@
package org.apache.flink.streaming.connectors.elasticsearch;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.util.InstantiationUtil;
import org.elasticsearch.action.ActionRequest;
@@ -30,11 +34,13 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.rest.RestStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import static org.apache.flink.util.Preconditions.checkArgument;
@@ -56,7 +62,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
*
* @param <T> Type of the elements handled by this sink
*/
-public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
+public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> implements CheckpointedFunction {
private static final long serialVersionUID = -1007596293618451942L;
@@ -105,12 +111,12 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
}
public void setMaxRetryCount(int maxRetryCount) {
- checkArgument(maxRetryCount > 0);
+ checkArgument(maxRetryCount >= 0);
this.maxRetryCount = maxRetryCount;
}
public void setDelayMillis(long delayMillis) {
- checkArgument(delayMillis > 0);
+ checkArgument(delayMillis >= 0);
this.delayMillis = delayMillis;
}
}
@@ -133,6 +139,9 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
/** User-provided handler for failed {@link ActionRequest ActionRequests}. */
private final ActionRequestFailureHandler failureHandler;
+ /** If true, the sink will wait on checkpoints until all outstanding action requests have been sent to Elasticsearch. */
+ private boolean flushOnCheckpoint = true;
+
/** Provided to the user via the {@link ElasticsearchSinkFunction} to add {@link ActionRequest ActionRequests}. */
private transient BulkProcessorIndexer requestIndexer;
@@ -143,6 +152,17 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
/** Call bridge for different version-specific Elasticsearch API calls. */
private final ElasticsearchApiCallBridge callBridge;
+ /**
+ * Number of pending action requests not yet acknowledged by Elasticsearch.
+ * This value is maintained only if {@link ElasticsearchSinkBase#flushOnCheckpoint} is {@code true}.
+ *
+ * This is incremented whenever the user adds (or re-adds through the {@link ActionRequestFailureHandler}) requests
+ * to the {@link RequestIndexer}. It is decremented for each completed request of a bulk request, in
+ * {@link BulkProcessor.Listener#afterBulk(long, BulkRequest, BulkResponse)} and
+ * {@link BulkProcessor.Listener#afterBulk(long, BulkRequest, Throwable)}.
+ */
+ private AtomicLong numPendingRequests = new AtomicLong(0);
+
/** Elasticsearch client created using the call bridge. */
private transient Client client;
@@ -152,7 +172,7 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
/**
* This is set from inside the {@link BulkProcessor.Listener} if a {@link Throwable} was thrown in callbacks and
* the user considered it should fail the sink via the
- * {@link ActionRequestFailureHandler#onFailure(ActionRequest, Throwable, RequestIndexer)} method.
+ * {@link ActionRequestFailureHandler#onFailure(ActionRequest, Throwable, int, RequestIndexer)} method.
*
* Errors will be checked and rethrown before processing each input element, and when the sink is closed.
*/
@@ -172,21 +192,13 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
// otherwise, if they aren't serializable, users will merely get a non-informative error message
// "ElasticsearchSinkBase is not serializable"
- try {
- InstantiationUtil.serializeObject(elasticsearchSinkFunction);
- } catch (Exception e) {
- throw new IllegalArgumentException(
- "The implementation of the provided ElasticsearchSinkFunction is not serializable. " +
- "The object probably contains or references non serializable fields.");
- }
+ checkArgument(InstantiationUtil.isSerializable(elasticsearchSinkFunction),
+ "The implementation of the provided ElasticsearchSinkFunction is not serializable. " +
+ "The object probably contains or references non-serializable fields.");
- try {
- InstantiationUtil.serializeObject(failureHandler);
- } catch (Exception e) {
- throw new IllegalArgumentException(
- "The implementation of the provided ActionRequestFailureHandler is not serializable. " +
- "The object probably contains or references non serializable fields.");
- }
+ checkArgument(InstantiationUtil.isSerializable(failureHandler),
+ "The implementation of the provided ActionRequestFailureHandler is not serializable. " +
+ "The object probably contains or references non-serializable fields.");
// extract and remove bulk processor related configuration from the user-provided config,
// so that the resulting user config only contains configuration related to the Elasticsearch client.
@@ -244,47 +256,76 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
this.userConfig = userConfig;
}
+ /**
+ * Disable flushing on checkpoint. When disabled, the sink will not wait for all
+ * pending action requests to be acknowledged by Elasticsearch on checkpoints.
+ *
+ * NOTE: If flushing on checkpoint is disabled, the Flink Elasticsearch Sink does NOT
+ * provide any strong guarantees for at-least-once delivery of action requests.
+ */
+ public void disableFlushOnCheckpoint() {
+ this.flushOnCheckpoint = false;
+ }
+
@Override
public void open(Configuration parameters) throws Exception {
client = callBridge.createClient(userConfig);
+ bulkProcessor = buildBulkProcessor(new BulkProcessorListener());
+ requestIndexer = new BulkProcessorIndexer(bulkProcessor, flushOnCheckpoint, numPendingRequests);
+ }
- BulkProcessor.Builder bulkProcessorBuilder = BulkProcessor.builder(
- client,
- new BulkProcessor.Listener() {
- @Override
- public void beforeBulk(long executionId, BulkRequest request) { }
-
- @Override
- public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
- if (response.hasFailures()) {
- BulkItemResponse itemResponse;
- Throwable failure;
-
- for (int i = 0; i < response.getItems().length; i++) {
- itemResponse = response.getItems()[i];
- failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResponse);
- if (failure != null) {
- LOG.error("Failed Elasticsearch item request: {}", itemResponse.getFailureMessage(), failure);
- if (failureHandler.onFailure(request.requests().get(i), failure, requestIndexer)) {
- failureThrowable.compareAndSet(null, failure);
- }
- }
- }
- }
- }
+ @Override
+ public void invoke(T value) throws Exception {
+ // if bulk processor callbacks have previously reported an error, we rethrow the error and fail the sink
+ checkErrorAndRethrow();
- @Override
- public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
- LOG.error("Failed Elasticsearch bulk request: {}", failure.getMessage(), failure.getCause());
+ elasticsearchSinkFunction.process(value, getRuntimeContext(), requestIndexer);
+ }
- // whole bulk request failures are usually just temporary timeouts on
- // the Elasticsearch side; simply retry all action requests in the bulk
- for (ActionRequest action : request.requests()) {
- requestIndexer.add(action);
- }
- }
- }
- );
+ @Override
+ public void initializeState(FunctionInitializationContext context) throws Exception {
+ // no initialization needed
+ }
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext context) throws Exception {
+ checkErrorAndRethrow();
+
+ if (flushOnCheckpoint) {
+ do {
+ bulkProcessor.flush();
+ checkErrorAndRethrow();
+ } while (numPendingRequests.get() != 0);
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (bulkProcessor != null) {
+ bulkProcessor.close();
+ bulkProcessor = null;
+ }
+
+ if (client != null) {
+ client.close();
+ client = null;
+ }
+
+ callBridge.cleanup();
+
+ // make sure any errors from callbacks are rethrown
+ checkErrorAndRethrow();
+ }
+
+ /**
+ * Build the {@link BulkProcessor}.
+ *
+ * Note: this is exposed for testing purposes.
+ */
+ protected BulkProcessor buildBulkProcessor(BulkProcessor.Listener listener) {
+ checkNotNull(listener);
+
+ BulkProcessor.Builder bulkProcessorBuilder = BulkProcessor.builder(client, listener);
// This makes flush() blocking
bulkProcessorBuilder.setConcurrentRequests(0);
@@ -304,40 +345,81 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
// if backoff retrying is disabled, bulkProcessorFlushBackoffPolicy will be null
callBridge.configureBulkProcessorBackoff(bulkProcessorBuilder, bulkProcessorFlushBackoffPolicy);
- bulkProcessor = bulkProcessorBuilder.build();
- requestIndexer = new BulkProcessorIndexer(bulkProcessor);
+ return bulkProcessorBuilder.build();
}
- @Override
- public void invoke(T value) throws Exception {
- // if bulk processor callbacks have previously reported an error, we rethrow the error and fail the sink
- checkErrorAndRethrow();
-
- elasticsearchSinkFunction.process(value, getRuntimeContext(), requestIndexer);
+ private void checkErrorAndRethrow() {
+ Throwable cause = failureThrowable.get();
+ if (cause != null) {
+ throw new RuntimeException("An error occurred in ElasticsearchSink.", cause);
+ }
}
- @Override
- public void close() throws Exception {
- if (bulkProcessor != null) {
- bulkProcessor.close();
- bulkProcessor = null;
- }
+ private class BulkProcessorListener implements BulkProcessor.Listener {
+ @Override
+ public void beforeBulk(long executionId, BulkRequest request) { }
+
+ @Override
+ public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
+ if (response.hasFailures()) {
+ BulkItemResponse itemResponse;
+ Throwable failure;
+ RestStatus restStatus;
+
+ try {
+ for (int i = 0; i < response.getItems().length; i++) {
+ itemResponse = response.getItems()[i];
+ failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResponse);
+ if (failure != null) {
+ LOG.error("Failed Elasticsearch item request: {}", itemResponse.getFailureMessage(), failure);
+
+ restStatus = itemResponse.getFailure().getStatus();
+ if (restStatus == null) {
+ failureHandler.onFailure(request.requests().get(i), failure, -1, requestIndexer);
+ } else {
+ failureHandler.onFailure(request.requests().get(i), failure, restStatus.getStatus(), requestIndexer);
+ }
+ }
+ }
+ } catch (Throwable t) {
+ // fail the sink and skip the rest of the items
+ // if the failure handler decides to throw an exception
+ failureThrowable.compareAndSet(null, t);
+ }
+ }
- if (client != null) {
- client.close();
- client = null;
+ if (flushOnCheckpoint) {
+ numPendingRequests.getAndAdd(-request.numberOfActions());
+ }
}
- callBridge.cleanup();
+ @Override
+ public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
+ LOG.error("Failed Elasticsearch bulk request: {}", failure.getMessage(), failure.getCause());
- // make sure any errors from callbacks are rethrown
- checkErrorAndRethrow();
+ try {
+ for (ActionRequest action : request.requests()) {
+ failureHandler.onFailure(action, failure, -1, requestIndexer);
+ }
+ } catch (Throwable t) {
+ // fail the sink and skip the rest of the items
+ // if the failure handler decides to throw an exception
+ failureThrowable.compareAndSet(null, t);
+ }
+
+ if (flushOnCheckpoint) {
+ numPendingRequests.getAndAdd(-request.numberOfActions());
+ }
+ }
}
- private void checkErrorAndRethrow() {
- Throwable cause = failureThrowable.get();
- if (cause != null) {
- throw new RuntimeException("An error occured in ElasticsearchSink.", cause);
+ @VisibleForTesting
+ long getNumPendingRequests() {
+ if (flushOnCheckpoint) {
+ return numPendingRequests.get();
+ } else {
+ throw new UnsupportedOperationException(
+ "The number of pending requests is not maintained when flushing on checkpoint is disabled.");
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2437da6e/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/NoOpActionRequestFailureHandler.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/NoOpActionRequestFailureHandler.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/NoOpActionRequestFailureHandler.java
deleted file mode 100644
index 09173a2..0000000
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/NoOpActionRequestFailureHandler.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.elasticsearch.util;
-
-import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
-import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
-import org.elasticsearch.action.ActionRequest;
-
-/**
- * An {@link ActionRequestFailureHandler} that simply fails the sink on any failures.
- */
-public class NoOpActionRequestFailureHandler implements ActionRequestFailureHandler {
-
- private static final long serialVersionUID = 737941343410827885L;
-
- @Override
- public boolean onFailure(ActionRequest action, Throwable failure, RequestIndexer indexer) {
- // simply fail the sink
- return true;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/2437da6e/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/NoOpFailureHandler.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/NoOpFailureHandler.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/NoOpFailureHandler.java
new file mode 100644
index 0000000..b19ea08
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/NoOpFailureHandler.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.util;
+
+import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.elasticsearch.action.ActionRequest;
+
+/**
+ * An {@link ActionRequestFailureHandler} that fails the sink on any failure by rethrowing it unchanged (despite the name, no failure is ignored).
+ */
+public class NoOpFailureHandler implements ActionRequestFailureHandler {
+
+ private static final long serialVersionUID = 737941343410827885L;
+
+ @Override
+ public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable {
+ // rethrow the original failure as-is; this is what fails the sink
+ throw failure;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2437da6e/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/RetryRejectedExecutionFailureHandler.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/RetryRejectedExecutionFailureHandler.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/RetryRejectedExecutionFailureHandler.java
new file mode 100644
index 0000000..fabdcbc
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/RetryRejectedExecutionFailureHandler.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.util;
+
+import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.apache.flink.util.ExceptionUtils;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
+
+/**
+ * An {@link ActionRequestFailureHandler} that re-adds requests whose failure has a temporary
+ * {@link EsRejectedExecutionException} anywhere in its cause chain (which means that Elasticsearch node queues are currently full),
+ * and fails the sink for all other failures by rethrowing them. No retry limit is applied, so a request may be re-added repeatedly.
+ */
+public class RetryRejectedExecutionFailureHandler implements ActionRequestFailureHandler {
+ // Stateless: the decision depends only on the failure passed to onFailure.
+ private static final long serialVersionUID = -7423562912824511906L;
+
+ @Override
+ public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable {
+ if (ExceptionUtils.containsThrowable(failure, EsRejectedExecutionException.class)) {
+ indexer.add(action); // rejected due to full queues: re-add the same request through the indexer so it is sent again
+ } else {
+ // not a rejection: rethrow, which fails the sink
+ throw failure;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2437da6e/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
new file mode 100644
index 0000000..b9df5c6
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
@@ -0,0 +1,570 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.core.testutils.MultiShotLatch;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.Requests;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.List;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Suite of tests for {@link ElasticsearchSinkBase}.
+ */
+public class ElasticsearchSinkBaseTest {
+
+ /** Tests that any item failure in the listener callbacks is rethrown on an immediately following invoke call. */
+ @Test
+ public void testItemFailureRethrownOnInvoke() throws Throwable {
+ final DummyElasticsearchSink<String> sink = new DummyElasticsearchSink<>(
+ new HashMap<String, String>(), new SimpleSinkFunction<String>(), new NoOpFailureHandler());
+
+ final OneInputStreamOperatorTestHarness<String, Object> testHarness =
+ new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink));
+
+ testHarness.open();
+
+ // setup the next bulk request, and its mock item failures (the single item fails)
+ sink.setMockItemFailuresListForNextBulkItemResponses(Collections.singletonList(new Exception("artificial failure for record")));
+ testHarness.processElement(new StreamRecord<>("msg"));
+ verify(sink.getMockBulkProcessor(), times(1)).add(any(ActionRequest.class));
+
+ // manually execute the next bulk request; the item failure is recorded by the listener
+ sink.manualBulkRequestWithAllPendingRequests();
+
+ try {
+ testHarness.processElement(new StreamRecord<>("next msg"));
+ } catch (Exception e) {
+ // the invoke should have failed, carrying the recorded item failure as its cause
+ Assert.assertTrue(e.getCause().getMessage().contains("artificial failure for record"));
+
+ // test succeeded
+ return;
+ }
+
+ Assert.fail("processing the next record should have rethrown the artificial item failure");
+ }
+
+ /** Tests that any item failure in the listener callbacks is rethrown on an immediately following checkpoint. */
+ @Test
+ public void testItemFailureRethrownOnCheckpoint() throws Throwable {
+ final DummyElasticsearchSink<String> sink = new DummyElasticsearchSink<>(
+ new HashMap<String, String>(), new SimpleSinkFunction<String>(), new NoOpFailureHandler());
+
+ final OneInputStreamOperatorTestHarness<String, Object> testHarness =
+ new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink));
+
+ testHarness.open();
+
+ // setup the next bulk request, and its mock item failures (the single item fails)
+ sink.setMockItemFailuresListForNextBulkItemResponses(Collections.singletonList(new Exception("artificial failure for record")));
+ testHarness.processElement(new StreamRecord<>("msg"));
+ verify(sink.getMockBulkProcessor(), times(1)).add(any(ActionRequest.class));
+
+ // manually execute the next bulk request; the item failure is recorded by the listener
+ sink.manualBulkRequestWithAllPendingRequests();
+
+ try {
+ testHarness.snapshot(1L, 1000L);
+ } catch (Exception e) {
+ // the snapshot should have failed; the item failure is nested two causes deep
+ Assert.assertTrue(e.getCause().getCause().getMessage().contains("artificial failure for record"));
+
+ // test succeeded
+ return;
+ }
+
+ Assert.fail("the snapshot should have rethrown the artificial item failure");
+ }
+
+ /**
+ * Tests that any item failure in the listener callbacks due to flushing on an immediately following checkpoint
+ * is rethrown; we set a timeout because the test will not finish if the logic is broken.
+ */
+ @Test(timeout=5000)
+ public void testItemFailureRethrownOnCheckpointAfterFlush() throws Throwable {
+ final DummyElasticsearchSink<String> sink = new DummyElasticsearchSink<>(
+ new HashMap<String, String>(), new SimpleSinkFunction<String>(), new NoOpFailureHandler());
+
+ final OneInputStreamOperatorTestHarness<String, Object> testHarness =
+ new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink));
+
+ testHarness.open();
+
+ // setup the mock item failures; the same list is used for every following bulk
+ // (index 0 applies to the first request of a bulk, index 1 to the second)
+ List<Exception> mockResponsesList = new ArrayList<>(2);
+ mockResponsesList.add(null); // the first request in a bulk will succeed
+ mockResponsesList.add(new Exception("artificial failure for record")); // the second request in a bulk will fail
+ sink.setMockItemFailuresListForNextBulkItemResponses(mockResponsesList);
+
+ testHarness.processElement(new StreamRecord<>("msg-1"));
+ verify(sink.getMockBulkProcessor(), times(1)).add(any(ActionRequest.class));
+
+ // manually execute the next bulk request (1 request only, thus should succeed)
+ sink.manualBulkRequestWithAllPendingRequests();
+
+ // setup the requests to be flushed in the snapshot
+ testHarness.processElement(new StreamRecord<>("msg-2"));
+ testHarness.processElement(new StreamRecord<>("msg-3"));
+ verify(sink.getMockBulkProcessor(), times(3)).add(any(ActionRequest.class));
+
+ CheckedThread snapshotThread = new CheckedThread() {
+ @Override
+ public void go() throws Exception {
+ testHarness.snapshot(1L, 1000L);
+ }
+ };
+ snapshotThread.start();
+
+ // wait until the snapshot thread blocks, i.e. the snapshot-triggered flush is waiting on the flush latch
+ while (snapshotThread.getState() != Thread.State.WAITING) {
+ Thread.sleep(10);
+ }
+
+ // let the snapshot-triggered flush continue (2 records in the bulk, so the 2nd one should fail)
+ sink.continueFlush();
+
+ try {
+ snapshotThread.sync();
+ } catch (Exception e) {
+ // the snapshot should have failed with the failure from the 2nd request (nested two causes deep)
+ Assert.assertTrue(e.getCause().getCause().getMessage().contains("artificial failure for record"));
+
+ // test succeeded
+ return;
+ }
+
+ Assert.fail("the snapshot-triggered flush should have rethrown the artificial item failure");
+ }
+
+ /** Tests that any bulk failure in the listener callbacks is rethrown on an immediately following invoke call. */
+ @Test
+ public void testBulkFailureRethrownOnInvoke() throws Throwable {
+ final DummyElasticsearchSink<String> sink = new DummyElasticsearchSink<>(
+ new HashMap<String, String>(), new SimpleSinkFunction<String>(), new NoOpFailureHandler());
+
+ final OneInputStreamOperatorTestHarness<String, Object> testHarness =
+ new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink));
+
+ testHarness.open();
+
+ // setup the next bulk request, and let the whole bulk request fail
+ sink.setFailNextBulkRequestCompletely(new Exception("artificial failure for bulk request"));
+ testHarness.processElement(new StreamRecord<>("msg"));
+ verify(sink.getMockBulkProcessor(), times(1)).add(any(ActionRequest.class));
+
+ // manually execute the next bulk request; the bulk failure is recorded by the listener
+ sink.manualBulkRequestWithAllPendingRequests();
+
+ try {
+ testHarness.processElement(new StreamRecord<>("next msg"));
+ } catch (Exception e) {
+ // the invoke should have failed, carrying the bulk request failure as its cause
+ Assert.assertTrue(e.getCause().getMessage().contains("artificial failure for bulk request"));
+
+ // test succeeded
+ return;
+ }
+
+ Assert.fail("processing the next record should have rethrown the artificial bulk failure");
+ }
+
+ /** Tests that any bulk failure in the listener callbacks is rethrown on an immediately following checkpoint. */
+ @Test
+ public void testBulkFailureRethrownOnCheckpoint() throws Throwable {
+ final DummyElasticsearchSink<String> sink = new DummyElasticsearchSink<>(
+ new HashMap<String, String>(), new SimpleSinkFunction<String>(), new NoOpFailureHandler());
+
+ final OneInputStreamOperatorTestHarness<String, Object> testHarness =
+ new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink));
+
+ testHarness.open();
+
+ // setup the next bulk request, and let the whole bulk request fail
+ sink.setFailNextBulkRequestCompletely(new Exception("artificial failure for bulk request"));
+ testHarness.processElement(new StreamRecord<>("msg"));
+ verify(sink.getMockBulkProcessor(), times(1)).add(any(ActionRequest.class));
+
+ // manually execute the next bulk request; the bulk failure is recorded by the listener
+ sink.manualBulkRequestWithAllPendingRequests();
+
+ try {
+ testHarness.snapshot(1L, 1000L);
+ } catch (Exception e) {
+ // the snapshot should have failed; the bulk request failure is nested two causes deep
+ Assert.assertTrue(e.getCause().getCause().getMessage().contains("artificial failure for bulk request"));
+
+ // test succeeded
+ return;
+ }
+
+ Assert.fail("the snapshot should have rethrown the artificial bulk failure");
+ }
+
+ /**
+ * Tests that any bulk failure in the listener callbacks due to flushing on an immediately following checkpoint
+ * is rethrown; we set a timeout because the test will not finish if the logic is broken.
+ */
+ @Test(timeout=5000)
+ public void testBulkFailureRethrownOnOnCheckpointAfterFlush() throws Throwable {
+ final DummyElasticsearchSink<String> sink = new DummyElasticsearchSink<>(
+ new HashMap<String, String>(), new SimpleSinkFunction<String>(), new NoOpFailureHandler());
+
+ final OneInputStreamOperatorTestHarness<String, Object> testHarness =
+ new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink));
+
+ testHarness.open();
+
+ // setup the next bulk request, and let bulk request succeed
+ sink.setMockItemFailuresListForNextBulkItemResponses(Collections.singletonList((Exception) null));
+ testHarness.processElement(new StreamRecord<>("msg-1"));
+ verify(sink.getMockBulkProcessor(), times(1)).add(any(ActionRequest.class));
+
+ // manually execute the next bulk request
+ sink.manualBulkRequestWithAllPendingRequests();
+
+ // setup the requests to be flushed in the snapshot
+ testHarness.processElement(new StreamRecord<>("msg-2"));
+ testHarness.processElement(new StreamRecord<>("msg-3"));
+ verify(sink.getMockBulkProcessor(), times(3)).add(any(ActionRequest.class));
+
+ CheckedThread snapshotThread = new CheckedThread() {
+ @Override
+ public void go() throws Exception {
+ testHarness.snapshot(1L, 1000L);
+ }
+ };
+ snapshotThread.start();
+
+ // wait until the snapshot thread blocks, i.e. the snapshot-triggered flush is waiting on the flush latch
+ while (snapshotThread.getState() != Thread.State.WAITING) {
+ Thread.sleep(10);
+ }
+
+ // for the snapshot-triggered flush, we let the bulk request fail completely
+ sink.setFailNextBulkRequestCompletely(new Exception("artificial failure for bulk request"));
+
+ // let the snapshot-triggered flush continue (bulk request should fail completely)
+ sink.continueFlush();
+
+ try {
+ snapshotThread.sync();
+ } catch (Exception e) {
+ // the snapshot should have failed with the bulk request failure (nested two causes deep)
+ Assert.assertTrue(e.getCause().getCause().getMessage().contains("artificial failure for bulk request"));
+
+ // test succeeded
+ return;
+ }
+
+ Assert.fail("the snapshot-triggered flush should have rethrown the artificial bulk failure");
+ }
+
+ /**
+ * Tests that the sink correctly waits for pending requests (including re-added requests) on checkpoints;
+ * we set a timeout because the test will not finish if the logic is broken.
+ */
+ @Test(timeout=5000)
+ public void testAtLeastOnceSink() throws Throwable {
+ final DummyElasticsearchSink<String> sink = new DummyElasticsearchSink<>(
+ new HashMap<String, String>(),
+ new SimpleSinkFunction<String>(),
+ new DummyRetryFailureHandler()); // use a failure handler that simply re-adds requests
+
+ final OneInputStreamOperatorTestHarness<String, Object> testHarness =
+ new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink));
+
+ testHarness.open();
+
+ // setup the next bulk request, and its mock item failures;
+ // it contains 1 request, which will fail and be re-added to the next bulk request
+ sink.setMockItemFailuresListForNextBulkItemResponses(Collections.singletonList(new Exception("artificial failure for record")));
+ testHarness.processElement(new StreamRecord<>("msg"));
+ verify(sink.getMockBulkProcessor(), times(1)).add(any(ActionRequest.class));
+
+ CheckedThread snapshotThread = new CheckedThread() {
+ @Override
+ public void go() throws Exception {
+ testHarness.snapshot(1L, 1000L);
+ }
+ };
+ snapshotThread.start();
+
+ // wait until the snapshot thread blocks, i.e. the snapshot-triggered flush is waiting on the flush latch
+ while (snapshotThread.getState() != Thread.State.WAITING) {
+ Thread.sleep(10);
+ }
+
+ sink.continueFlush();
+
+ // since the previous flush should have resulted in a request re-add from the failure handler,
+ // we should have flushed again, and eventually be blocked before the 2nd flush. NOTE(review): this poll may pass immediately if the snapshot thread has not yet woken from its first wait (it is still WAITING), so the lines below could run before the re-add; consider an explicit signal from the mock flush.
+ while (snapshotThread.getState() != Thread.State.WAITING) {
+ Thread.sleep(10);
+ }
+
+ // current number of pending request should be 1 due to the re-add
+ Assert.assertEquals(1, sink.getNumPendingRequests());
+
+ // this time, let the bulk request succeed, so no-more requests are re-added
+ sink.setMockItemFailuresListForNextBulkItemResponses(Collections.singletonList((Exception) null));
+
+ sink.continueFlush();
+
+ // the snapshot should finish with no exceptions
+ snapshotThread.sync();
+
+ testHarness.close();
+ }
+
+ /**
+ * This test is meant to assure that testAtLeastOnceSink is valid by testing that if flushing is disabled,
+ * the snapshot method does indeed finish without waiting for pending requests;
+ * we set a timeout because the test will not finish if the logic is broken.
+ */
+ @Test(timeout=5000)
+ public void testDoesNotWaitForPendingRequestsIfFlushingDisabled() throws Exception {
+ final DummyElasticsearchSink<String> sink = new DummyElasticsearchSink<>(
+ new HashMap<String, String>(), new SimpleSinkFunction<String>(), new DummyRetryFailureHandler());
+ sink.disableFlushOnCheckpoint(); // disable flushing
+
+ final OneInputStreamOperatorTestHarness<String, Object> testHarness =
+ new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink));
+
+ testHarness.open();
+
+ // setup the next bulk request with a failing item; it is never manually flushed here, so the request stays pending
+ sink.setMockItemFailuresListForNextBulkItemResponses(Collections.singletonList(new Exception("artificial failure for record")));
+ testHarness.processElement(new StreamRecord<>("msg-1"));
+ verify(sink.getMockBulkProcessor(), times(1)).add(any(ActionRequest.class));
+
+ // the snapshot should not block even though we haven't flushed the bulk request
+ testHarness.snapshot(1L, 1000L);
+
+ testHarness.close();
+ }
+
+ private static class DummyElasticsearchSink<T> extends ElasticsearchSinkBase<T> {
+ // Sink whose BulkProcessor is a Mockito mock; flushes are gated by flushLatch and answered with configurable mock responses.
+ private static final long serialVersionUID = 5051907841570096991L;
+
+ private transient BulkProcessor mockBulkProcessor;
+ private transient BulkRequest nextBulkRequest = new BulkRequest();
+ private transient MultiShotLatch flushLatch = new MultiShotLatch();
+
+ private List<? extends Throwable> mockItemFailuresList;
+ private Throwable nextBulkFailure;
+
+ public DummyElasticsearchSink(
+ Map<String, String> userConfig,
+ ElasticsearchSinkFunction<T> sinkFunction,
+ ActionRequestFailureHandler failureHandler) {
+ super(new DummyElasticsearchApiCallBridge(), userConfig, sinkFunction, failureHandler);
+ }
+
+ /**
+ * This method is used to mimic a scheduled bulk request; we need to do this
+ * manually because we are mocking the BulkProcessor.
+ */
+ public void manualBulkRequestWithAllPendingRequests() {
+ flushLatch.trigger(); // release the latch up front so that the flush below can proceed
+ mockBulkProcessor.flush();
+ }
+
+ /**
+ * On non-manual flushes, i.e. when flush is called in the snapshot method implementation,
+ * usages need to explicitly call this to allow the flush to continue. This is useful
+ * to make sure that specific requests get added to the next bulk request for flushing.
+ */
+ public void continueFlush() {
+ flushLatch.trigger();
+ }
+
+ /**
+ * Set the list of mock failures to use for the following bulks of item responses (kept until replaced). A {@code null}
+ * means that the response is successful, failed otherwise.
+ *
+ * <p>The list is used with corresponding order to the requests in the bulk, i.e. the first
+ * request uses the response at index 0, the second request uses the response at index 1, etc.
+ */
+ public void setMockItemFailuresListForNextBulkItemResponses(List<? extends Throwable> mockItemFailuresList) {
+ this.mockItemFailuresList = mockItemFailuresList;
+ }
+
+ /**
+ * Let the next bulk request fail completely with the provided throwable (kept for all following bulks until replaced).
+ * If this is set, the failures list provided with setMockItemFailuresListForNextBulkItemResponses is not respected.
+ */
+ public void setFailNextBulkRequestCompletely(Throwable failure) {
+ this.nextBulkFailure = failure;
+ }
+
+ public BulkProcessor getMockBulkProcessor() {
+ return mockBulkProcessor;
+ }
+
+ /**
+ * Override the bulk processor build process to provide a mock implementation,
+ * but reuse the listener implementation in our mock to test that the listener logic
+ * works correctly with request flushing logic.
+ */
+ @Override
+ protected BulkProcessor buildBulkProcessor(final BulkProcessor.Listener listener) {
+ this.mockBulkProcessor = mock(BulkProcessor.class);
+
+ when(mockBulkProcessor.add(any(ActionRequest.class))).thenAnswer(new Answer<Object>() {
+ @Override
+ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+ // intercept the request and add it to our mock bulk request
+ nextBulkRequest.add(invocationOnMock.getArgumentAt(0, ActionRequest.class));
+
+ return null;
+ }
+ });
+
+ doAnswer(new Answer<Object>() {
+ @Override
+ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+ while (nextBulkRequest.numberOfActions() > 0) {
+ // wait until we are allowed to continue with the flushing
+ flushLatch.await();
+
+ // swap out the accumulated mock requests, so that requests re-added by the
+ // failure handler land in a fresh bulk that the next loop iteration flushes
+ BulkRequest currentBulkRequest = nextBulkRequest;
+ nextBulkRequest = new BulkRequest();
+
+ listener.beforeBulk(123L, currentBulkRequest);
+
+ if (nextBulkFailure == null) {
+ BulkItemResponse[] mockResponses = new BulkItemResponse[currentBulkRequest.requests().size()];
+ for (int i = 0; i < currentBulkRequest.requests().size(); i++) {
+ Throwable mockItemFailure = mockItemFailuresList.get(i); // the failures list must cover every request in the bulk
+
+ if (mockItemFailure == null) {
+ // the mock response for the item is success
+ mockResponses[i] = new BulkItemResponse(i, "opType", mock(ActionResponse.class));
+ } else {
+ // the mock response for the item is failure
+ mockResponses[i] = new BulkItemResponse(i, "opType", new BulkItemResponse.Failure("index", "type", "id", mockItemFailure));
+ }
+ }
+
+ listener.afterBulk(123L, currentBulkRequest, new BulkResponse(mockResponses, 1000L));
+ } else {
+ listener.afterBulk(123L, currentBulkRequest, nextBulkFailure);
+ }
+ }
+
+ return null;
+ }
+ }).when(mockBulkProcessor).flush();
+
+ return mockBulkProcessor;
+ }
+ }
+
+ private static class DummyElasticsearchApiCallBridge implements ElasticsearchApiCallBridge {
+ // Test bridge: hands out a mock Client and rebuilds item failures from their message only.
+ private static final long serialVersionUID = -4272760730959041699L;
+
+ @Override
+ public Client createClient(Map<String, String> clientConfig) {
+ return mock(Client.class);
+ }
+
+ @Nullable
+ @Override
+ public Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse) {
+ if (bulkItemResponse.isFailed()) {
+ return new Exception(bulkItemResponse.getFailure().getMessage()); // only the failure message survives; the original throwable is not kept
+ } else {
+ return null; // null signals a successful item
+ }
+ }
+
+ @Override
+ public void configureBulkProcessorBackoff(BulkProcessor.Builder builder, @Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy flushBackoffPolicy) {
+ // no need for this in the test cases here
+ }
+
+ @Override
+ public void cleanup() {
+ // nothing to cleanup
+ }
+ }
+
+ private static class SimpleSinkFunction<T> implements ElasticsearchSinkFunction<T> {
+ // Turns every element into one IndexRequest (index "index", type "type", id "id") holding the element under "data".
+ private static final long serialVersionUID = -176739293659135148L;
+
+ @Override
+ public void process(T element, RuntimeContext ctx, RequestIndexer indexer) {
+ Map<String, Object> json = new HashMap<>();
+ json.put("data", element);
+
+ indexer.add(
+ Requests.indexRequest()
+ .index("index")
+ .type("type")
+ .id("id")
+ .source(json)
+ );
+ }
+ }
+
+ private static class DummyRetryFailureHandler implements ActionRequestFailureHandler {
+ // Failure handler that never fails the sink: every failed request is re-added unconditionally, with no retry limit.
+ private static final long serialVersionUID = 5400023700099200745L;
+
+ @Override
+ public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable {
+ indexer.add(action); // re-add the failed request regardless of the failure type
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2437da6e/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java
index 375d739..2298986 100644
--- a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java
+++ b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java
@@ -17,7 +17,7 @@
package org.apache.flink.streaming.connectors.elasticsearch;
-import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpActionRequestFailureHandler;
+import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.index.IndexRequest;
@@ -106,7 +106,7 @@ public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T> {
* @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest} from the incoming element
*/
public ElasticsearchSink(Map<String, String> userConfig, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
- this(userConfig, elasticsearchSinkFunction, new NoOpActionRequestFailureHandler());
+ this(userConfig, elasticsearchSinkFunction, new NoOpFailureHandler());
}
/**
@@ -117,7 +117,7 @@ public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T> {
* @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest} from the incoming element
*/
public ElasticsearchSink(Map<String, String> userConfig, List<TransportAddress> transportAddresses, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
- this(userConfig, transportAddresses, elasticsearchSinkFunction, new NoOpActionRequestFailureHandler());
+ this(userConfig, transportAddresses, elasticsearchSinkFunction, new NoOpFailureHandler());
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/2437da6e/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
index 2210f63..6d771d4 100644
--- a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
+++ b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
@@ -18,7 +18,7 @@ package org.apache.flink.streaming.connectors.elasticsearch2;
import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
-import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpActionRequestFailureHandler;
+import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.client.transport.TransportClient;
@@ -89,7 +89,7 @@ public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T> {
List<InetSocketAddress> transportAddresses,
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
- this(userConfig, transportAddresses, elasticsearchSinkFunction, new NoOpActionRequestFailureHandler());
+ this(userConfig, transportAddresses, elasticsearchSinkFunction, new NoOpFailureHandler());
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/2437da6e/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java
index 175b4fa..61023c2 100644
--- a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java
+++ b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java
@@ -19,7 +19,7 @@ package org.apache.flink.streaming.connectors.elasticsearch5;
import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
-import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpActionRequestFailureHandler;
+import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.client.transport.TransportClient;
@@ -75,7 +75,7 @@ public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T> {
List<InetSocketAddress> transportAddresses,
ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
- this(userConfig, transportAddresses, elasticsearchSinkFunction, new NoOpActionRequestFailureHandler());
+ this(userConfig, transportAddresses, elasticsearchSinkFunction, new NoOpFailureHandler());
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/2437da6e/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
index 69c2692..fea25ff 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
@@ -261,6 +261,30 @@ public final class ExceptionUtils {
}
}
+ /**
+ * Checks whether a throwable chain contains a specific type of exception.
+ *
+ * <p>The given throwable and all of its transitive causes are tested with
+ * {@link Class#isInstance(Object)}, so subclasses of the searched type match as well.
+ * The walk stops as soon as a throwable is encountered a second time, so a cause chain
+ * that loops back onto itself cannot lead to an endless loop.
+ *
+ * @param throwable the throwable chain to check; may be {@code null}.
+ * @param searchType the type of exception to search for in the chain; may be {@code null}.
+ * @return True, if the searched type is nested in the throwable, false otherwise
+ * (also false if either argument is {@code null}).
+ */
+ public static boolean containsThrowable(Throwable throwable, Class<?> searchType) {
+ if (throwable == null || searchType == null) {
+ return false;
+ }
+
+ // identity-based, since Throwable subclasses may override equals()/hashCode()
+ java.util.Set<Throwable> visited =
+ java.util.Collections.newSetFromMap(new java.util.IdentityHashMap<Throwable, Boolean>());
+
+ for (Throwable t = throwable; t != null && visited.add(t); t = t.getCause()) {
+ if (searchType.isInstance(t)) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
// ------------------------------------------------------------------------
/** Private constructor to prevent instantiation. */
http://git-wip-us.apache.org/repos/asf/flink/blob/2437da6e/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
index 219bf2a..d4a031c 100644
--- a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
@@ -325,6 +325,16 @@ public final class InstantiationUtil {
oos.writeObject(o);
}
+ /**
+ * Checks whether the given object can be serialized with Java serialization.
+ *
+ * <p>The check actually serializes the object via {@link #serializeObject(Object)}. An
+ * {@link IOException} raised while doing so means the object is reported as not serializable.
+ * Other exceptions are not caught and propagate to the caller.
+ *
+ * @param o the object to check.
+ * @return {@code true} if serializing the object succeeded, {@code false} otherwise.
+ */
+ public static boolean isSerializable(Object o) {
+ try {
+ serializeObject(o);
+ return true;
+ } catch (IOException notSerializable) {
+ return false;
+ }
+ }
+
/**
* Clones the given serializable object using Java serialization.
*
[2/3] flink git commit: [FLINK-5353] [elasticsearch] User-provided
failure handler for ElasticsearchSink
Posted by tz...@apache.org.
[FLINK-5353] [elasticsearch] User-provided failure handler for ElasticsearchSink
This commit fixes both FLINK-5353 and FLINK-5122. It allows users to implement a
failure handler to control how failed action requests are dealt with.
The commit also includes general improvements to FLINK-5122:
1. Use the built-in backoff functionality in the Elasticsearch BulkProcessor (not
available for Elasticsearch 1.x)
2. Migrate the `checkErrorAndRetryBulk` functionality to the new failure handler
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3743e898
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3743e898
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3743e898
Branch: refs/heads/master
Commit: 3743e898104d79a9813d444d38fa9f86617bb5ef
Parents: aaac7c2
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Mon Jan 30 13:55:26 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Feb 24 22:58:40 2017 +0800
----------------------------------------------------------------------
docs/dev/connectors/elasticsearch.md | 121 +++++++++++--
.../ActionRequestFailureHandler.java | 72 ++++++++
.../ElasticsearchApiCallBridge.java | 12 ++
.../elasticsearch/ElasticsearchSinkBase.java | 174 ++++++++++++-------
.../util/NoOpActionRequestFailureHandler.java | 37 ++++
.../Elasticsearch1ApiCallBridge.java | 10 ++
.../elasticsearch/ElasticsearchSink.java | 37 +++-
.../Elasticsearch2ApiCallBridge.java | 31 ++++
.../elasticsearch2/ElasticsearchSink.java | 29 +++-
.../Elasticsearch5ApiCallBridge.java | 31 ++++
.../elasticsearch5/ElasticsearchSink.java | 31 +++-
11 files changed, 499 insertions(+), 86 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/3743e898/docs/dev/connectors/elasticsearch.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/elasticsearch.md b/docs/dev/connectors/elasticsearch.md
index 4388b9a..2ca1f9b 100644
--- a/docs/dev/connectors/elasticsearch.md
+++ b/docs/dev/connectors/elasticsearch.md
@@ -23,6 +23,9 @@ specific language governing permissions and limitations
under the License.
-->
+* This will be replaced by the TOC
+{:toc}
+
This connector provides sinks that can request document actions to an
[Elasticsearch](https://elastic.co/) Index. To use this connector, add one
of the following dependencies to your project, depending on the version
@@ -59,14 +62,14 @@ Note that the streaming connectors are currently not part of the binary
distribution. See [here]({{site.baseurl}}/dev/linking.html) for information
about how to package the program with the libraries for cluster execution.
-#### Installing Elasticsearch
+## Installing Elasticsearch
Instructions for setting up an Elasticsearch cluster can be found
[here](https://www.elastic.co/guide/en/elasticsearch/reference/current/setup.html).
Make sure to set and remember a cluster name. This must be set when
creating an `ElasticsearchSink` for requesting document actions against your cluster.
-#### Elasticsearch Sink
+## Elasticsearch Sink
The `ElasticsearchSink` uses a `TransportClient` to communicate with an
Elasticsearch cluster.
@@ -200,15 +203,13 @@ request for each incoming element. Generally, the `ElasticsearchSinkFunction`
can be used to perform multiple requests of different types (ex.,
`DeleteRequest`, `UpdateRequest`, etc.).
-Internally, the sink uses a `BulkProcessor` to send action requests to the cluster.
-This will buffer elements before sending them in bulk to the cluster. The behaviour of the
-`BulkProcessor` can be set using these config keys in the provided `Map` configuration:
- * **bulk.flush.max.actions**: Maximum amount of elements to buffer
- * **bulk.flush.max.size.mb**: Maximum amount of data (in megabytes) to buffer
- * **bulk.flush.interval.ms**: Interval at which to flush data regardless of the other two
- settings in milliseconds
+Internally, each parallel instance of the Flink Elasticsearch Sink uses
+a `BulkProcessor` to send action requests to the cluster.
+This will buffer elements before sending them in bulk to the cluster. The `BulkProcessor`
+executes bulk requests one at a time, i.e. there will be no two concurrent
+flushes of the buffered actions in progress.
-#### Communication using Embedded Node (only for Elasticsearch 1.x)
+### Communication using Embedded Node (only for Elasticsearch 1.x)
For Elasticsearch versions 1.x, communication using an embedded node is
also supported. See [here](https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/client.html)
@@ -272,14 +273,106 @@ input.addSink(new ElasticsearchSink(config, new ElasticsearchSinkFunction[String
The difference is that now we do not need to provide a list of addresses
of Elasticsearch nodes.
-Optionally, the sink can try to re-execute the bulk request when the error
-message matches certain patterns indicating a timeout or a overloaded cluster.
-This behaviour is disabled by default and can be enabled by setting `checkErrorAndRetryBulk(true)`.
+### Handling Failing Elasticsearch Requests
+
+Elasticsearch action requests may fail due to a variety of reasons, including
+temporarily saturated node queue capacity or malformed documents to be indexed.
+The Flink Elasticsearch Sink allows the user to specify how request
+failures are handled, by simply implementing an `ActionRequestFailureHandler` and
+providing it to the constructor.
+
+Below is an example:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<String> input = ...;
+
+input.addSink(new ElasticsearchSink<>(
+ config, transportAddresses,
+ new ElasticsearchSinkFunction<String>() {...},
+ new ActionRequestFailureHandler() {
+ @Override
+ public boolean onFailure(ActionRequest action, Throwable failure, RequestIndexer indexer) {
+ // this example uses Apache Commons to search for nested exceptions
+
+ if (ExceptionUtils.indexOfThrowable(failure, EsRejectedExecutionException.class) >= 0) {
+ // full queue; re-add document for indexing
+ indexer.add(action);
+ return false;
+ } else if (ExceptionUtils.indexOfThrowable(failure, ElasticsearchParseException.class) >= 0) {
+ // malformed document; simply drop request without failing sink
+ return false;
+ } else {
+ // for all other failures, fail the sink
+ return true;
+ }
+ }
+}));
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val input: DataStream[String] = ...
+
+input.addSink(new ElasticsearchSink(
+ config, transportAddresses,
+ new ElasticsearchSinkFunction[String] {...},
+ new ActionRequestFailureHandler {
+ override def onFailure(action: ActionRequest, failure: Throwable, indexer: RequestIndexer): Boolean = {
+ // this example uses Apache Commons to search for nested exceptions
+
+ if (ExceptionUtils.indexOfThrowable(failure, classOf[EsRejectedExecutionException]) >= 0) {
+ // full queue; re-add document for indexing
+ indexer.add(action)
+ return false
+ } else if (ExceptionUtils.indexOfThrowable(failure, classOf[ElasticsearchParseException]) >= 0) {
+ // malformed document; simply drop request without failing sink
+ return false
+ } else {
+ // for all other failures, fail the sink
+ return true
+ }
+ }
+}))
+{% endhighlight %}
+</div>
+</div>
+The above example will let the sink re-add requests that failed due to
+queue capacity saturation and drop requests with malformed documents, without
+failing the sink. For all other failures, the sink will fail. If an `ActionRequestFailureHandler`
+is not provided to the constructor, the sink will fail for any kind of error.
+
+Note that `onFailure` is called only for failures that still occur after the
+`BulkProcessor` has internally exhausted all of its backoff retry attempts.
+By default, the `BulkProcessor` retries to a maximum of 8 attempts with
+an exponential backoff. For more information on the behaviour of the
+internal `BulkProcessor` and how to configure it, please see the following section.
+
+### Configuring the Internal Bulk Processor
+
+The internal `BulkProcessor` can be further configured for its behaviour
+on how buffered action requests are flushed, by setting the following values in
+the provided `Map<String, String>`:
+
+ * **bulk.flush.max.actions**: Maximum number of actions to buffer before flushing.
+ * **bulk.flush.max.size.mb**: Maximum size of data (in megabytes) to buffer before flushing.
+ * **bulk.flush.interval.ms**: Interval (in milliseconds) at which to flush regardless of the number or size of buffered actions.
+
+For versions 2.x and above, configuring how temporary request errors are
+retried is also supported:
+
+ * **bulk.flush.backoff.enable**: Whether or not to perform retries with backoff delay for a flush
+ if one or more of its actions failed due to a temporary `EsRejectedExecutionException` (default: `true`).
+ * **bulk.flush.backoff.type**: The type of backoff delay, either `CONSTANT` or `EXPONENTIAL` (default: `EXPONENTIAL`).
+ * **bulk.flush.backoff.delay**: The amount of delay for backoff, in milliseconds (default: `50`). For constant backoff, this
+ is simply the delay between each retry. For exponential backoff, this is the initial base delay.
+ * **bulk.flush.backoff.retries**: The number of backoff retries to attempt (default: `8`).
More information about Elasticsearch can be found [here](https://elastic.co).
-#### Packaging the Elasticsearch Connector into an Uber-Jar
+## Packaging the Elasticsearch Connector into an Uber-Jar
For the execution of your Flink program, it is recommended to build a
so-called uber-jar (executable jar) containing all your dependencies
http://git-wip-us.apache.org/repos/asf/flink/blob/3743e898/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ActionRequestFailureHandler.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ActionRequestFailureHandler.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ActionRequestFailureHandler.java
new file mode 100644
index 0000000..45d04fc
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ActionRequestFailureHandler.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import org.elasticsearch.action.ActionRequest;
+
+import java.io.Serializable;
+
+/**
+ * An implementation of {@link ActionRequestFailureHandler} is provided by the user to define how failed
+ * {@link ActionRequest ActionRequests} should be handled, e.g. dropping them, reprocessing malformed documents, or
+ * simply requesting them to be sent to Elasticsearch again if the failure is only temporary.
+ *
+ * <p>Implementations must be serializable (this interface extends {@link Serializable}), because the handler is
+ * serialized together with the Elasticsearch sink that uses it.
+ *
+ * <p>Example:
+ *
+ * <pre>{@code
+ *
+ * private static class ExampleActionRequestFailureHandler implements ActionRequestFailureHandler {
+ *
+ * @Override
+ * public boolean onFailure(ActionRequest action, Throwable failure, RequestIndexer indexer) {
+ * // this example uses Apache Commons to search for nested exceptions
+ *
+ * if (ExceptionUtils.indexOfThrowable(failure, EsRejectedExecutionException.class) >= 0) {
+ * // full queue; re-add document for indexing
+ * indexer.add(action);
+ * return false;
+ * } else if (ExceptionUtils.indexOfThrowable(failure, ElasticsearchParseException.class) >= 0) {
+ * // malformed document; simply drop request without failing sink
+ * return false;
+ * } else {
+ * // for all other failures, fail the sink
+ * return true;
+ * }
+ * }
+ * }
+ *
+ * }</pre>
+ *
+ * <p>The above example will let the sink re-add requests that failed due to queue capacity saturation and drop requests
+ * with malformed documents, without failing the sink. For all other failures, the sink will fail.
+ */
+public interface ActionRequestFailureHandler extends Serializable {
+
+ /**
+ * Handle a failed {@link ActionRequest}.
+ *
+ * @param action the {@link ActionRequest} that failed due to the failure
+ * @param failure the cause of failure
+ * @param indexer request indexer to re-add the failed action, if intended to do so
+ * @return the implementation should return {@code true} if the sink should fail due to this failure, and {@code false} otherwise
+ */
+ boolean onFailure(ActionRequest action, Throwable failure, RequestIndexer indexer);
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/3743e898/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
index 6298a85..b482432 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
@@ -19,6 +19,7 @@
package org.apache.flink.streaming.connectors.elasticsearch;
import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.client.Client;
import javax.annotation.Nullable;
@@ -53,6 +54,17 @@ public interface ElasticsearchApiCallBridge extends Serializable {
@Nullable Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse);
/**
+ * Set backoff-related configurations on the provided {@link BulkProcessor.Builder}.
+ * The builder will be later on used to instantiate the actual {@link BulkProcessor}.
+ *
+ * @param builder the {@link BulkProcessor.Builder} to configure.
+ * @param flushBackoffPolicy user-provided backoff retry settings ({@code null} if the user disabled backoff retries).
+ */
+ void configureBulkProcessorBackoff(
+ BulkProcessor.Builder builder,
+ @Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy flushBackoffPolicy);
+
+ /**
* Perform any necessary state cleanup.
*/
void cleanup();
http://git-wip-us.apache.org/repos/asf/flink/blob/3743e898/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
index 7977fc0..2c29865 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
@@ -21,24 +21,23 @@ import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.util.InstantiationUtil;
-import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.action.ActionRequest;
-import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Client;
-import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
+import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
@@ -70,10 +69,56 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions";
public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb";
public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms";
+ public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE = "bulk.flush.backoff.enable";
+ public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE = "bulk.flush.backoff.type";
+ public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES = "bulk.flush.backoff.retries";
+ public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY = "bulk.flush.backoff.delay";
+
+ /**
+ * The type of backoff delay used when retrying a flush of the internal bulk processor.
+ */
+ public enum FlushBackoffType {
+ /** The configured delay is used as the delay between each retry. */
+ CONSTANT,
+ /** The configured delay is the initial base delay of an exponential backoff. */
+ EXPONENTIAL
+ }
+
+ /**
+ * Backoff retry settings for flushes of the internal bulk processor. These settings control how a flush is retried
+ * if one or more of its actions failed due to a temporary {@code EsRejectedExecutionException}.
+ *
+ * <p>This is a static nested class: it needs no reference to the enclosing sink, so it can be instantiated
+ * and serialized on its own without capturing a sink instance.
+ */
+ public static class BulkFlushBackoffPolicy implements Serializable {
+
+ private static final long serialVersionUID = -6022851996101826049L;
+
+ // the default values follow the Elasticsearch default settings for BulkProcessor
+ private FlushBackoffType backoffType = FlushBackoffType.EXPONENTIAL;
+ private int maxRetryCount = 8;
+ private long delayMillis = 50;
+
+ /** @return the type of backoff delay; {@link FlushBackoffType#EXPONENTIAL} unless changed. */
+ public FlushBackoffType getBackoffType() {
+ return backoffType;
+ }
+
+ /** @return the maximum number of backoff retries; 8 unless changed. */
+ public int getMaxRetryCount() {
+ return maxRetryCount;
+ }
+
+ /** @return the backoff delay in milliseconds; 50 unless changed. */
+ public long getDelayMillis() {
+ return delayMillis;
+ }
+
+ /**
+ * @param backoffType the type of backoff delay; must not be {@code null}.
+ */
+ public void setBackoffType(FlushBackoffType backoffType) {
+ this.backoffType = checkNotNull(backoffType, "backoffType must not be null");
+ }
+
+ /**
+ * @param maxRetryCount the maximum number of backoff retries; must be larger than 0.
+ */
+ public void setMaxRetryCount(int maxRetryCount) {
+ checkArgument(maxRetryCount > 0, "maxRetryCount must be larger than 0");
+ this.maxRetryCount = maxRetryCount;
+ }
+
+ /**
+ * @param delayMillis the backoff delay in milliseconds; must be larger than 0.
+ */
+ public void setDelayMillis(long delayMillis) {
+ checkArgument(delayMillis > 0, "delayMillis must be larger than 0");
+ this.delayMillis = delayMillis;
+ }
+ }
private final Integer bulkProcessorFlushMaxActions;
private final Integer bulkProcessorFlushMaxSizeMb;
private final Integer bulkProcessorFlushIntervalMillis;
+ private final BulkFlushBackoffPolicy bulkProcessorFlushBackoffPolicy;
// ------------------------------------------------------------------------
// User-facing API and configuration
@@ -85,16 +130,12 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
/** The function that is used to construct mulitple {@link ActionRequest ActionRequests} from each incoming element. */
private final ElasticsearchSinkFunction<T> elasticsearchSinkFunction;
+ /** User-provided handler for failed {@link ActionRequest ActionRequests}. */
+ private final ActionRequestFailureHandler failureHandler;
+
/** Provided to the user via the {@link ElasticsearchSinkFunction} to add {@link ActionRequest ActionRequests}. */
private transient BulkProcessorIndexer requestIndexer;
- /**
- * When set to <code>true</code> and the bulk action fails, the error message will be checked for
- * common patterns like <i>timeout</i>, <i>UnavailableShardsException</i> or a full buffer queue on the node.
- * When a matching pattern is found, the bulk will be retried.
- */
- protected boolean checkErrorAndRetryBulk = false;
-
// ------------------------------------------------------------------------
// Internals for the Flink Elasticsearch Sink
// ------------------------------------------------------------------------
@@ -109,21 +150,28 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
private transient BulkProcessor bulkProcessor;
/**
- * This is set from inside the {@link BulkProcessor.Listener} if a {@link Throwable} was thrown in callbacks.
+ * This is set from inside the {@link BulkProcessor.Listener} if a {@link Throwable} was thrown in callbacks and
+ * the user considered it should fail the sink via the
+ * {@link ActionRequestFailureHandler#onFailure(ActionRequest, Throwable, RequestIndexer)} method.
+ *
+ * Errors will be checked and rethrown before processing each input element, and when the sink is closed.
*/
private final AtomicReference<Throwable> failureThrowable = new AtomicReference<>();
public ElasticsearchSinkBase(
ElasticsearchApiCallBridge callBridge,
Map<String, String> userConfig,
- ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
+ ElasticsearchSinkFunction<T> elasticsearchSinkFunction,
+ ActionRequestFailureHandler failureHandler) {
this.callBridge = checkNotNull(callBridge);
this.elasticsearchSinkFunction = checkNotNull(elasticsearchSinkFunction);
+ this.failureHandler = checkNotNull(failureHandler);
- // we eagerly check if the user-provided sink function is serializable;
- // otherwise, if it isn't serializable, users will merely get a non-informative error message
+ // we eagerly check if the user-provided sink function and failure handler is serializable;
+ // otherwise, if they aren't serializable, users will merely get a non-informative error message
// "ElasticsearchSinkBase is not serializable"
+
try {
InstantiationUtil.serializeObject(elasticsearchSinkFunction);
} catch (Exception e) {
@@ -132,10 +180,19 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
"The object probably contains or references non serializable fields.");
}
- checkNotNull(userConfig);
+ try {
+ InstantiationUtil.serializeObject(failureHandler);
+ } catch (Exception e) {
+ throw new IllegalArgumentException(
+ "The implementation of the provided ActionRequestFailureHandler is not serializable. " +
+ "The object probably contains or references non serializable fields.");
+ }
// extract and remove bulk processor related configuration from the user-provided config,
// so that the resulting user config only contains configuration related to the Elasticsearch client.
+
+ checkNotNull(userConfig);
+
ParameterTool params = ParameterTool.fromMap(userConfig);
if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS)) {
@@ -159,6 +216,31 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
bulkProcessorFlushIntervalMillis = null;
}
+ boolean bulkProcessorFlushBackoffEnable = params.getBoolean(CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE, true);
+ userConfig.remove(CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE);
+
+ if (bulkProcessorFlushBackoffEnable) {
+ this.bulkProcessorFlushBackoffPolicy = new BulkFlushBackoffPolicy();
+
+ if (params.has(CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE)) {
+ bulkProcessorFlushBackoffPolicy.setBackoffType(FlushBackoffType.valueOf(params.get(CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE)));
+ userConfig.remove(CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE);
+ }
+
+ if (params.has(CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES)) {
+ bulkProcessorFlushBackoffPolicy.setMaxRetryCount(params.getInt(CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES));
+ userConfig.remove(CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES);
+ }
+
+ if (params.has(CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY)) {
+ bulkProcessorFlushBackoffPolicy.setDelayMillis(params.getLong(CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY));
+ userConfig.remove(CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY);
+ }
+
+ } else {
+ bulkProcessorFlushBackoffPolicy = null;
+ }
+
this.userConfig = userConfig;
}
@@ -175,48 +257,30 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
if (response.hasFailures()) {
- boolean allRequestsRepeatable = true;
+ BulkItemResponse itemResponse;
+ Throwable failure;
- for (BulkItemResponse itemResp : response.getItems()) {
- Throwable failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResp);
+ // NOTE: the index-based lookup request.requests().get(i) below assumes that the
+ // response items are ordered exactly like the actions of the bulk request
+ for (int i = 0; i < response.getItems().length; i++) {
+ itemResponse = response.getItems()[i];
+ failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResponse);
if (failure != null) {
- String failureMessageLowercase = itemResp.getFailureMessage().toLowerCase();
-
- // Check if index request can be retried
- if (checkErrorAndRetryBulk && (
- failureMessageLowercase.contains("timeout") ||
- failureMessageLowercase.contains("timed out") || // Generic timeout errors
- failureMessageLowercase.contains("UnavailableShardsException".toLowerCase()) || // Shard not available due to rebalancing or node down
- (failureMessageLowercase.contains("data/write/bulk") && failureMessageLowercase.contains("bulk")))) // Bulk index queue on node full
- {
- LOG.debug("Retry bulk: {}", itemResp.getFailureMessage());
- } else {
- // Cannot retry action
- allRequestsRepeatable = false;
- LOG.error("Failed Elasticsearch item request: {}", failure.getMessage(), failure);
+ LOG.error("Failed Elasticsearch item request: {}", itemResponse.getFailureMessage(), failure);
+ try {
+ if (failureHandler.onFailure(request.requests().get(i), failure, requestIndexer)) {
failureThrowable.compareAndSet(null, failure);
}
+ } catch (Throwable t) {
+ // an exception thrown by the user handler must not vanish inside this listener callback;
+ // record it so that checkErrorAndRethrow() fails the sink
+ failureThrowable.compareAndSet(null, t);
+ }
}
}
-
- if (allRequestsRepeatable) {
- reAddBulkRequest(request);
- }
}
}
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
- if (checkErrorAndRetryBulk && (
- failure instanceof ClusterBlockException // Examples: "no master"
- || failure instanceof ElasticsearchTimeoutException) // ElasticsearchTimeoutException sounded good, not seen in stress tests yet
- )
- {
- LOG.debug("Retry bulk on throwable: {}", failure.getMessage());
- reAddBulkRequest(request);
- } else {
- LOG.error("Failed Elasticsearch bulk request: {}", failure.getMessage(), failure);
- failureThrowable.compareAndSet(null, failure);
+ // pass the failure itself (not its possibly-null cause) so the full stack trace is logged
+ LOG.error("Failed Elasticsearch bulk request: {}", failure.getMessage(), failure);
+
+ // a failure of the whole bulk request is handed to the user-provided failure handler for every
+ // action of the bulk, exactly like failed items of a partially failed bulk. Unconditionally
+ // re-adding the actions would retry forever on a persistent failure and would bypass the
+ // handler's decision to fail the sink.
+ try {
+ for (ActionRequest action : request.requests()) {
+ if (failureHandler.onFailure(action, failure, requestIndexer)) {
+ failureThrowable.compareAndSet(null, failure);
+ }
}
+ } catch (Throwable t) {
+ // an exception thrown by the user handler must not vanish inside this listener callback;
+ // record it so that checkErrorAndRethrow() fails the sink
+ failureThrowable.compareAndSet(null, t);
+ }
}
}
@@ -237,6 +301,9 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
bulkProcessorBuilder.setFlushInterval(TimeValue.timeValueMillis(bulkProcessorFlushIntervalMillis));
}
+ // if backoff retrying is disabled, bulkProcessorFlushBackoffPolicy will be null
+ callBridge.configureBulkProcessorBackoff(bulkProcessorBuilder, bulkProcessorFlushBackoffPolicy);
+
bulkProcessor = bulkProcessorBuilder.build();
requestIndexer = new BulkProcessorIndexer(bulkProcessor);
}
@@ -267,21 +334,6 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
checkErrorAndRethrow();
}
- /**
- * Adds all requests of the bulk to the BulkProcessor. Used when trying again.
- * @param bulkRequest
- */
- public void reAddBulkRequest(BulkRequest bulkRequest) {
- //TODO Check what happens when bulk contains a DeleteAction and IndexActions and the DeleteAction fails because the document already has been deleted. This may not happen in typical Flink jobs.
-
- for (IndicesRequest req : bulkRequest.subRequests()) {
- if (req instanceof ActionRequest) {
- // There is no waiting time between index requests, so this may produce additional pressure on cluster
- bulkProcessor.add((ActionRequest<?>) req);
- }
- }
- }
-
private void checkErrorAndRethrow() {
Throwable cause = failureThrowable.get();
if (cause != null) {
http://git-wip-us.apache.org/repos/asf/flink/blob/3743e898/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/NoOpActionRequestFailureHandler.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/NoOpActionRequestFailureHandler.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/NoOpActionRequestFailureHandler.java
new file mode 100644
index 0000000..09173a2
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/NoOpActionRequestFailureHandler.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.util;
+
+import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.elasticsearch.action.ActionRequest;
+
+/**
+ * An {@link ActionRequestFailureHandler} that simply fails the sink on any failures; the {@link RequestIndexer} is never used, so nothing is retried.
+ */
+public class NoOpActionRequestFailureHandler implements ActionRequestFailureHandler {
+
+ private static final long serialVersionUID = 737941343410827885L;
+
+ @Override
+ public boolean onFailure(ActionRequest action, Throwable failure, RequestIndexer indexer) {
+ // simply fail the sink: the failed action is not re-added to the indexer
+ return true; // NOTE(review): presumably "true" means "fail the sink" per the handler contract - confirm against ActionRequestFailureHandler
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/3743e898/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java
index 098afa9..8a59da9 100644
--- a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java
+++ b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.connectors.elasticsearch;
import org.apache.flink.util.Preconditions;
import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
@@ -27,6 +28,7 @@ import org.elasticsearch.node.Node;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
@@ -119,6 +121,17 @@ public class Elasticsearch1ApiCallBridge implements ElasticsearchApiCallBridge {
}
@Override
+ public void configureBulkProcessorBackoff(
+ BulkProcessor.Builder builder,
+ @Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy flushBackoffPolicy) {
+ // Elasticsearch 1.x does not support backoff retries for failed bulk requests.
+ // A null policy means backoff retrying was disabled, so only warn when a policy was actually configured.
+ if (flushBackoffPolicy != null) {
+ LOG.warn("Elasticsearch 1.x does not support backoff retries.");
+ }
+ }
+
+ @Override
public void cleanup() {
if (node != null && !node.isClosed()) {
node.close();
http://git-wip-us.apache.org/repos/asf/flink/blob/3743e898/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java
index c338860..375d739 100644
--- a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java
+++ b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java
@@ -17,6 +17,7 @@
package org.apache.flink.streaming.connectors.elasticsearch;
+import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpActionRequestFailureHandler;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.index.IndexRequest;
@@ -105,7 +106,7 @@ public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T> {
* @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest} from the incoming element
*/
public ElasticsearchSink(Map<String, String> userConfig, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
- super(new Elasticsearch1ApiCallBridge(), userConfig, elasticsearchSinkFunction);
+ this(userConfig, elasticsearchSinkFunction, new NoOpActionRequestFailureHandler());
}
/**
@@ -116,6 +117,38 @@ public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T> {
* @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest} from the incoming element
*/
public ElasticsearchSink(Map<String, String> userConfig, List<TransportAddress> transportAddresses, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
- super(new Elasticsearch1ApiCallBridge(transportAddresses), userConfig, elasticsearchSinkFunction);
+ this(userConfig, transportAddresses, elasticsearchSinkFunction, new NoOpActionRequestFailureHandler());
+ }
+
+ /**
+ * Creates a new {@code ElasticsearchSink} that connects to the cluster using an embedded {@link Node}.
+ *
+ * @param userConfig The map of user settings that are used when constructing the embedded {@link Node} and {@link BulkProcessor}
+ * @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest} from the incoming element
+ * @param failureHandler This is used to handle failed {@link ActionRequest ActionRequests}; the overloads without this parameter default to {@link NoOpActionRequestFailureHandler}, which fails the sink
+ */
+ public ElasticsearchSink(
+ Map<String, String> userConfig,
+ ElasticsearchSinkFunction<T> elasticsearchSinkFunction,
+ ActionRequestFailureHandler failureHandler) {
+
+ super(new Elasticsearch1ApiCallBridge(), userConfig, elasticsearchSinkFunction, failureHandler);
+ }
+
+ /**
+ * Creates a new {@code ElasticsearchSink} that connects to the cluster using a {@link TransportClient}.
+ *
+ * @param userConfig The map of user settings that are used when constructing the {@link TransportClient} and {@link BulkProcessor}
+ * @param transportAddresses The addresses of Elasticsearch nodes to which to connect using a {@link TransportClient}
+ * @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest} from the incoming element
+ * @param failureHandler This is used to handle failed {@link ActionRequest ActionRequests}; the overloads without this parameter default to {@link NoOpActionRequestFailureHandler}, which fails the sink
+ */
+ public ElasticsearchSink(
+ Map<String, String> userConfig,
+ List<TransportAddress> transportAddresses,
+ ElasticsearchSinkFunction<T> elasticsearchSinkFunction,
+ ActionRequestFailureHandler failureHandler) {
+
+ super(new Elasticsearch1ApiCallBridge(transportAddresses), userConfig, elasticsearchSinkFunction, failureHandler);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3743e898/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java
index 9407d9f..e85daf5 100644
--- a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java
+++ b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java
@@ -18,16 +18,21 @@
package org.apache.flink.streaming.connectors.elasticsearch2;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchApiCallBridge;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
import org.apache.flink.streaming.connectors.elasticsearch.util.ElasticsearchUtils;
import org.apache.flink.util.Preconditions;
+import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
+import org.elasticsearch.common.unit.TimeValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
@@ -84,6 +89,32 @@ public class Elasticsearch2ApiCallBridge implements ElasticsearchApiCallBridge {
}
@Override
+ public void configureBulkProcessorBackoff(
+ BulkProcessor.Builder builder,
+ @Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy flushBackoffPolicy) {
+
+ // a null policy means backoff retrying is disabled: configure no retries at all
+ if (flushBackoffPolicy == null) {
+ builder.setBackoffPolicy(BackoffPolicy.noBackoff());
+ return;
+ }
+
+ // EXPONENTIAL is also the fallback for any other backoff type
+ switch (flushBackoffPolicy.getBackoffType()) {
+ case CONSTANT: {
+ TimeValue delay = new TimeValue(flushBackoffPolicy.getDelayMillis());
+ builder.setBackoffPolicy(BackoffPolicy.constantBackoff(delay, flushBackoffPolicy.getMaxRetryCount()));
+ break;
+ }
+ case EXPONENTIAL:
+ default: {
+ TimeValue initialDelay = new TimeValue(flushBackoffPolicy.getDelayMillis());
+ builder.setBackoffPolicy(BackoffPolicy.exponentialBackoff(initialDelay, flushBackoffPolicy.getMaxRetryCount()));
+ }
+ }
+ }
+
+ @Override
public void cleanup() {
// nothing to cleanup
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3743e898/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
index a0abc51..2210f63 100644
--- a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
+++ b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
@@ -16,7 +16,9 @@
*/
package org.apache.flink.streaming.connectors.elasticsearch2;
+import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpActionRequestFailureHandler;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.client.transport.TransportClient;
@@ -82,9 +84,28 @@ public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T> {
* @param transportAddresses The addresses of Elasticsearch nodes to which to connect using a {@link TransportClient}
* @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest} from the incoming element
*/
- public ElasticsearchSink(Map<String, String> userConfig,
- List<InetSocketAddress> transportAddresses,
- org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
- super(new Elasticsearch2ApiCallBridge(transportAddresses), userConfig, elasticsearchSinkFunction);
+ public ElasticsearchSink(
+ Map<String, String> userConfig,
+ List<InetSocketAddress> transportAddresses,
+ org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
+
+ this(userConfig, transportAddresses, elasticsearchSinkFunction, new NoOpActionRequestFailureHandler());
+ }
+
+ /**
+ * Creates a new {@code ElasticsearchSink} that connects to the cluster using a {@link TransportClient}.
+ *
+ * @param userConfig The map of user settings that are used when constructing the {@link TransportClient} and {@link BulkProcessor}
+ * @param transportAddresses The addresses of Elasticsearch nodes to which to connect using a {@link TransportClient}
+ * @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest} from the incoming element
+ * @param failureHandler This is used to handle failed {@link ActionRequest ActionRequests}; the overload without this parameter defaults to {@link NoOpActionRequestFailureHandler}, which fails the sink
+ */
+ public ElasticsearchSink(
+ Map<String, String> userConfig,
+ List<InetSocketAddress> transportAddresses,
+ org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction<T> elasticsearchSinkFunction,
+ ActionRequestFailureHandler failureHandler) {
+
+ super(new Elasticsearch2ApiCallBridge(transportAddresses), userConfig, elasticsearchSinkFunction, failureHandler);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3743e898/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java
index 1389e7d..c7d81f5 100644
--- a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java
+++ b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java
@@ -18,19 +18,24 @@
package org.apache.flink.streaming.connectors.elasticsearch5;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchApiCallBridge;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
import org.apache.flink.streaming.connectors.elasticsearch.util.ElasticsearchUtils;
import org.apache.flink.util.Preconditions;
+import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
+import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.transport.Netty3Plugin;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
@@ -90,6 +95,32 @@ public class Elasticsearch5ApiCallBridge implements ElasticsearchApiCallBridge {
}
@Override
+ public void configureBulkProcessorBackoff(
+ BulkProcessor.Builder builder,
+ @Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy flushBackoffPolicy) {
+
+ // a null policy means backoff retrying is disabled: configure no retries at all
+ if (flushBackoffPolicy == null) {
+ builder.setBackoffPolicy(BackoffPolicy.noBackoff());
+ return;
+ }
+
+ // EXPONENTIAL is also the fallback for any other backoff type
+ switch (flushBackoffPolicy.getBackoffType()) {
+ case CONSTANT: {
+ TimeValue delay = new TimeValue(flushBackoffPolicy.getDelayMillis());
+ builder.setBackoffPolicy(BackoffPolicy.constantBackoff(delay, flushBackoffPolicy.getMaxRetryCount()));
+ break;
+ }
+ case EXPONENTIAL:
+ default: {
+ TimeValue initialDelay = new TimeValue(flushBackoffPolicy.getDelayMillis());
+ builder.setBackoffPolicy(BackoffPolicy.exponentialBackoff(initialDelay, flushBackoffPolicy.getMaxRetryCount()));
+ }
+ }
+ }
+
+ @Override
public void cleanup() {
// nothing to cleanup
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3743e898/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java
index 9107d4e..175b4fa 100644
--- a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java
+++ b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java
@@ -16,8 +16,10 @@
*/
package org.apache.flink.streaming.connectors.elasticsearch5;
+import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpActionRequestFailureHandler;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.client.transport.TransportClient;
@@ -64,13 +66,32 @@ public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T> {
/**
* Creates a new {@code ElasticsearchSink} that connects to the cluster using a {@link TransportClient}.
*
- * @param userConfig The map of user settings that are used when constructing the {@link TransportClient}
+ * @param userConfig The map of user settings that are used when constructing the {@link TransportClient} and {@link BulkProcessor}
* @param transportAddresses The addresses of Elasticsearch nodes to which to connect using a {@link TransportClient}
* @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest} from the incoming element
*/
- public ElasticsearchSink(Map<String, String> userConfig,
- List<InetSocketAddress> transportAddresses,
- ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
- super(new Elasticsearch5ApiCallBridge(transportAddresses), userConfig, elasticsearchSinkFunction);
+ public ElasticsearchSink(
+ Map<String, String> userConfig,
+ List<InetSocketAddress> transportAddresses,
+ ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
+
+ this(userConfig, transportAddresses, elasticsearchSinkFunction, new NoOpActionRequestFailureHandler());
+ }
+
+ /**
+ * Creates a new {@code ElasticsearchSink} that connects to the cluster using a {@link TransportClient}.
+ *
+ * @param userConfig The map of user settings that are used when constructing the {@link TransportClient} and {@link BulkProcessor}
+ * @param transportAddresses The addresses of Elasticsearch nodes to which to connect using a {@link TransportClient}
+ * @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest} from the incoming element
+ * @param failureHandler This is used to handle failed {@link ActionRequest ActionRequests}; the overload without this parameter defaults to {@link NoOpActionRequestFailureHandler}, which fails the sink
+ */
+ public ElasticsearchSink(
+ Map<String, String> userConfig,
+ List<InetSocketAddress> transportAddresses,
+ ElasticsearchSinkFunction<T> elasticsearchSinkFunction,
+ ActionRequestFailureHandler failureHandler) {
+
+ super(new Elasticsearch5ApiCallBridge(transportAddresses), userConfig, elasticsearchSinkFunction, failureHandler);
}
[3/3] flink git commit: [FLINK-5122] [elasticsearch] Retry temporary
Elasticsearch request errors.
Posted by tz...@apache.org.
[FLINK-5122] [elasticsearch] Retry temporary Elasticsearch request errors.
Covered exceptions are: Timeouts, No Master, UnavailableShardsException, bulk queue on node full
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/aaac7c2e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/aaac7c2e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/aaac7c2e
Branch: refs/heads/master
Commit: aaac7c2e505717dbfc40465cb3656652e1bf5658
Parents: 0ba08b4
Author: Max Kuklinski <ma...@live.de>
Authored: Wed Nov 23 17:54:11 2016 +0100
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Feb 24 22:58:40 2017 +0800
----------------------------------------------------------------------
docs/dev/connectors/elasticsearch.md | 5 ++
.../elasticsearch/ElasticsearchSinkBase.java | 62 ++++++++++++++++++--
2 files changed, 63 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/aaac7c2e/docs/dev/connectors/elasticsearch.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/elasticsearch.md b/docs/dev/connectors/elasticsearch.md
index 0f2d025..4388b9a 100644
--- a/docs/dev/connectors/elasticsearch.md
+++ b/docs/dev/connectors/elasticsearch.md
@@ -272,6 +272,11 @@ input.addSink(new ElasticsearchSink(config, new ElasticsearchSinkFunction[String
The difference is that now we do not need to provide a list of addresses
of Elasticsearch nodes.
+Optionally, the sink can try to re-execute the bulk request when the error
+message matches certain patterns indicating a timeout or an overloaded cluster.
+This behaviour is disabled by default and can be enabled by setting the protected field `checkErrorAndRetryBulk` to `true` in a subclass of the sink.
+
+
More information about Elasticsearch can be found [here](https://elastic.co).
#### Packaging the Elasticsearch Connector into an Uber-Jar
http://git-wip-us.apache.org/repos/asf/flink/blob/aaac7c2e/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
index 6a2d65f..7977fc0 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
@@ -21,12 +21,15 @@ import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.util.InstantiationUtil;
+import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Client;
+import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
@@ -85,6 +88,13 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
/** Provided to the user via the {@link ElasticsearchSinkFunction} to add {@link ActionRequest ActionRequests}. */
private transient BulkProcessorIndexer requestIndexer;
+ /**
+ * When set to <code>true</code> and the bulk action fails, the error message will be checked for
+ * common patterns like <i>timeout</i>, <i>UnavailableShardsException</i> or a full buffer queue on the node.
+ * If every failed item matches, the whole bulk is re-added (without backoff); otherwise the first failure is recorded. There is no setter: subclasses assign this field directly.
+ */
+ protected boolean checkErrorAndRetryBulk = false;
+
// ------------------------------------------------------------------------
// Internals for the Flink Elasticsearch Sink
// ------------------------------------------------------------------------
@@ -165,20 +175,49 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
if (response.hasFailures()) {
+ boolean allRequestsRepeatable = true; // NOTE(review): stays true if no item yields a failure cause, in which case the bulk is re-added even when checkErrorAndRetryBulk is false
+
for (BulkItemResponse itemResp : response.getItems()) {
Throwable failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResp);
if (failure != null) {
- LOG.error("Failed Elasticsearch item request: {}", failure.getMessage(), failure);
- failureThrowable.compareAndSet(null, failure);
+ String failureMessageLowercase = itemResp.getFailureMessage().toLowerCase(); // NOTE(review): default-locale lowercasing; toLowerCase(Locale.ROOT) would be locale-independent
+
+ // Check if the failed item matches an error pattern that is considered retryable
+ if (checkErrorAndRetryBulk && (
+ failureMessageLowercase.contains("timeout") ||
+ failureMessageLowercase.contains("timed out") || // Generic timeout errors
+ failureMessageLowercase.contains("UnavailableShardsException".toLowerCase()) || // Shard not available due to rebalancing or node down
+ (failureMessageLowercase.contains("data/write/bulk") && failureMessageLowercase.contains("bulk")))) // Bulk index queue on node full. NOTE(review): the "bulk" test is implied by "data/write/bulk", so ANY message containing "data/write/bulk" is retried - was a rejection marker (e.g. "rejected execution") intended?
+ {
+ LOG.debug("Retry bulk: {}", itemResp.getFailureMessage());
+ } else {
+ // Cannot retry action: remember the first failure so checkErrorAndRethrow() can surface it
+ allRequestsRepeatable = false;
+ LOG.error("Failed Elasticsearch item request: {}", failure.getMessage(), failure);
+ failureThrowable.compareAndSet(null, failure);
+ }
}
}
+
+ if (allRequestsRepeatable) { // NOTE(review): this re-adds the WHOLE bulk, including items that already succeeded
+ reAddBulkRequest(request); // NOTE(review): calls bulkProcessor.add() from inside this listener with no backoff - verify this cannot recurse or deadlock
+ }
}
}
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
- LOG.error("Failed Elasticsearch bulk request: {}", failure.getMessage(), failure);
- failureThrowable.compareAndSet(null, failure);
+ if (checkErrorAndRetryBulk && (
+ failure instanceof ClusterBlockException // Examples: "no master". NOTE(review): some cluster blocks may be permanent (e.g. write blocks) - confirm retrying them is safe
+ || failure instanceof ElasticsearchTimeoutException) // ElasticsearchTimeoutException sounded good, not seen in stress tests yet
+ )
+ {
+ LOG.debug("Retry bulk on throwable: {}", failure.getMessage());
+ reAddBulkRequest(request); // NOTE(review): re-enters bulkProcessor.add() from this listener with no backoff
+ } else {
+ LOG.error("Failed Elasticsearch bulk request: {}", failure.getMessage(), failure);
+ failureThrowable.compareAndSet(null, failure);
+ }
}
}
);
@@ -228,6 +267,21 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
checkErrorAndRethrow();
}
+ /**
+ * Re-adds every {@link ActionRequest} sub-request of the given bulk to the BulkProcessor, regardless of its individual outcome. Used when trying again.
+ * @param bulkRequest the bulk to re-submit; sub-requests that are not {@link ActionRequest ActionRequests} are silently skipped
+ */
+ public void reAddBulkRequest(BulkRequest bulkRequest) {
+ //TODO Check what happens when bulk contains a DeleteAction and IndexActions and the DeleteAction fails because the document already has been deleted. This may not happen in typical Flink jobs.
+
+ for (IndicesRequest req : bulkRequest.subRequests()) {
+ if (req instanceof ActionRequest) {
+ // There is no waiting time between index requests, so this may produce additional pressure on cluster (no backoff is applied)
+ bulkProcessor.add((ActionRequest<?>) req);
+ }
+ }
+ }
+
private void checkErrorAndRethrow() {
Throwable cause = failureThrowable.get();
if (cause != null) {