You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/05/03 12:34:57 UTC
flink git commit: [FLINK-6427] Ensure file length is flushed in StreamWriterBase
Repository: flink
Updated Branches:
refs/heads/master 6181302f1 -> 6d0c4c340
[FLINK-6427] Ensure file length is flushed in StreamWriterBase
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6d0c4c34
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6d0c4c34
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6d0c4c34
Branch: refs/heads/master
Commit: 6d0c4c340d0b052d2a97e7e86622707d05f6b6d7
Parents: 6181302
Author: Jürgen Thomann <ju...@innogames.com>
Authored: Wed May 3 13:06:04 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed May 3 13:46:10 2017 +0200
----------------------------------------------------------------------
.../apache/flink/streaming/connectors/fs/StreamWriterBase.java | 6 ++++++
1 file changed, 6 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6d0c4c34/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java
index 140246f..a04e4b5 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java
@@ -21,12 +21,14 @@ import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
+import java.util.EnumSet;
/**
* Base class for {@link Writer Writers} that write to a {@link FSDataOutputStream}.
@@ -70,6 +72,10 @@ public abstract class StreamWriterBase<T> implements Writer<T> {
// At this point the refHflushOrSync cannot be null,
// since register method would have thrown if it was.
this.refHflushOrSync.invoke(os);
+
+ if (os instanceof HdfsDataOutputStream) {
+ ((HdfsDataOutputStream) os).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
+ }
} catch (InvocationTargetException e) {
String msg = "Error while trying to hflushOrSync!";
LOG.error(msg + " " + e.getCause());