You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2020/11/15 14:20:52 UTC
[shardingsphere] branch master updated: Rename refreshSchema (#8166)
This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 86c5a51 Rename refreshSchema (#8166)
86c5a51 is described below
commit 86c5a5120521e6e150ff3ee0e0f3a1b2c70f45bb
Author: Liang Zhang <te...@163.com>
AuthorDate: Sun Nov 15 22:20:27 2020 +0800
Rename refreshSchema (#8166)
---
.../driver/executor/AbstractStatementExecutor.java | 21 ++++++++++-----------
.../driver/executor/PreparedStatementExecutor.java | 2 +-
.../driver/executor/StatementExecutor.java | 2 +-
.../jdbc/JDBCDatabaseCommunicationEngine.java | 22 +++++++++++++---------
4 files changed, 25 insertions(+), 22 deletions(-)
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/AbstractStatementExecutor.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/AbstractStatementExecutor.java
index 01d24a2..d83da91 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/AbstractStatementExecutor.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/AbstractStatementExecutor.java
@@ -33,7 +33,6 @@ import org.apache.shardingsphere.infra.metadata.schema.builder.SchemaBuilderMate
import org.apache.shardingsphere.infra.metadata.schema.refresher.SchemaRefresher;
import org.apache.shardingsphere.infra.metadata.schema.refresher.SchemaRefresherFactory;
import org.apache.shardingsphere.infra.metadata.schema.refresher.spi.SchemaChangedNotifier;
-import org.apache.shardingsphere.infra.route.context.RouteMapper;
import org.apache.shardingsphere.infra.route.context.RouteUnit;
import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
import org.apache.shardingsphere.infra.rule.type.DataNodeContainedRule;
@@ -76,30 +75,30 @@ public abstract class AbstractStatementExecutor {
}
@SuppressWarnings({"unchecked", "rawtypes"})
- protected final void refreshTableMetaData(final ShardingSphereMetaData metaData, final SQLStatement sqlStatement, final Collection<RouteUnit> routeUnits) throws SQLException {
+ protected final void refreshSchema(final ShardingSphereMetaData metaData, final SQLStatement sqlStatement, final Collection<RouteUnit> routeUnits) throws SQLException {
if (null == sqlStatement) {
return;
}
Optional<SchemaRefresher> schemaRefresher = SchemaRefresherFactory.newInstance(sqlStatement);
if (schemaRefresher.isPresent()) {
- Collection<String> routeDataSourceNames = routeUnits.stream().map(RouteUnit::getDataSourceMapper).map(RouteMapper::getLogicName).collect(Collectors.toList());
- schemaRefresher.get().refresh(metaData.getSchema(), routeDataSourceNames, sqlStatement,
- new SchemaBuilderMaterials(metaDataContexts.getDatabaseType(), dataSourceMap, metaData.getRuleMetaData().getRules(), metaDataContexts.getProps()));
- notifyPersistSchema(DefaultSchema.LOGIC_NAME, metaData.getSchema());
+ Collection<String> routeDataSourceNames = routeUnits.stream().map(each -> each.getDataSourceMapper().getLogicName()).collect(Collectors.toList());
+ SchemaBuilderMaterials materials = new SchemaBuilderMaterials(metaDataContexts.getDatabaseType(), dataSourceMap, metaData.getRuleMetaData().getRules(), metaDataContexts.getProps());
+ schemaRefresher.get().refresh(metaData.getSchema(), routeDataSourceNames, sqlStatement, materials);
+ notifySchemaChanged(DefaultSchema.LOGIC_NAME, metaData.getSchema());
}
}
+ private void notifySchemaChanged(final String schemaName, final ShardingSphereSchema schema) {
+ OrderedSPIRegistry.getRegisteredServices(Collections.singletonList(schema), SchemaChangedNotifier.class).values().forEach(each -> each.notify(schemaName, schema));
+ }
+
protected final boolean executeAndRefreshMetaData(final Collection<InputGroup<StatementExecuteUnit>> inputGroups, final SQLStatement sqlStatement,
final Collection<RouteUnit> routeUnits, final SQLExecutorCallback<Boolean> sqlExecutorCallback) throws SQLException {
List<Boolean> result = sqlExecutor.execute(inputGroups, sqlExecutorCallback);
- refreshTableMetaData(metaDataContexts.getDefaultMetaData(), sqlStatement, routeUnits);
+ refreshSchema(metaDataContexts.getDefaultMetaData(), sqlStatement, routeUnits);
return null != result && !result.isEmpty() && null != result.get(0) && result.get(0);
}
- private void notifyPersistSchema(final String schemaName, final ShardingSphereSchema schema) {
- OrderedSPIRegistry.getRegisteredServices(Collections.singletonList(schema), SchemaChangedNotifier.class).values().forEach(each -> each.notify(schemaName, schema));
- }
-
/**
* Execute SQL.
*
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/PreparedStatementExecutor.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/PreparedStatementExecutor.java
index 1ef3b5f..57fab5d 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/PreparedStatementExecutor.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/PreparedStatementExecutor.java
@@ -81,7 +81,7 @@ public final class PreparedStatementExecutor extends AbstractStatementExecutor {
boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
SQLExecutorCallback<Integer> sqlExecutorCallback = createDefaultSQLExecutorCallbackWithInteger(isExceptionThrown);
List<Integer> results = getSqlExecutor().execute(inputGroups, sqlExecutorCallback);
- refreshTableMetaData(getMetaDataContexts().getDefaultMetaData(), sqlStatementContext.getSqlStatement(), routeUnits);
+ refreshSchema(getMetaDataContexts().getDefaultMetaData(), sqlStatementContext.getSqlStatement(), routeUnits);
return isNeedAccumulate(getMetaDataContexts().getDefaultMetaData().getRuleMetaData().getRules().stream().filter(
rule -> rule instanceof DataNodeContainedRule).collect(Collectors.toList()), sqlStatementContext) ? accumulate(results) : results.get(0);
}
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/StatementExecutor.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/StatementExecutor.java
index 7fba381..c3ee746 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/StatementExecutor.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/StatementExecutor.java
@@ -132,7 +132,7 @@ public final class StatementExecutor extends AbstractStatementExecutor {
}
};
List<Integer> results = getSqlExecutor().execute(inputGroups, sqlExecutorCallback);
- refreshTableMetaData(getMetaDataContexts().getDefaultMetaData(), sqlStatementContext.getSqlStatement(), routeUnits);
+ refreshSchema(getMetaDataContexts().getDefaultMetaData(), sqlStatementContext.getSqlStatement(), routeUnits);
if (isNeedAccumulate(getMetaDataContexts().getDefaultMetaData().getRuleMetaData().getRules().stream().filter(
rule -> rule instanceof DataNodeContainedRule).collect(Collectors.toList()), sqlStatementContext)) {
return accumulate(results);
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngine.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngine.java
index abf6c69..e8a3cb5 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngine.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngine.java
@@ -18,8 +18,6 @@
package org.apache.shardingsphere.proxy.backend.communication.jdbc;
import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.governance.core.event.GovernanceEventBus;
-import org.apache.shardingsphere.governance.core.event.model.schema.SchemaPersistEvent;
import org.apache.shardingsphere.infra.binder.LogicSQL;
import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
import org.apache.shardingsphere.infra.config.properties.ConfigurationPropertyKey;
@@ -31,12 +29,13 @@ import org.apache.shardingsphere.infra.executor.sql.raw.execute.result.query.Que
import org.apache.shardingsphere.infra.merge.MergeEngine;
import org.apache.shardingsphere.infra.merge.result.MergedResult;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
+import org.apache.shardingsphere.infra.metadata.schema.ShardingSphereSchema;
import org.apache.shardingsphere.infra.metadata.schema.builder.SchemaBuilderMaterials;
import org.apache.shardingsphere.infra.metadata.schema.refresher.SchemaRefresher;
import org.apache.shardingsphere.infra.metadata.schema.refresher.SchemaRefresherFactory;
-import org.apache.shardingsphere.infra.route.context.RouteMapper;
-import org.apache.shardingsphere.infra.route.context.RouteUnit;
+import org.apache.shardingsphere.infra.metadata.schema.refresher.spi.SchemaChangedNotifier;
import org.apache.shardingsphere.infra.rule.type.DataNodeContainedRule;
+import org.apache.shardingsphere.infra.spi.ordered.OrderedSPIRegistry;
import org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngine;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.execute.SQLExecuteEngine;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
@@ -49,6 +48,7 @@ import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
@@ -90,26 +90,30 @@ public final class JDBCDatabaseCommunicationEngine implements DatabaseCommunicat
}
sqlExecuteEngine.checkExecutePrerequisites(executionContext);
response = sqlExecuteEngine.execute(executionContext);
- Collection<String> routeDataSourceNames = executionContext.getRouteContext().getRouteUnits().stream()
- .map(RouteUnit::getDataSourceMapper).map(RouteMapper::getLogicName).collect(Collectors.toList());
- refreshTableMetaData(executionContext.getSqlStatementContext().getSqlStatement(), routeDataSourceNames);
+ refreshSchema(executionContext);
return merge(executionContext.getSqlStatementContext());
}
@SuppressWarnings({"unchecked", "rawtypes"})
- private void refreshTableMetaData(final SQLStatement sqlStatement, final Collection<String> routeDataSourceNames) throws SQLException {
+ private void refreshSchema(final ExecutionContext executionContext) throws SQLException {
+ SQLStatement sqlStatement = executionContext.getSqlStatementContext().getSqlStatement();
if (null == sqlStatement) {
return;
}
Optional<SchemaRefresher> schemaRefresher = SchemaRefresherFactory.newInstance(sqlStatement);
if (schemaRefresher.isPresent()) {
+ Collection<String> routeDataSourceNames = executionContext.getRouteContext().getRouteUnits().stream().map(each -> each.getDataSourceMapper().getLogicName()).collect(Collectors.toList());
SchemaBuilderMaterials materials = new SchemaBuilderMaterials(ProxyContext.getInstance().getMetaDataContexts().getDatabaseType(),
metaData.getResource().getDataSources(), metaData.getRuleMetaData().getRules(), ProxyContext.getInstance().getMetaDataContexts().getProps());
schemaRefresher.get().refresh(metaData.getSchema(), routeDataSourceNames, sqlStatement, materials);
- GovernanceEventBus.getInstance().post(new SchemaPersistEvent(metaData.getName(), metaData.getSchema()));
+ notifySchemaChanged(metaData.getName(), metaData.getSchema());
}
}
+ private void notifySchemaChanged(final String schemaName, final ShardingSphereSchema schema) {
+ OrderedSPIRegistry.getRegisteredServices(Collections.singletonList(schema), SchemaChangedNotifier.class).values().forEach(each -> each.notify(schemaName, schema));
+ }
+
private BackendResponse merge(final SQLStatementContext<?> sqlStatementContext) throws SQLException {
if (response instanceof UpdateResponse) {
mergeUpdateCount(sqlStatementContext);