You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/06/10 02:52:59 UTC

[incubator-seatunnel] branch api-draft updated: [Api-Draft] improve class comment and fix spark row StringType convert error (#2002)

This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/api-draft by this push:
     new 9f1b474f [Api-Draft] improve class comment and fix spark row StringType convert error (#2002)
9f1b474f is described below

commit 9f1b474f8ce893196b3e92f90ecea6e91cae7519
Author: Hisoka <10...@qq.com>
AuthorDate: Fri Jun 10 10:52:54 2022 +0800

    [Api-Draft] improve class comment and fix spark row StringType convert error (#2002)
    
    * improve class comment and fix spark row StringType convert error
    
    * change hive sink override method struct
---
 .../org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java   | 4 +++-
 .../main/java/org/apache/seatunnel/api/sink/SinkCommitter.java   | 7 +++++--
 .../src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java  | 9 ++++++---
 .../connectors/seatunnel/console/sink/ConsoleSinkWriter.java     | 2 +-
 .../seatunnel/connectors/seatunnel/hive/sink/HiveSinkWriter.java | 4 ++--
 .../connectors/seatunnel/kafka/sink/KafkaSinkWriter.java         | 2 +-
 .../connectors/seatunnel/kafka/sink/KafkaTransactionSender.java  | 1 -
 .../core/starter/spark/execution/SinkExecuteProcessor.java       | 1 +
 .../spark/serialization/InternalRowSerialization.java            | 8 +++++++-
 .../apache/seatunnel/translation/spark/sink/SparkDataWriter.java | 2 +-
 10 files changed, 27 insertions(+), 13 deletions(-)

diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
index ff138ea7..5d0068a9 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
@@ -24,6 +24,8 @@ import java.util.List;
 /**
  * The committer combine taskManager/Worker Commit message. Then commit it uses
  * {@link SinkAggregatedCommitter#commit(List)}. This class will execute in single thread.
+ * <p>
+ * See Also {@link SinkCommitter}
  *
  * @param <CommitInfoT>           The type of commit message.
  * @param <AggregatedCommitInfoT> The type of commit message after combine.
@@ -48,7 +50,7 @@ public interface SinkAggregatedCommitter<CommitInfoT, AggregatedCommitInfoT> ext
     AggregatedCommitInfoT combine(List<CommitInfoT> commitInfos);
 
     /**
-     * If commit failed, this method will be called.
+     * If {@link #commit(List)} fails, this method will be called (currently **only** on the Spark engine).
      *
      * @param aggregatedCommitInfo The list of combine commit message.
      * @throws Exception throw Exception when abort failed.
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java
index 00e3b253..f5dad959 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java
@@ -22,7 +22,10 @@ import java.io.Serializable;
 import java.util.List;
 
 /**
- * The committer to commit message.
+ * The committer to commit message. We strongly recommend implementing {@link SinkAggregatedCommitter} first,
+ * as the current version of {@link SinkAggregatedCommitter} can provide more consistent behavior.
+ * <p>
+ * See Also {@link SinkAggregatedCommitter}
  *
  * @param <CommitInfoT> The type of commit message.
  */
@@ -38,7 +41,7 @@ public interface SinkCommitter<CommitInfoT> extends Serializable {
     List<CommitInfoT> commit(List<CommitInfoT> committables) throws IOException;
 
     /**
-     * Abort the transaction, this method will be called when the commit is failed.
+     * Abort the transaction. This method will be called (**only** on the Spark engine) when the commit has failed.
      *
      * @param commitInfos The list of commit message, used to abort the commit.
      * @throws IOException throw IOException when close failed.
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
index 4e6a6d99..56f97bac 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
@@ -45,6 +45,7 @@ public interface SinkWriter<T, CommitInfoT, StateT> extends Serializable {
     /**
      * prepare the commit, will be called before {@link #snapshotState(long checkpointId)}.
      * If you need to use 2pc, you can return the commit info in this method, and receive the commit info in {@link SinkCommitter#commit(List)}.
+     * If this method fails (by throwing an exception), **only** the Spark engine will call {@link #abortPrepare()}.
      *
      * @return the commit info need to commit
      */
@@ -59,10 +60,12 @@ public interface SinkWriter<T, CommitInfoT, StateT> extends Serializable {
     }
 
     /**
-     * Used to abort the prepareCommit, if the prepareCommit failed,
-     * there is no CommitInfoT, so the rollback work cannot be done by {@link SinkCommitter}.
+     * Used to abort the {@link #prepareCommit()}, if the prepareCommit failed,
+     * there is no CommitInfoT, so the rollback work cannot be done by {@link SinkCommitter}. But we can
+     * use this method to roll back the side effects of {@link #prepareCommit()}. Currently only used by the
+     * Spark engine.
      */
-    void abort();
+    void abortPrepare();
 
     /**
      * call it when SinkWriter close
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
index b567e6a4..700afd61 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
@@ -50,7 +50,7 @@ public class ConsoleSinkWriter implements SinkWriter<SeaTunnelRow, ConsoleCommit
     }
 
     @Override
-    public void abort() {
+    public void abortPrepare() {
 
     }
 
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkWriter.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkWriter.java
index ad63a520..d97a3b32 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkWriter.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkWriter.java
@@ -81,7 +81,7 @@ public class HiveSinkWriter implements SinkWriter<SeaTunnelRow, HiveCommitInfo,
     }
 
     @Override
-    public void abort() {
+    public void abortPrepare() {
         fileWriter.abort();
     }
 
@@ -91,7 +91,7 @@ public class HiveSinkWriter implements SinkWriter<SeaTunnelRow, HiveCommitInfo,
     }
 
     @Override
-    public List<HiveSinkState> snapshotState() throws IOException {
+    public List<HiveSinkState> snapshotState(long checkpointId) throws IOException {
         //reset FileWrite
         fileWriter.resetFileWriter(System.currentTimeMillis() + "");
         return Lists.newArrayList(new HiveSinkState(hiveSinkConfig));
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
index d7f1d5e6..697da6a4 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
@@ -87,7 +87,7 @@ public class KafkaSinkWriter implements SinkWriter<SeaTunnelRow, KafkaCommitInfo
     }
 
     @Override
-    public void abort() {
+    public void abortPrepare() {
         kafkaProducerSender.abortTransaction();
     }
 
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaTransactionSender.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaTransactionSender.java
index 43ee5717..df4fc5b0 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaTransactionSender.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaTransactionSender.java
@@ -85,7 +85,6 @@ public class KafkaTransactionSender<K, V> implements KafkaProduceSender<K, V> {
                 LOGGER.debug("Abort kafka transaction: {}", kafkaState.getTransactionId());
             }
             KafkaProducer<K, V> historyProducer = getTransactionProducer(kafkaProperties, kafkaState.getTransactionId());
-            historyProducer.initTransactions();
             historyProducer.abortTransaction();
             historyProducer.close();
         }
diff --git a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
index 3dfe9268..112f4fbc 100644
--- a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
@@ -70,6 +70,7 @@ public class SinkExecuteProcessor extends AbstractPluginExecuteProcessor<SeaTunn
             Config sinkConfig = pluginConfigs.get(i);
             SeaTunnelSink<?, ?, ?, ?> seaTunnelSink = plugins.get(i);
             Dataset<Row> dataset = fromSourceTable(sinkConfig, sparkEnvironment).orElse(input);
+            // TODO modify checkpoint location
             SparkSinkInjector.inject(dataset.write(), seaTunnelSink, new HashMap<>(Common.COLLECTION_SIZE)).option(
                 "checkpointLocation", "/tmp").save();
         }
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowSerialization.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowSerialization.java
index fe6f0a62..ac0ede0f 100644
--- a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowSerialization.java
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowSerialization.java
@@ -22,6 +22,8 @@ import org.apache.seatunnel.translation.serialization.RowSerialization;
 
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.StringType;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.unsafe.types.UTF8String;
 
@@ -49,7 +51,11 @@ public final class InternalRowSerialization implements RowSerialization<Internal
     public SeaTunnelRow deserialize(InternalRow engineRow) throws IOException {
         Object[] fields = new Object[engineRow.numFields()];
         for (int i = 0; i < engineRow.numFields(); i++) {
-            fields[i] = engineRow.get(i, sparkSchema.apply(i).dataType());
+            DataType type = sparkSchema.apply(i).dataType();
+            fields[i] = engineRow.get(i, type);
+            if (type instanceof StringType) {
+                fields[i] = fields[i].toString();
+            }
         }
         return new SeaTunnelRow(fields);
     }
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriter.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriter.java
index 95ed6fd3..4bf0bdb0 100644
--- a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriter.java
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriter.java
@@ -83,7 +83,7 @@ public class SparkDataWriter<CommitInfoT, StateT> implements DataWriter<Internal
 
     @Override
     public void abort() throws IOException {
-        sinkWriter.abort();
+        sinkWriter.abortPrepare();
         if (sinkCommitter != null) {
             if (latestCommitInfoT == null) {
                 sinkCommitter.abort(Collections.emptyList());