You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/08/01 11:36:20 UTC

flink git commit: [FLINK-7314] [futures] Replace Flink's futures with CompletableFuture in TaskManagerLogHandler

Repository: flink
Updated Branches:
  refs/heads/master 648c1f595 -> 748448b5f


[FLINK-7314] [futures] Replace Flink's futures with CompletableFuture in TaskManagerLogHandler

This closes #4430.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/748448b5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/748448b5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/748448b5

Branch: refs/heads/master
Commit: 748448b5f02d73614242cc8488920a9cd8898974
Parents: 648c1f5
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Jul 31 15:53:48 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Aug 1 13:33:19 2017 +0200

----------------------------------------------------------------------
 .../handlers/TaskManagerLogHandler.java         | 92 ++++++++------------
 1 file changed, 34 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/748448b5/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
index 1084623..9cbb71d 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
@@ -27,18 +27,13 @@ package org.apache.flink.runtime.webmonitor.handlers;
  *****************************************************************************/
 
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobCache;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.blob.BlobView;
-import org.apache.flink.runtime.concurrent.AcceptFunction;
-import org.apache.flink.runtime.concurrent.ApplyFunction;
-import org.apache.flink.runtime.concurrent.BiFunction;
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
-import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.InstanceID;
@@ -65,6 +60,7 @@ import io.netty.handler.codec.http.LastHttpContent;
 import io.netty.handler.codec.http.router.Routed;
 import io.netty.handler.ssl.SslHandler;
 import io.netty.handler.stream.ChunkedFile;
+import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.GenericFutureListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -76,6 +72,8 @@ import java.io.RandomAccessFile;
 import java.net.InetSocketAddress;
 import java.nio.channels.FileChannel;
 import java.util.HashMap;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 
 import scala.Option;
@@ -111,7 +109,7 @@ public class TaskManagerLogHandler extends RuntimeMonitorHandlerBase {
 	private final Configuration config;
 
 	/** Future of the blob cache. */
-	private Future<BlobCache> cache;
+	private CompletableFuture<BlobCache> cache;
 
 	/** Indicates which log file should be displayed. */
 	private FileMode fileMode;
@@ -176,7 +174,7 @@ public class TaskManagerLogHandler extends RuntimeMonitorHandlerBase {
 				}
 			}, executor);
 
-			cache = new FlinkFuture<>(cacheFuture);
+			cache = FutureUtils.toJava(cacheFuture);
 		}
 
 		final String taskManagerID = routed.pathParams().get(TaskManagersHandler.TASK_MANAGER_ID_KEY);
@@ -190,47 +188,34 @@ public class TaskManagerLogHandler extends RuntimeMonitorHandlerBase {
 					.ask(new JobManagerMessages.RequestTaskManagerInstance(instanceID), timeout)
 					.mapTo(ClassTag$.MODULE$.<JobManagerMessages.TaskManagerInstance>apply(JobManagerMessages.TaskManagerInstance.class));
 
-				Future<JobManagerMessages.TaskManagerInstance> taskManagerFuture = new FlinkFuture<>(scalaTaskManagerFuture);
+				CompletableFuture<JobManagerMessages.TaskManagerInstance> taskManagerFuture = FutureUtils.toJava(scalaTaskManagerFuture);
 
-				Future<BlobKey> blobKeyFuture = taskManagerFuture.thenCompose(new ApplyFunction<JobManagerMessages.TaskManagerInstance, Future<BlobKey>>() {
-					@Override
-					public Future<BlobKey> apply(JobManagerMessages.TaskManagerInstance value) {
-						Instance taskManager = value.instance().get();
+				CompletableFuture<BlobKey> blobKeyFuture = taskManagerFuture.thenCompose(
+					taskManagerInstance -> {
+						Instance taskManager = taskManagerInstance.instance().get();
 
 						switch (fileMode) {
 							case LOG:
-								return taskManager.getTaskManagerGateway().requestTaskManagerLog(timeTimeout);
+								return FutureUtils.toJava(taskManager.getTaskManagerGateway().requestTaskManagerLog(timeTimeout));
 							case STDOUT:
 							default:
-								return taskManager.getTaskManagerGateway().requestTaskManagerStdout(timeTimeout);
+								return FutureUtils.toJava(taskManager.getTaskManagerGateway().requestTaskManagerStdout(timeTimeout));
 						}
 					}
-				});
+				);
 
-				Future<String> logPathFuture = blobKeyFuture
-					.thenCombine(
+				CompletableFuture<String> logPathFuture = blobKeyFuture
+					.thenCombineAsync(
 						cache,
-						new BiFunction<BlobKey, BlobCache, Tuple2<BlobKey, BlobCache>>() {
-							@Override
-							public Tuple2<BlobKey, BlobCache> apply(BlobKey blobKey, BlobCache blobCache) {
-								return Tuple2.of(blobKey, blobCache);
-							}
-						})
-					.thenComposeAsync(new ApplyFunction<Tuple2<BlobKey, BlobCache>, Future<String>>() {
-						@Override
-						public Future<String> apply(Tuple2<BlobKey, BlobCache> value) {
-							final BlobKey blobKey = value.f0;
-							final BlobCache blobCache = value.f1;
-
+						(blobKey, blobCache) -> {
 							//delete previous log file, if it is different than the current one
 							HashMap<String, BlobKey> lastSubmittedFile = fileMode == FileMode.LOG ? lastSubmittedLog : lastSubmittedStdout;
 							if (lastSubmittedFile.containsKey(taskManagerID)) {
-								if (!blobKey.equals(lastSubmittedFile.get(taskManagerID))) {
+								if (!Objects.equals(blobKey, lastSubmittedFile.get(taskManagerID))) {
 									try {
 										blobCache.deleteGlobal(lastSubmittedFile.get(taskManagerID));
 									} catch (IOException e) {
-										return FlinkCompletableFuture.completedExceptionally(
-											new Exception("Could not delete file for " + taskManagerID + '.', e));
+										throw new FlinkFutureException("Could not delete file for " + taskManagerID + '.', e);
 									}
 									lastSubmittedFile.put(taskManagerID, blobKey);
 								}
@@ -238,28 +223,24 @@ public class TaskManagerLogHandler extends RuntimeMonitorHandlerBase {
 								lastSubmittedFile.put(taskManagerID, blobKey);
 							}
 							try {
-								return FlinkCompletableFuture.completed(blobCache.getURL(blobKey).getFile());
+								return blobCache.getURL(blobKey).getFile();
 							} catch (IOException e) {
-								return FlinkCompletableFuture.completedExceptionally(
-									new Exception("Could not retrieve blob for " + blobKey + '.', e));
+								throw new FlinkFutureException("Could not retrieve blob for " + blobKey + '.', e);
 							}
-						}
-					}, executor);
+						},
+						executor);
 
-				logPathFuture.exceptionally(new ApplyFunction<Throwable, Void>() {
-					@Override
-					public Void apply(Throwable failure) {
+				logPathFuture.exceptionally(
+					failure -> {
 						display(ctx, request, "Fetching TaskManager log failed.");
 						LOG.error("Fetching TaskManager log failed.", failure);
 						lastRequestPending.remove(taskManagerID);
 
 						return null;
-					}
-				});
+					});
 
-				logPathFuture.thenAccept(new AcceptFunction<String>() {
-					@Override
-					public void accept(String filePath) {
+				logPathFuture.thenAccept(
+					filePath -> {
 						File file = new File(filePath);
 						final RandomAccessFile raf;
 						try {
@@ -299,15 +280,11 @@ public class TaskManagerLogHandler extends RuntimeMonitorHandlerBase {
 
 						// write the content.
 						ChannelFuture lastContentFuture;
-						final GenericFutureListener<io.netty.util.concurrent.Future<? super Void>> completionListener =
-							new GenericFutureListener<io.netty.util.concurrent.Future<? super Void>>() {
-								@Override
-								public void operationComplete(io.netty.util.concurrent.Future<? super Void> future) throws Exception {
-									lastRequestPending.remove(taskManagerID);
-									fc.close();
-									raf.close();
-								}
-							};
+						final GenericFutureListener<Future<? super Void>> completionListener = future -> {
+							lastRequestPending.remove(taskManagerID);
+							fc.close();
+							raf.close();
+						};
 						if (ctx.pipeline().get(SslHandler.class) == null) {
 							ctx.write(
 								new DefaultFileRegion(fc, 0, fileLength), ctx.newProgressivePromise())
@@ -333,8 +310,7 @@ public class TaskManagerLogHandler extends RuntimeMonitorHandlerBase {
 						if (!HttpHeaders.isKeepAlive(request)) {
 							lastContentFuture.addListener(ChannelFutureListener.CLOSE);
 						}
-					}
-				});
+					});
 			} catch (Exception e) {
 				display(ctx, request, "Error: " + e.getMessage());
 				LOG.error("Fetching TaskManager log failed.", e);