You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2020/12/25 04:19:24 UTC

[shardingsphere] branch master updated: Refactor DriverJDBCExecutor (#8771)

This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 0bea065  Refactor DriverJDBCExecutor (#8771)
0bea065 is described below

commit 0bea065e0c9ac5ebd3de040c0942db4e704720f0
Author: Haoran Meng <me...@gmail.com>
AuthorDate: Fri Dec 25 12:18:56 2020 +0800

    Refactor DriverJDBCExecutor (#8771)
---
 .../driver/executor/DriverJDBCExecutor.java        | 54 ++++++++++------------
 1 file changed, 25 insertions(+), 29 deletions(-)

diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/DriverJDBCExecutor.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/DriverJDBCExecutor.java
index 1639850..1bab80e 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/DriverJDBCExecutor.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/DriverJDBCExecutor.java
@@ -93,36 +93,10 @@ public final class DriverJDBCExecutor {
      */
     public int executeUpdate(final Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups, 
                              final SQLStatementContext<?> sqlStatementContext, final Collection<RouteUnit> routeUnits, final JDBCExecutorCallback<Integer> callback) throws SQLException {
-        List<Integer> results;
-        boolean locked = false;
-        try {
-            locked = tryLock(sqlStatementContext.getSqlStatement(), metaDataContexts.getProps().<Long>getValue(ConfigurationPropertyKey.LOCK_WAIT_TIMEOUT_MILLISECONDS));
-            results = jdbcExecutor.execute(executionGroups, callback);
-            refreshSchema(metaDataContexts.getDefaultMetaData(), sqlStatementContext.getSqlStatement(), routeUnits);
-        } finally {
-            if (locked) {
-                releaseLock();
-            }
-        }
+        List<Integer> results = doExecute(executionGroups, sqlStatementContext.getSqlStatement(), routeUnits, callback);
         return isNeedAccumulate(metaDataContexts.getDefaultMetaData().getRuleMetaData().getRules(), sqlStatementContext) ? accumulate(results) : results.get(0);
     }
     
-    @SuppressWarnings({"unchecked", "rawtypes"})
-    private void refreshSchema(final ShardingSphereMetaData metaData, final SQLStatement sqlStatement, final Collection<RouteUnit> routeUnits) throws SQLException {
-        Optional<SchemaRefresher> schemaRefresher = SchemaRefresherFactory.newInstance(sqlStatement);
-        if (schemaRefresher.isPresent()) {
-            Collection<String> routeDataSourceNames = routeUnits.stream().map(each -> each.getDataSourceMapper().getLogicName()).collect(Collectors.toList());
-            SchemaBuilderMaterials materials = new SchemaBuilderMaterials(metaDataContexts.getDefaultMetaData().getResource().getDatabaseType(), 
-                    dataSourceMap, metaData.getRuleMetaData().getRules(), metaDataContexts.getProps());
-            schemaRefresher.get().refresh(metaData.getSchema(), routeDataSourceNames, sqlStatement, materials);
-            notifySchemaChanged(metaData.getSchema());
-        }
-    }
-    
-    private void notifySchemaChanged(final ShardingSphereSchema schema) {
-        OrderedSPIRegistry.getRegisteredServices(Collections.singletonList(schema), SchemaChangedNotifier.class).values().forEach(each -> each.notify(DefaultSchema.LOGIC_NAME, schema));
-    }
-    
     private boolean isNeedAccumulate(final Collection<ShardingSphereRule> rules, final SQLStatementContext<?> sqlStatementContext) {
         return rules.stream().anyMatch(each -> each instanceof DataNodeContainedRule && ((DataNodeContainedRule) each).isNeedAccumulate(sqlStatementContext.getTablesContext().getTableNames()));
     }
@@ -143,7 +117,13 @@ public final class DriverJDBCExecutor {
      */
     public boolean execute(final Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups, final SQLStatement sqlStatement,
                            final Collection<RouteUnit> routeUnits, final JDBCExecutorCallback<Boolean> callback) throws SQLException {
-        List<Boolean> results;
+        List<Boolean> results = doExecute(executionGroups, sqlStatement, routeUnits, callback);
+        return null != results && !results.isEmpty() && null != results.get(0) && results.get(0);
+    }
+    
+    private <T> List<T> doExecute(final Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups, final SQLStatement sqlStatement, 
+                                  final Collection<RouteUnit> routeUnits, final JDBCExecutorCallback<T> callback) throws SQLException {
+        List<T> results;
         boolean locked = false;
         try {
             locked = tryLock(sqlStatement, metaDataContexts.getProps().<Long>getValue(ConfigurationPropertyKey.LOCK_WAIT_TIMEOUT_MILLISECONDS));
@@ -154,7 +134,7 @@ public final class DriverJDBCExecutor {
                 releaseLock();
             }
         }
-        return null != results && !results.isEmpty() && null != results.get(0) && results.get(0);
+        return results;
     }
     
     private boolean tryLock(final SQLStatement sqlStatement, final long lockTimeoutMilliseconds) {
@@ -178,6 +158,22 @@ public final class DriverJDBCExecutor {
         }
     }
     
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    private void refreshSchema(final ShardingSphereMetaData metaData, final SQLStatement sqlStatement, final Collection<RouteUnit> routeUnits) throws SQLException {
+        Optional<SchemaRefresher> schemaRefresher = SchemaRefresherFactory.newInstance(sqlStatement);
+        if (schemaRefresher.isPresent()) {
+            Collection<String> routeDataSourceNames = routeUnits.stream().map(each -> each.getDataSourceMapper().getLogicName()).collect(Collectors.toList());
+            SchemaBuilderMaterials materials = new SchemaBuilderMaterials(metaDataContexts.getDefaultMetaData().getResource().getDatabaseType(),
+                    dataSourceMap, metaData.getRuleMetaData().getRules(), metaDataContexts.getProps());
+            schemaRefresher.get().refresh(metaData.getSchema(), routeDataSourceNames, sqlStatement, materials);
+            notifySchemaChanged(metaData.getSchema());
+        }
+    }
+    
+    private void notifySchemaChanged(final ShardingSphereSchema schema) {
+        OrderedSPIRegistry.getRegisteredServices(Collections.singletonList(schema), SchemaChangedNotifier.class).values().forEach(each -> each.notify(DefaultSchema.LOGIC_NAME, schema));
+    }
+    
     private void releaseLock() {
         LockContext.getLockStrategy().releaseLock();
     }