You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/04/26 10:39:25 UTC

[shardingsphere] branch master updated: optimization check highly available status (#17049)

This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new e1be9173ca0 optimization check highly available status (#17049)
e1be9173ca0 is described below

commit e1be9173ca0ba58d5de1fc40fb674c08d9b9b94e
Author: weihubeats <we...@163.com>
AuthorDate: Tue Apr 26 18:39:17 2022 +0800

    optimization check highly available status (#17049)
    
    * optimization check highly available status
    
    * user ExecutorEngine
    
    * remove redundant code
    
    * method process datasource
---
 .../shardingsphere-db-discovery-core/pom.xml       |  5 ++
 .../algorithm/DatabaseDiscoveryEngine.java         | 24 ++++++++--
 .../DatabaseDiscoveryExecutorCallback.java         | 54 ++++++++++++++++++++++
 3 files changed, 79 insertions(+), 4 deletions(-)

diff --git a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/pom.xml b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/pom.xml
index abb500a86c3..b4289aca81b 100644
--- a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/pom.xml
+++ b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/pom.xml
@@ -62,6 +62,11 @@
             <artifactId>shardingsphere-schedule-core</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.shardingsphere</groupId>
+            <artifactId>shardingsphere-infra-executor</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         
         <dependency>
             <groupId>org.apache.shardingsphere</groupId>
diff --git a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/algorithm/DatabaseDiscoveryEngine.java b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/algorithm/DatabaseDiscoveryEngine.java
index 852d226c394..91cdb2a6a59 100644
--- a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/algorithm/DatabaseDiscoveryEngine.java
+++ b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/algorithm/DatabaseDiscoveryEngine.java
@@ -17,11 +17,16 @@
 
 package org.apache.shardingsphere.dbdiscovery.algorithm;
 
+import com.google.common.collect.Lists;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.dbdiscovery.spi.DatabaseDiscoveryProviderAlgorithm;
 import org.apache.shardingsphere.dbdiscovery.spi.ReplicaDataSourceStatus;
 import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
+import org.apache.shardingsphere.infra.executor.kernel.ExecutorEngine;
+import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
+import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
+import org.apache.shardingsphere.infra.executor.kernel.model.ExecutorDataMap;
 import org.apache.shardingsphere.infra.metadata.schema.QualifiedDatabase;
 import org.apache.shardingsphere.infra.rule.event.impl.DataSourceDisabledEvent;
 import org.apache.shardingsphere.infra.rule.event.impl.PrimaryDataSourceChangedEvent;
@@ -31,6 +36,7 @@ import org.apache.shardingsphere.infra.storage.StorageNodeStatus;
 
 import javax.sql.DataSource;
 import java.sql.SQLException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
@@ -43,9 +49,11 @@ import java.util.Optional;
 @RequiredArgsConstructor
 @Slf4j
 public final class DatabaseDiscoveryEngine {
+
+    private static final int CPU_CORES = Runtime.getRuntime().availableProcessors();
     
     private final DatabaseDiscoveryProviderAlgorithm databaseDiscoveryProviderAlgorithm;
-    
+
     /**
      * Check environment of database cluster.
      *
@@ -54,12 +62,20 @@ public final class DatabaseDiscoveryEngine {
      * @throws SQLException SQL exception
      */
     public void checkEnvironment(final String databaseName, final Map<String, DataSource> dataSourceMap) throws SQLException {
+        ExecutorEngine executorEngine = new ExecutorEngine(Math.min(CPU_CORES * 2, dataSourceMap.isEmpty() ? 1 : dataSourceMap.entrySet().size()));
+        ExecutorDataMap.getValue().put(DatabaseDiscoveryExecutorCallback.DATABASE_NAME, databaseName);
+        executorEngine.execute(createExecutionGroupContext(dataSourceMap), new DatabaseDiscoveryExecutorCallback(databaseDiscoveryProviderAlgorithm));
+    }
+
+    private ExecutionGroupContext<DataSource> createExecutionGroupContext(final Map<String, DataSource> dataSourceMap) {
+        Collection<ExecutionGroup<DataSource>> inputGroups = new ArrayList<>();
         for (Entry<String, DataSource> entry : dataSourceMap.entrySet()) {
-            // TODO query with multiple threads
-            databaseDiscoveryProviderAlgorithm.checkEnvironment(databaseName, entry.getValue());
+            ExecutionGroup<DataSource> executionGroup = new ExecutionGroup<>(Lists.newArrayList(entry.getValue()));
+            inputGroups.add(executionGroup);
         }
+        return new ExecutionGroupContext<>(inputGroups);
     }
-    
+
     /**
      * Change primary data source.
      *
diff --git a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/algorithm/DatabaseDiscoveryExecutorCallback.java b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/algorithm/DatabaseDiscoveryExecutorCallback.java
new file mode 100644
index 00000000000..5a564f59f1f
--- /dev/null
+++ b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/algorithm/DatabaseDiscoveryExecutorCallback.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.dbdiscovery.algorithm;
+
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.dbdiscovery.spi.DatabaseDiscoveryProviderAlgorithm;
+import org.apache.shardingsphere.infra.executor.kernel.model.ExecutorCallback;
+
+import javax.sql.DataSource;
+import java.sql.SQLException;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+@RequiredArgsConstructor
+public class DatabaseDiscoveryExecutorCallback implements ExecutorCallback<DataSource, String> {
+
+    public static final String DATABASE_NAME = "databaseName";
+
+    private static final String SUCCEED = "succeed";
+
+    private final DatabaseDiscoveryProviderAlgorithm databaseDiscoveryProviderAlgorithm;
+
+    @Override
+    public Collection<String> execute(final Collection<DataSource> inputs, final boolean isTrunkThread, final Map<String, Object> dataMap) throws SQLException {
+        List<String> result = new LinkedList<>();
+        String databaseName = (String) dataMap.get(DATABASE_NAME);
+        inputs.forEach(dataSource -> {
+            try {
+                databaseDiscoveryProviderAlgorithm.checkEnvironment(databaseName, dataSource);
+                result.add("succeed");
+            } catch (SQLException e) {
+                throw new IllegalStateException(String.format("Error while loading highly available Status with %s", dataSource), e);
+            }
+        });
+        return result;
+    }
+}