You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/02/24 15:05:24 UTC

[3/3] flink git commit: [FLINK-5122] [elasticsearch] Retry temporary Elasticsearch request errors.

[FLINK-5122] [elasticsearch] Retry temporary Elasticsearch request errors.

Covered exceptions are: Timeouts, No Master, UnavailableShardsException, bulk queue on node full


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/aaac7c2e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/aaac7c2e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/aaac7c2e

Branch: refs/heads/master
Commit: aaac7c2e505717dbfc40465cb3656652e1bf5658
Parents: 0ba08b4
Author: Max Kuklinski <ma...@live.de>
Authored: Wed Nov 23 17:54:11 2016 +0100
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Feb 24 22:58:40 2017 +0800

----------------------------------------------------------------------
 docs/dev/connectors/elasticsearch.md            |  5 ++
 .../elasticsearch/ElasticsearchSinkBase.java    | 62 ++++++++++++++++++--
 2 files changed, 63 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/aaac7c2e/docs/dev/connectors/elasticsearch.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/elasticsearch.md b/docs/dev/connectors/elasticsearch.md
index 0f2d025..4388b9a 100644
--- a/docs/dev/connectors/elasticsearch.md
+++ b/docs/dev/connectors/elasticsearch.md
@@ -272,6 +272,11 @@ input.addSink(new ElasticsearchSink(config, new ElasticsearchSinkFunction[String
 The difference is that now we do not need to provide a list of addresses
 of Elasticsearch nodes.
 
+Optionally, the sink can re-execute a failed bulk request when the error
+message matches certain patterns indicating a timeout or an overloaded cluster.
+This behaviour is disabled by default and can be enabled by setting the sink's protected `checkErrorAndRetryBulk` field to `true`; note that retries are immediate and unbounded.
+
+
 More information about Elasticsearch can be found [here](https://elastic.co).
 
 #### Packaging the Elasticsearch Connector into an Uber-Jar

http://git-wip-us.apache.org/repos/asf/flink/blob/aaac7c2e/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
index 6a2d65f..7977fc0 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
@@ -21,12 +21,15 @@ import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.util.InstantiationUtil;
+import org.elasticsearch.ElasticsearchTimeoutException;
 import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.IndicesRequest;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkProcessor;
 import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.client.Client;
+import org.elasticsearch.cluster.block.ClusterBlockException;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
@@ -85,6 +88,13 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
 	/** Provided to the user via the {@link ElasticsearchSinkFunction} to add {@link ActionRequest ActionRequests}. */
 	private transient BulkProcessorIndexer requestIndexer;
 
+	/**
+	 * When set to {@code true}, failed requests are re-added to the {@link BulkProcessor} if the failure looks
+	 * transient: a <i>timeout</i>, an <i>UnavailableShardsException</i>, a full bulk queue on a node, or a cluster
+	 * block such as <i>no master</i>. Retries are immediate (no backoff) and unbounded. Disabled by default.
+	 */
+	protected boolean checkErrorAndRetryBulk = false;
+
 	// ------------------------------------------------------------------------
 	//  Internals for the Flink Elasticsearch Sink
 	// ------------------------------------------------------------------------
@@ -165,20 +175,49 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
 				@Override
 				public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
 					if (response.hasFailures()) {
+						boolean allRequestsRepeatable = true; // items are retried only if every failed item is retryable
+						java.util.List<Integer> retryItemIds = new java.util.ArrayList<>(); // positions of retryable failed items in the bulk
 						for (BulkItemResponse itemResp : response.getItems()) {
 							Throwable failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResp);
 							if (failure != null) {
-								LOG.error("Failed Elasticsearch item request: {}", failure.getMessage(), failure);
-								failureThrowable.compareAndSet(null, failure);
+								String failureMessageLowercase = String.valueOf(itemResp.getFailureMessage()).toLowerCase(java.util.Locale.ROOT);
+								if (checkErrorAndRetryBulk && (
+									failureMessageLowercase.contains("timeout") ||
+										failureMessageLowercase.contains("timed out") || // Generic timeout errors
+										failureMessageLowercase.contains("unavailableshardsexception") || // Shard not available due to rebalancing or node down
+										(failureMessageLowercase.contains("data/write/bulk") && failureMessageLowercase.contains("rejected execution")))) // Bulk queue on node full (rejected execution)
+								{
+									LOG.debug("Retry bulk: {}", itemResp.getFailureMessage());
+									retryItemIds.add(itemResp.getItemId());
+								} else {
+									// Cannot retry this item: remember the failure so the sink fails
+									allRequestsRepeatable = false;
+									LOG.error("Failed Elasticsearch item request: {}", failure.getMessage(), failure);
+									failureThrowable.compareAndSet(null, failure);
+								}
 							}
 						}
+						if (allRequestsRepeatable) { // re-send only the failed items; re-sending the whole bulk would duplicate the successful ones
+							for (int itemId : retryItemIds) {
+								bulkProcessor.add(request.requests().get(itemId));
+							}
+						}
 					}
 				}
 
 				@Override
 				public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
-					LOG.error("Failed Elasticsearch bulk request: {}", failure.getMessage(), failure);
-					failureThrowable.compareAndSet(null, failure);
+					// Whole-bulk failure: retry only transient errors, because retries are immediate and unbounded.
+					boolean retryable = checkErrorAndRetryBulk && (
+						(failure instanceof ClusterBlockException && ((ClusterBlockException) failure).retryable()) // e.g. "no master"
+							|| failure instanceof ElasticsearchTimeoutException);
+					if (retryable) {
+						LOG.debug("Retry bulk on throwable: {}", failure.getMessage());
+						reAddBulkRequest(request);
+					} else {
+						LOG.error("Failed Elasticsearch bulk request: {}", failure.getMessage(), failure);
+						failureThrowable.compareAndSet(null, failure);
+					}
 				}
 			}
 		);
@@ -228,6 +267,21 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
 		checkErrorAndRethrow();
 	}
 
+	/**
+	 * Re-queues all requests of the given bulk in the {@link BulkProcessor}; used to retry a failed bulk.
+	 * Requests are re-added immediately (no backoff), which adds pressure to an already struggling cluster.
+	 * @param bulkRequest the failed bulk whose sub-requests are sent again
+	 */
+	public void reAddBulkRequest(BulkRequest bulkRequest) {
+		// NOTE(review): re-sending a request that was already applied (e.g. a delete) may fail on the retry.
+		for (IndicesRequest req : bulkRequest.subRequests()) {
+			// The sub-requests of a bulk are expected to be ActionRequests (index/delete/update).
+			if (req instanceof ActionRequest) {
+				bulkProcessor.add((ActionRequest<?>) req);
+			}
+		}
+	}
+
 	private void checkErrorAndRethrow() {
 		Throwable cause = failureThrowable.get();
 		if (cause != null) {