You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2023/04/29 06:54:54 UTC
[shardingsphere] branch master updated: Remove ExecuteProcessUnit (#25411)
This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 5414f2eaa4c Remove ExecuteProcessUnit (#25411)
5414f2eaa4c is described below
commit 5414f2eaa4cfdacd4f2cb9ad59e486512ed275e3
Author: Liang Zhang <zh...@apache.org>
AuthorDate: Sat Apr 29 14:54:41 2023 +0800
Remove ExecuteProcessUnit (#25411)
---
.../engine/driver/jdbc/JDBCExecutorCallback.java | 7 +---
.../raw/callback/RawSQLExecutorCallback.java | 4 +-
.../infra/executor/sql/process/ProcessEngine.java | 6 +--
.../executor/sql/process/ProcessReporter.java | 9 +----
.../sql/process/model/ExecuteProcessUnit.java | 44 ----------------------
.../executor/sql/process/model/ProcessContext.java | 37 +++++++++++++-----
.../yaml/swapper/YamlProcessContextSwapper.java | 5 +--
.../executor/sql/process/ProcessReporterTest.java | 5 +--
8 files changed, 38 insertions(+), 79 deletions(-)
diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutorCallback.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutorCallback.java
index c8ca58a43e0..ce41216f46a 100644
--- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutorCallback.java
+++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutorCallback.java
@@ -23,7 +23,6 @@ import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.executor.kernel.model.ExecutorCallback;
import org.apache.shardingsphere.infra.executor.sql.context.SQLUnit;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
-import org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutionUnit;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler;
import org.apache.shardingsphere.infra.executor.sql.hook.SPISQLExecutionHook;
import org.apache.shardingsphere.infra.executor.sql.hook.SQLExecutionHook;
@@ -85,7 +84,7 @@ public abstract class JDBCExecutorCallback<T> implements ExecutorCallback<JDBCEx
sqlExecutionHook.start(jdbcExecutionUnit.getExecutionUnit().getDataSourceName(), sqlUnit.getSql(), sqlUnit.getParameters(), dataSourceMetaData, isTrunkThread);
T result = executeSQL(sqlUnit.getSql(), jdbcExecutionUnit.getStorageResource(), jdbcExecutionUnit.getConnectionMode(), storageType);
sqlExecutionHook.finishSuccess();
- finishReport(jdbcExecutionUnit);
+ new ProcessEngine().finishExecution();
return result;
} catch (final SQLException ex) {
if (!storageType.equals(protocolType)) {
@@ -110,10 +109,6 @@ public abstract class JDBCExecutorCallback<T> implements ExecutorCallback<JDBCEx
return result;
}
- private void finishReport(final SQLExecutionUnit executionUnit) {
- new ProcessEngine().finishExecution(executionUnit);
- }
-
protected abstract T executeSQL(String sql, Statement statement, ConnectionMode connectionMode, DatabaseType storageType) throws SQLException;
protected abstract Optional<T> getSaneResult(SQLStatement sqlStatement, SQLException ex);
diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/callback/RawSQLExecutorCallback.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/callback/RawSQLExecutorCallback.java
index 8c0868d516a..fda9ae97ed0 100644
--- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/callback/RawSQLExecutorCallback.java
+++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/callback/RawSQLExecutorCallback.java
@@ -47,8 +47,8 @@ public final class RawSQLExecutorCallback implements ExecutorCallback<RawSQLExec
Collection<ExecuteResult> result = callbacks.iterator().next().execute(inputs, isTrunkThread);
if (!ExecuteIDContext.isEmpty()) {
ProcessEngine processEngine = new ProcessEngine();
- for (RawSQLExecutionUnit each : inputs) {
- processEngine.finishExecution(each);
+ for (int i = 0; i < inputs.size(); i++) {
+ processEngine.finishExecution();
}
}
return result;
diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngine.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngine.java
index 9c5e1fea972..23cd0e1d098 100644
--- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngine.java
+++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngine.java
@@ -68,14 +68,12 @@ public final class ProcessEngine {
/**
* Finish execution.
- *
- * @param executionUnit execution unit
*/
- public void finishExecution(final SQLExecutionUnit executionUnit) {
+ public void finishExecution() {
if (ExecuteIDContext.isEmpty()) {
return;
}
- reporter.reportComplete(ExecuteIDContext.get(), executionUnit);
+ reporter.reportComplete(ExecuteIDContext.get());
}
/**
diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessReporter.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessReporter.java
index cbb9b3fc0a0..5b895db0cfd 100644
--- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessReporter.java
+++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessReporter.java
@@ -22,12 +22,10 @@ import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupConte
import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupReportContext;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutionUnit;
import org.apache.shardingsphere.infra.executor.sql.process.model.ProcessContext;
-import org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessUnit;
import org.apache.shardingsphere.infra.metadata.user.Grantee;
import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
import java.util.Collections;
-import java.util.Optional;
/**
* Process report.
@@ -64,12 +62,9 @@ public final class ProcessReporter {
* Report complete execution unit.
*
* @param executionID execution ID
- * @param executionUnit execution unit
*/
- public void reportComplete(final String executionID, final SQLExecutionUnit executionUnit) {
- ExecuteProcessUnit executeProcessUnit = new ExecuteProcessUnit(executionUnit.getExecutionUnit());
- ProcessContext processContext = ShowProcessListManager.getInstance().getProcessContext(executionID);
- Optional.ofNullable(processContext.getProcessUnits().get(executeProcessUnit.getUnitID())).ifPresent(ExecuteProcessUnit::switchComplete);
+ public void reportComplete(final String executionID) {
+ ShowProcessListManager.getInstance().getProcessContext(executionID).completeOne();
}
/**
diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/model/ExecuteProcessUnit.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/model/ExecuteProcessUnit.java
deleted file mode 100644
index 441356273c9..00000000000
--- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/model/ExecuteProcessUnit.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.infra.executor.sql.process.model;
-
-import lombok.Getter;
-import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
-
-/**
- * Execute process unit.
- */
-@Getter
-public final class ExecuteProcessUnit {
-
- private final String unitID;
-
- private volatile boolean completed;
-
- public ExecuteProcessUnit(final ExecutionUnit executionUnit) {
- this.unitID = String.valueOf(executionUnit.hashCode());
- completed = false;
- }
-
- /**
- * Switch to complete.
- */
- public void switchComplete() {
- completed = true;
- }
-}
diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/model/ProcessContext.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/model/ProcessContext.java
index b1e19756e36..76dcb69ff97 100644
--- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/model/ProcessContext.java
+++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/model/ProcessContext.java
@@ -26,9 +26,8 @@ import org.apache.shardingsphere.infra.metadata.user.Grantee;
import java.sql.Statement;
import java.util.Collection;
-import java.util.HashMap;
import java.util.LinkedList;
-import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* Process context.
@@ -44,9 +43,11 @@ public final class ProcessContext {
private final String hostname;
- private final Map<String, ExecuteProcessUnit> processUnits = new HashMap<>();
+ private final int totalUnitCount;
- private final Collection<Statement> processStatements = new LinkedList<>();
+ private final Collection<Statement> processStatements;
+
+ private final AtomicInteger completedUnitCount;
private String sql;
@@ -68,22 +69,40 @@ public final class ProcessContext {
Grantee grantee = executionGroupContext.getReportContext().getGrantee();
username = null == grantee ? null : grantee.getUsername();
hostname = null == grantee ? null : grantee.getHostname();
+ totalUnitCount = executionGroupContext.getInputGroups().stream().mapToInt(each -> each.getInputs().size()).sum();
+ processStatements = getProcessStatements(executionGroupContext);
+ completedUnitCount = new AtomicInteger(0);
this.sql = sql;
startMillis = System.currentTimeMillis();
this.executing = executing;
- addProcessUnitsAndStatements(executionGroupContext);
}
- private void addProcessUnitsAndStatements(final ExecutionGroupContext<? extends SQLExecutionUnit> executionGroupContext) {
+ private Collection<Statement> getProcessStatements(final ExecutionGroupContext<? extends SQLExecutionUnit> executionGroupContext) {
+ Collection<Statement> result = new LinkedList<>();
for (ExecutionGroup<? extends SQLExecutionUnit> each : executionGroupContext.getInputGroups()) {
for (SQLExecutionUnit executionUnit : each.getInputs()) {
- ExecuteProcessUnit processUnit = new ExecuteProcessUnit(executionUnit.getExecutionUnit());
- processUnits.put(processUnit.getUnitID(), processUnit);
if (executionUnit instanceof JDBCExecutionUnit) {
- processStatements.add(((JDBCExecutionUnit) executionUnit).getStorageResource());
+ result.add(((JDBCExecutionUnit) executionUnit).getStorageResource());
}
}
}
+ return result;
+ }
+
+ /**
+ * Complete one execute unit.
+ */
+ public void completeOne() {
+ completedUnitCount.incrementAndGet();
+ }
+
+ /**
+ * Get completed unit count.
+ *
+ * @return completed unit count
+ */
+ public int getCompletedUnitCount() {
+ return completedUnitCount.get();
}
/**
diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessContextSwapper.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessContextSwapper.java
index 600db92cadc..211937bc8b7 100644
--- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessContextSwapper.java
+++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessContextSwapper.java
@@ -18,7 +18,6 @@
package org.apache.shardingsphere.infra.executor.sql.process.yaml.swapper;
import org.apache.shardingsphere.infra.executor.sql.process.model.ProcessContext;
-import org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessUnit;
import org.apache.shardingsphere.infra.executor.sql.process.yaml.YamlProcessContext;
import org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
@@ -35,8 +34,8 @@ public final class YamlProcessContextSwapper implements YamlConfigurationSwapper
result.setUsername(data.getUsername());
result.setHostname(data.getHostname());
result.setSql(data.getSql());
- result.setTotalUnitCount(data.getProcessUnits().size());
- result.setCompletedUnitCount(Long.valueOf(data.getProcessUnits().values().stream().filter(ExecuteProcessUnit::isCompleted).count()).intValue());
+ result.setTotalUnitCount(data.getTotalUnitCount());
+ result.setCompletedUnitCount(data.getCompletedUnitCount());
result.setStartTimeMillis(data.getStartMillis());
result.setExecuting(data.isExecuting());
return result;
diff --git a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessReporterTest.java b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessReporterTest.java
index d27713ed5e5..313520068b2 100644
--- a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessReporterTest.java
+++ b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessReporterTest.java
@@ -20,7 +20,6 @@ package org.apache.shardingsphere.infra.executor.sql.process;
import org.apache.shardingsphere.infra.binder.QueryContext;
import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupReportContext;
-import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutionUnit;
import org.apache.shardingsphere.infra.executor.sql.process.model.ProcessContext;
import org.apache.shardingsphere.test.mock.AutoMockExtension;
@@ -68,10 +67,8 @@ class ProcessReporterTest {
@Test
void assertReportUnit() {
- SQLExecutionUnit sqlExecutionUnit = mock(SQLExecutionUnit.class);
- when(sqlExecutionUnit.getExecutionUnit()).thenReturn(mock(ExecutionUnit.class));
when(showProcessListManager.getProcessContext("foo_id")).thenReturn(mock(ProcessContext.class));
- new ProcessReporter().reportComplete("foo_id", sqlExecutionUnit);
+ new ProcessReporter().reportComplete("foo_id");
verify(showProcessListManager).getProcessContext("foo_id");
}