You are viewing a plain-text version of this content. The canonical link to the original message was not preserved in this copy.
Posted to commits@seatunnel.apache.org by we...@apache.org on 2022/05/13 09:31:17 UTC

[incubator-seatunnel] branch api-draft updated: [Api-draft] Fix Spark Sink Can't support batch mode. (#1868)

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/api-draft by this push:
     new af7f6e01 [Api-draft] Fix Spark Sink Can't support batch mode. (#1868)
af7f6e01 is described below

commit af7f6e019a1d538e14de331e567d41da30b1ff97
Author: TrickyZerg <32...@users.noreply.github.com>
AuthorDate: Fri May 13 17:31:13 2022 +0800

    [Api-draft] Fix Spark Sink Can't support batch mode. (#1868)
    
    * Fix Spark Sink Can't support batch mode.
---
 .../seatunnel/api/serialization/DefaultSerializer.java     | 14 ++++++++------
 .../apache/seatunnel/common/utils/SerializationUtils.java  |  9 +++++++++
 .../translation/spark/sink/SparkSinkInjector.java          | 13 ++++++++++---
 3 files changed, 27 insertions(+), 9 deletions(-)

diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/DefaultSerializer.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/DefaultSerializer.java
index ec213c69..c17a1d58 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/DefaultSerializer.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/DefaultSerializer.java
@@ -17,18 +17,20 @@
 
 package org.apache.seatunnel.api.serialization;
 
+import org.apache.seatunnel.common.utils.SerializationUtils;
+
 import java.io.IOException;
-import java.nio.charset.StandardCharsets;
+import java.io.Serializable;
 
-public class DefaultSerializer implements Serializer<String> {
+public class DefaultSerializer<T extends Serializable> implements Serializer<T> {
 
     @Override
-    public byte[] serialize(String obj) throws IOException {
-        return obj.getBytes(StandardCharsets.UTF_8);
+    public byte[] serialize(T obj) throws IOException {
+        return SerializationUtils.serialize(obj);
     }
 
     @Override
-    public String deserialize(byte[] serialized) throws IOException {
-        return new String(serialized);
+    public T deserialize(byte[] serialized) throws IOException {
+        return SerializationUtils.deserialize(serialized);
     }
 }
diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/SerializationUtils.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/SerializationUtils.java
index 0b6e9c52..43dead16 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/SerializationUtils.java
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/SerializationUtils.java
@@ -37,4 +37,13 @@ public class SerializationUtils {
         }
         return null;
     }
+
+    public static <T extends Serializable> byte[] serialize(T obj) {
+        return org.apache.commons.lang3.SerializationUtils.serialize(obj);
+    }
+
+    public static <T extends Serializable> T deserialize(byte[] bytes) {
+        return org.apache.commons.lang3.SerializationUtils.deserialize(bytes);
+    }
+
 }
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSinkInjector.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSinkInjector.java
index 8aeb4537..cb45b70d 100644
--- a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSinkInjector.java
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSinkInjector.java
@@ -20,7 +20,7 @@ package org.apache.seatunnel.translation.spark.sink;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.common.utils.SerializationUtils;
 
-import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.DataFrameWriter;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.streaming.DataStreamWriter;
 import org.apache.spark.sql.streaming.OutputMode;
@@ -31,9 +31,16 @@ public class SparkSinkInjector {
 
     private static final String SPARK_SINK_CLASS_NAME = "org.apache.seatunnel.translation.spark.sink.SparkSink";
 
-    public static DataStreamWriter<Row> inject(Dataset<Row> dataset, SeaTunnelSink<?, ?, ?, ?> sink,
+    public static DataStreamWriter<Row> inject(DataStreamWriter<Row> dataset, SeaTunnelSink<?, ?, ?, ?> sink,
                                                HashMap<String, String> configuration) {
-        return dataset.writeStream().format(SPARK_SINK_CLASS_NAME).outputMode(OutputMode.Append())
+        return dataset.format(SPARK_SINK_CLASS_NAME).outputMode(OutputMode.Append())
+                .option("configuration", SerializationUtils.objectToString(configuration)).option("sink",
+                        SerializationUtils.objectToString(sink));
+    }
+
+    public static DataFrameWriter<Row> inject(DataFrameWriter<Row> dataset, SeaTunnelSink<?, ?, ?, ?> sink,
+                                              HashMap<String, String> configuration) {
+        return dataset.format(SPARK_SINK_CLASS_NAME)
                 .option("configuration", SerializationUtils.objectToString(configuration)).option("sink",
                         SerializationUtils.objectToString(sink));
     }