You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2021/02/22 05:36:59 UTC
[shardingsphere] branch master updated: Add RegistryRepositoryAPIImplTest (#9466)
This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 4dc30fa Add RegistryRepositoryAPIImplTest (#9466)
4dc30fa is described below
commit 4dc30fadc0a368f6936fe40f2cc1a477d8069046
Author: 邱鹿 Lucas <lu...@163.com>
AuthorDate: Mon Feb 22 13:36:27 2021 +0800
Add RegistryRepositoryAPIImplTest (#9466)
* Add tests for ScalingAPIImplTest
* Add RegistryRepositoryAPIImplTest
* Optimize ScalingAPIImplTest
Co-authored-by: qiulu3 <Lucas209910>
---
.../api/impl/RegistryRepositoryAPIImplTest.java | 141 +++++++++++++++++++++
.../scaling/core/api/impl/ScalingAPIImplTest.java | 50 +++++++-
2 files changed, 189 insertions(+), 2 deletions(-)
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/api/impl/RegistryRepositoryAPIImplTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/api/impl/RegistryRepositoryAPIImplTest.java
new file mode 100644
index 0000000..7d22321
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/api/impl/RegistryRepositoryAPIImplTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.core.api.impl;
+
+import org.apache.shardingsphere.governance.repository.api.config.GovernanceCenterConfiguration;
+import org.apache.shardingsphere.governance.repository.api.config.GovernanceConfiguration;
+import org.apache.shardingsphere.governance.repository.api.listener.DataChangedEvent;
+import org.apache.shardingsphere.scaling.core.api.RegistryRepositoryAPI;
+import org.apache.shardingsphere.scaling.core.api.ScalingAPIFactory;
+import org.apache.shardingsphere.scaling.core.common.constant.ScalingConstant;
+import org.apache.shardingsphere.scaling.core.config.DumperConfiguration;
+import org.apache.shardingsphere.scaling.core.config.InventoryDumperConfiguration;
+import org.apache.shardingsphere.scaling.core.config.ScalingContext;
+import org.apache.shardingsphere.scaling.core.config.ServerConfiguration;
+import org.apache.shardingsphere.scaling.core.config.TaskConfiguration;
+import org.apache.shardingsphere.scaling.core.fixture.EmbedTestingServer;
+import org.apache.shardingsphere.scaling.core.job.JobContext;
+import org.apache.shardingsphere.scaling.core.job.position.PlaceholderPosition;
+import org.apache.shardingsphere.scaling.core.job.progress.JobProgress;
+import org.apache.shardingsphere.scaling.core.job.task.ScalingTaskFactory;
+import org.apache.shardingsphere.scaling.core.job.task.incremental.IncrementalTask;
+import org.apache.shardingsphere.scaling.core.job.task.inventory.InventoryTask;
+import org.apache.shardingsphere.scaling.core.util.ReflectionUtil;
+import org.apache.shardingsphere.scaling.core.util.ResourceUtil;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+
+public final class RegistryRepositoryAPIImplTest {
+
+ private static RegistryRepositoryAPI registryRepositoryAPI;
+
+ @BeforeClass
+ public static void beforeClass() throws Exception {
+ EmbedTestingServer.start();
+ ReflectionUtil.setFieldValue(ScalingContext.getInstance(), "serverConfig", mockServerConfig());
+ registryRepositoryAPI = ScalingAPIFactory.getRegistryRepositoryAPI();
+ }
+
+ @Test
+ public void assertPersistJobProgress() {
+ // Round trip: persist the progress of a mocked job, read it back by job id and sharding item,
+ // and compare its text form with the expected YAML.
+ String expected = mockYamlJobProgress();
+ JobContext jobContext = mockJobContext();
+ registryRepositoryAPI.persistJobProgress(jobContext);
+ JobProgress persisted = registryRepositoryAPI.getJobProgress(jobContext.getJobId(), jobContext.getShardingItem());
+ assertThat(persisted.toString(), is(expected));
+ }
+
+ @Test
+ public void assertDeleteJob() {
+ // Create a node for job 1, delete that job, then verify that no progress can be read for it.
+ registryRepositoryAPI.persist(ScalingConstant.SCALING_ROOT + "/1", "");
+ registryRepositoryAPI.deleteJob(1L);
+ // Query the same job id that was deleted (1); asking for an unrelated id would pass even if deleteJob did nothing.
+ JobProgress actual = registryRepositoryAPI.getJobProgress(1L, 0);
+ assertNull(actual);
+ }
+
+ @Test
+ public void assertGetChildrenKeys() {
+ registryRepositoryAPI.persist(ScalingConstant.SCALING_ROOT + "/1", "");
+ List<String> actual = registryRepositoryAPI.getChildrenKeys(ScalingConstant.SCALING_ROOT);
+ assertFalse(actual.isEmpty());
+ }
+
+ @Test
+ public void assertWatch() throws InterruptedException {
+ // Persist a key under the scaling root, watch the root, and wait for the callback to report that key as ADDED.
+ CountDownLatch countDownLatch = new CountDownLatch(1);
+ String key = ScalingConstant.SCALING_ROOT + "/1";
+ registryRepositoryAPI.persist(key, "");
+ registryRepositoryAPI.watch(ScalingConstant.SCALING_ROOT, event -> {
+ if (event.getKey().equals(key)) {
+ // If this assertion throws, the latch is never released and the bounded wait below fails the test.
+ assertThat(event.getType(), is(DataChangedEvent.Type.ADDED));
+ countDownLatch.countDown();
+ }
+ });
+ // Bounded wait: an unbounded await() would block forever if the expected event never arrives.
+ assertThat(countDownLatch.await(10, java.util.concurrent.TimeUnit.SECONDS), is(true));
+ }
+
+ private static ServerConfiguration mockServerConfig() {
+ // Server configuration whose governance center is a "Zookeeper" type pointing at the embedded testing server.
+ ServerConfiguration result = new ServerConfiguration();
+ GovernanceCenterConfiguration centerConfig = new GovernanceCenterConfiguration("Zookeeper", EmbedTestingServer.getConnectionString(), null);
+ GovernanceConfiguration governanceConfig = new GovernanceConfiguration("test", centerConfig, true);
+ result.setGovernanceConfig(governanceConfig);
+ return result;
+ }
+
+ private JobContext mockJobContext() {
+ // Build a job context from the mocked job config and attach one inventory task and one
+ // incremental task, both derived from the first task config of that context.
+ JobContext result = new JobContext(ResourceUtil.mockJobConfig());
+ TaskConfiguration firstTaskConfig = result.getTaskConfigs().iterator().next();
+ InventoryTask inventoryTask = mockInventoryTask(firstTaskConfig);
+ result.getInventoryTasks().add(inventoryTask);
+ IncrementalTask incrementalTask = mockIncrementalTask(firstTaskConfig);
+ result.getIncrementalTasks().add(incrementalTask);
+ return result;
+ }
+
+ private InventoryTask mockInventoryTask(final TaskConfiguration taskConfig) {
+ // Derive an inventory dumper config from the task's dumper config, aimed at table t_order
+ // (primary key order_id, sharding item 0) with a placeholder position.
+ InventoryDumperConfiguration inventoryDumperConfig = new InventoryDumperConfiguration(taskConfig.getDumperConfig());
+ inventoryDumperConfig.setPosition(new PlaceholderPosition());
+ inventoryDumperConfig.setTableName("t_order");
+ inventoryDumperConfig.setPrimaryKey("order_id");
+ inventoryDumperConfig.setShardingItem(0);
+ return ScalingTaskFactory.createInventoryTask(inventoryDumperConfig, taskConfig.getImporterConfig());
+ }
+
+ private IncrementalTask mockIncrementalTask(final TaskConfiguration taskConfig) {
+ // The placeholder position is set on the dumper config object returned by taskConfig itself.
+ DumperConfiguration incrementalDumperConfig = taskConfig.getDumperConfig();
+ incrementalDumperConfig.setPosition(new PlaceholderPosition());
+ // NOTE(review): the first argument (3) is presumably a concurrency setting - confirm against ScalingTaskFactory.
+ return ScalingTaskFactory.createIncrementalTask(3, incrementalDumperConfig, taskConfig.getImporterConfig());
+ }
+
+ private String mockYamlJobProgress() { // expected text that assertPersistJobProgress compares with JobProgress.toString()
+ return "databaseType: H2\n"
+ + "incremental:\n"
+ + " ds_0:\n" // NOTE(review): nested keys carry a single leading space here; the indentation may have been collapsed by the plain-text rendering of this mail - confirm against the repository
+ + " delay:\n"
+ + " delayMilliseconds: -1\n"
+ + " lastEventTimestamps: 0\n"
+ + " position: ''\n"
+ + "inventory:\n"
+ + " unfinished:\n"
+ + " ds_0.t_order#0: ''\n" // key combines the data source, table and sharding item set up in mockInventoryTask
+ + "status: RUNNING\n";
+ }
+}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImplTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImplTest.java
index 09493c2..142c8fc 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImplTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImplTest.java
@@ -23,10 +23,13 @@ import org.apache.shardingsphere.governance.repository.api.config.GovernanceConf
import org.apache.shardingsphere.scaling.core.api.JobInfo;
import org.apache.shardingsphere.scaling.core.api.ScalingAPI;
import org.apache.shardingsphere.scaling.core.api.ScalingAPIFactory;
+import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
+import org.apache.shardingsphere.scaling.core.config.RuleConfiguration;
import org.apache.shardingsphere.scaling.core.config.ScalingContext;
import org.apache.shardingsphere.scaling.core.config.ServerConfiguration;
import org.apache.shardingsphere.scaling.core.fixture.EmbedTestingServer;
import org.apache.shardingsphere.scaling.core.job.JobStatus;
+import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyCheckResult;
import org.apache.shardingsphere.scaling.core.job.progress.JobProgress;
import org.apache.shardingsphere.scaling.core.util.ReflectionUtil;
import org.apache.shardingsphere.scaling.core.util.ResourceUtil;
@@ -34,6 +37,10 @@ import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
@@ -106,9 +113,33 @@ public final class ScalingAPIImplTest {
assertThat(jobProgressMap.size(), is(1));
}
+ @Test
+ public void assertDataConsistencyCheck() {
+ // Start a job, load the same t_order fixture into its source and target, then inspect the check result for t_order.
+ Optional<Long> jobId = scalingAPI.start(ResourceUtil.mockJobConfig());
+ assertTrue(jobId.isPresent());
+ initTableData(scalingAPI.getJobConfig(jobId.get()).getRuleConfig());
+ Map<String, DataConsistencyCheckResult> checkResults = scalingAPI.dataConsistencyCheck(jobId.get());
+ assertThat(checkResults.size(), is(1));
+ DataConsistencyCheckResult orderResult = checkResults.get("t_order");
+ assertTrue(orderResult.isCountValid());
+ assertFalse(orderResult.isDataValid());
+ assertThat(orderResult.getTargetCount(), is(2L));
+ }
+
+ @Test
+ @SneakyThrows(SQLException.class)
+ public void assertResetTargetTable() {
+ // Start a job, load the t_order fixture, reset the target table, and expect the check to report zero target rows.
+ Optional<Long> jobId = scalingAPI.start(ResourceUtil.mockJobConfig());
+ assertTrue(jobId.isPresent());
+ initTableData(scalingAPI.getJobConfig(jobId.get()).getRuleConfig());
+ scalingAPI.resetTargetTable(jobId.get());
+ Map<String, DataConsistencyCheckResult> checkResults = scalingAPI.dataConsistencyCheck(jobId.get());
+ DataConsistencyCheckResult orderResult = checkResults.get("t_order");
+ assertThat(orderResult.getTargetCount(), is(0L));
+ }
+
@AfterClass
- @SneakyThrows(ReflectiveOperationException.class)
- public static void afterClass() {
+ public static void afterClass() throws Exception { // sets the "serverConfig" field of the ScalingContext instance back to null after the tests of this class
ReflectionUtil.setFieldValue(ScalingContext.getInstance(), "serverConfig", null);
}
@@ -117,4 +148,19 @@ public final class ScalingAPIImplTest {
result.setGovernanceConfig(new GovernanceConfiguration("test", new GovernanceCenterConfiguration("Zookeeper", EmbedTestingServer.getConnectionString(), new Properties()), true));
return result;
}
+
+ @SneakyThrows(SQLException.class)
+ private void initTableData(final RuleConfiguration ruleConfig) {
+ // Load the same t_order fixture into the source first, then into the target.
+ DataSource sourceDataSource = ruleConfig.getSource().unwrap().toDataSource();
+ initTableData(sourceDataSource);
+ DataSource targetDataSource = ruleConfig.getTarget().unwrap().toDataSource();
+ initTableData(targetDataSource);
+ }
+
+ private void initTableData(final DataSource dataSource) throws SQLException {
+ // Recreate t_order from scratch and insert two rows (ids 1 and 999), executed in this order.
+ String[] sqls = {
+ "DROP TABLE IF EXISTS t_order",
+ "CREATE TABLE t_order (id INT PRIMARY KEY, user_id VARCHAR(12))",
+ "INSERT INTO t_order (id, user_id) VALUES (1, 'xxx'), (999, 'yyy')"
+ };
+ // try-with-resources closes the statement and then the connection, even if a statement fails.
+ try (Connection connection = dataSource.getConnection();
+ Statement statement = connection.createStatement()) {
+ for (String each : sqls) {
+ statement.execute(each);
+ }
+ }
+ }
}