You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2017/11/17 10:06:51 UTC
[4/5] flink git commit: [FLINK-8059][QS] QS client throws FlinkJobNotFoundException for queries with unknown jobIds
[FLINK-8059][QS] QS client throws FlinkJobNotFoundException for queries with unknown jobIds
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2fe078f3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2fe078f3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2fe078f3
Branch: refs/heads/master
Commit: 2fe078f3927595cbc3c5de6635a710494e0f34b4
Parents: 5e059e9
Author: kkloudas <kk...@gmail.com>
Authored: Thu Nov 16 17:45:49 2017 +0100
Committer: kkloudas <kk...@gmail.com>
Committed: Fri Nov 17 10:46:10 2017 +0100
----------------------------------------------------------------------
.../itcases/AbstractQueryableStateTestBase.java | 32 +++++++++++++++-----
.../flink/runtime/jobmanager/JobManager.scala | 4 +--
.../runtime/jobmanager/JobManagerTest.java | 5 +--
3 files changed, 29 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2fe078f3/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
index a789dbd..65e9bb5 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
@@ -276,10 +276,6 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
/**
* Tests that duplicate query registrations fail the job at the JobManager.
- *
- * <b>NOTE: </b> This test is only in the non-HA variant of the tests because
- * in the HA mode we use the actual JM code which does not recognize the
- * {@code NotifyWhenJobStatus} message.
*/
@Test
public void testDuplicateRegistrationFailsJob() throws Exception {
@@ -435,10 +431,10 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
/**
* Tests that the correct exception is thrown if the query
- * contains a wrong queryable state name.
+ * contains a wrong jobId or wrong queryable state name.
*/
@Test
- public void testWrongQueryableStateName() throws Exception {
+ public void testWrongJobIdAndWrongQueryableStateName() throws Exception {
// Config
final Deadline deadline = TEST_TIMEOUT.fromNow();
@@ -486,7 +482,27 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
runningFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
assertEquals(JobStatus.RUNNING, jobStatus.state());
- CompletableFuture<ValueState<Tuple2<Integer, Long>>> future = client.getKvState(
+ final JobID wrongJobId = new JobID();
+
+ CompletableFuture<ValueState<Tuple2<Integer, Long>>> unknownJobFuture = client.getKvState(
+ wrongJobId, // this is the wrong job id
+ "hankuna",
+ 0,
+ BasicTypeInfo.INT_TYPE_INFO,
+ valueState);
+
+ try {
+ unknownJobFuture.get();
+ fail(); // by now the job must have failed.
+ } catch (ExecutionException e) {
+ Assert.assertTrue(e.getCause() instanceof RuntimeException);
+ Assert.assertTrue(e.getCause().getMessage().contains(
+ "FlinkJobNotFoundException: Could not find Flink job (" + wrongJobId + ")"));
+ } catch (Exception ignored) {
+ fail("Unexpected type of exception.");
+ }
+
+ CompletableFuture<ValueState<Tuple2<Integer, Long>>> unknownQSName = client.getKvState(
jobId,
"wrong-hankuna", // this is the wrong name.
0,
@@ -494,7 +510,7 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
valueState);
try {
- future.get();
+ unknownQSName.get();
fail(); // by now the job must have failed.
} catch (ExecutionException e) {
Assert.assertTrue(e.getCause() instanceof RuntimeException);
http://git-wip-us.apache.org/repos/asf/flink/blob/2fe078f3/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 95a3fd5..c12db23 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -70,7 +70,7 @@ import org.apache.flink.runtime.messages.TaskMessages.UpdateTaskExecutionState
import org.apache.flink.runtime.messages.accumulators._
import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, AcknowledgeCheckpoint, DeclineCheckpoint}
import org.apache.flink.runtime.messages.webmonitor.{InfoMessage, _}
-import org.apache.flink.runtime.messages.{Acknowledge, StackTrace}
+import org.apache.flink.runtime.messages.{Acknowledge, FlinkJobNotFoundException, StackTrace}
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup
import org.apache.flink.runtime.metrics.util.MetricUtils
import org.apache.flink.runtime.metrics.{MetricRegistryConfiguration, MetricRegistryImpl, MetricRegistry => FlinkMetricRegistry}
@@ -1503,7 +1503,7 @@ class JobManager(
}
case None =>
- sender() ! Status.Failure(new IllegalStateException(s"Job ${msg.getJobId} not found"))
+ sender() ! Status.Failure(new FlinkJobNotFoundException(msg.getJobId))
}
// TaskManager KvState registration
http://git-wip-us.apache.org/repos/asf/flink/blob/2fe078f3/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index a697aae..6a02d1f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -63,6 +63,7 @@ import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
import org.apache.flink.runtime.jobmanager.JobManagerHARecoveryTest.BlockingStatefulInvokable;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
import org.apache.flink.runtime.messages.JobManagerMessages.CancellationFailure;
import org.apache.flink.runtime.messages.JobManagerMessages.CancellationResponse;
@@ -672,7 +673,7 @@ public class JobManagerTest extends TestLogger {
try {
Await.result(lookupFuture, deadline.timeLeft());
fail("Did not throw expected Exception");
- } catch (IllegalStateException ignored) {
+ } catch (FlinkJobNotFoundException ignored) {
// Expected
}
@@ -735,7 +736,7 @@ public class JobManagerTest extends TestLogger {
try {
Await.result(lookupFuture, deadline.timeLeft());
fail("Did not throw expected Exception");
- } catch (IllegalStateException ignored) {
+ } catch (FlinkJobNotFoundException ignored) {
// Expected
}