You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/05/16 06:05:31 UTC
[shardingsphere] branch master updated: Replace getOffsetPath to getScalingJobOffsetPath (#17690)
This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 165ef092c0d Replace getOffsetPath to getScalingJobOffsetPath (#17690)
165ef092c0d is described below
commit 165ef092c0db2985d4602a241d6ad02f4eab4739
Author: azexcy <10...@users.noreply.github.com>
AuthorDate: Mon May 16 14:05:23 2022 +0800
Replace getOffsetPath to getScalingJobOffsetPath (#17690)
* Change logback-test.xml log level
* Replace getOffsetPath use getScalingJobOffsetPath
* Fix codestyle
* Add assertGetScalingRootPath
---
.../core/api/impl/GovernanceRepositoryAPIImpl.java | 17 +++-------
.../core/metadata/node/PipelineMetaDataNode.java | 14 ++++++--
.../metadata/node/PipelineMetaDataNodeTest.java | 39 ++++++++++++++++++++++
.../src/test/resources/logback-test.xml | 4 +--
4 files changed, 58 insertions(+), 16 deletions(-)
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
index 61ae24f551f..1843cfe010c 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
@@ -28,6 +28,7 @@ import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
import org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstants;
import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.JobProgressYamlSwapper;
import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlJobProgress;
+import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
@@ -65,7 +66,7 @@ public final class GovernanceRepositoryAPIImpl implements GovernanceRepositoryAP
jobProgress.setIncrementalTaskProgressMap(getIncrementalTaskProgressMap(jobContext));
jobProgress.setInventoryTaskProgressMap(getInventoryTaskProgressMap(jobContext));
String value = YamlEngine.marshal(JOB_PROGRESS_YAML_SWAPPER.swapToYaml(jobProgress));
- repository.persist(getOffsetPath(jobContext.getJobId(), jobContext.getShardingItem()), value);
+ repository.persist(PipelineMetaDataNode.getScalingJobOffsetPath(jobContext.getJobId(), jobContext.getShardingItem()), value);
}
private Map<String, IncrementalTaskProgress> getIncrementalTaskProgressMap(final RuleAlteredJobContext jobContext) {
@@ -86,7 +87,7 @@ public final class GovernanceRepositoryAPIImpl implements GovernanceRepositoryAP
@Override
public JobProgress getJobProgress(final String jobId, final int shardingItem) {
- String data = repository.get(getOffsetPath(jobId, shardingItem));
+ String data = repository.get(PipelineMetaDataNode.getScalingJobOffsetPath(jobId, shardingItem));
if (Strings.isNullOrEmpty(data)) {
return null;
}
@@ -132,7 +133,7 @@ public final class GovernanceRepositoryAPIImpl implements GovernanceRepositoryAP
@Override
public List<Integer> getShardingItems(final String jobId) {
- List<String> result = getChildrenKeys(getOffsetPath(jobId));
+ List<String> result = getChildrenKeys(PipelineMetaDataNode.getScalingJobOffsetPath(jobId));
log.info("getShardingItems, jobId={}, offsetKeys={}", jobId, result);
return result.stream().map(Integer::parseInt).collect(Collectors.toList());
}
@@ -145,14 +146,6 @@ public final class GovernanceRepositoryAPIImpl implements GovernanceRepositoryAP
return;
}
jobProgress.setStatus(status);
- persist(getOffsetPath(jobId, shardingItem), YamlEngine.marshal(JOB_PROGRESS_YAML_SWAPPER.swapToYaml(jobProgress)));
- }
-
- private String getOffsetPath(final String jobId, final int shardingItem) {
- return String.format("%s/%s/offset/%d", DataPipelineConstants.DATA_PIPELINE_ROOT, jobId, shardingItem);
- }
-
- private String getOffsetPath(final String jobId) {
- return String.format("%s/%s/offset", DataPipelineConstants.DATA_PIPELINE_ROOT, jobId);
+ persist(PipelineMetaDataNode.getScalingJobOffsetPath(jobId, shardingItem), YamlEngine.marshal(JOB_PROGRESS_YAML_SWAPPER.swapToYaml(jobProgress)));
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNode.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNode.java
index 0616f8521c3..a3608ae4ec0 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNode.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNode.java
@@ -48,13 +48,23 @@ public final class PipelineMetaDataNode {
}
/**
- * Get scaling job offset path.
+ * Get scaling job offset path, include job id and sharding item.
*
* @param jobId job id.
* @param shardingItem sharding item.
* @return job offset path.
*/
public static String getScalingJobOffsetPath(final String jobId, final int shardingItem) {
- return String.join("/", getScalingRootPath(), jobId, "offset", Integer.toString(shardingItem));
+ return String.join("/", getScalingJobOffsetPath(jobId), Integer.toString(shardingItem));
+ }
+
+ /**
+ * Get scaling job offset path.
+ *
+ * @param jobId job id.
+ * @return job offset path.
+ */
+ public static String getScalingJobOffsetPath(final String jobId) {
+ return String.join("/", getScalingRootPath(), jobId, "offset");
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNodeTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNodeTest.java
new file mode 100644
index 00000000000..5650f5387da
--- /dev/null
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNodeTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.metadata.node;
+
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+public final class PipelineMetaDataNodeTest {
+
+ @Test
+ public void assertGetScalingRootPath() {
+ assertThat(PipelineMetaDataNode.getScalingRootPath(), is("/scaling"));
+ }
+
+ @Test
+ public void assertGetJobConfigPath() {
+ String actualOffsetPath = PipelineMetaDataNode.getScalingJobOffsetPath("0130317c30317c3054317c7368617264696e675f6462");
+ assertThat(actualOffsetPath, is("/scaling/0130317c30317c3054317c7368617264696e675f6462/offset"));
+ actualOffsetPath = PipelineMetaDataNode.getScalingJobOffsetPath("0130317c30317c3054317c7368617264696e675f6462", 1);
+ assertThat(actualOffsetPath, is("/scaling/0130317c30317c3054317c7368617264696e675f6462/offset/1"));
+ }
+}
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/logback-test.xml b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/logback-test.xml
index 414dd870181..3ee302a72df 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/logback-test.xml
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/logback-test.xml
@@ -22,14 +22,14 @@
<pattern>[%-5level] %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %logger{36} - %msg%n</pattern>
</encoder>
</appender>
- <logger name="org.apache.shardingsphere" level="INFO" additivity="false">
+ <logger name="org.apache.shardingsphere" level="WARN" additivity="false">
<appender-ref ref="console" />
</logger>
<logger name="org.springframework.jdbc.core.JdbcTemplate" level="DEBUG" additivity="false">
<appender-ref ref="console" />
</logger>
<logger name="com.zaxxer.hikari.pool.ProxyConnection" level="ERROR" />
- <root level="WARN">
+ <root level="INFO">
<appender-ref ref="console" />
</root>
</configuration>