You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/05/16 06:05:31 UTC

[shardingsphere] branch master updated: Replace getOffsetPath with getScalingJobOffsetPath (#17690)

This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 165ef092c0d Replace getOffsetPath with getScalingJobOffsetPath (#17690)
165ef092c0d is described below

commit 165ef092c0db2985d4602a241d6ad02f4eab4739
Author: azexcy <10...@users.noreply.github.com>
AuthorDate: Mon May 16 14:05:23 2022 +0800

    Replace getOffsetPath with getScalingJobOffsetPath (#17690)
    
    * Change logback-test.xml log level
    
    * Replace getOffsetPath with getScalingJobOffsetPath
    
    * Fix code style
    
    * Add assertGetScalingRootPath
---
 .../core/api/impl/GovernanceRepositoryAPIImpl.java | 17 +++-------
 .../core/metadata/node/PipelineMetaDataNode.java   | 14 ++++++--
 .../metadata/node/PipelineMetaDataNodeTest.java    | 39 ++++++++++++++++++++++
 .../src/test/resources/logback-test.xml            |  4 +--
 4 files changed, 58 insertions(+), 16 deletions(-)

diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
index 61ae24f551f..1843cfe010c 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
@@ -28,6 +28,7 @@ import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
 import org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstants;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.JobProgressYamlSwapper;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlJobProgress;
+import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
 import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
 import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
 import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
@@ -65,7 +66,7 @@ public final class GovernanceRepositoryAPIImpl implements GovernanceRepositoryAP
         jobProgress.setIncrementalTaskProgressMap(getIncrementalTaskProgressMap(jobContext));
         jobProgress.setInventoryTaskProgressMap(getInventoryTaskProgressMap(jobContext));
         String value = YamlEngine.marshal(JOB_PROGRESS_YAML_SWAPPER.swapToYaml(jobProgress));
-        repository.persist(getOffsetPath(jobContext.getJobId(), jobContext.getShardingItem()), value);
+        repository.persist(PipelineMetaDataNode.getScalingJobOffsetPath(jobContext.getJobId(), jobContext.getShardingItem()), value);
     }
     
     private Map<String, IncrementalTaskProgress> getIncrementalTaskProgressMap(final RuleAlteredJobContext jobContext) {
@@ -86,7 +87,7 @@ public final class GovernanceRepositoryAPIImpl implements GovernanceRepositoryAP
     
     @Override
     public JobProgress getJobProgress(final String jobId, final int shardingItem) {
-        String data = repository.get(getOffsetPath(jobId, shardingItem));
+        String data = repository.get(PipelineMetaDataNode.getScalingJobOffsetPath(jobId, shardingItem));
         if (Strings.isNullOrEmpty(data)) {
             return null;
         }
@@ -132,7 +133,7 @@ public final class GovernanceRepositoryAPIImpl implements GovernanceRepositoryAP
     
     @Override
     public List<Integer> getShardingItems(final String jobId) {
-        List<String> result = getChildrenKeys(getOffsetPath(jobId));
+        List<String> result = getChildrenKeys(PipelineMetaDataNode.getScalingJobOffsetPath(jobId));
         log.info("getShardingItems, jobId={}, offsetKeys={}", jobId, result);
         return result.stream().map(Integer::parseInt).collect(Collectors.toList());
     }
@@ -145,14 +146,6 @@ public final class GovernanceRepositoryAPIImpl implements GovernanceRepositoryAP
             return;
         }
         jobProgress.setStatus(status);
-        persist(getOffsetPath(jobId, shardingItem), YamlEngine.marshal(JOB_PROGRESS_YAML_SWAPPER.swapToYaml(jobProgress)));
-    }
-    
-    private String getOffsetPath(final String jobId, final int shardingItem) {
-        return String.format("%s/%s/offset/%d", DataPipelineConstants.DATA_PIPELINE_ROOT, jobId, shardingItem);
-    }
-    
-    private String getOffsetPath(final String jobId) {
-        return String.format("%s/%s/offset", DataPipelineConstants.DATA_PIPELINE_ROOT, jobId);
+        persist(PipelineMetaDataNode.getScalingJobOffsetPath(jobId, shardingItem), YamlEngine.marshal(JOB_PROGRESS_YAML_SWAPPER.swapToYaml(jobProgress)));
     }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNode.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNode.java
index 0616f8521c3..a3608ae4ec0 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNode.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNode.java
@@ -48,13 +48,23 @@ public final class PipelineMetaDataNode {
     }
     
     /**
-     * Get scaling job offset path.
+     * Get scaling job offset path, include job id and sharding item.
      *
      * @param jobId job id.
      * @param shardingItem sharding item.
      * @return job offset path.
      */
     public static String getScalingJobOffsetPath(final String jobId, final int shardingItem) {
-        return String.join("/", getScalingRootPath(), jobId, "offset", Integer.toString(shardingItem));
+        return String.join("/", getScalingJobOffsetPath(jobId), Integer.toString(shardingItem));
+    }
+    
+    /**
+     * Get scaling job offset path.
+     *
+     * @param jobId job id.
+     * @return job offset path.
+     */
+    public static String getScalingJobOffsetPath(final String jobId) {
+        return String.join("/", getScalingRootPath(), jobId, "offset");
     }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNodeTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNodeTest.java
new file mode 100644
index 00000000000..5650f5387da
--- /dev/null
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNodeTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.metadata.node;
+
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+public final class PipelineMetaDataNodeTest {
+    
+    @Test
+    public void assertGetScalingRootPath() {
+        assertThat(PipelineMetaDataNode.getScalingRootPath(), is("/scaling"));
+    }
+    
+    @Test
+    public void assertGetJobConfigPath() {
+        String actualOffsetPath = PipelineMetaDataNode.getScalingJobOffsetPath("0130317c30317c3054317c7368617264696e675f6462");
+        assertThat(actualOffsetPath, is("/scaling/0130317c30317c3054317c7368617264696e675f6462/offset"));
+        actualOffsetPath = PipelineMetaDataNode.getScalingJobOffsetPath("0130317c30317c3054317c7368617264696e675f6462", 1);
+        assertThat(actualOffsetPath, is("/scaling/0130317c30317c3054317c7368617264696e675f6462/offset/1"));
+    }
+}
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/logback-test.xml b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/logback-test.xml
index 414dd870181..3ee302a72df 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/logback-test.xml
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/logback-test.xml
@@ -22,14 +22,14 @@
             <pattern>[%-5level] %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %logger{36} - %msg%n</pattern>
         </encoder>
     </appender>
-    <logger name="org.apache.shardingsphere" level="INFO" additivity="false">
+    <logger name="org.apache.shardingsphere" level="WARN" additivity="false">
         <appender-ref ref="console" />
     </logger>
     <logger name="org.springframework.jdbc.core.JdbcTemplate" level="DEBUG" additivity="false">
         <appender-ref ref="console" />
     </logger>
     <logger name="com.zaxxer.hikari.pool.ProxyConnection" level="ERROR" />
-    <root level="WARN">
+    <root level="INFO">
         <appender-ref ref="console" />
     </root>
 </configuration>