You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/14 14:48:51 UTC

[1/3] flink git commit: [FLINK-5788] [docs] Improve documentation of FileSystem and specify the data persistence contract.

Repository: flink
Updated Branches:
  refs/heads/master 663c1e3f7 -> f7af3b016


[FLINK-5788] [docs] Improve documentation of FileSystem and specify the data persistence contract.

This closes #3301


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f7af3b01
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f7af3b01
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f7af3b01

Branch: refs/heads/master
Commit: f7af3b01681592787db16a555b55d6b11d35f869
Parents: af81beb
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Feb 13 14:29:03 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Feb 14 15:32:43 2017 +0100

----------------------------------------------------------------------
 docs/internals/filesystems.md                   | 138 +++++++++++++++++++
 .../apache/flink/core/fs/FSDataInputStream.java |  11 +-
 .../flink/core/fs/FSDataOutputStream.java       |  81 ++++++++++-
 .../org/apache/flink/core/fs/FileSystem.java    |  98 ++++++++++++-
 4 files changed, 323 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f7af3b01/docs/internals/filesystems.md
----------------------------------------------------------------------
diff --git a/docs/internals/filesystems.md b/docs/internals/filesystems.md
new file mode 100644
index 0000000..427251a
--- /dev/null
+++ b/docs/internals/filesystems.md
@@ -0,0 +1,138 @@
+---
+title: "File Systems"
+nav-parent_id: internals
+nav-pos: 10
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* Replaced by the TOC
+{:toc}
+
+Flink has its own file system abstraction via the `org.apache.flink.core.fs.FileSystem` class.
+This abstraction provides a common set of operations and minimal guarantees across various types
+of file system implementations.
+
+The `FileSystem`'s set of available operations is quite limited, in order to support a wide
+range of file systems. For example, appending to or mutating existing files is not supported.
+
+File systems are identified by a *file system scheme*, such as `file://`, `hdfs://`, etc.
+
+# Implementations
+
+Flink implements the file systems directly, with the following file system schemes:
+
+  - `file`, which represents the machine's local file system.
+
+Other file system types are accessed by an implementation that bridges to the suite of file systems supported by
+[Apache Hadoop](https://hadoop.apache.org/). The following is an incomplete list of examples:
+
+  - `hdfs`: Hadoop Distributed File System
+  - `s3`, `s3n`, and `s3a`: Amazon S3 file system
+  - `gcs`: Google Cloud Storage
+  - `maprfs`: The MapR distributed file system
+  - ...
+
+Flink loads Hadoop's file systems transparently if it finds the Hadoop File System classes in the class path and finds a valid
+Hadoop configuration. By default, it looks for the Hadoop configuration in the class path. Alternatively, one can specify a
+custom location via the configuration entry `fs.hdfs.hadoopconf`.
+
+
+# Persistence Guarantees
+
+These `FileSystem` and its `FsDataOutputStream` instances are used to persistently store data, both for results of applications
+and for fault tolerance and recovery. It is therefore crucial that the persistence semantics of these streams are well defined.
+
+## Definition of Persistence Guarantees
+
+Data written to an output stream is considered persistent, if two requirements are met:
+
+  1. **Visibility Requirement:** It must be guaranteed that all other processes, machines,
+     virtual machines, containers, etc. that are able to access the file see the data consistently
+     when given the absolute file path. This requirement is similar to the *close-to-open*
+     semantics defined by POSIX, but restricted to the file itself (by its absolute path).
+
+  2. **Durability Requirement:** The file system's specific durability/persistence requirements
+     must be met. These are specific to the particular file system. For example the
+     {@link LocalFileSystem} does not provide any durability guarantees for crashes of both
+     hardware and operating system, while replicated distributed file systems (like HDFS)
+     guarantee typically durability in the presence of up *n* concurrent node failures,
+     where *n* is the replication factor.
+
+Updates to the file's parent directory (such that the file shows up when
+listing the directory contents) are not required to be complete for the data in the file stream
+to be considered persistent. This relaxation is important for file systems where updates to
+directory contents are only eventually consistent.
+
+The `FSDataOutputStream` has to guarantee data persistence for the written bytes once the call to
+`FSDataOutputStream.close()` returns.
+
+## Examples
+ 
+  - For **fault-tolerant distributed file systems**, data is considered persistent once 
+    it has been received and acknowledged by the file system, typically by having been replicated
+    to a quorum of machines (*durability requirement*). In addition the absolute file path
+    must be visible to all other machines that will potentially access the file (*visibility requirement*).
+
+    Whether data has hit non-volatile storage on the storage nodes depends on the specific
+    guarantees of the particular file system.
+
+    The metadata updates to the file's parent directory are not required to have reached
+    a consistent state. It is permissible that some machines see the file when listing the parent
+    directory's contents while others do not, as long as access to the file by its absolute path
+    is possible on all nodes.
+
+  - A **local file system** must support the POSIX *close-to-open* semantics.
+    Because the local file system does not have any fault tolerance guarantees, no further
+    requirements exist.
+ 
+    The above implies specifically that data may still be in the OS cache when considered
+    persistent from the local file system's perspective. Crashes that cause the OS cache to loose
+    data are considered fatal to the local machine and are not covered by the local file system's
+    guarantees as defined by Flink.
+
+    That means that computed results, checkpoints, and savepoints that are written only to
+    the local filesystem are not guaranteed to be recoverable from the local machine's failure,
+    making local file systems unsuitable for production setups.
+
+# Updating File Contents
+
+Many file systems either do not support overwriting contents of existing files at all, or do not support consistent visibility of the
+updated contents in that case. For that reason, Flink's FileSystem does not support appending to existing files, or seeking within
+output streams such that previously written data could be changed within the same file.
+
+# Overwriting Files
+
+Overwriting files is in general possible. A file is overwritten by deleting it and creating a new file.
+However, certain filesystems cannot make that change synchronously visible to all parties that have access to the file.
+For example [Amazon S3](https://aws.amazon.com/documentation/s3/) guarantees only *eventual consistency* in
+the visibility of the file replacement: Some machines may see the old file, some machines may see the new file.
+
+To avoid these consistency issues, the implementations of failure/recovery mechanisms in Flink strictly avoid writing to
+the same file path more than once.
+
+# Thread Safety
+
+Implementations of `FileSystem` must be thread-safe: The same instance of `FileSystem` is frequently shared across multiple threads
+in Flink and must be able to concurrently create input/output streams and list file metadata.
+
+The `FSDataOutputStream` and `FSDataOutputStream` implementations are strictly **not thread-safe**.
+Instances of the streams should also not be passed between threads in between read or write operations, because there are no guarantees
+about the visibility of operations across threads (many operations do not create memory fences).
+

http://git-wip-us.apache.org/repos/asf/flink/blob/f7af3b01/flink-core/src/main/java/org/apache/flink/core/fs/FSDataInputStream.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FSDataInputStream.java b/flink-core/src/main/java/org/apache/flink/core/fs/FSDataInputStream.java
index 6ce1235..44dbcb1 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FSDataInputStream.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FSDataInputStream.java
@@ -25,6 +25,10 @@ import java.io.InputStream;
 
 /**
  * Interface for a data input stream to a file on a {@link FileSystem}.
+ * 
+ * <p>This extends the {@link java.io.InputStream} with methods for accessing
+ * the stream's {@link #getPos() current position} and
+ * {@link #seek(long) seeking} to a desired position.
  */
 @Public
 public abstract class FSDataInputStream extends InputStream {
@@ -35,15 +39,16 @@ public abstract class FSDataInputStream extends InputStream {
 	 * 
 	 * @param desired
 	 *        the desired offset
-	 * @throws IOException
-	 *         thrown if an error occurred while seeking inside the input stream
+	 * @throws IOException Thrown if an error occurred while seeking inside the input stream.
 	 */
 	public abstract void seek(long desired) throws IOException;
 
 	/**
-	 * Get the current position in the input stream.
+	 * Gets the current position in the input stream.
 	 *
 	 * @return current position in the input stream
+	 * @throws IOException Thrown if an I/O error occurred in the underlying stream 
+	 *                     implementation while accessing the stream's position.
 	 */
 	public abstract long getPos() throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f7af3b01/flink-core/src/main/java/org/apache/flink/core/fs/FSDataOutputStream.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FSDataOutputStream.java b/flink-core/src/main/java/org/apache/flink/core/fs/FSDataOutputStream.java
index 0318d1f..a8df5c1 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FSDataOutputStream.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FSDataOutputStream.java
@@ -24,14 +24,93 @@ import java.io.IOException;
 import java.io.OutputStream;
 
 /**
- * Interface for a data output stream to a file on a {@link FileSystem}.
+ * An output stream to a file that is created via a {@link FileSystem}.
+ * This class extends the base {@link java.io.OutputStream} with some additional important methods.
+ * 
+ * <h2>Data Persistence Guarantees</h2>
+ * 
+ * These streams are used to persistently store data, both for results of streaming applications
+ * and for fault tolerance and recovery. It is therefore crucial that the persistence semantics
+ * of these streams are well defined.
+ * 
+ * <p>Please refer to the class-level docs of {@link FileSystem} for the definition of data persistence
+ * via Flink's FileSystem abstraction and the {@code FSDataOutputStream}.
+ * 
+ * <h2>Thread Safety</h2>
+ * 
+ * Implementations of the {@code FSDataOutputStream} are generally not assumed to be thread safe.
+ * Instances of {@code FSDataOutputStream} should not be passed between threads, because there
+ * are no guarantees about the order of visibility of operations across threads.
+ * 
+ * @see FileSystem
+ * @see FSDataInputStream
  */
 @Public
 public abstract class FSDataOutputStream extends OutputStream {
 
+	/**
+	 * Gets the position of the stream (non-negative), defined as the number of bytes
+	 * from the beginning of the file to the current writing position. The position
+	 * corresponds to the zero-based index of the next byte that will be written.
+	 * 
+	 * <p>This method must report accurately report the current position of the stream.
+	 * Various components of the high-availability and recovery logic rely on the accurate
+	 * 
+	 * @return The current position in the stream, defined as the number of bytes
+	 *         from the beginning of the file to the current writing position.
+	 * 
+	 * @throws IOException Thrown if an I/O error occurs while obtaining the position from
+	 *                     the stream implementation.
+	 */
 	public abstract long getPos() throws IOException;
 
+	/**
+	 * Flushes the stream, writing any data currently buffered in stream implementation
+	 * to the proper output stream. After this method has been called, the stream implementation
+	 * must not hold onto any buffered data any more.
+	 * 
+	 * <p>A completed flush does not mean that the data is necessarily persistent. Data
+	 * persistence can is only assumed after calls to {@link #close()} or {@link #sync()}.
+	 * 
+	 * <p>Implementation note: This overrides the method defined in {@link OutputStream}
+	 * as abstract to force implementations of the {@code FSDataOutputStream} to implement
+	 * this method directly.
+	 * 
+	 * @throws IOException Thrown if an I/O error occurs while flushing the stream.
+	 */
 	public abstract void flush() throws IOException;
 
+	/**
+	 * Flushes the data all the way to the persistent non-volatile storage (for example disks).
+	 * The method behaves similar to the <i>fsync</i> function, forcing all data to
+	 * be persistent on the devices.
+	 * 
+	 * <p>
+	 * 
+	 * @throws IOException Thrown if an I/O error occurs
+	 */
 	public abstract void sync() throws IOException;
+
+	/**
+	 * Closes the output stream. After this method returns, the implementation must guarantee
+	 * that all data written to the stream is persistent/visible, as defined in the
+	 * {@link FileSystem class-level docs}.
+	 * 
+	 * <p>The above implies that the method must block until persistence can be guaranteed.
+	 * For example for distributed replicated file systems, the method must block until the
+	 * replication quorum has been reached. If the calling thread is interrupted in the
+	 * process, it must fail with an {@code IOException} to indicate that persistence cannot
+	 * be guaranteed.
+	 * 
+	 * <p>If this method throws an exception, the data in the stream cannot be assumed to be
+	 * persistent.
+	 * 
+	 * <p>Implementation note: This overrides the method defined in {@link OutputStream}
+	 * as abstract to force implementations of the {@code FSDataOutputStream} to implement
+	 * this method directly.
+	 *         
+	 * @throws IOException Thrown, if an error occurred while closing the stream or guaranteeing
+	 *                     that the data is persistent.
+	 */
+	public abstract void close() throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f7af3b01/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
index d8efcbc..c3828fb 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
@@ -52,12 +52,108 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 /**
  * Abstract base class of all file systems used by Flink. This class may be extended to implement
  * distributed file systems, or local file systems. The abstraction by this file system is very simple,
- * and teh set of allowed operations quite limited, to support the common denominator of a wide
+ * and the set of available operations quite limited, to support the common denominator of a wide
  * range of file systems. For example, appending to or mutating existing files is not supported.
  * 
  * <p>Flink implements and supports some file system types directly (for example the default
  * machine-local file system). Other file system types are accessed by an implementation that bridges
  * to the suite of file systems supported by Hadoop (such as for example HDFS).
+ * 
+ * <h2>Data Persistence</h2>
+ * 
+ * The FileSystem's {@link FSDataOutputStream output streams} are used to persistently store data,
+ * both for results of streaming applications and for fault tolerance and recovery. It is therefore
+ * crucial that the persistence semantics of these streams are well defined.
+ * 
+ * <h3>Definition of Persistence Guarantees</h3>
+ * 
+ * Data written to an output stream is considered persistent, if two requirements are met:
+ * 
+ * <ol>
+ *     <li><b>Visibility Requirement:</b> It must be guaranteed that all other processes, machines,
+ *     virtual machines, containers, etc. that are able to access the file see the data consistently
+ *     when given the absolute file path. This requirement is similar to the <i>close-to-open</i>
+ *     semantics defined by POSIX, but restricted to the file itself (by its absolute path).</li>
+ * 
+ *     <li><b>Durability Requirement:</b> The file system's specific durability/persistence requirements
+ *     must be met. These are specific to the particular file system. For example the
+ *     {@link LocalFileSystem} does not provide any durability guarantees for crashes of both
+ *     hardware and operating system, while replicated distributed file systems (like HDFS)
+ *     typically guarantee durability in the presence of at most <i>n</i> concurrent node failures,
+ *     where <i>n</i> is the replication factor.</li>
+ * </ol>
+ *
+ * <p>Updates to the file's parent directory (such that the file shows up when
+ * listing the directory contents) are not required to be complete for the data in the file stream
+ * to be considered persistent. This relaxation is important for file systems where updates to
+ * directory contents are only eventually consistent.
+ * 
+ * <p>The {@link FSDataOutputStream} has to guarantee data persistence for the written bytes
+ * once the call to {@link FSDataOutputStream#close()} returns.
+ *
+ * <h3>Examples</h3>
+ *
+ * <ul>
+ *     <li>For <b>fault-tolerant distributed file systems</b>, data is considered persistent once 
+ *     it has been received and acknowledged by the file system, typically by having been replicated
+ *     to a quorum of machines (<i>durability requirement</i>). In addition the absolute file path
+ *     must be visible to all other machines that will potentially access the file (<i>visibility
+ *     requirement</i>).
+ *
+ *     <p>Whether data has hit non-volatile storage on the storage nodes depends on the specific
+ *     guarantees of the particular file system.
+ *
+ *     <p>The metadata updates to the file's parent directory are not required to have reached
+ *     a consistent state. It is permissible that some machines see the file when listing the parent
+ *     directory's contents while others do not, as long as access to the file by its absolute path
+ *     is possible on all nodes.</li>
+ *
+ *     <li>A <b>local file system</b> must support the POSIX <i>close-to-open</i> semantics.
+ *     Because the local file system does not have any fault tolerance guarantees, no further
+ *     requirements exist.
+ * 
+ *     <p>The above implies specifically that data may still be in the OS cache when considered
+ *     persistent from the local file system's perspective. Crashes that cause the OS cache to loose
+ *     data are considered fatal to the local machine and are not covered by the local file system's
+ *     guarantees as defined by Flink.
+ * 
+ *     <p>That means that computed results, checkpoints, and savepoints that are written only to
+ *     the local filesystem are not guaranteed to be recoverable from the local machine's failure,
+ *     making local file systems unsuitable for production setups.</li>
+ * </ul>
+ *
+ * <h2>Updating File Contents</h2>
+ *
+ * Many file systems either do not support overwriting contents of existing files at all, or do
+ * not support consistent visibility of the updated contents in that case. For that reason,
+ * Flink's FileSystem does not support appending to existing files, or seeking within output streams
+ * so that previously written data could be overwritten.
+ *
+ * <h2>Overwriting Files</h2>
+ *
+ * Overwriting files is in general possible. A file is overwritten by deleting it and creating
+ * a new file. However, certain filesystems cannot make that change synchronously visible
+ * to all parties that have access to the file.
+ * For example <a href="https://aws.amazon.com/documentation/s3/">Amazon S3</a> guarantees only
+ * <i>eventual consistency</i> in the visibility of the file replacement: Some machines may see
+ * the old file, some machines may see the new file.
+ *
+ * <p>To avoid these consistency issues, the implementations of failure/recovery mechanisms in
+ * Flink strictly avoid writing to the same file path more than once.
+ * 
+ * <h2>Thread Safety</h2>
+ * 
+ * Implementations of {@code FileSystem} must be thread-safe: The same instance of FileSystem
+ * is frequently shared across multiple threads in Flink and must be able to concurrently
+ * create input/output streams and list file metadata.
+ * 
+ * <p>The {@link FSDataOutputStream} and {@link FSDataOutputStream} implementations are strictly
+ * <b>not thread-safe</b>. Instances of the streams should also not be passed between threads
+ * in between read or write operations, because there are no guarantees about the visibility of
+ * operations across threads (many operations do not create memory fences).
+ * 
+ * @see FSDataInputStream
+ * @see FSDataOutputStream
  */
 @Public
 public abstract class FileSystem {


[2/3] flink git commit: [FLINK-5553] [network] keep the original throwable in PartitionRequestClientHandler

Posted by se...@apache.org.
[FLINK-5553] [network] keep the original throwable in PartitionRequestClientHandler

This way, when checking for a previous error in any input channel, we can throw
a meaningful exception instead of the inspecific
IllegalStateException("There has been an error in the channel.") before.

Note that the original throwable (from an existing channel) may or may not(!)
have been printed by the InputGate yet. Any new input channel, however, did not
get the Throwable and must fail through the (now enhanced) fallback mechanism.

This closes #3299


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/af81bebd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/af81bebd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/af81bebd

Branch: refs/heads/master
Commit: af81bebd0fabc6390930689df131e72edab6995b
Parents: a91b6ff
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Mon Feb 13 16:30:59 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Feb 14 15:32:43 2017 +0100

----------------------------------------------------------------------
 .../netty/PartitionRequestClientHandler.java    | 27 +++++++++++++++-----
 .../netty/ClientTransportErrorHandlingTest.java |  3 ++-
 2 files changed, 22 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/af81bebd/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
index 52775d4..9f80abc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
@@ -42,18 +42,15 @@ import java.util.ArrayDeque;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
-import static org.apache.flink.util.Preconditions.checkState;
-
 class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter {
 
 	private static final Logger LOG = LoggerFactory.getLogger(PartitionRequestClientHandler.class);
 
 	private final ConcurrentMap<InputChannelID, RemoteInputChannel> inputChannels = new ConcurrentHashMap<InputChannelID, RemoteInputChannel>();
 
-	private final AtomicBoolean channelError = new AtomicBoolean(false);
+	private final AtomicReference<Throwable> channelError = new AtomicReference<Throwable>();
 
 	private final BufferListenerTask bufferListener = new BufferListenerTask();
 
@@ -73,8 +70,8 @@ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter {
 	// Input channel/receiver registration
 	// ------------------------------------------------------------------------
 
-	void addInputChannel(RemoteInputChannel listener) {
-		checkState(!channelError.get(), "There has been an error in the channel.");
+	void addInputChannel(RemoteInputChannel listener) throws IOException {
+		checkError();
 
 		if (!inputChannels.containsKey(listener.getInputChannelId())) {
 			inputChannels.put(listener.getInputChannelId(), listener);
@@ -172,7 +169,7 @@ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter {
 	}
 
 	private void notifyAllChannelsOfErrorAndClose(Throwable cause) {
-		if (channelError.compareAndSet(false, true)) {
+		if (channelError.compareAndSet(null, cause)) {
 			try {
 				for (RemoteInputChannel inputChannel : inputChannels.values()) {
 					inputChannel.onError(cause);
@@ -195,6 +192,22 @@ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter {
 
 	// ------------------------------------------------------------------------
 
+	/**
+	 * Checks for an error and rethrows it if one was reported.
+	 */
+	private void checkError() throws IOException {
+		final Throwable t = channelError.get();
+
+		if (t != null) {
+			if (t instanceof IOException) {
+				throw (IOException) t;
+			}
+			else {
+				throw new IOException("There has been an error in the channel.", t);
+			}
+		}
+	}
+
 	@Override
 	public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
 		super.channelReadComplete(ctx);

http://git-wip-us.apache.org/repos/asf/flink/blob/af81bebd/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java
index ab96d4a..22e7754 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java
@@ -389,7 +389,8 @@ public class ClientTransportErrorHandlingTest {
 		return new EmbeddedChannel(protocol.getClientChannelHandlers());
 	}
 
-	private RemoteInputChannel addInputChannel(PartitionRequestClientHandler clientHandler) {
+	private RemoteInputChannel addInputChannel(PartitionRequestClientHandler clientHandler)
+		throws IOException {
 		RemoteInputChannel rich = createRemoteInputChannel();
 		clientHandler.addInputChannel(rich);
 


[3/3] flink git commit: [FLINK-5762] [runtime] Protect initializeState() and open() by the same lock

Posted by se...@apache.org.
[FLINK-5762] [runtime] Protect initializeState() and open() by the same lock

This closes #3291


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a91b6ff0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a91b6ff0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a91b6ff0

Branch: refs/heads/master
Commit: a91b6ff05d8af870ad076f9bf0fc17886787bc46
Parents: 663c1e3
Author: kl0u <kk...@gmail.com>
Authored: Thu Feb 9 16:02:27 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Feb 14 15:32:43 2017 +0100

----------------------------------------------------------------------
 .../apache/flink/streaming/runtime/tasks/StreamTask.java    | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a91b6ff0/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 2676b64..3781cb6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -244,12 +244,15 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 			// -------- Invoke --------
 			LOG.debug("Invoking {}", getName());
 
-			// first order of business is to give operators their state
-			initializeState();
-
 			// we need to make sure that any triggers scheduled in open() cannot be
 			// executed before all operators are opened
 			synchronized (lock) {
+
+				// both the following operations are protected by the lock
+				// so that we avoid race conditions in the case that initializeState()
+				// registers a timer, that fires before the open() is called.
+
+				initializeState();
 				openAllOperators();
 			}