You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/08/01 13:20:01 UTC

[jira] [Commented] (FLINK-7386) Flink Elasticsearch 5 connector is not compatible with Elasticsearch 5.2+ client

    [ https://issues.apache.org/jira/browse/FLINK-7386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16565299#comment-16565299 ] 

ASF GitHub Bot commented on FLINK-7386:
---------------------------------------

asfgit closed pull request #6043: [FLINK-7386] evolve RequestIndexer API to make it work with Elastic 5.3+, evolve ElasticsearchApiCallBridge API to make it compatible with a possible RestHighLevelClient implementation
URL: https://github.com/apache/flink/pull/6043
 
 
   

This is a PR merged from a forked repository (a foreign pull request).
Because GitHub hides the original diff of such a pull request once it is
merged, the diff is reproduced below for the sake of provenance:

diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/BulkProcessorIndexer.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/BulkProcessorIndexer.java
index 2ebb97c82e2..33b42cb47f1 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/BulkProcessorIndexer.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/BulkProcessorIndexer.java
@@ -22,6 +22,9 @@
 
 import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
 
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -45,12 +48,32 @@
 	}
 
 	@Override
-	public void add(ActionRequest... actionRequests) {
-		for (ActionRequest actionRequest : actionRequests) {
+	public void add(DeleteRequest... deleteRequests) {
+		for (DeleteRequest deleteRequest : deleteRequests) {
 			if (flushOnCheckpoint) {
 				numPendingRequestsRef.getAndIncrement();
 			}
-			this.bulkProcessor.add(actionRequest);
+			this.bulkProcessor.add(deleteRequest);
+		}
+	}
+
+	@Override
+	public void add(IndexRequest... indexRequests) {
+		for (IndexRequest indexRequest : indexRequests) {
+			if (flushOnCheckpoint) {
+				numPendingRequestsRef.getAndIncrement();
+			}
+			this.bulkProcessor.add(indexRequest);
+		}
+	}
+
+	@Override
+	public void add(UpdateRequest... updateRequests) {
+		for (UpdateRequest updateRequest : updateRequests) {
+			if (flushOnCheckpoint) {
+				numPendingRequestsRef.getAndIncrement();
+			}
+			this.bulkProcessor.add(updateRequest);
 		}
 	}
 }
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
index 2a7a21659e4..1c501bf4a20 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
@@ -22,7 +22,6 @@
 
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkProcessor;
-import org.elasticsearch.client.Client;
 
 import javax.annotation.Nullable;
 
@@ -39,15 +38,18 @@
  * exactly one instance of the call bridge, and state cleanup is performed when the sink is closed.
  */
 @Internal
-public interface ElasticsearchApiCallBridge extends Serializable {
+public abstract class ElasticsearchApiCallBridge implements Serializable {
 
 	/**
-	 * Creates an Elasticsearch {@link Client}.
+	 * Creates an Elasticsearch client implementing {@link AutoCloseable}. This can
+	 * be a {@link org.elasticsearch.client.Client} or {@link org.elasticsearch.client.RestHighLevelClient}
 	 *
 	 * @param clientConfig The configuration to use when constructing the client.
 	 * @return The created client.
 	 */
-	Client createClient(Map<String, String> clientConfig);
+	public abstract AutoCloseable createClient(Map<String, String> clientConfig);
+
+	public abstract BulkProcessor.Builder createBulkProcessorBuilder(AutoCloseable client, BulkProcessor.Listener listener);
 
 	/**
 	 * Extracts the cause of failure of a bulk item action.
@@ -55,7 +57,7 @@
 	 * @param bulkItemResponse the bulk item response to extract cause of failure
 	 * @return the extracted {@link Throwable} from the response ({@code null} is the response is successful).
 	 */
-	@Nullable Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse);
+	public abstract @Nullable Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse);
 
 	/**
 	 * Set backoff-related configurations on the provided {@link BulkProcessor.Builder}.
@@ -64,13 +66,15 @@
 	 * @param builder the {@link BulkProcessor.Builder} to configure.
 	 * @param flushBackoffPolicy user-provided backoff retry settings ({@code null} if the user disabled backoff retries).
 	 */
-	void configureBulkProcessorBackoff(
+	public abstract void configureBulkProcessorBackoff(
 		BulkProcessor.Builder builder,
 		@Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy flushBackoffPolicy);
 
 	/**
 	 * Perform any necessary state cleanup.
 	 */
-	void cleanup();
+	public void cleanup() {
+		// nothing to cleanup by default
+	}
 
 }
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
index 9105d9947f2..0305ee3d867 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
@@ -176,7 +176,7 @@ public void setDelayMillis(long delayMillis) {
 	private AtomicLong numPendingRequests = new AtomicLong(0);
 
 	/** Elasticsearch client created using the call bridge. */
-	private transient Client client;
+	private transient AutoCloseable client;
 
 	/** Bulk processor to buffer and send requests to Elasticsearch, created using the client. */
 	private transient BulkProcessor bulkProcessor;
@@ -341,7 +341,7 @@ public void close() throws Exception {
 	protected BulkProcessor buildBulkProcessor(BulkProcessor.Listener listener) {
 		checkNotNull(listener);
 
-		BulkProcessor.Builder bulkProcessorBuilder = BulkProcessor.builder(client, listener);
+		BulkProcessor.Builder bulkProcessorBuilder = callBridge.createBulkProcessorBuilder(client, listener);
 
 		// This makes flush() blocking
 		bulkProcessorBuilder.setConcurrentRequests(0);
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/RequestIndexer.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/RequestIndexer.java
index 2a1b29736b6..3dc8f879641 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/RequestIndexer.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/RequestIndexer.java
@@ -21,9 +21,12 @@
 import org.apache.flink.annotation.PublicEvolving;
 
 import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
 
 /**
- * Users add multiple {@link ActionRequest ActionRequests} to a {@link RequestIndexer} to prepare
+ * Users add multiple delete, index or update requests to a {@link RequestIndexer} to prepare
  * them for sending to an Elasticsearch cluster.
  */
 @PublicEvolving
@@ -33,6 +36,41 @@
 	 * Add multiple {@link ActionRequest} to the indexer to prepare for sending requests to Elasticsearch.
 	 *
 	 * @param actionRequests The multiple {@link ActionRequest} to add.
+	 * @deprecated use the {@link DeleteRequest}, {@link IndexRequest} or {@link UpdateRequest}
 	 */
-	void add(ActionRequest... actionRequests);
+	@Deprecated
+	default void add(ActionRequest... actionRequests) {
+		for (ActionRequest actionRequest : actionRequests) {
+			if (actionRequest instanceof IndexRequest) {
+				add((IndexRequest) actionRequest);
+			} else if (actionRequest instanceof DeleteRequest) {
+				add((DeleteRequest) actionRequest);
+			} else if (actionRequest instanceof UpdateRequest) {
+				add((UpdateRequest) actionRequest);
+			} else {
+				throw new IllegalArgumentException("RequestIndexer only supports Index, Delete and Update requests");
+			}
+		}
+	}
+
+	/**
+	 * Add multiple {@link DeleteRequest} to the indexer to prepare for sending requests to Elasticsearch.
+	 *
+	 * @param deleteRequests The multiple {@link DeleteRequest} to add.
+	 */
+	void add(DeleteRequest... deleteRequests);
+
+	/**
+	 * Add multiple {@link IndexRequest} to the indexer to prepare for sending requests to Elasticsearch.
+	 *
+	 * @param indexRequests The multiple {@link IndexRequest} to add.
+	 */
+	void add(IndexRequest... indexRequests);
+
+	/**
+	 * Add multiple {@link UpdateRequest} to the indexer to prepare for sending requests to Elasticsearch.
+	 *
+	 * @param updateRequests The multiple {@link UpdateRequest} to add.
+	 */
+	void add(UpdateRequest... updateRequests);
 }
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
index 09d8806b963..5a161a747c1 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
@@ -31,6 +31,7 @@
 import org.elasticsearch.action.bulk.BulkProcessor;
 import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.client.Requests;
 import org.junit.Assert;
@@ -92,7 +93,7 @@ public void testItemFailureRethrownOnInvoke() throws Throwable {
 		// setup the next bulk request, and its mock item failures
 		sink.setMockItemFailuresListForNextBulkItemResponses(Collections.singletonList(new Exception("artificial failure for record")));
 		testHarness.processElement(new StreamRecord<>("msg"));
-		verify(sink.getMockBulkProcessor(), times(1)).add(any(ActionRequest.class));
+		verify(sink.getMockBulkProcessor(), times(1)).add(any(IndexRequest.class));
 
 		// manually execute the next bulk request
 		sink.manualBulkRequestWithAllPendingRequests();
@@ -124,7 +125,7 @@ public void testItemFailureRethrownOnCheckpoint() throws Throwable {
 		// setup the next bulk request, and its mock item failures
 		sink.setMockItemFailuresListForNextBulkItemResponses(Collections.singletonList(new Exception("artificial failure for record")));
 		testHarness.processElement(new StreamRecord<>("msg"));
-		verify(sink.getMockBulkProcessor(), times(1)).add(any(ActionRequest.class));
+		verify(sink.getMockBulkProcessor(), times(1)).add(any(IndexRequest.class));
 
 		// manually execute the next bulk request
 		sink.manualBulkRequestWithAllPendingRequests();
@@ -164,7 +165,7 @@ public void testItemFailureRethrownOnCheckpointAfterFlush() throws Throwable {
 		sink.setMockItemFailuresListForNextBulkItemResponses(mockResponsesList);
 
 		testHarness.processElement(new StreamRecord<>("msg-1"));
-		verify(sink.getMockBulkProcessor(), times(1)).add(any(ActionRequest.class));
+		verify(sink.getMockBulkProcessor(), times(1)).add(any(IndexRequest.class));
 
 		// manually execute the next bulk request (1 request only, thus should succeed)
 		sink.manualBulkRequestWithAllPendingRequests();
@@ -172,7 +173,7 @@ public void testItemFailureRethrownOnCheckpointAfterFlush() throws Throwable {
 		// setup the requests to be flushed in the snapshot
 		testHarness.processElement(new StreamRecord<>("msg-2"));
 		testHarness.processElement(new StreamRecord<>("msg-3"));
-		verify(sink.getMockBulkProcessor(), times(3)).add(any(ActionRequest.class));
+		verify(sink.getMockBulkProcessor(), times(3)).add(any(IndexRequest.class));
 
 		CheckedThread snapshotThread = new CheckedThread() {
 			@Override
@@ -217,7 +218,7 @@ public void testBulkFailureRethrownOnInvoke() throws Throwable {
 		// setup the next bulk request, and let the whole bulk request fail
 		sink.setFailNextBulkRequestCompletely(new Exception("artificial failure for bulk request"));
 		testHarness.processElement(new StreamRecord<>("msg"));
-		verify(sink.getMockBulkProcessor(), times(1)).add(any(ActionRequest.class));
+		verify(sink.getMockBulkProcessor(), times(1)).add(any(IndexRequest.class));
 
 		// manually execute the next bulk request
 		sink.manualBulkRequestWithAllPendingRequests();
@@ -249,7 +250,7 @@ public void testBulkFailureRethrownOnCheckpoint() throws Throwable {
 		// setup the next bulk request, and let the whole bulk request fail
 		sink.setFailNextBulkRequestCompletely(new Exception("artificial failure for bulk request"));
 		testHarness.processElement(new StreamRecord<>("msg"));
-		verify(sink.getMockBulkProcessor(), times(1)).add(any(ActionRequest.class));
+		verify(sink.getMockBulkProcessor(), times(1)).add(any(IndexRequest.class));
 
 		// manually execute the next bulk request
 		sink.manualBulkRequestWithAllPendingRequests();
@@ -284,7 +285,7 @@ public void testBulkFailureRethrownOnOnCheckpointAfterFlush() throws Throwable {
 		// setup the next bulk request, and let bulk request succeed
 		sink.setMockItemFailuresListForNextBulkItemResponses(Collections.singletonList((Exception) null));
 		testHarness.processElement(new StreamRecord<>("msg-1"));
-		verify(sink.getMockBulkProcessor(), times(1)).add(any(ActionRequest.class));
+		verify(sink.getMockBulkProcessor(), times(1)).add(any(IndexRequest.class));
 
 		// manually execute the next bulk request
 		sink.manualBulkRequestWithAllPendingRequests();
@@ -292,7 +293,7 @@ public void testBulkFailureRethrownOnOnCheckpointAfterFlush() throws Throwable {
 		// setup the requests to be flushed in the snapshot
 		testHarness.processElement(new StreamRecord<>("msg-2"));
 		testHarness.processElement(new StreamRecord<>("msg-3"));
-		verify(sink.getMockBulkProcessor(), times(3)).add(any(ActionRequest.class));
+		verify(sink.getMockBulkProcessor(), times(3)).add(any(IndexRequest.class));
 
 		CheckedThread snapshotThread = new CheckedThread() {
 			@Override
@@ -346,7 +347,7 @@ public void testAtLeastOnceSink() throws Throwable {
 		// it contains 1 request, which will fail and re-added to the next bulk request
 		sink.setMockItemFailuresListForNextBulkItemResponses(Collections.singletonList(new Exception("artificial failure for record")));
 		testHarness.processElement(new StreamRecord<>("msg"));
-		verify(sink.getMockBulkProcessor(), times(1)).add(any(ActionRequest.class));
+		verify(sink.getMockBulkProcessor(), times(1)).add(any(IndexRequest.class));
 
 		CheckedThread snapshotThread = new CheckedThread() {
 			@Override
@@ -402,7 +403,7 @@ public void testDoesNotWaitForPendingRequestsIfFlushingDisabled() throws Excepti
 		// setup the next bulk request, and let bulk request succeed
 		sink.setMockItemFailuresListForNextBulkItemResponses(Collections.singletonList(new Exception("artificial failure for record")));
 		testHarness.processElement(new StreamRecord<>("msg-1"));
-		verify(sink.getMockBulkProcessor(), times(1)).add(any(ActionRequest.class));
+		verify(sink.getMockBulkProcessor(), times(1)).add(any(IndexRequest.class));
 
 		// the snapshot should not block even though we haven't flushed the bulk request
 		testHarness.snapshot(1L, 1000L);
@@ -478,11 +479,11 @@ public BulkProcessor getMockBulkProcessor() {
 		protected BulkProcessor buildBulkProcessor(final BulkProcessor.Listener listener) {
 			this.mockBulkProcessor = mock(BulkProcessor.class);
 
-			when(mockBulkProcessor.add(any(ActionRequest.class))).thenAnswer(new Answer<Object>() {
+			when(mockBulkProcessor.add(any(IndexRequest.class))).thenAnswer(new Answer<Object>() {
 				@Override
 				public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
 					// intercept the request and add it to our mock bulk request
-					nextBulkRequest.add(invocationOnMock.getArgumentAt(0, ActionRequest.class));
+					nextBulkRequest.add(invocationOnMock.getArgumentAt(0, IndexRequest.class));
 
 					return null;
 				}
@@ -530,12 +531,12 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
 		}
 	}
 
-	private static class DummyElasticsearchApiCallBridge implements ElasticsearchApiCallBridge {
+	private static class DummyElasticsearchApiCallBridge extends ElasticsearchApiCallBridge {
 
 		private static final long serialVersionUID = -4272760730959041699L;
 
 		@Override
-		public Client createClient(Map<String, String> clientConfig) {
+		public AutoCloseable createClient(Map<String, String> clientConfig) {
 			return mock(Client.class);
 		}
 
@@ -550,13 +551,13 @@ public Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkIt
 		}
 
 		@Override
-		public void configureBulkProcessorBackoff(BulkProcessor.Builder builder, @Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy flushBackoffPolicy) {
-			// no need for this in the test cases here
+		public BulkProcessor.Builder createBulkProcessorBuilder(AutoCloseable client, BulkProcessor.Listener listener) {
+			return null;
 		}
 
 		@Override
-		public void cleanup() {
-			// nothing to cleanup
+		public void configureBulkProcessorBackoff(BulkProcessor.Builder builder, @Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy flushBackoffPolicy) {
+			// no need for this in the test cases here
 		}
 	}
 
diff --git a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java
index 2a3c2a06460..6f492064a02 100644
--- a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java
+++ b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java
@@ -42,7 +42,7 @@
  * Implementation of {@link ElasticsearchApiCallBridge} for Elasticsearch 1.x.
  */
 @Internal
-public class Elasticsearch1ApiCallBridge implements ElasticsearchApiCallBridge {
+public class Elasticsearch1ApiCallBridge extends ElasticsearchApiCallBridge {
 
 	private static final long serialVersionUID = -2632363720584123682L;
 
@@ -70,7 +70,7 @@
 	}
 
 	@Override
-	public Client createClient(Map<String, String> clientConfig) {
+	public AutoCloseable createClient(Map<String, String> clientConfig) {
 		if (transportAddresses == null) {
 
 			// Make sure that we disable http access to our embedded node
@@ -115,6 +115,11 @@ public Client createClient(Map<String, String> clientConfig) {
 		}
 	}
 
+	@Override
+	public BulkProcessor.Builder createBulkProcessorBuilder(AutoCloseable client, BulkProcessor.Listener listener) {
+		return BulkProcessor.builder((Client) client, listener);
+	}
+
 	@Override
 	public Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse) {
 		if (!bulkItemResponse.isFailed()) {
diff --git a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java
index 390a4078e2b..80c1b3acf22 100644
--- a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java
+++ b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java
@@ -44,7 +44,7 @@
  * Implementation of {@link ElasticsearchApiCallBridge} for Elasticsearch 2.x.
  */
 @Internal
-public class Elasticsearch2ApiCallBridge implements ElasticsearchApiCallBridge {
+public class Elasticsearch2ApiCallBridge extends ElasticsearchApiCallBridge {
 
 	private static final long serialVersionUID = 2638252694744361079L;
 
@@ -63,7 +63,7 @@
 	}
 
 	@Override
-	public Client createClient(Map<String, String> clientConfig) {
+	public AutoCloseable createClient(Map<String, String> clientConfig) {
 		Settings settings = Settings.settingsBuilder().put(clientConfig).build();
 
 		TransportClient transportClient = TransportClient.builder().settings(settings).build();
@@ -83,6 +83,11 @@ public Client createClient(Map<String, String> clientConfig) {
 		return transportClient;
 	}
 
+	@Override
+	public BulkProcessor.Builder createBulkProcessorBuilder(AutoCloseable client, BulkProcessor.Listener listener) {
+		return BulkProcessor.builder((Client) client, listener);
+	}
+
 	@Override
 	public Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse) {
 		if (!bulkItemResponse.isFailed()) {
@@ -117,10 +122,4 @@ public void configureBulkProcessorBackoff(
 
 		builder.setBackoffPolicy(backoffPolicy);
 	}
-
-	@Override
-	public void cleanup() {
-		// nothing to cleanup
-	}
-
 }
diff --git a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java
index 7c4ba7a97f1..1e73feb9e43 100644
--- a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java
+++ b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java
@@ -47,7 +47,7 @@
  * Implementation of {@link ElasticsearchApiCallBridge} for Elasticsearch 5.x.
  */
 @Internal
-public class Elasticsearch5ApiCallBridge implements ElasticsearchApiCallBridge {
+public class Elasticsearch5ApiCallBridge extends ElasticsearchApiCallBridge {
 
 	private static final long serialVersionUID = -5222683870097809633L;
 
@@ -66,7 +66,7 @@
 	}
 
 	@Override
-	public Client createClient(Map<String, String> clientConfig) {
+	public AutoCloseable createClient(Map<String, String> clientConfig) {
 		Settings settings = Settings.builder().put(clientConfig)
 			.put(NetworkModule.HTTP_TYPE_KEY, Netty3Plugin.NETTY_HTTP_TRANSPORT_NAME)
 			.put(NetworkModule.TRANSPORT_TYPE_KEY, Netty3Plugin.NETTY_TRANSPORT_NAME)
@@ -89,6 +89,11 @@ public Client createClient(Map<String, String> clientConfig) {
 		return transportClient;
 	}
 
+	@Override
+	public BulkProcessor.Builder createBulkProcessorBuilder(AutoCloseable client, BulkProcessor.Listener listener) {
+		return BulkProcessor.builder((Client) client, listener);
+	}
+
 	@Override
 	public Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse) {
 		if (!bulkItemResponse.isFailed()) {
@@ -123,10 +128,4 @@ public void configureBulkProcessorBackoff(
 
 		builder.setBackoffPolicy(backoffPolicy);
 	}
-
-	@Override
-	public void cleanup() {
-		// nothing to cleanup
-	}
-
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Flink Elasticsearch 5 connector is not compatible with Elasticsearch 5.2+ client
> --------------------------------------------------------------------------------
>
>                 Key: FLINK-7386
>                 URL: https://issues.apache.org/jira/browse/FLINK-7386
>             Project: Flink
>          Issue Type: Improvement
>          Components: ElasticSearch Connector
>            Reporter: Dawid Wysakowicz
>            Assignee: Fang Yong
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.7.0
>
>
> In the Elasticsearch 5.2.0 client, the class {{BulkProcessor}} was refactored and no longer has the method {{add(ActionRequest)}}.
> For more info see: https://github.com/elastic/elasticsearch/pull/20109



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)