You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in the conversion to plain text.
Posted to notifications@shardingsphere.apache.org by az...@apache.org on 2023/02/03 04:20:33 UTC

[shardingsphere] branch master updated: Dispatch pipeline data node event asynchronously (#23945)

This is an automated email from the ASF dual-hosted git repository.

azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new efe9ee21266 Dispatch pipeline data node event asynchronously (#23945)
efe9ee21266 is described below

commit efe9ee21266d6507dee4639eb0c19a6d98992325
Author: Hongsheng Zhong <zh...@apache.org>
AuthorDate: Fri Feb 3 12:20:18 2023 +0800

    Dispatch pipeline data node event asynchronously (#23945)
---
 .../core/metadata/node/PipelineMetaDataNodeWatcher.java      | 12 ++++++++++++
 .../ConsistencyCheckChangedJobConfigurationProcessor.java    |  8 +-------
 .../processor/MigrationChangedJobConfigurationProcessor.java |  8 +-------
 3 files changed, 14 insertions(+), 14 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNodeWatcher.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNodeWatcher.java
index d9eb92c7a70..05f836f406d 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNodeWatcher.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNodeWatcher.java
@@ -17,14 +17,17 @@
 
 package org.apache.shardingsphere.data.pipeline.core.metadata.node;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstants;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineMetaDataChangedEventHandler;
 import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
 import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
 
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
@@ -32,6 +35,7 @@ import java.util.stream.Collectors;
 /**
  * Pipeline meta data node watcher.
  */
+@Slf4j
 public final class PipelineMetaDataNodeWatcher {
     
     private static final PipelineMetaDataNodeWatcher INSTANCE = new PipelineMetaDataNodeWatcher();
@@ -45,6 +49,14 @@ public final class PipelineMetaDataNodeWatcher {
     }
     
     private void dispatchEvent(final DataChangedEvent event) {
+        CompletableFuture.runAsync(() -> dispatchEvent0(event), PipelineContext.getEventListenerExecutor()).whenComplete((unused, throwable) -> {
+            if (null != throwable) {
+                log.error("dispatch event failed", throwable);
+            }
+        });
+    }
+    
+    private void dispatchEvent0(final DataChangedEvent event) {
         for (Entry<Pattern, PipelineMetaDataChangedEventHandler> entry : listenerMap.entrySet()) {
             if (entry.getKey().matcher(event.getKey()).matches()) {
                 entry.getValue().handle(event);
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckChangedJobConfigurationProcessor.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckChangedJobConfigurationProcessor.java
index 016520a2cbe..b0813ed38e2 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckChangedJobConfigurationProcessor.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckChangedJobConfigurationProcessor.java
@@ -19,7 +19,6 @@ package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.metada
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
-import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
 import org.apache.shardingsphere.data.pipeline.core.job.type.ConsistencyCheckJobType;
 import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
@@ -31,7 +30,6 @@ import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBoo
 import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
 
 import java.util.Collection;
-import java.util.concurrent.CompletableFuture;
 
 /**
  * Consistency check changed job configuration processor.
@@ -56,11 +54,7 @@ public final class ConsistencyCheckChangedJobConfigurationProcessor implements P
                 if (PipelineJobCenter.isJobExisting(jobId)) {
                     log.info("{} added to executing jobs failed since it already exists", jobId);
                 } else {
-                    CompletableFuture.runAsync(() -> execute(jobConfig), PipelineContext.getEventListenerExecutor()).whenComplete((unused, throwable) -> {
-                        if (null != throwable) {
-                            log.error("execute failed, jobId={}", jobId, throwable);
-                        }
-                    });
+                    execute(jobConfig);
                 }
                 break;
             case DELETED:
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationChangedJobConfigurationProcessor.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationChangedJobConfigurationProcessor.java
index 5150befc687..4fa4e9374b0 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationChangedJobConfigurationProcessor.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationChangedJobConfigurationProcessor.java
@@ -19,7 +19,6 @@ package org.apache.shardingsphere.data.pipeline.scenario.migration.metadata.proc
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
-import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
 import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
 import org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineChangedJobConfigurationProcessor;
@@ -33,7 +32,6 @@ import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBoo
 import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
 
 import java.util.Collection;
-import java.util.concurrent.CompletableFuture;
 
 /**
  * Migration job configuration changed processor.
@@ -58,11 +56,7 @@ public final class MigrationChangedJobConfigurationProcessor implements Pipeline
                 if (PipelineJobCenter.isJobExisting(jobId)) {
                     log.info("{} added to executing jobs failed since it already exists", jobId);
                 } else {
-                    CompletableFuture.runAsync(() -> execute(jobConfig), PipelineContext.getEventListenerExecutor()).whenComplete((unused, throwable) -> {
-                        if (null != throwable) {
-                            log.error("execute failed, jobId={}", jobId, throwable);
-                        }
-                    });
+                    execute(jobConfig);
                 }
                 break;
             case DELETED: