You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/03/06 11:04:42 UTC

[inlong] branch master updated: [INLONG-7487][Sort] Change changelog mode to capture update_before for ES (#7510)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 2fc000b00 [INLONG-7487][Sort] Change changelog mode to capture update_before for ES (#7510)
2fc000b00 is described below

commit 2fc000b00f3a14394d3913d8f1fa63b7524ee182
Author: Yizhou Yang <32...@users.noreply.github.com>
AuthorDate: Mon Mar 6 19:04:36 2023 +0800

    [INLONG-7487][Sort] Change changelog mode to capture update_before for ES (#7510)
    
    Co-authored-by: Yizhou Yang <yi...@tencent.com>
---
 .../sort/elasticsearch7/table/Elasticsearch7DynamicSink.java     | 9 +--------
 .../sort/elasticsearch/table/RowElasticsearchSinkFunction.java   | 1 +
 2 files changed, 2 insertions(+), 8 deletions(-)

diff --git a/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSink.java b/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSink.java
index e2fde4124..982a74b84 100644
--- a/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSink.java
+++ b/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSink.java
@@ -27,7 +27,6 @@ import org.apache.flink.table.connector.format.EncodingFormat;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.sink.SinkFunctionProvider;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.types.RowKind;
 import org.apache.flink.util.StringUtils;
 import org.apache.http.HttpHost;
 import org.apache.http.auth.AuthScope;
@@ -110,13 +109,7 @@ final class Elasticsearch7DynamicSink implements DynamicTableSink {
 
     @Override
     public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
-        ChangelogMode.Builder builder = ChangelogMode.newBuilder();
-        for (RowKind kind : requestedMode.getContainedKinds()) {
-            if (kind != RowKind.UPDATE_BEFORE) {
-                builder.addContainedKind(kind);
-            }
-        }
-        return builder.build();
+        return ChangelogMode.all();
     }
 
     // --------------------------------------------------------------
diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
index dcd7b7892..89fa46f0c 100644
--- a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
+++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
@@ -164,6 +164,7 @@ public class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<R
                 }
                 break;
             case UPDATE_BEFORE:
+                sendMetrics(document);
                 break;
             default:
                 LOGGER.error(String.format("The type of element should be 'RowData' only, raw data: %s", element));