You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2022/11/02 03:15:44 UTC
[shardingsphere] branch master updated: Refactor PipelineTask.start futures handling (#21900)
This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new aec92754f95 Refactor PipelineTask.start futures handling (#21900)
aec92754f95 is described below
commit aec92754f9519e573a8eb4415fcd3122e4a604e1
Author: Hongsheng Zhong <zh...@apache.org>
AuthorDate: Wed Nov 2 11:15:37 2022 +0800
Refactor PipelineTask.start futures handling (#21900)
---
.../data/pipeline/core/execute/ExecuteEngine.java | 24 -----------
.../data/pipeline/core/task/IncrementalTask.java | 13 +++---
.../core/task/InventoryIncrementalTasksRunner.java | 10 ++---
.../data/pipeline/core/task/InventoryTask.java | 15 ++++---
.../data/pipeline/core/task/PipelineTask.java | 3 +-
.../pipeline/core/execute/ExecuteEngineTest.java | 47 ----------------------
.../pipeline/core/task/IncrementalTaskTest.java | 3 +-
.../data/pipeline/core/task/InventoryTaskTest.java | 5 ++-
8 files changed, 28 insertions(+), 92 deletions(-)
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/ExecuteEngine.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/ExecuteEngine.java
index ac4e75e2f80..00988e62067 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/ExecuteEngine.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/ExecuteEngine.java
@@ -22,7 +22,6 @@ import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.data.pipeline.api.executor.LifecycleExecutor;
import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
-import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -79,27 +78,4 @@ public final class ExecuteEngine {
}
}, executorService);
}
-
- /**
- * Submit a collection of {@code LifecycleExecutor} with callback {@code ExecuteCallback} to execute.
- *
- * @param lifecycleExecutors lifecycle executor
- * @param executeCallback execute callback
- * @return execute future of all
- */
- public CompletableFuture<?> submitAll(final Collection<? extends LifecycleExecutor> lifecycleExecutors, final ExecuteCallback executeCallback) {
- CompletableFuture<?>[] futures = new CompletableFuture[lifecycleExecutors.size()];
- int i = 0;
- for (LifecycleExecutor each : lifecycleExecutors) {
- futures[i++] = CompletableFuture.runAsync(each, executorService);
- }
- return CompletableFuture.allOf(futures).whenCompleteAsync((unused, throwable) -> {
- if (null == throwable) {
- executeCallback.onSuccess();
- } else {
- Throwable cause = throwable.getCause();
- executeCallback.onFailure(null != cause ? cause : throwable);
- }
- }, executorService);
- }
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
index c3e677ff297..9f963e4fd64 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
@@ -113,9 +113,10 @@ public final class IncrementalTask implements PipelineTask, AutoCloseable {
}
@Override
- public CompletableFuture<?> start() {
+ public Collection<CompletableFuture<?>> start() {
taskProgress.getIncrementalTaskDelay().setLatestActiveTimeMillis(System.currentTimeMillis());
- CompletableFuture<?> dumperFuture = incrementalExecuteEngine.submit(dumper, new ExecuteCallback() {
+ Collection<CompletableFuture<?>> result = new LinkedList<>();
+ result.add(incrementalExecuteEngine.submit(dumper, new ExecuteCallback() {
@Override
public void onSuccess() {
@@ -128,8 +129,8 @@ public final class IncrementalTask implements PipelineTask, AutoCloseable {
stop();
close();
}
- });
- CompletableFuture<?> importerFuture = incrementalExecuteEngine.submitAll(importers, new ExecuteCallback() {
+ }));
+ importers.forEach(each -> result.add(incrementalExecuteEngine.submit(each, new ExecuteCallback() {
@Override
public void onSuccess() {
@@ -142,8 +143,8 @@ public final class IncrementalTask implements PipelineTask, AutoCloseable {
stop();
close();
}
- });
- return CompletableFuture.allOf(dumperFuture, importerFuture);
+ })));
+ return result;
}
@Override
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
index 4c7b7459edf..32f653efa52 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
@@ -98,7 +98,7 @@ public final class InventoryIncrementalTasksRunner implements PipelineTasksRunne
if (each.getTaskProgress().getPosition() instanceof FinishedPosition) {
continue;
}
- futures.add(each.start());
+ futures.addAll(each.start());
}
CompletableFuture.anyOf(futures.toArray(new CompletableFuture[0])).whenComplete((unused, throwable) -> {
if (null != throwable) {
@@ -143,15 +143,15 @@ public final class InventoryIncrementalTasksRunner implements PipelineTasksRunne
if (each.getTaskProgress().getPosition() instanceof FinishedPosition) {
continue;
}
- futures.add(each.start());
+ futures.addAll(each.start());
}
CompletableFuture.anyOf(futures.toArray(new CompletableFuture[0])).whenComplete((unused, throwable) -> {
if (null != throwable) {
log.error("onFailure, incremental task execute failed.", throwable);
updateLocalAndRemoteJobItemStatus(JobStatus.EXECUTE_INCREMENTAL_TASK_FAILURE);
- PipelineAPIFactory.getPipelineJobAPI(PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()))
- .persistJobItemErrorMessage(jobItemContext.getJobId(), jobItemContext.getShardingItem(), throwable);
- stop();
+ String jobId = jobItemContext.getJobId();
+ jobAPI.persistJobItemErrorMessage(jobId, jobItemContext.getShardingItem(), throwable);
+ jobAPI.stop(jobId);
}
});
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).whenComplete((unused, throwable) -> {
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
index d69fb46de97..0d31db298eb 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
@@ -39,6 +39,8 @@ import org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreatorFacto
import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
import javax.sql.DataSource;
+import java.util.Collection;
+import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@@ -84,8 +86,9 @@ public final class InventoryTask implements PipelineTask, AutoCloseable {
}
@Override
- public CompletableFuture<?> start() {
- CompletableFuture<?> dumperFuture = inventoryDumperExecuteEngine.submit(dumper, new ExecuteCallback() {
+ public Collection<CompletableFuture<?>> start() {
+ Collection<CompletableFuture<?>> result = new LinkedList<>();
+ result.add(inventoryDumperExecuteEngine.submit(dumper, new ExecuteCallback() {
@Override
public void onSuccess() {
@@ -98,8 +101,8 @@ public final class InventoryTask implements PipelineTask, AutoCloseable {
stop();
close();
}
- });
- CompletableFuture<?> importerFuture = inventoryImporterExecuteEngine.submit(importer, new ExecuteCallback() {
+ }));
+ result.add(inventoryImporterExecuteEngine.submit(importer, new ExecuteCallback() {
@Override
public void onSuccess() {
@@ -112,8 +115,8 @@ public final class InventoryTask implements PipelineTask, AutoCloseable {
stop();
close();
}
- });
- return CompletableFuture.allOf(dumperFuture, importerFuture);
+ }));
+ return result;
}
private PipelineChannel createChannel(final PipelineChannelCreator pipelineChannelCreator) {
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTask.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTask.java
index 45dabe9ad68..98690873d30 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTask.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTask.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.data.pipeline.core.task;
import org.apache.shardingsphere.data.pipeline.api.task.progress.TaskProgress;
+import java.util.Collection;
import java.util.concurrent.CompletableFuture;
/**
@@ -31,7 +32,7 @@ public interface PipelineTask {
*
* @return future
*/
- CompletableFuture<?> start();
+ Collection<CompletableFuture<?>> start();
/**
* Stop task.
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/execute/ExecuteEngineTest.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/execute/ExecuteEngineTest.java
index acae3f3e8f7..dfb368b6f9f 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/execute/ExecuteEngineTest.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/execute/ExecuteEngineTest.java
@@ -22,8 +22,6 @@ import org.apache.shardingsphere.data.pipeline.api.executor.LifecycleExecutor;
import org.junit.Test;
import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
@@ -34,7 +32,6 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
public final class ExecuteEngineTest {
@@ -72,50 +69,6 @@ public final class ExecuteEngineTest {
verify(callback).onFailure(expectedException);
}
- @Test(timeout = 30000)
- public void assertSubmitAllAndTasksSucceeded() throws ExecutionException, InterruptedException {
- int taskCount = 8;
- LifecycleExecutor lifecycleExecutor = mock(LifecycleExecutor.class);
- List<LifecycleExecutor> lifecycleExecutors = new ArrayList<>(taskCount);
- for (int i = 0; i < taskCount; i++) {
- lifecycleExecutors.add(lifecycleExecutor);
- }
- ExecuteCallback callback = mock(ExecuteCallback.class);
- ExecuteEngine executeEngine = ExecuteEngine.newFixedThreadInstance(2, ExecuteEngineTest.class.getSimpleName());
- Future<?> future = executeEngine.submitAll(lifecycleExecutors, callback);
- future.get();
- shutdownAndAwaitTerminal(executeEngine);
- verify(lifecycleExecutor, times(taskCount)).run();
- verify(callback).onSuccess();
- }
-
- @Test(timeout = 30000)
- public void assertSubmitAllAndOneOfTasksFailed() {
- LifecycleExecutor lifecycleExecutor = mock(LifecycleExecutor.class);
- List<LifecycleExecutor> lifecycleExecutors = new ArrayList<>(8);
- for (int i = 0; i < 7; i++) {
- lifecycleExecutors.add(lifecycleExecutor);
- }
- LifecycleExecutor failedExecutor = mock(LifecycleExecutor.class);
- RuntimeException expectedException = new RuntimeException("Expected");
- doThrow(expectedException).when(failedExecutor).run();
- lifecycleExecutors.add(failedExecutor);
- ExecuteCallback callback = mock(ExecuteCallback.class);
- ExecuteEngine executeEngine = ExecuteEngine.newFixedThreadInstance(2, ExecuteEngineTest.class.getSimpleName());
- Future<?> future = executeEngine.submitAll(lifecycleExecutors, callback);
- Throwable actualCause = null;
- try {
- future.get();
- } catch (InterruptedException e) {
- fail();
- } catch (ExecutionException e) {
- actualCause = e.getCause();
- }
- assertThat(actualCause, is(expectedException));
- shutdownAndAwaitTerminal(executeEngine);
- verify(callback).onFailure(expectedException);
- }
-
@SneakyThrows({ReflectiveOperationException.class, InterruptedException.class})
private void shutdownAndAwaitTerminal(final ExecuteEngine executeEngine) {
Field field = ExecuteEngine.class.getDeclaredField("executorService");
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
index f9448af70c5..426f51cd201 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
@@ -31,6 +31,7 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -61,7 +62,7 @@ public final class IncrementalTaskTest {
@Test
public void assertStart() throws ExecutionException, InterruptedException, TimeoutException {
- incrementalTask.start().get(10, TimeUnit.SECONDS);
+ CompletableFuture.allOf(incrementalTask.start().toArray(new CompletableFuture[0])).get(10, TimeUnit.SECONDS);
assertThat(incrementalTask.getTaskId(), is("standard_0"));
assertThat(incrementalTask.getTaskProgress().getPosition(), instanceOf(PlaceholderPosition.class));
}
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
index 0b277d23713..d0d437882ee 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
@@ -38,6 +38,7 @@ import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -75,7 +76,7 @@ public final class InventoryTaskTest {
InventoryTask inventoryTask = new InventoryTask(inventoryDumperConfig, taskConfig.getImporterConfig(),
PipelineContextUtil.getPipelineChannelCreator(), DATA_SOURCE_MANAGER, dataSource,
metaDataLoader, PipelineContextUtil.getExecuteEngine(), PipelineContextUtil.getExecuteEngine(), new FixtureInventoryIncrementalJobItemContext())) {
- inventoryTask.start().get(10, TimeUnit.SECONDS);
+ CompletableFuture.allOf(inventoryTask.start().toArray(new CompletableFuture[0])).get(10, TimeUnit.SECONDS);
}
}
@@ -90,7 +91,7 @@ public final class InventoryTaskTest {
InventoryTask inventoryTask = new InventoryTask(inventoryDumperConfig, taskConfig.getImporterConfig(),
PipelineContextUtil.getPipelineChannelCreator(), new DefaultPipelineDataSourceManager(), dataSource,
metaDataLoader, PipelineContextUtil.getExecuteEngine(), PipelineContextUtil.getExecuteEngine(), new FixtureInventoryIncrementalJobItemContext())) {
- inventoryTask.start().get(10, TimeUnit.SECONDS);
+ CompletableFuture.allOf(inventoryTask.start().toArray(new CompletableFuture[0])).get(10, TimeUnit.SECONDS);
assertThat(inventoryTask.getTaskProgress().getPosition(), instanceOf(IntegerPrimaryKeyPosition.class));
}
}