You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/01/23 21:18:24 UTC
[2/4] flink git commit: [FLINK-5298] TM checks that log file exists
[FLINK-5298] TM checks that log file exists
This closes #2974.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b8fa8c65
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b8fa8c65
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b8fa8c65
Branch: refs/heads/master
Commit: b8fa8c65638e8749f9ad994da8ec69a6c34df029
Parents: da26bdc
Author: zentol <ch...@apache.org>
Authored: Thu Dec 8 19:28:12 2016 +0100
Committer: zentol <ch...@apache.org>
Committed: Mon Jan 23 22:18:19 2017 +0100
----------------------------------------------------------------------
.../handlers/TaskManagerLogHandlerTest.java | 129 +++++++++++++++++++
.../flink/runtime/taskmanager/TaskManager.scala | 38 +++---
.../runtime/taskmanager/TaskManagerTest.java | 54 ++++++++
3 files changed, 206 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b8fa8c65/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java
new file mode 100644
index 0000000..c1e963e
--- /dev/null
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.http.DefaultFullHttpRequest;
+import io.netty.handler.codec.http.HttpMethod;
+import io.netty.handler.codec.http.HttpVersion;
+import io.netty.handler.codec.http.router.Routed;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.CompletableFuture;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import scala.Option;
+import scala.collection.JavaConverters;
+import scala.concurrent.ExecutionContext$;
+import scala.concurrent.Future$;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.isA;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+/** Tests for {@link TaskManagerLogHandler}. */
+public class TaskManagerLogHandlerTest {
+	/** Checks that a failed log request to the TaskManager is reported to the client as an error message. */
+	@Test
+	public void testLogFetchingFailure() throws Exception {
+		// ========= setup TaskManager =================================================================================
+		InstanceID tmID = new InstanceID();
+		ResourceID tmRID = new ResourceID(tmID.toString());
+		TaskManagerGateway taskManagerGateway = mock(TaskManagerGateway.class);
+		when(taskManagerGateway.getAddress()).thenReturn("/tm/address");
+
+		Instance taskManager = mock(Instance.class);
+		when(taskManager.getId()).thenReturn(tmID);
+		when(taskManager.getTaskManagerID()).thenReturn(tmRID);
+		when(taskManager.getTaskManagerGateway()).thenReturn(taskManagerGateway);
+		// the TaskManager's log request fails with an IOException
+		CompletableFuture<BlobKey> future = new FlinkCompletableFuture<>();
+		future.completeExceptionally(new IOException("failure"));
+		when(taskManagerGateway.requestTaskManagerLog(any(Time.class))).thenReturn(future);
+
+		// ========= setup JobManager ==================================================================================
+		ActorGateway jobManagerGateway = mock(ActorGateway.class);
+		Object registeredTaskManagersAnswer = new JobManagerMessages.RegisteredTaskManagers(
+			JavaConverters.collectionAsScalaIterableConverter(Collections.singletonList(taskManager)).asScala());
+		when(jobManagerGateway.ask(isA(JobManagerMessages.RequestRegisteredTaskManagers$.class), any(FiniteDuration.class)))
+			.thenReturn(Future$.MODULE$.successful(registeredTaskManagersAnswer));
+		when(jobManagerGateway.ask(isA(JobManagerMessages.getRequestBlobManagerPort().getClass()), any(FiniteDuration.class)))
+			.thenReturn(Future$.MODULE$.successful((Object) 5));
+		when(jobManagerGateway.ask(isA(JobManagerMessages.RequestTaskManagerInstance.class), any(FiniteDuration.class)))
+			.thenReturn(Future$.MODULE$.successful((Object) new JobManagerMessages.TaskManagerInstance(Option.apply(taskManager))));
+		when(jobManagerGateway.path()).thenReturn("/jm/address");
+
+		JobManagerRetriever retriever = mock(JobManagerRetriever.class);
+		when(retriever.getJobManagerGatewayAndWebPort())
+			.thenReturn(Option.apply(new scala.Tuple2<ActorGateway, Integer>(jobManagerGateway, 0)));
+
+		TaskManagerLogHandler handler = new TaskManagerLogHandler(
+			retriever,
+			ExecutionContext$.MODULE$.fromExecutor(Executors.directExecutor()),
+			Future$.MODULE$.successful("/jm/address"),
+			AkkaUtils.getDefaultClientTimeout(),
+			TaskManagerLogHandler.FileMode.LOG,
+			new Configuration(),
+			false);
+
+		// holds the content of the most recent ByteBuf the handler writes to the channel
+		final AtomicReference<String> responseMessage = new AtomicReference<>();
+		ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+		when(ctx.write(isA(ByteBuf.class))).thenAnswer(new Answer<Object>() {
+			@Override
+			public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+				ByteBuf data = invocationOnMock.getArgumentAt(0, ByteBuf.class);
+				// decode only the readable bytes: array() fails for direct buffers and may be larger than the content
+				responseMessage.set(data.toString(java.nio.charset.StandardCharsets.UTF_8));
+				return null;
+			}
+		});
+
+		Map<String, String> pathParams = new HashMap<>();
+		pathParams.put(TaskManagersHandler.TASK_MANAGER_ID_KEY, tmID.toString());
+		Routed routed = mock(Routed.class);
+		when(routed.pathParams()).thenReturn(pathParams);
+		when(routed.request()).thenReturn(new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/taskmanagers/" + tmID + "/log"));
+
+		handler.respondAsLeader(ctx, routed, jobManagerGateway);
+
+		Assert.assertEquals("Fetching TaskManager log failed.", responseMessage.get());
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b8fa8c65/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 7b0e7d0..3c142cd 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -333,7 +333,8 @@ class TaskManager(
case Some(_) =>
handleRequestTaskManagerLog(sender(), requestType, currentJobManager.get)
case None =>
- sender() ! new IOException("BlobService not available. Cannot upload TaskManager logs.")
+ sender() ! akka.actor.Status.Failure(new IOException("BlobService not " +
+ "available. Cannot upload TaskManager logs."))
}
case RequestBroadcastVariablesWithReferences =>
@@ -823,28 +824,35 @@ class TaskManager(
val logFilePathOption = Option(config.getConfiguration().getString(
ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, System.getProperty("log.file")))
logFilePathOption match {
- case None => throw new IOException("TaskManager log files are unavailable. " +
+ case None => sender ! akka.actor.Status.Failure(
+ new IOException("TaskManager log files are unavailable. " +
"Log file location not found in environment variable log.file or configuration key "
- + ConfigConstants.TASK_MANAGER_LOG_PATH_KEY + ".");
+ + ConfigConstants.TASK_MANAGER_LOG_PATH_KEY + "."))
case Some(logFilePath) =>
val file: File = requestType match {
case LogFileRequest => new File(logFilePath);
case StdOutFileRequest =>
new File(logFilePath.substring(0, logFilePath.length - 4) + ".out");
}
- val fis = new FileInputStream(file)
- Future {
- val client: BlobClient = blobService.get.createClient()
- client.put(fis)
- }(context.dispatcher)
- .onComplete {
- case Success(value) =>
- sender ! value
- fis.close()
- case Failure(e) =>
- sender ! e
- fis.close()
+ if (file.exists()) {
+ val fis = new FileInputStream(file);
+ Future {
+ val client: BlobClient = blobService.get.createClient()
+ client.put(fis);
}(context.dispatcher)
+ .onComplete {
+ case Success(value) =>
+ sender ! value
+ fis.close()
+ case Failure(e) =>
+ sender ! akka.actor.Status.Failure(e)
+ fis.close()
+ }(context.dispatcher)
+ } else {
+ sender ! akka.actor.Status.Failure(
+ new IOException("TaskManager log files are unavailable. " +
+ "Log file could not be found at " + file.getAbsolutePath + "."))
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b8fa8c65/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index 99c1c1f..cc661ea 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -79,6 +79,7 @@ import org.apache.flink.util.NetUtils;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
@@ -1075,6 +1076,59 @@ public class TaskManagerTest extends TestLogger {
}};
}
+	/**
+	 * Checks that a log request fails with a descriptive error if the configured log file does not exist.
+	 */
+	@Test
+	public void testLogNotFoundHandling() throws Exception {
+		new JavaTestKit(system){{
+			// we require a JobManager so that the BlobService is also started
+			ActorGateway jobManager = null;
+			ActorGateway taskManager = null;
+			try {
+				// Create the JM
+				ActorRef jm = system.actorOf(Props.create(
+					new SimplePartitionStateLookupJobManagerCreator(leaderSessionID, getTestActor())));
+				jobManager = new AkkaActorGateway(jm, leaderSessionID);
+				final int dataPort = NetUtils.getAvailablePort();
+				Configuration config = new Configuration();
+				config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, dataPort);
+				config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
+				config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
+				// point the TaskManager at a log file that does not exist
+				config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, "/i/dont/exist");
+
+				taskManager = TestingUtils.createTaskManager(
+					system,
+					jobManager,
+					config,
+					false,
+					true);
+
+				// ---------------------------------------------------------------------------------
+				final ActorGateway tm = taskManager;
+				new Within(d) {
+					@Override
+					protected void run() {
+						Future<Object> logFuture = tm.ask(TaskManagerMessages.getRequestTaskManagerLog(), timeout);
+						try {
+							Await.result(logFuture, timeout);
+							Assert.fail("Requesting a non-existing log file should have failed.");
+						} catch (Exception e) {
+							// the TaskManager replies with a Status.Failure, which fails the ask future
+							String message = e.getMessage();
+							Assert.assertTrue("Unexpected failure message: " + message, message != null &&
+								message.startsWith("TaskManager log files are unavailable. Log file could not be found at"));
+						}
+					}
+				};
+			} finally {
+				TestingUtils.stopActor(taskManager);
+				TestingUtils.stopActor(jobManager);
+			}
+		}};
+	}
+
// ------------------------------------------------------------------------
// Stack trace sample
// ------------------------------------------------------------------------