You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/06/10 02:52:59 UTC
[incubator-seatunnel] branch api-draft updated: [Api-Draft] improve class comment and fix spark row StringType convert error (#2002)
This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/api-draft by this push:
new 9f1b474f [Api-Draft] improve class comment and fix spark row StringType convert error (#2002)
9f1b474f is described below
commit 9f1b474f8ce893196b3e92f90ecea6e91cae7519
Author: Hisoka <10...@qq.com>
AuthorDate: Fri Jun 10 10:52:54 2022 +0800
[Api-Draft] improve class comment and fix spark row StringType convert error (#2002)
* improve class comment and fix spark row StringType convert error
* change hive sink override method struct
---
.../org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java | 4 +++-
.../main/java/org/apache/seatunnel/api/sink/SinkCommitter.java | 7 +++++--
.../src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java | 9 ++++++---
.../connectors/seatunnel/console/sink/ConsoleSinkWriter.java | 2 +-
.../seatunnel/connectors/seatunnel/hive/sink/HiveSinkWriter.java | 4 ++--
.../connectors/seatunnel/kafka/sink/KafkaSinkWriter.java | 2 +-
.../connectors/seatunnel/kafka/sink/KafkaTransactionSender.java | 1 -
.../core/starter/spark/execution/SinkExecuteProcessor.java | 1 +
.../spark/serialization/InternalRowSerialization.java | 8 +++++++-
.../apache/seatunnel/translation/spark/sink/SparkDataWriter.java | 2 +-
10 files changed, 27 insertions(+), 13 deletions(-)
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
index ff138ea7..5d0068a9 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
@@ -24,6 +24,8 @@ import java.util.List;
/**
* The committer combine taskManager/Worker Commit message. Then commit it uses
* {@link SinkAggregatedCommitter#commit(List)}. This class will execute in single thread.
+ * <p>
+ * See also {@link SinkCommitter}
*
* @param <CommitInfoT> The type of commit message.
* @param <AggregatedCommitInfoT> The type of commit message after combine.
@@ -48,7 +50,7 @@ public interface SinkAggregatedCommitter<CommitInfoT, AggregatedCommitInfoT> ext
AggregatedCommitInfoT combine(List<CommitInfoT> commitInfos);
/**
- * If commit failed, this method will be called.
+ * If {@link #commit(List)} failed, this method will be called (**only** on the Spark engine for now).
*
* @param aggregatedCommitInfo The list of combine commit message.
* @throws Exception throw Exception when abort failed.
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java
index 00e3b253..f5dad959 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java
@@ -22,7 +22,10 @@ import java.io.Serializable;
import java.util.List;
/**
- * The committer to commit message.
+ * The committer to commit message. We strongly recommend implementing {@link SinkAggregatedCommitter} first,
+ * as the current version of {@link SinkAggregatedCommitter} can provide more consistent behavior.
+ * <p>
+ * See also {@link SinkAggregatedCommitter}
*
* @param <CommitInfoT> The type of commit message.
*/
@@ -38,7 +41,7 @@ public interface SinkCommitter<CommitInfoT> extends Serializable {
List<CommitInfoT> commit(List<CommitInfoT> committables) throws IOException;
/**
- * Abort the transaction, this method will be called when the commit is failed.
+ * Abort the transaction, this method will be called (**only** on the Spark engine) when the commit fails.
*
* @param commitInfos The list of commit message, used to abort the commit.
* @throws IOException throw IOException when close failed.
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
index 4e6a6d99..56f97bac 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
@@ -45,6 +45,7 @@ public interface SinkWriter<T, CommitInfoT, StateT> extends Serializable {
/**
* prepare the commit, will be called before {@link #snapshotState(long checkpointId)}.
* If you need to use 2pc, you can return the commit info in this method, and receive the commit info in {@link SinkCommitter#commit(List)}.
+ * If this method fails (by throwing an exception), **only** the Spark engine will call {@link #abortPrepare()}.
*
* @return the commit info need to commit
*/
@@ -59,10 +60,12 @@ public interface SinkWriter<T, CommitInfoT, StateT> extends Serializable {
}
/**
- * Used to abort the prepareCommit, if the prepareCommit failed,
- * there is no CommitInfoT, so the rollback work cannot be done by {@link SinkCommitter}.
+ * Used to abort the {@link #prepareCommit()}: if the prepareCommit failed,
+ * there is no CommitInfoT, so the rollback work cannot be done by {@link SinkCommitter}. But we can
+ * use this method to roll back side effects of {@link #prepareCommit()}. Only used in the Spark engine
+ * for now.
*/
- void abort();
+ void abortPrepare();
/**
* call it when SinkWriter close
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
index b567e6a4..700afd61 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
@@ -50,7 +50,7 @@ public class ConsoleSinkWriter implements SinkWriter<SeaTunnelRow, ConsoleCommit
}
@Override
- public void abort() {
+ public void abortPrepare() {
}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkWriter.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkWriter.java
index ad63a520..d97a3b32 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkWriter.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkWriter.java
@@ -81,7 +81,7 @@ public class HiveSinkWriter implements SinkWriter<SeaTunnelRow, HiveCommitInfo,
}
@Override
- public void abort() {
+ public void abortPrepare() {
fileWriter.abort();
}
@@ -91,7 +91,7 @@ public class HiveSinkWriter implements SinkWriter<SeaTunnelRow, HiveCommitInfo,
}
@Override
- public List<HiveSinkState> snapshotState() throws IOException {
+ public List<HiveSinkState> snapshotState(long checkpointId) throws IOException {
//reset FileWrite
fileWriter.resetFileWriter(System.currentTimeMillis() + "");
return Lists.newArrayList(new HiveSinkState(hiveSinkConfig));
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
index d7f1d5e6..697da6a4 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
@@ -87,7 +87,7 @@ public class KafkaSinkWriter implements SinkWriter<SeaTunnelRow, KafkaCommitInfo
}
@Override
- public void abort() {
+ public void abortPrepare() {
kafkaProducerSender.abortTransaction();
}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaTransactionSender.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaTransactionSender.java
index 43ee5717..df4fc5b0 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaTransactionSender.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaTransactionSender.java
@@ -85,7 +85,6 @@ public class KafkaTransactionSender<K, V> implements KafkaProduceSender<K, V> {
LOGGER.debug("Abort kafka transaction: {}", kafkaState.getTransactionId());
}
KafkaProducer<K, V> historyProducer = getTransactionProducer(kafkaProperties, kafkaState.getTransactionId());
- historyProducer.initTransactions();
historyProducer.abortTransaction();
historyProducer.close();
}
diff --git a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
index 3dfe9268..112f4fbc 100644
--- a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
@@ -70,6 +70,7 @@ public class SinkExecuteProcessor extends AbstractPluginExecuteProcessor<SeaTunn
Config sinkConfig = pluginConfigs.get(i);
SeaTunnelSink<?, ?, ?, ?> seaTunnelSink = plugins.get(i);
Dataset<Row> dataset = fromSourceTable(sinkConfig, sparkEnvironment).orElse(input);
+ // TODO modify checkpoint location
SparkSinkInjector.inject(dataset.write(), seaTunnelSink, new HashMap<>(Common.COLLECTION_SIZE)).option(
"checkpointLocation", "/tmp").save();
}
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowSerialization.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowSerialization.java
index fe6f0a62..ac0ede0f 100644
--- a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowSerialization.java
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowSerialization.java
@@ -22,6 +22,8 @@ import org.apache.seatunnel.translation.serialization.RowSerialization;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.StringType;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.unsafe.types.UTF8String;
@@ -49,7 +51,11 @@ public final class InternalRowSerialization implements RowSerialization<Internal
public SeaTunnelRow deserialize(InternalRow engineRow) throws IOException {
Object[] fields = new Object[engineRow.numFields()];
for (int i = 0; i < engineRow.numFields(); i++) {
- fields[i] = engineRow.get(i, sparkSchema.apply(i).dataType());
+ DataType type = sparkSchema.apply(i).dataType();
+ fields[i] = engineRow.get(i, type);
+ if (type instanceof StringType) {
+ fields[i] = fields[i].toString();
+ }
}
return new SeaTunnelRow(fields);
}
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriter.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriter.java
index 95ed6fd3..4bf0bdb0 100644
--- a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriter.java
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriter.java
@@ -83,7 +83,7 @@ public class SparkDataWriter<CommitInfoT, StateT> implements DataWriter<Internal
@Override
public void abort() throws IOException {
- sinkWriter.abort();
+ sinkWriter.abortPrepare();
if (sinkCommitter != null) {
if (latestCommitInfoT == null) {
sinkCommitter.abort(Collections.emptyList());