You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/05/03 12:34:57 UTC

flink git commit: [FLINK-6427] Ensure file length is flushed in StreamWriterBase

Repository: flink
Updated Branches:
  refs/heads/master 6181302f1 -> 6d0c4c340


[FLINK-6427] Ensure file length is flushed in StreamWriterBase


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6d0c4c34
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6d0c4c34
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6d0c4c34

Branch: refs/heads/master
Commit: 6d0c4c340d0b052d2a97e7e86622707d05f6b6d7
Parents: 6181302
Author: Jürgen Thomann <ju...@innogames.com>
Authored: Wed May 3 13:06:04 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed May 3 13:46:10 2017 +0200

----------------------------------------------------------------------
 .../apache/flink/streaming/connectors/fs/StreamWriterBase.java | 6 ++++++
 1 file changed, 6 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6d0c4c34/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java
index 140246f..a04e4b5 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java
@@ -21,12 +21,14 @@ import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.util.EnumSet;
 
 /**
  * Base class for {@link Writer Writers} that write to a {@link FSDataOutputStream}.
@@ -70,6 +72,10 @@ public abstract class StreamWriterBase<T> implements Writer<T> {
 			// At this point the refHflushOrSync cannot be null,
 			// since register method would have thrown if it was.
 			this.refHflushOrSync.invoke(os);
+
+			if (os instanceof HdfsDataOutputStream) {
+				((HdfsDataOutputStream) os).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
+			}
 		} catch (InvocationTargetException e) {
 			String msg = "Error while trying to hflushOrSync!";
 			LOG.error(msg + " " + e.getCause());