You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2021/02/22 05:36:59 UTC

[shardingsphere] branch master updated: Add RegistryRepositoryAPIImplTest (#9466)

This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 4dc30fa  Add RegistryRepositoryAPIImplTest (#9466)
4dc30fa is described below

commit 4dc30fadc0a368f6936fe40f2cc1a477d8069046
Author: 邱鹿 Lucas <lu...@163.com>
AuthorDate: Mon Feb 22 13:36:27 2021 +0800

    Add RegistryRepositoryAPIImplTest (#9466)
    
    * Add tests for ScalingAPIImplTest
    
    * Add RegistryRepositoryAPIImplTest
    
    * Optimize ScalingAPIImplTest
    
    Co-authored-by: qiulu3 <Lucas209910>
---
 .../api/impl/RegistryRepositoryAPIImplTest.java    | 141 +++++++++++++++++++++
 .../scaling/core/api/impl/ScalingAPIImplTest.java  |  50 +++++++-
 2 files changed, 189 insertions(+), 2 deletions(-)

diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/api/impl/RegistryRepositoryAPIImplTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/api/impl/RegistryRepositoryAPIImplTest.java
new file mode 100644
index 0000000..7d22321
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/api/impl/RegistryRepositoryAPIImplTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.core.api.impl;
+
+import org.apache.shardingsphere.governance.repository.api.config.GovernanceCenterConfiguration;
+import org.apache.shardingsphere.governance.repository.api.config.GovernanceConfiguration;
+import org.apache.shardingsphere.governance.repository.api.listener.DataChangedEvent;
+import org.apache.shardingsphere.scaling.core.api.RegistryRepositoryAPI;
+import org.apache.shardingsphere.scaling.core.api.ScalingAPIFactory;
+import org.apache.shardingsphere.scaling.core.common.constant.ScalingConstant;
+import org.apache.shardingsphere.scaling.core.config.DumperConfiguration;
+import org.apache.shardingsphere.scaling.core.config.InventoryDumperConfiguration;
+import org.apache.shardingsphere.scaling.core.config.ScalingContext;
+import org.apache.shardingsphere.scaling.core.config.ServerConfiguration;
+import org.apache.shardingsphere.scaling.core.config.TaskConfiguration;
+import org.apache.shardingsphere.scaling.core.fixture.EmbedTestingServer;
+import org.apache.shardingsphere.scaling.core.job.JobContext;
+import org.apache.shardingsphere.scaling.core.job.position.PlaceholderPosition;
+import org.apache.shardingsphere.scaling.core.job.progress.JobProgress;
+import org.apache.shardingsphere.scaling.core.job.task.ScalingTaskFactory;
+import org.apache.shardingsphere.scaling.core.job.task.incremental.IncrementalTask;
+import org.apache.shardingsphere.scaling.core.job.task.inventory.InventoryTask;
+import org.apache.shardingsphere.scaling.core.util.ReflectionUtil;
+import org.apache.shardingsphere.scaling.core.util.ResourceUtil;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+
+public final class RegistryRepositoryAPIImplTest {
+    
+    private static RegistryRepositoryAPI registryRepositoryAPI;
+    
+    @BeforeClass
+    public static void beforeClass() throws Exception {
+        EmbedTestingServer.start();
+        ReflectionUtil.setFieldValue(ScalingContext.getInstance(), "serverConfig", mockServerConfig());
+        registryRepositoryAPI = ScalingAPIFactory.getRegistryRepositoryAPI();
+    }
+    
+    @Test
+    public void assertPersistJobProgress() {
+        JobContext jobContext = mockJobContext();
+        registryRepositoryAPI.persistJobProgress(jobContext);
+        JobProgress actual = registryRepositoryAPI.getJobProgress(jobContext.getJobId(), jobContext.getShardingItem());
+        assertThat(actual.toString(), is(mockYamlJobProgress()));
+    }
+    
+    @Test
+    public void assertDeleteJob() {
+        registryRepositoryAPI.persist(ScalingConstant.SCALING_ROOT + "/1", "");
+        registryRepositoryAPI.deleteJob(1L);
+        JobProgress actual = registryRepositoryAPI.getJobProgress(0L, 0);
+        assertNull(actual);
+    }
+    
+    @Test
+    public void assertGetChildrenKeys() {
+        registryRepositoryAPI.persist(ScalingConstant.SCALING_ROOT + "/1", "");
+        List<String> actual = registryRepositoryAPI.getChildrenKeys(ScalingConstant.SCALING_ROOT);
+        assertFalse(actual.isEmpty());
+    }
+    
+    @Test
+    public void assertWatch() throws InterruptedException {
+        CountDownLatch countDownLatch = new CountDownLatch(1);
+        String key = ScalingConstant.SCALING_ROOT + "/1";
+        registryRepositoryAPI.persist(key, "");
+        registryRepositoryAPI.watch(ScalingConstant.SCALING_ROOT, event -> {
+            if (event.getKey().equals(key)) {
+                assertThat(event.getType(), is(DataChangedEvent.Type.ADDED));
+                countDownLatch.countDown();
+            }
+        });
+        countDownLatch.await();
+    }
+    
+    private static ServerConfiguration mockServerConfig() {
+        ServerConfiguration result = new ServerConfiguration();
+        result.setGovernanceConfig(new GovernanceConfiguration("test", new GovernanceCenterConfiguration("Zookeeper", EmbedTestingServer.getConnectionString(), null), true));
+        return result;
+    }
+    
+    private JobContext mockJobContext() {
+        JobContext result = new JobContext(ResourceUtil.mockJobConfig());
+        TaskConfiguration taskConfig = result.getTaskConfigs().iterator().next();
+        result.getInventoryTasks().add(mockInventoryTask(taskConfig));
+        result.getIncrementalTasks().add(mockIncrementalTask(taskConfig));
+        return result;
+    }
+    
+    private InventoryTask mockInventoryTask(final TaskConfiguration taskConfig) {
+        InventoryDumperConfiguration dumperConfig = new InventoryDumperConfiguration(taskConfig.getDumperConfig());
+        dumperConfig.setPosition(new PlaceholderPosition());
+        dumperConfig.setTableName("t_order");
+        dumperConfig.setPrimaryKey("order_id");
+        dumperConfig.setShardingItem(0);
+        return ScalingTaskFactory.createInventoryTask(dumperConfig, taskConfig.getImporterConfig());
+    }
+    
+    private IncrementalTask mockIncrementalTask(final TaskConfiguration taskConfig) {
+        DumperConfiguration dumperConfig = taskConfig.getDumperConfig();
+        dumperConfig.setPosition(new PlaceholderPosition());
+        return ScalingTaskFactory.createIncrementalTask(3, dumperConfig, taskConfig.getImporterConfig());
+    }
+    
+    private String mockYamlJobProgress() {
+        return "databaseType: H2\n"
+                + "incremental:\n"
+                + "  ds_0:\n"
+                + "    delay:\n"
+                + "      delayMilliseconds: -1\n"
+                + "      lastEventTimestamps: 0\n"
+                + "    position: ''\n"
+                + "inventory:\n"
+                + "  unfinished:\n"
+                + "    ds_0.t_order#0: ''\n"
+                + "status: RUNNING\n";
+    }
+}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImplTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImplTest.java
index 09493c2..142c8fc 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImplTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImplTest.java
@@ -23,10 +23,13 @@ import org.apache.shardingsphere.governance.repository.api.config.GovernanceConf
 import org.apache.shardingsphere.scaling.core.api.JobInfo;
 import org.apache.shardingsphere.scaling.core.api.ScalingAPI;
 import org.apache.shardingsphere.scaling.core.api.ScalingAPIFactory;
+import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
+import org.apache.shardingsphere.scaling.core.config.RuleConfiguration;
 import org.apache.shardingsphere.scaling.core.config.ScalingContext;
 import org.apache.shardingsphere.scaling.core.config.ServerConfiguration;
 import org.apache.shardingsphere.scaling.core.fixture.EmbedTestingServer;
 import org.apache.shardingsphere.scaling.core.job.JobStatus;
+import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyCheckResult;
 import org.apache.shardingsphere.scaling.core.job.progress.JobProgress;
 import org.apache.shardingsphere.scaling.core.util.ReflectionUtil;
 import org.apache.shardingsphere.scaling.core.util.ResourceUtil;
@@ -34,6 +37,10 @@ import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
@@ -106,9 +113,33 @@ public final class ScalingAPIImplTest {
         assertThat(jobProgressMap.size(), is(1));
     }
     
+    @Test
+    public void assertDataConsistencyCheck() {
+        Optional<Long> jobId = scalingAPI.start(ResourceUtil.mockJobConfig());
+        assertTrue(jobId.isPresent());
+        JobConfiguration jobConfig = scalingAPI.getJobConfig(jobId.get());
+        initTableData(jobConfig.getRuleConfig());
+        Map<String, DataConsistencyCheckResult> checkResultMap = scalingAPI.dataConsistencyCheck(jobId.get());
+        assertThat(checkResultMap.size(), is(1));
+        assertTrue(checkResultMap.get("t_order").isCountValid());
+        assertFalse(checkResultMap.get("t_order").isDataValid());
+        assertThat(checkResultMap.get("t_order").getTargetCount(), is(2L));
+    }
+    
+    @Test
+    @SneakyThrows(SQLException.class)
+    public void assertResetTargetTable() {
+        Optional<Long> jobId = scalingAPI.start(ResourceUtil.mockJobConfig());
+        assertTrue(jobId.isPresent());
+        JobConfiguration jobConfig = scalingAPI.getJobConfig(jobId.get());
+        initTableData(jobConfig.getRuleConfig());
+        scalingAPI.resetTargetTable(jobId.get());
+        Map<String, DataConsistencyCheckResult> checkResultMap = scalingAPI.dataConsistencyCheck(jobId.get());
+        assertThat(checkResultMap.get("t_order").getTargetCount(), is(0L));
+    }
+    
     @AfterClass
-    @SneakyThrows(ReflectiveOperationException.class)
-    public static void afterClass() {
+    public static void afterClass() throws Exception {
         ReflectionUtil.setFieldValue(ScalingContext.getInstance(), "serverConfig", null);
     }
     
@@ -117,4 +148,19 @@ public final class ScalingAPIImplTest {
         result.setGovernanceConfig(new GovernanceConfiguration("test", new GovernanceCenterConfiguration("Zookeeper", EmbedTestingServer.getConnectionString(), new Properties()), true));
         return result;
     }
+    
+    @SneakyThrows(SQLException.class)
+    private void initTableData(final RuleConfiguration ruleConfig) {
+        initTableData(ruleConfig.getSource().unwrap().toDataSource());
+        initTableData(ruleConfig.getTarget().unwrap().toDataSource());
+    }
+    
+    private void initTableData(final DataSource dataSource) throws SQLException {
+        try (Connection connection = dataSource.getConnection();
+             Statement statement = connection.createStatement()) {
+            statement.execute("DROP TABLE IF EXISTS t_order");
+            statement.execute("CREATE TABLE t_order (id INT PRIMARY KEY, user_id VARCHAR(12))");
+            statement.execute("INSERT INTO t_order (id, user_id) VALUES (1, 'xxx'), (999, 'yyy')");
+        }
+    }
 }