You are viewing a plain-text version of this content. The canonical link to it (a hyperlink in the original page) was not preserved in this plain-text version.
Posted to commits@flink.apache.org by kk...@apache.org on 2017/11/17 09:34:31 UTC

flink git commit: [FLINK-8063][QS] QS client does not retry when an UnknownKvStateLocation is thrown.

Repository: flink
Updated Branches:
  refs/heads/master 81dc260dc -> a0838de79


[FLINK-8063][QS] QS client does not retry when an UnknownKvStateLocation is thrown.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a0838de7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a0838de7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a0838de7

Branch: refs/heads/master
Commit: a0838de79ff73b0322f3ce255df54f5f33b2bf3b
Parents: 81dc260
Author: kkloudas <kk...@gmail.com>
Authored: Tue Nov 14 15:05:45 2017 +0100
Committer: kkloudas <kk...@gmail.com>
Committed: Fri Nov 17 10:29:30 2017 +0100

----------------------------------------------------------------------
 .../network/AbstractServerHandler.java          |   2 +-
 .../client/proxy/KvStateClientProxyHandler.java |  11 +-
 .../itcases/AbstractQueryableStateTestBase.java | 230 ++++++++++++-------
 3 files changed, 150 insertions(+), 93 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a0838de7/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java
index 9e02291..7e71a11 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java
@@ -262,7 +262,7 @@ public abstract class AbstractServerHandler<REQ extends MessageBody, RESP extend
 					try {
 						stats.reportFailedRequest();
 
-						final String errMsg = "Failed request " + requestId + ". Caused by: " + ExceptionUtils.stringifyException(t);
+						final String errMsg = "Failed request " + requestId + "." + System.lineSeparator() + " Caused by: " + ExceptionUtils.stringifyException(t);
 						final ByteBuf err = MessageSerializer.serializeRequestFailure(ctx.alloc(), requestId, new RuntimeException(errMsg));
 						ctx.writeAndFlush(err);
 					} catch (IOException io) {

http://git-wip-us.apache.org/repos/asf/flink/blob/a0838de7/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
index 73ef7f3..af33701 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
@@ -37,7 +37,6 @@ import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.query.KvStateClientProxy;
 import org.apache.flink.runtime.query.KvStateLocation;
 import org.apache.flink.runtime.query.KvStateMessage;
-import org.apache.flink.runtime.query.UnknownKvStateLocation;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.util.Preconditions;
 
@@ -48,7 +47,6 @@ import org.slf4j.LoggerFactory;
 
 import java.net.ConnectException;
 import java.net.InetSocketAddress;
-import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -133,12 +131,11 @@ public class KvStateClientProxyHandler extends AbstractServerHandler<KvStateRequ
 			operationFuture.whenCompleteAsync(
 					(t, throwable) -> {
 						if (throwable != null) {
-							if (throwable instanceof CancellationException) {
-								result.completeExceptionally(throwable);
-							} else if (throwable.getCause() instanceof UnknownKvStateIdException ||
+							if (
+									throwable.getCause() instanceof UnknownKvStateIdException ||
 									throwable.getCause() instanceof UnknownKvStateKeyGroupLocationException ||
-									throwable.getCause() instanceof UnknownKvStateLocation ||
-									throwable.getCause() instanceof ConnectException) {
+									throwable.getCause() instanceof ConnectException
+								) {
 
 								// These failures are likely to be caused by out-of-sync
 								// KvStateLocation. Therefore we retry this query and

http://git-wip-us.apache.org/repos/asf/flink/blob/a0838de7/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
index b4bae9c..c1cbb61 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
@@ -37,7 +37,6 @@ import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
@@ -89,12 +88,10 @@ import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicLongArray;
-import java.util.function.Supplier;
 
 import scala.concurrent.Await;
 import scala.concurrent.duration.Deadline;
@@ -103,15 +100,14 @@ import scala.reflect.ClassTag$;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 /**
  * Base class for queryable state integration tests with a configurable state backend.
  */
 public abstract class AbstractQueryableStateTestBase extends TestLogger {
 
-	private static final int NO_OF_RETRIES = 100;
 	private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(10000L, TimeUnit.SECONDS);
-	private static final Time QUERY_RETRY_DELAY = Time.milliseconds(100L);
 
 	private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(4);
 	private final ScheduledExecutor executor = new ScheduledExecutorServiceAdapter(executorService);
@@ -229,14 +225,14 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 						allNonZero = false;
 					}
 
-					CompletableFuture<ReducingState<Tuple2<Integer, Long>>> result = getKvStateWithRetries(
+					CompletableFuture<ReducingState<Tuple2<Integer, Long>>> result = getKvState(
+							deadline,
 							client,
 							jobId,
 							queryName,
 							key,
 							BasicTypeInfo.INT_TYPE_INFO,
 							reducingState,
-							QUERY_RETRY_DELAY,
 							false,
 							executor);
 
@@ -284,7 +280,7 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 	 *
 	 * <b>NOTE: </b> This test is only in the non-HA variant of the tests because
 	 * in the HA mode we use the actual JM code which does not recognize the
-	 * {@code NotifyWhenJobStatus} message.	 *
+	 * {@code NotifyWhenJobStatus} message.
 	 */
 	@Test
 	public void testDuplicateRegistrationFailsJob() throws Exception {
@@ -439,6 +435,92 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 	}
 
 	/**
+	 * Tests that the correct exception is thrown if the query
+	 * contains a wrong queryable state name.
+	 */
+	@Test
+	public void testWrongQueryableStateName() throws Exception {
+		// Config
+		final Deadline deadline = TEST_TIMEOUT.fromNow();
+
+		final long numElements = 1024L;
+
+		JobID jobId = null;
+		try {
+			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+			env.setStateBackend(stateBackend);
+			env.setParallelism(maxParallelism);
+			// Very important, because cluster is shared between tests and we
+			// don't explicitly check that all slots are available before
+			// submitting.
+			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
+
+			DataStream<Tuple2<Integer, Long>> source = env
+					.addSource(new TestAscendingValueSource(numElements));
+
+			// Value state
+			ValueStateDescriptor<Tuple2<Integer, Long>> valueState =
+					new ValueStateDescriptor<>("any", source.getType());
+
+			source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+				private static final long serialVersionUID = 7662520075515707428L;
+
+				@Override
+				public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
+					return value.f0;
+				}
+			}).asQueryableState("hakuna", valueState);
+
+			// Submit the job graph
+			JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+			jobId = jobGraph.getJobID();
+
+			CompletableFuture<TestingJobManagerMessages.JobStatusIs> runningFuture = FutureUtils.toJava(
+					cluster.getLeaderGateway(deadline.timeLeft())
+							.ask(new TestingJobManagerMessages.NotifyWhenJobStatus(jobId, JobStatus.RUNNING), deadline.timeLeft())
+							.mapTo(ClassTag$.MODULE$.<TestingJobManagerMessages.JobStatusIs>apply(TestingJobManagerMessages.JobStatusIs.class)));
+
+			cluster.submitJobDetached(jobGraph);
+
+			// expect for the job to be running
+			TestingJobManagerMessages.JobStatusIs jobStatus =
+					runningFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+			assertEquals(JobStatus.RUNNING, jobStatus.state());
+
+			CompletableFuture<ValueState<Tuple2<Integer, Long>>> future = client.getKvState(
+					jobId,
+					"wrong-hankuna", // this is the wrong name.
+					0,
+					VoidNamespace.INSTANCE,
+					BasicTypeInfo.INT_TYPE_INFO,
+					VoidNamespaceTypeInfo.INSTANCE,
+					valueState);
+
+			try {
+				future.get();
+				fail(); // by now the job must have failed.
+			} catch (ExecutionException e) {
+				Assert.assertTrue(e.getCause() instanceof RuntimeException);
+				Assert.assertTrue(e.getCause().getMessage().contains(
+						"UnknownKvStateLocation: No KvStateLocation found for KvState instance with name 'wrong-hankuna'."));
+			} catch (Exception ignored) {
+				fail("Unexpected type of exception.");
+			}
+
+		} finally {
+			// Free cluster resources
+			if (jobId != null) {
+				CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster
+						.getLeaderGateway(deadline.timeLeft())
+						.ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
+						.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
+
+				cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+			}
+		}
+	}
+
+	/**
 	 * Similar tests as {@link #testValueState()} but before submitting the
 	 * job, we already issue one request which fails.
 	 */
@@ -572,14 +654,14 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 
 			// Now query
 			int key = 0;
-			CompletableFuture<ValueState<Tuple2<Integer, Long>>> future = getKvStateWithRetries(
+			CompletableFuture<ValueState<Tuple2<Integer, Long>>> future = getKvState(
+					deadline,
 					client,
 					jobId,
 					queryableState.getQueryableStateName(),
 					key,
 					BasicTypeInfo.INT_TYPE_INFO,
 					valueState,
-					QUERY_RETRY_DELAY,
 					true,
 					executor);
 
@@ -723,14 +805,14 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 			for (int key = 0; key < maxParallelism; key++) {
 				boolean success = false;
 				while (deadline.hasTimeLeft() && !success) {
-					CompletableFuture<FoldingState<Tuple2<Integer, Long>, String>> future = getKvStateWithRetries(
+					CompletableFuture<FoldingState<Tuple2<Integer, Long>, String>> future = getKvState(
+							deadline,
 							client,
 							jobId,
 							"pumba",
 							key,
 							BasicTypeInfo.INT_TYPE_INFO,
 							foldingState,
-							QUERY_RETRY_DELAY,
 							false,
 							executor);
 
@@ -814,14 +896,14 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 			for (int key = 0; key < maxParallelism; key++) {
 				boolean success = false;
 				while (deadline.hasTimeLeft() && !success) {
-					CompletableFuture<ReducingState<Tuple2<Integer, Long>>> future = getKvStateWithRetries(
+					CompletableFuture<ReducingState<Tuple2<Integer, Long>>> future = getKvState(
+							deadline,
 							client,
 							jobId,
 							"jungle",
 							key,
 							BasicTypeInfo.INT_TYPE_INFO,
 							reducingState,
-							QUERY_RETRY_DELAY,
 							false,
 							executor);
 
@@ -923,14 +1005,14 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 			for (int key = 0; key < maxParallelism; key++) {
 				boolean success = false;
 				while (deadline.hasTimeLeft() && !success) {
-					CompletableFuture<MapState<Integer, Tuple2<Integer, Long>>> future = getKvStateWithRetries(
+					CompletableFuture<MapState<Integer, Tuple2<Integer, Long>>> future = getKvState(
+							deadline,
 							client,
 							jobId,
 							"timon-queryable",
 							key,
 							BasicTypeInfo.INT_TYPE_INFO,
 							mapStateDescriptor,
-							QUERY_RETRY_DELAY,
 							false,
 							executor);
 
@@ -1028,14 +1110,14 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 			for (int key = 0; key < maxParallelism; key++) {
 				boolean success = false;
 				while (deadline.hasTimeLeft() && !success) {
-					CompletableFuture<ListState<Long>> future = getKvStateWithRetries(
+					final CompletableFuture<ListState<Long>> future = getKvState(
+							deadline,
 							client,
 							jobId,
 							"list-queryable",
 							key,
 							BasicTypeInfo.INT_TYPE_INFO,
 							listStateDescriptor,
-							QUERY_RETRY_DELAY,
 							false,
 							executor);
 
@@ -1130,14 +1212,14 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 			for (int key = 0; key < maxParallelism; key++) {
 				boolean success = false;
 				while (deadline.hasTimeLeft() && !success) {
-					CompletableFuture<AggregatingState<Tuple2<Integer, Long>, String>> future = getKvStateWithRetries(
+					CompletableFuture<AggregatingState<Tuple2<Integer, Long>, String>> future = getKvState(
+							deadline,
 							client,
 							jobId,
 							"aggr-queryable",
 							key,
 							BasicTypeInfo.INT_TYPE_INFO,
 							aggrStateDescriptor,
-							QUERY_RETRY_DELAY,
 							false,
 							executor);
 
@@ -1372,84 +1454,62 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 
 	/////				General Utility Methods				//////
 
-	private static <K, S extends State, V> CompletableFuture<S> getKvStateWithRetries(
+	private static <K, S extends State, V> CompletableFuture<S> getKvState(
+			final Deadline deadline,
 			final QueryableStateClient client,
 			final JobID jobId,
 			final String queryName,
 			final K key,
 			final TypeInformation<K> keyTypeInfo,
 			final StateDescriptor<S, V> stateDescriptor,
-			final Time retryDelay,
 			final boolean failForUnknownKeyOrNamespace,
-			final ScheduledExecutor executor) {
-		return retryWithDelay(
-				() -> client.getKvState(jobId, queryName, key, VoidNamespace.INSTANCE, keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE, stateDescriptor),
-				NO_OF_RETRIES,
-				retryDelay,
-				executor,
-				failForUnknownKeyOrNamespace);
-	}
-
-	private static <T> CompletableFuture<T> retryWithDelay(
-			final Supplier<CompletableFuture<T>> operation,
-			final int retries,
-			final Time retryDelay,
-			final ScheduledExecutor scheduledExecutor,
-			final boolean failIfUnknownKeyOrNamespace) {
-
-		final CompletableFuture<T> resultFuture = new CompletableFuture<>();
-
-		retryWithDelay(
-				resultFuture,
-				operation,
-				retries,
-				retryDelay,
-				scheduledExecutor,
-				failIfUnknownKeyOrNamespace);
+			final ScheduledExecutor executor) throws InterruptedException {
 
+		final CompletableFuture<S> resultFuture = new CompletableFuture<>();
+		getKvStateIgnoringCertainExceptions(
+				deadline, resultFuture, client, jobId, queryName, key, keyTypeInfo,
+				stateDescriptor, failForUnknownKeyOrNamespace, executor);
 		return resultFuture;
 	}
 
-	public static <T> void retryWithDelay(
-			final CompletableFuture<T> resultFuture,
-			final Supplier<CompletableFuture<T>> operation,
-			final int retries,
-			final Time retryDelay,
-			final ScheduledExecutor scheduledExecutor,
-			final boolean failIfUnknownKeyOrNamespace) {
+	private static <K, S extends State, V> void getKvStateIgnoringCertainExceptions(
+			final Deadline deadline,
+			final CompletableFuture<S> resultFuture,
+			final QueryableStateClient client,
+			final JobID jobId,
+			final String queryName,
+			final K key,
+			final TypeInformation<K> keyTypeInfo,
+			final StateDescriptor<S, V> stateDescriptor,
+			final boolean failForUnknownKeyOrNamespace,
+			final ScheduledExecutor executor) throws InterruptedException {
 
 		if (!resultFuture.isDone()) {
-			final CompletableFuture<T> operationResultFuture = operation.get();
-			operationResultFuture.whenCompleteAsync(
-					(t, throwable) -> {
-						if (throwable != null) {
-							if (throwable.getCause() instanceof CancellationException) {
-								resultFuture.completeExceptionally(new FutureUtils.RetryException("Operation future was cancelled.", throwable.getCause()));
-							} else if (throwable.getCause() instanceof AssertionError ||
-									(failIfUnknownKeyOrNamespace && throwable.getCause() instanceof UnknownKeyOrNamespaceException)) {
-								resultFuture.completeExceptionally(throwable.getCause());
-							} else {
-								if (retries > 0) {
-									final ScheduledFuture<?> scheduledFuture = scheduledExecutor.schedule(
-											() -> retryWithDelay(resultFuture, operation, retries - 1, retryDelay, scheduledExecutor, failIfUnknownKeyOrNamespace),
-											retryDelay.toMilliseconds(),
-											TimeUnit.MILLISECONDS);
-
-									resultFuture.whenComplete(
-											(innerT, innerThrowable) -> scheduledFuture.cancel(false));
-								} else {
-									resultFuture.completeExceptionally(new FutureUtils.RetryException("Could not complete the operation. Number of retries " +
-											"has been exhausted.", throwable));
-								}
-							}
-						} else {
-							resultFuture.complete(t);
+			Thread.sleep(100L);
+			CompletableFuture<S> expected = client.getKvState(jobId, queryName, key, VoidNamespace.INSTANCE, keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE, stateDescriptor);
+			expected.whenCompleteAsync((result, throwable) -> {
+				if (throwable != null) {
+					if (
+							throwable.getCause() instanceof CancellationException ||
+							throwable.getCause() instanceof AssertionError ||
+							(failForUnknownKeyOrNamespace && throwable.getCause() instanceof UnknownKeyOrNamespaceException)
+					) {
+						resultFuture.completeExceptionally(throwable.getCause());
+					} else if (deadline.hasTimeLeft()) {
+						try {
+							getKvStateIgnoringCertainExceptions(
+									deadline, resultFuture, client, jobId, queryName, key, keyTypeInfo,
+									stateDescriptor, failForUnknownKeyOrNamespace, executor);
+						} catch (InterruptedException e) {
+							e.printStackTrace();
 						}
-					},
-					scheduledExecutor);
+					}
+				} else {
+					resultFuture.complete(result);
+				}
+			}, executor);
 
-			resultFuture.whenComplete(
-					(t, throwable) -> operationResultFuture.cancel(false));
+			resultFuture.whenComplete((result, throwable) -> expected.cancel(false));
 		}
 	}
 
@@ -1468,14 +1528,14 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 		for (int key = 0; key < maxParallelism; key++) {
 			boolean success = false;
 			while (deadline.hasTimeLeft() && !success) {
-				CompletableFuture<ValueState<Tuple2<Integer, Long>>> future = getKvStateWithRetries(
+				CompletableFuture<ValueState<Tuple2<Integer, Long>>> future = getKvState(
+						deadline,
 						client,
 						jobId,
 						queryableStateName,
 						key,
 						BasicTypeInfo.INT_TYPE_INFO,
 						stateDescriptor,
-						QUERY_RETRY_DELAY,
 						false,
 						executor);