You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2020/11/15 14:20:52 UTC

[shardingsphere] branch master updated: Rename refreshSchema (#8166)

This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 86c5a51  Rename refreshSchema (#8166)
86c5a51 is described below

commit 86c5a5120521e6e150ff3ee0e0f3a1b2c70f45bb
Author: Liang Zhang <te...@163.com>
AuthorDate: Sun Nov 15 22:20:27 2020 +0800

    Rename refreshSchema (#8166)
---
 .../driver/executor/AbstractStatementExecutor.java | 21 ++++++++++-----------
 .../driver/executor/PreparedStatementExecutor.java |  2 +-
 .../driver/executor/StatementExecutor.java         |  2 +-
 .../jdbc/JDBCDatabaseCommunicationEngine.java      | 22 +++++++++++++---------
 4 files changed, 25 insertions(+), 22 deletions(-)

diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/AbstractStatementExecutor.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/AbstractStatementExecutor.java
index 01d24a2..d83da91 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/AbstractStatementExecutor.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/AbstractStatementExecutor.java
@@ -33,7 +33,6 @@ import org.apache.shardingsphere.infra.metadata.schema.builder.SchemaBuilderMate
 import org.apache.shardingsphere.infra.metadata.schema.refresher.SchemaRefresher;
 import org.apache.shardingsphere.infra.metadata.schema.refresher.SchemaRefresherFactory;
 import org.apache.shardingsphere.infra.metadata.schema.refresher.spi.SchemaChangedNotifier;
-import org.apache.shardingsphere.infra.route.context.RouteMapper;
 import org.apache.shardingsphere.infra.route.context.RouteUnit;
 import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
 import org.apache.shardingsphere.infra.rule.type.DataNodeContainedRule;
@@ -76,30 +75,30 @@ public abstract class AbstractStatementExecutor {
     }
     
     @SuppressWarnings({"unchecked", "rawtypes"})
-    protected final void refreshTableMetaData(final ShardingSphereMetaData metaData, final SQLStatement sqlStatement, final Collection<RouteUnit> routeUnits) throws SQLException {
+    protected final void refreshSchema(final ShardingSphereMetaData metaData, final SQLStatement sqlStatement, final Collection<RouteUnit> routeUnits) throws SQLException {
         if (null == sqlStatement) {
             return;
         }
         Optional<SchemaRefresher> schemaRefresher = SchemaRefresherFactory.newInstance(sqlStatement);
         if (schemaRefresher.isPresent()) {
-            Collection<String> routeDataSourceNames = routeUnits.stream().map(RouteUnit::getDataSourceMapper).map(RouteMapper::getLogicName).collect(Collectors.toList());
-            schemaRefresher.get().refresh(metaData.getSchema(), routeDataSourceNames, sqlStatement, 
-                    new SchemaBuilderMaterials(metaDataContexts.getDatabaseType(), dataSourceMap, metaData.getRuleMetaData().getRules(), metaDataContexts.getProps()));
-            notifyPersistSchema(DefaultSchema.LOGIC_NAME, metaData.getSchema());
+            Collection<String> routeDataSourceNames = routeUnits.stream().map(each -> each.getDataSourceMapper().getLogicName()).collect(Collectors.toList());
+            SchemaBuilderMaterials materials = new SchemaBuilderMaterials(metaDataContexts.getDatabaseType(), dataSourceMap, metaData.getRuleMetaData().getRules(), metaDataContexts.getProps());
+            schemaRefresher.get().refresh(metaData.getSchema(), routeDataSourceNames, sqlStatement, materials);
+            notifySchemaChanged(DefaultSchema.LOGIC_NAME, metaData.getSchema());
         }
     }
     
+    private void notifySchemaChanged(final String schemaName, final ShardingSphereSchema schema) {
+        OrderedSPIRegistry.getRegisteredServices(Collections.singletonList(schema), SchemaChangedNotifier.class).values().forEach(each -> each.notify(schemaName, schema));
+    }
+    
     protected final boolean executeAndRefreshMetaData(final Collection<InputGroup<StatementExecuteUnit>> inputGroups, final SQLStatement sqlStatement,
                                                 final Collection<RouteUnit> routeUnits, final SQLExecutorCallback<Boolean> sqlExecutorCallback) throws SQLException {
         List<Boolean> result = sqlExecutor.execute(inputGroups, sqlExecutorCallback);
-        refreshTableMetaData(metaDataContexts.getDefaultMetaData(), sqlStatement, routeUnits);
+        refreshSchema(metaDataContexts.getDefaultMetaData(), sqlStatement, routeUnits);
         return null != result && !result.isEmpty() && null != result.get(0) && result.get(0);
     }
     
-    private void notifyPersistSchema(final String schemaName, final ShardingSphereSchema schema) {
-        OrderedSPIRegistry.getRegisteredServices(Collections.singletonList(schema), SchemaChangedNotifier.class).values().forEach(each -> each.notify(schemaName, schema));
-    }
-    
     /**
      * Execute SQL.
      *
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/PreparedStatementExecutor.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/PreparedStatementExecutor.java
index 1ef3b5f..57fab5d 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/PreparedStatementExecutor.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/PreparedStatementExecutor.java
@@ -81,7 +81,7 @@ public final class PreparedStatementExecutor extends AbstractStatementExecutor {
         boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
         SQLExecutorCallback<Integer> sqlExecutorCallback = createDefaultSQLExecutorCallbackWithInteger(isExceptionThrown);
         List<Integer> results = getSqlExecutor().execute(inputGroups, sqlExecutorCallback);
-        refreshTableMetaData(getMetaDataContexts().getDefaultMetaData(), sqlStatementContext.getSqlStatement(), routeUnits);
+        refreshSchema(getMetaDataContexts().getDefaultMetaData(), sqlStatementContext.getSqlStatement(), routeUnits);
         return isNeedAccumulate(getMetaDataContexts().getDefaultMetaData().getRuleMetaData().getRules().stream().filter(
             rule -> rule instanceof DataNodeContainedRule).collect(Collectors.toList()), sqlStatementContext) ? accumulate(results) : results.get(0);
     }
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/StatementExecutor.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/StatementExecutor.java
index 7fba381..c3ee746 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/StatementExecutor.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/StatementExecutor.java
@@ -132,7 +132,7 @@ public final class StatementExecutor extends AbstractStatementExecutor {
             }
         };
         List<Integer> results = getSqlExecutor().execute(inputGroups, sqlExecutorCallback);
-        refreshTableMetaData(getMetaDataContexts().getDefaultMetaData(), sqlStatementContext.getSqlStatement(), routeUnits);
+        refreshSchema(getMetaDataContexts().getDefaultMetaData(), sqlStatementContext.getSqlStatement(), routeUnits);
         if (isNeedAccumulate(getMetaDataContexts().getDefaultMetaData().getRuleMetaData().getRules().stream().filter(
             rule -> rule instanceof DataNodeContainedRule).collect(Collectors.toList()), sqlStatementContext)) {
             return accumulate(results);
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngine.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngine.java
index abf6c69..e8a3cb5 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngine.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngine.java
@@ -18,8 +18,6 @@
 package org.apache.shardingsphere.proxy.backend.communication.jdbc;
 
 import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.governance.core.event.GovernanceEventBus;
-import org.apache.shardingsphere.governance.core.event.model.schema.SchemaPersistEvent;
 import org.apache.shardingsphere.infra.binder.LogicSQL;
 import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
 import org.apache.shardingsphere.infra.config.properties.ConfigurationPropertyKey;
@@ -31,12 +29,13 @@ import org.apache.shardingsphere.infra.executor.sql.raw.execute.result.query.Que
 import org.apache.shardingsphere.infra.merge.MergeEngine;
 import org.apache.shardingsphere.infra.merge.result.MergedResult;
 import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
+import org.apache.shardingsphere.infra.metadata.schema.ShardingSphereSchema;
 import org.apache.shardingsphere.infra.metadata.schema.builder.SchemaBuilderMaterials;
 import org.apache.shardingsphere.infra.metadata.schema.refresher.SchemaRefresher;
 import org.apache.shardingsphere.infra.metadata.schema.refresher.SchemaRefresherFactory;
-import org.apache.shardingsphere.infra.route.context.RouteMapper;
-import org.apache.shardingsphere.infra.route.context.RouteUnit;
+import org.apache.shardingsphere.infra.metadata.schema.refresher.spi.SchemaChangedNotifier;
 import org.apache.shardingsphere.infra.rule.type.DataNodeContainedRule;
+import org.apache.shardingsphere.infra.spi.ordered.OrderedSPIRegistry;
 import org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngine;
 import org.apache.shardingsphere.proxy.backend.communication.jdbc.execute.SQLExecuteEngine;
 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
@@ -49,6 +48,7 @@ import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.stream.Collectors;
@@ -90,26 +90,30 @@ public final class JDBCDatabaseCommunicationEngine implements DatabaseCommunicat
         }
         sqlExecuteEngine.checkExecutePrerequisites(executionContext);
         response = sqlExecuteEngine.execute(executionContext);
-        Collection<String> routeDataSourceNames = executionContext.getRouteContext().getRouteUnits().stream()
-                .map(RouteUnit::getDataSourceMapper).map(RouteMapper::getLogicName).collect(Collectors.toList());
-        refreshTableMetaData(executionContext.getSqlStatementContext().getSqlStatement(), routeDataSourceNames);
+        refreshSchema(executionContext);
         return merge(executionContext.getSqlStatementContext());
     }
     
     @SuppressWarnings({"unchecked", "rawtypes"})
-    private void refreshTableMetaData(final SQLStatement sqlStatement, final Collection<String> routeDataSourceNames) throws SQLException {
+    private void refreshSchema(final ExecutionContext executionContext) throws SQLException {
+        SQLStatement sqlStatement = executionContext.getSqlStatementContext().getSqlStatement();
         if (null == sqlStatement) {
             return;
         }
         Optional<SchemaRefresher> schemaRefresher = SchemaRefresherFactory.newInstance(sqlStatement);
         if (schemaRefresher.isPresent()) {
+            Collection<String> routeDataSourceNames = executionContext.getRouteContext().getRouteUnits().stream().map(each -> each.getDataSourceMapper().getLogicName()).collect(Collectors.toList());
             SchemaBuilderMaterials materials = new SchemaBuilderMaterials(ProxyContext.getInstance().getMetaDataContexts().getDatabaseType(), 
                     metaData.getResource().getDataSources(), metaData.getRuleMetaData().getRules(), ProxyContext.getInstance().getMetaDataContexts().getProps());
             schemaRefresher.get().refresh(metaData.getSchema(), routeDataSourceNames, sqlStatement, materials);
-            GovernanceEventBus.getInstance().post(new SchemaPersistEvent(metaData.getName(), metaData.getSchema()));
+            notifySchemaChanged(metaData.getName(), metaData.getSchema());
         }
     }
     
+    private void notifySchemaChanged(final String schemaName, final ShardingSphereSchema schema) {
+        OrderedSPIRegistry.getRegisteredServices(Collections.singletonList(schema), SchemaChangedNotifier.class).values().forEach(each -> each.notify(schemaName, schema));
+    }
+    
     private BackendResponse merge(final SQLStatementContext<?> sqlStatementContext) throws SQLException {
         if (response instanceof UpdateResponse) {
             mergeUpdateCount(sqlStatementContext);