You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2022/04/24 05:55:06 UTC

[shardingsphere] branch master updated: Refactor DatabaseDiscoveryType.updateMemberState() (#17046)

This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 541745e5386 Refactor DatabaseDiscoveryType.updateMemberState() (#17046)
541745e5386 is described below

commit 541745e5386a25920fdfc1afa3329e08a164367e
Author: Liang Zhang <zh...@apache.org>
AuthorDate: Sun Apr 24 13:55:01 2022 +0800

    Refactor DatabaseDiscoveryType.updateMemberState() (#17046)
---
 .../type/mgr/MGRMySQLDatabaseDiscoveryType.java    | 54 +++++++++++-----------
 ...ormalReplicationMySQLDatabaseDiscoveryType.java | 14 ++++--
 ...aussNormalReplicationDatabaseDiscoveryType.java | 12 +++--
 3 files changed, 42 insertions(+), 38 deletions(-)

diff --git a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-provider/shardingsphere-db-discovery-mysql/src/main/java/org/apache/shardingsphere/dbdiscovery/mysql/type/mgr/MGRMySQLDatabaseDiscoveryType.java b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-provider/shardingsphere-db-discovery-mysql/src/main/java/org/apache/shardingsphere/dbdiscovery/mysql/type/mgr/MGRMySQLDatabaseDiscoveryType.java
index 9824d9ef8c9..c0eee856aab 100644
--- a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-provider/shardingsphere-db-discovery-mysql/src/main/java/org/apache/shardingsphere/dbdiscovery/mysql/type/mgr/MGRMySQLDatabaseDiscoveryType.java
+++ b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-provider/shardingsphere-db-discovery-mysql/src/main/java/org/apache/shardingsphere/dbdiscovery/mysql/type/mgr/MGRMySQLDatabaseDiscoveryType.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.dbdiscovery.mysql.type.mgr;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.dbdiscovery.mysql.AbstractMySQLDatabaseDiscoveryType;
+import org.apache.shardingsphere.infra.database.metadata.dialect.MySQLDataSourceMetaData;
 import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
 import org.apache.shardingsphere.infra.rule.event.impl.DataSourceDisabledEvent;
 import org.apache.shardingsphere.infra.storage.StorageNodeDataSource;
@@ -27,6 +28,7 @@ import org.apache.shardingsphere.infra.storage.StorageNodeStatus;
 
 import javax.sql.DataSource;
 import java.sql.Connection;
+import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
@@ -53,6 +55,8 @@ public final class MGRMySQLDatabaseDiscoveryType extends AbstractMySQLDatabaseDi
     private static final String QUERY_PRIMARY_DATA_SOURCE = "SELECT MEMBER_HOST, MEMBER_PORT FROM performance_schema.replication_group_members WHERE MEMBER_ID = "
             + "(SELECT VARIABLE_VALUE FROM performance_schema.global_status WHERE VARIABLE_NAME = 'group_replication_primary_member')";
     
+    private static final String QUERY_CURRENT_MEMBER_STATE = "SELECT MEMBER_STATE FROM performance_schema.replication_group_members WHERE MEMBER_HOST=? AND MEMBER_PORT=?";
+    
     @Override
     public MGRHighlyAvailableStatus loadHighlyAvailableStatus(final DataSource dataSource) throws SQLException {
         try (
@@ -102,42 +106,26 @@ public final class MGRMySQLDatabaseDiscoveryType extends AbstractMySQLDatabaseDi
     
     @Override
     public void updateMemberState(final String databaseName, final Map<String, DataSource> dataSourceMap, final String groupName) {
-        Collection<String> memberDatabaseInstanceURLs = findMemberDatabaseInstanceURLs(dataSourceMap);
-        if (memberDatabaseInstanceURLs.isEmpty()) {
-            return;
-        }
         for (Entry<String, DataSource> entry : dataSourceMap.entrySet()) {
             if (!entry.getKey().equals(getPrimaryDataSource())) {
-                StorageNodeStatus storageNodeStatus = isDisabledDataSource(memberDatabaseInstanceURLs, entry.getValue()) ? StorageNodeStatus.DISABLED : StorageNodeStatus.ENABLED;
-                ShardingSphereEventBus.getInstance().post(new DataSourceDisabledEvent(databaseName, groupName, entry.getKey(), new StorageNodeDataSource(StorageNodeRole.MEMBER, storageNodeStatus)));
+                postDataSourceDisabledEvent(databaseName, groupName, entry.getKey(), entry.getValue());
             }
         }
     }
     
-    private Collection<String> findMemberDatabaseInstanceURLs(final Map<String, DataSource> dataSourceMap) {
-        Collection<String> result = new LinkedList<>();
-        try (
-                Connection connection = dataSourceMap.get(getPrimaryDataSource()).getConnection();
-                Statement statement = connection.createStatement();
-                ResultSet resultSet = statement.executeQuery(QUERY_MEMBER_LIST)) {
-            while (resultSet.next()) {
-                if ("ONLINE".equals(resultSet.getString("MEMBER_STATE"))) {
-                    result.add(String.format("%s:%s", resultSet.getString("MEMBER_HOST"), resultSet.getString("MEMBER_PORT")));
-                }
-            }
-        } catch (final SQLException ex) {
-            log.error("An exception occurred while find member data source urls", ex);
-        }
-        return result;
+    private void postDataSourceDisabledEvent(final String databaseName, final String groupName, final String replicaDataSourceName, final DataSource replicaDataSource) {
+        StorageNodeDataSource storageNodeDataSource = getStorageNodeDataSource(replicaDataSource);
+        ShardingSphereEventBus.getInstance().post(new DataSourceDisabledEvent(databaseName, groupName, replicaDataSourceName, storageNodeDataSource));
+    }
+    
+    private StorageNodeDataSource getStorageNodeDataSource(final DataSource replicaDataSource) {
+        return new StorageNodeDataSource(StorageNodeRole.MEMBER, isDisabledDataSource(replicaDataSource) ? StorageNodeStatus.DISABLED : StorageNodeStatus.ENABLED);
     }
     
-    private boolean isDisabledDataSource(final Collection<String> memberDataSourceURLs, final DataSource dataSource) {
-        try (Connection connection = dataSource.getConnection()) {
-            String url = connection.getMetaData().getURL();
-            for (String each : memberDataSourceURLs) {
-                if (null != url && url.contains(each)) {
-                    return false;
-                }
+    private boolean isDisabledDataSource(final DataSource replicaDataSource) {
+        try (Connection connection = replicaDataSource.getConnection()) {
+            if (isOnlineDataSource(connection, new MySQLDataSourceMetaData(connection.getMetaData().getURL()))) {
+                return false;
             }
         } catch (final SQLException ex) {
             log.error("An exception occurred while find data source urls", ex);
@@ -145,6 +133,16 @@ public final class MGRMySQLDatabaseDiscoveryType extends AbstractMySQLDatabaseDi
         return true;
     }
     
+    private boolean isOnlineDataSource(final Connection connection, final MySQLDataSourceMetaData metaData) throws SQLException {
+        try (PreparedStatement preparedStatement = connection.prepareStatement(QUERY_CURRENT_MEMBER_STATE)) {
+            preparedStatement.setString(1, metaData.getHostname());
+            preparedStatement.setString(2, Integer.toString(metaData.getPort()));
+            try (ResultSet resultSet = preparedStatement.executeQuery()) {
+                return resultSet.next() && "ONLINE".equals(resultSet.getString("MEMBER_STATE"));
+            }
+        }
+    }
+    
     @Override
     public String getType() {
         return "MySQL.MGR";
diff --git a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-provider/shardingsphere-db-discovery-mysql/src/main/java/org/apache/shardingsphere/dbdiscovery/mysql/type/replication/MySQLNormalReplicationMySQLDatabaseDiscoveryType.java b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-provider/shardingsphere-db-discovery-mysql/src/main/java/org/apache/shardingsphere/dbdiscovery/mysql/type/replication/MySQLNormalReplicationMySQLD [...]
index 88b03079f19..4f4a965a01a 100644
--- a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-provider/shardingsphere-db-discovery-mysql/src/main/java/org/apache/shardingsphere/dbdiscovery/mysql/type/replication/MySQLNormalReplicationMySQLDatabaseDiscoveryType.java
+++ b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-provider/shardingsphere-db-discovery-mysql/src/main/java/org/apache/shardingsphere/dbdiscovery/mysql/type/replication/MySQLNormalReplicationMySQLDatabaseDiscoveryType.java
@@ -69,24 +69,28 @@ public final class MySQLNormalReplicationMySQLDatabaseDiscoveryType extends Abst
     public void updateMemberState(final String databaseName, final Map<String, DataSource> dataSourceMap, final String groupName) {
         for (Entry<String, DataSource> entry : dataSourceMap.entrySet()) {
             if (!entry.getKey().equals(getPrimaryDataSource())) {
-                postDataSourceDisabledEvent(databaseName, entry.getKey(), entry.getValue(), groupName);
+                postDataSourceDisabledEvent(databaseName, groupName, entry.getKey(), entry.getValue());
             }
         }
     }
     
-    private void postDataSourceDisabledEvent(final String databaseName, final String datasourceName, final DataSource dataSource, final String groupName) {
+    private void postDataSourceDisabledEvent(final String databaseName, final String groupName, final String datasourceName, final DataSource replicaDataSource) {
+        ShardingSphereEventBus.getInstance().post(new DataSourceDisabledEvent(databaseName, groupName, datasourceName, getStorageNodeDataSource(replicaDataSource)));
+    }
+    
+    private StorageNodeDataSource getStorageNodeDataSource(final DataSource replicaDataSource) {
         try (
-                Connection connection = dataSource.getConnection();
+                Connection connection = replicaDataSource.getConnection();
                 Statement statement = connection.createStatement()) {
             long replicationDelayMilliseconds = loadReplicationDelayMilliseconds(statement);
             StorageNodeStatus storageNodeStatus = replicationDelayMilliseconds < Long.parseLong(getProps().getProperty("delay-milliseconds-threshold"))
                     ? StorageNodeStatus.ENABLED
                     : StorageNodeStatus.DISABLED;
-            ShardingSphereEventBus.getInstance().post(
-                    new DataSourceDisabledEvent(databaseName, groupName, datasourceName, new StorageNodeDataSource(StorageNodeRole.MEMBER, storageNodeStatus, replicationDelayMilliseconds)));
+            return new StorageNodeDataSource(StorageNodeRole.MEMBER, storageNodeStatus, replicationDelayMilliseconds);
         } catch (SQLException ex) {
             log.error("An exception occurred while find member data source `Seconds_Behind_Master`", ex);
         }
+        return new StorageNodeDataSource(StorageNodeRole.MEMBER, StorageNodeStatus.DISABLED);
     }
     
     private long loadReplicationDelayMilliseconds(final Statement statement) throws SQLException {
diff --git a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-provider/shardingsphere-db-discovery-opengauss/src/main/java/org/apache/shardingsphere/dbdiscovery/opengauss/replication/OpenGaussNormalReplicationDatabaseDiscoveryType.java b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-provider/shardingsphere-db-discovery-opengauss/src/main/java/org/apache/shardingsphere/dbdiscovery/opengauss/replication/OpenGaussNormalReplicat [...]
index 3999209281f..4efc78c1a77 100644
--- a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-provider/shardingsphere-db-discovery-opengauss/src/main/java/org/apache/shardingsphere/dbdiscovery/opengauss/replication/OpenGaussNormalReplicationDatabaseDiscoveryType.java
+++ b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-provider/shardingsphere-db-discovery-opengauss/src/main/java/org/apache/shardingsphere/dbdiscovery/opengauss/replication/OpenGaussNormalReplicationDatabaseDiscoveryType.java
@@ -81,17 +81,19 @@ public final class OpenGaussNormalReplicationDatabaseDiscoveryType implements Da
     @Override
     public void updateMemberState(final String databaseName, final Map<String, DataSource> dataSourceMap, final String groupName) {
         for (Entry<String, DataSource> entry : dataSourceMap.entrySet()) {
-            StorageNodeStatus storageNodeStatus = isDisabledDataSource(entry.getKey(), entry.getValue()) ? StorageNodeStatus.DISABLED : StorageNodeStatus.ENABLED;
-            ShardingSphereEventBus.getInstance().post(new DataSourceDisabledEvent(databaseName, groupName, entry.getKey(), new StorageNodeDataSource(StorageNodeRole.MEMBER, storageNodeStatus)));
+            if (!entry.getKey().equals(primaryDataSource)) {
+                StorageNodeStatus storageNodeStatus = isDisabledDataSource(entry.getValue()) ? StorageNodeStatus.DISABLED : StorageNodeStatus.ENABLED;
+                ShardingSphereEventBus.getInstance().post(new DataSourceDisabledEvent(databaseName, groupName, entry.getKey(), new StorageNodeDataSource(StorageNodeRole.MEMBER, storageNodeStatus)));
+            }
         }
     }
     
-    private boolean isDisabledDataSource(final String dataSourceName, final DataSource dataSource) {
+    private boolean isDisabledDataSource(final DataSource replicaDataSource) {
         try (
-                Connection connection = dataSource.getConnection();
+                Connection connection = replicaDataSource.getConnection();
                 Statement statement = connection.createStatement();
                 ResultSet resultSet = statement.executeQuery(QUERY_DB_ROLE)) {
-            if (resultSet.next() && ((resultSet.getString("local_role").equals("Standby") && resultSet.getString("db_state").equals("Normal")) || dataSourceName.equals(primaryDataSource))) {
+            if (resultSet.next() && resultSet.getString("local_role").equals("Standby") && resultSet.getString("db_state").equals("Normal")) {
                 return false;
             }
         } catch (final SQLException ex) {