You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2022/04/02 10:09:56 UTC

[shardingsphere] branch master updated: Add schemaName in pipeline source and target config (#16558)

This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new fa8d798  Add schemaName in pipeline source and target config (#16558)
fa8d798 is described below

commit fa8d79835e72b8162d1e7f42aef2cbd633489af8
Author: Hongsheng Zhong <sa...@126.com>
AuthorDate: Sat Apr 2 18:08:43 2022 +0800

    Add schemaName in pipeline source and target config (#16558)
    
    * Add schemaName in source and target
    
    * Add log
---
 .../data/pipeline/core/execute/PipelineJobExecutor.java              | 5 ++++-
 .../shardingsphere/data/pipeline/core/lock/PipelineSimpleLock.java   | 2 +-
 .../config/impl/ShardingSpherePipelineDataSourceConfiguration.java   | 4 +++-
 3 files changed, 8 insertions(+), 3 deletions(-)

diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
index 9d0dca4..898528c 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
@@ -66,8 +66,11 @@ public final class PipelineJobExecutor extends AbstractLifecycleExecutor {
                 case ADDED:
                 case UPDATED:
                     JobConfiguration jobConfig = YamlEngine.unmarshal(jobConfigPOJO.getJobParameter(), JobConfiguration.class, true);
-                    if (PipelineSimpleLock.getInstance().tryLock(jobConfig.getWorkflowConfig().getSchemaName(), 1000)) {
+                    String schemaName = jobConfig.getWorkflowConfig().getSchemaName();
+                    if (PipelineSimpleLock.getInstance().tryLock(schemaName, 1000)) {
                         execute(jobConfigPOJO);
+                    } else {
+                        log.info("tryLock failed, schemaName={}", schemaName);
                     }
                     break;
                 default:
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/lock/PipelineSimpleLock.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/lock/PipelineSimpleLock.java
index e79a13a..cc4733d 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/lock/PipelineSimpleLock.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/lock/PipelineSimpleLock.java
@@ -74,11 +74,11 @@ public final class PipelineSimpleLock {
      * @return true if lock got, else false
      */
     public boolean tryLock(final String lockName, final long timeoutMills) {
-        log.info("tryLock, lockName={}, timeoutMills={}", lockName, timeoutMills);
         boolean result = lockRegistryService.tryLock(decorateLockName(lockName), timeoutMills);
         if (result) {
             lockNameLockedMap.put(lockName, true);
         }
+        log.info("tryLock, lockName={}, timeoutMills={}, result={}", lockName, timeoutMills, result);
         return result;
     }
     
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/ShardingSpherePipelineDataSourceConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/ShardingSpherePipelineDataSourceConfiguration.java
index 6f1592a..7fdfd92 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/ShardingSpherePipelineDataSourceConfiguration.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/ShardingSpherePipelineDataSourceConfiguration.java
@@ -61,7 +61,7 @@ public final class ShardingSpherePipelineDataSourceConfiguration implements Pipe
     }
     
     public ShardingSpherePipelineDataSourceConfiguration(final YamlRootConfiguration rootConfig) {
-        YamlParameterConfiguration parameterConfig = new YamlParameterConfiguration(rootConfig.getDataSources(), rootConfig.getRules());
+        YamlParameterConfiguration parameterConfig = new YamlParameterConfiguration(rootConfig.getSchemaName(), rootConfig.getDataSources(), rootConfig.getRules());
         this.parameter = YamlEngine.marshal(parameterConfig);
         this.rootConfig = rootConfig;
         Map<String, Object> props = rootConfig.getDataSources().values().iterator().next();
@@ -114,6 +114,8 @@ public final class ShardingSpherePipelineDataSourceConfiguration implements Pipe
     @Setter
     private static class YamlParameterConfiguration implements YamlConfiguration {
         
+        private String schemaName;
+        
         private Map<String, Map<String, Object>> dataSources = new HashMap<>();
         
         private Collection<YamlRuleConfiguration> rules = new LinkedList<>();