You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2017/01/20 11:27:30 UTC

[1/2] flink git commit: [FLINK-5482] [tests] Dedup code in QueryableStateITCase

Repository: flink
Updated Branches:
  refs/heads/master 883fc5a77 -> 7ff7f431d


[FLINK-5482] [tests] Dedup code in QueryableStateITCase


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/661b3f90
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/661b3f90
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/661b3f90

Branch: refs/heads/master
Commit: 661b3f90e481f1de8a35041d5d136988414dc621
Parents: 883fc5a
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Thu Jan 12 16:41:30 2017 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Fri Jan 20 12:26:45 2017 +0100

----------------------------------------------------------------------
 .../flink/test/query/QueryableStateITCase.java  | 152 ++++++-------------
 1 file changed, 50 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/661b3f90/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
index a5ed6ad..eccd8e0 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
@@ -622,40 +622,8 @@ public class QueryableStateITCase extends TestLogger {
 			// Now query
 			long expected = numElements;
 
-			FiniteDuration retryDelay = new FiniteDuration(1, TimeUnit.SECONDS);
-			for (int key = 0; key < NUM_SLOTS; key++) {
-				final byte[] serializedKey = KvStateRequestSerializer.serializeKeyAndNamespace(
-						key,
-						queryableState.getKeySerializer(),
-						VoidNamespace.INSTANCE,
-						VoidNamespaceSerializer.INSTANCE);
-
-				boolean success = false;
-				while (deadline.hasTimeLeft() && !success) {
-					Future<byte[]> future = getKvStateWithRetries(client,
-							jobId,
-							queryableState.getQueryableStateName(),
-							key,
-							serializedKey,
-							retryDelay);
-
-					byte[] serializedValue = Await.result(future, deadline.timeLeft());
-
-					Tuple2<Integer, Long> value = KvStateRequestSerializer.deserializeValue(
-							serializedValue,
-							queryableState.getValueSerializer());
-
-					assertEquals("Key mismatch", key, value.f0.intValue());
-					if (expected == value.f1) {
-						success = true;
-					} else {
-						// Retry
-						Thread.sleep(50);
-					}
-				}
-
-				assertTrue("Did not succeed query", success);
-			}
+			executeValueQuery(deadline, client, jobId, queryableState,
+				expected);
 		} finally {
 			// Free cluster resources
 			if (jobId != null) {
@@ -672,6 +640,50 @@ public class QueryableStateITCase extends TestLogger {
 	}
 
 	/**
+	 * Retry a query for state for keys between 0 and {@link #NUM_SLOTS} until
+	 * <tt>expected</tt> equals the value of the result tuple's second field.
+	 */
+	private void executeValueQuery(final Deadline deadline,
+		final QueryableStateClient client, final JobID jobId,
+		final QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState,
+		final long expected) throws Exception {
+		FiniteDuration retryDelay = new FiniteDuration(1, TimeUnit.SECONDS);
+		for (int key = 0; key < NUM_SLOTS; key++) {
+			final byte[] serializedKey = KvStateRequestSerializer.serializeKeyAndNamespace(
+				key,
+				queryableState.getKeySerializer(),
+				VoidNamespace.INSTANCE,
+				VoidNamespaceSerializer.INSTANCE);
+
+			boolean success = false;
+			while (deadline.hasTimeLeft() && !success) {
+				Future<byte[]> future = getKvStateWithRetries(client,
+					jobId,
+					queryableState.getQueryableStateName(),
+					key,
+					serializedKey,
+					retryDelay);
+
+				byte[] serializedValue = Await.result(future, deadline.timeLeft());
+
+				Tuple2<Integer, Long> value = KvStateRequestSerializer.deserializeValue(
+					serializedValue,
+					queryableState.getValueSerializer());
+
+				assertEquals("Key mismatch", key, value.f0.intValue());
+				if (expected == value.f1) {
+					success = true;
+				} else {
+					// Retry
+					Thread.sleep(50);
+				}
+			}
+
+			assertTrue("Did not succeed query", success);
+		}
+	}
+
+	/**
 	 * Tests simple value state queryable state instance. Each source emits
 	 * (subtaskIndex, 0)..(subtaskIndex, numElements) tuples, which are then
 	 * queried. The tests succeeds after each subtask index is queried with
@@ -718,40 +730,8 @@ public class QueryableStateITCase extends TestLogger {
 			// Now query
 			long expected = numElements;
 
-			FiniteDuration retryDelay = new FiniteDuration(1, TimeUnit.SECONDS);
-			for (int key = 0; key < NUM_SLOTS; key++) {
-				final byte[] serializedKey = KvStateRequestSerializer.serializeKeyAndNamespace(
-						key,
-						queryableState.getKeySerializer(),
-						VoidNamespace.INSTANCE,
-						VoidNamespaceSerializer.INSTANCE);
-
-				boolean success = false;
-				while (deadline.hasTimeLeft() && !success) {
-					Future<byte[]> future = getKvStateWithRetries(client,
-							jobId,
-							queryableState.getQueryableStateName(),
-							key,
-							serializedKey,
-							retryDelay);
-
-					byte[] serializedValue = Await.result(future, deadline.timeLeft());
-
-					Tuple2<Integer, Long> value = KvStateRequestSerializer.deserializeValue(
-							serializedValue,
-							queryableState.getValueSerializer());
-
-					assertEquals("Key mismatch", key, value.f0.intValue());
-					if (expected == value.f1) {
-						success = true;
-					} else {
-						// Retry
-						Thread.sleep(50);
-					}
-				}
-
-				assertTrue("Did not succeed query", success);
-			}
+			executeValueQuery(deadline, client, jobId, queryableState,
+				expected);
 		} finally {
 			// Free cluster resources
 			if (jobId != null) {
@@ -1024,40 +1004,8 @@ public class QueryableStateITCase extends TestLogger {
 			// Now query
 			long expected = numElements * (numElements + 1) / 2;
 
-			FiniteDuration retryDelay = new FiniteDuration(1, TimeUnit.SECONDS);
-			for (int key = 0; key < NUM_SLOTS; key++) {
-				final byte[] serializedKey = KvStateRequestSerializer.serializeKeyAndNamespace(
-						key,
-						queryableState.getKeySerializer(),
-						VoidNamespace.INSTANCE,
-						VoidNamespaceSerializer.INSTANCE);
-
-				boolean success = false;
-				while (deadline.hasTimeLeft() && !success) {
-					Future<byte[]> future = getKvStateWithRetries(client,
-							jobId,
-							queryableState.getQueryableStateName(),
-							key,
-							serializedKey,
-							retryDelay);
-
-					byte[] serializedValue = Await.result(future, deadline.timeLeft());
-
-					Tuple2<Integer, Long> value = KvStateRequestSerializer.deserializeValue(
-							serializedValue,
-							queryableState.getValueSerializer());
-
-					assertEquals("Key mismatch", key, value.f0.intValue());
-					if (expected == value.f1) {
-						success = true;
-					} else {
-						// Retry
-						Thread.sleep(50);
-					}
-				}
-
-				assertTrue("Did not succeed query", success);
-			}
+			executeValueQuery(deadline, client, jobId, queryableState,
+				expected);
 		} finally {
 			// Free cluster resources
 			if (jobId != null) {


[2/2] flink git commit: [FLINK-5482] [queryable state] Re-issue location lookup upon failure

Posted by uc...@apache.org.
[FLINK-5482] [queryable state] Re-issue location lookup upon failure

Any failing lookup, e.g. in case the job has not been started yet, previously
remained in the lookup cache and thus future queries did not retry the lookup
and failed. This commit changes the lookup caching code so that completed
and failed futures are removed from the cache and replaced by new lookups.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7ff7f431
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7ff7f431
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7ff7f431

Branch: refs/heads/master
Commit: 7ff7f431dab1d5fa70d71747cda619b9f6491bd2
Parents: 661b3f9
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Thu Jan 12 16:48:27 2017 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Fri Jan 20 12:26:50 2017 +0100

----------------------------------------------------------------------
 .../runtime/query/QueryableStateClient.java     | 20 +++++-
 .../flink/test/query/QueryableStateITCase.java  | 73 ++++++++++++++++++++
 2 files changed, 92 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7ff7f431/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
index 98c3580..7ba3199 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
@@ -341,7 +341,25 @@ public class QueryableStateClient {
 					return previous;
 				}
 			} else {
-				return cachedFuture;
+				// do not retain futures which failed as they will remain in
+				// the cache even if the error cause is not present any more
+				// and a new lookup may succeed
+				if (cachedFuture.isCompleted() &&
+					cachedFuture.value().get().isFailure()) {
+					// issue a new lookup
+					Future<KvStateLocation> lookupFuture = lookupService
+						.getKvStateLookupInfo(jobId, queryableStateName);
+
+					// replace the existing one if it has not been replaced yet
+					// otherwise return the one in the cache
+					if (lookupCache.replace(cacheKey, cachedFuture, lookupFuture)) {
+						return lookupFuture;
+					} else {
+						return lookupCache.get(cacheKey);
+					}
+				} else {
+					return cachedFuture;
+				}
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/7ff7f431/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
index eccd8e0..88e4f9a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
@@ -640,6 +640,79 @@ public class QueryableStateITCase extends TestLogger {
 	}
 
 	/**
+	 * Similar tests as {@link #testValueState()} but before submitting the
+	 * job, we already issue one request which fails.
+	 */
+	@Test
+	public void testQueryNonStartedJobState() throws Exception {
+		// Config
+		final Deadline deadline = TEST_TIMEOUT.fromNow();
+
+		final int numElements = 1024;
+
+		final QueryableStateClient client = new QueryableStateClient(cluster.configuration());
+
+		JobID jobId = null;
+		try {
+			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+			env.setParallelism(NUM_SLOTS);
+			// Very important, because cluster is shared between tests and we
+			// don't explicitly check that all slots are available before
+			// submitting.
+			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000));
+
+			DataStream<Tuple2<Integer, Long>> source = env
+				.addSource(new TestAscendingValueSource(numElements));
+
+			// Value state
+			ValueStateDescriptor<Tuple2<Integer, Long>> valueState = new ValueStateDescriptor<>(
+				"any",
+				source.getType(),
+				null);
+
+			QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState =
+				source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+					@Override
+					public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
+						return value.f0;
+					}
+				}).asQueryableState("hakuna", valueState);
+
+			// Submit the job graph
+			JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+			jobId = jobGraph.getJobID();
+
+			// Now query
+			long expected = numElements;
+
+			// query once
+			client.getKvState(jobId, queryableState.getQueryableStateName(), 0,
+				KvStateRequestSerializer.serializeKeyAndNamespace(
+					0,
+					queryableState.getKeySerializer(),
+					VoidNamespace.INSTANCE,
+					VoidNamespaceSerializer.INSTANCE));
+
+			cluster.submitJobDetached(jobGraph);
+
+			executeValueQuery(deadline, client, jobId, queryableState,
+				expected);
+		} finally {
+			// Free cluster resources
+			if (jobId != null) {
+				Future<CancellationSuccess> cancellation = cluster
+					.getLeaderGateway(deadline.timeLeft())
+					.ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
+					.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class));
+
+				Await.ready(cancellation, deadline.timeLeft());
+			}
+
+			client.shutDown();
+		}
+	}
+
+	/**
 	 * Retry a query for state for keys between 0 and {@link #NUM_SLOTS} until
 	 * <tt>expected</tt> equals the value of the result tuple's second field.
 	 */