You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2020/06/04 07:15:19 UTC

[flink] branch release-1.11 updated: [FLINK-18006] Always overwrite RestClientFactory in ElasticsearchXDynamicSink

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 10f1d37  [FLINK-18006] Always overwrite RestClientFactory in ElasticsearchXDynamicSink
10f1d37 is described below

commit 10f1d37841a0b8f3806678df2bab85e71842b06a
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Tue Jun 2 17:09:26 2020 +0200

    [FLINK-18006] Always overwrite RestClientFactory in ElasticsearchXDynamicSink
    
    We always overwrite the RestClientFactory in order to workaround an
    issue with shading classes in lambdas deserialization method. That way
    we never use the default lambda from ElasticsearchSink$Builder which
    cannot be deserialized when used from a
    flink-sql-connector-elasticsearch module due to shading.
    
    This closes #12455
---
 .../connectors/elasticsearch/table/Elasticsearch6DynamicSink.java    | 5 +++--
 .../connectors/elasticsearch/table/Elasticsearch7DynamicSink.java    | 5 +++--
 2 files changed, 6 insertions(+), 4 deletions(-)

diff --git a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java
index bedfbef..680cb2c 100644
--- a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java
+++ b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java
@@ -136,8 +136,9 @@ final class Elasticsearch6DynamicSink implements DynamicTableSink {
 			config.getBulkFlushBackoffRetries().ifPresent(builder::setBulkFlushBackoffRetries);
 			config.getBulkFlushBackoffDelay().ifPresent(builder::setBulkFlushBackoffDelay);
 
-			config.getPathPrefix()
-				.ifPresent(pathPrefix -> builder.setRestClientFactory(new DefaultRestClientFactory(pathPrefix)));
+			// we must overwrite the default factory which is defined with a lambda because of a bug
+			// in shading lambda serialization shading see FLINK-18006
+			builder.setRestClientFactory(new DefaultRestClientFactory(config.getPathPrefix().orElse(null)));
 
 			final ElasticsearchSink<RowData> sink = builder.build();
 
diff --git a/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSink.java b/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSink.java
index 408673e..7aa52ea 100644
--- a/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSink.java
+++ b/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSink.java
@@ -136,8 +136,9 @@ final class Elasticsearch7DynamicSink implements DynamicTableSink {
 			config.getBulkFlushBackoffRetries().ifPresent(builder::setBulkFlushBackoffRetries);
 			config.getBulkFlushBackoffDelay().ifPresent(builder::setBulkFlushBackoffDelay);
 
-			config.getPathPrefix()
-				.ifPresent(pathPrefix -> builder.setRestClientFactory(new DefaultRestClientFactory(pathPrefix)));
+			// we must overwrite the default factory which is defined with a lambda because of a bug
+			// in shading lambda serialization shading see FLINK-18006
+			builder.setRestClientFactory(new DefaultRestClientFactory(config.getPathPrefix().orElse(null)));
 
 			final ElasticsearchSink<RowData> sink = builder.build();