You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/07/05 05:43:00 UTC
[incubator-seatunnel] branch dev updated: [SeaTunnel API] [Sink] remove useless context field (#2124)
This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new a31fdeedc [SeaTunnel API] [Sink] remove useless context field (#2124)
a31fdeedc is described below
commit a31fdeedcc1629d6c24c3033a57ad397f2a37e71
Author: Hisoka <fa...@qq.com>
AuthorDate: Tue Jul 5 13:42:54 2022 +0800
[SeaTunnel API] [Sink] remove useless context field (#2124)
---
docs/en/connector/source/Http.md | 2 +-
.../seatunnel/api/sink/DefaultSinkWriterContext.java | 18 +-----------------
.../java/org/apache/seatunnel/api/sink/SinkWriter.java | 11 -----------
.../jdbc/internal/xa/SemanticXidGenerator.java | 3 +--
.../starter/flink/execution/SinkExecuteProcessor.java | 3 +--
.../starter/spark/execution/SinkExecuteProcessor.java | 5 +----
.../seatunnel/translation/flink/sink/FlinkSink.java | 11 ++---------
.../translation/spark/sink/SparkDataSourceWriter.java | 9 +++------
.../translation/spark/sink/SparkDataWriterFactory.java | 10 ++--------
.../seatunnel/translation/spark/sink/SparkSink.java | 9 ++-------
.../translation/spark/sink/SparkSinkInjector.java | 10 ++--------
.../translation/spark/sink/SparkStreamWriter.java | 6 ++----
12 files changed, 18 insertions(+), 79 deletions(-)
diff --git a/docs/en/connector/source/Http.md b/docs/en/connector/source/Http.md
index d9d3a399b..ad34e9e74 100644
--- a/docs/en/connector/source/Http.md
+++ b/docs/en/connector/source/Http.md
@@ -37,7 +37,7 @@ HTTP request header, json format.
### request_params[string]
-HTTP request parameters, json format.
+HTTP request parameters, json format. Use string with escapes to save json
### sync_path[string]
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSinkWriterContext.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSinkWriterContext.java
index 8c8219db7..c8a3fd61f 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSinkWriterContext.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSinkWriterContext.java
@@ -17,26 +17,14 @@
package org.apache.seatunnel.api.sink;
-import java.util.Map;
-
/**
* The default {@link SinkWriter.Context} implement class.
*/
public class DefaultSinkWriterContext implements SinkWriter.Context {
-
- private final Map<String, String> configuration;
private final int subtask;
- private final int parallelism;
- public DefaultSinkWriterContext(Map<String, String> configuration, int subtask, int parallelism) {
- this.configuration = configuration;
+ public DefaultSinkWriterContext(int subtask) {
this.subtask = subtask;
- this.parallelism = parallelism;
- }
-
- @Override
- public Map<String, String> getConfiguration() {
- return configuration;
}
@Override
@@ -44,8 +32,4 @@ public class DefaultSinkWriterContext implements SinkWriter.Context {
return subtask;
}
- @Override
- public int getNumberOfParallelSubtasks() {
- return parallelism;
- }
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
index 268d3d40e..897e64b4c 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
@@ -21,7 +21,6 @@ import java.io.IOException;
import java.io.Serializable;
import java.util.Collections;
import java.util.List;
-import java.util.Map;
import java.util.Optional;
/**
@@ -76,20 +75,10 @@ public interface SinkWriter<T, CommitInfoT, StateT> extends Serializable {
interface Context extends Serializable{
- /**
- * Gets the configuration with which Job was started.
- */
- Map<String, String> getConfiguration();
-
/**
* @return The index of this subtask.
*/
int getIndexOfSubtask();
- /**
- * @return The number of parallel Sink tasks.
- */
- int getNumberOfParallelSubtasks();
-
}
}
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/SemanticXidGenerator.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/SemanticXidGenerator.java
index 98825a57a..3d2a82b3d 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/SemanticXidGenerator.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/SemanticXidGenerator.java
@@ -80,8 +80,7 @@ class SemanticXidGenerator
return false;
}
int subtaskIndex = readNumber(xid.getGlobalTransactionId(), JOB_ID_BYTES, Integer.BYTES);
- if (subtaskIndex != sinkContext.getIndexOfSubtask()
- && subtaskIndex <= sinkContext.getNumberOfParallelSubtasks() - 1) {
+ if (subtaskIndex != sinkContext.getIndexOfSubtask()) {
return false;
}
byte[] jobIdBytes = new byte[JOB_ID_BYTES];
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index 079e3f95e..4744fdf7e 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -36,7 +36,6 @@ import org.apache.flink.types.Row;
import java.net.URL;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
@@ -76,7 +75,7 @@ public class SinkExecuteProcessor extends AbstractPluginExecuteProcessor<SeaTunn
SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable> seaTunnelSink = plugins.get(i);
DataStream<Row> stream = fromSourceTable(sinkConfig).orElse(input);
seaTunnelSink.setTypeInfo((SeaTunnelRowType) TypeConverterUtils.convert(stream.getType()));
- stream.sinkTo(new FlinkSink<>(seaTunnelSink, Collections.emptyMap()));
+ stream.sinkTo(new FlinkSink<>(seaTunnelSink));
}
// the sink is the last stream
return null;
diff --git a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
index c19770a63..2e0096a47 100644
--- a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
@@ -20,7 +20,6 @@ package org.apache.seatunnel.core.starter.spark.execution;
import org.apache.seatunnel.api.common.SeaTunnelContext;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
@@ -36,7 +35,6 @@ import org.apache.spark.sql.Row;
import java.net.URL;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.stream.Collectors;
@@ -74,8 +72,7 @@ public class SinkExecuteProcessor extends AbstractPluginExecuteProcessor<SeaTunn
Dataset<Row> dataset = fromSourceTable(sinkConfig, sparkEnvironment).orElse(input);
// TODO modify checkpoint location
seaTunnelSink.setTypeInfo((SeaTunnelRowType) TypeConverterUtils.convert(dataset.schema()));
- SparkSinkInjector.inject(dataset.write(), seaTunnelSink, new HashMap<>(Common.COLLECTION_SIZE)).option(
- "checkpointLocation", "/tmp").save();
+ SparkSinkInjector.inject(dataset.write(), seaTunnelSink).option("checkpointLocation", "/tmp").save();
}
// the sink is the last stream
return null;
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
index c90fd86eb..a3e927240 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
@@ -32,28 +32,21 @@ import org.apache.flink.core.io.SimpleVersionedSerializer;
import java.io.IOException;
import java.util.List;
-import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
-@SuppressWarnings("unchecked")
public class FlinkSink<InputT, CommT, WriterStateT, GlobalCommT> implements Sink<InputT, CommitWrapper<CommT>,
FlinkWriterState<WriterStateT>, GlobalCommT> {
private final SeaTunnelSink<SeaTunnelRow, WriterStateT, CommT, GlobalCommT> sink;
- private final Map<String, String> configuration;
- public FlinkSink(SeaTunnelSink<SeaTunnelRow, WriterStateT, CommT, GlobalCommT> sink,
- Map<String, String> configuration) {
+ public FlinkSink(SeaTunnelSink<SeaTunnelRow, WriterStateT, CommT, GlobalCommT> sink) {
this.sink = sink;
- this.configuration = configuration;
}
@Override
public SinkWriter<InputT, CommitWrapper<CommT>, FlinkWriterState<WriterStateT>> createWriter(org.apache.flink.api.connector.sink.Sink.InitContext context, List<FlinkWriterState<WriterStateT>> states) throws IOException {
- // TODO add subtask and parallelism.
- org.apache.seatunnel.api.sink.SinkWriter.Context stContext =
- new DefaultSinkWriterContext(configuration, 0, 0);
+ org.apache.seatunnel.api.sink.SinkWriter.Context stContext = new DefaultSinkWriterContext(context.getSubtaskId());
if (states == null || states.isEmpty()) {
return new FlinkSinkWriter<>(sink.createWriter(stContext), 1, sink.getConsumedType());
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataSourceWriter.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataSourceWriter.java
index 3d79912d2..4855f7066 100644
--- a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataSourceWriter.java
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataSourceWriter.java
@@ -33,27 +33,24 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
-import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
public class SparkDataSourceWriter<StateT, CommitInfoT, AggregatedCommitInfoT> implements DataSourceWriter {
protected final SeaTunnelSink<SeaTunnelRow, StateT, CommitInfoT, AggregatedCommitInfoT> sink;
- protected final Map<String, String> configuration;
@Nullable
protected final SinkAggregatedCommitter<CommitInfoT, AggregatedCommitInfoT> sinkAggregatedCommitter;
- SparkDataSourceWriter(SeaTunnelSink<SeaTunnelRow, StateT, CommitInfoT, AggregatedCommitInfoT> sink,
- Map<String, String> configuration) throws IOException {
+ SparkDataSourceWriter(SeaTunnelSink<SeaTunnelRow, StateT, CommitInfoT, AggregatedCommitInfoT> sink)
+ throws IOException {
this.sink = sink;
- this.configuration = configuration;
this.sinkAggregatedCommitter = sink.createAggregatedCommitter().orElse(null);
}
@Override
public DataWriterFactory<InternalRow> createWriterFactory() {
- return new SparkDataWriterFactory<>(sink, configuration);
+ return new SparkDataWriterFactory<>(sink);
}
@Override
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriterFactory.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriterFactory.java
index 8cdbca3b9..ac3c6aa58 100644
--- a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriterFactory.java
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriterFactory.java
@@ -28,23 +28,17 @@ import org.apache.spark.sql.sources.v2.writer.DataWriter;
import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
import java.io.IOException;
-import java.util.Map;
public class SparkDataWriterFactory<CommitInfoT, StateT> implements DataWriterFactory<InternalRow> {
private final SeaTunnelSink<SeaTunnelRow, StateT, CommitInfoT, ?> sink;
- private final Map<String, String> configuration;
- SparkDataWriterFactory(SeaTunnelSink<SeaTunnelRow, StateT, CommitInfoT, ?> sink, Map<String, String> configuration) {
+ SparkDataWriterFactory(SeaTunnelSink<SeaTunnelRow, StateT, CommitInfoT, ?> sink) {
this.sink = sink;
- this.configuration = configuration;
}
@Override
public DataWriter<InternalRow> createDataWriter(int partitionId, long taskId, long epochId) {
- // TODO use partitionID, taskId information.
- // TODO add subtask and parallelism.
- org.apache.seatunnel.api.sink.SinkWriter.Context context =
- new DefaultSinkWriterContext(configuration, (int) taskId, 0);
+ org.apache.seatunnel.api.sink.SinkWriter.Context context = new DefaultSinkWriterContext((int) taskId);
SinkWriter<SeaTunnelRow, CommitInfoT, StateT> writer;
SinkCommitter<CommitInfoT> committer;
try {
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSink.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSink.java
index dd9efe540..b174fb144 100644
--- a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSink.java
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSink.java
@@ -32,23 +32,18 @@ import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.types.StructType;
import java.io.IOException;
-import java.util.Map;
import java.util.Optional;
public class SparkSink<StateT, CommitInfoT, AggregatedCommitInfoT> implements WriteSupport,
StreamWriteSupport, DataSourceV2 {
private volatile SeaTunnelSink<SeaTunnelRow, StateT, CommitInfoT, AggregatedCommitInfoT> sink;
- private Map<String, String> configuration;
private void init(DataSourceOptions options) {
if (sink == null) {
this.sink = SerializationUtils.stringToObject(
options.get("sink").orElseThrow(() -> new IllegalArgumentException("can not find sink " +
"class string in DataSourceOptions")));
- this.configuration = SerializationUtils.stringToObject(
- options.get("configuration").orElseThrow(() -> new IllegalArgumentException("can not " +
- "find configuration class string in DataSourceOptions")));
}
}
@@ -57,7 +52,7 @@ public class SparkSink<StateT, CommitInfoT, AggregatedCommitInfoT> implements Wr
init(options);
try {
- return new SparkStreamWriter<>(sink, configuration);
+ return new SparkStreamWriter<>(sink);
} catch (IOException e) {
throw new RuntimeException("find error when createStreamWriter", e);
}
@@ -68,7 +63,7 @@ public class SparkSink<StateT, CommitInfoT, AggregatedCommitInfoT> implements Wr
init(options);
try {
- return Optional.of(new SparkDataSourceWriter<>(sink, configuration));
+ return Optional.of(new SparkDataSourceWriter<>(sink));
} catch (IOException e) {
throw new RuntimeException("find error when createStreamWriter", e);
}
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSinkInjector.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSinkInjector.java
index 62cdc255c..00d6a5daa 100644
--- a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSinkInjector.java
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSinkInjector.java
@@ -25,24 +25,18 @@ import org.apache.spark.sql.Row;
import org.apache.spark.sql.streaming.DataStreamWriter;
import org.apache.spark.sql.streaming.OutputMode;
-import java.util.HashMap;
-
public class SparkSinkInjector {
private static final String SPARK_SINK_CLASS_NAME = "org.apache.seatunnel.translation.spark.sink.SparkSink";
- public static DataStreamWriter<Row> inject(DataStreamWriter<Row> dataset, SeaTunnelSink<?, ?, ?, ?> sink,
- HashMap<String, String> configuration) {
+ public static DataStreamWriter<Row> inject(DataStreamWriter<Row> dataset, SeaTunnelSink<?, ?, ?, ?> sink) {
return dataset.format(SPARK_SINK_CLASS_NAME)
.outputMode(OutputMode.Append())
- .option("configuration", SerializationUtils.objectToString(configuration))
.option("sink", SerializationUtils.objectToString(sink));
}
- public static DataFrameWriter<Row> inject(DataFrameWriter<Row> dataset, SeaTunnelSink<?, ?, ?, ?> sink,
- HashMap<String, String> configuration) {
+ public static DataFrameWriter<Row> inject(DataFrameWriter<Row> dataset, SeaTunnelSink<?, ?, ?, ?> sink) {
return dataset.format(SPARK_SINK_CLASS_NAME)
- .option("configuration", SerializationUtils.objectToString(configuration))
.option("sink", SerializationUtils.objectToString(sink));
}
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkStreamWriter.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkStreamWriter.java
index 2fc678e38..85f6a9542 100644
--- a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkStreamWriter.java
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkStreamWriter.java
@@ -26,14 +26,12 @@ import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter;
import java.io.IOException;
-import java.util.Map;
public class SparkStreamWriter<StateT, CommitInfoT, AggregatedCommitInfoT> extends SparkDataSourceWriter<StateT, CommitInfoT, AggregatedCommitInfoT>
implements StreamWriter {
- SparkStreamWriter(SeaTunnelSink<SeaTunnelRow, StateT, CommitInfoT, AggregatedCommitInfoT> sink,
- Map<String, String> configuration) throws IOException {
- super(sink, configuration);
+ SparkStreamWriter(SeaTunnelSink<SeaTunnelRow, StateT, CommitInfoT, AggregatedCommitInfoT> sink) throws IOException {
+ super(sink);
}
@Override