You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2022/05/01 11:02:55 UTC

[shardingsphere] branch master updated: Add PipelineChannelCreatorFactory (#17256)

This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new f579ae4523e Add PipelineChannelCreatorFactory (#17256)
f579ae4523e is described below

commit f579ae4523eb1e9844db359ca33386619a6118f9
Author: Liang Zhang <zh...@apache.org>
AuthorDate: Sun May 1 19:02:48 2022 +0800

    Add PipelineChannelCreatorFactory (#17256)
---
 .../CreateShardingScalingRuleStatementUpdater.java | 21 ++------
 .../channel/PipelineChannelCreatorFactory.java     | 58 ++++++++++++++++++++++
 .../scenario/rulealtered/RuleAlteredContext.java   |  5 +-
 .../rulealtered/RuleAlteredJobPreparer.java        |  3 +-
 4 files changed, 65 insertions(+), 22 deletions(-)

diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/update/CreateShardingScalingRuleStatementUpdater.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/update/CreateShardingScalingRuleStatementUpdater.java
index 9e21e256998..c1f32018b68 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/update/CreateShardingScalingRuleStatementUpdater.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/update/CreateShardingScalingRuleStatementUpdater.java
@@ -19,7 +19,7 @@ package org.apache.shardingsphere.scaling.distsql.handler.update;
 
 import org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyCalculateAlgorithmFactory;
 import org.apache.shardingsphere.data.pipeline.spi.detect.JobCompletionDetectAlgorithmFactory;
-import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
+import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreatorFactory;
 import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithmFactory;
 import org.apache.shardingsphere.distsql.parser.segment.AlgorithmSegment;
 import org.apache.shardingsphere.infra.config.rulealtered.OnRuleAlteredActionConfiguration;
@@ -33,15 +33,11 @@ import org.apache.shardingsphere.scaling.distsql.handler.converter.ShardingScali
 import org.apache.shardingsphere.scaling.distsql.statement.CreateShardingScalingRuleStatement;
 import org.apache.shardingsphere.scaling.distsql.statement.segment.ShardingScalingRuleConfigurationSegment;
 import org.apache.shardingsphere.sharding.api.config.ShardingRuleConfiguration;
-import org.apache.shardingsphere.spi.ShardingSphereServiceLoader;
 import org.apache.shardingsphere.spi.exception.ServiceProviderNotFoundException;
-import org.apache.shardingsphere.spi.type.typed.StatefulTypedSPI;
-import org.apache.shardingsphere.spi.type.typed.TypedSPIRegistry;
 
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Properties;
 
 /**
@@ -49,10 +45,6 @@ import java.util.Properties;
  */
 public final class CreateShardingScalingRuleStatementUpdater implements RuleDefinitionCreateUpdater<CreateShardingScalingRuleStatement, ShardingRuleConfiguration> {
     
-    static {
-        ShardingSphereServiceLoader.register(PipelineChannelCreator.class);
-    }
-    
     @Override
     public void checkSQLStatement(final ShardingSphereMetaData shardingSphereMetaData, final CreateShardingScalingRuleStatement sqlStatement,
                                   final ShardingRuleConfiguration currentRuleConfig) throws DistSQLException {
@@ -100,8 +92,8 @@ public final class CreateShardingScalingRuleStatementUpdater implements RuleDefi
     }
     
     private void checkStreamChannelExist(final ShardingScalingRuleConfigurationSegment segment) throws DistSQLException {
-        if (null != segment.getStreamChannel()) {
-            checkAlgorithm(PipelineChannelCreator.class, "stream channel", segment.getStreamChannel());
+        if (null != segment.getStreamChannel() && !PipelineChannelCreatorFactory.contains(segment.getStreamChannel().getName())) {
+            throw new InvalidAlgorithmConfigurationException("stream channel", segment.getStreamChannel().getName());
         }
     }
     
@@ -121,13 +113,6 @@ public final class CreateShardingScalingRuleStatementUpdater implements RuleDefi
         }
     }
     
-    private <T extends StatefulTypedSPI> void checkAlgorithm(final Class<T> algorithmClass, final String algorithmType, final AlgorithmSegment segment) throws DistSQLException {
-        Optional<T> service = TypedSPIRegistry.findRegisteredService(algorithmClass, segment.getName(), new Properties());
-        if (!service.isPresent()) {
-            throw new InvalidAlgorithmConfigurationException(algorithmType, segment.getName());
-        }
-    }
-    
     @Override
     public ShardingRuleConfiguration buildToBeCreatedRuleConfiguration(final CreateShardingScalingRuleStatement sqlStatement) {
         ShardingRuleConfiguration result = new ShardingRuleConfiguration();
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/channel/PipelineChannelCreatorFactory.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/channel/PipelineChannelCreatorFactory.java
new file mode 100644
index 00000000000..8dbd013edf4
--- /dev/null
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/channel/PipelineChannelCreatorFactory.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.spi.ingest.channel;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmConfiguration;
+import org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmFactory;
+import org.apache.shardingsphere.spi.ShardingSphereServiceLoader;
+import org.apache.shardingsphere.spi.type.typed.TypedSPIRegistry;
+
+import java.util.Properties;
+
+/**
+ * Pipeline channel creator factory.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class PipelineChannelCreatorFactory {
+    
+    static {
+        ShardingSphereServiceLoader.register(PipelineChannelCreator.class);
+    }
+    
+    /**
+     * Create new instance of pipeline channel creator.
+     *
+     * @param pipelineChannelCreatorConfig pipeline channel creator configuration
+     * @return new instance of pipeline channel creator
+     */
+    public static PipelineChannelCreator newInstance(final ShardingSphereAlgorithmConfiguration pipelineChannelCreatorConfig) {
+        return ShardingSphereAlgorithmFactory.createAlgorithm(pipelineChannelCreatorConfig, PipelineChannelCreator.class);
+    }
+    
+    /**
+     * Judge whether contains pipeline channel creator.
+     *
+     * @param pipelineChannelCreatorType pipeline channel creator type
+     * @return contains pipeline channel creator or not
+     */
+    public static boolean contains(final String pipelineChannelCreatorType) {
+        return TypedSPIRegistry.findRegisteredService(PipelineChannelCreator.class, pipelineChannelCreatorType, new Properties()).isPresent();
+    }
+}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java
index 1916373b3ad..0de892522d1 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java
@@ -27,12 +27,12 @@ import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsist
 import org.apache.shardingsphere.data.pipeline.spi.detect.JobCompletionDetectAlgorithm;
 import org.apache.shardingsphere.data.pipeline.spi.detect.JobCompletionDetectAlgorithmFactory;
 import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
+import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreatorFactory;
 import org.apache.shardingsphere.data.pipeline.spi.lock.RowBasedJobLockAlgorithm;
 import org.apache.shardingsphere.data.pipeline.spi.lock.RuleBasedJobLockAlgorithm;
 import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
 import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithmFactory;
 import org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmConfiguration;
-import org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmFactory;
 import org.apache.shardingsphere.infra.config.rulealtered.OnRuleAlteredActionConfiguration;
 import org.apache.shardingsphere.infra.config.rulealtered.OnRuleAlteredActionConfiguration.InputConfiguration;
 import org.apache.shardingsphere.infra.config.rulealtered.OnRuleAlteredActionConfiguration.OutputConfiguration;
@@ -57,7 +57,6 @@ public final class RuleAlteredContext {
     private static final OnRuleAlteredActionConfigurationYamlSwapper ACTION_CONFIG_YAML_SWAPPER = new OnRuleAlteredActionConfigurationYamlSwapper();
     
     static {
-        ShardingSphereServiceLoader.register(PipelineChannelCreator.class);
         ShardingSphereServiceLoader.register(RowBasedJobLockAlgorithm.class);
         ShardingSphereServiceLoader.register(RuleBasedJobLockAlgorithm.class);
     }
@@ -95,7 +94,7 @@ public final class RuleAlteredContext {
         ShardingSphereAlgorithmConfiguration outputRateLimiter = outputConfig.getRateLimiter();
         outputRateLimitAlgorithm = null != outputRateLimiter ? JobRateLimitAlgorithmFactory.newInstance(outputRateLimiter) : null;
         ShardingSphereAlgorithmConfiguration streamChannel = onRuleAlteredActionConfig.getStreamChannel();
-        pipelineChannelCreator = ShardingSphereAlgorithmFactory.createAlgorithm(streamChannel, PipelineChannelCreator.class);
+        pipelineChannelCreator = PipelineChannelCreatorFactory.newInstance(streamChannel);
         ShardingSphereAlgorithmConfiguration completionDetector = onRuleAlteredActionConfig.getCompletionDetector();
         completionDetectAlgorithm = null != completionDetector ? JobCompletionDetectAlgorithmFactory.newInstance(completionDetector) : null;
         sourceWritingStopAlgorithm = RequiredSPIRegistry.getRegisteredService(RowBasedJobLockAlgorithm.class);
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
index c0ea9680312..ce7a2084caf 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
@@ -161,7 +161,8 @@ public final class RuleAlteredJobPreparer {
         PipelineDataSourceManager dataSourceManager = jobContext.getDataSourceManager();
         taskConfig.getDumperConfig().setPosition(getIncrementalPosition(jobContext, taskConfig, dataSourceManager));
         PipelineTableMetaDataLoader sourceMetaDataLoader = jobContext.getSourceMetaDataLoader();
-        IncrementalTask incrementalTask = new IncrementalTask(taskConfig.getJobConfig().getConcurrency(), taskConfig.getDumperConfig(), taskConfig.getImporterConfig(), pipelineChannelCreator, dataSourceManager, sourceMetaDataLoader, incrementalDumperExecuteEngine);
+        IncrementalTask incrementalTask = new IncrementalTask(taskConfig.getJobConfig().getConcurrency(), 
+                taskConfig.getDumperConfig(), taskConfig.getImporterConfig(), pipelineChannelCreator, dataSourceManager, sourceMetaDataLoader, incrementalDumperExecuteEngine);
         jobContext.getIncrementalTasks().add(incrementalTask);
     }