You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/02/18 08:03:59 UTC

[flink] branch master updated (b43701f -> 5b71c7f)

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from b43701f  [FLINK-15988][json] Make JsonRowSerializationSchema's constructor private (#11080)
     new dddda69  [FLINK-10918][state backends] Change o.a.f.core.fs.Path to java.nio.file.Path in RocksDB incremental checkpoints
     new bd1a838  [hotfix][core] Fix broken JavaDoc link in FileUtils
     new 2da3bc3  [hotfix][runtime] Small docs/comments cleanups in RPC code.
     new 2746e6a  [FLINK-15966][runtime] Capture callstacks for RPC ask() calls to improve exceptions.
     new 5b71c7f  [hotfix][config] Adjust AkkaOptions to new type-safe pattern

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docs/_includes/generated/akka_configuration.html   |   6 +
 .../apache/flink/configuration/AkkaOptions.java    |  35 ++++++
 .../main/java/org/apache/flink/util/FileUtils.java |  14 ++-
 .../org/apache/flink/runtime/rpc/RpcEndpoint.java  |   2 -
 .../runtime/rpc/akka/AkkaInvocationHandler.java    |  72 ++++++++---
 .../flink/runtime/rpc/akka/AkkaRpcService.java     |  19 ++-
 .../rpc/akka/AkkaRpcServiceConfiguration.java      |  19 ++-
 .../rpc/akka/FencedAkkaInvocationHandler.java      |   5 +-
 .../runtime/rpc/messages/RemoteRpcInvocation.java  |   6 +-
 .../flink/runtime/state/DirectoryStateHandle.java  |  31 +++--
 .../flink/runtime/state/SnapshotDirectory.java     |  42 +++----
 .../ResourceManagerTaskExecutorTest.java           |   7 +-
 .../runtime/rpc/akka/TimeoutCallStackTest.java     | 135 +++++++++++++++++++++
 .../flink/runtime/state/SnapshotDirectoryTest.java |  48 ++------
 .../state/StateSnapshotTransformerTest.java        |   2 +-
 .../streaming/state/RocksDBStateDownloader.java    |  14 +--
 .../streaming/state/RocksDBStateUploader.java      |  12 +-
 .../RocksDBIncrementalRestoreOperation.java        |  46 +++----
 .../snapshot/RocksIncrementalSnapshotStrategy.java |  20 ++-
 .../state/RocksDBStateDownloaderTest.java          |   8 +-
 .../streaming/state/RocksDBStateUploaderTest.java  |  13 +-
 21 files changed, 382 insertions(+), 174 deletions(-)
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/TimeoutCallStackTest.java


[flink] 01/05: [FLINK-10918][state backends] Change o.a.f.core.fs.Path to java.nio.file.Path in RocksDB incremental checkpoints

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit dddda6904df94beeb0239c936e25968988f68afa
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Thu Feb 13 20:50:07 2020 +0100

    [FLINK-10918][state backends] Change o.a.f.core.fs.Path to java.nio.file.Path in RocksDB incremental checkpoints
    
    This fixes the bug where paths are handled incorrectly on Windows, which breaks incremental checkpoints.
    
    This closes #11095
---
 .../main/java/org/apache/flink/util/FileUtils.java | 12 ++++++
 .../flink/runtime/state/DirectoryStateHandle.java  | 31 +++++++++-----
 .../flink/runtime/state/SnapshotDirectory.java     | 42 ++++++++-----------
 .../flink/runtime/state/SnapshotDirectoryTest.java | 48 +++++-----------------
 .../state/StateSnapshotTransformerTest.java        |  2 +-
 .../streaming/state/RocksDBStateDownloader.java    | 14 +++----
 .../streaming/state/RocksDBStateUploader.java      | 12 +++---
 .../RocksDBIncrementalRestoreOperation.java        | 46 ++++++++-------------
 .../snapshot/RocksIncrementalSnapshotStrategy.java | 20 ++++-----
 .../state/RocksDBStateDownloaderTest.java          |  8 ++--
 .../streaming/state/RocksDBStateUploaderTest.java  | 13 +++---
 11 files changed, 109 insertions(+), 139 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/util/FileUtils.java b/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
index 485eb02..be3e68d 100644
--- a/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
@@ -51,6 +51,7 @@ import java.util.EnumSet;
 import java.util.List;
 import java.util.Random;
 import java.util.function.Predicate;
+import java.util.stream.Stream;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipInputStream;
 import java.util.zip.ZipOutputStream;
@@ -100,6 +101,17 @@ public final class FileUtils {
 	// ------------------------------------------------------------------------
 
 	/**
+	 * Lists the given directory in a resource-leak-safe way.
+	 */
+	public static java.nio.file.Path[] listDirectory(java.nio.file.Path directory) throws IOException {
+		try (Stream<java.nio.file.Path> stream = Files.list(directory)) {
+			return stream.toArray(java.nio.file.Path[]::new);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
 	 * Constructs a random filename with the given prefix and
 	 * a random part generated from hex characters.
 	 *
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DirectoryStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DirectoryStateHandle.java
index 2d08777..f57dc9c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DirectoryStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DirectoryStateHandle.java
@@ -18,12 +18,13 @@
 
 package org.apache.flink.runtime.state;
 
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.FileUtils;
 
 import javax.annotation.Nonnull;
 
 import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 
 /**
  * This state handle represents a directory. This class is, for example, used to represent the directory of RocksDB's
@@ -34,18 +35,21 @@ public class DirectoryStateHandle implements StateObject {
 	/** Serial version. */
 	private static final long serialVersionUID = 1L;
 
-	/** The path that describes the directory. */
-	@Nonnull
-	private final Path directory;
+	/** The path that describes the directory, as a string, to be serializable. */
+	private final String directoryString;
+
+	/** Transient path cache, to avoid re-parsing the string. */
+	private transient Path directory;
 
 	public DirectoryStateHandle(@Nonnull Path directory) {
 		this.directory = directory;
+		this.directoryString = directory.toString();
 	}
 
 	@Override
 	public void discardState() throws IOException {
-		FileSystem fileSystem = directory.getFileSystem();
-		fileSystem.delete(directory, true);
+		ensurePath();
+		FileUtils.deleteDirectory(directory.toFile());
 	}
 
 	@Override
@@ -56,9 +60,16 @@ public class DirectoryStateHandle implements StateObject {
 
 	@Nonnull
 	public Path getDirectory() {
+		ensurePath();
 		return directory;
 	}
 
+	private void ensurePath() {
+		if (directory == null) {
+			directory = Paths.get(directoryString);
+		}
+	}
+
 	@Override
 	public boolean equals(Object o) {
 		if (this == o) {
@@ -70,18 +81,18 @@ public class DirectoryStateHandle implements StateObject {
 
 		DirectoryStateHandle that = (DirectoryStateHandle) o;
 
-		return directory.equals(that.directory);
+		return directoryString.equals(that.directoryString);
 	}
 
 	@Override
 	public int hashCode() {
-		return directory.hashCode();
+		return directoryString.hashCode();
 	}
 
 	@Override
 	public String toString() {
 		return "DirectoryStateHandle{" +
-			"directory=" + directory +
+			"directory=" + directoryString +
 			'}';
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotDirectory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotDirectory.java
index 19e4b52..5809daf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotDirectory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotDirectory.java
@@ -18,15 +18,15 @@
 
 package org.apache.flink.runtime.state;
 
-import org.apache.flink.core.fs.FileStatus;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.FileUtils;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
@@ -49,50 +49,37 @@ public abstract class SnapshotDirectory {
 	@Nonnull
 	protected final Path directory;
 
-	/** The filesystem that contains the snapshot directory. */
-	@Nonnull
-	protected final FileSystem fileSystem;
-
 	/** This reference tracks the lifecycle state of the snapshot directory. */
 	@Nonnull
 	protected AtomicReference<State> state;
 
-	private SnapshotDirectory(@Nonnull Path directory, @Nonnull FileSystem fileSystem) {
+	private SnapshotDirectory(@Nonnull Path directory) {
 		this.directory = directory;
-		this.fileSystem = fileSystem;
 		this.state = new AtomicReference<>(State.ONGOING);
 	}
 
-	private SnapshotDirectory(@Nonnull Path directory) throws IOException {
-		this(directory, directory.getFileSystem());
-	}
-
 	@Nonnull
 	public Path getDirectory() {
 		return directory;
 	}
 
 	public boolean mkdirs() throws IOException {
-		return fileSystem.mkdirs(directory);
-	}
-
-	@Nonnull
-	public FileSystem getFileSystem() {
-		return fileSystem;
+		Files.createDirectories(directory);
+		return true;
 	}
 
 	public boolean exists() throws IOException {
-		return fileSystem.exists(directory);
+		return Files.exists(directory);
 	}
 
 	/**
-	 * List the statuses of the files/directories in the snapshot directory.
+	 * List the files in the snapshot directory.
 	 *
-	 * @return the statuses of the files/directories in the given path.
+	 * @return the files in the snapshot directory.
 	 * @throws IOException if there is a problem creating the file statuses.
 	 */
-	public FileStatus[] listStatus() throws IOException {
-		return fileSystem.listStatus(directory);
+	public Path[] listDirectory() throws IOException {
+		return FileUtils.listDirectory(directory);
 	}
 
 	/**
@@ -103,7 +90,10 @@ public abstract class SnapshotDirectory {
 	 * @throws IOException if an exception happens during the delete.
 	 */
 	public boolean cleanup() throws IOException {
-		return !state.compareAndSet(State.ONGOING, State.DELETED) || fileSystem.delete(directory, true);
+		if (state.compareAndSet(State.ONGOING, State.DELETED)) {
+			FileUtils.deleteDirectory(directory.toFile());
+		}
+		return true;
 	}
 
 	/**
@@ -174,7 +164,7 @@ public abstract class SnapshotDirectory {
 	private static class TemporarySnapshotDirectory extends SnapshotDirectory {
 
 		TemporarySnapshotDirectory(@Nonnull File directory) throws IOException {
-			super(new Path(directory.toURI()), FileSystem.getLocalFileSystem());
+			super(directory.toPath());
 		}
 
 		@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SnapshotDirectoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SnapshotDirectoryTest.java
index 392e844..67ba641 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SnapshotDirectoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SnapshotDirectoryTest.java
@@ -18,10 +18,7 @@
 
 package org.apache.flink.runtime.state;
 
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.AfterClass;
@@ -32,6 +29,7 @@ import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Path;
 import java.util.Arrays;
 import java.util.UUID;
 
@@ -61,7 +59,7 @@ public class SnapshotDirectoryTest extends TestLogger {
 		File folderRoot = temporaryFolder.getRoot();
 		File newFolder = new File(folderRoot, String.valueOf(UUID.randomUUID()));
 		File innerNewFolder = new File(newFolder, String.valueOf(UUID.randomUUID()));
-		Path path = new Path(innerNewFolder.toURI());
+		Path path = innerNewFolder.toPath();
 
 		Assert.assertFalse(newFolder.isDirectory());
 		Assert.assertFalse(innerNewFolder.isDirectory());
@@ -85,7 +83,7 @@ public class SnapshotDirectoryTest extends TestLogger {
 		File folderA = new File(folderRoot, String.valueOf(UUID.randomUUID()));
 
 		Assert.assertFalse(folderA.isDirectory());
-		Path path = new Path(folderA.toURI());
+		Path path = folderA.toPath();
 		SnapshotDirectory snapshotDirectory = SnapshotDirectory.permanent(path);
 		Assert.assertFalse(snapshotDirectory.exists());
 		Assert.assertTrue(folderA.mkdirs());
@@ -106,14 +104,13 @@ public class SnapshotDirectoryTest extends TestLogger {
 		File file = new File(folderA, "test.txt");
 		Assert.assertTrue(file.createNewFile());
 
-		Path path = new Path(folderA.toURI());
-		FileSystem fileSystem = path.getFileSystem();
+		Path path = folderA.toPath();
 		SnapshotDirectory snapshotDirectory = SnapshotDirectory.permanent(path);
 		Assert.assertTrue(snapshotDirectory.exists());
 
 		Assert.assertEquals(
-			Arrays.toString(fileSystem.listStatus(path)),
-			Arrays.toString(snapshotDirectory.listStatus()));
+			Arrays.toString(FileUtils.listDirectory(path)),
+			Arrays.toString(snapshotDirectory.listDirectory()));
 	}
 
 	/**
@@ -125,7 +122,7 @@ public class SnapshotDirectoryTest extends TestLogger {
 		File folderRoot = temporaryFolder.getRoot();
 		File folderA = new File(folderRoot, String.valueOf(UUID.randomUUID()));
 		Assert.assertTrue(folderA.mkdirs());
-		Path folderAPath = new Path(folderA.toURI());
+		Path folderAPath = folderA.toPath();
 
 		SnapshotDirectory snapshotDirectory = SnapshotDirectory.permanent(folderAPath);
 
@@ -160,7 +157,7 @@ public class SnapshotDirectoryTest extends TestLogger {
 		Assert.assertTrue(folderB.mkdirs());
 		File file = new File(folderA, "test.txt");
 		Assert.assertTrue(file.createNewFile());
-		Path folderAPath = new Path(folderA.toURI());
+		Path folderAPath = folderA.toPath();
 		SnapshotDirectory snapshotDirectory = SnapshotDirectory.permanent(folderAPath);
 		Assert.assertTrue(snapshotDirectory.cleanup());
 		Assert.assertFalse(folderA.isDirectory());
@@ -181,7 +178,7 @@ public class SnapshotDirectoryTest extends TestLogger {
 		File folderRoot = temporaryFolder.getRoot();
 		File folderA = new File(folderRoot, String.valueOf(UUID.randomUUID()));
 		Assert.assertTrue(folderA.mkdirs());
-		Path pathA = new Path(folderA.toURI());
+		Path pathA = folderA.toPath();
 		SnapshotDirectory snapshotDirectory = SnapshotDirectory.permanent(pathA);
 		Assert.assertFalse(snapshotDirectory.isSnapshotCompleted());
 		Assert.assertNotNull(snapshotDirectory.completeSnapshotAndGetHandle());
@@ -207,29 +204,4 @@ public class SnapshotDirectoryTest extends TestLogger {
 		Assert.assertTrue(tmpSnapshotDirectory.cleanup());
 		Assert.assertFalse(folder.exists());
 	}
-
-	/**
-	 * Tests that we always use the local file system even if we have specified a different default
-	 * file system. See FLINK-12042.
-	 */
-	@Test
-	public void testLocalFileSystemIsUsedForTemporary() throws Exception {
-		// ensure that snapshot directory will always use the local file system instead of the default file system
-		Configuration configuration = new Configuration();
-		configuration.setString(CoreOptions.DEFAULT_FILESYSTEM_SCHEME, "nonexistfs:///");
-		FileSystem.initialize(configuration);
-
-		final File folderRoot = temporaryFolder.getRoot();
-
-		try {
-			File folderB = new File(folderRoot, String.valueOf(UUID.randomUUID()));
-			// only pass the path and leave the scheme missing
-			SnapshotDirectory snapshotDirectoryB = SnapshotDirectory.temporary(folderB);
-			Assert.assertEquals(snapshotDirectoryB.getFileSystem(), FileSystem.getLocalFileSystem());
-		} finally {
-			// restore the FileSystem configuration
-			FileSystem.initialize(new Configuration());
-		}
-	}
-
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotTransformerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotTransformerTest.java
index 8450076..1c82a8f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotTransformerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotTransformerTest.java
@@ -21,13 +21,13 @@ package org.apache.flink.runtime.state;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.SingleThreadAccessCheckingTypeSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.state.StateSnapshotTransformer.StateSnapshotTransformFactory;
 import org.apache.flink.runtime.state.internal.InternalListState;
 import org.apache.flink.runtime.state.internal.InternalMapState;
 import org.apache.flink.runtime.state.internal.InternalValueState;
-import org.apache.flink.api.common.typeutils.SingleThreadAccessCheckingTypeSerializer;
 import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
 import org.apache.flink.util.StringUtils;
 
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java
index aa3201c..42a3f63 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java
@@ -19,9 +19,6 @@ package org.apache.flink.contrib.streaming.state;
 
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.FSDataOutputStream;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
 import org.apache.flink.runtime.state.StateHandleID;
@@ -31,6 +28,9 @@ import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.function.ThrowingRunnable;
 
 import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -103,7 +103,7 @@ public class RocksDBStateDownloader extends RocksDBStateDataTransfer {
 			StateHandleID stateHandleID = entry.getKey();
 			StreamStateHandle remoteFileHandle = entry.getValue();
 
-			Path path = new Path(restoreInstancePath, stateHandleID.toString());
+			Path path = restoreInstancePath.resolve(stateHandleID.toString());
 
 			runnables.add(ThrowingRunnable.unchecked(
 				() -> downloadDataForStateHandle(path, remoteFileHandle, closeableRegistry)));
@@ -120,14 +120,14 @@ public class RocksDBStateDownloader extends RocksDBStateDataTransfer {
 		CloseableRegistry closeableRegistry) throws IOException {
 
 		FSDataInputStream inputStream = null;
-		FSDataOutputStream outputStream = null;
+		OutputStream outputStream = null;
 
 		try {
-			FileSystem restoreFileSystem = restoreFilePath.getFileSystem();
 			inputStream = remoteFileHandle.openInputStream();
 			closeableRegistry.registerCloseable(inputStream);
 
-			outputStream = restoreFileSystem.create(restoreFilePath, FileSystem.WriteMode.OVERWRITE);
+			Files.createDirectories(restoreFilePath.getParent());
+			outputStream = Files.newOutputStream(restoreFilePath);
 			closeableRegistry.registerCloseable(outputStream);
 
 			byte[] buffer = new byte[8 * 1024];
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploader.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploader.java
index 045918d..2111c52 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploader.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploader.java
@@ -19,9 +19,6 @@
 package org.apache.flink.contrib.streaming.state;
 
 import org.apache.flink.core.fs.CloseableRegistry;
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.CheckpointedStateScope;
@@ -35,6 +32,9 @@ import org.apache.flink.util.function.CheckedSupplier;
 import javax.annotation.Nonnull;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
@@ -107,14 +107,14 @@ public class RocksDBStateUploader extends RocksDBStateDataTransfer {
 		Path filePath,
 		CheckpointStreamFactory checkpointStreamFactory,
 		CloseableRegistry closeableRegistry) throws IOException {
-		FSDataInputStream inputStream = null;
+
+		InputStream inputStream = null;
 		CheckpointStreamFactory.CheckpointStateOutputStream outputStream = null;
 
 		try {
 			final byte[] buffer = new byte[READ_BUFFER_SIZE];
 
-			FileSystem backupFileSystem = filePath.getFileSystem();
-			inputStream = backupFileSystem.open(filePath);
+			inputStream = Files.newInputStream(filePath);
 			closeableRegistry.registerCloseable(inputStream);
 
 			outputStream = checkpointStreamFactory
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
index 35b6bab..f7abed6 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
@@ -28,10 +28,6 @@ import org.apache.flink.contrib.streaming.state.RocksDBWriteBatchWrapper;
 import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
 import org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager;
 import org.apache.flink.core.fs.CloseableRegistry;
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.FileStatus;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.metrics.MetricGroup;
@@ -48,6 +44,7 @@ import org.apache.flink.runtime.state.StateHandleID;
 import org.apache.flink.runtime.state.StateSerializerProvider;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.IOUtils;
 
 import org.rocksdb.ColumnFamilyDescriptor;
@@ -64,7 +61,10 @@ import javax.annotation.Nonnull;
 
 import java.io.File;
 import java.io.IOException;
+import java.io.InputStream;
 import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.nio.file.StandardCopyOption;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -187,9 +187,8 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
 	}
 
 	private void restoreFromRemoteState(IncrementalRemoteKeyedStateHandle stateHandle) throws Exception {
-		final Path tmpRestoreInstancePath = new Path(
-			instanceBasePath.getAbsolutePath(),
-			UUID.randomUUID().toString()); // used as restore source for IncrementalRemoteKeyedStateHandle
+		// used as restore source for IncrementalRemoteKeyedStateHandle
+		final Path tmpRestoreInstancePath = instanceBasePath.getAbsoluteFile().toPath().resolve(UUID.randomUUID().toString());
 		try {
 			restoreFromLocalState(
 				transferRemoteStateToLocalDirectory(tmpRestoreInstancePath, stateHandle));
@@ -246,10 +245,7 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
 
 	private void cleanUpPathQuietly(@Nonnull Path path) {
 		try {
-			FileSystem fileSystem = path.getFileSystem();
-			if (fileSystem.exists(path)) {
-				fileSystem.delete(path, true);
-			}
+			FileUtils.deleteDirectory(path.toFile());
 		} catch (IOException ex) {
 			LOG.warn("Failed to clean up path " + path, ex);
 		}
@@ -296,7 +292,7 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
 					", but found " + rawStateHandle.getClass());
 			}
 
-			Path temporaryRestoreInstancePath = new Path(instanceBasePath.getAbsolutePath() + UUID.randomUUID().toString());
+			Path temporaryRestoreInstancePath = instanceBasePath.getAbsoluteFile().toPath().resolve(UUID.randomUUID().toString());
 			try (RestoredDBInstance tmpRestoreDBInfo = restoreDBInstanceFromStateHandle(
 				(IncrementalRemoteKeyedStateHandle) rawStateHandle,
 				temporaryRestoreInstancePath);
@@ -427,7 +423,7 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
 			new ArrayList<>(stateMetaInfoSnapshots.size() + 1);
 
 		RocksDB restoreDb = RocksDBOperationUtils.openDB(
-			temporaryRestoreInstancePath.getPath(),
+			temporaryRestoreInstancePath.toString(),
 			columnFamilyDescriptors,
 			columnFamilyHandles,
 			RocksDBOperationUtils.createColumnFamilyOptions(columnFamilyOptionsFactory, "default"),
@@ -461,26 +457,18 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
 	 * a local state.
 	 */
 	private void restoreInstanceDirectoryFromPath(Path source, String instanceRocksDBPath) throws IOException {
+		final Path instanceRocksDBDirectory = Paths.get(instanceRocksDBPath);
+		final Path[] files = FileUtils.listDirectory(source);
 
-		FileSystem fileSystem = source.getFileSystem();
-
-		final FileStatus[] fileStatuses = fileSystem.listStatus(source);
-
-		if (fileStatuses == null) {
-			throw new IOException("Cannot list file statues. Directory " + source + " does not exist.");
-		}
-
-		for (FileStatus fileStatus : fileStatuses) {
-			final Path filePath = fileStatus.getPath();
-			final String fileName = filePath.getName();
-			File restoreFile = new File(source.getPath(), fileName);
-			File targetFile = new File(instanceRocksDBPath, fileName);
+		for (Path file : files) {
+			final String fileName = file.getFileName().toString();
+			final Path targetFile = instanceRocksDBDirectory.resolve(fileName);
 			if (fileName.endsWith(SST_FILE_SUFFIX)) {
 				// hardlink'ing the immutable sst-files.
-				Files.createLink(targetFile.toPath(), restoreFile.toPath());
+				Files.createLink(targetFile, file);
 			} else {
 				// true copy for all other files.
-				Files.copy(restoreFile.toPath(), targetFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
+				Files.copy(file, targetFile, StandardCopyOption.REPLACE_EXISTING);
 			}
 		}
 	}
@@ -490,7 +478,7 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
 	 */
 	private KeyedBackendSerializationProxy<K> readMetaData(StreamStateHandle metaStateHandle) throws Exception {
 
-		FSDataInputStream inputStream = null;
+		InputStream inputStream = null;
 
 		try {
 			inputStream = metaStateHandle.openInputStream();
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
index 16c72cc..23f4574 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
@@ -22,8 +22,6 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.RocksDbKvStateInfo;
 import org.apache.flink.contrib.streaming.state.RocksDBStateUploader;
 import org.apache.flink.core.fs.CloseableRegistry;
-import org.apache.flink.core.fs.FileStatus;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
@@ -64,6 +62,7 @@ import javax.annotation.Nullable;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
@@ -200,7 +199,7 @@ public class RocksIncrementalSnapshotStrategy<K> extends RocksDBSnapshotStrategy
 				FileUtils.deleteDirectory(rdbSnapshotDir);
 			}
 
-			Path path = new Path(rdbSnapshotDir.toURI());
+			Path path = rdbSnapshotDir.toPath();
 			// create a "permanent" snapshot directory because local recovery is active.
 			try {
 				return SnapshotDirectory.permanent(path);
@@ -246,7 +245,7 @@ public class RocksIncrementalSnapshotStrategy<K> extends RocksDBSnapshotStrategy
 		try (
 			ResourceGuard.Lease ignored = rocksDBResourceGuard.acquireResource();
 			Checkpoint checkpoint = Checkpoint.create(db)) {
-			checkpoint.createCheckpoint(outputDirectory.getDirectory().getPath());
+			checkpoint.createCheckpoint(outputDirectory.getDirectory().toString());
 		} catch (Exception ex) {
 			try {
 				outputDirectory.cleanup();
@@ -417,9 +416,9 @@ public class RocksIncrementalSnapshotStrategy<K> extends RocksDBSnapshotStrategy
 			Map<StateHandleID, Path> sstFilePaths = new HashMap<>();
 			Map<StateHandleID, Path> miscFilePaths = new HashMap<>();
 
-			FileStatus[] fileStatuses = localBackupDirectory.listStatus();
-			if (fileStatuses != null) {
-				createUploadFilePaths(fileStatuses, sstFiles, sstFilePaths, miscFilePaths);
+			Path[] files = localBackupDirectory.listDirectory();
+			if (files != null) {
+				createUploadFilePaths(files, sstFiles, sstFilePaths, miscFilePaths);
 
 				sstFiles.putAll(stateUploader.uploadFilesToCheckpointFs(
 					sstFilePaths,
@@ -433,13 +432,12 @@ public class RocksIncrementalSnapshotStrategy<K> extends RocksDBSnapshotStrategy
 		}
 
 		private void createUploadFilePaths(
-			FileStatus[] fileStatuses,
+			Path[] files,
 			Map<StateHandleID, StreamStateHandle> sstFiles,
 			Map<StateHandleID, Path> sstFilePaths,
 			Map<StateHandleID, Path> miscFilePaths) {
-			for (FileStatus fileStatus : fileStatuses) {
-				final Path filePath = fileStatus.getPath();
-				final String fileName = filePath.getName();
+			for (Path filePath : files) {
+				final String fileName = filePath.getFileName().toString();
 				final StateHandleID stateHandleID = new StateHandleID(fileName);
 
 				if (fileName.endsWith(SST_FILE_SUFFIX)) {
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloaderTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloaderTest.java
index 76372b7..897b855 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloaderTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloaderTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.contrib.streaming.state;
 
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.StateHandleID;
@@ -34,6 +33,7 @@ import org.junit.rules.TemporaryFolder;
 
 import java.io.IOException;
 import java.nio.file.Files;
+import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -91,7 +91,7 @@ public class RocksDBStateDownloaderTest extends TestLogger {
 		try (RocksDBStateDownloader rocksDBStateDownloader = new RocksDBStateDownloader(5)) {
 			rocksDBStateDownloader.transferAllStateDataToDirectory(
 				incrementalKeyedStateHandle,
-				new Path(temporaryFolder.newFolder().toURI()),
+				temporaryFolder.newFolder().toPath(),
 				new CloseableRegistry());
 			fail();
 		} catch (Exception e) {
@@ -133,13 +133,13 @@ public class RocksDBStateDownloaderTest extends TestLogger {
 				privateStates,
 				handles.get(0));
 
-		Path dstPath = new Path(temporaryFolder.newFolder().toURI());
+		Path dstPath = temporaryFolder.newFolder().toPath();
 		try (RocksDBStateDownloader rocksDBStateDownloader = new RocksDBStateDownloader(5)) {
 			rocksDBStateDownloader.transferAllStateDataToDirectory(incrementalKeyedStateHandle, dstPath, new CloseableRegistry());
 		}
 
 		for (int i = 0; i < contentNum; ++i) {
-			assertStateContentEqual(contents[i], new Path(dstPath, String.format("sharedState%d", i)));
+			assertStateContentEqual(contents[i], dstPath.resolve(String.format("sharedState%d", i)));
 		}
 	}
 
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploaderTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploaderTest.java
index 8ac2f3c..1c99739 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploaderTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploaderTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.contrib.streaming.state;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.CheckpointedStateScope;
 import org.apache.flink.runtime.state.StateHandleID;
@@ -41,7 +40,7 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.file.Files;
-import java.nio.file.Paths;
+import java.nio.file.Path;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
@@ -71,7 +70,7 @@ public class RocksDBStateUploaderTest extends TestLogger {
 		generateRandomFileContent(file.getPath(), 20);
 
 		Map<StateHandleID, Path> filePaths = new HashMap<>(1);
-		filePaths.put(new StateHandleID("mockHandleID"), new Path(file.getPath()));
+		filePaths.put(new StateHandleID("mockHandleID"), file.toPath());
 		try (RocksDBStateUploader rocksDBStateUploader = new RocksDBStateUploader(5)) {
 			rocksDBStateUploader.uploadFilesToCheckpointFs(filePaths, checkpointStreamFactory, new CloseableRegistry());
 			fail();
@@ -86,10 +85,10 @@ public class RocksDBStateUploaderTest extends TestLogger {
 	@Test
 	public void testMultiThreadUploadCorrectly() throws Exception {
 		File checkpointPrivateFolder = temporaryFolder.newFolder("private");
-		Path checkpointPrivateDirectory = new Path(checkpointPrivateFolder.getPath());
+		org.apache.flink.core.fs.Path checkpointPrivateDirectory = org.apache.flink.core.fs.Path.fromLocalFile(checkpointPrivateFolder);
 
 		File checkpointSharedFolder = temporaryFolder.newFolder("shared");
-		Path checkpointSharedDirectory = new Path(checkpointSharedFolder.getPath());
+		org.apache.flink.core.fs.Path checkpointSharedDirectory = org.apache.flink.core.fs.Path.fromLocalFile(checkpointSharedFolder);
 
 		FileSystem fileSystem = checkpointPrivateDirectory.getFileSystem();
 		int fileStateSizeThreshold = 1024;
@@ -157,7 +156,7 @@ public class RocksDBStateUploaderTest extends TestLogger {
 		for (int i = 0; i < sstFileCount; ++i) {
 			File file = temporaryFolder.newFile(String.format("%s/%d.sst", localFolder, i));
 			generateRandomFileContent(file.getPath(), random.nextInt(1_000_000) + fileStateSizeThreshold);
-			sstFilePaths.put(new StateHandleID(String.valueOf(i)), Path.fromLocalFile(file));
+			sstFilePaths.put(new StateHandleID(String.valueOf(i)), file.toPath());
 		}
 		return sstFilePaths;
 	}
@@ -171,7 +170,7 @@ public class RocksDBStateUploaderTest extends TestLogger {
 	}
 
 	private void assertStateContentEqual(Path stateFilePath, FSDataInputStream inputStream) throws IOException {
-		byte[] excepted = Files.readAllBytes(Paths.get(stateFilePath.toUri()));
+		byte[] excepted = Files.readAllBytes(stateFilePath);
 		byte[] actual = new byte[excepted.length];
 		IOUtils.readFully(inputStream, actual, 0, actual.length);
 		assertEquals(-1, inputStream.read());


[flink] 04/05: [FLINK-15966][runtime] Capture callstacks for RPC ask() calls to improve exceptions.

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2746e6a4f3f84e05425484ad63ea096c5536c77f
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Wed Feb 5 20:09:12 2020 +0100

    [FLINK-15966][runtime] Capture callstacks for RPC ask() calls to improve exceptions.
---
 docs/_includes/generated/akka_configuration.html   |   6 +
 .../apache/flink/configuration/AkkaOptions.java    |  12 ++
 .../runtime/rpc/akka/AkkaInvocationHandler.java    |  72 ++++++++---
 .../flink/runtime/rpc/akka/AkkaRpcService.java     |  19 ++-
 .../rpc/akka/AkkaRpcServiceConfiguration.java      |  19 ++-
 .../rpc/akka/FencedAkkaInvocationHandler.java      |   5 +-
 .../ResourceManagerTaskExecutorTest.java           |   7 +-
 .../runtime/rpc/akka/TimeoutCallStackTest.java     | 135 +++++++++++++++++++++
 8 files changed, 246 insertions(+), 29 deletions(-)

diff --git a/docs/_includes/generated/akka_configuration.html b/docs/_includes/generated/akka_configuration.html
index c77a393..2baeb6f 100644
--- a/docs/_includes/generated/akka_configuration.html
+++ b/docs/_includes/generated/akka_configuration.html
@@ -9,6 +9,12 @@
     </thead>
     <tbody>
         <tr>
+            <td><h5>akka.ask.callstack</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Boolean</td>
+            <td>If true, call stack for asynchronous asks are captured. That way, when an ask fails (for example times out), you get a proper exception, describing to the original method call and call site. Note that in case of having millions of concurrent RPC calls, this may add to the memory footprint.</td>
+        </tr>
+        <tr>
             <td><h5>akka.ask.timeout</h5></td>
             <td style="word-wrap: break-word;">"10 s"</td>
             <td>String</td>
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
index e71f18f..bdecb2e 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
@@ -30,6 +30,18 @@ import static org.apache.flink.configuration.description.LinkElement.link;
 public class AkkaOptions {
 
 	/**
+	 * Flag whether to capture call stacks for RPC ask calls.
+	 */
+	public static final ConfigOption<Boolean> CAPTURE_ASK_CALLSTACK = ConfigOptions
+		.key("akka.ask.callstack")
+		.booleanType()
+		.defaultValue(true)
+		.withDescription("If true, call stack for asynchronous asks are captured. That way, when an ask fails " +
+			"(for example times out), you get a proper exception, describing to the original method call and " +
+			"call site. Note that in case of having millions of concurrent RPC calls, this may add to the " +
+			"memory footprint.");
+
+	/**
 	 * Timeout for akka ask calls.
 	 */
 	public static final ConfigOption<String> ASK_TIMEOUT = ConfigOptions
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
index 44bfb3b..3b2a2cb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
@@ -47,6 +47,7 @@ import java.io.IOException;
 import java.lang.annotation.Annotation;
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Method;
+import java.util.Arrays;
 import java.util.Objects;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
@@ -90,13 +91,16 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, Rpc
 	@Nullable
 	private final CompletableFuture<Void> terminationFuture;
 
+	private final boolean captureAskCallStack;
+
 	AkkaInvocationHandler(
-			String address,
-			String hostname,
-			ActorRef rpcEndpoint,
-			Time timeout,
-			long maximumFramesize,
-			@Nullable CompletableFuture<Void> terminationFuture) {
+		String address,
+		String hostname,
+		ActorRef rpcEndpoint,
+		Time timeout,
+		long maximumFramesize,
+		@Nullable CompletableFuture<Void> terminationFuture,
+		boolean captureAskCallStack) {
 
 		this.address = Preconditions.checkNotNull(address);
 		this.hostname = Preconditions.checkNotNull(hostname);
@@ -105,6 +109,7 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, Rpc
 		this.timeout = Preconditions.checkNotNull(timeout);
 		this.maximumFramesize = maximumFramesize;
 		this.terminationFuture = terminationFuture;
+		this.captureAskCallStack = captureAskCallStack;
 	}
 
 	@Override
@@ -208,20 +213,20 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, Rpc
 
 			result = null;
 		} else {
+			// Capture the call stack. It is significantly faster to do that via an exception than
+			// via Thread.getStackTrace(), because exceptions lazily initialize the stack trace, initially only
+			// capture a lightweight native pointer, and convert that into the stack trace lazily when needed.
+			final Throwable callStackCapture = captureAskCallStack ? new Throwable() : null;
+
 			// execute an asynchronous call
-			CompletableFuture<?> resultFuture = ask(rpcInvocation, futureTimeout);
-
-			CompletableFuture<?> completableFuture = resultFuture.thenApply((Object o) -> {
-				if (o instanceof SerializedValue) {
-					try {
-						return  ((SerializedValue<?>) o).deserializeValue(getClass().getClassLoader());
-					} catch (IOException | ClassNotFoundException e) {
-						throw new CompletionException(
-							new RpcException("Could not deserialize the serialized payload of RPC method : "
-								+ methodName, e));
-					}
+			final CompletableFuture<?> resultFuture = ask(rpcInvocation, futureTimeout);
+
+			final CompletableFuture<Object> completableFuture = new CompletableFuture<>();
+			resultFuture.whenComplete((resultValue, failure) -> {
+				if (failure != null) {
+					completableFuture.completeExceptionally(resolveTimeoutException(failure, callStackCapture, method));
 				} else {
-					return o;
+					completableFuture.complete(deserializeValueIfNeeded(resultValue, method));
 				}
 			});
 
@@ -370,4 +375,35 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, Rpc
 	public CompletableFuture<Void> getTerminationFuture() {
 		return terminationFuture;
 	}
+
+	static Object deserializeValueIfNeeded(Object o, Method method) {
+		if (o instanceof SerializedValue) {
+			try {
+				return  ((SerializedValue<?>) o).deserializeValue(AkkaInvocationHandler.class.getClassLoader());
+			} catch (IOException | ClassNotFoundException e) {
+				throw new CompletionException(
+					new RpcException(
+						"Could not deserialize the serialized payload of RPC method : " + method.getName(), e));
+			}
+		} else {
+			return o;
+		}
+	}
+
+	static Throwable resolveTimeoutException(Throwable exception, @Nullable Throwable callStackCapture, Method method) {
+		if (!(exception instanceof akka.pattern.AskTimeoutException)) {
+			return exception;
+		}
+
+		final TimeoutException newException = new TimeoutException("Invocation of " + method + " timed out.");
+		newException.initCause(exception);
+
+		if (callStackCapture != null) {
+			// remove the stack frames coming from the proxy interface invocation
+			final StackTraceElement[] stackTrace = callStackCapture.getStackTrace();
+			newException.setStackTrace(Arrays.copyOfRange(stackTrace, 3, stackTrace.length));
+		}
+
+		return newException;
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
index 344f96b..0c41f05 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -98,6 +98,8 @@ public class AkkaRpcService implements RpcService {
 	private final String address;
 	private final int port;
 
+	private final boolean captureAskCallstacks;
+
 	private final ScheduledExecutor internalScheduledExecutor;
 
 	private final CompletableFuture<Void> terminationFuture;
@@ -122,6 +124,8 @@ public class AkkaRpcService implements RpcService {
 			port = -1;
 		}
 
+		captureAskCallstacks = configuration.captureAskCallStack();
+
 		internalScheduledExecutor = new ActorSystemScheduledExecutorAdapter(actorSystem);
 
 		terminationFuture = new CompletableFuture<>();
@@ -165,7 +169,8 @@ public class AkkaRpcService implements RpcService {
 					actorRef,
 					configuration.getTimeout(),
 					configuration.getMaximumFramesize(),
-					null);
+					null,
+					captureAskCallstacks);
 			});
 	}
 
@@ -185,7 +190,8 @@ public class AkkaRpcService implements RpcService {
 					configuration.getTimeout(),
 					configuration.getMaximumFramesize(),
 					null,
-					() -> fencingToken);
+					() -> fencingToken,
+					captureAskCallstacks);
 			});
 	}
 
@@ -247,7 +253,8 @@ public class AkkaRpcService implements RpcService {
 				configuration.getTimeout(),
 				configuration.getMaximumFramesize(),
 				terminationFuture,
-				((FencedRpcEndpoint<?>) rpcEndpoint)::getFencingToken);
+				((FencedRpcEndpoint<?>) rpcEndpoint)::getFencingToken,
+				captureAskCallstacks);
 
 			implementedRpcGateways.add(FencedMainThreadExecutable.class);
 		} else {
@@ -257,7 +264,8 @@ public class AkkaRpcService implements RpcService {
 				actorRef,
 				configuration.getTimeout(),
 				configuration.getMaximumFramesize(),
-				terminationFuture);
+				terminationFuture,
+				captureAskCallstacks);
 		}
 
 		// Rather than using the System ClassLoader directly, we derive the ClassLoader
@@ -285,7 +293,8 @@ public class AkkaRpcService implements RpcService {
 				configuration.getTimeout(),
 				configuration.getMaximumFramesize(),
 				null,
-				() -> fencingToken);
+				() -> fencingToken,
+				captureAskCallstacks);
 
 			// Rather than using the System ClassLoader directly, we derive the ClassLoader
 			// from this class . That works better in cases where Flink runs embedded and all Flink
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceConfiguration.java
index 0c478a9..91b5a07 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceConfiguration.java
@@ -18,6 +18,7 @@
 package org.apache.flink.runtime.rpc.akka;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 
@@ -38,11 +39,19 @@ public class AkkaRpcServiceConfiguration {
 
 	private final long maximumFramesize;
 
-	public AkkaRpcServiceConfiguration(@Nonnull Configuration configuration, @Nonnull Time timeout, long maximumFramesize) {
+	private final boolean captureAskCallStack;
+
+	public AkkaRpcServiceConfiguration(
+			@Nonnull Configuration configuration,
+			@Nonnull Time timeout,
+			long maximumFramesize,
+			boolean captureAskCallStack) {
+
 		checkArgument(maximumFramesize > 0L, "Maximum framesize must be positive.");
 		this.configuration = configuration;
 		this.timeout = timeout;
 		this.maximumFramesize = maximumFramesize;
+		this.captureAskCallStack = captureAskCallStack;
 	}
 
 	@Nonnull
@@ -59,12 +68,18 @@ public class AkkaRpcServiceConfiguration {
 		return maximumFramesize;
 	}
 
+	public boolean captureAskCallStack() {
+		return captureAskCallStack;
+	}
+
 	public static AkkaRpcServiceConfiguration fromConfiguration(Configuration configuration) {
 		final Time timeout = AkkaUtils.getTimeoutAsTime(configuration);
 
 		final long maximumFramesize = AkkaRpcServiceUtils.extractMaximumFramesize(configuration);
 
-		return new AkkaRpcServiceConfiguration(configuration, timeout, maximumFramesize);
+		final boolean captureAskCallStacks = configuration.get(AkkaOptions.CAPTURE_ASK_CALLSTACK);
+
+		return new AkkaRpcServiceConfiguration(configuration, timeout, maximumFramesize, captureAskCallStacks);
 	}
 
 	public static AkkaRpcServiceConfiguration defaultConfiguration() {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaInvocationHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaInvocationHandler.java
index 564b1ef..1a15fe2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaInvocationHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaInvocationHandler.java
@@ -61,8 +61,9 @@ public class FencedAkkaInvocationHandler<F extends Serializable> extends AkkaInv
 			Time timeout,
 			long maximumFramesize,
 			@Nullable CompletableFuture<Void> terminationFuture,
-			Supplier<F> fencingTokenSupplier) {
-		super(address, hostname, rpcEndpoint, timeout, maximumFramesize, terminationFuture);
+			Supplier<F> fencingTokenSupplier,
+			boolean captureAskCallStacks) {
+		super(address, hostname, rpcEndpoint, timeout, maximumFramesize, terminationFuture, captureAskCallStacks);
 
 		this.fencingTokenSupplier = Preconditions.checkNotNull(fencingTokenSupplier);
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
index 553b07c..3a1632c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
@@ -51,7 +51,6 @@ import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
 
-import akka.pattern.AskTimeoutException;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -63,9 +62,11 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
@@ -249,7 +250,9 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
 				firstFuture.get();
 				fail("Should have failed because connection to taskmanager is delayed beyond timeout");
 			} catch (Exception e) {
-				assertThat(ExceptionUtils.stripExecutionException(e), instanceOf(AskTimeoutException.class));
+				final Throwable cause = ExceptionUtils.stripExecutionException(e);
+				assertThat(cause, instanceOf(TimeoutException.class));
+				assertThat(cause.getMessage(), containsString("ResourceManagerGateway.registerTaskExecutor"));
 			}
 
 			startConnection.await();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/TimeoutCallStackTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/TimeoutCallStackTest.java
new file mode 100644
index 0000000..4457026
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/TimeoutCallStackTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.rpc.RpcEndpoint;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.RpcTimeout;
+import org.apache.flink.util.IOUtils;
+
+import akka.actor.ActorSystem;
+import akka.actor.Terminated;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests that ask timeouts report the call stack of the calling function.
+ */
+public class TimeoutCallStackTest {
+
+	private static ActorSystem actorSystem;
+	private static RpcService rpcService;
+
+	private final List<RpcEndpoint> endpointsToStop = new ArrayList<>();
+
+	@BeforeClass
+	public static void setup() {
+		actorSystem = AkkaUtils.createDefaultActorSystem();
+		rpcService = new AkkaRpcService(actorSystem, AkkaRpcServiceConfiguration.defaultConfiguration());
+	}
+
+	@AfterClass
+	public static void teardown() throws Exception {
+
+		final CompletableFuture<Void> rpcTerminationFuture = rpcService.stopService();
+		final CompletableFuture<Terminated> actorSystemTerminationFuture = FutureUtils.toJava(actorSystem.terminate());
+
+		FutureUtils
+			.waitForAll(Arrays.asList(rpcTerminationFuture, actorSystemTerminationFuture))
+			.get(10_000, TimeUnit.MILLISECONDS);
+	}
+
+	@After
+	public void stopTestEndpoints() {
+		endpointsToStop.forEach(IOUtils::closeQuietly);
+	}
+
+	@Test
+	public void testTimeoutException() throws Exception {
+		final TestingGateway gateway = createTestingGateway();
+
+		final CompletableFuture<Void> future = gateway.callThatTimesOut(Time.milliseconds(1));
+
+		Throwable failureCause = null;
+		try {
+			future.get();
+			fail("test buggy: the call should never have completed");
+		} catch (ExecutionException e) {
+			failureCause = e.getCause();
+		}
+
+		assertThat(failureCause, instanceOf(TimeoutException.class));
+		assertThat(failureCause.getMessage(), containsString("callThatTimesOut"));
+		assertThat(failureCause.getStackTrace()[0].getMethodName(), equalTo("testTimeoutException"));
+	}
+
+	// ------------------------------------------------------------------------
+	//  setup helpers
+	// ------------------------------------------------------------------------
+
+	private TestingGateway createTestingGateway() throws Exception {
+		final TestingRpcEndpoint endpoint = new TestingRpcEndpoint(rpcService, "test_name");
+		endpointsToStop.add(endpoint);
+		endpoint.start();
+
+		return rpcService.connect(endpoint.getAddress(), TestingGateway.class).get();
+	}
+
+	// ------------------------------------------------------------------------
+	//  testing mocks / stubs
+	// ------------------------------------------------------------------------
+
+	private interface TestingGateway extends RpcGateway {
+
+		CompletableFuture<Void> callThatTimesOut(@RpcTimeout Time timeout);
+	}
+
+	private static final class TestingRpcEndpoint extends RpcEndpoint implements TestingGateway {
+
+		TestingRpcEndpoint(RpcService rpcService, String endpointId) {
+			super(rpcService, endpointId);
+		}
+
+		@Override
+		public CompletableFuture<Void> callThatTimesOut(@RpcTimeout Time timeout) {
+			// return a future that never completes, so the call is guaranteed to time out
+			return new CompletableFuture<>();
+		}
+	}
+}


[flink] 02/05: [hotfix][core] Fix broken JavaDoc link in FileUtils

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bd1a8385bd6ba1b597dad31d5e290decef43a8bd
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Fri Feb 14 16:49:53 2020 +0100

    [hotfix][core] Fix broken JavaDoc link in FileUtils
---
 flink-core/src/main/java/org/apache/flink/util/FileUtils.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-core/src/main/java/org/apache/flink/util/FileUtils.java b/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
index be3e68d..81f67e9 100644
--- a/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
@@ -76,7 +76,7 @@ public final class FileUtils {
 
 	/**
 	 * The maximum size of array to allocate for reading. See
-	 * {@link java.nio.file.Files#MAX_BUFFER_SIZE} for more.
+	 * {@code MAX_BUFFER_SIZE} in {@link java.nio.file.Files} for more.
 	 */
 	private static final int MAX_BUFFER_SIZE = Integer.MAX_VALUE - 8;
 


[flink] 05/05: [hotfix][config] Adjust AkkaOptions to new type-safe pattern

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5b71c7f2fe36c760924848295a8090898cb10f15
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Mon Feb 17 19:35:35 2020 +0100

    [hotfix][config] Adjust AkkaOptions to new type-safe pattern
---
 .../apache/flink/configuration/AkkaOptions.java    | 23 ++++++++++++++++++++++
 1 file changed, 23 insertions(+)

diff --git a/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
index bdecb2e..e9b3002 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
@@ -46,6 +46,7 @@ public class AkkaOptions {
 	 */
 	public static final ConfigOption<String> ASK_TIMEOUT = ConfigOptions
 		.key("akka.ask.timeout")
+		.stringType()
 		.defaultValue("10 s")
 		.withDescription("Timeout used for all futures and blocking Akka calls. If Flink fails due to timeouts then you" +
 			" should try to increase this value. Timeouts can be caused by slow machines or a congested network. The" +
@@ -55,6 +56,7 @@ public class AkkaOptions {
 	 */
 	public static final ConfigOption<String> TCP_TIMEOUT = ConfigOptions
 		.key("akka.tcp.timeout")
+		.stringType()
 		.defaultValue("20 s")
 		.withDescription("Timeout for all outbound connections. If you should experience problems with connecting to a" +
 			" TaskManager due to a slow network, you should increase this value.");
@@ -64,6 +66,7 @@ public class AkkaOptions {
 	 */
 	public static final ConfigOption<String> STARTUP_TIMEOUT = ConfigOptions
 		.key("akka.startup-timeout")
+		.stringType()
 		.noDefaultValue()
 		.withDescription("Timeout after which the startup of a remote component is considered being failed.");
 
@@ -72,6 +75,7 @@ public class AkkaOptions {
 	 */
 	public static final ConfigOption<String> TRANSPORT_HEARTBEAT_INTERVAL = ConfigOptions
 		.key("akka.transport.heartbeat.interval")
+		.stringType()
 		.defaultValue("1000 s")
 		.withDescription("Heartbeat interval for Akka’s transport failure detector. Since Flink uses TCP, the detector" +
 			" is not necessary. Therefore, the detector is disabled by setting the interval to a very high value. In" +
@@ -83,6 +87,7 @@ public class AkkaOptions {
 	 */
 	public static final ConfigOption<String> TRANSPORT_HEARTBEAT_PAUSE = ConfigOptions
 		.key("akka.transport.heartbeat.pause")
+		.stringType()
 		.defaultValue("6000 s")
 		.withDescription("Acceptable heartbeat pause for Akka’s transport failure detector. Since Flink uses TCP, the" +
 			" detector is not necessary. Therefore, the detector is disabled by setting the pause to a very high value." +
@@ -94,6 +99,7 @@ public class AkkaOptions {
 	 */
 	public static final ConfigOption<Double> TRANSPORT_THRESHOLD = ConfigOptions
 		.key("akka.transport.threshold")
+		.doubleType()
 		.defaultValue(300.0)
 		.withDescription("Threshold for the transport failure detector. Since Flink uses TCP, the detector is not" +
 			" necessary and, thus, the threshold is set to a high value.");
@@ -103,6 +109,7 @@ public class AkkaOptions {
 	 */
 	public static final ConfigOption<Boolean> SSL_ENABLED = ConfigOptions
 		.key("akka.ssl.enabled")
+		.booleanType()
 		.defaultValue(true)
 		.withDescription("Turns on SSL for Akka’s remote communication. This is applicable only when the global ssl flag" +
 			" security.ssl.enabled is set to true.");
@@ -112,6 +119,7 @@ public class AkkaOptions {
 	 */
 	public static final ConfigOption<String> FRAMESIZE = ConfigOptions
 		.key("akka.framesize")
+		.stringType()
 		.defaultValue("10485760b")
 		.withDescription("Maximum size of messages which are sent between the JobManager and the TaskManagers. If Flink" +
 			" fails because messages exceed this limit, then you should increase it. The message size requires a" +
@@ -122,6 +130,7 @@ public class AkkaOptions {
 	 */
 	public static final ConfigOption<Integer> DISPATCHER_THROUGHPUT = ConfigOptions
 		.key("akka.throughput")
+		.intType()
 		.defaultValue(15)
 		.withDescription("Number of messages that are processed in a batch before returning the thread to the pool. Low" +
 			" values denote a fair scheduling whereas high values can increase the performance at the cost of unfairness.");
@@ -131,6 +140,7 @@ public class AkkaOptions {
 	 */
 	public static final ConfigOption<Boolean> LOG_LIFECYCLE_EVENTS = ConfigOptions
 		.key("akka.log.lifecycle.events")
+		.booleanType()
 		.defaultValue(false)
 		.withDescription("Turns on the Akka’s remote logging of events. Set this value to 'true' in case of debugging.");
 
@@ -139,6 +149,7 @@ public class AkkaOptions {
 	 */
 	public static final ConfigOption<String> LOOKUP_TIMEOUT = ConfigOptions
 		.key("akka.lookup.timeout")
+		.stringType()
 		.defaultValue("10 s")
 		.withDescription("Timeout used for the lookup of the JobManager. The timeout value has to contain a time-unit" +
 			" specifier (ms/s/min/h/d).");
@@ -148,6 +159,7 @@ public class AkkaOptions {
 	 */
 	public static final ConfigOption<String> CLIENT_TIMEOUT = ConfigOptions
 		.key("akka.client.timeout")
+		.stringType()
 		.defaultValue("60 s")
 		.withDescription("Timeout for all blocking calls on the client side.");
 
@@ -156,6 +168,7 @@ public class AkkaOptions {
 	 */
 	public static final ConfigOption<Boolean> JVM_EXIT_ON_FATAL_ERROR = ConfigOptions
 		.key("akka.jvm-exit-on-fatal-error")
+		.booleanType()
 		.defaultValue(true)
 		.withDescription("Exit JVM on fatal Akka errors.");
 
@@ -164,6 +177,7 @@ public class AkkaOptions {
 	 */
 	public static final ConfigOption<Long> RETRY_GATE_CLOSED_FOR = ConfigOptions
 		.key("akka.retry-gate-closed-for")
+		.longType()
 		.defaultValue(50L)
 		.withDescription("Milliseconds a gate should be closed for after a remote connection was disconnected.");
 
@@ -173,6 +187,7 @@ public class AkkaOptions {
 
 	public static final ConfigOption<Double> FORK_JOIN_EXECUTOR_PARALLELISM_FACTOR = ConfigOptions
 		.key("akka.fork-join-executor.parallelism-factor")
+		.doubleType()
 		.defaultValue(2.0)
 		.withDescription(Description.builder()
 			.text("The parallelism factor is used to determine thread pool size using the" +
@@ -182,12 +197,14 @@ public class AkkaOptions {
 
 	public static final ConfigOption<Integer> FORK_JOIN_EXECUTOR_PARALLELISM_MIN = ConfigOptions
 		.key("akka.fork-join-executor.parallelism-min")
+		.intType()
 		.defaultValue(8)
 		.withDescription(Description.builder()
 			.text("Min number of threads to cap factor-based parallelism number to.").build());
 
 	public static final ConfigOption<Integer> FORK_JOIN_EXECUTOR_PARALLELISM_MAX = ConfigOptions
 		.key("akka.fork-join-executor.parallelism-max")
+		.intType()
 		.defaultValue(64)
 		.withDescription(Description.builder()
 			.text("Max number of threads to cap factor-based parallelism number to.").build());
@@ -198,18 +215,21 @@ public class AkkaOptions {
 
 	public static final ConfigOption<Integer> CLIENT_SOCKET_WORKER_POOL_SIZE_MIN = ConfigOptions
 		.key("akka.client-socket-worker-pool.pool-size-min")
+		.intType()
 		.defaultValue(1)
 		.withDescription(Description.builder()
 			.text("Min number of threads to cap factor-based number to.").build());
 
 	public static final ConfigOption<Integer> CLIENT_SOCKET_WORKER_POOL_SIZE_MAX = ConfigOptions
 		.key("akka.client-socket-worker-pool.pool-size-max")
+		.intType()
 		.defaultValue(2)
 		.withDescription(Description.builder()
 			.text("Max number of threads to cap factor-based number to.").build());
 
 	public static final ConfigOption<Double> CLIENT_SOCKET_WORKER_POOL_SIZE_FACTOR = ConfigOptions
 		.key("akka.client-socket-worker-pool.pool-size-factor")
+		.doubleType()
 		.defaultValue(1.0)
 		.withDescription(Description.builder()
 			.text("The pool size factor is used to determine thread pool size" +
@@ -224,18 +244,21 @@ public class AkkaOptions {
 
 	public static final ConfigOption<Integer> SERVER_SOCKET_WORKER_POOL_SIZE_MIN = ConfigOptions
 		.key("akka.server-socket-worker-pool.pool-size-min")
+		.intType()
 		.defaultValue(1)
 		.withDescription(Description.builder()
 			.text("Min number of threads to cap factor-based number to.").build());
 
 	public static final ConfigOption<Integer> SERVER_SOCKET_WORKER_POOL_SIZE_MAX = ConfigOptions
 		.key("akka.server-socket-worker-pool.pool-size-max")
+		.intType()
 		.defaultValue(2)
 		.withDescription(Description.builder()
 			.text("Max number of threads to cap factor-based number to.").build());
 
 	public static final ConfigOption<Double> SERVER_SOCKET_WORKER_POOL_SIZE_FACTOR = ConfigOptions
 		.key("akka.server-socket-worker-pool.pool-size-factor")
+		.doubleType()
 		.defaultValue(1.0)
 		.withDescription(Description.builder()
 			.text("The pool size factor is used to determine thread pool size" +


[flink] 03/05: [hotfix][runtime] Small docs/comments cleanups in RPC code.

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2da3bc31fbbc52ef08f6bae8188d366e3094599f
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Wed Feb 5 20:08:41 2020 +0100

    [hotfix][runtime] Small docs/comments cleanups in RPC code.
---
 .../src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java     | 2 --
 .../org/apache/flink/runtime/rpc/messages/RemoteRpcInvocation.java  | 6 +++---
 2 files changed, 3 insertions(+), 5 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
index bc0da64..f67984d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
@@ -166,8 +166,6 @@ public abstract class RpcEndpoint implements RpcGateway, AutoCloseableAsync {
 	/**
 	 * Triggers start of the rpc endpoint. This tells the underlying rpc server that the rpc endpoint is ready
 	 * to process remote procedure calls.
-	 *
-	 * @throws Exception indicating that something went wrong while starting the RPC endpoint
 	 */
 	public final void start() {
 		rpcServer.start();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RemoteRpcInvocation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RemoteRpcInvocation.java
index 486816d..db7a204 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RemoteRpcInvocation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RemoteRpcInvocation.java
@@ -29,8 +29,8 @@ import java.io.Serializable;
 /**
  * Remote rpc invocation message which is used when the actor communication is remote and, thus, the
  * message has to be serialized.
- * <p>
- * In order to fail fast and report an appropriate error message to the user, the method name, the
+ *
+ * <p>In order to fail fast and report an appropriate error message to the user, the method name, the
  * parameter types and the arguments are eagerly serialized. In case the invocation call
  * contains a non-serializable object, then an {@link IOException} is thrown.
  */
@@ -138,7 +138,7 @@ public class RemoteRpcInvocation implements RpcInvocation, Serializable {
 	// -------------------------------------------------------------------
 
 	/**
-	 * Wrapper class for the method invocation information
+	 * Wrapper class for the method invocation information.
 	 */
 	private static final class MethodInvocation implements Serializable {
 		private static final long serialVersionUID = 9187962608946082519L;