You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2022/09/26 08:35:28 UTC
[incubator-seatunnel] branch dev updated: [Improve][Connector-V2]Kudu Sink Connector Support to upsert row
This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 1ece805ab [Improve][Connector-V2]Kudu Sink Connector Support to upsert row
1ece805ab is described below
commit 1ece805ab1fce3ec8e8c9d3ddc5311ab62ac8873
Author: Xiao Zhao <zh...@163.com>
AuthorDate: Mon Sep 26 16:35:21 2022 +0800
[Improve][Connector-V2]Kudu Sink Connector Support to upsert row
---
.../kudu/kuduclient/KuduOutputFormat.java | 66 ++++++++++++++++------
1 file changed, 48 insertions(+), 18 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduOutputFormat.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduOutputFormat.java
index f73b191ca..909940f49 100644
--- a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduOutputFormat.java
+++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduOutputFormat.java
@@ -29,6 +29,7 @@ import org.apache.kudu.client.KuduSession;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.PartialRow;
import org.apache.kudu.client.SessionConfiguration;
+import org.apache.kudu.client.Upsert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,26 +46,25 @@ public class KuduOutputFormat
private static final Logger LOGGER = LoggerFactory.getLogger(KuduOutputFormat.class);
- private String kuduMaster;
- private String kuduTableName;
+ public static final long TIMEOUTMS = 18000;
+ public static final long SESSIONTIMEOUTMS = 100000;
+
+ private final String kuduMaster;
+ private final String kuduTableName;
+ private final KuduSinkConfig.SaveMode saveMode;
private KuduClient kuduClient;
private KuduSession kuduSession;
private KuduTable kuduTable;
- public static final long TIMEOUTMS = 18000;
- public static final long SESSIONTIMEOUTMS = 100000;
+
public KuduOutputFormat(KuduSinkConfig kuduSinkConfig) {
this.kuduMaster = kuduSinkConfig.getKuduMaster();
this.kuduTableName = kuduSinkConfig.getKuduTableName();
+ this.saveMode = kuduSinkConfig.getSaveMode();
init();
}
- public void write(SeaTunnelRow element) {
-
- Insert insert = kuduTable.newInsert();
- Schema schema = kuduTable.getSchema();
-
+ private void transform(PartialRow row, SeaTunnelRow element, Schema schema) {
int columnCount = schema.getColumnCount();
- PartialRow row = insert.getRow();
for (int columnIndex = 0; columnIndex < columnCount; columnIndex++) {
ColumnSchema col = schema.getColumnByIndex(columnIndex);
try {
@@ -114,24 +114,54 @@ public class KuduOutputFormat
throw new IllegalArgumentException("Unsupported column type: " + col.getType());
}
} catch (ClassCastException e) {
- e.printStackTrace();
throw new IllegalArgumentException(
"Value type does not match column type " + col.getType() +
" for column " + col.getName());
}
}
+ }
+ private void upsert(SeaTunnelRow element) {
+ Upsert upsert = kuduTable.newUpsert();
+ Schema schema = kuduTable.getSchema();
+ PartialRow row = upsert.getRow();
+ transform(row, element, schema);
+ try {
+ kuduSession.apply(upsert);
+ } catch (KuduException e) {
+ LOGGER.error("Failed to upsert.", e);
+ throw new RuntimeException("Failed to upsert.", e);
+ }
+ }
+
+ private void insert(SeaTunnelRow element) {
+ Insert insert = kuduTable.newInsert();
+ Schema schema = kuduTable.getSchema();
+ PartialRow row = insert.getRow();
+ transform(row, element, schema);
try {
kuduSession.apply(insert);
} catch (KuduException e) {
- LOGGER.warn("kudu session insert data fail.", e);
- throw new RuntimeException("kudu session insert data fail.", e);
+ LOGGER.error("Failed to insert.", e);
+ throw new RuntimeException("Failed to insert.", e);
}
+ }
+ public void write(SeaTunnelRow element) {
+ switch (saveMode) {
+ case APPEND:
+ insert(element);
+ break;
+ case OVERWRITE:
+ upsert(element);
+ break;
+ default:
+ throw new IllegalArgumentException(String.format("Unsupported saveMode: %s.", saveMode.name()));
+ }
}
- public void init() {
+ private void init() {
KuduClient.KuduClientBuilder kuduClientBuilder = new
KuduClient.KuduClientBuilder(kuduMaster);
kuduClientBuilder.defaultOperationTimeoutMs(TIMEOUTMS);
@@ -142,10 +172,10 @@ public class KuduOutputFormat
try {
kuduTable = kuduClient.openTable(kuduTableName);
} catch (KuduException e) {
- LOGGER.warn("Failed to initialize the Kudu client.", e);
+ LOGGER.error("Failed to initialize the Kudu client.", e);
throw new RuntimeException("Failed to initialize the Kudu client.", e);
}
- LOGGER.info("The Kudu client is successfully initialized", kuduMaster, kuduClient);
+ LOGGER.info("The Kudu client for Master: {} is initialized successfully.", kuduMaster);
}
public void closeOutputFormat() {
@@ -153,8 +183,8 @@ public class KuduOutputFormat
try {
kuduClient.close();
kuduSession.close();
- } catch (KuduException e) {
- LOGGER.warn("Kudu Client close failed.", e);
+ } catch (KuduException ignored) {
+ LOGGER.warn("Failed to close Kudu Client.", ignored);
} finally {
kuduClient = null;
kuduSession = null;