You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by zi...@apache.org on 2022/03/28 09:05:59 UTC
[incubator-inlong] branch master updated: [INLONG-3396][Sort] Support multiple dataflow to write the same hive table
This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new b8c75eb [INLONG-3396][Sort] Support multiple dataflow to write the same hive table
b8c75eb is described below
commit b8c75eb986e45d627858d5591048853e1e0e708f
Author: TianqiWan <52...@users.noreply.github.com>
AuthorDate: Mon Mar 28 17:05:52 2022 +0800
[INLONG-3396][Sort] Support multiple dataflow to write the same hive table
---
.../inlong/sort/flink/filesystem/Bucket.java | 34 ++++++++++++++++++++--
.../sort/flink/filesystem/BucketFactory.java | 2 ++
.../inlong/sort/flink/filesystem/Buckets.java | 5 ++++
.../flink/filesystem/DefaultBucketFactoryImpl.java | 4 +++
.../sort/flink/filesystem/StreamingFileSink.java | 5 ++++
.../apache/inlong/sort/flink/hive/HiveWriter.java | 3 ++
6 files changed, 50 insertions(+), 3 deletions(-)
diff --git a/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/filesystem/Bucket.java b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/filesystem/Bucket.java
index 1aa1f86..87476f7 100644
--- a/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/filesystem/Bucket.java
+++ b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/filesystem/Bucket.java
@@ -51,6 +51,8 @@ public class Bucket<IN, BucketID> {
private static final String PART_PREFIX = "part";
+ private final long dataFlowId;
+
private final BucketID bucketId;
private final Path bucketPath;
@@ -78,6 +80,7 @@ public class Bucket<IN, BucketID> {
* Constructor to create a new empty bucket.
*/
private Bucket(
+ final long dataFlowId,
final RecoverableWriter fsWriter,
final int subtaskIndex,
final BucketID bucketId,
@@ -86,6 +89,7 @@ public class Bucket<IN, BucketID> {
final PartFileWriter.PartFileFactory<IN, BucketID> partFileFactory,
final RollingPolicy<IN, BucketID> rollingPolicy) {
+ this.dataFlowId = dataFlowId;
this.fsWriter = checkNotNull(fsWriter);
this.subtaskIndex = subtaskIndex;
this.bucketId = checkNotNull(bucketId);
@@ -109,6 +113,7 @@ public class Bucket<IN, BucketID> {
* Constructor to restore a bucket from checkpointed state.
*/
private Bucket(
+ final long dataFlowId,
final RecoverableWriter fsWriter,
final int subtaskIndex,
final long initialPartCounter,
@@ -117,6 +122,7 @@ public class Bucket<IN, BucketID> {
final BucketState<BucketID> bucketState) throws IOException {
this(
+ dataFlowId,
fsWriter,
subtaskIndex,
bucketState.getBucketId(),
@@ -222,7 +228,13 @@ public class Bucket<IN, BucketID> {
}
private Path assembleNewPartPath() {
- return new Path(bucketPath, PART_PREFIX + '-' + subtaskIndex + '-' + partCounter);
+ // Use dataFlowId to distinguish files written by different jobs
+ // and use timestamp to distinguish files written by
+ different attempts (which don't restore from checkpoint) of the same job
+ return new Path(
+ bucketPath,
+ PART_PREFIX + '-' + dataFlowId + '-' + subtaskIndex + '-'
+ + partCounter + '-' + System.currentTimeMillis());
}
private CommitRecoverable closePartFile() throws IOException {
@@ -370,6 +382,7 @@ public class Bucket<IN, BucketID> {
* @return The new Bucket.
*/
static <I, B> Bucket<I, B> getNew(
+ final long dataFlowId,
final RecoverableWriter fsWriter,
final int subtaskIndex,
final B bucketId,
@@ -377,7 +390,14 @@ public class Bucket<IN, BucketID> {
final long initialPartCounter,
final PartFileWriter.PartFileFactory<I, B> partFileFactory,
final RollingPolicy<I, B> rollingPolicy) {
- return new Bucket<>(fsWriter, subtaskIndex, bucketId, bucketPath, initialPartCounter, partFileFactory,
+ return new Bucket<>(
+ dataFlowId,
+ fsWriter,
+ subtaskIndex,
+ bucketId,
+ bucketPath,
+ initialPartCounter,
+ partFileFactory,
rollingPolicy);
}
@@ -394,12 +414,20 @@ public class Bucket<IN, BucketID> {
* @return The restored Bucket.
*/
static <I, B> Bucket<I, B> restore(
+ final long dataFlowId,
final RecoverableWriter fsWriter,
final int subtaskIndex,
final long initialPartCounter,
final PartFileWriter.PartFileFactory<I, B> partFileFactory,
final RollingPolicy<I, B> rollingPolicy,
final BucketState<B> bucketState) throws IOException {
- return new Bucket<>(fsWriter, subtaskIndex, initialPartCounter, partFileFactory, rollingPolicy, bucketState);
+ return new Bucket<>(
+ dataFlowId,
+ fsWriter,
+ subtaskIndex,
+ initialPartCounter,
+ partFileFactory,
+ rollingPolicy,
+ bucketState);
}
}
diff --git a/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/filesystem/BucketFactory.java b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/filesystem/BucketFactory.java
index 0b288a1..dca1007 100644
--- a/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/filesystem/BucketFactory.java
+++ b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/filesystem/BucketFactory.java
@@ -29,6 +29,7 @@ import java.io.Serializable;
public interface BucketFactory<IN, BucketID> extends Serializable {
Bucket<IN, BucketID> getNewBucket(
+ final long dataFlowId,
final RecoverableWriter fsWriter,
final int subtaskIndex,
final BucketID bucketId,
@@ -38,6 +39,7 @@ public interface BucketFactory<IN, BucketID> extends Serializable {
final RollingPolicy<IN, BucketID> rollingPolicy) throws IOException;
Bucket<IN, BucketID> restoreBucket(
+ final long dataFlowId,
final RecoverableWriter fsWriter,
final int subtaskIndex,
final long initialPartCounter,
diff --git a/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/filesystem/Buckets.java b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/filesystem/Buckets.java
index c4a72c7..59e8852 100644
--- a/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/filesystem/Buckets.java
+++ b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/filesystem/Buckets.java
@@ -45,6 +45,7 @@ public class Buckets<IN, BucketID> {
Buckets.class);
// ------------------------ configuration fields --------------------------
+ private final long dataFlowId;
private final Path basePath;
@@ -82,6 +83,7 @@ public class Buckets<IN, BucketID> {
* @param rollingPolicy The {@link RollingPolicy} as specified by the user.
*/
Buckets(
+ final long dataFlowId,
final Path basePath,
final BucketAssigner<IN, BucketID> bucketAssigner,
final BucketFactory<IN, BucketID> bucketFactory,
@@ -89,6 +91,7 @@ public class Buckets<IN, BucketID> {
final RollingPolicy<IN, BucketID> rollingPolicy,
final int subtaskIndex) throws IOException {
+ this.dataFlowId = dataFlowId;
this.basePath = Preconditions.checkNotNull(basePath);
this.bucketAssigner = Preconditions.checkNotNull(bucketAssigner);
this.bucketFactory = Preconditions.checkNotNull(bucketFactory);
@@ -169,6 +172,7 @@ public class Buckets<IN, BucketID> {
final Bucket<IN, BucketID> restoredBucket = bucketFactory
.restoreBucket(
+ dataFlowId,
fsWriter,
subtaskIndex,
maxPartCounter,
@@ -275,6 +279,7 @@ public class Buckets<IN, BucketID> {
if (bucket == null) {
final Path bucketPath = assembleBucketPath(bucketId);
bucket = bucketFactory.getNewBucket(
+ dataFlowId,
fsWriter,
subtaskIndex,
bucketId,
diff --git a/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/filesystem/DefaultBucketFactoryImpl.java b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/filesystem/DefaultBucketFactoryImpl.java
index 4ca26d8..6c084bd 100644
--- a/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/filesystem/DefaultBucketFactoryImpl.java
+++ b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/filesystem/DefaultBucketFactoryImpl.java
@@ -32,6 +32,7 @@ public class DefaultBucketFactoryImpl<IN, BucketID> implements
@Override
public Bucket<IN, BucketID> getNewBucket(
+ final long dataFlowId,
final RecoverableWriter fsWriter,
final int subtaskIndex,
final BucketID bucketId,
@@ -41,6 +42,7 @@ public class DefaultBucketFactoryImpl<IN, BucketID> implements
final RollingPolicy<IN, BucketID> rollingPolicy) {
return Bucket.getNew(
+ dataFlowId,
fsWriter,
subtaskIndex,
bucketId,
@@ -52,6 +54,7 @@ public class DefaultBucketFactoryImpl<IN, BucketID> implements
@Override
public Bucket<IN, BucketID> restoreBucket(
+ final long dataFlowId,
final RecoverableWriter fsWriter,
final int subtaskIndex,
final long initialPartCounter,
@@ -60,6 +63,7 @@ public class DefaultBucketFactoryImpl<IN, BucketID> implements
final BucketState<BucketID> bucketState) throws IOException {
return Bucket.restore(
+ dataFlowId,
fsWriter,
subtaskIndex,
initialPartCounter,
diff --git a/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/filesystem/StreamingFileSink.java b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/filesystem/StreamingFileSink.java
index 73db29f..116588a 100644
--- a/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/filesystem/StreamingFileSink.java
+++ b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/filesystem/StreamingFileSink.java
@@ -109,6 +109,8 @@ public class StreamingFileSink<IN>
private long bucketCheckInterval = 60L * 1000L;
+ private final long dataFlowId;
+
private final Path basePath;
private final BulkWriter.Factory<IN> writerFactory;
@@ -120,10 +122,12 @@ public class StreamingFileSink<IN>
private final RollingPolicy<IN, BucketID> rollingPolicy;
public BulkFormatBuilder(
+ long dataFlowId,
Path basePath,
BulkWriter.Factory<IN> writerFactory,
BucketAssigner<IN, BucketID> assigner,
RollingPolicy<IN, BucketID> rollingPolicy) {
+ this.dataFlowId = dataFlowId;
this.basePath = Preconditions.checkNotNull(basePath);
this.writerFactory = writerFactory;
this.bucketAssigner = Preconditions.checkNotNull(assigner);
@@ -151,6 +155,7 @@ public class StreamingFileSink<IN>
@Override
Buckets<IN, BucketID> createBuckets(int subtaskIndex) throws IOException {
return new Buckets<>(
+ dataFlowId,
basePath,
bucketAssigner,
bucketFactory,
diff --git a/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/hive/HiveWriter.java b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/hive/HiveWriter.java
index eb667dd..762f135 100644
--- a/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/hive/HiveWriter.java
+++ b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/hive/HiveWriter.java
@@ -95,6 +95,7 @@ public class HiveWriter extends ProcessFunction<Row, PartitionCommitInfo>
hiveSinkInfo, configuration);
final RowPartitionComputer rowPartitionComputer = new RowPartitionComputer("", hiveSinkInfo);
final BulkFormatBuilder<Row, HivePartition> bulkFormatBuilder = new BulkFormatBuilder<>(
+ dataFlowId,
new Path(hiveSinkInfo.getDataPath()),
outputStream ->
new PartitionFilterBulkWriter(bulkWriterFactory.create(outputStream), rowPartitionComputer),
@@ -217,6 +218,7 @@ public class HiveWriter extends ProcessFunction<Row, PartitionCommitInfo>
@Override
public Bucket<Row, HivePartition> getNewBucket(
+ final long dataFlowId,
final RecoverableWriter fsWriter,
final int subtaskIndex,
final HivePartition bucketId,
@@ -225,6 +227,7 @@ public class HiveWriter extends ProcessFunction<Row, PartitionCommitInfo>
final PartFileWriter.PartFileFactory<Row, HivePartition> partFileWriterFactory,
final RollingPolicy<Row, HivePartition> rollingPolicy) {
final Bucket<Row, HivePartition> newBucket = super.getNewBucket(
+ dataFlowId,
fsWriter,
subtaskIndex,
bucketId,