You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2023/04/29 06:54:54 UTC

[shardingsphere] branch master updated: Remove ExecuteProcessUnit (#25411)

This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 5414f2eaa4c Remove ExecuteProcessUnit (#25411)
5414f2eaa4c is described below

commit 5414f2eaa4cfdacd4f2cb9ad59e486512ed275e3
Author: Liang Zhang <zh...@apache.org>
AuthorDate: Sat Apr 29 14:54:41 2023 +0800

    Remove ExecuteProcessUnit (#25411)
---
 .../engine/driver/jdbc/JDBCExecutorCallback.java   |  7 +---
 .../raw/callback/RawSQLExecutorCallback.java       |  4 +-
 .../infra/executor/sql/process/ProcessEngine.java  |  6 +--
 .../executor/sql/process/ProcessReporter.java      |  9 +----
 .../sql/process/model/ExecuteProcessUnit.java      | 44 ----------------------
 .../executor/sql/process/model/ProcessContext.java | 37 +++++++++++++-----
 .../yaml/swapper/YamlProcessContextSwapper.java    |  5 +--
 .../executor/sql/process/ProcessReporterTest.java  |  5 +--
 8 files changed, 38 insertions(+), 79 deletions(-)

diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutorCallback.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutorCallback.java
index c8ca58a43e0..ce41216f46a 100644
--- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutorCallback.java
+++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutorCallback.java
@@ -23,7 +23,6 @@ import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.executor.kernel.model.ExecutorCallback;
 import org.apache.shardingsphere.infra.executor.sql.context.SQLUnit;
 import org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
-import org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutionUnit;
 import org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler;
 import org.apache.shardingsphere.infra.executor.sql.hook.SPISQLExecutionHook;
 import org.apache.shardingsphere.infra.executor.sql.hook.SQLExecutionHook;
@@ -85,7 +84,7 @@ public abstract class JDBCExecutorCallback<T> implements ExecutorCallback<JDBCEx
             sqlExecutionHook.start(jdbcExecutionUnit.getExecutionUnit().getDataSourceName(), sqlUnit.getSql(), sqlUnit.getParameters(), dataSourceMetaData, isTrunkThread);
             T result = executeSQL(sqlUnit.getSql(), jdbcExecutionUnit.getStorageResource(), jdbcExecutionUnit.getConnectionMode(), storageType);
             sqlExecutionHook.finishSuccess();
-            finishReport(jdbcExecutionUnit);
+            new ProcessEngine().finishExecution();
             return result;
         } catch (final SQLException ex) {
             if (!storageType.equals(protocolType)) {
@@ -110,10 +109,6 @@ public abstract class JDBCExecutorCallback<T> implements ExecutorCallback<JDBCEx
         return result;
     }
     
-    private void finishReport(final SQLExecutionUnit executionUnit) {
-        new ProcessEngine().finishExecution(executionUnit);
-    }
-    
     protected abstract T executeSQL(String sql, Statement statement, ConnectionMode connectionMode, DatabaseType storageType) throws SQLException;
     
     protected abstract Optional<T> getSaneResult(SQLStatement sqlStatement, SQLException ex);
diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/callback/RawSQLExecutorCallback.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/callback/RawSQLExecutorCallback.java
index 8c0868d516a..fda9ae97ed0 100644
--- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/callback/RawSQLExecutorCallback.java
+++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/callback/RawSQLExecutorCallback.java
@@ -47,8 +47,8 @@ public final class RawSQLExecutorCallback implements ExecutorCallback<RawSQLExec
         Collection<ExecuteResult> result = callbacks.iterator().next().execute(inputs, isTrunkThread);
         if (!ExecuteIDContext.isEmpty()) {
             ProcessEngine processEngine = new ProcessEngine();
-            for (RawSQLExecutionUnit each : inputs) {
-                processEngine.finishExecution(each);
+            for (int i = 0; i < inputs.size(); i++) {
+                processEngine.finishExecution();
             }
         }
         return result;
diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngine.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngine.java
index 9c5e1fea972..23cd0e1d098 100644
--- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngine.java
+++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngine.java
@@ -68,14 +68,12 @@ public final class ProcessEngine {
     
     /**
      * Finish execution.
-     *
-     * @param executionUnit execution unit
      */
-    public void finishExecution(final SQLExecutionUnit executionUnit) {
+    public void finishExecution() {
         if (ExecuteIDContext.isEmpty()) {
             return;
         }
-        reporter.reportComplete(ExecuteIDContext.get(), executionUnit);
+        reporter.reportComplete(ExecuteIDContext.get());
     }
     
     /**
diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessReporter.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessReporter.java
index cbb9b3fc0a0..5b895db0cfd 100644
--- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessReporter.java
+++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessReporter.java
@@ -22,12 +22,10 @@ import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupConte
 import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupReportContext;
 import org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutionUnit;
 import org.apache.shardingsphere.infra.executor.sql.process.model.ProcessContext;
-import org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessUnit;
 import org.apache.shardingsphere.infra.metadata.user.Grantee;
 import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
 
 import java.util.Collections;
-import java.util.Optional;
 
 /**
  * Process report.
@@ -64,12 +62,9 @@ public final class ProcessReporter {
      * Report complete execution unit.
      *
      * @param executionID execution ID
-     * @param executionUnit execution unit
      */
-    public void reportComplete(final String executionID, final SQLExecutionUnit executionUnit) {
-        ExecuteProcessUnit executeProcessUnit = new ExecuteProcessUnit(executionUnit.getExecutionUnit());
-        ProcessContext processContext = ShowProcessListManager.getInstance().getProcessContext(executionID);
-        Optional.ofNullable(processContext.getProcessUnits().get(executeProcessUnit.getUnitID())).ifPresent(ExecuteProcessUnit::switchComplete);
+    public void reportComplete(final String executionID) {
+        ShowProcessListManager.getInstance().getProcessContext(executionID).completeOne();
     }
     
     /**
diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/model/ExecuteProcessUnit.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/model/ExecuteProcessUnit.java
deleted file mode 100644
index 441356273c9..00000000000
--- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/model/ExecuteProcessUnit.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.infra.executor.sql.process.model;
-
-import lombok.Getter;
-import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
-
-/**
- * Execute process unit.
- */
-@Getter
-public final class ExecuteProcessUnit {
-    
-    private final String unitID;
-    
-    private volatile boolean completed;
-    
-    public ExecuteProcessUnit(final ExecutionUnit executionUnit) {
-        this.unitID = String.valueOf(executionUnit.hashCode());
-        completed = false;
-    }
-    
-    /**
-     * Switch to complete.
-     */
-    public void switchComplete() {
-        completed = true;
-    }
-}
diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/model/ProcessContext.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/model/ProcessContext.java
index b1e19756e36..76dcb69ff97 100644
--- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/model/ProcessContext.java
+++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/model/ProcessContext.java
@@ -26,9 +26,8 @@ import org.apache.shardingsphere.infra.metadata.user.Grantee;
 
 import java.sql.Statement;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.LinkedList;
-import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * Process context.
@@ -44,9 +43,11 @@ public final class ProcessContext {
     
     private final String hostname;
     
-    private final Map<String, ExecuteProcessUnit> processUnits = new HashMap<>();
+    private final int totalUnitCount;
     
-    private final Collection<Statement> processStatements = new LinkedList<>();
+    private final Collection<Statement> processStatements;
+    
+    private final AtomicInteger completedUnitCount;
     
     private String sql;
     
@@ -68,22 +69,40 @@ public final class ProcessContext {
         Grantee grantee = executionGroupContext.getReportContext().getGrantee();
         username = null == grantee ? null : grantee.getUsername();
         hostname = null == grantee ? null : grantee.getHostname();
+        totalUnitCount = executionGroupContext.getInputGroups().stream().mapToInt(each -> each.getInputs().size()).sum();
+        processStatements = getProcessStatements(executionGroupContext);
+        completedUnitCount = new AtomicInteger(0);
         this.sql = sql;
         startMillis = System.currentTimeMillis();
         this.executing = executing;
-        addProcessUnitsAndStatements(executionGroupContext);
     }
     
-    private void addProcessUnitsAndStatements(final ExecutionGroupContext<? extends SQLExecutionUnit> executionGroupContext) {
+    private Collection<Statement> getProcessStatements(final ExecutionGroupContext<? extends SQLExecutionUnit> executionGroupContext) {
+        Collection<Statement> result = new LinkedList<>();
         for (ExecutionGroup<? extends SQLExecutionUnit> each : executionGroupContext.getInputGroups()) {
             for (SQLExecutionUnit executionUnit : each.getInputs()) {
-                ExecuteProcessUnit processUnit = new ExecuteProcessUnit(executionUnit.getExecutionUnit());
-                processUnits.put(processUnit.getUnitID(), processUnit);
                 if (executionUnit instanceof JDBCExecutionUnit) {
-                    processStatements.add(((JDBCExecutionUnit) executionUnit).getStorageResource());
+                    result.add(((JDBCExecutionUnit) executionUnit).getStorageResource());
                 }
             }
         }
+        return result;
+    }
+    
+    /**
+     * Complete one execute unit.
+     */
+    public void completeOne() {
+        completedUnitCount.incrementAndGet();
+    }
+    
+    /**
+     * Get completed unit count.
+     * 
+     * @return completed unit count
+     */
+    public int getCompletedUnitCount() {
+        return completedUnitCount.get();
     }
     
     /**
diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessContextSwapper.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessContextSwapper.java
index 600db92cadc..211937bc8b7 100644
--- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessContextSwapper.java
+++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessContextSwapper.java
@@ -18,7 +18,6 @@
 package org.apache.shardingsphere.infra.executor.sql.process.yaml.swapper;
 
 import org.apache.shardingsphere.infra.executor.sql.process.model.ProcessContext;
-import org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessUnit;
 import org.apache.shardingsphere.infra.executor.sql.process.yaml.YamlProcessContext;
 import org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
 
@@ -35,8 +34,8 @@ public final class YamlProcessContextSwapper implements YamlConfigurationSwapper
         result.setUsername(data.getUsername());
         result.setHostname(data.getHostname());
         result.setSql(data.getSql());
-        result.setTotalUnitCount(data.getProcessUnits().size());
-        result.setCompletedUnitCount(Long.valueOf(data.getProcessUnits().values().stream().filter(ExecuteProcessUnit::isCompleted).count()).intValue());
+        result.setTotalUnitCount(data.getTotalUnitCount());
+        result.setCompletedUnitCount(data.getCompletedUnitCount());
         result.setStartTimeMillis(data.getStartMillis());
         result.setExecuting(data.isExecuting());
         return result;
diff --git a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessReporterTest.java b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessReporterTest.java
index d27713ed5e5..313520068b2 100644
--- a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessReporterTest.java
+++ b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessReporterTest.java
@@ -20,7 +20,6 @@ package org.apache.shardingsphere.infra.executor.sql.process;
 import org.apache.shardingsphere.infra.binder.QueryContext;
 import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
 import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupReportContext;
-import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
 import org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutionUnit;
 import org.apache.shardingsphere.infra.executor.sql.process.model.ProcessContext;
 import org.apache.shardingsphere.test.mock.AutoMockExtension;
@@ -68,10 +67,8 @@ class ProcessReporterTest {
     
     @Test
     void assertReportUnit() {
-        SQLExecutionUnit sqlExecutionUnit = mock(SQLExecutionUnit.class);
-        when(sqlExecutionUnit.getExecutionUnit()).thenReturn(mock(ExecutionUnit.class));
         when(showProcessListManager.getProcessContext("foo_id")).thenReturn(mock(ProcessContext.class));
-        new ProcessReporter().reportComplete("foo_id", sqlExecutionUnit);
+        new ProcessReporter().reportComplete("foo_id");
         verify(showProcessListManager).getProcessContext("foo_id");
     }