You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2023/05/04 05:09:14 UTC
[shardingsphere] branch master updated: Refactor ProcessRegistry.addProcess (#25446)
This is an automated email from the ASF dual-hosted git repository.
zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 074ff6c506e Refactor ProcessRegistry.addProcess (#25446)
074ff6c506e is described below
commit 074ff6c506efb8e93173d5e90f4afbf51d590cf1
Author: Liang Zhang <zh...@apache.org>
AuthorDate: Thu May 4 13:09:06 2023 +0800
Refactor ProcessRegistry.addProcess (#25446)
---
.../infra/executor/kernel/model/ExecutionGroupReportContext.java | 8 ++++----
.../shardingsphere/infra/executor/sql/process/ProcessEngine.java | 8 ++++----
.../infra/executor/sql/process/ProcessRegistry.java | 5 ++---
.../infra/executor/sql/process/ProcessEngineTest.java | 3 +--
.../process/subscriber/ProcessListChangedSubscriberTest.java | 6 ++++--
.../shardingsphere/proxy/backend/connector/ProxySQLExecutor.java | 4 ++--
.../command/query/text/query/MySQLMultiStatementsHandler.java | 2 +-
.../query/extended/PostgreSQLBatchedStatementsExecutor.java | 2 +-
8 files changed, 19 insertions(+), 19 deletions(-)
diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/kernel/model/ExecutionGroupReportContext.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/kernel/model/ExecutionGroupReportContext.java
index 7ba1014c3f0..7f68da7a739 100644
--- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/kernel/model/ExecutionGroupReportContext.java
+++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/kernel/model/ExecutionGroupReportContext.java
@@ -31,18 +31,18 @@ import java.util.concurrent.ThreadLocalRandom;
@Getter
public final class ExecutionGroupReportContext {
+ // TODO processID should be the same as connectionId
+ private final String processId;
+
private final String databaseName;
private final Grantee grantee;
- // TODO processID should same with connectionId
- private final String processId;
-
public ExecutionGroupReportContext(final String databaseName) {
this(databaseName, new Grantee("", ""));
}
public ExecutionGroupReportContext(final String databaseName, final Grantee grantee) {
- this(databaseName, grantee, new UUID(ThreadLocalRandom.current().nextLong(), ThreadLocalRandom.current().nextLong()).toString().replace("-", ""));
+ this(new UUID(ThreadLocalRandom.current().nextLong(), ThreadLocalRandom.current().nextLong()).toString().replace("-", ""), databaseName, grantee);
}
}
diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngine.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngine.java
index 11a5bcbcbdb..fcfcf68415f 100644
--- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngine.java
+++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngine.java
@@ -44,7 +44,7 @@ public final class ProcessEngine {
public String connect(final Grantee grantee, final String databaseName) {
ExecutionGroupContext<? extends SQLExecutionUnit> executionGroupContext = new ExecutionGroupContext<>(Collections.emptyList(), new ExecutionGroupReportContext(databaseName, grantee));
Process process = new Process(executionGroupContext);
- ProcessRegistry.getInstance().putProcess(process.getId(), process);
+ ProcessRegistry.getInstance().addProcess(process);
return executionGroupContext.getReportContext().getProcessId();
}
@@ -68,7 +68,7 @@ public final class ProcessEngine {
if (isMySQLDDLOrDMLStatement(queryContext.getSqlStatementContext().getSqlStatement())) {
ProcessIdContext.set(executionGroupContext.getReportContext().getProcessId());
Process process = new Process(queryContext.getSql(), executionGroupContext);
- ProcessRegistry.getInstance().putProcess(process.getId(), process);
+ ProcessRegistry.getInstance().addProcess(process);
}
}
@@ -94,8 +94,8 @@ public final class ProcessEngine {
return;
}
ExecutionGroupContext<? extends SQLExecutionUnit> executionGroupContext = new ExecutionGroupContext<>(
- Collections.emptyList(), new ExecutionGroupReportContext(process.getDatabaseName(), new Grantee(process.getUsername(), process.getHostname())));
- ProcessRegistry.getInstance().putProcess(ProcessIdContext.get(), new Process(executionGroupContext));
+ Collections.emptyList(), new ExecutionGroupReportContext(ProcessIdContext.get(), process.getDatabaseName(), new Grantee(process.getUsername(), process.getHostname())));
+ ProcessRegistry.getInstance().addProcess(new Process(executionGroupContext));
ProcessIdContext.remove();
}
diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessRegistry.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessRegistry.java
index 87cd6a7cd10..3899ea2c0e5 100644
--- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessRegistry.java
+++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessRegistry.java
@@ -51,11 +51,10 @@ public final class ProcessRegistry {
/**
* Put process.
*
- * @param processId process ID
* @param process process
*/
- public void putProcess(final String processId, final Process process) {
- processes.put(processId, process);
+ public void addProcess(final Process process) {
+ processes.put(process.getId(), process);
}
/**
diff --git a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngineTest.java b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngineTest.java
index 3d1ab0341a3..267b4cffdf5 100644
--- a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngineTest.java
+++ b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngineTest.java
@@ -38,7 +38,6 @@ import java.util.Collections;
import java.util.UUID;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -59,7 +58,7 @@ class ProcessEngineTest {
void assertExecuteSQL() {
ExecutionGroupContext<? extends SQLExecutionUnit> executionGroupContext = mockExecutionGroupContext();
new ProcessEngine().executeSQL(executionGroupContext, new QueryContext(new UpdateStatementContext(getSQLStatement()), null, null));
- verify(processRegistry).putProcess(eq(executionGroupContext.getReportContext().getProcessId()), any());
+ verify(processRegistry).addProcess(any());
}
@SuppressWarnings("unchecked")
diff --git a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java
index fe5f4e1256a..ffbd7db80c1 100644
--- a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java
+++ b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java
@@ -102,12 +102,14 @@ class ProcessListChangedSubscriberTest {
@Test
void assertReportLocalProcesses() throws ReflectiveOperationException {
String instanceId = contextManager.getInstanceContext().getInstance().getMetaData().getId();
- ProcessRegistry.getInstance().putProcess("foo_id", mock(Process.class));
+ Process process = mock(Process.class);
String processId = "foo_id";
+ when(process.getId()).thenReturn(processId);
+ ProcessRegistry.getInstance().addProcess(process);
subscriber.reportLocalProcesses(new ShowProcessListTriggerEvent(instanceId, processId));
ClusterPersistRepository repository = ((RegistryCenter) Plugins.getMemberAccessor().get(ProcessListChangedSubscriber.class.getDeclaredField("registryCenter"), subscriber)).getRepository();
verify(repository).persist("/execution_nodes/foo_id/" + instanceId,
- "processes:" + System.lineSeparator() + "- completedUnitCount: 0\n idle: false\n startMillis: 0\n totalUnitCount: 0" + System.lineSeparator());
+ "processes:" + System.lineSeparator() + "- completedUnitCount: 0\n id: foo_id\n idle: false\n startMillis: 0\n totalUnitCount: 0" + System.lineSeparator());
verify(repository).delete("/nodes/compute_nodes/process_trigger/" + instanceId + ":foo_id");
}
diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/ProxySQLExecutor.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/ProxySQLExecutor.java
index 0e07e8f8447..5e8d2fb09b6 100644
--- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/ProxySQLExecutor.java
+++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/ProxySQLExecutor.java
@@ -176,7 +176,7 @@ public final class ProxySQLExecutor {
ExecutionGroupContext<RawSQLExecutionUnit> executionGroupContext;
try {
executionGroupContext = prepareEngine.prepare(executionContext.getRouteContext(), executionContext.getExecutionUnits(), new ExecutionGroupReportContext(
- backendConnection.getConnectionSession().getDatabaseName(), backendConnection.getConnectionSession().getGrantee(), backendConnection.getConnectionSession().getProcessId()));
+ backendConnection.getConnectionSession().getProcessId(), backendConnection.getConnectionSession().getDatabaseName(), backendConnection.getConnectionSession().getGrantee()));
} catch (final SQLException ex) {
return getSaneExecuteResults(executionContext, ex);
}
@@ -193,7 +193,7 @@ public final class ProxySQLExecutor {
ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext;
try {
executionGroupContext = prepareEngine.prepare(executionContext.getRouteContext(), executionContext.getExecutionUnits(), new ExecutionGroupReportContext(
- backendConnection.getConnectionSession().getDatabaseName(), backendConnection.getConnectionSession().getGrantee(), backendConnection.getConnectionSession().getProcessId()));
+ backendConnection.getConnectionSession().getProcessId(), backendConnection.getConnectionSession().getDatabaseName(), backendConnection.getConnectionSession().getGrantee()));
} catch (final SQLException ex) {
return getSaneExecuteResults(executionContext, ex);
}
diff --git a/proxy/frontend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLMultiStatementsHandler.java b/proxy/frontend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLMultiStatementsHandler.java
index 2fce000dbdc..ae38acf28d1 100644
--- a/proxy/frontend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLMultiStatementsHandler.java
+++ b/proxy/frontend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLMultiStatementsHandler.java
@@ -142,7 +142,7 @@ public final class MySQLMultiStatementsHandler implements ProxyBackendHandler {
(JDBCBackendStatement) connectionSession.getStatementManager(), new StatementOption(false), rules,
metaDataContexts.getMetaData().getDatabase(connectionSession.getDatabaseName()).getResourceMetaData().getStorageTypes());
ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = prepareEngine.prepare(anyExecutionContext.getRouteContext(), samplingExecutionUnit(),
- new ExecutionGroupReportContext(connectionSession.getDatabaseName(), connectionSession.getGrantee(), connectionSession.getProcessId()));
+ new ExecutionGroupReportContext(connectionSession.getProcessId(), connectionSession.getDatabaseName(), connectionSession.getGrantee()));
for (ExecutionGroup<JDBCExecutionUnit> eachGroup : executionGroupContext.getInputGroups()) {
for (JDBCExecutionUnit each : eachGroup.getInputs()) {
prepareBatchedStatement(each);
diff --git a/proxy/frontend/type/postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLBatchedStatementsExecutor.java b/proxy/frontend/type/postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLBatchedStatementsExecutor.java
index 7ea68641248..2b82ae46121 100644
--- a/proxy/frontend/type/postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLBatchedStatementsExecutor.java
+++ b/proxy/frontend/type/postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLBatchedStatementsExecutor.java
@@ -152,7 +152,7 @@ public final class PostgreSQLBatchedStatementsExecutor {
connectionSession.getBackendConnection(), (JDBCBackendStatement) connectionSession.getStatementManager(),
new StatementOption(false), rules, metaDataContexts.getMetaData().getDatabase(connectionSession.getDatabaseName()).getResourceMetaData().getStorageTypes());
executionGroupContext = prepareEngine.prepare(anyExecutionContext.getRouteContext(), executionUnitParams.keySet(),
- new ExecutionGroupReportContext(connectionSession.getDatabaseName(), connectionSession.getGrantee(), connectionSession.getProcessId()));
+ new ExecutionGroupReportContext(connectionSession.getProcessId(), connectionSession.getDatabaseName(), connectionSession.getGrantee()));
for (ExecutionGroup<JDBCExecutionUnit> eachGroup : executionGroupContext.getInputGroups()) {
for (JDBCExecutionUnit each : eachGroup.getInputs()) {
prepareJDBCExecutionUnit(each);