You are viewing a plain text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@pulsar.apache.org by ti...@apache.org on 2022/12/10 06:19:16 UTC
[pulsar] branch master updated: [feat][io] Add support for partitioned tables (#8527)
This is an automated email from the ASF dual-hosted git repository.
tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a734b130a25 [feat][io] Add support for partitioned tables (#8527)
a734b130a25 is described below
commit a734b130a25a7243e0ed9a8e26655fe19b21f9a2
Author: Ethan Waldo <ew...@healthetechs.com>
AuthorDate: Sat Dec 10 01:19:07 2022 -0500
[feat][io] Add support for partitioned tables (#8527)
Co-authored-by: tison <wa...@gmail.com>
---
.../java/org/apache/pulsar/io/jdbc/JdbcUtils.java | 33 +++++++++++++---------
1 file changed, 19 insertions(+), 14 deletions(-)
diff --git a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcUtils.java b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcUtils.java
index ed809a95e91..f896e43d40a 100644
--- a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcUtils.java
+++ b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcUtils.java
@@ -68,8 +68,13 @@ public class JdbcUtils {
private TableDefinition(TableId tableId, List<ColumnId> columns) {
this(tableId, columns, null, null);
}
- private TableDefinition(TableId tableId, List<ColumnId> columns,
- List<ColumnId> nonKeyColumns, List<ColumnId> keyColumns) {
+
+ private TableDefinition(
+ TableId tableId,
+ List<ColumnId> columns,
+ List<ColumnId> nonKeyColumns,
+ List<ColumnId> keyColumns
+ ) {
this.tableId = tableId;
this.columns = columns;
this.nonKeyColumns = nonKeyColumns;
@@ -92,13 +97,13 @@ public class JdbcUtils {
*/
public static TableId getTableId(Connection connection, String tableName) throws Exception {
DatabaseMetaData metadata = connection.getMetaData();
- try (ResultSet rs = metadata.getTables(null, null, tableName, new String[]{"TABLE"})) {
+ try (ResultSet rs = metadata.getTables(null, null, tableName, new String[]{"TABLE", "PARTITIONED TABLE"})) {
if (rs.next()) {
String catalogName = rs.getString(1);
String schemaName = rs.getString(2);
String gotTableName = rs.getString(3);
checkState(tableName.equals(gotTableName),
- "TableName not match: " + tableName + " Got: " + gotTableName);
+ "TableName not match: " + tableName + " Got: " + gotTableName);
if (log.isDebugEnabled()) {
log.debug("Get Table: {}, {}, {}", catalogName, schemaName, tableName);
}
@@ -113,10 +118,12 @@ public class JdbcUtils {
* Get the {@link TableDefinition} for the given table.
*/
public static TableDefinition getTableDefinition(
- Connection connection, TableId tableId,
+ Connection connection,
+ TableId tableId,
List<String> keyList,
List<String> nonKeyList,
- boolean excludeNonDeclaredFields) throws Exception {
+ boolean excludeNonDeclaredFields
+ ) throws Exception {
TableDefinition table = TableDefinition.of(
tableId, Lists.newArrayList(), Lists.newArrayList(), Lists.newArrayList());
@@ -124,10 +131,10 @@ public class JdbcUtils {
nonKeyList = nonKeyList == null ? Collections.emptyList() : nonKeyList;
try (ResultSet rs = connection.getMetaData().getColumns(
- tableId.getCatalogName(),
- tableId.getSchemaName(),
- tableId.getTableName(),
- null
+ tableId.getCatalogName(),
+ tableId.getSchemaName(),
+ tableId.getTableName(),
+ null
)) {
while (rs.next()) {
final String columnName = rs.getString(4);
@@ -205,7 +212,7 @@ public class JdbcUtils {
}
StringJoiner setJoiner = new StringJoiner(",");
- table.nonKeyColumns.forEach((columnId) ->{
+ table.nonKeyColumns.forEach((columnId) -> {
StringJoiner equals = new StringJoiner("=");
equals.add(columnId.getName()).add("? ");
setJoiner.add(equals.toString());
@@ -214,8 +221,6 @@ public class JdbcUtils {
}
public static String buildDeleteSql(TableDefinition table) {
- return "DELETE FROM "
- + table.tableId.getTableName()
- + combationWhere(table.keyColumns);
+ return "DELETE FROM " + table.tableId.getTableName() + combationWhere(table.keyColumns);
}
}