You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/07/23 12:46:57 UTC
[incubator-seatunnel] branch dev updated: using constants replace the hard code string (#2251)
This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new b71c99363 using constants replace the hard code string (#2251)
b71c99363 is described below
commit b71c99363a634f83aae84788f914875d76dcb570
Author: Xiao Zhao <49...@users.noreply.github.com>
AuthorDate: Sat Jul 23 20:46:53 2022 +0800
using constants replace the hard code string (#2251)
---
.../java/org/apache/seatunnel/common/Constants.java | 18 ++++++++++++++++++
.../spark/execution/SourceExecuteProcessor.java | 3 ++-
.../core/starter/spark/execution/SparkExecution.java | 7 ++++---
.../seatunnel/translation/spark/sink/SparkSink.java | 3 ++-
.../translation/spark/sink/SparkSinkInjector.java | 5 +++--
.../spark/source/SeaTunnelSourceSupport.java | 17 +++++++++--------
6 files changed, 38 insertions(+), 15 deletions(-)
diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/Constants.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/Constants.java
index f77cd014b..d75131b83 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/Constants.java
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/Constants.java
@@ -23,6 +23,24 @@ public final class Constants {
public static final String LOGO = "SeaTunnel";
+ public static final String SOURCE = "source";
+
+ public static final String TRANSFORM = "transform";
+
+ public static final String SINK = "sink";
+
+ public static final String SOURCE_SERIALIZATION = "source.serialization";
+
+ public static final String SOURCE_PARALLELISM = "source.parallelism";
+
+ public static final String HDFS_ROOT = "hdfs.root";
+
+ public static final String HDFS_USER = "hdfs.user";
+
+ public static final String CHECKPOINT_INTERVAL = "checkpoint.interval";
+
+ public static final String CHECKPOINT_ID = "checkpoint.id";
+
private Constants() {
}
}
diff --git a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
index f8f8a59b8..1435bd1b4 100644
--- a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.core.starter.spark.execution;
import org.apache.seatunnel.api.common.SeaTunnelContext;
import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.common.Constants;
import org.apache.seatunnel.common.utils.SerializationUtils;
import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery;
@@ -55,7 +56,7 @@ public class SourceExecuteProcessor extends AbstractPluginExecuteProcessor<SeaTu
Dataset<Row> dataset = sparkEnvironment.getSparkSession()
.read()
.format(SeaTunnelSource.class.getSimpleName())
- .option("source.serialization", SerializationUtils.objectToString(source))
+ .option(Constants.SOURCE_SERIALIZATION, SerializationUtils.objectToString(source))
.schema((StructType) TypeConverterUtils.convert(source.getProducedType())).load();
sources.add(dataset);
registerInputTempView(pluginConfigs.get(i), dataset);
diff --git a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkExecution.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkExecution.java
index 7b90bd0dd..31193e0cf 100644
--- a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkExecution.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkExecution.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.core.starter.spark.execution;
import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.common.Constants;
import org.apache.seatunnel.core.starter.config.EngineType;
import org.apache.seatunnel.core.starter.config.EnvironmentFactory;
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
@@ -47,9 +48,9 @@ public class SparkExecution {
this.config = config;
this.sparkEnvironment = (SparkEnvironment) new EnvironmentFactory<>(config, EngineType.SPARK).getEnvironment();
SeaTunnelContext.getContext().setJobMode(sparkEnvironment.getJobMode());
- this.sourcePluginExecuteProcessor = new SourceExecuteProcessor(sparkEnvironment, config.getConfigList("source"));
- this.transformPluginExecuteProcessor = new TransformExecuteProcessor(sparkEnvironment, config.getConfigList("transform"));
- this.sinkPluginExecuteProcessor = new SinkExecuteProcessor(sparkEnvironment, config.getConfigList("sink"));
+ this.sourcePluginExecuteProcessor = new SourceExecuteProcessor(sparkEnvironment, config.getConfigList(Constants.SOURCE));
+ this.transformPluginExecuteProcessor = new TransformExecuteProcessor(sparkEnvironment, config.getConfigList(Constants.TRANSFORM));
+ this.sinkPluginExecuteProcessor = new SinkExecuteProcessor(sparkEnvironment, config.getConfigList(Constants.SINK));
}
public void execute() throws TaskExecuteException {
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSink.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSink.java
index b174fb144..804b85593 100644
--- a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSink.java
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSink.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.translation.spark.sink;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.Constants;
import org.apache.seatunnel.common.utils.SerializationUtils;
import org.apache.spark.sql.SaveMode;
@@ -42,7 +43,7 @@ public class SparkSink<StateT, CommitInfoT, AggregatedCommitInfoT> implements Wr
private void init(DataSourceOptions options) {
if (sink == null) {
this.sink = SerializationUtils.stringToObject(
- options.get("sink").orElseThrow(() -> new IllegalArgumentException("can not find sink " +
+ options.get(Constants.SINK).orElseThrow(() -> new IllegalArgumentException("can not find sink " +
"class string in DataSourceOptions")));
}
}
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSinkInjector.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSinkInjector.java
index 00d6a5daa..50ec6b39a 100644
--- a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSinkInjector.java
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSinkInjector.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.translation.spark.sink;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.common.Constants;
import org.apache.seatunnel.common.utils.SerializationUtils;
import org.apache.spark.sql.DataFrameWriter;
@@ -32,12 +33,12 @@ public class SparkSinkInjector {
public static DataStreamWriter<Row> inject(DataStreamWriter<Row> dataset, SeaTunnelSink<?, ?, ?, ?> sink) {
return dataset.format(SPARK_SINK_CLASS_NAME)
.outputMode(OutputMode.Append())
- .option("sink", SerializationUtils.objectToString(sink));
+ .option(Constants.SINK, SerializationUtils.objectToString(sink));
}
public static DataFrameWriter<Row> inject(DataFrameWriter<Row> dataset, SeaTunnelSink<?, ?, ?, ?> sink) {
return dataset.format(SPARK_SINK_CLASS_NAME)
- .option("sink", SerializationUtils.objectToString(sink));
+ .option(Constants.SINK, SerializationUtils.objectToString(sink));
}
}
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java
index b291da65e..baf7abcb7 100644
--- a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.translation.spark.source;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.Constants;
import org.apache.seatunnel.common.utils.SerializationUtils;
import org.apache.seatunnel.translation.spark.source.batch.BatchSourceReader;
import org.apache.seatunnel.translation.spark.source.continnous.ContinuousSourceReader;
@@ -61,32 +62,32 @@ public class SeaTunnelSourceSupport implements DataSourceV2, ReadSupport, MicroB
@Override
public DataSourceReader createReader(DataSourceOptions options) {
SeaTunnelSource<SeaTunnelRow, ?, ?> seaTunnelSource = getSeaTunnelSource(options);
- int parallelism = options.getInt("source.parallelism", 1);
+ int parallelism = options.getInt(Constants.SOURCE_PARALLELISM, 1);
return new BatchSourceReader(seaTunnelSource, parallelism);
}
@Override
public MicroBatchReader createMicroBatchReader(Optional<StructType> rowTypeOptional, String checkpointLocation, DataSourceOptions options) {
SeaTunnelSource<SeaTunnelRow, ?, ?> seaTunnelSource = getSeaTunnelSource(options);
- Integer parallelism = options.getInt("source.parallelism", 1);
- Integer checkpointInterval = options.getInt("checkpoint.interval", CHECKPOINT_INTERVAL_DEFAULT);
+ Integer parallelism = options.getInt(Constants.SOURCE_PARALLELISM, 1);
+ Integer checkpointInterval = options.getInt(Constants.CHECKPOINT_INTERVAL, CHECKPOINT_INTERVAL_DEFAULT);
String checkpointPath = StringUtils.replacePattern(checkpointLocation, "sources/\\d+", "sources-state");
Configuration configuration = SparkSession.getActiveSession().get().sparkContext().hadoopConfiguration();
- String hdfsRoot = options.get("hdfs.root").orElse(FileSystem.getDefaultUri(configuration).toString());
- String hdfsUser = options.get("hdfs.user").orElse("");
- Integer checkpointId = options.getInt("checkpoint.id", 1);
+ String hdfsRoot = options.get(Constants.HDFS_ROOT).orElse(FileSystem.getDefaultUri(configuration).toString());
+ String hdfsUser = options.get(Constants.HDFS_USER).orElse("");
+ Integer checkpointId = options.getInt(Constants.CHECKPOINT_ID, 1);
return new MicroBatchSourceReader(seaTunnelSource, parallelism, checkpointId, checkpointInterval, checkpointPath, hdfsRoot, hdfsUser);
}
@Override
public ContinuousReader createContinuousReader(Optional<StructType> rowTypeOptional, String checkpointLocation, DataSourceOptions options) {
SeaTunnelSource<SeaTunnelRow, ?, ?> seaTunnelSource = getSeaTunnelSource(options);
- Integer parallelism = options.getInt("source.parallelism", 1);
+ Integer parallelism = options.getInt(Constants.SOURCE_PARALLELISM, 1);
return new ContinuousSourceReader(seaTunnelSource, parallelism);
}
private SeaTunnelSource<SeaTunnelRow, ?, ?> getSeaTunnelSource(DataSourceOptions options) {
- return SerializationUtils.stringToObject(options.get("source.serialization")
+ return SerializationUtils.stringToObject(options.get(Constants.SOURCE_SERIALIZATION)
.orElseThrow(() -> new UnsupportedOperationException("Serialization information for the SeaTunnelSource is required")));
}
}