You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2020/09/09 05:59:52 UTC

[shardingsphere] branch master updated: extract an abstract class for StatementExecutor&PreparedStatementExecutor

This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 1cdc030  extract an abstract class for StatementExecutor&PreparedStatementExecutor
     new b7b96c0  Merge pull request #7343 from kimmking/dev1
1cdc030 is described below

commit 1cdc0302388298fd09ecb46b4c403eb61a3eae10
Author: kimmking <ki...@apache.org>
AuthorDate: Wed Sep 9 12:34:32 2020 +0800

    extract an abstract class for StatementExecutor&PreparedStatementExecutor
---
 .../driver/executor/AbstractStatementExecutor.java | 126 +++++++++++++++++++++
 .../driver/executor/PreparedStatementExecutor.java | 103 +++--------------
 .../driver/executor/StatementExecutor.java         | 122 ++++----------------
 3 files changed, 166 insertions(+), 185 deletions(-)

diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/AbstractStatementExecutor.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/AbstractStatementExecutor.java
new file mode 100644
index 0000000..e0e93c3
--- /dev/null
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/AbstractStatementExecutor.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.driver.executor;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.context.SchemaContext;
+import org.apache.shardingsphere.infra.context.SchemaContexts;
+import org.apache.shardingsphere.infra.database.DefaultSchema;
+import org.apache.shardingsphere.infra.executor.kernel.InputGroup;
+import org.apache.shardingsphere.infra.executor.sql.QueryResult;
+import org.apache.shardingsphere.infra.executor.sql.resourced.jdbc.StatementExecuteUnit;
+import org.apache.shardingsphere.infra.executor.sql.resourced.jdbc.executor.SQLExecutor;
+import org.apache.shardingsphere.infra.metadata.refresh.MetaDataRefreshStrategy;
+import org.apache.shardingsphere.infra.metadata.refresh.MetaDataRefreshStrategyFactory;
+import org.apache.shardingsphere.infra.metadata.schema.RuleSchemaMetaData;
+import org.apache.shardingsphere.infra.metadata.schema.RuleSchemaMetaDataLoader;
+import org.apache.shardingsphere.infra.metadata.schema.spi.RuleMetaDataNotifier;
+import org.apache.shardingsphere.infra.rule.DataNodeRoutedRule;
+import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
+import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
+import org.apache.shardingsphere.infra.spi.type.TypedSPIRegistry;
+import org.apache.shardingsphere.sql.parser.binder.statement.SQLStatementContext;
+
+import javax.sql.DataSource;
+import java.sql.SQLException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Abstract statement executor.
+ */
+@Getter
+@RequiredArgsConstructor
+public abstract class AbstractStatementExecutor {
+    
+    static {
+        ShardingSphereServiceLoader.register(RuleMetaDataNotifier.class);
+    }
+    
+    private final Map<String, DataSource> dataSourceMap;
+    
+    private final SchemaContexts schemaContexts;
+    
+    private final SQLExecutor sqlExecutor;
+    
+    protected boolean isNeedAccumulate(final Collection<ShardingSphereRule> rules, final SQLStatementContext<?> sqlStatementContext) {
+        return rules.stream().anyMatch(each -> ((DataNodeRoutedRule) each).isNeedAccumulate(sqlStatementContext.getTablesContext().getTableNames()));
+    }
+    
+    protected int accumulate(final List<Integer> results) {
+        int result = 0;
+        for (Integer each : results) {
+            result += null == each ? 0 : each;
+        }
+        return result;
+    }
+    
+    @SuppressWarnings("unchecked")
+    protected void refreshTableMetaData(final SchemaContext schemaContext, final SQLStatementContext<?> sqlStatementContext) throws SQLException {
+        if (null == sqlStatementContext) {
+            return;
+        }
+        Optional<MetaDataRefreshStrategy> refreshStrategy = MetaDataRefreshStrategyFactory.newInstance(sqlStatementContext);
+        if (refreshStrategy.isPresent()) {
+            RuleSchemaMetaDataLoader metaDataLoader = new RuleSchemaMetaDataLoader(schemaContext.getSchema().getRules());
+            refreshStrategy.get().refreshMetaData(schemaContext.getSchema().getMetaData(), schemaContexts.getDatabaseType(),
+                    dataSourceMap, sqlStatementContext, tableName -> metaDataLoader.load(schemaContexts.getDatabaseType(),
+                            dataSourceMap, tableName, schemaContexts.getProps()));
+            notifyPersistRuleMetaData(DefaultSchema.LOGIC_NAME, schemaContext.getSchema().getMetaData().getRuleSchemaMetaData());
+        }
+    }
+    
+    private void notifyPersistRuleMetaData(final String schemaName, final RuleSchemaMetaData metaData) {
+        RuleMetaDataNotifier notifier = TypedSPIRegistry.getRegisteredService(RuleMetaDataNotifier.class);
+        if (null != notifier) {
+            notifier.notify(schemaName, metaData);
+        }
+    }
+    
+    /**
+     * Execute SQL.
+     *
+     * @param inputGroups input groups
+     * @param sqlStatementContext SQL statement context
+     * @return true if the statement is DQL, false if it is DML
+     * @throws SQLException SQL exception
+     */
+    public abstract boolean execute(Collection<InputGroup<StatementExecuteUnit>> inputGroups, SQLStatementContext<?> sqlStatementContext) throws SQLException;
+    
+    /**
+     * Execute query.
+     *
+     * @param inputGroups input groups
+     * @return result set list
+     * @throws SQLException SQL exception
+     */
+    public abstract List<QueryResult> executeQuery(Collection<InputGroup<StatementExecuteUnit>> inputGroups) throws SQLException;
+    
+    /**
+     * Execute update.
+     *
+     * @param inputGroups input groups
+     * @param sqlStatementContext SQL statement context
+     * @return affected records count
+     * @throws SQLException SQL exception
+     */
+    public abstract int executeUpdate(Collection<InputGroup<StatementExecuteUnit>> inputGroups, SQLStatementContext<?> sqlStatementContext) throws SQLException;
+}
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/PreparedStatementExecutor.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/PreparedStatementExecutor.java
index aecacc9..81e433a 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/PreparedStatementExecutor.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/PreparedStatementExecutor.java
@@ -17,10 +17,7 @@
 
 package org.apache.shardingsphere.driver.executor;
 
-import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.infra.context.SchemaContext;
 import org.apache.shardingsphere.infra.context.SchemaContexts;
-import org.apache.shardingsphere.infra.database.DefaultSchema;
 import org.apache.shardingsphere.infra.executor.kernel.InputGroup;
 import org.apache.shardingsphere.infra.executor.sql.ConnectionMode;
 import org.apache.shardingsphere.infra.executor.sql.QueryResult;
@@ -31,15 +28,7 @@ import org.apache.shardingsphere.infra.executor.sql.resourced.jdbc.executor.SQLE
 import org.apache.shardingsphere.infra.executor.sql.resourced.jdbc.executor.impl.DefaultSQLExecutorCallback;
 import org.apache.shardingsphere.infra.executor.sql.resourced.jdbc.queryresult.MemoryQueryResult;
 import org.apache.shardingsphere.infra.executor.sql.resourced.jdbc.queryresult.StreamQueryResult;
-import org.apache.shardingsphere.infra.metadata.refresh.MetaDataRefreshStrategy;
-import org.apache.shardingsphere.infra.metadata.refresh.MetaDataRefreshStrategyFactory;
-import org.apache.shardingsphere.infra.metadata.schema.RuleSchemaMetaData;
-import org.apache.shardingsphere.infra.metadata.schema.RuleSchemaMetaDataLoader;
-import org.apache.shardingsphere.infra.metadata.schema.spi.RuleMetaDataNotifier;
 import org.apache.shardingsphere.infra.rule.DataNodeRoutedRule;
-import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
-import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
-import org.apache.shardingsphere.infra.spi.type.TypedSPIRegistry;
 import org.apache.shardingsphere.sql.parser.binder.statement.SQLStatementContext;
 
 import javax.sql.DataSource;
@@ -50,40 +39,26 @@ import java.sql.Statement;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.stream.Collectors;
 
 /**
  * Prepared statement executor.
  */
-@RequiredArgsConstructor
-public final class PreparedStatementExecutor {
+public final class PreparedStatementExecutor extends AbstractStatementExecutor {
     
-    static {
-        ShardingSphereServiceLoader.register(RuleMetaDataNotifier.class);
+    public PreparedStatementExecutor(final Map<String, DataSource> dataSourceMap, final SchemaContexts schemaContexts, final SQLExecutor sqlExecutor) {
+        super(dataSourceMap, schemaContexts, sqlExecutor);
     }
     
-    private final Map<String, DataSource> dataSourceMap;
-    
-    private final SchemaContexts schemaContexts;
-    
-    private final SQLExecutor sqlExecutor;
-    
-    /**
-     * Execute query.
-     *
-     * @param inputGroups input groups
-     * @return result set list
-     * @throws SQLException SQL exception
-     */
+    @Override
     public List<QueryResult> executeQuery(final Collection<InputGroup<StatementExecuteUnit>> inputGroups) throws SQLException {
         boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
         SQLExecutorCallback<QueryResult> sqlExecutorCallback = createDefaultSQLExecutorCallbackWithQueryResult(isExceptionThrown);
-        return sqlExecutor.execute(inputGroups, sqlExecutorCallback);
+        return getSqlExecutor().execute(inputGroups, sqlExecutorCallback);
     }
     
     private DefaultSQLExecutorCallback<QueryResult> createDefaultSQLExecutorCallbackWithQueryResult(final boolean isExceptionThrown) {
-        return new DefaultSQLExecutorCallback<QueryResult>(schemaContexts.getDatabaseType(), isExceptionThrown) {
+        return new DefaultSQLExecutorCallback<QueryResult>(getSchemaContexts().getDatabaseType(), isExceptionThrown) {
             
             @Override
             protected QueryResult executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode) throws SQLException { 
@@ -98,26 +73,19 @@ public final class PreparedStatementExecutor {
         };
     }
     
-    /**
-     * Execute update.
-     * 
-     * @param inputGroups input groups
-     * @param sqlStatementContext SQL statement context
-     * @return effected records count
-     * @throws SQLException SQL exception
-     */
+    @Override
     public int executeUpdate(final Collection<InputGroup<StatementExecuteUnit>> inputGroups, final SQLStatementContext<?> sqlStatementContext) throws SQLException {
         boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
         SQLExecutorCallback<Integer> sqlExecutorCallback = createDefaultSQLExecutorCallbackWithInteger(isExceptionThrown);
-        List<Integer> results = sqlExecutor.execute(inputGroups, sqlExecutorCallback);
-        refreshTableMetaData(schemaContexts.getDefaultSchemaContext(), sqlStatementContext);
+        List<Integer> results = getSqlExecutor().execute(inputGroups, sqlExecutorCallback);
+        refreshTableMetaData(getSchemaContexts().getDefaultSchemaContext(), sqlStatementContext);
         return isNeedAccumulate(
-                schemaContexts.getDefaultSchemaContext().getSchema().getRules().stream().filter(rule -> rule instanceof DataNodeRoutedRule).collect(Collectors.toList()), sqlStatementContext)
+                getSchemaContexts().getDefaultSchemaContext().getSchema().getRules().stream().filter(rule -> rule instanceof DataNodeRoutedRule).collect(Collectors.toList()), sqlStatementContext)
                 ? accumulate(results) : results.get(0);
     }
     
     private DefaultSQLExecutorCallback<Integer> createDefaultSQLExecutorCallbackWithInteger(final boolean isExceptionThrown) {
-        return new DefaultSQLExecutorCallback<Integer>(schemaContexts.getDatabaseType(), isExceptionThrown) {
+        return new DefaultSQLExecutorCallback<Integer>(getSchemaContexts().getDatabaseType(), isExceptionThrown) {
             
             @Override
             protected Integer executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode) throws SQLException {
@@ -126,31 +94,12 @@ public final class PreparedStatementExecutor {
         };
     }
     
-    private boolean isNeedAccumulate(final Collection<ShardingSphereRule> rules, final SQLStatementContext<?> sqlStatementContext) {
-        return rules.stream().anyMatch(each -> ((DataNodeRoutedRule) each).isNeedAccumulate(sqlStatementContext.getTablesContext().getTableNames()));
-    }
-    
-    private int accumulate(final List<Integer> results) {
-        int result = 0;
-        for (Integer each : results) {
-            result += null == each ? 0 : each;
-        }
-        return result;
-    }
-    
-    /**
-     * Execute SQL.
-     *
-     * @param inputGroups input groups
-     * @param sqlStatementContext SQL statement context
-     * @return return true if is DQL, false if is DML
-     * @throws SQLException SQL exception
-     */
+    @Override
     public boolean execute(final Collection<InputGroup<StatementExecuteUnit>> inputGroups, final SQLStatementContext<?> sqlStatementContext) throws SQLException {
         boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
         SQLExecutorCallback<Boolean> sqlExecutorCallback = createDefaultSQLExecutorCallbackWithBoolean(isExceptionThrown);
-        List<Boolean> result = sqlExecutor.execute(inputGroups, sqlExecutorCallback);
-        refreshTableMetaData(schemaContexts.getDefaultSchemaContext(), sqlStatementContext);
+        List<Boolean> result = getSqlExecutor().execute(inputGroups, sqlExecutorCallback);
+        refreshTableMetaData(getSchemaContexts().getDefaultSchemaContext(), sqlStatementContext);
         if (null == result || result.isEmpty() || null == result.get(0)) {
             return false;
         }
@@ -158,7 +107,7 @@ public final class PreparedStatementExecutor {
     }
     
     private DefaultSQLExecutorCallback<Boolean> createDefaultSQLExecutorCallbackWithBoolean(final boolean isExceptionThrown) {
-        return new DefaultSQLExecutorCallback<Boolean>(schemaContexts.getDatabaseType(), isExceptionThrown) {
+        return new DefaultSQLExecutorCallback<Boolean>(getSchemaContexts().getDatabaseType(), isExceptionThrown) {
                     
             @Override
             protected Boolean executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode) throws SQLException {
@@ -166,26 +115,4 @@ public final class PreparedStatementExecutor {
             }
         };
     }
-    
-    @SuppressWarnings("unchecked")
-    private void refreshTableMetaData(final SchemaContext schemaContext, final SQLStatementContext<?> sqlStatementContext) throws SQLException {
-        if (null == sqlStatementContext) {
-            return;
-        }
-        Optional<MetaDataRefreshStrategy> refreshStrategy = MetaDataRefreshStrategyFactory.newInstance(sqlStatementContext);
-        if (refreshStrategy.isPresent()) {
-            RuleSchemaMetaDataLoader metaDataLoader = new RuleSchemaMetaDataLoader(schemaContext.getSchema().getRules());
-            refreshStrategy.get().refreshMetaData(schemaContext.getSchema().getMetaData(), schemaContexts.getDatabaseType(),
-                    dataSourceMap, sqlStatementContext, tableName -> metaDataLoader.load(schemaContexts.getDatabaseType(),
-                            dataSourceMap, tableName, schemaContexts.getProps()));
-            notifyPersistRuleMetaData(DefaultSchema.LOGIC_NAME, schemaContext.getSchema().getMetaData().getRuleSchemaMetaData());
-        }
-    }
-    
-    private void notifyPersistRuleMetaData(final String schemaName, final RuleSchemaMetaData metaData) {
-        RuleMetaDataNotifier notifier = TypedSPIRegistry.getRegisteredService(RuleMetaDataNotifier.class);
-        if (null != notifier) {
-            notifier.notify(schemaName, metaData);
-        }
-    }
 }
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/StatementExecutor.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/StatementExecutor.java
index 4e82e00..5315225 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/StatementExecutor.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/StatementExecutor.java
@@ -17,10 +17,7 @@
 
 package org.apache.shardingsphere.driver.executor;
 
-import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.infra.context.SchemaContext;
 import org.apache.shardingsphere.infra.context.SchemaContexts;
-import org.apache.shardingsphere.infra.database.DefaultSchema;
 import org.apache.shardingsphere.infra.executor.kernel.InputGroup;
 import org.apache.shardingsphere.infra.executor.sql.ConnectionMode;
 import org.apache.shardingsphere.infra.executor.sql.QueryResult;
@@ -31,15 +28,7 @@ import org.apache.shardingsphere.infra.executor.sql.resourced.jdbc.executor.SQLE
 import org.apache.shardingsphere.infra.executor.sql.resourced.jdbc.executor.impl.DefaultSQLExecutorCallback;
 import org.apache.shardingsphere.infra.executor.sql.resourced.jdbc.queryresult.MemoryQueryResult;
 import org.apache.shardingsphere.infra.executor.sql.resourced.jdbc.queryresult.StreamQueryResult;
-import org.apache.shardingsphere.infra.metadata.refresh.MetaDataRefreshStrategy;
-import org.apache.shardingsphere.infra.metadata.refresh.MetaDataRefreshStrategyFactory;
-import org.apache.shardingsphere.infra.metadata.schema.RuleSchemaMetaData;
-import org.apache.shardingsphere.infra.metadata.schema.RuleSchemaMetaDataLoader;
-import org.apache.shardingsphere.infra.metadata.schema.spi.RuleMetaDataNotifier;
 import org.apache.shardingsphere.infra.rule.DataNodeRoutedRule;
-import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
-import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
-import org.apache.shardingsphere.infra.spi.type.TypedSPIRegistry;
 import org.apache.shardingsphere.sql.parser.binder.statement.SQLStatementContext;
 
 import javax.sql.DataSource;
@@ -49,35 +38,21 @@ import java.sql.Statement;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.stream.Collectors;
 
 /**
  * Statement executor.
  */
-@RequiredArgsConstructor
-public final class StatementExecutor {
+public final class StatementExecutor extends AbstractStatementExecutor {
     
-    static {
-        ShardingSphereServiceLoader.register(RuleMetaDataNotifier.class);
+    public StatementExecutor(final Map<String, DataSource> dataSourceMap, final SchemaContexts schemaContexts, final SQLExecutor sqlExecutor) {
+        super(dataSourceMap, schemaContexts, sqlExecutor);
     }
     
-    private final Map<String, DataSource> dataSourceMap;
-    
-    private final SchemaContexts schemaContexts;
-    
-    private final SQLExecutor sqlExecutor;
-    
-    /**
-     * Execute query.
-     * 
-     * @param inputGroups input groups
-     * @return result set list
-     * @throws SQLException SQL exception
-     */
+    @Override
     public List<QueryResult> executeQuery(final Collection<InputGroup<StatementExecuteUnit>> inputGroups) throws SQLException {
         boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
-        SQLExecutorCallback<QueryResult> sqlExecutorCallback = new DefaultSQLExecutorCallback<QueryResult>(schemaContexts.getDatabaseType(), isExceptionThrown) {
+        SQLExecutorCallback<QueryResult> sqlExecutorCallback = new DefaultSQLExecutorCallback<QueryResult>(getSchemaContexts().getDatabaseType(), isExceptionThrown) {
             
             @Override
             protected QueryResult executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode) throws SQLException {
@@ -89,18 +64,11 @@ public final class StatementExecutor {
                 return ConnectionMode.MEMORY_STRICTLY == connectionMode ? new StreamQueryResult(resultSet) : new MemoryQueryResult(resultSet);
             }
         };
-        return sqlExecutor.execute(inputGroups, sqlExecutorCallback);
+        return getSqlExecutor().execute(inputGroups, sqlExecutorCallback);
     }
     
-    /**
-     * Execute update.
-     * 
-     * @param inputGroups input groups
-     * @param sqlStatementContext SQL statement context
-     * @return effected records count
-     * @throws SQLException SQL exception
-     */
-    public int executeUpdate(final Collection<InputGroup<StatementExecuteUnit>> inputGroups, final SQLStatementContext sqlStatementContext) throws SQLException {
+    @Override
+    public int executeUpdate(final Collection<InputGroup<StatementExecuteUnit>> inputGroups, final SQLStatementContext<?> sqlStatementContext) throws SQLException {
         return executeUpdate(inputGroups, Statement::executeUpdate, sqlStatementContext);
     }
     
@@ -113,7 +81,7 @@ public final class StatementExecutor {
      * @return effected records count
      * @throws SQLException SQL exception
      */
-    public int executeUpdate(final Collection<InputGroup<StatementExecuteUnit>> inputGroups, final SQLStatementContext sqlStatementContext, final int autoGeneratedKeys) throws SQLException {
+    public int executeUpdate(final Collection<InputGroup<StatementExecuteUnit>> inputGroups, final SQLStatementContext<?> sqlStatementContext, final int autoGeneratedKeys) throws SQLException {
         return executeUpdate(inputGroups, (statement, sql) -> statement.executeUpdate(sql, autoGeneratedKeys), sqlStatementContext);
     }
     
@@ -126,7 +94,7 @@ public final class StatementExecutor {
      * @return effected records count
      * @throws SQLException SQL exception
      */
-    public int executeUpdate(final Collection<InputGroup<StatementExecuteUnit>> inputGroups, final SQLStatementContext sqlStatementContext, final int[] columnIndexes) throws SQLException {
+    public int executeUpdate(final Collection<InputGroup<StatementExecuteUnit>> inputGroups, final SQLStatementContext<?> sqlStatementContext, final int[] columnIndexes) throws SQLException {
         return executeUpdate(inputGroups, (statement, sql) -> statement.executeUpdate(sql, columnIndexes), sqlStatementContext);
     }
     
@@ -139,50 +107,31 @@ public final class StatementExecutor {
      * @return effected records count
      * @throws SQLException SQL exception
      */
-    public int executeUpdate(final Collection<InputGroup<StatementExecuteUnit>> inputGroups, final SQLStatementContext sqlStatementContext, final String[] columnNames) throws SQLException {
+    public int executeUpdate(final Collection<InputGroup<StatementExecuteUnit>> inputGroups, final SQLStatementContext<?> sqlStatementContext, final String[] columnNames) throws SQLException {
         return executeUpdate(inputGroups, (statement, sql) -> statement.executeUpdate(sql, columnNames), sqlStatementContext);
     }
     
     @SuppressWarnings("unchecked")
-    private int executeUpdate(final Collection<InputGroup<StatementExecuteUnit>> inputGroups, final Updater updater, final SQLStatementContext sqlStatementContext) throws SQLException {
+    private int executeUpdate(final Collection<InputGroup<StatementExecuteUnit>> inputGroups, final Updater updater, final SQLStatementContext<?> sqlStatementContext) throws SQLException {
         boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
-        SQLExecutorCallback sqlExecutorCallback = new DefaultSQLExecutorCallback<Integer>(schemaContexts.getDatabaseType(), isExceptionThrown) {
+        SQLExecutorCallback sqlExecutorCallback = new DefaultSQLExecutorCallback<Integer>(getSchemaContexts().getDatabaseType(), isExceptionThrown) {
             
             @Override
             protected Integer executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode) throws SQLException {
                 return updater.executeUpdate(statement, sql);
             }
         };
-        List<Integer> results = sqlExecutor.execute(inputGroups, sqlExecutorCallback);
-        refreshTableMetaData(schemaContexts.getDefaultSchemaContext(), sqlStatementContext);
+        List<Integer> results = getSqlExecutor().execute(inputGroups, sqlExecutorCallback);
+        refreshTableMetaData(getSchemaContexts().getDefaultSchemaContext(), sqlStatementContext);
         if (isNeedAccumulate(
-                schemaContexts.getDefaultSchemaContext().getSchema().getRules().stream().filter(rule -> rule instanceof DataNodeRoutedRule).collect(Collectors.toList()), sqlStatementContext)) {
+                getSchemaContexts().getDefaultSchemaContext().getSchema().getRules().stream().filter(rule -> rule instanceof DataNodeRoutedRule).collect(Collectors.toList()), sqlStatementContext)) {
             return accumulate(results);
         }
         return null == results.get(0) ? 0 : results.get(0);
     }
     
-    private boolean isNeedAccumulate(final Collection<ShardingSphereRule> rules, final SQLStatementContext sqlStatementContext) {
-        return rules.stream().anyMatch(each -> ((DataNodeRoutedRule) each).isNeedAccumulate(sqlStatementContext.getTablesContext().getTableNames()));
-    }
-    
-    private int accumulate(final List<Integer> results) {
-        int result = 0;
-        for (Integer each : results) {
-            result += null == each ? 0 : each;
-        }
-        return result;
-    }
-    
-    /**
-     * Execute SQL.
-     *
-     * @param inputGroups input groups
-     * @param sqlStatementContext SQL statement context
-     * @return return true if is DQL, false if is DML
-     * @throws SQLException SQL exception
-     */
-    public boolean execute(final Collection<InputGroup<StatementExecuteUnit>> inputGroups, final SQLStatementContext sqlStatementContext) throws SQLException {
+    @Override
+    public boolean execute(final Collection<InputGroup<StatementExecuteUnit>> inputGroups, final SQLStatementContext<?> sqlStatementContext) throws SQLException {
         return execute(inputGroups, Statement::execute, sqlStatementContext);
     }
     
@@ -195,7 +144,7 @@ public final class StatementExecutor {
      * @return return true if is DQL, false if is DML
      * @throws SQLException SQL exception
      */
-    public boolean execute(final Collection<InputGroup<StatementExecuteUnit>> inputGroups, final SQLStatementContext sqlStatementContext, final int autoGeneratedKeys) throws SQLException {
+    public boolean execute(final Collection<InputGroup<StatementExecuteUnit>> inputGroups, final SQLStatementContext<?> sqlStatementContext, final int autoGeneratedKeys) throws SQLException {
         return execute(inputGroups, (statement, sql) -> statement.execute(sql, autoGeneratedKeys), sqlStatementContext);
     }
     
@@ -208,7 +157,7 @@ public final class StatementExecutor {
      * @return return true if is DQL, false if is DML
      * @throws SQLException SQL exception
      */
-    public boolean execute(final Collection<InputGroup<StatementExecuteUnit>> inputGroups, final SQLStatementContext sqlStatementContext, final int[] columnIndexes) throws SQLException {
+    public boolean execute(final Collection<InputGroup<StatementExecuteUnit>> inputGroups, final SQLStatementContext<?> sqlStatementContext, final int[] columnIndexes) throws SQLException {
         return execute(inputGroups, (statement, sql) -> statement.execute(sql, columnIndexes), sqlStatementContext);
     }
     
@@ -221,49 +170,28 @@ public final class StatementExecutor {
      * @return return true if is DQL, false if is DML
      * @throws SQLException SQL exception
      */
-    public boolean execute(final Collection<InputGroup<StatementExecuteUnit>> inputGroups, final SQLStatementContext sqlStatementContext, final String[] columnNames) throws SQLException {
+    public boolean execute(final Collection<InputGroup<StatementExecuteUnit>> inputGroups, final SQLStatementContext<?> sqlStatementContext, final String[] columnNames) throws SQLException {
         return execute(inputGroups, (statement, sql) -> statement.execute(sql, columnNames), sqlStatementContext);
     }
     
     @SuppressWarnings("unchecked")
-    private boolean execute(final Collection<InputGroup<StatementExecuteUnit>> inputGroups, final Executor executor, final SQLStatementContext sqlStatementContext) throws SQLException {
+    private boolean execute(final Collection<InputGroup<StatementExecuteUnit>> inputGroups, final Executor executor, final SQLStatementContext<?> sqlStatementContext) throws SQLException {
         boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
-        SQLExecutorCallback sqlExecutorCallback = new DefaultSQLExecutorCallback<Boolean>(schemaContexts.getDatabaseType(), isExceptionThrown) {
+        SQLExecutorCallback sqlExecutorCallback = new DefaultSQLExecutorCallback<Boolean>(getSchemaContexts().getDatabaseType(), isExceptionThrown) {
             
             @Override
             protected Boolean executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode) throws SQLException {
                 return executor.execute(statement, sql);
             }
         };
-        List<Boolean> result = sqlExecutor.execute(inputGroups, sqlExecutorCallback);
-        refreshTableMetaData(schemaContexts.getDefaultSchemaContext(), sqlStatementContext);
+        List<Boolean> result = getSqlExecutor().execute(inputGroups, sqlExecutorCallback);
+        refreshTableMetaData(getSchemaContexts().getDefaultSchemaContext(), sqlStatementContext);
         if (null == result || result.isEmpty() || null == result.get(0)) {
             return false;
         }
         return result.get(0);
     }
     
-    @SuppressWarnings("unchecked")
-    private void refreshTableMetaData(final SchemaContext schemaContext, final SQLStatementContext sqlStatementContext) throws SQLException {
-        if (null == sqlStatementContext) {
-            return;
-        }
-        Optional<MetaDataRefreshStrategy> refreshStrategy = MetaDataRefreshStrategyFactory.newInstance(sqlStatementContext);
-        if (refreshStrategy.isPresent()) {
-            RuleSchemaMetaDataLoader metaDataLoader = new RuleSchemaMetaDataLoader(schemaContext.getSchema().getRules());
-            refreshStrategy.get().refreshMetaData(schemaContext.getSchema().getMetaData(), schemaContexts.getDatabaseType(), dataSourceMap, sqlStatementContext,
-                tableName -> metaDataLoader.load(schemaContexts.getDatabaseType(), dataSourceMap, tableName, schemaContexts.getProps()));
-            notifyPersistRuleMetaData(DefaultSchema.LOGIC_NAME, schemaContext.getSchema().getMetaData().getRuleSchemaMetaData());
-        }
-    }
-    
-    private void notifyPersistRuleMetaData(final String schemaName, final RuleSchemaMetaData metaData) {
-        RuleMetaDataNotifier notifier = TypedSPIRegistry.getRegisteredService(RuleMetaDataNotifier.class);
-        if (null != notifier) {
-            notifier.notify(schemaName, metaData);
-        }
-    }
-    
     private interface Updater {
         
         int executeUpdate(Statement statement, String sql) throws SQLException;