Posted to notifications@shardingsphere.apache.org by GitBox <gi...@apache.org> on 2022/12/10 01:59:35 UTC

[GitHub] [shardingsphere] azexcy opened a new pull request, #22779: Add CDC subscription FULL mode implementation

azexcy opened a new pull request, #22779:
URL: https://github.com/apache/shardingsphere/pull/22779

   Fixes #22500.
   
   Changes proposed in this pull request:
     - Add CDC subscription FULL mode implementation
     - Add unmarshal test
   
   ---
   
   Before committing this PR, I'm sure that I have checked the following options:
   - [ ] My code follows the [code of conduct](https://shardingsphere.apache.org/community/en/involved/conduct/code/) of this project.
   - [ ] I have self-reviewed the commit code.
   - [ ] I have (or in comment I request) added corresponding labels for the pull request.
   - [ ] I have passed maven check locally : `./mvnw clean install -B -T1C -Dmaven.javadoc.skip -Dmaven.jacoco.skip -e`.
   - [ ] I have made corresponding changes to the documentation.
   - [ ] I have added corresponding unit tests for my changes.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@shardingsphere.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [shardingsphere] azexcy commented on pull request #22779: Add partial implementation of CDC subscription FULL mode

Posted by GitBox <gi...@apache.org>.
azexcy commented on PR #22779:
URL: https://github.com/apache/shardingsphere/pull/22779#issuecomment-1345928388

   > Is it part of the implementation?
   
   Yes, I changed the title.



[GitHub] [shardingsphere] sandynz merged pull request #22779: Add partial implementation of CDC subscription FULL mode

Posted by GitBox <gi...@apache.org>.
sandynz merged PR #22779:
URL: https://github.com/apache/shardingsphere/pull/22779



[GitHub] [shardingsphere] azexcy commented on a diff in pull request #22779: Add CDC subscription FULL mode implementation

Posted by GitBox <gi...@apache.org>.
azexcy commented on code in PR #22779:
URL: https://github.com/apache/shardingsphere/pull/22779#discussion_r1045352365


##########
kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPrepare.java:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.core.prepare;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
+import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
+import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
+import org.apache.shardingsphere.data.pipeline.cdc.api.CDCJobAPI;
+import org.apache.shardingsphere.data.pipeline.cdc.api.CDCJobAPIFactory;
+import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfiguration;
+import org.apache.shardingsphere.data.pipeline.cdc.context.job.CDCJobItemContext;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest.SubscriptionMode;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataUtil;
+import org.apache.shardingsphere.data.pipeline.core.prepare.InventoryTaskSplitter;
+import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
+
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * CDC job prepare.
+ */
+@Slf4j
+public final class CDCJobPrepare {
+    
+    private final CDCJobAPI jobAPI = CDCJobAPIFactory.getInstance();
+    
+    /**
+     * Do prepare work.
+     *
+     * @param jobItemContext job item context
+     */
+    public void prepare(final CDCJobItemContext jobItemContext) {
+        if (jobItemContext.isStopping()) {
+            PipelineJobCenter.stop(jobItemContext.getJobId());
+            return;
+        }
+        CDCJobConfiguration jobConfig = jobItemContext.getJobConfig();
+        if (SubscriptionMode.FULL.name().equals(jobConfig.getSubscriptionMode())) {
+            initInventoryTasks(jobItemContext);
+        }
+        jobAPI.persistJobItemProgress(jobItemContext);
+    }
+    
+    private void initInventoryTasks(final CDCJobItemContext jobItemContext) {
+        CDCTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
+        // TODO importer and channel requires a new implementation
+        InventoryDumperConfiguration inventoryDumperConfig = new InventoryDumperConfiguration(jobItemContext.getTaskConfig().getDumperConfig());
+        InventoryTaskSplitter inventoryTaskSplitter = new InventoryTaskSplitter(jobItemContext.getSourceDataSource(), inventoryDumperConfig, taskConfig.getImporterConfig());
+        List<InventoryTask> allInventoryTasks = inventoryTaskSplitter.splitInventoryData(jobItemContext);
+        jobItemContext.getInventoryTasks().addAll(allInventoryTasks);
+    }
+    
+    private Collection<InventoryDumperConfiguration> generateInventoryDumperConfigurations(final DumperConfiguration dumperConfig, final PipelineTableMetaDataLoader pipelineTableMetaDataLoader) {
+        Collection<InventoryDumperConfiguration> result = new LinkedList<>();

Review Comment:
   Removed now




[GitHub] [shardingsphere] sandynz commented on a diff in pull request #22779: Add CDC subscription FULL mode implementation

Posted by GitBox <gi...@apache.org>.
sandynz commented on code in PR #22779:
URL: https://github.com/apache/shardingsphere/pull/22779#discussion_r1045041177


##########
kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPIImpl.java:
##########
@@ -135,15 +157,53 @@ protected String marshalJobIdLeftPart(final PipelineJobId pipelineJobId) {
     }
     
     @Override
-    public PipelineTaskConfiguration buildTaskConfiguration(final PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final PipelineProcessConfiguration pipelineProcessConfig) {
-        // TODO to be implement
-        return null;
+    public CDCTaskConfiguration buildTaskConfiguration(final PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final PipelineProcessConfiguration pipelineProcessConfig) {
+        CDCJobConfiguration jobConfig = (CDCJobConfiguration) pipelineJobConfig;
+        JobDataNodeLine dataNodeLine = jobConfig.getJobShardingDataNodes().get(jobShardingItem);
+        Map<ActualTableName, LogicTableName> tableNameMap = new LinkedHashMap<>();
+        dataNodeLine.getEntries().forEach(each -> each.getDataNodes().forEach(node -> tableNameMap.put(new ActualTableName(node.getTableName()), new LogicTableName(each.getLogicTableName()))));
+        TableNameSchemaNameMapping tableNameSchemaNameMapping = new TableNameSchemaNameMapping(Collections.emptyMap());
+        String dataSourceName = dataNodeLine.getEntries().iterator().next().getDataNodes().iterator().next().getDataSourceName();
+        StandardPipelineDataSourceConfiguration actualDataSourceConfiguration = jobConfig.getDataSourceConfiguration().getActualDataSourceConfiguration(dataSourceName);
+        DumperConfiguration dumperConfig = buildDumperConfiguration(jobConfig.getJobId(), dataSourceName, actualDataSourceConfiguration, tableNameMap, tableNameSchemaNameMapping);
+        ImporterConfiguration importerConfig = buildImporterConfiguration(jobConfig, pipelineProcessConfig, jobConfig.getTableNames(), tableNameSchemaNameMapping);
+        CDCTaskConfiguration result = new CDCTaskConfiguration(dumperConfig, importerConfig);
+        log.debug("buildTaskConfiguration, result={}", result);
+        return result;
+    }
+    
+    private static DumperConfiguration buildDumperConfiguration(final String jobId, final String dataSourceName, final PipelineDataSourceConfiguration sourceDataSourceConfig,
+                                                                final Map<ActualTableName, LogicTableName> tableNameMap, final TableNameSchemaNameMapping tableNameSchemaNameMapping) {
+        DumperConfiguration result = new DumperConfiguration();
+        result.setJobId(jobId);
+        result.setDataSourceName(dataSourceName);
+        result.setDataSourceConfig(sourceDataSourceConfig);
+        result.setTableNameMap(tableNameMap);
+        result.setTableNameSchemaNameMapping(tableNameSchemaNameMapping);
+        return result;
+    }
+    
+    private ImporterConfiguration buildImporterConfiguration(final CDCJobConfiguration jobConfig, final PipelineProcessConfiguration pipelineProcessConfig, final List<String> logicalTableNames,
+                                                             final TableNameSchemaNameMapping tableNameSchemaNameMapping) {
+        PipelineDataSourceConfiguration dataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getDataSourceConfiguration().getType(),
+                jobConfig.getDataSourceConfiguration().getParameter());
+        CDCProcessContext processContext = new CDCProcessContext(jobConfig.getJobId(), pipelineProcessConfig);
+        JobRateLimitAlgorithm writeRateLimitAlgorithm = processContext.getWriteRateLimitAlgorithm();
+        int batchSize = pipelineProcessConfig.getWrite().getBatchSize();
+        Map<LogicTableName, Set<String>> shardingColumnsMap = ShardingColumnsExtractorFactory.getInstance().getShardingColumnsMap(jobConfig.getDataSourceConfiguration().getRootConfig().getRules(),
+                logicalTableNames.stream().map(LogicTableName::new).collect(Collectors.toSet()));
+        return new ImporterConfiguration(dataSourceConfig, shardingColumnsMap, tableNameSchemaNameMapping, batchSize, writeRateLimitAlgorithm, 0, 1);
     }
     
     @Override
-    public PipelineProcessContext buildPipelineProcessContext(final PipelineJobConfiguration pipelineJobConfig) {
-        // TODO to be implement
-        return null;
+    public CDCProcessContext buildPipelineProcessContext(final PipelineJobConfiguration pipelineJobConfig) {
+        Properties props = new Properties();
+        props.put(MemoryPipelineChannelCreator.BLOCK_QUEUE_SIZE_KEY, MemoryPipelineChannelCreator.BLOCK_QUEUE_SIZE_DEFAULT_VALUE);
+        AlgorithmConfiguration channel = new AlgorithmConfiguration("MEMORY", props);
+        PipelineReadConfiguration readConfig = new PipelineReadConfiguration(1, 1000, Integer.MAX_VALUE, null);
+        PipelineWriteConfiguration writeConfig = new PipelineWriteConfiguration(1, 1000, null);
+        PipelineProcessConfiguration processConfig = new PipelineProcessConfiguration(readConfig, writeConfig, channel);
+        return new CDCProcessContext(pipelineJobConfig.getJobId(), processConfig);
     }

Review Comment:
   We could use the default process configuration for now by passing `null` as `processConfig`.
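
   A minimal sketch of the fallback suggested above, assuming the process context substitutes defaults when it receives `null`. `ProcessConfig`, `ReadConfig`, `WriteConfig`, `defaultConfig()` and `resolve()` are hypothetical stand-ins, not ShardingSphere classes. The default values only mirror those hard-coded in the diff, assuming the first two constructor arguments there are the worker thread count and the batch size.

   ```java
   import java.util.Objects;

   /**
    * Hypothetical sketch: fall back to a default process configuration when null is passed.
    * The record types are simplified stand-ins, not ShardingSphere's real classes.
    */
   public final class DefaultProcessConfigSketch {

       /** Stand-in for read settings (assumed: worker thread count, batch size). */
       record ReadConfig(int workerThread, int batchSize) {
       }

       /** Stand-in for write settings (assumed: worker thread count, batch size). */
       record WriteConfig(int workerThread, int batchSize) {
       }

       /** Stand-in for the whole process configuration. */
       record ProcessConfig(ReadConfig read, WriteConfig write, String channelType) {
       }

       /** Defaults mirroring the values hard-coded in the diff. */
       static ProcessConfig defaultConfig() {
           return new ProcessConfig(new ReadConfig(1, 1000), new WriteConfig(1, 1000), "MEMORY");
       }

       /** Returns the given configuration unchanged, or the defaults when it is null. */
       static ProcessConfig resolve(final ProcessConfig processConfig) {
           return Objects.requireNonNullElseGet(processConfig, DefaultProcessConfigSketch::defaultConfig);
       }

       public static void main(final String[] args) {
           ProcessConfig custom = new ProcessConfig(new ReadConfig(4, 500), new WriteConfig(2, 200), "MEMORY");
           System.out.println(resolve(null).read().batchSize()); // 1000
           System.out.println(resolve(custom).write().workerThread()); // 2
       }
   }
   ```

   Keeping the defaults in one place would mean callers such as `buildPipelineProcessContext` no longer need to hard-code them.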



##########
kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/job/CDCJobItemContext.java:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.context.job;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.Setter;
+import lombok.SneakyThrows;
+import org.apache.commons.lang3.concurrent.ConcurrentException;
+import org.apache.commons.lang3.concurrent.LazyInitializer;
+import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
+import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
+import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressUpdatedParameter;
+import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfiguration;
+import org.apache.shardingsphere.data.pipeline.cdc.context.CDCProcessContext;
+import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalJobItemContext;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
+import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
+
+import java.util.Collection;
+import java.util.LinkedList;
+
+/**
+ * CDC job item context.
+ */
+@RequiredArgsConstructor
+@Getter
+public final class CDCJobItemContext implements InventoryIncrementalJobItemContext {
+    
+    private final CDCJobConfiguration jobConfig;
+    
+    private final int shardingItem;
+    
+    @Setter
+    private volatile boolean stopping;
+    
+    @Setter
+    private volatile JobStatus status = JobStatus.RUNNING;
+    
+    private final InventoryIncrementalJobItemProgress initProgress;
+    
+    private final CDCProcessContext jobProcessContext;
+    
+    private final CDCTaskConfiguration taskConfig;
+    
+    private final PipelineDataSourceManager dataSourceManager;
+    
+    private final Collection<InventoryTask> inventoryTasks = new LinkedList<>();
+    
+    private final Collection<IncrementalTask> incrementalTasks = new LinkedList<>();
+    
+    private final LazyInitializer<PipelineDataSourceWrapper> sourceDataSourceLazyInitializer = new LazyInitializer<PipelineDataSourceWrapper>() {
+        
+        @Override
+        protected PipelineDataSourceWrapper initialize() {
+            return dataSourceManager.getDataSource(taskConfig.getDumperConfig().getDataSourceConfig());
+        }
+    };
+    
+    private final LazyInitializer<PipelineTableMetaDataLoader> sourceMetaDataLoaderLazyInitializer = new LazyInitializer<PipelineTableMetaDataLoader>() {
+        
+        @Override
+        protected PipelineTableMetaDataLoader initialize() throws ConcurrentException {
+            return new StandardPipelineTableMetaDataLoader(sourceDataSourceLazyInitializer.get());
+        }
+    };
+    
+    @Override
+    public String getJobId() {
+        return jobConfig.getJobId();
+    }
+    
+    @Override
+    public String getDataSourceName() {
+        return taskConfig.getDumperConfig().getDataSourceName();
+    }
+    
+    @Override
+    public void onProgressUpdated(final PipelineJobProgressUpdatedParameter param) {
+    

Review Comment:
   It would be better to add a TODO to these methods.



##########
kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPrepare.java:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.core.prepare;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
+import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
+import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
+import org.apache.shardingsphere.data.pipeline.cdc.api.CDCJobAPI;
+import org.apache.shardingsphere.data.pipeline.cdc.api.CDCJobAPIFactory;
+import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfiguration;
+import org.apache.shardingsphere.data.pipeline.cdc.context.job.CDCJobItemContext;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest.SubscriptionMode;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataUtil;
+import org.apache.shardingsphere.data.pipeline.core.prepare.InventoryTaskSplitter;
+import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
+
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * CDC job prepare.
+ */
+@Slf4j
+public final class CDCJobPrepare {

Review Comment:
   `CDCJobPrepare` could be renamed to `CDCJobPreparer`.
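
   For reference, the control flow of `prepare` in the quoted diff can be sketched as follows. This is a simplified, hypothetical model: `Context` stands in for `CDCJobItemContext`, a string list stands in for the real `InventoryTask` objects, and the `INCREMENTAL` constant is an assumption (only `FULL` appears in the diff).

   ```java
   import java.util.ArrayList;
   import java.util.List;

   /**
    * Hypothetical sketch of the prepare flow quoted in the diff.
    * Context and the string task list are simplified stand-ins.
    */
   public final class CDCJobPreparerSketch {

       /** Assumed mode set; only FULL is confirmed by the diff. */
       enum SubscriptionMode { FULL, INCREMENTAL }

       /** Simplified stand-in for CDCJobItemContext. */
       static final class Context {

           final String subscriptionMode;

           final boolean stopping;

           final List<String> inventoryTasks = new ArrayList<>();

           boolean progressPersisted;

           Context(final String subscriptionMode, final boolean stopping) {
               this.subscriptionMode = subscriptionMode;
               this.stopping = stopping;
           }
       }

       static void prepare(final Context context) {
           if (context.stopping) {
               // The real code calls PipelineJobCenter.stop(jobId) here and returns early.
               return;
           }
           // The mode is stored as a String, so it is compared with the enum constant's name().
           if (SubscriptionMode.FULL.name().equals(context.subscriptionMode)) {
               // Stand-in for InventoryTaskSplitter.splitInventoryData(...).
               context.inventoryTasks.add("inventory-task-0");
           }
           // Progress is persisted whenever the job is not stopping, regardless of mode.
           context.progressPersisted = true;
       }

       public static void main(final String[] args) {
           Context full = new Context("FULL", false);
           prepare(full);
           System.out.println(full.inventoryTasks.size() + " " + full.progressPersisted); // 1 true
           Context stopping = new Context("FULL", true);
           prepare(stopping);
           System.out.println(stopping.inventoryTasks.size() + " " + stopping.progressPersisted); // 0 false
       }
   }
   ```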



##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/JobDataNodeLineConvertUtil.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.util;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeEntry;
+import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeLine;
+import org.apache.shardingsphere.infra.datanode.DataNode;
+
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.stream.Collectors;
+
+/**
+ * Job data node line convert util.
+ */
+public final class JobDataNodeLineConvertUtil {

Review Comment:
   Could we add unit tests for JobDataNodeLineConvertUtil?
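
   As a possible starting point for such tests, the flattening that `buildTaskConfiguration` applies to a `JobDataNodeLine` (each actual table mapped to its logic table, with the data source name taken from the first data node of the first entry) can be modeled in isolation. `DataNode`, `Entry`, `buildTableNameMap` and `firstDataSourceName` below are hypothetical stand-ins, not the real `JobDataNodeLineConvertUtil` API.

   ```java
   import java.util.LinkedHashMap;
   import java.util.List;
   import java.util.Map;

   /**
    * Hypothetical sketch of the data node flattening done in buildTaskConfiguration.
    * DataNode and Entry are simplified stand-ins for the real classes.
    */
   public final class TableNameMapSketch {

       /** A physical table location: data source name plus actual table name. */
       record DataNode(String dataSourceName, String tableName) {
       }

       /** A logic table together with the data nodes that back it. */
       record Entry(String logicTableName, List<DataNode> dataNodes) {
       }

       /** Maps each actual table name to its logic table name, preserving insertion order. */
       static Map<String, String> buildTableNameMap(final List<Entry> line) {
           Map<String, String> result = new LinkedHashMap<>();
           for (Entry each : line) {
               for (DataNode node : each.dataNodes()) {
                   result.put(node.tableName(), each.logicTableName());
               }
           }
           return result;
       }

       /** Mirrors the diff: the data source name comes from the first node of the first entry. */
       static String firstDataSourceName(final List<Entry> line) {
           return line.get(0).dataNodes().get(0).dataSourceName();
       }

       public static void main(final String[] args) {
           List<Entry> line = List.of(
                   new Entry("t_order", List.of(new DataNode("ds_0", "t_order_0"), new DataNode("ds_0", "t_order_1"))),
                   new Entry("t_order_item", List.of(new DataNode("ds_0", "t_order_item_0"))));
           System.out.println(buildTableNameMap(line)); // {t_order_0=t_order, t_order_1=t_order, t_order_item_0=t_order_item}
           System.out.println(firstDataSourceName(line)); // ds_0
       }
   }
   ```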



##########
kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPrepare.java:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.core.prepare;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
+import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
+import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
+import org.apache.shardingsphere.data.pipeline.cdc.api.CDCJobAPI;
+import org.apache.shardingsphere.data.pipeline.cdc.api.CDCJobAPIFactory;
+import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfiguration;
+import org.apache.shardingsphere.data.pipeline.cdc.context.job.CDCJobItemContext;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest.SubscriptionMode;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataUtil;
+import org.apache.shardingsphere.data.pipeline.core.prepare.InventoryTaskSplitter;
+import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
+
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * CDC job prepare.
+ */
+@Slf4j
+public final class CDCJobPrepare {
+    
+    private final CDCJobAPI jobAPI = CDCJobAPIFactory.getInstance();
+    
+    /**
+     * Do prepare work.
+     *
+     * @param jobItemContext job item context
+     */
+    public void prepare(final CDCJobItemContext jobItemContext) {
+        if (jobItemContext.isStopping()) {
+            PipelineJobCenter.stop(jobItemContext.getJobId());
+            return;
+        }
+        CDCJobConfiguration jobConfig = jobItemContext.getJobConfig();
+        if (SubscriptionMode.FULL.name().equals(jobConfig.getSubscriptionMode())) {
+            initInventoryTasks(jobItemContext);
+        }
+        jobAPI.persistJobItemProgress(jobItemContext);
+    }
+    
+    private void initInventoryTasks(final CDCJobItemContext jobItemContext) {
+        CDCTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
+        // TODO importer and channel requires a new implementation
+        InventoryDumperConfiguration inventoryDumperConfig = new InventoryDumperConfiguration(jobItemContext.getTaskConfig().getDumperConfig());
+        InventoryTaskSplitter inventoryTaskSplitter = new InventoryTaskSplitter(jobItemContext.getSourceDataSource(), inventoryDumperConfig, taskConfig.getImporterConfig());
+        List<InventoryTask> allInventoryTasks = inventoryTaskSplitter.splitInventoryData(jobItemContext);
+        jobItemContext.getInventoryTasks().addAll(allInventoryTasks);
+    }
+    
+    private Collection<InventoryDumperConfiguration> generateInventoryDumperConfigurations(final DumperConfiguration dumperConfig, final PipelineTableMetaDataLoader pipelineTableMetaDataLoader) {
+        Collection<InventoryDumperConfiguration> result = new LinkedList<>();

Review Comment:
   The `generateInventoryDumperConfigurations` method is not used; is it required?



##########
kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/job/CDCJobConfiguration.java:
##########
@@ -43,7 +44,15 @@ public final class CDCJobConfiguration implements PipelineJobConfiguration {
     
     private final String sourceDatabaseType;
     
-    private final PipelineDataSourceConfiguration dataSourceConfiguration;
+    private final ShardingSpherePipelineDataSourceConfiguration dataSourceConfiguration;

Review Comment:
   `dataSourceConfiguration` could be renamed to `dataSourceConfig`.



##########
kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapper.java:
##########
@@ -17,11 +17,16 @@
 
 package org.apache.shardingsphere.data.pipeline.cdc.yaml.job;
 
+import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeLine;
+import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfigurationSwapper;
 import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
 
+import java.util.List;
+import java.util.stream.Collectors;
+

Review Comment:
   Could we add a unit test for `YamlCDCJobConfigurationSwapper`?
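
   A hypothetical round-trip test sketch: swap a configuration to its YAML form and back, then check that nothing changed. `JobConfig`, `swapToYaml` and `swapToObject` here are simplified stand-ins operating on a `Map`; the real swapper converts between `CDCJobConfiguration` and its YAML counterpart, and the fields below (`jobId`, `sourceDatabaseType`, `tableNames`, `subscriptionMode`) are only a subset assumed from the diff.

   ```java
   import java.util.LinkedHashMap;
   import java.util.List;
   import java.util.Map;

   /**
    * Hypothetical round-trip test sketch for a YAML configuration swapper.
    * JobConfig and the Map-based "YAML" form are simplified stand-ins.
    */
   public final class SwapperRoundTripSketch {

       record JobConfig(String jobId, String sourceDatabaseType, List<String> tableNames, String subscriptionMode) {
       }

       /** Converts the configuration into a YAML-like map. */
       static Map<String, Object> swapToYaml(final JobConfig config) {
           Map<String, Object> result = new LinkedHashMap<>();
           result.put("jobId", config.jobId());
           result.put("sourceDatabaseType", config.sourceDatabaseType());
           result.put("tableNames", config.tableNames());
           result.put("subscriptionMode", config.subscriptionMode());
           return result;
       }

       /** Rebuilds the configuration from the YAML-like map. */
       @SuppressWarnings("unchecked")
       static JobConfig swapToObject(final Map<String, Object> yaml) {
           return new JobConfig((String) yaml.get("jobId"), (String) yaml.get("sourceDatabaseType"),
                   (List<String>) yaml.get("tableNames"), (String) yaml.get("subscriptionMode"));
       }

       public static void main(final String[] args) {
           JobConfig expected = new JobConfig("j0001", "MySQL", List.of("t_order"), "FULL");
           JobConfig actual = swapToObject(swapToYaml(expected));
           // Records implement equals() component by component, so one comparison checks every field.
           if (!expected.equals(actual)) {
               throw new AssertionError("round trip changed the configuration: " + actual);
           }
           System.out.println("round trip ok");
       }
   }
   ```

   A round-trip assertion like this catches fields that one direction of the swapper forgets to copy, such as the newly added data node lines.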




[GitHub] [shardingsphere] sandynz commented on pull request #22779: Add CDC subscription FULL mode implementation

Posted by GitBox <gi...@apache.org>.
sandynz commented on PR #22779:
URL: https://github.com/apache/shardingsphere/pull/22779#issuecomment-1345219087

   Is it part of the implementation?



[GitHub] [shardingsphere] codecov-commenter commented on pull request #22779: Add partial implementation of CDC subscription FULL mode

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on PR #22779:
URL: https://github.com/apache/shardingsphere/pull/22779#issuecomment-1345964870

   # [Codecov](https://codecov.io/gh/apache/shardingsphere/pull/22779?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#22779](https://codecov.io/gh/apache/shardingsphere/pull/22779?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (a91f047) into [master](https://codecov.io/gh/apache/shardingsphere/commit/98ad9898edb80ddd87a90660aed51cfe5aed4cc0?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (98ad989) will **decrease** coverage by `0.05%`.
   > The diff coverage is `18.18%`.
   
   ```diff
   @@             Coverage Diff              @@
   ##             master   #22779      +/-   ##
   ============================================
   - Coverage     49.31%   49.26%   -0.06%     
   - Complexity     2448     2457       +9     
   ============================================
     Files          4129     4135       +6     
     Lines         57733    57804      +71     
     Branches       9888     9901      +13     
   ============================================
   + Hits          28471    28476       +5     
   - Misses        26811    26873      +62     
   - Partials       2451     2455       +4     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/shardingsphere/pull/22779?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...e/data/pipeline/api/datanode/JobDataNodeEntry.java](https://codecov.io/gh/apache/shardingsphere/pull/22779/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-a2VybmVsL2RhdGEtcGlwZWxpbmUvYXBpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9zaGFyZGluZ3NwaGVyZS9kYXRhL3BpcGVsaW5lL2FwaS9kYXRhbm9kZS9Kb2JEYXRhTm9kZUVudHJ5LmphdmE=) | `0.00% <0.00%> (ø)` | |
   | [...re/data/pipeline/api/datanode/JobDataNodeLine.java](https://codecov.io/gh/apache/shardingsphere/pull/22779/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-a2VybmVsL2RhdGEtcGlwZWxpbmUvYXBpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9zaGFyZGluZ3NwaGVyZS9kYXRhL3BpcGVsaW5lL2FwaS9kYXRhbm9kZS9Kb2JEYXRhTm9kZUxpbmUuamF2YQ==) | `0.00% <0.00%> (ø)` | |
   | [...ShardingSpherePipelineDataSourceConfiguration.java](https://codecov.io/gh/apache/shardingsphere/pull/22779/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-a2VybmVsL2RhdGEtcGlwZWxpbmUvYXBpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9zaGFyZGluZ3NwaGVyZS9kYXRhL3BpcGVsaW5lL2FwaS9kYXRhc291cmNlL2NvbmZpZy9pbXBsL1NoYXJkaW5nU3BoZXJlUGlwZWxpbmVEYXRhU291cmNlQ29uZmlndXJhdGlvbi5qYXZh) | `50.00% <0.00%> (-3.85%)` | :arrow_down: |
   | [...peline/cdc/client/handler/LoginRequestHandler.java](https://codecov.io/gh/apache/shardingsphere/pull/22779/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-a2VybmVsL2RhdGEtcGlwZWxpbmUvY2RjL2NsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvc2hhcmRpbmdzcGhlcmUvZGF0YS9waXBlbGluZS9jZGMvY2xpZW50L2hhbmRsZXIvTG9naW5SZXF1ZXN0SGFuZGxlci5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...here/data/pipeline/cdc/api/impl/CDCJobAPIImpl.java](https://codecov.io/gh/apache/shardingsphere/pull/22779/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-a2VybmVsL2RhdGEtcGlwZWxpbmUvY2RjL2NvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3NoYXJkaW5nc3BoZXJlL2RhdGEvcGlwZWxpbmUvY2RjL2FwaS9pbXBsL0NEQ0pvYkFQSUltcGwuamF2YQ==) | `0.00% <0.00%> (ø)` | |
   | [...a/pipeline/cdc/config/job/CDCJobConfiguration.java](https://codecov.io/gh/apache/shardingsphere/pull/22779/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-a2VybmVsL2RhdGEtcGlwZWxpbmUvY2RjL2NvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3NoYXJkaW5nc3BoZXJlL2RhdGEvcGlwZWxpbmUvY2RjL2NvbmZpZy9qb2IvQ0RDSm9iQ29uZmlndXJhdGlvbi5qYXZh) | `0.00% <ø> (ø)` | |
   | [...e/data/pipeline/cdc/context/CDCProcessContext.java](https://codecov.io/gh/apache/shardingsphere/pull/22779/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-a2VybmVsL2RhdGEtcGlwZWxpbmUvY2RjL2NvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3NoYXJkaW5nc3BoZXJlL2RhdGEvcGlwZWxpbmUvY2RjL2NvbnRleHQvQ0RDUHJvY2Vzc0NvbnRleHQuamF2YQ==) | `0.00% <0.00%> (ø)` | |
   | [...ta/pipeline/cdc/context/job/CDCJobItemContext.java](https://codecov.io/gh/apache/shardingsphere/pull/22779/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-a2VybmVsL2RhdGEtcGlwZWxpbmUvY2RjL2NvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3NoYXJkaW5nc3BoZXJlL2RhdGEvcGlwZWxpbmUvY2RjL2NvbnRleHQvam9iL0NEQ0pvYkl0ZW1Db250ZXh0LmphdmE=) | `0.00% <0.00%> (ø)` | |
   | [...rdingsphere/data/pipeline/cdc/core/job/CDCJob.java](https://codecov.io/gh/apache/shardingsphere/pull/22779/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-a2VybmVsL2RhdGEtcGlwZWxpbmUvY2RjL2NvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3NoYXJkaW5nc3BoZXJlL2RhdGEvcGlwZWxpbmUvY2RjL2NvcmUvam9iL0NEQ0pvYi5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...processor/CDCJobConfigurationChangedProcessor.java](https://codecov.io/gh/apache/shardingsphere/pull/22779/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-a2VybmVsL2RhdGEtcGlwZWxpbmUvY2RjL2NvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3NoYXJkaW5nc3BoZXJlL2RhdGEvcGlwZWxpbmUvY2RjL2NvcmUvbWV0YWRhdGEvcHJvY2Vzc29yL0NEQ0pvYkNvbmZpZ3VyYXRpb25DaGFuZ2VkUHJvY2Vzc29yLmphdmE=) | `0.00% <0.00%> (ø)` | |
   | ... and [22 more](https://codecov.io/gh/apache/shardingsphere/pull/22779/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   




[GitHub] [shardingsphere] sandynz commented on a diff in pull request #22779: Add partial implementation of CDC subscription FULL mode

Posted by GitBox <gi...@apache.org>.
sandynz commented on code in PR #22779:
URL: https://github.com/apache/shardingsphere/pull/22779#discussion_r1045503457


##########
kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/JobDataNodeLineConvertUtilTest.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.util;
+
+import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeEntry;
+import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeLine;
+import org.apache.shardingsphere.infra.datanode.DataNode;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+public final class JobDataNodeLineConvertUtilTest {
+    
+    @Test
+    public void assertConvertDataNodesToLines() {
+        Map<String, List<DataNode>> mockDataNodes = new LinkedHashMap<>();
+        List<DataNode> dataNodes = Arrays.asList(new DataNode("ds_0", "t_order_0"), new DataNode("ds_0", "t_order_1"));
+        List<DataNode> itemDataNodes = Collections.singletonList(new DataNode("ds_0", "t_order_item_0"));
+        mockDataNodes.put("t_order", dataNodes);
+        mockDataNodes.put("t_order_item", itemDataNodes);
+        List<JobDataNodeLine> jobDataNodeLines = JobDataNodeLineConvertUtil.convertDataNodesToLines(mockDataNodes);
+        assertThat(jobDataNodeLines.size(), is(1));
+        List<JobDataNodeEntry> actualNodeEntry = new ArrayList<>(jobDataNodeLines.get(0).getEntries());
+        assertThat(actualNodeEntry.get(0).getLogicTableName(), is("t_order"));
+        assertThat(actualNodeEntry.get(0).getDataNodes().size(), is(2));
+        assertThat(actualNodeEntry.get(1).getLogicTableName(), is("t_order_item"));
+        assertThat(actualNodeEntry.get(1).getDataNodes().size(), is(1));
+    }
+    
+    @Test
+    public void assertConvertDataNodesToLinesWith() {
+        Map<String, List<DataNode>> mockDataNodes = new LinkedHashMap<>();
+        List<DataNode> dataNodes = Arrays.asList(new DataNode("ds_0", "t_order_0"), new DataNode("ds_0", "t_order_1"));
+        List<DataNode> itemDataNodes = Collections.singletonList(new DataNode("ds_0", "t_order_item_0"));
+        mockDataNodes.put("t_order", dataNodes);
+        mockDataNodes.put("t_order_item", itemDataNodes);
+        List<JobDataNodeLine> jobDataNodeLines = JobDataNodeLineConvertUtil.convertDataNodesToLines(mockDataNodes);
+        assertThat(jobDataNodeLines.size(), is(1));
+        List<JobDataNodeEntry> actualNodeEntry = new ArrayList<>(jobDataNodeLines.get(0).getEntries());
+        assertThat(actualNodeEntry.get(0).getLogicTableName(), is("t_order"));
+        assertThat(actualNodeEntry.get(0).getDataNodes().size(), is(2));
+        assertThat(actualNodeEntry.get(1).getLogicTableName(), is("t_order_item"));
+        assertThat(actualNodeEntry.get(1).getDataNodes().size(), is(1));
+    }

Review Comment:
   It looks like the content of `assertConvertDataNodesToLinesWith` is the same as `assertConvertDataNodesToLines`.
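   One possible way to address this comment is to have the second test cover a different input, for example data nodes spread across two data sources instead of one. The sketch below is a self-contained model and does not use the ShardingSphere classes. It assumes that `convertDataNodesToLines` produces one line per data source, with one entry per logic table. That assumption matches the single-line result asserted in the test above, but this thread does not confirm it. `DataNodeLineSketch`, `Node`, and `groupByDataSource` are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, self-contained model; NOT the ShardingSphere implementation.
// Assumption: data nodes are grouped into one "line" per data source,
// and each line holds one entry per logic table.
public final class DataNodeLineSketch {
    
    // Minimal stand-in for DataNode: a data source name plus an actual table name.
    static final class Node {
        
        final String dataSourceName;
        
        final String tableName;
        
        Node(final String dataSourceName, final String tableName) {
            this.dataSourceName = dataSourceName;
            this.tableName = tableName;
        }
    }
    
    // Regroups "logic table -> nodes" into "data source -> (logic table -> nodes)", keeping insertion order.
    static Map<String, Map<String, List<Node>>> groupByDataSource(final Map<String, List<Node>> logicTableNodes) {
        Map<String, Map<String, List<Node>>> result = new LinkedHashMap<>();
        for (Map.Entry<String, List<Node>> entry : logicTableNodes.entrySet()) {
            for (Node each : entry.getValue()) {
                result.computeIfAbsent(each.dataSourceName, key -> new LinkedHashMap<>())
                        .computeIfAbsent(entry.getKey(), key -> new ArrayList<>())
                        .add(each);
            }
        }
        return result;
    }
    
    public static void main(final String[] args) {
        // Unlike assertConvertDataNodesToLines, the nodes here span two data sources.
        Map<String, List<Node>> input = new LinkedHashMap<>();
        input.put("t_order", Arrays.asList(new Node("ds_0", "t_order_0"), new Node("ds_1", "t_order_1")));
        input.put("t_order_item", Collections.singletonList(new Node("ds_1", "t_order_item_1")));
        Map<String, Map<String, List<Node>>> lines = groupByDataSource(input);
        check(lines.size() == 2, "expected one line per data source");
        check(lines.get("ds_0").size() == 1, "ds_0 should only hold t_order");
        check(lines.get("ds_1").size() == 2, "ds_1 should hold t_order and t_order_item");
        check(lines.get("ds_1").get("t_order_item").size() == 1, "ds_1 should hold one t_order_item node");
        // prints "lines: [ds_0, ds_1]"
        System.out.println("lines: " + lines.keySet());
    }
    
    private static void check(final boolean condition, final String message) {
        if (!condition) {
            throw new AssertionError(message);
        }
    }
}
```

   With real ShardingSphere types, the same idea would translate into a second `@Test` that builds `DataNode`s on `ds_0` and `ds_1`, then asserts on the number of returned `JobDataNodeLine`s and on their entries. The expected counts depend on how the actual utility groups nodes.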


