You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2017/11/17 10:06:51 UTC

[4/5] flink git commit: [FLINK-8059][QS] QS client throws FlinkJobNotFoundException for queries with unknown jobIds

[FLINK-8059][QS] QS client throws FlinkJobNotFoundException for queries with unknown jobIds


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2fe078f3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2fe078f3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2fe078f3

Branch: refs/heads/master
Commit: 2fe078f3927595cbc3c5de6635a710494e0f34b4
Parents: 5e059e9
Author: kkloudas <kk...@gmail.com>
Authored: Thu Nov 16 17:45:49 2017 +0100
Committer: kkloudas <kk...@gmail.com>
Committed: Fri Nov 17 10:46:10 2017 +0100

----------------------------------------------------------------------
 .../itcases/AbstractQueryableStateTestBase.java | 32 +++++++++++++++-----
 .../flink/runtime/jobmanager/JobManager.scala   |  4 +--
 .../runtime/jobmanager/JobManagerTest.java      |  5 +--
 3 files changed, 29 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2fe078f3/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
index a789dbd..65e9bb5 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
@@ -276,10 +276,6 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 
 	/**
 	 * Tests that duplicate query registrations fail the job at the JobManager.
-	 *
-	 * <b>NOTE: </b> This test is only in the non-HA variant of the tests because
-	 * in the HA mode we use the actual JM code which does not recognize the
-	 * {@code NotifyWhenJobStatus} message.
 	 */
 	@Test
 	public void testDuplicateRegistrationFailsJob() throws Exception {
@@ -435,10 +431,10 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 
 	/**
 	 * Tests that the correct exception is thrown if the query
-	 * contains a wrong queryable state name.
+	 * contains a wrong jobId or wrong queryable state name.
 	 */
 	@Test
-	public void testWrongQueryableStateName() throws Exception {
+	public void testWrongJobIdAndWrongQueryableStateName() throws Exception {
 		// Config
 		final Deadline deadline = TEST_TIMEOUT.fromNow();
 
@@ -486,7 +482,27 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 					runningFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
 			assertEquals(JobStatus.RUNNING, jobStatus.state());
 
-			CompletableFuture<ValueState<Tuple2<Integer, Long>>> future = client.getKvState(
+			final JobID wrongJobId = new JobID();
+
+			CompletableFuture<ValueState<Tuple2<Integer, Long>>> unknownJobFuture = client.getKvState(
+					wrongJobId, 						// this is the wrong job id
+					"hankuna",
+					0,
+					BasicTypeInfo.INT_TYPE_INFO,
+					valueState);
+
+			try {
+				unknownJobFuture.get();
+				fail(); // by now the query for the unknown job id must have failed.
+			} catch (ExecutionException e) {
+				Assert.assertTrue(e.getCause() instanceof RuntimeException);
+				Assert.assertTrue(e.getCause().getMessage().contains(
+						"FlinkJobNotFoundException: Could not find Flink job (" + wrongJobId + ")"));
+			} catch (Exception ignored) {
+				fail("Unexpected type of exception.");
+			}
+
+			CompletableFuture<ValueState<Tuple2<Integer, Long>>> unknownQSName = client.getKvState(
 					jobId,
 					"wrong-hankuna", // this is the wrong name.
 					0,
@@ -494,7 +510,7 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 					valueState);
 
 			try {
-				future.get();
+				unknownQSName.get();
 				fail(); // by now the job must have failed.
 			} catch (ExecutionException e) {
 				Assert.assertTrue(e.getCause() instanceof RuntimeException);

http://git-wip-us.apache.org/repos/asf/flink/blob/2fe078f3/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 95a3fd5..c12db23 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -70,7 +70,7 @@ import org.apache.flink.runtime.messages.TaskMessages.UpdateTaskExecutionState
 import org.apache.flink.runtime.messages.accumulators._
 import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, AcknowledgeCheckpoint, DeclineCheckpoint}
 import org.apache.flink.runtime.messages.webmonitor.{InfoMessage, _}
-import org.apache.flink.runtime.messages.{Acknowledge, StackTrace}
+import org.apache.flink.runtime.messages.{Acknowledge, FlinkJobNotFoundException, StackTrace}
 import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup
 import org.apache.flink.runtime.metrics.util.MetricUtils
 import org.apache.flink.runtime.metrics.{MetricRegistryConfiguration, MetricRegistryImpl, MetricRegistry => FlinkMetricRegistry}
@@ -1503,7 +1503,7 @@ class JobManager(
             }
 
           case None =>
-            sender() ! Status.Failure(new IllegalStateException(s"Job ${msg.getJobId} not found"))
+            sender() ! Status.Failure(new FlinkJobNotFoundException(msg.getJobId))
         }
 
       // TaskManager KvState registration

http://git-wip-us.apache.org/repos/asf/flink/blob/2fe078f3/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index a697aae..6a02d1f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -63,6 +63,7 @@ import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 import org.apache.flink.runtime.jobmanager.JobManagerHARecoveryTest.BlockingStatefulInvokable;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
 import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
 import org.apache.flink.runtime.messages.JobManagerMessages.CancellationFailure;
 import org.apache.flink.runtime.messages.JobManagerMessages.CancellationResponse;
@@ -672,7 +673,7 @@ public class JobManagerTest extends TestLogger {
 		try {
 			Await.result(lookupFuture, deadline.timeLeft());
 			fail("Did not throw expected Exception");
-		} catch (IllegalStateException ignored) {
+		} catch (FlinkJobNotFoundException ignored) {
 			// Expected
 		}
 
@@ -735,7 +736,7 @@ public class JobManagerTest extends TestLogger {
 		try {
 			Await.result(lookupFuture, deadline.timeLeft());
 			fail("Did not throw expected Exception");
-		} catch (IllegalStateException ignored) {
+		} catch (FlinkJobNotFoundException ignored) {
 			// Expected
 		}