You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2016/08/26 13:30:17 UTC

flink git commit: [FLINK-4273] adapt JobRetrievalITCase to lazy classloader reconstruction

Repository: flink
Updated Branches:
  refs/heads/master abb449678 -> b05ea6939


[FLINK-4273] adapt JobRetrievalITCase to lazy classloader reconstruction


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b05ea693
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b05ea693
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b05ea693

Branch: refs/heads/master
Commit: b05ea693984a5f5bf2e53f89d9fbd531e7be83fd
Parents: abb4496
Author: Maximilian Michels <mx...@apache.org>
Authored: Wed Aug 24 10:11:45 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Fri Aug 26 15:29:46 2016 +0200

----------------------------------------------------------------------
 .../flink/runtime/client/JobAttachmentClientActor.java   |  3 ++-
 .../org/apache/flink/runtime/jobmanager/JobManager.scala |  2 +-
 .../runtime/testingUtils/TestingJobManagerLike.scala     |  7 +++++--
 .../runtime/testingUtils/TestingJobManagerMessages.scala | 11 +++++++++++
 .../flink/test/clients/examples/JobRetrievalITCase.java  |  8 +++++---
 5 files changed, 24 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b05ea693/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobAttachmentClientActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobAttachmentClientActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobAttachmentClientActor.java
index 5446002..ffab9cc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobAttachmentClientActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobAttachmentClientActor.java
@@ -130,7 +130,8 @@ public class JobAttachmentClientActor extends JobClientActor {
 	}
 
 	private void tryToAttachToJob() {
-		LOG.info("Sending message to JobManager {} to attach to job {} and wait for progress", jobID);
+		LOG.info("Sending message to JobManager {} to attach to job {} and wait for progress",
+			jobManager, jobID);
 
 		Futures.future(new Callable<Object>() {
 			@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/b05ea693/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index d35fb0a..0e28d98 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -478,7 +478,7 @@ class JobManager(
       val client = sender()
       currentJobs.get(jobID) match {
         case Some((executionGraph, jobInfo)) =>
-          log.info("Registering client for job $jobID")
+          log.info(s"Registering client for job $jobID")
           jobInfo.clients += ((client, listeningBehaviour))
           val listener = new StatusListenerMessenger(client, leaderSessionID.orNull)
           executionGraph.registerJobStatusListener(listener)

http://git-wip-us.apache.org/repos/asf/flink/blob/b05ea693/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
index df4f95a..6a9b490 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
@@ -26,7 +26,7 @@ import org.apache.flink.runtime.execution.ExecutionState
 import org.apache.flink.runtime.jobgraph.JobStatus
 import org.apache.flink.runtime.jobmanager.JobManager
 import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
-import org.apache.flink.runtime.messages.JobManagerMessages.{GrantLeadership, RegisterJobClient}
+import org.apache.flink.runtime.messages.JobManagerMessages.{GrantLeadership, RegisterJobClient, RequestClassloadingProps}
 import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect}
 import org.apache.flink.runtime.messages.RegistrationMessages.RegisterTaskManager
 import org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat
@@ -336,7 +336,10 @@ trait TestingJobManagerLike extends FlinkActor {
 
     case msg: RegisterJobClient =>
       super.handleMessage(msg)
-      waitForClient.foreach(_ ! true)
+      waitForClient.foreach(_ ! ClientConnected)
+    case msg: RequestClassloadingProps =>
+      super.handleMessage(msg)
+      waitForClient.foreach(_ ! ClassLoadingPropsDelivered)
 
     case NotifyWhenAtLeastNumTaskManagerAreRegistered(numRegisteredTaskManager) =>
       if (that.instanceManager.getNumberOfRegisteredTaskManagers >= numRegisteredTaskManager) {

http://git-wip-us.apache.org/repos/asf/flink/blob/b05ea693/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
index a88ed43..f121305 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
@@ -86,6 +86,14 @@ object TestingJobManagerMessages {
     * Notifies the sender when the [[TestingJobManager]] receives new clients for jobs
     */
   case object NotifyWhenClientConnects
+  /**
+    * Notifes of client connect
+    */
+  case object ClientConnected
+  /**
+    * Notifies when the client has requested class loading information
+    */
+  case object ClassLoadingPropsDelivered
 
   /**
    * Registers to be notified by an [[org.apache.flink.runtime.messages.Messages.Acknowledge]]
@@ -119,4 +127,7 @@ object TestingJobManagerMessages {
   def getNotifyWhenClientConnects(): AnyRef = NotifyWhenClientConnects
   def getDisablePostStop(): AnyRef = DisablePostStop
 
+  def getClientConnected(): AnyRef = ClientConnected
+  def getClassLoadingPropsDelivered(): AnyRef = ClassLoadingPropsDelivered
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b05ea693/flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java b/flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java
index db17ee8..c9059f1 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java
@@ -25,7 +25,6 @@ import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.StandaloneClusterClient;
 import org.apache.flink.runtime.client.JobRetrievalException;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
@@ -86,7 +85,7 @@ public class JobRetrievalITCase extends TestLogger {
 			public void run() {
 				try {
 					assertNotNull(client.retrieveJob(jobID));
-				} catch (JobExecutionException e) {
+				} catch (Throwable e) {
 					fail(e.getMessage());
 				}
 			}
@@ -106,7 +105,10 @@ public class JobRetrievalITCase extends TestLogger {
 		resumingThread.start();
 
 		// wait for client to connect
-		testkit.expectMsgEquals(true);
+		testkit.expectMsgAllOf(
+			TestingJobManagerMessages.getClientConnected(),
+			TestingJobManagerMessages.getClassLoadingPropsDelivered());
+
 		// client has connected, we can release the lock
 		lock.release();