You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2022/04/24 05:55:06 UTC
[shardingsphere] branch master updated: Refactor DatabaseDiscoveryType.updateMemberState() (#17046)
This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 541745e5386 Refactor DatabaseDiscoveryType.updateMemberState() (#17046)
541745e5386 is described below
commit 541745e5386a25920fdfc1afa3329e08a164367e
Author: Liang Zhang <zh...@apache.org>
AuthorDate: Sun Apr 24 13:55:01 2022 +0800
Refactor DatabaseDiscoveryType.updateMemberState() (#17046)
---
.../type/mgr/MGRMySQLDatabaseDiscoveryType.java | 54 +++++++++++-----------
...ormalReplicationMySQLDatabaseDiscoveryType.java | 14 ++++--
...aussNormalReplicationDatabaseDiscoveryType.java | 12 +++--
3 files changed, 42 insertions(+), 38 deletions(-)
diff --git a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-provider/shardingsphere-db-discovery-mysql/src/main/java/org/apache/shardingsphere/dbdiscovery/mysql/type/mgr/MGRMySQLDatabaseDiscoveryType.java b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-provider/shardingsphere-db-discovery-mysql/src/main/java/org/apache/shardingsphere/dbdiscovery/mysql/type/mgr/MGRMySQLDatabaseDiscoveryType.java
index 9824d9ef8c9..c0eee856aab 100644
--- a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-provider/shardingsphere-db-discovery-mysql/src/main/java/org/apache/shardingsphere/dbdiscovery/mysql/type/mgr/MGRMySQLDatabaseDiscoveryType.java
+++ b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-provider/shardingsphere-db-discovery-mysql/src/main/java/org/apache/shardingsphere/dbdiscovery/mysql/type/mgr/MGRMySQLDatabaseDiscoveryType.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.dbdiscovery.mysql.type.mgr;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.dbdiscovery.mysql.AbstractMySQLDatabaseDiscoveryType;
+import org.apache.shardingsphere.infra.database.metadata.dialect.MySQLDataSourceMetaData;
import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
import org.apache.shardingsphere.infra.rule.event.impl.DataSourceDisabledEvent;
import org.apache.shardingsphere.infra.storage.StorageNodeDataSource;
@@ -27,6 +28,7 @@ import org.apache.shardingsphere.infra.storage.StorageNodeStatus;
import javax.sql.DataSource;
import java.sql.Connection;
+import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
@@ -53,6 +55,8 @@ public final class MGRMySQLDatabaseDiscoveryType extends AbstractMySQLDatabaseDi
private static final String QUERY_PRIMARY_DATA_SOURCE = "SELECT MEMBER_HOST, MEMBER_PORT FROM performance_schema.replication_group_members WHERE MEMBER_ID = "
+ "(SELECT VARIABLE_VALUE FROM performance_schema.global_status WHERE VARIABLE_NAME = 'group_replication_primary_member')";
+ private static final String QUERY_CURRENT_MEMBER_STATE = "SELECT MEMBER_STATE FROM performance_schema.replication_group_members WHERE MEMBER_HOST=? AND MEMBER_PORT=?";
+
@Override
public MGRHighlyAvailableStatus loadHighlyAvailableStatus(final DataSource dataSource) throws SQLException {
try (
@@ -102,42 +106,26 @@ public final class MGRMySQLDatabaseDiscoveryType extends AbstractMySQLDatabaseDi
@Override
public void updateMemberState(final String databaseName, final Map<String, DataSource> dataSourceMap, final String groupName) {
- Collection<String> memberDatabaseInstanceURLs = findMemberDatabaseInstanceURLs(dataSourceMap);
- if (memberDatabaseInstanceURLs.isEmpty()) {
- return;
- }
for (Entry<String, DataSource> entry : dataSourceMap.entrySet()) {
if (!entry.getKey().equals(getPrimaryDataSource())) {
- StorageNodeStatus storageNodeStatus = isDisabledDataSource(memberDatabaseInstanceURLs, entry.getValue()) ? StorageNodeStatus.DISABLED : StorageNodeStatus.ENABLED;
- ShardingSphereEventBus.getInstance().post(new DataSourceDisabledEvent(databaseName, groupName, entry.getKey(), new StorageNodeDataSource(StorageNodeRole.MEMBER, storageNodeStatus)));
+ postDataSourceDisabledEvent(databaseName, groupName, entry.getKey(), entry.getValue());
}
}
}
- private Collection<String> findMemberDatabaseInstanceURLs(final Map<String, DataSource> dataSourceMap) {
- Collection<String> result = new LinkedList<>();
- try (
- Connection connection = dataSourceMap.get(getPrimaryDataSource()).getConnection();
- Statement statement = connection.createStatement();
- ResultSet resultSet = statement.executeQuery(QUERY_MEMBER_LIST)) {
- while (resultSet.next()) {
- if ("ONLINE".equals(resultSet.getString("MEMBER_STATE"))) {
- result.add(String.format("%s:%s", resultSet.getString("MEMBER_HOST"), resultSet.getString("MEMBER_PORT")));
- }
- }
- } catch (final SQLException ex) {
- log.error("An exception occurred while find member data source urls", ex);
- }
- return result;
+ private void postDataSourceDisabledEvent(final String databaseName, final String groupName, final String replicaDataSourceName, final DataSource replicaDataSource) {
+ StorageNodeDataSource storageNodeDataSource = getStorageNodeDataSource(replicaDataSource);
+ ShardingSphereEventBus.getInstance().post(new DataSourceDisabledEvent(databaseName, groupName, replicaDataSourceName, storageNodeDataSource));
+ }
+
+ private StorageNodeDataSource getStorageNodeDataSource(final DataSource replicaDataSource) {
+ return new StorageNodeDataSource(StorageNodeRole.MEMBER, isDisabledDataSource(replicaDataSource) ? StorageNodeStatus.DISABLED : StorageNodeStatus.ENABLED);
}
- private boolean isDisabledDataSource(final Collection<String> memberDataSourceURLs, final DataSource dataSource) {
- try (Connection connection = dataSource.getConnection()) {
- String url = connection.getMetaData().getURL();
- for (String each : memberDataSourceURLs) {
- if (null != url && url.contains(each)) {
- return false;
- }
+ private boolean isDisabledDataSource(final DataSource replicaDataSource) {
+ try (Connection connection = replicaDataSource.getConnection()) {
+ if (isOnlineDataSource(connection, new MySQLDataSourceMetaData(connection.getMetaData().getURL()))) {
+ return false;
}
} catch (final SQLException ex) {
log.error("An exception occurred while find data source urls", ex);
@@ -145,6 +133,16 @@ public final class MGRMySQLDatabaseDiscoveryType extends AbstractMySQLDatabaseDi
return true;
}
+ private boolean isOnlineDataSource(final Connection connection, final MySQLDataSourceMetaData metaData) throws SQLException {
+ try (PreparedStatement preparedStatement = connection.prepareStatement(QUERY_CURRENT_MEMBER_STATE)) {
+ preparedStatement.setString(1, metaData.getHostname());
+ preparedStatement.setString(2, Integer.toString(metaData.getPort()));
+ try (ResultSet resultSet = preparedStatement.executeQuery()) {
+ return resultSet.next() && "ONLINE".equals(resultSet.getString("MEMBER_STATE"));
+ }
+ }
+ }
+
@Override
public String getType() {
return "MySQL.MGR";
diff --git a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-provider/shardingsphere-db-discovery-mysql/src/main/java/org/apache/shardingsphere/dbdiscovery/mysql/type/replication/MySQLNormalReplicationMySQLDatabaseDiscoveryType.java b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-provider/shardingsphere-db-discovery-mysql/src/main/java/org/apache/shardingsphere/dbdiscovery/mysql/type/replication/MySQLNormalReplicationMySQLDatabaseDiscoveryType.java
index 88b03079f19..4f4a965a01a 100644
--- a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-provider/shardingsphere-db-discovery-mysql/src/main/java/org/apache/shardingsphere/dbdiscovery/mysql/type/replication/MySQLNormalReplicationMySQLDatabaseDiscoveryType.java
+++ b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-provider/shardingsphere-db-discovery-mysql/src/main/java/org/apache/shardingsphere/dbdiscovery/mysql/type/replication/MySQLNormalReplicationMySQLDatabaseDiscoveryType.java
@@ -69,24 +69,28 @@ public final class MySQLNormalReplicationMySQLDatabaseDiscoveryType extends Abst
public void updateMemberState(final String databaseName, final Map<String, DataSource> dataSourceMap, final String groupName) {
for (Entry<String, DataSource> entry : dataSourceMap.entrySet()) {
if (!entry.getKey().equals(getPrimaryDataSource())) {
- postDataSourceDisabledEvent(databaseName, entry.getKey(), entry.getValue(), groupName);
+ postDataSourceDisabledEvent(databaseName, groupName, entry.getKey(), entry.getValue());
}
}
}
- private void postDataSourceDisabledEvent(final String databaseName, final String datasourceName, final DataSource dataSource, final String groupName) {
+ private void postDataSourceDisabledEvent(final String databaseName, final String groupName, final String datasourceName, final DataSource replicaDataSource) {
+ ShardingSphereEventBus.getInstance().post(new DataSourceDisabledEvent(databaseName, groupName, datasourceName, getStorageNodeDataSource(replicaDataSource)));
+ }
+
+ private StorageNodeDataSource getStorageNodeDataSource(final DataSource replicaDataSource) {
try (
- Connection connection = dataSource.getConnection();
+ Connection connection = replicaDataSource.getConnection();
Statement statement = connection.createStatement()) {
long replicationDelayMilliseconds = loadReplicationDelayMilliseconds(statement);
StorageNodeStatus storageNodeStatus = replicationDelayMilliseconds < Long.parseLong(getProps().getProperty("delay-milliseconds-threshold"))
? StorageNodeStatus.ENABLED
: StorageNodeStatus.DISABLED;
- ShardingSphereEventBus.getInstance().post(
- new DataSourceDisabledEvent(databaseName, groupName, datasourceName, new StorageNodeDataSource(StorageNodeRole.MEMBER, storageNodeStatus, replicationDelayMilliseconds)));
+ return new StorageNodeDataSource(StorageNodeRole.MEMBER, storageNodeStatus, replicationDelayMilliseconds);
} catch (SQLException ex) {
log.error("An exception occurred while find member data source `Seconds_Behind_Master`", ex);
}
+ return new StorageNodeDataSource(StorageNodeRole.MEMBER, StorageNodeStatus.DISABLED);
}
private long loadReplicationDelayMilliseconds(final Statement statement) throws SQLException {
diff --git a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-provider/shardingsphere-db-discovery-opengauss/src/main/java/org/apache/shardingsphere/dbdiscovery/opengauss/replication/OpenGaussNormalReplicationDatabaseDiscoveryType.java b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-provider/shardingsphere-db-discovery-opengauss/src/main/java/org/apache/shardingsphere/dbdiscovery/opengauss/replication/OpenGaussNormalReplicationDatabaseDiscoveryType.java
index 3999209281f..4efc78c1a77 100644
--- a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-provider/shardingsphere-db-discovery-opengauss/src/main/java/org/apache/shardingsphere/dbdiscovery/opengauss/replication/OpenGaussNormalReplicationDatabaseDiscoveryType.java
+++ b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-provider/shardingsphere-db-discovery-opengauss/src/main/java/org/apache/shardingsphere/dbdiscovery/opengauss/replication/OpenGaussNormalReplicationDatabaseDiscoveryType.java
@@ -81,17 +81,19 @@ public final class OpenGaussNormalReplicationDatabaseDiscoveryType implements Da
@Override
public void updateMemberState(final String databaseName, final Map<String, DataSource> dataSourceMap, final String groupName) {
for (Entry<String, DataSource> entry : dataSourceMap.entrySet()) {
- StorageNodeStatus storageNodeStatus = isDisabledDataSource(entry.getKey(), entry.getValue()) ? StorageNodeStatus.DISABLED : StorageNodeStatus.ENABLED;
- ShardingSphereEventBus.getInstance().post(new DataSourceDisabledEvent(databaseName, groupName, entry.getKey(), new StorageNodeDataSource(StorageNodeRole.MEMBER, storageNodeStatus)));
+ if (!entry.getKey().equals(primaryDataSource)) {
+ StorageNodeStatus storageNodeStatus = isDisabledDataSource(entry.getValue()) ? StorageNodeStatus.DISABLED : StorageNodeStatus.ENABLED;
+ ShardingSphereEventBus.getInstance().post(new DataSourceDisabledEvent(databaseName, groupName, entry.getKey(), new StorageNodeDataSource(StorageNodeRole.MEMBER, storageNodeStatus)));
+ }
}
}
- private boolean isDisabledDataSource(final String dataSourceName, final DataSource dataSource) {
+ private boolean isDisabledDataSource(final DataSource replicaDataSource) {
try (
- Connection connection = dataSource.getConnection();
+ Connection connection = replicaDataSource.getConnection();
Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery(QUERY_DB_ROLE)) {
- if (resultSet.next() && ((resultSet.getString("local_role").equals("Standby") && resultSet.getString("db_state").equals("Normal")) || dataSourceName.equals(primaryDataSource))) {
+ if (resultSet.next() && resultSet.getString("local_role").equals("Standby") && resultSet.getString("db_state").equals("Normal")) {
return false;
}
} catch (final SQLException ex) {