You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2023/05/04 05:09:14 UTC

[shardingsphere] branch master updated: Refactor ProcessRegistry.addProcess (#25446)

This is an automated email from the ASF dual-hosted git repository.

zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 074ff6c506e Refactor ProcessRegistry.addProcess (#25446)
074ff6c506e is described below

commit 074ff6c506efb8e93173d5e90f4afbf51d590cf1
Author: Liang Zhang <zh...@apache.org>
AuthorDate: Thu May 4 13:09:06 2023 +0800

    Refactor ProcessRegistry.addProcess (#25446)
---
 .../infra/executor/kernel/model/ExecutionGroupReportContext.java  | 8 ++++----
 .../shardingsphere/infra/executor/sql/process/ProcessEngine.java  | 8 ++++----
 .../infra/executor/sql/process/ProcessRegistry.java               | 5 ++---
 .../infra/executor/sql/process/ProcessEngineTest.java             | 3 +--
 .../process/subscriber/ProcessListChangedSubscriberTest.java      | 6 ++++--
 .../shardingsphere/proxy/backend/connector/ProxySQLExecutor.java  | 4 ++--
 .../command/query/text/query/MySQLMultiStatementsHandler.java     | 2 +-
 .../query/extended/PostgreSQLBatchedStatementsExecutor.java       | 2 +-
 8 files changed, 19 insertions(+), 19 deletions(-)

diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/kernel/model/ExecutionGroupReportContext.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/kernel/model/ExecutionGroupReportContext.java
index 7ba1014c3f0..7f68da7a739 100644
--- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/kernel/model/ExecutionGroupReportContext.java
+++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/kernel/model/ExecutionGroupReportContext.java
@@ -31,18 +31,18 @@ import java.util.concurrent.ThreadLocalRandom;
 @Getter
 public final class ExecutionGroupReportContext {
     
+    // TODO processID should same with connectionId
+    private final String processId;
+    
     private final String databaseName;
     
     private final Grantee grantee;
     
-    // TODO processID should same with connectionId
-    private final String processId;
-    
     public ExecutionGroupReportContext(final String databaseName) {
         this(databaseName, new Grantee("", ""));
     }
     
     public ExecutionGroupReportContext(final String databaseName, final Grantee grantee) {
-        this(databaseName, grantee, new UUID(ThreadLocalRandom.current().nextLong(), ThreadLocalRandom.current().nextLong()).toString().replace("-", ""));
+        this(new UUID(ThreadLocalRandom.current().nextLong(), ThreadLocalRandom.current().nextLong()).toString().replace("-", ""), databaseName, grantee);
     }
 }
diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngine.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngine.java
index 11a5bcbcbdb..fcfcf68415f 100644
--- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngine.java
+++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngine.java
@@ -44,7 +44,7 @@ public final class ProcessEngine {
     public String connect(final Grantee grantee, final String databaseName) {
         ExecutionGroupContext<? extends SQLExecutionUnit> executionGroupContext = new ExecutionGroupContext<>(Collections.emptyList(), new ExecutionGroupReportContext(databaseName, grantee));
         Process process = new Process(executionGroupContext);
-        ProcessRegistry.getInstance().putProcess(process.getId(), process);
+        ProcessRegistry.getInstance().addProcess(process);
         return executionGroupContext.getReportContext().getProcessId();
     }
     
@@ -68,7 +68,7 @@ public final class ProcessEngine {
         if (isMySQLDDLOrDMLStatement(queryContext.getSqlStatementContext().getSqlStatement())) {
             ProcessIdContext.set(executionGroupContext.getReportContext().getProcessId());
             Process process = new Process(queryContext.getSql(), executionGroupContext);
-            ProcessRegistry.getInstance().putProcess(process.getId(), process);
+            ProcessRegistry.getInstance().addProcess(process);
         }
     }
     
@@ -94,8 +94,8 @@ public final class ProcessEngine {
             return;
         }
         ExecutionGroupContext<? extends SQLExecutionUnit> executionGroupContext = new ExecutionGroupContext<>(
-                Collections.emptyList(), new ExecutionGroupReportContext(process.getDatabaseName(), new Grantee(process.getUsername(), process.getHostname())));
-        ProcessRegistry.getInstance().putProcess(ProcessIdContext.get(), new Process(executionGroupContext));
+                Collections.emptyList(), new ExecutionGroupReportContext(ProcessIdContext.get(), process.getDatabaseName(), new Grantee(process.getUsername(), process.getHostname())));
+        ProcessRegistry.getInstance().addProcess(new Process(executionGroupContext));
         ProcessIdContext.remove();
     }
     
diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessRegistry.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessRegistry.java
index 87cd6a7cd10..3899ea2c0e5 100644
--- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessRegistry.java
+++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessRegistry.java
@@ -51,11 +51,10 @@ public final class ProcessRegistry {
     /**
      * Put process.
      * 
-     * @param processId process ID
      * @param process process
      */
-    public void putProcess(final String processId, final Process process) {
-        processes.put(processId, process);
+    public void addProcess(final Process process) {
+        processes.put(process.getId(), process);
     }
     
     /**
diff --git a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngineTest.java b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngineTest.java
index 3d1ab0341a3..267b4cffdf5 100644
--- a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngineTest.java
+++ b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngineTest.java
@@ -38,7 +38,6 @@ import java.util.Collections;
 import java.util.UUID;
 
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -59,7 +58,7 @@ class ProcessEngineTest {
     void assertExecuteSQL() {
         ExecutionGroupContext<? extends SQLExecutionUnit> executionGroupContext = mockExecutionGroupContext();
         new ProcessEngine().executeSQL(executionGroupContext, new QueryContext(new UpdateStatementContext(getSQLStatement()), null, null));
-        verify(processRegistry).putProcess(eq(executionGroupContext.getReportContext().getProcessId()), any());
+        verify(processRegistry).addProcess(any());
     }
     
     @SuppressWarnings("unchecked")
diff --git a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java
index fe5f4e1256a..ffbd7db80c1 100644
--- a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java
+++ b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java
@@ -102,12 +102,14 @@ class ProcessListChangedSubscriberTest {
     @Test
     void assertReportLocalProcesses() throws ReflectiveOperationException {
         String instanceId = contextManager.getInstanceContext().getInstance().getMetaData().getId();
-        ProcessRegistry.getInstance().putProcess("foo_id", mock(Process.class));
+        Process process = mock(Process.class);
         String processId = "foo_id";
+        when(process.getId()).thenReturn(processId);
+        ProcessRegistry.getInstance().addProcess(process);
         subscriber.reportLocalProcesses(new ShowProcessListTriggerEvent(instanceId, processId));
         ClusterPersistRepository repository = ((RegistryCenter) Plugins.getMemberAccessor().get(ProcessListChangedSubscriber.class.getDeclaredField("registryCenter"), subscriber)).getRepository();
         verify(repository).persist("/execution_nodes/foo_id/" + instanceId,
-                "processes:" + System.lineSeparator() + "- completedUnitCount: 0\n  idle: false\n  startMillis: 0\n  totalUnitCount: 0" + System.lineSeparator());
+                "processes:" + System.lineSeparator() + "- completedUnitCount: 0\n  id: foo_id\n  idle: false\n  startMillis: 0\n  totalUnitCount: 0" + System.lineSeparator());
         verify(repository).delete("/nodes/compute_nodes/process_trigger/" + instanceId + ":foo_id");
     }
     
diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/ProxySQLExecutor.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/ProxySQLExecutor.java
index 0e07e8f8447..5e8d2fb09b6 100644
--- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/ProxySQLExecutor.java
+++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/ProxySQLExecutor.java
@@ -176,7 +176,7 @@ public final class ProxySQLExecutor {
         ExecutionGroupContext<RawSQLExecutionUnit> executionGroupContext;
         try {
             executionGroupContext = prepareEngine.prepare(executionContext.getRouteContext(), executionContext.getExecutionUnits(), new ExecutionGroupReportContext(
-                    backendConnection.getConnectionSession().getDatabaseName(), backendConnection.getConnectionSession().getGrantee(), backendConnection.getConnectionSession().getProcessId()));
+                    backendConnection.getConnectionSession().getProcessId(), backendConnection.getConnectionSession().getDatabaseName(), backendConnection.getConnectionSession().getGrantee()));
         } catch (final SQLException ex) {
             return getSaneExecuteResults(executionContext, ex);
         }
@@ -193,7 +193,7 @@ public final class ProxySQLExecutor {
         ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext;
         try {
             executionGroupContext = prepareEngine.prepare(executionContext.getRouteContext(), executionContext.getExecutionUnits(), new ExecutionGroupReportContext(
-                    backendConnection.getConnectionSession().getDatabaseName(), backendConnection.getConnectionSession().getGrantee(), backendConnection.getConnectionSession().getProcessId()));
+                    backendConnection.getConnectionSession().getProcessId(), backendConnection.getConnectionSession().getDatabaseName(), backendConnection.getConnectionSession().getGrantee()));
         } catch (final SQLException ex) {
             return getSaneExecuteResults(executionContext, ex);
         }
diff --git a/proxy/frontend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLMultiStatementsHandler.java b/proxy/frontend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLMultiStatementsHandler.java
index 2fce000dbdc..ae38acf28d1 100644
--- a/proxy/frontend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLMultiStatementsHandler.java
+++ b/proxy/frontend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLMultiStatementsHandler.java
@@ -142,7 +142,7 @@ public final class MySQLMultiStatementsHandler implements ProxyBackendHandler {
                 (JDBCBackendStatement) connectionSession.getStatementManager(), new StatementOption(false), rules,
                 metaDataContexts.getMetaData().getDatabase(connectionSession.getDatabaseName()).getResourceMetaData().getStorageTypes());
         ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = prepareEngine.prepare(anyExecutionContext.getRouteContext(), samplingExecutionUnit(),
-                new ExecutionGroupReportContext(connectionSession.getDatabaseName(), connectionSession.getGrantee(), connectionSession.getProcessId()));
+                new ExecutionGroupReportContext(connectionSession.getProcessId(), connectionSession.getDatabaseName(), connectionSession.getGrantee()));
         for (ExecutionGroup<JDBCExecutionUnit> eachGroup : executionGroupContext.getInputGroups()) {
             for (JDBCExecutionUnit each : eachGroup.getInputs()) {
                 prepareBatchedStatement(each);
diff --git a/proxy/frontend/type/postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLBatchedStatementsExecutor.java b/proxy/frontend/type/postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLBatchedStatementsExecutor.java
index 7ea68641248..2b82ae46121 100644
--- a/proxy/frontend/type/postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLBatchedStatementsExecutor.java
+++ b/proxy/frontend/type/postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLBatchedStatementsExecutor.java
@@ -152,7 +152,7 @@ public final class PostgreSQLBatchedStatementsExecutor {
                 connectionSession.getBackendConnection(), (JDBCBackendStatement) connectionSession.getStatementManager(),
                 new StatementOption(false), rules, metaDataContexts.getMetaData().getDatabase(connectionSession.getDatabaseName()).getResourceMetaData().getStorageTypes());
         executionGroupContext = prepareEngine.prepare(anyExecutionContext.getRouteContext(), executionUnitParams.keySet(),
-                new ExecutionGroupReportContext(connectionSession.getDatabaseName(), connectionSession.getGrantee(), connectionSession.getProcessId()));
+                new ExecutionGroupReportContext(connectionSession.getProcessId(), connectionSession.getDatabaseName(), connectionSession.getGrantee()));
         for (ExecutionGroup<JDBCExecutionUnit> eachGroup : executionGroupContext.getInputGroups()) {
             for (JDBCExecutionUnit each : eachGroup.getInputs()) {
                 prepareJDBCExecutionUnit(each);