You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by "sandynz (via GitHub)" <gi...@apache.org> on 2023/02/17 05:18:29 UTC

[GitHub] [shardingsphere] sandynz commented on a diff in pull request #24209: CDC stream data source table name wildcard support

sandynz commented on code in PR #24209:
URL: https://github.com/apache/shardingsphere/pull/24209#discussion_r1109305995


##########
kernel/data-pipeline/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/example/opengauss/Bootstrap.java:
##########
@@ -32,7 +32,7 @@ public final class Bootstrap {
      * @param args args
      */
     public static void main(final String[] args) {
-        ImportDataSourceParameter importDataSourceParam = new ImportDataSourceParameter("jdbc:opengauss:localhost:5432/cdc_db?stringtype=unspecified", "gaussdb", "Root@123");
+        ImportDataSourceParameter importDataSourceParam = new ImportDataSourceParameter("jdbc:opengauss://localhost:5432/cdc_db?stringtype=unspecified", "gaussdb", "Root@123");

Review Comment:
   Could we move `Bootstrap` to `example` package?
   
   And then merge other similar implementations to here.
   



##########
kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/job/CDCJobConfiguration.java:
##########
@@ -36,7 +37,7 @@ public final class CDCJobConfiguration implements PipelineJobConfiguration {
     
     private final String database;
     
-    private final List<String> schemaTableNames;
+    private final Collection<String> schemaTableNames;

Review Comment:
   It's better to use `List` not `Collection` in yaml related configuration class



##########
kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobId.java:
##########
@@ -35,11 +35,11 @@ public final class CDCJobId extends AbstractPipelineJobId {
     
     private final String databaseName;
     
-    private final List<String> tableNames;
+    private final Collection<String> tableNames;

Review Comment:
   Does `tableNames` mean `schemaTableNames`?



##########
proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java:
##########
@@ -98,6 +117,58 @@ public CDCResponse streamData(final String requestId, final StreamDataRequestBod
         return CDCResponseGenerator.succeedBuilder(requestId).setStreamDataResult(StreamDataResult.newBuilder().setStreamingId(jobId).build()).build();
     }
     
+    private Map<String, Collection<String>> getSchemaTableMapWithSchema(final ShardingSphereDatabase database, final List<SchemaTable> schemaTables) {
+        Map<String, Collection<String>> result = new HashMap<>();
+        Collection<String> systemSchemas = database.getProtocolType().getSystemSchemas();
+        Optional<SchemaTable> allSchemaTablesOptional = schemaTables.stream().filter(each -> "*".equals(each.getTable()) && "*".equals(each.getSchema())).findFirst();
+        if (allSchemaTablesOptional.isPresent()) {
+            for (Entry<String, ShardingSphereSchema> entry : database.getSchemas().entrySet()) {
+                if (systemSchemas.contains(entry.getKey())) {
+                    continue;
+                }
+                entry.getValue().getAllTableNames().forEach(tableName -> result.computeIfAbsent(entry.getKey(), ignored -> new HashSet<>()).add(tableName));
+            }
+            return result;
+        }
+        for (SchemaTable each : schemaTables) {
+            if ("*".equals(each.getSchema())) {
+                for (Entry<String, ShardingSphereSchema> entry : database.getSchemas().entrySet()) {
+                    if (systemSchemas.contains(entry.getKey())) {
+                        continue;
+                    }
+                    entry.getValue().getAllTableNames().stream().filter(tableName -> tableName.equals(each.getTable())).findFirst()
+                            .ifPresent(tableName -> result.computeIfAbsent(entry.getKey(), ignored -> new HashSet<>()).add(tableName));
+                }
+            } else if ("*".equals(each.getTable())) {
+                String schemaName = each.getSchema().isEmpty() ? getDefaultSchema(database.getProtocolType()) : each.getSchema();
+                ShardingSphereSchema schema = database.getSchema(schemaName);
+                if (null == schema) {
+                    throw new SchemaNotFoundException(each.getSchema());
+                }
+                schema.getAllTableNames().forEach(tableName -> result.computeIfAbsent(schemaName, ignored -> new HashSet<>()).add(tableName));
+            } else {
+                result.computeIfAbsent(each.getSchema(), ignored -> new HashSet<>()).add(each.getTable());
+            }
+        }
+        return result;
+    }
+    
+    private String getDefaultSchema(final DatabaseType databaseType) {
+        if (!(databaseType instanceof SchemaSupportedDatabaseType)) {
+            return null;
+        }
+        return ((SchemaSupportedDatabaseType) databaseType).getDefaultSchema();
+    }
+    
+    private Collection<String> getTableNamesWithoutSchema(final ShardingSphereDatabase database, final List<SchemaTable> schemaTables) {
+        Optional<SchemaTable> allTablesOptional = schemaTables.stream().filter(each -> each.getTable().equals("*")).findFirst();
+        Set<String> allTableNames = new HashSet<>(database.getSchema(database.getName()).getAllTableNames());
+        if (allTablesOptional.isPresent()) {
+            return allTableNames;
+        }

Review Comment:
   `allTableNames` could be combined in `return`



##########
proxy/backend/src/test/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandlerTest.java:
##########
@@ -91,4 +101,61 @@ public void assertStreamDataRequestFailed() {
         CDCResponse actualResponse = handler.streamData(request.getRequestId(), request.getStreamDataRequestBody(), mock(CDCConnectionContext.class), mock(Channel.class));
         assertThat(actualResponse.getStatus(), is(Status.FAILED));
     }
+    
+    @Test
+    public void assertGetSchemaTableMapWithSchema() throws NoSuchMethodException {
+        Map<String, ShardingSphereSchema> schemas = new HashMap<>();
+        schemas.put("test", mockSchema());
+        schemas.put("public", mockSchema());
+        ShardingSphereDatabase database = new ShardingSphereDatabase("sharding_db", new PostgreSQLDatabaseType(), null, null, schemas);

Review Comment:
   Could `PostgreSQLDatabaseType` be `OpenGaussDatabaseType`?



##########
proxy/backend/src/test/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandlerTest.java:
##########
@@ -91,4 +101,61 @@ public void assertStreamDataRequestFailed() {
         CDCResponse actualResponse = handler.streamData(request.getRequestId(), request.getStreamDataRequestBody(), mock(CDCConnectionContext.class), mock(Channel.class));
         assertThat(actualResponse.getStatus(), is(Status.FAILED));
     }
+    
+    @Test
+    public void assertGetSchemaTableMapWithSchema() throws NoSuchMethodException {
+        Map<String, ShardingSphereSchema> schemas = new HashMap<>();
+        schemas.put("test", mockSchema());
+        schemas.put("public", mockSchema());
+        ShardingSphereDatabase database = new ShardingSphereDatabase("sharding_db", new PostgreSQLDatabaseType(), null, null, schemas);
+        List<SchemaTable> schemaTables = Arrays.asList(SchemaTable.newBuilder().setSchema("public").setTable("t_order").build(),
+                SchemaTable.newBuilder().setSchema("test").setTable("*").build());
+        Map<String, Collection<String>> expected = new HashMap<>();
+        expected.put("test", new HashSet<>(Arrays.asList("t_order", "t_order_item")));
+        expected.put("public", new HashSet<>(Collections.singletonList("t_order")));
+        Map<String, String> actual = getSchemaTableMapWithSchemaResult(database, schemaTables);
+        assertThat(actual, is(expected));
+        schemaTables = Collections.singletonList(SchemaTable.newBuilder().setTable("t_order").build());
+        actual = getSchemaTableMapWithSchemaResult(database, schemaTables);
+        expected = Collections.singletonMap("", Collections.singleton("t_order"));
+        assertThat(actual, is(expected));
+        schemaTables = Collections.singletonList(SchemaTable.newBuilder().setSchema("*").setTable("t_order").build());
+        actual = getSchemaTableMapWithSchemaResult(database, schemaTables);
+        expected = new HashMap<>();
+        expected.put("test", new HashSet<>(Collections.singletonList("t_order")));
+        expected.put("public", new HashSet<>(Collections.singletonList("t_order")));
+        assertThat(actual, is(expected));
+    }
+    
+    private ShardingSphereSchema mockSchema() {
+        Map<String, ShardingSphereTable> tables = new HashMap<>();
+        tables.put("t_order", mock(ShardingSphereTable.class));
+        tables.put("t_order_item", mock(ShardingSphereTable.class));
+        return new ShardingSphereSchema(tables, Collections.emptyMap());
+    }
+    
+    private Map<String, String> getSchemaTableMapWithSchemaResult(final ShardingSphereDatabase database, final List<SchemaTable> schemaTables) throws NoSuchMethodException {
+        return ReflectionUtil.invokeMethod(handler.getClass().getDeclaredMethod("getSchemaTableMapWithSchema", ShardingSphereDatabase.class, List.class),
+                handler, database, schemaTables);
+    }
+    
+    @Test
+    public void assertGetSchemaTableMapWithoutSchema() throws NoSuchMethodException {
+        Map<String, ShardingSphereSchema> schemas = new HashMap<>();
+        schemas.put("sharding_db", mockSchema());
+        ShardingSphereDatabase database = new ShardingSphereDatabase("sharding_db", new PostgreSQLDatabaseType(), null, null, schemas);

Review Comment:
   Could `PostgreSQLDatabaseType` be `MySQLDatabaseType`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@shardingsphere.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org