You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2020/12/25 04:19:24 UTC
[shardingsphere] branch master updated: Refactor DriverJDBCExecutor
(#8771)
This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 0bea065 Refactor DriverJDBCExecutor (#8771)
0bea065 is described below
commit 0bea065e0c9ac5ebd3de040c0942db4e704720f0
Author: Haoran Meng <me...@gmail.com>
AuthorDate: Fri Dec 25 12:18:56 2020 +0800
Refactor DriverJDBCExecutor (#8771)
---
.../driver/executor/DriverJDBCExecutor.java | 54 ++++++++++------------
1 file changed, 25 insertions(+), 29 deletions(-)
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/DriverJDBCExecutor.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/DriverJDBCExecutor.java
index 1639850..1bab80e 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/DriverJDBCExecutor.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/DriverJDBCExecutor.java
@@ -93,36 +93,10 @@ public final class DriverJDBCExecutor {
*/
public int executeUpdate(final Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups,
final SQLStatementContext<?> sqlStatementContext, final Collection<RouteUnit> routeUnits, final JDBCExecutorCallback<Integer> callback) throws SQLException {
- List<Integer> results;
- boolean locked = false;
- try {
- locked = tryLock(sqlStatementContext.getSqlStatement(), metaDataContexts.getProps().<Long>getValue(ConfigurationPropertyKey.LOCK_WAIT_TIMEOUT_MILLISECONDS));
- results = jdbcExecutor.execute(executionGroups, callback);
- refreshSchema(metaDataContexts.getDefaultMetaData(), sqlStatementContext.getSqlStatement(), routeUnits);
- } finally {
- if (locked) {
- releaseLock();
- }
- }
+ List<Integer> results = doExecute(executionGroups, sqlStatementContext.getSqlStatement(), routeUnits, callback);
return isNeedAccumulate(metaDataContexts.getDefaultMetaData().getRuleMetaData().getRules(), sqlStatementContext) ? accumulate(results) : results.get(0);
}
- @SuppressWarnings({"unchecked", "rawtypes"})
- private void refreshSchema(final ShardingSphereMetaData metaData, final SQLStatement sqlStatement, final Collection<RouteUnit> routeUnits) throws SQLException {
- Optional<SchemaRefresher> schemaRefresher = SchemaRefresherFactory.newInstance(sqlStatement);
- if (schemaRefresher.isPresent()) {
- Collection<String> routeDataSourceNames = routeUnits.stream().map(each -> each.getDataSourceMapper().getLogicName()).collect(Collectors.toList());
- SchemaBuilderMaterials materials = new SchemaBuilderMaterials(metaDataContexts.getDefaultMetaData().getResource().getDatabaseType(),
- dataSourceMap, metaData.getRuleMetaData().getRules(), metaDataContexts.getProps());
- schemaRefresher.get().refresh(metaData.getSchema(), routeDataSourceNames, sqlStatement, materials);
- notifySchemaChanged(metaData.getSchema());
- }
- }
-
- private void notifySchemaChanged(final ShardingSphereSchema schema) {
- OrderedSPIRegistry.getRegisteredServices(Collections.singletonList(schema), SchemaChangedNotifier.class).values().forEach(each -> each.notify(DefaultSchema.LOGIC_NAME, schema));
- }
-
private boolean isNeedAccumulate(final Collection<ShardingSphereRule> rules, final SQLStatementContext<?> sqlStatementContext) {
return rules.stream().anyMatch(each -> each instanceof DataNodeContainedRule && ((DataNodeContainedRule) each).isNeedAccumulate(sqlStatementContext.getTablesContext().getTableNames()));
}
@@ -143,7 +117,13 @@ public final class DriverJDBCExecutor {
*/
public boolean execute(final Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups, final SQLStatement sqlStatement,
final Collection<RouteUnit> routeUnits, final JDBCExecutorCallback<Boolean> callback) throws SQLException {
- List<Boolean> results;
+ List<Boolean> results = doExecute(executionGroups, sqlStatement, routeUnits, callback);
+ return null != results && !results.isEmpty() && null != results.get(0) && results.get(0);
+ }
+
+ private <T> List<T> doExecute(final Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups, final SQLStatement sqlStatement,
+ final Collection<RouteUnit> routeUnits, final JDBCExecutorCallback<T> callback) throws SQLException {
+ List<T> results;
boolean locked = false;
try {
locked = tryLock(sqlStatement, metaDataContexts.getProps().<Long>getValue(ConfigurationPropertyKey.LOCK_WAIT_TIMEOUT_MILLISECONDS));
@@ -154,7 +134,7 @@ public final class DriverJDBCExecutor {
releaseLock();
}
}
- return null != results && !results.isEmpty() && null != results.get(0) && results.get(0);
+ return results;
}
private boolean tryLock(final SQLStatement sqlStatement, final long lockTimeoutMilliseconds) {
@@ -178,6 +158,22 @@ public final class DriverJDBCExecutor {
}
}
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ private void refreshSchema(final ShardingSphereMetaData metaData, final SQLStatement sqlStatement, final Collection<RouteUnit> routeUnits) throws SQLException {
+ Optional<SchemaRefresher> schemaRefresher = SchemaRefresherFactory.newInstance(sqlStatement);
+ if (schemaRefresher.isPresent()) {
+ Collection<String> routeDataSourceNames = routeUnits.stream().map(each -> each.getDataSourceMapper().getLogicName()).collect(Collectors.toList());
+ SchemaBuilderMaterials materials = new SchemaBuilderMaterials(metaDataContexts.getDefaultMetaData().getResource().getDatabaseType(),
+ dataSourceMap, metaData.getRuleMetaData().getRules(), metaDataContexts.getProps());
+ schemaRefresher.get().refresh(metaData.getSchema(), routeDataSourceNames, sqlStatement, materials);
+ notifySchemaChanged(metaData.getSchema());
+ }
+ }
+
+ private void notifySchemaChanged(final ShardingSphereSchema schema) {
+ OrderedSPIRegistry.getRegisteredServices(Collections.singletonList(schema), SchemaChangedNotifier.class).values().forEach(each -> each.notify(DefaultSchema.LOGIC_NAME, schema));
+ }
+
private void releaseLock() {
LockContext.getLockStrategy().releaseLock();
}