You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2023/04/30 05:18:31 UTC

[shardingsphere] branch master updated: Refactor ProcessContext.completeExecutionUnit (#25417)

This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new bea3c69a112 Refactor ProcessContext.completeExecutionUnit (#25417)
bea3c69a112 is described below

commit bea3c69a112f42778a0971ef5b27c9f554bd7e9c
Author: Liang Zhang <zh...@apache.org>
AuthorDate: Sun Apr 30 13:18:19 2023 +0800

    Refactor ProcessContext.completeExecutionUnit (#25417)
---
 .../engine/driver/jdbc/JDBCExecutorCallback.java   |  2 +-
 .../raw/callback/RawSQLExecutorCallback.java       |  4 +-
 .../infra/executor/sql/process/ProcessContext.java |  8 ++-
 .../infra/executor/sql/process/ProcessEngine.java  |  6 +--
 .../executor/sql/process/ProcessReporter.java      |  5 +-
 .../sql/process/ProcessReporterCleaner.java        |  2 +-
 .../sql/process/ShowProcessListManager.java        | 58 +++++++++++-----------
 .../executor/sql/process/ProcessReporterTest.java  |  2 +-
 .../sql/process/DriverProcessReporterCleaner.java  |  2 +-
 .../subscriber/ProcessListChangedSubscriber.java   |  2 +-
 .../sql/process/ProxyProcessReporterCleaner.java   |  2 +-
 11 files changed, 44 insertions(+), 49 deletions(-)

diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutorCallback.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutorCallback.java
index eb26d4003a5..ce41216f46a 100644
--- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutorCallback.java
+++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutorCallback.java
@@ -84,7 +84,7 @@ public abstract class JDBCExecutorCallback<T> implements ExecutorCallback<JDBCEx
             sqlExecutionHook.start(jdbcExecutionUnit.getExecutionUnit().getDataSourceName(), sqlUnit.getSql(), sqlUnit.getParameters(), dataSourceMetaData, isTrunkThread);
             T result = executeSQL(sqlUnit.getSql(), jdbcExecutionUnit.getStorageResource(), jdbcExecutionUnit.getConnectionMode(), storageType);
             sqlExecutionHook.finishSuccess();
-            new ProcessEngine().finishExecution(1);
+            new ProcessEngine().finishExecution();
             return result;
         } catch (final SQLException ex) {
             if (!storageType.equals(protocolType)) {
diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/callback/RawSQLExecutorCallback.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/callback/RawSQLExecutorCallback.java
index 9aedb8f4f1d..3adfcde0ac5 100644
--- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/callback/RawSQLExecutorCallback.java
+++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/callback/RawSQLExecutorCallback.java
@@ -46,7 +46,9 @@ public final class RawSQLExecutorCallback implements ExecutorCallback<RawSQLExec
     public Collection<ExecuteResult> execute(final Collection<RawSQLExecutionUnit> inputs, final boolean isTrunkThread) throws SQLException {
         Collection<ExecuteResult> result = callbacks.iterator().next().execute(inputs, isTrunkThread);
         if (!ExecuteIDContext.isEmpty()) {
-            new ProcessEngine().finishExecution(inputs.size());
+            for (int i = 0; i < inputs.size(); i++) {
+                new ProcessEngine().finishExecution();
+            }
         }
         return result;
     }
diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessContext.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessContext.java
index dcb5b939475..f40ef34967f 100644
--- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessContext.java
+++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessContext.java
@@ -90,12 +90,10 @@ public final class ProcessContext {
     }
     
     /**
-     * Complete execution units.
-     * 
-     * @param completedExecutionUnitCount completed execution unit count
+     * Complete execution unit.
      */
-    public void completeExecutionUnits(final int completedExecutionUnitCount) {
-        completedUnitCount.addAndGet(completedExecutionUnitCount);
+    public void completeExecutionUnit() {
+        completedUnitCount.incrementAndGet();
     }
     
     /**
diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngine.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngine.java
index 46ab40cb4dc..23cd0e1d098 100644
--- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngine.java
+++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngine.java
@@ -68,14 +68,12 @@ public final class ProcessEngine {
     
     /**
      * Finish execution.
-     * 
-     * @param completedExecutionUnitCount completed execution unit count
      */
-    public void finishExecution(final int completedExecutionUnitCount) {
+    public void finishExecution() {
         if (ExecuteIDContext.isEmpty()) {
             return;
         }
-        reporter.reportComplete(ExecuteIDContext.get(), completedExecutionUnitCount);
+        reporter.reportComplete(ExecuteIDContext.get());
     }
     
     /**
diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessReporter.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessReporter.java
index e5c70a88e95..02d76033117 100644
--- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessReporter.java
+++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessReporter.java
@@ -61,10 +61,9 @@ public final class ProcessReporter {
      * Report complete execution unit.
      *
      * @param executionID execution ID
-     * @param completedExecutionUnitCount completed execution unit count
      */
-    public void reportComplete(final String executionID, final int completedExecutionUnitCount) {
-        ShowProcessListManager.getInstance().getProcessContext(executionID).completeExecutionUnits(completedExecutionUnitCount);
+    public void reportComplete(final String executionID) {
+        ShowProcessListManager.getInstance().getProcessContext(executionID).completeExecutionUnit();
     }
     
     /**
diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessReporterCleaner.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessReporterCleaner.java
index c87fec8623a..6b7ff6dba91 100644
--- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessReporterCleaner.java
+++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessReporterCleaner.java
@@ -28,7 +28,7 @@ public interface ProcessReporterCleaner {
     /**
      * Reset reporter.
      * 
-     * @param context execute process context
+     * @param context process context
      */
     void reset(ProcessContext context);
 }
diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ShowProcessListManager.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ShowProcessListManager.java
index 4075f48592a..7d14297168b 100644
--- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ShowProcessListManager.java
+++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ShowProcessListManager.java
@@ -32,17 +32,15 @@ import java.util.concurrent.ConcurrentHashMap;
  * Show process list manager.
  */
 @NoArgsConstructor(access = AccessLevel.PRIVATE)
+@Getter
 public final class ShowProcessListManager {
     
     private static final ShowProcessListManager INSTANCE = new ShowProcessListManager();
     
-    @Getter
     private final Map<String, ProcessContext> processContexts = new ConcurrentHashMap<>();
     
-    @Getter
     private final Map<String, Collection<Statement>> processStatements = new ConcurrentHashMap<>();
     
-    @Getter
     private final Map<String, ShowProcessListSimpleLock> locks = new ConcurrentHashMap<>();
     
     /**
@@ -55,72 +53,72 @@ public final class ShowProcessListManager {
     }
     
     /**
-     * Put execute process context.
+     * Put process context.
      * 
-     * @param executionId execution id
+     * @param executionID execution ID
      * @param processContext process context
      */
-    public void putProcessContext(final String executionId, final ProcessContext processContext) {
-        processContexts.put(executionId, processContext);
+    public void putProcessContext(final String executionID, final ProcessContext processContext) {
+        processContexts.put(executionID, processContext);
     }
     
     /**
      * Put process statements.
      *
-     * @param executionId execution id
+     * @param executionID execution ID
      * @param statements statements
      */
-    public void putProcessStatement(final String executionId, final Collection<Statement> statements) {
+    public void putProcessStatement(final String executionID, final Collection<Statement> statements) {
         if (statements.isEmpty()) {
             return;
         }
-        processStatements.put(executionId, statements);
+        processStatements.put(executionID, statements);
     }
     
     /**
-     * Get execute process context.
+     * Get process context.
      * 
-     * @param executionId execution id
-     * @return execute process context
+     * @param executionID execution ID
+     * @return process context
      */
-    public ProcessContext getProcessContext(final String executionId) {
-        return processContexts.get(executionId);
+    public ProcessContext getProcessContext(final String executionID) {
+        return processContexts.get(executionID);
     }
     
     /**
-     * Get execute process statement.
+     * Get process statement.
      *
-     * @param executionId execution id
+     * @param executionID execution ID
      * @return execute statements
      */
-    public Collection<Statement> getProcessStatement(final String executionId) {
-        return processStatements.getOrDefault(executionId, Collections.emptyList());
+    public Collection<Statement> getProcessStatement(final String executionID) {
+        return processStatements.getOrDefault(executionID, Collections.emptyList());
     }
     
     /**
-     * Remove execute process context.
+     * Remove process context.
      * 
-     * @param executionId execution id
+     * @param executionID execution ID
      */
-    public void removeProcessContext(final String executionId) {
-        processContexts.remove(executionId);
+    public void removeProcessContext(final String executionID) {
+        processContexts.remove(executionID);
     }
     
     /**
-     * Remove execute process statement.
+     * Remove process statement.
      *
-     * @param executionId execution id
+     * @param executionID execution ID
      */
-    public void removeProcessStatement(final String executionId) {
-        processStatements.remove(executionId);
+    public void removeProcessStatement(final String executionID) {
+        processStatements.remove(executionID);
     }
     
     /**
-     * Get all execute process context.
+     * Get all process contexts.
      * 
-     * @return collection execute process context
+     * @return all process contexts
      */
-    public Collection<ProcessContext> getAllProcessContext() {
+    public Collection<ProcessContext> getAllProcessContexts() {
         return processContexts.values();
     }
 }
diff --git a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessReporterTest.java b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessReporterTest.java
index 5fdbf0890d5..4642b82ef2d 100644
--- a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessReporterTest.java
+++ b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessReporterTest.java
@@ -67,7 +67,7 @@ class ProcessReporterTest {
     @Test
     void assertReportUnit() {
         when(showProcessListManager.getProcessContext("foo_id")).thenReturn(mock(ProcessContext.class));
-        new ProcessReporter().reportComplete("foo_id", 1);
+        new ProcessReporter().reportComplete("foo_id");
         verify(showProcessListManager).getProcessContext("foo_id");
     }
     
diff --git a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/sql/process/DriverProcessReporterCleaner.java b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/sql/process/DriverProcessReporterCleaner.java
index cbe8338ee34..033e72ae8ea 100644
--- a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/sql/process/DriverProcessReporterCleaner.java
+++ b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/sql/process/DriverProcessReporterCleaner.java
@@ -22,7 +22,7 @@ import org.apache.shardingsphere.infra.executor.sql.process.ShowProcessListManag
 import org.apache.shardingsphere.infra.executor.sql.process.ProcessContext;
 
 /**
- * Execute process reporter cleaner for driver.
+ * Process reporter cleaner for driver.
  */
 public final class DriverProcessReporterCleaner implements ProcessReporterCleaner {
     
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ProcessListChangedSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ProcessListChangedSubscriber.java
index bd8245b123c..890a7507c09 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ProcessListChangedSubscriber.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ProcessListChangedSubscriber.java
@@ -64,7 +64,7 @@ public final class ProcessListChangedSubscriber {
         if (!event.getInstanceId().equals(contextManager.getInstanceContext().getInstance().getMetaData().getId())) {
             return;
         }
-        Collection<ProcessContext> processContexts = ShowProcessListManager.getInstance().getAllProcessContext();
+        Collection<ProcessContext> processContexts = ShowProcessListManager.getInstance().getAllProcessContexts();
         if (!processContexts.isEmpty()) {
             registryCenter.getRepository().persist(
                     ProcessNode.getProcessListInstancePath(event.getProcessId(), event.getInstanceId()), YamlEngine.marshal(swapper.swapToYamlConfiguration(processContexts)));
diff --git a/proxy/bootstrap/src/main/java/org/apache/shardingsphere/proxy/executor/sql/process/ProxyProcessReporterCleaner.java b/proxy/bootstrap/src/main/java/org/apache/shardingsphere/proxy/executor/sql/process/ProxyProcessReporterCleaner.java
index c0889d52f11..f6b3203df35 100644
--- a/proxy/bootstrap/src/main/java/org/apache/shardingsphere/proxy/executor/sql/process/ProxyProcessReporterCleaner.java
+++ b/proxy/bootstrap/src/main/java/org/apache/shardingsphere/proxy/executor/sql/process/ProxyProcessReporterCleaner.java
@@ -21,7 +21,7 @@ import org.apache.shardingsphere.infra.executor.sql.process.ProcessReporterClean
 import org.apache.shardingsphere.infra.executor.sql.process.ProcessContext;
 
 /**
- * Execute process reporter cleaner for proxy.
+ * Process reporter cleaner for proxy.
  */
 public final class ProxyProcessReporterCleaner implements ProcessReporterCleaner {