You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by to...@apache.org on 2023/05/05 16:12:27 UTC

[shardingsphere] branch master updated: Use awaitility to instead of Thread.sleep() (#25478)

This is an automated email from the ASF dual-hosted git repository.

totalo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new c1fb80d0a53 Use awaitility to instead of Thread.sleep() (#25478)
c1fb80d0a53 is described below

commit c1fb80d0a53d663e6697b69b50732d1c0c26d773
Author: Liang Zhang <zh...@apache.org>
AuthorDate: Sat May 6 00:12:19 2023 +0800

    Use awaitility to instead of Thread.sleep() (#25478)
---
 infra/common/pom.xml                                     |  4 ++++
 .../datasource/registry/GlobalDataSourceRegistry.java    |  8 +++-----
 .../pool/destroyer/DataSourcePoolDestroyerTest.java      | 12 +++---------
 .../database/resource/ShardingSphereResourceTest.java    |  9 ++++-----
 infra/executor/pom.xml                                   |  5 +++++
 .../kernel/thread/ExecutorServiceManagerTest.java        |  8 ++++----
 mode/core/pom.xml                                        |  5 +++++
 .../mode/manager/switcher/ResourceSwitchManagerTest.java | 16 +++++++---------
 proxy/backend/type/hbase/pom.xml                         |  9 +++++----
 .../test/e2e/agent/common/env/E2ETestEnvironment.java    |  4 ++--
 .../data/pipeline/cases/PipelineContainerComposer.java   |  2 +-
 .../test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java       |  4 ++--
 .../general/PostgreSQLMigrationGeneralE2EIT.java         |  4 ++--
 13 files changed, 47 insertions(+), 43 deletions(-)

diff --git a/infra/common/pom.xml b/infra/common/pom.xml
index 87c9ffe3d3f..2b8cf2df93f 100644
--- a/infra/common/pom.xml
+++ b/infra/common/pom.xml
@@ -58,6 +58,10 @@
             <scope>test</scope>
         </dependency>
         
+        <dependency>
+            <groupId>org.awaitility</groupId>
+            <artifactId>awaitility</artifactId>
+        </dependency>
         <dependency>
             <groupId>com.zaxxer</groupId>
             <artifactId>HikariCP</artifactId>
diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/datasource/registry/GlobalDataSourceRegistry.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/datasource/registry/GlobalDataSourceRegistry.java
index 289299d096c..bb5437f232c 100644
--- a/infra/common/src/main/java/org/apache/shardingsphere/infra/datasource/registry/GlobalDataSourceRegistry.java
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/datasource/registry/GlobalDataSourceRegistry.java
@@ -20,25 +20,23 @@ package org.apache.shardingsphere.infra.datasource.registry;
 import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.NoArgsConstructor;
-import lombok.Setter;
 
 import javax.sql.DataSource;
-import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * Global data source registry.
  */
 @NoArgsConstructor(access = AccessLevel.PRIVATE)
 @Getter
-@Setter
 public final class GlobalDataSourceRegistry {
     
     private static final GlobalDataSourceRegistry INSTANCE = new GlobalDataSourceRegistry();
     
-    private volatile Map<String, DataSource> cachedDataSourceDataSources = new LinkedHashMap<>();
+    private final Map<String, DataSource> cachedDataSourceDataSources = new ConcurrentHashMap<>();
     
-    private volatile Map<String, String> cachedDatabaseTables = new LinkedHashMap<>();
+    private final Map<String, String> cachedDatabaseTables = new ConcurrentHashMap<>();
     
     /**
      * Get global data source.
diff --git a/infra/common/src/test/java/org/apache/shardingsphere/infra/datasource/pool/destroyer/DataSourcePoolDestroyerTest.java b/infra/common/src/test/java/org/apache/shardingsphere/infra/datasource/pool/destroyer/DataSourcePoolDestroyerTest.java
index 6308637cb14..1a553b5f1cb 100644
--- a/infra/common/src/test/java/org/apache/shardingsphere/infra/datasource/pool/destroyer/DataSourcePoolDestroyerTest.java
+++ b/infra/common/src/test/java/org/apache/shardingsphere/infra/datasource/pool/destroyer/DataSourcePoolDestroyerTest.java
@@ -21,14 +21,14 @@ import com.zaxxer.hikari.HikariConfig;
 import com.zaxxer.hikari.HikariDataSource;
 import org.apache.shardingsphere.test.fixture.jdbc.MockedDataSource;
 import org.apache.shardingsphere.test.fixture.jdbc.MockedDriver;
+import org.awaitility.Awaitility;
 import org.junit.jupiter.api.Test;
 
 import java.sql.Connection;
 import java.sql.SQLException;
-import java.time.Duration;
+import java.util.concurrent.TimeUnit;
 
 import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTimeout;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 class DataSourcePoolDestroyerTest {
@@ -45,13 +45,7 @@ class DataSourcePoolDestroyerTest {
             new DataSourcePoolDestroyer(dataSource).asyncDestroy();
             assertFalse(dataSource.isClosed());
         }
-        assertTimeout(Duration.ofSeconds(2L), () -> assertClose(dataSource));
-    }
-    
-    private static void assertClose(final HikariDataSource dataSource) throws InterruptedException {
-        while (!dataSource.isClosed()) {
-            Thread.sleep(10L);
-        }
+        Awaitility.await().atMost(2L, TimeUnit.SECONDS).pollInterval(10L, TimeUnit.MILLISECONDS).until(dataSource::isClosed);
         assertTrue(dataSource.isClosed());
     }
     
diff --git a/infra/common/src/test/java/org/apache/shardingsphere/infra/metadata/database/resource/ShardingSphereResourceTest.java b/infra/common/src/test/java/org/apache/shardingsphere/infra/metadata/database/resource/ShardingSphereResourceTest.java
index c9db02538a0..f6dea0b43fe 100644
--- a/infra/common/src/test/java/org/apache/shardingsphere/infra/metadata/database/resource/ShardingSphereResourceTest.java
+++ b/infra/common/src/test/java/org/apache/shardingsphere/infra/metadata/database/resource/ShardingSphereResourceTest.java
@@ -18,22 +18,21 @@
 package org.apache.shardingsphere.infra.metadata.database.resource;
 
 import org.apache.shardingsphere.test.fixture.jdbc.MockedDataSource;
+import org.awaitility.Awaitility;
 import org.junit.jupiter.api.Test;
 
 import java.util.Collections;
+import java.util.concurrent.TimeUnit;
 
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 class ShardingSphereResourceTest {
     
-    @SuppressWarnings("BusyWait")
     @Test
-    void assertClose() throws InterruptedException {
+    void assertClose() {
         MockedDataSource dataSource = new MockedDataSource();
         new ShardingSphereResourceMetaData("sharding_db", Collections.singletonMap("foo_ds", dataSource)).close(dataSource);
-        while (!dataSource.isClosed()) {
-            Thread.sleep(10L);
-        }
+        Awaitility.await().atMost(1L, TimeUnit.MINUTES).pollInterval(10L, TimeUnit.MILLISECONDS).until(dataSource::isClosed);
         assertTrue(dataSource.isClosed());
     }
 }
diff --git a/infra/executor/pom.xml b/infra/executor/pom.xml
index 67a120673d5..92714c4bc5f 100644
--- a/infra/executor/pom.xml
+++ b/infra/executor/pom.xml
@@ -56,5 +56,10 @@
             <groupId>com.alibaba</groupId>
             <artifactId>transmittable-thread-local</artifactId>
         </dependency>
+        
+        <dependency>
+            <groupId>org.awaitility</groupId>
+            <artifactId>awaitility</artifactId>
+        </dependency>
     </dependencies>
 </project>
diff --git a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/kernel/thread/ExecutorServiceManagerTest.java b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/kernel/thread/ExecutorServiceManagerTest.java
index ba9e4a5fcce..0d1d46d67e4 100644
--- a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/kernel/thread/ExecutorServiceManagerTest.java
+++ b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/kernel/thread/ExecutorServiceManagerTest.java
@@ -18,10 +18,12 @@
 package org.apache.shardingsphere.infra.executor.kernel.thread;
 
 import com.alibaba.ttl.TransmittableThreadLocal;
+import org.awaitility.Awaitility;
 import org.junit.jupiter.api.Test;
 
 import java.time.Duration;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.hamcrest.CoreMatchers.is;
@@ -48,10 +50,8 @@ class ExecutorServiceManagerTest {
         assertTimeout(Duration.ofSeconds(1L), () -> assertFinished(finished));
     }
     
-    private void assertFinished(final AtomicBoolean finished) throws InterruptedException {
-        while (!finished.get()) {
-            Thread.sleep(100L);
-        }
+    private void assertFinished(final AtomicBoolean finished) {
+        Awaitility.await().atMost(1L, TimeUnit.MINUTES).pollInterval(100L, TimeUnit.MILLISECONDS).until(finished::get);
     }
     
     private void assertValueChangedInConcurrencyThread() {
diff --git a/mode/core/pom.xml b/mode/core/pom.xml
index de61c9bbcf6..211c728784b 100644
--- a/mode/core/pom.xml
+++ b/mode/core/pom.xml
@@ -62,5 +62,10 @@
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
+        
+        <dependency>
+            <groupId>org.awaitility</groupId>
+            <artifactId>awaitility</artifactId>
+        </dependency>
     </dependencies>
 </project>
diff --git a/mode/core/src/test/java/org/apache/shardingsphere/mode/manager/switcher/ResourceSwitchManagerTest.java b/mode/core/src/test/java/org/apache/shardingsphere/mode/manager/switcher/ResourceSwitchManagerTest.java
index 44bd4b01b7e..509a7d93153 100644
--- a/mode/core/src/test/java/org/apache/shardingsphere/mode/manager/switcher/ResourceSwitchManagerTest.java
+++ b/mode/core/src/test/java/org/apache/shardingsphere/mode/manager/switcher/ResourceSwitchManagerTest.java
@@ -20,12 +20,14 @@ package org.apache.shardingsphere.mode.manager.switcher;
 import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
 import org.apache.shardingsphere.infra.metadata.database.resource.ShardingSphereResourceMetaData;
 import org.apache.shardingsphere.test.fixture.jdbc.MockedDataSource;
+import org.awaitility.Awaitility;
 import org.junit.jupiter.api.Test;
 
 import javax.sql.DataSource;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -35,7 +37,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 class ResourceSwitchManagerTest {
     
     @Test
-    void assertCreate() throws InterruptedException {
+    void assertCreate() {
         Map<String, DataSource> dataSourceMap = createDataSourceMap();
         SwitchingResource actual = new ResourceSwitchManager().create(new ShardingSphereResourceMetaData("sharding_db", dataSourceMap), createToBeChangedDataSourcePropsMap());
         assertNewDataSources(actual);
@@ -44,7 +46,7 @@ class ResourceSwitchManagerTest {
     }
     
     @Test
-    void assertCreateByAlterDataSourceProps() throws InterruptedException {
+    void assertCreateByAlterDataSourceProps() {
         Map<String, DataSource> dataSourceMap = new HashMap<>(3, 1);
         dataSourceMap.put("ds_0", new MockedDataSource());
         dataSourceMap.put("ds_1", new MockedDataSource());
@@ -78,17 +80,13 @@ class ResourceSwitchManagerTest {
         assertTrue(actual.getNewDataSources().containsKey("replace"));
     }
     
-    private void assertStaleDataSources(final Map<String, DataSource> originalDataSourceMap) throws InterruptedException {
+    private void assertStaleDataSources(final Map<String, DataSource> originalDataSourceMap) {
         assertStaleDataSource((MockedDataSource) originalDataSourceMap.get("replace"));
         assertNotStaleDataSource((MockedDataSource) originalDataSourceMap.get("not_change"));
     }
     
-    @SuppressWarnings("BusyWait")
-    private void assertStaleDataSource(final MockedDataSource dataSource) throws InterruptedException {
-        while (!dataSource.isClosed()) {
-            Thread.sleep(10L);
-        }
-        assertTrue(dataSource.isClosed());
+    private void assertStaleDataSource(final MockedDataSource dataSource) {
+        Awaitility.await().atMost(1L, TimeUnit.MINUTES).pollInterval(10L, TimeUnit.MILLISECONDS).until(dataSource::isClosed);
     }
     
     private void assertNotStaleDataSource(final MockedDataSource dataSource) {
diff --git a/proxy/backend/type/hbase/pom.xml b/proxy/backend/type/hbase/pom.xml
index 52533b1936e..99850d374cf 100644
--- a/proxy/backend/type/hbase/pom.xml
+++ b/proxy/backend/type/hbase/pom.xml
@@ -38,10 +38,6 @@
             <version>${project.version}</version>
         </dependency>
         
-        <dependency>
-            <groupId>org.apache.hbase</groupId>
-            <artifactId>hbase-shaded-client</artifactId>
-        </dependency>
         <dependency>
             <groupId>org.apache.shardingsphere</groupId>
             <artifactId>shardingsphere-proxy-backend-core</artifactId>
@@ -54,5 +50,10 @@
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
+        
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-shaded-client</artifactId>
+        </dependency>
     </dependencies>
 </project>
diff --git a/test/e2e/agent/plugins/common/src/test/java/org/apache/shardingsphere/test/e2e/agent/common/env/E2ETestEnvironment.java b/test/e2e/agent/plugins/common/src/test/java/org/apache/shardingsphere/test/e2e/agent/common/env/E2ETestEnvironment.java
index 327d06ed935..77af5057cb9 100644
--- a/test/e2e/agent/plugins/common/src/test/java/org/apache/shardingsphere/test/e2e/agent/common/env/E2ETestEnvironment.java
+++ b/test/e2e/agent/plugins/common/src/test/java/org/apache/shardingsphere/test/e2e/agent/common/env/E2ETestEnvironment.java
@@ -98,7 +98,7 @@ public final class E2ETestEnvironment {
     private boolean waitForProxyEnvironmentReady(final Properties props) {
         log.info("Proxy with agent environment initializing ...");
         try {
-            Awaitility.await().atMost(2, TimeUnit.MINUTES).pollInterval(5, TimeUnit.SECONDS).until(() -> isProxyReady(props));
+            Awaitility.await().atMost(2L, TimeUnit.MINUTES).pollInterval(5L, TimeUnit.SECONDS).until(() -> isProxyReady(props));
         } catch (final ConditionTimeoutException ignored) {
             log.info("Proxy with agent environment initialization failed ...");
             return false;
@@ -136,7 +136,7 @@ public final class E2ETestEnvironment {
     private boolean waitForJdbcEnvironmentReady() {
         log.info("Jdbc project with agent environment initializing ...");
         try {
-            Awaitility.await().atMost(2, TimeUnit.MINUTES).pollInterval(5, TimeUnit.SECONDS).until(() -> isJdbcReady(props));
+            Awaitility.await().atMost(2L, TimeUnit.MINUTES).pollInterval(5L, TimeUnit.SECONDS).until(() -> isJdbcReady(props));
         } catch (final ConditionTimeoutException ignored) {
             log.info("Jdbc project with agent environment initialization failed ...");
             return false;
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
index 036ac685976..19b903cf565 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
@@ -529,7 +529,7 @@ public final class PipelineContainerComposer implements AutoCloseable {
      */
     // TODO proxy support for some fields still needs to be optimized, such as binary of MySQL, after these problems are optimized, Proxy dataSource can be used.
     public DataSource generateShardingSphereDataSourceFromProxy() throws SQLException {
-        Awaitility.await().atMost(5, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until(() -> !getYamlRootConfig().getRules().isEmpty());
+        Awaitility.await().atMost(5L, TimeUnit.SECONDS).pollInterval(1L, TimeUnit.SECONDS).until(() -> !getYamlRootConfig().getRules().isEmpty());
         YamlRootConfiguration rootConfig = getYamlRootConfig();
         if (PipelineEnvTypeEnum.DOCKER == PipelineE2EEnvironment.getInstance().getItEnvType()) {
             DockerStorageContainer storageContainer = ((DockerContainerComposer) containerComposer).getStorageContainers().get(0);
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
index e93a4687ce9..133bd0e8495 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
@@ -121,7 +121,7 @@ class CDCE2EIT {
                 initSchemaAndTable(containerComposer, connection, 0);
             }
             startCDCClient(containerComposer);
-            Awaitility.await().atMost(10, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until(() -> !containerComposer.queryForListWithLog("SHOW STREAMING LIST").isEmpty());
+            Awaitility.await().atMost(10L, TimeUnit.SECONDS).pollInterval(1L, TimeUnit.SECONDS).until(() -> !containerComposer.queryForListWithLog("SHOW STREAMING LIST").isEmpty());
             String jobId = containerComposer.queryForListWithLog("SHOW STREAMING LIST").get(0).get("id").toString();
             containerComposer.waitIncrementTaskFinished(String.format("SHOW STREAMING STATUS '%s'", jobId));
             String tableName = containerComposer.getDatabaseType().isSchemaAvailable() ? String.join(".", "test", SOURCE_TABLE_NAME) : SOURCE_TABLE_NAME;
@@ -132,7 +132,7 @@ class CDCE2EIT {
                 ResultSet resultSet = connection.createStatement().executeQuery(String.format("SELECT * FROM %s ORDER BY order_id ASC", getOrderTableNameWithSchema(containerComposer)));
                 actualProxyList = containerComposer.transformResultSetToList(resultSet);
             }
-            Awaitility.await().atMost(20, TimeUnit.SECONDS).pollInterval(2, TimeUnit.SECONDS)
+            Awaitility.await().atMost(20L, TimeUnit.SECONDS).pollInterval(2L, TimeUnit.SECONDS)
                     .until(() -> listOrderRecords(containerComposer, getOrderTableNameWithSchema(containerComposer)).size() == actualProxyList.size());
             SchemaTableName schemaTableName = containerComposer.getDatabaseType().isSchemaAvailable()
                     ? new SchemaTableName(new SchemaName(PipelineContainerComposer.SCHEMA_NAME), new TableName(SOURCE_TABLE_NAME))
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
index ed078d92c19..f518c43e58d 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
@@ -76,7 +76,7 @@ class PostgreSQLMigrationGeneralE2EIT extends AbstractMigrationE2EIT {
             DataSourceExecuteUtils.execute(containerComposer.getSourceDataSource(), containerComposer.getExtraSQLCommand().getFullInsertOrderItem(), dataPair.getRight());
             log.info("init data end: {}", LocalDateTime.now());
             startMigrationWithSchema(containerComposer, SOURCE_TABLE_NAME, "t_order");
-            Awaitility.await().atMost(10, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until(() -> listJobId(containerComposer).size() > 0);
+            Awaitility.await().atMost(10L, TimeUnit.SECONDS).pollInterval(1L, TimeUnit.SECONDS).until(() -> listJobId(containerComposer).size() > 0);
             String jobId = getJobIdByTableName(containerComposer, "ds_0.test." + SOURCE_TABLE_NAME);
             containerComposer.waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
             containerComposer.startIncrementTask(new E2EIncrementalTask(
@@ -103,7 +103,7 @@ class PostgreSQLMigrationGeneralE2EIT extends AbstractMigrationE2EIT {
         startMigrationByJobId(containerComposer, jobId);
         // must refresh firstly, otherwise proxy can't get schema and table info
         containerComposer.proxyExecuteWithLog("REFRESH TABLE METADATA;", 2);
-        Awaitility.await().atMost(5, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until(() -> !containerComposer.queryForListWithLog(
+        Awaitility.await().atMost(5L, TimeUnit.SECONDS).pollInterval(1L, TimeUnit.SECONDS).until(() -> !containerComposer.queryForListWithLog(
                 String.format("SELECT * FROM %s WHERE order_id = %s", String.join(".", PipelineContainerComposer.SCHEMA_NAME, TARGET_TABLE_NAME), recordId)).isEmpty());
         containerComposer.assertProxyOrderRecordExist(String.join(".", PipelineContainerComposer.SCHEMA_NAME, TARGET_TABLE_NAME), recordId);
         assertCheckMigrationSuccess(containerComposer, jobId, "DATA_MATCH");