You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by zo...@apache.org on 2022/08/26 03:14:36 UTC
[incubator-seatunnel] branch st-engine updated: [hotfix][engine][dag] Loss of parallelism when recreating actions. (#2519)
This is an automated email from the ASF dual-hosted git repository.
zongwen pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/st-engine by this push:
new 7953ac149 [hotfix][engine][dag] Loss of parallelism when recreating actions. (#2519)
7953ac149 is described below
commit 7953ac149fd8d397f7a87b147d56d355d7798a18
Author: Zongwen Li <zo...@gmail.com>
AuthorDate: Fri Aug 26 11:14:31 2022 +0800
[hotfix][engine][dag] Loss of parallelism when recreating actions. (#2519)
* [hotfix][engine][dag] Loss of parallelism when recreating actions.
* Don't use `Awaitility.await`
* Set parallelism for TransformChainAction
* Add a tool test class
* revert await
* An empty reader is allowed to start
* checkstyle
* SeaTunnelClientTest#testExecuteJob tests the single parallelism config
* Use a distinct Hazelcast cluster/instance name per test class
---
.../common/source/AbstractSingleSplitSource.java | 3 -
.../engine/client/SeaTunnelClientTest.java | 2 +-
.../src/test/resources/client_test.conf | 63 +++++++++++++++++++++
.../dag/execution/ExecutionPlanGenerator.java | 2 +
.../engine/server/AbstractSeaTunnelServerTest.java | 58 +++++++++++++++++++
.../engine/server/TaskExecutionServiceTest.java | 66 +++++++++-------------
.../seatunnel/engine/server/dag/TaskTest.java | 37 +-----------
7 files changed, 153 insertions(+), 78 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/AbstractSingleSplitSource.java b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/AbstractSingleSplitSource.java
index 54410962b..bb0a102ce 100644
--- a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/AbstractSingleSplitSource.java
+++ b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/AbstractSingleSplitSource.java
@@ -17,8 +17,6 @@
package org.apache.seatunnel.connectors.seatunnel.common.source;
-import static com.google.common.base.Preconditions.checkArgument;
-
import org.apache.seatunnel.api.serialization.DefaultSerializer;
import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.source.SeaTunnelSource;
@@ -29,7 +27,6 @@ public abstract class AbstractSingleSplitSource<T> implements SeaTunnelSource<T,
@Override
public final AbstractSingleSplitReader<T> createReader(SourceReader.Context readerContext) throws Exception {
- checkArgument(readerContext.getIndexOfSubtask() == 0, "A single split source allows only one single reader to be created.");
return createReader(new SingleSplitReaderContext(readerContext));
}
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
index 9318d6509..5ba9787db 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
@@ -67,7 +67,7 @@ public class SeaTunnelClientTest {
@Test
public void testExecuteJob() {
Common.setDeployMode(DeployMode.CLIENT);
- String filePath = TestUtils.getResource("/fakesource_to_file_complex.conf");
+ String filePath = TestUtils.getResource("/client_test.conf");
JobConfig jobConfig = new JobConfig();
jobConfig.setMode(JobMode.BATCH);
jobConfig.setName("fake_to_file");
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/resources/client_test.conf b/seatunnel-engine/seatunnel-engine-client/src/test/resources/client_test.conf
new file mode 100644
index 000000000..69ce5614a
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/resources/client_test.conf
@@ -0,0 +1,63 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in seatunnel config
+######
+
+env {
+ # You can set job environment configuration here
+ execution.parallelism = 1
+ job.mode = "BATCH"
+ execution.checkpoint.interval = 5000
+ #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+ # This is an example source plugin **only for testing and demonstrating the source plugin feature**
+ FakeSource {
+ result_table_name = "fake"
+ field_name = "name,age",
+ parallelism = 1
+ }
+
+ FakeSource {
+ result_table_name = "fake"
+ field_name = "name,age",
+ parallelism = 1
+ }
+}
+
+transform {
+}
+
+sink {
+ LocalFile {
+ path="file:///tmp/hive/warehouse/test2"
+ field_delimiter="\t"
+ row_delimiter="\n"
+ partition_by=["age"]
+ partition_dir_expression="${k0}=${v0}"
+ is_partition_field_write_in_file=true
+ file_name_expression="${transactionId}_${now}"
+ file_format="text"
+ sink_columns=["name","age"]
+ filename_time_format="yyyy.MM.dd"
+ is_enable_transaction=true
+ save_mode="error",
+ source_table_name="fake"
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
index 69081fcc3..b95b3f631 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
@@ -175,6 +175,7 @@ public class ExecutionPlanGenerator {
String.join("->", names),
jars,
transforms);
+ newAction.setParallelism(logicalVertex.getAction().getParallelism());
}
ExecutionVertex executionVertex = new ExecutionVertex(newId, newAction, logicalVertex.getParallelism());
executionVertexMap.put(newId, executionVertex);
@@ -208,6 +209,7 @@ public class ExecutionPlanGenerator {
} else {
throw new UnknownActionException(action);
}
+ newAction.setParallelism(action.getParallelism());
return newAction;
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/AbstractSeaTunnelServerTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/AbstractSeaTunnelServerTest.java
new file mode 100644
index 000000000..46e05e93c
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/AbstractSeaTunnelServerTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server;
+
+import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
+
+import com.hazelcast.config.Config;
+import com.hazelcast.instance.impl.HazelcastInstanceFactory;
+import com.hazelcast.instance.impl.HazelcastInstanceImpl;
+import com.hazelcast.instance.impl.HazelcastInstanceProxy;
+import com.hazelcast.logging.ILogger;
+import com.hazelcast.spi.impl.NodeEngine;
+import org.junit.After;
+import org.junit.Before;
+
+public abstract class AbstractSeaTunnelServerTest {
+
+ protected SeaTunnelServer server;
+
+ protected NodeEngine nodeEngine;
+
+ protected HazelcastInstanceImpl instance;
+
+ protected ILogger logger;
+
+ @Before
+ public void before() {
+ Config config = new Config();
+ config.setInstanceName(this.getClass().getSimpleName());
+ config.setClusterName(this.getClass().getSimpleName());
+ instance = ((HazelcastInstanceProxy) HazelcastInstanceFactory.newHazelcastInstance(config,
+ Thread.currentThread().getName(), new SeaTunnelNodeContext(new SeaTunnelConfig()))).getOriginal();
+ nodeEngine = instance.node.nodeEngine;
+ server = nodeEngine.getService(SeaTunnelServer.SERVICE_NAME);
+ logger = nodeEngine.getLogger(this.getClass());
+ }
+
+ @After
+ public void after() {
+ server.shutdown(true);
+ instance.shutdown();
+ }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java
index 8797a8ac9..26c556f1d 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java
@@ -24,7 +24,6 @@ import static org.awaitility.Awaitility.await;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
import org.apache.seatunnel.engine.server.execution.ExceptionTestTask;
import org.apache.seatunnel.engine.server.execution.FixedCallTestTimeTask;
import org.apache.seatunnel.engine.server.execution.StopTimeTestTask;
@@ -34,12 +33,8 @@ import org.apache.seatunnel.engine.server.execution.TaskGroupDefaultImpl;
import org.apache.seatunnel.engine.server.execution.TestTask;
import com.google.common.collect.Lists;
-import com.hazelcast.config.Config;
import com.hazelcast.flakeidgen.FlakeIdGenerator;
-import com.hazelcast.instance.impl.HazelcastInstanceFactory;
-import com.hazelcast.instance.impl.HazelcastInstanceImpl;
-import com.hazelcast.instance.impl.HazelcastInstanceProxy;
-import com.hazelcast.logging.ILogger;
+import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
@@ -48,21 +43,23 @@ import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.BiConsumer;
-public class TaskExecutionServiceTest {
+public class TaskExecutionServiceTest extends AbstractSeaTunnelServerTest {
- HazelcastInstanceImpl instance = ((HazelcastInstanceProxy) HazelcastInstanceFactory.newHazelcastInstance(new Config(), Thread.currentThread().getName(), new SeaTunnelNodeContext(new SeaTunnelConfig()))).getOriginal();
- SeaTunnelServer service = instance.node.nodeEngine.getService(SeaTunnelServer.SERVICE_NAME);
- ILogger logger = instance.node.nodeEngine.getLogger(TaskExecutionServiceTest.class);
- FlakeIdGenerator flakeIdGenerator = instance.getFlakeIdGenerator("test");
+ FlakeIdGenerator flakeIdGenerator;
long taskRunTime = 2000;
+ @Before
+ @Override
+ public void before() {
+ super.before();
+ flakeIdGenerator = instance.getFlakeIdGenerator("test");
+ }
+
@Test
- public void testAll() throws InterruptedException, ExecutionException {
+ public void testAll() throws InterruptedException {
logger.info("----------start Cancel test----------");
testCancel();
@@ -82,7 +79,7 @@ public class TaskExecutionServiceTest {
}
public void testCancel() {
- TaskExecutionService taskExecutionService = service.getTaskExecutionService();
+ TaskExecutionService taskExecutionService = server.getTaskExecutionService();
long sleepTime = 300;
@@ -95,13 +92,12 @@ public class TaskExecutionServiceTest {
taskExecutionService.cancelTaskGroup(taskGroupId);
- await().atMost(sleepTime + 1000, TimeUnit.MILLISECONDS).untilAsserted(() -> {
- assertEquals(CANCELED, completableFuture.get().getExecutionState());
- });
+ await().atMost(sleepTime + 1000, TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> assertEquals(CANCELED, completableFuture.get().getExecutionState()));
}
public void testFinish() {
- TaskExecutionService taskExecutionService = service.getTaskExecutionService();
+ TaskExecutionService taskExecutionService = server.getTaskExecutionService();
long sleepTime = 300;
@@ -110,20 +106,13 @@ public class TaskExecutionServiceTest {
TestTask testTask1 = new TestTask(stop, logger, sleepTime, true);
TestTask testTask2 = new TestTask(stop, logger, sleepTime, false);
- CompletableFuture<TaskExecutionState> completableFuture = taskExecutionService.deployLocalTask(new TaskGroupDefaultImpl(flakeIdGenerator.newId(), "ts", Lists.newArrayList(testTask1, testTask2)), new CompletableFuture<>());
- completableFuture.whenComplete(new BiConsumer<TaskExecutionState, Throwable>() {
- @Override
- public void accept(TaskExecutionState unused, Throwable throwable) {
- futureMark.set(true);
- }
- });
-
+ final CompletableFuture<TaskExecutionState> completableFuture = taskExecutionService.deployLocalTask(new TaskGroupDefaultImpl(flakeIdGenerator.newId(), "ts", Lists.newArrayList(testTask1, testTask2)), new CompletableFuture<>());
+ completableFuture.whenComplete((unused, throwable) -> futureMark.set(true));
stop.set(true);
await().atMost(sleepTime + 1000, TimeUnit.MILLISECONDS).untilAsserted(() -> {
assertEquals(FINISHED, completableFuture.get().getExecutionState());
});
-
assertTrue(futureMark.get());
}
@@ -142,7 +131,7 @@ public class TaskExecutionServiceTest {
//Create tasks with critical delays
List<Task> criticalTask = buildStopTestTask(callTime, count, stopMark, stopTime);
- TaskExecutionService taskExecutionService = service.getTaskExecutionService();
+ TaskExecutionService taskExecutionService = server.getTaskExecutionService();
CompletableFuture<TaskExecutionState> taskCts = taskExecutionService.deployLocalTask(new TaskGroupDefaultImpl(flakeIdGenerator.newId(), "t1", Lists.newArrayList(criticalTask)), new CompletableFuture<>());
@@ -153,9 +142,8 @@ public class TaskExecutionServiceTest {
stopMark.set(true);
// Check all task ends right
- await().atMost(count * callTime, TimeUnit.MILLISECONDS).untilAsserted(() -> {
- assertEquals(FINISHED, taskCts.get().getExecutionState());
- });
+ await().atMost(count * callTime, TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> assertEquals(FINISHED, taskCts.get().getExecutionState()));
//Check that each Task is only Done once
assertEquals(count, stopTime.size());
@@ -163,7 +151,7 @@ public class TaskExecutionServiceTest {
}
public void testThrowException() throws InterruptedException {
- TaskExecutionService taskExecutionService = service.getTaskExecutionService();
+ TaskExecutionService taskExecutionService = server.getTaskExecutionService();
AtomicBoolean stopMark = new AtomicBoolean(false);
@@ -208,9 +196,8 @@ public class TaskExecutionServiceTest {
stopMark.set(true);
- await().atMost(lowLagSleep * 10 + highLagSleep, TimeUnit.MILLISECONDS).untilAsserted(() -> {
- assertEquals(FINISHED, taskCts.get().getExecutionState());
- });
+ await().atMost(lowLagSleep * 10 + highLagSleep + 1000, TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> assertEquals(FINISHED, taskCts.get().getExecutionState()));
}
public void testDelay() throws InterruptedException {
@@ -238,7 +225,7 @@ public class TaskExecutionServiceTest {
logger.info("task size is : " + taskGroup.getTasks().size());
- TaskExecutionService taskExecutionService = service.getTaskExecutionService();
+ TaskExecutionService taskExecutionService = server.getTaskExecutionService();
CompletableFuture<TaskExecutionState> completableFuture = taskExecutionService.deployLocalTask(taskGroup, new CompletableFuture<>());
@@ -247,9 +234,8 @@ public class TaskExecutionServiceTest {
stopMark.set(true);
//Check all task ends right
- await().atMost(lowLagSleep * 10 + highLagSleep * 5, TimeUnit.MILLISECONDS).untilAsserted(() -> {
- assertEquals(FINISHED, completableFuture.get().getExecutionState());
- });
+ await().atMost(lowLagSleep * 10 + highLagSleep * 5, TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> assertEquals(FINISHED, completableFuture.get().getExecutionState()));
//Computation Delay
double lowAvg = lowLagList.stream().mapToLong(x -> x).average().getAsDouble();
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
index 6ce4b9285..5c51a3365 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
@@ -23,7 +23,6 @@ import org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSink;
import org.apache.seatunnel.connectors.seatunnel.fake.source.FakeSource;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.config.JobConfig;
-import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
import org.apache.seatunnel.engine.common.utils.IdGenerator;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.dag.actions.Action;
@@ -33,20 +32,12 @@ import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
import org.apache.seatunnel.engine.core.dag.logical.LogicalEdge;
import org.apache.seatunnel.engine.core.dag.logical.LogicalVertex;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
-import org.apache.seatunnel.engine.server.SeaTunnelNodeContext;
-import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
import org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan;
import org.apache.seatunnel.engine.server.dag.physical.PlanUtils;
import com.google.common.collect.Sets;
-import com.hazelcast.config.Config;
-import com.hazelcast.instance.impl.HazelcastInstanceFactory;
-import com.hazelcast.instance.impl.HazelcastInstanceImpl;
-import com.hazelcast.instance.impl.HazelcastInstanceProxy;
-import com.hazelcast.spi.impl.NodeEngine;
-import org.junit.After;
import org.junit.Assert;
-import org.junit.Before;
import org.junit.Test;
import java.net.MalformedURLException;
@@ -54,24 +45,7 @@ import java.net.URL;
import java.util.Collections;
import java.util.concurrent.Executors;
-public class TaskTest {
-
- private SeaTunnelServer service;
-
- private NodeEngine nodeEngine;
-
- private HazelcastInstanceImpl instance;
-
- @Before
- public void before() {
- Config config = new Config();
- config.setInstanceName("test");
- config.setClusterName("test");
- instance = ((HazelcastInstanceProxy) HazelcastInstanceFactory.newHazelcastInstance(config,
- "taskTest", new SeaTunnelNodeContext(new SeaTunnelConfig()))).getOriginal();
- nodeEngine = instance.node.nodeEngine;
- service = nodeEngine.getService(SeaTunnelServer.SERVICE_NAME);
- }
+public class TaskTest extends AbstractSeaTunnelServerTest {
@Test
public void testTask() throws MalformedURLException {
@@ -109,7 +83,7 @@ public class TaskTest {
nodeEngine.getSerializationService().toData(logicalDag), config, Collections.emptyList());
PassiveCompletableFuture<Void> voidPassiveCompletableFuture =
- service.submitJob(jobImmutableInformation.getJobId(),
+ server.submitJob(jobImmutableInformation.getJobId(),
nodeEngine.getSerializationService().toData(jobImmutableInformation));
Assert.assertNotNull(voidPassiveCompletableFuture);
@@ -156,9 +130,4 @@ public class TaskTest {
Assert.assertEquals(physicalPlan.getPipelineList().get(0).getCoordinatorVertexList().size(), 1);
Assert.assertEquals(physicalPlan.getPipelineList().get(0).getPhysicalVertexList().size(), 1);
}
-
- @After
- public void after() {
- instance.shutdown();
- }
}