You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2021/04/16 08:42:23 UTC
[shardingsphere] branch master updated: Query execution process
from other proxies for running SQLs by RQL (#10006)
This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new f57b3d2 Query execution process from other proxies for running SQLs by RQL (#10006)
f57b3d2 is described below
commit f57b3d27b373a452423178ba1b89a83c4bf2d9a5
Author: sandynz <42...@users.noreply.github.com>
AuthorDate: Fri Apr 16 16:41:39 2021 +0800
Query execution process from other proxies for running SQLs by RQL (#10006)
* "show processlist" query
* "show processlist" unit test
* checkstyle
* Fix compile error
* code conduct
* Follow code review advice: refactor initRegistryRepository
* Follow code review advice: use eventbus to fetch data from governance
* Follow code review advice: rename
---
.../invocation/ShowProcessListRequestEvent.java | 15 +--
.../invocation/ShowProcessListResponseEvent.java | 16 +--
.../governance/core/registry/RegistryCenter.java | 14 +++
.../core/registry/RegistryCenterNode.java | 21 ++++
.../kernel/model/ExecutionGroupContext.java | 2 +-
.../sql/process/model/ExecuteProcessContext.java | 2 +
.../admin/mysql/MySQLAdminExecutorFactory.java | 5 +
.../mysql/executor/ShowProcessListExecutor.java | 122 +++++++++++++++++++++
.../executor/model/YamlExecuteProcessContext.java | 22 ++--
.../executor/model/YamlExecuteProcessUnit.java | 24 ++--
.../executor/ShowProcessListExecutorTest.java | 112 +++++++++++++++++++
.../impl/MySQLDALStatementSQLVisitor.java | 7 ++
.../mysql/dal/MySQLShowProcessListStatement.java | 24 ++--
13 files changed, 322 insertions(+), 64 deletions(-)
diff --git a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/kernel/model/ExecutionGroupContext.java b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/event/model/invocation/ShowProcessListRequestEvent.java
similarity index 70%
copy from shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/kernel/model/ExecutionGroupContext.java
copy to shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/event/model/invocation/ShowProcessListRequestEvent.java
index 9347efc..0a7abc0 100644
--- a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/kernel/model/ExecutionGroupContext.java
+++ b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/event/model/invocation/ShowProcessListRequestEvent.java
@@ -15,24 +15,15 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.infra.executor.kernel.model;
+package org.apache.shardingsphere.governance.core.event.model.invocation;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
-import java.util.Collection;
-import java.util.UUID;
-
/**
- * Execution group context.
- *
- * @param <T> type of execution input value
+ * Show process list request event.
*/
@RequiredArgsConstructor
@Getter
-public final class ExecutionGroupContext<T> {
-
- private final Collection<ExecutionGroup<T>> inputGroups;
-
- private final String executionID = UUID.randomUUID().toString();
+public final class ShowProcessListRequestEvent {
}
diff --git a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/kernel/model/ExecutionGroupContext.java b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/event/model/invocation/ShowProcessListResponseEvent.java
similarity index 73%
copy from shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/kernel/model/ExecutionGroupContext.java
copy to shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/event/model/invocation/ShowProcessListResponseEvent.java
index 9347efc..2565771 100644
--- a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/kernel/model/ExecutionGroupContext.java
+++ b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/event/model/invocation/ShowProcessListResponseEvent.java
@@ -15,24 +15,18 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.infra.executor.kernel.model;
+package org.apache.shardingsphere.governance.core.event.model.invocation;
+import java.util.Collection;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
-import java.util.Collection;
-import java.util.UUID;
-
/**
- * Execution group context.
- *
- * @param <T> type of execution input value
+ * Show process list response event.
*/
@RequiredArgsConstructor
@Getter
-public final class ExecutionGroupContext<T> {
-
- private final Collection<ExecutionGroup<T>> inputGroups;
+public final class ShowProcessListResponseEvent {
- private final String executionID = UUID.randomUUID().toString();
+ private final Collection<String> processListData;
}
diff --git a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenter.java b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenter.java
index dd255f0..a65cb93 100644
--- a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenter.java
+++ b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenter.java
@@ -23,6 +23,8 @@ import com.google.common.base.Strings;
import com.google.common.eventbus.Subscribe;
import org.apache.shardingsphere.governance.core.event.model.datasource.DataSourceAddedEvent;
import org.apache.shardingsphere.governance.core.event.model.datasource.DataSourceAlteredEvent;
+import org.apache.shardingsphere.governance.core.event.model.invocation.ShowProcessListRequestEvent;
+import org.apache.shardingsphere.governance.core.event.model.invocation.ShowProcessListResponseEvent;
import org.apache.shardingsphere.governance.core.event.model.metadata.MetaDataCreatedEvent;
import org.apache.shardingsphere.governance.core.event.model.metadata.MetaDataDroppedEvent;
import org.apache.shardingsphere.governance.core.event.model.rule.RuleConfigurationCachedEvent;
@@ -479,6 +481,18 @@ public final class RegistryCenter {
}
/**
+ * Load show process list data.
+ *
+ * @param event get children request event.
+ */
+ @Subscribe
+ public void loadShowProcessListData(final ShowProcessListRequestEvent event) {
+ List<String> childrenKeys = repository.getChildrenKeys(node.getExecutionNodesPath());
+ Collection<String> processListData = childrenKeys.stream().map(key -> repository.get(node.getExecutionPath(key))).collect(Collectors.toList());
+ ShardingSphereEventBus.getInstance().post(new ShowProcessListResponseEvent(processListData));
+ }
+
+ /**
* Persist instance online.
*/
public void persistInstanceOnline() {
diff --git a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenterNode.java b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenterNode.java
index dc60f84..70af0d0 100644
--- a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenterNode.java
+++ b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenterNode.java
@@ -45,6 +45,8 @@ public final class RegistryCenterNode {
private static final String PRIMARY_NODES_NAME = "primarynodes";
+ private static final String EXECUTION_NODES_NAME = "executionnodes";
+
private static final String METADATA_NODE = "metadata";
private static final String DATA_SOURCE_NODE = "datasource";
@@ -356,4 +358,23 @@ public final class RegistryCenterNode {
public Collection<String> getAllDataSourcePaths(final Collection<String> schemaNames) {
return schemaNames.stream().map(this::getMetadataDataSourcePath).collect(Collectors.toList());
}
+
+ /**
+ * Get execution nodes path.
+ *
+ * @return execution nodes path
+ */
+ public String getExecutionNodesPath() {
+ return Joiner.on("/").join("", EXECUTION_NODES_NAME);
+ }
+
+ /**
+ * Get execution path.
+ *
+ * @param executionId execution id
+ * @return execution path
+ */
+ public String getExecutionPath(final String executionId) {
+ return Joiner.on("/").join("", EXECUTION_NODES_NAME, executionId);
+ }
}
diff --git a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/kernel/model/ExecutionGroupContext.java b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/kernel/model/ExecutionGroupContext.java
index 9347efc..18d0953 100644
--- a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/kernel/model/ExecutionGroupContext.java
+++ b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/kernel/model/ExecutionGroupContext.java
@@ -18,10 +18,10 @@
package org.apache.shardingsphere.infra.executor.kernel.model;
import lombok.Getter;
-import lombok.RequiredArgsConstructor;
import java.util.Collection;
import java.util.UUID;
+import lombok.RequiredArgsConstructor;
/**
* Execution group context.
diff --git a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/model/ExecuteProcessContext.java b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/model/ExecuteProcessContext.java
index 2123b31..f5589d1 100644
--- a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/model/ExecuteProcessContext.java
+++ b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/model/ExecuteProcessContext.java
@@ -35,6 +35,8 @@ public final class ExecuteProcessContext {
private final Collection<ExecuteProcessUnit> unitStatuses;
+ private final long startTimeMillis = System.currentTimeMillis();
+
public ExecuteProcessContext(final ExecutionGroupContext<SQLExecutionUnit> executionGroupContext) {
this.executionID = executionGroupContext.getExecutionID();
unitStatuses = createExecutionUnitStatuses(executionGroupContext);
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/admin/mysql/MySQLAdminExecutorFactory.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/admin/mysql/MySQLAdminExecutorFactory.java
index b36a4de..b37c786 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/admin/mysql/MySQLAdminExecutorFactory.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/admin/mysql/MySQLAdminExecutorFactory.java
@@ -21,6 +21,7 @@ import org.apache.shardingsphere.proxy.backend.text.admin.executor.DatabaseAdmin
import org.apache.shardingsphere.proxy.backend.text.admin.executor.DatabaseAdminExecutorFactory;
import org.apache.shardingsphere.proxy.backend.text.admin.mysql.executor.ShowCurrentDatabaseExecutor;
import org.apache.shardingsphere.proxy.backend.text.admin.mysql.executor.ShowDatabasesExecutor;
+import org.apache.shardingsphere.proxy.backend.text.admin.mysql.executor.ShowProcessListExecutor;
import org.apache.shardingsphere.proxy.backend.text.admin.mysql.executor.ShowTablesExecutor;
import org.apache.shardingsphere.proxy.backend.text.admin.mysql.executor.UseDatabaseExecutor;
import org.apache.shardingsphere.sql.parser.sql.common.segment.dml.item.ExpressionProjectionSegment;
@@ -31,6 +32,7 @@ import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
import org.apache.shardingsphere.sql.parser.sql.common.statement.dal.UseStatement;
import org.apache.shardingsphere.sql.parser.sql.common.statement.dml.SelectStatement;
import org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.dal.MySQLShowDatabasesStatement;
+import org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.dal.MySQLShowProcessListStatement;
import org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.dal.MySQLShowTablesStatement;
import java.util.Optional;
@@ -55,6 +57,9 @@ public final class MySQLAdminExecutorFactory implements DatabaseAdminExecutorFac
if (sqlStatement instanceof MySQLShowTablesStatement) {
return Optional.of(new ShowTablesExecutor());
}
+ if (sqlStatement instanceof MySQLShowProcessListStatement) {
+ return Optional.of(new ShowProcessListExecutor());
+ }
if (sqlStatement instanceof SelectStatement) {
if (isShowCurrentDatabaseStatement((SelectStatement) sqlStatement)) {
return Optional.of(new ShowCurrentDatabaseExecutor());
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/admin/mysql/executor/ShowProcessListExecutor.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/admin/mysql/executor/ShowProcessListExecutor.java
new file mode 100644
index 0000000..b407ca4b
--- /dev/null
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/admin/mysql/executor/ShowProcessListExecutor.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.proxy.backend.text.admin.mysql.executor;
+
+import com.google.common.eventbus.Subscribe;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import lombok.Getter;
+import org.apache.shardingsphere.governance.core.event.model.invocation.ShowProcessListRequestEvent;
+import org.apache.shardingsphere.governance.core.event.model.invocation.ShowProcessListResponseEvent;
+import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
+import org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
+import org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResultMetaData;
+import org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.raw.metadata.RawQueryResultColumnMetaData;
+import org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.raw.metadata.RawQueryResultMetaData;
+import org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.raw.type.RawMemoryQueryResult;
+import org.apache.shardingsphere.infra.executor.sql.execute.result.query.type.memory.row.MemoryQueryResultDataRow;
+import org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessConstants;
+import org.apache.shardingsphere.infra.merge.result.MergedResult;
+import org.apache.shardingsphere.infra.merge.result.impl.transparent.TransparentMergedResult;
+import org.apache.shardingsphere.infra.metadata.user.Grantee;
+import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
+import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
+import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
+import org.apache.shardingsphere.proxy.backend.text.admin.executor.DatabaseAdminQueryExecutor;
+import org.apache.shardingsphere.proxy.backend.text.admin.mysql.executor.model.YamlExecuteProcessContext;
+
+/**
+ * Show process list executor.
+ */
+public final class ShowProcessListExecutor implements DatabaseAdminQueryExecutor {
+
+ private Collection<String> processListData;
+
+ @Getter
+ private QueryResultMetaData queryResultMetaData;
+
+ @Getter
+ private MergedResult mergedResult;
+
+ public ShowProcessListExecutor() {
+ ShardingSphereEventBus.getInstance().register(this);
+ }
+
+ /**
+ * Receive and handle response event.
+ *
+ * @param event show process list response event
+ */
+ @Subscribe
+ public void receiveProcessListData(final ShowProcessListResponseEvent event) {
+ processListData = event.getProcessListData();
+ }
+
+ @Override
+ public void execute(final BackendConnection backendConnection) {
+ queryResultMetaData = createQueryResultMetaData();
+ mergedResult = new TransparentMergedResult(getQueryResult(backendConnection));
+ }
+
+ private QueryResult getQueryResult(final BackendConnection backendConnection) {
+ if (!ProxyContext.getInstance().getMetaData(backendConnection.getSchemaName()).isComplete()) {
+ return new RawMemoryQueryResult(queryResultMetaData, Collections.emptyList());
+ }
+ ShardingSphereEventBus.getInstance().post(new ShowProcessListRequestEvent());
+ if (null == processListData || processListData.isEmpty()) {
+ return new RawMemoryQueryResult(queryResultMetaData, Collections.emptyList());
+ }
+ Collection<YamlExecuteProcessContext> processContexts = processListData.stream()
+ .map(value -> YamlEngine.unmarshal(value, YamlExecuteProcessContext.class)).collect(Collectors.toList());
+ Grantee grantee = backendConnection.getGrantee();
+ List<MemoryQueryResultDataRow> rows = processContexts.stream().map(processContext -> {
+ List<Object> rowValues = new ArrayList<>(8);
+ rowValues.add(processContext.getExecutionID());
+ rowValues.add(grantee.getUsername());
+ rowValues.add(grantee.getHostname());
+ rowValues.add(backendConnection.getSchemaName());
+ rowValues.add("Execute");
+ rowValues.add(TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - processContext.getStartTimeMillis()));
+ int processDoneCount = processContext.getUnitStatuses().stream().map(processUnit -> ExecuteProcessConstants.EXECUTE_STATUS_DONE == processUnit.getStatus() ? 1 : 0).reduce(0, Integer::sum);
+ String statePrefix = "Executing ";
+ rowValues.add(statePrefix + processDoneCount + "/" + processContext.getUnitStatuses().size());
+ // TODO Show original SQL
+ rowValues.add("");
+ return new MemoryQueryResultDataRow(rowValues);
+ }).collect(Collectors.toList());
+ return new RawMemoryQueryResult(queryResultMetaData, rows);
+ }
+
+ private QueryResultMetaData createQueryResultMetaData() {
+ List<RawQueryResultColumnMetaData> columns = new ArrayList<>();
+ columns.add(new RawQueryResultColumnMetaData("", "Id", "Id", Types.VARCHAR, "VARCHAR", 20, 0));
+ columns.add(new RawQueryResultColumnMetaData("", "User", "User", Types.VARCHAR, "VARCHAR", 20, 0));
+ columns.add(new RawQueryResultColumnMetaData("", "Host", "Host", Types.VARCHAR, "VARCHAR", 64, 0));
+ columns.add(new RawQueryResultColumnMetaData("", "db", "db", Types.VARCHAR, "VARCHAR", 64, 0));
+ columns.add(new RawQueryResultColumnMetaData("", "Command", "Command", Types.VARCHAR, "VARCHAR", 64, 0));
+ columns.add(new RawQueryResultColumnMetaData("", "Time", "Time", Types.VARCHAR, "VARCHAR", 10, 0));
+ columns.add(new RawQueryResultColumnMetaData("", "State", "State", Types.VARCHAR, "VARCHAR", 64, 0));
+ columns.add(new RawQueryResultColumnMetaData("", "Info", "Info", Types.VARCHAR, "VARCHAR", 120, 0));
+ return new RawQueryResultMetaData(columns);
+ }
+}
diff --git a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/kernel/model/ExecutionGroupContext.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/admin/mysql/executor/model/YamlExecuteProcessContext.java
similarity index 68%
copy from shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/kernel/model/ExecutionGroupContext.java
copy to shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/admin/mysql/executor/model/YamlExecuteProcessContext.java
index 9347efc..ffa4787 100644
--- a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/kernel/model/ExecutionGroupContext.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/admin/mysql/executor/model/YamlExecuteProcessContext.java
@@ -15,24 +15,22 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.infra.executor.kernel.model;
-
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
+package org.apache.shardingsphere.proxy.backend.text.admin.mysql.executor.model;
import java.util.Collection;
-import java.util.UUID;
+import lombok.Getter;
+import lombok.Setter;
/**
- * Execution group context.
- *
- * @param <T> type of execution input value
+ * Execute process context for YAML.
*/
-@RequiredArgsConstructor
@Getter
-public final class ExecutionGroupContext<T> {
+@Setter
+public final class YamlExecuteProcessContext {
+
+ private String executionID;
- private final Collection<ExecutionGroup<T>> inputGroups;
+ private Collection<YamlExecuteProcessUnit> unitStatuses;
- private final String executionID = UUID.randomUUID().toString();
+ private Long startTimeMillis;
}
diff --git a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/kernel/model/ExecutionGroupContext.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/admin/mysql/executor/model/YamlExecuteProcessUnit.java
similarity index 65%
copy from shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/kernel/model/ExecutionGroupContext.java
copy to shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/admin/mysql/executor/model/YamlExecuteProcessUnit.java
index 9347efc..9cf0eda 100644
--- a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/kernel/model/ExecutionGroupContext.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/admin/mysql/executor/model/YamlExecuteProcessUnit.java
@@ -15,24 +15,24 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.infra.executor.kernel.model;
+package org.apache.shardingsphere.proxy.backend.text.admin.mysql.executor.model;
+import lombok.AllArgsConstructor;
import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-
-import java.util.Collection;
-import java.util.UUID;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+import org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessConstants;
/**
- * Execution group context.
- *
- * @param <T> type of execution input value
+ * Execute process unit for YAML.
*/
-@RequiredArgsConstructor
+@NoArgsConstructor
+@AllArgsConstructor
@Getter
-public final class ExecutionGroupContext<T> {
+@Setter
+public final class YamlExecuteProcessUnit {
- private final Collection<ExecutionGroup<T>> inputGroups;
+ private String unitID;
- private final String executionID = UUID.randomUUID().toString();
+ private ExecuteProcessConstants status;
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/text/admin/mysql/executor/ShowProcessListExecutorTest.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/text/admin/mysql/executor/ShowProcessListExecutorTest.java
new file mode 100644
index 0000000..49cbc3a
--- /dev/null
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/text/admin/mysql/executor/ShowProcessListExecutorTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.proxy.backend.text.admin.mysql.executor;
+
+import java.lang.reflect.Field;
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.shardingsphere.infra.config.properties.ConfigurationProperties;
+import org.apache.shardingsphere.infra.context.metadata.MetaDataContexts;
+import org.apache.shardingsphere.infra.context.metadata.impl.StandardMetaDataContexts;
+import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
+import org.apache.shardingsphere.infra.executor.kernel.ExecutorEngine;
+import org.apache.shardingsphere.infra.merge.result.MergedResult;
+import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
+import org.apache.shardingsphere.infra.metadata.rule.ShardingSphereRuleMetaData;
+import org.apache.shardingsphere.infra.metadata.user.Grantee;
+import org.apache.shardingsphere.infra.metadata.user.ShardingSphereUser;
+import org.apache.shardingsphere.infra.metadata.user.ShardingSphereUsers;
+import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
+import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public final class ShowProcessListExecutorTest {
+
+ private static final String SCHEMA_NAME = "sharding_db";
+
+ private ShowProcessListExecutor showProcessListExecutor;
+
+ @Before
+ public void setUp() throws NoSuchFieldException, IllegalAccessException {
+ showProcessListExecutor = new ShowProcessListExecutor();
+ setupMetaDataContexts();
+ setupChildrenValues();
+ }
+
+ private void setupMetaDataContexts() throws NoSuchFieldException, IllegalAccessException {
+ Field metaDataContextsField = ProxyContext.getInstance().getClass().getDeclaredField("metaDataContexts");
+ metaDataContextsField.setAccessible(true);
+ Map<String, ShardingSphereMetaData> metaDataMap = getMetaDataMap();
+ MetaDataContexts metaDataContexts = new StandardMetaDataContexts(metaDataMap, mock(ShardingSphereRuleMetaData.class), mock(ExecutorEngine.class),
+ new ShardingSphereUsers(Collections.singleton(new ShardingSphereUser("root", "root", ""))), new ConfigurationProperties(new Properties()));
+ metaDataContextsField.set(ProxyContext.getInstance(), metaDataContexts);
+ }
+
+ private Map<String, ShardingSphereMetaData> getMetaDataMap() {
+ Map<String, ShardingSphereMetaData> result = new HashMap<>(2);
+ ShardingSphereMetaData metaData = mock(ShardingSphereMetaData.class, RETURNS_DEEP_STUBS);
+ when(metaData.isComplete()).thenReturn(true);
+ when(metaData.getResource().getDatabaseType()).thenReturn(new MySQLDatabaseType());
+ result.put(SCHEMA_NAME, metaData);
+ return result;
+ }
+
+ private void setupChildrenValues() throws NoSuchFieldException, IllegalAccessException {
+ Field childrenValuesField = showProcessListExecutor.getClass().getDeclaredField("processListData");
+ childrenValuesField.setAccessible(true);
+ String executionNodeValue = "executionID: f6c2336a-63ba-41bf-941e-2e3504eb2c80\n"
+ + "startTimeMillis: 1617939785160\n"
+ + "unitStatuses:\n"
+ + "- status: EXECUTE_STATUS_START\n"
+ + " unitID: unitID1\n"
+ + "- status: EXECUTE_STATUS_DONE\n"
+ + " unitID: unitID2\n";
+ childrenValuesField.set(showProcessListExecutor, Collections.singleton(executionNodeValue));
+ }
+
+ @Test
+ public void assertExecute() throws SQLException {
+ showProcessListExecutor.execute(mockBackendConnection());
+ assertThat(showProcessListExecutor.getQueryResultMetaData().getColumnCount(), is(8));
+ MergedResult mergedResult = showProcessListExecutor.getMergedResult();
+ while (mergedResult.next()) {
+ assertThat(mergedResult.getValue(1, String.class), is("f6c2336a-63ba-41bf-941e-2e3504eb2c80"));
+ assertThat(mergedResult.getValue(2, String.class), is("root"));
+ assertThat(mergedResult.getValue(3, String.class), is("localhost:30000"));
+ assertThat(mergedResult.getValue(4, String.class), is(SCHEMA_NAME));
+ assertThat(mergedResult.getValue(7, String.class), is("Executing 1/2"));
+ }
+ }
+
+ private BackendConnection mockBackendConnection() {
+ BackendConnection result = mock(BackendConnection.class);
+ when(result.getGrantee()).thenReturn(new Grantee("root", "localhost:30000"));
+ when(result.getSchemaName()).thenReturn(SCHEMA_NAME);
+ return result;
+ }
+}
diff --git a/shardingsphere-sql-parser/shardingsphere-sql-parser-dialect/shardingsphere-sql-parser-mysql/src/main/java/org/apache/shardingsphere/sql/parser/mysql/visitor/statement/impl/MySQLDALStatementSQLVisitor.java b/shardingsphere-sql-parser/shardingsphere-sql-parser-dialect/shardingsphere-sql-parser-mysql/src/main/java/org/apache/shardingsphere/sql/parser/mysql/visitor/statement/impl/MySQLDALStatementSQLVisitor.java
index 559b70b..ba4f16c 100644
--- a/shardingsphere-sql-parser/shardingsphere-sql-parser-dialect/shardingsphere-sql-parser-mysql/src/main/java/org/apache/shardingsphere/sql/parser/mysql/visitor/statement/impl/MySQLDALStatementSQLVisitor.java
+++ b/shardingsphere-sql-parser/shardingsphere-sql-parser-dialect/shardingsphere-sql-parser-mysql/src/main/java/org/apache/shardingsphere/sql/parser/mysql/visitor/statement/impl/MySQLDALStatementSQLVisitor.java
@@ -54,6 +54,7 @@ import org.apache.shardingsphere.sql.parser.autogen.MySQLStatementParser.ShowDat
import org.apache.shardingsphere.sql.parser.autogen.MySQLStatementParser.ShowErrorsContext;
import org.apache.shardingsphere.sql.parser.autogen.MySQLStatementParser.ShowIndexContext;
import org.apache.shardingsphere.sql.parser.autogen.MySQLStatementParser.ShowLikeContext;
+import org.apache.shardingsphere.sql.parser.autogen.MySQLStatementParser.ShowProcesslistContext;
import org.apache.shardingsphere.sql.parser.autogen.MySQLStatementParser.ShowStatusContext;
import org.apache.shardingsphere.sql.parser.autogen.MySQLStatementParser.ShowTableStatusContext;
import org.apache.shardingsphere.sql.parser.autogen.MySQLStatementParser.ShowTablesContext;
@@ -97,6 +98,7 @@ import org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.dal.MySQ
import org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.dal.MySQLShowErrorsStatement;
import org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.dal.MySQLShowIndexStatement;
import org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.dal.MySQLShowOtherStatement;
+import org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.dal.MySQLShowProcessListStatement;
import org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.dal.MySQLShowStatusStatement;
import org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.dal.MySQLShowTableStatusStatement;
import org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.dal.MySQLShowTablesStatement;
@@ -311,6 +313,11 @@ public final class MySQLDALStatementSQLVisitor extends MySQLStatementSQLVisitor
}
@Override
+ public ASTNode visitShowProcesslist(final ShowProcesslistContext ctx) {
+ return new MySQLShowProcessListStatement();
+ }
+
+ @Override
public ASTNode visitSetVariable(final SetVariableContext ctx) {
MySQLSetStatement result = new MySQLSetStatement();
Collection<VariableAssignSegment> variableAssigns = getVariableAssigns(ctx.optionValueList());
diff --git a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/kernel/model/ExecutionGroupContext.java b/shardingsphere-sql-parser/shardingsphere-sql-parser-statement/src/main/java/org/apache/shardingsphere/sql/parser/sql/dialect/statement/mysql/dal/MySQLShowProcessListStatement.java
similarity index 60%
copy from shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/kernel/model/ExecutionGroupContext.java
copy to shardingsphere-sql-parser/shardingsphere-sql-parser-statement/src/main/java/org/apache/shardingsphere/sql/parser/sql/dialect/statement/mysql/dal/MySQLShowProcessListStatement.java
index 9347efc..1142416 100644
--- a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/kernel/model/ExecutionGroupContext.java
+++ b/shardingsphere-sql-parser/shardingsphere-sql-parser-statement/src/main/java/org/apache/shardingsphere/sql/parser/sql/dialect/statement/mysql/dal/MySQLShowProcessListStatement.java
@@ -15,24 +15,16 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.infra.executor.kernel.model;
+package org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.dal;
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-
-import java.util.Collection;
-import java.util.UUID;
+import lombok.ToString;
+import org.apache.shardingsphere.sql.parser.sql.common.statement.AbstractSQLStatement;
+import org.apache.shardingsphere.sql.parser.sql.common.statement.dal.DALStatement;
+import org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.MySQLStatement;
/**
- * Execution group context.
- *
- * @param <T> type of execution input value
+ * MySQL show process list statement.
*/
-@RequiredArgsConstructor
-@Getter
-public final class ExecutionGroupContext<T> {
-
- private final Collection<ExecutionGroup<T>> inputGroups;
-
- private final String executionID = UUID.randomUUID().toString();
+@ToString
+public final class MySQLShowProcessListStatement extends AbstractSQLStatement implements DALStatement, MySQLStatement {
}