Posted to commits@inlong.apache.org by zi...@apache.org on 2022/03/28 09:05:59 UTC

[incubator-inlong] branch master updated: [INLONG-3396][Sort] Support multiple dataflow to write the same hive table

This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new b8c75eb  [INLONG-3396][Sort] Support multiple dataflow to write the same hive table
b8c75eb is described below

commit b8c75eb986e45d627858d5591048853e1e0e708f
Author: TianqiWan <52...@users.noreply.github.com>
AuthorDate: Mon Mar 28 17:05:52 2022 +0800

    [INLONG-3396][Sort] Support multiple dataflow to write the same hive table
---
 .../inlong/sort/flink/filesystem/Bucket.java       | 34 ++++++++++++++++++++--
 .../sort/flink/filesystem/BucketFactory.java       |  2 ++
 .../inlong/sort/flink/filesystem/Buckets.java      |  5 ++++
 .../flink/filesystem/DefaultBucketFactoryImpl.java |  4 +++
 .../sort/flink/filesystem/StreamingFileSink.java   |  5 ++++
 .../apache/inlong/sort/flink/hive/HiveWriter.java  |  3 ++
 6 files changed, 50 insertions(+), 3 deletions(-)

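For context, the core of this change is the new part-file naming in Bucket#assembleNewPartPath shown in the diff below. The following is a minimal, self-contained sketch of that naming scheme (the PartPathSketch class, its method, and the sample values are illustrative only, not part of the commit): prefixing the part counter with the dataFlowId keeps files from different jobs apart, and the trailing timestamp separates files written by different attempts of the same job that do not restore from a checkpoint.

    import org.apache.flink.core.fs.Path;

    // Illustrative sketch of the part-file naming introduced in Bucket#assembleNewPartPath.
    // Not part of the commit; this class exists only for demonstration.
    public class PartPathSketch {

        private static final String PART_PREFIX = "part";

        // Mirrors the new naming: part-<dataFlowId>-<subtaskIndex>-<partCounter>-<timestamp>
        static Path assembleNewPartPath(Path bucketPath, long dataFlowId,
                                        int subtaskIndex, long partCounter) {
            return new Path(
                    bucketPath,
                    PART_PREFIX + '-' + dataFlowId + '-' + subtaskIndex + '-'
                            + partCounter + '-' + System.currentTimeMillis());
        }

        public static void main(String[] args) {
            // Prints something like: /warehouse/tbl/dt=2022-03-28/part-42-0-0-1648456789012
            // (dataFlowId 42, subtask 0, part counter 0 are made-up sample values)
            System.out.println(assembleNewPartPath(
                    new Path("/warehouse/tbl/dt=2022-03-28"), 42L, 0, 0L));
        }
    }

Without the dataFlowId and timestamp components, part files named only by subtask index and part counter would collide when several dataflows write into the same Hive table directory, which is the problem this commit addresses.
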
diff --git a/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/filesystem/Bucket.java b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/filesystem/Bucket.java
index 1aa1f86..87476f7 100644
--- a/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/filesystem/Bucket.java
+++ b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/filesystem/Bucket.java
@@ -51,6 +51,8 @@ public class Bucket<IN, BucketID> {
 
     private static final String PART_PREFIX = "part";
 
+    private final long dataFlowId;
+
     private final BucketID bucketId;
 
     private final Path bucketPath;
@@ -78,6 +80,7 @@ public class Bucket<IN, BucketID> {
      * Constructor to create a new empty bucket.
      */
     private Bucket(
+            final long dataFlowId,
             final RecoverableWriter fsWriter,
             final int subtaskIndex,
             final BucketID bucketId,
@@ -86,6 +89,7 @@ public class Bucket<IN, BucketID> {
             final PartFileWriter.PartFileFactory<IN, BucketID> partFileFactory,
             final RollingPolicy<IN, BucketID> rollingPolicy) {
 
+        this.dataFlowId = dataFlowId;
         this.fsWriter = checkNotNull(fsWriter);
         this.subtaskIndex = subtaskIndex;
         this.bucketId = checkNotNull(bucketId);
@@ -109,6 +113,7 @@ public class Bucket<IN, BucketID> {
      * Constructor to restore a bucket from checkpointed state.
      */
     private Bucket(
+            final long dataFlowId,
             final RecoverableWriter fsWriter,
             final int subtaskIndex,
             final long initialPartCounter,
@@ -117,6 +122,7 @@ public class Bucket<IN, BucketID> {
             final BucketState<BucketID> bucketState) throws IOException {
 
         this(
+                dataFlowId,
                 fsWriter,
                 subtaskIndex,
                 bucketState.getBucketId(),
@@ -222,7 +228,13 @@ public class Bucket<IN, BucketID> {
     }
 
     private Path assembleNewPartPath() {
-        return new Path(bucketPath, PART_PREFIX + '-' + subtaskIndex + '-' + partCounter);
+        // Use dataFlowId to distinguish files written by different jobs
+        // and use timestamp to distinguish files written by
+        // different attempts (which don't restore from checkpoint) of the same job
+        return new Path(
+                bucketPath,
+                PART_PREFIX + '-' + dataFlowId + '-' + subtaskIndex + '-'
+                        + partCounter + '-' + System.currentTimeMillis());
     }
 
     private CommitRecoverable closePartFile() throws IOException {
@@ -370,6 +382,7 @@ public class Bucket<IN, BucketID> {
      * @return The new Bucket.
      */
     static <I, B> Bucket<I, B> getNew(
+            final long dataFlowId,
             final RecoverableWriter fsWriter,
             final int subtaskIndex,
             final B bucketId,
@@ -377,7 +390,14 @@ public class Bucket<IN, BucketID> {
             final long initialPartCounter,
             final PartFileWriter.PartFileFactory<I, B> partFileFactory,
             final RollingPolicy<I, B> rollingPolicy) {
-        return new Bucket<>(fsWriter, subtaskIndex, bucketId, bucketPath, initialPartCounter, partFileFactory,
+        return new Bucket<>(
+                dataFlowId,
+                fsWriter,
+                subtaskIndex,
+                bucketId,
+                bucketPath,
+                initialPartCounter,
+                partFileFactory,
                 rollingPolicy);
     }
 
@@ -394,12 +414,20 @@ public class Bucket<IN, BucketID> {
      * @return The restored Bucket.
      */
     static <I, B> Bucket<I, B> restore(
+            final long dataFlowId,
             final RecoverableWriter fsWriter,
             final int subtaskIndex,
             final long initialPartCounter,
             final PartFileWriter.PartFileFactory<I, B> partFileFactory,
             final RollingPolicy<I, B> rollingPolicy,
             final BucketState<B> bucketState) throws IOException {
-        return new Bucket<>(fsWriter, subtaskIndex, initialPartCounter, partFileFactory, rollingPolicy, bucketState);
+        return new Bucket<>(
+                dataFlowId,
+                fsWriter,
+                subtaskIndex,
+                initialPartCounter,
+                partFileFactory,
+                rollingPolicy,
+                bucketState);
     }
 }
diff --git a/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/filesystem/BucketFactory.java b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/filesystem/BucketFactory.java
index 0b288a1..dca1007 100644
--- a/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/filesystem/BucketFactory.java
+++ b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/filesystem/BucketFactory.java
@@ -29,6 +29,7 @@ import java.io.Serializable;
 public interface BucketFactory<IN, BucketID> extends Serializable {
 
     Bucket<IN, BucketID> getNewBucket(
+            final long dataFlowId,
             final RecoverableWriter fsWriter,
             final int subtaskIndex,
             final BucketID bucketId,
@@ -38,6 +39,7 @@ public interface BucketFactory<IN, BucketID> extends Serializable {
             final RollingPolicy<IN, BucketID> rollingPolicy) throws IOException;
 
     Bucket<IN, BucketID> restoreBucket(
+            final long dataFlowId,
             final RecoverableWriter fsWriter,
             final int subtaskIndex,
             final long initialPartCounter,
diff --git a/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/filesystem/Buckets.java b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/filesystem/Buckets.java
index c4a72c7..59e8852 100644
--- a/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/filesystem/Buckets.java
+++ b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/filesystem/Buckets.java
@@ -45,6 +45,7 @@ public class Buckets<IN, BucketID> {
             Buckets.class);
 
     // ------------------------ configuration fields --------------------------
+    private final long dataFlowId;
 
     private final Path basePath;
 
@@ -82,6 +83,7 @@ public class Buckets<IN, BucketID> {
      * @param rollingPolicy The {@link RollingPolicy} as specified by the user.
      */
     Buckets(
+            final long dataFlowId,
             final Path basePath,
             final BucketAssigner<IN, BucketID> bucketAssigner,
             final BucketFactory<IN, BucketID> bucketFactory,
@@ -89,6 +91,7 @@ public class Buckets<IN, BucketID> {
             final RollingPolicy<IN, BucketID> rollingPolicy,
             final int subtaskIndex) throws IOException {
 
+        this.dataFlowId = dataFlowId;
         this.basePath = Preconditions.checkNotNull(basePath);
         this.bucketAssigner = Preconditions.checkNotNull(bucketAssigner);
         this.bucketFactory = Preconditions.checkNotNull(bucketFactory);
@@ -169,6 +172,7 @@ public class Buckets<IN, BucketID> {
 
         final Bucket<IN, BucketID> restoredBucket = bucketFactory
                 .restoreBucket(
+                        dataFlowId,
                         fsWriter,
                         subtaskIndex,
                         maxPartCounter,
@@ -275,6 +279,7 @@ public class Buckets<IN, BucketID> {
         if (bucket == null) {
             final Path bucketPath = assembleBucketPath(bucketId);
             bucket = bucketFactory.getNewBucket(
+                    dataFlowId,
                     fsWriter,
                     subtaskIndex,
                     bucketId,
diff --git a/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/filesystem/DefaultBucketFactoryImpl.java b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/filesystem/DefaultBucketFactoryImpl.java
index 4ca26d8..6c084bd 100644
--- a/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/filesystem/DefaultBucketFactoryImpl.java
+++ b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/filesystem/DefaultBucketFactoryImpl.java
@@ -32,6 +32,7 @@ public class DefaultBucketFactoryImpl<IN, BucketID> implements
 
     @Override
     public Bucket<IN, BucketID> getNewBucket(
+            final long dataFlowId,
             final RecoverableWriter fsWriter,
             final int subtaskIndex,
             final BucketID bucketId,
@@ -41,6 +42,7 @@ public class DefaultBucketFactoryImpl<IN, BucketID> implements
             final RollingPolicy<IN, BucketID> rollingPolicy) {
 
         return Bucket.getNew(
+                dataFlowId,
                 fsWriter,
                 subtaskIndex,
                 bucketId,
@@ -52,6 +54,7 @@ public class DefaultBucketFactoryImpl<IN, BucketID> implements
 
     @Override
     public Bucket<IN, BucketID> restoreBucket(
+            final long dataFlowId,
             final RecoverableWriter fsWriter,
             final int subtaskIndex,
             final long initialPartCounter,
@@ -60,6 +63,7 @@ public class DefaultBucketFactoryImpl<IN, BucketID> implements
             final BucketState<BucketID> bucketState) throws IOException {
 
         return Bucket.restore(
+                dataFlowId,
                 fsWriter,
                 subtaskIndex,
                 initialPartCounter,
diff --git a/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/filesystem/StreamingFileSink.java b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/filesystem/StreamingFileSink.java
index 73db29f..116588a 100644
--- a/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/filesystem/StreamingFileSink.java
+++ b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/filesystem/StreamingFileSink.java
@@ -109,6 +109,8 @@ public class StreamingFileSink<IN>
 
         private long bucketCheckInterval = 60L * 1000L;
 
+        private final long dataFlowId;
+
         private final Path basePath;
 
         private final BulkWriter.Factory<IN> writerFactory;
@@ -120,10 +122,12 @@ public class StreamingFileSink<IN>
         private final RollingPolicy<IN, BucketID> rollingPolicy;
 
         public BulkFormatBuilder(
+                long dataFlowId,
                 Path basePath,
                 BulkWriter.Factory<IN> writerFactory,
                 BucketAssigner<IN, BucketID> assigner,
                 RollingPolicy<IN, BucketID> rollingPolicy) {
+            this.dataFlowId = dataFlowId;
             this.basePath = Preconditions.checkNotNull(basePath);
             this.writerFactory = writerFactory;
             this.bucketAssigner = Preconditions.checkNotNull(assigner);
@@ -151,6 +155,7 @@ public class StreamingFileSink<IN>
         @Override
         Buckets<IN, BucketID> createBuckets(int subtaskIndex) throws IOException {
             return new Buckets<>(
+                    dataFlowId,
                     basePath,
                     bucketAssigner,
                     bucketFactory,
diff --git a/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/hive/HiveWriter.java b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/hive/HiveWriter.java
index eb667dd..762f135 100644
--- a/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/hive/HiveWriter.java
+++ b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/hive/HiveWriter.java
@@ -95,6 +95,7 @@ public class HiveWriter extends ProcessFunction<Row, PartitionCommitInfo>
                 hiveSinkInfo, configuration);
         final RowPartitionComputer rowPartitionComputer = new RowPartitionComputer("", hiveSinkInfo);
         final BulkFormatBuilder<Row, HivePartition> bulkFormatBuilder = new BulkFormatBuilder<>(
+                dataFlowId,
                 new Path(hiveSinkInfo.getDataPath()),
                 outputStream ->
                         new PartitionFilterBulkWriter(bulkWriterFactory.create(outputStream), rowPartitionComputer),
@@ -217,6 +218,7 @@ public class HiveWriter extends ProcessFunction<Row, PartitionCommitInfo>
 
         @Override
         public Bucket<Row, HivePartition> getNewBucket(
+                final long dataFlowId,
                 final RecoverableWriter fsWriter,
                 final int subtaskIndex,
                 final HivePartition bucketId,
@@ -225,6 +227,7 @@ public class HiveWriter extends ProcessFunction<Row, PartitionCommitInfo>
                 final PartFileWriter.PartFileFactory<Row, HivePartition> partFileWriterFactory,
                 final RollingPolicy<Row, HivePartition> rollingPolicy) {
             final Bucket<Row, HivePartition> newBucket = super.getNewBucket(
+                    dataFlowId,
                     fsWriter,
                     subtaskIndex,
                     bucketId,