You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2020/12/08 09:53:51 UTC

[shardingsphere] branch master updated: Merge RawExecutor and ProxyRawExecutor (#8532)

This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new bc1c3ae  Merge RawExecutor and ProxyRawExecutor (#8532)
bc1c3ae is described below

commit bc1c3aeae02a91d54a0d65c635d0be0d2390943a
Author: Liang Zhang <te...@163.com>
AuthorDate: Tue Dec 8 17:53:28 2020 +0800

    Merge RawExecutor and ProxyRawExecutor (#8532)
    
    * Remove useless getter of DriverJDBCExecutor
    
    * Rename driverJDBCExecutor
    
    * Refactor ShardingSphereStatement.executeQuery
    
    * Remove useless todo
    
    * Merge RawExecutor and ProxyRawExecutor
---
 .../sql/execute/engine/raw/RawExecutor.java        | 54 ++----------
 .../sql/execute/engine/raw/RawExecutorTest.java    | 97 ----------------------
 .../driver/executor/DriverJDBCExecutor.java        |  2 -
 .../statement/ShardingSpherePreparedStatement.java | 49 +++++++----
 .../core/statement/ShardingSphereStatement.java    | 57 ++++++++-----
 .../backend/communication/ProxySQLExecutor.java    |  6 +-
 .../communication/raw/ProxyRawExecutor.java        | 77 -----------------
 7 files changed, 79 insertions(+), 263 deletions(-)

diff --git a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/RawExecutor.java b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/RawExecutor.java
index 3aaeb08..61ef3c3 100644
--- a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/RawExecutor.java
+++ b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/RawExecutor.java
@@ -23,14 +23,12 @@ import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
 import org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler;
 import org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.callback.RawSQLExecutorCallback;
 import org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
-import org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
 import org.apache.shardingsphere.infra.executor.sql.execute.result.update.UpdateResult;
 
 import java.sql.SQLException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
-import java.util.stream.Collectors;
 
 /**
  * Raw executor.
@@ -43,61 +41,27 @@ public final class RawExecutor {
     private final boolean serial;
     
     /**
-     * Execute query.
-     *
-     * @param executionGroups execution groups
-     * @param callback raw SQL execute callback
-     * @return Query results
-     * @throws SQLException SQL exception
-     */
-    public List<QueryResult> executeQuery(final Collection<ExecutionGroup<RawSQLExecutionUnit>> executionGroups, final RawSQLExecutorCallback callback) throws SQLException {
-        return doExecute(executionGroups, callback).stream().map(each -> (QueryResult) each).collect(Collectors.toList());
-    }
-    
-    /**
-     * Execute update.
-     *
-     * @param executionGroups execution groups
-     * @param callback raw SQL execute callback
-     * @return update count
-     * @throws SQLException SQL exception
-     */
-    public int executeUpdate(final Collection<ExecutionGroup<RawSQLExecutionUnit>> executionGroups, final RawSQLExecutorCallback callback) throws SQLException {
-        List<Integer> results = doExecute(executionGroups, callback).stream().map(each -> ((UpdateResult) each).getUpdateCount()).collect(Collectors.toList());
-        // TODO check is need to accumulate
-        // TODO refresh metadata
-        return accumulate(results);
-    }
-    
-    private int accumulate(final List<Integer> results) {
-        int result = 0;
-        for (Integer each : results) {
-            result += null == each ? 0 : each;
-        }
-        return result;
-    }
-    
-    /**
      * Execute.
      *
      * @param executionGroups execution groups
      * @param callback raw SQL execute callback
-     * @return return true if is DQL, false if is DML
+     * @return execute results
      * @throws SQLException SQL exception
      */
-    public boolean execute(final Collection<ExecutionGroup<RawSQLExecutionUnit>> executionGroups, final RawSQLExecutorCallback callback) throws SQLException {
-        List<ExecuteResult> results = doExecute(executionGroups, callback);
-        // TODO refresh metadata
+    public Collection<ExecuteResult> execute(final Collection<ExecutionGroup<RawSQLExecutionUnit>> executionGroups, final RawSQLExecutorCallback callback) throws SQLException {
+        // TODO Load query header for first query
+        List<ExecuteResult> results = execute(executionGroups, null, callback);
         if (null == results || results.isEmpty() || null == results.get(0)) {
-            return false;
+            return Collections.singleton(new UpdateResult(0, 0L));
         }
-        return results.get(0) instanceof QueryResult;
+        return results;
     }
     
     @SuppressWarnings("unchecked")
-    private <T> List<T> doExecute(final Collection<ExecutionGroup<RawSQLExecutionUnit>> executionGroups, final RawSQLExecutorCallback callback) throws SQLException {
+    private <T> List<T> execute(final Collection<ExecutionGroup<RawSQLExecutionUnit>> executionGroups, 
+                                final RawSQLExecutorCallback firstCallback, final RawSQLExecutorCallback callback) throws SQLException {
         try {
-            return executorEngine.execute((Collection) executionGroups, null, callback, serial);
+            return executorEngine.execute((Collection) executionGroups, firstCallback, callback, serial);
         } catch (final SQLException ex) {
             SQLExecutorExceptionHandler.handleException(ex);
             return Collections.emptyList();
diff --git a/shardingsphere-infra/shardingsphere-infra-executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/RawExecutorTest.java b/shardingsphere-infra/shardingsphere-infra-executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/RawExecutorTest.java
deleted file mode 100644
index 0db9e8b..0000000
--- a/shardingsphere-infra/shardingsphere-infra-executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/RawExecutorTest.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.infra.executor.sql.execute.engine.raw;
-
-import org.apache.shardingsphere.infra.executor.kernel.ExecutorEngine;
-import org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler;
-import org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
-import org.apache.shardingsphere.infra.executor.sql.execute.result.update.UpdateResult;
-import org.junit.Test;
-
-import java.sql.SQLException;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyBoolean;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public final class RawExecutorTest {
-    
-    @Test
-    public void assertExecuteForResultEmpty() throws SQLException {
-        ExecutorEngine executorEngine = mock(ExecutorEngine.class);
-        RawExecutor executor = new RawExecutor(executorEngine, true);
-        assertFalse(executor.execute(null, null));
-    }
-    
-    @Test
-    public void assertExecuteForQueryResult() throws SQLException {
-        ExecutorEngine executorEngine = mock(ExecutorEngine.class);
-        when(executorEngine.execute(any(), any(), any(), anyBoolean())).thenReturn(Collections.singletonList(mock(QueryResult.class)));
-        RawExecutor executor = new RawExecutor(executorEngine, true);
-        assertTrue(executor.execute(null, null));
-    }
-    
-    @Test
-    public void assertExecuteQueryForQueryResult() throws SQLException {
-        ExecutorEngine executorEngine = mock(ExecutorEngine.class);
-        QueryResult queryResult = mock(QueryResult.class);
-        when(executorEngine.execute(any(), any(), any(), anyBoolean())).thenReturn(Collections.singletonList(queryResult));
-        RawExecutor executor = new RawExecutor(executorEngine, true);
-        List<QueryResult> actual = executor.executeQuery(null, null);
-        assertThat(actual, is(Collections.singletonList(queryResult)));
-    }
-    
-    @Test
-    public void assertExecuteUpdate() throws SQLException {
-        ExecutorEngine executorEngine = mock(ExecutorEngine.class);
-        UpdateResult updateResult1 = new UpdateResult(1, 2);
-        UpdateResult updateResult2 = new UpdateResult(3, 4);
-        when(executorEngine.execute(any(), any(), any(), anyBoolean())).thenReturn(Arrays.asList(updateResult1, updateResult2));
-        RawExecutor executor = new RawExecutor(executorEngine, true);
-        assertThat(executor.executeUpdate(null, null), is(4));
-    }
-    
-    @Test
-    public void assertExecuteNotThrownSQLException() throws SQLException {
-        ExecutorEngine executorEngine = mock(ExecutorEngine.class);
-        when(executorEngine.execute(any(), any(), any(), anyBoolean())).thenThrow(new SQLException("TestSQLException"));
-        RawExecutor rawExecutor = new RawExecutor(executorEngine, false);
-        SQLExecutorExceptionHandler.setExceptionThrown(false);
-        assertFalse(rawExecutor.execute(Collections.emptyList(), null));
-    }
-    
-    @Test
-    public void assertExecuteSQLException() {
-        try {
-            ExecutorEngine executorEngine = mock(ExecutorEngine.class);
-            when(executorEngine.execute(any(), any(), any(), anyBoolean())).thenThrow(new SQLException("TestSQLException"));
-            RawExecutor rawExecutor = new RawExecutor(executorEngine, false);
-            rawExecutor.execute(Collections.emptyList(), null);
-        } catch (final SQLException ex) {
-            assertThat(ex.getMessage(), is("TestSQLException"));
-        }
-    }
-}
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/DriverJDBCExecutor.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/DriverJDBCExecutor.java
index 30aaded..312e484 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/DriverJDBCExecutor.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/DriverJDBCExecutor.java
@@ -17,7 +17,6 @@
 
 package org.apache.shardingsphere.driver.executor;
 
-import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.driver.executor.callback.ExecuteQueryCallback;
 import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
@@ -54,7 +53,6 @@ import java.util.stream.Collectors;
  * Driver JDBC executor.
  */
 @RequiredArgsConstructor
-@Getter
 public final class DriverJDBCExecutor {
     
     static {
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
index 758e4a7..c8df8ed 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
@@ -50,8 +50,10 @@ import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.J
 import org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawExecutor;
 import org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawSQLExecutionUnit;
 import org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.callback.RawSQLExecutorCallback;
+import org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
 import org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
 import org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.type.stream.JDBCStreamQueryResult;
+import org.apache.shardingsphere.infra.executor.sql.execute.result.update.UpdateResult;
 import org.apache.shardingsphere.infra.executor.sql.log.SQLLogger;
 import org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
 import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
@@ -105,7 +107,7 @@ public final class ShardingSpherePreparedStatement extends AbstractPreparedState
     
     private final RawExecutor rawExecutor;
     
-    private final DriverJDBCExecutor jdbcStatementExecutor;
+    private final DriverJDBCExecutor driverJDBCExecutor;
     
     private final BatchPreparedStatementExecutor batchPreparedStatementExecutor;
     
@@ -144,13 +146,13 @@ public final class ShardingSpherePreparedStatement extends AbstractPreparedState
         this.sql = sql;
         statements = new ArrayList<>();
         parameterSets = new ArrayList<>();
-        ShardingSphereSQLParserEngine sqlStatementParserEngine = new ShardingSphereSQLParserEngine(DatabaseTypeRegistry.getTrunkDatabaseTypeName(metaDataContexts.getDatabaseType()));
-        sqlStatement = sqlStatementParserEngine.parse(sql, true);
+        ShardingSphereSQLParserEngine sqlParserEngine = new ShardingSphereSQLParserEngine(DatabaseTypeRegistry.getTrunkDatabaseTypeName(metaDataContexts.getDatabaseType()));
+        sqlStatement = sqlParserEngine.parse(sql, true);
         parameterMetaData = new ShardingSphereParameterMetaData(sqlStatement);
         statementOption = returnGeneratedKeys ? new StatementOption(true) : new StatementOption(resultSetType, resultSetConcurrency, resultSetHoldability);
         JDBCExecutor jdbcExecutor = new JDBCExecutor(metaDataContexts.getExecutorEngine(), connection.isHoldTransaction());
         rawExecutor = new RawExecutor(metaDataContexts.getExecutorEngine(), connection.isHoldTransaction());
-        jdbcStatementExecutor = new DriverJDBCExecutor(connection.getDataSourceMap(), metaDataContexts, jdbcExecutor);
+        driverJDBCExecutor = new DriverJDBCExecutor(connection.getDataSourceMap(), metaDataContexts, jdbcExecutor);
         batchPreparedStatementExecutor = new BatchPreparedStatementExecutor(metaDataContexts, jdbcExecutor);
         kernelProcessor = new KernelProcessor();
     }
@@ -161,16 +163,7 @@ public final class ShardingSpherePreparedStatement extends AbstractPreparedState
         try {
             clearPrevious();
             executionContext = createExecutionContext();
-            List<QueryResult> queryResults;
-            if (metaDataContexts.getDefaultMetaData().getRuleMetaData().getRules().stream().anyMatch(each -> each instanceof RawExecutionRule)) {
-                queryResults = rawExecutor.executeQuery(createRawExecutionGroups(), new RawSQLExecutorCallback());
-            } else {
-                Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups = createExecutionGroups();
-                cacheStatements(executionGroups);
-                reply();
-                queryResults = jdbcStatementExecutor.executeQuery(
-                        executionGroups, new PreparedStatementExecuteQueryCallback(metaDataContexts.getDatabaseType(), SQLExecutorExceptionHandler.isExceptionThrown()));
-            }
+            List<QueryResult> queryResults = executeQuery0();
             MergedResult mergedResult = mergeQuery(queryResults);
             result = new ShardingSphereResultSet(statements.stream().map(this::getResultSet).collect(Collectors.toList()), mergedResult, this, executionContext);
         } finally {
@@ -180,24 +173,43 @@ public final class ShardingSpherePreparedStatement extends AbstractPreparedState
         return result;
     }
     
+    private List<QueryResult> executeQuery0() throws SQLException {
+        if (metaDataContexts.getDefaultMetaData().getRuleMetaData().getRules().stream().anyMatch(each -> each instanceof RawExecutionRule)) {
+            return rawExecutor.execute(createRawExecutionGroups(), new RawSQLExecutorCallback()).stream().map(each -> (QueryResult) each).collect(Collectors.toList());
+        }
+        Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups = createExecutionGroups();
+        cacheStatements(executionGroups);
+        reply();
+        return driverJDBCExecutor.executeQuery(executionGroups, new PreparedStatementExecuteQueryCallback(metaDataContexts.getDatabaseType(), SQLExecutorExceptionHandler.isExceptionThrown()));
+    }
+    
     @Override
     public int executeUpdate() throws SQLException {
         try {
             clearPrevious();
             executionContext = createExecutionContext();
             if (metaDataContexts.getDefaultMetaData().getRuleMetaData().getRules().stream().anyMatch(each -> each instanceof RawExecutionRule)) {
-                return rawExecutor.executeUpdate(createRawExecutionGroups(), new RawSQLExecutorCallback());
+                Collection<ExecuteResult> executeResults = rawExecutor.execute(createRawExecutionGroups(), new RawSQLExecutorCallback());
+                return accumulate(executeResults);
             }
             Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups = createExecutionGroups();
             cacheStatements(executionGroups);
             reply();
-            return jdbcStatementExecutor.executeUpdate(
+            return driverJDBCExecutor.executeUpdate(
                     executionGroups, executionContext.getSqlStatementContext(), executionContext.getRouteContext().getRouteUnits(), createExecuteUpdateCallback());
         } finally {
             clearBatch();
         }
     }
     
+    private int accumulate(final Collection<ExecuteResult> results) {
+        int result = 0;
+        for (ExecuteResult each : results) {
+            result += ((UpdateResult) each).getUpdateCount();
+        }
+        return result;
+    }
+    
     private JDBCExecutorCallback<Integer> createExecuteUpdateCallback() {
         boolean isExceptionThrown = SQLExecutorExceptionHandler.isExceptionThrown();
         return new JDBCExecutorCallback<Integer>(metaDataContexts.getDatabaseType(), isExceptionThrown) {
@@ -216,12 +228,13 @@ public final class ShardingSpherePreparedStatement extends AbstractPreparedState
             executionContext = createExecutionContext();
             if (metaDataContexts.getDefaultMetaData().getRuleMetaData().getRules().stream().anyMatch(each -> each instanceof RawExecutionRule)) {
                 // TODO process getStatement
-                return rawExecutor.execute(createRawExecutionGroups(), new RawSQLExecutorCallback());
+                Collection<ExecuteResult> executeResults = rawExecutor.execute(createRawExecutionGroups(), new RawSQLExecutorCallback());
+                return executeResults.iterator().next() instanceof QueryResult;
             }
             Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups = createExecutionGroups();
             cacheStatements(executionGroups);
             reply();
-            return jdbcStatementExecutor.execute(
+            return driverJDBCExecutor.execute(
                     executionGroups, executionContext.getSqlStatementContext().getSqlStatement(), executionContext.getRouteContext().getRouteUnits(), createExecuteCallback());
         } finally {
             clearBatch();
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
index ad7fb3f..7bae993 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
@@ -50,8 +50,10 @@ import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.J
 import org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawExecutor;
 import org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawSQLExecutionUnit;
 import org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.callback.RawSQLExecutorCallback;
+import org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
 import org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
 import org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.type.stream.JDBCStreamQueryResult;
+import org.apache.shardingsphere.infra.executor.sql.execute.result.update.UpdateResult;
 import org.apache.shardingsphere.infra.executor.sql.log.SQLLogger;
 import org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
 import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
@@ -94,7 +96,7 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
     
     private final StatementOption statementOption;
     
-    private final DriverJDBCExecutor jdbcStatementExecutor;
+    private final DriverJDBCExecutor driverJDBCExecutor;
     
     private final RawExecutor rawExecutor;
     
@@ -121,7 +123,7 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
         statements = new LinkedList<>();
         statementOption = new StatementOption(resultSetType, resultSetConcurrency, resultSetHoldability);
         JDBCExecutor jdbcExecutor = new JDBCExecutor(metaDataContexts.getExecutorEngine(), connection.isHoldTransaction());
-        jdbcStatementExecutor = new DriverJDBCExecutor(connection.getDataSourceMap(), metaDataContexts, jdbcExecutor);
+        driverJDBCExecutor = new DriverJDBCExecutor(connection.getDataSourceMap(), metaDataContexts, jdbcExecutor);
         rawExecutor = new RawExecutor(metaDataContexts.getExecutorEngine(), connection.isHoldTransaction());
         kernelProcessor = new KernelProcessor();
     }
@@ -134,15 +136,7 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
         ResultSet result;
         try {
             executionContext = createExecutionContext(sql);
-            List<QueryResult> queryResults;
-            if (metaDataContexts.getDefaultMetaData().getRuleMetaData().getRules().stream().anyMatch(each -> each instanceof RawExecutionRule)) {
-                queryResults = rawExecutor.executeQuery(createRawExecutionGroups(), new RawSQLExecutorCallback());
-            } else {
-                Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups = createExecutionGroups();
-                cacheStatements(executionGroups);
-                queryResults = jdbcStatementExecutor.executeQuery(
-                        executionGroups, new StatementExecuteQueryCallback(metaDataContexts.getDatabaseType(), SQLExecutorExceptionHandler.isExceptionThrown()));
-            }
+            List<QueryResult> queryResults = executeQuery();
             MergedResult mergedResult = mergeQuery(queryResults);
             result = new ShardingSphereResultSet(statements.stream().map(this::getResultSet).collect(Collectors.toList()), mergedResult, this, executionContext);
         } finally {
@@ -152,12 +146,21 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
         return result;
     }
     
+    private List<QueryResult> executeQuery() throws SQLException {
+        if (metaDataContexts.getDefaultMetaData().getRuleMetaData().getRules().stream().anyMatch(each -> each instanceof RawExecutionRule)) {
+            return rawExecutor.execute(createRawExecutionGroups(), new RawSQLExecutorCallback()).stream().map(each -> (QueryResult) each).collect(Collectors.toList());
+        }
+        Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups = createExecutionGroups();
+        cacheStatements(executionGroups);
+        return driverJDBCExecutor.executeQuery(executionGroups, new StatementExecuteQueryCallback(metaDataContexts.getDatabaseType(), SQLExecutorExceptionHandler.isExceptionThrown()));
+    }
+    
     @Override
     public int executeUpdate(final String sql) throws SQLException {
         try {
             executionContext = createExecutionContext(sql);
             if (metaDataContexts.getDefaultMetaData().getRuleMetaData().getRules().stream().anyMatch(each -> each instanceof RawExecutionRule)) {
-                return rawExecutor.executeUpdate(createRawExecutionGroups(), new RawSQLExecutorCallback());
+                return accumulate(rawExecutor.execute(createRawExecutionGroups(), new RawSQLExecutorCallback()));
             }
             Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups = createExecutionGroups();
             cacheStatements(executionGroups);
@@ -176,7 +179,7 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
         try {
             executionContext = createExecutionContext(sql);
             if (metaDataContexts.getDefaultMetaData().getRuleMetaData().getRules().stream().anyMatch(each -> each instanceof RawExecutionRule)) {
-                return rawExecutor.executeUpdate(createRawExecutionGroups(), new RawSQLExecutorCallback());
+                return accumulate(rawExecutor.execute(createRawExecutionGroups(), new RawSQLExecutorCallback()));
             }
             Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups = createExecutionGroups();
             cacheStatements(executionGroups);
@@ -193,7 +196,7 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
         try {
             executionContext = createExecutionContext(sql);
             if (metaDataContexts.getDefaultMetaData().getRuleMetaData().getRules().stream().anyMatch(each -> each instanceof RawExecutionRule)) {
-                return rawExecutor.executeUpdate(createRawExecutionGroups(), new RawSQLExecutorCallback());
+                return accumulate(rawExecutor.execute(createRawExecutionGroups(), new RawSQLExecutorCallback()));
             }
             Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups = createExecutionGroups();
             cacheStatements(executionGroups);
@@ -210,7 +213,7 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
         try {
             executionContext = createExecutionContext(sql);
             if (metaDataContexts.getDefaultMetaData().getRuleMetaData().getRules().stream().anyMatch(each -> each instanceof RawExecutionRule)) {
-                return rawExecutor.executeUpdate(createRawExecutionGroups(), new RawSQLExecutorCallback());
+                return accumulate(rawExecutor.execute(createRawExecutionGroups(), new RawSQLExecutorCallback()));
             }
             Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups = createExecutionGroups();
             cacheStatements(executionGroups);
@@ -231,7 +234,15 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
                 return updater.executeUpdate(sql, statement);
             }
         };
-        return jdbcStatementExecutor.executeUpdate(executionGroups, sqlStatementContext, routeUnits, callback);
+        return driverJDBCExecutor.executeUpdate(executionGroups, sqlStatementContext, routeUnits, callback);
+    }
+    
+    private int accumulate(final Collection<ExecuteResult> results) {
+        int result = 0;
+        for (ExecuteResult each : results) {
+            result += ((UpdateResult) each).getUpdateCount();
+        }
+        return result;
     }
     
     @Override
@@ -240,7 +251,8 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
             executionContext = createExecutionContext(sql);
             if (metaDataContexts.getDefaultMetaData().getRuleMetaData().getRules().stream().anyMatch(each -> each instanceof RawExecutionRule)) {
                 // TODO process getStatement
-                return rawExecutor.execute(createRawExecutionGroups(), new RawSQLExecutorCallback());
+                Collection<ExecuteResult> results = rawExecutor.execute(createRawExecutionGroups(), new RawSQLExecutorCallback());
+                return results.iterator().next() instanceof QueryResult;
             }
             Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups = createExecutionGroups();
             cacheStatements(executionGroups);
@@ -260,7 +272,8 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
             executionContext = createExecutionContext(sql);
             if (metaDataContexts.getDefaultMetaData().getRuleMetaData().getRules().stream().anyMatch(each -> each instanceof RawExecutionRule)) {
                 // TODO process getStatement
-                return rawExecutor.execute(createRawExecutionGroups(), new RawSQLExecutorCallback());
+                Collection<ExecuteResult> results = rawExecutor.execute(createRawExecutionGroups(), new RawSQLExecutorCallback());
+                return results.iterator().next() instanceof QueryResult;
             }
             Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups = createExecutionGroups();
             cacheStatements(executionGroups);
@@ -278,7 +291,8 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
             executionContext = createExecutionContext(sql);
             if (metaDataContexts.getDefaultMetaData().getRuleMetaData().getRules().stream().anyMatch(each -> each instanceof RawExecutionRule)) {
                 // TODO process getStatement
-                return rawExecutor.execute(createRawExecutionGroups(), new RawSQLExecutorCallback());
+                Collection<ExecuteResult> results = rawExecutor.execute(createRawExecutionGroups(), new RawSQLExecutorCallback());
+                return results.iterator().next() instanceof QueryResult;
             }
             Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups = createExecutionGroups();
             cacheStatements(executionGroups);
@@ -296,7 +310,8 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
             executionContext = createExecutionContext(sql);
             if (metaDataContexts.getDefaultMetaData().getRuleMetaData().getRules().stream().anyMatch(each -> each instanceof RawExecutionRule)) {
                 // TODO process getStatement
-                return rawExecutor.execute(createRawExecutionGroups(), new RawSQLExecutorCallback());
+                Collection<ExecuteResult> results = rawExecutor.execute(createRawExecutionGroups(), new RawSQLExecutorCallback());
+                return results.iterator().next() instanceof QueryResult;
             }
             Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups = createExecutionGroups();
             cacheStatements(executionGroups);
@@ -317,7 +332,7 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
                 return executor.execute(sql, statement);
             }
         };
-        return jdbcStatementExecutor.execute(executionGroups, sqlStatement, routeUnits, jdbcExecutorCallback);
+        return driverJDBCExecutor.execute(executionGroups, sqlStatement, routeUnits, jdbcExecutorCallback);
     }
     
     private ExecutionContext createExecutionContext(final String sql) throws SQLException {
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
index 29e00b7..04a226f 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
@@ -35,7 +35,7 @@ import org.apache.shardingsphere.infra.rule.type.RawExecutionRule;
 import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
 import org.apache.shardingsphere.proxy.backend.communication.jdbc.executor.ProxyJDBCExecutor;
 import org.apache.shardingsphere.proxy.backend.communication.jdbc.transaction.TransactionStatus;
-import org.apache.shardingsphere.proxy.backend.communication.raw.ProxyRawExecutor;
+import org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawExecutor;
 import org.apache.shardingsphere.proxy.backend.context.BackendExecutorContext;
 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
 import org.apache.shardingsphere.proxy.backend.exception.TableModifyInTransactionException;
@@ -59,7 +59,7 @@ public final class ProxySQLExecutor {
     
     private final ProxyJDBCExecutor jdbcExecutor;
     
-    private final ProxyRawExecutor rawExecutor;
+    private final RawExecutor rawExecutor;
     
     public ProxySQLExecutor(final String type, final BackendConnection backendConnection) {
         this.type = type;
@@ -67,7 +67,7 @@ public final class ProxySQLExecutor {
         ExecutorEngine executorEngine = BackendExecutorContext.getInstance().getExecutorEngine();
         boolean isSerialExecute = backendConnection.isSerialExecute();
         jdbcExecutor = new ProxyJDBCExecutor(type, backendConnection, new JDBCExecutor(executorEngine, isSerialExecute));
-        rawExecutor = new ProxyRawExecutor(executorEngine, isSerialExecute);
+        rawExecutor = new RawExecutor(executorEngine, isSerialExecute);
     }
     
     /**
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/raw/ProxyRawExecutor.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/raw/ProxyRawExecutor.java
deleted file mode 100644
index b4e152e..0000000
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/raw/ProxyRawExecutor.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.proxy.backend.communication.raw;
-
-import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.infra.executor.kernel.ExecutorEngine;
-import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
-import org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawSQLExecutionUnit;
-import org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.callback.RawSQLExecutorCallback;
-import org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
-import org.apache.shardingsphere.infra.executor.sql.execute.result.update.UpdateResult;
-import org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler;
-
-import java.sql.SQLException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * Proxy raw executor.
- */
-@RequiredArgsConstructor
-public final class ProxyRawExecutor {
-    
-    private final ExecutorEngine executorEngine;
-    
-    private final boolean serial;
-    
-    /**
-     * Execute.
-     *
-     * @param executionGroups execution groups
-     * @param callback raw SQL execute callback
-     * @return execute results
-     * @throws SQLException SQL exception
-     */
-    public Collection<ExecuteResult> execute(final Collection<ExecutionGroup<RawSQLExecutionUnit>> executionGroups, final RawSQLExecutorCallback callback) throws SQLException {
-        // TODO Load query header for first query
-        List<ExecuteResult> results = execute(executionGroups, null, callback);
-        // TODO refresh metadata
-        if (null == results || results.isEmpty() || null == results.get(0)) {
-            return Collections.singleton(new UpdateResult(0, 0L));
-        }
-        // CHECKSTYLE:OFF
-        if (results.get(0) instanceof UpdateResult) {
-            // TODO refresh metadata
-        }
-        // CHECKSTYLE:ON
-        return results;
-    }
-    
-    @SuppressWarnings("unchecked")
-    private <T> List<T> execute(final Collection<ExecutionGroup<RawSQLExecutionUnit>> executionGroups, 
-                                final RawSQLExecutorCallback firstCallback, final RawSQLExecutorCallback callback) throws SQLException {
-        try {
-            return executorEngine.execute((Collection) executionGroups, firstCallback, callback, serial);
-        } catch (final SQLException ex) {
-            SQLExecutorExceptionHandler.handleException(ex);
-            return Collections.emptyList();
-        }
-    }
-}