You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/08/01 02:35:17 UTC

[incubator-seatunnel] branch dev updated: Remove same code to independent method in HiveSinkWriter (#2307)

This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new e99e6ee72 Remove same code to independent method in HiveSinkWriter (#2307)
e99e6ee72 is described below

commit e99e6ee726acfd48dc2a5809f0d6b58b0e1e8356
Author: Xiao Zhao <zh...@163.com>
AuthorDate: Mon Aug 1 10:35:13 2022 +0800

    Remove same code to independent method in HiveSinkWriter (#2307)
---
 .../connectors/seatunnel/hive/sink/HiveSink.java   |  2 +-
 .../seatunnel/hive/sink/HiveSinkWriter.java        | 85 +++++++++-------------
 2 files changed, 36 insertions(+), 51 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
index 4df91b1a5..bcdca3d49 100644
--- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
+++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
@@ -113,7 +113,7 @@ public class HiveSink implements SeaTunnelSink<SeaTunnelRow, HiveSinkState, Hive
 
     @Override
     public Optional<Serializer<HiveAggregatedCommitInfo>> getAggregatedCommitInfoSerializer() {
-        return Optional.of(new DefaultSerializer<HiveAggregatedCommitInfo>());
+        return Optional.of(new DefaultSerializer<>());
     }
 
     @Override
diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkWriter.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkWriter.java
index 4aff955cd..9192a2738 100644
--- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkWriter.java
@@ -63,31 +63,7 @@ public class HiveSinkWriter implements SinkWriter<SeaTunnelRow, HiveCommitInfo,
         this.context = context;
         this.jobId = jobId;
         this.hiveSinkConfig = hiveSinkConfig;
-
-        SinkFileSystemPlugin sinkFileSystemPlugin = new HdfsFileSinkPlugin();
-        Optional<TransactionStateFileWriter> transactionStateFileWriter = sinkFileSystemPlugin.getTransactionStateFileWriter(this.seaTunnelRowTypeInfo,
-            new FileSinkTransactionFileNameGenerator(
-                this.hiveSinkConfig.getTextFileSinkConfig().getFileFormat(),
-                this.hiveSinkConfig.getTextFileSinkConfig().getFileNameExpression(),
-                this.hiveSinkConfig.getTextFileSinkConfig().getFileNameTimeFormat()),
-            new FileSinkPartitionDirNameGenerator(
-                this.hiveSinkConfig.getTextFileSinkConfig().getPartitionFieldList(),
-                this.hiveSinkConfig.getTextFileSinkConfig().getPartitionFieldsIndexInRow(),
-                this.hiveSinkConfig.getTextFileSinkConfig().getPartitionDirExpression()),
-            this.hiveSinkConfig.getTextFileSinkConfig().getSinkColumnsIndexInRow(),
-            this.hiveSinkConfig.getTextFileSinkConfig().getTmpPath(),
-            this.hiveSinkConfig.getTextFileSinkConfig().getPath(),
-            this.jobId,
-            this.context.getIndexOfSubtask(),
-            this.hiveSinkConfig.getTextFileSinkConfig().getFieldDelimiter(),
-            this.hiveSinkConfig.getTextFileSinkConfig().getRowDelimiter(),
-            sinkFileSystemPlugin.getFileSystem().get());
-
-        if (!transactionStateFileWriter.isPresent()) {
-            throw new RuntimeException("A TransactionStateFileWriter is need");
-        }
-
-        this.fileWriter = transactionStateFileWriter.get();
+        this.fileWriter = createFileWriter();
 
         fileWriter.beginTransaction(1L);
     }
@@ -103,31 +79,7 @@ public class HiveSinkWriter implements SinkWriter<SeaTunnelRow, HiveCommitInfo,
         this.context = context;
         this.jobId = jobId;
         this.hiveSinkConfig = hiveSinkConfig;
-
-        SinkFileSystemPlugin sinkFileSystemPlugin = new HdfsFileSinkPlugin();
-        Optional<TransactionStateFileWriter> transactionStateFileWriter = sinkFileSystemPlugin.getTransactionStateFileWriter(this.seaTunnelRowTypeInfo,
-            new FileSinkTransactionFileNameGenerator(
-                this.hiveSinkConfig.getTextFileSinkConfig().getFileFormat(),
-                this.hiveSinkConfig.getTextFileSinkConfig().getFileNameExpression(),
-                this.hiveSinkConfig.getTextFileSinkConfig().getFileNameTimeFormat()),
-            new FileSinkPartitionDirNameGenerator(
-                this.hiveSinkConfig.getTextFileSinkConfig().getPartitionFieldList(),
-                this.hiveSinkConfig.getTextFileSinkConfig().getPartitionFieldsIndexInRow(),
-                this.hiveSinkConfig.getTextFileSinkConfig().getPartitionDirExpression()),
-            this.hiveSinkConfig.getTextFileSinkConfig().getSinkColumnsIndexInRow(),
-            this.hiveSinkConfig.getTextFileSinkConfig().getTmpPath(),
-            this.hiveSinkConfig.getTextFileSinkConfig().getPath(),
-            this.jobId,
-            this.context.getIndexOfSubtask(),
-            this.hiveSinkConfig.getTextFileSinkConfig().getFieldDelimiter(),
-            this.hiveSinkConfig.getTextFileSinkConfig().getRowDelimiter(),
-            sinkFileSystemPlugin.getFileSystem().get());
-
-        if (!transactionStateFileWriter.isPresent()) {
-            throw new RuntimeException("A TransactionStateFileWriter is need");
-        }
-
-        this.fileWriter = transactionStateFileWriter.get();
+        this.fileWriter = createFileWriter();
 
         // Rollback dirty transaction
         if (hiveSinkStates.size() > 0) {
@@ -172,4 +124,37 @@ public class HiveSinkWriter implements SinkWriter<SeaTunnelRow, HiveCommitInfo,
     public void abortPrepare() {
         fileWriter.abortTransaction();
     }
+
+    private TransactionStateFileWriter createFileWriter() {
+        SinkFileSystemPlugin sinkFileSystemPlugin = new HdfsFileSinkPlugin();
+        Optional<TransactionStateFileWriter> transactionStateFileWriterOpt = sinkFileSystemPlugin.getTransactionStateFileWriter(this.seaTunnelRowTypeInfo,
+                getFilenameGenerator(),
+                getPartitionDirNameGenerator(),
+                this.hiveSinkConfig.getTextFileSinkConfig().getSinkColumnsIndexInRow(),
+                this.hiveSinkConfig.getTextFileSinkConfig().getTmpPath(),
+                this.hiveSinkConfig.getTextFileSinkConfig().getPath(),
+                this.jobId,
+                this.context.getIndexOfSubtask(),
+                this.hiveSinkConfig.getTextFileSinkConfig().getFieldDelimiter(),
+                this.hiveSinkConfig.getTextFileSinkConfig().getRowDelimiter(),
+                sinkFileSystemPlugin.getFileSystem().get());
+        if (!transactionStateFileWriterOpt.isPresent()) {
+            throw new RuntimeException("A TransactionStateFileWriter is need");
+        }
+        return transactionStateFileWriterOpt.get();
+    }
+
+    private FileSinkTransactionFileNameGenerator getFilenameGenerator() {
+        return new FileSinkTransactionFileNameGenerator(
+                this.hiveSinkConfig.getTextFileSinkConfig().getFileFormat(),
+                this.hiveSinkConfig.getTextFileSinkConfig().getFileNameExpression(),
+                this.hiveSinkConfig.getTextFileSinkConfig().getFileNameTimeFormat());
+    }
+
+    private FileSinkPartitionDirNameGenerator getPartitionDirNameGenerator() {
+        return new FileSinkPartitionDirNameGenerator(
+                this.hiveSinkConfig.getTextFileSinkConfig().getPartitionFieldList(),
+                this.hiveSinkConfig.getTextFileSinkConfig().getPartitionFieldsIndexInRow(),
+                this.hiveSinkConfig.getTextFileSinkConfig().getPartitionDirExpression());
+    }
 }