You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by to...@apache.org on 2023/05/01 17:23:44 UTC

[shardingsphere] branch master updated: Merge ProcessReporter and ProcessEngine (#25427)

This is an automated email from the ASF dual-hosted git repository.

totalo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 3d421ac95d5 Merge ProcessReporter and ProcessEngine (#25427)
3d421ac95d5 is described below

commit 3d421ac95d5680efa8d8d1e1b5fa9ba1c7141bd6
Author: Liang Zhang <zh...@apache.org>
AuthorDate: Tue May 2 01:23:29 2023 +0800

    Merge ProcessReporter and ProcessEngine (#25427)
---
 .../engine/driver/jdbc/JDBCExecutorCallback.java   |  2 +-
 .../sql/execute/engine/raw/RawExecutor.java        |  4 +-
 .../raw/callback/RawSQLExecutorCallback.java       |  6 +-
 .../infra/executor/sql/process/ProcessEngine.java  | 55 ++++++++-----
 ...ExecuteIDContext.java => ProcessIDContext.java} | 29 +++----
 .../executor/sql/process/ProcessReporter.java      | 91 ----------------------
 ...essReporterTest.java => ProcessEngineTest.java} | 30 ++++---
 ...DContextTest.java => ProcessIDContextTest.java} | 36 ++++-----
 .../driver/executor/DriverJDBCExecutor.java        | 12 +--
 .../executor/FilterableTableScanExecutor.java      |  4 +-
 .../executor/TranslatableTableScanExecutor.java    |  8 +-
 .../connector/jdbc/executor/ProxyJDBCExecutor.java |  4 +-
 .../netty/FrontendChannelInboundHandler.java       |  4 +-
 13 files changed, 109 insertions(+), 176 deletions(-)

diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutorCallback.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutorCallback.java
index 2113466b73e..7f3704f1d38 100644
--- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutorCallback.java
+++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutorCallback.java
@@ -86,7 +86,7 @@ public abstract class JDBCExecutorCallback<T> implements ExecutorCallback<JDBCEx
             sqlExecutionHook.start(jdbcExecutionUnit.getExecutionUnit().getDataSourceName(), sqlUnit.getSql(), sqlUnit.getParameters(), dataSourceMetaData, isTrunkThread);
             T result = executeSQL(sqlUnit.getSql(), jdbcExecutionUnit.getStorageResource(), jdbcExecutionUnit.getConnectionMode(), storageType);
             sqlExecutionHook.finishSuccess();
-            processEngine.finishExecution();
+            processEngine.completeSQLUnitExecution();
             return result;
         } catch (final SQLException ex) {
             if (!storageType.equals(protocolType)) {
diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/RawExecutor.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/RawExecutor.java
index b481e08d80c..6cfdddcfafb 100644
--- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/RawExecutor.java
+++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/RawExecutor.java
@@ -57,12 +57,12 @@ public final class RawExecutor {
     public List<ExecuteResult> execute(final ExecutionGroupContext<RawSQLExecutionUnit> executionGroupContext,
                                        final QueryContext queryContext, final RawSQLExecutorCallback callback) throws SQLException {
         try {
-            processEngine.initializeExecution(executionGroupContext, queryContext);
+            processEngine.executeSQL(executionGroupContext, queryContext);
             // TODO Load query header for first query
             List<ExecuteResult> results = execute(executionGroupContext, (RawSQLExecutorCallback) null, callback);
             return results.isEmpty() || Objects.isNull(results.get(0)) ? Collections.singletonList(new UpdateResult(0, 0L)) : results;
         } finally {
-            processEngine.cleanExecution();
+            processEngine.completeSQLExecution();
         }
     }
     
diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/callback/RawSQLExecutorCallback.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/callback/RawSQLExecutorCallback.java
index 01a3880d6c3..8edc78144ee 100644
--- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/callback/RawSQLExecutorCallback.java
+++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/callback/RawSQLExecutorCallback.java
@@ -21,7 +21,7 @@ import com.google.common.base.Preconditions;
 import org.apache.shardingsphere.infra.executor.kernel.model.ExecutorCallback;
 import org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawSQLExecutionUnit;
 import org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
-import org.apache.shardingsphere.infra.executor.sql.process.ExecuteIDContext;
+import org.apache.shardingsphere.infra.executor.sql.process.ProcessIDContext;
 import org.apache.shardingsphere.infra.executor.sql.process.ProcessEngine;
 import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
 
@@ -47,9 +47,9 @@ public final class RawSQLExecutorCallback implements ExecutorCallback<RawSQLExec
     @Override
     public Collection<ExecuteResult> execute(final Collection<RawSQLExecutionUnit> inputs, final boolean isTrunkThread) throws SQLException {
         Collection<ExecuteResult> result = callbacks.iterator().next().execute(inputs, isTrunkThread);
-        if (!ExecuteIDContext.isEmpty()) {
+        if (!ProcessIDContext.isEmpty()) {
             for (int i = 0; i < inputs.size(); i++) {
-                processEngine.finishExecution();
+                processEngine.completeSQLUnitExecution();
             }
         }
         return result;
diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngine.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngine.java
index f745b07e19c..10a322445df 100644
--- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngine.java
+++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngine.java
@@ -19,72 +19,85 @@ package org.apache.shardingsphere.infra.executor.sql.process;
 
 import org.apache.shardingsphere.infra.binder.QueryContext;
 import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
+import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupReportContext;
 import org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutionUnit;
 import org.apache.shardingsphere.infra.metadata.user.Grantee;
+import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.ddl.DDLStatement;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.dml.DMLStatement;
 import org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.MySQLStatement;
 
+import java.util.Collections;
+
 /**
  * Process engine.
  */
 public final class ProcessEngine {
     
-    private final ProcessReporter reporter = new ProcessReporter();
-    
     /**
-     * Initialize connection.
+     * Connect.
      *
      * @param grantee grantee
      * @param databaseName database name
      * @return process ID
      */
-    public String initializeConnection(final Grantee grantee, final String databaseName) {
-        return reporter.reportConnect(grantee, databaseName);
+    public String connect(final Grantee grantee, final String databaseName) {
+        ExecutionGroupContext<? extends SQLExecutionUnit> executionGroupContext = new ExecutionGroupContext<>(Collections.emptyList(), new ExecutionGroupReportContext(databaseName, grantee));
+        ProcessContext processContext = new ProcessContext(executionGroupContext);
+        ProcessRegistry.getInstance().putProcessContext(processContext.getId(), processContext);
+        return executionGroupContext.getReportContext().getProcessID();
     }
     
     /**
-     * Finish connection.
+     * Disconnect.
      *
      * @param processID process ID
      */
-    public void finishConnection(final String processID) {
-        reporter.remove(processID);
+    public void disconnect(final String processID) {
+        ProcessRegistry.getInstance().removeProcessContext(processID);
+        
     }
     
     /**
-     * Initialize execution.
+     * Execute SQL.
      *
      * @param executionGroupContext execution group context
      * @param queryContext query context
      */
-    public void initializeExecution(final ExecutionGroupContext<? extends SQLExecutionUnit> executionGroupContext, final QueryContext queryContext) {
+    public void executeSQL(final ExecutionGroupContext<? extends SQLExecutionUnit> executionGroupContext, final QueryContext queryContext) {
         if (isMySQLDDLOrDMLStatement(queryContext.getSqlStatementContext().getSqlStatement())) {
-            ExecuteIDContext.set(executionGroupContext.getReportContext().getProcessID());
-            reporter.reportExecute(queryContext, executionGroupContext);
+            ProcessIDContext.set(executionGroupContext.getReportContext().getProcessID());
+            ProcessContext processContext = new ProcessContext(queryContext.getSql(), executionGroupContext);
+            ProcessRegistry.getInstance().putProcessContext(processContext.getId(), processContext);
         }
     }
     
     /**
-     * Finish execution.
+     * Complete SQL unit execution.
      */
-    public void finishExecution() {
-        if (ExecuteIDContext.isEmpty()) {
+    public void completeSQLUnitExecution() {
+        if (ProcessIDContext.isEmpty()) {
             return;
         }
-        reporter.reportComplete(ExecuteIDContext.get());
+        ProcessRegistry.getInstance().getProcessContext(ProcessIDContext.get()).completeExecutionUnit();
     }
     
     /**
-     * Clean execution.
+     * Complete SQL execution.
      */
-    public void cleanExecution() {
-        if (ExecuteIDContext.isEmpty()) {
+    public void completeSQLExecution() {
+        if (ProcessIDContext.isEmpty()) {
             return;
         }
-        reporter.reset(ExecuteIDContext.get());
-        ExecuteIDContext.remove();
+        ProcessContext context = ProcessRegistry.getInstance().getProcessContext(ProcessIDContext.get());
+        if (null == context) {
+            return;
+        }
+        for (ProcessReporterCleaner each : ShardingSphereServiceLoader.getServiceInstances(ProcessReporterCleaner.class)) {
+            each.reset(context);
+        }
+        ProcessIDContext.remove();
     }
     
     private boolean isMySQLDDLOrDMLStatement(final SQLStatement sqlStatement) {
diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ExecuteIDContext.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessIDContext.java
similarity index 68%
rename from infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ExecuteIDContext.java
rename to infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessIDContext.java
index e1f5bfbf18e..c444515ec9d 100644
--- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ExecuteIDContext.java
+++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessIDContext.java
@@ -21,45 +21,46 @@ import com.alibaba.ttl.TransmittableThreadLocal;
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 
+// TODO should remove the class, process ID should be the same as connection ID
 /**
- * Execute ID context.
+ * Process ID context.
  */
 @NoArgsConstructor(access = AccessLevel.PRIVATE)
-public final class ExecuteIDContext {
+public final class ProcessIDContext {
     
-    private static final TransmittableThreadLocal<String> EXECUTE_ID = new TransmittableThreadLocal<>();
+    private static final TransmittableThreadLocal<String> PROCESS_ID = new TransmittableThreadLocal<>();
     
     /**
-     * Judge whether execute ID is empty or not.
+     * Judge whether process ID is empty or not.
      *
-     * @return whether execute ID is empty or not
+     * @return whether process ID is empty or not
      */
     public static boolean isEmpty() {
-        return null == EXECUTE_ID.get();
+        return null == PROCESS_ID.get();
     }
     
     /**
-     * Get execute ID.
+     * Get process ID.
      *
-     * @return execute ID
+     * @return process ID
      */
     public static String get() {
-        return EXECUTE_ID.get();
+        return PROCESS_ID.get();
     }
     
     /**
-     * Set execute ID.
+     * Set process ID.
      *
-     * @param executeId execute ID
+     * @param executeId process ID
      */
     public static void set(final String executeId) {
-        EXECUTE_ID.set(executeId);
+        PROCESS_ID.set(executeId);
     }
     
     /**
-     * Remove execute ID.
+     * Remove process ID.
      */
     public static void remove() {
-        EXECUTE_ID.remove();
+        PROCESS_ID.remove();
     }
 }
diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessReporter.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessReporter.java
deleted file mode 100644
index bbb5e2510f9..00000000000
--- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessReporter.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.infra.executor.sql.process;
-
-import org.apache.shardingsphere.infra.binder.QueryContext;
-import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
-import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupReportContext;
-import org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutionUnit;
-import org.apache.shardingsphere.infra.metadata.user.Grantee;
-import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
-
-import java.util.Collections;
-
-/**
- * Process report.
- */
-public final class ProcessReporter {
-    
-    /**
-     * Report connect.
-     *
-     * @param grantee grantee
-     * @param databaseName databaseName
-     * @return process ID
-     */
-    public String reportConnect(final Grantee grantee, final String databaseName) {
-        ExecutionGroupContext<? extends SQLExecutionUnit> executionGroupContext = new ExecutionGroupContext<>(Collections.emptyList(), new ExecutionGroupReportContext(databaseName, grantee));
-        ProcessContext processContext = new ProcessContext(executionGroupContext);
-        ProcessRegistry.getInstance().putProcessContext(processContext.getId(), processContext);
-        return executionGroupContext.getReportContext().getProcessID();
-    }
-    
-    /**
-     * Report execute.
-     *
-     * @param queryContext query context
-     * @param executionGroupContext execution group context
-     */
-    public void reportExecute(final QueryContext queryContext, final ExecutionGroupContext<? extends SQLExecutionUnit> executionGroupContext) {
-        ProcessContext processContext = new ProcessContext(queryContext.getSql(), executionGroupContext);
-        ProcessRegistry.getInstance().putProcessContext(processContext.getId(), processContext);
-    }
-    
-    /**
-     * Report complete execution unit.
-     *
-     * @param processID process ID
-     */
-    public void reportComplete(final String processID) {
-        ProcessRegistry.getInstance().getProcessContext(processID).completeExecutionUnit();
-    }
-    
-    /**
-     * Reset report.
-     *
-     * @param processID process ID
-     */
-    public void reset(final String processID) {
-        ProcessContext context = ProcessRegistry.getInstance().getProcessContext(processID);
-        if (null == context) {
-            return;
-        }
-        for (ProcessReporterCleaner each : ShardingSphereServiceLoader.getServiceInstances(ProcessReporterCleaner.class)) {
-            each.reset(context);
-        }
-    }
-    
-    /**
-     * Remove process context.
-     *
-     * @param processID process ID
-     */
-    public void remove(final String processID) {
-        ProcessRegistry.getInstance().removeProcessContext(processID);
-    }
-}
diff --git a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessReporterTest.java b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngineTest.java
similarity index 69%
rename from infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessReporterTest.java
rename to infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngineTest.java
index 6a53ce5185a..2f8fdf05683 100644
--- a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessReporterTest.java
+++ b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngineTest.java
@@ -18,9 +18,15 @@
 package org.apache.shardingsphere.infra.executor.sql.process;
 
 import org.apache.shardingsphere.infra.binder.QueryContext;
+import org.apache.shardingsphere.infra.binder.statement.dml.UpdateStatementContext;
 import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
 import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupReportContext;
 import org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutionUnit;
+import org.apache.shardingsphere.sql.parser.sql.common.segment.dml.assignment.SetAssignmentSegment;
+import org.apache.shardingsphere.sql.parser.sql.common.segment.generic.table.SimpleTableSegment;
+import org.apache.shardingsphere.sql.parser.sql.common.segment.generic.table.TableNameSegment;
+import org.apache.shardingsphere.sql.parser.sql.common.value.identifier.IdentifierValue;
+import org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.dml.MySQLUpdateStatement;
 import org.apache.shardingsphere.test.mock.AutoMockExtension;
 import org.apache.shardingsphere.test.mock.StaticMockSettings;
 import org.junit.jupiter.api.BeforeEach;
@@ -28,6 +34,7 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.Mock;
 
+import java.util.Collections;
 import java.util.UUID;
 
 import static org.mockito.ArgumentMatchers.any;
@@ -38,7 +45,7 @@ import static org.mockito.Mockito.when;
 
 @ExtendWith(AutoMockExtension.class)
 @StaticMockSettings(ProcessRegistry.class)
-class ProcessReporterTest {
+class ProcessEngineTest {
     
     @Mock
     private ProcessRegistry processRegistry;
@@ -49,9 +56,9 @@ class ProcessReporterTest {
     }
     
     @Test
-    void assertReportExecute() {
+    void assertExecuteSQL() {
         ExecutionGroupContext<? extends SQLExecutionUnit> executionGroupContext = mockExecutionGroupContext();
-        new ProcessReporter().reportExecute(new QueryContext(null, null, null), executionGroupContext);
+        new ProcessEngine().executeSQL(executionGroupContext, new QueryContext(new UpdateStatementContext(getSQLStatement()), null, null));
         verify(processRegistry).putProcessContext(eq(executionGroupContext.getReportContext().getProcessID()), any());
     }
     
@@ -64,16 +71,19 @@ class ProcessReporterTest {
         return result;
     }
     
-    @Test
-    void assertReportUnit() {
-        when(processRegistry.getProcessContext("foo_id")).thenReturn(mock(ProcessContext.class));
-        new ProcessReporter().reportComplete("foo_id");
-        verify(processRegistry).getProcessContext("foo_id");
+    private MySQLUpdateStatement getSQLStatement() {
+        MySQLUpdateStatement result = new MySQLUpdateStatement();
+        result.setTable(new SimpleTableSegment(new TableNameSegment(0, 0, new IdentifierValue("foo_tbl"))));
+        result.setSetAssignment(new SetAssignmentSegment(0, 0, Collections.emptyList()));
+        return result;
     }
     
     @Test
-    void assertReportClean() {
+    void assertCompleteSQLUnitExecution() {
+        ProcessIDContext.set("foo_id");
         when(processRegistry.getProcessContext("foo_id")).thenReturn(mock(ProcessContext.class));
-        new ProcessReporter().reset("foo_id");
+        new ProcessEngine().completeSQLUnitExecution();
+        verify(processRegistry).getProcessContext("foo_id");
+        ProcessIDContext.remove();
     }
 }
diff --git a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ExecuteIDContextTest.java b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessIDContextTest.java
similarity index 61%
rename from infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ExecuteIDContextTest.java
rename to infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessIDContextTest.java
index d6f108b747d..a0e61c6c793 100644
--- a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ExecuteIDContextTest.java
+++ b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessIDContextTest.java
@@ -26,42 +26,42 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-class ExecuteIDContextTest {
+class ProcessIDContextTest {
     
     @AfterEach
     void tearDown() {
-        ExecuteIDContext.remove();
+        ProcessIDContext.remove();
     }
     
     @Test
     void assertIsEmpty() {
-        assertTrue(ExecuteIDContext.isEmpty());
-        ExecuteIDContext.set("123e4567e89b12d3a456426655440000");
-        assertFalse(ExecuteIDContext.isEmpty());
+        assertTrue(ProcessIDContext.isEmpty());
+        ProcessIDContext.set("123e4567e89b12d3a456426655440000");
+        assertFalse(ProcessIDContext.isEmpty());
     }
     
     @Test
     void assertGet() {
-        assertNull(ExecuteIDContext.get());
-        ExecuteIDContext.set("123e4567e89b12d3a456426655440000");
-        assertThat(ExecuteIDContext.get(), is("123e4567e89b12d3a456426655440000"));
+        assertNull(ProcessIDContext.get());
+        ProcessIDContext.set("123e4567e89b12d3a456426655440000");
+        assertThat(ProcessIDContext.get(), is("123e4567e89b12d3a456426655440000"));
     }
     
     @Test
     void assertSet() {
-        assertNull(ExecuteIDContext.get());
-        ExecuteIDContext.set("123e4567e89b12d3a456426655440000");
-        assertThat(ExecuteIDContext.get(), is("123e4567e89b12d3a456426655440000"));
-        ExecuteIDContext.set("123e4567e89b12d3a456426655440001");
-        assertThat(ExecuteIDContext.get(), is("123e4567e89b12d3a456426655440001"));
+        assertNull(ProcessIDContext.get());
+        ProcessIDContext.set("123e4567e89b12d3a456426655440000");
+        assertThat(ProcessIDContext.get(), is("123e4567e89b12d3a456426655440000"));
+        ProcessIDContext.set("123e4567e89b12d3a456426655440001");
+        assertThat(ProcessIDContext.get(), is("123e4567e89b12d3a456426655440001"));
     }
     
     @Test
     void assertRemove() {
-        assertNull(ExecuteIDContext.get());
-        ExecuteIDContext.set("123e4567e89b12d3a456426655440000");
-        assertThat(ExecuteIDContext.get(), is("123e4567e89b12d3a456426655440000"));
-        ExecuteIDContext.remove();
-        assertNull(ExecuteIDContext.get());
+        assertNull(ProcessIDContext.get());
+        ProcessIDContext.set("123e4567e89b12d3a456426655440000");
+        assertThat(ProcessIDContext.get(), is("123e4567e89b12d3a456426655440000"));
+        ProcessIDContext.remove();
+        assertNull(ProcessIDContext.get());
     }
 }
diff --git a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/DriverJDBCExecutor.java b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/DriverJDBCExecutor.java
index 9633ac77d09..c185f9d7455 100644
--- a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/DriverJDBCExecutor.java
+++ b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/DriverJDBCExecutor.java
@@ -72,10 +72,10 @@ public final class DriverJDBCExecutor {
     public List<QueryResult> executeQuery(final ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext,
                                           final QueryContext queryContext, final ExecuteQueryCallback callback) throws SQLException {
         try {
-            processEngine.initializeExecution(executionGroupContext, queryContext);
+            processEngine.executeSQL(executionGroupContext, queryContext);
             return jdbcExecutor.execute(executionGroupContext, callback);
         } finally {
-            processEngine.cleanExecution();
+            processEngine.completeSQLExecution();
         }
     }
     
@@ -92,12 +92,12 @@ public final class DriverJDBCExecutor {
     public int executeUpdate(final ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext,
                              final QueryContext queryContext, final Collection<RouteUnit> routeUnits, final JDBCExecutorCallback<Integer> callback) throws SQLException {
         try {
-            processEngine.initializeExecution(executionGroupContext, queryContext);
+            processEngine.executeSQL(executionGroupContext, queryContext);
             SQLStatementContext<?> sqlStatementContext = queryContext.getSqlStatementContext();
             List<Integer> results = doExecute(executionGroupContext, sqlStatementContext, routeUnits, callback);
             return isNeedAccumulate(metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getRules(), sqlStatementContext) ? accumulate(results) : results.get(0);
         } finally {
-            processEngine.cleanExecution();
+            processEngine.completeSQLExecution();
         }
     }
     
@@ -131,11 +131,11 @@ public final class DriverJDBCExecutor {
     public boolean execute(final ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext, final QueryContext queryContext,
                            final Collection<RouteUnit> routeUnits, final JDBCExecutorCallback<Boolean> callback) throws SQLException {
         try {
-            processEngine.initializeExecution(executionGroupContext, queryContext);
+            processEngine.executeSQL(executionGroupContext, queryContext);
             List<Boolean> results = doExecute(executionGroupContext, queryContext.getSqlStatementContext(), routeUnits, callback);
             return null != results && !results.isEmpty() && null != results.get(0) && results.get(0);
         } finally {
-            processEngine.cleanExecution();
+            processEngine.completeSQLExecution();
         }
     }
     
diff --git a/kernel/sql-federation/executor/core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/FilterableTableScanExecutor.java b/kernel/sql-federation/executor/core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/FilterableTableScanExecutor.java
index 8b7948d8b6d..537df40f871 100644
--- a/kernel/sql-federation/executor/core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/FilterableTableScanExecutor.java
+++ b/kernel/sql-federation/executor/core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/FilterableTableScanExecutor.java
@@ -155,7 +155,7 @@ public final class FilterableTableScanExecutor implements TableScanExecutor {
             ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext =
                     prepareEngine.prepare(context.getRouteContext(), context.getExecutionUnits(), new ExecutionGroupReportContext(database.getName()));
             setParameters(executionGroupContext.getInputGroups());
-            processEngine.initializeExecution(executionGroupContext, context.getQueryContext());
+            processEngine.executeSQL(executionGroupContext, context.getQueryContext());
             List<QueryResult> queryResults = execute(executionGroupContext, databaseType);
             // TODO need to get session context
             MergeEngine mergeEngine = new MergeEngine(database, executorContext.getProps(), new ConnectionContext());
@@ -165,7 +165,7 @@ public final class FilterableTableScanExecutor implements TableScanExecutor {
         } catch (final SQLException ex) {
             throw new SQLWrapperException(ex);
         } finally {
-            processEngine.cleanExecution();
+            processEngine.completeSQLExecution();
         }
     }
     
diff --git a/kernel/sql-federation/executor/core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/TranslatableTableScanExecutor.java b/kernel/sql-federation/executor/core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/TranslatableTableScanExecutor.java
index b30fb712a7b..461e1549b2f 100644
--- a/kernel/sql-federation/executor/core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/TranslatableTableScanExecutor.java
+++ b/kernel/sql-federation/executor/core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/TranslatableTableScanExecutor.java
@@ -181,7 +181,7 @@ public final class TranslatableTableScanExecutor implements TableScanExecutor {
             ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext =
                     prepareEngine.prepare(context.getRouteContext(), context.getExecutionUnits(), new ExecutionGroupReportContext(database.getName()));
             setParameters(executionGroupContext.getInputGroups());
-            processEngine.initializeExecution(executionGroupContext, context.getQueryContext());
+            processEngine.executeSQL(executionGroupContext, context.getQueryContext());
             List<QueryResult> queryResults = execute(executionGroupContext, databaseType);
             MergeEngine mergeEngine = new MergeEngine(database, executorContext.getProps(), new ConnectionContext());
             MergedResult mergedResult = mergeEngine.merge(queryResults, queryContext.getSqlStatementContext());
@@ -190,7 +190,7 @@ public final class TranslatableTableScanExecutor implements TableScanExecutor {
         } catch (final SQLException ex) {
             throw new SQLWrapperException(ex);
         } finally {
-            processEngine.cleanExecution();
+            processEngine.completeSQLExecution();
         }
     }
     
@@ -240,7 +240,7 @@ public final class TranslatableTableScanExecutor implements TableScanExecutor {
             ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext =
                     prepareEngine.prepare(context.getRouteContext(), context.getExecutionUnits(), new ExecutionGroupReportContext(database.getName()));
             setParameters(executionGroupContext.getInputGroups());
-            processEngine.initializeExecution(executionGroupContext, context.getQueryContext());
+            processEngine.executeSQL(executionGroupContext, context.getQueryContext());
             List<QueryResult> queryResults = execute(executionGroupContext, databaseType);
             MergeEngine mergeEngine = new MergeEngine(database, executorContext.getProps(), new ConnectionContext());
             MergedResult mergedResult = mergeEngine.merge(queryResults, queryContext.getSqlStatementContext());
@@ -249,7 +249,7 @@ public final class TranslatableTableScanExecutor implements TableScanExecutor {
         } catch (final SQLException ex) {
             throw new SQLWrapperException(ex);
         } finally {
-            processEngine.cleanExecution();
+            processEngine.completeSQLExecution();
         }
     }
     
diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/executor/ProxyJDBCExecutor.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/executor/ProxyJDBCExecutor.java
index 64eb74db2c7..1f98a935e90 100644
--- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/executor/ProxyJDBCExecutor.java
+++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/executor/ProxyJDBCExecutor.java
@@ -70,7 +70,7 @@ public final class ProxyJDBCExecutor {
             ShardingSphereDatabase database = metaDataContexts.getMetaData().getDatabase(connectionSession.getDatabaseName());
             DatabaseType protocolType = database.getProtocolType();
             Map<String, DatabaseType> storageTypes = database.getResourceMetaData().getStorageTypes();
-            processEngine.initializeExecution(executionGroupContext, queryContext);
+            processEngine.executeSQL(executionGroupContext, queryContext);
             SQLStatementContext<?> context = queryContext.getSqlStatementContext();
             return jdbcExecutor.execute(executionGroupContext,
                     ProxyJDBCExecutorCallbackFactory.newInstance(type, protocolType, storageTypes, context.getSqlStatement(), databaseConnector, isReturnGeneratedKeys, isExceptionThrown,
@@ -78,7 +78,7 @@ public final class ProxyJDBCExecutor {
                     ProxyJDBCExecutorCallbackFactory.newInstance(type, protocolType, storageTypes, context.getSqlStatement(), databaseConnector, isReturnGeneratedKeys, isExceptionThrown,
                             false));
         } finally {
-            processEngine.cleanExecution();
+            processEngine.completeSQLExecution();
         }
     }
 }
diff --git a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java
index 7388694be47..d046b9dcc5b 100644
--- a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java
+++ b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java
@@ -82,7 +82,7 @@ public final class FrontendChannelInboundHandler extends ChannelInboundHandlerAd
             if (authResult.isFinished()) {
                 connectionSession.setGrantee(new Grantee(authResult.getUsername(), authResult.getHostname()));
                 connectionSession.setCurrentDatabase(authResult.getDatabase());
-                connectionSession.setProcessId(processEngine.initializeConnection(connectionSession.getGrantee(), connectionSession.getDatabaseName()));
+                connectionSession.setProcessId(processEngine.connect(connectionSession.getGrantee(), connectionSession.getDatabaseName()));
             }
             return authResult.isFinished();
             // CHECKSTYLE:OFF
@@ -108,7 +108,7 @@ public final class FrontendChannelInboundHandler extends ChannelInboundHandlerAd
     private void closeAllResources() {
         ConnectionThreadExecutorGroup.getInstance().unregisterAndAwaitTermination(connectionSession.getConnectionId());
         connectionSession.getBackendConnection().closeAllResources();
-        Optional.ofNullable(connectionSession.getProcessId()).ifPresent(processEngine::finishConnection);
+        Optional.ofNullable(connectionSession.getProcessId()).ifPresent(processEngine::disconnect);
         databaseProtocolFrontendEngine.release(connectionSession);
     }