You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2016/08/26 13:30:17 UTC
flink git commit: [FLINK-4273] adapt JobRetrievalITCase to lazy classloader reconstruction
Repository: flink
Updated Branches:
refs/heads/master abb449678 -> b05ea6939
[FLINK-4273] adapt JobRetrievalITCase to lazy classloader reconstruction
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b05ea693
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b05ea693
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b05ea693
Branch: refs/heads/master
Commit: b05ea693984a5f5bf2e53f89d9fbd531e7be83fd
Parents: abb4496
Author: Maximilian Michels <mx...@apache.org>
Authored: Wed Aug 24 10:11:45 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Fri Aug 26 15:29:46 2016 +0200
----------------------------------------------------------------------
.../flink/runtime/client/JobAttachmentClientActor.java | 3 ++-
.../org/apache/flink/runtime/jobmanager/JobManager.scala | 2 +-
.../runtime/testingUtils/TestingJobManagerLike.scala | 7 +++++--
.../runtime/testingUtils/TestingJobManagerMessages.scala | 11 +++++++++++
.../flink/test/clients/examples/JobRetrievalITCase.java | 8 +++++---
5 files changed, 24 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b05ea693/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobAttachmentClientActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobAttachmentClientActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobAttachmentClientActor.java
index 5446002..ffab9cc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobAttachmentClientActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobAttachmentClientActor.java
@@ -130,7 +130,8 @@ public class JobAttachmentClientActor extends JobClientActor {
}
private void tryToAttachToJob() {
- LOG.info("Sending message to JobManager {} to attach to job {} and wait for progress", jobID);
+ LOG.info("Sending message to JobManager {} to attach to job {} and wait for progress",
+ jobManager, jobID);
Futures.future(new Callable<Object>() {
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/b05ea693/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index d35fb0a..0e28d98 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -478,7 +478,7 @@ class JobManager(
val client = sender()
currentJobs.get(jobID) match {
case Some((executionGraph, jobInfo)) =>
- log.info("Registering client for job $jobID")
+ log.info(s"Registering client for job $jobID")
jobInfo.clients += ((client, listeningBehaviour))
val listener = new StatusListenerMessenger(client, leaderSessionID.orNull)
executionGraph.registerJobStatusListener(listener)
http://git-wip-us.apache.org/repos/asf/flink/blob/b05ea693/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
index df4f95a..6a9b490 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
@@ -26,7 +26,7 @@ import org.apache.flink.runtime.execution.ExecutionState
import org.apache.flink.runtime.jobgraph.JobStatus
import org.apache.flink.runtime.jobmanager.JobManager
import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
-import org.apache.flink.runtime.messages.JobManagerMessages.{GrantLeadership, RegisterJobClient}
+import org.apache.flink.runtime.messages.JobManagerMessages.{GrantLeadership, RegisterJobClient, RequestClassloadingProps}
import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect}
import org.apache.flink.runtime.messages.RegistrationMessages.RegisterTaskManager
import org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat
@@ -336,7 +336,10 @@ trait TestingJobManagerLike extends FlinkActor {
case msg: RegisterJobClient =>
super.handleMessage(msg)
- waitForClient.foreach(_ ! true)
+ waitForClient.foreach(_ ! ClientConnected)
+ case msg: RequestClassloadingProps =>
+ super.handleMessage(msg)
+ waitForClient.foreach(_ ! ClassLoadingPropsDelivered)
case NotifyWhenAtLeastNumTaskManagerAreRegistered(numRegisteredTaskManager) =>
if (that.instanceManager.getNumberOfRegisteredTaskManagers >= numRegisteredTaskManager) {
http://git-wip-us.apache.org/repos/asf/flink/blob/b05ea693/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
index a88ed43..f121305 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
@@ -86,6 +86,14 @@ object TestingJobManagerMessages {
* Notifies the sender when the [[TestingJobManager]] receives new clients for jobs
*/
case object NotifyWhenClientConnects
+ /**
+ * Notifies of client connect
+ */
+ case object ClientConnected
+ /**
+ * Notifies when the client has requested class loading information
+ */
+ case object ClassLoadingPropsDelivered
/**
* Registers to be notified by an [[org.apache.flink.runtime.messages.Messages.Acknowledge]]
@@ -119,4 +127,7 @@ object TestingJobManagerMessages {
def getNotifyWhenClientConnects(): AnyRef = NotifyWhenClientConnects
def getDisablePostStop(): AnyRef = DisablePostStop
+ def getClientConnected(): AnyRef = ClientConnected
+ def getClassLoadingPropsDelivered(): AnyRef = ClassLoadingPropsDelivered
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b05ea693/flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java b/flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java
index db17ee8..c9059f1 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java
@@ -25,7 +25,6 @@ import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.StandaloneClusterClient;
import org.apache.flink.runtime.client.JobRetrievalException;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
@@ -86,7 +85,7 @@ public class JobRetrievalITCase extends TestLogger {
public void run() {
try {
assertNotNull(client.retrieveJob(jobID));
- } catch (JobExecutionException e) {
+ } catch (Throwable e) {
fail(e.getMessage());
}
}
@@ -106,7 +105,10 @@ public class JobRetrievalITCase extends TestLogger {
resumingThread.start();
// wait for client to connect
- testkit.expectMsgEquals(true);
+ testkit.expectMsgAllOf(
+ TestingJobManagerMessages.getClientConnected(),
+ TestingJobManagerMessages.getClassLoadingPropsDelivered());
+
// client has connected, we can release the lock
lock.release();