You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2018/12/04 16:21:50 UTC
[flink] 01/06: [hotfix][fs-connector] Refactor PartFileWriter to take stream.
This is an automated email from the ASF dual-hosted git repository.
kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 0c1328aa4548953ff358da10052cc3644ec5c2f2
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Wed Nov 21 10:35:58 2018 +0100
[hotfix][fs-connector] Refactor PartFileWriter to take stream.
---
.../flink/streaming/api/functions/sink/filesystem/Bucket.java | 7 +++++--
.../api/functions/sink/filesystem/BulkPartWriter.java | 10 ++++------
.../api/functions/sink/filesystem/PartFileWriter.java | 8 ++++----
.../api/functions/sink/filesystem/RowWisePartWriter.java | 10 ++++------
4 files changed, 17 insertions(+), 18 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
index 65a7628..042bcda 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.api.functions.sink.filesystem;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
import org.apache.flink.core.fs.RecoverableWriter;
import org.apache.flink.util.Preconditions;
@@ -124,8 +125,9 @@ public class Bucket<IN, BucketID> {
// we try to resume the previous in-progress file
if (state.hasInProgressResumableFile()) {
final RecoverableWriter.ResumeRecoverable resumable = state.getInProgressResumableFile();
+ final RecoverableFsDataOutputStream stream = fsWriter.recover(resumable);
inProgressPart = partFileFactory.resumeFrom(
- bucketId, fsWriter, resumable, state.getInProgressFileCreationTime());
+ bucketId, stream, resumable, state.getInProgressFileCreationTime());
}
}
@@ -195,7 +197,8 @@ public class Bucket<IN, BucketID> {
closePartFile();
final Path partFilePath = assembleNewPartPath();
- inProgressPart = partFileFactory.openNew(bucketId, fsWriter, partFilePath, currentTime);
+ final RecoverableFsDataOutputStream stream = fsWriter.open(partFilePath);
+ inProgressPart = partFileFactory.openNew(bucketId, stream, partFilePath, currentTime);
if (LOG.isDebugEnabled()) {
LOG.debug("Subtask {} opening new part file \"{}\" for bucket id={}.",
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java
index 005ae4e..a44b0e8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java
@@ -79,14 +79,13 @@ final class BulkPartWriter<IN, BucketID> extends PartFileWriter<IN, BucketID> {
@Override
public PartFileWriter<IN, BucketID> resumeFrom(
final BucketID bucketId,
- final RecoverableWriter fileSystemWriter,
+ final RecoverableFsDataOutputStream stream,
final RecoverableWriter.ResumeRecoverable resumable,
final long creationTime) throws IOException {
- Preconditions.checkNotNull(fileSystemWriter);
+ Preconditions.checkNotNull(stream);
Preconditions.checkNotNull(resumable);
- final RecoverableFsDataOutputStream stream = fileSystemWriter.recover(resumable);
final BulkWriter<IN> writer = writerFactory.create(stream);
return new BulkPartWriter<>(bucketId, stream, writer, creationTime);
}
@@ -94,14 +93,13 @@ final class BulkPartWriter<IN, BucketID> extends PartFileWriter<IN, BucketID> {
@Override
public PartFileWriter<IN, BucketID> openNew(
final BucketID bucketId,
- final RecoverableWriter fileSystemWriter,
+ final RecoverableFsDataOutputStream stream,
final Path path,
final long creationTime) throws IOException {
- Preconditions.checkNotNull(fileSystemWriter);
+ Preconditions.checkNotNull(stream);
Preconditions.checkNotNull(path);
- final RecoverableFsDataOutputStream stream = fileSystemWriter.open(path);
final BulkWriter<IN> writer = writerFactory.create(stream);
return new BulkPartWriter<>(bucketId, stream, writer, creationTime);
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileWriter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileWriter.java
index 662454b..95a2978a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileWriter.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileWriter.java
@@ -111,7 +111,7 @@ abstract class PartFileWriter<IN, BucketID> implements PartFileInfo<BucketID> {
/**
* Used upon recovery from a failure to recover a {@link PartFileWriter writer}.
* @param bucketId the id of the bucket this writer is writing to.
- * @param fileSystemWriter the filesystem-specific writer to use when writing to the filesystem.
+ * @param stream the filesystem-specific output stream to use when writing to the filesystem.
* @param resumable the state of the stream we are resurrecting.
* @param creationTime the creation time of the stream.
* @return the recovered {@link PartFileWriter writer}.
@@ -119,14 +119,14 @@ abstract class PartFileWriter<IN, BucketID> implements PartFileInfo<BucketID> {
*/
PartFileWriter<IN, BucketID> resumeFrom(
final BucketID bucketId,
- final RecoverableWriter fileSystemWriter,
+ final RecoverableFsDataOutputStream stream,
final RecoverableWriter.ResumeRecoverable resumable,
final long creationTime) throws IOException;
/**
* Used to create a new {@link PartFileWriter writer}.
* @param bucketId the id of the bucket this writer is writing to.
- * @param fileSystemWriter the filesystem-specific writer to use when writing to the filesystem.
+ * @param stream the filesystem-specific output stream to use when writing to the filesystem.
* @param path the part this writer will write to.
* @param creationTime the creation time of the stream.
* @return the new {@link PartFileWriter writer}.
@@ -134,7 +134,7 @@ abstract class PartFileWriter<IN, BucketID> implements PartFileInfo<BucketID> {
*/
PartFileWriter<IN, BucketID> openNew(
final BucketID bucketId,
- final RecoverableWriter fileSystemWriter,
+ final RecoverableFsDataOutputStream stream,
final Path path,
final long creationTime) throws IOException;
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java
index 2478b79..05c160c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java
@@ -67,28 +67,26 @@ final class RowWisePartWriter<IN, BucketID> extends PartFileWriter<IN, BucketID>
@Override
public PartFileWriter<IN, BucketID> resumeFrom(
final BucketID bucketId,
- final RecoverableWriter fileSystemWriter,
+ final RecoverableFsDataOutputStream stream,
final RecoverableWriter.ResumeRecoverable resumable,
final long creationTime) throws IOException {
- Preconditions.checkNotNull(fileSystemWriter);
+ Preconditions.checkNotNull(stream);
Preconditions.checkNotNull(resumable);
- final RecoverableFsDataOutputStream stream = fileSystemWriter.recover(resumable);
return new RowWisePartWriter<>(bucketId, stream, encoder, creationTime);
}
@Override
public PartFileWriter<IN, BucketID> openNew(
final BucketID bucketId,
- final RecoverableWriter fileSystemWriter,
+ final RecoverableFsDataOutputStream stream,
final Path path,
final long creationTime) throws IOException {
- Preconditions.checkNotNull(fileSystemWriter);
+ Preconditions.checkNotNull(stream);
Preconditions.checkNotNull(path);
- final RecoverableFsDataOutputStream stream = fileSystemWriter.open(path);
return new RowWisePartWriter<>(bucketId, stream, encoder, creationTime);
}
}