You are viewing a plain text version of this content. The canonical link for it was given as a "here" hyperlink, which is not preserved in this plain-text copy.
Posted to commits@pulsar.apache.org by ni...@apache.org on 2022/10/11 19:35:29 UTC
[pulsar] branch master updated: [fix][connector] JDBC sinks: verify key and nonKey fields are set with insertMode=UPSERT (#17950)
This is an automated email from the ASF dual-hosted git repository.
nicoloboschi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 320808673f8 [fix][connector] JDBC sinks: verify key and nonKey fields are set with insertMode=UPSERT (#17950)
320808673f8 is described below
commit 320808673f89ad8194d50183e9820c94b034d9d9
Author: Nicolò Boschi <bo...@gmail.com>
AuthorDate: Tue Oct 11 21:35:20 2022 +0200
[fix][connector] JDBC sinks: verify key and nonKey fields are set with insertMode=UPSERT (#17950)
---
.../pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java | 8 +++--
.../apache/pulsar/io/jdbc/JdbcAbstractSink.java | 35 +++++++++++++---------
.../java/org/apache/pulsar/io/jdbc/JdbcUtils.java | 17 ++---------
.../pulsar/io/jdbc/MariadbJdbcAutoSchemaSink.java | 11 +++++--
.../pulsar/io/jdbc/PostgresJdbcAutoSchemaSink.java | 16 +++++++++-
.../pulsar/io/jdbc/SqliteJdbcAutoSchemaSink.java | 16 +++++++++-
6 files changed, 69 insertions(+), 34 deletions(-)
diff --git a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java
index 17cfe4cd97b..45e71a5f0ce 100644
--- a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java
+++ b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java
@@ -48,6 +48,11 @@ public abstract class BaseJdbcAutoSchemaSink extends JdbcAbstractSink<GenericObj
throw new IllegalStateException("UPSERT not supported");
}
+ @Override
+ public List<ColumnId> getColumnsForUpsert() {
+ throw new IllegalStateException("UPSERT not supported");
+ }
+
@Override
public void bindValue(PreparedStatement statement, Mutation mutation) throws Exception {
final List<ColumnId> columns = new ArrayList<>();
@@ -56,8 +61,7 @@ public abstract class BaseJdbcAutoSchemaSink extends JdbcAbstractSink<GenericObj
columns.addAll(tableDefinition.getColumns());
break;
case UPSERT:
- columns.addAll(tableDefinition.getColumns());
- columns.addAll(tableDefinition.getNonKeyColumns());
+ columns.addAll(getColumnsForUpsert());
break;
case UPDATE:
columns.addAll(tableDefinition.getNonKeyColumns());
diff --git a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
index 3f7f62e3abb..95f66edf7a7 100644
--- a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
+++ b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
@@ -109,30 +109,35 @@ public abstract class JdbcAbstractSink<T> implements Sink<T> {
}
private void initStatement() throws Exception {
- List<String> keyList = Lists.newArrayList();
- String key = jdbcSinkConfig.getKey();
- if (key != null && !key.isEmpty()) {
- keyList = Arrays.asList(key.split(","));
- }
- List<String> nonKeyList = Lists.newArrayList();
- String nonKey = jdbcSinkConfig.getNonKey();
- if (nonKey != null && !nonKey.isEmpty()) {
- nonKeyList = Arrays.asList(nonKey.split(","));
- }
+ List<String> keyList = getListFromConfig(jdbcSinkConfig.getKey());
+ List<String> nonKeyList = getListFromConfig(jdbcSinkConfig.getNonKey());
tableDefinition = JdbcUtils.getTableDefinition(connection, tableId, keyList, nonKeyList);
- insertStatement = JdbcUtils.buildInsertStatement(connection, generateInsertQueryStatement());
+ insertStatement = connection.prepareStatement(generateInsertQueryStatement());
if (jdbcSinkConfig.getInsertMode() == JdbcSinkConfig.InsertMode.UPSERT) {
- upsertStatement = JdbcUtils.buildInsertStatement(connection, generateUpsertQueryStatement());
+ if (nonKeyList.isEmpty() || keyList.isEmpty()) {
+ throw new IllegalStateException("UPSERT mode is not configured if 'key' and 'nonKey' "
+ + "config are not set.");
+ }
+ upsertStatement = connection.prepareStatement(generateUpsertQueryStatement());
}
if (!nonKeyList.isEmpty()) {
- updateStatement = JdbcUtils.buildUpdateStatement(connection, generateUpdateQueryStatement());
+ updateStatement = connection.prepareStatement(generateUpdateQueryStatement());
}
if (!keyList.isEmpty()) {
- deleteStatement = JdbcUtils.buildDeleteStatement(connection, generateDeleteQueryStatement());
+ deleteStatement = connection.prepareStatement(generateDeleteQueryStatement());
}
}
+ private static List<String> getListFromConfig(String jdbcSinkConfig) {
+ List<String> nonKeyList = Lists.newArrayList();
+ String nonKey = jdbcSinkConfig;
+ if (nonKey != null && !nonKey.isEmpty()) {
+ nonKeyList = Arrays.asList(nonKey.split(","));
+ }
+ return nonKeyList;
+ }
+
@Override
public void close() throws Exception {
if (flushExecutor != null) {
@@ -185,6 +190,8 @@ public abstract class JdbcAbstractSink<T> implements Sink<T> {
public abstract String generateUpsertQueryStatement();
+ public abstract List<JdbcUtils.ColumnId> getColumnsForUpsert();
+
public String generateDeleteQueryStatement() {
return JdbcUtils.buildDeleteSql(tableDefinition);
}
diff --git a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcUtils.java b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcUtils.java
index 327f1db7c64..5fceea27547 100644
--- a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcUtils.java
+++ b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcUtils.java
@@ -23,9 +23,7 @@ import static com.google.common.base.Preconditions.checkState;
import com.google.common.collect.Lists;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
-import java.sql.PreparedStatement;
import java.sql.ResultSet;
-import java.sql.SQLException;
import java.util.List;
import java.util.StringJoiner;
import java.util.stream.IntStream;
@@ -173,10 +171,6 @@ public class JdbcUtils {
return builder.toString();
}
- public static PreparedStatement buildInsertStatement(Connection connection, String insertSQL) throws SQLException {
- return connection.prepareStatement(insertSQL);
- }
-
public static String combationWhere(List<ColumnId> columnIds) {
StringBuilder builder = new StringBuilder();
if (!columnIds.isEmpty()) {
@@ -205,6 +199,9 @@ public class JdbcUtils {
}
public static StringJoiner buildUpdateSqlSetPart(TableDefinition table) {
+ if (table.nonKeyColumns.isEmpty()) {
+ throw new IllegalStateException("UPDATE operations are not supported if 'nonKey' config is not set.");
+ }
StringJoiner setJoiner = new StringJoiner(",");
table.nonKeyColumns.forEach((columnId) ->{
@@ -215,17 +212,9 @@ public class JdbcUtils {
return setJoiner;
}
- public static PreparedStatement buildUpdateStatement(Connection connection, String updateSQL) throws SQLException {
- return connection.prepareStatement(updateSQL);
- }
-
public static String buildDeleteSql(TableDefinition table) {
return "DELETE FROM "
+ table.tableId.getTableName()
+ combationWhere(table.keyColumns);
}
-
- public static PreparedStatement buildDeleteStatement(Connection connection, String deleteSQL) throws SQLException {
- return connection.prepareStatement(deleteSQL);
- }
}
diff --git a/pulsar-io/jdbc/mariadb/src/main/java/org/apache/pulsar/io/jdbc/MariadbJdbcAutoSchemaSink.java b/pulsar-io/jdbc/mariadb/src/main/java/org/apache/pulsar/io/jdbc/MariadbJdbcAutoSchemaSink.java
index 5c232db1aa0..d0f44e4c77c 100644
--- a/pulsar-io/jdbc/mariadb/src/main/java/org/apache/pulsar/io/jdbc/MariadbJdbcAutoSchemaSink.java
+++ b/pulsar-io/jdbc/mariadb/src/main/java/org/apache/pulsar/io/jdbc/MariadbJdbcAutoSchemaSink.java
@@ -18,6 +18,8 @@
*/
package org.apache.pulsar.io.jdbc;
+import java.util.ArrayList;
+import java.util.List;
import java.util.stream.Collectors;
import org.apache.pulsar.io.core.annotations.Connector;
import org.apache.pulsar.io.core.annotations.IOType;
@@ -32,10 +34,15 @@ public class MariadbJdbcAutoSchemaSink extends BaseJdbcAutoSchemaSink {
@Override
public String generateUpsertQueryStatement() {
- final String keys = tableDefinition.getKeyColumns().stream().map(JdbcUtils.ColumnId::getName)
- .collect(Collectors.joining(","));
return JdbcUtils.buildInsertSql(tableDefinition)
+ "ON DUPLICATE KEY UPDATE " + JdbcUtils.buildUpdateSqlSetPart(tableDefinition);
}
+ @Override
+ public List<JdbcUtils.ColumnId> getColumnsForUpsert() {
+ final List<JdbcUtils.ColumnId> columns = new ArrayList<>();
+ columns.addAll(tableDefinition.getColumns());
+ columns.addAll(tableDefinition.getNonKeyColumns());
+ return columns;
+ }
}
diff --git a/pulsar-io/jdbc/postgres/src/main/java/org/apache/pulsar/io/jdbc/PostgresJdbcAutoSchemaSink.java b/pulsar-io/jdbc/postgres/src/main/java/org/apache/pulsar/io/jdbc/PostgresJdbcAutoSchemaSink.java
index d29e3e3aaca..803f1372437 100644
--- a/pulsar-io/jdbc/postgres/src/main/java/org/apache/pulsar/io/jdbc/PostgresJdbcAutoSchemaSink.java
+++ b/pulsar-io/jdbc/postgres/src/main/java/org/apache/pulsar/io/jdbc/PostgresJdbcAutoSchemaSink.java
@@ -18,6 +18,8 @@
*/
package org.apache.pulsar.io.jdbc;
+import java.util.ArrayList;
+import java.util.List;
import java.util.stream.Collectors;
import org.apache.pulsar.io.core.annotations.Connector;
import org.apache.pulsar.io.core.annotations.IOType;
@@ -32,10 +34,22 @@ public class PostgresJdbcAutoSchemaSink extends BaseJdbcAutoSchemaSink {
@Override
public String generateUpsertQueryStatement() {
- final String keys = tableDefinition.getKeyColumns().stream().map(JdbcUtils.ColumnId::getName)
+ final List<JdbcUtils.ColumnId> keyColumns = tableDefinition.getKeyColumns();
+ if (keyColumns.isEmpty()) {
+ throw new IllegalStateException("UPSERT is not supported if 'key' config is not set.");
+ }
+ final String keys = keyColumns.stream().map(JdbcUtils.ColumnId::getName)
.collect(Collectors.joining(","));
return JdbcUtils.buildInsertSql(tableDefinition)
+ " ON CONFLICT(" + keys + ") "
+ "DO UPDATE SET " + JdbcUtils.buildUpdateSqlSetPart(tableDefinition);
}
+
+ @Override
+ public List<JdbcUtils.ColumnId> getColumnsForUpsert() {
+ final List<JdbcUtils.ColumnId> columns = new ArrayList<>();
+ columns.addAll(tableDefinition.getColumns());
+ columns.addAll(tableDefinition.getNonKeyColumns());
+ return columns;
+ }
}
diff --git a/pulsar-io/jdbc/sqlite/src/main/java/org/apache/pulsar/io/jdbc/SqliteJdbcAutoSchemaSink.java b/pulsar-io/jdbc/sqlite/src/main/java/org/apache/pulsar/io/jdbc/SqliteJdbcAutoSchemaSink.java
index 76655397003..f67f0f1afbf 100644
--- a/pulsar-io/jdbc/sqlite/src/main/java/org/apache/pulsar/io/jdbc/SqliteJdbcAutoSchemaSink.java
+++ b/pulsar-io/jdbc/sqlite/src/main/java/org/apache/pulsar/io/jdbc/SqliteJdbcAutoSchemaSink.java
@@ -18,6 +18,8 @@
*/
package org.apache.pulsar.io.jdbc;
+import java.util.ArrayList;
+import java.util.List;
import java.util.stream.Collectors;
import org.apache.pulsar.io.core.annotations.Connector;
import org.apache.pulsar.io.core.annotations.IOType;
@@ -32,10 +34,22 @@ public class SqliteJdbcAutoSchemaSink extends BaseJdbcAutoSchemaSink {
@Override
public String generateUpsertQueryStatement() {
- final String keys = tableDefinition.getKeyColumns().stream().map(JdbcUtils.ColumnId::getName)
+ final List<JdbcUtils.ColumnId> keyColumns = tableDefinition.getKeyColumns();
+ if (keyColumns.isEmpty()) {
+ throw new IllegalStateException("UPSERT is not supported if 'key' config is not set.");
+ }
+ final String keys = keyColumns.stream().map(JdbcUtils.ColumnId::getName)
.collect(Collectors.joining(","));
return JdbcUtils.buildInsertSql(tableDefinition)
+ " ON CONFLICT(" + keys + ") "
+ "DO UPDATE SET " + JdbcUtils.buildUpdateSqlSetPart(tableDefinition);
}
+
+ @Override
+ public List<JdbcUtils.ColumnId> getColumnsForUpsert() {
+ final List<JdbcUtils.ColumnId> columns = new ArrayList<>();
+ columns.addAll(tableDefinition.getColumns());
+ columns.addAll(tableDefinition.getNonKeyColumns());
+ return columns;
+ }
}