You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by do...@apache.org on 2020/07/28 14:01:07 UTC
[shardingsphere] branch master updated: polish scaling (#6475)
This is an automated email from the ASF dual-hosted git repository.
dongzonglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new c899766 polish scaling (#6475)
c899766 is described below
commit c899766b8213478a1bb5f150e1c548c7a001fdbb
Author: kimmking <ki...@163.com>
AuthorDate: Tue Jul 28 22:00:49 2020 +0800
polish scaling (#6475)
* merge master
* polish
---
.../shardingsphere/scaling/core/ScalingJobController.java | 2 +-
.../position/resume/AbstractResumablePositionManager.java | 9 +++------
.../resume/ZookeeperResumablePositionManager.java | 12 ++++++------
.../scaling/core/job/preparer/utils/JobPrepareUtil.java | 15 +++------------
.../scaling/core/schedule/ScalingTaskScheduler.java | 14 +++-----------
.../job/preparer/resumer/SyncPositionResumerTest.java | 4 ++--
.../resume/AbstractResumablePositionManagerTest.java | 8 ++++----
7 files changed, 22 insertions(+), 42 deletions(-)
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/ScalingJobController.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/ScalingJobController.java
index 37c5a1a..adbcc76 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/ScalingJobController.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/ScalingJobController.java
@@ -67,7 +67,7 @@ public final class ScalingJobController {
return;
}
scalingTaskSchedulerMap.get(shardingScalingJobId).stop();
- scalingJobMap.get(shardingScalingJobId).setStatus("STOPPED");
+ scalingJobMap.get(shardingScalingJobId).setStatus(SyncTaskControlStatus.STOPPED.name());
}
/**
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumablePositionManager.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumablePositionManager.java
index d524c13..f94bb33 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumablePositionManager.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumablePositionManager.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.scaling.core.job.position.resume;
+import com.google.common.base.Strings;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.gson.Gson;
@@ -66,7 +67,7 @@ public abstract class AbstractResumablePositionManager implements ResumablePosit
}
protected void resumeInventoryPosition(final String data) {
- if (isEmpty(data)) {
+ if (Strings.isNullOrEmpty(data)) {
return;
}
log.info("resume inventory position from {} = {}", taskPath, data);
@@ -81,7 +82,7 @@ public abstract class AbstractResumablePositionManager implements ResumablePosit
}
protected void resumeIncrementalPosition(final String data) {
- if (isEmpty(data)) {
+ if (Strings.isNullOrEmpty(data)) {
return;
}
log.info("resume incremental position from {} = {}", taskPath, data);
@@ -91,10 +92,6 @@ public abstract class AbstractResumablePositionManager implements ResumablePosit
}
}
- private boolean isEmpty(final String data) {
- return null == data || "".equals(data);
- }
-
protected String getInventoryPositionData() {
JsonObject result = new JsonObject();
JsonObject unfinish = new JsonObject();
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/ZookeeperResumablePositionManager.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/ZookeeperResumablePositionManager.java
index a7c4fb9..5e351ee 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/ZookeeperResumablePositionManager.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/ZookeeperResumablePositionManager.java
@@ -38,7 +38,7 @@ public final class ZookeeperResumablePositionManager extends AbstractResumablePo
private static final String INCREMENTAL = "/incremental";
- private static final CuratorZookeeperRepository ZOOKEEPER = new CuratorZookeeperRepository();
+ private static final CuratorZookeeperRepository CURATOR_ZOOKEEPER_REPOSITORY = new CuratorZookeeperRepository();
private ScheduledExecutorService executor;
@@ -49,7 +49,7 @@ public final class ZookeeperResumablePositionManager extends AbstractResumablePo
public ZookeeperResumablePositionManager() {
ResumeConfiguration resumeConfiguration = ScalingContext.getInstance().getServerConfiguration().getResumeConfiguration();
if (null != resumeConfiguration) {
- ZOOKEEPER.init(resumeConfiguration.getNamespace(), new OrchestrationCenterConfiguration("ZooKeeper", resumeConfiguration.getServerLists(), new Properties()));
+ CURATOR_ZOOKEEPER_REPOSITORY.init(resumeConfiguration.getNamespace(), new OrchestrationCenterConfiguration("ZooKeeper", resumeConfiguration.getServerLists(), new Properties()));
log.info("zookeeper resumable position manager is available.");
setAvailable(true);
}
@@ -73,8 +73,8 @@ public final class ZookeeperResumablePositionManager extends AbstractResumablePo
}
private void resumePosition() {
- resumeInventoryPosition(ZOOKEEPER.get(inventoryPath));
- resumeIncrementalPosition(ZOOKEEPER.get(incrementalPath));
+ resumeInventoryPosition(CURATOR_ZOOKEEPER_REPOSITORY.get(inventoryPath));
+ resumeIncrementalPosition(CURATOR_ZOOKEEPER_REPOSITORY.get(incrementalPath));
}
private void persistPosition() {
@@ -85,14 +85,14 @@ public final class ZookeeperResumablePositionManager extends AbstractResumablePo
@Override
public void persistInventoryPosition() {
String result = getInventoryPositionData();
- ZOOKEEPER.persist(inventoryPath, result);
+ CURATOR_ZOOKEEPER_REPOSITORY.persist(inventoryPath, result);
log.info("persist inventory position {} = {}", inventoryPath, result);
}
@Override
public void persistIncrementalPosition() {
String result = getIncrementalPositionData();
- ZOOKEEPER.persist(incrementalPath, result);
+ CURATOR_ZOOKEEPER_REPOSITORY.persist(incrementalPath, result);
log.info("persist incremental position {} = {}", incrementalPath, result);
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/utils/JobPrepareUtil.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/utils/JobPrepareUtil.java
index ca69ddc..c6daa5e 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/utils/JobPrepareUtil.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/utils/JobPrepareUtil.java
@@ -17,13 +17,11 @@
package org.apache.shardingsphere.scaling.core.job.preparer.utils;
+import com.google.common.collect.Lists;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.apache.shardingsphere.scaling.core.job.task.ScalingTask;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.LinkedList;
import java.util.List;
/**
@@ -39,14 +37,7 @@ public final class JobPrepareUtil {
* @param allInventoryDataTasks all inventory data tasks
* @return task group list
*/
- public static List<Collection<ScalingTask>> groupInventoryDataTasks(final int taskNumber, final List<ScalingTask> allInventoryDataTasks) {
- List<Collection<ScalingTask>> result = new ArrayList<>(taskNumber);
- for (int i = 0; i < taskNumber; i++) {
- result.add(new LinkedList<>());
- }
- for (int i = 0; i < allInventoryDataTasks.size(); i++) {
- result.get(i % taskNumber).add(allInventoryDataTasks.get(i));
- }
- return result;
+ public static List<List<ScalingTask>> groupInventoryDataTasks(final int taskNumber, final List<ScalingTask> allInventoryDataTasks) {
+ return Lists.partition(allInventoryDataTasks, taskNumber);
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/schedule/ScalingTaskScheduler.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/schedule/ScalingTaskScheduler.java
index 30be580..1aadf20 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/schedule/ScalingTaskScheduler.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/schedule/ScalingTaskScheduler.java
@@ -25,8 +25,8 @@ import org.apache.shardingsphere.scaling.core.job.SyncProgress;
import org.apache.shardingsphere.scaling.core.job.task.ScalingTask;
import java.util.Collection;
-import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
/**
* Sharding scaling task scheduler.
@@ -125,11 +125,7 @@ public final class ScalingTaskScheduler implements Runnable {
* @return all inventory data task progress
*/
public Collection<SyncProgress> getInventoryDataTaskProgress() {
- Collection<SyncProgress> result = new LinkedList<>();
- for (ScalingTask each : shardingScalingJob.getInventoryDataTasks()) {
- result.add(each.getProgress());
- }
- return result;
+ return shardingScalingJob.getInventoryDataTasks().stream().map(ScalingTask::getProgress).collect(Collectors.toList());
}
/**
@@ -138,10 +134,6 @@ public final class ScalingTaskScheduler implements Runnable {
* @return all incremental data task progress
*/
public Collection<SyncProgress> getIncrementalDataTaskProgress() {
- Collection<SyncProgress> result = new LinkedList<>();
- for (ScalingTask each : shardingScalingJob.getIncrementalDataTasks()) {
- result.add(each.getProgress());
- }
- return result;
+ return shardingScalingJob.getIncrementalDataTasks().stream().map(ScalingTask::getProgress).collect(Collectors.toList());
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/SyncPositionResumerTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/SyncPositionResumerTest.java
index 34a679e..546858f 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/SyncPositionResumerTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/SyncPositionResumerTest.java
@@ -69,8 +69,8 @@ public final class SyncPositionResumerTest {
resumablePositionManager.getInventoryPositionManagerMap().put("ds0", new PrimaryKeyPositionManager(new PrimaryKeyPosition(0, 100)));
resumablePositionManager.getIncrementalPositionManagerMap().put("ds0.t_order", mockPositionManager());
syncPositionResumer.resumePosition(shardingScalingJob, new DataSourceManager(), resumablePositionManager);
- assertEquals(shardingScalingJob.getIncrementalDataTasks().size(), 1);
- assertEquals(shardingScalingJob.getInventoryDataTasks().size(), 3);
+ assertEquals(1, shardingScalingJob.getIncrementalDataTasks().size());
+ assertEquals(0, shardingScalingJob.getInventoryDataTasks().size());
}
@Test
diff --git a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumablePositionManagerTest.java b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumablePositionManagerTest.java
index 33dd347..1ee0dcc 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumablePositionManagerTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumablePositionManagerTest.java
@@ -50,20 +50,20 @@ public final class AbstractResumablePositionManagerTest {
@Test
public void assertResumeIncrementalPosition() {
resumablePositionManager.resumeIncrementalPosition(incrementalPosition);
- assertEquals(resumablePositionManager.getIncrementalPositionManagerMap().size(), 2);
+ assertEquals(2, resumablePositionManager.getIncrementalPositionManagerMap().size());
}
@Test
public void assertResumeInventoryPosition() {
resumablePositionManager.resumeInventoryPosition(inventoryPosition);
- assertEquals(resumablePositionManager.getInventoryPositionManagerMap().size(), 3);
+ assertEquals(3, resumablePositionManager.getInventoryPositionManagerMap().size());
}
@Test
public void assertGetIncrementalPositionData() {
resumablePositionManager.getIncrementalPositionManagerMap().put("ds0", new MySQLPositionManager("{\"filename\":\"mysql-bin.000001\",\"position\":4}"));
resumablePositionManager.getIncrementalPositionManagerMap().put("ds1", new MySQLPositionManager("{\"filename\":\"mysql-bin.000002\",\"position\":4}"));
- assertEquals(resumablePositionManager.getIncrementalPositionData(), incrementalPosition);
+ assertEquals(incrementalPosition, resumablePositionManager.getIncrementalPositionData());
}
@Test
@@ -71,6 +71,6 @@ public final class AbstractResumablePositionManagerTest {
resumablePositionManager.getInventoryPositionManagerMap().put("ds0.t_order_1#0", new PrimaryKeyPositionManager(new PrimaryKeyPosition(0, 100)));
resumablePositionManager.getInventoryPositionManagerMap().put("ds0.t_order_1#1", new PrimaryKeyPositionManager(new PrimaryKeyPosition.FinishedPosition()));
resumablePositionManager.getInventoryPositionManagerMap().put("ds1.t_order_1#0", new PrimaryKeyPositionManager(new PrimaryKeyPosition(0, 200)));
- assertEquals(resumablePositionManager.getInventoryPositionData(), inventoryPosition);
+ assertEquals(inventoryPosition, resumablePositionManager.getInventoryPositionData());
}
}