You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/08/01 11:36:20 UTC
flink git commit: [FLINK-7314] [futures] Replace Flink's futures with CompletableFuture in TaskManagerLogHandler
Repository: flink
Updated Branches:
refs/heads/master 648c1f595 -> 748448b5f
[FLINK-7314] [futures] Replace Flink's futures with CompletableFuture in TaskManagerLogHandler
This closes #4430.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/748448b5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/748448b5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/748448b5
Branch: refs/heads/master
Commit: 748448b5f02d73614242cc8488920a9cd8898974
Parents: 648c1f5
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Jul 31 15:53:48 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Aug 1 13:33:19 2017 +0200
----------------------------------------------------------------------
.../handlers/TaskManagerLogHandler.java | 92 ++++++++------------
1 file changed, 34 insertions(+), 58 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/748448b5/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
index 1084623..9cbb71d 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
@@ -27,18 +27,13 @@ package org.apache.flink.runtime.webmonitor.handlers;
*****************************************************************************/
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobCache;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.blob.BlobView;
-import org.apache.flink.runtime.concurrent.AcceptFunction;
-import org.apache.flink.runtime.concurrent.ApplyFunction;
-import org.apache.flink.runtime.concurrent.BiFunction;
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
-import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
+import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.InstanceID;
@@ -65,6 +60,7 @@ import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.codec.http.router.Routed;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.stream.ChunkedFile;
+import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -76,6 +72,8 @@ import java.io.RandomAccessFile;
import java.net.InetSocketAddress;
import java.nio.channels.FileChannel;
import java.util.HashMap;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import scala.Option;
@@ -111,7 +109,7 @@ public class TaskManagerLogHandler extends RuntimeMonitorHandlerBase {
private final Configuration config;
/** Future of the blob cache. */
- private Future<BlobCache> cache;
+ private CompletableFuture<BlobCache> cache;
/** Indicates which log file should be displayed. */
private FileMode fileMode;
@@ -176,7 +174,7 @@ public class TaskManagerLogHandler extends RuntimeMonitorHandlerBase {
}
}, executor);
- cache = new FlinkFuture<>(cacheFuture);
+ cache = FutureUtils.toJava(cacheFuture);
}
final String taskManagerID = routed.pathParams().get(TaskManagersHandler.TASK_MANAGER_ID_KEY);
@@ -190,47 +188,34 @@ public class TaskManagerLogHandler extends RuntimeMonitorHandlerBase {
.ask(new JobManagerMessages.RequestTaskManagerInstance(instanceID), timeout)
.mapTo(ClassTag$.MODULE$.<JobManagerMessages.TaskManagerInstance>apply(JobManagerMessages.TaskManagerInstance.class));
- Future<JobManagerMessages.TaskManagerInstance> taskManagerFuture = new FlinkFuture<>(scalaTaskManagerFuture);
+ CompletableFuture<JobManagerMessages.TaskManagerInstance> taskManagerFuture = FutureUtils.toJava(scalaTaskManagerFuture);
- Future<BlobKey> blobKeyFuture = taskManagerFuture.thenCompose(new ApplyFunction<JobManagerMessages.TaskManagerInstance, Future<BlobKey>>() {
- @Override
- public Future<BlobKey> apply(JobManagerMessages.TaskManagerInstance value) {
- Instance taskManager = value.instance().get();
+ CompletableFuture<BlobKey> blobKeyFuture = taskManagerFuture.thenCompose(
+ taskManagerInstance -> {
+ Instance taskManager = taskManagerInstance.instance().get();
switch (fileMode) {
case LOG:
- return taskManager.getTaskManagerGateway().requestTaskManagerLog(timeTimeout);
+ return FutureUtils.toJava(taskManager.getTaskManagerGateway().requestTaskManagerLog(timeTimeout));
case STDOUT:
default:
- return taskManager.getTaskManagerGateway().requestTaskManagerStdout(timeTimeout);
+ return FutureUtils.toJava(taskManager.getTaskManagerGateway().requestTaskManagerStdout(timeTimeout));
}
}
- });
+ );
- Future<String> logPathFuture = blobKeyFuture
- .thenCombine(
+ CompletableFuture<String> logPathFuture = blobKeyFuture
+ .thenCombineAsync(
cache,
- new BiFunction<BlobKey, BlobCache, Tuple2<BlobKey, BlobCache>>() {
- @Override
- public Tuple2<BlobKey, BlobCache> apply(BlobKey blobKey, BlobCache blobCache) {
- return Tuple2.of(blobKey, blobCache);
- }
- })
- .thenComposeAsync(new ApplyFunction<Tuple2<BlobKey, BlobCache>, Future<String>>() {
- @Override
- public Future<String> apply(Tuple2<BlobKey, BlobCache> value) {
- final BlobKey blobKey = value.f0;
- final BlobCache blobCache = value.f1;
-
+ (blobKey, blobCache) -> {
//delete previous log file, if it is different than the current one
HashMap<String, BlobKey> lastSubmittedFile = fileMode == FileMode.LOG ? lastSubmittedLog : lastSubmittedStdout;
if (lastSubmittedFile.containsKey(taskManagerID)) {
- if (!blobKey.equals(lastSubmittedFile.get(taskManagerID))) {
+ if (!Objects.equals(blobKey, lastSubmittedFile.get(taskManagerID))) {
try {
blobCache.deleteGlobal(lastSubmittedFile.get(taskManagerID));
} catch (IOException e) {
- return FlinkCompletableFuture.completedExceptionally(
- new Exception("Could not delete file for " + taskManagerID + '.', e));
+ throw new FlinkFutureException("Could not delete file for " + taskManagerID + '.', e);
}
lastSubmittedFile.put(taskManagerID, blobKey);
}
@@ -238,28 +223,24 @@ public class TaskManagerLogHandler extends RuntimeMonitorHandlerBase {
lastSubmittedFile.put(taskManagerID, blobKey);
}
try {
- return FlinkCompletableFuture.completed(blobCache.getURL(blobKey).getFile());
+ return blobCache.getURL(blobKey).getFile();
} catch (IOException e) {
- return FlinkCompletableFuture.completedExceptionally(
- new Exception("Could not retrieve blob for " + blobKey + '.', e));
+ throw new FlinkFutureException("Could not retrieve blob for " + blobKey + '.', e);
}
- }
- }, executor);
+ },
+ executor);
- logPathFuture.exceptionally(new ApplyFunction<Throwable, Void>() {
- @Override
- public Void apply(Throwable failure) {
+ logPathFuture.exceptionally(
+ failure -> {
display(ctx, request, "Fetching TaskManager log failed.");
LOG.error("Fetching TaskManager log failed.", failure);
lastRequestPending.remove(taskManagerID);
return null;
- }
- });
+ });
- logPathFuture.thenAccept(new AcceptFunction<String>() {
- @Override
- public void accept(String filePath) {
+ logPathFuture.thenAccept(
+ filePath -> {
File file = new File(filePath);
final RandomAccessFile raf;
try {
@@ -299,15 +280,11 @@ public class TaskManagerLogHandler extends RuntimeMonitorHandlerBase {
// write the content.
ChannelFuture lastContentFuture;
- final GenericFutureListener<io.netty.util.concurrent.Future<? super Void>> completionListener =
- new GenericFutureListener<io.netty.util.concurrent.Future<? super Void>>() {
- @Override
- public void operationComplete(io.netty.util.concurrent.Future<? super Void> future) throws Exception {
- lastRequestPending.remove(taskManagerID);
- fc.close();
- raf.close();
- }
- };
+ final GenericFutureListener<Future<? super Void>> completionListener = future -> {
+ lastRequestPending.remove(taskManagerID);
+ fc.close();
+ raf.close();
+ };
if (ctx.pipeline().get(SslHandler.class) == null) {
ctx.write(
new DefaultFileRegion(fc, 0, fileLength), ctx.newProgressivePromise())
@@ -333,8 +310,7 @@ public class TaskManagerLogHandler extends RuntimeMonitorHandlerBase {
if (!HttpHeaders.isKeepAlive(request)) {
lastContentFuture.addListener(ChannelFutureListener.CLOSE);
}
- }
- });
+ });
} catch (Exception e) {
display(ctx, request, "Error: " + e.getMessage());
LOG.error("Fetching TaskManager log failed.", e);