You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2022/11/02 03:15:44 UTC

[shardingsphere] branch master updated: Refactor PipelineTask.start futures handling (#21900)

This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new aec92754f95 Refactor PipelineTask.start futures handling (#21900)
aec92754f95 is described below

commit aec92754f9519e573a8eb4415fcd3122e4a604e1
Author: Hongsheng Zhong <zh...@apache.org>
AuthorDate: Wed Nov 2 11:15:37 2022 +0800

    Refactor PipelineTask.start futures handling (#21900)
---
 .../data/pipeline/core/execute/ExecuteEngine.java  | 24 -----------
 .../data/pipeline/core/task/IncrementalTask.java   | 13 +++---
 .../core/task/InventoryIncrementalTasksRunner.java | 10 ++---
 .../data/pipeline/core/task/InventoryTask.java     | 15 ++++---
 .../data/pipeline/core/task/PipelineTask.java      |  3 +-
 .../pipeline/core/execute/ExecuteEngineTest.java   | 47 ----------------------
 .../pipeline/core/task/IncrementalTaskTest.java    |  3 +-
 .../data/pipeline/core/task/InventoryTaskTest.java |  5 ++-
 8 files changed, 28 insertions(+), 92 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/ExecuteEngine.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/ExecuteEngine.java
index ac4e75e2f80..00988e62067 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/ExecuteEngine.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/ExecuteEngine.java
@@ -22,7 +22,6 @@ import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.data.pipeline.api.executor.LifecycleExecutor;
 import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
 
-import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -79,27 +78,4 @@ public final class ExecuteEngine {
             }
         }, executorService);
     }
-    
-    /**
-     * Submit a collection of {@code LifecycleExecutor} with callback {@code ExecuteCallback} to execute.
-     *
-     * @param lifecycleExecutors lifecycle executor
-     * @param executeCallback execute callback
-     * @return execute future of all
-     */
-    public CompletableFuture<?> submitAll(final Collection<? extends LifecycleExecutor> lifecycleExecutors, final ExecuteCallback executeCallback) {
-        CompletableFuture<?>[] futures = new CompletableFuture[lifecycleExecutors.size()];
-        int i = 0;
-        for (LifecycleExecutor each : lifecycleExecutors) {
-            futures[i++] = CompletableFuture.runAsync(each, executorService);
-        }
-        return CompletableFuture.allOf(futures).whenCompleteAsync((unused, throwable) -> {
-            if (null == throwable) {
-                executeCallback.onSuccess();
-            } else {
-                Throwable cause = throwable.getCause();
-                executeCallback.onFailure(null != cause ? cause : throwable);
-            }
-        }, executorService);
-    }
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
index c3e677ff297..9f963e4fd64 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
@@ -113,9 +113,10 @@ public final class IncrementalTask implements PipelineTask, AutoCloseable {
     }
     
     @Override
-    public CompletableFuture<?> start() {
+    public Collection<CompletableFuture<?>> start() {
         taskProgress.getIncrementalTaskDelay().setLatestActiveTimeMillis(System.currentTimeMillis());
-        CompletableFuture<?> dumperFuture = incrementalExecuteEngine.submit(dumper, new ExecuteCallback() {
+        Collection<CompletableFuture<?>> result = new LinkedList<>();
+        result.add(incrementalExecuteEngine.submit(dumper, new ExecuteCallback() {
             
             @Override
             public void onSuccess() {
@@ -128,8 +129,8 @@ public final class IncrementalTask implements PipelineTask, AutoCloseable {
                 stop();
                 close();
             }
-        });
-        CompletableFuture<?> importerFuture = incrementalExecuteEngine.submitAll(importers, new ExecuteCallback() {
+        }));
+        importers.forEach(each -> result.add(incrementalExecuteEngine.submit(each, new ExecuteCallback() {
             
             @Override
             public void onSuccess() {
@@ -142,8 +143,8 @@ public final class IncrementalTask implements PipelineTask, AutoCloseable {
                 stop();
                 close();
             }
-        });
-        return CompletableFuture.allOf(dumperFuture, importerFuture);
+        })));
+        return result;
     }
     
     @Override
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
index 4c7b7459edf..32f653efa52 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
@@ -98,7 +98,7 @@ public final class InventoryIncrementalTasksRunner implements PipelineTasksRunne
             if (each.getTaskProgress().getPosition() instanceof FinishedPosition) {
                 continue;
             }
-            futures.add(each.start());
+            futures.addAll(each.start());
         }
         CompletableFuture.anyOf(futures.toArray(new CompletableFuture[0])).whenComplete((unused, throwable) -> {
             if (null != throwable) {
@@ -143,15 +143,15 @@ public final class InventoryIncrementalTasksRunner implements PipelineTasksRunne
             if (each.getTaskProgress().getPosition() instanceof FinishedPosition) {
                 continue;
             }
-            futures.add(each.start());
+            futures.addAll(each.start());
         }
         CompletableFuture.anyOf(futures.toArray(new CompletableFuture[0])).whenComplete((unused, throwable) -> {
             if (null != throwable) {
                 log.error("onFailure, incremental task execute failed.", throwable);
                 updateLocalAndRemoteJobItemStatus(JobStatus.EXECUTE_INCREMENTAL_TASK_FAILURE);
-                PipelineAPIFactory.getPipelineJobAPI(PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()))
-                        .persistJobItemErrorMessage(jobItemContext.getJobId(), jobItemContext.getShardingItem(), throwable);
-                stop();
+                String jobId = jobItemContext.getJobId();
+                jobAPI.persistJobItemErrorMessage(jobId, jobItemContext.getShardingItem(), throwable);
+                jobAPI.stop(jobId);
             }
         });
         CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).whenComplete((unused, throwable) -> {
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
index d69fb46de97..0d31db298eb 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
@@ -39,6 +39,8 @@ import org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreatorFacto
 import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
 
 import javax.sql.DataSource;
+import java.util.Collection;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
@@ -84,8 +86,9 @@ public final class InventoryTask implements PipelineTask, AutoCloseable {
     }
     
     @Override
-    public CompletableFuture<?> start() {
-        CompletableFuture<?> dumperFuture = inventoryDumperExecuteEngine.submit(dumper, new ExecuteCallback() {
+    public Collection<CompletableFuture<?>> start() {
+        Collection<CompletableFuture<?>> result = new LinkedList<>();
+        result.add(inventoryDumperExecuteEngine.submit(dumper, new ExecuteCallback() {
             
             @Override
             public void onSuccess() {
@@ -98,8 +101,8 @@ public final class InventoryTask implements PipelineTask, AutoCloseable {
                 stop();
                 close();
             }
-        });
-        CompletableFuture<?> importerFuture = inventoryImporterExecuteEngine.submit(importer, new ExecuteCallback() {
+        }));
+        result.add(inventoryImporterExecuteEngine.submit(importer, new ExecuteCallback() {
             
             @Override
             public void onSuccess() {
@@ -112,8 +115,8 @@ public final class InventoryTask implements PipelineTask, AutoCloseable {
                 stop();
                 close();
             }
-        });
-        return CompletableFuture.allOf(dumperFuture, importerFuture);
+        }));
+        return result;
     }
     
     private PipelineChannel createChannel(final PipelineChannelCreator pipelineChannelCreator) {
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTask.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTask.java
index 45dabe9ad68..98690873d30 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTask.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTask.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.data.pipeline.core.task;
 
 import org.apache.shardingsphere.data.pipeline.api.task.progress.TaskProgress;
 
+import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -31,7 +32,7 @@ public interface PipelineTask {
      *
      * @return future
      */
-    CompletableFuture<?> start();
+    Collection<CompletableFuture<?>> start();
     
     /**
      * Stop task.
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/execute/ExecuteEngineTest.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/execute/ExecuteEngineTest.java
index acae3f3e8f7..dfb368b6f9f 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/execute/ExecuteEngineTest.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/execute/ExecuteEngineTest.java
@@ -22,8 +22,6 @@ import org.apache.shardingsphere.data.pipeline.api.executor.LifecycleExecutor;
 import org.junit.Test;
 
 import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
@@ -34,7 +32,6 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
 public final class ExecuteEngineTest {
@@ -72,50 +69,6 @@ public final class ExecuteEngineTest {
         verify(callback).onFailure(expectedException);
     }
     
-    @Test(timeout = 30000)
-    public void assertSubmitAllAndTasksSucceeded() throws ExecutionException, InterruptedException {
-        int taskCount = 8;
-        LifecycleExecutor lifecycleExecutor = mock(LifecycleExecutor.class);
-        List<LifecycleExecutor> lifecycleExecutors = new ArrayList<>(taskCount);
-        for (int i = 0; i < taskCount; i++) {
-            lifecycleExecutors.add(lifecycleExecutor);
-        }
-        ExecuteCallback callback = mock(ExecuteCallback.class);
-        ExecuteEngine executeEngine = ExecuteEngine.newFixedThreadInstance(2, ExecuteEngineTest.class.getSimpleName());
-        Future<?> future = executeEngine.submitAll(lifecycleExecutors, callback);
-        future.get();
-        shutdownAndAwaitTerminal(executeEngine);
-        verify(lifecycleExecutor, times(taskCount)).run();
-        verify(callback).onSuccess();
-    }
-    
-    @Test(timeout = 30000)
-    public void assertSubmitAllAndOneOfTasksFailed() {
-        LifecycleExecutor lifecycleExecutor = mock(LifecycleExecutor.class);
-        List<LifecycleExecutor> lifecycleExecutors = new ArrayList<>(8);
-        for (int i = 0; i < 7; i++) {
-            lifecycleExecutors.add(lifecycleExecutor);
-        }
-        LifecycleExecutor failedExecutor = mock(LifecycleExecutor.class);
-        RuntimeException expectedException = new RuntimeException("Expected");
-        doThrow(expectedException).when(failedExecutor).run();
-        lifecycleExecutors.add(failedExecutor);
-        ExecuteCallback callback = mock(ExecuteCallback.class);
-        ExecuteEngine executeEngine = ExecuteEngine.newFixedThreadInstance(2, ExecuteEngineTest.class.getSimpleName());
-        Future<?> future = executeEngine.submitAll(lifecycleExecutors, callback);
-        Throwable actualCause = null;
-        try {
-            future.get();
-        } catch (InterruptedException e) {
-            fail();
-        } catch (ExecutionException e) {
-            actualCause = e.getCause();
-        }
-        assertThat(actualCause, is(expectedException));
-        shutdownAndAwaitTerminal(executeEngine);
-        verify(callback).onFailure(expectedException);
-    }
-    
     @SneakyThrows({ReflectiveOperationException.class, InterruptedException.class})
     private void shutdownAndAwaitTerminal(final ExecuteEngine executeEngine) {
         Field field = ExecuteEngine.class.getDeclaredField("executorService");
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
index f9448af70c5..426f51cd201 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
@@ -31,6 +31,7 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -61,7 +62,7 @@ public final class IncrementalTaskTest {
     
     @Test
     public void assertStart() throws ExecutionException, InterruptedException, TimeoutException {
-        incrementalTask.start().get(10, TimeUnit.SECONDS);
+        CompletableFuture.allOf(incrementalTask.start().toArray(new CompletableFuture[0])).get(10, TimeUnit.SECONDS);
         assertThat(incrementalTask.getTaskId(), is("standard_0"));
         assertThat(incrementalTask.getTaskProgress().getPosition(), instanceOf(PlaceholderPosition.class));
     }
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
index 0b277d23713..d0d437882ee 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
@@ -38,6 +38,7 @@ import java.sql.Connection;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.sql.Types;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -75,7 +76,7 @@ public final class InventoryTaskTest {
                 InventoryTask inventoryTask = new InventoryTask(inventoryDumperConfig, taskConfig.getImporterConfig(),
                         PipelineContextUtil.getPipelineChannelCreator(), DATA_SOURCE_MANAGER, dataSource,
                         metaDataLoader, PipelineContextUtil.getExecuteEngine(), PipelineContextUtil.getExecuteEngine(), new FixtureInventoryIncrementalJobItemContext())) {
-            inventoryTask.start().get(10, TimeUnit.SECONDS);
+            CompletableFuture.allOf(inventoryTask.start().toArray(new CompletableFuture[0])).get(10, TimeUnit.SECONDS);
         }
     }
     
@@ -90,7 +91,7 @@ public final class InventoryTaskTest {
                 InventoryTask inventoryTask = new InventoryTask(inventoryDumperConfig, taskConfig.getImporterConfig(),
                         PipelineContextUtil.getPipelineChannelCreator(), new DefaultPipelineDataSourceManager(), dataSource,
                         metaDataLoader, PipelineContextUtil.getExecuteEngine(), PipelineContextUtil.getExecuteEngine(), new FixtureInventoryIncrementalJobItemContext())) {
-            inventoryTask.start().get(10, TimeUnit.SECONDS);
+            CompletableFuture.allOf(inventoryTask.start().toArray(new CompletableFuture[0])).get(10, TimeUnit.SECONDS);
             assertThat(inventoryTask.getTaskProgress().getPosition(), instanceOf(IntegerPrimaryKeyPosition.class));
         }
     }