Posted to notifications@shardingsphere.apache.org by "sandynz (via GitHub)" <gi...@apache.org> on 2023/02/09 13:25:32 UTC

[GitHub] [shardingsphere] sandynz commented on a diff in pull request #24080: Improve CDC pure incremental mode

sandynz commented on code in PR #24080:
URL: https://github.com/apache/shardingsphere/pull/24080#discussion_r1101436707


##########
kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java:
##########
@@ -127,9 +135,43 @@ public boolean createJob(final CreateSubscriptionJobParameter event) {
         JobConfigurationPOJO jobConfigPOJO = convertJobConfiguration(jobConfig);
         jobConfigPOJO.setDisabled(true);
         repositoryAPI.persist(jobConfigKey, YamlEngine.marshal(jobConfigPOJO));
+        if (SubscriptionMode.INCREMENTAL.name().equals(param.getSubscriptionMode())) {
+            initIncrementalPosition(jobConfig);
+        }
         return true;
     }
     
+    private void initIncrementalPosition(final CDCJobConfiguration jobConfig) {
+        if (SubscriptionMode.FULL.name().equals(jobConfig.getSubscriptionMode())) {
+            return;
+        }
+        PipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager();
+        String jobId = jobConfig.getJobId();
+        try {
+            for (int i = 0; i < jobConfig.getJobShardingCount(); i++) {
+                if (getJobItemProgress(jobId, i).isPresent()) {
+                    continue;
+                }
+                TableNameSchemaNameMapping tableNameSchemaNameMapping = getTableNameSchemaNameMapping(jobConfig.getTableNames());
+                DumperConfiguration dumperConfig = buildDumperConfiguration(jobConfig, i, tableNameSchemaNameMapping);
+                InventoryIncrementalJobItemProgress jobItemProgress = new InventoryIncrementalJobItemProgress();
+                jobItemProgress.setSourceDatabaseType(jobConfig.getSourceDatabaseType());
+                jobItemProgress.setDataSourceName(dumperConfig.getDataSourceName());
+                IncrementalTaskProgress incrementalTaskProgress = new IncrementalTaskProgress();
+                incrementalTaskProgress.setPosition(PipelineJobPreparerUtils.getIncrementalPosition(null, dumperConfig, dataSourceManager));
+                jobItemProgress.setIncremental(new JobItemIncrementalTasksProgress(incrementalTaskProgress));
+                jobItemProgress.setStatus(JobStatus.PREPARE_SUCCESS);
+                PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(jobId, i, YamlEngine.marshal(getJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress)));
+            }
+        } catch (final SQLException ex) {
+            log.error("Get incremental position failed", ex);
+            //
+            throw new RuntimeException(String.format("Get %s incremental position failed", jobConfig.getDatabase()));

Review Comment:
   It's better to use a dedicated exception (a subclass of `PipelineSQLException`) rather than `RuntimeException`.
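
   A minimal sketch of the idea (the class name `GetIncrementalPositionException` is made up for illustration, and it extends `RuntimeException` here only so the snippet stands alone; in the codebase it should be a subclass of `PipelineSQLException`):
   
   ```java
   import java.sql.SQLException;
   
   // Hypothetical dedicated exception for this failure path; in ShardingSphere it
   // should extend PipelineSQLException instead of RuntimeException.
   public final class GetIncrementalPositionException extends RuntimeException {
       
       private static final long serialVersionUID = 1L;
       
       public GetIncrementalPositionException(final String database, final SQLException cause) {
           super(String.format("Get %s incremental position failed", database), cause);
       }
   }
   ```
   
   The catch block could then be `throw new GetIncrementalPositionException(jobConfig.getDatabase(), ex);`, which also keeps the original `SQLException` as the cause instead of only logging it.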



##########
kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java:
##########
@@ -53,26 +55,25 @@ public final class CDCJobPreparer {
      * @param jobItemContext job item context
      */
     public void prepare(final CDCJobItemContext jobItemContext) {
-        if (!jobAPI.getJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem()).isPresent()) {
+        Optional<InventoryIncrementalJobItemProgress> jobItemProgress = jobAPI.getJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem());
+        if (!jobItemProgress.isPresent()) {
             jobAPI.persistJobItemProgress(jobItemContext);
         }
         if (jobItemContext.isStopping()) {
             PipelineJobCenter.stop(jobItemContext.getJobId());
             return;
         }
-        updateJobItemStatus(JobStatus.PREPARING, jobItemContext);
+        boolean needUpdateJobStatus = !jobItemProgress.isPresent() || JobStatus.PREPARING.equals(jobItemContext.getStatus()) || JobStatus.RUNNING.equals(jobItemContext.getStatus())
+                || JobStatus.PREPARING_FAILURE.equals(jobItemContext.getStatus());
+        if (needUpdateJobStatus) {
+            jobItemContext.setStatus(JobStatus.PREPARE_SUCCESS);
+        }
         initIncrementalTasks(jobItemContext);
         CDCJobConfiguration jobConfig = jobItemContext.getJobConfig();
         if (SubscriptionMode.FULL.name().equals(jobConfig.getSubscriptionMode())) {
             initInventoryTasks(jobItemContext);
         }
         jobAPI.persistJobItemProgress(jobItemContext);

Review Comment:
   Could we keep `updateJobItemStatus(JobStatus.PREPARING, jobItemContext);` and `updateJobItemStatus(JobStatus.PREPARE_SUCCESS, jobItemContext);`,
   and remove `jobAPI.persistJobItemProgress(jobItemContext);`?
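
   Something like the following sketch, assuming `updateJobItemStatus` both sets the status on the context and persists the progress (as the original helper presumably did), so the trailing `persistJobItemProgress` call becomes redundant:
   
   ```java
   public void prepare(final CDCJobItemContext jobItemContext) {
       if (!jobAPI.getJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem()).isPresent()) {
           jobAPI.persistJobItemProgress(jobItemContext);
       }
       if (jobItemContext.isStopping()) {
           PipelineJobCenter.stop(jobItemContext.getJobId());
           return;
       }
       // Mark the item as PREPARING before task initialization starts.
       updateJobItemStatus(JobStatus.PREPARING, jobItemContext);
       initIncrementalTasks(jobItemContext);
       CDCJobConfiguration jobConfig = jobItemContext.getJobConfig();
       if (SubscriptionMode.FULL.name().equals(jobConfig.getSubscriptionMode())) {
           initInventoryTasks(jobItemContext);
       }
       // Persist the final status through the same helper instead of a separate persistJobItemProgress call.
       updateJobItemStatus(JobStatus.PREPARE_SUCCESS, jobItemContext);
   }
   
   private void updateJobItemStatus(final JobStatus jobStatus, final CDCJobItemContext jobItemContext) {
       jobItemContext.setStatus(jobStatus);
       jobAPI.persistJobItemProgress(jobItemContext);
   }
   ```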



##########
kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java:
##########
@@ -195,12 +232,16 @@ private TableNameSchemaNameMapping getTableNameSchemaNameMapping(final List<Stri
         return new TableNameSchemaNameMapping(tableNameSchemaMap);
     }
     
-    private static DumperConfiguration buildDumperConfiguration(final CDCJobConfiguration jobConfig, final String dataSourceName, final PipelineDataSourceConfiguration sourceDataSourceConfig,
-                                                                final Map<ActualTableName, LogicTableName> tableNameMap, final TableNameSchemaNameMapping tableNameSchemaNameMapping) {
+    private static DumperConfiguration buildDumperConfiguration(final CDCJobConfiguration jobConfig, final int jobShardingItem, final TableNameSchemaNameMapping tableNameSchemaNameMapping) {
+        JobDataNodeLine dataNodeLine = jobConfig.getJobShardingDataNodes().get(jobShardingItem);
+        Map<ActualTableName, LogicTableName> tableNameMap = new LinkedHashMap<>();
+        dataNodeLine.getEntries().forEach(each -> each.getDataNodes().forEach(node -> tableNameMap.put(new ActualTableName(node.getTableName()), new LogicTableName(each.getLogicTableName()))));
+        String dataSourceName = dataNodeLine.getEntries().iterator().next().getDataNodes().iterator().next().getDataSourceName();
+        StandardPipelineDataSourceConfiguration actualDataSourceConfiguration = jobConfig.getDataSourceConfig().getActualDataSourceConfiguration(dataSourceName);

Review Comment:
   `actualDataSourceConfiguration` could be `actualDataSourceConfig`, matching the `*Config` naming used elsewhere (for example `jobConfig` and `dumperConfig`).
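
   For example:
   
   ```java
   StandardPipelineDataSourceConfiguration actualDataSourceConfig = jobConfig.getDataSourceConfig().getActualDataSourceConfiguration(dataSourceName);
   ```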



##########
kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java:
##########
@@ -127,9 +135,43 @@ public boolean createJob(final CreateSubscriptionJobParameter event) {
         JobConfigurationPOJO jobConfigPOJO = convertJobConfiguration(jobConfig);
         jobConfigPOJO.setDisabled(true);
         repositoryAPI.persist(jobConfigKey, YamlEngine.marshal(jobConfigPOJO));
+        if (SubscriptionMode.INCREMENTAL.name().equals(param.getSubscriptionMode())) {
+            initIncrementalPosition(jobConfig);
+        }
         return true;
     }
     
+    private void initIncrementalPosition(final CDCJobConfiguration jobConfig) {
+        if (SubscriptionMode.FULL.name().equals(jobConfig.getSubscriptionMode())) {
+            return;
+        }

Review Comment:
   Looks like this block of code is not required, since 1) there is already an if condition before the invocation, and 2) the FULL check would not filter out other subscription modes anyway.
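
   In other words, the method could start directly with the position initialization (sketch; everything except the removed guard is unchanged from the diff above):
   
   ```java
   private void initIncrementalPosition(final CDCJobConfiguration jobConfig) {
       // No FULL-mode guard here: createJob already checks SubscriptionMode.INCREMENTAL
       // before invoking this method, and a FULL-only check would not exclude other modes anyway.
       PipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager();
       String jobId = jobConfig.getJobId();
       // ... rest of the method body unchanged from the diff above ...
   }
   ```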



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@shardingsphere.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org