You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/11/22 23:25:53 UTC
[1/3] flink git commit: [FLINK-4910] Introduce safety net for closing
file system streams
Repository: flink
Updated Branches:
refs/heads/master c590912c9 -> 4c23879a5
[FLINK-4910] Introduce safety net for closing file system streams
This closes #2691.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ba8ed263
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ba8ed263
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ba8ed263
Branch: refs/heads/master
Commit: ba8ed263695d16eacb4bdfdf195dd22c83bb53ed
Parents: c590912
Author: Stefan Richter <s....@data-artisans.com>
Authored: Mon Oct 24 17:49:54 2016 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Nov 22 23:16:52 2016 +0100
----------------------------------------------------------------------
.../common/operators/CollectionExecutor.java | 31 +--
.../apache/flink/core/fs/CloseableRegistry.java | 52 +++++
.../flink/core/fs/ClosingFSDataInputStream.java | 97 ++++++++++
.../core/fs/ClosingFSDataOutputStream.java | 102 ++++++++++
.../flink/core/fs/FSDataInputStreamWrapper.java | 96 +++++++++
.../core/fs/FSDataOutputStreamWrapper.java | 76 ++++++++
.../org/apache/flink/core/fs/FileSystem.java | 88 ++++++---
.../core/fs/SafetyNetCloseableRegistry.java | 181 +++++++++++++++++
.../core/fs/SafetyNetWrapperFileSystem.java | 150 ++++++++++++++
.../flink/core/fs/WrappingProxyCloseable.java | 30 +++
.../flink/util/AbstractCloseableRegistry.java | 114 +++++++++++
.../java/org/apache/flink/util/IOUtils.java | 15 +-
.../org/apache/flink/util/WrappingProxy.java | 25 +++
.../apache/flink/util/WrappingProxyUtil.java | 33 ++++
.../apache/flink/core/fs/FileSystemTest.java | 29 +--
.../core/fs/SafetyNetCloseableRegistryTest.java | 193 +++++++++++++++++++
.../flink/runtime/filecache/FileCache.java | 42 ++--
.../state/AbstractKeyedStateBackend.java | 5 +-
.../flink/runtime/state/ClosableRegistry.java | 108 -----------
.../state/DefaultOperatorStateBackend.java | 5 +-
.../state/StateInitializationContextImpl.java | 15 +-
.../StateSnapshotContextSynchronousImpl.java | 5 +-
.../state/filesystem/FileStateHandle.java | 18 +-
.../apache/flink/runtime/taskmanager/Task.java | 5 +
.../streaming/runtime/tasks/StreamTask.java | 6 +-
.../StateInitializationContextImplTest.java | 6 +-
...StateSnapshotContextSynchronousImplTest.java | 4 +-
.../util/AbstractStreamOperatorTestHarness.java | 10 +-
.../test/checkpointing/RescalingITCase.java | 1 +
29 files changed, 1324 insertions(+), 218 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
index a6fc17e..07f48fc 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
@@ -18,20 +18,6 @@
package org.apache.flink.api.common.operators;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.InvalidProgramException;
@@ -58,6 +44,7 @@ import org.apache.flink.api.common.operators.util.TypeComparable;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.local.LocalFileSystem;
import org.apache.flink.metrics.MetricGroup;
@@ -65,6 +52,20 @@ import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.types.Value;
import org.apache.flink.util.Visitor;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
/**
* Execution utility for serial, local, collection-based executions of Flink programs.
*/
@@ -571,7 +572,7 @@ public class CollectionExecutor {
public CompletedFuture(Path entry) {
try{
- LocalFileSystem fs = (LocalFileSystem) entry.getFileSystem();
+ LocalFileSystem fs = (LocalFileSystem) FileSystem.getUnguardedFileSystem(entry.toUri());
result = entry.isAbsolute() ? new Path(entry.toUri().getPath()): new Path(fs.getWorkingDirectory(),entry);
} catch (Exception e){
throw new RuntimeException("DistributedCache supports only local files for Collection Environments");
http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java b/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java
new file mode 100644
index 0000000..81ba7ab
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.util.AbstractCloseableRegistry;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * This class allows registering instances of {@link Closeable}, all of which are closed when this registry is closed.
+ * <p>
+ * Registering to an already closed registry will throw an exception and close the provided {@link Closeable}.
+ * <p>
+ * All methods in this class are thread-safe.
+ */
+public class CloseableRegistry extends AbstractCloseableRegistry<Closeable, Object> {
+
+ private static final Object DUMMY = new Object();
+
+ public CloseableRegistry() {
+ super(new HashMap<Closeable, Object>());
+ }
+
+ @Override
+ protected void doRegister(Closeable closeable, Map<Closeable, Object> closeableMap) throws IOException {
+ closeableMap.put(closeable, DUMMY);
+ }
+
+ @Override
+ protected void doUnRegister(Closeable closeable, Map<Closeable, Object> closeableMap) {
+ closeableMap.remove(closeable);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataInputStream.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataInputStream.java b/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataInputStream.java
new file mode 100644
index 0000000..23ac4f2
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataInputStream.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * This class is a {@link org.apache.flink.util.WrappingProxy} for {@link FSDataInputStream} that is used to
+ * implement a safety net against unclosed streams.
+ * <p>
+ * See {@link SafetyNetCloseableRegistry} for more details on how this is utilized.
+ */
+public class ClosingFSDataInputStream
+ extends FSDataInputStreamWrapper
+ implements WrappingProxyCloseable<FSDataInputStream> {
+
+ private final SafetyNetCloseableRegistry registry;
+ private final String debugInfo;
+
+ private volatile boolean closed;
+
+ private ClosingFSDataInputStream(
+ FSDataInputStream delegate, SafetyNetCloseableRegistry registry, String debugInfo) throws IOException {
+ super(delegate);
+ this.registry = Preconditions.checkNotNull(registry);
+ this.debugInfo = Preconditions.checkNotNull(debugInfo);
+ this.closed = false;
+ }
+
+ public boolean isClosed() {
+ return closed;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (!closed) {
+ closed = true;
+ registry.unregisterClosable(this);
+ inputStream.close();
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return inputStream.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+
+ if (this == obj) {
+ return true;
+ }
+
+ if (obj instanceof ClosingFSDataInputStream) {
+ return inputStream.equals(((ClosingFSDataInputStream) obj).inputStream);
+ }
+
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return "ClosingFSDataInputStream(" + inputStream.toString() + ") : " + debugInfo;
+ }
+
+ public static ClosingFSDataInputStream wrapSafe(
+ FSDataInputStream delegate, SafetyNetCloseableRegistry registry) throws IOException{
+ return wrapSafe(delegate, registry, "");
+ }
+
+ public static ClosingFSDataInputStream wrapSafe(
+ FSDataInputStream delegate, SafetyNetCloseableRegistry registry, String debugInfo) throws IOException{
+
+ ClosingFSDataInputStream inputStream = new ClosingFSDataInputStream(delegate, registry, debugInfo);
+ registry.registerClosable(inputStream);
+ return inputStream;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataOutputStream.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataOutputStream.java b/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataOutputStream.java
new file mode 100644
index 0000000..120ca67
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataOutputStream.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * This class is a {@link org.apache.flink.util.WrappingProxy} for {@link FSDataOutputStream} that is used to
+ * implement a safety net against unclosed streams.
+ * <p>
+ * See {@link SafetyNetCloseableRegistry} for more details on how this is utilized.
+ */
+public class ClosingFSDataOutputStream
+ extends FSDataOutputStreamWrapper
+ implements WrappingProxyCloseable<FSDataOutputStream> {
+
+ private final SafetyNetCloseableRegistry registry;
+ private final String debugString;
+
+ private volatile boolean closed;
+
+ public ClosingFSDataOutputStream(
+ FSDataOutputStream delegate, SafetyNetCloseableRegistry registry) throws IOException {
+ this(delegate, registry, "");
+ }
+
+ private ClosingFSDataOutputStream(
+ FSDataOutputStream delegate, SafetyNetCloseableRegistry registry, String debugString) throws IOException {
+ super(delegate);
+ this.registry = Preconditions.checkNotNull(registry);
+ this.debugString = Preconditions.checkNotNull(debugString);
+ this.closed = false;
+ }
+
+ public boolean isClosed() {
+ return closed;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (!closed) {
+ closed = true;
+ registry.unregisterClosable(this);
+ outputStream.close();
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return outputStream.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+
+ if (this == obj) {
+ return true;
+ }
+
+ if (obj instanceof ClosingFSDataOutputStream) {
+ return outputStream.equals(((ClosingFSDataOutputStream) obj).outputStream);
+ }
+
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return "ClosingFSDataOutputStream(" + outputStream.toString() + ") : " + debugString;
+ }
+
+ public static ClosingFSDataOutputStream wrapSafe(
+ FSDataOutputStream delegate, SafetyNetCloseableRegistry registry) throws IOException {
+ return wrapSafe(delegate, registry, "");
+ }
+
+ public static ClosingFSDataOutputStream wrapSafe(
+ FSDataOutputStream delegate, SafetyNetCloseableRegistry registry, String debugInfo) throws IOException {
+
+ ClosingFSDataOutputStream inputStream = new ClosingFSDataOutputStream(delegate, registry, debugInfo);
+ registry.registerClosable(inputStream);
+ return inputStream;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-core/src/main/java/org/apache/flink/core/fs/FSDataInputStreamWrapper.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FSDataInputStreamWrapper.java b/flink-core/src/main/java/org/apache/flink/core/fs/FSDataInputStreamWrapper.java
new file mode 100644
index 0000000..507b756
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FSDataInputStreamWrapper.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.WrappingProxy;
+
+import java.io.IOException;
+
+/**
+ * Simple forwarding wrapper around {@link FSDataInputStream}
+ */
+public class FSDataInputStreamWrapper extends FSDataInputStream implements WrappingProxy<FSDataInputStream> {
+
+ protected final FSDataInputStream inputStream;
+
+ public FSDataInputStreamWrapper(FSDataInputStream inputStream) {
+ this.inputStream = Preconditions.checkNotNull(inputStream);
+ }
+
+ @Override
+ public void seek(long desired) throws IOException {
+ inputStream.seek(desired);
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return inputStream.getPos();
+ }
+
+ @Override
+ public int read() throws IOException {
+ return inputStream.read();
+ }
+
+ @Override
+ public int read(byte[] b) throws IOException {
+ return inputStream.read(b);
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ return inputStream.read(b, off, len);
+ }
+
+ @Override
+ public long skip(long n) throws IOException {
+ return inputStream.skip(n);
+ }
+
+ @Override
+ public int available() throws IOException {
+ return inputStream.available();
+ }
+
+ @Override
+ public void close() throws IOException {
+ inputStream.close();
+ }
+
+ @Override
+ public void mark(int readlimit) {
+ inputStream.mark(readlimit);
+ }
+
+ @Override
+ public void reset() throws IOException {
+ inputStream.reset();
+ }
+
+ @Override
+ public boolean markSupported() {
+ return inputStream.markSupported();
+ }
+
+ @Override
+ public FSDataInputStream getWrappedDelegate() {
+ return inputStream;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-core/src/main/java/org/apache/flink/core/fs/FSDataOutputStreamWrapper.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FSDataOutputStreamWrapper.java b/flink-core/src/main/java/org/apache/flink/core/fs/FSDataOutputStreamWrapper.java
new file mode 100644
index 0000000..36ebe10
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FSDataOutputStreamWrapper.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.WrappingProxy;
+
+import java.io.IOException;
+
+/**
+ * Simple forwarding wrapper around {@link FSDataOutputStream}.
+ */
+public class FSDataOutputStreamWrapper extends FSDataOutputStream implements WrappingProxy<FSDataOutputStream> {
+
+ protected final FSDataOutputStream outputStream;
+
+ public FSDataOutputStreamWrapper(FSDataOutputStream outputStream) {
+ this.outputStream = Preconditions.checkNotNull(outputStream);
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return outputStream.getPos();
+ }
+
+ @Override
+ public void flush() throws IOException {
+ outputStream.flush();
+ }
+
+ @Override
+ public void sync() throws IOException {
+ outputStream.sync();
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ outputStream.write(b);
+ }
+
+ @Override
+ public void write(byte[] b) throws IOException {
+ outputStream.write(b);
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ outputStream.write(b, off, len);
+ }
+
+ @Override
+ public void close() throws IOException {
+ outputStream.close();
+ }
+
+ @Override
+ public FSDataOutputStream getWrappedDelegate() {
+ return outputStream;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
index 1844d64..5a608b5 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
@@ -25,6 +25,14 @@
package org.apache.flink.core.fs;
+import org.apache.flink.annotation.Public;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.OperatingSystem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
@@ -34,11 +42,6 @@ import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;
-import org.apache.flink.annotation.Public;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.util.OperatingSystem;
-
/**
* An abstract base class for a fairly generic file system. It
* may be implemented as a distributed file system, or as a local
@@ -47,6 +50,8 @@ import org.apache.flink.util.OperatingSystem;
@Public
public abstract class FileSystem {
+ private static final InheritableThreadLocal<SafetyNetCloseableRegistry> REGISTRIES = new InheritableThreadLocal<>();
+
private static final String LOCAL_FILESYSTEM_CLASS = "org.apache.flink.core.fs.local.LocalFileSystem";
private static final String HADOOP_WRAPPER_FILESYSTEM_CLASS = "org.apache.flink.runtime.fs.hdfs.HadoopFileSystem";
@@ -55,6 +60,39 @@ public abstract class FileSystem {
private static final String HADOOP_WRAPPER_SCHEME = "hdwrapper";
+ private static final Logger LOG = LoggerFactory.getLogger(FileSystem.class);
+
+ /**
+ * Create a SafetyNetCloseableRegistry for a Task. This method should be called at the beginning of the task's
+ * main thread.
+ */
+ public static void createFileSystemCloseableRegistryForTask() {
+ SafetyNetCloseableRegistry oldRegistry = REGISTRIES.get();
+ if (null != oldRegistry) {
+ IOUtils.closeQuietly(oldRegistry);
+ LOG.warn("Found existing SafetyNetCloseableRegistry. Closed and replaced it.");
+ }
+ SafetyNetCloseableRegistry newRegistry = new SafetyNetCloseableRegistry();
+ REGISTRIES.set(newRegistry);
+ }
+
+ /**
+ * Disposes the SafetyNetCloseableRegistry for a Task: removes it from the current thread and closes it. This method
+ * should be called at the end of the task's main thread or when the task should be canceled.
+ */
+ public static void disposeFileSystemCloseableRegistryForTask() {
+ SafetyNetCloseableRegistry registry = REGISTRIES.get();
+ if (null != registry) {
+ LOG.info("Ensuring all FileSystem streams are closed");
+ REGISTRIES.remove();
+ IOUtils.closeQuietly(registry);
+ }
+ }
+
+ private static FileSystem wrapWithSafetyNetWhenInTask(FileSystem fs) {
+ SafetyNetCloseableRegistry reg = REGISTRIES.get();
+ return reg != null ? new SafetyNetWrapperFileSystem(fs, reg) : fs;
+ }
/** Object used to protect calls to specific methods.*/
private static final Object SYNCHRONIZATION_OBJECT = new Object();
@@ -63,7 +101,7 @@ public abstract class FileSystem {
* Enumeration for write modes.
*
*/
- public static enum WriteMode {
+ public enum WriteMode {
/** Creates write path if it does not exist. Does not overwrite existing files and directories. */
NO_OVERWRITE,
@@ -214,18 +252,7 @@ public abstract class FileSystem {
}
}
- /**
- * Returns a reference to the {@link FileSystem} instance for accessing the
- * file system identified by the given {@link URI}.
- *
- * @param uri
- * the {@link URI} identifying the file system
- * @return a reference to the {@link FileSystem} instance for accessing the file system identified by the given
- * {@link URI}.
- * @throws IOException
- * thrown if a reference to the file system instance could not be obtained
- */
- public static FileSystem get(URI uri) throws IOException {
+ public static FileSystem getUnguardedFileSystem(URI uri) throws IOException {
FileSystem fs;
URI asked = uri;
@@ -238,13 +265,13 @@ public abstract class FileSystem {
}
uri = new URI(defaultScheme.getScheme(), null, defaultScheme.getHost(),
- defaultScheme.getPort(), uri.getPath(), null, null);
+ defaultScheme.getPort(), uri.getPath(), null, null);
} catch (URISyntaxException e) {
try {
if (defaultScheme.getScheme().equals("file")) {
uri = new URI("file", null,
- new Path(new File(uri.getPath()).getAbsolutePath()).toUri().getPath(), null);
+ new Path(new File(uri.getPath()).getAbsolutePath()).toUri().getPath(), null);
}
} catch (URISyntaxException ex) {
// we tried to repair it, but could not. report the scheme error
@@ -255,8 +282,8 @@ public abstract class FileSystem {
if(uri.getScheme() == null) {
throw new IOException("The URI '" + uri + "' is invalid.\n" +
- "The fs.default-scheme = " + defaultScheme + ", the requested URI = " + asked +
- ", and the final URI = " + uri + ".");
+ "The fs.default-scheme = " + defaultScheme + ", the requested URI = " + asked +
+ ", and the final URI = " + uri + ".");
}
if (uri.getScheme().equals("file") && uri.getAuthority() != null && !uri.getAuthority().isEmpty()) {
@@ -294,7 +321,7 @@ public abstract class FileSystem {
} else {
// we can not read from this file system.
throw new IOException("No file system found with scheme " + uri.getScheme()
- + ", referenced in file URI '" + uri.toString() + "'.");
+ + ", referenced in file URI '" + uri.toString() + "'.");
}
} else {
// we end up here if we have a file system with build-in flink support.
@@ -316,6 +343,21 @@ public abstract class FileSystem {
}
/**
+ * Returns a reference to the {@link FileSystem} instance for accessing the
+ * file system identified by the given {@link URI}.
+ *
+ * @param uri
+ * the {@link URI} identifying the file system
+ * @return a reference to the {@link FileSystem} instance for accessing the file system identified by the given
+ * {@link URI}.
+ * @throws IOException
+ * thrown if a reference to the file system instance could not be obtained
+ */
+ public static FileSystem get(URI uri) throws IOException {
+ return wrapWithSafetyNetWhenInTask(getUnguardedFileSystem(uri));
+ }
+
+ /**
* Returns a boolean indicating whether a scheme has built-in Flink support.
*
* @param scheme
http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java
new file mode 100644
index 0000000..de4fb30
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.util.AbstractCloseableRegistry;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.WrappingProxyUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.ref.PhantomReference;
+import java.lang.ref.ReferenceQueue;
+import java.util.IdentityHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This implementation of an {@link AbstractCloseableRegistry} registers {@link WrappingProxyCloseable}. When
+ * the proxy becomes subject to GC, this registry takes care of closing unclosed {@link Closeable}s.
+ * <p>
+ * Phantom references are used to track when {@link org.apache.flink.util.WrappingProxy}s of {@link Closeable} got
+ * GC'ed. We ensure that the wrapped {@link Closeable} is properly closed to avoid resource leaks.
+ * <p>
+ * Other than that, it works like a normal {@link CloseableRegistry}.
+ * <p>
+ * All methods in this class are thread-safe.
+ */
+public class SafetyNetCloseableRegistry extends
+ AbstractCloseableRegistry<WrappingProxyCloseable<? extends Closeable>,
+ SafetyNetCloseableRegistry.PhantomDelegatingCloseableRef> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SafetyNetCloseableRegistry.class);
+ private final ReferenceQueue<WrappingProxyCloseable<? extends Closeable>> referenceQueue;
+ private final Thread reaperThread;
+
+ public SafetyNetCloseableRegistry() {
+ super(new IdentityHashMap<Closeable, PhantomDelegatingCloseableRef>());
+ this.referenceQueue = new ReferenceQueue<>();
+ this.reaperThread = new CloseableReaperThread();
+ reaperThread.start();
+ }
+
+ @Override
+ protected void doRegister(
+ WrappingProxyCloseable<? extends Closeable> wrappingProxyCloseable,
+ Map<Closeable, PhantomDelegatingCloseableRef> closeableMap) throws IOException {
+
+ Closeable innerCloseable = WrappingProxyUtil.stripProxy(wrappingProxyCloseable.getWrappedDelegate());
+
+ if (null == innerCloseable) {
+ return;
+ }
+
+ PhantomDelegatingCloseableRef phantomRef =
+ new PhantomDelegatingCloseableRef(wrappingProxyCloseable, referenceQueue);
+
+ closeableMap.put(innerCloseable, phantomRef);
+ }
+
+ @Override
+ protected void doUnRegister(
+ WrappingProxyCloseable<? extends Closeable> closeable,
+ Map<Closeable, PhantomDelegatingCloseableRef> closeableMap) {
+
+ Closeable innerCloseable = WrappingProxyUtil.stripProxy(closeable.getWrappedDelegate());
+
+ if (null == innerCloseable) {
+ return;
+ }
+
+ closeableMap.remove(innerCloseable);
+ }
+
+ /**
+ * Phantom reference to {@link WrappingProxyCloseable}.
+ */
+ static final class PhantomDelegatingCloseableRef
+ extends PhantomReference<WrappingProxyCloseable<? extends Closeable>>
+ implements Closeable {
+
+ private final Closeable innerCloseable;
+ private final String debugString;
+
+ public PhantomDelegatingCloseableRef(
+ WrappingProxyCloseable<? extends Closeable> referent,
+ ReferenceQueue<? super WrappingProxyCloseable<? extends Closeable>> q) {
+
+ super(referent, q);
+ this.innerCloseable = Preconditions.checkNotNull(WrappingProxyUtil.stripProxy(referent));
+ this.debugString = referent.toString();
+ }
+
+ public Closeable getInnerCloseable() {
+ return innerCloseable;
+ }
+
+ public String getDebugString() {
+ return debugString;
+ }
+
+ @Override
+ public void close() throws IOException {
+ innerCloseable.close();
+ }
+ }
+
+ /**
+ * Reaper thread that collects and closes leaked resources.
+ */
+ final class CloseableReaperThread extends Thread {
+
+ public CloseableReaperThread() {
+ super("CloseableReaperThread");
+ this.running = false;
+ }
+
+ private volatile boolean running;
+
+ @Override
+ public void run() {
+ this.running = true;
+ try {
+ List<PhantomDelegatingCloseableRef> closeableList = new LinkedList<>();
+ while (running) {
+ PhantomDelegatingCloseableRef oldRef = (PhantomDelegatingCloseableRef) referenceQueue.remove();
+ synchronized (getSynchronizationLock()) {
+ do {
+ closeableList.add(oldRef);
+ closeableToRef.remove(oldRef.getInnerCloseable());
+ }
+ while ((oldRef = (PhantomDelegatingCloseableRef) referenceQueue.poll()) != null);
+ }
+
+ // close outside the synchronized block in case this is blocking
+ for (PhantomDelegatingCloseableRef closeableRef : closeableList) {
+ IOUtils.closeQuietly(closeableRef);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Closing unclosed resource: " + closeableRef.getDebugString());
+ }
+ }
+
+ closeableList.clear();
+ }
+ } catch (InterruptedException e) {
+ // interrupted, e.g. by close(): stop reaping and let the thread end
+ }
+ }
+
+ @Override
+ public void interrupt() {
+ this.running = false;
+ super.interrupt();
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ super.close();
+ reaperThread.interrupt();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java
new file mode 100644
index 0000000..bf30b4f
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.WrappingProxy;
+
+import java.io.IOException;
+import java.net.URI;
+
+/**
+ * A {@link WrappingProxy} for a {@link FileSystem} that guards every stream it hands out. Streams opened or
+ * created through this class are wrapped as {@link ClosingFSDataInputStream} or
+ * {@link ClosingFSDataOutputStream} and registered with a {@link SafetyNetCloseableRegistry}; every other call
+ * is forwarded unchanged to the wrapped file system.
+ *
+ * <p>Because of that registration the {@link SafetyNetCloseableRegistry} manages all streams obtained from this
+ * class, which prevents resource leaks from unclosed streams.
+ */
+public class SafetyNetWrapperFileSystem extends FileSystem implements WrappingProxy<FileSystem> {
+
+    /** Registry that all streams handed out by this file system are registered with. */
+    private final SafetyNetCloseableRegistry registry;
+
+    /** The wrapped file system whose streams are not guarded by itself. */
+    private final FileSystem delegate;
+
+    public SafetyNetWrapperFileSystem(FileSystem unsafeFileSystem, SafetyNetCloseableRegistry registry) {
+        this.registry = Preconditions.checkNotNull(registry);
+        this.delegate = Preconditions.checkNotNull(unsafeFileSystem);
+    }
+
+    // ------------------------------------------------------------------------
+    //  stream producing methods: results are wrapped and registered
+    // ------------------------------------------------------------------------
+
+    @Override
+    public FSDataInputStream open(Path f, int bufferSize) throws IOException {
+        return ClosingFSDataInputStream.wrapSafe(delegate.open(f, bufferSize), registry, String.valueOf(f));
+    }
+
+    @Override
+    public FSDataInputStream open(Path f) throws IOException {
+        return ClosingFSDataInputStream.wrapSafe(delegate.open(f), registry, String.valueOf(f));
+    }
+
+    @Override
+    public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, short replication, long blockSize)
+            throws IOException {
+
+        return ClosingFSDataOutputStream.wrapSafe(
+            delegate.create(f, overwrite, bufferSize, replication, blockSize),
+            registry,
+            String.valueOf(f));
+    }
+
+    @Override
+    public FSDataOutputStream create(Path f, boolean overwrite) throws IOException {
+        return ClosingFSDataOutputStream.wrapSafe(delegate.create(f, overwrite), registry, String.valueOf(f));
+    }
+
+    // ------------------------------------------------------------------------
+    //  plain forwarding to the wrapped file system
+    // ------------------------------------------------------------------------
+
+    @Override
+    public void initialize(URI name) throws IOException {
+        delegate.initialize(name);
+    }
+
+    @Override
+    public URI getUri() {
+        return delegate.getUri();
+    }
+
+    @Override
+    public Path getWorkingDirectory() {
+        return delegate.getWorkingDirectory();
+    }
+
+    @Override
+    public Path getHomeDirectory() {
+        return delegate.getHomeDirectory();
+    }
+
+    @Override
+    public FileStatus getFileStatus(Path f) throws IOException {
+        return delegate.getFileStatus(f);
+    }
+
+    @Override
+    public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException {
+        return delegate.getFileBlockLocations(file, start, len);
+    }
+
+    @Override
+    public FileStatus[] listStatus(Path f) throws IOException {
+        return delegate.listStatus(f);
+    }
+
+    @Override
+    public long getDefaultBlockSize() {
+        return delegate.getDefaultBlockSize();
+    }
+
+    @Override
+    public boolean exists(Path f) throws IOException {
+        return delegate.exists(f);
+    }
+
+    @Override
+    public boolean mkdirs(Path f) throws IOException {
+        return delegate.mkdirs(f);
+    }
+
+    @Override
+    public boolean rename(Path src, Path dst) throws IOException {
+        return delegate.rename(src, dst);
+    }
+
+    @Override
+    public boolean delete(Path f, boolean recursive) throws IOException {
+        return delegate.delete(f, recursive);
+    }
+
+    @Override
+    public boolean initOutPathLocalFS(Path outPath, WriteMode writeMode, boolean createDirectory) throws IOException {
+        return delegate.initOutPathLocalFS(outPath, writeMode, createDirectory);
+    }
+
+    @Override
+    public boolean initOutPathDistFS(Path outPath, WriteMode writeMode, boolean createDirectory) throws IOException {
+        return delegate.initOutPathDistFS(outPath, writeMode, createDirectory);
+    }
+
+    @Override
+    public boolean isDistributedFS() {
+        return delegate.isDistributedFS();
+    }
+
+    /**
+     * Returns the wrapped file system, whose streams are not guarded by the registry.
+     */
+    @Override
+    public FileSystem getWrappedDelegate() {
+        return delegate;
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-core/src/main/java/org/apache/flink/core/fs/WrappingProxyCloseable.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/WrappingProxyCloseable.java b/flink-core/src/main/java/org/apache/flink/core/fs/WrappingProxyCloseable.java
new file mode 100644
index 0000000..b74fc78
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/WrappingProxyCloseable.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.util.WrappingProxy;
+
+import java.io.Closeable;
+
+/**
+ * A {@link WrappingProxy} around a {@link Closeable} delegate that is itself {@link Closeable}.
+ *
+ * @param <T> type of the wrapped closeable
+ */
+public interface WrappingProxyCloseable<T extends Closeable> extends Closeable, WrappingProxy<T> {
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java b/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
new file mode 100644
index 0000000..7c0291c
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * This is the abstract base class for registries that allow to register instances of {@link Closeable}, which are all
+ * closed if this registry is closed.
+ * <p>
+ * Registering to an already closed registry will throw an exception and close the provided {@link Closeable}.
+ * <p>
+ * All methods in this class are thread-safe.
+ *
+ * @param <C> Type of the closeable this registers
+ * @param <T> Type for potential meta data associated with the registering closeables
+ */
+public abstract class AbstractCloseableRegistry<C extends Closeable, T> implements Closeable {
+
+    /** Registered closeables with their meta data; the map instance doubles as the synchronization lock. */
+    protected final Map<Closeable, T> closeableToRef;
+
+    /** Set once {@link #close()} was called; only accessed while holding the synchronization lock. */
+    private boolean closed;
+
+    public AbstractCloseableRegistry(Map<Closeable, T> closeableToRef) {
+        this.closeableToRef = closeableToRef;
+        this.closed = false;
+    }
+
+    /**
+     * Registers a {@link Closeable} with the registry. In case the registry is already closed, this method closes
+     * the passed {@link Closeable} and throws an {@link IOException}. Passing {@code null} is a no-op.
+     *
+     * @param closeable Closeable to register
+     * @throws IOException exception when the registry was closed before
+     */
+    public final void registerClosable(C closeable) throws IOException {
+
+        if (null == closeable) {
+            return;
+        }
+
+        synchronized (getSynchronizationLock()) {
+            if (closed) {
+                IOUtils.closeQuietly(closeable);
+                throw new IOException("Cannot register Closeable, registry is already closed. Closing argument.");
+            }
+
+            doRegister(closeable, closeableToRef);
+        }
+    }
+
+    /**
+     * Removes a {@link Closeable} from the registry. Passing {@code null} is a no-op.
+     *
+     * @param closeable instance to remove from the registry.
+     */
+    public final void unregisterClosable(C closeable) {
+
+        if (null == closeable) {
+            return;
+        }
+
+        synchronized (getSynchronizationLock()) {
+            doUnRegister(closeable, closeableToRef);
+        }
+    }
+
+    /**
+     * Marks the registry as closed and quietly closes all currently registered closeables.
+     *
+     * <p>The registered closeables are snapshotted and removed while holding the lock, but closed only after the
+     * lock was released. This keeps a blocking {@code close()} call from stalling other users of the registry and
+     * allows a closeable to unregister itself while it is being closed.
+     */
+    @Override
+    public void close() throws IOException {
+        final Closeable[] toClose;
+
+        synchronized (getSynchronizationLock()) {
+            toClose = closeableToRef.keySet().toArray(new Closeable[closeableToRef.size()]);
+            closeableToRef.clear();
+            closed = true;
+        }
+
+        // close outside the synchronized block in case this is blocking
+        for (Closeable closeable : toClose) {
+            IOUtils.closeQuietly(closeable);
+        }
+    }
+
+    /**
+     * Checks whether this registry was closed.
+     *
+     * @return true if {@link #close()} was called on this registry
+     */
+    public boolean isClosed() {
+        synchronized (getSynchronizationLock()) {
+            return closed;
+        }
+    }
+
+    /**
+     * Returns the object all operations of this registry synchronize on, which is the map of registered closeables.
+     */
+    protected final Object getSynchronizationLock() {
+        return closeableToRef;
+    }
+
+    /**
+     * Removes the given closeable from the given map; called while holding the synchronization lock.
+     */
+    protected abstract void doUnRegister(C closeable, Map<Closeable, T> closeableMap);
+
+    /**
+     * Adds the given closeable to the given map; called while holding the synchronization lock and only if the
+     * registry is not closed.
+     */
+    protected abstract void doRegister(C closeable, Map<Closeable, T> closeableMap) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-core/src/main/java/org/apache/flink/util/IOUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/IOUtils.java b/flink-core/src/main/java/org/apache/flink/util/IOUtils.java
index 12d70ce..9810271 100644
--- a/flink-core/src/main/java/org/apache/flink/util/IOUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/IOUtils.java
@@ -18,14 +18,15 @@
package org.apache.flink.util;
+import org.slf4j.Logger;
+
+import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintStream;
import java.net.Socket;
-import org.slf4j.Logger;
-
/**
* An utility class for I/O related functionality.
*
@@ -213,6 +214,16 @@ public final class IOUtils {
}
}
}
+
+    /**
+     * Closes the given {@link Closeable} and ignores any exception thrown while closing it.
+     * Passing {@code null} is a no-op.
+     *
+     * @param closeable the resource to close, may be {@code null}
+     */
+    public static void closeQuietly(Closeable closeable) {
+        try {
+            if (closeable != null) {
+                closeable.close();
+            }
+        } catch (Exception ignored) {
+            // best-effort close: callers close many resources in a row and must not be aborted by a
+            // single failing resource, no matter whether it fails with a checked or unchecked exception
+        }
+    }
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-core/src/main/java/org/apache/flink/util/WrappingProxy.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/WrappingProxy.java b/flink-core/src/main/java/org/apache/flink/util/WrappingProxy.java
new file mode 100644
index 0000000..82fcf04
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/WrappingProxy.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+/**
+ * Interface for objects that wrap another object of type {@code T} and can hand out that wrapped object.
+ *
+ * @param <T> type of the wrapped delegate
+ */
+public interface WrappingProxy<T> {
+
+ /**
+ * Returns the object that is wrapped by this proxy.
+ *
+ * @return the wrapped delegate
+ */
+ T getWrappedDelegate();
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-core/src/main/java/org/apache/flink/util/WrappingProxyUtil.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/WrappingProxyUtil.java b/flink-core/src/main/java/org/apache/flink/util/WrappingProxyUtil.java
new file mode 100644
index 0000000..0f62abd
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/WrappingProxyUtil.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+/**
+ * Utilities for working with {@link WrappingProxy} instances.
+ */
+public final class WrappingProxyUtil {
+
+    /** Not meant to be instantiated. */
+    private WrappingProxyUtil() {
+        throw new AssertionError();
+    }
+
+    /**
+     * Unwraps the given object for as long as it is a {@link WrappingProxy} and returns the innermost delegate.
+     * Objects that are no proxy (including {@code null}) are returned as they are. Unwrapping also stops at a
+     * proxy that reports itself as its own delegate; that proxy is returned.
+     *
+     * @param object the possibly wrapped object
+     * @param <T> type of the object to unwrap
+     * @return the innermost delegate that could be reached
+     */
+    @SuppressWarnings("unchecked")
+    public static <T> T stripProxy(T object) {
+        while (object instanceof WrappingProxy) {
+            // unchecked: a proxy for T is expected to hand out a T
+            final T delegate = ((WrappingProxy<T>) object).getWrappedDelegate();
+            if (delegate == object) {
+                // self-referencing proxy, continuing here would never terminate
+                break;
+            }
+            object = delegate;
+        }
+        return object;
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTest.java
index 04ebc0e..1bde2fb 100644
--- a/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTest.java
@@ -17,32 +17,37 @@
*/
package org.apache.flink.core.fs;
+import org.apache.flink.core.fs.local.LocalFileSystem;
+import org.apache.flink.util.WrappingProxyUtil;
+import org.junit.Test;
+
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
-import org.apache.flink.core.fs.local.LocalFileSystem;
-import org.junit.Test;
-import static org.junit.Assert.*;
+
+import static org.junit.Assert.assertTrue;
public class FileSystemTest {
+
@Test
public void testGet() throws URISyntaxException, IOException {
String scheme = "file";
-
- assertTrue(FileSystem.get(new URI(scheme + ":///test/test")) instanceof LocalFileSystem);
-
+
+ assertTrue(WrappingProxyUtil.stripProxy(FileSystem.get(new URI(scheme + ":///test/test"))) instanceof LocalFileSystem);
+
try {
FileSystem.get(new URI(scheme + "://test/test"));
} catch (IOException ioe) {
assertTrue(ioe.getMessage().startsWith("Found local file path with authority '"));
}
- assertTrue(FileSystem.get(new URI(scheme + ":/test/test")) instanceof LocalFileSystem);
-
- assertTrue(FileSystem.get(new URI(scheme + ":test/test")) instanceof LocalFileSystem);
+ assertTrue(WrappingProxyUtil.stripProxy(FileSystem.get(new URI(scheme + ":/test/test"))) instanceof LocalFileSystem);
- assertTrue(FileSystem.get(new URI("/test/test")) instanceof LocalFileSystem);
-
- assertTrue(FileSystem.get(new URI("test/test")) instanceof LocalFileSystem);
+ assertTrue(WrappingProxyUtil.stripProxy(FileSystem.get(new URI(scheme + ":test/test"))) instanceof LocalFileSystem);
+
+ assertTrue(WrappingProxyUtil.stripProxy(FileSystem.get(new URI("/test/test"))) instanceof LocalFileSystem);
+
+ assertTrue(WrappingProxyUtil.stripProxy(FileSystem.get(new URI("test/test"))) instanceof LocalFileSystem);
}
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java
new file mode 100644
index 0000000..6628407
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Tests for {@link SafetyNetCloseableRegistry}. Several producer threads wrap test streams with the registry
+ * and drop the wrapper without closing it; a shared counter tracks how many test streams are still unclosed.
+ */
+public class SafetyNetCloseableRegistryTest {
+
+ private ProducerThread[] streamOpenThreads;
+ private SafetyNetCloseableRegistry closeableRegistry;
+ // incremented for every created test stream, decremented when a test stream is closed
+ private AtomicInteger unclosedCounter;
+
+ /**
+ * Creates a fresh registry, resets the counter and prepares (but does not start) ten producer threads.
+ */
+ @Before
+ public void setup() {
+ this.closeableRegistry = new SafetyNetCloseableRegistry();
+ this.unclosedCounter = new AtomicInteger(0);
+ this.streamOpenThreads = new ProducerThread[10];
+ for (int i = 0; i < streamOpenThreads.length; ++i) {
+ streamOpenThreads[i] = new ProducerThread(closeableRegistry, unclosedCounter, Integer.MAX_VALUE);
+ }
+ }
+
+ /**
+ * Sets the stream limit on every producer thread and starts it.
+ */
+ private void startThreads(int maxStreams) {
+ for (ProducerThread t : streamOpenThreads) {
+ t.setMaxStreams(maxStreams);
+ t.start();
+ }
+ }
+
+ /**
+ * Waits until all producer threads have terminated.
+ */
+ private void joinThreads() throws InterruptedException {
+ for (Thread t : streamOpenThreads) {
+ t.join();
+ }
+ }
+
+ /**
+ * Closes the registry while unbounded producers are running and expects that no test stream stays unclosed.
+ * Afterwards a registration on the closed registry must fail with an IOException and close the argument.
+ */
+ @Test
+ public void testClose() throws Exception {
+
+ startThreads(Integer.MAX_VALUE);
+
+ for (int i = 0; i < 5; ++i) {
+ System.gc();
+ Thread.sleep(40);
+ }
+
+ closeableRegistry.close();
+
+ // NOTE(review): the unbounded producers only terminate through an exception in their loop, presumably
+ // thrown by wrapSafe once the registry is closed - confirm against ClosingFSDataInputStream
+ joinThreads();
+
+ Assert.assertEquals(0, unclosedCounter.get());
+
+ try {
+
+ // closing this closeable increments the counter, which makes the close observable below
+ WrappingProxyCloseable<Closeable> testCloseable = new WrappingProxyCloseable<Closeable>() {
+ @Override
+ public Closeable getWrappedDelegate() {
+ return this;
+ }
+
+ @Override
+ public void close() throws IOException {
+ unclosedCounter.incrementAndGet();
+ }
+ };
+
+ closeableRegistry.registerClosable(testCloseable);
+
+ Assert.fail("Closed registry should not accept closeables!");
+
+ } catch (IOException expected) {
+ //expected
+ }
+
+ // the rejected closeable was closed exactly once
+ Assert.assertEquals(1, unclosedCounter.get());
+ }
+
+ /**
+ * Lets every producer leak 20 streams and expects the counter to drop back to zero after triggering
+ * garbage collection a few times, i.e. without closing the registry first.
+ */
+ @Test
+ public void testSafetyNetClose() throws Exception {
+
+ startThreads(20);
+
+ joinThreads();
+
+ // at most five GC attempts; stops early once nothing is left unclosed
+ for (int i = 0; i < 5 && unclosedCounter.get() > 0; ++i) {
+ System.gc();
+ Thread.sleep(50);
+ }
+
+ Assert.assertEquals(0, unclosedCounter.get());
+ closeableRegistry.close();
+ }
+
+ /**
+ * Thread that repeatedly creates a test stream, wraps it with the registry and drops the wrapper without
+ * closing it. A limit of Integer.MAX_VALUE is never decremented, so the loop then only ends by an exception.
+ */
+ private static final class ProducerThread extends Thread {
+
+ private final SafetyNetCloseableRegistry registry;
+ private final AtomicInteger refCount;
+ private int maxStreams;
+
+ public ProducerThread(SafetyNetCloseableRegistry registry, AtomicInteger refCount, int maxStreams) {
+ this.registry = registry;
+ this.refCount = refCount;
+ this.maxStreams = maxStreams;
+ }
+
+ public int getMaxStreams() {
+ return maxStreams;
+ }
+
+ public void setMaxStreams(int maxStreams) {
+ this.maxStreams = maxStreams;
+ }
+
+ @Override
+ public void run() {
+ try {
+ int count = 0;
+ while (maxStreams > 0) {
+ String debug = Thread.currentThread().getName() + " " + count;
+ TestStream testStream = new TestStream(refCount);
+ refCount.incrementAndGet();
+ ClosingFSDataInputStream pis = ClosingFSDataInputStream.wrapSafe(testStream, registry, debug); //reference dies here
+
+ try {
+ Thread.sleep(2);
+ } catch (InterruptedException e) {
+ // deliberately ignored, the loop simply continues
+
+ }
+
+ if (maxStreams != Integer.MAX_VALUE) {
+ --maxStreams;
+ }
+ ++count;
+ }
+ } catch (Exception ex) {
+ // any exception ends the producer loop silently
+
+ }
+ }
+ }
+
+ /**
+ * Minimal input stream whose first close() decrements the shared counter; further close() calls do nothing.
+ */
+ private static final class TestStream extends FSDataInputStream {
+
+ // set to null after the first close so that the counter is decremented only once
+ private AtomicInteger refCount;
+
+ public TestStream(AtomicInteger refCount) {
+ this.refCount = refCount;
+ }
+
+ @Override
+ public void seek(long desired) throws IOException {
+
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return 0;
+ }
+
+ @Override
+ public int read() throws IOException {
+ return 0;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (refCount != null) {
+ refCount.decrementAndGet();
+ refCount = null;
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
index b5bdcaf..d79be05 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
@@ -18,19 +18,8 @@
package org.apache.flink.runtime.filecache;
-import java.io.File;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.FutureTask;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
import org.apache.commons.io.FileUtils;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.ConfigConstants;
@@ -40,13 +29,23 @@ import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
-import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.util.IOUtils;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
/**
* The FileCache is used to create the local files for the registered cache files when a task is deployed.
* The files will be removed when the task is unregistered after a 5 second delay.
@@ -236,8 +235,10 @@ public class FileCache {
// ------------------------------------------------------------------------
public static void copy(Path sourcePath, Path targetPath, boolean executable) throws IOException {
- FileSystem sFS = sourcePath.getFileSystem();
- FileSystem tFS = targetPath.getFileSystem();
+ // TODO rewrite this to make it participate in the closable registry and the lifecycle of a task.
+ // we unwrap the file system to get raw streams without safety net
+ FileSystem sFS = FileSystem.getUnguardedFileSystem(sourcePath.toUri());
+ FileSystem tFS = FileSystem.getUnguardedFileSystem(targetPath.toUri());
if (!tFS.exists(targetPath)) {
if (sFS.getFileStatus(sourcePath).isDir()) {
tFS.mkdirs(targetPath);
@@ -253,16 +254,11 @@ public class FileCache {
copy(content.getPath(), new Path(localPath), executable);
}
} else {
- try {
- FSDataOutputStream lfsOutput = tFS.create(targetPath, false);
- FSDataInputStream fsInput = sFS.open(sourcePath);
+ try (FSDataOutputStream lfsOutput = tFS.create(targetPath, false); FSDataInputStream fsInput = sFS.open(sourcePath)) {
IOUtils.copyBytes(fsInput, lfsOutput);
//noinspection ResultOfMethodCallIgnored
new File(targetPath.toString()).setExecutable(executable);
- // closing the FSDataOutputStream
- lfsOutput.close();
- }
- catch (IOException ioe) {
+ } catch (IOException ioe) {
LOG.error("could not copy file to local file cache.", ioe);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index e5d9b2b..ae71c7f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -33,6 +33,7 @@ import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.util.Preconditions;
@@ -80,7 +81,7 @@ public abstract class AbstractKeyedStateBackend<K>
protected final TaskKvStateRegistry kvStateRegistry;
/** Registry for all opened streams, so they can be closed if the task using this backend is closed */
- protected ClosableRegistry cancelStreamRegistry;
+ protected CloseableRegistry cancelStreamRegistry;
protected final ClassLoader userCodeClassLoader;
@@ -96,7 +97,7 @@ public abstract class AbstractKeyedStateBackend<K>
this.numberOfKeyGroups = Preconditions.checkNotNull(numberOfKeyGroups);
this.userCodeClassLoader = Preconditions.checkNotNull(userCodeClassLoader);
this.keyGroupRange = Preconditions.checkNotNull(keyGroupRange);
- this.cancelStreamRegistry = new ClosableRegistry();
+ this.cancelStreamRegistry = new CloseableRegistry();
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-runtime/src/main/java/org/apache/flink/runtime/state/ClosableRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ClosableRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ClosableRegistry.java
deleted file mode 100644
index b5f7dad..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ClosableRegistry.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-import org.apache.commons.io.IOUtils;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.Set;
-
-/**
- * This class allows to register instances of {@link Closeable}, which are all closed if this registry is closed.
- * <p>
- * Registering to an already closed registry will throw an exception and close the provided {@link Closeable}
- * <p>
- * All methods in this class are thread-safe.
- */
-public class ClosableRegistry implements Closeable {
-
- private final Set<Closeable> registeredCloseables;
- private boolean closed;
-
- public ClosableRegistry() {
- this.registeredCloseables = new HashSet<>();
- this.closed = false;
- }
-
- /**
- * Registers a {@link Closeable} with the registry. In case the registry is already closed, this method throws an
- * {@link IllegalStateException} and closes the passed {@link Closeable}.
- *
- * @param closeable Closable tor register
- * @return true if the the Closable was newly added to the registry
- * @throws IOException exception when the registry was closed before
- */
- public boolean registerClosable(Closeable closeable) throws IOException {
-
- if (null == closeable) {
- return false;
- }
-
- synchronized (getSynchronizationLock()) {
- if (closed) {
- IOUtils.closeQuietly(closeable);
- throw new IOException("Cannot register Closable, registry is already closed. Closed passed closable.");
- }
-
- return registeredCloseables.add(closeable);
- }
- }
-
- /**
- * Removes a {@link Closeable} from the registry.
- *
- * @param closeable instance to remove from the registry.
- * @return true, if the instance was actually registered and now removed
- */
- public boolean unregisterClosable(Closeable closeable) {
-
- if (null == closeable) {
- return false;
- }
-
- synchronized (getSynchronizationLock()) {
- return registeredCloseables.remove(closeable);
- }
- }
-
- @Override
- public void close() throws IOException {
- synchronized (getSynchronizationLock()) {
-
- for (Closeable closeable : registeredCloseables) {
- IOUtils.closeQuietly(closeable);
- }
-
- registeredCloseables.clear();
- closed = true;
- }
- }
-
- public boolean isClosed() {
- synchronized (getSynchronizationLock()) {
- return closed;
- }
- }
-
- private Object getSynchronizationLock() {
- return registeredCloseables;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index 2f5d3cb..5b47362 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.state;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.memory.DataInputView;
@@ -49,7 +50,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
private final Map<String, PartitionableListState<?>> registeredStates;
private final Collection<OperatorStateHandle> restoreSnapshots;
- private final ClosableRegistry closeStreamOnCancelRegistry;
+ private final CloseableRegistry closeStreamOnCancelRegistry;
private final JavaSerializer<Serializable> javaSerializer;
/**
@@ -65,7 +66,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
this.javaSerializer = new JavaSerializer<>(userClassLoader);
this.restoreSnapshots = restoreSnapshots;
this.registeredStates = new HashMap<>();
- this.closeStreamOnCancelRegistry = new ClosableRegistry();
+ this.closeStreamOnCancelRegistry = new CloseableRegistry();
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java
index 8fbde05..b131d14 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java
@@ -22,6 +22,7 @@ import org.apache.commons.io.IOUtils;
import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.util.Preconditions;
@@ -36,7 +37,7 @@ import java.util.Iterator;
public class StateInitializationContextImpl implements StateInitializationContext {
/** Closable registry to participate in the operator's cancel/close methods */
- private final ClosableRegistry closableRegistry;
+ private final CloseableRegistry closableRegistry;
/** Signal whether any state to restore was found */
private final boolean restored;
@@ -55,7 +56,7 @@ public class StateInitializationContextImpl implements StateInitializationContex
KeyedStateStore keyedStateStore,
Collection<KeyGroupsStateHandle> keyGroupsStateHandles,
Collection<OperatorStateHandle> operatorStateHandles,
- ClosableRegistry closableRegistry) {
+ CloseableRegistry closableRegistry) {
this.restored = restored;
this.closableRegistry = Preconditions.checkNotNull(closableRegistry);
@@ -87,7 +88,7 @@ public class StateInitializationContextImpl implements StateInitializationContex
return keyGroupsStateHandles;
}
- public ClosableRegistry getClosableRegistry() {
+ public CloseableRegistry getClosableRegistry() {
return closableRegistry;
}
@@ -137,14 +138,14 @@ public class StateInitializationContextImpl implements StateInitializationContex
private static class KeyGroupStreamIterator implements Iterator<KeyGroupStatePartitionStreamProvider> {
private final Iterator<KeyGroupsStateHandle> stateHandleIterator;
- private final ClosableRegistry closableRegistry;
+ private final CloseableRegistry closableRegistry;
private KeyGroupsStateHandle currentStateHandle;
private FSDataInputStream currentStream;
private Iterator<Tuple2<Integer, Long>> currentOffsetsIterator;
public KeyGroupStreamIterator(
- Iterator<KeyGroupsStateHandle> stateHandleIterator, ClosableRegistry closableRegistry) {
+ Iterator<KeyGroupsStateHandle> stateHandleIterator, CloseableRegistry closableRegistry) {
this.stateHandleIterator = Preconditions.checkNotNull(stateHandleIterator);
this.closableRegistry = Preconditions.checkNotNull(closableRegistry);
@@ -200,7 +201,7 @@ public class StateInitializationContextImpl implements StateInitializationContex
private final String stateName; //TODO since we only support a single named state in raw, this could be dropped
private final Iterator<OperatorStateHandle> stateHandleIterator;
- private final ClosableRegistry closableRegistry;
+ private final CloseableRegistry closableRegistry;
private OperatorStateHandle currentStateHandle;
private FSDataInputStream currentStream;
@@ -210,7 +211,7 @@ public class StateInitializationContextImpl implements StateInitializationContex
public OperatorStateStreamIterator(
String stateName,
Iterator<OperatorStateHandle> stateHandleIterator,
- ClosableRegistry closableRegistry) {
+ CloseableRegistry closableRegistry) {
this.stateName = Preconditions.checkNotNull(stateName);
this.stateHandleIterator = Preconditions.checkNotNull(stateHandleIterator);
http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java
index d632529..ce8a6c4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.state;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.util.Preconditions;
import java.io.IOException;
@@ -42,7 +43,7 @@ public class StateSnapshotContextSynchronousImpl implements StateSnapshotContext
* Registry for opened streams to participate in the lifecycle of the stream task. Hence, this registry should be
* obtained from and managed by the stream task.
*/
- private final ClosableRegistry closableRegistry;
+ private final CloseableRegistry closableRegistry;
private KeyedStateCheckpointOutputStream keyedStateCheckpointOutputStream;
private OperatorStateCheckpointOutputStream operatorStateCheckpointOutputStream;
@@ -62,7 +63,7 @@ public class StateSnapshotContextSynchronousImpl implements StateSnapshotContext
long checkpointTimestamp,
CheckpointStreamFactory streamFactory,
KeyGroupRange keyGroupRange,
- ClosableRegistry closableRegistry) {
+ CloseableRegistry closableRegistry) {
this.checkpointId = checkpointId;
this.checkpointTimestamp = checkpointTimestamp;
http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
index 29e905c..b61c52d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
@@ -43,9 +43,6 @@ public class FileStateHandle implements StreamStateHandle {
/** The size of the state in the file */
private final long stateSize;
- /** Cached file system handle */
- private transient FileSystem fs;
-
/**
* Creates a new file state for the given file path.
*
@@ -79,13 +76,17 @@ public class FileStateHandle implements StreamStateHandle {
*/
@Override
public void discardState() throws Exception {
- getFileSystem().delete(filePath, false);
+
+ FileSystem fs = getFileSystem();
+
+ fs.delete(filePath, false);
// send a call to delete the checkpoint directory containing the file. This will
// fail (and be ignored) when some files still exist
try {
- getFileSystem().delete(filePath.getParent(), false);
- } catch (IOException ignored) {}
+ fs.delete(filePath.getParent(), false);
+ } catch (IOException ignored) {
+ }
}
/**
@@ -106,10 +107,7 @@ public class FileStateHandle implements StreamStateHandle {
* @throws IOException Thrown if the file system cannot be accessed.
*/
private FileSystem getFileSystem() throws IOException {
- if (fs == null) {
- fs = FileSystem.get(filePath.toUri());
- }
- return fs;
+ return FileSystem.get(filePath.toUri());
}
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 3254fc1..c794f56 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
@@ -538,6 +539,9 @@ public class Task implements Runnable, TaskActions {
// check for canceling as a shortcut
// ----------------------------
+ // init closeable registry for this task
+ FileSystem.createFileSystemCloseableRegistryForTask();
+
// first of all, get a user-code classloader
// this may involve downloading the job's JAR files and/or classes
LOG.info("Loading JAR files for task " + taskNameWithSubtask);
@@ -758,6 +762,7 @@ public class Task implements Runnable, TaskActions {
// remove all files in the distributed cache
removeCachedFiles(distributedCacheEntries, fileCache);
+ FileSystem.disposeFileSystemCloseableRegistryForTask();
notifyFinalState();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 4aaad71..6595901 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.SubtaskState;
@@ -36,7 +37,6 @@ import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.ChainedStateHandle;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
-import org.apache.flink.runtime.state.ClosableRegistry;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.OperatorStateBackend;
@@ -161,7 +161,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
/** The currently active background materialization threads */
- private final ClosableRegistry cancelables = new ClosableRegistry();
+ private final CloseableRegistry cancelables = new CloseableRegistry();
/** Flag to mark the task "in operation", in which case check
* needs to be initialized to true, so that early cancel() before invoke() behaves correctly */
@@ -949,7 +949,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
}
}
- public ClosableRegistry getCancelables() {
+ public CloseableRegistry getCancelables() {
return cancelables;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java
index 75c2261..cd94076 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java
@@ -20,13 +20,13 @@ package org.apache.flink.streaming.api.operators;
import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.runtime.state.ClosableRegistry;
import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
@@ -58,7 +58,7 @@ public class StateInitializationContextImplTest {
static final int NUM_HANDLES = 10;
private StateInitializationContextImpl initializationContext;
- private ClosableRegistry closableRegistry;
+ private CloseableRegistry closableRegistry;
private int writtenKeyGroups;
private Set<Integer> writtenOperatorStates;
@@ -70,7 +70,7 @@ public class StateInitializationContextImplTest {
this.writtenKeyGroups = 0;
this.writtenOperatorStates = new HashSet<>();
- this.closableRegistry = new ClosableRegistry();
+ this.closableRegistry = new CloseableRegistry();
OperatorStateStore stateStore = mock(OperatorStateStore.class);
ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos(64);
http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java
index 0ee839e..2b2df4c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java
@@ -18,8 +18,8 @@
package org.apache.flink.streaming.api.operators;
+import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
-import org.apache.flink.runtime.state.ClosableRegistry;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
import org.apache.flink.runtime.state.OperatorStateCheckpointOutputStream;
@@ -35,7 +35,7 @@ public class StateSnapshotContextSynchronousImplTest {
@Before
public void setUp() throws Exception {
- ClosableRegistry closableRegistry = new ClosableRegistry();
+ CloseableRegistry closableRegistry = new CloseableRegistry();
CheckpointStreamFactory streamFactory = new MemCheckpointStreamFactory(1024);
KeyGroupRange keyGroupRange = new KeyGroupRange(0, 2);
this.snapshotContext = new StateSnapshotContextSynchronousImpl(42, 4711, streamFactory, keyGroupRange, closableRegistry);
http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index 23a31d5..830cd6f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.runtime.checkpoint.OperatorStateRepartitioner;
@@ -32,7 +33,6 @@ import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
-import org.apache.flink.runtime.state.ClosableRegistry;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.OperatorStateBackend;
@@ -65,7 +65,9 @@ import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
/**
* Base class for {@code AbstractStreamOperator} test harnesses.
@@ -86,7 +88,7 @@ public class AbstractStreamOperatorTestHarness<OUT> {
final Environment environment;
- ClosableRegistry closableRegistry;
+ CloseableRegistry closableRegistry;
// use this as default for tests
protected AbstractStateBackend stateBackend = new MemoryStateBackend();
@@ -115,7 +117,7 @@ public class AbstractStreamOperatorTestHarness<OUT> {
this.config = new StreamConfig(underlyingConfig);
this.config.setCheckpointingEnabled(true);
this.executionConfig = new ExecutionConfig();
- this.closableRegistry = new ClosableRegistry();
+ this.closableRegistry = new CloseableRegistry();
this.checkpointLock = new Object();
environment = new MockEnvironment(
http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
index 5a64173..09de67f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
@@ -114,6 +114,7 @@ public class RescalingITCase extends TestLogger {
public static void teardown() {
if (cluster != null) {
cluster.shutdown();
+ cluster.awaitTermination();
}
}
[2/3] flink git commit: [FLINK-5107] Introduced limit for prior
execution attempt history
Posted by tr...@apache.org.
[FLINK-5107] Introduced limit for prior execution attempt history
This closes #2837.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f5af7f10
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f5af7f10
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f5af7f10
Branch: refs/heads/master
Commit: f5af7f1025aac9cac3f83de5f0e3aece5730ec0f
Parents: ba8ed26
Author: Stefan Richter <s....@data-artisans.com>
Authored: Fri Nov 18 19:07:56 2016 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Nov 22 23:18:34 2016 +0100
----------------------------------------------------------------------
...taskExecutionAttemptAccumulatorsHandler.java | 7 +-
.../executiongraph/ArchivedExecutionVertex.java | 12 +-
.../executiongraph/ExecutionJobVertex.java | 11 +-
.../runtime/executiongraph/ExecutionVertex.java | 44 +++--
.../runtime/jobmanager/JobManagerOptions.java | 38 +++++
.../flink/runtime/util/EvictingBoundedList.java | 160 ++++++++++++++++++
.../executiongraph/AllVerticesIteratorTest.java | 9 +-
.../runtime/util/EvictingBoundedListTest.java | 164 +++++++++++++++++++
8 files changed, 422 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f5af7f10/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
index 786f5e8..675304f 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
@@ -19,7 +19,6 @@
package org.apache.flink.runtime.webmonitor.handlers;
import com.fasterxml.jackson.core.JsonGenerator;
-
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
import org.apache.flink.runtime.executiongraph.AccessExecution;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
@@ -39,6 +38,12 @@ public class SubtaskExecutionAttemptAccumulatorsHandler extends AbstractSubtaskA
@Override
public String handleRequest(AccessExecution execAttempt, Map<String, String> params) throws Exception {
+
+ // return empty string for pruned (== null) execution attempts
+ if (null == execAttempt) {
+ return "";
+ }
+
final StringifiedAccumulatorResult[] accs = execAttempt.getUserAccumulatorsStringified();
StringWriter writer = new StringWriter();
http://git-wip-us.apache.org/repos/asf/flink/blob/f5af7f10/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
index e1fb11a..56fc7a6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
@@ -19,17 +19,16 @@ package org.apache.flink.runtime.executiongraph;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.util.EvictingBoundedList;
import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
public class ArchivedExecutionVertex implements AccessExecutionVertex, Serializable {
private static final long serialVersionUID = -6708241535015028576L;
private final int subTaskIndex;
- private final List<ArchivedExecution> priorExecutions;
+ private final EvictingBoundedList<ArchivedExecution> priorExecutions;
/** The name in the format "myTask (2/7)", cached to avoid frequent string concatenations */
private final String taskNameWithSubtask;
@@ -38,9 +37,10 @@ public class ArchivedExecutionVertex implements AccessExecutionVertex, Serializa
public ArchivedExecutionVertex(ExecutionVertex vertex) {
this.subTaskIndex = vertex.getParallelSubtaskIndex();
- this.priorExecutions = new ArrayList<>();
- for (Execution priorExecution : vertex.getPriorExecutions()) {
- priorExecutions.add(priorExecution.archive());
+ EvictingBoundedList<Execution> copyOfPriorExecutionsList = vertex.getCopyOfPriorExecutionsList();
+ priorExecutions = new EvictingBoundedList<>(copyOfPriorExecutionsList.getSizeLimit());
+ for (Execution priorExecution : copyOfPriorExecutionsList) {
+ priorExecutions.add(priorExecution != null ? priorExecution.archive() : null);
}
this.taskNameWithSubtask = vertex.getTaskNameWithSubtaskIndex();
this.currentExecution = vertex.getCurrentExecutionAttempt().archive();
http://git-wip-us.apache.org/repos/asf/flink/blob/f5af7f10/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index a62ed86..c066ca8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.core.io.InputSplitSource;
@@ -38,6 +39,7 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobEdge;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.JobManagerOptions;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
@@ -163,9 +165,16 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
result.getResultType());
}
+ Configuration jobConfiguration = graph.getJobConfiguration();
+ int maxPriorAttemptsHistoryLength = jobConfiguration != null ?
+ jobConfiguration.getInteger(JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE) :
+ JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE.defaultValue();
+
// create all task vertices
for (int i = 0; i < numTaskVertices; i++) {
- ExecutionVertex vertex = new ExecutionVertex(this, i, this.producedDataSets, timeout, createTimestamp);
+ ExecutionVertex vertex = new ExecutionVertex(
+ this, i, this.producedDataSets, timeout, createTimestamp, maxPriorAttemptsHistoryLength);
+
this.taskVertices[i] = vertex;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f5af7f10/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 39c60d2..5cbd1c1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -36,11 +36,13 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobEdge;
import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.JobManagerOptions;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.state.TaskStateHandles;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.util.EvictingBoundedList;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;
@@ -53,7 +55,6 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.CopyOnWriteArrayList;
import static org.apache.flink.runtime.execution.ExecutionState.CANCELED;
import static org.apache.flink.runtime.execution.ExecutionState.FAILED;
@@ -79,7 +80,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
private final int subTaskIndex;
- private final List<Execution> priorExecutions;
+ private final EvictingBoundedList<Execution> priorExecutions;
private final Time timeout;
@@ -99,7 +100,13 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
int subTaskIndex,
IntermediateResult[] producedDataSets,
Time timeout) {
- this(jobVertex, subTaskIndex, producedDataSets, timeout, System.currentTimeMillis());
+ this(
+ jobVertex,
+ subTaskIndex,
+ producedDataSets,
+ timeout,
+ System.currentTimeMillis(),
+ JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE.defaultValue());
}
public ExecutionVertex(
@@ -107,7 +114,17 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
int subTaskIndex,
IntermediateResult[] producedDataSets,
Time timeout,
- long createTimestamp) {
+ int maxPriorExecutionHistoryLength) {
+ this(jobVertex, subTaskIndex, producedDataSets, timeout, System.currentTimeMillis(), maxPriorExecutionHistoryLength);
+ }
+
+ public ExecutionVertex(
+ ExecutionJobVertex jobVertex,
+ int subTaskIndex,
+ IntermediateResult[] producedDataSets,
+ Time timeout,
+ long createTimestamp,
+ int maxPriorExecutionHistoryLength) {
this.jobVertex = jobVertex;
this.subTaskIndex = subTaskIndex;
@@ -125,7 +142,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
this.inputEdges = new ExecutionEdge[jobVertex.getJobVertex().getInputs().size()][];
- this.priorExecutions = new CopyOnWriteArrayList<Execution>();
+ this.priorExecutions = new EvictingBoundedList<>(maxPriorExecutionHistoryLength);
this.currentExecution = new Execution(
getExecutionGraph().getFutureExecutor(),
@@ -235,16 +252,19 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
@Override
public Execution getPriorExecutionAttempt(int attemptNumber) {
- if (attemptNumber >= 0 && attemptNumber < priorExecutions.size()) {
- return priorExecutions.get(attemptNumber);
- }
- else {
- throw new IllegalArgumentException("attempt does not exist");
+ synchronized (priorExecutions) {
+ if (attemptNumber >= 0 && attemptNumber < priorExecutions.size()) {
+ return priorExecutions.get(attemptNumber);
+ } else {
+ throw new IllegalArgumentException("attempt does not exist");
+ }
}
}
- List<Execution> getPriorExecutions() {
- return priorExecutions;
+ EvictingBoundedList<Execution> getCopyOfPriorExecutionsList() {
+ synchronized (priorExecutions) {
+ return new EvictingBoundedList<>(priorExecutions);
+ }
}
public ExecutionGraph getExecutionGraph() {
http://git-wip-us.apache.org/repos/asf/flink/blob/f5af7f10/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerOptions.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerOptions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerOptions.java
new file mode 100644
index 0000000..279a70e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerOptions.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+@PublicEvolving
+public class JobManagerOptions {
+
+ /**
+ * The maximum number of prior execution attempts kept in history.
+ */
+ public static final ConfigOption<Integer> MAX_ATTEMPTS_HISTORY_SIZE =
+ key("job-manager.max-attempts-history-size").defaultValue(16);
+
+ private JobManagerOptions() {
+ throw new IllegalAccessError();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f5af7f10/flink-runtime/src/main/java/org/apache/flink/runtime/util/EvictingBoundedList.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EvictingBoundedList.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EvictingBoundedList.java
new file mode 100644
index 0000000..f4c155a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EvictingBoundedList.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.util.ConcurrentModificationException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * An array-backed list with a fixed physical capacity that can nevertheless grow virtually beyond that capacity.
+ * Once more elements have been added than the size limit allows, the oldest elements are dropped (FIFO order)
+ * while their indexes remain valid: reading a dropped index yields the configured default element instead.
+ *
+ * <p>A size limit of zero is permitted and yields a list that retains no elements at all; every valid index
+ * then reads as the default element.
+ *
+ * <p>This class performs no synchronization of its own.
+ *
+ * <p>TODO this class could eventually implement the whole actual List interface.
+ *
+ * @param <T> type of the list elements
+ */
+public class EvictingBoundedList<T> implements Iterable<T>, Serializable {
+
+	private static final long serialVersionUID = -1863961980953613146L;
+
+	/** Element handed out in place of dropped entries; {@code null} unless configured otherwise. */
+	private final T defaultElement;
+
+	/** Ring buffer holding at most the {@code elements.length} most recently added entries. */
+	private final Object[] elements;
+
+	/** Ring buffer slot that the next call to {@link #add(Object)} writes to. */
+	private int idx;
+
+	/** Virtual size: number of elements added since creation or since the last {@link #clear()}. */
+	private int count;
+
+	/** Modification counter that iterators compare against to detect concurrent modification. */
+	private long modCount;
+
+	/**
+	 * Creates an empty list that uses {@code null} as default element.
+	 *
+	 * @param sizeLimit maximum number of elements that are physically retained; must not be negative
+	 * @throws IllegalArgumentException if {@code sizeLimit} is negative
+	 */
+	public EvictingBoundedList(int sizeLimit) {
+		this(sizeLimit, null);
+	}
+
+	/**
+	 * Creates a copy of the given list. The backing array is cloned, the contained elements themselves are
+	 * shared with {@code other}. The modification counter of the copy starts at zero.
+	 *
+	 * @param other the list to copy; must not be {@code null}
+	 * @throws NullPointerException if {@code other} is {@code null}
+	 */
+	public EvictingBoundedList(EvictingBoundedList<T> other) {
+		Preconditions.checkNotNull(other);
+		this.defaultElement = other.defaultElement;
+		this.elements = other.elements.clone();
+		this.idx = other.idx;
+		this.count = other.count;
+		this.modCount = 0L;
+	}
+
+	/**
+	 * Creates an empty list.
+	 *
+	 * @param sizeLimit maximum number of elements that are physically retained; must not be negative
+	 * @param defaultElement element returned when a dropped index is accessed
+	 * @throws IllegalArgumentException if {@code sizeLimit} is negative
+	 */
+	public EvictingBoundedList(int sizeLimit, T defaultElement) {
+		// fail with a meaningful exception instead of a NegativeArraySizeException
+		Preconditions.checkArgument(sizeLimit >= 0, "sizeLimit must not be negative: " + sizeLimit);
+		this.elements = new Object[sizeLimit];
+		this.defaultElement = defaultElement;
+		this.idx = 0;
+		this.count = 0;
+		this.modCount = 0L;
+	}
+
+	/**
+	 * Returns the virtual size of the list, which includes elements that have already been dropped.
+	 *
+	 * @return number of elements added since creation or since the last {@link #clear()}
+	 */
+	public int size() {
+		return count;
+	}
+
+	/**
+	 * @return {@code true} if {@link #size()} is zero
+	 */
+	public boolean isEmpty() {
+		return 0 == count;
+	}
+
+	/**
+	 * Appends an element. If the size limit is already exhausted, the oldest retained element is overwritten
+	 * and thereby dropped.
+	 *
+	 * @param t the element to append
+	 * @return always {@code true}
+	 */
+	public boolean add(T t) {
+		// with a size limit of zero there is no slot to write to; the element counts as dropped immediately
+		if (elements.length > 0) {
+			elements[idx] = t;
+			idx = (idx + 1) % elements.length;
+		}
+		++count;
+		++modCount;
+		return true;
+	}
+
+	/**
+	 * Removes all elements and resets the virtual size to zero. Clearing an already empty list is a no-op and
+	 * does not count as a modification.
+	 */
+	public void clear() {
+		if (!isEmpty()) {
+			for (int i = 0; i < elements.length; ++i) {
+				elements[i] = null;
+			}
+			count = 0;
+			idx = 0;
+			++modCount;
+		}
+	}
+
+	/**
+	 * Returns the element at the given index, or the default element if that element has been dropped.
+	 *
+	 * @param index index in the range {@code [0, size())}
+	 * @return the stored element, or {@link #getDefaultElement()} for a dropped index
+	 * @throws IllegalArgumentException if the index is out of range
+	 */
+	public T get(int index) {
+		Preconditions.checkArgument(index >= 0 && index < count);
+		return isDroppedIndex(index) ? getDefaultElement() : accessInternal(index % elements.length);
+	}
+
+	/**
+	 * @return the maximum number of elements that are physically retained
+	 */
+	public int getSizeLimit() {
+		return elements.length;
+	}
+
+	/**
+	 * Replaces the element at the given index. Writing to a dropped index leaves the list content untouched.
+	 * The modification counter is advanced in either case.
+	 *
+	 * @param index index in the range {@code [0, size())}
+	 * @param element the new element
+	 * @return the element previously stored at the index, or {@link #getDefaultElement()} for a dropped index
+	 * @throws IllegalArgumentException if the index is out of range
+	 */
+	public T set(int index, T element) {
+		Preconditions.checkArgument(index >= 0 && index < count);
+		++modCount;
+		if (isDroppedIndex(index)) {
+			return getDefaultElement();
+		} else {
+			// named differently from the field 'idx' on purpose, to avoid shadowing the write position
+			int arrayIndex = index % elements.length;
+			T old = accessInternal(arrayIndex);
+			elements[arrayIndex] = element;
+			return old;
+		}
+	}
+
+	/**
+	 * @return the element that is returned in place of dropped elements
+	 */
+	public T getDefaultElement() {
+		return defaultElement;
+	}
+
+	/**
+	 * Tells whether the element at the given (valid) list index is no longer physically retained, which is the
+	 * case for all but the last {@code elements.length} indexes.
+	 */
+	private boolean isDroppedIndex(int idx) {
+		return idx < count - elements.length;
+	}
+
+	/** Reads a ring buffer slot; the cast is safe because only {@code T} instances are ever stored. */
+	@SuppressWarnings("unchecked")
+	private T accessInternal(int arrayIndex) {
+		return (T) elements[arrayIndex];
+	}
+
+	/**
+	 * Returns a read-only iterator over all indexes in {@code [0, size())}; dropped indexes are reported as the
+	 * default element. {@code next()} throws a {@link ConcurrentModificationException} if the list was modified
+	 * after the iterator has been created.
+	 */
+	@Override
+	public Iterator<T> iterator() {
+		return new Iterator<T>() {
+
+			int pos = 0;
+			final long expectedModCount = modCount;
+
+			@Override
+			public boolean hasNext() {
+				return pos < count;
+			}
+
+			@Override
+			public T next() {
+				if (expectedModCount != modCount) {
+					throw new ConcurrentModificationException();
+				}
+				if (pos < count) {
+					return get(pos++);
+				} else {
+					throw new NoSuchElementException("Iterator exhausted.");
+				}
+			}
+
+			@Override
+			public void remove() {
+				throw new UnsupportedOperationException("Read-only iterator");
+			}
+		};
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f5af7f10/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java
index 0223a2e..4ecac9b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java
@@ -18,8 +18,7 @@
package org.apache.flink.runtime.executiongraph;
-import java.util.Arrays;
-
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
@@ -28,6 +27,8 @@ import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
+import java.util.Arrays;
+
public class AllVerticesIteratorTest {
@Test
@@ -50,8 +51,10 @@ public class AllVerticesIteratorTest {
v4.setParallelism(2);
ExecutionGraph eg = Mockito.mock(ExecutionGraph.class);
+ Configuration jobConf = new Configuration();
Mockito.when(eg.getFutureExecutor()).thenReturn(TestingUtils.directExecutionContext());
-
+ Mockito.when(eg.getJobConfiguration()).thenReturn(jobConf);
+
ExecutionJobVertex ejv1 = new ExecutionJobVertex(eg, v1, 1,
AkkaUtils.getDefaultTimeout());
ExecutionJobVertex ejv2 = new ExecutionJobVertex(eg, v2, 1,
http://git-wip-us.apache.org/repos/asf/flink/blob/f5af7f10/flink-runtime/src/test/java/org/apache/flink/runtime/util/EvictingBoundedListTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/EvictingBoundedListTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/EvictingBoundedListTest.java
new file mode 100644
index 0000000..e0a1c70
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/EvictingBoundedListTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.ConcurrentModificationException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class EvictingBoundedListTest {
+
+ @Test
+ public void testAddGet() {
+ int insertSize = 17;
+ int boundSize = 5;
+ Integer defaultElement = 4711;
+
+ EvictingBoundedList<Integer> list = new EvictingBoundedList<>(boundSize, defaultElement);
+ assertTrue(list.isEmpty());
+
+ for (int i = 0; i < insertSize; ++i) {
+ list.add(i);
+ }
+
+ assertEquals(17, list.size());
+
+ for (int i = 0; i < insertSize; ++i) {
+ int exp = i < (insertSize - boundSize) ? defaultElement : i;
+ int act = list.get(i);
+ assertEquals(exp, act);
+ }
+ }
+
+ @Test
+ public void testSet() {
+ int insertSize = 17;
+ int boundSize = 5;
+ Integer defaultElement = 4711;
+ List<Integer> reference = new ArrayList<>(insertSize);
+ EvictingBoundedList<Integer> list = new EvictingBoundedList<>(boundSize, defaultElement);
+ for (int i = 0; i < insertSize; ++i) {
+ reference.add(i);
+ list.add(i);
+ }
+
+ assertEquals(reference.size(), list.size());
+
+ list.set(0, 123);
+ list.set(insertSize - boundSize - 1, 123);
+
+ list.set(insertSize - boundSize, 42);
+ reference.set(insertSize - boundSize, 42);
+ list.set(13, 43);
+ reference.set(13, 43);
+ list.set(16, 44);
+ reference.set(16, 44);
+
+ try {
+ list.set(insertSize, 23);
+ fail("Illegal index in set not detected.");
+ } catch (IllegalArgumentException ignored) {
+
+ }
+
+ for (int i = 0; i < insertSize; ++i) {
+ int exp = i < (insertSize - boundSize) ? defaultElement : reference.get(i);
+ int act = list.get(i);
+ assertEquals(exp, act);
+ }
+
+ assertEquals(reference.size(), list.size());
+ }
+
+ @Test
+ public void testClear() {
+ int insertSize = 17;
+ int boundSize = 5;
+ Integer defaultElement = 4711;
+
+ EvictingBoundedList<Integer> list = new EvictingBoundedList<>(boundSize, defaultElement);
+ for (int i = 0; i < insertSize; ++i) {
+ list.add(i);
+ }
+
+ list.clear();
+
+ assertEquals(0, list.size());
+ assertTrue(list.isEmpty());
+
+ try {
+ list.get(0);
+ fail();
+ } catch (IllegalArgumentException ignore) {
+ }
+ }
+
+ @Test
+ public void testIterator() {
+ int insertSize = 17;
+ int boundSize = 5;
+ Integer defaultElement = 4711;
+
+ EvictingBoundedList<Integer> list = new EvictingBoundedList<>(boundSize, defaultElement);
+ assertTrue(list.isEmpty());
+
+ for (int i = 0; i < insertSize; ++i) {
+ list.add(i);
+ }
+
+ Iterator<Integer> iterator = list.iterator();
+
+ for (int i = 0; i < insertSize; ++i) {
+ assertTrue(iterator.hasNext());
+ int exp = i < (insertSize - boundSize) ? defaultElement : i;
+ int act = iterator.next();
+ assertEquals(exp, act);
+ }
+
+ assertFalse(iterator.hasNext());
+
+ try {
+ iterator.next();
+ fail("Next on exhausted iterator did not trigger exception.");
+ } catch (NoSuchElementException ignored) {
+
+ }
+
+ iterator = list.iterator();
+ assertTrue(iterator.hasNext());
+ iterator.next();
+ list.add(123);
+ assertTrue(iterator.hasNext());
+ try {
+ iterator.next();
+ fail("Concurrent modification not detected.");
+ } catch (ConcurrentModificationException ignored) {
+
+ }
+ }
+}
[3/3] flink git commit: [FLINK-5107] Handle evicted execution
attempts in request handlers
Posted by tr...@apache.org.
[FLINK-5107] Handle evicted execution attempts in request handlers
If a prior execution attempt cannot be retrieved because it has been evicted before,
the request handler will now throw a meaningful exception to notify the requester
about the evicted execution attempt.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4c23879a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4c23879a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4c23879a
Branch: refs/heads/master
Commit: 4c23879a5743b862b93d3c4267244235266163e7
Parents: f5af7f1
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Nov 23 00:19:32 2016 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Nov 23 00:19:32 2016 +0100
----------------------------------------------------------------------
.../AbstractSubtaskAttemptRequestHandler.java | 8 ++++-
.../handlers/RequestHandlerException.java | 31 ++++++++++++++++++++
...taskExecutionAttemptAccumulatorsHandler.java | 6 ----
3 files changed, 38 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/4c23879a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java
index f3a5059..1eab21c 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java
@@ -57,7 +57,13 @@ public abstract class AbstractSubtaskAttemptRequestHandler extends AbstractSubta
}
else if (attempt >= 0 && attempt < currentAttempt.getAttemptNumber()) {
AccessExecution exec = vertex.getPriorExecutionAttempt(attempt);
- return handleRequest(exec, params);
+
+ if (exec != null) {
+ return handleRequest(exec, params);
+ } else {
+ throw new RequestHandlerException("Execution for attempt " + attempt +
+ " has already been deleted.");
+ }
}
else {
throw new RuntimeException("Attempt does not exist: " + attempt);
http://git-wip-us.apache.org/repos/asf/flink/blob/4c23879a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandlerException.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandlerException.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandlerException.java
new file mode 100644
index 0000000..bb61d16
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandlerException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+/**
+ * Base class for request handler exceptions.
+ *
+ * <p>This is a checked exception, so callers have to declare or handle it.
+ */
+public class RequestHandlerException extends Exception {
+
+	private static final long serialVersionUID = 7570352908725875886L;
+
+	/**
+	 * Creates an exception without a cause.
+	 *
+	 * @param message description of the failure
+	 */
+	public RequestHandlerException(String message) {
+		super(message);
+	}
+
+	/**
+	 * Creates an exception that keeps the underlying failure as its cause, so that the original stack trace
+	 * is not lost when a lower-level exception is wrapped.
+	 *
+	 * @param message description of the failure
+	 * @param cause the underlying failure; may be {@code null} if there is none
+	 */
+	public RequestHandlerException(String message, Throwable cause) {
+		super(message, cause);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4c23879a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
index 675304f..e613efb 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
@@ -38,12 +38,6 @@ public class SubtaskExecutionAttemptAccumulatorsHandler extends AbstractSubtaskA
@Override
public String handleRequest(AccessExecution execAttempt, Map<String, String> params) throws Exception {
-
- // return empty string for pruned (== null) execution attempts
- if (null == execAttempt) {
- return "";
- }
-
final StringifiedAccumulatorResult[] accs = execAttempt.getUserAccumulatorsStringified();
StringWriter writer = new StringWriter();