You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2020/12/08 09:53:51 UTC
[shardingsphere] branch master updated: Merge RawExecutor and
ProxyRawExecutor (#8532)
This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new bc1c3ae Merge RawExecutor and ProxyRawExecutor (#8532)
bc1c3ae is described below
commit bc1c3aeae02a91d54a0d65c635d0be0d2390943a
Author: Liang Zhang <te...@163.com>
AuthorDate: Tue Dec 8 17:53:28 2020 +0800
Merge RawExecutor and ProxyRawExecutor (#8532)
* Remove useless getter of DriverJDBCExecutor
* Rename driverJDBCExecutor
* Refactor ShardingSphereStatement.executeQuery
* Remove useless todo
* Merge RawExecutor and ProxyRawExecutor
---
.../sql/execute/engine/raw/RawExecutor.java | 54 ++----------
.../sql/execute/engine/raw/RawExecutorTest.java | 97 ----------------------
.../driver/executor/DriverJDBCExecutor.java | 2 -
.../statement/ShardingSpherePreparedStatement.java | 49 +++++++----
.../core/statement/ShardingSphereStatement.java | 57 ++++++++-----
.../backend/communication/ProxySQLExecutor.java | 6 +-
.../communication/raw/ProxyRawExecutor.java | 77 -----------------
7 files changed, 79 insertions(+), 263 deletions(-)
diff --git a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/RawExecutor.java b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/RawExecutor.java
index 3aaeb08..61ef3c3 100644
--- a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/RawExecutor.java
+++ b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/RawExecutor.java
@@ -23,14 +23,12 @@ import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.callback.RawSQLExecutorCallback;
import org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
-import org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
import org.apache.shardingsphere.infra.executor.sql.execute.result.update.UpdateResult;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
-import java.util.stream.Collectors;
/**
* Raw executor.
@@ -43,61 +41,27 @@ public final class RawExecutor {
private final boolean serial;
/**
- * Execute query.
- *
- * @param executionGroups execution groups
- * @param callback raw SQL execute callback
- * @return Query results
- * @throws SQLException SQL exception
- */
- public List<QueryResult> executeQuery(final Collection<ExecutionGroup<RawSQLExecutionUnit>> executionGroups, final RawSQLExecutorCallback callback) throws SQLException {
- return doExecute(executionGroups, callback).stream().map(each -> (QueryResult) each).collect(Collectors.toList());
- }
-
- /**
- * Execute update.
- *
- * @param executionGroups execution groups
- * @param callback raw SQL execute callback
- * @return update count
- * @throws SQLException SQL exception
- */
- public int executeUpdate(final Collection<ExecutionGroup<RawSQLExecutionUnit>> executionGroups, final RawSQLExecutorCallback callback) throws SQLException {
- List<Integer> results = doExecute(executionGroups, callback).stream().map(each -> ((UpdateResult) each).getUpdateCount()).collect(Collectors.toList());
- // TODO check is need to accumulate
- // TODO refresh metadata
- return accumulate(results);
- }
-
- private int accumulate(final List<Integer> results) {
- int result = 0;
- for (Integer each : results) {
- result += null == each ? 0 : each;
- }
- return result;
- }
-
- /**
* Execute.
*
* @param executionGroups execution groups
* @param callback raw SQL execute callback
- * @return return true if is DQL, false if is DML
+ * @return execute results
* @throws SQLException SQL exception
*/
- public boolean execute(final Collection<ExecutionGroup<RawSQLExecutionUnit>> executionGroups, final RawSQLExecutorCallback callback) throws SQLException {
- List<ExecuteResult> results = doExecute(executionGroups, callback);
- // TODO refresh metadata
+ public Collection<ExecuteResult> execute(final Collection<ExecutionGroup<RawSQLExecutionUnit>> executionGroups, final RawSQLExecutorCallback callback) throws SQLException {
+ // TODO Load query header for first query
+ List<ExecuteResult> results = execute(executionGroups, null, callback);
if (null == results || results.isEmpty() || null == results.get(0)) {
- return false;
+ return Collections.singleton(new UpdateResult(0, 0L));
}
- return results.get(0) instanceof QueryResult;
+ return results;
}
@SuppressWarnings("unchecked")
- private <T> List<T> doExecute(final Collection<ExecutionGroup<RawSQLExecutionUnit>> executionGroups, final RawSQLExecutorCallback callback) throws SQLException {
+ private <T> List<T> execute(final Collection<ExecutionGroup<RawSQLExecutionUnit>> executionGroups,
+ final RawSQLExecutorCallback firstCallback, final RawSQLExecutorCallback callback) throws SQLException {
try {
- return executorEngine.execute((Collection) executionGroups, null, callback, serial);
+ return executorEngine.execute((Collection) executionGroups, firstCallback, callback, serial);
} catch (final SQLException ex) {
SQLExecutorExceptionHandler.handleException(ex);
return Collections.emptyList();
diff --git a/shardingsphere-infra/shardingsphere-infra-executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/RawExecutorTest.java b/shardingsphere-infra/shardingsphere-infra-executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/RawExecutorTest.java
deleted file mode 100644
index 0db9e8b..0000000
--- a/shardingsphere-infra/shardingsphere-infra-executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/RawExecutorTest.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.infra.executor.sql.execute.engine.raw;
-
-import org.apache.shardingsphere.infra.executor.kernel.ExecutorEngine;
-import org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler;
-import org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
-import org.apache.shardingsphere.infra.executor.sql.execute.result.update.UpdateResult;
-import org.junit.Test;
-
-import java.sql.SQLException;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyBoolean;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public final class RawExecutorTest {
-
- @Test
- public void assertExecuteForResultEmpty() throws SQLException {
- ExecutorEngine executorEngine = mock(ExecutorEngine.class);
- RawExecutor executor = new RawExecutor(executorEngine, true);
- assertFalse(executor.execute(null, null));
- }
-
- @Test
- public void assertExecuteForQueryResult() throws SQLException {
- ExecutorEngine executorEngine = mock(ExecutorEngine.class);
- when(executorEngine.execute(any(), any(), any(), anyBoolean())).thenReturn(Collections.singletonList(mock(QueryResult.class)));
- RawExecutor executor = new RawExecutor(executorEngine, true);
- assertTrue(executor.execute(null, null));
- }
-
- @Test
- public void assertExecuteQueryForQueryResult() throws SQLException {
- ExecutorEngine executorEngine = mock(ExecutorEngine.class);
- QueryResult queryResult = mock(QueryResult.class);
- when(executorEngine.execute(any(), any(), any(), anyBoolean())).thenReturn(Collections.singletonList(queryResult));
- RawExecutor executor = new RawExecutor(executorEngine, true);
- List<QueryResult> actual = executor.executeQuery(null, null);
- assertThat(actual, is(Collections.singletonList(queryResult)));
- }
-
- @Test
- public void assertExecuteUpdate() throws SQLException {
- ExecutorEngine executorEngine = mock(ExecutorEngine.class);
- UpdateResult updateResult1 = new UpdateResult(1, 2);
- UpdateResult updateResult2 = new UpdateResult(3, 4);
- when(executorEngine.execute(any(), any(), any(), anyBoolean())).thenReturn(Arrays.asList(updateResult1, updateResult2));
- RawExecutor executor = new RawExecutor(executorEngine, true);
- assertThat(executor.executeUpdate(null, null), is(4));
- }
-
- @Test
- public void assertExecuteNotThrownSQLException() throws SQLException {
- ExecutorEngine executorEngine = mock(ExecutorEngine.class);
- when(executorEngine.execute(any(), any(), any(), anyBoolean())).thenThrow(new SQLException("TestSQLException"));
- RawExecutor rawExecutor = new RawExecutor(executorEngine, false);
- SQLExecutorExceptionHandler.setExceptionThrown(false);
- assertFalse(rawExecutor.execute(Collections.emptyList(), null));
- }
-
- @Test
- public void assertExecuteSQLException() {
- try {
- ExecutorEngine executorEngine = mock(ExecutorEngine.class);
- when(executorEngine.execute(any(), any(), any(), anyBoolean())).thenThrow(new SQLException("TestSQLException"));
- RawExecutor rawExecutor = new RawExecutor(executorEngine, false);
- rawExecutor.execute(Collections.emptyList(), null);
- } catch (final SQLException ex) {
- assertThat(ex.getMessage(), is("TestSQLException"));
- }
- }
-}
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/DriverJDBCExecutor.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/DriverJDBCExecutor.java
index 30aaded..312e484 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/DriverJDBCExecutor.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/DriverJDBCExecutor.java
@@ -17,7 +17,6 @@
package org.apache.shardingsphere.driver.executor;
-import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.driver.executor.callback.ExecuteQueryCallback;
import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
@@ -54,7 +53,6 @@ import java.util.stream.Collectors;
* Driver JDBC executor.
*/
@RequiredArgsConstructor
-@Getter
public final class DriverJDBCExecutor {
static {
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
index 758e4a7..c8df8ed 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
@@ -50,8 +50,10 @@ import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.J
import org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawExecutor;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawSQLExecutionUnit;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.callback.RawSQLExecutorCallback;
+import org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
import org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
import org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.type.stream.JDBCStreamQueryResult;
+import org.apache.shardingsphere.infra.executor.sql.execute.result.update.UpdateResult;
import org.apache.shardingsphere.infra.executor.sql.log.SQLLogger;
import org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
@@ -105,7 +107,7 @@ public final class ShardingSpherePreparedStatement extends AbstractPreparedState
private final RawExecutor rawExecutor;
- private final DriverJDBCExecutor jdbcStatementExecutor;
+ private final DriverJDBCExecutor driverJDBCExecutor;
private final BatchPreparedStatementExecutor batchPreparedStatementExecutor;
@@ -144,13 +146,13 @@ public final class ShardingSpherePreparedStatement extends AbstractPreparedState
this.sql = sql;
statements = new ArrayList<>();
parameterSets = new ArrayList<>();
- ShardingSphereSQLParserEngine sqlStatementParserEngine = new ShardingSphereSQLParserEngine(DatabaseTypeRegistry.getTrunkDatabaseTypeName(metaDataContexts.getDatabaseType()));
- sqlStatement = sqlStatementParserEngine.parse(sql, true);
+ ShardingSphereSQLParserEngine sqlParserEngine = new ShardingSphereSQLParserEngine(DatabaseTypeRegistry.getTrunkDatabaseTypeName(metaDataContexts.getDatabaseType()));
+ sqlStatement = sqlParserEngine.parse(sql, true);
parameterMetaData = new ShardingSphereParameterMetaData(sqlStatement);
statementOption = returnGeneratedKeys ? new StatementOption(true) : new StatementOption(resultSetType, resultSetConcurrency, resultSetHoldability);
JDBCExecutor jdbcExecutor = new JDBCExecutor(metaDataContexts.getExecutorEngine(), connection.isHoldTransaction());
rawExecutor = new RawExecutor(metaDataContexts.getExecutorEngine(), connection.isHoldTransaction());
- jdbcStatementExecutor = new DriverJDBCExecutor(connection.getDataSourceMap(), metaDataContexts, jdbcExecutor);
+ driverJDBCExecutor = new DriverJDBCExecutor(connection.getDataSourceMap(), metaDataContexts, jdbcExecutor);
batchPreparedStatementExecutor = new BatchPreparedStatementExecutor(metaDataContexts, jdbcExecutor);
kernelProcessor = new KernelProcessor();
}
@@ -161,16 +163,7 @@ public final class ShardingSpherePreparedStatement extends AbstractPreparedState
try {
clearPrevious();
executionContext = createExecutionContext();
- List<QueryResult> queryResults;
- if (metaDataContexts.getDefaultMetaData().getRuleMetaData().getRules().stream().anyMatch(each -> each instanceof RawExecutionRule)) {
- queryResults = rawExecutor.executeQuery(createRawExecutionGroups(), new RawSQLExecutorCallback());
- } else {
- Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups = createExecutionGroups();
- cacheStatements(executionGroups);
- reply();
- queryResults = jdbcStatementExecutor.executeQuery(
- executionGroups, new PreparedStatementExecuteQueryCallback(metaDataContexts.getDatabaseType(), SQLExecutorExceptionHandler.isExceptionThrown()));
- }
+ List<QueryResult> queryResults = executeQuery0();
MergedResult mergedResult = mergeQuery(queryResults);
result = new ShardingSphereResultSet(statements.stream().map(this::getResultSet).collect(Collectors.toList()), mergedResult, this, executionContext);
} finally {
@@ -180,24 +173,43 @@ public final class ShardingSpherePreparedStatement extends AbstractPreparedState
return result;
}
+ private List<QueryResult> executeQuery0() throws SQLException {
+ if (metaDataContexts.getDefaultMetaData().getRuleMetaData().getRules().stream().anyMatch(each -> each instanceof RawExecutionRule)) {
+ return rawExecutor.execute(createRawExecutionGroups(), new RawSQLExecutorCallback()).stream().map(each -> (QueryResult) each).collect(Collectors.toList());
+ }
+ Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups = createExecutionGroups();
+ cacheStatements(executionGroups);
+ reply();
+ return driverJDBCExecutor.executeQuery(executionGroups, new PreparedStatementExecuteQueryCallback(metaDataContexts.getDatabaseType(), SQLExecutorExceptionHandler.isExceptionThrown()));
+ }
+
@Override
public int executeUpdate() throws SQLException {
try {
clearPrevious();
executionContext = createExecutionContext();
if (metaDataContexts.getDefaultMetaData().getRuleMetaData().getRules().stream().anyMatch(each -> each instanceof RawExecutionRule)) {
- return rawExecutor.executeUpdate(createRawExecutionGroups(), new RawSQLExecutorCallback());
+ Collection<ExecuteResult> executeResults = rawExecutor.execute(createRawExecutionGroups(), new RawSQLExecutorCallback());
+ accumulate(executeResults);
}
Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups = createExecutionGroups();
cacheStatements(executionGroups);
reply();
- return jdbcStatementExecutor.executeUpdate(
+ return driverJDBCExecutor.executeUpdate(
executionGroups, executionContext.getSqlStatementContext(), executionContext.getRouteContext().getRouteUnits(), createExecuteUpdateCallback());
} finally {
clearBatch();
}
}
+ private int accumulate(final Collection<ExecuteResult> results) {
+ int result = 0;
+ for (ExecuteResult each : results) {
+ result += ((UpdateResult) each).getUpdateCount();
+ }
+ return result;
+ }
+
private JDBCExecutorCallback<Integer> createExecuteUpdateCallback() {
boolean isExceptionThrown = SQLExecutorExceptionHandler.isExceptionThrown();
return new JDBCExecutorCallback<Integer>(metaDataContexts.getDatabaseType(), isExceptionThrown) {
@@ -216,12 +228,13 @@ public final class ShardingSpherePreparedStatement extends AbstractPreparedState
executionContext = createExecutionContext();
if (metaDataContexts.getDefaultMetaData().getRuleMetaData().getRules().stream().anyMatch(each -> each instanceof RawExecutionRule)) {
// TODO process getStatement
- return rawExecutor.execute(createRawExecutionGroups(), new RawSQLExecutorCallback());
+ Collection<ExecuteResult> executeResults = rawExecutor.execute(createRawExecutionGroups(), new RawSQLExecutorCallback());
+ return executeResults.iterator().next() instanceof QueryResult;
}
Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups = createExecutionGroups();
cacheStatements(executionGroups);
reply();
- return jdbcStatementExecutor.execute(
+ return driverJDBCExecutor.execute(
executionGroups, executionContext.getSqlStatementContext().getSqlStatement(), executionContext.getRouteContext().getRouteUnits(), createExecuteCallback());
} finally {
clearBatch();
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
index ad7fb3f..7bae993 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
@@ -50,8 +50,10 @@ import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.J
import org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawExecutor;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawSQLExecutionUnit;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.callback.RawSQLExecutorCallback;
+import org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
import org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
import org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.type.stream.JDBCStreamQueryResult;
+import org.apache.shardingsphere.infra.executor.sql.execute.result.update.UpdateResult;
import org.apache.shardingsphere.infra.executor.sql.log.SQLLogger;
import org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
@@ -94,7 +96,7 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
private final StatementOption statementOption;
- private final DriverJDBCExecutor jdbcStatementExecutor;
+ private final DriverJDBCExecutor driverJDBCExecutor;
private final RawExecutor rawExecutor;
@@ -121,7 +123,7 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
statements = new LinkedList<>();
statementOption = new StatementOption(resultSetType, resultSetConcurrency, resultSetHoldability);
JDBCExecutor jdbcExecutor = new JDBCExecutor(metaDataContexts.getExecutorEngine(), connection.isHoldTransaction());
- jdbcStatementExecutor = new DriverJDBCExecutor(connection.getDataSourceMap(), metaDataContexts, jdbcExecutor);
+ driverJDBCExecutor = new DriverJDBCExecutor(connection.getDataSourceMap(), metaDataContexts, jdbcExecutor);
rawExecutor = new RawExecutor(metaDataContexts.getExecutorEngine(), connection.isHoldTransaction());
kernelProcessor = new KernelProcessor();
}
@@ -134,15 +136,7 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
ResultSet result;
try {
executionContext = createExecutionContext(sql);
- List<QueryResult> queryResults;
- if (metaDataContexts.getDefaultMetaData().getRuleMetaData().getRules().stream().anyMatch(each -> each instanceof RawExecutionRule)) {
- queryResults = rawExecutor.executeQuery(createRawExecutionGroups(), new RawSQLExecutorCallback());
- } else {
- Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups = createExecutionGroups();
- cacheStatements(executionGroups);
- queryResults = jdbcStatementExecutor.executeQuery(
- executionGroups, new StatementExecuteQueryCallback(metaDataContexts.getDatabaseType(), SQLExecutorExceptionHandler.isExceptionThrown()));
- }
+ List<QueryResult> queryResults = executeQuery();
MergedResult mergedResult = mergeQuery(queryResults);
result = new ShardingSphereResultSet(statements.stream().map(this::getResultSet).collect(Collectors.toList()), mergedResult, this, executionContext);
} finally {
@@ -152,12 +146,21 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
return result;
}
+ private List<QueryResult> executeQuery() throws SQLException {
+ if (metaDataContexts.getDefaultMetaData().getRuleMetaData().getRules().stream().anyMatch(each -> each instanceof RawExecutionRule)) {
+ return rawExecutor.execute(createRawExecutionGroups(), new RawSQLExecutorCallback()).stream().map(each -> (QueryResult) each).collect(Collectors.toList());
+ }
+ Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups = createExecutionGroups();
+ cacheStatements(executionGroups);
+ return driverJDBCExecutor.executeQuery(executionGroups, new StatementExecuteQueryCallback(metaDataContexts.getDatabaseType(), SQLExecutorExceptionHandler.isExceptionThrown()));
+ }
+
@Override
public int executeUpdate(final String sql) throws SQLException {
try {
executionContext = createExecutionContext(sql);
if (metaDataContexts.getDefaultMetaData().getRuleMetaData().getRules().stream().anyMatch(each -> each instanceof RawExecutionRule)) {
- return rawExecutor.executeUpdate(createRawExecutionGroups(), new RawSQLExecutorCallback());
+ return accumulate(rawExecutor.execute(createRawExecutionGroups(), new RawSQLExecutorCallback()));
}
Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups = createExecutionGroups();
cacheStatements(executionGroups);
@@ -176,7 +179,7 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
try {
executionContext = createExecutionContext(sql);
if (metaDataContexts.getDefaultMetaData().getRuleMetaData().getRules().stream().anyMatch(each -> each instanceof RawExecutionRule)) {
- return rawExecutor.executeUpdate(createRawExecutionGroups(), new RawSQLExecutorCallback());
+ return accumulate(rawExecutor.execute(createRawExecutionGroups(), new RawSQLExecutorCallback()));
}
Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups = createExecutionGroups();
cacheStatements(executionGroups);
@@ -193,7 +196,7 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
try {
executionContext = createExecutionContext(sql);
if (metaDataContexts.getDefaultMetaData().getRuleMetaData().getRules().stream().anyMatch(each -> each instanceof RawExecutionRule)) {
- return rawExecutor.executeUpdate(createRawExecutionGroups(), new RawSQLExecutorCallback());
+ return accumulate(rawExecutor.execute(createRawExecutionGroups(), new RawSQLExecutorCallback()));
}
Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups = createExecutionGroups();
cacheStatements(executionGroups);
@@ -210,7 +213,7 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
try {
executionContext = createExecutionContext(sql);
if (metaDataContexts.getDefaultMetaData().getRuleMetaData().getRules().stream().anyMatch(each -> each instanceof RawExecutionRule)) {
- return rawExecutor.executeUpdate(createRawExecutionGroups(), new RawSQLExecutorCallback());
+ return accumulate(rawExecutor.execute(createRawExecutionGroups(), new RawSQLExecutorCallback()));
}
Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups = createExecutionGroups();
cacheStatements(executionGroups);
@@ -231,7 +234,15 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
return updater.executeUpdate(sql, statement);
}
};
- return jdbcStatementExecutor.executeUpdate(executionGroups, sqlStatementContext, routeUnits, callback);
+ return driverJDBCExecutor.executeUpdate(executionGroups, sqlStatementContext, routeUnits, callback);
+ }
+
+ private int accumulate(final Collection<ExecuteResult> results) {
+ int result = 0;
+ for (ExecuteResult each : results) {
+ result += ((UpdateResult) each).getUpdateCount();
+ }
+ return result;
}
@Override
@@ -240,7 +251,8 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
executionContext = createExecutionContext(sql);
if (metaDataContexts.getDefaultMetaData().getRuleMetaData().getRules().stream().anyMatch(each -> each instanceof RawExecutionRule)) {
// TODO process getStatement
- return rawExecutor.execute(createRawExecutionGroups(), new RawSQLExecutorCallback());
+ Collection<ExecuteResult> results = rawExecutor.execute(createRawExecutionGroups(), new RawSQLExecutorCallback());
+ return results.iterator().next() instanceof QueryResult;
}
Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups = createExecutionGroups();
cacheStatements(executionGroups);
@@ -260,7 +272,8 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
executionContext = createExecutionContext(sql);
if (metaDataContexts.getDefaultMetaData().getRuleMetaData().getRules().stream().anyMatch(each -> each instanceof RawExecutionRule)) {
// TODO process getStatement
- return rawExecutor.execute(createRawExecutionGroups(), new RawSQLExecutorCallback());
+ Collection<ExecuteResult> results = rawExecutor.execute(createRawExecutionGroups(), new RawSQLExecutorCallback());
+ return results.iterator().next() instanceof QueryResult;
}
Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups = createExecutionGroups();
cacheStatements(executionGroups);
@@ -278,7 +291,8 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
executionContext = createExecutionContext(sql);
if (metaDataContexts.getDefaultMetaData().getRuleMetaData().getRules().stream().anyMatch(each -> each instanceof RawExecutionRule)) {
// TODO process getStatement
- return rawExecutor.execute(createRawExecutionGroups(), new RawSQLExecutorCallback());
+ Collection<ExecuteResult> results = rawExecutor.execute(createRawExecutionGroups(), new RawSQLExecutorCallback());
+ return results.iterator().next() instanceof QueryResult;
}
Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups = createExecutionGroups();
cacheStatements(executionGroups);
@@ -296,7 +310,8 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
executionContext = createExecutionContext(sql);
if (metaDataContexts.getDefaultMetaData().getRuleMetaData().getRules().stream().anyMatch(each -> each instanceof RawExecutionRule)) {
// TODO process getStatement
- return rawExecutor.execute(createRawExecutionGroups(), new RawSQLExecutorCallback());
+ Collection<ExecuteResult> results = rawExecutor.execute(createRawExecutionGroups(), new RawSQLExecutorCallback());
+ return results.iterator().next() instanceof QueryResult;
}
Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups = createExecutionGroups();
cacheStatements(executionGroups);
@@ -317,7 +332,7 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
return executor.execute(sql, statement);
}
};
- return jdbcStatementExecutor.execute(executionGroups, sqlStatement, routeUnits, jdbcExecutorCallback);
+ return driverJDBCExecutor.execute(executionGroups, sqlStatement, routeUnits, jdbcExecutorCallback);
}
private ExecutionContext createExecutionContext(final String sql) throws SQLException {
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
index 29e00b7..04a226f 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
@@ -35,7 +35,7 @@ import org.apache.shardingsphere.infra.rule.type.RawExecutionRule;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.executor.ProxyJDBCExecutor;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.transaction.TransactionStatus;
-import org.apache.shardingsphere.proxy.backend.communication.raw.ProxyRawExecutor;
+import org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawExecutor;
import org.apache.shardingsphere.proxy.backend.context.BackendExecutorContext;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.proxy.backend.exception.TableModifyInTransactionException;
@@ -59,7 +59,7 @@ public final class ProxySQLExecutor {
private final ProxyJDBCExecutor jdbcExecutor;
- private final ProxyRawExecutor rawExecutor;
+ private final RawExecutor rawExecutor;
public ProxySQLExecutor(final String type, final BackendConnection backendConnection) {
this.type = type;
@@ -67,7 +67,7 @@ public final class ProxySQLExecutor {
ExecutorEngine executorEngine = BackendExecutorContext.getInstance().getExecutorEngine();
boolean isSerialExecute = backendConnection.isSerialExecute();
jdbcExecutor = new ProxyJDBCExecutor(type, backendConnection, new JDBCExecutor(executorEngine, isSerialExecute));
- rawExecutor = new ProxyRawExecutor(executorEngine, isSerialExecute);
+ rawExecutor = new RawExecutor(executorEngine, isSerialExecute);
}
/**
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/raw/ProxyRawExecutor.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/raw/ProxyRawExecutor.java
deleted file mode 100644
index b4e152e..0000000
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/raw/ProxyRawExecutor.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.proxy.backend.communication.raw;
-
-import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.infra.executor.kernel.ExecutorEngine;
-import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
-import org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawSQLExecutionUnit;
-import org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.callback.RawSQLExecutorCallback;
-import org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
-import org.apache.shardingsphere.infra.executor.sql.execute.result.update.UpdateResult;
-import org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler;
-
-import java.sql.SQLException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * Proxy raw executor.
- */
-@RequiredArgsConstructor
-public final class ProxyRawExecutor {
-
- private final ExecutorEngine executorEngine;
-
- private final boolean serial;
-
- /**
- * Execute.
- *
- * @param executionGroups execution groups
- * @param callback raw SQL execute callback
- * @return execute results
- * @throws SQLException SQL exception
- */
- public Collection<ExecuteResult> execute(final Collection<ExecutionGroup<RawSQLExecutionUnit>> executionGroups, final RawSQLExecutorCallback callback) throws SQLException {
- // TODO Load query header for first query
- List<ExecuteResult> results = execute(executionGroups, null, callback);
- // TODO refresh metadata
- if (null == results || results.isEmpty() || null == results.get(0)) {
- return Collections.singleton(new UpdateResult(0, 0L));
- }
- // CHECKSTYLE:OFF
- if (results.get(0) instanceof UpdateResult) {
- // TODO refresh metadata
- }
- // CHECKSTYLE:ON
- return results;
- }
-
- @SuppressWarnings("unchecked")
- private <T> List<T> execute(final Collection<ExecutionGroup<RawSQLExecutionUnit>> executionGroups,
- final RawSQLExecutorCallback firstCallback, final RawSQLExecutorCallback callback) throws SQLException {
- try {
- return executorEngine.execute((Collection) executionGroups, firstCallback, callback, serial);
- } catch (final SQLException ex) {
- SQLExecutorExceptionHandler.handleException(ex);
- return Collections.emptyList();
- }
- }
-}