You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2022/09/26 08:35:28 UTC

[incubator-seatunnel] branch dev updated: [Improve][Connector-V2]Kudu Sink Connector Support to upsert row

This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 1ece805ab [Improve][Connector-V2]Kudu Sink Connector Support to upsert row
1ece805ab is described below

commit 1ece805ab1fce3ec8e8c9d3ddc5311ab62ac8873
Author: Xiao Zhao <zh...@163.com>
AuthorDate: Mon Sep 26 16:35:21 2022 +0800

    [Improve][Connector-V2]Kudu Sink Connector Support to upsert row
---
 .../kudu/kuduclient/KuduOutputFormat.java          | 66 ++++++++++++++++------
 1 file changed, 48 insertions(+), 18 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduOutputFormat.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduOutputFormat.java
index f73b191ca..909940f49 100644
--- a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduOutputFormat.java
+++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduOutputFormat.java
@@ -29,6 +29,7 @@ import org.apache.kudu.client.KuduSession;
 import org.apache.kudu.client.KuduTable;
 import org.apache.kudu.client.PartialRow;
 import org.apache.kudu.client.SessionConfiguration;
+import org.apache.kudu.client.Upsert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,26 +46,25 @@ public class KuduOutputFormat
 
     private static final Logger LOGGER = LoggerFactory.getLogger(KuduOutputFormat.class);
 
-    private String kuduMaster;
-    private String kuduTableName;
+    public static final long TIMEOUTMS = 18000;
+    public static final long SESSIONTIMEOUTMS = 100000;
+
+    private final String kuduMaster;
+    private final String kuduTableName;
+    private final KuduSinkConfig.SaveMode saveMode;
     private KuduClient kuduClient;
     private KuduSession kuduSession;
     private KuduTable kuduTable;
-    public static final long TIMEOUTMS = 18000;
-    public static final long SESSIONTIMEOUTMS = 100000;
+
     public KuduOutputFormat(KuduSinkConfig kuduSinkConfig) {
         this.kuduMaster = kuduSinkConfig.getKuduMaster();
         this.kuduTableName = kuduSinkConfig.getKuduTableName();
+        this.saveMode = kuduSinkConfig.getSaveMode();
         init();
     }
 
-    public void write(SeaTunnelRow element) {
-
-        Insert insert = kuduTable.newInsert();
-        Schema schema = kuduTable.getSchema();
-
+    private void transform(PartialRow row, SeaTunnelRow element, Schema schema) {
         int columnCount = schema.getColumnCount();
-        PartialRow row = insert.getRow();
         for (int columnIndex = 0; columnIndex < columnCount; columnIndex++) {
             ColumnSchema col = schema.getColumnByIndex(columnIndex);
             try {
@@ -114,24 +114,54 @@ public class KuduOutputFormat
                         throw new IllegalArgumentException("Unsupported column type: " + col.getType());
                 }
             } catch (ClassCastException e) {
-                e.printStackTrace();
                 throw new IllegalArgumentException(
                         "Value type does not match column type " + col.getType() +
                                 " for column " + col.getName());
             }
 
         }
+    }
 
+    private void upsert(SeaTunnelRow element) {
+        Upsert upsert = kuduTable.newUpsert();
+        Schema schema = kuduTable.getSchema();
+        PartialRow row = upsert.getRow();
+        transform(row, element, schema);
+        try {
+            kuduSession.apply(upsert);
+        } catch (KuduException e) {
+            LOGGER.error("Failed to upsert.", e);
+            throw new RuntimeException("Failed to upsert.", e);
+        }
+    }
+
+    private void insert(SeaTunnelRow element) {
+        Insert insert = kuduTable.newInsert();
+        Schema schema = kuduTable.getSchema();
+        PartialRow row = insert.getRow();
+        transform(row, element, schema);
         try {
             kuduSession.apply(insert);
         } catch (KuduException e) {
-            LOGGER.warn("kudu session insert data fail.", e);
-            throw new RuntimeException("kudu session insert data fail.", e);
+            LOGGER.error("Failed to insert.", e);
+            throw new RuntimeException("Failed to insert.", e);
         }
+    }
 
+    public void write(SeaTunnelRow element) {
+        switch (saveMode) {
+            case APPEND:
+                insert(element);
+                break;
+            case OVERWRITE:
+                upsert(element);
+                break;
+            default:
+                throw new IllegalArgumentException(String.format("Unsupported saveMode: %s.", saveMode.name()));
+        }
     }
 
-    public void init() {
+    private void init() {
         KuduClient.KuduClientBuilder kuduClientBuilder = new
                 KuduClient.KuduClientBuilder(kuduMaster);
         kuduClientBuilder.defaultOperationTimeoutMs(TIMEOUTMS);
@@ -142,10 +172,10 @@ public class KuduOutputFormat
         try {
             kuduTable = kuduClient.openTable(kuduTableName);
         } catch (KuduException e) {
-            LOGGER.warn("Failed to initialize the Kudu client.", e);
+            LOGGER.error("Failed to initialize the Kudu client.", e);
             throw new RuntimeException("Failed to initialize the Kudu client.", e);
         }
-        LOGGER.info("The Kudu client is successfully initialized", kuduMaster, kuduClient);
+        LOGGER.info("The Kudu client for Master: {} is initialized successfully.", kuduMaster);
     }
 
     public void closeOutputFormat() {
@@ -153,8 +183,8 @@ public class KuduOutputFormat
             try {
                 kuduClient.close();
                 kuduSession.close();
-            } catch (KuduException e) {
-                LOGGER.warn("Kudu Client close failed.", e);
+            } catch (KuduException ignored) {
+                LOGGER.warn("Failed to close Kudu Client.", ignored);
             } finally {
                 kuduClient = null;
                 kuduSession = null;