You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2017/01/20 11:27:31 UTC

[2/2] flink git commit: [FLINK-5482] [queryable state] Re-issue location lookup upon failure

[FLINK-5482] [queryable state] Re-issue location lookup upon failure

Previously, any failed lookup (e.g. one issued before the job had been started)
remained in the lookup cache, so subsequent queries never retried the lookup
and failed as well. This commit changes the lookup caching code so that futures
which completed with a failure are replaced in the cache by new lookups.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7ff7f431
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7ff7f431
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7ff7f431

Branch: refs/heads/master
Commit: 7ff7f431dab1d5fa70d71747cda619b9f6491bd2
Parents: 661b3f9
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Thu Jan 12 16:48:27 2017 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Fri Jan 20 12:26:50 2017 +0100

----------------------------------------------------------------------
 .../runtime/query/QueryableStateClient.java     | 20 +++++-
 .../flink/test/query/QueryableStateITCase.java  | 73 ++++++++++++++++++++
 2 files changed, 92 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7ff7f431/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
index 98c3580..7ba3199 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
@@ -341,7 +341,25 @@ public class QueryableStateClient {
 					return previous;
 				}
 			} else {
-				return cachedFuture;
+				// do not retain futures which failed as they will remain in
+				// the cache even if the error cause is not present any more
+				// and a new lookup may succeed
+				if (cachedFuture.isCompleted() &&
+					cachedFuture.value().get().isFailure()) {
+					// issue a new lookup
+					Future<KvStateLocation> lookupFuture = lookupService
+						.getKvStateLookupInfo(jobId, queryableStateName);
+
+					// replace the existing one if it has not been replaced yet
+					// otherwise return the one in the cache
+					if (lookupCache.replace(cacheKey, cachedFuture, lookupFuture)) {
+						return lookupFuture;
+					} else {
+						return lookupCache.get(cacheKey);
+					}
+				} else {
+					return cachedFuture;
+				}
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/7ff7f431/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
index eccd8e0..88e4f9a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
@@ -640,6 +640,79 @@ public class QueryableStateITCase extends TestLogger {
 	}
 
 	/**
+	 * Similar test to {@link #testValueState()}, but before submitting the
+	 * job, we first issue one request, which fails.
+	 */
+	@Test
+	public void testQueryNonStartedJobState() throws Exception {
+		// Config
+		final Deadline deadline = TEST_TIMEOUT.fromNow();
+
+		final int numElements = 1024;
+
+		final QueryableStateClient client = new QueryableStateClient(cluster.configuration());
+
+		JobID jobId = null;
+		try {
+			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+			env.setParallelism(NUM_SLOTS);
+			// Very important, because cluster is shared between tests and we
+			// don't explicitly check that all slots are available before
+			// submitting.
+			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000));
+
+			DataStream<Tuple2<Integer, Long>> source = env
+				.addSource(new TestAscendingValueSource(numElements));
+
+			// Value state
+			ValueStateDescriptor<Tuple2<Integer, Long>> valueState = new ValueStateDescriptor<>(
+				"any",
+				source.getType(),
+				null);
+
+			QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState =
+				source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+					@Override
+					public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
+						return value.f0;
+					}
+				}).asQueryableState("hakuna", valueState);
+
+			// Submit the job graph
+			JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+			jobId = jobGraph.getJobID();
+
+			// Now query
+			long expected = numElements;
+
+			// query once
+			client.getKvState(jobId, queryableState.getQueryableStateName(), 0,
+				KvStateRequestSerializer.serializeKeyAndNamespace(
+					0,
+					queryableState.getKeySerializer(),
+					VoidNamespace.INSTANCE,
+					VoidNamespaceSerializer.INSTANCE));
+
+			cluster.submitJobDetached(jobGraph);
+
+			executeValueQuery(deadline, client, jobId, queryableState,
+				expected);
+		} finally {
+			// Free cluster resources
+			if (jobId != null) {
+				Future<CancellationSuccess> cancellation = cluster
+					.getLeaderGateway(deadline.timeLeft())
+					.ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
+					.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class));
+
+				Await.ready(cancellation, deadline.timeLeft());
+			}
+
+			client.shutDown();
+		}
+	}
+
+	/**
 	 * Retry a query for state for keys between 0 and {@link #NUM_SLOTS} until
 	 * <tt>expected</tt> equals the value of the result tuple's second field.
 	 */