You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/01/23 21:18:24 UTC

[2/4] flink git commit: [FLINK-5298] TM checks that log file exists

[FLINK-5298] TM checks that log file exists

This closes #2974.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b8fa8c65
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b8fa8c65
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b8fa8c65

Branch: refs/heads/master
Commit: b8fa8c65638e8749f9ad994da8ec69a6c34df029
Parents: da26bdc
Author: zentol <ch...@apache.org>
Authored: Thu Dec 8 19:28:12 2016 +0100
Committer: zentol <ch...@apache.org>
Committed: Mon Jan 23 22:18:19 2017 +0100

----------------------------------------------------------------------
 .../handlers/TaskManagerLogHandlerTest.java     | 129 +++++++++++++++++++
 .../flink/runtime/taskmanager/TaskManager.scala |  38 +++---
 .../runtime/taskmanager/TaskManagerTest.java    |  54 ++++++++
 3 files changed, 206 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b8fa8c65/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java
new file mode 100644
index 0000000..c1e963e
--- /dev/null
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.http.DefaultFullHttpRequest;
+import io.netty.handler.codec.http.HttpMethod;
+import io.netty.handler.codec.http.HttpVersion;
+import io.netty.handler.codec.http.router.Routed;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.CompletableFuture;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import scala.Option;
+import scala.collection.JavaConverters;
+import scala.concurrent.ExecutionContext$;
+import scala.concurrent.Future$;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.isA;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+/** Tests for the {@link TaskManagerLogHandler}. */
+public class TaskManagerLogHandlerTest {
+	/**
+	 * Checks that a TaskManager log request whose future failed results in "Fetching TaskManager log failed." being written to the channel.
+	 */
+	@Test
+	public void testLogFetchingFailure() throws Exception {
+		// ========= setup TaskManager =================================================================================
+		InstanceID tmID = new InstanceID();
+		ResourceID tmRID = new ResourceID(tmID.toString());
+		TaskManagerGateway taskManagerGateway = mock(TaskManagerGateway.class);
+		when(taskManagerGateway.getAddress()).thenReturn("/tm/address");
+
+		Instance taskManager = mock(Instance.class);
+		when(taskManager.getId()).thenReturn(tmID);
+		when(taskManager.getTaskManagerID()).thenReturn(tmRID);
+		when(taskManager.getTaskManagerGateway()).thenReturn(taskManagerGateway);
+		CompletableFuture<BlobKey> future = new FlinkCompletableFuture<>();
+		future.completeExceptionally(new IOException("failure"));
+		when(taskManagerGateway.requestTaskManagerLog(any(Time.class))).thenReturn(future);
+
+		// ========= setup JobManager ==================================================================================
+		ActorGateway jobManagerGateway = mock(ActorGateway.class);
+		Object registeredTaskManagersAnswer = new JobManagerMessages.RegisteredTaskManagers(
+			JavaConverters.collectionAsScalaIterableConverter(Collections.singletonList(taskManager)).asScala());
+
+		when(jobManagerGateway.ask(isA(JobManagerMessages.RequestRegisteredTaskManagers$.class), any(FiniteDuration.class)))
+			.thenReturn(Future$.MODULE$.successful(registeredTaskManagersAnswer));
+		when(jobManagerGateway.ask(isA(JobManagerMessages.getRequestBlobManagerPort().getClass()), any(FiniteDuration.class)))
+			.thenReturn(Future$.MODULE$.successful((Object) 5));
+		when(jobManagerGateway.ask(isA(JobManagerMessages.RequestTaskManagerInstance.class), any(FiniteDuration.class)))
+			.thenReturn(Future$.MODULE$.successful((Object) new JobManagerMessages.TaskManagerInstance(Option.apply(taskManager))));
+		when(jobManagerGateway.path()).thenReturn("/jm/address");
+
+		JobManagerRetriever retriever = mock(JobManagerRetriever.class);
+		when(retriever.getJobManagerGatewayAndWebPort())
+			.thenReturn(Option.apply(new scala.Tuple2<ActorGateway, Integer>(jobManagerGateway, 0)));
+
+		TaskManagerLogHandler handler = new TaskManagerLogHandler(
+			retriever,
+			ExecutionContext$.MODULE$.fromExecutor(Executors.directExecutor()),
+			Future$.MODULE$.successful("/jm/address"),
+			AkkaUtils.getDefaultClientTimeout(),
+			TaskManagerLogHandler.FileMode.LOG, new Configuration(), false);
+
+		final AtomicReference<String> exception = new AtomicReference<>();
+		// record any ByteBuf written to the context, decoding only its readable bytes (not the whole backing array)
+		ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+		when(ctx.write(isA(ByteBuf.class))).thenAnswer(new Answer<Object>() {
+			@Override
+			public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+				ByteBuf data = invocationOnMock.getArgumentAt(0, ByteBuf.class);
+				exception.set(data.toString(java.nio.charset.Charset.defaultCharset()));
+				return null;
+			}
+		});
+
+		Map<String, String> pathParams = new HashMap<>();
+		pathParams.put(TaskManagersHandler.TASK_MANAGER_ID_KEY, tmID.toString());
+		Routed routed = mock(Routed.class);
+		when(routed.pathParams()).thenReturn(pathParams);
+		when(routed.request()).thenReturn(new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/taskmanagers/" + tmID + "/log"));
+
+		handler.respondAsLeader(ctx, routed, jobManagerGateway);
+
+		Assert.assertEquals("Fetching TaskManager log failed.", exception.get());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b8fa8c65/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 7b0e7d0..3c142cd 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -333,7 +333,8 @@ class TaskManager(
         case Some(_) =>
           handleRequestTaskManagerLog(sender(), requestType, currentJobManager.get)
         case None =>
-          sender() ! new IOException("BlobService not available. Cannot upload TaskManager logs.")
+          sender() ! akka.actor.Status.Failure(new IOException("BlobService not " +
+            "available. Cannot upload TaskManager logs."))
       }
 
     case RequestBroadcastVariablesWithReferences =>
@@ -823,28 +824,35 @@ class TaskManager(
     val logFilePathOption = Option(config.getConfiguration().getString(
       ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, System.getProperty("log.file")))
     logFilePathOption match {
-      case None => throw new IOException("TaskManager log files are unavailable. " +
+      case None => sender ! akka.actor.Status.Failure(
+        new IOException("TaskManager log files are unavailable. " +
         "Log file location not found in environment variable log.file or configuration key "
-        + ConfigConstants.TASK_MANAGER_LOG_PATH_KEY + ".");
+        + ConfigConstants.TASK_MANAGER_LOG_PATH_KEY + "."))
       case Some(logFilePath) =>
         val file: File = requestType match {
           case LogFileRequest => new File(logFilePath);
           case StdOutFileRequest =>
             new File(logFilePath.substring(0, logFilePath.length - 4) + ".out");
         }
-        val fis = new FileInputStream(file)
-        Future {
-          val client: BlobClient = blobService.get.createClient()
-          client.put(fis)
-        }(context.dispatcher)
-          .onComplete {
-            case Success(value) => 
-              sender ! value
-              fis.close()
-            case Failure(e) =>
-              sender ! e
-              fis.close()
+        if (file.exists()) {
+          val fis = new FileInputStream(file);
+          Future {
+            val client: BlobClient = blobService.get.createClient()
+            client.put(fis);
           }(context.dispatcher)
+            .onComplete {
+              case Success(value) =>
+                sender ! value
+                fis.close()
+              case Failure(e) =>
+                sender ! akka.actor.Status.Failure(e)
+                fis.close()
+            }(context.dispatcher)
+        } else {
+          sender ! akka.actor.Status.Failure(
+            new IOException("TaskManager log files are unavailable. " +
+            "Log file could not be found at " + file.getAbsolutePath + "."))
+        }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b8fa8c65/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index 99c1c1f..cc661ea 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -79,6 +79,7 @@ import org.apache.flink.util.NetUtils;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -1075,6 +1076,59 @@ public class TaskManagerTest extends TestLogger {
 		}};
 	}
 
+	/**
+	 * Requests the log of a TaskManager whose configured log path does not exist and expects
+	 * the ask to fail with the "Log file could not be found at" error rather than return a result.
+	 */
+	@Test
+	public void testLogNotFoundHandling() throws Exception {
+		new JavaTestKit(system){{
+			// we require a JobManager so that the BlobService is also started
+			ActorGateway jobManager = null;
+			ActorGateway taskManager = null;
+
+			try {
+				// Create the JM
+				ActorRef jm = system.actorOf(Props.create(
+					new SimplePartitionStateLookupJobManagerCreator(leaderSessionID, getTestActor())));
+
+				jobManager = new AkkaActorGateway(jm, leaderSessionID);
+
+				final int dataPort = NetUtils.getAvailablePort();
+				Configuration config = new Configuration();
+				config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, dataPort);
+				config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
+				config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
+				config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, "/i/dont/exist");
+
+				taskManager = TestingUtils.createTaskManager(
+					system, jobManager, config, false, true);
+
+				// ---------------------------------------------------------------------------------
+
+				final ActorGateway tm = taskManager;
+
+				new Within(d) {
+					@Override
+					protected void run() {
+						Future<Object> logFuture = tm.ask(TaskManagerMessages.getRequestTaskManagerLog(), timeout);
+						try {
+							Await.result(logFuture, timeout);
+							Assert.fail("Requesting the log of a missing file should have failed.");
+						} catch (Exception e) {
+							String message = e.getMessage();
+							Assert.assertTrue("Unexpected failure: " + e, message != null &&
+								message.startsWith("TaskManager log files are unavailable. Log file could not be found at"));
+						}
+					}
+				};
+			} finally {
+				TestingUtils.stopActor(taskManager);
+				TestingUtils.stopActor(jobManager);
+			}
+		}};
+	}
+
 	// ------------------------------------------------------------------------
 	// Stack trace sample
 	// ------------------------------------------------------------------------