You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by we...@apache.org on 2022/05/20 02:11:41 UTC
[incubator-seatunnel] branch api-draft updated: fix spark batch can't stop (#1927)
This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/api-draft by this push:
new 6d5ef6da fix spark batch can't stop (#1927)
6d5ef6da is described below
commit 6d5ef6dabec2e24d524a737c389d868ec3a8eea9
Author: TrickyZerg <32...@users.noreply.github.com>
AuthorDate: Fri May 20 10:11:35 2022 +0800
fix spark batch can't stop (#1927)
---
.../command/SeaTunnelApiTaskExecuteCommand.java | 38 ----------------------
.../spark/source/batch/BatchPartitionReader.java | 2 +-
2 files changed, 1 insertion(+), 39 deletions(-)
diff --git a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SeaTunnelApiTaskExecuteCommand.java b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SeaTunnelApiTaskExecuteCommand.java
index a12d23b4..a0e30a7b 100644
--- a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SeaTunnelApiTaskExecuteCommand.java
+++ b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SeaTunnelApiTaskExecuteCommand.java
@@ -17,21 +17,12 @@
package org.apache.seatunnel.core.spark.command;
-import org.apache.seatunnel.api.sink.SeaTunnelSink;
-import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.core.base.command.Command;
import org.apache.seatunnel.core.base.config.ConfigBuilder;
-import org.apache.seatunnel.core.base.config.EngineType;
-import org.apache.seatunnel.core.base.config.EnvironmentFactory;
import org.apache.seatunnel.core.base.exception.CommandExecuteException;
import org.apache.seatunnel.core.base.utils.FileUtils;
import org.apache.seatunnel.core.spark.args.SparkCommandArgs;
import org.apache.seatunnel.core.spark.execution.SeaTunnelTaskExecution;
-import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
-import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
-import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery;
-import org.apache.seatunnel.spark.SparkEnvironment;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -66,33 +57,4 @@ public class SeaTunnelApiTaskExecuteCommand implements Command<SparkCommandArgs>
}
}
- private SeaTunnelSource<?, ?, ?> getSource(Config config) {
- PluginIdentifier pluginIdentifier = getSourcePluginIdentifier();
- // todo: use FactoryUtils to load the plugin
- SeaTunnelSourcePluginDiscovery sourcePluginDiscovery = new SeaTunnelSourcePluginDiscovery();
- return sourcePluginDiscovery.getPluginInstance(pluginIdentifier);
- }
-
- private SeaTunnelSink<?, ?, ?, ?> getSink(Config config) {
- PluginIdentifier pluginIdentifier = getSinkPluginIdentifier();
- SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new SeaTunnelSinkPluginDiscovery();
- return sinkPluginDiscovery.getPluginInstance(pluginIdentifier);
- }
-
- private PluginIdentifier getSourcePluginIdentifier() {
- return PluginIdentifier.of("seatunnel", "source", "FakeSource");
- }
-
- private PluginIdentifier getSinkPluginIdentifier() {
- return PluginIdentifier.of("seatunnel", "sink", "Console");
- }
-
- private SparkEnvironment getSparkEnvironment(Config config) {
- SparkEnvironment sparkEnvironment = (SparkEnvironment) new EnvironmentFactory<>(config, EngineType.SPARK).getEnvironment();
- sparkEnvironment.setJobMode(JobMode.STREAMING);
- sparkEnvironment.setConfig(config);
- sparkEnvironment.prepare();
-
- return sparkEnvironment;
- }
}
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/BatchPartitionReader.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/BatchPartitionReader.java
index 4ed7cfb1..9f5beeb5 100644
--- a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/BatchPartitionReader.java
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/BatchPartitionReader.java
@@ -76,7 +76,7 @@ public class BatchPartitionReader implements InputPartitionReader<InternalRow> {
throw new RuntimeException(e);
}
}
- return running;
+ return running || !handover.isEmpty();
}
protected void prepare() {