You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2018/12/04 16:21:50 UTC

[flink] 01/06: [hotfix][fs-connector] Refactor PartFileWriter to take stream.

This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0c1328aa4548953ff358da10052cc3644ec5c2f2
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Wed Nov 21 10:35:58 2018 +0100

    [hotfix][fs-connector] Refactor PartFileWriter to take stream.
---
 .../flink/streaming/api/functions/sink/filesystem/Bucket.java  |  7 +++++--
 .../api/functions/sink/filesystem/BulkPartWriter.java          | 10 ++++------
 .../api/functions/sink/filesystem/PartFileWriter.java          |  8 ++++----
 .../api/functions/sink/filesystem/RowWisePartWriter.java       | 10 ++++------
 4 files changed, 17 insertions(+), 18 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
index 65a7628..042bcda 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.api.functions.sink.filesystem;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
 import org.apache.flink.core.fs.RecoverableWriter;
 import org.apache.flink.util.Preconditions;
 
@@ -124,8 +125,9 @@ public class Bucket<IN, BucketID> {
 		// we try to resume the previous in-progress file
 		if (state.hasInProgressResumableFile()) {
 			final RecoverableWriter.ResumeRecoverable resumable = state.getInProgressResumableFile();
+			final RecoverableFsDataOutputStream stream = fsWriter.recover(resumable);
 			inProgressPart = partFileFactory.resumeFrom(
-					bucketId, fsWriter, resumable, state.getInProgressFileCreationTime());
+					bucketId, stream, resumable, state.getInProgressFileCreationTime());
 		}
 	}
 
@@ -195,7 +197,8 @@ public class Bucket<IN, BucketID> {
 		closePartFile();
 
 		final Path partFilePath = assembleNewPartPath();
-		inProgressPart = partFileFactory.openNew(bucketId, fsWriter, partFilePath, currentTime);
+		final RecoverableFsDataOutputStream stream = fsWriter.open(partFilePath);
+		inProgressPart = partFileFactory.openNew(bucketId, stream, partFilePath, currentTime);
 
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("Subtask {} opening new part file \"{}\" for bucket id={}.",
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java
index 005ae4e..a44b0e8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java
@@ -79,14 +79,13 @@ final class BulkPartWriter<IN, BucketID> extends PartFileWriter<IN, BucketID> {
 		@Override
 		public PartFileWriter<IN, BucketID> resumeFrom(
 				final BucketID bucketId,
-				final RecoverableWriter fileSystemWriter,
+				final RecoverableFsDataOutputStream stream,
 				final RecoverableWriter.ResumeRecoverable resumable,
 				final long creationTime) throws IOException {
 
-			Preconditions.checkNotNull(fileSystemWriter);
+			Preconditions.checkNotNull(stream);
 			Preconditions.checkNotNull(resumable);
 
-			final RecoverableFsDataOutputStream stream = fileSystemWriter.recover(resumable);
 			final BulkWriter<IN> writer = writerFactory.create(stream);
 			return new BulkPartWriter<>(bucketId, stream, writer, creationTime);
 		}
@@ -94,14 +93,13 @@ final class BulkPartWriter<IN, BucketID> extends PartFileWriter<IN, BucketID> {
 		@Override
 		public PartFileWriter<IN, BucketID> openNew(
 				final BucketID bucketId,
-				final RecoverableWriter fileSystemWriter,
+				final RecoverableFsDataOutputStream stream,
 				final Path path,
 				final long creationTime) throws IOException {
 
-			Preconditions.checkNotNull(fileSystemWriter);
+			Preconditions.checkNotNull(stream);
 			Preconditions.checkNotNull(path);
 
-			final RecoverableFsDataOutputStream stream = fileSystemWriter.open(path);
 			final BulkWriter<IN> writer = writerFactory.create(stream);
 			return new BulkPartWriter<>(bucketId, stream, writer, creationTime);
 		}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileWriter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileWriter.java
index 662454b..95a2978a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileWriter.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileWriter.java
@@ -111,7 +111,7 @@ abstract class PartFileWriter<IN, BucketID> implements PartFileInfo<BucketID> {
 		/**
 		 * Used upon recovery from a failure to recover a {@link PartFileWriter writer}.
 		 * @param bucketId the id of the bucket this writer is writing to.
-		 * @param fileSystemWriter the filesystem-specific writer to use when writing to the filesystem.
+		 * @param stream the output stream, already recovered by the caller from {@code resumable}, that the writer will write to.
 		 * @param resumable the state of the stream we are resurrecting.
 		 * @param creationTime the creation time of the stream.
 		 * @return the recovered {@link PartFileWriter writer}.
@@ -119,14 +119,14 @@ abstract class PartFileWriter<IN, BucketID> implements PartFileInfo<BucketID> {
 		 */
 		PartFileWriter<IN, BucketID> resumeFrom(
 			final BucketID bucketId,
-			final RecoverableWriter fileSystemWriter,
+			final RecoverableFsDataOutputStream stream,
 			final RecoverableWriter.ResumeRecoverable resumable,
 			final long creationTime) throws IOException;
 
 		/**
 		 * Used to create a new {@link PartFileWriter writer}.
 		 * @param bucketId the id of the bucket this writer is writing to.
-		 * @param fileSystemWriter the filesystem-specific writer to use when writing to the filesystem.
+		 * @param stream the output stream, already opened by the caller for {@code path}, that the writer will write to.
 		 * @param path the part this writer will write to.
 		 * @param creationTime the creation time of the stream.
 		 * @return the new {@link PartFileWriter writer}.
@@ -134,7 +134,7 @@ abstract class PartFileWriter<IN, BucketID> implements PartFileInfo<BucketID> {
 		 */
 		PartFileWriter<IN, BucketID> openNew(
 			final BucketID bucketId,
-			final RecoverableWriter fileSystemWriter,
+			final RecoverableFsDataOutputStream stream,
 			final Path path,
 			final long creationTime) throws IOException;
 	}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java
index 2478b79..05c160c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java
@@ -67,28 +67,26 @@ final class RowWisePartWriter<IN, BucketID> extends PartFileWriter<IN, BucketID>
 		@Override
 		public PartFileWriter<IN, BucketID> resumeFrom(
 				final BucketID bucketId,
-				final RecoverableWriter fileSystemWriter,
+				final RecoverableFsDataOutputStream stream,
 				final RecoverableWriter.ResumeRecoverable resumable,
 				final long creationTime) throws IOException {
 
-			Preconditions.checkNotNull(fileSystemWriter);
+			Preconditions.checkNotNull(stream);
 			Preconditions.checkNotNull(resumable);
 
-			final RecoverableFsDataOutputStream stream = fileSystemWriter.recover(resumable);
 			return new RowWisePartWriter<>(bucketId, stream, encoder, creationTime);
 		}
 
 		@Override
 		public PartFileWriter<IN, BucketID> openNew(
 				final BucketID bucketId,
-				final RecoverableWriter fileSystemWriter,
+				final RecoverableFsDataOutputStream stream,
 				final Path path,
 				final long creationTime) throws IOException {
 
-			Preconditions.checkNotNull(fileSystemWriter);
+			Preconditions.checkNotNull(stream);
 			Preconditions.checkNotNull(path);
 
-			final RecoverableFsDataOutputStream stream = fileSystemWriter.open(path);
 			return new RowWisePartWriter<>(bucketId, stream, encoder, creationTime);
 		}
 	}