You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2022/04/02 10:09:56 UTC
[shardingsphere] branch master updated: Add schemaName in pipeline source and target config (#16558)
This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new fa8d798 Add schemaName in pipeline source and target config (#16558)
fa8d798 is described below
commit fa8d79835e72b8162d1e7f42aef2cbd633489af8
Author: Hongsheng Zhong <sa...@126.com>
AuthorDate: Sat Apr 2 18:08:43 2022 +0800
Add schemaName in pipeline source and target config (#16558)
* Add schemaName in source and target
* Add log
---
.../data/pipeline/core/execute/PipelineJobExecutor.java | 5 ++++-
.../shardingsphere/data/pipeline/core/lock/PipelineSimpleLock.java | 2 +-
.../config/impl/ShardingSpherePipelineDataSourceConfiguration.java | 4 +++-
3 files changed, 8 insertions(+), 3 deletions(-)
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
index 9d0dca4..898528c 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
@@ -66,8 +66,11 @@ public final class PipelineJobExecutor extends AbstractLifecycleExecutor {
case ADDED:
case UPDATED:
JobConfiguration jobConfig = YamlEngine.unmarshal(jobConfigPOJO.getJobParameter(), JobConfiguration.class, true);
- if (PipelineSimpleLock.getInstance().tryLock(jobConfig.getWorkflowConfig().getSchemaName(), 1000)) {
+ String schemaName = jobConfig.getWorkflowConfig().getSchemaName();
+ if (PipelineSimpleLock.getInstance().tryLock(schemaName, 1000)) {
execute(jobConfigPOJO);
+ } else {
+ log.info("tryLock failed, schemaName={}", schemaName);
}
break;
default:
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/lock/PipelineSimpleLock.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/lock/PipelineSimpleLock.java
index e79a13a..cc4733d 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/lock/PipelineSimpleLock.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/lock/PipelineSimpleLock.java
@@ -74,11 +74,11 @@ public final class PipelineSimpleLock {
* @return true if lock got, else false
*/
public boolean tryLock(final String lockName, final long timeoutMills) {
- log.info("tryLock, lockName={}, timeoutMills={}", lockName, timeoutMills);
boolean result = lockRegistryService.tryLock(decorateLockName(lockName), timeoutMills);
if (result) {
lockNameLockedMap.put(lockName, true);
}
+ log.info("tryLock, lockName={}, timeoutMills={}, result={}", lockName, timeoutMills, result);
return result;
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/ShardingSpherePipelineDataSourceConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/ShardingSpherePipelineDataSourceConfiguration.java
index 6f1592a..7fdfd92 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/ShardingSpherePipelineDataSourceConfiguration.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/ShardingSpherePipelineDataSourceConfiguration.java
@@ -61,7 +61,7 @@ public final class ShardingSpherePipelineDataSourceConfiguration implements Pipe
}
public ShardingSpherePipelineDataSourceConfiguration(final YamlRootConfiguration rootConfig) {
- YamlParameterConfiguration parameterConfig = new YamlParameterConfiguration(rootConfig.getDataSources(), rootConfig.getRules());
+ YamlParameterConfiguration parameterConfig = new YamlParameterConfiguration(rootConfig.getSchemaName(), rootConfig.getDataSources(), rootConfig.getRules());
this.parameter = YamlEngine.marshal(parameterConfig);
this.rootConfig = rootConfig;
Map<String, Object> props = rootConfig.getDataSources().values().iterator().next();
@@ -114,6 +114,8 @@ public final class ShardingSpherePipelineDataSourceConfiguration implements Pipe
@Setter
private static class YamlParameterConfiguration implements YamlConfiguration {
+ private String schemaName;
+
private Map<String, Map<String, Object>> dataSources = new HashMap<>();
private Collection<YamlRuleConfiguration> rules = new LinkedList<>();