You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ti...@apache.org on 2022/12/10 06:19:16 UTC

[pulsar] branch master updated: [feat][io] Add support for partitioned tables (#8527)

This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a734b130a25 [feat][io] Add support for partitioned tables (#8527)
a734b130a25 is described below

commit a734b130a25a7243e0ed9a8e26655fe19b21f9a2
Author: Ethan Waldo <ew...@healthetechs.com>
AuthorDate: Sat Dec 10 01:19:07 2022 -0500

    [feat][io] Add support for partitioned tables (#8527)
    
    Co-authored-by: tison <wa...@gmail.com>
---
 .../java/org/apache/pulsar/io/jdbc/JdbcUtils.java  | 33 +++++++++++++---------
 1 file changed, 19 insertions(+), 14 deletions(-)

diff --git a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcUtils.java b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcUtils.java
index ed809a95e91..f896e43d40a 100644
--- a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcUtils.java
+++ b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcUtils.java
@@ -68,8 +68,13 @@ public class JdbcUtils {
         private TableDefinition(TableId tableId, List<ColumnId> columns) {
             this(tableId, columns, null, null);
         }
-        private TableDefinition(TableId tableId, List<ColumnId> columns,
-                               List<ColumnId> nonKeyColumns, List<ColumnId> keyColumns) {
+
+        private TableDefinition(
+                TableId tableId,
+                List<ColumnId> columns,
+                List<ColumnId> nonKeyColumns,
+                List<ColumnId> keyColumns
+        ) {
             this.tableId = tableId;
             this.columns = columns;
             this.nonKeyColumns = nonKeyColumns;
@@ -92,13 +97,13 @@ public class JdbcUtils {
      */
     public static TableId getTableId(Connection connection, String tableName) throws Exception {
         DatabaseMetaData metadata = connection.getMetaData();
-        try (ResultSet rs = metadata.getTables(null, null, tableName, new String[]{"TABLE"})) {
+        try (ResultSet rs = metadata.getTables(null, null, tableName, new String[]{"TABLE", "PARTITIONED TABLE"})) {
             if (rs.next()) {
                 String catalogName = rs.getString(1);
                 String schemaName = rs.getString(2);
                 String gotTableName = rs.getString(3);
                 checkState(tableName.equals(gotTableName),
-                    "TableName not match: " + tableName + " Got: " + gotTableName);
+                        "TableName not match: " + tableName + " Got: " + gotTableName);
                 if (log.isDebugEnabled()) {
                     log.debug("Get Table: {}, {}, {}", catalogName, schemaName, tableName);
                 }
@@ -113,10 +118,12 @@ public class JdbcUtils {
      * Get the {@link TableDefinition} for the given table.
      */
     public static TableDefinition getTableDefinition(
-            Connection connection, TableId tableId,
+            Connection connection,
+            TableId tableId,
             List<String> keyList,
             List<String> nonKeyList,
-            boolean excludeNonDeclaredFields) throws Exception {
+            boolean excludeNonDeclaredFields
+    ) throws Exception {
         TableDefinition table = TableDefinition.of(
                 tableId, Lists.newArrayList(), Lists.newArrayList(), Lists.newArrayList());
 
@@ -124,10 +131,10 @@ public class JdbcUtils {
         nonKeyList = nonKeyList == null ? Collections.emptyList() : nonKeyList;
 
         try (ResultSet rs = connection.getMetaData().getColumns(
-            tableId.getCatalogName(),
-            tableId.getSchemaName(),
-            tableId.getTableName(),
-            null
+                tableId.getCatalogName(),
+                tableId.getSchemaName(),
+                tableId.getTableName(),
+                null
         )) {
             while (rs.next()) {
                 final String columnName = rs.getString(4);
@@ -205,7 +212,7 @@ public class JdbcUtils {
         }
         StringJoiner setJoiner = new StringJoiner(",");
 
-        table.nonKeyColumns.forEach((columnId) ->{
+        table.nonKeyColumns.forEach((columnId) -> {
             StringJoiner equals = new StringJoiner("=");
             equals.add(columnId.getName()).add("? ");
             setJoiner.add(equals.toString());
@@ -214,8 +221,6 @@ public class JdbcUtils {
     }
 
     public static String buildDeleteSql(TableDefinition table) {
-        return "DELETE FROM "
-            + table.tableId.getTableName()
-            + combationWhere(table.keyColumns);
+        return "DELETE FROM " + table.tableId.getTableName() + combationWhere(table.keyColumns);
     }
 }