Posted to notifications@shardingsphere.apache.org by "azexcy (via GitHub)" <gi...@apache.org> on 2023/02/17 04:52:29 UTC

[GitHub] [shardingsphere] azexcy opened a new pull request, #24209: CDC stream data source table name wildcard support

azexcy opened a new pull request, #24209:
URL: https://github.com/apache/shardingsphere/pull/24209

   Related #22500.
   
   Changes proposed in this pull request:
     - Support wildcards in stream data source table names
     - Add error code
     - Add PostgreSQL example
   
   
   ---
   
   Before committing this PR, I'm sure that I have checked the following options:
   - [ ] My code follows the [code of conduct](https://shardingsphere.apache.org/community/en/involved/conduct/code/) of this project.
   - [ ] I have self-reviewed the commit code.
   - [ ] I have (or in comment I request) added corresponding labels for the pull request.
   - [ ] I have passed the Maven check locally: `./mvnw clean install -B -T1C -Dmaven.javadoc.skip -Dmaven.jacoco.skip -e`.
   - [ ] I have made corresponding changes to the documentation.
   - [ ] I have added corresponding unit tests for my changes.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@shardingsphere.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [shardingsphere] sandynz merged pull request #24209: CDC stream data source table name wildcard support

Posted by "sandynz (via GitHub)" <gi...@apache.org>.
sandynz merged PR #24209:
URL: https://github.com/apache/shardingsphere/pull/24209




[GitHub] [shardingsphere] sandynz commented on a diff in pull request #24209: CDC stream data source table name wildcard support

Posted by "sandynz (via GitHub)" <gi...@apache.org>.
sandynz commented on code in PR #24209:
URL: https://github.com/apache/shardingsphere/pull/24209#discussion_r1109305995


##########
kernel/data-pipeline/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/example/opengauss/Bootstrap.java:
##########
@@ -32,7 +32,7 @@ public final class Bootstrap {
      * @param args args
      */
     public static void main(final String[] args) {
-        ImportDataSourceParameter importDataSourceParam = new ImportDataSourceParameter("jdbc:opengauss:localhost:5432/cdc_db?stringtype=unspecified", "gaussdb", "Root@123");
+        ImportDataSourceParameter importDataSourceParam = new ImportDataSourceParameter("jdbc:opengauss://localhost:5432/cdc_db?stringtype=unspecified", "gaussdb", "Root@123");

Review Comment:
   Could we move `Bootstrap` to the `example` package?
   
   Then the other similar implementations could be merged into it.
   



##########
kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/job/CDCJobConfiguration.java:
##########
@@ -36,7 +37,7 @@ public final class CDCJobConfiguration implements PipelineJobConfiguration {
     
     private final String database;
     
-    private final List<String> schemaTableNames;
+    private final Collection<String> schemaTableNames;

Review Comment:
   It's better to use `List` rather than `Collection` in YAML-related configuration classes.



##########
kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobId.java:
##########
@@ -35,11 +35,11 @@ public final class CDCJobId extends AbstractPipelineJobId {
     
     private final String databaseName;
     
-    private final List<String> tableNames;
+    private final Collection<String> tableNames;

Review Comment:
   Does `tableNames` mean `schemaTableNames`?



##########
proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java:
##########
@@ -98,6 +117,58 @@ public CDCResponse streamData(final String requestId, final StreamDataRequestBod
         return CDCResponseGenerator.succeedBuilder(requestId).setStreamDataResult(StreamDataResult.newBuilder().setStreamingId(jobId).build()).build();
     }
     
+    private Map<String, Collection<String>> getSchemaTableMapWithSchema(final ShardingSphereDatabase database, final List<SchemaTable> schemaTables) {
+        Map<String, Collection<String>> result = new HashMap<>();
+        Collection<String> systemSchemas = database.getProtocolType().getSystemSchemas();
+        Optional<SchemaTable> allSchemaTablesOptional = schemaTables.stream().filter(each -> "*".equals(each.getTable()) && "*".equals(each.getSchema())).findFirst();
+        if (allSchemaTablesOptional.isPresent()) {
+            for (Entry<String, ShardingSphereSchema> entry : database.getSchemas().entrySet()) {
+                if (systemSchemas.contains(entry.getKey())) {
+                    continue;
+                }
+                entry.getValue().getAllTableNames().forEach(tableName -> result.computeIfAbsent(entry.getKey(), ignored -> new HashSet<>()).add(tableName));
+            }
+            return result;
+        }
+        for (SchemaTable each : schemaTables) {
+            if ("*".equals(each.getSchema())) {
+                for (Entry<String, ShardingSphereSchema> entry : database.getSchemas().entrySet()) {
+                    if (systemSchemas.contains(entry.getKey())) {
+                        continue;
+                    }
+                    entry.getValue().getAllTableNames().stream().filter(tableName -> tableName.equals(each.getTable())).findFirst()
+                            .ifPresent(tableName -> result.computeIfAbsent(entry.getKey(), ignored -> new HashSet<>()).add(tableName));
+                }
+            } else if ("*".equals(each.getTable())) {
+                String schemaName = each.getSchema().isEmpty() ? getDefaultSchema(database.getProtocolType()) : each.getSchema();
+                ShardingSphereSchema schema = database.getSchema(schemaName);
+                if (null == schema) {
+                    throw new SchemaNotFoundException(each.getSchema());
+                }
+                schema.getAllTableNames().forEach(tableName -> result.computeIfAbsent(schemaName, ignored -> new HashSet<>()).add(tableName));
+            } else {
+                result.computeIfAbsent(each.getSchema(), ignored -> new HashSet<>()).add(each.getTable());
+            }
+        }
+        return result;
+    }
+    
+    private String getDefaultSchema(final DatabaseType databaseType) {
+        if (!(databaseType instanceof SchemaSupportedDatabaseType)) {
+            return null;
+        }
+        return ((SchemaSupportedDatabaseType) databaseType).getDefaultSchema();
+    }
+    
+    private Collection<String> getTableNamesWithoutSchema(final ShardingSphereDatabase database, final List<SchemaTable> schemaTables) {
+        Optional<SchemaTable> allTablesOptional = schemaTables.stream().filter(each -> each.getTable().equals("*")).findFirst();
+        Set<String> allTableNames = new HashSet<>(database.getSchema(database.getName()).getAllTableNames());
+        if (allTablesOptional.isPresent()) {
+            return allTableNames;
+        }

Review Comment:
   `allTableNames` could be inlined into the `return` statement.
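
For illustration, here is a minimal sketch of that suggestion. The hunk stops right after the `if`, so the final `return` below is an assumption about the rest of the method, not the PR's code. `InlineReturnSketch` and `resolveTableNames` are hypothetical names, and plain collections stand in for `ShardingSphereDatabase` and `SchemaTable`.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;

public final class InlineReturnSketch {

    /**
     * Returns every known table when "*" is requested, otherwise the requested names.
     * The set of all table names is built only inside the branch that returns it.
     */
    public static Collection<String> resolveTableNames(final Collection<String> knownTableNames, final List<String> requestedTables) {
        if (requestedTables.contains("*")) {
            // Inlined: no local variable is created when the wildcard branch is not taken.
            return new HashSet<>(knownTableNames);
        }
        // Assumption: without a wildcard, the explicitly requested names are returned as-is.
        return new HashSet<>(requestedTables);
    }

    public static void main(final String[] args) {
        Collection<String> known = Arrays.asList("t_order", "t_order_item");
        // Both tables are returned for the wildcard request (set iteration order is unspecified).
        System.out.println(resolveTableNames(known, Arrays.asList("*")));
        System.out.println(resolveTableNames(known, Arrays.asList("t_order")));
    }
}
```

Building the set only in the wildcard branch avoids copying all table names when the request lists tables explicitly.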



##########
proxy/backend/src/test/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandlerTest.java:
##########
@@ -91,4 +101,61 @@ public void assertStreamDataRequestFailed() {
         CDCResponse actualResponse = handler.streamData(request.getRequestId(), request.getStreamDataRequestBody(), mock(CDCConnectionContext.class), mock(Channel.class));
         assertThat(actualResponse.getStatus(), is(Status.FAILED));
     }
+    
+    @Test
+    public void assertGetSchemaTableMapWithSchema() throws NoSuchMethodException {
+        Map<String, ShardingSphereSchema> schemas = new HashMap<>();
+        schemas.put("test", mockSchema());
+        schemas.put("public", mockSchema());
+        ShardingSphereDatabase database = new ShardingSphereDatabase("sharding_db", new PostgreSQLDatabaseType(), null, null, schemas);

Review Comment:
   Could `PostgreSQLDatabaseType` be `OpenGaussDatabaseType`?



##########
proxy/backend/src/test/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandlerTest.java:
##########
@@ -91,4 +101,61 @@ public void assertStreamDataRequestFailed() {
         CDCResponse actualResponse = handler.streamData(request.getRequestId(), request.getStreamDataRequestBody(), mock(CDCConnectionContext.class), mock(Channel.class));
         assertThat(actualResponse.getStatus(), is(Status.FAILED));
     }
+    
+    @Test
+    public void assertGetSchemaTableMapWithSchema() throws NoSuchMethodException {
+        Map<String, ShardingSphereSchema> schemas = new HashMap<>();
+        schemas.put("test", mockSchema());
+        schemas.put("public", mockSchema());
+        ShardingSphereDatabase database = new ShardingSphereDatabase("sharding_db", new PostgreSQLDatabaseType(), null, null, schemas);
+        List<SchemaTable> schemaTables = Arrays.asList(SchemaTable.newBuilder().setSchema("public").setTable("t_order").build(),
+                SchemaTable.newBuilder().setSchema("test").setTable("*").build());
+        Map<String, Collection<String>> expected = new HashMap<>();
+        expected.put("test", new HashSet<>(Arrays.asList("t_order", "t_order_item")));
+        expected.put("public", new HashSet<>(Collections.singletonList("t_order")));
+        Map<String, String> actual = getSchemaTableMapWithSchemaResult(database, schemaTables);
+        assertThat(actual, is(expected));
+        schemaTables = Collections.singletonList(SchemaTable.newBuilder().setTable("t_order").build());
+        actual = getSchemaTableMapWithSchemaResult(database, schemaTables);
+        expected = Collections.singletonMap("", Collections.singleton("t_order"));
+        assertThat(actual, is(expected));
+        schemaTables = Collections.singletonList(SchemaTable.newBuilder().setSchema("*").setTable("t_order").build());
+        actual = getSchemaTableMapWithSchemaResult(database, schemaTables);
+        expected = new HashMap<>();
+        expected.put("test", new HashSet<>(Collections.singletonList("t_order")));
+        expected.put("public", new HashSet<>(Collections.singletonList("t_order")));
+        assertThat(actual, is(expected));
+    }
+    
+    private ShardingSphereSchema mockSchema() {
+        Map<String, ShardingSphereTable> tables = new HashMap<>();
+        tables.put("t_order", mock(ShardingSphereTable.class));
+        tables.put("t_order_item", mock(ShardingSphereTable.class));
+        return new ShardingSphereSchema(tables, Collections.emptyMap());
+    }
+    
+    private Map<String, String> getSchemaTableMapWithSchemaResult(final ShardingSphereDatabase database, final List<SchemaTable> schemaTables) throws NoSuchMethodException {
+        return ReflectionUtil.invokeMethod(handler.getClass().getDeclaredMethod("getSchemaTableMapWithSchema", ShardingSphereDatabase.class, List.class),
+                handler, database, schemaTables);
+    }
+    
+    @Test
+    public void assertGetSchemaTableMapWithoutSchema() throws NoSuchMethodException {
+        Map<String, ShardingSphereSchema> schemas = new HashMap<>();
+        schemas.put("sharding_db", mockSchema());
+        ShardingSphereDatabase database = new ShardingSphereDatabase("sharding_db", new PostgreSQLDatabaseType(), null, null, schemas);

Review Comment:
   Could `PostgreSQLDatabaseType` be `MySQLDatabaseType`?
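
The wildcard rules exercised by `assertGetSchemaTableMapWithSchema` above can be reproduced in a stand-alone sketch. This is only an approximation of the logic in the `CDCBackendHandler` hunk, written against plain maps. `WildcardResolutionSketch`, its nested `SchemaTable` class, `sampleSchemas` and the `pg_catalog` system schema are hypothetical stand-ins, and `IllegalArgumentException` replaces `SchemaNotFoundException`.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

public final class WildcardResolutionSketch {

    /** Hypothetical stand-in for the protobuf SchemaTable message; an unset schema is "". */
    public static final class SchemaTable {
        final String schema;
        final String table;

        public SchemaTable(final String schema, final String table) {
            this.schema = schema;
            this.table = table;
        }
    }

    /** Expands "*" in the schema and table positions, skipping system schemas. */
    public static Map<String, Set<String>> resolve(final Map<String, Set<String>> schemas, final Collection<String> systemSchemas,
                                                   final String defaultSchema, final List<SchemaTable> requested) {
        Map<String, Set<String>> result = new TreeMap<>();
        // "*.*" anywhere in the request selects every table of every non-system schema.
        if (requested.stream().anyMatch(each -> "*".equals(each.schema) && "*".equals(each.table))) {
            schemas.forEach((schema, tables) -> {
                if (!systemSchemas.contains(schema)) {
                    tables.forEach(table -> add(result, schema, table));
                }
            });
            return result;
        }
        for (SchemaTable each : requested) {
            if ("*".equals(each.schema)) {
                // "*.t": take the named table from every non-system schema that contains it.
                schemas.forEach((schema, tables) -> {
                    if (!systemSchemas.contains(schema) && tables.contains(each.table)) {
                        add(result, schema, each.table);
                    }
                });
            } else if ("*".equals(each.table)) {
                // "s.*": every table of one schema; an empty schema falls back to the default.
                String schemaName = each.schema.isEmpty() ? defaultSchema : each.schema;
                Set<String> tables = null == schemaName ? null : schemas.get(schemaName);
                if (null == tables) {
                    throw new IllegalArgumentException("Schema not found: " + each.schema);
                }
                tables.forEach(table -> add(result, schemaName, table));
            } else {
                // No wildcard: keep the pair as given, including an empty schema name.
                add(result, each.schema, each.table);
            }
        }
        return result;
    }

    private static void add(final Map<String, Set<String>> result, final String schema, final String table) {
        result.computeIfAbsent(schema, ignored -> new TreeSet<>()).add(table);
    }

    /** Sample data mirroring the test: two user schemas plus one assumed system schema. */
    public static Map<String, Set<String>> sampleSchemas() {
        Map<String, Set<String>> result = new TreeMap<>();
        result.put("public", new TreeSet<>(Arrays.asList("t_order", "t_order_item")));
        result.put("test", new TreeSet<>(Arrays.asList("t_order", "t_order_item")));
        result.put("pg_catalog", new TreeSet<>(Collections.singletonList("pg_class")));
        return result;
    }

    public static void main(final String[] args) {
        List<SchemaTable> requested = Arrays.asList(new SchemaTable("public", "t_order"), new SchemaTable("test", "*"));
        // prints {public=[t_order], test=[t_order, t_order_item]}
        System.out.println(resolve(sampleSchemas(), Collections.singleton("pg_catalog"), "public", requested));
    }
}
```

Sorted collections are used here only to make the printed output deterministic; the hunk itself uses `HashMap` and `HashSet`.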





[GitHub] [shardingsphere] azexcy commented on a diff in pull request #24209: CDC stream data source table name wildcard support

Posted by "azexcy (via GitHub)" <gi...@apache.org>.
azexcy commented on code in PR #24209:
URL: https://github.com/apache/shardingsphere/pull/24209#discussion_r1109315735


##########
kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobId.java:
##########
@@ -35,11 +35,11 @@ public final class CDCJobId extends AbstractPipelineJobId {
     
     private final String databaseName;
     
-    private final List<String> tableNames;
+    private final Collection<String> tableNames;

Review Comment:
   Yes


