You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by me...@apache.org on 2021/03/12 09:13:02 UTC
[shardingsphere] branch master updated: Create executionGroup
context (#9643)
This is an automated email from the ASF dual-hosted git repository.
menghaoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 30ebff2 Create executionGroup context (#9643)
30ebff2 is described below
commit 30ebff26e8734ce0809baf9be78cd2513ee1ae98
Author: Juan Pan(Trista) <pa...@apache.org>
AuthorDate: Fri Mar 12 17:12:30 2021 +0800
Create executionGroup context (#9643)
* Show ddl/dml process
* fix unit test
* fix java doc
---
.../infra/executor/kernel/ExecutorEngine.java | 16 +++--
.../model/ExecutionGroupContext.java} | 32 +++++----
.../execute/engine/driver/jdbc/JDBCExecutor.java | 15 ++--
.../sql/execute/engine/raw/RawExecutor.java | 12 ++--
.../prepare/AbstractExecutionPrepareEngine.java | 9 +--
.../sql/prepare/ExecutionPrepareEngine.java | 6 +-
.../infra/executor/kernel/ExecutorEngineTest.java | 17 ++---
.../sql/execute/engine/jdbc/JDBCExecutorTest.java | 19 +++--
.../optimize/schema/row/CalciteRowExecutor.java | 8 +--
.../driver/executor/DriverJDBCExecutor.java | 24 +++----
.../batch/BatchPreparedStatementExecutor.java | 23 +++---
.../statement/ShardingSpherePreparedStatement.java | 33 ++++-----
.../core/statement/ShardingSphereStatement.java | 83 +++++++++++-----------
.../batch/BatchPreparedStatementExecutorTest.java | 5 +-
.../backend/communication/ProxySQLExecutor.java | 14 ++--
.../jdbc/executor/ProxyJDBCExecutor.java | 8 +--
16 files changed, 168 insertions(+), 156 deletions(-)
diff --git a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/kernel/ExecutorEngine.java b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/kernel/ExecutorEngine.java
index 75c2db5..a7cf377 100644
--- a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/kernel/ExecutorEngine.java
+++ b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/kernel/ExecutorEngine.java
@@ -21,6 +21,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import lombok.Getter;
import org.apache.shardingsphere.infra.exception.ShardingSphereException;
import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
+import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
import org.apache.shardingsphere.infra.executor.kernel.model.ExecutorCallback;
import org.apache.shardingsphere.infra.executor.kernel.model.ExecutorDataMap;
import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorServiceManager;
@@ -49,21 +50,21 @@ public final class ExecutorEngine implements AutoCloseable {
/**
* Execute.
*
- * @param executionGroups execution groups
+ * @param executionGroupContext execution group context
* @param callback executor callback
* @param <I> type of input value
* @param <O> type of return value
* @return execute result
* @throws SQLException throw if execute failure
*/
- public <I, O> List<O> execute(final Collection<ExecutionGroup<I>> executionGroups, final ExecutorCallback<I, O> callback) throws SQLException {
- return execute(executionGroups, null, callback, false);
+ public <I, O> List<O> execute(final ExecutionGroupContext<I> executionGroupContext, final ExecutorCallback<I, O> callback) throws SQLException {
+ return execute(executionGroupContext, null, callback, false);
}
/**
* Execute.
*
- * @param executionGroups execution groups
+ * @param executionGroupContext execution group context
* @param firstCallback first executor callback
* @param callback other executor callback
* @param serial whether using multi thread execute or not
@@ -72,12 +73,13 @@ public final class ExecutorEngine implements AutoCloseable {
* @return execute result
* @throws SQLException throw if execute failure
*/
- public <I, O> List<O> execute(final Collection<ExecutionGroup<I>> executionGroups,
+ public <I, O> List<O> execute(final ExecutionGroupContext<I> executionGroupContext,
final ExecutorCallback<I, O> firstCallback, final ExecutorCallback<I, O> callback, final boolean serial) throws SQLException {
- if (executionGroups.isEmpty()) {
+ if (executionGroupContext.getInputGroups().isEmpty()) {
return Collections.emptyList();
}
- return serial ? serialExecute(executionGroups.iterator(), firstCallback, callback) : parallelExecute(executionGroups.iterator(), firstCallback, callback);
+ return serial ? serialExecute(executionGroupContext.getInputGroups().iterator(), firstCallback, callback)
+ : parallelExecute(executionGroupContext.getInputGroups().iterator(), firstCallback, callback);
}
private <I, O> List<O> serialExecute(final Iterator<ExecutionGroup<I>> executionGroups, final ExecutorCallback<I, O> firstCallback, final ExecutorCallback<I, O> callback) throws SQLException {
diff --git a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/ExecutionPrepareEngine.java b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/kernel/model/ExecutionGroupContext.java
similarity index 54%
copy from shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/ExecutionPrepareEngine.java
copy to shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/kernel/model/ExecutionGroupContext.java
index b7c1830..f938136b 100644
--- a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/ExecutionPrepareEngine.java
+++ b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/kernel/model/ExecutionGroupContext.java
@@ -15,29 +15,31 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.infra.executor.sql.prepare;
+package org.apache.shardingsphere.infra.executor.kernel.model;
-import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
-import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
-import org.apache.shardingsphere.infra.route.context.RouteContext;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
-import java.sql.SQLException;
import java.util.Collection;
+import java.util.UUID;
/**
- * Execution prepare engine.
- *
- * @param <T> type of input value
+ * Execution group context.
+ *
+ * @param <T> type of execution input value
*/
-public interface ExecutionPrepareEngine<T> {
+@RequiredArgsConstructor
+@Getter
+public final class ExecutionGroupContext<T> {
+
+ private final Collection<ExecutionGroup<T>> inputGroups;
/**
- * Prepare to execute.
+ * Get execution ID.
*
- * @param routeContext route context
- * @param executionUnits execution units
- * @return execution groups
- * @throws SQLException SQL exception
+ * @return execution ID
*/
- Collection<ExecutionGroup<T>> prepare(RouteContext routeContext, Collection<ExecutionUnit> executionUnits) throws SQLException;
+ public String getExecutionID() {
+ return UUID.randomUUID().toString();
+ }
}
diff --git a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutor.java b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutor.java
index 91989e8..923ab33 100644
--- a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutor.java
+++ b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutor.java
@@ -19,11 +19,10 @@ package org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.executor.kernel.ExecutorEngine;
-import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
+import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler;
import java.sql.SQLException;
-import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -40,30 +39,30 @@ public final class JDBCExecutor {
/**
* Execute.
*
- * @param executionGroups execution groups
+ * @param executionGroupContext execution group context
* @param callback JDBC execute callback
* @param <T> class type of return value
* @return execute result
* @throws SQLException SQL exception
*/
- public <T> List<T> execute(final Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups, final JDBCExecutorCallback<T> callback) throws SQLException {
- return execute(executionGroups, null, callback);
+ public <T> List<T> execute(final ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext, final JDBCExecutorCallback<T> callback) throws SQLException {
+ return execute(executionGroupContext, null, callback);
}
/**
* Execute.
*
- * @param executionGroups execution groups
+ * @param executionGroupContext execution group context
* @param firstCallback first JDBC execute callback
* @param callback JDBC execute callback
* @param <T> class type of return value
* @return execute result
* @throws SQLException SQL exception
*/
- public <T> List<T> execute(final Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups,
+ public <T> List<T> execute(final ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext,
final JDBCExecutorCallback<T> firstCallback, final JDBCExecutorCallback<T> callback) throws SQLException {
try {
- return executorEngine.execute(executionGroups, firstCallback, callback, serial);
+ return executorEngine.execute(executionGroupContext, firstCallback, callback, serial);
} catch (final SQLException ex) {
SQLExecutorExceptionHandler.handleException(ex);
return Collections.emptyList();
diff --git a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/RawExecutor.java b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/RawExecutor.java
index 1cb0cb6..10106c1 100644
--- a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/RawExecutor.java
+++ b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/RawExecutor.java
@@ -19,7 +19,7 @@ package org.apache.shardingsphere.infra.executor.sql.execute.engine.raw;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.executor.kernel.ExecutorEngine;
-import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
+import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.callback.RawSQLExecutorCallback;
import org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
@@ -43,14 +43,14 @@ public final class RawExecutor {
/**
* Execute.
*
- * @param executionGroups execution groups
+ * @param executionGroupContext execution group context
* @param callback raw SQL executor callback
* @return execute results
* @throws SQLException SQL exception
*/
- public Collection<ExecuteResult> execute(final Collection<ExecutionGroup<RawSQLExecutionUnit>> executionGroups, final RawSQLExecutorCallback callback) throws SQLException {
+ public Collection<ExecuteResult> execute(final ExecutionGroupContext<RawSQLExecutionUnit> executionGroupContext, final RawSQLExecutorCallback callback) throws SQLException {
// TODO Load query header for first query
- List<ExecuteResult> results = execute(executionGroups, null, callback);
+ List<ExecuteResult> results = execute(executionGroupContext, null, callback);
if (null == results || results.isEmpty() || null == results.get(0)) {
return Collections.singleton(new UpdateResult(0, 0L));
}
@@ -58,10 +58,10 @@ public final class RawExecutor {
}
@SuppressWarnings("unchecked")
- private <T> List<T> execute(final Collection<ExecutionGroup<RawSQLExecutionUnit>> executionGroups,
+ private <T> List<T> execute(final ExecutionGroupContext<RawSQLExecutionUnit> executionGroupContext,
final RawSQLExecutorCallback firstCallback, final RawSQLExecutorCallback callback) throws SQLException {
try {
- return executorEngine.execute((Collection) executionGroups, firstCallback, callback, serial);
+ return (List<T>) executorEngine.execute(executionGroupContext, firstCallback, callback, serial);
} catch (final SQLException ex) {
SQLExecutorExceptionHandler.handleException(ex);
return Collections.emptyList();
diff --git a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/AbstractExecutionPrepareEngine.java b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/AbstractExecutionPrepareEngine.java
index df75be9..d06c11e 100644
--- a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/AbstractExecutionPrepareEngine.java
+++ b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/AbstractExecutionPrepareEngine.java
@@ -19,9 +19,10 @@ package org.apache.shardingsphere.infra.executor.sql.prepare;
import com.google.common.collect.Lists;
import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
-import org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
+import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
import org.apache.shardingsphere.infra.executor.sql.context.SQLUnit;
+import org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
import org.apache.shardingsphere.infra.route.context.RouteContext;
import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
@@ -57,7 +58,7 @@ public abstract class AbstractExecutionPrepareEngine<T> implements ExecutionPrep
}
@Override
- public final Collection<ExecutionGroup<T>> prepare(final RouteContext routeContext, final Collection<ExecutionUnit> executionUnits) throws SQLException {
+ public final ExecutionGroupContext<T> prepare(final RouteContext routeContext, final Collection<ExecutionUnit> executionUnits) throws SQLException {
Collection<ExecutionGroup<T>> result = new LinkedList<>();
for (Entry<String, List<SQLUnit>> entry : aggregateSQLUnitGroups(executionUnits).entrySet()) {
String dataSourceName = entry.getKey();
@@ -88,11 +89,11 @@ public abstract class AbstractExecutionPrepareEngine<T> implements ExecutionPrep
}
@SuppressWarnings({"unchecked", "rawtypes"})
- private Collection<ExecutionGroup<T>> decorate(final RouteContext routeContext, final Collection<ExecutionGroup<T>> executionGroups) {
+ private ExecutionGroupContext decorate(final RouteContext routeContext, final Collection<ExecutionGroup<T>> executionGroups) {
Collection<ExecutionGroup<T>> result = executionGroups;
for (Entry<ShardingSphereRule, ExecutionPrepareDecorator> each : decorators.entrySet()) {
result = each.getValue().decorate(routeContext, each.getKey(), result);
}
- return result;
+ return new ExecutionGroupContext(result);
}
}
diff --git a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/ExecutionPrepareEngine.java b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/ExecutionPrepareEngine.java
index b7c1830..45c5678 100644
--- a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/ExecutionPrepareEngine.java
+++ b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/ExecutionPrepareEngine.java
@@ -17,7 +17,7 @@
package org.apache.shardingsphere.infra.executor.sql.prepare;
-import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
+import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
import org.apache.shardingsphere.infra.route.context.RouteContext;
@@ -36,8 +36,8 @@ public interface ExecutionPrepareEngine<T> {
*
* @param routeContext route context
* @param executionUnits execution units
- * @return execution groups
+ * @return execution group context
* @throws SQLException SQL exception
*/
- Collection<ExecutionGroup<T>> prepare(RouteContext routeContext, Collection<ExecutionUnit> executionUnits) throws SQLException;
+ ExecutionGroupContext<T> prepare(RouteContext routeContext, Collection<ExecutionUnit> executionUnits) throws SQLException;
}
diff --git a/shardingsphere-infra/shardingsphere-infra-executor/src/test/java/org/apache/shardingsphere/infra/executor/kernel/ExecutorEngineTest.java b/shardingsphere-infra/shardingsphere-infra-executor/src/test/java/org/apache/shardingsphere/infra/executor/kernel/ExecutorEngineTest.java
index 53c9dba..9870130 100644
--- a/shardingsphere-infra/shardingsphere-infra-executor/src/test/java/org/apache/shardingsphere/infra/executor/kernel/ExecutorEngineTest.java
+++ b/shardingsphere-infra/shardingsphere-infra-executor/src/test/java/org/apache/shardingsphere/infra/executor/kernel/ExecutorEngineTest.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.infra.executor.kernel;
import org.apache.shardingsphere.infra.executor.kernel.fixture.ExecutorCallbackFixture;
import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
+import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -39,7 +40,7 @@ public final class ExecutorEngineTest {
private final CountDownLatch latch = new CountDownLatch(4);
- private Collection<ExecutionGroup<Object>> executionGroups;
+ private ExecutionGroupContext<Object> executionGroupContext;
private ExecutorCallbackFixture firstCallback;
@@ -47,7 +48,7 @@ public final class ExecutorEngineTest {
@Before
public void setUp() {
- executionGroups = createMockedExecutionGroups(2, 2);
+ executionGroupContext = createMockedExecutionGroups(2, 2);
firstCallback = new ExecutorCallbackFixture(latch);
callback = new ExecutorCallbackFixture(latch);
}
@@ -57,12 +58,12 @@ public final class ExecutorEngineTest {
executorEngine.close();
}
- private Collection<ExecutionGroup<Object>> createMockedExecutionGroups(final int groupSize, final int unitSize) {
+ private ExecutionGroupContext<Object> createMockedExecutionGroups(final int groupSize, final int unitSize) {
Collection<ExecutionGroup<Object>> result = new LinkedList<>();
for (int i = 0; i < groupSize; i++) {
result.add(new ExecutionGroup<>(createMockedInputs(unitSize)));
}
- return result;
+ return new ExecutionGroupContext(result);
}
private List<Object> createMockedInputs(final int size) {
@@ -75,21 +76,21 @@ public final class ExecutorEngineTest {
@Test
public void assertParallelExecuteWithoutFirstCallback() throws SQLException, InterruptedException {
- List<String> actual = executorEngine.execute(executionGroups, callback);
+ List<String> actual = executorEngine.execute(executionGroupContext, callback);
latch.await();
assertThat(actual.size(), is(4));
}
@Test
public void assertParallelExecuteWithFirstCallback() throws SQLException, InterruptedException {
- List<String> actual = executorEngine.execute(executionGroups, firstCallback, callback, false);
+ List<String> actual = executorEngine.execute(executionGroupContext, firstCallback, callback, false);
latch.await();
assertThat(actual.size(), is(4));
}
@Test
public void assertSerialExecute() throws SQLException, InterruptedException {
- List<String> actual = executorEngine.execute(executionGroups, firstCallback, callback, true);
+ List<String> actual = executorEngine.execute(executionGroupContext, firstCallback, callback, true);
latch.await();
assertThat(actual.size(), is(4));
}
@@ -97,7 +98,7 @@ public final class ExecutorEngineTest {
@Test
public void assertExecutionGroupIsEmpty() throws SQLException {
CountDownLatch latch = new CountDownLatch(1);
- List<String> actual = executorEngine.execute(new LinkedList<>(), new ExecutorCallbackFixture(latch));
+ List<String> actual = executorEngine.execute(new ExecutionGroupContext<>(new LinkedList<>()), new ExecutorCallbackFixture(latch));
latch.countDown();
assertThat(actual.size(), is(0));
}
diff --git a/shardingsphere-infra/shardingsphere-infra-executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/jdbc/JDBCExecutorTest.java b/shardingsphere-infra/shardingsphere-infra-executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/jdbc/JDBCExecutorTest.java
index c240440..3dc4d69 100644
--- a/shardingsphere-infra/shardingsphere-infra-executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/jdbc/JDBCExecutorTest.java
+++ b/shardingsphere-infra/shardingsphere-infra-executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/jdbc/JDBCExecutorTest.java
@@ -18,7 +18,10 @@
package org.apache.shardingsphere.infra.executor.sql.execute.engine.jdbc;
import org.apache.shardingsphere.infra.executor.kernel.ExecutorEngine;
+import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
+import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler;
+import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
import org.junit.Test;
@@ -39,11 +42,13 @@ public final class JDBCExecutorTest {
@Test
public void assertExecute() throws SQLException {
ExecutorEngine executorEngine = mock(ExecutorEngine.class);
- when(executorEngine.execute(anyCollection(), any(), any(), anyBoolean())).thenReturn(Collections.singletonList("test"));
+ ExecutionGroup<JDBCExecutionUnit> group = new ExecutionGroup<>(Collections.singletonList(mock(JDBCExecutionUnit.class)));
+ ExecutionGroupContext context = new ExecutionGroupContext(Collections.singletonList(group));
+ when(executorEngine.execute(any(), any(), any(), anyBoolean())).thenReturn(Collections.singletonList("test"));
JDBCExecutor jdbcExecutor = new JDBCExecutor(executorEngine, false);
- List<?> actual1 = jdbcExecutor.execute(Collections.emptyList(), null);
+ List<?> actual1 = jdbcExecutor.execute(context, null);
assertThat(actual1, is(Collections.singletonList("test")));
- List<?> actual2 = jdbcExecutor.execute(Collections.emptyList(), null, null);
+ List<?> actual2 = jdbcExecutor.execute(context, null, null);
assertThat(actual2, is(Collections.singletonList("test")));
}
@@ -51,9 +56,9 @@ public final class JDBCExecutorTest {
public void assertExecuteSQLException() {
try {
ExecutorEngine executorEngine = mock(ExecutorEngine.class);
- when(executorEngine.execute(anyCollection(), any(), any(), anyBoolean())).thenThrow(new SQLException("TestSQLException"));
+ when(executorEngine.execute(new ExecutionGroupContext<>(anyCollection()), any(), any(), anyBoolean())).thenThrow(new SQLException("TestSQLException"));
JDBCExecutor jdbcExecutor = new JDBCExecutor(executorEngine, false);
- jdbcExecutor.execute(Collections.emptyList(), null);
+ jdbcExecutor.execute(new ExecutionGroupContext<>(Collections.emptyList()), null);
} catch (final SQLException ex) {
assertThat(ex.getMessage(), is("TestSQLException"));
}
@@ -62,10 +67,10 @@ public final class JDBCExecutorTest {
@Test
public void assertExecuteNotThrownSQLException() throws SQLException {
ExecutorEngine executorEngine = mock(ExecutorEngine.class);
- when(executorEngine.execute(anyCollection(), any(), any(), anyBoolean())).thenThrow(new SQLException("TestSQLException"));
+ when(executorEngine.execute(new ExecutionGroupContext<>(anyCollection()), any(), any(), anyBoolean())).thenThrow(new SQLException("TestSQLException"));
JDBCExecutor jdbcExecutor = new JDBCExecutor(executorEngine, false);
SQLExecutorExceptionHandler.setExceptionThrown(false);
- List<?> actual = jdbcExecutor.execute(Collections.emptyList(), null);
+ List<?> actual = jdbcExecutor.execute(new ExecutionGroupContext<>(Collections.emptyList()), null);
assertThat(actual, is(Collections.emptyList()));
}
}
diff --git a/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/schema/row/CalciteRowExecutor.java b/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/schema/row/CalciteRowExecutor.java
index b283a13..738291d 100644
--- a/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/schema/row/CalciteRowExecutor.java
+++ b/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/schema/row/CalciteRowExecutor.java
@@ -20,7 +20,7 @@ package org.apache.shardingsphere.infra.optimize.schema.row;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.exception.ShardingSphereException;
-import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
+import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
@@ -66,14 +66,14 @@ public final class CalciteRowExecutor {
*/
public Collection<QueryResult> execute(final ExecutionContext context) {
try {
- Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups = createExecutionGroups(context);
- return jdbcExecutor.execute(executionGroups, callback).stream().map(each -> (QueryResult) each).collect(Collectors.toList());
+ ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = createExecutionGroupContext(context);
+ return jdbcExecutor.execute(executionGroupContext, callback).stream().map(each -> (QueryResult) each).collect(Collectors.toList());
} catch (final SQLException ex) {
throw new ShardingSphereException(ex);
}
}
- private Collection<ExecutionGroup<JDBCExecutionUnit>> createExecutionGroups(final ExecutionContext executionContext) throws SQLException {
+ private ExecutionGroupContext<JDBCExecutionUnit> createExecutionGroupContext(final ExecutionContext executionContext) throws SQLException {
// TODO Set parameters for StatementOption
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine = new DriverExecutionPrepareEngine<>(
JDBCDriverType.STATEMENT, maxConnectionsSizePerQuery, jdbcManager, new StatementOption(true), rules);
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/DriverJDBCExecutor.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/DriverJDBCExecutor.java
index 562e8a9..089db90 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/DriverJDBCExecutor.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/DriverJDBCExecutor.java
@@ -26,7 +26,7 @@ import org.apache.shardingsphere.infra.context.metadata.MetaDataContexts;
import org.apache.shardingsphere.infra.database.DefaultSchema;
import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
import org.apache.shardingsphere.infra.exception.ShardingSphereException;
-import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
+import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
@@ -68,28 +68,28 @@ public final class DriverJDBCExecutor {
/**
* Execute query.
*
- * @param executionGroups execution groups
+ * @param executionGroupContext execution group context
* @param callback execute query callback
* @return query results
* @throws SQLException SQL exception
*/
- public List<QueryResult> executeQuery(final Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups, final ExecuteQueryCallback callback) throws SQLException {
- return jdbcExecutor.execute(executionGroups, callback);
+ public List<QueryResult> executeQuery(final ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext, final ExecuteQueryCallback callback) throws SQLException {
+ return jdbcExecutor.execute(executionGroupContext, callback);
}
/**
* Execute update.
*
- * @param executionGroups execution groups
+ * @param executionGroupContext execution group context
* @param sqlStatementContext SQL statement context
* @param routeUnits route units
* @param callback JDBC executor callback
* @return effected records count
* @throws SQLException SQL exception
*/
- public int executeUpdate(final Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups,
+ public int executeUpdate(final ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext,
final SQLStatementContext<?> sqlStatementContext, final Collection<RouteUnit> routeUnits, final JDBCExecutorCallback<Integer> callback) throws SQLException {
- List<Integer> results = doExecute(executionGroups, sqlStatementContext.getSqlStatement(), routeUnits, callback);
+ List<Integer> results = doExecute(executionGroupContext, sqlStatementContext.getSqlStatement(), routeUnits, callback);
return isNeedAccumulate(metaDataContexts.getDefaultMetaData().getRuleMetaData().getRules(), sqlStatementContext) ? accumulate(results) : results.get(0);
}
@@ -104,26 +104,26 @@ public final class DriverJDBCExecutor {
/**
* Execute SQL.
*
- * @param executionGroups execution groups
+ * @param executionGroupContext execution group context
* @param sqlStatement SQL statement
* @param routeUnits route units
* @param callback JDBC executor callback
* @return return true if is DQL, false if is DML
* @throws SQLException SQL exception
*/
- public boolean execute(final Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups, final SQLStatement sqlStatement,
+ public boolean execute(final ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext, final SQLStatement sqlStatement,
final Collection<RouteUnit> routeUnits, final JDBCExecutorCallback<Boolean> callback) throws SQLException {
- List<Boolean> results = doExecute(executionGroups, sqlStatement, routeUnits, callback);
+ List<Boolean> results = doExecute(executionGroupContext, sqlStatement, routeUnits, callback);
return null != results && !results.isEmpty() && null != results.get(0) && results.get(0);
}
- private <T> List<T> doExecute(final Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups, final SQLStatement sqlStatement,
+ private <T> List<T> doExecute(final ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext, final SQLStatement sqlStatement,
final Collection<RouteUnit> routeUnits, final JDBCExecutorCallback<T> callback) throws SQLException {
List<T> results;
boolean locked = false;
try {
locked = tryGlobalLock(sqlStatement, metaDataContexts.getProps().<Long>getValue(ConfigurationPropertyKey.LOCK_WAIT_TIMEOUT_MILLISECONDS));
- results = jdbcExecutor.execute(executionGroups, callback);
+ results = jdbcExecutor.execute(executionGroupContext, callback);
refreshSchema(metaDataContexts.getDefaultMetaData(), sqlStatement, routeUnits);
} finally {
if (locked) {
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/batch/BatchPreparedStatementExecutor.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/batch/BatchPreparedStatementExecutor.java
index 12d89b6..68c6d6b 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/batch/BatchPreparedStatementExecutor.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/batch/BatchPreparedStatementExecutor.java
@@ -22,6 +22,7 @@ import lombok.Getter;
import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
import org.apache.shardingsphere.infra.context.metadata.MetaDataContexts;
import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
+import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler;
@@ -52,7 +53,7 @@ public final class BatchPreparedStatementExecutor {
private final JDBCExecutor jdbcExecutor;
- private final Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups;
+ private ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext;
@Getter
private final Collection<BatchExecutionUnit> batchExecutionUnits;
@@ -62,17 +63,17 @@ public final class BatchPreparedStatementExecutor {
public BatchPreparedStatementExecutor(final MetaDataContexts metaDataContexts, final JDBCExecutor jdbcExecutor) {
this.metaDataContexts = metaDataContexts;
this.jdbcExecutor = jdbcExecutor;
- executionGroups = new LinkedList<>();
+ executionGroupContext = new ExecutionGroupContext<>(new LinkedList<>());
batchExecutionUnits = new LinkedList<>();
}
/**
* Initialize executor.
*
- * @param executionGroups execution groups
+ * @param executionGroupContext execution group context
*/
- public void init(final Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups) {
- this.executionGroups.addAll(executionGroups);
+ public void init(final ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext) {
+ this.executionGroupContext = executionGroupContext;
}
/**
@@ -133,7 +134,7 @@ public final class BatchPreparedStatementExecutor {
return Optional.of(new int[batchCount]);
}
};
- List<int[]> results = jdbcExecutor.execute(executionGroups, callback);
+ List<int[]> results = jdbcExecutor.execute(executionGroupContext, callback);
if (results.isEmpty()) {
return new int[0];
}
@@ -149,7 +150,7 @@ public final class BatchPreparedStatementExecutor {
private int[] accumulate(final List<int[]> results) {
int[] result = new int[batchCount];
int count = 0;
- for (ExecutionGroup<JDBCExecutionUnit> each : executionGroups) {
+ for (ExecutionGroup<JDBCExecutionUnit> each : executionGroupContext.getInputGroups()) {
for (JDBCExecutionUnit eachUnit : each.getInputs()) {
Map<Integer, Integer> jdbcAndActualAddBatchCallTimesMap = Collections.emptyMap();
for (BatchExecutionUnit eachExecutionUnit : batchExecutionUnits) {
@@ -180,7 +181,7 @@ public final class BatchPreparedStatementExecutor {
*/
public List<Statement> getStatements() {
List<Statement> result = new LinkedList<>();
- for (ExecutionGroup<JDBCExecutionUnit> each : executionGroups) {
+ for (ExecutionGroup<JDBCExecutionUnit> each : executionGroupContext.getInputGroups()) {
result.addAll(each.getInputs().stream().map(JDBCExecutionUnit::getStorageResource).collect(Collectors.toList()));
}
return result;
@@ -193,7 +194,7 @@ public final class BatchPreparedStatementExecutor {
* @return parameter sets
*/
public List<List<Object>> getParameterSet(final Statement statement) {
- return executionGroups.stream().map(each -> findJDBCExecutionUnit(statement, each)).filter(Optional::isPresent).findFirst().map(Optional::get)
+ return executionGroupContext.getInputGroups().stream().map(each -> findJDBCExecutionUnit(statement, each)).filter(Optional::isPresent).findFirst().map(Optional::get)
.map(this::getParameterSets).orElse(Collections.emptyList());
}
@@ -215,7 +216,7 @@ public final class BatchPreparedStatementExecutor {
public void clear() throws SQLException {
closeStatements();
getStatements().clear();
- executionGroups.clear();
+ executionGroupContext.getInputGroups().clear();
batchCount = 0;
batchExecutionUnits.clear();
}
@@ -226,5 +227,3 @@ public final class BatchPreparedStatementExecutor {
}
}
}
-
-
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
index 49cbf82..b4fcefa 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
@@ -45,6 +45,7 @@ import org.apache.shardingsphere.infra.database.DefaultSchema;
import org.apache.shardingsphere.infra.database.type.DatabaseTypeRegistry;
import org.apache.shardingsphere.infra.exception.ShardingSphereException;
import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
+import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
import org.apache.shardingsphere.infra.executor.sql.context.SQLUnit;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
@@ -195,14 +196,14 @@ public final class ShardingSpherePreparedStatement extends AbstractPreparedState
private List<QueryResult> executeQuery0() throws SQLException {
if (metaDataContexts.getDefaultMetaData().getRuleMetaData().getRules().stream().anyMatch(each -> each instanceof RawExecutionRule)) {
- return rawExecutor.execute(createRawExecutionGroups(), new RawSQLExecutorCallback()).stream().map(each -> (QueryResult) each).collect(Collectors.toList());
+ return rawExecutor.execute(createRawExecutionGroupContext(), new RawSQLExecutorCallback()).stream().map(each -> (QueryResult) each).collect(Collectors.toList());
}
if (executionContext.getRouteContext().isToCalcite()) {
return executeQueryByCalcite();
}
- Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups = createExecutionGroups();
- cacheStatements(executionGroups);
- return driverJDBCExecutor.executeQuery(executionGroups,
+ ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = createExecutionGroupContext();
+ cacheStatements(executionGroupContext.getInputGroups());
+ return driverJDBCExecutor.executeQuery(executionGroupContext,
new PreparedStatementExecuteQueryCallback(metaDataContexts.getDefaultMetaData().getResource().getDatabaseType(), sqlStatement, SQLExecutorExceptionHandler.isExceptionThrown()));
}
@@ -231,13 +232,13 @@ public final class ShardingSpherePreparedStatement extends AbstractPreparedState
clearPrevious();
executionContext = createExecutionContext();
if (metaDataContexts.getDefaultMetaData().getRuleMetaData().getRules().stream().anyMatch(each -> each instanceof RawExecutionRule)) {
- Collection<ExecuteResult> executeResults = rawExecutor.execute(createRawExecutionGroups(), new RawSQLExecutorCallback());
+ Collection<ExecuteResult> executeResults = rawExecutor.execute(createRawExecutionGroupContext(), new RawSQLExecutorCallback());
accumulate(executeResults);
}
- Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups = createExecutionGroups();
- cacheStatements(executionGroups);
- return driverJDBCExecutor.executeUpdate(
- executionGroups, executionContext.getSqlStatementContext(), executionContext.getRouteContext().getRouteUnits(), createExecuteUpdateCallback());
+ ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = createExecutionGroupContext();
+ cacheStatements(executionGroupContext.getInputGroups());
+ return driverJDBCExecutor.executeUpdate(executionGroupContext,
+ executionContext.getSqlStatementContext(), executionContext.getRouteContext().getRouteUnits(), createExecuteUpdateCallback());
} finally {
clearBatch();
}
@@ -274,25 +275,25 @@ public final class ShardingSpherePreparedStatement extends AbstractPreparedState
executionContext = createExecutionContext();
if (metaDataContexts.getDefaultMetaData().getRuleMetaData().getRules().stream().anyMatch(each -> each instanceof RawExecutionRule)) {
// TODO process getStatement
- Collection<ExecuteResult> executeResults = rawExecutor.execute(createRawExecutionGroups(), new RawSQLExecutorCallback());
+ Collection<ExecuteResult> executeResults = rawExecutor.execute(createRawExecutionGroupContext(), new RawSQLExecutorCallback());
return executeResults.iterator().next() instanceof QueryResult;
}
- Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups = createExecutionGroups();
- cacheStatements(executionGroups);
- return driverJDBCExecutor.execute(
- executionGroups, executionContext.getSqlStatementContext().getSqlStatement(), executionContext.getRouteContext().getRouteUnits(), createExecuteCallback());
+ ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = createExecutionGroupContext();
+ cacheStatements(executionGroupContext.getInputGroups());
+ return driverJDBCExecutor.execute(executionGroupContext,
+ executionContext.getSqlStatementContext().getSqlStatement(), executionContext.getRouteContext().getRouteUnits(), createExecuteCallback());
} finally {
clearBatch();
}
}
- private Collection<ExecutionGroup<RawSQLExecutionUnit>> createRawExecutionGroups() throws SQLException {
+ private ExecutionGroupContext<RawSQLExecutionUnit> createRawExecutionGroupContext() throws SQLException {
int maxConnectionsSizePerQuery = metaDataContexts.getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
return new RawExecutionPrepareEngine(maxConnectionsSizePerQuery, metaDataContexts.getDefaultMetaData().getRuleMetaData().getRules())
.prepare(executionContext.getRouteContext(), executionContext.getExecutionUnits());
}
- private Collection<ExecutionGroup<JDBCExecutionUnit>> createExecutionGroups() throws SQLException {
+ private ExecutionGroupContext<JDBCExecutionUnit> createExecutionGroupContext() throws SQLException {
int maxConnectionsSizePerQuery = metaDataContexts.getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine = new DriverExecutionPrepareEngine<>(
JDBCDriverType.PREPARED_STATEMENT, maxConnectionsSizePerQuery, connection, statementOption, metaDataContexts.getDefaultMetaData().getRuleMetaData().getRules());
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
index d1344ef..4a81496 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
@@ -43,6 +43,7 @@ import org.apache.shardingsphere.infra.database.DefaultSchema;
import org.apache.shardingsphere.infra.database.type.DatabaseTypeRegistry;
import org.apache.shardingsphere.infra.exception.ShardingSphereException;
import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
+import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
import org.apache.shardingsphere.infra.executor.sql.context.SQLUnit;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
@@ -164,16 +165,16 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
private List<QueryResult> executeQuery0() throws SQLException {
if (metaDataContexts.getDefaultMetaData().getRuleMetaData().getRules().stream().anyMatch(each -> each instanceof RawExecutionRule)) {
- return rawExecutor.execute(createRawExecutionGroups(), new RawSQLExecutorCallback()).stream().map(each -> (QueryResult) each).collect(Collectors.toList());
+ return rawExecutor.execute(createRawExecutionContext(), new RawSQLExecutorCallback()).stream().map(each -> (QueryResult) each).collect(Collectors.toList());
}
if (executionContext.getRouteContext().isToCalcite()) {
return executeQueryByCalcite();
}
- Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups = createExecutionGroups();
- cacheStatements(executionGroups);
+ ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = createExecutionContext();
+ cacheStatements(executionGroupContext.getInputGroups());
StatementExecuteQueryCallback callback = new StatementExecuteQueryCallback(metaDataContexts.getDefaultMetaData().getResource().getDatabaseType(),
executionContext.getSqlStatementContext().getSqlStatement(), SQLExecutorExceptionHandler.isExceptionThrown());
- return driverJDBCExecutor.executeQuery(executionGroups, callback);
+ return driverJDBCExecutor.executeQuery(executionGroupContext, callback);
}
private List<QueryResult> executeQueryByCalcite() throws SQLException {
@@ -200,11 +201,11 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
try {
executionContext = createExecutionContext(sql);
if (metaDataContexts.getDefaultMetaData().getRuleMetaData().getRules().stream().anyMatch(each -> each instanceof RawExecutionRule)) {
- return accumulate(rawExecutor.execute(createRawExecutionGroups(), new RawSQLExecutorCallback()));
+ return accumulate(rawExecutor.execute(createRawExecutionContext(), new RawSQLExecutorCallback()));
}
- Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups = createExecutionGroups();
- cacheStatements(executionGroups);
- return executeUpdate(executionGroups,
+ ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = createExecutionContext();
+ cacheStatements(executionGroupContext.getInputGroups());
+ return executeUpdate(executionGroupContext,
(actualSQL, statement) -> statement.executeUpdate(actualSQL), executionContext.getSqlStatementContext(), executionContext.getRouteContext().getRouteUnits());
} finally {
currentResultSet = null;
@@ -219,11 +220,11 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
try {
executionContext = createExecutionContext(sql);
if (metaDataContexts.getDefaultMetaData().getRuleMetaData().getRules().stream().anyMatch(each -> each instanceof RawExecutionRule)) {
- return accumulate(rawExecutor.execute(createRawExecutionGroups(), new RawSQLExecutorCallback()));
+ return accumulate(rawExecutor.execute(createRawExecutionContext(), new RawSQLExecutorCallback()));
}
- Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups = createExecutionGroups();
- cacheStatements(executionGroups);
- return executeUpdate(executionGroups,
+ ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = createExecutionContext();
+ cacheStatements(executionGroupContext.getInputGroups());
+ return executeUpdate(executionGroupContext,
(actualSQL, statement) -> statement.executeUpdate(actualSQL, autoGeneratedKeys), executionContext.getSqlStatementContext(), executionContext.getRouteContext().getRouteUnits());
} finally {
currentResultSet = null;
@@ -236,10 +237,10 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
try {
executionContext = createExecutionContext(sql);
if (metaDataContexts.getDefaultMetaData().getRuleMetaData().getRules().stream().anyMatch(each -> each instanceof RawExecutionRule)) {
- return accumulate(rawExecutor.execute(createRawExecutionGroups(), new RawSQLExecutorCallback()));
+ return accumulate(rawExecutor.execute(createRawExecutionContext(), new RawSQLExecutorCallback()));
}
- Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups = createExecutionGroups();
- cacheStatements(executionGroups);
+ ExecutionGroupContext<JDBCExecutionUnit> executionGroups = createExecutionContext();
+ cacheStatements(executionGroups.getInputGroups());
return executeUpdate(executionGroups,
(actualSQL, statement) -> statement.executeUpdate(actualSQL, columnIndexes), executionContext.getSqlStatementContext(), executionContext.getRouteContext().getRouteUnits());
} finally {
@@ -253,18 +254,18 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
try {
executionContext = createExecutionContext(sql);
if (metaDataContexts.getDefaultMetaData().getRuleMetaData().getRules().stream().anyMatch(each -> each instanceof RawExecutionRule)) {
- return accumulate(rawExecutor.execute(createRawExecutionGroups(), new RawSQLExecutorCallback()));
+ return accumulate(rawExecutor.execute(createRawExecutionContext(), new RawSQLExecutorCallback()));
}
- Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups = createExecutionGroups();
- cacheStatements(executionGroups);
- return executeUpdate(executionGroups,
+ ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = createExecutionContext();
+ cacheStatements(executionGroupContext.getInputGroups());
+ return executeUpdate(executionGroupContext,
(actualSQL, statement) -> statement.executeUpdate(actualSQL, columnNames), executionContext.getSqlStatementContext(), executionContext.getRouteContext().getRouteUnits());
} finally {
currentResultSet = null;
}
}
- private int executeUpdate(final Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups, final ExecuteUpdateCallback updater,
+ private int executeUpdate(final ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext, final ExecuteUpdateCallback updater,
final SQLStatementContext<?> sqlStatementContext, final Collection<RouteUnit> routeUnits) throws SQLException {
boolean isExceptionThrown = SQLExecutorExceptionHandler.isExceptionThrown();
JDBCExecutorCallback<Integer> callback = new JDBCExecutorCallback<Integer>(
@@ -280,7 +281,7 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
return Optional.of(0);
}
};
- return driverJDBCExecutor.executeUpdate(executionGroups, sqlStatementContext, routeUnits, callback);
+ return driverJDBCExecutor.executeUpdate(executionGroupContext, sqlStatementContext, routeUnits, callback);
}
private int accumulate(final Collection<ExecuteResult> results) {
@@ -297,12 +298,12 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
executionContext = createExecutionContext(sql);
if (metaDataContexts.getDefaultMetaData().getRuleMetaData().getRules().stream().anyMatch(each -> each instanceof RawExecutionRule)) {
// TODO process getStatement
- Collection<ExecuteResult> results = rawExecutor.execute(createRawExecutionGroups(), new RawSQLExecutorCallback());
+ Collection<ExecuteResult> results = rawExecutor.execute(createRawExecutionContext(), new RawSQLExecutorCallback());
return results.iterator().next() instanceof QueryResult;
}
- Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups = createExecutionGroups();
- cacheStatements(executionGroups);
- return execute(executionGroups,
+ ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = createExecutionContext();
+ cacheStatements(executionGroupContext.getInputGroups());
+ return execute(executionGroupContext,
(actualSQL, statement) -> statement.execute(actualSQL), executionContext.getSqlStatementContext().getSqlStatement(), executionContext.getRouteContext().getRouteUnits());
} finally {
currentResultSet = null;
@@ -318,12 +319,12 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
executionContext = createExecutionContext(sql);
if (metaDataContexts.getDefaultMetaData().getRuleMetaData().getRules().stream().anyMatch(each -> each instanceof RawExecutionRule)) {
// TODO process getStatement
- Collection<ExecuteResult> results = rawExecutor.execute(createRawExecutionGroups(), new RawSQLExecutorCallback());
+ Collection<ExecuteResult> results = rawExecutor.execute(createRawExecutionContext(), new RawSQLExecutorCallback());
return results.iterator().next() instanceof QueryResult;
}
- Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups = createExecutionGroups();
- cacheStatements(executionGroups);
- return execute(executionGroups, (actualSQL, statement) -> statement.execute(actualSQL, autoGeneratedKeys),
+ ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = createExecutionContext();
+ cacheStatements(executionGroupContext.getInputGroups());
+ return execute(executionGroupContext, (actualSQL, statement) -> statement.execute(actualSQL, autoGeneratedKeys),
executionContext.getSqlStatementContext().getSqlStatement(), executionContext.getRouteContext().getRouteUnits());
} finally {
currentResultSet = null;
@@ -337,12 +338,12 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
executionContext = createExecutionContext(sql);
if (metaDataContexts.getDefaultMetaData().getRuleMetaData().getRules().stream().anyMatch(each -> each instanceof RawExecutionRule)) {
// TODO process getStatement
- Collection<ExecuteResult> results = rawExecutor.execute(createRawExecutionGroups(), new RawSQLExecutorCallback());
+ Collection<ExecuteResult> results = rawExecutor.execute(createRawExecutionContext(), new RawSQLExecutorCallback());
return results.iterator().next() instanceof QueryResult;
}
- Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups = createExecutionGroups();
- cacheStatements(executionGroups);
- return execute(executionGroups, (actualSQL, statement) -> statement.execute(actualSQL, columnIndexes),
+ ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = createExecutionContext();
+ cacheStatements(executionGroupContext.getInputGroups());
+ return execute(executionGroupContext, (actualSQL, statement) -> statement.execute(actualSQL, columnIndexes),
executionContext.getSqlStatementContext().getSqlStatement(), executionContext.getRouteContext().getRouteUnits());
} finally {
currentResultSet = null;
@@ -356,19 +357,19 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
executionContext = createExecutionContext(sql);
if (metaDataContexts.getDefaultMetaData().getRuleMetaData().getRules().stream().anyMatch(each -> each instanceof RawExecutionRule)) {
// TODO process getStatement
- Collection<ExecuteResult> results = rawExecutor.execute(createRawExecutionGroups(), new RawSQLExecutorCallback());
+ Collection<ExecuteResult> results = rawExecutor.execute(createRawExecutionContext(), new RawSQLExecutorCallback());
return results.iterator().next() instanceof QueryResult;
}
- Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups = createExecutionGroups();
- cacheStatements(executionGroups);
- return execute(executionGroups, (actualSQL, statement) -> statement.execute(actualSQL, columnNames),
+ ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = createExecutionContext();
+ cacheStatements(executionGroupContext.getInputGroups());
+ return execute(executionGroupContext, (actualSQL, statement) -> statement.execute(actualSQL, columnNames),
executionContext.getSqlStatementContext().getSqlStatement(), executionContext.getRouteContext().getRouteUnits());
} finally {
currentResultSet = null;
}
}
- private boolean execute(final Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups, final ExecuteCallback executor,
+ private boolean execute(final ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext, final ExecuteCallback executor,
final SQLStatement sqlStatement, final Collection<RouteUnit> routeUnits) throws SQLException {
boolean isExceptionThrown = SQLExecutorExceptionHandler.isExceptionThrown();
JDBCExecutorCallback<Boolean> jdbcExecutorCallback = new JDBCExecutorCallback<Boolean>(
@@ -384,7 +385,7 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
return Optional.of(sqlStatement instanceof SelectStatement);
}
};
- return driverJDBCExecutor.execute(executionGroups, sqlStatement, routeUnits, jdbcExecutorCallback);
+ return driverJDBCExecutor.execute(executionGroupContext, sqlStatement, routeUnits, jdbcExecutorCallback);
}
private ExecutionContext createExecutionContext(final String sql) throws SQLException {
@@ -410,14 +411,14 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
return new LogicSQL(sqlStatementContext, sql, Collections.emptyList());
}
- private Collection<ExecutionGroup<JDBCExecutionUnit>> createExecutionGroups() throws SQLException {
+ private ExecutionGroupContext<JDBCExecutionUnit> createExecutionContext() throws SQLException {
int maxConnectionsSizePerQuery = metaDataContexts.getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine = new DriverExecutionPrepareEngine<>(
JDBCDriverType.STATEMENT, maxConnectionsSizePerQuery, connection, statementOption, metaDataContexts.getDefaultMetaData().getRuleMetaData().getRules());
return prepareEngine.prepare(executionContext.getRouteContext(), executionContext.getExecutionUnits());
}
- private Collection<ExecutionGroup<RawSQLExecutionUnit>> createRawExecutionGroups() throws SQLException {
+ private ExecutionGroupContext<RawSQLExecutionUnit> createRawExecutionContext() throws SQLException {
int maxConnectionsSizePerQuery = metaDataContexts.getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
return new RawExecutionPrepareEngine(maxConnectionsSizePerQuery, metaDataContexts.getDefaultMetaData().getRuleMetaData().getRules())
.prepare(executionContext.getRouteContext(), executionContext.getExecutionUnits());
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/executor/batch/BatchPreparedStatementExecutorTest.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/executor/batch/BatchPreparedStatementExecutorTest.java
index 943fc7a..fe24a9d 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/executor/batch/BatchPreparedStatementExecutorTest.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/executor/batch/BatchPreparedStatementExecutorTest.java
@@ -18,6 +18,7 @@
package org.apache.shardingsphere.driver.executor.batch;
import lombok.SneakyThrows;
+import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
import org.apache.shardingsphere.driver.executor.AbstractBaseExecutorTest;
import org.apache.shardingsphere.infra.binder.segment.table.TablesContext;
@@ -144,9 +145,9 @@ public final class BatchPreparedStatementExecutorTest extends AbstractBaseExecut
@SneakyThrows(ReflectiveOperationException.class)
private void setFields(final Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups, final Collection<BatchExecutionUnit> batchExecutionUnits) {
- Field field = BatchPreparedStatementExecutor.class.getDeclaredField("executionGroups");
+ Field field = BatchPreparedStatementExecutor.class.getDeclaredField("executionGroupContext");
field.setAccessible(true);
- field.set(actual, executionGroups);
+ field.set(actual, new ExecutionGroupContext<>(executionGroups));
field = BatchPreparedStatementExecutor.class.getDeclaredField("batchExecutionUnits");
field.setAccessible(true);
field.set(actual, batchExecutionUnits);
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
index 1e53dcf..b556504 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
@@ -21,7 +21,7 @@ import org.apache.shardingsphere.infra.config.properties.ConfigurationPropertyKe
import org.apache.shardingsphere.infra.context.metadata.MetaDataContexts;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.executor.kernel.ExecutorEngine;
-import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
+import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
import org.apache.shardingsphere.infra.executor.sql.context.SQLUnit;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler;
@@ -125,14 +125,14 @@ public final class ProxySQLExecutor {
private Collection<ExecuteResult> rawExecute(final ExecutionContext executionContext, final Collection<ShardingSphereRule> rules, final int maxConnectionsSizePerQuery) throws SQLException {
RawExecutionPrepareEngine prepareEngine = new RawExecutionPrepareEngine(maxConnectionsSizePerQuery, rules);
- Collection<ExecutionGroup<RawSQLExecutionUnit>> executionGroups;
+ ExecutionGroupContext<RawSQLExecutionUnit> executionGroupContext;
try {
- executionGroups = prepareEngine.prepare(executionContext.getRouteContext(), executionContext.getExecutionUnits());
+ executionGroupContext = prepareEngine.prepare(executionContext.getRouteContext(), executionContext.getExecutionUnits());
} catch (final SQLException ex) {
return getSaneExecuteResults(executionContext, ex);
}
// TODO handle query header
- return rawExecutor.execute(executionGroups, new RawSQLExecutorCallback());
+ return rawExecutor.execute(executionGroupContext, new RawSQLExecutorCallback());
}
private Collection<ExecuteResult> useCalciteToExecute(final ExecutionContext executionContext, final Collection<ShardingSphereRule> rules,
@@ -154,13 +154,13 @@ public final class ProxySQLExecutor {
final int maxConnectionsSizePerQuery, final boolean isReturnGeneratedKeys, final boolean isExceptionThrown) throws SQLException {
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine = new DriverExecutionPrepareEngine<>(
type, maxConnectionsSizePerQuery, backendConnection, new StatementOption(isReturnGeneratedKeys), rules);
- Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups;
+ ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext;
try {
- executionGroups = prepareEngine.prepare(executionContext.getRouteContext(), executionContext.getExecutionUnits());
+ executionGroupContext = prepareEngine.prepare(executionContext.getRouteContext(), executionContext.getExecutionUnits());
} catch (final SQLException ex) {
return getSaneExecuteResults(executionContext, ex);
}
- return jdbcExecutor.execute(executionContext.getSqlStatementContext().getSqlStatement(), executionGroups, isReturnGeneratedKeys, isExceptionThrown);
+ return jdbcExecutor.execute(executionContext.getSqlStatementContext().getSqlStatement(), executionGroupContext, isReturnGeneratedKeys, isExceptionThrown);
}
private Collection<ExecuteResult> getSaneExecuteResults(final ExecutionContext executionContext, final SQLException originalException) throws SQLException {
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/ProxyJDBCExecutor.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/ProxyJDBCExecutor.java
index 2b1dd82..bfdb293 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/ProxyJDBCExecutor.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/ProxyJDBCExecutor.java
@@ -20,7 +20,7 @@ package org.apache.shardingsphere.proxy.backend.communication.jdbc.executor;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
-import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
+import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
import org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
@@ -49,16 +49,16 @@ public final class ProxyJDBCExecutor {
* Execute.
*
* @param sqlStatement SQL statement
- * @param executionGroups execution groups
+ * @param executionGroupContext execution group context
* @param isReturnGeneratedKeys is return generated keys
* @param isExceptionThrown is exception thrown
* @return execute results
* @throws SQLException SQL exception
*/
- public Collection<ExecuteResult> execute(final SQLStatement sqlStatement, final Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups,
+ public Collection<ExecuteResult> execute(final SQLStatement sqlStatement, final ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext,
final boolean isReturnGeneratedKeys, final boolean isExceptionThrown) throws SQLException {
DatabaseType databaseType = ProxyContext.getInstance().getMetaDataContexts().getMetaData(backendConnection.getSchemaName()).getResource().getDatabaseType();
- return jdbcExecutor.execute(executionGroups,
+ return jdbcExecutor.execute(executionGroupContext,
ProxyJDBCExecutorCallbackFactory.newInstance(type, databaseType, sqlStatement, backendConnection, isReturnGeneratedKeys, isExceptionThrown, true),
ProxyJDBCExecutorCallbackFactory.newInstance(type, databaseType, sqlStatement, backendConnection, isReturnGeneratedKeys, isExceptionThrown, false));
}