You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by ki...@apache.org on 2020/09/11 12:19:41 UTC
[shardingsphere] branch master updated: Optimize RepositoryResumeBreakPointManager (#7412)
This is an automated email from the ASF dual-hosted git repository.
kimmking pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 5e18cc3 Optimize RepositoryResumeBreakPointManager (#7412)
5e18cc3 is described below
commit 5e18cc3dd7ac103e6b65694336dda1804da24f93
Author: 邱鹿 Lucas <lu...@163.com>
AuthorDate: Fri Sep 11 20:19:17 2020 +0800
Optimize RepositoryResumeBreakPointManager (#7412)
* add log
* Optimize RepositoryResumeBreakPointManager
Co-authored-by: qiulu3 <Lucas209910>
---
.../resume/RepositoryResumeBreakPointManager.java | 18 ++---
.../core/schedule/ScalingTaskScheduler.java | 4 ++
.../core/fixture/FixtureRegistryRepository.java | 82 ++++++++++++++++++++++
.../RepositoryResumeBreakPointManagerTest.java | 41 ++++++-----
...re.governance.repository.api.RegistryRepository | 18 +++++
5 files changed, 138 insertions(+), 25 deletions(-)
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/RepositoryResumeBreakPointManager.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/RepositoryResumeBreakPointManager.java
index 861b413..8bf5b98 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/RepositoryResumeBreakPointManager.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/RepositoryResumeBreakPointManager.java
@@ -42,7 +42,7 @@ public final class RepositoryResumeBreakPointManager extends AbstractResumeBreak
private static final String INCREMENTAL = "/incremental";
- private static RegistryRepository registryRepository = RegistryRepositoryHolder.REGISTRY_REPOSITORY;
+ private static final RegistryRepository REGISTRY_REPOSITORY = RegistryRepositoryHolder.REGISTRY_REPOSITORY;
private final ScheduledExecutorService executor;
@@ -77,8 +77,8 @@ public final class RepositoryResumeBreakPointManager extends AbstractResumeBreak
}
private void resumePosition() {
- resumeInventoryPosition(registryRepository.get(inventoryPath));
- resumeIncrementalPosition(registryRepository.get(incrementalPath));
+ resumeInventoryPosition(REGISTRY_REPOSITORY.get(inventoryPath));
+ resumeIncrementalPosition(REGISTRY_REPOSITORY.get(incrementalPath));
}
private void persistPosition() {
@@ -89,14 +89,14 @@ public final class RepositoryResumeBreakPointManager extends AbstractResumeBreak
@Override
public void persistInventoryPosition() {
String result = getInventoryPositionData();
- registryRepository.persist(inventoryPath, result);
+ REGISTRY_REPOSITORY.persist(inventoryPath, result);
log.info("persist inventory position {} = {}", inventoryPath, result);
}
@Override
public void persistIncrementalPosition() {
String result = getIncrementalPositionData();
- registryRepository.persist(incrementalPath, result);
+ REGISTRY_REPOSITORY.persist(incrementalPath, result);
log.info("persist incremental position {} = {}", incrementalPath, result);
}
@@ -108,16 +108,16 @@ public final class RepositoryResumeBreakPointManager extends AbstractResumeBreak
private static boolean available;
private static RegistryRepository getInstance() {
- RegistryRepository registryRepository = null;
+ RegistryRepository result = null;
YamlGovernanceConfiguration resumeBreakPoint = ScalingContext.getInstance().getServerConfiguration().getResumeBreakPoint();
if (resumeBreakPoint != null) {
- registryRepository = createRegistryRepository(new GovernanceConfigurationYamlSwapper().swapToObject(resumeBreakPoint));
+ result = createRegistryRepository(new GovernanceConfigurationYamlSwapper().swapToObject(resumeBreakPoint));
}
- if (registryRepository != null) {
+ if (result != null) {
log.info("zookeeper resume from break-point manager is available.");
available = true;
}
- return registryRepository;
+ return result;
}
private static RegistryRepository createRegistryRepository(final GovernanceConfiguration config) {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/schedule/ScalingTaskScheduler.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/schedule/ScalingTaskScheduler.java
index fbc68a8..bc2a460 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/schedule/ScalingTaskScheduler.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/schedule/ScalingTaskScheduler.java
@@ -18,6 +18,7 @@
package org.apache.shardingsphere.scaling.core.schedule;
import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.scaling.core.config.ScalingContext;
import org.apache.shardingsphere.scaling.core.execute.engine.ExecuteCallback;
import org.apache.shardingsphere.scaling.core.job.ShardingScalingJob;
@@ -37,6 +38,7 @@ import java.util.stream.Collectors;
/**
* Sharding scaling task scheduler.
*/
+@Slf4j
@RequiredArgsConstructor
public final class ScalingTaskScheduler implements Runnable {
@@ -72,6 +74,7 @@ public final class ScalingTaskScheduler implements Runnable {
executeIncrementalDataSyncTask();
return;
}
+ log.info("Start inventory data sync task.");
for (ScalingTask<InventoryPosition> each : shardingScalingJob.getInventoryDataTasks()) {
ScalingContext.getInstance().getTaskExecuteEngine().submit(each, inventoryDataTaskCallback);
}
@@ -104,6 +107,7 @@ public final class ScalingTaskScheduler implements Runnable {
}
private void executeIncrementalDataSyncTask() {
+ log.info("Start incremental data sync task.");
if (!SyncTaskControlStatus.MIGRATE_INVENTORY_DATA.name().equals(shardingScalingJob.getStatus())) {
shardingScalingJob.setStatus(SyncTaskControlStatus.STOPPED.name());
return;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureRegistryRepository.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureRegistryRepository.java
new file mode 100644
index 0000000..33a5c08
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureRegistryRepository.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.core.fixture;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.shardingsphere.governance.repository.api.ConfigurationRepository;
+import org.apache.shardingsphere.governance.repository.api.RegistryRepository;
+import org.apache.shardingsphere.governance.repository.api.config.GovernanceCenterConfiguration;
+import org.apache.shardingsphere.governance.repository.api.listener.DataChangedEventListener;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+@Getter
+@Setter
+public final class FixtureRegistryRepository implements RegistryRepository, ConfigurationRepository {
+
+ private static final Map<String, String> REGISTRY_DATA = new LinkedHashMap<>();
+
+ private Properties props = new Properties();
+
+ @Override
+ public void init(final String name, final GovernanceCenterConfiguration config) {
+ }
+
+ @Override
+ public String get(final String key) {
+ return REGISTRY_DATA.get(key);
+ }
+
+ @Override
+ public List<String> getChildrenKeys(final String key) {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public void persist(final String key, final String value) {
+ REGISTRY_DATA.put(key, value);
+ }
+
+ @Override
+ public void persistEphemeral(final String key, final String value) {
+ REGISTRY_DATA.put(key, value);
+ }
+
+ @Override
+ public void delete(final String key) {
+ }
+
+ @Override
+ public void watch(final String key, final DataChangedEventListener listener) {
+ }
+
+ @Override
+ public void close() {
+ REGISTRY_DATA.clear();
+ }
+
+ @Override
+ public String getType() {
+ return "REG_FIXTURE";
+ }
+}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/position/resume/RepositoryResumeBreakPointManagerTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/position/resume/RepositoryResumeBreakPointManagerTest.java
index f1d6f8b..db4f738 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/position/resume/RepositoryResumeBreakPointManagerTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/position/resume/RepositoryResumeBreakPointManagerTest.java
@@ -18,6 +18,8 @@
package org.apache.shardingsphere.scaling.core.job.position.resume;
import lombok.SneakyThrows;
+import org.apache.shardingsphere.governance.core.yaml.config.YamlGovernanceCenterConfiguration;
+import org.apache.shardingsphere.governance.core.yaml.config.YamlGovernanceConfiguration;
import org.apache.shardingsphere.governance.repository.api.RegistryRepository;
import org.apache.shardingsphere.scaling.core.config.ScalingContext;
import org.apache.shardingsphere.scaling.core.config.ServerConfiguration;
@@ -25,42 +27,49 @@ import org.apache.shardingsphere.scaling.core.util.ReflectionUtil;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
-import static org.mockito.Mockito.verify;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
-@RunWith(MockitoJUnitRunner.class)
public final class RepositoryResumeBreakPointManagerTest {
- @Mock
- private RegistryRepository registryRepository;
+ private ResumeBreakPointManager resumeBreakPointManager;
- private RepositoryResumeBreakPointManager repositoryResumeBreakPointManager;
+ private RegistryRepository registryRepository;
@Before
@SneakyThrows
public void setUp() {
- ScalingContext.getInstance().init(new ServerConfiguration());
- ReflectionUtil.setFieldValue(RepositoryResumeBreakPointManager.class, null, "registryRepository", registryRepository);
- repositoryResumeBreakPointManager = new RepositoryResumeBreakPointManager("H2", "/base");
+ ScalingContext.getInstance().init(mockServerConfiguration());
+ resumeBreakPointManager = ResumeBreakPointManagerFactory.newInstance("MySQL", "/scalingTest/position/0");
+ registryRepository = ReflectionUtil.getFieldValueFromClass(resumeBreakPointManager, "REGISTRY_REPOSITORY", RegistryRepository.class);
+ }
+
+ private ServerConfiguration mockServerConfiguration() {
+ ServerConfiguration result = new ServerConfiguration();
+ result.setResumeBreakPoint(new YamlGovernanceConfiguration());
+ result.getResumeBreakPoint().setName("scalingJob");
+ result.getResumeBreakPoint().setRegistryCenter(new YamlGovernanceCenterConfiguration());
+ result.getResumeBreakPoint().getRegistryCenter().setType("REG_FIXTURE");
+ return result;
}
@Test
+ @SneakyThrows
public void assertPersistIncrementalPosition() {
- repositoryResumeBreakPointManager.persistIncrementalPosition();
- verify(registryRepository).persist("/base/incremental", "{}");
+ resumeBreakPointManager.persistIncrementalPosition();
+ assertThat(registryRepository.get("/scalingTest/position/0/incremental"), is("{}"));
}
@Test
+ @SneakyThrows
public void assertPersistInventoryPosition() {
- repositoryResumeBreakPointManager.persistInventoryPosition();
- verify(registryRepository).persist("/base/inventory", "{\"unfinished\":{},\"finished\":[]}");
+ resumeBreakPointManager.persistInventoryPosition();
+ assertThat(registryRepository.get("/scalingTest/position/0/inventory"), is("{\"unfinished\":{},\"finished\":[]}"));
}
@After
public void tearDown() {
- repositoryResumeBreakPointManager.close();
+ resumeBreakPointManager.close();
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/resources/META-INF/services/org.apache.shardingsphere.governance.repository.api.RegistryRepository b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/resources/META-INF/services/org.apache.shardingsphere.governance.repository.api.RegistryRepository
new file mode 100644
index 0000000..df0fa47
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/resources/META-INF/services/org.apache.shardingsphere.governance.repository.api.RegistryRepository
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.shardingsphere.scaling.core.fixture.FixtureRegistryRepository