You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zi...@apache.org on 2023/03/28 21:00:07 UTC

[shardingsphere] branch master updated: Add more rules for checkstyle (#24886)

This is an automated email from the ASF dual-hosted git repository.

zichaowang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new d852884620a Add more rules for checkstyle (#24886)
d852884620a is described below

commit d852884620a61137c18a2d754e3b7d3905c66d97
Author: Liang Zhang <zh...@apache.org>
AuthorDate: Wed Mar 29 04:59:49 2023 +0800

    Add more rules for checkstyle (#24886)
    
    * Add more rule for checkstyle
    
    * Add more rule for checkstyle
    
    * Add more rule for checkstyle
---
 .../AbstractJDBCExecutorCallbackAdviceTest.java    | 27 ++++++----
 .../advice/AbstractSQLParserEngineAdviceTest.java  | 27 ++++++----
 .../protocol/PostgreSQLArrayParameterDecoder.java  | 31 ++++++-----
 ...OnDuplicateKeyUpdateValueParameterRewriter.java | 62 +++++++++++-----------
 .../handler/query/ShowEncryptRuleExecutor.java     | 11 ++--
 ...adwriteSplittingRuleStatementConverterTest.java | 35 ++++++------
 .../ShardingPaginationParameterRewriterTest.java   | 25 +++++----
 .../checker/ShardingTableRuleStatementChecker.java | 10 ++--
 .../ShardingTableRuleStatementConverter.java       |  4 +-
 .../data/pipeline/core/task/IncrementalTask.java   | 48 ++++++++---------
 .../memory/MultiplexMemoryPipelineChannelTest.java | 28 +++++-----
 .../mysql/ingest/client/MySQLClientTest.java       | 30 ++++++-----
 .../MigrationDataConsistencyChecker.java           | 29 ++++++----
 .../nacos/listener/NamingEventListener.java        | 36 +++++++------
 ...iteSplittingRuleConfigurationImportChecker.java |  5 +-
 .../ConvertYamlConfigurationExecutor.java          |  9 ++--
 .../ral/queryable/ExportStorageNodesExecutor.java  | 12 +++--
 .../rql/storage/unit/ShowStorageUnitExecutor.java  | 23 +++++---
 ...plittingRuleConfigurationImportCheckerTest.java |  2 +-
 .../admin/executor/ShowProcessListExecutor.java    | 52 +++++++++---------
 src/resources/checkstyle.xml                       | 39 ++++++++------
 src/resources/checkstyle_ci.xml                    | 35 ++++++------
 .../container/wait/JdbcConnectionWaitStrategy.java | 24 +++++----
 23 files changed, 326 insertions(+), 278 deletions(-)

diff --git a/agent/plugins/tracing/test/src/main/java/org/apache/shardingsphere/agent/plugin/tracing/advice/AbstractJDBCExecutorCallbackAdviceTest.java b/agent/plugins/tracing/test/src/main/java/org/apache/shardingsphere/agent/plugin/tracing/advice/AbstractJDBCExecutorCallbackAdviceTest.java
index b0ce4e26675..2ac8ccc6969 100644
--- a/agent/plugins/tracing/test/src/main/java/org/apache/shardingsphere/agent/plugin/tracing/advice/AbstractJDBCExecutorCallbackAdviceTest.java
+++ b/agent/plugins/tracing/test/src/main/java/org/apache/shardingsphere/agent/plugin/tracing/advice/AbstractJDBCExecutorCallbackAdviceTest.java
@@ -31,6 +31,7 @@ import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.J
 import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.internal.configuration.plugins.Plugins;
+import org.mockito.invocation.InvocationOnMock;
 
 import java.sql.Connection;
 import java.sql.DatabaseMetaData;
@@ -74,17 +75,7 @@ public abstract class AbstractJDBCExecutorCallbackAdviceTest implements AdviceTe
         when(connection.getMetaData()).thenReturn(databaseMetaData);
         when(statement.getConnection()).thenReturn(connection);
         executionUnit = new JDBCExecutionUnit(new ExecutionUnit(DATA_SOURCE_NAME, new SQLUnit(SQL, Collections.emptyList())), null, statement);
-        JDBCExecutorCallback mockedJDBCExecutorCallback = mock(JDBCExecutorCallback.class, invocation -> {
-            switch (invocation.getMethod().getName()) {
-                case "getAttachment":
-                    return attachment;
-                case "setAttachment":
-                    attachment = invocation.getArguments()[0];
-                    return null;
-                default:
-                    return invocation.callRealMethod();
-            }
-        });
+        JDBCExecutorCallback mockedJDBCExecutorCallback = mock(JDBCExecutorCallback.class, this::mockAttachment);
         Map<String, DataSourceMetaData> cachedDatasourceMetaData = (Map<String, DataSourceMetaData>) Plugins.getMemberAccessor()
                 .get(JDBCExecutorCallback.class.getDeclaredField("CACHED_DATASOURCE_METADATA"), mockedJDBCExecutorCallback);
         cachedDatasourceMetaData.put("mock_url", mock(DataSourceMetaData.class));
@@ -93,6 +84,20 @@ public abstract class AbstractJDBCExecutorCallbackAdviceTest implements AdviceTe
         targetObject = (TargetAdviceObject) mockedJDBCExecutorCallback;
     }
     
+    // CHECKSTYLE:OFF
+    private Object mockAttachment(final InvocationOnMock invocation) throws Throwable {
+        // CHECKSTYLE:ON
+        switch (invocation.getMethod().getName()) {
+            case "getAttachment":
+                return attachment;
+            case "setAttachment":
+                attachment = invocation.getArguments()[0];
+                return null;
+            default:
+                return invocation.callRealMethod();
+        }
+    }
+    
     /**
      * Get database type.
      *
diff --git a/agent/plugins/tracing/test/src/main/java/org/apache/shardingsphere/agent/plugin/tracing/advice/AbstractSQLParserEngineAdviceTest.java b/agent/plugins/tracing/test/src/main/java/org/apache/shardingsphere/agent/plugin/tracing/advice/AbstractSQLParserEngineAdviceTest.java
index e755f0824f2..f35ce854147 100644
--- a/agent/plugins/tracing/test/src/main/java/org/apache/shardingsphere/agent/plugin/tracing/advice/AbstractSQLParserEngineAdviceTest.java
+++ b/agent/plugins/tracing/test/src/main/java/org/apache/shardingsphere/agent/plugin/tracing/advice/AbstractSQLParserEngineAdviceTest.java
@@ -22,6 +22,7 @@ import org.apache.shardingsphere.agent.api.advice.TargetAdviceObject;
 import org.apache.shardingsphere.agent.plugin.tracing.TracingAgentExtension;
 import org.apache.shardingsphere.infra.parser.ShardingSphereSQLParserEngine;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.invocation.InvocationOnMock;
 
 import static org.mockito.Mockito.mock;
 
@@ -36,17 +37,21 @@ public abstract class AbstractSQLParserEngineAdviceTest implements AdviceTestBas
     @SuppressWarnings("ConstantConditions")
     @Override
     public final void prepare() {
-        Object parserEngine = mock(ShardingSphereSQLParserEngine.class, invocation -> {
-            switch (invocation.getMethod().getName()) {
-                case "getAttachment":
-                    return attachment;
-                case "setAttachment":
-                    attachment = invocation.getArguments()[0];
-                    return null;
-                default:
-                    return invocation.callRealMethod();
-            }
-        });
+        Object parserEngine = mock(ShardingSphereSQLParserEngine.class, this::mockAttachment);
         targetObject = (TargetAdviceObject) parserEngine;
     }
+    
+    // CHECKSTYLE:OFF
+    private Object mockAttachment(final InvocationOnMock invocation) throws Throwable {
+        // CHECKSTYLE:ON
+        switch (invocation.getMethod().getName()) {
+            case "getAttachment":
+                return attachment;
+            case "setAttachment":
+                attachment = invocation.getArguments()[0];
+                return null;
+            default:
+                return invocation.callRealMethod();
+        }
+    }
 }
diff --git a/db-protocol/postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/query/extended/bind/protocol/PostgreSQLArrayParameterDecoder.java b/db-protocol/postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/query/extended/bind/protocol/PostgreSQLArrayParameterDecoder.java
index c5bf8b0aac8..c9ab8afa907 100644
--- a/db-protocol/postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/query/extended/bind/protocol/PostgreSQLArrayParameterDecoder.java
+++ b/db-protocol/postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/query/extended/bind/protocol/PostgreSQLArrayParameterDecoder.java
@@ -171,21 +171,20 @@ public final class PostgreSQLArrayParameterDecoder {
         Preconditions.checkArgument(value.length() >= 2, "value length less than 2");
         Preconditions.checkArgument('{' == value.charAt(0) && '}' == value.charAt(value.length() - 1), "value not start with '{' or not end with '}'");
         String[] elements = value.substring(1, value.length() - 1).split(",");
-        return Arrays.stream(elements).map(each -> {
-            if ("NULL".equals(each)) {
-                return null;
-            }
-            String result = each;
-            if ('"' == result.charAt(0) && '"' == result.charAt(result.length() - 1)) {
-                result = result.substring(1, result.length() - 1);
-            }
-            while (result.contains("\\\"")) {
-                result = result.replace("\\\"", "\"");
-            }
-            while (result.contains("\\\\")) {
-                result = result.replace("\\\\", "\\");
-            }
-            return result;
-        }).collect(Collectors.toList());
+        return Arrays.stream(elements).map(each -> "NULL".equals(each) ? null : decodeElementText(each)).collect(Collectors.toList());
+    }
+    
+    private static String decodeElementText(final String element) {
+        String result = element;
+        if ('"' == result.charAt(0) && '"' == result.charAt(result.length() - 1)) {
+            result = result.substring(1, result.length() - 1);
+        }
+        while (result.contains("\\\"")) {
+            result = result.replace("\\\"", "\"");
+        }
+        while (result.contains("\\\\")) {
+            result = result.replace("\\\\", "\\");
+        }
+        return result;
     }
 }
diff --git a/features/encrypt/core/src/main/java/org/apache/shardingsphere/encrypt/rewrite/parameter/rewriter/EncryptInsertOnDuplicateKeyUpdateValueParameterRewriter.java b/features/encrypt/core/src/main/java/org/apache/shardingsphere/encrypt/rewrite/parameter/rewriter/EncryptInsertOnDuplicateKeyUpdateValueParameterRewriter.java
index 112d30fc2ae..ab8cf18d247 100644
--- a/features/encrypt/core/src/main/java/org/apache/shardingsphere/encrypt/rewrite/parameter/rewriter/EncryptInsertOnDuplicateKeyUpdateValueParameterRewriter.java
+++ b/features/encrypt/core/src/main/java/org/apache/shardingsphere/encrypt/rewrite/parameter/rewriter/EncryptInsertOnDuplicateKeyUpdateValueParameterRewriter.java
@@ -65,40 +65,40 @@ public final class EncryptInsertOnDuplicateKeyUpdateValueParameterRewriter imple
         OnDuplicateUpdateContext onDuplicateKeyUpdateValueContext = insertStatementContext.getOnDuplicateKeyUpdateValueContext();
         String schemaName = insertStatementContext.getTablesContext().getSchemaName().orElseGet(() -> DatabaseTypeEngine.getDefaultSchemaName(insertStatementContext.getDatabaseType(), databaseName));
         for (int index = 0; index < onDuplicateKeyUpdateValueContext.getValueExpressions().size(); index++) {
-            int columnIndex = index;
-            String encryptLogicColumnName = onDuplicateKeyUpdateValueContext.getColumn(columnIndex).getIdentifier().getValue();
+            String encryptLogicColumnName = onDuplicateKeyUpdateValueContext.getColumn(index).getIdentifier().getValue();
             EncryptContext encryptContext = EncryptContextBuilder.build(databaseName, schemaName, tableName, encryptLogicColumnName);
             Optional<StandardEncryptAlgorithm> encryptor = encryptRule.findEncryptor(tableName, encryptLogicColumnName);
-            encryptor.ifPresent(optional -> {
-                Object plainColumnValue = onDuplicateKeyUpdateValueContext.getValue(columnIndex);
-                if (plainColumnValue instanceof FunctionSegment && "VALUES".equalsIgnoreCase(((FunctionSegment) plainColumnValue).getFunctionName())) {
-                    return;
+            if (!encryptor.isPresent()) {
+                continue;
+            }
+            Object plainColumnValue = onDuplicateKeyUpdateValueContext.getValue(index);
+            if (plainColumnValue instanceof FunctionSegment && "VALUES".equalsIgnoreCase(((FunctionSegment) plainColumnValue).getFunctionName())) {
+                return;
+            }
+            Object cipherColumnValue = encryptor.get().encrypt(plainColumnValue, encryptContext);
+            groupedParamBuilder.getGenericParameterBuilder().addReplacedParameters(index, cipherColumnValue);
+            Collection<Object> addedParams = new LinkedList<>();
+            Optional<StandardEncryptAlgorithm> assistedQueryEncryptor = encryptRule.findAssistedQueryEncryptor(tableName, encryptLogicColumnName);
+            if (assistedQueryEncryptor.isPresent()) {
+                Optional<String> assistedColumnName = encryptRule.findAssistedQueryColumn(tableName, encryptLogicColumnName);
+                Preconditions.checkArgument(assistedColumnName.isPresent(), "Can not find assisted query Column Name");
+                addedParams.add(assistedQueryEncryptor.get().encrypt(plainColumnValue, encryptContext));
+            }
+            Optional<LikeEncryptAlgorithm> likeQueryEncryptor = encryptRule.findLikeQueryEncryptor(tableName, encryptLogicColumnName);
+            if (likeQueryEncryptor.isPresent()) {
+                Optional<String> likeColumnName = encryptRule.findLikeQueryColumn(tableName, encryptLogicColumnName);
+                Preconditions.checkArgument(likeColumnName.isPresent(), "Can not find assisted query Column Name");
+                addedParams.add(likeQueryEncryptor.get().encrypt(plainColumnValue, encryptContext));
+            }
+            if (encryptRule.findPlainColumn(tableName, encryptLogicColumnName).isPresent()) {
+                addedParams.add(plainColumnValue);
+            }
+            if (!addedParams.isEmpty()) {
+                if (!groupedParamBuilder.getGenericParameterBuilder().getAddedIndexAndParameters().containsKey(index)) {
+                    groupedParamBuilder.getGenericParameterBuilder().getAddedIndexAndParameters().put(index, new LinkedList<>());
                 }
-                Object cipherColumnValue = encryptor.get().encrypt(plainColumnValue, encryptContext);
-                groupedParamBuilder.getGenericParameterBuilder().addReplacedParameters(columnIndex, cipherColumnValue);
-                Collection<Object> addedParams = new LinkedList<>();
-                Optional<StandardEncryptAlgorithm> assistedQueryEncryptor = encryptRule.findAssistedQueryEncryptor(tableName, encryptLogicColumnName);
-                if (assistedQueryEncryptor.isPresent()) {
-                    Optional<String> assistedColumnName = encryptRule.findAssistedQueryColumn(tableName, encryptLogicColumnName);
-                    Preconditions.checkArgument(assistedColumnName.isPresent(), "Can not find assisted query Column Name");
-                    addedParams.add(assistedQueryEncryptor.get().encrypt(plainColumnValue, encryptContext));
-                }
-                Optional<LikeEncryptAlgorithm> likeQueryEncryptor = encryptRule.findLikeQueryEncryptor(tableName, encryptLogicColumnName);
-                if (likeQueryEncryptor.isPresent()) {
-                    Optional<String> likeColumnName = encryptRule.findLikeQueryColumn(tableName, encryptLogicColumnName);
-                    Preconditions.checkArgument(likeColumnName.isPresent(), "Can not find assisted query Column Name");
-                    addedParams.add(likeQueryEncryptor.get().encrypt(plainColumnValue, encryptContext));
-                }
-                if (encryptRule.findPlainColumn(tableName, encryptLogicColumnName).isPresent()) {
-                    addedParams.add(plainColumnValue);
-                }
-                if (!addedParams.isEmpty()) {
-                    if (!groupedParamBuilder.getGenericParameterBuilder().getAddedIndexAndParameters().containsKey(columnIndex)) {
-                        groupedParamBuilder.getGenericParameterBuilder().getAddedIndexAndParameters().put(columnIndex, new LinkedList<>());
-                    }
-                    groupedParamBuilder.getGenericParameterBuilder().getAddedIndexAndParameters().get(columnIndex).addAll(addedParams);
-                }
-            });
+                groupedParamBuilder.getGenericParameterBuilder().getAddedIndexAndParameters().get(index).addAll(addedParams);
+            }
         }
     }
 }
diff --git a/features/encrypt/distsql/handler/src/main/java/org/apache/shardingsphere/encrypt/distsql/handler/query/ShowEncryptRuleExecutor.java b/features/encrypt/distsql/handler/src/main/java/org/apache/shardingsphere/encrypt/distsql/handler/query/ShowEncryptRuleExecutor.java
index 0ff6aa14a12..aa413226f81 100644
--- a/features/encrypt/distsql/handler/src/main/java/org/apache/shardingsphere/encrypt/distsql/handler/query/ShowEncryptRuleExecutor.java
+++ b/features/encrypt/distsql/handler/src/main/java/org/apache/shardingsphere/encrypt/distsql/handler/query/ShowEncryptRuleExecutor.java
@@ -59,22 +59,25 @@ public final class ShowEncryptRuleExecutor implements RQLExecutor<ShowEncryptRul
     private Collection<LocalDataQueryResultRow> buildColumnData(final EncryptTableRuleConfiguration tableRuleConfig, final Map<String, AlgorithmConfiguration> algorithmMap,
                                                                 final boolean queryWithCipherColumn) {
         Collection<LocalDataQueryResultRow> result = new LinkedList<>();
-        tableRuleConfig.getColumns().forEach(each -> {
+        for (EncryptColumnRuleConfiguration each : tableRuleConfig.getColumns()) {
             AlgorithmConfiguration encryptorAlgorithmConfig = algorithmMap.get(each.getEncryptorName());
             AlgorithmConfiguration assistedQueryEncryptorAlgorithmConfig = algorithmMap.get(each.getAssistedQueryEncryptorName());
             AlgorithmConfiguration likeQueryEncryptorAlgorithmConfig = algorithmMap.get(each.getLikeQueryEncryptorName());
-            result.add(new LocalDataQueryResultRow(Arrays.asList(tableRuleConfig.getName(), each.getLogicColumn(),
+            result.add(new LocalDataQueryResultRow(Arrays.asList(
+                    tableRuleConfig.getName(),
+                    each.getLogicColumn(),
                     each.getCipherColumn(),
                     nullToEmptyString(each.getPlainColumn()),
                     nullToEmptyString(each.getAssistedQueryColumn()),
                     nullToEmptyString(each.getLikeQueryColumn()),
-                    encryptorAlgorithmConfig.getType(), PropertiesConverter.convert(encryptorAlgorithmConfig.getProps()),
+                    encryptorAlgorithmConfig.getType(),
+                    PropertiesConverter.convert(encryptorAlgorithmConfig.getProps()),
                     Objects.isNull(assistedQueryEncryptorAlgorithmConfig) ? nullToEmptyString(null) : assistedQueryEncryptorAlgorithmConfig.getType(),
                     Objects.isNull(assistedQueryEncryptorAlgorithmConfig) ? nullToEmptyString(null) : PropertiesConverter.convert(assistedQueryEncryptorAlgorithmConfig.getProps()),
                     Objects.isNull(likeQueryEncryptorAlgorithmConfig) ? nullToEmptyString(null) : likeQueryEncryptorAlgorithmConfig.getType(),
                     Objects.isNull(likeQueryEncryptorAlgorithmConfig) ? nullToEmptyString(null) : PropertiesConverter.convert(likeQueryEncryptorAlgorithmConfig.getProps()),
                     isQueryWithCipherColumn(queryWithCipherColumn, tableRuleConfig, each).toString())));
-        });
+        }
         return result;
     }
     
diff --git a/features/readwrite-splitting/distsql/handler/src/test/java/org/apache/shardingsphere/readwritesplitting/distsql/handler/converter/ReadwriteSplittingRuleStatementConverterTest.java b/features/readwrite-splitting/distsql/handler/src/test/java/org/apache/shardingsphere/readwritesplitting/distsql/handler/converter/ReadwriteSplittingRuleStatementConverterTest.java
index 02a8e3dac90..9ab1fe0d376 100644
--- a/features/readwrite-splitting/distsql/handler/src/test/java/org/apache/shardingsphere/readwritesplitting/distsql/handler/converter/ReadwriteSplittingRuleStatementConverterTest.java
+++ b/features/readwrite-splitting/distsql/handler/src/test/java/org/apache/shardingsphere/readwritesplitting/distsql/handler/converter/ReadwriteSplittingRuleStatementConverterTest.java
@@ -31,7 +31,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.stream.Stream;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -83,22 +82,24 @@ class ReadwriteSplittingRuleStatementConverterTest {
         assertThat(actualMultipleRuleSegmentConvertResultDataSources.size(), is(expectedMultipleReadwriteSplittingRuleSegments.size()));
         assertThat(actualMultipleRuleSegmentConvertResultLoadBalancers.size(), is(expectedMultipleReadwriteSplittingRuleSegments.size()));
         List<ReadwriteSplittingDataSourceRuleConfiguration> actualRuleConfigs = new ArrayList<>(actualMultipleRuleSegmentConvertResultDataSources);
-        Stream.iterate(0, i -> i + 1)
-                .limit(expectedMultipleReadwriteSplittingRuleSegments.size())
-                .forEach(each -> {
-                    ReadwriteSplittingRuleSegment expectedReadwriteSplittingRuleSegment = expectedMultipleReadwriteSplittingRuleSegments.get(each);
-                    ReadwriteSplittingDataSourceRuleConfiguration actualRuleConfig = actualRuleConfigs.get(each);
-                    assertThat(actualRuleConfig.getName(), is(expectedReadwriteSplittingRuleSegment.getName()));
-                    String expectedLoadBalancerName = String.format("%s_%s", expectedReadwriteSplittingRuleSegment.getName(), expectedReadwriteSplittingRuleSegment.getLoadBalancer().getName());
-                    assertThat(actualRuleConfig.getLoadBalancerName(), is(expectedLoadBalancerName));
-                    assertThat(getWriteDataSourceName(actualRuleConfig), is(expectedReadwriteSplittingRuleSegment.getWriteDataSource()));
-                    assertThat(getReadDataSourceNames(actualRuleConfig), is(
-                            null == expectedReadwriteSplittingRuleSegment.getReadDataSources() ? Collections.emptyList() : expectedReadwriteSplittingRuleSegment.getReadDataSources()));
-                    assertTrue(actualMultipleRuleSegmentConvertResultLoadBalancers.containsKey(expectedLoadBalancerName));
-                    AlgorithmConfiguration actualSphereAlgorithmConfig = actualMultipleRuleSegmentConvertResultLoadBalancers.get(actualRuleConfig.getLoadBalancerName());
-                    assertThat(actualSphereAlgorithmConfig.getType(), is(expectedReadwriteSplittingRuleSegment.getLoadBalancer().getName()));
-                    assertThat(actualSphereAlgorithmConfig.getProps(), is(expectedReadwriteSplittingRuleSegment.getLoadBalancer().getProps()));
-                });
+        long limit = expectedMultipleReadwriteSplittingRuleSegments.size();
+        for (int i = 0;; i = i + 1) {
+            if (0 == limit--) {
+                break;
+            }
+            ReadwriteSplittingRuleSegment expectedReadwriteSplittingRuleSegment = expectedMultipleReadwriteSplittingRuleSegments.get(i);
+            ReadwriteSplittingDataSourceRuleConfiguration actualRuleConfig = actualRuleConfigs.get(i);
+            assertThat(actualRuleConfig.getName(), is(expectedReadwriteSplittingRuleSegment.getName()));
+            String expectedLoadBalancerName = String.format("%s_%s", expectedReadwriteSplittingRuleSegment.getName(), expectedReadwriteSplittingRuleSegment.getLoadBalancer().getName());
+            assertThat(actualRuleConfig.getLoadBalancerName(), is(expectedLoadBalancerName));
+            assertThat(getWriteDataSourceName(actualRuleConfig), is(expectedReadwriteSplittingRuleSegment.getWriteDataSource()));
+            assertThat(getReadDataSourceNames(actualRuleConfig),
+                    is(null == expectedReadwriteSplittingRuleSegment.getReadDataSources() ? Collections.emptyList() : expectedReadwriteSplittingRuleSegment.getReadDataSources()));
+            assertTrue(actualMultipleRuleSegmentConvertResultLoadBalancers.containsKey(expectedLoadBalancerName));
+            AlgorithmConfiguration actualSphereAlgorithmConfig = actualMultipleRuleSegmentConvertResultLoadBalancers.get(actualRuleConfig.getLoadBalancerName());
+            assertThat(actualSphereAlgorithmConfig.getType(), is(expectedReadwriteSplittingRuleSegment.getLoadBalancer().getName()));
+            assertThat(actualSphereAlgorithmConfig.getProps(), is(expectedReadwriteSplittingRuleSegment.getLoadBalancer().getProps()));
+        }
     }
     
     private String getWriteDataSourceName(final ReadwriteSplittingDataSourceRuleConfiguration ruleConfig) {
diff --git a/features/sharding/core/src/test/java/org/apache/shardingsphere/sharding/rewrite/parameter/ShardingPaginationParameterRewriterTest.java b/features/sharding/core/src/test/java/org/apache/shardingsphere/sharding/rewrite/parameter/ShardingPaginationParameterRewriterTest.java
index 8a0d1d0bbdd..511b93a2827 100644
--- a/features/sharding/core/src/test/java/org/apache/shardingsphere/sharding/rewrite/parameter/ShardingPaginationParameterRewriterTest.java
+++ b/features/sharding/core/src/test/java/org/apache/shardingsphere/sharding/rewrite/parameter/ShardingPaginationParameterRewriterTest.java
@@ -24,6 +24,7 @@ import org.apache.shardingsphere.infra.rewrite.parameter.builder.impl.StandardPa
 import org.apache.shardingsphere.infra.route.context.RouteContext;
 import org.apache.shardingsphere.sharding.rewrite.parameter.impl.ShardingPaginationParameterRewriter;
 import org.junit.jupiter.api.Test;
+import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 import java.util.Optional;
@@ -73,17 +74,7 @@ class ShardingPaginationParameterRewriterTest {
         addOffsetParametersFlag = false;
         addRowCountParameterFlag = false;
         StandardParameterBuilder standardParamBuilder = mock(StandardParameterBuilder.class);
-        doAnswer((Answer<Void>) invocation -> {
-            int index = invocation.getArgument(0);
-            long param = invocation.getArgument(1);
-            if (index == TEST_OFFSET_PARAMETER_INDEX && param == TEST_REVISED_OFFSET) {
-                addOffsetParametersFlag = true;
-            }
-            if (index == TEST_ROW_COUNT_PARAMETER_INDEX && param == TEST_REVISED_ROW_COUNT) {
-                addRowCountParameterFlag = true;
-            }
-            return null;
-        }).when(standardParamBuilder).addReplacedParameters(anyInt(), anyLong());
+        doAnswer((Answer<Void>) ShardingPaginationParameterRewriterTest::mockAddReplacedParameters).when(standardParamBuilder).addReplacedParameters(anyInt(), anyLong());
         SelectStatementContext selectStatementContext = mock(SelectStatementContext.class);
         PaginationContext pagination = mock(PaginationContext.class);
         when(pagination.getOffsetParameterIndex()).thenReturn(Optional.of(TEST_OFFSET_PARAMETER_INDEX));
@@ -95,4 +86,16 @@ class ShardingPaginationParameterRewriterTest {
         assertTrue(addOffsetParametersFlag);
         assertTrue(addRowCountParameterFlag);
     }
+    
+    private static Void mockAddReplacedParameters(final InvocationOnMock invocation) {
+        int index = invocation.getArgument(0);
+        long param = invocation.getArgument(1);
+        if (index == TEST_OFFSET_PARAMETER_INDEX && param == TEST_REVISED_OFFSET) {
+            addOffsetParametersFlag = true;
+        }
+        if (index == TEST_ROW_COUNT_PARAMETER_INDEX && param == TEST_REVISED_ROW_COUNT) {
+            addRowCountParameterFlag = true;
+        }
+        return null;
+    }
 }
diff --git a/features/sharding/distsql/handler/src/main/java/org/apache/shardingsphere/sharding/distsql/handler/checker/ShardingTableRuleStatementChecker.java b/features/sharding/distsql/handler/src/main/java/org/apache/shardingsphere/sharding/distsql/handler/checker/ShardingTableRuleStatementChecker.java
index 4d7642a0585..0e5586a9055 100644
--- a/features/sharding/distsql/handler/src/main/java/org/apache/shardingsphere/sharding/distsql/handler/checker/ShardingTableRuleStatementChecker.java
+++ b/features/sharding/distsql/handler/src/main/java/org/apache/shardingsphere/sharding/distsql/handler/checker/ShardingTableRuleStatementChecker.java
@@ -300,7 +300,7 @@ public final class ShardingTableRuleStatementChecker {
     }
     
     private static void checkStrategy(final String databaseName, final Collection<TableRuleSegment> rules) {
-        rules.forEach(each -> {
+        for (TableRuleSegment each : rules) {
             Optional<ShardingStrategySegment> databaseStrategySegment = Optional.ofNullable(each.getDatabaseStrategySegment());
             if (databaseStrategySegment.isPresent() && !databaseStrategySegment.get().getType().equalsIgnoreCase("none")) {
                 AlgorithmSegment databaseShardingAlgorithm = databaseStrategySegment.get().getShardingAlgorithm();
@@ -311,8 +311,7 @@ public final class ShardingTableRuleStatementChecker {
                                     String.format("auto sharding algorithm cannot be used to create a table in Table mode `%s`", each.getLogicTable())));
                 }
                 ShardingSpherePreconditions.checkState(!isInvalidStrategy(each.getDatabaseStrategySegment()),
-                        () -> new InvalidAlgorithmConfigurationException(databaseName,
-                                null == databaseShardingAlgorithm ? null : databaseShardingAlgorithm.getName()));
+                        () -> new InvalidAlgorithmConfigurationException(databaseName, null == databaseShardingAlgorithm ? null : databaseShardingAlgorithm.getName()));
             }
             Optional<ShardingStrategySegment> tableStrategySegment = Optional.ofNullable(each.getTableStrategySegment());
             if (tableStrategySegment.isPresent() && !tableStrategySegment.get().getType().equalsIgnoreCase("none")) {
@@ -324,10 +323,9 @@ public final class ShardingTableRuleStatementChecker {
                                     String.format("auto sharding algorithm cannot be used to create a table in Table mode `%s`", each.getLogicTable())));
                 }
                 ShardingSpherePreconditions.checkState(!isInvalidStrategy(each.getTableStrategySegment()),
-                        () -> new InvalidAlgorithmConfigurationException(databaseName,
-                                null == tableShardingAlgorithm ? null : tableShardingAlgorithm.getName()));
+                        () -> new InvalidAlgorithmConfigurationException(databaseName, null == tableShardingAlgorithm ? null : tableShardingAlgorithm.getName()));
             }
-        });
+        }
     }
     
     private static boolean isInvalidStrategy(final ShardingStrategySegment shardingStrategySegment) {
diff --git a/features/sharding/distsql/handler/src/main/java/org/apache/shardingsphere/sharding/distsql/handler/converter/ShardingTableRuleStatementConverter.java b/features/sharding/distsql/handler/src/main/java/org/apache/shardingsphere/sharding/distsql/handler/converter/ShardingTableRuleStatementConverter.java
index b96de6bb258..889b7261e81 100644
--- a/features/sharding/distsql/handler/src/main/java/org/apache/shardingsphere/sharding/distsql/handler/converter/ShardingTableRuleStatementConverter.java
+++ b/features/sharding/distsql/handler/src/main/java/org/apache/shardingsphere/sharding/distsql/handler/converter/ShardingTableRuleStatementConverter.java
@@ -59,7 +59,7 @@ public final class ShardingTableRuleStatementConverter {
      */
     public static ShardingRuleConfiguration convert(final Collection<AbstractTableRuleSegment> rules) {
         ShardingRuleConfiguration result = new ShardingRuleConfiguration();
-        rules.forEach(each -> {
+        for (AbstractTableRuleSegment each : rules) {
             result.getKeyGenerators().putAll(createKeyGeneratorConfiguration(each));
             result.getAuditors().putAll(createAuditorConfiguration(each));
             if (each instanceof AutoTableRuleSegment) {
@@ -70,7 +70,7 @@ public final class ShardingTableRuleStatementConverter {
                 result.getShardingAlgorithms().putAll(createAlgorithmConfiguration((TableRuleSegment) each));
                 result.getTables().add(createTableRuleConfiguration((TableRuleSegment) each));
             }
-        });
+        }
         return result;
     }
     
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
index 593f6c098d4..4b2aa9a752b 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.data.pipeline.core.task;
 
 import lombok.Getter;
+import lombok.RequiredArgsConstructor;
 import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.config.ImporterConfiguration;
@@ -119,32 +120,8 @@ public final class IncrementalTask implements PipelineTask, AutoCloseable {
     public Collection<CompletableFuture<?>> start() {
         taskProgress.getIncrementalTaskDelay().setLatestActiveTimeMillis(System.currentTimeMillis());
         Collection<CompletableFuture<?>> result = new LinkedList<>();
-        result.add(incrementalExecuteEngine.submit(dumper, new ExecuteCallback() {
-            
-            @Override
-            public void onSuccess() {
-            }
-            
-            @Override
-            public void onFailure(final Throwable throwable) {
-                log.error("incremental dumper onFailure, taskId={}", taskId);
-                stop();
-                close();
-            }
-        }));
-        importers.forEach(each -> result.add(incrementalExecuteEngine.submit(each, new ExecuteCallback() {
-            
-            @Override
-            public void onSuccess() {
-            }
-            
-            @Override
-            public void onFailure(final Throwable throwable) {
-                log.error("importer onFailure, taskId={}", taskId);
-                stop();
-                close();
-            }
-        })));
+        result.add(incrementalExecuteEngine.submit(dumper, new JobExecuteCallback(taskId, "incremental dumper")));
+        importers.forEach(each -> result.add(incrementalExecuteEngine.submit(each, new JobExecuteCallback(taskId, "importer"))));
         return result;
     }
     
@@ -160,4 +137,23 @@ public final class IncrementalTask implements PipelineTask, AutoCloseable {
     public void close() {
         channel.close();
     }
+    
+    @RequiredArgsConstructor
+    private class JobExecuteCallback implements ExecuteCallback {
+        
+        private final String taskId;
+        
+        private final String jobType;
+        
+        @Override
+        public void onSuccess() {
+        }
+        
+        @Override
+        public void onFailure(final Throwable throwable) {
+            log.error("{} on failure, task ID={}", jobType, taskId);
+            IncrementalTask.this.stop();
+            IncrementalTask.this.close();
+        }
+    }
 }
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MultiplexMemoryPipelineChannelTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MultiplexMemoryPipelineChannelTest.java
index 73bd93458bf..e956d09120d 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MultiplexMemoryPipelineChannelTest.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MultiplexMemoryPipelineChannelTest.java
@@ -66,7 +66,7 @@ class MultiplexMemoryPipelineChannelTest {
     private void execute(final AckCallback ackCallback, final int recordCount, final Record... records) {
         CountDownLatch countDownLatch = new CountDownLatch(recordCount);
         MultiplexMemoryPipelineChannel memoryChannel = new MultiplexMemoryPipelineChannel(CHANNEL_NUMBER, 10000, ackCallback);
-        fetchWithMultiThreading(memoryChannel, countDownLatch);
+        fetchWithMultiThreads(memoryChannel, countDownLatch);
         for (Record record : records) {
             memoryChannel.pushRecord(record);
         }
@@ -75,19 +75,21 @@ class MultiplexMemoryPipelineChannelTest {
         memoryChannel.close();
     }
     
-    private void fetchWithMultiThreading(final MultiplexMemoryPipelineChannel memoryChannel, final CountDownLatch countDownLatch) {
+    private void fetchWithMultiThreads(final MultiplexMemoryPipelineChannel memoryChannel, final CountDownLatch countDownLatch) {
         for (int i = 0; i < CHANNEL_NUMBER; i++) {
-            new Thread(() -> {
-                int maxLoopCount = 10;
-                for (int j = 1; j <= maxLoopCount; j++) {
-                    List<Record> records = memoryChannel.fetchRecords(100, 1);
-                    memoryChannel.ack(records);
-                    records.forEach(each -> countDownLatch.countDown());
-                    if (!records.isEmpty() && records.get(records.size() - 1) instanceof FinishedRecord) {
-                        break;
-                    }
-                }
-            }).start();
+            new Thread(() -> fetch(memoryChannel, countDownLatch)).start();
+        }
+    }
+    
+    private static void fetch(final MultiplexMemoryPipelineChannel memoryChannel, final CountDownLatch countDownLatch) {
+        int maxLoopCount = 10;
+        for (int j = 1; j <= maxLoopCount; j++) {
+            List<Record> records = memoryChannel.fetchRecords(100, 1);
+            memoryChannel.ack(records);
+            records.forEach(each -> countDownLatch.countDown());
+            if (!records.isEmpty() && records.get(records.size() - 1) instanceof FinishedRecord) {
+                break;
+            }
         }
     }
     
diff --git a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClientTest.java b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClientTest.java
index 2e77848ac66..ae56e0983c3 100644
--- a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClientTest.java
+++ b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClientTest.java
@@ -134,22 +134,24 @@ class MySQLClientTest {
         verify(channel).writeAndFlush(ArgumentMatchers.any(MySQLComBinlogDumpCommandPacket.class));
     }
     
-    @SuppressWarnings("unchecked")
     private void mockChannelResponse(final Object response) {
-        new Thread(() -> {
-            while (true) {
-                Promise<Object> responseCallback;
-                try {
-                    responseCallback = (Promise<Object>) Plugins.getMemberAccessor().get(MySQLClient.class.getDeclaredField("responseCallback"), mysqlClient);
-                } catch (final ReflectiveOperationException ex) {
-                    throw new RuntimeException(ex);
-                }
-                if (null != responseCallback) {
-                    responseCallback.setSuccess(response);
-                    break;
-                }
+        new Thread(() -> mockChannelResponseInThread(response)).start();
+    }
+    
+    @SuppressWarnings("unchecked")
+    private void mockChannelResponseInThread(final Object response) {
+        while (true) {
+            Promise<Object> responseCallback;
+            try {
+                responseCallback = (Promise<Object>) Plugins.getMemberAccessor().get(MySQLClient.class.getDeclaredField("responseCallback"), mysqlClient);
+            } catch (final ReflectiveOperationException ex) {
+                throw new RuntimeException(ex);
+            }
+            if (null != responseCallback) {
+                responseCallback.setSuccess(response);
+                break;
             }
-        }).start();
+        }
     }
     
     @Test
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
index ae57c4dc541..b4fee8c9457 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
@@ -21,6 +21,8 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.PipelineDataConsistencyChecker;
 import org.apache.shardingsphere.data.pipeline.api.datanode.DataNodeUtils;
+import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeEntry;
+import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeLine;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
@@ -85,23 +87,28 @@ public final class MigrationDataConsistencyChecker implements PipelineDataConsis
         PipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager();
         try {
             AtomicBoolean checkFailed = new AtomicBoolean(false);
-            jobConfig.getJobShardingDataNodes().forEach(each -> each.getEntries().forEach(entry -> entry.getDataNodes().forEach(dataNode -> {
-                if (checkFailed.get()) {
-                    return;
-                }
-                DataConsistencyCheckResult checkResult = checkSingleTable(entry.getLogicTableName(), dataNode, calculateAlgorithm, dataSourceManager);
-                result.put(DataNodeUtils.formatWithSchema(dataNode), checkResult);
-                if (!checkResult.isMatched()) {
-                    log.info("unmatched on table '{}', ignore left tables", each);
-                    checkFailed.set(true);
-                }
-            })));
+            for (JobDataNodeLine each : jobConfig.getJobShardingDataNodes()) {
+                each.getEntries().forEach(entry -> entry.getDataNodes().forEach(dataNode -> check(calculateAlgorithm, result, dataSourceManager, checkFailed, each, entry, dataNode)));
+            }
         } finally {
             dataSourceManager.close();
         }
         return result;
     }
     
+    private void check(final DataConsistencyCalculateAlgorithm calculateAlgorithm, final Map<String, DataConsistencyCheckResult> checkResults, final PipelineDataSourceManager dataSourceManager,
+                       final AtomicBoolean checkFailed, final JobDataNodeLine jobDataNodeLine, final JobDataNodeEntry entry, final DataNode dataNode) {
+        if (checkFailed.get()) {
+            return;
+        }
+        DataConsistencyCheckResult checkResult = checkSingleTable(entry.getLogicTableName(), dataNode, calculateAlgorithm, dataSourceManager);
+        checkResults.put(DataNodeUtils.formatWithSchema(dataNode), checkResult);
+        if (!checkResult.isMatched()) {
+            log.info("unmatched on table '{}', ignore left tables", jobDataNodeLine);
+            checkFailed.set(true);
+        }
+    }
+    
     private DataConsistencyCheckResult checkSingleTable(final String targetTableName, final DataNode dataNode,
                                                         final DataConsistencyCalculateAlgorithm calculateAlgorithm, final PipelineDataSourceManager dataSourceManager) {
         SchemaTableName sourceTable = new SchemaTableName(new SchemaName(dataNode.getSchemaName()), new TableName(dataNode.getTableName()));
diff --git a/mode/type/cluster/repository/provider/nacos/src/main/java/org/apache/shardingsphere/mode/repository/cluster/nacos/listener/NamingEventListener.java b/mode/type/cluster/repository/provider/nacos/src/main/java/org/apache/shardingsphere/mode/repository/cluster/nacos/listener/NamingEventListener.java
index f586de85350..02d24a9a925 100644
--- a/mode/type/cluster/repository/provider/nacos/src/main/java/org/apache/shardingsphere/mode/repository/cluster/nacos/listener/NamingEventListener.java
+++ b/mode/type/cluster/repository/provider/nacos/src/main/java/org/apache/shardingsphere/mode/repository/cluster/nacos/listener/NamingEventListener.java
@@ -68,27 +68,29 @@ public final class NamingEventListener implements EventListener {
                     watchDataList.add(watchData);
                 }
             }));
-            watchDataList.forEach(watchData -> {
-                String key = watchData.getKey();
-                Instance preInstance = watchData.getPreInstance();
-                Instance instance = watchData.getInstance();
-                DataChangedEventListener listener = watchData.getListener();
-                DataChangedEvent.Type changedType = getEventChangedType(preInstance, instance);
-                switch (changedType) {
-                    case ADDED:
-                    case UPDATED:
-                        listener.onChange(new DataChangedEvent(key, NacosMetaDataUtils.getValue(instance), changedType));
-                        break;
-                    case DELETED:
-                        listener.onChange(new DataChangedEvent(key, NacosMetaDataUtils.getValue(preInstance), changedType));
-                        break;
-                    default:
-                }
-            });
+            watchDataList.forEach(this::watch);
             setPreInstances(instances);
         }
     }
     
+    private void watch(final WatchData watchData) {
+        String key = watchData.getKey();
+        Instance preInstance = watchData.getPreInstance();
+        Instance instance = watchData.getInstance();
+        DataChangedEventListener listener = watchData.getListener();
+        Type changedType = getEventChangedType(preInstance, instance);
+        switch (changedType) {
+            case ADDED:
+            case UPDATED:
+                listener.onChange(new DataChangedEvent(key, NacosMetaDataUtils.getValue(instance), changedType));
+                break;
+            case DELETED:
+                listener.onChange(new DataChangedEvent(key, NacosMetaDataUtils.getValue(preInstance), changedType));
+                break;
+            default:
+        }
+    }
+    
     private Type getEventChangedType(final Instance preInstance, final Instance instance) {
         if (Objects.isNull(preInstance) && Objects.nonNull(instance)) {
             return DataChangedEvent.Type.ADDED;
diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/common/checker/ReadwriteSplittingRuleConfigurationImportChecker.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/common/checker/ReadwriteSplittingRuleConfigurationImportChecker.java
index 588f553c57e..e2fbe67f01d 100644
--- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/common/checker/ReadwriteSplittingRuleConfigurationImportChecker.java
+++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/common/checker/ReadwriteSplittingRuleConfigurationImportChecker.java
@@ -23,6 +23,7 @@ import org.apache.shardingsphere.infra.rule.identifier.type.DataSourceContainedR
 import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
 import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
 import org.apache.shardingsphere.readwritesplitting.api.ReadwriteSplittingRuleConfiguration;
+import org.apache.shardingsphere.readwritesplitting.api.rule.ReadwriteSplittingDataSourceRuleConfiguration;
 import org.apache.shardingsphere.readwritesplitting.spi.ReadQueryLoadBalanceAlgorithm;
 
 import java.util.Collection;
@@ -52,7 +53,7 @@ public final class ReadwriteSplittingRuleConfigurationImportChecker {
     private void checkDataSources(final String databaseName, final ShardingSphereDatabase database, final ReadwriteSplittingRuleConfiguration currentRuleConfig) {
         Collection<String> requiredDataSources = new LinkedHashSet<>();
         Collection<String> requiredLogicalDataSources = new LinkedHashSet<>();
-        currentRuleConfig.getDataSources().forEach(each -> {
+        for (ReadwriteSplittingDataSourceRuleConfiguration each : currentRuleConfig.getDataSources()) {
             if (null != each.getDynamicStrategy()) {
                 requiredLogicalDataSources.add(each.getDynamicStrategy().getAutoAwareDataSourceName());
             }
@@ -64,7 +65,7 @@ public final class ReadwriteSplittingRuleConfigurationImportChecker {
                     requiredDataSources.addAll(each.getStaticStrategy().getReadDataSourceNames());
                 }
             }
-        });
+        }
         Collection<String> notExistedDataSources = database.getResourceMetaData().getNotExistedDataSources(requiredDataSources);
         ShardingSpherePreconditions.checkState(notExistedDataSources.isEmpty(), () -> new MissingRequiredStorageUnitsException(databaseName, notExistedDataSources));
         Collection<String> logicalDataSources = getLogicDataSources(database);
diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ConvertYamlConfigurationExecutor.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ConvertYamlConfigurationExecutor.java
index d53a31a5b12..8fcc7a2076b 100644
--- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ConvertYamlConfigurationExecutor.java
+++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ConvertYamlConfigurationExecutor.java
@@ -40,6 +40,7 @@ import org.apache.shardingsphere.infra.datasource.props.custom.CustomDataSourceP
 import org.apache.shardingsphere.infra.datasource.props.synonym.PoolPropertySynonyms;
 import org.apache.shardingsphere.infra.merge.result.impl.local.LocalDataQueryResultRow;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+import org.apache.shardingsphere.infra.yaml.config.pojo.rule.YamlRuleConfiguration;
 import org.apache.shardingsphere.mask.api.config.MaskRuleConfiguration;
 import org.apache.shardingsphere.mask.api.config.rule.MaskColumnRuleConfiguration;
 import org.apache.shardingsphere.mask.api.config.rule.MaskTableRuleConfiguration;
@@ -114,7 +115,7 @@ public final class ConvertYamlConfigurationExecutor implements QueryableRALExecu
     private String generateDistSQL(final YamlProxyDatabaseConfiguration yamlConfig) {
         StringBuilder result = new StringBuilder();
         appendResourceDistSQL(yamlConfig, result);
-        swapToRuleConfigs(yamlConfig).values().forEach(each -> {
+        for (RuleConfiguration each : swapToRuleConfigs(yamlConfig).values()) {
             if (each instanceof ShardingRuleConfiguration) {
                 appendShardingDistSQL((ShardingRuleConfiguration) each, result);
             } else if (each instanceof ReadwriteSplittingRuleConfiguration) {
@@ -128,13 +129,13 @@ public final class ConvertYamlConfigurationExecutor implements QueryableRALExecu
             } else if (each instanceof MaskRuleConfiguration) {
                 appendMaskDistSQL((MaskRuleConfiguration) each, result);
             }
-        });
+        }
         return result.toString();
     }
     
     private Map<Integer, RuleConfiguration> swapToRuleConfigs(final YamlProxyDatabaseConfiguration yamlConfig) {
         Map<Integer, RuleConfiguration> result = new TreeMap<>(Comparator.reverseOrder());
-        yamlConfig.getRules().forEach(each -> {
+        for (YamlRuleConfiguration each : yamlConfig.getRules()) {
             if (each instanceof YamlShardingRuleConfiguration) {
                 YamlShardingRuleConfigurationSwapper swapper = new YamlShardingRuleConfigurationSwapper();
                 result.put(swapper.getOrder(), swapper.swapToObject((YamlShardingRuleConfiguration) each));
@@ -154,7 +155,7 @@ public final class ConvertYamlConfigurationExecutor implements QueryableRALExecu
                 YamlMaskRuleConfigurationSwapper swapper = new YamlMaskRuleConfigurationSwapper();
                 result.put(swapper.getOrder(), swapper.swapToObject((YamlMaskRuleConfiguration) each));
             }
-        });
+        }
         return result;
     }
     
diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ExportStorageNodesExecutor.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ExportStorageNodesExecutor.java
index 71359001e91..951610651be 100644
--- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ExportStorageNodesExecutor.java
+++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ExportStorageNodesExecutor.java
@@ -31,12 +31,14 @@ import org.apache.shardingsphere.proxy.backend.distsql.export.ExportedStorageNod
 import org.apache.shardingsphere.proxy.backend.util.ExportUtils;
 import org.apache.shardingsphere.proxy.backend.util.JsonUtils;
 
+import javax.sql.DataSource;
 import java.time.LocalDateTime;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Objects;
 
 /**
@@ -86,17 +88,17 @@ public final class ExportStorageNodesExecutor implements MetaDataRequiredQueryab
     
     private Map<String, Collection<ExportedStorageNode>> generateDatabaseExportStorageNodesData(final ShardingSphereDatabase database) {
         Map<String, ExportedStorageNode> storageNodes = new LinkedHashMap<>();
-        database.getResourceMetaData().getDataSources().forEach((key, value) -> {
-            DataSourceMetaData dataSourceMetaData = database.getResourceMetaData().getDataSourceMetaData(key);
+        for (Entry<String, DataSource> entry : database.getResourceMetaData().getDataSources().entrySet()) {
+            DataSourceMetaData dataSourceMetaData = database.getResourceMetaData().getDataSourceMetaData(entry.getKey());
             String databaseInstanceIp = getDatabaseInstanceIp(dataSourceMetaData);
             if (storageNodes.containsKey(databaseInstanceIp)) {
-                return;
+                continue;
             }
-            Map<String, Object> standardProperties = DataSourcePropertiesCreator.create(value).getConnectionPropertySynonyms().getStandardProperties();
+            Map<String, Object> standardProperties = DataSourcePropertiesCreator.create(entry.getValue()).getConnectionPropertySynonyms().getStandardProperties();
             ExportedStorageNode exportedStorageNode = new ExportedStorageNode(dataSourceMetaData.getHostname(), String.valueOf(dataSourceMetaData.getPort()),
                     String.valueOf(standardProperties.get("username")), String.valueOf(standardProperties.get("password")), dataSourceMetaData.getCatalog());
             storageNodes.put(databaseInstanceIp, exportedStorageNode);
-        });
+        }
         return Collections.singletonMap(database.getName(), storageNodes.values());
     }
     
diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rql/storage/unit/ShowStorageUnitExecutor.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rql/storage/unit/ShowStorageUnitExecutor.java
index cbcec300668..fce2a449f6a 100644
--- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rql/storage/unit/ShowStorageUnitExecutor.java
+++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rql/storage/unit/ShowStorageUnitExecutor.java
@@ -75,17 +75,24 @@ public final class ShowStorageUnitExecutor implements RQLExecutor<ShowStorageUni
         ShardingSphereResourceMetaData resourceMetaData = database.getResourceMetaData();
         Map<String, DataSourceProperties> dataSourcePropsMap = getDataSourcePropsMap(database, sqlStatement);
         Collection<LocalDataQueryResultRow> result = new LinkedList<>();
-        dataSourcePropsMap.forEach((key, value) -> {
-            DataSourceMetaData metaData = resourceMetaData.getDataSourceMetaData(key);
-            DataSourceProperties dataSourceProps = dataSourcePropsMap.get(key);
+        for (String each : dataSourcePropsMap.keySet()) {
+            DataSourceMetaData metaData = resourceMetaData.getDataSourceMetaData(each);
+            DataSourceProperties dataSourceProps = dataSourcePropsMap.get(each);
             Map<String, Object> standardProps = dataSourceProps.getPoolPropertySynonyms().getStandardProperties();
             Map<String, Object> otherProps = dataSourceProps.getCustomDataSourceProperties().getProperties();
-            result.add(new LocalDataQueryResultRow(key, resourceMetaData.getStorageType(key).getType(), metaData.getHostname(), metaData.getPort(), metaData.getCatalog(),
-                    getStandardProperty(standardProps, CONNECTION_TIMEOUT_MILLISECONDS), getStandardProperty(standardProps, IDLE_TIMEOUT_MILLISECONDS),
-                    getStandardProperty(standardProps, MAX_LIFETIME_MILLISECONDS), getStandardProperty(standardProps, MAX_POOL_SIZE),
-                    getStandardProperty(standardProps, MIN_POOL_SIZE), getStandardProperty(standardProps, READ_ONLY),
+            result.add(new LocalDataQueryResultRow(each,
+                    resourceMetaData.getStorageType(each).getType(),
+                    metaData.getHostname(),
+                    metaData.getPort(),
+                    metaData.getCatalog(),
+                    getStandardProperty(standardProps, CONNECTION_TIMEOUT_MILLISECONDS),
+                    getStandardProperty(standardProps, IDLE_TIMEOUT_MILLISECONDS),
+                    getStandardProperty(standardProps, MAX_LIFETIME_MILLISECONDS),
+                    getStandardProperty(standardProps, MAX_POOL_SIZE),
+                    getStandardProperty(standardProps, MIN_POOL_SIZE),
+                    getStandardProperty(standardProps, READ_ONLY),
                     otherProps.isEmpty() ? "" : new Gson().toJson(otherProps)));
-        });
+        }
         return result;
     }
     
diff --git a/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/common/checker/ReadwriteSplittingRuleConfigurationImportCheckerTest.java b/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/common/checker/ReadwriteSplittingRuleConfigurationImportCheckerTest.java
index ab1b0fa7c72..8d47e1e627b 100644
--- a/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/common/checker/ReadwriteSplittingRuleConfigurationImportCheckerTest.java
+++ b/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/common/checker/ReadwriteSplittingRuleConfigurationImportCheckerTest.java
@@ -85,6 +85,6 @@ class ReadwriteSplittingRuleConfigurationImportCheckerTest {
     private ReadwriteSplittingRuleConfiguration createInvalidLoadBalancerRuleConfig() {
         Map<String, AlgorithmConfiguration> loadBalancer = new HashMap<>();
         loadBalancer.put("invalid_load_balancer", mock(AlgorithmConfiguration.class));
-        return new ReadwriteSplittingRuleConfiguration(mock(Collection.class), loadBalancer);
+        return new ReadwriteSplittingRuleConfiguration(Collections.emptyList(), loadBalancer);
     }
 }
diff --git a/proxy/backend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutor.java b/proxy/backend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutor.java
index 8e8882e0245..7fef015f041 100644
--- a/proxy/backend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutor.java
+++ b/proxy/backend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutor.java
@@ -89,34 +89,36 @@ public final class ShowProcessListExecutor implements DatabaseAdminQueryExecutor
         for (String each : batchProcessContexts) {
             processContexts.addAll(YamlEngine.unmarshal(each, BatchYamlExecuteProcessContext.class).getContexts());
         }
-        List<MemoryQueryResultDataRow> rows = processContexts.stream().map(processContext -> {
-            List<Object> rowValues = new ArrayList<>(8);
-            rowValues.add(processContext.getExecutionID());
-            rowValues.add(processContext.getUsername());
-            rowValues.add(processContext.getHostname());
-            rowValues.add(processContext.getDatabaseName());
-            rowValues.add(ExecuteProcessStatusEnum.SLEEP == processContext.getProcessStatus() ? "Sleep" : "Execute");
-            rowValues.add(TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - processContext.getStartTimeMillis()));
-            String sql = null;
-            if (ExecuteProcessStatusEnum.SLEEP != processContext.getProcessStatus()) {
-                int processDoneCount = processContext.getUnitStatuses().stream()
-                        .map(each -> ExecuteProcessStatusEnum.DONE == each.getProcessStatus() ? 1 : 0)
-                        .reduce(0, Integer::sum);
-                String statePrefix = "Executing ";
-                rowValues.add(statePrefix + processDoneCount + "/" + processContext.getUnitStatuses().size());
-                sql = processContext.getSql();
-            } else {
-                rowValues.add("");
-            }
-            if (null != sql && sql.length() > 100) {
-                sql = sql.substring(0, 100);
-            }
-            rowValues.add(null != sql ? sql : "");
-            return new MemoryQueryResultDataRow(rowValues);
-        }).collect(Collectors.toList());
+        List<MemoryQueryResultDataRow> rows = processContexts.stream().map(ShowProcessListExecutor::getMemoryQueryResultDataRow).collect(Collectors.toList());
         return new RawMemoryQueryResult(queryResultMetaData, rows);
     }
     
+    private static MemoryQueryResultDataRow getMemoryQueryResultDataRow(final YamlExecuteProcessContext processContext) {
+        List<Object> rowValues = new ArrayList<>(8);
+        rowValues.add(processContext.getExecutionID());
+        rowValues.add(processContext.getUsername());
+        rowValues.add(processContext.getHostname());
+        rowValues.add(processContext.getDatabaseName());
+        rowValues.add(ExecuteProcessStatusEnum.SLEEP == processContext.getProcessStatus() ? "Sleep" : "Execute");
+        rowValues.add(TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - processContext.getStartTimeMillis()));
+        String sql = null;
+        if (ExecuteProcessStatusEnum.SLEEP != processContext.getProcessStatus()) {
+            int processDoneCount = processContext.getUnitStatuses().stream()
+                    .map(each -> ExecuteProcessStatusEnum.DONE == each.getProcessStatus() ? 1 : 0)
+                    .reduce(0, Integer::sum);
+            String statePrefix = "Executing ";
+            rowValues.add(statePrefix + processDoneCount + "/" + processContext.getUnitStatuses().size());
+            sql = processContext.getSql();
+        } else {
+            rowValues.add("");
+        }
+        if (null != sql && sql.length() > 100) {
+            sql = sql.substring(0, 100);
+        }
+        rowValues.add(null != sql ? sql : "");
+        return new MemoryQueryResultDataRow(rowValues);
+    }
+    
     private QueryResultMetaData createQueryResultMetaData() {
         List<RawQueryResultColumnMetaData> columns = new ArrayList<>();
         columns.add(new RawQueryResultColumnMetaData("", "Id", "Id", Types.VARCHAR, "VARCHAR", 20, 0));
diff --git a/src/resources/checkstyle.xml b/src/resources/checkstyle.xml
index 5e9062c55d8..214cc92db22 100644
--- a/src/resources/checkstyle.xml
+++ b/src/resources/checkstyle.xml
@@ -36,41 +36,46 @@
     <module name="SeverityMatchFilter" />
     
     <module name="LineLength">
-        <property name="fileExtensions" value="java"/>
+        <property name="fileExtensions" value="java" />
         <property name="max" value="200" />
     </module>
     
     <module name="TreeWalker">
-        
         <!-- Naming Conventions -->
-        <module name="PackageName">
-            <property name="format" value="^[a-z]+(\.[a-z][a-z0-9]*)*$" />
+        <module name="AbbreviationAsWordInName">
+            <property name="allowedAbbreviationLength" value="8" />
         </module>
-        <module name="TypeName" />
-        <module name="MethodName" />
-        <module name="InterfaceTypeParameterName" />
+        <module name="CatchParameterName" />
         <module name="ClassTypeParameterName" />
-        <module name="MethodTypeParameterName" />
         <module name="ConstantName" />
-        <module name="StaticVariableName" />
-        <module name="MemberName" />
-        <module name="LocalVariableName" />
+        <module name="InterfaceTypeParameterName" />
+        <module name="LambdaParameterName" />
         <module name="LocalFinalVariableName" />
-        <module name="ParameterName" />
-        <module name="CatchParameterName" />
-        <module name="AbbreviationAsWordInName">
-            <property name="allowedAbbreviationLength" value="8" />
+        <module name="LocalVariableName" />
+        <module name="MemberName" />
+        <module name="MethodName" />
+        <module name="MethodTypeParameterName" />
+        <module name="PackageName">
+            <property name="format" value="^[a-z]+(\.[a-z][a-z0-9]*)*$" />
         </module>
+        <module name="ParameterName" />
+        <module name="PatternVariableName" />
+        <module name="RecordComponentName" />
+        <module name="RecordTypeParameterName" />
+        <module name="StaticVariableName" />
+        <module name="TypeName" />
         
         <!-- Size Violations -->
         <module name="AnonInnerLength" />
-        <module name="MethodLength" />
-        <module name="MethodCount" />
         <module name="ExecutableStatementCount">
             <property name="max" value="50" />
         </module>
+        <module name="LambdaBodyLength" />
+        <module name="MethodCount" />
+        <module name="MethodLength" />
         <module name="OuterTypeNumber" />
         <module name="ParameterNumber" />
+        <module name="RecordComponentNumber" />
         
         <!-- Whitespace -->
         <module name="EmptyForInitializerPad" />
diff --git a/src/resources/checkstyle_ci.xml b/src/resources/checkstyle_ci.xml
index eaf4e6f29d7..2e7ab7ae2ed 100644
--- a/src/resources/checkstyle_ci.xml
+++ b/src/resources/checkstyle_ci.xml
@@ -36,36 +36,41 @@
     <module name="SeverityMatchFilter" />
     
     <module name="LineLength">
-        <property name="fileExtensions" value="java"/>
+        <property name="fileExtensions" value="java" />
         <property name="max" value="200" />
     </module>
     
     <module name="TreeWalker">
-        
         <!-- Naming Conventions -->
-        <module name="PackageName">
-            <property name="format" value="^[a-z]+(\.[a-z][a-z0-9]*)*$" />
+        <module name="AbbreviationAsWordInName">
+            <property name="allowedAbbreviationLength" value="8" />
         </module>
-        <module name="TypeName" />
-        <module name="MethodName" />
-        <module name="InterfaceTypeParameterName" />
+        <module name="CatchParameterName" />
         <module name="ClassTypeParameterName" />
-        <module name="MethodTypeParameterName" />
         <module name="ConstantName" />
-        <module name="StaticVariableName" />
-        <module name="MemberName" />
-        <module name="LocalVariableName" />
+        <module name="InterfaceTypeParameterName" />
+        <module name="LambdaParameterName" />
         <module name="LocalFinalVariableName" />
-        <module name="ParameterName" />
-        <module name="CatchParameterName" />
-        <module name="AbbreviationAsWordInName">
-            <property name="allowedAbbreviationLength" value="8" />
+        <module name="LocalVariableName" />
+        <module name="MemberName" />
+        <module name="MethodName" />
+        <module name="MethodTypeParameterName" />
+        <module name="PackageName">
+            <property name="format" value="^[a-z]+(\.[a-z][a-z0-9]*)*$" />
         </module>
+        <module name="ParameterName" />
+        <module name="PatternVariableName" />
+        <module name="RecordComponentName" />
+        <module name="RecordTypeParameterName" />
+        <module name="StaticVariableName" />
+        <module name="TypeName" />
         
         <!-- Size Violations -->
         <module name="AnonInnerLength" />
+        <module name="LambdaBodyLength" />
         <module name="MethodLength" />
         <module name="OuterTypeNumber" />
+        <module name="RecordComponentNumber" />
         
         <!-- Whitespace -->
         <module name="EmptyForInitializerPad" />
diff --git a/test/e2e/env/src/test/java/org/apache/shardingsphere/test/e2e/env/container/wait/JdbcConnectionWaitStrategy.java b/test/e2e/env/src/test/java/org/apache/shardingsphere/test/e2e/env/container/wait/JdbcConnectionWaitStrategy.java
index 4151a07654f..1b5d4130c95 100644
--- a/test/e2e/env/src/test/java/org/apache/shardingsphere/test/e2e/env/container/wait/JdbcConnectionWaitStrategy.java
+++ b/test/e2e/env/src/test/java/org/apache/shardingsphere/test/e2e/env/container/wait/JdbcConnectionWaitStrategy.java
@@ -37,17 +37,19 @@ public final class JdbcConnectionWaitStrategy extends AbstractWaitStrategy {
     
     @Override
     protected void waitUntilReady() {
-        Unreliables.retryUntilSuccess((int) startupTimeout.getSeconds(), TimeUnit.SECONDS, () -> {
-            getRateLimiter().doWhenReady(() -> {
-                try (Connection ignored = connectionSupplier.call()) {
-                    log.info("Container ready.");
-                    // CHECKSTYLE:OFF
-                } catch (final Exception ex) {
-                    // CHECKSTYLE:ON
-                    throw new RuntimeException("Not Ready yet.", ex);
-                }
-            });
-            return true;
+        Unreliables.retryUntilSuccess((int) startupTimeout.getSeconds(), TimeUnit.SECONDS, this::mockRateLimiter);
+    }
+    
+    private boolean mockRateLimiter() {
+        getRateLimiter().doWhenReady(() -> {
+            try (Connection ignored = connectionSupplier.call()) {
+                log.info("Container ready.");
+                // CHECKSTYLE:OFF
+            } catch (final Exception ex) {
+                // CHECKSTYLE:ON
+                throw new RuntimeException("Not Ready yet.", ex);
+            }
         });
+        return true;
     }
 }