You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/02/16 16:10:25 UTC
flink git commit: [FLINK-5773] Use akka.actor.Status.Failure class to
send failures via AskSupport
Repository: flink
Updated Branches:
refs/heads/master 2ec2abfae -> 413609d13
[FLINK-5773] Use akka.actor.Status.Failure class to send failures via AskSupport
Akka's AskSupport trait requires that failures are wrapped in a akka.actor.Status.Failure
to be recognized. Internally the trait will unwrap the failure and wrap it in a
scala.util.Failure instance. However, it does not recognize the scala Failure when given
to the AskSupport trait. As a consequence it would wrap scala.util.Failure in a
scala.util.Success instance.
This closes #3321.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/413609d1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/413609d1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/413609d1
Branch: refs/heads/master
Commit: 413609d13fcf924fa8581450618bccc6abdbbda0
Parents: 2ec2abf
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Feb 15 14:16:26 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Feb 16 17:09:59 2017 +0100
----------------------------------------------------------------------
.../flink/runtime/taskmanager/TaskManager.scala | 20 +-
.../runtime/taskmanager/TaskManagerTest.java | 237 ++++++++++++++++++-
2 files changed, 240 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/413609d1/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 7cb1902..a70454b 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -69,13 +69,11 @@ import org.apache.flink.runtime.security.SecurityUtils.SecurityConfiguration
import org.apache.flink.runtime.taskexecutor.{TaskManagerServices, TaskManagerServicesConfiguration, TaskManagerConfiguration}
import org.apache.flink.runtime.util._
import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages}
-import org.apache.flink.util.NetUtils
import scala.collection.JavaConverters._
import scala.concurrent._
import scala.concurrent.duration._
import scala.language.postfixOps
-import scala.util.{Failure, Success}
/**
* The TaskManager is responsible for executing the individual tasks of a Flink job. It is
@@ -423,7 +421,7 @@ class TaskManager(
futureResponse.mapTo[Boolean].onComplete {
// IMPORTANT: In the future callback, we cannot directly modify state
// but only send messages to the TaskManager to do those changes
- case Success(result) =>
+ case scala.util.Success(result) =>
if (!result) {
self ! decorateMessage(
FailTask(
@@ -432,7 +430,7 @@ class TaskManager(
)
}
- case Failure(t) =>
+ case scala.util.Failure(t) =>
self ! decorateMessage(
FailTask(
executionID,
@@ -470,7 +468,7 @@ class TaskManager(
sender ! decorateMessage(Acknowledge.get())
} catch {
case t: Throwable =>
- sender ! decorateMessage(Failure(t))
+ sender ! decorateMessage(Status.Failure(t))
}
} else {
log.debug(s"Cannot find task to stop for execution ${executionID})")
@@ -762,8 +760,6 @@ class TaskManager(
// ---- Done ----
log.debug(s"Done with stack trace sample $sampleId.")
-
-
sender ! new StackTraceSampleResponse(sampleId, executionId, currentTraces)
}
@@ -781,7 +777,7 @@ class TaskManager(
}
} catch {
case e: Exception =>
- sender ! Failure(e)
+ sender ! decorateMessage(Status.Failure(e))
}
case _ => unhandled(message)
@@ -841,10 +837,10 @@ class TaskManager(
client.put(fis);
}(context.dispatcher)
.onComplete {
- case Success(value) =>
+ case scala.util.Success(value) =>
sender ! value
fis.close()
- case Failure(e) =>
+ case scala.util.Failure(e) =>
sender ! akka.actor.Status.Failure(e)
fis.close()
}(context.dispatcher)
@@ -1209,7 +1205,7 @@ class TaskManager(
catch {
case t: Throwable =>
log.error("SubmitTask failed", t)
- sender ! decorateMessage(Failure(t))
+ sender ! decorateMessage(Status.Failure(t))
}
}
@@ -1263,7 +1259,7 @@ class TaskManager(
if (errors.isEmpty) {
sender ! decorateMessage(Acknowledge.get())
} else {
- sender ! decorateMessage(Failure(new Exception(errors.mkString("\n"))))
+ sender ! decorateMessage(Status.Failure(new Exception(errors.mkString("\n"))))
}
case None =>
http://git-wip-us.apache.org/repos/asf/flink/blob/413609d1/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index 770aa35..356d693 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -483,7 +483,7 @@ public class TaskManagerTest extends TestLogger {
expectMsgEquals(Acknowledge.get());
tm.tell(new StopTask(eid2), testActorGateway);
- expectMsgClass(Failure.class);
+ expectMsgClass(Status.Failure.class);
assertEquals(ExecutionState.RUNNING, t2.getExecutionState());
@@ -1227,13 +1227,13 @@ public class TaskManagerTest extends TestLogger {
// Receive the expected message (heartbeat races possible)
Object[] msg = receiveN(1);
- while (!(msg[0] instanceof Failure)) {
+ while (!(msg[0] instanceof Status.Failure)) {
msg = receiveN(1);
}
- Failure response = (Failure) msg[0];
+ Status.Failure response = (Status.Failure) msg[0];
- assertEquals(IllegalStateException.class, response.exception().getClass());
+ assertEquals(IllegalStateException.class, response.cause().getClass());
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
@@ -1525,7 +1525,234 @@ public class TaskManagerTest extends TestLogger {
}
}};
}
-
+
+ /**
+ * Tests that the TaskManager sends a proper exception back to the sender if the submit task
+ * message fails.
+ */
+ @Test
+ public void testSubmitTaskFailure() throws Exception {
+ ActorGateway jobManager = null;
+ ActorGateway taskManager = null;
+
+ try {
+
+ ActorRef jm = system.actorOf(Props.create(SimpleJobManager.class, leaderSessionID));
+ jobManager = new AkkaActorGateway(jm, leaderSessionID);
+
+ taskManager = TestingUtils.createTaskManager(
+ system,
+ jobManager,
+ new Configuration(),
+ true,
+ true);
+
+ TaskDeploymentDescriptor tdd = createTaskDeploymentDescriptor(
+ new JobID(),
+ "test job",
+ new JobVertexID(),
+ new ExecutionAttemptID(),
+ new SerializedValue<>(new ExecutionConfig()),
+ "test task",
+ 0, // this will make the submission fail because the number of key groups must be >= 1
+ 0,
+ 1,
+ 0,
+ new Configuration(),
+ new Configuration(),
+ "Foobar",
+ Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+ Collections.<InputGateDeploymentDescriptor>emptyList(),
+ Collections.<BlobKey>emptyList(),
+ Collections.<URL>emptyList(),
+ 0);
+
+ Future<Object> submitResponse = taskManager.ask(new SubmitTask(tdd), timeout);
+
+ try {
+ Await.result(submitResponse, timeout);
+
+ fail("The submit task message should have failed.");
+ } catch (IllegalArgumentException e) {
+ // expected
+ }
+ } finally {
+ TestingUtils.stopActor(jobManager);
+ TestingUtils.stopActor(taskManager);
+ }
+ }
+
+ /**
+ * Tests that the TaskManager sends a proper exception back to the sender if the stop task
+ * message fails.
+ */
+ @Test
+ public void testStopTaskFailure() throws Exception {
+ ActorGateway jobManager = null;
+ ActorGateway taskManager = null;
+
+ try {
+ final ExecutionAttemptID executionAttemptId = new ExecutionAttemptID();
+
+ ActorRef jm = system.actorOf(Props.create(SimpleJobManager.class, leaderSessionID));
+ jobManager = new AkkaActorGateway(jm, leaderSessionID);
+
+ taskManager = TestingUtils.createTaskManager(
+ system,
+ jobManager,
+ new Configuration(),
+ true,
+ true);
+
+ TaskDeploymentDescriptor tdd = createTaskDeploymentDescriptor(
+ new JobID(),
+ "test job",
+ new JobVertexID(),
+ executionAttemptId,
+ new SerializedValue<>(new ExecutionConfig()),
+ "test task",
+ 1,
+ 0,
+ 1,
+ 0,
+ new Configuration(),
+ new Configuration(),
+ BlockingNoOpInvokable.class.getName(),
+ Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+ Collections.<InputGateDeploymentDescriptor>emptyList(),
+ Collections.<BlobKey>emptyList(),
+ Collections.<URL>emptyList(),
+ 0);
+
+ Future<Object> submitResponse = taskManager.ask(new SubmitTask(tdd), timeout);
+
+ Await.result(submitResponse, timeout);
+
+ Future<Object> stopResponse = taskManager.ask(new StopTask(executionAttemptId), timeout);
+
+ try {
+ Await.result(stopResponse, timeout);
+
+ fail("The stop task message should have failed.");
+ } catch (UnsupportedOperationException e) {
+ // expected
+ }
+ } finally {
+ TestingUtils.stopActor(jobManager);
+ TestingUtils.stopActor(taskManager);
+ }
+ }
+
+ /**
+ * Tests that the TaskManager sends a proper exception back to the sender if the trigger stack
+ * trace message fails.
+ */
+ @Test
+ public void testStackTraceSampleFailure() throws Exception {
+ ActorGateway jobManager = null;
+ ActorGateway taskManager = null;
+
+ try {
+
+ ActorRef jm = system.actorOf(Props.create(SimpleJobManager.class, leaderSessionID));
+ jobManager = new AkkaActorGateway(jm, leaderSessionID);
+
+ taskManager = TestingUtils.createTaskManager(
+ system,
+ jobManager,
+ new Configuration(),
+ true,
+ true);
+
+ Future<Object> stackTraceResponse = taskManager.ask(
+ new TriggerStackTraceSample(
+ 0,
+ new ExecutionAttemptID(),
+ 0,
+ Time.milliseconds(1L),
+ 0),
+ timeout);
+
+ try {
+ Await.result(stackTraceResponse, timeout);
+
+ fail("The trigger stack trace message should have failed.");
+ } catch (IllegalStateException e) {
+ // expected
+ }
+ } finally {
+ TestingUtils.stopActor(jobManager);
+ TestingUtils.stopActor(taskManager);
+ }
+ }
+
+ /**
+ * Tests that the TaskManager sends a proper exception back to the sender if the trigger stack
+ * trace message fails.
+ */
+ @Test
+ public void testUpdateTaskInputPartitionsFailure() throws Exception {
+ ActorGateway jobManager = null;
+ ActorGateway taskManager = null;
+
+ try {
+
+ final ExecutionAttemptID executionAttemptId = new ExecutionAttemptID();
+
+ ActorRef jm = system.actorOf(Props.create(SimpleJobManager.class, leaderSessionID));
+ jobManager = new AkkaActorGateway(jm, leaderSessionID);
+
+ taskManager = TestingUtils.createTaskManager(
+ system,
+ jobManager,
+ new Configuration(),
+ true,
+ true);
+
+ TaskDeploymentDescriptor tdd = createTaskDeploymentDescriptor(
+ new JobID(),
+ "test job",
+ new JobVertexID(),
+ executionAttemptId,
+ new SerializedValue<>(new ExecutionConfig()),
+ "test task",
+ 1,
+ 0,
+ 1,
+ 0,
+ new Configuration(),
+ new Configuration(),
+ BlockingNoOpInvokable.class.getName(),
+ Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+ Collections.<InputGateDeploymentDescriptor>emptyList(),
+ Collections.<BlobKey>emptyList(),
+ Collections.<URL>emptyList(),
+ 0);
+
+ Future<Object> submitResponse = taskManager.ask(new SubmitTask(tdd), timeout);
+
+ Await.result(submitResponse, timeout);
+
+ Future<Object> partitionUpdateResponse = taskManager.ask(
+ new TaskMessages.UpdateTaskSinglePartitionInfo(
+ executionAttemptId,
+ new IntermediateDataSetID(),
+ new InputChannelDeploymentDescriptor(new ResultPartitionID(), ResultPartitionLocation.createLocal())),
+ timeout);
+
+ try {
+ Await.result(partitionUpdateResponse, timeout);
+
+ fail("The update task input partitions message should have failed.");
+ } catch (Exception e) {
+ // expected
+ }
+ } finally {
+ TestingUtils.stopActor(jobManager);
+ TestingUtils.stopActor(taskManager);
+ }
+ }
+
// --------------------------------------------------------------------------------------------
public static class SimpleJobManager extends FlinkUntypedActor {