You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/04/26 10:39:25 UTC
[shardingsphere] branch master updated: optimization check highly available status (#17049)
This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new e1be9173ca0 optimization check highly available status (#17049)
e1be9173ca0 is described below
commit e1be9173ca0ba58d5de1fc40fb674c08d9b9b94e
Author: weihubeats <we...@163.com>
AuthorDate: Tue Apr 26 18:39:17 2022 +0800
optimization check highly available status (#17049)
* optimization check highly available status
* user ExecutorEngine
* remove redundant code
* method process datasource
---
.../shardingsphere-db-discovery-core/pom.xml | 5 ++
.../algorithm/DatabaseDiscoveryEngine.java | 24 ++++++++--
.../DatabaseDiscoveryExecutorCallback.java | 54 ++++++++++++++++++++++
3 files changed, 79 insertions(+), 4 deletions(-)
diff --git a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/pom.xml b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/pom.xml
index abb500a86c3..b4289aca81b 100644
--- a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/pom.xml
+++ b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/pom.xml
@@ -62,6 +62,11 @@
<artifactId>shardingsphere-schedule-core</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.shardingsphere</groupId>
+ <artifactId>shardingsphere-infra-executor</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.shardingsphere</groupId>
diff --git a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/algorithm/DatabaseDiscoveryEngine.java b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/algorithm/DatabaseDiscoveryEngine.java
index 852d226c394..91cdb2a6a59 100644
--- a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/algorithm/DatabaseDiscoveryEngine.java
+++ b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/algorithm/DatabaseDiscoveryEngine.java
@@ -17,11 +17,16 @@
package org.apache.shardingsphere.dbdiscovery.algorithm;
+import com.google.common.collect.Lists;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.dbdiscovery.spi.DatabaseDiscoveryProviderAlgorithm;
import org.apache.shardingsphere.dbdiscovery.spi.ReplicaDataSourceStatus;
import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
+import org.apache.shardingsphere.infra.executor.kernel.ExecutorEngine;
+import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
+import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
+import org.apache.shardingsphere.infra.executor.kernel.model.ExecutorDataMap;
import org.apache.shardingsphere.infra.metadata.schema.QualifiedDatabase;
import org.apache.shardingsphere.infra.rule.event.impl.DataSourceDisabledEvent;
import org.apache.shardingsphere.infra.rule.event.impl.PrimaryDataSourceChangedEvent;
@@ -31,6 +36,7 @@ import org.apache.shardingsphere.infra.storage.StorageNodeStatus;
import javax.sql.DataSource;
import java.sql.SQLException;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
@@ -43,9 +49,11 @@ import java.util.Optional;
@RequiredArgsConstructor
@Slf4j
public final class DatabaseDiscoveryEngine {
+
+ private static final int CPU_CORES = Runtime.getRuntime().availableProcessors();
private final DatabaseDiscoveryProviderAlgorithm databaseDiscoveryProviderAlgorithm;
-
+
/**
* Check environment of database cluster.
*
@@ -54,12 +62,20 @@ public final class DatabaseDiscoveryEngine {
* @throws SQLException SQL exception
*/
public void checkEnvironment(final String databaseName, final Map<String, DataSource> dataSourceMap) throws SQLException {
+ ExecutorEngine executorEngine = new ExecutorEngine(Math.min(CPU_CORES * 2, dataSourceMap.isEmpty() ? 1 : dataSourceMap.entrySet().size()));
+ ExecutorDataMap.getValue().put(DatabaseDiscoveryExecutorCallback.DATABASE_NAME, databaseName);
+ executorEngine.execute(createExecutionGroupContext(dataSourceMap), new DatabaseDiscoveryExecutorCallback(databaseDiscoveryProviderAlgorithm));
+ }
+
+ private ExecutionGroupContext<DataSource> createExecutionGroupContext(final Map<String, DataSource> dataSourceMap) {
+ Collection<ExecutionGroup<DataSource>> inputGroups = new ArrayList<>();
for (Entry<String, DataSource> entry : dataSourceMap.entrySet()) {
- // TODO query with multiple threads
- databaseDiscoveryProviderAlgorithm.checkEnvironment(databaseName, entry.getValue());
+ ExecutionGroup<DataSource> executionGroup = new ExecutionGroup<>(Lists.newArrayList(entry.getValue()));
+ inputGroups.add(executionGroup);
}
+ return new ExecutionGroupContext<>(inputGroups);
}
-
+
/**
* Change primary data source.
*
diff --git a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/algorithm/DatabaseDiscoveryExecutorCallback.java b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/algorithm/DatabaseDiscoveryExecutorCallback.java
new file mode 100644
index 00000000000..5a564f59f1f
--- /dev/null
+++ b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/algorithm/DatabaseDiscoveryExecutorCallback.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.dbdiscovery.algorithm;
+
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.dbdiscovery.spi.DatabaseDiscoveryProviderAlgorithm;
+import org.apache.shardingsphere.infra.executor.kernel.model.ExecutorCallback;
+
+import javax.sql.DataSource;
+import java.sql.SQLException;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+@RequiredArgsConstructor
+public class DatabaseDiscoveryExecutorCallback implements ExecutorCallback<DataSource, String> {
+
+ public static final String DATABASE_NAME = "databaseName";
+
+ private static final String SUCCEED = "succeed";
+
+ private final DatabaseDiscoveryProviderAlgorithm databaseDiscoveryProviderAlgorithm;
+
+ @Override
+ public Collection<String> execute(final Collection<DataSource> inputs, final boolean isTrunkThread, final Map<String, Object> dataMap) throws SQLException {
+ List<String> result = new LinkedList<>();
+ String databaseName = (String) dataMap.get(DATABASE_NAME);
+ inputs.forEach(dataSource -> {
+ try {
+ databaseDiscoveryProviderAlgorithm.checkEnvironment(databaseName, dataSource);
+ result.add("succeed");
+ } catch (SQLException e) {
+ throw new IllegalStateException(String.format("Error while loading highly available Status with %s", dataSource), e);
+ }
+ });
+ return result;
+ }
+}