You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2020/06/24 09:39:05 UTC

[flink] branch release-1.11 updated: [FLINK-14938] Use ConcurrentLinkedQueue in BufferingNoOpRequestIndexer

This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new bb1f162  [FLINK-14938] Use ConcurrentLinkedQueue in BufferingNoOpRequestIndexer
bb1f162 is described below

commit bb1f162d8e7cf3d24c01679492e53b3a041cdde9
Author: yushengnan <ys...@hotmail.com>
AuthorDate: Fri Jun 19 21:45:52 2020 +0800

    [FLINK-14938] Use ConcurrentLinkedQueue in BufferingNoOpRequestIndexer
    
    This solves the problem of concurrent modification when re-adding ES
    index requests from a failure handler.
---
 .../connectors/elasticsearch/BufferingNoOpRequestIndexer.java      | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/BufferingNoOpRequestIndexer.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/BufferingNoOpRequestIndexer.java
index e639b82..07341da 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/BufferingNoOpRequestIndexer.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/BufferingNoOpRequestIndexer.java
@@ -27,9 +27,8 @@ import org.elasticsearch.action.update.UpdateRequest;
 
 import javax.annotation.concurrent.NotThreadSafe;
 
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
 
 /**
  * Implementation of a {@link RequestIndexer} that buffers {@link ActionRequest ActionRequests}
@@ -39,10 +38,10 @@ import java.util.List;
 @NotThreadSafe
 class BufferingNoOpRequestIndexer implements RequestIndexer {
 
-	private List<ActionRequest> bufferedRequests;
+	private ConcurrentLinkedQueue<ActionRequest> bufferedRequests;
 
 	BufferingNoOpRequestIndexer() {
-		this.bufferedRequests = new ArrayList<>(10);
+		this.bufferedRequests = new ConcurrentLinkedQueue<ActionRequest>();
 	}
 
 	@Override