You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by to...@apache.org on 2023/05/01 17:23:44 UTC
[shardingsphere] branch master updated: Merge ProcessReporter and ProcessEngine (#25427)
This is an automated email from the ASF dual-hosted git repository.
totalo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 3d421ac95d5 Merge ProcessReporter and ProcessEngine (#25427)
3d421ac95d5 is described below
commit 3d421ac95d5680efa8d8d1e1b5fa9ba1c7141bd6
Author: Liang Zhang <zh...@apache.org>
AuthorDate: Tue May 2 01:23:29 2023 +0800
Merge ProcessReporter and ProcessEngine (#25427)
---
.../engine/driver/jdbc/JDBCExecutorCallback.java | 2 +-
.../sql/execute/engine/raw/RawExecutor.java | 4 +-
.../raw/callback/RawSQLExecutorCallback.java | 6 +-
.../infra/executor/sql/process/ProcessEngine.java | 55 ++++++++-----
...ExecuteIDContext.java => ProcessIDContext.java} | 29 +++----
.../executor/sql/process/ProcessReporter.java | 91 ----------------------
...essReporterTest.java => ProcessEngineTest.java} | 30 ++++---
...DContextTest.java => ProcessIDContextTest.java} | 36 ++++-----
.../driver/executor/DriverJDBCExecutor.java | 12 +--
.../executor/FilterableTableScanExecutor.java | 4 +-
.../executor/TranslatableTableScanExecutor.java | 8 +-
.../connector/jdbc/executor/ProxyJDBCExecutor.java | 4 +-
.../netty/FrontendChannelInboundHandler.java | 4 +-
13 files changed, 109 insertions(+), 176 deletions(-)
diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutorCallback.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutorCallback.java
index 2113466b73e..7f3704f1d38 100644
--- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutorCallback.java
+++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutorCallback.java
@@ -86,7 +86,7 @@ public abstract class JDBCExecutorCallback<T> implements ExecutorCallback<JDBCEx
sqlExecutionHook.start(jdbcExecutionUnit.getExecutionUnit().getDataSourceName(), sqlUnit.getSql(), sqlUnit.getParameters(), dataSourceMetaData, isTrunkThread);
T result = executeSQL(sqlUnit.getSql(), jdbcExecutionUnit.getStorageResource(), jdbcExecutionUnit.getConnectionMode(), storageType);
sqlExecutionHook.finishSuccess();
- processEngine.finishExecution();
+ processEngine.completeSQLUnitExecution();
return result;
} catch (final SQLException ex) {
if (!storageType.equals(protocolType)) {
diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/RawExecutor.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/RawExecutor.java
index b481e08d80c..6cfdddcfafb 100644
--- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/RawExecutor.java
+++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/RawExecutor.java
@@ -57,12 +57,12 @@ public final class RawExecutor {
public List<ExecuteResult> execute(final ExecutionGroupContext<RawSQLExecutionUnit> executionGroupContext,
final QueryContext queryContext, final RawSQLExecutorCallback callback) throws SQLException {
try {
- processEngine.initializeExecution(executionGroupContext, queryContext);
+ processEngine.executeSQL(executionGroupContext, queryContext);
// TODO Load query header for first query
List<ExecuteResult> results = execute(executionGroupContext, (RawSQLExecutorCallback) null, callback);
return results.isEmpty() || Objects.isNull(results.get(0)) ? Collections.singletonList(new UpdateResult(0, 0L)) : results;
} finally {
- processEngine.cleanExecution();
+ processEngine.completeSQLExecution();
}
}
diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/callback/RawSQLExecutorCallback.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/callback/RawSQLExecutorCallback.java
index 01a3880d6c3..8edc78144ee 100644
--- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/callback/RawSQLExecutorCallback.java
+++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/callback/RawSQLExecutorCallback.java
@@ -21,7 +21,7 @@ import com.google.common.base.Preconditions;
import org.apache.shardingsphere.infra.executor.kernel.model.ExecutorCallback;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawSQLExecutionUnit;
import org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
-import org.apache.shardingsphere.infra.executor.sql.process.ExecuteIDContext;
+import org.apache.shardingsphere.infra.executor.sql.process.ProcessIDContext;
import org.apache.shardingsphere.infra.executor.sql.process.ProcessEngine;
import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
@@ -47,9 +47,9 @@ public final class RawSQLExecutorCallback implements ExecutorCallback<RawSQLExec
@Override
public Collection<ExecuteResult> execute(final Collection<RawSQLExecutionUnit> inputs, final boolean isTrunkThread) throws SQLException {
Collection<ExecuteResult> result = callbacks.iterator().next().execute(inputs, isTrunkThread);
- if (!ExecuteIDContext.isEmpty()) {
+ if (!ProcessIDContext.isEmpty()) {
for (int i = 0; i < inputs.size(); i++) {
- processEngine.finishExecution();
+ processEngine.completeSQLUnitExecution();
}
}
return result;
diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngine.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngine.java
index f745b07e19c..10a322445df 100644
--- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngine.java
+++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngine.java
@@ -19,72 +19,85 @@ package org.apache.shardingsphere.infra.executor.sql.process;
import org.apache.shardingsphere.infra.binder.QueryContext;
import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
+import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupReportContext;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutionUnit;
import org.apache.shardingsphere.infra.metadata.user.Grantee;
+import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
import org.apache.shardingsphere.sql.parser.sql.common.statement.ddl.DDLStatement;
import org.apache.shardingsphere.sql.parser.sql.common.statement.dml.DMLStatement;
import org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.MySQLStatement;
+import java.util.Collections;
+
/**
* Process engine.
*/
public final class ProcessEngine {
- private final ProcessReporter reporter = new ProcessReporter();
-
/**
- * Initialize connection.
+ * Connect.
*
* @param grantee grantee
* @param databaseName database name
* @return process ID
*/
- public String initializeConnection(final Grantee grantee, final String databaseName) {
- return reporter.reportConnect(grantee, databaseName);
+ public String connect(final Grantee grantee, final String databaseName) {
+ ExecutionGroupContext<? extends SQLExecutionUnit> executionGroupContext = new ExecutionGroupContext<>(Collections.emptyList(), new ExecutionGroupReportContext(databaseName, grantee));
+ ProcessContext processContext = new ProcessContext(executionGroupContext);
+ ProcessRegistry.getInstance().putProcessContext(processContext.getId(), processContext);
+ return executionGroupContext.getReportContext().getProcessID();
}
/**
- * Finish connection.
+ * Disconnect.
*
* @param processID process ID
*/
- public void finishConnection(final String processID) {
- reporter.remove(processID);
+ public void disconnect(final String processID) {
+ ProcessRegistry.getInstance().removeProcessContext(processID);
+
}
/**
- * Initialize execution.
+ * Execute SQL.
*
* @param executionGroupContext execution group context
* @param queryContext query context
*/
- public void initializeExecution(final ExecutionGroupContext<? extends SQLExecutionUnit> executionGroupContext, final QueryContext queryContext) {
+ public void executeSQL(final ExecutionGroupContext<? extends SQLExecutionUnit> executionGroupContext, final QueryContext queryContext) {
if (isMySQLDDLOrDMLStatement(queryContext.getSqlStatementContext().getSqlStatement())) {
- ExecuteIDContext.set(executionGroupContext.getReportContext().getProcessID());
- reporter.reportExecute(queryContext, executionGroupContext);
+ ProcessIDContext.set(executionGroupContext.getReportContext().getProcessID());
+ ProcessContext processContext = new ProcessContext(queryContext.getSql(), executionGroupContext);
+ ProcessRegistry.getInstance().putProcessContext(processContext.getId(), processContext);
}
}
/**
- * Finish execution.
+ * Complete SQL unit execution.
*/
- public void finishExecution() {
- if (ExecuteIDContext.isEmpty()) {
+ public void completeSQLUnitExecution() {
+ if (ProcessIDContext.isEmpty()) {
return;
}
- reporter.reportComplete(ExecuteIDContext.get());
+ ProcessRegistry.getInstance().getProcessContext(ProcessIDContext.get()).completeExecutionUnit();
}
/**
- * Clean execution.
+ * Complete SQL execution.
*/
- public void cleanExecution() {
- if (ExecuteIDContext.isEmpty()) {
+ public void completeSQLExecution() {
+ if (ProcessIDContext.isEmpty()) {
return;
}
- reporter.reset(ExecuteIDContext.get());
- ExecuteIDContext.remove();
+ ProcessContext context = ProcessRegistry.getInstance().getProcessContext(ProcessIDContext.get());
+ if (null == context) {
+ return;
+ }
+ for (ProcessReporterCleaner each : ShardingSphereServiceLoader.getServiceInstances(ProcessReporterCleaner.class)) {
+ each.reset(context);
+ }
+ ProcessIDContext.remove();
}
private boolean isMySQLDDLOrDMLStatement(final SQLStatement sqlStatement) {
diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ExecuteIDContext.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessIDContext.java
similarity index 68%
rename from infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ExecuteIDContext.java
rename to infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessIDContext.java
index e1f5bfbf18e..c444515ec9d 100644
--- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ExecuteIDContext.java
+++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessIDContext.java
@@ -21,45 +21,46 @@ import com.alibaba.ttl.TransmittableThreadLocal;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
+// TODO should remove the class, process ID should same with connection ID
/**
- * Execute ID context.
+ * Process ID context.
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public final class ExecuteIDContext {
+public final class ProcessIDContext {
- private static final TransmittableThreadLocal<String> EXECUTE_ID = new TransmittableThreadLocal<>();
+ private static final TransmittableThreadLocal<String> PROCESS_ID = new TransmittableThreadLocal<>();
/**
- * Judge whether execute ID is empty or not.
+ * Judge whether process ID is empty or not.
*
- * @return whether execute ID is empty or not
+ * @return whether process ID is empty or not
*/
public static boolean isEmpty() {
- return null == EXECUTE_ID.get();
+ return null == PROCESS_ID.get();
}
/**
- * Get execute ID.
+ * Get process ID.
*
- * @return execute ID
+ * @return process ID
*/
public static String get() {
- return EXECUTE_ID.get();
+ return PROCESS_ID.get();
}
/**
- * Set execute ID.
+ * Set process ID.
*
- * @param executeId execute ID
+ * @param executeId process ID
*/
public static void set(final String executeId) {
- EXECUTE_ID.set(executeId);
+ PROCESS_ID.set(executeId);
}
/**
- * Remove execute ID.
+ * Remove process ID.
*/
public static void remove() {
- EXECUTE_ID.remove();
+ PROCESS_ID.remove();
}
}
diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessReporter.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessReporter.java
deleted file mode 100644
index bbb5e2510f9..00000000000
--- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessReporter.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.infra.executor.sql.process;
-
-import org.apache.shardingsphere.infra.binder.QueryContext;
-import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
-import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupReportContext;
-import org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutionUnit;
-import org.apache.shardingsphere.infra.metadata.user.Grantee;
-import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
-
-import java.util.Collections;
-
-/**
- * Process report.
- */
-public final class ProcessReporter {
-
- /**
- * Report connect.
- *
- * @param grantee grantee
- * @param databaseName databaseName
- * @return process ID
- */
- public String reportConnect(final Grantee grantee, final String databaseName) {
- ExecutionGroupContext<? extends SQLExecutionUnit> executionGroupContext = new ExecutionGroupContext<>(Collections.emptyList(), new ExecutionGroupReportContext(databaseName, grantee));
- ProcessContext processContext = new ProcessContext(executionGroupContext);
- ProcessRegistry.getInstance().putProcessContext(processContext.getId(), processContext);
- return executionGroupContext.getReportContext().getProcessID();
- }
-
- /**
- * Report execute.
- *
- * @param queryContext query context
- * @param executionGroupContext execution group context
- */
- public void reportExecute(final QueryContext queryContext, final ExecutionGroupContext<? extends SQLExecutionUnit> executionGroupContext) {
- ProcessContext processContext = new ProcessContext(queryContext.getSql(), executionGroupContext);
- ProcessRegistry.getInstance().putProcessContext(processContext.getId(), processContext);
- }
-
- /**
- * Report complete execution unit.
- *
- * @param processID process ID
- */
- public void reportComplete(final String processID) {
- ProcessRegistry.getInstance().getProcessContext(processID).completeExecutionUnit();
- }
-
- /**
- * Reset report.
- *
- * @param processID process ID
- */
- public void reset(final String processID) {
- ProcessContext context = ProcessRegistry.getInstance().getProcessContext(processID);
- if (null == context) {
- return;
- }
- for (ProcessReporterCleaner each : ShardingSphereServiceLoader.getServiceInstances(ProcessReporterCleaner.class)) {
- each.reset(context);
- }
- }
-
- /**
- * Remove process context.
- *
- * @param processID process ID
- */
- public void remove(final String processID) {
- ProcessRegistry.getInstance().removeProcessContext(processID);
- }
-}
diff --git a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessReporterTest.java b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngineTest.java
similarity index 69%
rename from infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessReporterTest.java
rename to infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngineTest.java
index 6a53ce5185a..2f8fdf05683 100644
--- a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessReporterTest.java
+++ b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngineTest.java
@@ -18,9 +18,15 @@
package org.apache.shardingsphere.infra.executor.sql.process;
import org.apache.shardingsphere.infra.binder.QueryContext;
+import org.apache.shardingsphere.infra.binder.statement.dml.UpdateStatementContext;
import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupReportContext;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutionUnit;
+import org.apache.shardingsphere.sql.parser.sql.common.segment.dml.assignment.SetAssignmentSegment;
+import org.apache.shardingsphere.sql.parser.sql.common.segment.generic.table.SimpleTableSegment;
+import org.apache.shardingsphere.sql.parser.sql.common.segment.generic.table.TableNameSegment;
+import org.apache.shardingsphere.sql.parser.sql.common.value.identifier.IdentifierValue;
+import org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.dml.MySQLUpdateStatement;
import org.apache.shardingsphere.test.mock.AutoMockExtension;
import org.apache.shardingsphere.test.mock.StaticMockSettings;
import org.junit.jupiter.api.BeforeEach;
@@ -28,6 +34,7 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
+import java.util.Collections;
import java.util.UUID;
import static org.mockito.ArgumentMatchers.any;
@@ -38,7 +45,7 @@ import static org.mockito.Mockito.when;
@ExtendWith(AutoMockExtension.class)
@StaticMockSettings(ProcessRegistry.class)
-class ProcessReporterTest {
+class ProcessEngineTest {
@Mock
private ProcessRegistry processRegistry;
@@ -49,9 +56,9 @@ class ProcessReporterTest {
}
@Test
- void assertReportExecute() {
+ void assertExecuteSQL() {
ExecutionGroupContext<? extends SQLExecutionUnit> executionGroupContext = mockExecutionGroupContext();
- new ProcessReporter().reportExecute(new QueryContext(null, null, null), executionGroupContext);
+ new ProcessEngine().executeSQL(executionGroupContext, new QueryContext(new UpdateStatementContext(getSQLStatement()), null, null));
verify(processRegistry).putProcessContext(eq(executionGroupContext.getReportContext().getProcessID()), any());
}
@@ -64,16 +71,19 @@ class ProcessReporterTest {
return result;
}
- @Test
- void assertReportUnit() {
- when(processRegistry.getProcessContext("foo_id")).thenReturn(mock(ProcessContext.class));
- new ProcessReporter().reportComplete("foo_id");
- verify(processRegistry).getProcessContext("foo_id");
+ private MySQLUpdateStatement getSQLStatement() {
+ MySQLUpdateStatement result = new MySQLUpdateStatement();
+ result.setTable(new SimpleTableSegment(new TableNameSegment(0, 0, new IdentifierValue("foo_tbl"))));
+ result.setSetAssignment(new SetAssignmentSegment(0, 0, Collections.emptyList()));
+ return result;
}
@Test
- void assertReportClean() {
+ void assertCompleteSQLUnitExecution() {
+ ProcessIDContext.set("foo_id");
when(processRegistry.getProcessContext("foo_id")).thenReturn(mock(ProcessContext.class));
- new ProcessReporter().reset("foo_id");
+ new ProcessEngine().completeSQLUnitExecution();
+ verify(processRegistry).getProcessContext("foo_id");
+ ProcessIDContext.remove();
}
}
diff --git a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ExecuteIDContextTest.java b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessIDContextTest.java
similarity index 61%
rename from infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ExecuteIDContextTest.java
rename to infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessIDContextTest.java
index d6f108b747d..a0e61c6c793 100644
--- a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ExecuteIDContextTest.java
+++ b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessIDContextTest.java
@@ -26,42 +26,42 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
-class ExecuteIDContextTest {
+class ProcessIDContextTest {
@AfterEach
void tearDown() {
- ExecuteIDContext.remove();
+ ProcessIDContext.remove();
}
@Test
void assertIsEmpty() {
- assertTrue(ExecuteIDContext.isEmpty());
- ExecuteIDContext.set("123e4567e89b12d3a456426655440000");
- assertFalse(ExecuteIDContext.isEmpty());
+ assertTrue(ProcessIDContext.isEmpty());
+ ProcessIDContext.set("123e4567e89b12d3a456426655440000");
+ assertFalse(ProcessIDContext.isEmpty());
}
@Test
void assertGet() {
- assertNull(ExecuteIDContext.get());
- ExecuteIDContext.set("123e4567e89b12d3a456426655440000");
- assertThat(ExecuteIDContext.get(), is("123e4567e89b12d3a456426655440000"));
+ assertNull(ProcessIDContext.get());
+ ProcessIDContext.set("123e4567e89b12d3a456426655440000");
+ assertThat(ProcessIDContext.get(), is("123e4567e89b12d3a456426655440000"));
}
@Test
void assertSet() {
- assertNull(ExecuteIDContext.get());
- ExecuteIDContext.set("123e4567e89b12d3a456426655440000");
- assertThat(ExecuteIDContext.get(), is("123e4567e89b12d3a456426655440000"));
- ExecuteIDContext.set("123e4567e89b12d3a456426655440001");
- assertThat(ExecuteIDContext.get(), is("123e4567e89b12d3a456426655440001"));
+ assertNull(ProcessIDContext.get());
+ ProcessIDContext.set("123e4567e89b12d3a456426655440000");
+ assertThat(ProcessIDContext.get(), is("123e4567e89b12d3a456426655440000"));
+ ProcessIDContext.set("123e4567e89b12d3a456426655440001");
+ assertThat(ProcessIDContext.get(), is("123e4567e89b12d3a456426655440001"));
}
@Test
void assertRemove() {
- assertNull(ExecuteIDContext.get());
- ExecuteIDContext.set("123e4567e89b12d3a456426655440000");
- assertThat(ExecuteIDContext.get(), is("123e4567e89b12d3a456426655440000"));
- ExecuteIDContext.remove();
- assertNull(ExecuteIDContext.get());
+ assertNull(ProcessIDContext.get());
+ ProcessIDContext.set("123e4567e89b12d3a456426655440000");
+ assertThat(ProcessIDContext.get(), is("123e4567e89b12d3a456426655440000"));
+ ProcessIDContext.remove();
+ assertNull(ProcessIDContext.get());
}
}
diff --git a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/DriverJDBCExecutor.java b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/DriverJDBCExecutor.java
index 9633ac77d09..c185f9d7455 100644
--- a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/DriverJDBCExecutor.java
+++ b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/DriverJDBCExecutor.java
@@ -72,10 +72,10 @@ public final class DriverJDBCExecutor {
public List<QueryResult> executeQuery(final ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext,
final QueryContext queryContext, final ExecuteQueryCallback callback) throws SQLException {
try {
- processEngine.initializeExecution(executionGroupContext, queryContext);
+ processEngine.executeSQL(executionGroupContext, queryContext);
return jdbcExecutor.execute(executionGroupContext, callback);
} finally {
- processEngine.cleanExecution();
+ processEngine.completeSQLExecution();
}
}
@@ -92,12 +92,12 @@ public final class DriverJDBCExecutor {
public int executeUpdate(final ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext,
final QueryContext queryContext, final Collection<RouteUnit> routeUnits, final JDBCExecutorCallback<Integer> callback) throws SQLException {
try {
- processEngine.initializeExecution(executionGroupContext, queryContext);
+ processEngine.executeSQL(executionGroupContext, queryContext);
SQLStatementContext<?> sqlStatementContext = queryContext.getSqlStatementContext();
List<Integer> results = doExecute(executionGroupContext, sqlStatementContext, routeUnits, callback);
return isNeedAccumulate(metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getRules(), sqlStatementContext) ? accumulate(results) : results.get(0);
} finally {
- processEngine.cleanExecution();
+ processEngine.completeSQLExecution();
}
}
@@ -131,11 +131,11 @@ public final class DriverJDBCExecutor {
public boolean execute(final ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext, final QueryContext queryContext,
final Collection<RouteUnit> routeUnits, final JDBCExecutorCallback<Boolean> callback) throws SQLException {
try {
- processEngine.initializeExecution(executionGroupContext, queryContext);
+ processEngine.executeSQL(executionGroupContext, queryContext);
List<Boolean> results = doExecute(executionGroupContext, queryContext.getSqlStatementContext(), routeUnits, callback);
return null != results && !results.isEmpty() && null != results.get(0) && results.get(0);
} finally {
- processEngine.cleanExecution();
+ processEngine.completeSQLExecution();
}
}
diff --git a/kernel/sql-federation/executor/core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/FilterableTableScanExecutor.java b/kernel/sql-federation/executor/core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/FilterableTableScanExecutor.java
index 8b7948d8b6d..537df40f871 100644
--- a/kernel/sql-federation/executor/core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/FilterableTableScanExecutor.java
+++ b/kernel/sql-federation/executor/core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/FilterableTableScanExecutor.java
@@ -155,7 +155,7 @@ public final class FilterableTableScanExecutor implements TableScanExecutor {
ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext =
prepareEngine.prepare(context.getRouteContext(), context.getExecutionUnits(), new ExecutionGroupReportContext(database.getName()));
setParameters(executionGroupContext.getInputGroups());
- processEngine.initializeExecution(executionGroupContext, context.getQueryContext());
+ processEngine.executeSQL(executionGroupContext, context.getQueryContext());
List<QueryResult> queryResults = execute(executionGroupContext, databaseType);
// TODO need to get session context
MergeEngine mergeEngine = new MergeEngine(database, executorContext.getProps(), new ConnectionContext());
@@ -165,7 +165,7 @@ public final class FilterableTableScanExecutor implements TableScanExecutor {
} catch (final SQLException ex) {
throw new SQLWrapperException(ex);
} finally {
- processEngine.cleanExecution();
+ processEngine.completeSQLExecution();
}
}
diff --git a/kernel/sql-federation/executor/core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/TranslatableTableScanExecutor.java b/kernel/sql-federation/executor/core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/TranslatableTableScanExecutor.java
index b30fb712a7b..461e1549b2f 100644
--- a/kernel/sql-federation/executor/core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/TranslatableTableScanExecutor.java
+++ b/kernel/sql-federation/executor/core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/TranslatableTableScanExecutor.java
@@ -181,7 +181,7 @@ public final class TranslatableTableScanExecutor implements TableScanExecutor {
ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext =
prepareEngine.prepare(context.getRouteContext(), context.getExecutionUnits(), new ExecutionGroupReportContext(database.getName()));
setParameters(executionGroupContext.getInputGroups());
- processEngine.initializeExecution(executionGroupContext, context.getQueryContext());
+ processEngine.executeSQL(executionGroupContext, context.getQueryContext());
List<QueryResult> queryResults = execute(executionGroupContext, databaseType);
MergeEngine mergeEngine = new MergeEngine(database, executorContext.getProps(), new ConnectionContext());
MergedResult mergedResult = mergeEngine.merge(queryResults, queryContext.getSqlStatementContext());
@@ -190,7 +190,7 @@ public final class TranslatableTableScanExecutor implements TableScanExecutor {
} catch (final SQLException ex) {
throw new SQLWrapperException(ex);
} finally {
- processEngine.cleanExecution();
+ processEngine.completeSQLExecution();
}
}
@@ -240,7 +240,7 @@ public final class TranslatableTableScanExecutor implements TableScanExecutor {
ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext =
prepareEngine.prepare(context.getRouteContext(), context.getExecutionUnits(), new ExecutionGroupReportContext(database.getName()));
setParameters(executionGroupContext.getInputGroups());
- processEngine.initializeExecution(executionGroupContext, context.getQueryContext());
+ processEngine.executeSQL(executionGroupContext, context.getQueryContext());
List<QueryResult> queryResults = execute(executionGroupContext, databaseType);
MergeEngine mergeEngine = new MergeEngine(database, executorContext.getProps(), new ConnectionContext());
MergedResult mergedResult = mergeEngine.merge(queryResults, queryContext.getSqlStatementContext());
@@ -249,7 +249,7 @@ public final class TranslatableTableScanExecutor implements TableScanExecutor {
} catch (final SQLException ex) {
throw new SQLWrapperException(ex);
} finally {
- processEngine.cleanExecution();
+ processEngine.completeSQLExecution();
}
}
diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/executor/ProxyJDBCExecutor.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/executor/ProxyJDBCExecutor.java
index 64eb74db2c7..1f98a935e90 100644
--- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/executor/ProxyJDBCExecutor.java
+++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/executor/ProxyJDBCExecutor.java
@@ -70,7 +70,7 @@ public final class ProxyJDBCExecutor {
ShardingSphereDatabase database = metaDataContexts.getMetaData().getDatabase(connectionSession.getDatabaseName());
DatabaseType protocolType = database.getProtocolType();
Map<String, DatabaseType> storageTypes = database.getResourceMetaData().getStorageTypes();
- processEngine.initializeExecution(executionGroupContext, queryContext);
+ processEngine.executeSQL(executionGroupContext, queryContext);
SQLStatementContext<?> context = queryContext.getSqlStatementContext();
return jdbcExecutor.execute(executionGroupContext,
ProxyJDBCExecutorCallbackFactory.newInstance(type, protocolType, storageTypes, context.getSqlStatement(), databaseConnector, isReturnGeneratedKeys, isExceptionThrown,
@@ -78,7 +78,7 @@ public final class ProxyJDBCExecutor {
ProxyJDBCExecutorCallbackFactory.newInstance(type, protocolType, storageTypes, context.getSqlStatement(), databaseConnector, isReturnGeneratedKeys, isExceptionThrown,
false));
} finally {
- processEngine.cleanExecution();
+ processEngine.completeSQLExecution();
}
}
}
diff --git a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java
index 7388694be47..d046b9dcc5b 100644
--- a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java
+++ b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java
@@ -82,7 +82,7 @@ public final class FrontendChannelInboundHandler extends ChannelInboundHandlerAd
if (authResult.isFinished()) {
connectionSession.setGrantee(new Grantee(authResult.getUsername(), authResult.getHostname()));
connectionSession.setCurrentDatabase(authResult.getDatabase());
- connectionSession.setProcessId(processEngine.initializeConnection(connectionSession.getGrantee(), connectionSession.getDatabaseName()));
+ connectionSession.setProcessId(processEngine.connect(connectionSession.getGrantee(), connectionSession.getDatabaseName()));
}
return authResult.isFinished();
// CHECKSTYLE:OFF
@@ -108,7 +108,7 @@ public final class FrontendChannelInboundHandler extends ChannelInboundHandlerAd
private void closeAllResources() {
ConnectionThreadExecutorGroup.getInstance().unregisterAndAwaitTermination(connectionSession.getConnectionId());
connectionSession.getBackendConnection().closeAllResources();
- Optional.ofNullable(connectionSession.getProcessId()).ifPresent(processEngine::finishConnection);
+ Optional.ofNullable(connectionSession.getProcessId()).ifPresent(processEngine::disconnect);
databaseProtocolFrontendEngine.release(connectionSession);
}