You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@pulsar.apache.org by ni...@apache.org on 2022/10/11 19:35:29 UTC

[pulsar] branch master updated: [fix][connector] JDBC sinks: verify key and nonKey fields are set with insertMode=UPSERT (#17950)

This is an automated email from the ASF dual-hosted git repository.

nicoloboschi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 320808673f8 [fix][connector] JDBC sinks: verify key and nonKey fields are set with insertMode=UPSERT (#17950)
320808673f8 is described below

commit 320808673f89ad8194d50183e9820c94b034d9d9
Author: Nicolò Boschi <bo...@gmail.com>
AuthorDate: Tue Oct 11 21:35:20 2022 +0200

    [fix][connector] JDBC sinks: verify key and nonKey fields are set with insertMode=UPSERT (#17950)
---
 .../pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java     |  8 +++--
 .../apache/pulsar/io/jdbc/JdbcAbstractSink.java    | 35 +++++++++++++---------
 .../java/org/apache/pulsar/io/jdbc/JdbcUtils.java  | 17 ++---------
 .../pulsar/io/jdbc/MariadbJdbcAutoSchemaSink.java  | 11 +++++--
 .../pulsar/io/jdbc/PostgresJdbcAutoSchemaSink.java | 16 +++++++++-
 .../pulsar/io/jdbc/SqliteJdbcAutoSchemaSink.java   | 16 +++++++++-
 6 files changed, 69 insertions(+), 34 deletions(-)

diff --git a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java
index 17cfe4cd97b..45e71a5f0ce 100644
--- a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java
+++ b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java
@@ -48,6 +48,11 @@ public abstract class BaseJdbcAutoSchemaSink extends JdbcAbstractSink<GenericObj
         throw new IllegalStateException("UPSERT not supported");
     }
 
+    @Override
+    public List<ColumnId> getColumnsForUpsert() {
+        throw new IllegalStateException("UPSERT not supported");
+    }
+
     @Override
     public void bindValue(PreparedStatement statement, Mutation mutation) throws Exception {
         final List<ColumnId> columns = new ArrayList<>();
@@ -56,8 +61,7 @@ public abstract class BaseJdbcAutoSchemaSink extends JdbcAbstractSink<GenericObj
                 columns.addAll(tableDefinition.getColumns());
                 break;
             case UPSERT:
-                columns.addAll(tableDefinition.getColumns());
-                columns.addAll(tableDefinition.getNonKeyColumns());
+                columns.addAll(getColumnsForUpsert());
                 break;
             case UPDATE:
                 columns.addAll(tableDefinition.getNonKeyColumns());
diff --git a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
index 3f7f62e3abb..95f66edf7a7 100644
--- a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
+++ b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
@@ -109,30 +109,35 @@ public abstract class JdbcAbstractSink<T> implements Sink<T> {
     }
 
     private void initStatement()  throws Exception {
-        List<String> keyList = Lists.newArrayList();
-        String key = jdbcSinkConfig.getKey();
-        if (key != null && !key.isEmpty()) {
-            keyList = Arrays.asList(key.split(","));
-        }
-        List<String> nonKeyList = Lists.newArrayList();
-        String nonKey = jdbcSinkConfig.getNonKey();
-        if (nonKey != null && !nonKey.isEmpty()) {
-            nonKeyList = Arrays.asList(nonKey.split(","));
-        }
+        List<String> keyList = getListFromConfig(jdbcSinkConfig.getKey());
+        List<String> nonKeyList = getListFromConfig(jdbcSinkConfig.getNonKey());
 
         tableDefinition = JdbcUtils.getTableDefinition(connection, tableId, keyList, nonKeyList);
-        insertStatement = JdbcUtils.buildInsertStatement(connection, generateInsertQueryStatement());
+        insertStatement = connection.prepareStatement(generateInsertQueryStatement());
         if (jdbcSinkConfig.getInsertMode() == JdbcSinkConfig.InsertMode.UPSERT) {
-            upsertStatement = JdbcUtils.buildInsertStatement(connection, generateUpsertQueryStatement());
+            if (nonKeyList.isEmpty() || keyList.isEmpty()) {
+                throw new IllegalStateException("UPSERT mode is not configured if 'key' and 'nonKey' "
+                        + "config are not set.");
+            }
+            upsertStatement = connection.prepareStatement(generateUpsertQueryStatement());
         }
         if (!nonKeyList.isEmpty()) {
-            updateStatement = JdbcUtils.buildUpdateStatement(connection, generateUpdateQueryStatement());
+            updateStatement = connection.prepareStatement(generateUpdateQueryStatement());
         }
         if (!keyList.isEmpty()) {
-            deleteStatement = JdbcUtils.buildDeleteStatement(connection, generateDeleteQueryStatement());
+            deleteStatement = connection.prepareStatement(generateDeleteQueryStatement());
         }
     }
 
+    private static List<String> getListFromConfig(String jdbcSinkConfig) {
+        List<String> nonKeyList = Lists.newArrayList();
+        String nonKey = jdbcSinkConfig;
+        if (nonKey != null && !nonKey.isEmpty()) {
+            nonKeyList = Arrays.asList(nonKey.split(","));
+        }
+        return nonKeyList;
+    }
+
     @Override
     public void close() throws Exception {
         if (flushExecutor != null) {
@@ -185,6 +190,8 @@ public abstract class JdbcAbstractSink<T> implements Sink<T> {
 
     public abstract String generateUpsertQueryStatement();
 
+    public abstract List<JdbcUtils.ColumnId> getColumnsForUpsert();
+
     public String generateDeleteQueryStatement() {
         return JdbcUtils.buildDeleteSql(tableDefinition);
     }
diff --git a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcUtils.java b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcUtils.java
index 327f1db7c64..5fceea27547 100644
--- a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcUtils.java
+++ b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcUtils.java
@@ -23,9 +23,7 @@ import static com.google.common.base.Preconditions.checkState;
 import com.google.common.collect.Lists;
 import java.sql.Connection;
 import java.sql.DatabaseMetaData;
-import java.sql.PreparedStatement;
 import java.sql.ResultSet;
-import java.sql.SQLException;
 import java.util.List;
 import java.util.StringJoiner;
 import java.util.stream.IntStream;
@@ -173,10 +171,6 @@ public class JdbcUtils {
         return builder.toString();
     }
 
-    public static PreparedStatement buildInsertStatement(Connection connection, String insertSQL) throws SQLException {
-        return connection.prepareStatement(insertSQL);
-    }
-
     public static String combationWhere(List<ColumnId> columnIds) {
         StringBuilder builder = new StringBuilder();
         if (!columnIds.isEmpty()) {
@@ -205,6 +199,9 @@ public class JdbcUtils {
     }
 
     public static StringJoiner buildUpdateSqlSetPart(TableDefinition table) {
+        if (table.nonKeyColumns.isEmpty()) {
+            throw new IllegalStateException("UPDATE operations are not supported if 'nonKey' config is not set.");
+        }
         StringJoiner setJoiner = new StringJoiner(",");
 
         table.nonKeyColumns.forEach((columnId) ->{
@@ -215,17 +212,9 @@ public class JdbcUtils {
         return setJoiner;
     }
 
-    public static PreparedStatement buildUpdateStatement(Connection connection, String updateSQL) throws SQLException {
-        return connection.prepareStatement(updateSQL);
-    }
-
     public static String buildDeleteSql(TableDefinition table) {
         return "DELETE FROM "
             + table.tableId.getTableName()
             + combationWhere(table.keyColumns);
     }
-
-    public static PreparedStatement buildDeleteStatement(Connection connection, String deleteSQL) throws SQLException {
-        return connection.prepareStatement(deleteSQL);
-    }
 }
diff --git a/pulsar-io/jdbc/mariadb/src/main/java/org/apache/pulsar/io/jdbc/MariadbJdbcAutoSchemaSink.java b/pulsar-io/jdbc/mariadb/src/main/java/org/apache/pulsar/io/jdbc/MariadbJdbcAutoSchemaSink.java
index 5c232db1aa0..d0f44e4c77c 100644
--- a/pulsar-io/jdbc/mariadb/src/main/java/org/apache/pulsar/io/jdbc/MariadbJdbcAutoSchemaSink.java
+++ b/pulsar-io/jdbc/mariadb/src/main/java/org/apache/pulsar/io/jdbc/MariadbJdbcAutoSchemaSink.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.io.jdbc;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.stream.Collectors;
 import org.apache.pulsar.io.core.annotations.Connector;
 import org.apache.pulsar.io.core.annotations.IOType;
@@ -32,10 +34,15 @@ public class MariadbJdbcAutoSchemaSink extends BaseJdbcAutoSchemaSink {
 
     @Override
     public String generateUpsertQueryStatement() {
-        final String keys = tableDefinition.getKeyColumns().stream().map(JdbcUtils.ColumnId::getName)
-                .collect(Collectors.joining(","));
         return JdbcUtils.buildInsertSql(tableDefinition)
                 + "ON DUPLICATE KEY UPDATE " + JdbcUtils.buildUpdateSqlSetPart(tableDefinition);
     }
 
+    @Override
+    public List<JdbcUtils.ColumnId> getColumnsForUpsert() {
+        final List<JdbcUtils.ColumnId> columns = new ArrayList<>();
+        columns.addAll(tableDefinition.getColumns());
+        columns.addAll(tableDefinition.getNonKeyColumns());
+        return columns;
+    }
 }
diff --git a/pulsar-io/jdbc/postgres/src/main/java/org/apache/pulsar/io/jdbc/PostgresJdbcAutoSchemaSink.java b/pulsar-io/jdbc/postgres/src/main/java/org/apache/pulsar/io/jdbc/PostgresJdbcAutoSchemaSink.java
index d29e3e3aaca..803f1372437 100644
--- a/pulsar-io/jdbc/postgres/src/main/java/org/apache/pulsar/io/jdbc/PostgresJdbcAutoSchemaSink.java
+++ b/pulsar-io/jdbc/postgres/src/main/java/org/apache/pulsar/io/jdbc/PostgresJdbcAutoSchemaSink.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.io.jdbc;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.stream.Collectors;
 import org.apache.pulsar.io.core.annotations.Connector;
 import org.apache.pulsar.io.core.annotations.IOType;
@@ -32,10 +34,22 @@ public class PostgresJdbcAutoSchemaSink extends BaseJdbcAutoSchemaSink {
 
     @Override
     public String generateUpsertQueryStatement() {
-        final String keys = tableDefinition.getKeyColumns().stream().map(JdbcUtils.ColumnId::getName)
+        final List<JdbcUtils.ColumnId> keyColumns = tableDefinition.getKeyColumns();
+        if (keyColumns.isEmpty()) {
+            throw new IllegalStateException("UPSERT is not supported if 'key' config is not set.");
+        }
+        final String keys = keyColumns.stream().map(JdbcUtils.ColumnId::getName)
                 .collect(Collectors.joining(","));
         return JdbcUtils.buildInsertSql(tableDefinition)
                 + " ON CONFLICT(" + keys + ") "
                 + "DO UPDATE SET " + JdbcUtils.buildUpdateSqlSetPart(tableDefinition);
     }
+
+    @Override
+    public List<JdbcUtils.ColumnId> getColumnsForUpsert() {
+        final List<JdbcUtils.ColumnId> columns = new ArrayList<>();
+        columns.addAll(tableDefinition.getColumns());
+        columns.addAll(tableDefinition.getNonKeyColumns());
+        return columns;
+    }
 }
diff --git a/pulsar-io/jdbc/sqlite/src/main/java/org/apache/pulsar/io/jdbc/SqliteJdbcAutoSchemaSink.java b/pulsar-io/jdbc/sqlite/src/main/java/org/apache/pulsar/io/jdbc/SqliteJdbcAutoSchemaSink.java
index 76655397003..f67f0f1afbf 100644
--- a/pulsar-io/jdbc/sqlite/src/main/java/org/apache/pulsar/io/jdbc/SqliteJdbcAutoSchemaSink.java
+++ b/pulsar-io/jdbc/sqlite/src/main/java/org/apache/pulsar/io/jdbc/SqliteJdbcAutoSchemaSink.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.io.jdbc;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.stream.Collectors;
 import org.apache.pulsar.io.core.annotations.Connector;
 import org.apache.pulsar.io.core.annotations.IOType;
@@ -32,10 +34,22 @@ public class SqliteJdbcAutoSchemaSink extends BaseJdbcAutoSchemaSink {
 
     @Override
     public String generateUpsertQueryStatement() {
-        final String keys = tableDefinition.getKeyColumns().stream().map(JdbcUtils.ColumnId::getName)
+        final List<JdbcUtils.ColumnId> keyColumns = tableDefinition.getKeyColumns();
+        if (keyColumns.isEmpty()) {
+            throw new IllegalStateException("UPSERT is not supported if 'key' config is not set.");
+        }
+        final String keys = keyColumns.stream().map(JdbcUtils.ColumnId::getName)
                 .collect(Collectors.joining(","));
         return JdbcUtils.buildInsertSql(tableDefinition)
                 + " ON CONFLICT(" + keys + ") "
                 + "DO UPDATE SET " + JdbcUtils.buildUpdateSqlSetPart(tableDefinition);
     }
+
+    @Override
+    public List<JdbcUtils.ColumnId> getColumnsForUpsert() {
+        final List<JdbcUtils.ColumnId> columns = new ArrayList<>();
+        columns.addAll(tableDefinition.getColumns());
+        columns.addAll(tableDefinition.getNonKeyColumns());
+        return columns;
+    }
 }