You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2023/04/27 21:08:19 UTC

[shardingsphere] branch master updated: Add YamlAllExecuteProcessContexts (#25381)

This is an automated email from the ASF dual-hosted git repository.

zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 49ac3771f3c Add YamlAllExecuteProcessContexts (#25381)
49ac3771f3c is described below

commit 49ac3771f3c132ef5c25a2f614efa99dffef21f2
Author: Liang Zhang <zh...@apache.org>
AuthorDate: Fri Apr 28 05:08:10 2023 +0800

    Add YamlAllExecuteProcessContexts (#25381)
    
    * Rename ExecuteProcessUnit.status
    
    * Rename ExecuteProcessUnit.status
    
    * Refactor ExecuteProcessContext
    
    * Refactor ExecuteProcessContext
    
    * Refactor ExecuteProcessContext
    
    * Add YamlAllExecuteProcessContexts
    
    * Add test cases
---
 .../sql/process/model/ExecuteProcessContext.java   | 20 +++----
 .../model/yaml/YamlAllExecuteProcessContexts.java  | 35 ++++++++++++
 .../YamlAllExecuteProcessContextsSwapper.java}     | 36 +++++-------
 .../YamlAllExecuteProcessContextsSwapperTest.java  | 66 ++++++++++++++++++++++
 .../subscriber/ProcessListChangedSubscriber.java   | 10 ++--
 .../subscriber/ProcessStandaloneSubscriber.java    | 12 ++--
 .../admin/executor/ShowProcessListExecutor.java    |  4 +-
 7 files changed, 140 insertions(+), 43 deletions(-)

diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/model/ExecuteProcessContext.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/model/ExecuteProcessContext.java
index 6985d6024fd..91d5aa8e476 100644
--- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/model/ExecuteProcessContext.java
+++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/model/ExecuteProcessContext.java
@@ -44,28 +44,29 @@ public final class ExecuteProcessContext {
     
     private final String hostname;
     
-    private String sql;
+    private final boolean proxyContext;
     
     private final Map<String, ExecuteProcessUnit> processUnits = new HashMap<>();
     
     private final Collection<Statement> processStatements = new LinkedList<>();
     
-    private long startMillis = System.currentTimeMillis();
+    private String sql;
     
     private ExecuteProcessStatus status;
     
-    private final boolean proxyContext;
+    private long startMillis;
     
     public ExecuteProcessContext(final String sql, final ExecutionGroupContext<? extends SQLExecutionUnit> executionGroupContext, final ExecuteProcessStatus status, final boolean isProxyContext) {
-        this.executionID = executionGroupContext.getReportContext().getExecutionID();
-        this.sql = sql;
-        this.databaseName = executionGroupContext.getReportContext().getDatabaseName();
+        executionID = executionGroupContext.getReportContext().getExecutionID();
+        databaseName = executionGroupContext.getReportContext().getDatabaseName();
         Grantee grantee = executionGroupContext.getReportContext().getGrantee();
-        this.username = null != grantee ? grantee.getUsername() : null;
-        this.hostname = null != grantee ? grantee.getHostname() : null;
+        username = null == grantee ? null : grantee.getUsername();
+        hostname = null == grantee ? null : grantee.getHostname();
+        proxyContext = isProxyContext;
+        this.sql = sql;
         this.status = status;
+        startMillis = System.currentTimeMillis();
         addProcessUnitsAndStatements(executionGroupContext, status);
-        proxyContext = isProxyContext;
     }
     
     private void addProcessUnitsAndStatements(final ExecutionGroupContext<? extends SQLExecutionUnit> executionGroupContext, final ExecuteProcessStatus processStatus) {
@@ -88,5 +89,4 @@ public final class ExecuteProcessContext {
         startMillis = System.currentTimeMillis();
         status = ExecuteProcessStatus.SLEEP;
     }
-    
 }
diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/model/yaml/YamlAllExecuteProcessContexts.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/model/yaml/YamlAllExecuteProcessContexts.java
new file mode 100644
index 00000000000..9973359a099
--- /dev/null
+++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/model/yaml/YamlAllExecuteProcessContexts.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.infra.executor.sql.process.model.yaml;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
+
+import java.util.Collection;
+import java.util.LinkedList;
+
+/**
+ * All execute process contexts for YAML.
+ */
+@Getter
+@Setter
+public final class YamlAllExecuteProcessContexts implements YamlConfiguration {
+    
+    private Collection<YamlExecuteProcessContext> contexts = new LinkedList<>();
+}
diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/model/yaml/BatchYamlExecuteProcessContext.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/model/yaml/swapper/YamlAllExecuteProcessContextsSwapper.java
similarity index 50%
rename from infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/model/yaml/BatchYamlExecuteProcessContext.java
rename to infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/model/yaml/swapper/YamlAllExecuteProcessContextsSwapper.java
index f4dff7df8a8..42c2f96f0a3 100644
--- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/model/yaml/BatchYamlExecuteProcessContext.java
+++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/model/yaml/swapper/YamlAllExecuteProcessContextsSwapper.java
@@ -15,37 +15,31 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.infra.executor.sql.process.model.yaml;
+package org.apache.shardingsphere.infra.executor.sql.process.model.yaml.swapper;
 
-import lombok.Getter;
-import lombok.NoArgsConstructor;
-import lombok.Setter;
 import org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessContext;
-import org.apache.shardingsphere.infra.executor.sql.process.model.yaml.swapper.YamlExecuteProcessContextSwapper;
+import org.apache.shardingsphere.infra.executor.sql.process.model.yaml.YamlAllExecuteProcessContexts;
+import org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
 
 import java.util.Collection;
-import java.util.LinkedList;
+import java.util.stream.Collectors;
 
 /**
- * Batch execute process context for YAML.
+ * YAML all execute process contexts swapper.
  */
-@NoArgsConstructor
-@Getter
-@Setter
-public final class BatchYamlExecuteProcessContext {
+public final class YamlAllExecuteProcessContextsSwapper implements YamlConfigurationSwapper<YamlAllExecuteProcessContexts, Collection<ExecuteProcessContext>> {
     
-    private Collection<YamlExecuteProcessContext> contexts;
+    private final YamlExecuteProcessContextSwapper yamlExecuteProcessContextSwapper = new YamlExecuteProcessContextSwapper();
     
-    public BatchYamlExecuteProcessContext(final Collection<ExecuteProcessContext> processContexts) {
-        contexts = getYamlProcessContexts(processContexts);
+    @Override
+    public YamlAllExecuteProcessContexts swapToYamlConfiguration(final Collection<ExecuteProcessContext> data) {
+        YamlAllExecuteProcessContexts result = new YamlAllExecuteProcessContexts();
+        result.setContexts(data.stream().map(yamlExecuteProcessContextSwapper::swapToYamlConfiguration).collect(Collectors.toList()));
+        return result;
     }
     
-    private Collection<YamlExecuteProcessContext> getYamlProcessContexts(final Collection<ExecuteProcessContext> processContexts) {
-        YamlExecuteProcessContextSwapper yamlExecuteProcessContextSwapper = new YamlExecuteProcessContextSwapper();
-        Collection<YamlExecuteProcessContext> result = new LinkedList<>();
-        for (ExecuteProcessContext each : processContexts) {
-            result.add(yamlExecuteProcessContextSwapper.swapToYamlConfiguration(each));
-        }
-        return result;
+    @Override
+    public Collection<ExecuteProcessContext> swapToObject(final YamlAllExecuteProcessContexts yamlConfig) {
+        throw new UnsupportedOperationException("YamlAllExecuteProcessContextsSwapper.swapToObject");
     }
 }
diff --git a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/model/yaml/swapper/YamlAllExecuteProcessContextsSwapperTest.java b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/model/yaml/swapper/YamlAllExecuteProcessContextsSwapperTest.java
new file mode 100644
index 00000000000..5f55c423f38
--- /dev/null
+++ b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/model/yaml/swapper/YamlAllExecuteProcessContextsSwapperTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.infra.executor.sql.process.model.yaml.swapper;
+
+import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
+import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupReportContext;
+import org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutionUnit;
+import org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessContext;
+import org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessStatus;
+import org.apache.shardingsphere.infra.executor.sql.process.model.yaml.YamlAllExecuteProcessContexts;
+import org.apache.shardingsphere.infra.executor.sql.process.model.yaml.YamlExecuteProcessContext;
+import org.apache.shardingsphere.infra.metadata.user.Grantee;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class YamlAllExecuteProcessContextsSwapperTest {
+    
+    @Test
+    void assertSwapToYamlConfiguration() {
+        ExecutionGroupReportContext reportContext = new ExecutionGroupReportContext("foo_db", new Grantee("root", "localhost"));
+        ExecutionGroupContext<? extends SQLExecutionUnit> executionGroupContext = new ExecutionGroupContext<>(Collections.emptyList(), reportContext);
+        ExecuteProcessContext executeProcessContext = new ExecuteProcessContext("SELECT 1", executionGroupContext, ExecuteProcessStatus.START, true);
+        YamlAllExecuteProcessContexts actual = new YamlAllExecuteProcessContextsSwapper().swapToYamlConfiguration(Collections.singleton(executeProcessContext));
+        assertThat(actual.getContexts().size(), is(1));
+        assertYamlExecuteProcessContext(actual.getContexts().iterator().next());
+    }
+    
+    private static void assertYamlExecuteProcessContext(final YamlExecuteProcessContext actual) {
+        assertNotNull(actual.getExecutionID());
+        assertThat(actual.getDatabaseName(), is("foo_db"));
+        assertThat(actual.getUsername(), is("root"));
+        assertThat(actual.getHostname(), is("localhost"));
+        assertThat(actual.getSql(), is("SELECT 1"));
+        assertTrue(actual.getUnitStatuses().isEmpty());
+        assertThat(actual.getStartTimeMillis(), lessThanOrEqualTo(System.currentTimeMillis()));
+        assertThat(actual.getProcessStatus(), is(ExecuteProcessStatus.START));
+    }
+    
+    @Test
+    void assertSwapToObject() {
+        assertThrows(UnsupportedOperationException.class, () -> new YamlAllExecuteProcessContextsSwapper().swapToObject(new YamlAllExecuteProcessContexts()));
+    }
+}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ProcessListChangedSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ProcessListChangedSubscriber.java
index 2372bd69b3a..b2709c09b04 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ProcessListChangedSubscriber.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ProcessListChangedSubscriber.java
@@ -21,16 +21,16 @@ import com.google.common.eventbus.Subscribe;
 import org.apache.shardingsphere.infra.executor.sql.process.ShowProcessListManager;
 import org.apache.shardingsphere.infra.executor.sql.process.lock.ShowProcessListSimpleLock;
 import org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessContext;
-import org.apache.shardingsphere.infra.executor.sql.process.model.yaml.BatchYamlExecuteProcessContext;
+import org.apache.shardingsphere.infra.executor.sql.process.model.yaml.swapper.YamlAllExecuteProcessContextsSwapper;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import org.apache.shardingsphere.metadata.persist.node.ComputeNode;
+import org.apache.shardingsphere.metadata.persist.node.ProcessNode;
 import org.apache.shardingsphere.mode.manager.ContextManager;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillProcessListIdEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillProcessListIdUnitCompleteEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessListTriggerEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessListUnitCompleteEvent;
-import org.apache.shardingsphere.metadata.persist.node.ProcessNode;
 
 import java.sql.SQLException;
 import java.sql.Statement;
@@ -46,6 +46,8 @@ public final class ProcessListChangedSubscriber {
     
     private final ContextManager contextManager;
     
+    private final YamlAllExecuteProcessContextsSwapper swapper = new YamlAllExecuteProcessContextsSwapper();
+    
     public ProcessListChangedSubscriber(final RegistryCenter registryCenter, final ContextManager contextManager) {
         this.registryCenter = registryCenter;
         this.contextManager = contextManager;
@@ -64,8 +66,8 @@ public final class ProcessListChangedSubscriber {
         }
         Collection<ExecuteProcessContext> processContexts = ShowProcessListManager.getInstance().getAllProcessContext();
         if (!processContexts.isEmpty()) {
-            registryCenter.getRepository().persist(ProcessNode.getProcessListInstancePath(event.getProcessListId(), event.getInstanceId()),
-                    YamlEngine.marshal(new BatchYamlExecuteProcessContext(processContexts)));
+            registryCenter.getRepository().persist(
+                    ProcessNode.getProcessListInstancePath(event.getProcessListId(), event.getInstanceId()), YamlEngine.marshal(swapper.swapToYamlConfiguration(processContexts)));
         }
         registryCenter.getRepository().delete(ComputeNode.getProcessTriggerInstanceIdNodePath(event.getInstanceId(), event.getProcessListId()));
     }
diff --git a/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/ProcessStandaloneSubscriber.java b/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/ProcessStandaloneSubscriber.java
index 90de8cd414e..cbb378c4fb2 100644
--- a/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/ProcessStandaloneSubscriber.java
+++ b/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/ProcessStandaloneSubscriber.java
@@ -20,7 +20,8 @@ package org.apache.shardingsphere.mode.manager.standalone.subscriber;
 import com.google.common.eventbus.Subscribe;
 import lombok.SneakyThrows;
 import org.apache.shardingsphere.infra.executor.sql.process.ShowProcessListManager;
-import org.apache.shardingsphere.infra.executor.sql.process.model.yaml.BatchYamlExecuteProcessContext;
+import org.apache.shardingsphere.infra.executor.sql.process.model.yaml.YamlAllExecuteProcessContexts;
+import org.apache.shardingsphere.infra.executor.sql.process.model.yaml.swapper.YamlAllExecuteProcessContextsSwapper;
 import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import org.apache.shardingsphere.mode.event.process.KillProcessListIdRequestEvent;
@@ -29,7 +30,6 @@ import org.apache.shardingsphere.mode.event.process.ShowProcessListResponseEvent
 
 import java.sql.SQLException;
 import java.sql.Statement;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 
@@ -41,6 +41,8 @@ public final class ProcessStandaloneSubscriber {
     
     private final EventBusContext eventBusContext;
     
+    private final YamlAllExecuteProcessContextsSwapper swapper = new YamlAllExecuteProcessContextsSwapper();
+    
     public ProcessStandaloneSubscriber(final EventBusContext eventBusContext) {
         this.eventBusContext = eventBusContext;
         eventBusContext.register(this);
@@ -53,10 +55,8 @@ public final class ProcessStandaloneSubscriber {
      */
     @Subscribe
     public void loadShowProcessListData(final ShowProcessListRequestEvent event) {
-        BatchYamlExecuteProcessContext batchYamlExecuteProcessContext = new BatchYamlExecuteProcessContext(new ArrayList<>(ShowProcessListManager.getInstance().getProcessContexts().values()));
-        eventBusContext.post(new ShowProcessListResponseEvent(batchYamlExecuteProcessContext.getContexts().isEmpty()
-                ? Collections.emptyList()
-                : Collections.singletonList(YamlEngine.marshal(batchYamlExecuteProcessContext))));
+        YamlAllExecuteProcessContexts yamlContexts = swapper.swapToYamlConfiguration(ShowProcessListManager.getInstance().getProcessContexts().values());
+        eventBusContext.post(new ShowProcessListResponseEvent(yamlContexts.getContexts().isEmpty() ? Collections.emptyList() : Collections.singleton(YamlEngine.marshal(yamlContexts))));
     }
     
     /**
diff --git a/proxy/backend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutor.java b/proxy/backend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutor.java
index 0b1880097b7..bd49a203195 100644
--- a/proxy/backend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutor.java
+++ b/proxy/backend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutor.java
@@ -26,7 +26,7 @@ import org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.ra
 import org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.raw.type.RawMemoryQueryResult;
 import org.apache.shardingsphere.infra.executor.sql.execute.result.query.type.memory.row.MemoryQueryResultDataRow;
 import org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessStatus;
-import org.apache.shardingsphere.infra.executor.sql.process.model.yaml.BatchYamlExecuteProcessContext;
+import org.apache.shardingsphere.infra.executor.sql.process.model.yaml.YamlAllExecuteProcessContexts;
 import org.apache.shardingsphere.infra.executor.sql.process.model.yaml.YamlExecuteProcessContext;
 import org.apache.shardingsphere.infra.merge.result.MergedResult;
 import org.apache.shardingsphere.infra.merge.result.impl.transparent.TransparentMergedResult;
@@ -87,7 +87,7 @@ public final class ShowProcessListExecutor implements DatabaseAdminQueryExecutor
         }
         Collection<YamlExecuteProcessContext> processContexts = new LinkedList<>();
         for (String each : batchProcessContexts) {
-            processContexts.addAll(YamlEngine.unmarshal(each, BatchYamlExecuteProcessContext.class).getContexts());
+            processContexts.addAll(YamlEngine.unmarshal(each, YamlAllExecuteProcessContexts.class).getContexts());
         }
         List<MemoryQueryResultDataRow> rows = processContexts.stream().map(ShowProcessListExecutor::getMemoryQueryResultDataRow).collect(Collectors.toList());
         return new RawMemoryQueryResult(queryResultMetaData, rows);