Posted to commits@flink.apache.org by tr...@apache.org on 2016/11/22 23:25:53 UTC

[1/3] flink git commit: [FLINK-4910] Introduce safety net for closing file system streams

Repository: flink
Updated Branches:
  refs/heads/master c590912c9 -> 4c23879a5


[FLINK-4910] Introduce safety net for closing file system streams

This closes #2691.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ba8ed263
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ba8ed263
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ba8ed263

Branch: refs/heads/master
Commit: ba8ed263695d16eacb4bdfdf195dd22c83bb53ed
Parents: c590912
Author: Stefan Richter <s....@data-artisans.com>
Authored: Mon Oct 24 17:49:54 2016 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Nov 22 23:16:52 2016 +0100

----------------------------------------------------------------------
 .../common/operators/CollectionExecutor.java    |  31 +--
 .../apache/flink/core/fs/CloseableRegistry.java |  52 +++++
 .../flink/core/fs/ClosingFSDataInputStream.java |  97 ++++++++++
 .../core/fs/ClosingFSDataOutputStream.java      | 102 ++++++++++
 .../flink/core/fs/FSDataInputStreamWrapper.java |  96 +++++++++
 .../core/fs/FSDataOutputStreamWrapper.java      |  76 ++++++++
 .../org/apache/flink/core/fs/FileSystem.java    |  88 ++++++---
 .../core/fs/SafetyNetCloseableRegistry.java     | 181 +++++++++++++++++
 .../core/fs/SafetyNetWrapperFileSystem.java     | 150 ++++++++++++++
 .../flink/core/fs/WrappingProxyCloseable.java   |  30 +++
 .../flink/util/AbstractCloseableRegistry.java   | 114 +++++++++++
 .../java/org/apache/flink/util/IOUtils.java     |  15 +-
 .../org/apache/flink/util/WrappingProxy.java    |  25 +++
 .../apache/flink/util/WrappingProxyUtil.java    |  33 ++++
 .../apache/flink/core/fs/FileSystemTest.java    |  29 +--
 .../core/fs/SafetyNetCloseableRegistryTest.java | 193 +++++++++++++++++++
 .../flink/runtime/filecache/FileCache.java      |  42 ++--
 .../state/AbstractKeyedStateBackend.java        |   5 +-
 .../flink/runtime/state/ClosableRegistry.java   | 108 -----------
 .../state/DefaultOperatorStateBackend.java      |   5 +-
 .../state/StateInitializationContextImpl.java   |  15 +-
 .../StateSnapshotContextSynchronousImpl.java    |   5 +-
 .../state/filesystem/FileStateHandle.java       |  18 +-
 .../apache/flink/runtime/taskmanager/Task.java  |   5 +
 .../streaming/runtime/tasks/StreamTask.java     |   6 +-
 .../StateInitializationContextImplTest.java     |   6 +-
 ...StateSnapshotContextSynchronousImplTest.java |   4 +-
 .../util/AbstractStreamOperatorTestHarness.java |  10 +-
 .../test/checkpointing/RescalingITCase.java     |   1 +
 29 files changed, 1324 insertions(+), 218 deletions(-)
----------------------------------------------------------------------
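
Taken together, the new classes are intended to be used roughly as follows. This is a minimal sketch, not part of this commit; the class name, variable names, and the throw-away temp file are illustrative only.

import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

import java.io.File;

public class SafetyNetUsageSketch {

	public static void main(String[] args) throws Exception {
		// Install the per-task registry at the start of the task's main thread.
		FileSystem.createFileSystemCloseableRegistryForTask();
		try {
			File tmp = File.createTempFile("safety-net", ".tmp");
			Path path = new Path(tmp.toURI());

			// get() now returns a SafetyNetWrapperFileSystem; streams opened through it
			// are registered with the thread's SafetyNetCloseableRegistry.
			FileSystem fs = FileSystem.get(path.toUri());
			FSDataOutputStream out = fs.create(path, true);
			out.write(42);
			// deliberately *not* calling out.close() here
		} finally {
			// Closes the registry and, with it, every stream that was left open.
			FileSystem.disposeFileSystemCloseableRegistryForTask();
		}
	}
}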


http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
index a6fc17e..07f48fc 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
@@ -18,20 +18,6 @@
 
 package org.apache.flink.api.common.operators;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.InvalidProgramException;
@@ -58,6 +44,7 @@ import org.apache.flink.api.common.operators.util.TypeComparable;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.fs.local.LocalFileSystem;
 import org.apache.flink.metrics.MetricGroup;
@@ -65,6 +52,20 @@ import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.types.Value;
 import org.apache.flink.util.Visitor;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
 /**
  * Execution utility for serial, local, collection-based executions of Flink programs.
  */
@@ -571,7 +572,7 @@ public class CollectionExecutor {
 
 		public CompletedFuture(Path entry) {
 			try{
-				LocalFileSystem fs = (LocalFileSystem) entry.getFileSystem();
+				LocalFileSystem fs = (LocalFileSystem) FileSystem.getUnguardedFileSystem(entry.toUri());
 				result = entry.isAbsolute() ? new Path(entry.toUri().getPath()): new Path(fs.getWorkingDirectory(),entry);
 			} catch (Exception e){
 				throw new RuntimeException("DistributedCache supports only local files for Collection Environments");

http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java b/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java
new file mode 100644
index 0000000..81ba7ab
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.util.AbstractCloseableRegistry;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * This class allows registering instances of {@link Closeable}, which are all closed if this registry is closed.
+ * <p>
+ * Registering to an already closed registry will throw an exception and close the provided {@link Closeable}
+ * <p>
+ * All methods in this class are thread-safe.
+ */
+public class CloseableRegistry extends AbstractCloseableRegistry<Closeable, Object> {
+
+	private static final Object DUMMY = new Object();
+
+	public CloseableRegistry() {
+		super(new HashMap<Closeable, Object>());
+	}
+
+	@Override
+	protected void doRegister(Closeable closeable, Map<Closeable, Object> closeableMap) throws IOException {
+		closeableMap.put(closeable, DUMMY);
+	}
+
+	@Override
+	protected void doUnRegister(Closeable closeable, Map<Closeable, Object> closeableMap) {
+		closeableMap.remove(closeable);
+	}
+}
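
As a quick illustration of the registry contract described in the Javadoc above (not part of this commit; any java.io.Closeable works, a ByteArrayInputStream is used here only for convenience):

import org.apache.flink.core.fs.CloseableRegistry;

import java.io.ByteArrayInputStream;
import java.io.InputStream;

public class CloseableRegistrySketch {

	public static void main(String[] args) throws Exception {
		CloseableRegistry registry = new CloseableRegistry();

		InputStream in = new ByteArrayInputStream(new byte[]{1, 2, 3});
		registry.registerClosable(in); // tracked from now on

		// A stream that is closed normally should be unregistered first:
		// registry.unregisterClosable(in); in.close();

		// Closing the registry closes everything that is still registered.
		registry.close();

		// Registering against the now-closed registry would throw an IOException
		// and close the passed Closeable.
	}
}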

http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataInputStream.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataInputStream.java b/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataInputStream.java
new file mode 100644
index 0000000..23ac4f2
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataInputStream.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * This class is a {@link org.apache.flink.util.WrappingProxy} for {@link FSDataInputStream} that is used to
+ * implement a safety net against unclosed streams.
+ * <p>
+ * See {@link SafetyNetCloseableRegistry} for more details on how this is utilized.
+ */
+public class ClosingFSDataInputStream
+		extends FSDataInputStreamWrapper
+		implements WrappingProxyCloseable<FSDataInputStream> {
+
+	private final SafetyNetCloseableRegistry registry;
+	private final String debugInfo;
+
+	private volatile boolean closed;
+
+	private ClosingFSDataInputStream(
+			FSDataInputStream delegate, SafetyNetCloseableRegistry registry, String debugInfo) throws IOException {
+		super(delegate);
+		this.registry = Preconditions.checkNotNull(registry);
+		this.debugInfo = Preconditions.checkNotNull(debugInfo);
+		this.closed = false;
+	}
+
+	public boolean isClosed() {
+		return closed;
+	}
+
+	@Override
+	public void close() throws IOException {
+		if (!closed) {
+			closed = true;
+			registry.unregisterClosable(this);
+			inputStream.close();
+		}
+	}
+
+	@Override
+	public int hashCode() {
+		return inputStream.hashCode();
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+
+		if (this == obj) {
+			return true;
+		}
+
+		if (obj instanceof ClosingFSDataInputStream) {
+			return inputStream.equals(((ClosingFSDataInputStream) obj).inputStream);
+		}
+
+		return false;
+	}
+
+	@Override
+	public String toString() {
+		return "ClosingFSDataInputStream(" + inputStream.toString() + ") : " + debugInfo;
+	}
+
+	public static ClosingFSDataInputStream wrapSafe(
+			FSDataInputStream delegate, SafetyNetCloseableRegistry registry) throws IOException{
+		return wrapSafe(delegate, registry, "");
+	}
+
+	public static ClosingFSDataInputStream wrapSafe(
+			FSDataInputStream delegate, SafetyNetCloseableRegistry registry, String debugInfo) throws IOException{
+
+		ClosingFSDataInputStream inputStream = new ClosingFSDataInputStream(delegate, registry, debugInfo);
+		registry.registerClosable(inputStream);
+		return inputStream;
+	}
+}
\ No newline at end of file
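
For orientation, a minimal sketch of how a raw stream is wrapped and tracked by hand (not part of this commit; the helper name openTracked is made up, the calls are the ones introduced in this class and used by SafetyNetWrapperFileSystem further below):

import org.apache.flink.core.fs.ClosingFSDataInputStream;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.SafetyNetCloseableRegistry;

import java.io.IOException;

public class WrapSafeSketch {

	// Opens a stream and registers it so the registry can close it if user code never does.
	static FSDataInputStream openTracked(Path path, SafetyNetCloseableRegistry registry) throws IOException {
		FileSystem fs = FileSystem.getUnguardedFileSystem(path.toUri());
		FSDataInputStream raw = fs.open(path);
		// wrapSafe(...) registers the wrapper with the registry; a normal close() unregisters it again.
		return ClosingFSDataInputStream.wrapSafe(raw, registry, String.valueOf(path));
	}
}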

http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataOutputStream.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataOutputStream.java b/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataOutputStream.java
new file mode 100644
index 0000000..120ca67
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataOutputStream.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * This class is a {@link org.apache.flink.util.WrappingProxy} for {@link FSDataOutputStream} that is used to
+ * implement a safety net against unclosed streams.
+ * <p>
+ * See {@link SafetyNetCloseableRegistry} for more details on how this is utilized.
+ */
+public class ClosingFSDataOutputStream
+		extends FSDataOutputStreamWrapper
+		implements WrappingProxyCloseable<FSDataOutputStream> {
+
+	private final SafetyNetCloseableRegistry registry;
+	private final String debugString;
+
+	private volatile boolean closed;
+
+	public ClosingFSDataOutputStream(
+			FSDataOutputStream delegate, SafetyNetCloseableRegistry registry) throws IOException {
+		this(delegate, registry, "");
+	}
+
+	private ClosingFSDataOutputStream(
+			FSDataOutputStream delegate, SafetyNetCloseableRegistry registry, String debugString) throws IOException {
+		super(delegate);
+		this.registry = Preconditions.checkNotNull(registry);
+		this.debugString = Preconditions.checkNotNull(debugString);
+		this.closed = false;
+	}
+
+	public boolean isClosed() {
+		return closed;
+	}
+
+	@Override
+	public void close() throws IOException {
+		if (!closed) {
+			closed = true;
+			registry.unregisterClosable(this);
+			outputStream.close();
+		}
+	}
+
+	@Override
+	public int hashCode() {
+		return outputStream.hashCode();
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+
+		if (this == obj) {
+			return true;
+		}
+
+		if (obj instanceof ClosingFSDataOutputStream) {
+			return outputStream.equals(((ClosingFSDataOutputStream) obj).outputStream);
+		}
+
+		return false;
+	}
+
+	@Override
+	public String toString() {
+		return "ClosingFSDataOutputStream(" + outputStream.toString() + ") : " + debugString;
+	}
+
+	public static ClosingFSDataOutputStream wrapSafe(
+			FSDataOutputStream delegate, SafetyNetCloseableRegistry registry) throws IOException {
+		return wrapSafe(delegate, registry, "");
+	}
+
+	public static ClosingFSDataOutputStream wrapSafe(
+			FSDataOutputStream delegate, SafetyNetCloseableRegistry registry, String debugInfo) throws IOException {
+
+		ClosingFSDataOutputStream outputStream = new ClosingFSDataOutputStream(delegate, registry, debugInfo);
+		registry.registerClosable(outputStream);
+		return outputStream;
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-core/src/main/java/org/apache/flink/core/fs/FSDataInputStreamWrapper.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FSDataInputStreamWrapper.java b/flink-core/src/main/java/org/apache/flink/core/fs/FSDataInputStreamWrapper.java
new file mode 100644
index 0000000..507b756
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FSDataInputStreamWrapper.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.WrappingProxy;
+
+import java.io.IOException;
+
+/**
+ * Simple forwarding wrapper around {@link FSDataInputStream}
+ */
+public class FSDataInputStreamWrapper extends FSDataInputStream implements WrappingProxy<FSDataInputStream> {
+
+	protected final FSDataInputStream inputStream;
+
+	public FSDataInputStreamWrapper(FSDataInputStream inputStream) {
+		this.inputStream = Preconditions.checkNotNull(inputStream);
+	}
+
+	@Override
+	public void seek(long desired) throws IOException {
+		inputStream.seek(desired);
+	}
+
+	@Override
+	public long getPos() throws IOException {
+		return inputStream.getPos();
+	}
+
+	@Override
+	public int read() throws IOException {
+		return inputStream.read();
+	}
+
+	@Override
+	public int read(byte[] b) throws IOException {
+		return inputStream.read(b);
+	}
+
+	@Override
+	public int read(byte[] b, int off, int len) throws IOException {
+		return inputStream.read(b, off, len);
+	}
+
+	@Override
+	public long skip(long n) throws IOException {
+		return inputStream.skip(n);
+	}
+
+	@Override
+	public int available() throws IOException {
+		return inputStream.available();
+	}
+
+	@Override
+	public void close() throws IOException {
+		inputStream.close();
+	}
+
+	@Override
+	public void mark(int readlimit) {
+		inputStream.mark(readlimit);
+	}
+
+	@Override
+	public void reset() throws IOException {
+		inputStream.reset();
+	}
+
+	@Override
+	public boolean markSupported() {
+		return inputStream.markSupported();
+	}
+
+	@Override
+	public FSDataInputStream getWrappedDelegate() {
+		return inputStream;
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-core/src/main/java/org/apache/flink/core/fs/FSDataOutputStreamWrapper.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FSDataOutputStreamWrapper.java b/flink-core/src/main/java/org/apache/flink/core/fs/FSDataOutputStreamWrapper.java
new file mode 100644
index 0000000..36ebe10
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FSDataOutputStreamWrapper.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.WrappingProxy;
+
+import java.io.IOException;
+
+/**
+ * Simple forwarding wrapper around {@link FSDataOutputStream}
+ */
+public class FSDataOutputStreamWrapper extends FSDataOutputStream implements WrappingProxy<FSDataOutputStream> {
+
+	protected final FSDataOutputStream outputStream;
+
+	public FSDataOutputStreamWrapper(FSDataOutputStream outputStream) {
+		this.outputStream = Preconditions.checkNotNull(outputStream);
+	}
+
+	@Override
+	public long getPos() throws IOException {
+		return outputStream.getPos();
+	}
+
+	@Override
+	public void flush() throws IOException {
+		outputStream.flush();
+	}
+
+	@Override
+	public void sync() throws IOException {
+		outputStream.sync();
+	}
+
+	@Override
+	public void write(int b) throws IOException {
+		outputStream.write(b);
+	}
+
+	@Override
+	public void write(byte[] b) throws IOException {
+		outputStream.write(b);
+	}
+
+	@Override
+	public void write(byte[] b, int off, int len) throws IOException {
+		outputStream.write(b, off, len);
+	}
+
+	@Override
+	public void close() throws IOException {
+		outputStream.close();
+	}
+
+	@Override
+	public FSDataOutputStream getWrappedDelegate() {
+		return outputStream;
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
index 1844d64..5a608b5 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
@@ -25,6 +25,14 @@
 
 package org.apache.flink.core.fs;
 
+import org.apache.flink.annotation.Public;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.OperatingSystem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -34,11 +42,6 @@ import java.net.URISyntaxException;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.flink.annotation.Public;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.util.OperatingSystem;
-
 /**
  * An abstract base class for a fairly generic file system. It
  * may be implemented as a distributed file system, or as a local
@@ -47,6 +50,8 @@ import org.apache.flink.util.OperatingSystem;
 @Public
 public abstract class FileSystem {
 
+	private static final InheritableThreadLocal<SafetyNetCloseableRegistry> REGISTRIES = new InheritableThreadLocal<>();
+
 	private static final String LOCAL_FILESYSTEM_CLASS = "org.apache.flink.core.fs.local.LocalFileSystem";
 
 	private static final String HADOOP_WRAPPER_FILESYSTEM_CLASS = "org.apache.flink.runtime.fs.hdfs.HadoopFileSystem";
@@ -55,6 +60,39 @@ public abstract class FileSystem {
 
 	private static final String HADOOP_WRAPPER_SCHEME = "hdwrapper";
 
+	private static final Logger LOG = LoggerFactory.getLogger(FileSystem.class);
+
+	/**
+	 * Create a SafetyNetCloseableRegistry for a Task. This method should be called at the beginning of the task's
+	 * main thread.
+	 */
+	public static void createFileSystemCloseableRegistryForTask() {
+		SafetyNetCloseableRegistry oldRegistry = REGISTRIES.get();
+		if (null != oldRegistry) {
+			IOUtils.closeQuietly(oldRegistry);
+			LOG.warn("Found existing SafetyNetCloseableRegistry. Closed and replaced it.");
+		}
+		SafetyNetCloseableRegistry newRegistry = new SafetyNetCloseableRegistry();
+		REGISTRIES.set(newRegistry);
+	}
+
+	/**
+	 * Disposes the SafetyNetCloseableRegistry for a Task. This method should be called at the end of the task's
+	 * main thread or when the task should be canceled.
+	 */
+	public static void disposeFileSystemCloseableRegistryForTask() {
+		SafetyNetCloseableRegistry registry = REGISTRIES.get();
+		if (null != registry) {
+			LOG.info("Ensuring all FileSystem streams are closed");
+			REGISTRIES.remove();
+			IOUtils.closeQuietly(registry);
+		}
+	}
+
+	private static FileSystem wrapWithSafetyNetWhenInTask(FileSystem fs) {
+		SafetyNetCloseableRegistry reg = REGISTRIES.get();
+		return reg != null ? new SafetyNetWrapperFileSystem(fs, reg) : fs;
+	}
 
 	/** Object used to protect calls to specific methods.*/
 	private static final Object SYNCHRONIZATION_OBJECT = new Object();
@@ -63,7 +101,7 @@ public abstract class FileSystem {
 	 * Enumeration for write modes.
 	 *
 	 */
-	public static enum WriteMode {
+	public enum WriteMode {
 
 		/** Creates write path if it does not exist. Does not overwrite existing files and directories. */
 		NO_OVERWRITE,
@@ -214,18 +252,7 @@ public abstract class FileSystem {
 		}
 	}
 
-	/**
-	 * Returns a reference to the {@link FileSystem} instance for accessing the
-	 * file system identified by the given {@link URI}.
-	 *
-	 * @param uri
-	 *        the {@link URI} identifying the file system
-	 * @return a reference to the {@link FileSystem} instance for accessing the file system identified by the given
-	 *         {@link URI}.
-	 * @throws IOException
-	 *         thrown if a reference to the file system instance could not be obtained
-	 */
-	public static FileSystem get(URI uri) throws IOException {
+	public static FileSystem getUnguardedFileSystem(URI uri) throws IOException {
 		FileSystem fs;
 
 		URI asked = uri;
@@ -238,13 +265,13 @@ public abstract class FileSystem {
 					}
 
 					uri = new URI(defaultScheme.getScheme(), null, defaultScheme.getHost(),
-						defaultScheme.getPort(), uri.getPath(), null, null);
+							defaultScheme.getPort(), uri.getPath(), null, null);
 
 				} catch (URISyntaxException e) {
 					try {
 						if (defaultScheme.getScheme().equals("file")) {
 							uri = new URI("file", null,
-								new Path(new File(uri.getPath()).getAbsolutePath()).toUri().getPath(), null);
+									new Path(new File(uri.getPath()).getAbsolutePath()).toUri().getPath(), null);
 						}
 					} catch (URISyntaxException ex) {
 						// we tried to repair it, but could not. report the scheme error
@@ -255,8 +282,8 @@ public abstract class FileSystem {
 
 			if(uri.getScheme() == null) {
 				throw new IOException("The URI '" + uri + "' is invalid.\n" +
-					"The fs.default-scheme = " + defaultScheme + ", the requested URI = " + asked +
-					", and the final URI = " + uri + ".");
+						"The fs.default-scheme = " + defaultScheme + ", the requested URI = " + asked +
+						", and the final URI = " + uri + ".");
 			}
 
 			if (uri.getScheme().equals("file") && uri.getAuthority() != null && !uri.getAuthority().isEmpty()) {
@@ -294,7 +321,7 @@ public abstract class FileSystem {
 				} else {
 					// we can not read from this file system.
 					throw new IOException("No file system found with scheme " + uri.getScheme()
-						+ ", referenced in file URI '" + uri.toString() + "'.");
+							+ ", referenced in file URI '" + uri.toString() + "'.");
 				}
 			} else {
 				// we end up here if we have a file system with build-in flink support.
@@ -316,6 +343,21 @@ public abstract class FileSystem {
 	}
 
 	/**
+	 * Returns a reference to the {@link FileSystem} instance for accessing the
+	 * file system identified by the given {@link URI}.
+	 *
+	 * @param uri
+	 *        the {@link URI} identifying the file system
+	 * @return a reference to the {@link FileSystem} instance for accessing the file system identified by the given
+	 *         {@link URI}.
+	 * @throws IOException
+	 *         thrown if a reference to the file system instance could not be obtained
+	 */
+	public static FileSystem get(URI uri) throws IOException {
+		return wrapWithSafetyNetWhenInTask(getUnguardedFileSystem(uri));
+	}
+
+	/**
 	 * Returns a boolean indicating whether a scheme has built-in Flink support.
 	 *
 	 * @param scheme
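
The net effect of splitting get() and getUnguardedFileSystem() can be sketched as follows (not part of this commit; the printed values assume a local file URI and no pre-existing registry on the calling thread):

import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.SafetyNetWrapperFileSystem;

import java.net.URI;

public class GuardedVsUnguardedSketch {

	public static void main(String[] args) throws Exception {
		URI uri = new URI("file:///tmp");

		// Without a registry on this thread, get() returns the raw file system.
		System.out.println(FileSystem.get(uri) instanceof SafetyNetWrapperFileSystem); // false

		FileSystem.createFileSystemCloseableRegistryForTask();
		try {
			// Inside a task, get() transparently wraps, while getUnguardedFileSystem() never does.
			System.out.println(FileSystem.get(uri) instanceof SafetyNetWrapperFileSystem); // true
			System.out.println(FileSystem.getUnguardedFileSystem(uri) instanceof SafetyNetWrapperFileSystem); // false
		} finally {
			FileSystem.disposeFileSystemCloseableRegistryForTask();
		}
	}
}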

http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java
new file mode 100644
index 0000000..de4fb30
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.util.AbstractCloseableRegistry;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.WrappingProxyUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.ref.PhantomReference;
+import java.lang.ref.ReferenceQueue;
+import java.util.IdentityHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This implementation of an {@link AbstractCloseableRegistry} registers {@link WrappingProxyCloseable}. When
+ * the proxy becomes subject to GC, this registry takes care of closing unclosed {@link Closeable}s.
+ * <p>
+ * Phantom references are used to track when {@link org.apache.flink.util.WrappingProxy}s of {@link Closeable} got
+ * GC'ed. We ensure that the wrapped {@link Closeable} is properly closed to avoid resource leaks.
+ * <p>
+ * Other than that, it works like a normal {@link CloseableRegistry}.
+ * <p>
+ * All methods in this class are thread-safe.
+ */
+public class SafetyNetCloseableRegistry extends
+		AbstractCloseableRegistry<WrappingProxyCloseable<? extends Closeable>,
+				SafetyNetCloseableRegistry.PhantomDelegatingCloseableRef> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(SafetyNetCloseableRegistry.class);
+	private final ReferenceQueue<WrappingProxyCloseable<? extends Closeable>> referenceQueue;
+	private final Thread reaperThread;
+
+	public SafetyNetCloseableRegistry() {
+		super(new IdentityHashMap<Closeable, PhantomDelegatingCloseableRef>());
+		this.referenceQueue = new ReferenceQueue<>();
+		this.reaperThread = new CloseableReaperThread();
+		reaperThread.start();
+	}
+
+	@Override
+	protected void doRegister(
+			WrappingProxyCloseable<? extends Closeable> wrappingProxyCloseable,
+			Map<Closeable, PhantomDelegatingCloseableRef> closeableMap) throws IOException {
+
+		Closeable innerCloseable = WrappingProxyUtil.stripProxy(wrappingProxyCloseable.getWrappedDelegate());
+
+		if (null == innerCloseable) {
+			return;
+		}
+
+		PhantomDelegatingCloseableRef phantomRef =
+				new PhantomDelegatingCloseableRef(wrappingProxyCloseable, referenceQueue);
+
+		closeableMap.put(innerCloseable, phantomRef);
+	}
+
+	@Override
+	protected void doUnRegister(
+			WrappingProxyCloseable<? extends Closeable> closeable,
+			Map<Closeable, PhantomDelegatingCloseableRef> closeableMap) {
+
+		Closeable innerCloseable = WrappingProxyUtil.stripProxy(closeable.getWrappedDelegate());
+
+		if (null == innerCloseable) {
+			return;
+		}
+
+		closeableMap.remove(innerCloseable);
+	}
+
+	/**
+	 * Phantom reference to {@link WrappingProxyCloseable}.
+	 */
+	static final class PhantomDelegatingCloseableRef
+			extends PhantomReference<WrappingProxyCloseable<? extends Closeable>>
+			implements Closeable {
+
+		private final Closeable innerCloseable;
+		private final String debugString;
+
+		public PhantomDelegatingCloseableRef(
+				WrappingProxyCloseable<? extends Closeable> referent,
+				ReferenceQueue<? super WrappingProxyCloseable<? extends Closeable>> q) {
+
+			super(referent, q);
+			this.innerCloseable = Preconditions.checkNotNull(WrappingProxyUtil.stripProxy(referent));
+			this.debugString = referent.toString();
+		}
+
+		public Closeable getInnerCloseable() {
+			return innerCloseable;
+		}
+
+		public String getDebugString() {
+			return debugString;
+		}
+
+		@Override
+		public void close() throws IOException {
+			innerCloseable.close();
+		}
+	}
+
+	/**
+	 * Reaper thread that collects and closes leaking resources.
+	 */
+	final class CloseableReaperThread extends Thread {
+
+		public CloseableReaperThread() {
+			super("CloseableReaperThread");
+			this.running = false;
+		}
+
+		private volatile boolean running;
+
+		@Override
+		public void run() {
+			this.running = true;
+			try {
+				List<PhantomDelegatingCloseableRef> closeableList = new LinkedList<>();
+				while (running) {
+					PhantomDelegatingCloseableRef oldRef = (PhantomDelegatingCloseableRef) referenceQueue.remove();
+					synchronized (getSynchronizationLock()) {
+						do {
+							closeableList.add(oldRef);
+							closeableToRef.remove(oldRef.getInnerCloseable());
+						}
+						while ((oldRef = (PhantomDelegatingCloseableRef) referenceQueue.poll()) != null);
+					}
+
+					// close outside the synchronized block in case this is blocking
+					for (PhantomDelegatingCloseableRef closeableRef : closeableList) {
+						IOUtils.closeQuietly(closeableRef);
+						if (LOG.isDebugEnabled()) {
+							LOG.debug("Closing unclosed resource: " + closeableRef.getDebugString());
+						}
+					}
+
+					closeableList.clear();
+				}
+			} catch (InterruptedException e) {
+				// done
+			}
+		}
+
+		@Override
+		public void interrupt() {
+			this.running = false;
+			super.interrupt();
+		}
+	}
+
+	@Override
+	public void close() throws IOException {
+		super.close();
+		reaperThread.interrupt();
+	}
+}
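
The reaper builds on standard java.lang.ref machinery: a phantom reference is enqueued once its referent (the wrapping proxy) has been collected, at which point the registry closes the inner Closeable held by the reference. A self-contained sketch of that underlying pattern (plain JDK code, not Flink code):

import java.lang.ref.PhantomReference;
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;

public class PhantomReaperSketch {

	public static void main(String[] args) throws Exception {
		ReferenceQueue<Object> queue = new ReferenceQueue<>();

		Object proxy = new Object();
		// The phantom reference itself must stay strongly reachable; in the registry
		// that job is done by the closeableToRef map.
		PhantomReference<Object> ref = new PhantomReference<>(proxy, queue);

		proxy = null;   // drop the last strong reference to the "proxy"
		System.gc();    // only a hint; collection timing is not guaranteed

		// The reaper thread blocks on remove(); once the reference is enqueued, the
		// registry closes the inner Closeable that the reference keeps a handle on.
		Reference<?> enqueued = queue.remove(1000);
		System.out.println("reference enqueued: " + (enqueued == ref));
	}
}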

http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java
new file mode 100644
index 0000000..bf30b4f
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.WrappingProxy;
+
+import java.io.IOException;
+import java.net.URI;
+
+/**
+ * This is a {@link WrappingProxy} around {@link FileSystem} which (i) wraps all opened streams as
+ * {@link ClosingFSDataInputStream} or {@link ClosingFSDataOutputStream} and (ii) registers them to
+ * a {@link SafetyNetCloseableRegistry}.
+ *
+ * Streams obtained by this are therefore managed by the {@link SafetyNetCloseableRegistry} to prevent resource leaks
+ * from unclosed streams.
+ */
+public class SafetyNetWrapperFileSystem extends FileSystem implements WrappingProxy<FileSystem> {
+
+	private final SafetyNetCloseableRegistry registry;
+	private final FileSystem unsafeFileSystem;
+
+	public SafetyNetWrapperFileSystem(FileSystem unsafeFileSystem, SafetyNetCloseableRegistry registry) {
+		this.registry = Preconditions.checkNotNull(registry);
+		this.unsafeFileSystem = Preconditions.checkNotNull(unsafeFileSystem);
+	}
+
+	@Override
+	public Path getWorkingDirectory() {
+		return unsafeFileSystem.getWorkingDirectory();
+	}
+
+	@Override
+	public Path getHomeDirectory() {
+		return unsafeFileSystem.getHomeDirectory();
+	}
+
+	@Override
+	public URI getUri() {
+		return unsafeFileSystem.getUri();
+	}
+
+	@Override
+	public void initialize(URI name) throws IOException {
+		unsafeFileSystem.initialize(name);
+	}
+
+	@Override
+	public FileStatus getFileStatus(Path f) throws IOException {
+		return unsafeFileSystem.getFileStatus(f);
+	}
+
+	@Override
+	public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException {
+		return unsafeFileSystem.getFileBlockLocations(file, start, len);
+	}
+
+	@Override
+	public FSDataInputStream open(Path f, int bufferSize) throws IOException {
+		FSDataInputStream innerStream = unsafeFileSystem.open(f, bufferSize);
+		return ClosingFSDataInputStream.wrapSafe(innerStream, registry, String.valueOf(f));
+	}
+
+	@Override
+	public FSDataInputStream open(Path f) throws IOException {
+		FSDataInputStream innerStream = unsafeFileSystem.open(f);
+		return ClosingFSDataInputStream.wrapSafe(innerStream, registry, String.valueOf(f));
+	}
+
+	@Override
+	public long getDefaultBlockSize() {
+		return unsafeFileSystem.getDefaultBlockSize();
+	}
+
+	@Override
+	public FileStatus[] listStatus(Path f) throws IOException {
+		return unsafeFileSystem.listStatus(f);
+	}
+
+	@Override
+	public boolean exists(Path f) throws IOException {
+		return unsafeFileSystem.exists(f);
+	}
+
+	@Override
+	public boolean delete(Path f, boolean recursive) throws IOException {
+		return unsafeFileSystem.delete(f, recursive);
+	}
+
+	@Override
+	public boolean mkdirs(Path f) throws IOException {
+		return unsafeFileSystem.mkdirs(f);
+	}
+
+	@Override
+	public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, short replication, long blockSize)
+			throws IOException {
+
+		FSDataOutputStream innerStream = unsafeFileSystem.create(f, overwrite, bufferSize, replication, blockSize);
+		return ClosingFSDataOutputStream.wrapSafe(innerStream, registry, String.valueOf(f));
+	}
+
+	@Override
+	public FSDataOutputStream create(Path f, boolean overwrite) throws IOException {
+		FSDataOutputStream innerStream = unsafeFileSystem.create(f, overwrite);
+		return ClosingFSDataOutputStream.wrapSafe(innerStream, registry, String.valueOf(f));
+	}
+
+	@Override
+	public boolean rename(Path src, Path dst) throws IOException {
+		return unsafeFileSystem.rename(src, dst);
+	}
+
+	@Override
+	public boolean initOutPathLocalFS(Path outPath, WriteMode writeMode, boolean createDirectory) throws IOException {
+		return unsafeFileSystem.initOutPathLocalFS(outPath, writeMode, createDirectory);
+	}
+
+	@Override
+	public boolean initOutPathDistFS(Path outPath, WriteMode writeMode, boolean createDirectory) throws IOException {
+		return unsafeFileSystem.initOutPathDistFS(outPath, writeMode, createDirectory);
+	}
+
+	@Override
+	public boolean isDistributedFS() {
+		return unsafeFileSystem.isDistributedFS();
+	}
+
+	@Override
+	public FileSystem getWrappedDelegate() {
+		return unsafeFileSystem;
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-core/src/main/java/org/apache/flink/core/fs/WrappingProxyCloseable.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/WrappingProxyCloseable.java b/flink-core/src/main/java/org/apache/flink/core/fs/WrappingProxyCloseable.java
new file mode 100644
index 0000000..b74fc78
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/WrappingProxyCloseable.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.util.WrappingProxy;
+
+import java.io.Closeable;
+
+/**
+ * {@link WrappingProxy} for {@link Closeable} that is also closeable.
+ */
+public interface WrappingProxyCloseable<T extends Closeable> extends Closeable, WrappingProxy<T> {
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java b/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
new file mode 100644
index 0000000..7c0291c
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * This is the abstract base class for registries that allow registering instances of {@link Closeable}, which are all
+ * closed if this registry is closed.
+ * <p>
+ * Registering to an already closed registry will throw an exception and close the provided {@link Closeable}
+ * <p>
+ * All methods in this class are thread-safe.
+ *
+ * @param <C> Type of the closeable this registers
+ * @param <T> Type for potential meta data associated with the registering closeables
+ */
+public abstract class AbstractCloseableRegistry<C extends Closeable, T> implements Closeable {
+
+	protected final Map<Closeable, T> closeableToRef;
+	private boolean closed;
+
+	public AbstractCloseableRegistry(Map<Closeable, T> closeableToRef) {
+		this.closeableToRef = closeableToRef;
+		this.closed = false;
+	}
+
+	/**
+	 * Registers a {@link Closeable} with the registry. In case the registry is already closed, this method throws an
+	 * {@link IllegalStateException} and closes the passed {@link Closeable}.
+	 *
+	 * @param closeable Closeable to register
+	 * @return true if the Closeable was newly added to the registry
+	 * @throws IOException exception when the registry was closed before
+	 */
+	public final void registerClosable(C closeable) throws IOException {
+
+		if (null == closeable) {
+			return;
+		}
+
+		synchronized (getSynchronizationLock()) {
+			if (closed) {
+				IOUtils.closeQuietly(closeable);
+				throw new IOException("Cannot register Closeable, registry is already closed. Closing argument.");
+			}
+
+			doRegister(closeable, closeableToRef);
+		}
+	}
+
+	/**
+	 * Removes a {@link Closeable} from the registry.
+	 *
+	 * @param closeable instance to remove from the registry.
+	 * @return true, if the instance was actually registered and now removed
+	 */
+	public final void unregisterClosable(C closeable) {
+
+		if (null == closeable) {
+			return;
+		}
+
+		synchronized (getSynchronizationLock()) {
+			doUnRegister(closeable, closeableToRef);
+		}
+	}
+
+	@Override
+	public void close() throws IOException {
+		synchronized (getSynchronizationLock()) {
+
+			for (Closeable closeable : closeableToRef.keySet()) {
+				IOUtils.closeQuietly(closeable);
+			}
+
+			closeableToRef.clear();
+
+			closed = true;
+		}
+	}
+
+	public boolean isClosed() {
+		synchronized (getSynchronizationLock()) {
+			return closed;
+		}
+	}
+
+	protected final Object getSynchronizationLock() {
+		return closeableToRef;
+	}
+
+	protected abstract void doUnRegister(C closeable, Map<Closeable, T> closeableMap);
+
+	protected abstract void doRegister(C closeable, Map<Closeable, T> closeableMap) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-core/src/main/java/org/apache/flink/util/IOUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/IOUtils.java b/flink-core/src/main/java/org/apache/flink/util/IOUtils.java
index 12d70ce..9810271 100644
--- a/flink-core/src/main/java/org/apache/flink/util/IOUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/IOUtils.java
@@ -18,14 +18,15 @@
 
 package org.apache.flink.util;
 
+import org.slf4j.Logger;
+
+import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.PrintStream;
 import java.net.Socket;
 
-import org.slf4j.Logger;
-
 /**
  * An utility class for I/O related functionality.
  * 
@@ -213,6 +214,16 @@ public final class IOUtils {
 			}
 		}
 	}
+
+	public static void closeQuietly(Closeable closeable) {
+		try {
+			if (closeable != null) {
+				closeable.close();
+			}
+		} catch (IOException ignored) {
+
+		}
+	}
 	
 	// ------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-core/src/main/java/org/apache/flink/util/WrappingProxy.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/WrappingProxy.java b/flink-core/src/main/java/org/apache/flink/util/WrappingProxy.java
new file mode 100644
index 0000000..82fcf04
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/WrappingProxy.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+public interface WrappingProxy<T> {
+
+	T getWrappedDelegate();
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-core/src/main/java/org/apache/flink/util/WrappingProxyUtil.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/WrappingProxyUtil.java b/flink-core/src/main/java/org/apache/flink/util/WrappingProxyUtil.java
new file mode 100644
index 0000000..0f62abd
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/WrappingProxyUtil.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+public final class WrappingProxyUtil {
+
+	private WrappingProxyUtil() {
+		throw new AssertionError();
+	}
+
+	public static <T> T stripProxy(T object) {
+		while (object instanceof WrappingProxy) {
+			object = ((WrappingProxy<T>) object).getWrappedDelegate();
+		}
+		return object;
+	}
+}
\ No newline at end of file
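
A small sketch of what stripProxy does (not part of this commit; StringProxy is a made-up example proxy used only for illustration):

import org.apache.flink.util.WrappingProxy;
import org.apache.flink.util.WrappingProxyUtil;

public class StripProxySketch {

	// A hypothetical proxy wrapping a String.
	static final class StringProxy implements WrappingProxy<String> {
		private final String delegate;

		StringProxy(String delegate) {
			this.delegate = delegate;
		}

		@Override
		public String getWrappedDelegate() {
			return delegate;
		}
	}

	public static void main(String[] args) {
		Object wrapped = new StringProxy("payload");
		// stripProxy peels off arbitrarily deep WrappingProxy layers and
		// returns the innermost, non-proxy object.
		Object inner = WrappingProxyUtil.stripProxy(wrapped);
		System.out.println(inner); // prints "payload"
	}
}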

http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTest.java
index 04ebc0e..1bde2fb 100644
--- a/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTest.java
@@ -17,32 +17,37 @@
  */
 package org.apache.flink.core.fs;
 
+import org.apache.flink.core.fs.local.LocalFileSystem;
+import org.apache.flink.util.WrappingProxyUtil;
+import org.junit.Test;
+
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
-import org.apache.flink.core.fs.local.LocalFileSystem;
-import org.junit.Test;
-import static org.junit.Assert.*;
+
+import static org.junit.Assert.assertTrue;
 
 public class FileSystemTest {
+
 	@Test
 	public void testGet() throws URISyntaxException, IOException {
 		String scheme = "file";
-		
-		assertTrue(FileSystem.get(new URI(scheme + ":///test/test")) instanceof LocalFileSystem);
-		
+
+		assertTrue(WrappingProxyUtil.stripProxy(FileSystem.get(new URI(scheme + ":///test/test"))) instanceof LocalFileSystem);
+
 		try {
 			FileSystem.get(new URI(scheme + "://test/test"));
 		} catch (IOException ioe) {
 			assertTrue(ioe.getMessage().startsWith("Found local file path with authority '"));
 		}
 
-		assertTrue(FileSystem.get(new URI(scheme + ":/test/test")) instanceof LocalFileSystem);
-		
-		assertTrue(FileSystem.get(new URI(scheme + ":test/test")) instanceof LocalFileSystem);
+		assertTrue(WrappingProxyUtil.stripProxy(FileSystem.get(new URI(scheme + ":/test/test"))) instanceof LocalFileSystem);
 
-		assertTrue(FileSystem.get(new URI("/test/test")) instanceof LocalFileSystem);
-		
-		assertTrue(FileSystem.get(new URI("test/test")) instanceof LocalFileSystem);
+		assertTrue(WrappingProxyUtil.stripProxy(FileSystem.get(new URI(scheme + ":test/test"))) instanceof LocalFileSystem);
+
+		assertTrue(WrappingProxyUtil.stripProxy(FileSystem.get(new URI("/test/test"))) instanceof LocalFileSystem);
+
+		assertTrue(WrappingProxyUtil.stripProxy(FileSystem.get(new URI("test/test"))) instanceof LocalFileSystem);
 	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java
new file mode 100644
index 0000000..6628407
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class SafetyNetCloseableRegistryTest {
+
+	private ProducerThread[] streamOpenThreads;
+	private SafetyNetCloseableRegistry closeableRegistry;
+	private AtomicInteger unclosedCounter;
+
+	@Before
+	public void setup() {
+		this.closeableRegistry = new SafetyNetCloseableRegistry();
+		this.unclosedCounter = new AtomicInteger(0);
+		this.streamOpenThreads = new ProducerThread[10];
+		for (int i = 0; i < streamOpenThreads.length; ++i) {
+			streamOpenThreads[i] = new ProducerThread(closeableRegistry, unclosedCounter, Integer.MAX_VALUE);
+		}
+	}
+
+	private void startThreads(int maxStreams) {
+		for (ProducerThread t : streamOpenThreads) {
+			t.setMaxStreams(maxStreams);
+			t.start();
+		}
+	}
+
+	private void joinThreads() throws InterruptedException {
+		for (Thread t : streamOpenThreads) {
+			t.join();
+		}
+	}
+
+	@Test
+	public void testClose() throws Exception {
+
+		startThreads(Integer.MAX_VALUE);
+
+		for (int i = 0; i < 5; ++i) {
+			System.gc();
+			Thread.sleep(40);
+		}
+
+		closeableRegistry.close();
+
+		joinThreads();
+
+		Assert.assertEquals(0, unclosedCounter.get());
+
+		try {
+
+			WrappingProxyCloseable<Closeable> testCloseable = new WrappingProxyCloseable<Closeable>() {
+				@Override
+				public Closeable getWrappedDelegate() {
+					return this;
+				}
+
+				@Override
+				public void close() throws IOException {
+					unclosedCounter.incrementAndGet();
+				}
+			};
+
+			closeableRegistry.registerClosable(testCloseable);
+
+			Assert.fail("Closed registry should not accept closeables!");
+
+		} catch (IOException expected) {
+			//expected
+		}
+
+		Assert.assertEquals(1, unclosedCounter.get());
+	}
+
+	@Test
+	public void testSafetyNetClose() throws Exception {
+
+		startThreads(20);
+
+		joinThreads();
+
+		for (int i = 0; i < 5 && unclosedCounter.get() > 0; ++i) {
+			System.gc();
+			Thread.sleep(50);
+		}
+
+		Assert.assertEquals(0, unclosedCounter.get());
+		closeableRegistry.close();
+	}
+
+	private static final class ProducerThread extends Thread {
+
+		private final SafetyNetCloseableRegistry registry;
+		private final AtomicInteger refCount;
+		private int maxStreams;
+
+		public ProducerThread(SafetyNetCloseableRegistry registry, AtomicInteger refCount, int maxStreams) {
+			this.registry = registry;
+			this.refCount = refCount;
+			this.maxStreams = maxStreams;
+		}
+
+		public int getMaxStreams() {
+			return maxStreams;
+		}
+
+		public void setMaxStreams(int maxStreams) {
+			this.maxStreams = maxStreams;
+		}
+
+		@Override
+		public void run() {
+			try {
+				int count = 0;
+				while (maxStreams > 0) {
+					String debug = Thread.currentThread().getName() + " " + count;
+					TestStream testStream = new TestStream(refCount);
+					refCount.incrementAndGet();
+					ClosingFSDataInputStream pis = ClosingFSDataInputStream.wrapSafe(testStream, registry, debug); //reference dies here
+
+					try {
+						Thread.sleep(2);
+					} catch (InterruptedException e) {
+
+					}
+
+					if (maxStreams != Integer.MAX_VALUE) {
+						--maxStreams;
+					}
+					++count;
+				}
+			} catch (Exception ex) {
+
+			}
+		}
+	}
+
+	private static final class TestStream extends FSDataInputStream {
+
+		private AtomicInteger refCount;
+
+		public TestStream(AtomicInteger refCount) {
+			this.refCount = refCount;
+		}
+
+		@Override
+		public void seek(long desired) throws IOException {
+
+		}
+
+		@Override
+		public long getPos() throws IOException {
+			return 0;
+		}
+
+		@Override
+		public int read() throws IOException {
+			return 0;
+		}
+
+		@Override
+		public void close() throws IOException {
+			if (refCount != null) {
+				refCount.decrementAndGet();
+				refCount = null;
+			}
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
index b5bdcaf..d79be05 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
@@ -18,19 +18,8 @@
 
 package org.apache.flink.runtime.filecache;
 
-import java.io.File;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.FutureTask;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
 import org.apache.commons.io.FileUtils;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry;
 import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.configuration.ConfigConstants;
@@ -40,13 +29,23 @@ import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.util.IOUtils;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
 /**
  * The FileCache is used to create the local files for the registered cache files when a task is deployed.
  * The files will be removed when the task is unregistered after a 5 second delay.
@@ -236,8 +235,10 @@ public class FileCache {
 	// ------------------------------------------------------------------------
 
 	public static void copy(Path sourcePath, Path targetPath, boolean executable) throws IOException {
-		FileSystem sFS = sourcePath.getFileSystem();
-		FileSystem tFS = targetPath.getFileSystem();
+		// TODO rewrite this to make it participate in the closable registry and the lifecycle of a task.
+		// we unwrap the file system to get raw streams without safety net
+		FileSystem sFS = FileSystem.getUnguardedFileSystem(sourcePath.toUri());
+		FileSystem tFS = FileSystem.getUnguardedFileSystem(targetPath.toUri());
 		if (!tFS.exists(targetPath)) {
 			if (sFS.getFileStatus(sourcePath).isDir()) {
 				tFS.mkdirs(targetPath);
@@ -253,16 +254,11 @@ public class FileCache {
 					copy(content.getPath(), new Path(localPath), executable);
 				}
 			} else {
-				try {
-					FSDataOutputStream lfsOutput = tFS.create(targetPath, false);
-					FSDataInputStream fsInput = sFS.open(sourcePath);
+				try (FSDataOutputStream lfsOutput = tFS.create(targetPath, false); FSDataInputStream fsInput = sFS.open(sourcePath)) {
 					IOUtils.copyBytes(fsInput, lfsOutput);
 					//noinspection ResultOfMethodCallIgnored
 					new File(targetPath.toString()).setExecutable(executable);
-					// closing the FSDataOutputStream
-					lfsOutput.close();
-				}
-				catch (IOException ioe) {
+				} catch (IOException ioe) {
 					LOG.error("could not copy file to local file cache.", ioe);
 				}
 			}

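For illustration only, a minimal sketch of the distinction the rewritten FileCache.copy relies on: FileSystem.getUnguardedFileSystem(...) is assumed to always return the raw file system, so streams opened through it are not tracked by any task safety net and must be closed by the caller (here via try-with-resources). The class and method names below are hypothetical.

import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

public final class UnguardedReadSketch {

	// Illustrative only: read through the raw (unguarded) file system,
	// so the stream must be closed explicitly by this code.
	public static void readUnguarded(Path source) throws Exception {
		FileSystem rawFs = FileSystem.getUnguardedFileSystem(source.toUri());
		try (FSDataInputStream in = rawFs.open(source)) {
			in.read();
		}
	}
}
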
http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index e5d9b2b..ae71c7f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -33,6 +33,7 @@ import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.util.Preconditions;
 
@@ -80,7 +81,7 @@ public abstract class AbstractKeyedStateBackend<K>
 	protected final TaskKvStateRegistry kvStateRegistry;
 
 	/** Registry for all opened streams, so they can be closed if the task using this backend is closed */
-	protected ClosableRegistry cancelStreamRegistry;
+	protected CloseableRegistry cancelStreamRegistry;
 
 	protected final ClassLoader userCodeClassLoader;
 
@@ -96,7 +97,7 @@ public abstract class AbstractKeyedStateBackend<K>
 		this.numberOfKeyGroups = Preconditions.checkNotNull(numberOfKeyGroups);
 		this.userCodeClassLoader = Preconditions.checkNotNull(userCodeClassLoader);
 		this.keyGroupRange = Preconditions.checkNotNull(keyGroupRange);
-		this.cancelStreamRegistry = new ClosableRegistry();
+		this.cancelStreamRegistry = new CloseableRegistry();
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-runtime/src/main/java/org/apache/flink/runtime/state/ClosableRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ClosableRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ClosableRegistry.java
deleted file mode 100644
index b5f7dad..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ClosableRegistry.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-import org.apache.commons.io.IOUtils;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.Set;
-
-/**
- * This class allows to register instances of {@link Closeable}, which are all closed if this registry is closed.
- * <p>
- * Registering to an already closed registry will throw an exception and close the provided {@link Closeable}
- * <p>
- * All methods in this class are thread-safe.
- */
-public class ClosableRegistry implements Closeable {
-
-	private final Set<Closeable> registeredCloseables;
-	private boolean closed;
-
-	public ClosableRegistry() {
-		this.registeredCloseables = new HashSet<>();
-		this.closed = false;
-	}
-
-	/**
-	 * Registers a {@link Closeable} with the registry. In case the registry is already closed, this method throws an
-	 * {@link IllegalStateException} and closes the passed {@link Closeable}.
-	 *
-	 * @param closeable Closable tor register
-	 * @return true if the the Closable was newly added to the registry
-	 * @throws IOException exception when the registry was closed before
-	 */
-	public boolean registerClosable(Closeable closeable) throws IOException {
-
-		if (null == closeable) {
-			return false;
-		}
-
-		synchronized (getSynchronizationLock()) {
-			if (closed) {
-				IOUtils.closeQuietly(closeable);
-				throw new IOException("Cannot register Closable, registry is already closed. Closed passed closable.");
-			}
-
-			return registeredCloseables.add(closeable);
-		}
-	}
-
-	/**
-	 * Removes a {@link Closeable} from the registry.
-	 *
-	 * @param closeable instance to remove from the registry.
-	 * @return true, if the instance was actually registered and now removed
-	 */
-	public boolean unregisterClosable(Closeable closeable) {
-
-		if (null == closeable) {
-			return false;
-		}
-
-		synchronized (getSynchronizationLock()) {
-			return registeredCloseables.remove(closeable);
-		}
-	}
-
-	@Override
-	public void close() throws IOException {
-		synchronized (getSynchronizationLock()) {
-
-			for (Closeable closeable : registeredCloseables) {
-				IOUtils.closeQuietly(closeable);
-			}
-
-			registeredCloseables.clear();
-			closed = true;
-		}
-	}
-
-	public boolean isClosed() {
-		synchronized (getSynchronizationLock()) {
-			return closed;
-		}
-	}
-
-	private Object getSynchronizationLock() {
-		return registeredCloseables;
-	}
-}

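The class removed above is superseded by org.apache.flink.core.fs.CloseableRegistry, as the renames in the following hunks show. For illustration, a minimal sketch of the register/close contract, assuming the replacement keeps the registerClosable/unregisterClosable method names used elsewhere in this commit; the helper class below is hypothetical.

import java.io.Closeable;
import java.io.IOException;

import org.apache.flink.core.fs.CloseableRegistry;

public final class CancelRegistrySketch {

	// Illustrative only: tie a stream's lifetime to the registry so that closing
	// the registry (e.g. on task cancellation) also closes the stream.
	public static void useUnderRegistry(CloseableRegistry registry, Closeable stream) throws IOException {
		registry.registerClosable(stream);
		try {
			// ... work with the stream ...
		} finally {
			registry.unregisterClosable(stream);
			stream.close();
		}
	}
}
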
http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index 2f5d3cb..5b47362 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.state;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.memory.DataInputView;
@@ -49,7 +50,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 	
 	private final Map<String, PartitionableListState<?>> registeredStates;
 	private final Collection<OperatorStateHandle> restoreSnapshots;
-	private final ClosableRegistry closeStreamOnCancelRegistry;
+	private final CloseableRegistry closeStreamOnCancelRegistry;
 	private final JavaSerializer<Serializable> javaSerializer;
 
 	/**
@@ -65,7 +66,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 		this.javaSerializer = new JavaSerializer<>(userClassLoader);
 		this.restoreSnapshots = restoreSnapshots;
 		this.registeredStates = new HashMap<>();
-		this.closeStreamOnCancelRegistry = new ClosableRegistry();
+		this.closeStreamOnCancelRegistry = new CloseableRegistry();
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java
index 8fbde05..b131d14 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java
@@ -22,6 +22,7 @@ import org.apache.commons.io.IOUtils;
 import org.apache.flink.api.common.state.KeyedStateStore;
 import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.util.Preconditions;
 
@@ -36,7 +37,7 @@ import java.util.Iterator;
 public class StateInitializationContextImpl implements StateInitializationContext {
 
 	/** Closable registry to participate in the operator's cancel/close methods */
-	private final ClosableRegistry closableRegistry;
+	private final CloseableRegistry closableRegistry;
 
 	/** Signal whether any state to restore was found */
 	private final boolean restored;
@@ -55,7 +56,7 @@ public class StateInitializationContextImpl implements StateInitializationContex
 			KeyedStateStore keyedStateStore,
 			Collection<KeyGroupsStateHandle> keyGroupsStateHandles,
 			Collection<OperatorStateHandle> operatorStateHandles,
-			ClosableRegistry closableRegistry) {
+			CloseableRegistry closableRegistry) {
 
 		this.restored = restored;
 		this.closableRegistry = Preconditions.checkNotNull(closableRegistry);
@@ -87,7 +88,7 @@ public class StateInitializationContextImpl implements StateInitializationContex
 		return keyGroupsStateHandles;
 	}
 
-	public ClosableRegistry getClosableRegistry() {
+	public CloseableRegistry getClosableRegistry() {
 		return closableRegistry;
 	}
 
@@ -137,14 +138,14 @@ public class StateInitializationContextImpl implements StateInitializationContex
 	private static class KeyGroupStreamIterator implements Iterator<KeyGroupStatePartitionStreamProvider> {
 
 		private final Iterator<KeyGroupsStateHandle> stateHandleIterator;
-		private final ClosableRegistry closableRegistry;
+		private final CloseableRegistry closableRegistry;
 
 		private KeyGroupsStateHandle currentStateHandle;
 		private FSDataInputStream currentStream;
 		private Iterator<Tuple2<Integer, Long>> currentOffsetsIterator;
 
 		public KeyGroupStreamIterator(
-				Iterator<KeyGroupsStateHandle> stateHandleIterator, ClosableRegistry closableRegistry) {
+				Iterator<KeyGroupsStateHandle> stateHandleIterator, CloseableRegistry closableRegistry) {
 
 			this.stateHandleIterator = Preconditions.checkNotNull(stateHandleIterator);
 			this.closableRegistry = Preconditions.checkNotNull(closableRegistry);
@@ -200,7 +201,7 @@ public class StateInitializationContextImpl implements StateInitializationContex
 		private final String stateName; //TODO since we only support a single named state in raw, this could be dropped
 
 		private final Iterator<OperatorStateHandle> stateHandleIterator;
-		private final ClosableRegistry closableRegistry;
+		private final CloseableRegistry closableRegistry;
 
 		private OperatorStateHandle currentStateHandle;
 		private FSDataInputStream currentStream;
@@ -210,7 +211,7 @@ public class StateInitializationContextImpl implements StateInitializationContex
 		public OperatorStateStreamIterator(
 				String stateName,
 				Iterator<OperatorStateHandle> stateHandleIterator,
-				ClosableRegistry closableRegistry) {
+				CloseableRegistry closableRegistry) {
 
 			this.stateName = Preconditions.checkNotNull(stateName);
 			this.stateHandleIterator = Preconditions.checkNotNull(stateHandleIterator);

http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java
index d632529..ce8a6c4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.state;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
@@ -42,7 +43,7 @@ public class StateSnapshotContextSynchronousImpl implements StateSnapshotContext
 	 * Registry for opened streams to participate in the lifecycle of the stream task. Hence, this registry should be 
 	 * obtained from and managed by the stream task.
 	 */
-	private final ClosableRegistry closableRegistry;
+	private final CloseableRegistry closableRegistry;
 
 	private KeyedStateCheckpointOutputStream keyedStateCheckpointOutputStream;
 	private OperatorStateCheckpointOutputStream operatorStateCheckpointOutputStream;
@@ -62,7 +63,7 @@ public class StateSnapshotContextSynchronousImpl implements StateSnapshotContext
 			long checkpointTimestamp,
 			CheckpointStreamFactory streamFactory,
 			KeyGroupRange keyGroupRange,
-			ClosableRegistry closableRegistry) {
+			CloseableRegistry closableRegistry) {
 
 		this.checkpointId = checkpointId;
 		this.checkpointTimestamp = checkpointTimestamp;

http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
index 29e905c..b61c52d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
@@ -43,9 +43,6 @@ public class FileStateHandle implements StreamStateHandle {
 	/** The size of the state in the file */
 	private final long stateSize;
 
-	/** Cached file system handle */
-	private transient FileSystem fs;
-
 	/**
 	 * Creates a new file state for the given file path.
 	 *
@@ -79,13 +76,17 @@ public class FileStateHandle implements StreamStateHandle {
 	 */
 	@Override
 	public void discardState() throws Exception {
-		getFileSystem().delete(filePath, false);
+
+		FileSystem fs = getFileSystem();
+
+		fs.delete(filePath, false);
 
 		// send a call to delete the checkpoint directory containing the file. This will
 		// fail (and be ignored) when some files still exist
 		try {
-			getFileSystem().delete(filePath.getParent(), false);
-		} catch (IOException ignored) {}
+			fs.delete(filePath.getParent(), false);
+		} catch (IOException ignored) {
+		}
 	}
 
 	/**
@@ -106,10 +107,7 @@ public class FileStateHandle implements StreamStateHandle {
 	 * @throws IOException Thrown if the file system cannot be accessed.
 	 */
 	private FileSystem getFileSystem() throws IOException {
-		if (fs == null) {
-			fs = FileSystem.get(filePath.toUri());
-		}
-		return fs;
+		return FileSystem.get(filePath.toUri());
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 3254fc1..c794f56 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
@@ -538,6 +539,9 @@ public class Task implements Runnable, TaskActions {
 			//  check for canceling as a shortcut
 			// ----------------------------
 
+			// init closeable registry for this task
+			FileSystem.createFileSystemCloseableRegistryForTask();
+
 			// first of all, get a user-code classloader
 			// this may involve downloading the job's JAR files and/or classes
 			LOG.info("Loading JAR files for task " + taskNameWithSubtask);
@@ -758,6 +762,7 @@ public class Task implements Runnable, TaskActions {
 
 				// remove all files in the distributed cache
 				removeCachedFiles(distributedCacheEntries, fileCache);
+				FileSystem.disposeFileSystemCloseableRegistryForTask();
 
 				notifyFinalState();
 			}

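For illustration, a minimal sketch of the task-level lifecycle added above, under the assumption (from this commit) that the registry is scoped to the executing task and that FileSystem.get(...) returns a safety-net-guarded wrapper while the registry is installed; the class below is hypothetical.

import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

public final class TaskSafetyNetSketch {

	// Illustrative only: mirrors the create/dispose calls added to Task#run().
	public static void runWithSafetyNet(Path path) throws Exception {
		FileSystem.createFileSystemCloseableRegistryForTask();
		try {
			FileSystem fs = FileSystem.get(path.toUri()); // assumed to be a guarded wrapper here
			FSDataInputStream in = fs.open(path);
			in.read();
			// even if user code never calls in.close(), the stream is tracked ...
		} finally {
			// ... and closed at the latest when the task's registry is disposed
			FileSystem.disposeFileSystemCloseableRegistryForTask();
		}
	}
}
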
http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 4aaad71..6595901 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
@@ -36,7 +37,6 @@ import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
-import org.apache.flink.runtime.state.ClosableRegistry;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.OperatorStateBackend;
@@ -161,7 +161,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 
 
 	/** The currently active background materialization threads */
-	private final ClosableRegistry cancelables = new ClosableRegistry();
+	private final CloseableRegistry cancelables = new CloseableRegistry();
 
 	/** Flag to mark the task "in operation", in which case check
 	 * needs to be initialized to true, so that early cancel() before invoke() behaves correctly */
@@ -949,7 +949,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 		}
 	}
 
-	public ClosableRegistry getCancelables() {
+	public CloseableRegistry getCancelables() {
 		return cancelables;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java
index 75c2261..cd94076 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java
@@ -20,13 +20,13 @@ package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.api.common.state.KeyedStateStore;
 import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.runtime.state.ClosableRegistry;
 import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
@@ -58,7 +58,7 @@ public class StateInitializationContextImplTest {
 	static final int NUM_HANDLES = 10;
 
 	private StateInitializationContextImpl initializationContext;
-	private ClosableRegistry closableRegistry;
+	private CloseableRegistry closableRegistry;
 
 	private int writtenKeyGroups;
 	private Set<Integer> writtenOperatorStates;
@@ -70,7 +70,7 @@ public class StateInitializationContextImplTest {
 		this.writtenKeyGroups = 0;
 		this.writtenOperatorStates = new HashSet<>();
 
-		this.closableRegistry = new ClosableRegistry();
+		this.closableRegistry = new CloseableRegistry();
 		OperatorStateStore stateStore = mock(OperatorStateStore.class);
 
 		ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos(64);

http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java
index 0ee839e..2b2df4c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.streaming.api.operators;
 
+import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
-import org.apache.flink.runtime.state.ClosableRegistry;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
 import org.apache.flink.runtime.state.OperatorStateCheckpointOutputStream;
@@ -35,7 +35,7 @@ public class StateSnapshotContextSynchronousImplTest {
 
 	@Before
 	public void setUp() throws Exception {
-		ClosableRegistry closableRegistry = new ClosableRegistry();
+		CloseableRegistry closableRegistry = new CloseableRegistry();
 		CheckpointStreamFactory streamFactory = new MemCheckpointStreamFactory(1024);
 		KeyGroupRange keyGroupRange = new KeyGroupRange(0, 2);
 		this.snapshotContext = new StateSnapshotContextSynchronousImpl(42, 4711, streamFactory, keyGroupRange, closableRegistry);

http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index 23a31d5..830cd6f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.runtime.checkpoint.OperatorStateRepartitioner;
@@ -32,7 +33,6 @@ import org.apache.flink.runtime.operators.testutils.MockEnvironment;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
-import org.apache.flink.runtime.state.ClosableRegistry;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.OperatorStateBackend;
@@ -65,7 +65,9 @@ import java.util.List;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 /**
  * Base class for {@code AbstractStreamOperator} test harnesses.
@@ -86,7 +88,7 @@ public class AbstractStreamOperatorTestHarness<OUT> {
 
 	final Environment environment;
 
-	ClosableRegistry closableRegistry;
+	CloseableRegistry closableRegistry;
 
 	// use this as default for tests
 	protected AbstractStateBackend stateBackend = new MemoryStateBackend();
@@ -115,7 +117,7 @@ public class AbstractStreamOperatorTestHarness<OUT> {
 		this.config = new StreamConfig(underlyingConfig);
 		this.config.setCheckpointingEnabled(true);
 		this.executionConfig = new ExecutionConfig();
-		this.closableRegistry = new ClosableRegistry();
+		this.closableRegistry = new CloseableRegistry();
 		this.checkpointLock = new Object();
 
 		environment = new MockEnvironment(

http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
index 5a64173..09de67f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
@@ -114,6 +114,7 @@ public class RescalingITCase extends TestLogger {
 	public static void teardown() {
 		if (cluster != null) {
 			cluster.shutdown();
+			cluster.awaitTermination();
 		}
 	}
 


[2/3] flink git commit: [FLINK-5107] Introduced limit for prior execution attempt history

Posted by tr...@apache.org.
[FLINK-5107] Introduced limit for prior execution attempt history

This closes #2837.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f5af7f10
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f5af7f10
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f5af7f10

Branch: refs/heads/master
Commit: f5af7f1025aac9cac3f83de5f0e3aece5730ec0f
Parents: ba8ed26
Author: Stefan Richter <s....@data-artisans.com>
Authored: Fri Nov 18 19:07:56 2016 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Nov 22 23:18:34 2016 +0100

----------------------------------------------------------------------
 ...taskExecutionAttemptAccumulatorsHandler.java |   7 +-
 .../executiongraph/ArchivedExecutionVertex.java |  12 +-
 .../executiongraph/ExecutionJobVertex.java      |  11 +-
 .../runtime/executiongraph/ExecutionVertex.java |  44 +++--
 .../runtime/jobmanager/JobManagerOptions.java   |  38 +++++
 .../flink/runtime/util/EvictingBoundedList.java | 160 ++++++++++++++++++
 .../executiongraph/AllVerticesIteratorTest.java |   9 +-
 .../runtime/util/EvictingBoundedListTest.java   | 164 +++++++++++++++++++
 8 files changed, 422 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f5af7f10/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
index 786f5e8..675304f 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import com.fasterxml.jackson.core.JsonGenerator;
-
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.executiongraph.AccessExecution;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
@@ -39,6 +38,12 @@ public class SubtaskExecutionAttemptAccumulatorsHandler extends AbstractSubtaskA
 
 	@Override
 	public String handleRequest(AccessExecution execAttempt, Map<String, String> params) throws Exception {
+
+		// return empty string for pruned (== null) execution attempts
+		if (null == execAttempt) {
+			return "";
+		}
+
 		final StringifiedAccumulatorResult[] accs = execAttempt.getUserAccumulatorsStringified();
 		
 		StringWriter writer = new StringWriter();

http://git-wip-us.apache.org/repos/asf/flink/blob/f5af7f10/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
index e1fb11a..56fc7a6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
@@ -19,17 +19,16 @@ package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.util.EvictingBoundedList;
 
 import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
 
 public class ArchivedExecutionVertex implements AccessExecutionVertex, Serializable {
 
 	private static final long serialVersionUID = -6708241535015028576L;
 	private final int subTaskIndex;
 
-	private final List<ArchivedExecution> priorExecutions;
+	private final EvictingBoundedList<ArchivedExecution> priorExecutions;
 
 	/** The name in the format "myTask (2/7)", cached to avoid frequent string concatenations */
 	private final String taskNameWithSubtask;
@@ -38,9 +37,10 @@ public class ArchivedExecutionVertex implements AccessExecutionVertex, Serializa
 
 	public ArchivedExecutionVertex(ExecutionVertex vertex) {
 		this.subTaskIndex = vertex.getParallelSubtaskIndex();
-		this.priorExecutions = new ArrayList<>();
-		for (Execution priorExecution : vertex.getPriorExecutions()) {
-			priorExecutions.add(priorExecution.archive());
+		EvictingBoundedList<Execution> copyOfPriorExecutionsList = vertex.getCopyOfPriorExecutionsList();
+		priorExecutions = new EvictingBoundedList<>(copyOfPriorExecutionsList.getSizeLimit());
+		for (Execution priorExecution : copyOfPriorExecutionsList) {
+			priorExecutions.add(priorExecution != null ? priorExecution.archive() : null);
 		}
 		this.taskNameWithSubtask = vertex.getTaskNameWithSubtaskIndex();
 		this.currentExecution = vertex.getCurrentExecutionAttempt().archive();

http://git-wip-us.apache.org/repos/asf/flink/blob/f5af7f10/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index a62ed86..c066ca8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.core.io.InputSplitSource;
@@ -38,6 +39,7 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobEdge;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.JobManagerOptions;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
@@ -163,9 +165,16 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 					result.getResultType());
 		}
 
+		Configuration jobConfiguration = graph.getJobConfiguration();
+		int maxPriorAttemptsHistoryLength = jobConfiguration != null ?
+				jobConfiguration.getInteger(JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE) :
+				JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE.defaultValue();
+
 		// create all task vertices
 		for (int i = 0; i < numTaskVertices; i++) {
-			ExecutionVertex vertex = new ExecutionVertex(this, i, this.producedDataSets, timeout, createTimestamp);
+			ExecutionVertex vertex = new ExecutionVertex(
+					this, i, this.producedDataSets, timeout, createTimestamp, maxPriorAttemptsHistoryLength);
+
 			this.taskVertices[i] = vertex;
 		}
 		

http://git-wip-us.apache.org/repos/asf/flink/blob/f5af7f10/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 39c60d2..5cbd1c1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -36,11 +36,13 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.jobgraph.JobEdge;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.JobManagerOptions;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.state.TaskStateHandles;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.util.EvictingBoundedList;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.SerializedValue;
 import org.slf4j.Logger;
@@ -53,7 +55,6 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.CopyOnWriteArrayList;
 
 import static org.apache.flink.runtime.execution.ExecutionState.CANCELED;
 import static org.apache.flink.runtime.execution.ExecutionState.FAILED;
@@ -79,7 +80,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 
 	private final int subTaskIndex;
 
-	private final List<Execution> priorExecutions;
+	private final EvictingBoundedList<Execution> priorExecutions;
 
 	private final Time timeout;
 
@@ -99,7 +100,13 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 			int subTaskIndex,
 			IntermediateResult[] producedDataSets,
 			Time timeout) {
-		this(jobVertex, subTaskIndex, producedDataSets, timeout, System.currentTimeMillis());
+		this(
+				jobVertex,
+				subTaskIndex,
+				producedDataSets,
+				timeout,
+				System.currentTimeMillis(),
+				JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE.defaultValue());
 	}
 
 	public ExecutionVertex(
@@ -107,7 +114,17 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 			int subTaskIndex,
 			IntermediateResult[] producedDataSets,
 			Time timeout,
-			long createTimestamp) {
+			int maxPriorExecutionHistoryLength) {
+		this(jobVertex, subTaskIndex, producedDataSets, timeout, System.currentTimeMillis(), maxPriorExecutionHistoryLength);
+	}
+
+	public ExecutionVertex(
+			ExecutionJobVertex jobVertex,
+			int subTaskIndex,
+			IntermediateResult[] producedDataSets,
+			Time timeout,
+			long createTimestamp,
+			int maxPriorExecutionHistoryLength) {
 
 		this.jobVertex = jobVertex;
 		this.subTaskIndex = subTaskIndex;
@@ -125,7 +142,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 
 		this.inputEdges = new ExecutionEdge[jobVertex.getJobVertex().getInputs().size()][];
 
-		this.priorExecutions = new CopyOnWriteArrayList<Execution>();
+		this.priorExecutions = new EvictingBoundedList<>(maxPriorExecutionHistoryLength);
 
 		this.currentExecution = new Execution(
 			getExecutionGraph().getFutureExecutor(),
@@ -235,16 +252,19 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 
 	@Override
 	public Execution getPriorExecutionAttempt(int attemptNumber) {
-		if (attemptNumber >= 0 && attemptNumber < priorExecutions.size()) {
-			return priorExecutions.get(attemptNumber);
-		}
-		else {
-			throw new IllegalArgumentException("attempt does not exist");
+		synchronized (priorExecutions) {
+			if (attemptNumber >= 0 && attemptNumber < priorExecutions.size()) {
+				return priorExecutions.get(attemptNumber);
+			} else {
+				throw new IllegalArgumentException("attempt does not exist");
+			}
 		}
 	}
 
-	List<Execution> getPriorExecutions() {
-		return priorExecutions;
+	EvictingBoundedList<Execution> getCopyOfPriorExecutionsList() {
+		synchronized (priorExecutions) {
+			return new EvictingBoundedList<>(priorExecutions);
+		}
 	}
 
 	public ExecutionGraph getExecutionGraph() {

http://git-wip-us.apache.org/repos/asf/flink/blob/f5af7f10/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerOptions.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerOptions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerOptions.java
new file mode 100644
index 0000000..279a70e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerOptions.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+@PublicEvolving
+public class JobManagerOptions {
+
+	/**
+	 * The maximum number of prior execution attempts kept in history.
+	 */
+	public static final ConfigOption<Integer> MAX_ATTEMPTS_HISTORY_SIZE =
+			key("job-manager.max-attempts-history-size").defaultValue(16);
+
+	private JobManagerOptions() {
+		throw new IllegalAccessError();
+	}
+}

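For illustration, a short sketch of how the new option can be set and read on a job configuration; the read mirrors ExecutionJobVertex above, while setting the value via the option's key is an assumption about the Configuration API, and the class below is hypothetical.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobmanager.JobManagerOptions;

public final class AttemptHistorySizeSketch {

	public static int configuredHistorySize() {
		Configuration jobConfiguration = new Configuration();
		// keep only the last 4 prior execution attempts per vertex
		jobConfiguration.setInteger(JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE.key(), 4);

		// same lookup as in ExecutionJobVertex above; falls back to the default (16) if unset
		return jobConfiguration.getInteger(JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE);
	}
}
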
http://git-wip-us.apache.org/repos/asf/flink/blob/f5af7f10/flink-runtime/src/main/java/org/apache/flink/runtime/util/EvictingBoundedList.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EvictingBoundedList.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EvictingBoundedList.java
new file mode 100644
index 0000000..f4c155a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EvictingBoundedList.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.util.ConcurrentModificationException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * This class implements a list (array based) that is physically bounded in maximum size, but can virtually grow beyond
+ * the bounded size. When the list grows beyond the size bound, elements are dropped from the head of the list (FIFO
+ * order). If dropped elements are accessed, a default element is returned instead.
+ * <p>
+ * TODO this class could eventually implement the whole actual List interface.
+ *
+ * @param <T> type of the list elements
+ */
+public class EvictingBoundedList<T> implements Iterable<T>, Serializable {
+
+	private static final long serialVersionUID = -1863961980953613146L;
+
+	private final T defaultElement;
+	private final Object[] elements;
+	private int idx;
+	private int count;
+	private long modCount;
+
+	public EvictingBoundedList(int sizeLimit) {
+		this(sizeLimit, null);
+	}
+
+	public EvictingBoundedList(EvictingBoundedList<T> other) {
+		Preconditions.checkNotNull(other);
+		this.defaultElement = other.defaultElement;
+		this.elements = other.elements.clone();
+		this.idx = other.idx;
+		this.count = other.count;
+		this.modCount = 0L;
+	}
+
+	public EvictingBoundedList(int sizeLimit, T defaultElement) {
+		this.elements = new Object[sizeLimit];
+		this.defaultElement = defaultElement;
+		this.idx = 0;
+		this.count = 0;
+		this.modCount = 0L;
+	}
+
+	public int size() {
+		return count;
+	}
+
+	public boolean isEmpty() {
+		return 0 == count;
+	}
+
+	public boolean add(T t) {
+		elements[idx] = t;
+		idx = (idx + 1) % elements.length;
+		++count;
+		++modCount;
+		return true;
+	}
+
+	public void clear() {
+		if (!isEmpty()) {
+			for (int i = 0; i < elements.length; ++i) {
+				elements[i] = null;
+			}
+			count = 0;
+			idx = 0;
+			++modCount;
+		}
+	}
+
+	public T get(int index) {
+		Preconditions.checkArgument(index >= 0 && index < count);
+		return isDroppedIndex(index) ? getDefaultElement() : accessInternal(index % elements.length);
+	}
+
+	public int getSizeLimit() {
+		return elements.length;
+	}
+
+	public T set(int index, T element) {
+		Preconditions.checkArgument(index >= 0 && index < count);
+		++modCount;
+		if (isDroppedIndex(index)) {
+			return getDefaultElement();
+		} else {
+			int idx = index % elements.length;
+			T old = accessInternal(idx);
+			elements[idx] = element;
+			return old;
+		}
+	}
+
+	public T getDefaultElement() {
+		return defaultElement;
+	}
+
+	private boolean isDroppedIndex(int idx) {
+		return idx < count - elements.length;
+	}
+
+	@SuppressWarnings("unchecked")
+	private T accessInternal(int arrayIndex) {
+		return (T) elements[arrayIndex];
+	}
+
+	@Override
+	public Iterator<T> iterator() {
+		return new Iterator<T>() {
+
+			int pos = 0;
+			final long oldModCount = modCount;
+
+			@Override
+			public boolean hasNext() {
+				return pos < count;
+			}
+
+			@Override
+			public T next() {
+				if (oldModCount != modCount) {
+					throw new ConcurrentModificationException();
+				}
+				if (pos < count) {
+					return get(pos++);
+				} else {
+					throw new NoSuchElementException("Iterator exhausted.");
+				}
+			}
+
+			@Override
+			public void remove() {
+				throw new UnsupportedOperationException("Read-only iterator");
+			}
+		};
+	}
+}

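A short behavioral sketch of the list above, with made-up values: once more than getSizeLimit() elements have been added, reads of evicted head positions yield the default element while the logical size keeps growing.

import org.apache.flink.runtime.util.EvictingBoundedList;

public final class EvictingBoundedListSketch {

	public static void main(String[] args) {
		EvictingBoundedList<Integer> list = new EvictingBoundedList<>(3, -1);
		for (int i = 0; i < 5; i++) {
			list.add(i);                  // physically keeps at most 3 elements
		}
		System.out.println(list.size());  // 5  (logical size)
		System.out.println(list.get(0));  // -1 (evicted, default element returned)
		System.out.println(list.get(4));  // 4  (still physically present)
	}
}
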
http://git-wip-us.apache.org/repos/asf/flink/blob/f5af7f10/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java
index 0223a2e..4ecac9b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java
@@ -18,8 +18,7 @@
 
 package org.apache.flink.runtime.executiongraph;
 
-import java.util.Arrays;
-
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
@@ -28,6 +27,8 @@ import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import java.util.Arrays;
+
 public class AllVerticesIteratorTest {
 
 	@Test
@@ -50,8 +51,10 @@ public class AllVerticesIteratorTest {
 			v4.setParallelism(2);
 			
 			ExecutionGraph eg = Mockito.mock(ExecutionGraph.class);
+			Configuration jobConf = new Configuration();
 			Mockito.when(eg.getFutureExecutor()).thenReturn(TestingUtils.directExecutionContext());
-					
+			Mockito.when(eg.getJobConfiguration()).thenReturn(jobConf);
+
 			ExecutionJobVertex ejv1 = new ExecutionJobVertex(eg, v1, 1,
 					AkkaUtils.getDefaultTimeout());
 			ExecutionJobVertex ejv2 = new ExecutionJobVertex(eg, v2, 1,

http://git-wip-us.apache.org/repos/asf/flink/blob/f5af7f10/flink-runtime/src/test/java/org/apache/flink/runtime/util/EvictingBoundedListTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/EvictingBoundedListTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/EvictingBoundedListTest.java
new file mode 100644
index 0000000..e0a1c70
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/EvictingBoundedListTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.ConcurrentModificationException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class EvictingBoundedListTest {
+
+	@Test
+	public void testAddGet() {
+		int insertSize = 17;
+		int boundSize = 5;
+		Integer defaultElement = 4711;
+
+		EvictingBoundedList<Integer> list = new EvictingBoundedList<>(boundSize, defaultElement);
+		assertTrue(list.isEmpty());
+
+		for (int i = 0; i < insertSize; ++i) {
+			list.add(i);
+		}
+
+		assertEquals(17, list.size());
+
+		for (int i = 0; i < insertSize; ++i) {
+			int exp = i < (insertSize - boundSize) ? defaultElement : i;
+			int act = list.get(i);
+			assertEquals(exp, act);
+		}
+	}
+
+	@Test
+	public void testSet() {
+		int insertSize = 17;
+		int boundSize = 5;
+		Integer defaultElement = 4711;
+		List<Integer> reference = new ArrayList<>(insertSize);
+		EvictingBoundedList<Integer> list = new EvictingBoundedList<>(boundSize, defaultElement);
+		for (int i = 0; i < insertSize; ++i) {
+			reference.add(i);
+			list.add(i);
+		}
+
+		assertEquals(reference.size(), list.size());
+
+		list.set(0, 123);
+		list.set(insertSize - boundSize - 1, 123);
+
+		list.set(insertSize - boundSize, 42);
+		reference.set(insertSize - boundSize, 42);
+		list.set(13, 43);
+		reference.set(13, 43);
+		list.set(16, 44);
+		reference.set(16, 44);
+
+		try {
+			list.set(insertSize, 23);
+			fail("Illegal index in set not detected.");
+		} catch (IllegalArgumentException ignored) {
+
+		}
+
+		for (int i = 0; i < insertSize; ++i) {
+			int exp = i < (insertSize - boundSize) ? defaultElement : reference.get(i);
+			int act = list.get(i);
+			assertEquals(exp, act);
+		}
+
+		assertEquals(reference.size(), list.size());
+	}
+
+	@Test
+	public void testClear() {
+		int insertSize = 17;
+		int boundSize = 5;
+		Integer defaultElement = 4711;
+
+		EvictingBoundedList<Integer> list = new EvictingBoundedList<>(boundSize, defaultElement);
+		for (int i = 0; i < insertSize; ++i) {
+			list.add(i);
+		}
+
+		list.clear();
+
+		assertEquals(0, list.size());
+		assertTrue(list.isEmpty());
+
+		try {
+			list.get(0);
+			fail();
+		} catch (IllegalArgumentException ignore) {
+		}
+	}
+
+	@Test
+	public void testIterator() {
+		int insertSize = 17;
+		int boundSize = 5;
+		Integer defaultElement = 4711;
+
+		EvictingBoundedList<Integer> list = new EvictingBoundedList<>(boundSize, defaultElement);
+		assertTrue(list.isEmpty());
+
+		for (int i = 0; i < insertSize; ++i) {
+			list.add(i);
+		}
+
+		Iterator<Integer> iterator = list.iterator();
+
+		for (int i = 0; i < insertSize; ++i) {
+			assertTrue(iterator.hasNext());
+			int exp = i < (insertSize - boundSize) ? defaultElement : i;
+			int act = iterator.next();
+			assertEquals(exp, act);
+		}
+
+		assertFalse(iterator.hasNext());
+
+		try {
+			iterator.next();
+			fail("Next on exhausted iterator did not trigger exception.");
+		} catch (NoSuchElementException ignored) {
+
+		}
+
+		iterator = list.iterator();
+		assertTrue(iterator.hasNext());
+		iterator.next();
+		list.add(123);
+		assertTrue(iterator.hasNext());
+		try {
+			iterator.next();
+			fail("Concurrent modification not detected.");
+		} catch (ConcurrentModificationException ignored) {
+
+		}
+	}
+}
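
For context, the class under test is only referenced above, not shown in this patch. The following is a minimal, hypothetical sketch written purely against the behaviour the test exercises, not the actual Flink EvictingBoundedList implementation: a fixed-capacity ring buffer that reports the logical count of everything ever added, returns a default element for entries that have already been evicted, silently ignores writes to evicted positions, and lets its iterator fail fast on concurrent modification.

import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.NoSuchElementException;

/** Hypothetical stand-in for the tested class; not the Flink implementation. */
public class BoundedEvictingRingList<T> implements Iterable<T> {

	private final Object[] buffer;   // holds only the most recent 'sizeLimit' elements
	private final T defaultElement;  // returned for elements that were evicted
	private int count;               // logical size: number of elements ever added
	private long modCount;           // structural modification counter for iterators

	public BoundedEvictingRingList(int sizeLimit, T defaultElement) {
		if (sizeLimit <= 0) {
			throw new IllegalArgumentException("sizeLimit must be positive");
		}
		this.buffer = new Object[sizeLimit];
		this.defaultElement = defaultElement;
	}

	public int size() {
		return count;
	}

	public boolean isEmpty() {
		return count == 0;
	}

	public void add(T value) {
		buffer[count % buffer.length] = value;
		++count;
		++modCount;
	}

	@SuppressWarnings("unchecked")
	public T get(int index) {
		checkIndex(index);
		return isEvicted(index) ? defaultElement : (T) buffer[index % buffer.length];
	}

	public void set(int index, T value) {
		checkIndex(index);
		if (!isEvicted(index)) {
			// writes to already evicted positions are silently dropped
			buffer[index % buffer.length] = value;
		}
	}

	public void clear() {
		count = 0;
		++modCount;
	}

	private void checkIndex(int index) {
		if (index < 0 || index >= count) {
			throw new IllegalArgumentException("Index out of bounds: " + index);
		}
	}

	private boolean isEvicted(int index) {
		return index < count - buffer.length;
	}

	@Override
	public Iterator<T> iterator() {
		return new Iterator<T>() {
			private final long expectedModCount = modCount;
			private int position;

			@Override
			public boolean hasNext() {
				return position < count;
			}

			@Override
			public T next() {
				if (expectedModCount != modCount) {
					throw new ConcurrentModificationException();
				}
				if (!hasNext()) {
					throw new NoSuchElementException();
				}
				return get(position++);
			}
		};
	}
}

Under these assumptions, clear() resets the logical count, so a subsequent get() fails the bounds check, matching testClear() above; adding an element after an iterator was created bumps modCount, which makes the next call to next() throw, matching testIterator().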


[3/3] flink git commit: [FLINK-5107] Handle evicted execution attempts in request handlers

Posted by tr...@apache.org.
[FLINK-5107] Handle evicted execution attempts in request handlers

If a prior execution attempt cannot be retrieved because it has already been evicted,
the request handler now throws a meaningful exception that notifies the requester
of the eviction.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4c23879a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4c23879a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4c23879a

Branch: refs/heads/master
Commit: 4c23879a5743b862b93d3c4267244235266163e7
Parents: f5af7f1
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Nov 23 00:19:32 2016 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Nov 23 00:19:32 2016 +0100

----------------------------------------------------------------------
 .../AbstractSubtaskAttemptRequestHandler.java   |  8 ++++-
 .../handlers/RequestHandlerException.java       | 31 ++++++++++++++++++++
 ...taskExecutionAttemptAccumulatorsHandler.java |  6 ----
 3 files changed, 38 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4c23879a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java
index f3a5059..1eab21c 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java
@@ -57,7 +57,13 @@ public abstract class AbstractSubtaskAttemptRequestHandler extends AbstractSubta
 		}
 		else if (attempt >= 0 && attempt < currentAttempt.getAttemptNumber()) {
 			AccessExecution exec = vertex.getPriorExecutionAttempt(attempt);
-			return handleRequest(exec, params);
+
+			if (exec != null) {
+				return handleRequest(exec, params);
+			} else {
+				throw new RequestHandlerException("Execution for attempt " + attempt +
+					" has already been deleted.");
+			}
 		}
 		else {
 			throw new RuntimeException("Attempt does not exist: " + attempt);

http://git-wip-us.apache.org/repos/asf/flink/blob/4c23879a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandlerException.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandlerException.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandlerException.java
new file mode 100644
index 0000000..bb61d16
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandlerException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+/**
+ * Base class for request handler exceptions.
+ */
+public class RequestHandlerException extends Exception {
+
+	private static final long serialVersionUID = 7570352908725875886L;
+
+	public RequestHandlerException(String message) {
+		super(message);
+	}
+}
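
As a hedged usage sketch (the dispatcher and handler contract below are hypothetical simplifications, not the actual web monitor classes), a caller of these handlers could now surface the eviction explicitly instead of returning an empty payload:

import java.util.Map;

import org.apache.flink.runtime.webmonitor.handlers.RequestHandlerException;

public class ExampleRequestDispatcher {

	/** Hypothetical minimal handler contract, standing in for the real request handlers. */
	public interface SimpleHandler {
		String handleRequest(Map<String, String> params) throws Exception;
	}

	/** Translates handler outcomes into a plain JSON string for the client. */
	public String dispatch(SimpleHandler handler, Map<String, String> params) {
		try {
			return handler.handleRequest(params);
		} catch (RequestHandlerException e) {
			// e.g. the requested execution attempt has already been evicted
			return "{\"error\": \"" + e.getMessage() + "\"}";
		} catch (Exception e) {
			return "{\"error\": \"internal server error\"}";
		}
	}
}

The removal of the null check in SubtaskExecutionAttemptAccumulatorsHandler below relies on this: the evicted case is now rejected in the shared base class before the concrete handler runs.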

http://git-wip-us.apache.org/repos/asf/flink/blob/4c23879a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
index 675304f..e613efb 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
@@ -38,12 +38,6 @@ public class SubtaskExecutionAttemptAccumulatorsHandler extends AbstractSubtaskA
 
 	@Override
 	public String handleRequest(AccessExecution execAttempt, Map<String, String> params) throws Exception {
-
-		// return empty string for pruned (== null) execution attempts
-		if (null == execAttempt) {
-			return "";
-		}
-
 		final StringifiedAccumulatorResult[] accs = execAttempt.getUserAccumulatorsStringified();
 		
 		StringWriter writer = new StringWriter();