You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by zo...@apache.org on 2022/08/26 03:14:36 UTC

[incubator-seatunnel] branch st-engine updated: [hotfix][engine][dag] Loss of parallelism when recreating actions. (#2519)

This is an automated email from the ASF dual-hosted git repository.

zongwen pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/st-engine by this push:
     new 7953ac149 [hotfix][engine][dag] Loss of parallelism when recreating actions. (#2519)
7953ac149 is described below

commit 7953ac149fd8d397f7a87b147d56d355d7798a18
Author: Zongwen Li <zo...@gmail.com>
AuthorDate: Fri Aug 26 11:14:31 2022 +0800

    [hotfix][engine][dag] Loss of parallelism when recreating actions. (#2519)
    
    * [hotfix][engine][dag] Loss of parallelism when recreating actions.
    
    * Don't use Awaitility.await
    
    * Set parallelism for TransformChainAction
    
    * Add a tool test class
    
    * revert await
    
    * An empty reader is allowed to start
    
    * checkstyle
    
    * SeaTunnelClientTest#testExecuteJob tests the single parallelism config
    
    * diff Hazelcast cluster name
---
 .../common/source/AbstractSingleSplitSource.java   |  3 -
 .../engine/client/SeaTunnelClientTest.java         |  2 +-
 .../src/test/resources/client_test.conf            | 63 +++++++++++++++++++++
 .../dag/execution/ExecutionPlanGenerator.java      |  2 +
 .../engine/server/AbstractSeaTunnelServerTest.java | 58 +++++++++++++++++++
 .../engine/server/TaskExecutionServiceTest.java    | 66 +++++++++-------------
 .../seatunnel/engine/server/dag/TaskTest.java      | 37 +-----------
 7 files changed, 153 insertions(+), 78 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/AbstractSingleSplitSource.java b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/AbstractSingleSplitSource.java
index 54410962b..bb0a102ce 100644
--- a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/AbstractSingleSplitSource.java
+++ b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/AbstractSingleSplitSource.java
@@ -17,8 +17,6 @@
 
 package org.apache.seatunnel.connectors.seatunnel.common.source;
 
-import static com.google.common.base.Preconditions.checkArgument;
-
 import org.apache.seatunnel.api.serialization.DefaultSerializer;
 import org.apache.seatunnel.api.serialization.Serializer;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
@@ -29,7 +27,6 @@ public abstract class AbstractSingleSplitSource<T> implements SeaTunnelSource<T,
 
     @Override
     public final AbstractSingleSplitReader<T> createReader(SourceReader.Context readerContext) throws Exception {
-        checkArgument(readerContext.getIndexOfSubtask() == 0, "A single split source allows only one single reader to be created.");
         return createReader(new SingleSplitReaderContext(readerContext));
     }
 
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
index 9318d6509..5ba9787db 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
@@ -67,7 +67,7 @@ public class SeaTunnelClientTest {
     @Test
     public void testExecuteJob() {
         Common.setDeployMode(DeployMode.CLIENT);
-        String filePath = TestUtils.getResource("/fakesource_to_file_complex.conf");
+        String filePath = TestUtils.getResource("/client_test.conf");
         JobConfig jobConfig = new JobConfig();
         jobConfig.setMode(JobMode.BATCH);
         jobConfig.setName("fake_to_file");
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/resources/client_test.conf b/seatunnel-engine/seatunnel-engine-client/src/test/resources/client_test.conf
new file mode 100644
index 000000000..69ce5614a
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/resources/client_test.conf
@@ -0,0 +1,63 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in seatunnel config
+######
+
+env {
+  # You can set the job environment configuration here
+  execution.parallelism = 1
+  job.mode = "BATCH"
+  execution.checkpoint.interval = 5000
+  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+  # This is an example source plugin **only for testing and demonstrating the source plugin feature**
+    FakeSource {
+      result_table_name = "fake"
+      field_name = "name,age",
+      parallelism = 1
+    }
+
+    FakeSource {
+      result_table_name = "fake"
+      field_name = "name,age",
+      parallelism = 1
+    }
+}
+
+transform {
+}
+
+sink {
+  LocalFile {
+    path="file:///tmp/hive/warehouse/test2"
+    field_delimiter="\t"
+    row_delimiter="\n"
+    partition_by=["age"]
+    partition_dir_expression="${k0}=${v0}"
+    is_partition_field_write_in_file=true
+    file_name_expression="${transactionId}_${now}"
+    file_format="text"
+    sink_columns=["name","age"]
+    filename_time_format="yyyy.MM.dd"
+    is_enable_transaction=true
+    save_mode="error",
+    source_table_name="fake"
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
index 69081fcc3..b95b3f631 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
@@ -175,6 +175,7 @@ public class ExecutionPlanGenerator {
                 String.join("->", names),
                 jars,
                 transforms);
+            newAction.setParallelism(logicalVertex.getAction().getParallelism());
         }
         ExecutionVertex executionVertex = new ExecutionVertex(newId, newAction, logicalVertex.getParallelism());
         executionVertexMap.put(newId, executionVertex);
@@ -208,6 +209,7 @@ public class ExecutionPlanGenerator {
         } else {
             throw new UnknownActionException(action);
         }
+        newAction.setParallelism(action.getParallelism());
         return newAction;
     }
 
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/AbstractSeaTunnelServerTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/AbstractSeaTunnelServerTest.java
new file mode 100644
index 000000000..46e05e93c
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/AbstractSeaTunnelServerTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server;
+
+import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
+
+import com.hazelcast.config.Config;
+import com.hazelcast.instance.impl.HazelcastInstanceFactory;
+import com.hazelcast.instance.impl.HazelcastInstanceImpl;
+import com.hazelcast.instance.impl.HazelcastInstanceProxy;
+import com.hazelcast.logging.ILogger;
+import com.hazelcast.spi.impl.NodeEngine;
+import org.junit.After;
+import org.junit.Before;
+
+public abstract class AbstractSeaTunnelServerTest {
+
+    protected SeaTunnelServer server;
+
+    protected NodeEngine nodeEngine;
+
+    protected HazelcastInstanceImpl instance;
+
+    protected ILogger logger;
+
+    @Before
+    public void before() {
+        Config config = new Config();
+        config.setInstanceName(this.getClass().getSimpleName());
+        config.setClusterName(this.getClass().getSimpleName());
+        instance = ((HazelcastInstanceProxy) HazelcastInstanceFactory.newHazelcastInstance(config,
+            Thread.currentThread().getName(), new SeaTunnelNodeContext(new SeaTunnelConfig()))).getOriginal();
+        nodeEngine = instance.node.nodeEngine;
+        server = nodeEngine.getService(SeaTunnelServer.SERVICE_NAME);
+        logger = nodeEngine.getLogger(this.getClass());
+    }
+
+    @After
+    public void after() {
+        server.shutdown(true);
+        instance.shutdown();
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java
index 8797a8ac9..26c556f1d 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java
@@ -24,7 +24,6 @@ import static org.awaitility.Awaitility.await;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
-import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
 import org.apache.seatunnel.engine.server.execution.ExceptionTestTask;
 import org.apache.seatunnel.engine.server.execution.FixedCallTestTimeTask;
 import org.apache.seatunnel.engine.server.execution.StopTimeTestTask;
@@ -34,12 +33,8 @@ import org.apache.seatunnel.engine.server.execution.TaskGroupDefaultImpl;
 import org.apache.seatunnel.engine.server.execution.TestTask;
 
 import com.google.common.collect.Lists;
-import com.hazelcast.config.Config;
 import com.hazelcast.flakeidgen.FlakeIdGenerator;
-import com.hazelcast.instance.impl.HazelcastInstanceFactory;
-import com.hazelcast.instance.impl.HazelcastInstanceImpl;
-import com.hazelcast.instance.impl.HazelcastInstanceProxy;
-import com.hazelcast.logging.ILogger;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -48,21 +43,23 @@ import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.BiConsumer;
 
-public class TaskExecutionServiceTest {
+public class TaskExecutionServiceTest extends AbstractSeaTunnelServerTest {
 
-    HazelcastInstanceImpl instance = ((HazelcastInstanceProxy) HazelcastInstanceFactory.newHazelcastInstance(new Config(), Thread.currentThread().getName(), new SeaTunnelNodeContext(new SeaTunnelConfig()))).getOriginal();
-    SeaTunnelServer service = instance.node.nodeEngine.getService(SeaTunnelServer.SERVICE_NAME);
-    ILogger logger = instance.node.nodeEngine.getLogger(TaskExecutionServiceTest.class);
-    FlakeIdGenerator flakeIdGenerator = instance.getFlakeIdGenerator("test");
+    FlakeIdGenerator flakeIdGenerator;
     long taskRunTime = 2000;
 
+    @Before
+    @Override
+    public void before() {
+        super.before();
+        flakeIdGenerator = instance.getFlakeIdGenerator("test");
+    }
+
     @Test
-    public void testAll() throws InterruptedException, ExecutionException {
+    public void testAll() throws InterruptedException {
         logger.info("----------start Cancel test----------");
         testCancel();
 
@@ -82,7 +79,7 @@ public class TaskExecutionServiceTest {
     }
 
     public void testCancel() {
-        TaskExecutionService taskExecutionService = service.getTaskExecutionService();
+        TaskExecutionService taskExecutionService = server.getTaskExecutionService();
 
         long sleepTime = 300;
 
@@ -95,13 +92,12 @@ public class TaskExecutionServiceTest {
 
         taskExecutionService.cancelTaskGroup(taskGroupId);
 
-        await().atMost(sleepTime + 1000, TimeUnit.MILLISECONDS).untilAsserted(() -> {
-            assertEquals(CANCELED, completableFuture.get().getExecutionState());
-        });
+        await().atMost(sleepTime + 1000, TimeUnit.MILLISECONDS)
+            .untilAsserted(() -> assertEquals(CANCELED, completableFuture.get().getExecutionState()));
     }
 
     public void testFinish() {
-        TaskExecutionService taskExecutionService = service.getTaskExecutionService();
+        TaskExecutionService taskExecutionService = server.getTaskExecutionService();
 
         long sleepTime = 300;
 
@@ -110,20 +106,13 @@ public class TaskExecutionServiceTest {
         TestTask testTask1 = new TestTask(stop, logger, sleepTime, true);
         TestTask testTask2 = new TestTask(stop, logger, sleepTime, false);
 
-        CompletableFuture<TaskExecutionState> completableFuture = taskExecutionService.deployLocalTask(new TaskGroupDefaultImpl(flakeIdGenerator.newId(), "ts", Lists.newArrayList(testTask1, testTask2)), new CompletableFuture<>());
-        completableFuture.whenComplete(new BiConsumer<TaskExecutionState, Throwable>() {
-            @Override
-            public void accept(TaskExecutionState unused, Throwable throwable) {
-                futureMark.set(true);
-            }
-        });
-
+        final CompletableFuture<TaskExecutionState> completableFuture = taskExecutionService.deployLocalTask(new TaskGroupDefaultImpl(flakeIdGenerator.newId(), "ts", Lists.newArrayList(testTask1, testTask2)), new CompletableFuture<>());
+        completableFuture.whenComplete((unused, throwable) -> futureMark.set(true));
         stop.set(true);
 
         await().atMost(sleepTime + 1000, TimeUnit.MILLISECONDS).untilAsserted(() -> {
             assertEquals(FINISHED, completableFuture.get().getExecutionState());
         });
-
         assertTrue(futureMark.get());
     }
 
@@ -142,7 +131,7 @@ public class TaskExecutionServiceTest {
         //Create tasks with critical delays
         List<Task> criticalTask = buildStopTestTask(callTime, count, stopMark, stopTime);
 
-        TaskExecutionService taskExecutionService = service.getTaskExecutionService();
+        TaskExecutionService taskExecutionService = server.getTaskExecutionService();
 
         CompletableFuture<TaskExecutionState> taskCts = taskExecutionService.deployLocalTask(new TaskGroupDefaultImpl(flakeIdGenerator.newId(), "t1", Lists.newArrayList(criticalTask)), new CompletableFuture<>());
 
@@ -153,9 +142,8 @@ public class TaskExecutionServiceTest {
         stopMark.set(true);
 
         // Check all task ends right
-        await().atMost(count * callTime, TimeUnit.MILLISECONDS).untilAsserted(() -> {
-            assertEquals(FINISHED, taskCts.get().getExecutionState());
-        });
+        await().atMost(count * callTime, TimeUnit.MILLISECONDS)
+            .untilAsserted(() -> assertEquals(FINISHED, taskCts.get().getExecutionState()));
 
         //Check that each Task is only Done once
         assertEquals(count, stopTime.size());
@@ -163,7 +151,7 @@ public class TaskExecutionServiceTest {
     }
 
     public void testThrowException() throws InterruptedException {
-        TaskExecutionService taskExecutionService = service.getTaskExecutionService();
+        TaskExecutionService taskExecutionService = server.getTaskExecutionService();
 
         AtomicBoolean stopMark = new AtomicBoolean(false);
 
@@ -208,9 +196,8 @@ public class TaskExecutionServiceTest {
 
         stopMark.set(true);
 
-        await().atMost(lowLagSleep * 10 + highLagSleep, TimeUnit.MILLISECONDS).untilAsserted(() -> {
-            assertEquals(FINISHED, taskCts.get().getExecutionState());
-        });
+        await().atMost(lowLagSleep * 10 + highLagSleep + 1000, TimeUnit.MILLISECONDS)
+            .untilAsserted(() -> assertEquals(FINISHED, taskCts.get().getExecutionState()));
     }
 
     public void testDelay() throws InterruptedException {
@@ -238,7 +225,7 @@ public class TaskExecutionServiceTest {
 
         logger.info("task size is : " + taskGroup.getTasks().size());
 
-        TaskExecutionService taskExecutionService = service.getTaskExecutionService();
+        TaskExecutionService taskExecutionService = server.getTaskExecutionService();
 
         CompletableFuture<TaskExecutionState> completableFuture = taskExecutionService.deployLocalTask(taskGroup, new CompletableFuture<>());
 
@@ -247,9 +234,8 @@ public class TaskExecutionServiceTest {
         stopMark.set(true);
 
         //Check all task ends right
-        await().atMost(lowLagSleep * 10 + highLagSleep * 5, TimeUnit.MILLISECONDS).untilAsserted(() -> {
-            assertEquals(FINISHED, completableFuture.get().getExecutionState());
-        });
+        await().atMost(lowLagSleep * 10 + highLagSleep * 5, TimeUnit.MILLISECONDS)
+            .untilAsserted(() -> assertEquals(FINISHED, completableFuture.get().getExecutionState()));
 
         //Computation Delay
         double lowAvg = lowLagList.stream().mapToLong(x -> x).average().getAsDouble();
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
index 6ce4b9285..5c51a3365 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
@@ -23,7 +23,6 @@ import org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSink;
 import org.apache.seatunnel.connectors.seatunnel.fake.source.FakeSource;
 import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.common.config.JobConfig;
-import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
 import org.apache.seatunnel.engine.common.utils.IdGenerator;
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 import org.apache.seatunnel.engine.core.dag.actions.Action;
@@ -33,20 +32,12 @@ import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
 import org.apache.seatunnel.engine.core.dag.logical.LogicalEdge;
 import org.apache.seatunnel.engine.core.dag.logical.LogicalVertex;
 import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
-import org.apache.seatunnel.engine.server.SeaTunnelNodeContext;
-import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
 import org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan;
 import org.apache.seatunnel.engine.server.dag.physical.PlanUtils;
 
 import com.google.common.collect.Sets;
-import com.hazelcast.config.Config;
-import com.hazelcast.instance.impl.HazelcastInstanceFactory;
-import com.hazelcast.instance.impl.HazelcastInstanceImpl;
-import com.hazelcast.instance.impl.HazelcastInstanceProxy;
-import com.hazelcast.spi.impl.NodeEngine;
-import org.junit.After;
 import org.junit.Assert;
-import org.junit.Before;
 import org.junit.Test;
 
 import java.net.MalformedURLException;
@@ -54,24 +45,7 @@ import java.net.URL;
 import java.util.Collections;
 import java.util.concurrent.Executors;
 
-public class TaskTest {
-
-    private SeaTunnelServer service;
-
-    private NodeEngine nodeEngine;
-
-    private HazelcastInstanceImpl instance;
-
-    @Before
-    public void before() {
-        Config config = new Config();
-        config.setInstanceName("test");
-        config.setClusterName("test");
-        instance = ((HazelcastInstanceProxy) HazelcastInstanceFactory.newHazelcastInstance(config,
-            "taskTest", new SeaTunnelNodeContext(new SeaTunnelConfig()))).getOriginal();
-        nodeEngine = instance.node.nodeEngine;
-        service = nodeEngine.getService(SeaTunnelServer.SERVICE_NAME);
-    }
+public class TaskTest extends AbstractSeaTunnelServerTest {
 
     @Test
     public void testTask() throws MalformedURLException {
@@ -109,7 +83,7 @@ public class TaskTest {
             nodeEngine.getSerializationService().toData(logicalDag), config, Collections.emptyList());
 
         PassiveCompletableFuture<Void> voidPassiveCompletableFuture =
-            service.submitJob(jobImmutableInformation.getJobId(),
+            server.submitJob(jobImmutableInformation.getJobId(),
                 nodeEngine.getSerializationService().toData(jobImmutableInformation));
 
         Assert.assertNotNull(voidPassiveCompletableFuture);
@@ -156,9 +130,4 @@ public class TaskTest {
         Assert.assertEquals(physicalPlan.getPipelineList().get(0).getCoordinatorVertexList().size(), 1);
         Assert.assertEquals(physicalPlan.getPipelineList().get(0).getPhysicalVertexList().size(), 1);
     }
-
-    @After
-    public void after() {
-        instance.shutdown();
-    }
 }