You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by "azexcy (via GitHub)" <gi...@apache.org> on 2023/04/23 08:41:02 UTC

[GitHub] [shardingsphere] azexcy commented on a diff in pull request #25289: Add drop streaming DistSQL implementation

azexcy commented on code in PR #25289:
URL: https://github.com/apache/shardingsphere/pull/25289#discussion_r1174537307


##########
kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java:
##########
@@ -218,6 +220,45 @@ protected String marshalJobIdLeftPart(final PipelineJobId pipelineJobId) {
         return DigestUtils.md5Hex(text.getBytes(StandardCharsets.UTF_8));
     }
     
+    /**
+     * Start job.
+     *
+     * @param jobId             job id
+     * @param importerConnector importer connector
+     */
+    public void startJob(final String jobId, final ImporterConnector importerConnector) {
+        CDCJob job = new CDCJob(importerConnector);
+        PipelineJobCenter.addJob(jobId, job);
+        JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
+        updateJobConfigurationDisabled(jobConfigPOJO, false);
+        OneOffJobBootstrap oneOffJobBootstrap = new OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(PipelineJobIdUtils.parseContextKey(jobId)), job, jobConfigPOJO.toJobConfiguration());
+        job.setJobBootstrap(oneOffJobBootstrap);
+        oneOffJobBootstrap.execute();
+    }
+    
+    /**
+     * Update job configuration disabled.
+     *
+     * @param jobId    job id
+     * @param disabled disabled
+     */
+    public void updateJobConfigurationDisabled(final String jobId, final boolean disabled) {
+        JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
+        updateJobConfigurationDisabled(jobConfigPOJO, disabled);
+    }
+    
+    private void updateJobConfigurationDisabled(final JobConfigurationPOJO jobConfigPOJO, final boolean disabled) {
+        // TODO, ensure that there is only one consumer at a time, job config disable may not be updated when the program is forced to close
+        jobConfigPOJO.setDisabled(disabled);

Review Comment:
   I will removed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@shardingsphere.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org