You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by do...@apache.org on 2020/07/28 14:01:07 UTC

[shardingsphere] branch master updated: polish scaling (#6475)

This is an automated email from the ASF dual-hosted git repository.

dongzonglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new c899766  polish scaling (#6475)
c899766 is described below

commit c899766b8213478a1bb5f150e1c548c7a001fdbb
Author: kimmking <ki...@163.com>
AuthorDate: Tue Jul 28 22:00:49 2020 +0800

    polish scaling (#6475)
    
    * merge master
    
    * polish
---
 .../shardingsphere/scaling/core/ScalingJobController.java |  2 +-
 .../position/resume/AbstractResumablePositionManager.java |  9 +++------
 .../resume/ZookeeperResumablePositionManager.java         | 12 ++++++------
 .../scaling/core/job/preparer/utils/JobPrepareUtil.java   | 15 +++------------
 .../scaling/core/schedule/ScalingTaskScheduler.java       | 14 +++-----------
 .../job/preparer/resumer/SyncPositionResumerTest.java     |  4 ++--
 .../resume/AbstractResumablePositionManagerTest.java      |  8 ++++----
 7 files changed, 22 insertions(+), 42 deletions(-)

diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/ScalingJobController.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/ScalingJobController.java
index 37c5a1a..adbcc76 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/ScalingJobController.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/ScalingJobController.java
@@ -67,7 +67,7 @@ public final class ScalingJobController {
             return;
         }
         scalingTaskSchedulerMap.get(shardingScalingJobId).stop();
-        scalingJobMap.get(shardingScalingJobId).setStatus("STOPPED");
+        scalingJobMap.get(shardingScalingJobId).setStatus(SyncTaskControlStatus.STOPPED.name());
     }
     
     /**
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumablePositionManager.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumablePositionManager.java
index d524c13..f94bb33 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumablePositionManager.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumablePositionManager.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.scaling.core.job.position.resume;
 
+import com.google.common.base.Strings;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.google.gson.Gson;
@@ -66,7 +67,7 @@ public abstract class AbstractResumablePositionManager implements ResumablePosit
     }
     
     protected void resumeInventoryPosition(final String data) {
-        if (isEmpty(data)) {
+        if (Strings.isNullOrEmpty(data)) {
             return;
         }
         log.info("resume inventory position from {} = {}", taskPath, data);
@@ -81,7 +82,7 @@ public abstract class AbstractResumablePositionManager implements ResumablePosit
     }
     
     protected void resumeIncrementalPosition(final String data) {
-        if (isEmpty(data)) {
+        if (Strings.isNullOrEmpty(data)) {
             return;
         }
         log.info("resume incremental position from {} = {}", taskPath, data);
@@ -91,10 +92,6 @@ public abstract class AbstractResumablePositionManager implements ResumablePosit
         }
     }
     
-    private boolean isEmpty(final String data) {
-        return null == data || "".equals(data);
-    }
-    
     protected String getInventoryPositionData() {
         JsonObject result = new JsonObject();
         JsonObject unfinish = new JsonObject();
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/ZookeeperResumablePositionManager.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/ZookeeperResumablePositionManager.java
index a7c4fb9..5e351ee 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/ZookeeperResumablePositionManager.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/ZookeeperResumablePositionManager.java
@@ -38,7 +38,7 @@ public final class ZookeeperResumablePositionManager extends AbstractResumablePo
     
     private static final String INCREMENTAL = "/incremental";
     
-    private static final CuratorZookeeperRepository ZOOKEEPER = new CuratorZookeeperRepository();
+    private static final CuratorZookeeperRepository CURATOR_ZOOKEEPER_REPOSITORY = new CuratorZookeeperRepository();
     
     private ScheduledExecutorService executor;
     
@@ -49,7 +49,7 @@ public final class ZookeeperResumablePositionManager extends AbstractResumablePo
     public ZookeeperResumablePositionManager() {
         ResumeConfiguration resumeConfiguration = ScalingContext.getInstance().getServerConfiguration().getResumeConfiguration();
         if (null != resumeConfiguration) {
-            ZOOKEEPER.init(resumeConfiguration.getNamespace(), new OrchestrationCenterConfiguration("ZooKeeper", resumeConfiguration.getServerLists(), new Properties()));
+            CURATOR_ZOOKEEPER_REPOSITORY.init(resumeConfiguration.getNamespace(), new OrchestrationCenterConfiguration("ZooKeeper", resumeConfiguration.getServerLists(), new Properties()));
             log.info("zookeeper resumable position manager is available.");
             setAvailable(true);
         }
@@ -73,8 +73,8 @@ public final class ZookeeperResumablePositionManager extends AbstractResumablePo
     }
     
     private void resumePosition() {
-        resumeInventoryPosition(ZOOKEEPER.get(inventoryPath));
-        resumeIncrementalPosition(ZOOKEEPER.get(incrementalPath));
+        resumeInventoryPosition(CURATOR_ZOOKEEPER_REPOSITORY.get(inventoryPath));
+        resumeIncrementalPosition(CURATOR_ZOOKEEPER_REPOSITORY.get(incrementalPath));
     }
     
     private void persistPosition() {
@@ -85,14 +85,14 @@ public final class ZookeeperResumablePositionManager extends AbstractResumablePo
     @Override
     public void persistInventoryPosition() {
         String result = getInventoryPositionData();
-        ZOOKEEPER.persist(inventoryPath, result);
+        CURATOR_ZOOKEEPER_REPOSITORY.persist(inventoryPath, result);
         log.info("persist inventory position {} = {}", inventoryPath, result);
     }
     
     @Override
     public void persistIncrementalPosition() {
         String result = getIncrementalPositionData();
-        ZOOKEEPER.persist(incrementalPath, result);
+        CURATOR_ZOOKEEPER_REPOSITORY.persist(incrementalPath, result);
         log.info("persist incremental position {} = {}", incrementalPath, result);
     }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/utils/JobPrepareUtil.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/utils/JobPrepareUtil.java
index ca69ddc..c6daa5e 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/utils/JobPrepareUtil.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/utils/JobPrepareUtil.java
@@ -17,13 +17,11 @@
 
 package org.apache.shardingsphere.scaling.core.job.preparer.utils;
 
+import com.google.common.collect.Lists;
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 import org.apache.shardingsphere.scaling.core.job.task.ScalingTask;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.LinkedList;
 import java.util.List;
 
 /**
@@ -39,14 +37,7 @@ public final class JobPrepareUtil {
      * @param allInventoryDataTasks all inventory data tasks
      * @return task group list
      */
-    public static List<Collection<ScalingTask>> groupInventoryDataTasks(final int taskNumber, final List<ScalingTask> allInventoryDataTasks) {
-        List<Collection<ScalingTask>> result = new ArrayList<>(taskNumber);
-        for (int i = 0; i < taskNumber; i++) {
-            result.add(new LinkedList<>());
-        }
-        for (int i = 0; i < allInventoryDataTasks.size(); i++) {
-            result.get(i % taskNumber).add(allInventoryDataTasks.get(i));
-        }
-        return result;
+    public static List<List<ScalingTask>> groupInventoryDataTasks(final int taskNumber, final List<ScalingTask> allInventoryDataTasks) {
+        return Lists.partition(allInventoryDataTasks, taskNumber);
     }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/schedule/ScalingTaskScheduler.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/schedule/ScalingTaskScheduler.java
index 30be580..1aadf20 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/schedule/ScalingTaskScheduler.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/schedule/ScalingTaskScheduler.java
@@ -25,8 +25,8 @@ import org.apache.shardingsphere.scaling.core.job.SyncProgress;
 import org.apache.shardingsphere.scaling.core.job.task.ScalingTask;
 
 import java.util.Collection;
-import java.util.LinkedList;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
 /**
  * Sharding scaling task scheduler.
@@ -125,11 +125,7 @@ public final class ScalingTaskScheduler implements Runnable {
      * @return all inventory data task progress
      */
     public Collection<SyncProgress> getInventoryDataTaskProgress() {
-        Collection<SyncProgress> result = new LinkedList<>();
-        for (ScalingTask each : shardingScalingJob.getInventoryDataTasks()) {
-            result.add(each.getProgress());
-        }
-        return result;
+        return shardingScalingJob.getInventoryDataTasks().stream().map(ScalingTask::getProgress).collect(Collectors.toList());
     }
     
     /**
@@ -138,10 +134,6 @@ public final class ScalingTaskScheduler implements Runnable {
      * @return all incremental data task progress
      */
     public Collection<SyncProgress> getIncrementalDataTaskProgress() {
-        Collection<SyncProgress> result = new LinkedList<>();
-        for (ScalingTask each : shardingScalingJob.getIncrementalDataTasks()) {
-            result.add(each.getProgress());
-        }
-        return result;
+        return shardingScalingJob.getIncrementalDataTasks().stream().map(ScalingTask::getProgress).collect(Collectors.toList());
     }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/SyncPositionResumerTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/SyncPositionResumerTest.java
index 34a679e..546858f 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/SyncPositionResumerTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/SyncPositionResumerTest.java
@@ -69,8 +69,8 @@ public final class SyncPositionResumerTest {
         resumablePositionManager.getInventoryPositionManagerMap().put("ds0", new PrimaryKeyPositionManager(new PrimaryKeyPosition(0, 100)));
         resumablePositionManager.getIncrementalPositionManagerMap().put("ds0.t_order", mockPositionManager());
         syncPositionResumer.resumePosition(shardingScalingJob, new DataSourceManager(), resumablePositionManager);
-        assertEquals(shardingScalingJob.getIncrementalDataTasks().size(), 1);
-        assertEquals(shardingScalingJob.getInventoryDataTasks().size(), 3);
+        assertEquals(1, shardingScalingJob.getIncrementalDataTasks().size());
+        assertEquals(0, shardingScalingJob.getInventoryDataTasks().size());
     }
     
     @Test
diff --git a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumablePositionManagerTest.java b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumablePositionManagerTest.java
index 33dd347..1ee0dcc 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumablePositionManagerTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumablePositionManagerTest.java
@@ -50,20 +50,20 @@ public final class AbstractResumablePositionManagerTest {
     @Test
     public void assertResumeIncrementalPosition() {
         resumablePositionManager.resumeIncrementalPosition(incrementalPosition);
-        assertEquals(resumablePositionManager.getIncrementalPositionManagerMap().size(), 2);
+        assertEquals(2, resumablePositionManager.getIncrementalPositionManagerMap().size());
     }
     
     @Test
     public void assertResumeInventoryPosition() {
         resumablePositionManager.resumeInventoryPosition(inventoryPosition);
-        assertEquals(resumablePositionManager.getInventoryPositionManagerMap().size(), 3);
+        assertEquals(3, resumablePositionManager.getInventoryPositionManagerMap().size());
     }
     
     @Test
     public void assertGetIncrementalPositionData() {
         resumablePositionManager.getIncrementalPositionManagerMap().put("ds0", new MySQLPositionManager("{\"filename\":\"mysql-bin.000001\",\"position\":4}"));
         resumablePositionManager.getIncrementalPositionManagerMap().put("ds1", new MySQLPositionManager("{\"filename\":\"mysql-bin.000002\",\"position\":4}"));
-        assertEquals(resumablePositionManager.getIncrementalPositionData(), incrementalPosition);
+        assertEquals(incrementalPosition, resumablePositionManager.getIncrementalPositionData());
     }
     
     @Test
@@ -71,6 +71,6 @@ public final class AbstractResumablePositionManagerTest {
         resumablePositionManager.getInventoryPositionManagerMap().put("ds0.t_order_1#0", new PrimaryKeyPositionManager(new PrimaryKeyPosition(0, 100)));
         resumablePositionManager.getInventoryPositionManagerMap().put("ds0.t_order_1#1", new PrimaryKeyPositionManager(new PrimaryKeyPosition.FinishedPosition()));
         resumablePositionManager.getInventoryPositionManagerMap().put("ds1.t_order_1#0", new PrimaryKeyPositionManager(new PrimaryKeyPosition(0, 200)));
-        assertEquals(resumablePositionManager.getInventoryPositionData(), inventoryPosition);
+        assertEquals(inventoryPosition, resumablePositionManager.getInventoryPositionData());
     }
 }