You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2023/04/27 21:08:19 UTC
[shardingsphere] branch master updated: Add YamlAllExecuteProcessContexts (#25381)
This is an automated email from the ASF dual-hosted git repository.
zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 49ac3771f3c Add YamlAllExecuteProcessContexts (#25381)
49ac3771f3c is described below
commit 49ac3771f3c132ef5c25a2f614efa99dffef21f2
Author: Liang Zhang <zh...@apache.org>
AuthorDate: Fri Apr 28 05:08:10 2023 +0800
Add YamlAllExecuteProcessContexts (#25381)
* Rename ExecuteProcessUnit.status
* Rename ExecuteProcessUnit.status
* Refactor ExecuteProcessContext
* Refactor ExecuteProcessContext
* Refactor ExecuteProcessContext
* Add YamlAllExecuteProcessContexts
* Add test cases
---
.../sql/process/model/ExecuteProcessContext.java | 20 +++----
.../model/yaml/YamlAllExecuteProcessContexts.java | 35 ++++++++++++
.../YamlAllExecuteProcessContextsSwapper.java} | 36 +++++-------
.../YamlAllExecuteProcessContextsSwapperTest.java | 66 ++++++++++++++++++++++
.../subscriber/ProcessListChangedSubscriber.java | 10 ++--
.../subscriber/ProcessStandaloneSubscriber.java | 12 ++--
.../admin/executor/ShowProcessListExecutor.java | 4 +-
7 files changed, 140 insertions(+), 43 deletions(-)
diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/model/ExecuteProcessContext.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/model/ExecuteProcessContext.java
index 6985d6024fd..91d5aa8e476 100644
--- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/model/ExecuteProcessContext.java
+++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/model/ExecuteProcessContext.java
@@ -44,28 +44,29 @@ public final class ExecuteProcessContext {
private final String hostname;
- private String sql;
+ private final boolean proxyContext;
private final Map<String, ExecuteProcessUnit> processUnits = new HashMap<>();
private final Collection<Statement> processStatements = new LinkedList<>();
- private long startMillis = System.currentTimeMillis();
+ private String sql;
private ExecuteProcessStatus status;
- private final boolean proxyContext;
+ private long startMillis;
public ExecuteProcessContext(final String sql, final ExecutionGroupContext<? extends SQLExecutionUnit> executionGroupContext, final ExecuteProcessStatus status, final boolean isProxyContext) {
- this.executionID = executionGroupContext.getReportContext().getExecutionID();
- this.sql = sql;
- this.databaseName = executionGroupContext.getReportContext().getDatabaseName();
+ executionID = executionGroupContext.getReportContext().getExecutionID();
+ databaseName = executionGroupContext.getReportContext().getDatabaseName();
Grantee grantee = executionGroupContext.getReportContext().getGrantee();
- this.username = null != grantee ? grantee.getUsername() : null;
- this.hostname = null != grantee ? grantee.getHostname() : null;
+ username = null == grantee ? null : grantee.getUsername();
+ hostname = null == grantee ? null : grantee.getHostname();
+ proxyContext = isProxyContext;
+ this.sql = sql;
this.status = status;
+ startMillis = System.currentTimeMillis();
addProcessUnitsAndStatements(executionGroupContext, status);
- proxyContext = isProxyContext;
}
private void addProcessUnitsAndStatements(final ExecutionGroupContext<? extends SQLExecutionUnit> executionGroupContext, final ExecuteProcessStatus processStatus) {
@@ -88,5 +89,4 @@ public final class ExecuteProcessContext {
startMillis = System.currentTimeMillis();
status = ExecuteProcessStatus.SLEEP;
}
-
}
diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/model/yaml/YamlAllExecuteProcessContexts.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/model/yaml/YamlAllExecuteProcessContexts.java
new file mode 100644
index 00000000000..9973359a099
--- /dev/null
+++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/model/yaml/YamlAllExecuteProcessContexts.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.infra.executor.sql.process.model.yaml;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
+
+import java.util.Collection;
+import java.util.LinkedList;
+
+/**
+ * All execute process contexts for YAML.
+ */
+@Getter
+@Setter
+public final class YamlAllExecuteProcessContexts implements YamlConfiguration {
+
+ private Collection<YamlExecuteProcessContext> contexts = new LinkedList<>();
+}
diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/model/yaml/BatchYamlExecuteProcessContext.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/model/yaml/swapper/YamlAllExecuteProcessContextsSwapper.java
similarity index 50%
rename from infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/model/yaml/BatchYamlExecuteProcessContext.java
rename to infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/model/yaml/swapper/YamlAllExecuteProcessContextsSwapper.java
index f4dff7df8a8..42c2f96f0a3 100644
--- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/model/yaml/BatchYamlExecuteProcessContext.java
+++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/model/yaml/swapper/YamlAllExecuteProcessContextsSwapper.java
@@ -15,37 +15,31 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.infra.executor.sql.process.model.yaml;
+package org.apache.shardingsphere.infra.executor.sql.process.model.yaml.swapper;
-import lombok.Getter;
-import lombok.NoArgsConstructor;
-import lombok.Setter;
import org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessContext;
-import org.apache.shardingsphere.infra.executor.sql.process.model.yaml.swapper.YamlExecuteProcessContextSwapper;
+import org.apache.shardingsphere.infra.executor.sql.process.model.yaml.YamlAllExecuteProcessContexts;
+import org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
import java.util.Collection;
-import java.util.LinkedList;
+import java.util.stream.Collectors;
/**
- * Batch execute process context for YAML.
+ * YAML all execute process contexts swapper.
*/
-@NoArgsConstructor
-@Getter
-@Setter
-public final class BatchYamlExecuteProcessContext {
+public final class YamlAllExecuteProcessContextsSwapper implements YamlConfigurationSwapper<YamlAllExecuteProcessContexts, Collection<ExecuteProcessContext>> {
- private Collection<YamlExecuteProcessContext> contexts;
+ private final YamlExecuteProcessContextSwapper yamlExecuteProcessContextSwapper = new YamlExecuteProcessContextSwapper();
- public BatchYamlExecuteProcessContext(final Collection<ExecuteProcessContext> processContexts) {
- contexts = getYamlProcessContexts(processContexts);
+ @Override
+ public YamlAllExecuteProcessContexts swapToYamlConfiguration(final Collection<ExecuteProcessContext> data) {
+ YamlAllExecuteProcessContexts result = new YamlAllExecuteProcessContexts();
+ result.setContexts(data.stream().map(yamlExecuteProcessContextSwapper::swapToYamlConfiguration).collect(Collectors.toList()));
+ return result;
}
- private Collection<YamlExecuteProcessContext> getYamlProcessContexts(final Collection<ExecuteProcessContext> processContexts) {
- YamlExecuteProcessContextSwapper yamlExecuteProcessContextSwapper = new YamlExecuteProcessContextSwapper();
- Collection<YamlExecuteProcessContext> result = new LinkedList<>();
- for (ExecuteProcessContext each : processContexts) {
- result.add(yamlExecuteProcessContextSwapper.swapToYamlConfiguration(each));
- }
- return result;
+ @Override
+ public Collection<ExecuteProcessContext> swapToObject(final YamlAllExecuteProcessContexts yamlConfig) {
+ throw new UnsupportedOperationException("YamlAllExecuteProcessContextsSwapper.swapToObject");
}
}
diff --git a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/model/yaml/swapper/YamlAllExecuteProcessContextsSwapperTest.java b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/model/yaml/swapper/YamlAllExecuteProcessContextsSwapperTest.java
new file mode 100644
index 00000000000..5f55c423f38
--- /dev/null
+++ b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/model/yaml/swapper/YamlAllExecuteProcessContextsSwapperTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.infra.executor.sql.process.model.yaml.swapper;
+
+import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
+import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupReportContext;
+import org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutionUnit;
+import org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessContext;
+import org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessStatus;
+import org.apache.shardingsphere.infra.executor.sql.process.model.yaml.YamlAllExecuteProcessContexts;
+import org.apache.shardingsphere.infra.executor.sql.process.model.yaml.YamlExecuteProcessContext;
+import org.apache.shardingsphere.infra.metadata.user.Grantee;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class YamlAllExecuteProcessContextsSwapperTest {
+
+ @Test
+ void assertSwapToYamlConfiguration() {
+ ExecutionGroupReportContext reportContext = new ExecutionGroupReportContext("foo_db", new Grantee("root", "localhost"));
+ ExecutionGroupContext<? extends SQLExecutionUnit> executionGroupContext = new ExecutionGroupContext<>(Collections.emptyList(), reportContext);
+ ExecuteProcessContext executeProcessContext = new ExecuteProcessContext("SELECT 1", executionGroupContext, ExecuteProcessStatus.START, true);
+ YamlAllExecuteProcessContexts actual = new YamlAllExecuteProcessContextsSwapper().swapToYamlConfiguration(Collections.singleton(executeProcessContext));
+ assertThat(actual.getContexts().size(), is(1));
+ assertYamlExecuteProcessContext(actual.getContexts().iterator().next());
+ }
+
+ private static void assertYamlExecuteProcessContext(final YamlExecuteProcessContext actual) {
+ assertNotNull(actual.getExecutionID());
+ assertThat(actual.getDatabaseName(), is("foo_db"));
+ assertThat(actual.getUsername(), is("root"));
+ assertThat(actual.getHostname(), is("localhost"));
+ assertThat(actual.getSql(), is("SELECT 1"));
+ assertTrue(actual.getUnitStatuses().isEmpty());
+ assertThat(actual.getStartTimeMillis(), lessThanOrEqualTo(System.currentTimeMillis()));
+ assertThat(actual.getProcessStatus(), is(ExecuteProcessStatus.START));
+ }
+
+ @Test
+ void assertSwapToObject() {
+ assertThrows(UnsupportedOperationException.class, () -> new YamlAllExecuteProcessContextsSwapper().swapToObject(new YamlAllExecuteProcessContexts()));
+ }
+}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ProcessListChangedSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ProcessListChangedSubscriber.java
index 2372bd69b3a..b2709c09b04 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ProcessListChangedSubscriber.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ProcessListChangedSubscriber.java
@@ -21,16 +21,16 @@ import com.google.common.eventbus.Subscribe;
import org.apache.shardingsphere.infra.executor.sql.process.ShowProcessListManager;
import org.apache.shardingsphere.infra.executor.sql.process.lock.ShowProcessListSimpleLock;
import org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessContext;
-import org.apache.shardingsphere.infra.executor.sql.process.model.yaml.BatchYamlExecuteProcessContext;
+import org.apache.shardingsphere.infra.executor.sql.process.model.yaml.swapper.YamlAllExecuteProcessContextsSwapper;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.apache.shardingsphere.metadata.persist.node.ComputeNode;
+import org.apache.shardingsphere.metadata.persist.node.ProcessNode;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillProcessListIdEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillProcessListIdUnitCompleteEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessListTriggerEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessListUnitCompleteEvent;
-import org.apache.shardingsphere.metadata.persist.node.ProcessNode;
import java.sql.SQLException;
import java.sql.Statement;
@@ -46,6 +46,8 @@ public final class ProcessListChangedSubscriber {
private final ContextManager contextManager;
+ private final YamlAllExecuteProcessContextsSwapper swapper = new YamlAllExecuteProcessContextsSwapper();
+
public ProcessListChangedSubscriber(final RegistryCenter registryCenter, final ContextManager contextManager) {
this.registryCenter = registryCenter;
this.contextManager = contextManager;
@@ -64,8 +66,8 @@ public final class ProcessListChangedSubscriber {
}
Collection<ExecuteProcessContext> processContexts = ShowProcessListManager.getInstance().getAllProcessContext();
if (!processContexts.isEmpty()) {
- registryCenter.getRepository().persist(ProcessNode.getProcessListInstancePath(event.getProcessListId(), event.getInstanceId()),
- YamlEngine.marshal(new BatchYamlExecuteProcessContext(processContexts)));
+ registryCenter.getRepository().persist(
+ ProcessNode.getProcessListInstancePath(event.getProcessListId(), event.getInstanceId()), YamlEngine.marshal(swapper.swapToYamlConfiguration(processContexts)));
}
registryCenter.getRepository().delete(ComputeNode.getProcessTriggerInstanceIdNodePath(event.getInstanceId(), event.getProcessListId()));
}
diff --git a/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/ProcessStandaloneSubscriber.java b/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/ProcessStandaloneSubscriber.java
index 90de8cd414e..cbb378c4fb2 100644
--- a/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/ProcessStandaloneSubscriber.java
+++ b/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/ProcessStandaloneSubscriber.java
@@ -20,7 +20,8 @@ package org.apache.shardingsphere.mode.manager.standalone.subscriber;
import com.google.common.eventbus.Subscribe;
import lombok.SneakyThrows;
import org.apache.shardingsphere.infra.executor.sql.process.ShowProcessListManager;
-import org.apache.shardingsphere.infra.executor.sql.process.model.yaml.BatchYamlExecuteProcessContext;
+import org.apache.shardingsphere.infra.executor.sql.process.model.yaml.YamlAllExecuteProcessContexts;
+import org.apache.shardingsphere.infra.executor.sql.process.model.yaml.swapper.YamlAllExecuteProcessContextsSwapper;
import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.apache.shardingsphere.mode.event.process.KillProcessListIdRequestEvent;
@@ -29,7 +30,6 @@ import org.apache.shardingsphere.mode.event.process.ShowProcessListResponseEvent
import java.sql.SQLException;
import java.sql.Statement;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -41,6 +41,8 @@ public final class ProcessStandaloneSubscriber {
private final EventBusContext eventBusContext;
+ private final YamlAllExecuteProcessContextsSwapper swapper = new YamlAllExecuteProcessContextsSwapper();
+
public ProcessStandaloneSubscriber(final EventBusContext eventBusContext) {
this.eventBusContext = eventBusContext;
eventBusContext.register(this);
@@ -53,10 +55,8 @@ public final class ProcessStandaloneSubscriber {
*/
@Subscribe
public void loadShowProcessListData(final ShowProcessListRequestEvent event) {
- BatchYamlExecuteProcessContext batchYamlExecuteProcessContext = new BatchYamlExecuteProcessContext(new ArrayList<>(ShowProcessListManager.getInstance().getProcessContexts().values()));
- eventBusContext.post(new ShowProcessListResponseEvent(batchYamlExecuteProcessContext.getContexts().isEmpty()
- ? Collections.emptyList()
- : Collections.singletonList(YamlEngine.marshal(batchYamlExecuteProcessContext))));
+ YamlAllExecuteProcessContexts yamlContexts = swapper.swapToYamlConfiguration(ShowProcessListManager.getInstance().getProcessContexts().values());
+ eventBusContext.post(new ShowProcessListResponseEvent(yamlContexts.getContexts().isEmpty() ? Collections.emptyList() : Collections.singleton(YamlEngine.marshal(yamlContexts))));
}
/**
diff --git a/proxy/backend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutor.java b/proxy/backend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutor.java
index 0b1880097b7..bd49a203195 100644
--- a/proxy/backend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutor.java
+++ b/proxy/backend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutor.java
@@ -26,7 +26,7 @@ import org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.ra
import org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.raw.type.RawMemoryQueryResult;
import org.apache.shardingsphere.infra.executor.sql.execute.result.query.type.memory.row.MemoryQueryResultDataRow;
import org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessStatus;
-import org.apache.shardingsphere.infra.executor.sql.process.model.yaml.BatchYamlExecuteProcessContext;
+import org.apache.shardingsphere.infra.executor.sql.process.model.yaml.YamlAllExecuteProcessContexts;
import org.apache.shardingsphere.infra.executor.sql.process.model.yaml.YamlExecuteProcessContext;
import org.apache.shardingsphere.infra.merge.result.MergedResult;
import org.apache.shardingsphere.infra.merge.result.impl.transparent.TransparentMergedResult;
@@ -87,7 +87,7 @@ public final class ShowProcessListExecutor implements DatabaseAdminQueryExecutor
}
Collection<YamlExecuteProcessContext> processContexts = new LinkedList<>();
for (String each : batchProcessContexts) {
- processContexts.addAll(YamlEngine.unmarshal(each, BatchYamlExecuteProcessContext.class).getContexts());
+ processContexts.addAll(YamlEngine.unmarshal(each, YamlAllExecuteProcessContexts.class).getContexts());
}
List<MemoryQueryResultDataRow> rows = processContexts.stream().map(ShowProcessListExecutor::getMemoryQueryResultDataRow).collect(Collectors.toList());
return new RawMemoryQueryResult(queryResultMetaData, rows);