You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/02/16 16:10:25 UTC

flink git commit: [FLINK-5773] Use akka.actor.Status.Failure class to send failures via AskSupport

Repository: flink
Updated Branches:
  refs/heads/master 2ec2abfae -> 413609d13


[FLINK-5773] Use akka.actor.Status.Failure class to send failures via AskSupport

Akka's AskSupport trait requires that failures are wrapped in an akka.actor.Status.Failure
to be recognized. Internally, the trait unwraps the Status.Failure and completes the ask
future with a scala.util.Failure instance. However, it does not recognize a scala.util.Failure
that is sent directly as the response. As a consequence, it would wrap the scala.util.Failure
in a scala.util.Success instance, so the ask future would complete successfully instead of failing.

This closes #3321.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/413609d1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/413609d1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/413609d1

Branch: refs/heads/master
Commit: 413609d13fcf924fa8581450618bccc6abdbbda0
Parents: 2ec2abf
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Feb 15 14:16:26 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Feb 16 17:09:59 2017 +0100

----------------------------------------------------------------------
 .../flink/runtime/taskmanager/TaskManager.scala |  20 +-
 .../runtime/taskmanager/TaskManagerTest.java    | 237 ++++++++++++++++++-
 2 files changed, 240 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/413609d1/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 7cb1902..a70454b 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -69,13 +69,11 @@ import org.apache.flink.runtime.security.SecurityUtils.SecurityConfiguration
 import org.apache.flink.runtime.taskexecutor.{TaskManagerServices, TaskManagerServicesConfiguration, TaskManagerConfiguration}
 import org.apache.flink.runtime.util._
 import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages}
-import org.apache.flink.util.NetUtils
 
 import scala.collection.JavaConverters._
 import scala.concurrent._
 import scala.concurrent.duration._
 import scala.language.postfixOps
-import scala.util.{Failure, Success}
 
 /**
  * The TaskManager is responsible for executing the individual tasks of a Flink job. It is
@@ -423,7 +421,7 @@ class TaskManager(
               futureResponse.mapTo[Boolean].onComplete {
                 // IMPORTANT: In the future callback, we cannot directly modify state
                 //            but only send messages to the TaskManager to do those changes
-                case Success(result) =>
+                case scala.util.Success(result) =>
                   if (!result) {
                   self ! decorateMessage(
                     FailTask(
@@ -432,7 +430,7 @@ class TaskManager(
                     )
                   }
 
-                case Failure(t) =>
+                case scala.util.Failure(t) =>
                 self ! decorateMessage(
                   FailTask(
                     executionID,
@@ -470,7 +468,7 @@ class TaskManager(
               sender ! decorateMessage(Acknowledge.get())
             } catch {
               case t: Throwable =>
-                sender ! decorateMessage(Failure(t))
+                sender ! decorateMessage(Status.Failure(t))
             }
           } else {
             log.debug(s"Cannot find task to stop for execution ${executionID})")
@@ -762,8 +760,6 @@ class TaskManager(
                   // ---- Done ----
                   log.debug(s"Done with stack trace sample $sampleId.")
 
-
-
                   sender ! new StackTraceSampleResponse(sampleId, executionId, currentTraces)
                 }
 
@@ -781,7 +777,7 @@ class TaskManager(
           }
         } catch {
           case e: Exception =>
-            sender ! Failure(e)
+            sender ! decorateMessage(Status.Failure(e))
         }
 
       case _ => unhandled(message)
@@ -841,10 +837,10 @@ class TaskManager(
             client.put(fis);
           }(context.dispatcher)
             .onComplete {
-              case Success(value) =>
+              case scala.util.Success(value) =>
                 sender ! value
                 fis.close()
-              case Failure(e) =>
+              case scala.util.Failure(e) =>
                 sender ! akka.actor.Status.Failure(e)
                 fis.close()
             }(context.dispatcher)
@@ -1209,7 +1205,7 @@ class TaskManager(
     catch {
       case t: Throwable =>
         log.error("SubmitTask failed", t)
-        sender ! decorateMessage(Failure(t))
+        sender ! decorateMessage(Status.Failure(t))
     }
   }
 
@@ -1263,7 +1259,7 @@ class TaskManager(
         if (errors.isEmpty) {
           sender ! decorateMessage(Acknowledge.get())
         } else {
-          sender ! decorateMessage(Failure(new Exception(errors.mkString("\n"))))
+          sender ! decorateMessage(Status.Failure(new Exception(errors.mkString("\n"))))
         }
 
       case None =>

http://git-wip-us.apache.org/repos/asf/flink/blob/413609d1/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index 770aa35..356d693 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -483,7 +483,7 @@ public class TaskManagerTest extends TestLogger {
 							expectMsgEquals(Acknowledge.get());
 
 							tm.tell(new StopTask(eid2), testActorGateway);
-							expectMsgClass(Failure.class);
+							expectMsgClass(Status.Failure.class);
 
 							assertEquals(ExecutionState.RUNNING, t2.getExecutionState());
 
@@ -1227,13 +1227,13 @@ public class TaskManagerTest extends TestLogger {
 
 							// Receive the expected message (heartbeat races possible)
 							Object[] msg = receiveN(1);
-							while (!(msg[0] instanceof Failure)) {
+							while (!(msg[0] instanceof Status.Failure)) {
 								msg = receiveN(1);
 							}
 
-							Failure response = (Failure) msg[0];
+							Status.Failure response = (Status.Failure) msg[0];
 
-							assertEquals(IllegalStateException.class, response.exception().getClass());
+							assertEquals(IllegalStateException.class, response.cause().getClass());
 						} catch (Exception e) {
 							e.printStackTrace();
 							fail(e.getMessage());
@@ -1525,7 +1525,234 @@ public class TaskManagerTest extends TestLogger {
 			}
 		}};
 	}
-	
+
+	/**
+	 * Tests that the TaskManager sends a proper exception back to the sender if the submit task
+	 * message fails.
+	 */
+	@Test
+	public void testSubmitTaskFailure() throws Exception {
+		ActorGateway jobManager = null;
+		ActorGateway taskManager = null;
+
+		try {
+
+			ActorRef jm = system.actorOf(Props.create(SimpleJobManager.class, leaderSessionID));
+			jobManager = new AkkaActorGateway(jm, leaderSessionID);
+
+			taskManager = TestingUtils.createTaskManager(
+				system,
+				jobManager,
+				new Configuration(),
+				true,
+				true);
+
+			TaskDeploymentDescriptor tdd = createTaskDeploymentDescriptor(
+				new JobID(),
+				"test job",
+				new JobVertexID(),
+				new ExecutionAttemptID(),
+				new SerializedValue<>(new ExecutionConfig()),
+				"test task",
+				0, // this will make the submission fail because the number of key groups must be >= 1
+				0,
+				1,
+				0,
+				new Configuration(),
+				new Configuration(),
+				"Foobar",
+				Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+				Collections.<InputGateDeploymentDescriptor>emptyList(),
+				Collections.<BlobKey>emptyList(),
+				Collections.<URL>emptyList(),
+				0);
+
+			Future<Object> submitResponse = taskManager.ask(new SubmitTask(tdd), timeout);
+
+			try {
+				Await.result(submitResponse, timeout);
+
+				fail("The submit task message should have failed.");
+			} catch (IllegalArgumentException e) {
+				// expected
+			}
+		} finally {
+			TestingUtils.stopActor(jobManager);
+			TestingUtils.stopActor(taskManager);
+		}
+	}
+
+	/**
+	 * Tests that the TaskManager sends a proper exception back to the sender if the stop task
+	 * message fails.
+	 */
+	@Test
+	public void testStopTaskFailure() throws Exception {
+		ActorGateway jobManager = null;
+		ActorGateway taskManager = null;
+
+		try {
+			final ExecutionAttemptID executionAttemptId = new ExecutionAttemptID();
+
+			ActorRef jm = system.actorOf(Props.create(SimpleJobManager.class, leaderSessionID));
+			jobManager = new AkkaActorGateway(jm, leaderSessionID);
+
+			taskManager = TestingUtils.createTaskManager(
+				system,
+				jobManager,
+				new Configuration(),
+				true,
+				true);
+
+			TaskDeploymentDescriptor tdd = createTaskDeploymentDescriptor(
+				new JobID(),
+				"test job",
+				new JobVertexID(),
+				executionAttemptId,
+				new SerializedValue<>(new ExecutionConfig()),
+				"test task",
+				1,
+				0,
+				1,
+				0,
+				new Configuration(),
+				new Configuration(),
+				BlockingNoOpInvokable.class.getName(),
+				Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+				Collections.<InputGateDeploymentDescriptor>emptyList(),
+				Collections.<BlobKey>emptyList(),
+				Collections.<URL>emptyList(),
+				0);
+
+			Future<Object> submitResponse = taskManager.ask(new SubmitTask(tdd), timeout);
+
+			Await.result(submitResponse, timeout);
+
+			Future<Object> stopResponse = taskManager.ask(new StopTask(executionAttemptId), timeout);
+
+			try {
+				Await.result(stopResponse, timeout);
+
+				fail("The stop task message should have failed.");
+			} catch (UnsupportedOperationException e) {
+				// expected
+			}
+		} finally {
+			TestingUtils.stopActor(jobManager);
+			TestingUtils.stopActor(taskManager);
+		}
+	}
+
+	/**
+	 * Tests that the TaskManager sends a proper exception back to the sender if the trigger stack
+	 * trace message fails.
+	 */
+	@Test
+	public void testStackTraceSampleFailure() throws Exception {
+		ActorGateway jobManager = null;
+		ActorGateway taskManager = null;
+
+		try {
+
+			ActorRef jm = system.actorOf(Props.create(SimpleJobManager.class, leaderSessionID));
+			jobManager = new AkkaActorGateway(jm, leaderSessionID);
+
+			taskManager = TestingUtils.createTaskManager(
+				system,
+				jobManager,
+				new Configuration(),
+				true,
+				true);
+
+			Future<Object> stackTraceResponse = taskManager.ask(
+				new TriggerStackTraceSample(
+					0,
+					new ExecutionAttemptID(),
+					0,
+					Time.milliseconds(1L),
+					0),
+				timeout);
+
+			try {
+				Await.result(stackTraceResponse, timeout);
+
+				fail("The trigger stack trace message should have failed.");
+			} catch (IllegalStateException e) {
+				// expected
+			}
+		} finally {
+			TestingUtils.stopActor(jobManager);
+			TestingUtils.stopActor(taskManager);
+		}
+	}
+
+	/**
+	 * Tests that the TaskManager sends a proper exception back to the sender if the update task
+	 * input partitions message fails.
+	 */
+	@Test
+	public void testUpdateTaskInputPartitionsFailure() throws Exception {
+		ActorGateway jobManager = null;
+		ActorGateway taskManager = null;
+
+		try {
+
+			final ExecutionAttemptID executionAttemptId = new ExecutionAttemptID();
+
+			ActorRef jm = system.actorOf(Props.create(SimpleJobManager.class, leaderSessionID));
+			jobManager = new AkkaActorGateway(jm, leaderSessionID);
+
+			taskManager = TestingUtils.createTaskManager(
+				system,
+				jobManager,
+				new Configuration(),
+				true,
+				true);
+
+			TaskDeploymentDescriptor tdd = createTaskDeploymentDescriptor(
+				new JobID(),
+				"test job",
+				new JobVertexID(),
+				executionAttemptId,
+				new SerializedValue<>(new ExecutionConfig()),
+				"test task",
+				1,
+				0,
+				1,
+				0,
+				new Configuration(),
+				new Configuration(),
+				BlockingNoOpInvokable.class.getName(),
+				Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+				Collections.<InputGateDeploymentDescriptor>emptyList(),
+				Collections.<BlobKey>emptyList(),
+				Collections.<URL>emptyList(),
+				0);
+
+			Future<Object> submitResponse = taskManager.ask(new SubmitTask(tdd), timeout);
+
+			Await.result(submitResponse, timeout);
+
+			Future<Object> partitionUpdateResponse = taskManager.ask(
+				new TaskMessages.UpdateTaskSinglePartitionInfo(
+					executionAttemptId,
+					new IntermediateDataSetID(),
+					new InputChannelDeploymentDescriptor(new ResultPartitionID(), ResultPartitionLocation.createLocal())),
+				timeout);
+
+			try {
+				Await.result(partitionUpdateResponse, timeout);
+
+				fail("The update task input partitions message should have failed.");
+			} catch (Exception e) {
+				// expected
+			}
+		} finally {
+			TestingUtils.stopActor(jobManager);
+			TestingUtils.stopActor(taskManager);
+		}
+	}
+
 	// --------------------------------------------------------------------------------------------
 
 	public static class SimpleJobManager extends FlinkUntypedActor {