You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by ki...@apache.org on 2020/09/11 12:19:41 UTC

[shardingsphere] branch master updated: Optimize RepositoryResumeBreakPointManager (#7412)

This is an automated email from the ASF dual-hosted git repository.

kimmking pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 5e18cc3  Optimize RepositoryResumeBreakPointManager (#7412)
5e18cc3 is described below

commit 5e18cc3dd7ac103e6b65694336dda1804da24f93
Author: 邱鹿 Lucas <lu...@163.com>
AuthorDate: Fri Sep 11 20:19:17 2020 +0800

    Optimize RepositoryResumeBreakPointManager (#7412)
    
    * add log
    
    * Optimize RepositoryResumeBreakPointManager
    
    Co-authored-by: qiulu3 <Lucas209910>
---
 .../resume/RepositoryResumeBreakPointManager.java  | 18 ++---
 .../core/schedule/ScalingTaskScheduler.java        |  4 ++
 .../core/fixture/FixtureRegistryRepository.java    | 82 ++++++++++++++++++++++
 .../RepositoryResumeBreakPointManagerTest.java     | 41 ++++++-----
 ...re.governance.repository.api.RegistryRepository | 18 +++++
 5 files changed, 138 insertions(+), 25 deletions(-)

diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/RepositoryResumeBreakPointManager.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/RepositoryResumeBreakPointManager.java
index 861b413..8bf5b98 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/RepositoryResumeBreakPointManager.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/RepositoryResumeBreakPointManager.java
@@ -42,7 +42,7 @@ public final class RepositoryResumeBreakPointManager extends AbstractResumeBreak
     
     private static final String INCREMENTAL = "/incremental";
     
-    private static RegistryRepository registryRepository = RegistryRepositoryHolder.REGISTRY_REPOSITORY;
+    private static final RegistryRepository REGISTRY_REPOSITORY = RegistryRepositoryHolder.REGISTRY_REPOSITORY;
     
     private final ScheduledExecutorService executor;
     
@@ -77,8 +77,8 @@ public final class RepositoryResumeBreakPointManager extends AbstractResumeBreak
     }
     
     private void resumePosition() {
-        resumeInventoryPosition(registryRepository.get(inventoryPath));
-        resumeIncrementalPosition(registryRepository.get(incrementalPath));
+        resumeInventoryPosition(REGISTRY_REPOSITORY.get(inventoryPath));
+        resumeIncrementalPosition(REGISTRY_REPOSITORY.get(incrementalPath));
     }
     
     private void persistPosition() {
@@ -89,14 +89,14 @@ public final class RepositoryResumeBreakPointManager extends AbstractResumeBreak
     @Override
     public void persistInventoryPosition() {
         String result = getInventoryPositionData();
-        registryRepository.persist(inventoryPath, result);
+        REGISTRY_REPOSITORY.persist(inventoryPath, result);
         log.info("persist inventory position {} = {}", inventoryPath, result);
     }
     
     @Override
     public void persistIncrementalPosition() {
         String result = getIncrementalPositionData();
-        registryRepository.persist(incrementalPath, result);
+        REGISTRY_REPOSITORY.persist(incrementalPath, result);
         log.info("persist incremental position {} = {}", incrementalPath, result);
     }
     
@@ -108,16 +108,16 @@ public final class RepositoryResumeBreakPointManager extends AbstractResumeBreak
         private static boolean available;
     
         private static RegistryRepository getInstance() {
-            RegistryRepository registryRepository = null;
+            RegistryRepository result = null;
             YamlGovernanceConfiguration resumeBreakPoint = ScalingContext.getInstance().getServerConfiguration().getResumeBreakPoint();
             if (resumeBreakPoint != null) {
-                registryRepository = createRegistryRepository(new GovernanceConfigurationYamlSwapper().swapToObject(resumeBreakPoint));
+                result = createRegistryRepository(new GovernanceConfigurationYamlSwapper().swapToObject(resumeBreakPoint));
             }
-            if (registryRepository != null) {
+            if (result != null) {
                 log.info("zookeeper resume from break-point manager is available.");
                 available = true;
             }
-            return registryRepository;
+            return result;
         }
     
         private static RegistryRepository createRegistryRepository(final GovernanceConfiguration config) {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/schedule/ScalingTaskScheduler.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/schedule/ScalingTaskScheduler.java
index fbc68a8..bc2a460 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/schedule/ScalingTaskScheduler.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/schedule/ScalingTaskScheduler.java
@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.scaling.core.schedule;
 
 import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.scaling.core.config.ScalingContext;
 import org.apache.shardingsphere.scaling.core.execute.engine.ExecuteCallback;
 import org.apache.shardingsphere.scaling.core.job.ShardingScalingJob;
@@ -37,6 +38,7 @@ import java.util.stream.Collectors;
 /**
  * Sharding scaling task scheduler.
  */
+@Slf4j
 @RequiredArgsConstructor
 public final class ScalingTaskScheduler implements Runnable {
     
@@ -72,6 +74,7 @@ public final class ScalingTaskScheduler implements Runnable {
             executeIncrementalDataSyncTask();
             return;
         }
+        log.info("Start inventory data sync task.");
         for (ScalingTask<InventoryPosition> each : shardingScalingJob.getInventoryDataTasks()) {
             ScalingContext.getInstance().getTaskExecuteEngine().submit(each, inventoryDataTaskCallback);
         }
@@ -104,6 +107,7 @@ public final class ScalingTaskScheduler implements Runnable {
     }
     
     private void executeIncrementalDataSyncTask() {
+        log.info("Start incremental data sync task.");
         if (!SyncTaskControlStatus.MIGRATE_INVENTORY_DATA.name().equals(shardingScalingJob.getStatus())) {
             shardingScalingJob.setStatus(SyncTaskControlStatus.STOPPED.name());
             return;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureRegistryRepository.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureRegistryRepository.java
new file mode 100644
index 0000000..33a5c08
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureRegistryRepository.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.core.fixture;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.shardingsphere.governance.repository.api.ConfigurationRepository;
+import org.apache.shardingsphere.governance.repository.api.RegistryRepository;
+import org.apache.shardingsphere.governance.repository.api.config.GovernanceCenterConfiguration;
+import org.apache.shardingsphere.governance.repository.api.listener.DataChangedEventListener;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+@Getter
+@Setter
+public final class FixtureRegistryRepository implements RegistryRepository, ConfigurationRepository {
+    
+    private static final Map<String, String> REGISTRY_DATA = new LinkedHashMap<>();
+    
+    private Properties props = new Properties();
+    
+    @Override
+    public void init(final String name, final GovernanceCenterConfiguration config) {
+    }
+    
+    @Override
+    public String get(final String key) {
+        return REGISTRY_DATA.get(key);
+    }
+    
+    @Override
+    public List<String> getChildrenKeys(final String key) {
+        return Collections.emptyList();
+    }
+    
+    @Override
+    public void persist(final String key, final String value) {
+        REGISTRY_DATA.put(key, value);
+    }
+    
+    @Override
+    public void persistEphemeral(final String key, final String value) {
+        REGISTRY_DATA.put(key, value);
+    }
+    
+    @Override
+    public void delete(final String key) {
+    }
+    
+    @Override
+    public void watch(final String key, final DataChangedEventListener listener) {
+    }
+    
+    @Override
+    public void close() {
+        REGISTRY_DATA.clear();
+    }
+    
+    @Override
+    public String getType() {
+        return "REG_FIXTURE";
+    }
+}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/position/resume/RepositoryResumeBreakPointManagerTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/position/resume/RepositoryResumeBreakPointManagerTest.java
index f1d6f8b..db4f738 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/position/resume/RepositoryResumeBreakPointManagerTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/position/resume/RepositoryResumeBreakPointManagerTest.java
@@ -18,6 +18,8 @@
 package org.apache.shardingsphere.scaling.core.job.position.resume;
 
 import lombok.SneakyThrows;
+import org.apache.shardingsphere.governance.core.yaml.config.YamlGovernanceCenterConfiguration;
+import org.apache.shardingsphere.governance.core.yaml.config.YamlGovernanceConfiguration;
 import org.apache.shardingsphere.governance.repository.api.RegistryRepository;
 import org.apache.shardingsphere.scaling.core.config.ScalingContext;
 import org.apache.shardingsphere.scaling.core.config.ServerConfiguration;
@@ -25,42 +27,49 @@ import org.apache.shardingsphere.scaling.core.util.ReflectionUtil;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
 
-import static org.mockito.Mockito.verify;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
 
-@RunWith(MockitoJUnitRunner.class)
 public final class RepositoryResumeBreakPointManagerTest {
     
-    @Mock
-    private RegistryRepository registryRepository;
+    private ResumeBreakPointManager resumeBreakPointManager;
     
-    private RepositoryResumeBreakPointManager repositoryResumeBreakPointManager;
+    private RegistryRepository registryRepository;
     
     @Before
     @SneakyThrows
     public void setUp() {
-        ScalingContext.getInstance().init(new ServerConfiguration());
-        ReflectionUtil.setFieldValue(RepositoryResumeBreakPointManager.class, null, "registryRepository", registryRepository);
-        repositoryResumeBreakPointManager = new RepositoryResumeBreakPointManager("H2", "/base");
+        ScalingContext.getInstance().init(mockServerConfiguration());
+        resumeBreakPointManager = ResumeBreakPointManagerFactory.newInstance("MySQL", "/scalingTest/position/0");
+        registryRepository = ReflectionUtil.getFieldValueFromClass(resumeBreakPointManager, "REGISTRY_REPOSITORY", RegistryRepository.class);
+    }
+    
+    private ServerConfiguration mockServerConfiguration() {
+        ServerConfiguration result = new ServerConfiguration();
+        result.setResumeBreakPoint(new YamlGovernanceConfiguration());
+        result.getResumeBreakPoint().setName("scalingJob");
+        result.getResumeBreakPoint().setRegistryCenter(new YamlGovernanceCenterConfiguration());
+        result.getResumeBreakPoint().getRegistryCenter().setType("REG_FIXTURE");
+        return result;
     }
     
     @Test
+    @SneakyThrows
     public void assertPersistIncrementalPosition() {
-        repositoryResumeBreakPointManager.persistIncrementalPosition();
-        verify(registryRepository).persist("/base/incremental", "{}");
+        resumeBreakPointManager.persistIncrementalPosition();
+        assertThat(registryRepository.get("/scalingTest/position/0/incremental"), is("{}"));
     }
     
     @Test
+    @SneakyThrows
     public void assertPersistInventoryPosition() {
-        repositoryResumeBreakPointManager.persistInventoryPosition();
-        verify(registryRepository).persist("/base/inventory", "{\"unfinished\":{},\"finished\":[]}");
+        resumeBreakPointManager.persistInventoryPosition();
+        assertThat(registryRepository.get("/scalingTest/position/0/inventory"), is("{\"unfinished\":{},\"finished\":[]}"));
     }
     
     @After
     public void tearDown() {
-        repositoryResumeBreakPointManager.close();
+        resumeBreakPointManager.close();
     }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/resources/META-INF/services/org.apache.shardingsphere.governance.repository.api.RegistryRepository b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/resources/META-INF/services/org.apache.shardingsphere.governance.repository.api.RegistryRepository
new file mode 100644
index 0000000..df0fa47
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/resources/META-INF/services/org.apache.shardingsphere.governance.repository.api.RegistryRepository
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.shardingsphere.scaling.core.fixture.FixtureRegistryRepository