You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/04/06 11:32:16 UTC

flink git commit: [FLINK-3637] Refactored output stream handling from RollingSink.

Repository: flink
Updated Branches:
  refs/heads/master 24f89b7f1 -> b3186b6a1


[FLINK-3637] Refactored output stream handling from RollingSink.

The Writer interface now deals directly with filesystem and path, rather
than the raw output stream.

Since the RollingSink no longer has access to the raw output stream, it
cannot directly determine the current size of the file. A getPos()
method has been added to the Writer interface, so the RollingSink can
retrieve the current file size.

Finally, flush() has been extended to return the offset that the file
must be truncated to at recovery.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b3186b6a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b3186b6a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b3186b6a

Branch: refs/heads/master
Commit: b3186b6a109e4cd43216799e2f1c63ad8646bdab
Parents: 24f89b7
Author: Lasse Dalegaard <da...@gmail.com>
Authored: Tue Mar 22 10:24:11 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Apr 6 11:31:23 2016 +0200

----------------------------------------------------------------------
 .../streaming/connectors/fs/RollingSink.java    | 104 ++-----------
 .../connectors/fs/SequenceFileWriter.java       |  28 +---
 .../connectors/fs/StreamWriterBase.java         | 151 +++++++++++++++++++
 .../streaming/connectors/fs/StringWriter.java   |  29 +---
 .../flink/streaming/connectors/fs/Writer.java   |  24 ++-
 5 files changed, 194 insertions(+), 142 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b3186b6a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
index f186f53..76324d7 100644
--- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
@@ -253,11 +253,6 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf
 	private transient Path currentBucketDirectory;
 
 	/**
-	 * The {@code FSDataOutputStream} for the current part file.
-	 */
-	private transient FSDataOutputStream outStream;
-
-	/**
 	 * Our subtask index, retrieved from the {@code RuntimeContext} in {@link #open}.
 	 */
 	private transient int subtaskIndex;
@@ -269,10 +264,9 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf
 	private transient int partCounter;
 
 	/**
-	 * We use reflection to get the hflush method or use sync as a fallback.
-	 * The idea for this and the code comes from the Flume HDFS Sink.
+	 * Tracks if the writer is currently opened or closed.
 	 */
-	private transient Method refHflushOrSync;
+	private transient boolean isWriterOpen = false;
 
 	/**
 	 * We use reflection to get the .truncate() method, this is only available starting with
@@ -385,7 +379,7 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf
 	 */
 	private boolean shouldRoll() throws IOException {
 		boolean shouldRoll = false;
-		if (outStream == null) {
+		if (!isWriterOpen) {
 			shouldRoll = true;
 			LOG.debug("RollingSink {} starting new initial bucket. ", subtaskIndex);
 		}
@@ -395,9 +389,9 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf
 			// we will retrieve a new bucket base path in openNewPartFile so reset the part counter
 			partCounter = 0;
 		}
-		if (outStream != null) {
-			long writePosition = outStream.getPos();
-			if (outStream != null && writePosition > batchSize) {
+		if (isWriterOpen) {
+			long writePosition = writer.getPos();
+			if (isWriterOpen && writePosition > batchSize) {
 				shouldRoll = true;
 				LOG.debug(
 						"RollingSink {} starting new bucket because file position {} is above batch size {}.",
@@ -452,16 +446,8 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf
 
 		Path inProgressPath = new Path(currentPartPath.getParent(), inProgressPrefix + currentPartPath.getName()).suffix(inProgressSuffix);
 
-
-
-		outStream = fs.create(inProgressPath, false);
-
-		// We do the reflection here since this is the first time that we have a FSDataOutputStream
-		if (refHflushOrSync == null) {
-			refHflushOrSync = reflectHflushOrSync(outStream);
-		}
-
-		writer.open(outStream);
+		writer.open(fs, inProgressPath);
+		isWriterOpen = true;
 	}
 
 	/**
@@ -472,15 +458,11 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf
 	 * of pending files in our bucket state.
 	 */
 	private void closeCurrentPartFile() throws Exception {
-		if (writer != null) {
+		if (isWriterOpen) {
 			writer.close();
+			isWriterOpen = false;
 		}
 
-		if (outStream != null) {
-			hflushOrSync(outStream);
-			outStream.close();
-			outStream = null;
-		}
 		if (currentPartPath != null) {
 			Path inProgressPath = new Path(currentPartPath.getParent(), inProgressPrefix + currentPartPath.getName()).suffix(inProgressSuffix);
 			Path pendingPath = new Path(currentPartPath.getParent(), pendingPrefix + currentPartPath.getName()).suffix(pendingSuffix);
@@ -494,62 +476,6 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf
 	}
 
 	/**
-	 * If hflush is available in this version of HDFS, then this method calls
-	 * hflush, else it calls sync.
-	 * @param os - The stream to flush/sync
-	 * @throws java.io.IOException
-	 *
-	 * <p>
-	 * Note: This code comes from Flume
-	 */
-	protected void hflushOrSync(FSDataOutputStream os) throws IOException {
-		try {
-			// At this point the refHflushOrSync cannot be null,
-			// since register method would have thrown if it was.
-			this.refHflushOrSync.invoke(os);
-		} catch (InvocationTargetException e) {
-			String msg = "Error while trying to hflushOrSync!";
-			LOG.error(msg + " " + e.getCause());
-			Throwable cause = e.getCause();
-			if(cause != null && cause instanceof IOException) {
-				throw (IOException)cause;
-			}
-			throw new RuntimeException(msg, e);
-		} catch (Exception e) {
-			String msg = "Error while trying to hflushOrSync!";
-			LOG.error(msg + " " + e);
-			throw new RuntimeException(msg, e);
-		}
-	}
-
-	/**
-	 * Gets the hflush call using reflection. Fallback to sync if hflush is not available.
-	 *
-	 * <p>
-	 * Note: This code comes from Flume
-	 */
-	private Method reflectHflushOrSync(FSDataOutputStream os) {
-		Method m = null;
-		if(os != null) {
-			Class<?> fsDataOutputStreamClass = os.getClass();
-			try {
-				m = fsDataOutputStreamClass.getMethod("hflush");
-			} catch (NoSuchMethodException ex) {
-				LOG.debug("HFlush not found. Will use sync() instead");
-				try {
-					m = fsDataOutputStreamClass.getMethod("sync");
-				} catch (Exception ex1) {
-					String msg = "Neither hflush not sync were found. That seems to be " +
-							"a problem!";
-					LOG.error(msg);
-					throw new RuntimeException(msg, ex1);
-				}
-			}
-		}
-		return m;
-	}
-
-	/**
 	 * Gets the truncate() call using reflection.
 	 *
 	 * <p>
@@ -633,14 +559,10 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf
 
 	@Override
 	public BucketState snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
-		if (writer != null) {
-			writer.flush();
-		}
-		if (outStream != null) {
-			outStream.flush();
-			hflushOrSync(outStream);
+		if (isWriterOpen) {
+			long pos = writer.flush();
 			bucketState.currentFile = currentPartPath.toString();
-			bucketState.currentFileValidLength = outStream.getPos();
+			bucketState.currentFileValidLength = pos;
 		}
 		synchronized (bucketState.pendingFilesPerCheckpoint) {
 			bucketState.pendingFilesPerCheckpoint.put(checkpointId, bucketState.pendingFiles);

http://git-wip-us.apache.org/repos/asf/flink/blob/b3186b6a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java
index 928d96e..32b8d49 100644
--- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java
@@ -24,7 +24,8 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
 import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.compress.CompressionCodec;
@@ -41,15 +42,13 @@ import java.io.IOException;
  * @param <K> The type of the first tuple field.
  * @param <V> The type of the second tuple field.
  */
-public class SequenceFileWriter<K extends Writable, V extends Writable> implements Writer<Tuple2<K, V>>, InputTypeConfigurable {
+public class SequenceFileWriter<K extends Writable, V extends Writable> extends StreamWriterBase<Tuple2<K, V>> implements InputTypeConfigurable {
 	private static final long serialVersionUID = 1L;
 
 	private final String compressionCodecName;
 
 	private SequenceFile.CompressionType compressionType;
 
-	private transient FSDataOutputStream outputStream;
-
 	private transient SequenceFile.Writer writer;
 
 	private Class<K> keyClass;
@@ -77,10 +76,8 @@ public class SequenceFileWriter<K extends Writable, V extends Writable> implemen
 	}
 
 	@Override
-	public void open(FSDataOutputStream outStream) throws IOException {
-		if (outputStream != null) {
-			throw new IllegalStateException("SequenceFileWriter has already been opened.");
-		}
+	public void open(FileSystem fs, Path path) throws IOException {
+		super.open(fs, path);
 		if (keyClass == null) {
 			throw new IllegalStateException("Key Class has not been initialized.");
 		}
@@ -88,8 +85,6 @@ public class SequenceFileWriter<K extends Writable, V extends Writable> implemen
 			throw new IllegalStateException("Value Class has not been initialized.");
 		}
 
-		this.outputStream = outStream;
-
 		CompressionCodec codec = null;
 
 		if (!compressionCodecName.equals("None")) {
@@ -102,7 +97,7 @@ public class SequenceFileWriter<K extends Writable, V extends Writable> implemen
 
 		// the non-deprecated constructor syntax is only available in recent hadoop versions...
 		writer = SequenceFile.createWriter(new Configuration(),
-				outStream,
+				getStream(),
 				keyClass,
 				valueClass,
 				compressionType,
@@ -110,23 +105,16 @@ public class SequenceFileWriter<K extends Writable, V extends Writable> implemen
 	}
 
 	@Override
-	public void flush() throws IOException {
-	}
-
-	@Override
 	public void close() throws IOException {
 		if (writer != null) {
 			writer.close();
 		}
-		writer = null;
-		outputStream = null;
+		super.close();
 	}
 
 	@Override
 	public void write(Tuple2<K, V> element) throws IOException {
-		if (outputStream == null) {
-			throw new IllegalStateException("SequenceFileWriter has not been opened.");
-		}
+		getStream(); // Throws if the stream is not open
 		writer.append(element.f0, element.f1);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b3186b6a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java
new file mode 100644
index 0000000..0fd85e2
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.fs;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+/**
+ * Base class for {@link Writer Writers} that write to a {@link FSDataOutputStream}.
+ */
+public abstract class StreamWriterBase<T> implements Writer<T> {
+
+	private static Logger LOG = LoggerFactory.getLogger(RollingSink.class);
+
+	/**
+	 * The {@code FSDataOutputStream} for the current part file.
+	 */
+	private transient FSDataOutputStream outStream;
+
+	/**
+	 * We use reflection to get the hflush method or use sync as a fallback.
+	 * The idea for this and the code comes from the Flume HDFS Sink.
+	 */
+	private transient Method refHflushOrSync;
+
+	/**
+	 * Returns the current output stream, if the stream is open.
+	 */
+	protected FSDataOutputStream getStream() {
+		if (outStream == null) {
+			throw new IllegalStateException("Output stream has not been opened");
+		}
+		return outStream;
+	}
+
+	/**
+	 * If hflush is available in this version of HDFS, then this method calls
+	 * hflush, else it calls sync.
+	 * @param os - The stream to flush/sync
+	 * @throws java.io.IOException
+	 *
+	 * <p>
+	 * Note: This code comes from Flume
+	 */
+	protected void hflushOrSync(FSDataOutputStream os) throws IOException {
+		try {
+			// At this point the refHflushOrSync cannot be null,
+			// since register method would have thrown if it was.
+			this.refHflushOrSync.invoke(os);
+		} catch (InvocationTargetException e) {
+			String msg = "Error while trying to hflushOrSync!";
+			LOG.error(msg + " " + e.getCause());
+			Throwable cause = e.getCause();
+			if(cause != null && cause instanceof IOException) {
+				throw (IOException)cause;
+			}
+			throw new RuntimeException(msg, e);
+		} catch (Exception e) {
+			String msg = "Error while trying to hflushOrSync!";
+			LOG.error(msg + " " + e);
+			throw new RuntimeException(msg, e);
+		}
+	}
+
+	/**
+	 * Gets the hflush call using reflection. Fallback to sync if hflush is not available.
+	 *
+	 * <p>
+	 * Note: This code comes from Flume
+	 */
+	private Method reflectHflushOrSync(FSDataOutputStream os) {
+		Method m = null;
+		if(os != null) {
+			Class<?> fsDataOutputStreamClass = os.getClass();
+			try {
+				m = fsDataOutputStreamClass.getMethod("hflush");
+			} catch (NoSuchMethodException ex) {
+				LOG.debug("HFlush not found. Will use sync() instead");
+				try {
+					m = fsDataOutputStreamClass.getMethod("sync");
+				} catch (Exception ex1) {
+					String msg = "Neither hflush not sync were found. That seems to be " +
+							"a problem!";
+					LOG.error(msg);
+					throw new RuntimeException(msg, ex1);
+				}
+			}
+		}
+		return m;
+	}
+
+	@Override
+	public void open(FileSystem fs, Path path) throws IOException {
+		if (outStream != null) {
+			throw new IllegalStateException("Writer has already been opened");
+		}
+		outStream = fs.create(path, false);
+		if (refHflushOrSync == null) {
+			refHflushOrSync = reflectHflushOrSync(outStream);
+		}
+	}
+
+	@Override
+	public long flush() throws IOException {
+		if (outStream == null) {
+			throw new IllegalStateException("Writer is not open");
+		}
+		hflushOrSync(outStream);
+		return outStream.getPos();
+	}
+
+	@Override
+	public long getPos() throws IOException {
+		if (outStream == null) {
+			throw new IllegalStateException("Writer is not open");
+		}
+		return outStream.getPos();
+	}
+
+	@Override
+	public void close() throws IOException {
+		if (outStream != null) {
+			flush();
+			outStream.close();
+			outStream = null;
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b3186b6a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java
index ad0ab46..6568a86 100644
--- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java
@@ -17,8 +17,9 @@
  */
 package org.apache.flink.streaming.connectors.fs;
 
-
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 
 import java.io.IOException;
 import java.nio.charset.Charset;
@@ -31,11 +32,9 @@ import java.nio.charset.UnsupportedCharsetException;
  *
  * @param <T> The type of the elements that are being written by the sink.
  */
-public class StringWriter<T> implements Writer<T> {
+public class StringWriter<T> extends StreamWriterBase<T> {
 	private static final long serialVersionUID = 1L;
 
-	private transient FSDataOutputStream outputStream;
-
 	private String charsetName;
 
 	private transient Charset charset;
@@ -59,11 +58,8 @@ public class StringWriter<T> implements Writer<T> {
 	}
 
 	@Override
-	public void open(FSDataOutputStream outStream) throws IOException {
-		if (outputStream != null) {
-			throw new IllegalStateException("StringWriter has already been opened.");
-		}
-		this.outputStream = outStream;
+	public void open(FileSystem fs, Path path) throws IOException {
+		super.open(fs, path);
 
 		try {
 			this.charset = Charset.forName(charsetName);
@@ -77,23 +73,10 @@ public class StringWriter<T> implements Writer<T> {
 	}
 
 	@Override
-	public void flush() throws IOException {
-
-	}
-
-	@Override
-	public void close() throws IOException {
-		outputStream = null;
-	}
-
-	@Override
 	public void write(T element) throws IOException {
-		if (outputStream == null) {
-			throw new IllegalStateException("StringWriter has not been opened.");
-		}
+		FSDataOutputStream outputStream = getStream();
 		outputStream.write(element.toString().getBytes(charset));
 		outputStream.write('\n');
-
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/b3186b6a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Writer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Writer.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Writer.java
index 98cad32..e7aaaa7 100644
--- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Writer.java
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Writer.java
@@ -17,7 +17,8 @@
  */
 package org.apache.flink.streaming.connectors.fs;
 
-import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -35,19 +36,26 @@ public interface Writer<T> extends Serializable {
 	 * Initializes the {@code Writer} for a newly opened bucket file.
 	 * Any internal per-bucket initialization should be performed here.
 	 *
-	 * @param outStream The {@link org.apache.hadoop.fs.FSDataOutputStream} for the newly opened file.
+	 * @param fs The {@link org.apache.hadoop.fs.FileSystem} containing the newly opened file.
+	 * @param path The {@link org.apache.hadoop.fs.Path} of the newly opened file.
 	 */
-	void open(FSDataOutputStream outStream) throws IOException;
+	void open(FileSystem fs, Path path) throws IOException;
 
 	/**
-	 * Flushes out any internally held data.
+	 * Flushes out any internally held data, and returns the offset that the file
+	 * must be truncated to at recovery.
 	 */
-	void flush()throws IOException ;
+	long flush() throws IOException;
 
 	/**
-	 * Closes the {@code Writer}. This must not close the {@code FSDataOutputStream} that
-	 * was handed in in the {@link #open} method. Only internally held state should be
-	 * closed.
+	 * Retrieves the current position, and thus size, of the output file.
+	 */
+	long getPos() throws IOException;
+
+	/**
+	 * Closes the {@code Writer}. If the writer is already closed, no action will be
+	 * taken. The call should close all state related to the current output file,
+	 * including the output stream opened in {@code open}.
 	 */
 	void close() throws IOException ;