You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/07/05 05:43:00 UTC

[incubator-seatunnel] branch dev updated: [SeaTunnel API] [Sink] remove useless context field (#2124)

This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new a31fdeedc [SeaTunnel API] [Sink] remove useless context field (#2124)
a31fdeedc is described below

commit a31fdeedcc1629d6c24c3033a57ad397f2a37e71
Author: Hisoka <fa...@qq.com>
AuthorDate: Tue Jul 5 13:42:54 2022 +0800

    [SeaTunnel API] [Sink] remove useless context field (#2124)
---
 docs/en/connector/source/Http.md                       |  2 +-
 .../seatunnel/api/sink/DefaultSinkWriterContext.java   | 18 +-----------------
 .../java/org/apache/seatunnel/api/sink/SinkWriter.java | 11 -----------
 .../jdbc/internal/xa/SemanticXidGenerator.java         |  3 +--
 .../starter/flink/execution/SinkExecuteProcessor.java  |  3 +--
 .../starter/spark/execution/SinkExecuteProcessor.java  |  5 +----
 .../seatunnel/translation/flink/sink/FlinkSink.java    | 11 ++---------
 .../translation/spark/sink/SparkDataSourceWriter.java  |  9 +++------
 .../translation/spark/sink/SparkDataWriterFactory.java | 10 ++--------
 .../seatunnel/translation/spark/sink/SparkSink.java    |  9 ++-------
 .../translation/spark/sink/SparkSinkInjector.java      | 10 ++--------
 .../translation/spark/sink/SparkStreamWriter.java      |  6 ++----
 12 files changed, 18 insertions(+), 79 deletions(-)

diff --git a/docs/en/connector/source/Http.md b/docs/en/connector/source/Http.md
index d9d3a399b..ad34e9e74 100644
--- a/docs/en/connector/source/Http.md
+++ b/docs/en/connector/source/Http.md
@@ -37,7 +37,7 @@ HTTP request header, json format.
 
 ### request_params[string]
 
-HTTP request parameters, json format.
+HTTP request parameters in JSON format. Use an escaped string to store the JSON.
 
 ### sync_path[string]
 
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSinkWriterContext.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSinkWriterContext.java
index 8c8219db7..c8a3fd61f 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSinkWriterContext.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSinkWriterContext.java
@@ -17,26 +17,14 @@
 
 package org.apache.seatunnel.api.sink;
 
-import java.util.Map;
-
 /**
  * The default {@link SinkWriter.Context} implement class.
  */
 public class DefaultSinkWriterContext implements SinkWriter.Context {
-
-    private final Map<String, String> configuration;
     private final int subtask;
-    private final int parallelism;
 
-    public DefaultSinkWriterContext(Map<String, String> configuration, int subtask, int parallelism) {
-        this.configuration = configuration;
+    public DefaultSinkWriterContext(int subtask) {
         this.subtask = subtask;
-        this.parallelism = parallelism;
-    }
-
-    @Override
-    public Map<String, String> getConfiguration() {
-        return configuration;
     }
 
     @Override
@@ -44,8 +32,4 @@ public class DefaultSinkWriterContext implements SinkWriter.Context {
         return subtask;
     }
 
-    @Override
-    public int getNumberOfParallelSubtasks() {
-        return parallelism;
-    }
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
index 268d3d40e..897e64b4c 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
@@ -21,7 +21,6 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
 
 /**
@@ -76,20 +75,10 @@ public interface SinkWriter<T, CommitInfoT, StateT> extends Serializable {
 
     interface Context extends Serializable{
 
-        /**
-         * Gets the configuration with which Job was started.
-         */
-        Map<String, String> getConfiguration();
-
         /**
          * @return The index of this subtask.
          */
         int getIndexOfSubtask();
 
-        /**
-         * @return The number of parallel Sink tasks.
-         */
-        int getNumberOfParallelSubtasks();
-
     }
 }
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/SemanticXidGenerator.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/SemanticXidGenerator.java
index 98825a57a..3d2a82b3d 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/SemanticXidGenerator.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/SemanticXidGenerator.java
@@ -80,8 +80,7 @@ class SemanticXidGenerator
             return false;
         }
         int subtaskIndex = readNumber(xid.getGlobalTransactionId(), JOB_ID_BYTES, Integer.BYTES);
-        if (subtaskIndex != sinkContext.getIndexOfSubtask()
-            && subtaskIndex <= sinkContext.getNumberOfParallelSubtasks() - 1) {
+        if (subtaskIndex != sinkContext.getIndexOfSubtask()) {
             return false;
         }
         byte[] jobIdBytes = new byte[JOB_ID_BYTES];
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index 079e3f95e..4744fdf7e 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -36,7 +36,6 @@ import org.apache.flink.types.Row;
 
 import java.net.URL;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -76,7 +75,7 @@ public class SinkExecuteProcessor extends AbstractPluginExecuteProcessor<SeaTunn
             SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable> seaTunnelSink = plugins.get(i);
             DataStream<Row> stream = fromSourceTable(sinkConfig).orElse(input);
             seaTunnelSink.setTypeInfo((SeaTunnelRowType) TypeConverterUtils.convert(stream.getType()));
-            stream.sinkTo(new FlinkSink<>(seaTunnelSink, Collections.emptyMap()));
+            stream.sinkTo(new FlinkSink<>(seaTunnelSink));
         }
         // the sink is the last stream
         return null;
diff --git a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
index c19770a63..2e0096a47 100644
--- a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
@@ -20,7 +20,6 @@ package org.apache.seatunnel.core.starter.spark.execution;
 import org.apache.seatunnel.api.common.SeaTunnelContext;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.Common;
 import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
 import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
 import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
@@ -36,7 +35,6 @@ import org.apache.spark.sql.Row;
 
 import java.net.URL;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -74,8 +72,7 @@ public class SinkExecuteProcessor extends AbstractPluginExecuteProcessor<SeaTunn
             Dataset<Row> dataset = fromSourceTable(sinkConfig, sparkEnvironment).orElse(input);
             // TODO modify checkpoint location
             seaTunnelSink.setTypeInfo((SeaTunnelRowType) TypeConverterUtils.convert(dataset.schema()));
-            SparkSinkInjector.inject(dataset.write(), seaTunnelSink, new HashMap<>(Common.COLLECTION_SIZE)).option(
-                "checkpointLocation", "/tmp").save();
+            SparkSinkInjector.inject(dataset.write(), seaTunnelSink).option("checkpointLocation", "/tmp").save();
         }
         // the sink is the last stream
         return null;
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
index c90fd86eb..a3e927240 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
@@ -32,28 +32,21 @@ import org.apache.flink.core.io.SimpleVersionedSerializer;
 
 import java.io.IOException;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
-@SuppressWarnings("unchecked")
 public class FlinkSink<InputT, CommT, WriterStateT, GlobalCommT> implements Sink<InputT, CommitWrapper<CommT>,
         FlinkWriterState<WriterStateT>, GlobalCommT> {
 
     private final SeaTunnelSink<SeaTunnelRow, WriterStateT, CommT, GlobalCommT> sink;
-    private final Map<String, String> configuration;
 
-    public FlinkSink(SeaTunnelSink<SeaTunnelRow, WriterStateT, CommT, GlobalCommT> sink,
-              Map<String, String> configuration) {
+    public FlinkSink(SeaTunnelSink<SeaTunnelRow, WriterStateT, CommT, GlobalCommT> sink) {
         this.sink = sink;
-        this.configuration = configuration;
     }
 
     @Override
     public SinkWriter<InputT, CommitWrapper<CommT>, FlinkWriterState<WriterStateT>> createWriter(org.apache.flink.api.connector.sink.Sink.InitContext context, List<FlinkWriterState<WriterStateT>> states) throws IOException {
-        // TODO add subtask and parallelism.
-        org.apache.seatunnel.api.sink.SinkWriter.Context stContext =
-                new DefaultSinkWriterContext(configuration, 0, 0);
+        org.apache.seatunnel.api.sink.SinkWriter.Context stContext = new DefaultSinkWriterContext(context.getSubtaskId());
 
         if (states == null || states.isEmpty()) {
             return new FlinkSinkWriter<>(sink.createWriter(stContext), 1, sink.getConsumedType());
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataSourceWriter.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataSourceWriter.java
index 3d79912d2..4855f7066 100644
--- a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataSourceWriter.java
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataSourceWriter.java
@@ -33,27 +33,24 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 import java.util.Objects;
 import java.util.stream.Collectors;
 
 public class SparkDataSourceWriter<StateT, CommitInfoT, AggregatedCommitInfoT> implements DataSourceWriter {
 
     protected final SeaTunnelSink<SeaTunnelRow, StateT, CommitInfoT, AggregatedCommitInfoT> sink;
-    protected final Map<String, String> configuration;
     @Nullable
     protected final SinkAggregatedCommitter<CommitInfoT, AggregatedCommitInfoT> sinkAggregatedCommitter;
 
-    SparkDataSourceWriter(SeaTunnelSink<SeaTunnelRow, StateT, CommitInfoT, AggregatedCommitInfoT> sink,
-                          Map<String, String> configuration) throws IOException {
+    SparkDataSourceWriter(SeaTunnelSink<SeaTunnelRow, StateT, CommitInfoT, AggregatedCommitInfoT> sink)
+            throws IOException {
         this.sink = sink;
-        this.configuration = configuration;
         this.sinkAggregatedCommitter = sink.createAggregatedCommitter().orElse(null);
     }
 
     @Override
     public DataWriterFactory<InternalRow> createWriterFactory() {
-        return new SparkDataWriterFactory<>(sink, configuration);
+        return new SparkDataWriterFactory<>(sink);
     }
 
     @Override
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriterFactory.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriterFactory.java
index 8cdbca3b9..ac3c6aa58 100644
--- a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriterFactory.java
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriterFactory.java
@@ -28,23 +28,17 @@ import org.apache.spark.sql.sources.v2.writer.DataWriter;
 import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
 
 import java.io.IOException;
-import java.util.Map;
 
 public class SparkDataWriterFactory<CommitInfoT, StateT> implements DataWriterFactory<InternalRow> {
 
     private final SeaTunnelSink<SeaTunnelRow, StateT, CommitInfoT, ?> sink;
-    private final Map<String, String> configuration;
-    SparkDataWriterFactory(SeaTunnelSink<SeaTunnelRow, StateT, CommitInfoT, ?> sink,  Map<String, String> configuration) {
+    SparkDataWriterFactory(SeaTunnelSink<SeaTunnelRow, StateT, CommitInfoT, ?> sink) {
         this.sink = sink;
-        this.configuration = configuration;
     }
 
     @Override
     public DataWriter<InternalRow> createDataWriter(int partitionId, long taskId, long epochId) {
-        // TODO use partitionID, taskId information.
-        // TODO add subtask and parallelism.
-        org.apache.seatunnel.api.sink.SinkWriter.Context context =
-            new DefaultSinkWriterContext(configuration, (int) taskId, 0);
+        org.apache.seatunnel.api.sink.SinkWriter.Context context = new DefaultSinkWriterContext((int) taskId);
         SinkWriter<SeaTunnelRow, CommitInfoT, StateT> writer;
         SinkCommitter<CommitInfoT> committer;
         try {
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSink.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSink.java
index dd9efe540..b174fb144 100644
--- a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSink.java
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSink.java
@@ -32,23 +32,18 @@ import org.apache.spark.sql.streaming.OutputMode;
 import org.apache.spark.sql.types.StructType;
 
 import java.io.IOException;
-import java.util.Map;
 import java.util.Optional;
 
 public class SparkSink<StateT, CommitInfoT, AggregatedCommitInfoT> implements WriteSupport,
     StreamWriteSupport, DataSourceV2 {
 
     private volatile SeaTunnelSink<SeaTunnelRow, StateT, CommitInfoT, AggregatedCommitInfoT> sink;
-    private Map<String, String> configuration;
 
     private void init(DataSourceOptions options) {
         if (sink == null) {
             this.sink = SerializationUtils.stringToObject(
                     options.get("sink").orElseThrow(() -> new IllegalArgumentException("can not find sink " +
                             "class string in DataSourceOptions")));
-            this.configuration = SerializationUtils.stringToObject(
-                    options.get("configuration").orElseThrow(() -> new IllegalArgumentException("can not " +
-                            "find configuration class string in DataSourceOptions")));
         }
     }
 
@@ -57,7 +52,7 @@ public class SparkSink<StateT, CommitInfoT, AggregatedCommitInfoT> implements Wr
         init(options);
 
         try {
-            return new SparkStreamWriter<>(sink, configuration);
+            return new SparkStreamWriter<>(sink);
         } catch (IOException e) {
             throw new RuntimeException("find error when createStreamWriter", e);
         }
@@ -68,7 +63,7 @@ public class SparkSink<StateT, CommitInfoT, AggregatedCommitInfoT> implements Wr
         init(options);
 
         try {
-            return Optional.of(new SparkDataSourceWriter<>(sink, configuration));
+            return Optional.of(new SparkDataSourceWriter<>(sink));
         } catch (IOException e) {
             throw new RuntimeException("find error when createStreamWriter", e);
         }
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSinkInjector.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSinkInjector.java
index 62cdc255c..00d6a5daa 100644
--- a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSinkInjector.java
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSinkInjector.java
@@ -25,24 +25,18 @@ import org.apache.spark.sql.Row;
 import org.apache.spark.sql.streaming.DataStreamWriter;
 import org.apache.spark.sql.streaming.OutputMode;
 
-import java.util.HashMap;
-
 public class SparkSinkInjector {
 
     private static final String SPARK_SINK_CLASS_NAME = "org.apache.seatunnel.translation.spark.sink.SparkSink";
 
-    public static DataStreamWriter<Row> inject(DataStreamWriter<Row> dataset, SeaTunnelSink<?, ?, ?, ?> sink,
-                                               HashMap<String, String> configuration) {
+    public static DataStreamWriter<Row> inject(DataStreamWriter<Row> dataset, SeaTunnelSink<?, ?, ?, ?> sink) {
         return dataset.format(SPARK_SINK_CLASS_NAME)
             .outputMode(OutputMode.Append())
-            .option("configuration", SerializationUtils.objectToString(configuration))
             .option("sink", SerializationUtils.objectToString(sink));
     }
 
-    public static DataFrameWriter<Row> inject(DataFrameWriter<Row> dataset, SeaTunnelSink<?, ?, ?, ?> sink,
-                                              HashMap<String, String> configuration) {
+    public static DataFrameWriter<Row> inject(DataFrameWriter<Row> dataset, SeaTunnelSink<?, ?, ?, ?> sink) {
         return dataset.format(SPARK_SINK_CLASS_NAME)
-            .option("configuration", SerializationUtils.objectToString(configuration))
             .option("sink", SerializationUtils.objectToString(sink));
     }
 
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkStreamWriter.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkStreamWriter.java
index 2fc678e38..85f6a9542 100644
--- a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkStreamWriter.java
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkStreamWriter.java
@@ -26,14 +26,12 @@ import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
 import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter;
 
 import java.io.IOException;
-import java.util.Map;
 
 public class SparkStreamWriter<StateT, CommitInfoT, AggregatedCommitInfoT> extends SparkDataSourceWriter<StateT, CommitInfoT, AggregatedCommitInfoT>
         implements StreamWriter {
 
-    SparkStreamWriter(SeaTunnelSink<SeaTunnelRow, StateT, CommitInfoT, AggregatedCommitInfoT> sink,
-                      Map<String, String> configuration) throws IOException {
-        super(sink, configuration);
+    SparkStreamWriter(SeaTunnelSink<SeaTunnelRow, StateT, CommitInfoT, AggregatedCommitInfoT> sink) throws IOException {
+        super(sink);
     }
 
     @Override