You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by we...@apache.org on 2022/05/13 09:31:17 UTC
[incubator-seatunnel] branch api-draft updated: [Api-draft] Fix Spark Sink Can't support batch mode. (#1868)
This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/api-draft by this push:
new af7f6e01 [Api-draft] Fix Spark Sink Can't support batch mode. (#1868)
af7f6e01 is described below
commit af7f6e019a1d538e14de331e567d41da30b1ff97
Author: TrickyZerg <32...@users.noreply.github.com>
AuthorDate: Fri May 13 17:31:13 2022 +0800
[Api-draft] Fix Spark Sink Can't support batch mode. (#1868)
* Fix Spark Sink not supporting batch mode.
---
.../seatunnel/api/serialization/DefaultSerializer.java | 14 ++++++++------
.../apache/seatunnel/common/utils/SerializationUtils.java | 9 +++++++++
.../translation/spark/sink/SparkSinkInjector.java | 13 ++++++++++---
3 files changed, 27 insertions(+), 9 deletions(-)
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/DefaultSerializer.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/DefaultSerializer.java
index ec213c69..c17a1d58 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/DefaultSerializer.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/DefaultSerializer.java
@@ -17,18 +17,20 @@
package org.apache.seatunnel.api.serialization;
+import org.apache.seatunnel.common.utils.SerializationUtils;
+
import java.io.IOException;
-import java.nio.charset.StandardCharsets;
+import java.io.Serializable;
-public class DefaultSerializer implements Serializer<String> {
+public class DefaultSerializer<T extends Serializable> implements Serializer<T> {
@Override
- public byte[] serialize(String obj) throws IOException {
- return obj.getBytes(StandardCharsets.UTF_8);
+ public byte[] serialize(T obj) throws IOException {
+ return SerializationUtils.serialize(obj);
}
@Override
- public String deserialize(byte[] serialized) throws IOException {
- return new String(serialized);
+ public T deserialize(byte[] serialized) throws IOException {
+ return SerializationUtils.deserialize(serialized);
}
}
diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/SerializationUtils.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/SerializationUtils.java
index 0b6e9c52..43dead16 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/SerializationUtils.java
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/SerializationUtils.java
@@ -37,4 +37,13 @@ public class SerializationUtils {
}
return null;
}
+
+ public static <T extends Serializable> byte[] serialize(T obj) {
+ return org.apache.commons.lang3.SerializationUtils.serialize(obj);
+ }
+
+ public static <T extends Serializable> T deserialize(byte[] bytes) {
+ return org.apache.commons.lang3.SerializationUtils.deserialize(bytes);
+ }
+
}
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSinkInjector.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSinkInjector.java
index 8aeb4537..cb45b70d 100644
--- a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSinkInjector.java
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSinkInjector.java
@@ -20,7 +20,7 @@ package org.apache.seatunnel.translation.spark.sink;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.common.utils.SerializationUtils;
-import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.DataFrameWriter;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.streaming.DataStreamWriter;
import org.apache.spark.sql.streaming.OutputMode;
@@ -31,9 +31,16 @@ public class SparkSinkInjector {
private static final String SPARK_SINK_CLASS_NAME = "org.apache.seatunnel.translation.spark.sink.SparkSink";
- public static DataStreamWriter<Row> inject(Dataset<Row> dataset, SeaTunnelSink<?, ?, ?, ?> sink,
+ public static DataStreamWriter<Row> inject(DataStreamWriter<Row> dataset, SeaTunnelSink<?, ?, ?, ?> sink,
HashMap<String, String> configuration) {
- return dataset.writeStream().format(SPARK_SINK_CLASS_NAME).outputMode(OutputMode.Append())
+ return dataset.format(SPARK_SINK_CLASS_NAME).outputMode(OutputMode.Append())
+ .option("configuration", SerializationUtils.objectToString(configuration)).option("sink",
+ SerializationUtils.objectToString(sink));
+ }
+
+ public static DataFrameWriter<Row> inject(DataFrameWriter<Row> dataset, SeaTunnelSink<?, ?, ?, ?> sink,
+ HashMap<String, String> configuration) {
+ return dataset.format(SPARK_SINK_CLASS_NAME)
.option("configuration", SerializationUtils.objectToString(configuration)).option("sink",
SerializationUtils.objectToString(sink));
}