You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by we...@apache.org on 2022/05/20 02:11:41 UTC

[incubator-seatunnel] branch api-draft updated: fix spark batch can't stop (#1927)

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/api-draft by this push:
     new 6d5ef6da fix spark batch can't stop (#1927)
6d5ef6da is described below

commit 6d5ef6dabec2e24d524a737c389d868ec3a8eea9
Author: TrickyZerg <32...@users.noreply.github.com>
AuthorDate: Fri May 20 10:11:35 2022 +0800

    fix spark batch can't stop (#1927)
---
 .../command/SeaTunnelApiTaskExecuteCommand.java    | 38 ----------------------
 .../spark/source/batch/BatchPartitionReader.java   |  2 +-
 2 files changed, 1 insertion(+), 39 deletions(-)

diff --git a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SeaTunnelApiTaskExecuteCommand.java b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SeaTunnelApiTaskExecuteCommand.java
index a12d23b4..a0e30a7b 100644
--- a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SeaTunnelApiTaskExecuteCommand.java
+++ b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SeaTunnelApiTaskExecuteCommand.java
@@ -17,21 +17,12 @@
 
 package org.apache.seatunnel.core.spark.command;
 
-import org.apache.seatunnel.api.sink.SeaTunnelSink;
-import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.common.constants.JobMode;
 import org.apache.seatunnel.core.base.command.Command;
 import org.apache.seatunnel.core.base.config.ConfigBuilder;
-import org.apache.seatunnel.core.base.config.EngineType;
-import org.apache.seatunnel.core.base.config.EnvironmentFactory;
 import org.apache.seatunnel.core.base.exception.CommandExecuteException;
 import org.apache.seatunnel.core.base.utils.FileUtils;
 import org.apache.seatunnel.core.spark.args.SparkCommandArgs;
 import org.apache.seatunnel.core.spark.execution.SeaTunnelTaskExecution;
-import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
-import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
-import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery;
-import org.apache.seatunnel.spark.SparkEnvironment;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
@@ -66,33 +57,4 @@ public class SeaTunnelApiTaskExecuteCommand implements Command<SparkCommandArgs>
         }
     }
 
-    private SeaTunnelSource<?, ?, ?> getSource(Config config) {
-        PluginIdentifier pluginIdentifier = getSourcePluginIdentifier();
-        // todo: use FactoryUtils to load the plugin
-        SeaTunnelSourcePluginDiscovery sourcePluginDiscovery = new SeaTunnelSourcePluginDiscovery();
-        return sourcePluginDiscovery.getPluginInstance(pluginIdentifier);
-    }
-
-    private SeaTunnelSink<?, ?, ?, ?> getSink(Config config) {
-        PluginIdentifier pluginIdentifier = getSinkPluginIdentifier();
-        SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new SeaTunnelSinkPluginDiscovery();
-        return sinkPluginDiscovery.getPluginInstance(pluginIdentifier);
-    }
-
-    private PluginIdentifier getSourcePluginIdentifier() {
-        return PluginIdentifier.of("seatunnel", "source", "FakeSource");
-    }
-
-    private PluginIdentifier getSinkPluginIdentifier() {
-        return PluginIdentifier.of("seatunnel", "sink", "Console");
-    }
-
-    private SparkEnvironment getSparkEnvironment(Config config) {
-        SparkEnvironment sparkEnvironment = (SparkEnvironment) new EnvironmentFactory<>(config, EngineType.SPARK).getEnvironment();
-        sparkEnvironment.setJobMode(JobMode.STREAMING);
-        sparkEnvironment.setConfig(config);
-        sparkEnvironment.prepare();
-
-        return sparkEnvironment;
-    }
 }
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/BatchPartitionReader.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/BatchPartitionReader.java
index 4ed7cfb1..9f5beeb5 100644
--- a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/BatchPartitionReader.java
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/BatchPartitionReader.java
@@ -76,7 +76,7 @@ public class BatchPartitionReader implements InputPartitionReader<InternalRow> {
                 throw new RuntimeException(e);
             }
         }
-        return running;
+        return running || !handover.isEmpty();
     }
 
     protected void prepare() {