You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/08/01 02:35:17 UTC
[incubator-seatunnel] branch dev updated: Remove same code to independent method in HiveSinkWriter (#2307)
This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new e99e6ee72 Remove same code to independent method in HiveSinkWriter (#2307)
e99e6ee72 is described below
commit e99e6ee726acfd48dc2a5809f0d6b58b0e1e8356
Author: Xiao Zhao <zh...@163.com>
AuthorDate: Mon Aug 1 10:35:13 2022 +0800
Remove same code to independent method in HiveSinkWriter (#2307)
---
.../connectors/seatunnel/hive/sink/HiveSink.java | 2 +-
.../seatunnel/hive/sink/HiveSinkWriter.java | 85 +++++++++-------------
2 files changed, 36 insertions(+), 51 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
index 4df91b1a5..bcdca3d49 100644
--- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
+++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
@@ -113,7 +113,7 @@ public class HiveSink implements SeaTunnelSink<SeaTunnelRow, HiveSinkState, Hive
@Override
public Optional<Serializer<HiveAggregatedCommitInfo>> getAggregatedCommitInfoSerializer() {
- return Optional.of(new DefaultSerializer<HiveAggregatedCommitInfo>());
+ return Optional.of(new DefaultSerializer<>());
}
@Override
diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkWriter.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkWriter.java
index 4aff955cd..9192a2738 100644
--- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkWriter.java
@@ -63,31 +63,7 @@ public class HiveSinkWriter implements SinkWriter<SeaTunnelRow, HiveCommitInfo,
this.context = context;
this.jobId = jobId;
this.hiveSinkConfig = hiveSinkConfig;
-
- SinkFileSystemPlugin sinkFileSystemPlugin = new HdfsFileSinkPlugin();
- Optional<TransactionStateFileWriter> transactionStateFileWriter = sinkFileSystemPlugin.getTransactionStateFileWriter(this.seaTunnelRowTypeInfo,
- new FileSinkTransactionFileNameGenerator(
- this.hiveSinkConfig.getTextFileSinkConfig().getFileFormat(),
- this.hiveSinkConfig.getTextFileSinkConfig().getFileNameExpression(),
- this.hiveSinkConfig.getTextFileSinkConfig().getFileNameTimeFormat()),
- new FileSinkPartitionDirNameGenerator(
- this.hiveSinkConfig.getTextFileSinkConfig().getPartitionFieldList(),
- this.hiveSinkConfig.getTextFileSinkConfig().getPartitionFieldsIndexInRow(),
- this.hiveSinkConfig.getTextFileSinkConfig().getPartitionDirExpression()),
- this.hiveSinkConfig.getTextFileSinkConfig().getSinkColumnsIndexInRow(),
- this.hiveSinkConfig.getTextFileSinkConfig().getTmpPath(),
- this.hiveSinkConfig.getTextFileSinkConfig().getPath(),
- this.jobId,
- this.context.getIndexOfSubtask(),
- this.hiveSinkConfig.getTextFileSinkConfig().getFieldDelimiter(),
- this.hiveSinkConfig.getTextFileSinkConfig().getRowDelimiter(),
- sinkFileSystemPlugin.getFileSystem().get());
-
- if (!transactionStateFileWriter.isPresent()) {
- throw new RuntimeException("A TransactionStateFileWriter is need");
- }
-
- this.fileWriter = transactionStateFileWriter.get();
+ this.fileWriter = createFileWriter();
fileWriter.beginTransaction(1L);
}
@@ -103,31 +79,7 @@ public class HiveSinkWriter implements SinkWriter<SeaTunnelRow, HiveCommitInfo,
this.context = context;
this.jobId = jobId;
this.hiveSinkConfig = hiveSinkConfig;
-
- SinkFileSystemPlugin sinkFileSystemPlugin = new HdfsFileSinkPlugin();
- Optional<TransactionStateFileWriter> transactionStateFileWriter = sinkFileSystemPlugin.getTransactionStateFileWriter(this.seaTunnelRowTypeInfo,
- new FileSinkTransactionFileNameGenerator(
- this.hiveSinkConfig.getTextFileSinkConfig().getFileFormat(),
- this.hiveSinkConfig.getTextFileSinkConfig().getFileNameExpression(),
- this.hiveSinkConfig.getTextFileSinkConfig().getFileNameTimeFormat()),
- new FileSinkPartitionDirNameGenerator(
- this.hiveSinkConfig.getTextFileSinkConfig().getPartitionFieldList(),
- this.hiveSinkConfig.getTextFileSinkConfig().getPartitionFieldsIndexInRow(),
- this.hiveSinkConfig.getTextFileSinkConfig().getPartitionDirExpression()),
- this.hiveSinkConfig.getTextFileSinkConfig().getSinkColumnsIndexInRow(),
- this.hiveSinkConfig.getTextFileSinkConfig().getTmpPath(),
- this.hiveSinkConfig.getTextFileSinkConfig().getPath(),
- this.jobId,
- this.context.getIndexOfSubtask(),
- this.hiveSinkConfig.getTextFileSinkConfig().getFieldDelimiter(),
- this.hiveSinkConfig.getTextFileSinkConfig().getRowDelimiter(),
- sinkFileSystemPlugin.getFileSystem().get());
-
- if (!transactionStateFileWriter.isPresent()) {
- throw new RuntimeException("A TransactionStateFileWriter is need");
- }
-
- this.fileWriter = transactionStateFileWriter.get();
+ this.fileWriter = createFileWriter();
// Rollback dirty transaction
if (hiveSinkStates.size() > 0) {
@@ -172,4 +124,37 @@ public class HiveSinkWriter implements SinkWriter<SeaTunnelRow, HiveCommitInfo,
public void abortPrepare() {
fileWriter.abortTransaction();
}
+
+ private TransactionStateFileWriter createFileWriter() {
+ SinkFileSystemPlugin sinkFileSystemPlugin = new HdfsFileSinkPlugin();
+ Optional<TransactionStateFileWriter> transactionStateFileWriterOpt = sinkFileSystemPlugin.getTransactionStateFileWriter(this.seaTunnelRowTypeInfo,
+ getFilenameGenerator(),
+ getPartitionDirNameGenerator(),
+ this.hiveSinkConfig.getTextFileSinkConfig().getSinkColumnsIndexInRow(),
+ this.hiveSinkConfig.getTextFileSinkConfig().getTmpPath(),
+ this.hiveSinkConfig.getTextFileSinkConfig().getPath(),
+ this.jobId,
+ this.context.getIndexOfSubtask(),
+ this.hiveSinkConfig.getTextFileSinkConfig().getFieldDelimiter(),
+ this.hiveSinkConfig.getTextFileSinkConfig().getRowDelimiter(),
+ sinkFileSystemPlugin.getFileSystem().get());
+ if (!transactionStateFileWriterOpt.isPresent()) {
+ throw new RuntimeException("A TransactionStateFileWriter is need");
+ }
+ return transactionStateFileWriterOpt.get();
+ }
+
+ private FileSinkTransactionFileNameGenerator getFilenameGenerator() {
+ return new FileSinkTransactionFileNameGenerator(
+ this.hiveSinkConfig.getTextFileSinkConfig().getFileFormat(),
+ this.hiveSinkConfig.getTextFileSinkConfig().getFileNameExpression(),
+ this.hiveSinkConfig.getTextFileSinkConfig().getFileNameTimeFormat());
+ }
+
+ private FileSinkPartitionDirNameGenerator getPartitionDirNameGenerator() {
+ return new FileSinkPartitionDirNameGenerator(
+ this.hiveSinkConfig.getTextFileSinkConfig().getPartitionFieldList(),
+ this.hiveSinkConfig.getTextFileSinkConfig().getPartitionFieldsIndexInRow(),
+ this.hiveSinkConfig.getTextFileSinkConfig().getPartitionDirExpression());
+ }
}