You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2023/04/30 05:18:31 UTC
[shardingsphere] branch master updated: Refactor ProcessContext.completeExecutionUnit (#25417)
This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new bea3c69a112 Refactor ProcessContext.completeExecutionUnit (#25417)
bea3c69a112 is described below
commit bea3c69a112f42778a0971ef5b27c9f554bd7e9c
Author: Liang Zhang <zh...@apache.org>
AuthorDate: Sun Apr 30 13:18:19 2023 +0800
Refactor ProcessContext.completeExecutionUnit (#25417)
---
.../engine/driver/jdbc/JDBCExecutorCallback.java | 2 +-
.../raw/callback/RawSQLExecutorCallback.java | 4 +-
.../infra/executor/sql/process/ProcessContext.java | 8 ++-
.../infra/executor/sql/process/ProcessEngine.java | 6 +--
.../executor/sql/process/ProcessReporter.java | 5 +-
.../sql/process/ProcessReporterCleaner.java | 2 +-
.../sql/process/ShowProcessListManager.java | 58 +++++++++++-----------
.../executor/sql/process/ProcessReporterTest.java | 2 +-
.../sql/process/DriverProcessReporterCleaner.java | 2 +-
.../subscriber/ProcessListChangedSubscriber.java | 2 +-
.../sql/process/ProxyProcessReporterCleaner.java | 2 +-
11 files changed, 44 insertions(+), 49 deletions(-)
diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutorCallback.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutorCallback.java
index eb26d4003a5..ce41216f46a 100644
--- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutorCallback.java
+++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutorCallback.java
@@ -84,7 +84,7 @@ public abstract class JDBCExecutorCallback<T> implements ExecutorCallback<JDBCEx
sqlExecutionHook.start(jdbcExecutionUnit.getExecutionUnit().getDataSourceName(), sqlUnit.getSql(), sqlUnit.getParameters(), dataSourceMetaData, isTrunkThread);
T result = executeSQL(sqlUnit.getSql(), jdbcExecutionUnit.getStorageResource(), jdbcExecutionUnit.getConnectionMode(), storageType);
sqlExecutionHook.finishSuccess();
- new ProcessEngine().finishExecution(1);
+ new ProcessEngine().finishExecution();
return result;
} catch (final SQLException ex) {
if (!storageType.equals(protocolType)) {
diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/callback/RawSQLExecutorCallback.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/callback/RawSQLExecutorCallback.java
index 9aedb8f4f1d..3adfcde0ac5 100644
--- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/callback/RawSQLExecutorCallback.java
+++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/callback/RawSQLExecutorCallback.java
@@ -46,7 +46,9 @@ public final class RawSQLExecutorCallback implements ExecutorCallback<RawSQLExec
public Collection<ExecuteResult> execute(final Collection<RawSQLExecutionUnit> inputs, final boolean isTrunkThread) throws SQLException {
Collection<ExecuteResult> result = callbacks.iterator().next().execute(inputs, isTrunkThread);
if (!ExecuteIDContext.isEmpty()) {
- new ProcessEngine().finishExecution(inputs.size());
+ for (int i = 0; i < inputs.size(); i++) {
+ new ProcessEngine().finishExecution();
+ }
}
return result;
}
diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessContext.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessContext.java
index dcb5b939475..f40ef34967f 100644
--- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessContext.java
+++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessContext.java
@@ -90,12 +90,10 @@ public final class ProcessContext {
}
/**
- * Complete execution units.
- *
- * @param completedExecutionUnitCount completed execution unit count
+ * Complete execution unit.
*/
- public void completeExecutionUnits(final int completedExecutionUnitCount) {
- completedUnitCount.addAndGet(completedExecutionUnitCount);
+ public void completeExecutionUnit() {
+ completedUnitCount.incrementAndGet();
}
/**
diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngine.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngine.java
index 46ab40cb4dc..23cd0e1d098 100644
--- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngine.java
+++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngine.java
@@ -68,14 +68,12 @@ public final class ProcessEngine {
/**
* Finish execution.
- *
- * @param completedExecutionUnitCount completed execution unit count
*/
- public void finishExecution(final int completedExecutionUnitCount) {
+ public void finishExecution() {
if (ExecuteIDContext.isEmpty()) {
return;
}
- reporter.reportComplete(ExecuteIDContext.get(), completedExecutionUnitCount);
+ reporter.reportComplete(ExecuteIDContext.get());
}
/**
diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessReporter.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessReporter.java
index e5c70a88e95..02d76033117 100644
--- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessReporter.java
+++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessReporter.java
@@ -61,10 +61,9 @@ public final class ProcessReporter {
* Report complete execution unit.
*
* @param executionID execution ID
- * @param completedExecutionUnitCount completed execution unit count
*/
- public void reportComplete(final String executionID, final int completedExecutionUnitCount) {
- ShowProcessListManager.getInstance().getProcessContext(executionID).completeExecutionUnits(completedExecutionUnitCount);
+ public void reportComplete(final String executionID) {
+ ShowProcessListManager.getInstance().getProcessContext(executionID).completeExecutionUnit();
}
/**
diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessReporterCleaner.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessReporterCleaner.java
index c87fec8623a..6b7ff6dba91 100644
--- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessReporterCleaner.java
+++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessReporterCleaner.java
@@ -28,7 +28,7 @@ public interface ProcessReporterCleaner {
/**
* Reset reporter.
*
- * @param context execute process context
+ * @param context process context
*/
void reset(ProcessContext context);
}
diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ShowProcessListManager.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ShowProcessListManager.java
index 4075f48592a..7d14297168b 100644
--- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ShowProcessListManager.java
+++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ShowProcessListManager.java
@@ -32,17 +32,15 @@ import java.util.concurrent.ConcurrentHashMap;
* Show process list manager.
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
+@Getter
public final class ShowProcessListManager {
private static final ShowProcessListManager INSTANCE = new ShowProcessListManager();
- @Getter
private final Map<String, ProcessContext> processContexts = new ConcurrentHashMap<>();
- @Getter
private final Map<String, Collection<Statement>> processStatements = new ConcurrentHashMap<>();
- @Getter
private final Map<String, ShowProcessListSimpleLock> locks = new ConcurrentHashMap<>();
/**
@@ -55,72 +53,72 @@ public final class ShowProcessListManager {
}
/**
- * Put execute process context.
+ * Put process context.
*
- * @param executionId execution id
+ * @param executionID execution ID
* @param processContext process context
*/
- public void putProcessContext(final String executionId, final ProcessContext processContext) {
- processContexts.put(executionId, processContext);
+ public void putProcessContext(final String executionID, final ProcessContext processContext) {
+ processContexts.put(executionID, processContext);
}
/**
* Put process statements.
*
- * @param executionId execution id
+ * @param executionID execution ID
* @param statements statements
*/
- public void putProcessStatement(final String executionId, final Collection<Statement> statements) {
+ public void putProcessStatement(final String executionID, final Collection<Statement> statements) {
if (statements.isEmpty()) {
return;
}
- processStatements.put(executionId, statements);
+ processStatements.put(executionID, statements);
}
/**
- * Get execute process context.
+ * Get process context.
*
- * @param executionId execution id
- * @return execute process context
+ * @param executionID execution ID
+ * @return process context
*/
- public ProcessContext getProcessContext(final String executionId) {
- return processContexts.get(executionId);
+ public ProcessContext getProcessContext(final String executionID) {
+ return processContexts.get(executionID);
}
/**
- * Get execute process statement.
+ * Get process statement.
*
- * @param executionId execution id
+ * @param executionID execution ID
* @return execute statements
*/
- public Collection<Statement> getProcessStatement(final String executionId) {
- return processStatements.getOrDefault(executionId, Collections.emptyList());
+ public Collection<Statement> getProcessStatement(final String executionID) {
+ return processStatements.getOrDefault(executionID, Collections.emptyList());
}
/**
- * Remove execute process context.
+ * Remove process context.
*
- * @param executionId execution id
+ * @param executionID execution ID
*/
- public void removeProcessContext(final String executionId) {
- processContexts.remove(executionId);
+ public void removeProcessContext(final String executionID) {
+ processContexts.remove(executionID);
}
/**
- * Remove execute process statement.
+ * Remove process statement.
*
- * @param executionId execution id
+ * @param executionID execution ID
*/
- public void removeProcessStatement(final String executionId) {
- processStatements.remove(executionId);
+ public void removeProcessStatement(final String executionID) {
+ processStatements.remove(executionID);
}
/**
- * Get all execute process context.
+ * Get all process contexts.
*
- * @return collection execute process context
+ * @return all process contexts
*/
- public Collection<ProcessContext> getAllProcessContext() {
+ public Collection<ProcessContext> getAllProcessContexts() {
return processContexts.values();
}
}
diff --git a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessReporterTest.java b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessReporterTest.java
index 5fdbf0890d5..4642b82ef2d 100644
--- a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessReporterTest.java
+++ b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessReporterTest.java
@@ -67,7 +67,7 @@ class ProcessReporterTest {
@Test
void assertReportUnit() {
when(showProcessListManager.getProcessContext("foo_id")).thenReturn(mock(ProcessContext.class));
- new ProcessReporter().reportComplete("foo_id", 1);
+ new ProcessReporter().reportComplete("foo_id");
verify(showProcessListManager).getProcessContext("foo_id");
}
diff --git a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/sql/process/DriverProcessReporterCleaner.java b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/sql/process/DriverProcessReporterCleaner.java
index cbe8338ee34..033e72ae8ea 100644
--- a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/sql/process/DriverProcessReporterCleaner.java
+++ b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/sql/process/DriverProcessReporterCleaner.java
@@ -22,7 +22,7 @@ import org.apache.shardingsphere.infra.executor.sql.process.ShowProcessListManag
import org.apache.shardingsphere.infra.executor.sql.process.ProcessContext;
/**
- * Execute process reporter cleaner for driver.
+ * Process reporter cleaner for driver.
*/
public final class DriverProcessReporterCleaner implements ProcessReporterCleaner {
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ProcessListChangedSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ProcessListChangedSubscriber.java
index bd8245b123c..890a7507c09 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ProcessListChangedSubscriber.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ProcessListChangedSubscriber.java
@@ -64,7 +64,7 @@ public final class ProcessListChangedSubscriber {
if (!event.getInstanceId().equals(contextManager.getInstanceContext().getInstance().getMetaData().getId())) {
return;
}
- Collection<ProcessContext> processContexts = ShowProcessListManager.getInstance().getAllProcessContext();
+ Collection<ProcessContext> processContexts = ShowProcessListManager.getInstance().getAllProcessContexts();
if (!processContexts.isEmpty()) {
registryCenter.getRepository().persist(
ProcessNode.getProcessListInstancePath(event.getProcessId(), event.getInstanceId()), YamlEngine.marshal(swapper.swapToYamlConfiguration(processContexts)));
diff --git a/proxy/bootstrap/src/main/java/org/apache/shardingsphere/proxy/executor/sql/process/ProxyProcessReporterCleaner.java b/proxy/bootstrap/src/main/java/org/apache/shardingsphere/proxy/executor/sql/process/ProxyProcessReporterCleaner.java
index c0889d52f11..f6b3203df35 100644
--- a/proxy/bootstrap/src/main/java/org/apache/shardingsphere/proxy/executor/sql/process/ProxyProcessReporterCleaner.java
+++ b/proxy/bootstrap/src/main/java/org/apache/shardingsphere/proxy/executor/sql/process/ProxyProcessReporterCleaner.java
@@ -21,7 +21,7 @@ import org.apache.shardingsphere.infra.executor.sql.process.ProcessReporterClean
import org.apache.shardingsphere.infra.executor.sql.process.ProcessContext;
/**
- * Execute process reporter cleaner for proxy.
+ * Process reporter cleaner for proxy.
*/
public final class ProxyProcessReporterCleaner implements ProcessReporterCleaner {