You are viewing a plain-text version of this content. (The canonical link to the original message was a hyperlink that is not preserved in this plain-text copy.)
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/07/23 12:46:57 UTC

[incubator-seatunnel] branch dev updated: using constants replace the hard code string (#2251)

This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new b71c99363 using constants replace the hard code string (#2251)
b71c99363 is described below

commit b71c99363a634f83aae84788f914875d76dcb570
Author: Xiao Zhao <49...@users.noreply.github.com>
AuthorDate: Sat Jul 23 20:46:53 2022 +0800

    using constants replace the hard code string (#2251)
---
 .../java/org/apache/seatunnel/common/Constants.java    | 18 ++++++++++++++++++
 .../spark/execution/SourceExecuteProcessor.java        |  3 ++-
 .../core/starter/spark/execution/SparkExecution.java   |  7 ++++---
 .../seatunnel/translation/spark/sink/SparkSink.java    |  3 ++-
 .../translation/spark/sink/SparkSinkInjector.java      |  5 +++--
 .../spark/source/SeaTunnelSourceSupport.java           | 17 +++++++++--------
 6 files changed, 38 insertions(+), 15 deletions(-)

diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/Constants.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/Constants.java
index f77cd014b..d75131b83 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/Constants.java
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/Constants.java
@@ -23,6 +23,24 @@ public final class Constants {
 
     public static final String LOGO = "SeaTunnel";
 
+    public static final String SOURCE = "source";
+
+    public static final String TRANSFORM = "transform";
+
+    public static final String SINK = "sink";
+
+    public static final String SOURCE_SERIALIZATION = "source.serialization";
+
+    public static final String SOURCE_PARALLELISM = "source.parallelism";
+
+    public static final String HDFS_ROOT = "hdfs.root";
+
+    public static final String HDFS_USER = "hdfs.user";
+
+    public static final String CHECKPOINT_INTERVAL = "checkpoint.interval";
+
+    public static final String CHECKPOINT_ID = "checkpoint.id";
+
     private Constants() {
     }
 }
diff --git a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
index f8f8a59b8..1435bd1b4 100644
--- a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.core.starter.spark.execution;
 
 import org.apache.seatunnel.api.common.SeaTunnelContext;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.common.Constants;
 import org.apache.seatunnel.common.utils.SerializationUtils;
 import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
 import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery;
@@ -55,7 +56,7 @@ public class SourceExecuteProcessor extends AbstractPluginExecuteProcessor<SeaTu
             Dataset<Row> dataset = sparkEnvironment.getSparkSession()
                 .read()
                 .format(SeaTunnelSource.class.getSimpleName())
-                .option("source.serialization", SerializationUtils.objectToString(source))
+                .option(Constants.SOURCE_SERIALIZATION, SerializationUtils.objectToString(source))
                 .schema((StructType) TypeConverterUtils.convert(source.getProducedType())).load();
             sources.add(dataset);
             registerInputTempView(pluginConfigs.get(i), dataset);
diff --git a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkExecution.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkExecution.java
index 7b90bd0dd..31193e0cf 100644
--- a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkExecution.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkExecution.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.core.starter.spark.execution;
 
 import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.common.Constants;
 import org.apache.seatunnel.core.starter.config.EngineType;
 import org.apache.seatunnel.core.starter.config.EnvironmentFactory;
 import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
@@ -47,9 +48,9 @@ public class SparkExecution {
         this.config = config;
         this.sparkEnvironment = (SparkEnvironment) new EnvironmentFactory<>(config, EngineType.SPARK).getEnvironment();
         SeaTunnelContext.getContext().setJobMode(sparkEnvironment.getJobMode());
-        this.sourcePluginExecuteProcessor = new SourceExecuteProcessor(sparkEnvironment, config.getConfigList("source"));
-        this.transformPluginExecuteProcessor = new TransformExecuteProcessor(sparkEnvironment, config.getConfigList("transform"));
-        this.sinkPluginExecuteProcessor = new SinkExecuteProcessor(sparkEnvironment, config.getConfigList("sink"));
+        this.sourcePluginExecuteProcessor = new SourceExecuteProcessor(sparkEnvironment, config.getConfigList(Constants.SOURCE));
+        this.transformPluginExecuteProcessor = new TransformExecuteProcessor(sparkEnvironment, config.getConfigList(Constants.TRANSFORM));
+        this.sinkPluginExecuteProcessor = new SinkExecuteProcessor(sparkEnvironment, config.getConfigList(Constants.SINK));
     }
 
     public void execute() throws TaskExecuteException {
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSink.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSink.java
index b174fb144..804b85593 100644
--- a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSink.java
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSink.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.translation.spark.sink;
 
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.Constants;
 import org.apache.seatunnel.common.utils.SerializationUtils;
 
 import org.apache.spark.sql.SaveMode;
@@ -42,7 +43,7 @@ public class SparkSink<StateT, CommitInfoT, AggregatedCommitInfoT> implements Wr
     private void init(DataSourceOptions options) {
         if (sink == null) {
             this.sink = SerializationUtils.stringToObject(
-                    options.get("sink").orElseThrow(() -> new IllegalArgumentException("can not find sink " +
+                    options.get(Constants.SINK).orElseThrow(() -> new IllegalArgumentException("can not find sink " +
                             "class string in DataSourceOptions")));
         }
     }
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSinkInjector.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSinkInjector.java
index 00d6a5daa..50ec6b39a 100644
--- a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSinkInjector.java
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSinkInjector.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.translation.spark.sink;
 
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.common.Constants;
 import org.apache.seatunnel.common.utils.SerializationUtils;
 
 import org.apache.spark.sql.DataFrameWriter;
@@ -32,12 +33,12 @@ public class SparkSinkInjector {
     public static DataStreamWriter<Row> inject(DataStreamWriter<Row> dataset, SeaTunnelSink<?, ?, ?, ?> sink) {
         return dataset.format(SPARK_SINK_CLASS_NAME)
             .outputMode(OutputMode.Append())
-            .option("sink", SerializationUtils.objectToString(sink));
+            .option(Constants.SINK, SerializationUtils.objectToString(sink));
     }
 
     public static DataFrameWriter<Row> inject(DataFrameWriter<Row> dataset, SeaTunnelSink<?, ?, ?, ?> sink) {
         return dataset.format(SPARK_SINK_CLASS_NAME)
-            .option("sink", SerializationUtils.objectToString(sink));
+            .option(Constants.SINK, SerializationUtils.objectToString(sink));
     }
 
 }
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java
index b291da65e..baf7abcb7 100644
--- a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.translation.spark.source;
 
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.Constants;
 import org.apache.seatunnel.common.utils.SerializationUtils;
 import org.apache.seatunnel.translation.spark.source.batch.BatchSourceReader;
 import org.apache.seatunnel.translation.spark.source.continnous.ContinuousSourceReader;
@@ -61,32 +62,32 @@ public class SeaTunnelSourceSupport implements DataSourceV2, ReadSupport, MicroB
     @Override
     public DataSourceReader createReader(DataSourceOptions options) {
         SeaTunnelSource<SeaTunnelRow, ?, ?> seaTunnelSource = getSeaTunnelSource(options);
-        int parallelism = options.getInt("source.parallelism", 1);
+        int parallelism = options.getInt(Constants.SOURCE_PARALLELISM, 1);
         return new BatchSourceReader(seaTunnelSource, parallelism);
     }
 
     @Override
     public MicroBatchReader createMicroBatchReader(Optional<StructType> rowTypeOptional, String checkpointLocation, DataSourceOptions options) {
         SeaTunnelSource<SeaTunnelRow, ?, ?> seaTunnelSource = getSeaTunnelSource(options);
-        Integer parallelism = options.getInt("source.parallelism", 1);
-        Integer checkpointInterval = options.getInt("checkpoint.interval", CHECKPOINT_INTERVAL_DEFAULT);
+        Integer parallelism = options.getInt(Constants.SOURCE_PARALLELISM, 1);
+        Integer checkpointInterval = options.getInt(Constants.CHECKPOINT_INTERVAL, CHECKPOINT_INTERVAL_DEFAULT);
         String checkpointPath = StringUtils.replacePattern(checkpointLocation, "sources/\\d+", "sources-state");
         Configuration configuration = SparkSession.getActiveSession().get().sparkContext().hadoopConfiguration();
-        String hdfsRoot = options.get("hdfs.root").orElse(FileSystem.getDefaultUri(configuration).toString());
-        String hdfsUser = options.get("hdfs.user").orElse("");
-        Integer checkpointId = options.getInt("checkpoint.id", 1);
+        String hdfsRoot = options.get(Constants.HDFS_ROOT).orElse(FileSystem.getDefaultUri(configuration).toString());
+        String hdfsUser = options.get(Constants.HDFS_USER).orElse("");
+        Integer checkpointId = options.getInt(Constants.CHECKPOINT_ID, 1);
         return new MicroBatchSourceReader(seaTunnelSource, parallelism, checkpointId, checkpointInterval, checkpointPath, hdfsRoot, hdfsUser);
     }
 
     @Override
     public ContinuousReader createContinuousReader(Optional<StructType> rowTypeOptional, String checkpointLocation, DataSourceOptions options) {
         SeaTunnelSource<SeaTunnelRow, ?, ?> seaTunnelSource = getSeaTunnelSource(options);
-        Integer parallelism = options.getInt("source.parallelism", 1);
+        Integer parallelism = options.getInt(Constants.SOURCE_PARALLELISM, 1);
         return new ContinuousSourceReader(seaTunnelSource, parallelism);
     }
 
     private SeaTunnelSource<SeaTunnelRow, ?, ?> getSeaTunnelSource(DataSourceOptions options) {
-        return SerializationUtils.stringToObject(options.get("source.serialization")
+        return SerializationUtils.stringToObject(options.get(Constants.SOURCE_SERIALIZATION)
             .orElseThrow(() -> new UnsupportedOperationException("Serialization information for the SeaTunnelSource is required")));
     }
 }