You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/02/24 15:05:24 UTC
[3/3] flink git commit: [FLINK-5122] [elasticsearch] Retry temporary
Elasticsearch request errors.
[FLINK-5122] [elasticsearch] Retry temporary Elasticsearch request errors.
Covered exceptions are: Timeouts, No Master, UnavailableShardsException, bulk queue on node full
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/aaac7c2e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/aaac7c2e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/aaac7c2e
Branch: refs/heads/master
Commit: aaac7c2e505717dbfc40465cb3656652e1bf5658
Parents: 0ba08b4
Author: Max Kuklinski <ma...@live.de>
Authored: Wed Nov 23 17:54:11 2016 +0100
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Feb 24 22:58:40 2017 +0800
----------------------------------------------------------------------
docs/dev/connectors/elasticsearch.md | 5 ++
.../elasticsearch/ElasticsearchSinkBase.java | 62 ++++++++++++++++++--
2 files changed, 63 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/aaac7c2e/docs/dev/connectors/elasticsearch.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/elasticsearch.md b/docs/dev/connectors/elasticsearch.md
index 0f2d025..4388b9a 100644
--- a/docs/dev/connectors/elasticsearch.md
+++ b/docs/dev/connectors/elasticsearch.md
@@ -272,6 +272,11 @@ input.addSink(new ElasticsearchSink(config, new ElasticsearchSinkFunction[String
The difference is that now we do not need to provide a list of addresses
of Elasticsearch nodes.
+Optionally, the sink can try to re-execute the bulk request when the error
+message matches certain patterns indicating a timeout or an overloaded cluster.
+This behaviour is disabled by default and can be enabled by setting `checkErrorAndRetryBulk(true)`.
+
+
More information about Elasticsearch can be found [here](https://elastic.co).
#### Packaging the Elasticsearch Connector into an Uber-Jar
http://git-wip-us.apache.org/repos/asf/flink/blob/aaac7c2e/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
index 6a2d65f..7977fc0 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
@@ -21,12 +21,15 @@ import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.util.InstantiationUtil;
+import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Client;
+import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
@@ -85,6 +88,13 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
/** Provided to the user via the {@link ElasticsearchSinkFunction} to add {@link ActionRequest ActionRequests}. */
private transient BulkProcessorIndexer requestIndexer;
+ /**
+ * When set to <code>true</code> and the bulk action fails, the error message will be checked for
+ * common patterns like <i>timeout</i>, <i>UnavailableShardsException</i> or a full buffer queue on the node.
+ * When a matching pattern is found, the bulk will be retried.
+ */
+ protected boolean checkErrorAndRetryBulk = false;
+
// ------------------------------------------------------------------------
// Internals for the Flink Elasticsearch Sink
// ------------------------------------------------------------------------
@@ -165,20 +175,49 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
if (response.hasFailures()) {
+ boolean allRequestsRepeatable = true;
+
for (BulkItemResponse itemResp : response.getItems()) {
Throwable failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResp);
if (failure != null) {
- LOG.error("Failed Elasticsearch item request: {}", failure.getMessage(), failure);
- failureThrowable.compareAndSet(null, failure);
+ String failureMessageLowercase = itemResp.getFailureMessage().toLowerCase();
+
+ // Check if index request can be retried
+ if (checkErrorAndRetryBulk && (
+ failureMessageLowercase.contains("timeout") ||
+ failureMessageLowercase.contains("timed out") || // Generic timeout errors
+ failureMessageLowercase.contains("UnavailableShardsException".toLowerCase()) || // Shard not available due to rebalancing or node down
+ (failureMessageLowercase.contains("data/write/bulk") && failureMessageLowercase.contains("bulk")))) // Bulk index queue on node full
+ {
+ LOG.debug("Retry bulk: {}", itemResp.getFailureMessage());
+ } else {
+ // Cannot retry action
+ allRequestsRepeatable = false;
+ LOG.error("Failed Elasticsearch item request: {}", failure.getMessage(), failure);
+ failureThrowable.compareAndSet(null, failure);
+ }
}
}
+
+ if (allRequestsRepeatable) {
+ reAddBulkRequest(request);
+ }
}
}
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
- LOG.error("Failed Elasticsearch bulk request: {}", failure.getMessage(), failure);
- failureThrowable.compareAndSet(null, failure);
+ if (checkErrorAndRetryBulk && (
+ failure instanceof ClusterBlockException // Examples: "no master"
+ || failure instanceof ElasticsearchTimeoutException) // ElasticsearchTimeoutException sounded good, not seen in stress tests yet
+ )
+ {
+ LOG.debug("Retry bulk on throwable: {}", failure.getMessage());
+ reAddBulkRequest(request);
+ } else {
+ LOG.error("Failed Elasticsearch bulk request: {}", failure.getMessage(), failure);
+ failureThrowable.compareAndSet(null, failure);
+ }
}
}
);
@@ -228,6 +267,21 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
checkErrorAndRethrow();
}
+ /**
+ * Adds all requests of the bulk to the BulkProcessor. Used when trying again.
+ * @param bulkRequest
+ */
+ public void reAddBulkRequest(BulkRequest bulkRequest) {
+ //TODO Check what happens when bulk contains a DeleteAction and IndexActions and the DeleteAction fails because the document already has been deleted. This may not happen in typical Flink jobs.
+
+ for (IndicesRequest req : bulkRequest.subRequests()) {
+ if (req instanceof ActionRequest) {
+ // There is no waiting time between index requests, so this may produce additional pressure on cluster
+ bulkProcessor.add((ActionRequest<?>) req);
+ }
+ }
+ }
+
private void checkErrorAndRethrow() {
Throwable cause = failureThrowable.get();
if (cause != null) {