You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2017/11/17 09:34:31 UTC
flink git commit: [FLINK-8063][QS] QS client does not retry when an
UnknownKvStateLocation is thrown.
Repository: flink
Updated Branches:
refs/heads/master 81dc260dc -> a0838de79
[FLINK-8063][QS] QS client does not retry when an UnknownKvStateLocation is thrown.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a0838de7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a0838de7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a0838de7
Branch: refs/heads/master
Commit: a0838de79ff73b0322f3ce255df54f5f33b2bf3b
Parents: 81dc260
Author: kkloudas <kk...@gmail.com>
Authored: Tue Nov 14 15:05:45 2017 +0100
Committer: kkloudas <kk...@gmail.com>
Committed: Fri Nov 17 10:29:30 2017 +0100
----------------------------------------------------------------------
.../network/AbstractServerHandler.java | 2 +-
.../client/proxy/KvStateClientProxyHandler.java | 11 +-
.../itcases/AbstractQueryableStateTestBase.java | 230 ++++++++++++-------
3 files changed, 150 insertions(+), 93 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a0838de7/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java
index 9e02291..7e71a11 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java
@@ -262,7 +262,7 @@ public abstract class AbstractServerHandler<REQ extends MessageBody, RESP extend
try {
stats.reportFailedRequest();
- final String errMsg = "Failed request " + requestId + ". Caused by: " + ExceptionUtils.stringifyException(t);
+ final String errMsg = "Failed request " + requestId + "." + System.lineSeparator() + " Caused by: " + ExceptionUtils.stringifyException(t);
final ByteBuf err = MessageSerializer.serializeRequestFailure(ctx.alloc(), requestId, new RuntimeException(errMsg));
ctx.writeAndFlush(err);
} catch (IOException io) {
http://git-wip-us.apache.org/repos/asf/flink/blob/a0838de7/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
index 73ef7f3..af33701 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
@@ -37,7 +37,6 @@ import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.query.KvStateClientProxy;
import org.apache.flink.runtime.query.KvStateLocation;
import org.apache.flink.runtime.query.KvStateMessage;
-import org.apache.flink.runtime.query.UnknownKvStateLocation;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.util.Preconditions;
@@ -48,7 +47,6 @@ import org.slf4j.LoggerFactory;
import java.net.ConnectException;
import java.net.InetSocketAddress;
-import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -133,12 +131,11 @@ public class KvStateClientProxyHandler extends AbstractServerHandler<KvStateRequ
operationFuture.whenCompleteAsync(
(t, throwable) -> {
if (throwable != null) {
- if (throwable instanceof CancellationException) {
- result.completeExceptionally(throwable);
- } else if (throwable.getCause() instanceof UnknownKvStateIdException ||
+ if (
+ throwable.getCause() instanceof UnknownKvStateIdException ||
throwable.getCause() instanceof UnknownKvStateKeyGroupLocationException ||
- throwable.getCause() instanceof UnknownKvStateLocation ||
- throwable.getCause() instanceof ConnectException) {
+ throwable.getCause() instanceof ConnectException
+ ) {
// These failures are likely to be caused by out-of-sync
// KvStateLocation. Therefore we retry this query and
http://git-wip-us.apache.org/repos/asf/flink/blob/a0838de7/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
index b4bae9c..c1cbb61 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
@@ -37,7 +37,6 @@ import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
@@ -89,12 +88,10 @@ import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongArray;
-import java.util.function.Supplier;
import scala.concurrent.Await;
import scala.concurrent.duration.Deadline;
@@ -103,15 +100,14 @@ import scala.reflect.ClassTag$;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
/**
* Base class for queryable state integration tests with a configurable state backend.
*/
public abstract class AbstractQueryableStateTestBase extends TestLogger {
- private static final int NO_OF_RETRIES = 100;
private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(10000L, TimeUnit.SECONDS);
- private static final Time QUERY_RETRY_DELAY = Time.milliseconds(100L);
private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(4);
private final ScheduledExecutor executor = new ScheduledExecutorServiceAdapter(executorService);
@@ -229,14 +225,14 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
allNonZero = false;
}
- CompletableFuture<ReducingState<Tuple2<Integer, Long>>> result = getKvStateWithRetries(
+ CompletableFuture<ReducingState<Tuple2<Integer, Long>>> result = getKvState(
+ deadline,
client,
jobId,
queryName,
key,
BasicTypeInfo.INT_TYPE_INFO,
reducingState,
- QUERY_RETRY_DELAY,
false,
executor);
@@ -284,7 +280,7 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
*
* <b>NOTE: </b> This test is only in the non-HA variant of the tests because
* in the HA mode we use the actual JM code which does not recognize the
- * {@code NotifyWhenJobStatus} message. *
+ * {@code NotifyWhenJobStatus} message.
*/
@Test
public void testDuplicateRegistrationFailsJob() throws Exception {
@@ -439,6 +435,92 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
}
/**
+ * Tests that the correct exception is thrown if the query
+ * contains a wrong queryable state name.
+ */
+ @Test
+ public void testWrongQueryableStateName() throws Exception {
+ // Config
+ final Deadline deadline = TEST_TIMEOUT.fromNow();
+
+ final long numElements = 1024L;
+
+ JobID jobId = null;
+ try {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStateBackend(stateBackend);
+ env.setParallelism(maxParallelism);
+ // Very important, because cluster is shared between tests and we
+ // don't explicitly check that all slots are available before
+ // submitting.
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
+
+ DataStream<Tuple2<Integer, Long>> source = env
+ .addSource(new TestAscendingValueSource(numElements));
+
+ // Value state
+ ValueStateDescriptor<Tuple2<Integer, Long>> valueState =
+ new ValueStateDescriptor<>("any", source.getType());
+
+ source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+ private static final long serialVersionUID = 7662520075515707428L;
+
+ @Override
+ public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
+ return value.f0;
+ }
+ }).asQueryableState("hakuna", valueState);
+
+ // Submit the job graph
+ JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+ jobId = jobGraph.getJobID();
+
+ CompletableFuture<TestingJobManagerMessages.JobStatusIs> runningFuture = FutureUtils.toJava(
+ cluster.getLeaderGateway(deadline.timeLeft())
+ .ask(new TestingJobManagerMessages.NotifyWhenJobStatus(jobId, JobStatus.RUNNING), deadline.timeLeft())
+ .mapTo(ClassTag$.MODULE$.<TestingJobManagerMessages.JobStatusIs>apply(TestingJobManagerMessages.JobStatusIs.class)));
+
+ cluster.submitJobDetached(jobGraph);
+
+ // expect the job to be running
+ TestingJobManagerMessages.JobStatusIs jobStatus =
+ runningFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+ assertEquals(JobStatus.RUNNING, jobStatus.state());
+
+ CompletableFuture<ValueState<Tuple2<Integer, Long>>> future = client.getKvState(
+ jobId,
+ "wrong-hankuna", // this is the wrong name.
+ 0,
+ VoidNamespace.INSTANCE,
+ BasicTypeInfo.INT_TYPE_INFO,
+ VoidNamespaceTypeInfo.INSTANCE,
+ valueState);
+
+ try {
+ future.get();
+ fail(); // by now the query must have failed.
+ } catch (ExecutionException e) {
+ Assert.assertTrue(e.getCause() instanceof RuntimeException);
+ Assert.assertTrue(e.getCause().getMessage().contains(
+ "UnknownKvStateLocation: No KvStateLocation found for KvState instance with name 'wrong-hankuna'."));
+ } catch (Exception ignored) {
+ fail("Unexpected type of exception.");
+ }
+
+ } finally {
+ // Free cluster resources
+ if (jobId != null) {
+ CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster
+ .getLeaderGateway(deadline.timeLeft())
+ .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
+ .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
+
+ cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+ }
+ }
+ }
+
+ /**
* Similar tests as {@link #testValueState()} but before submitting the
* job, we already issue one request which fails.
*/
@@ -572,14 +654,14 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
// Now query
int key = 0;
- CompletableFuture<ValueState<Tuple2<Integer, Long>>> future = getKvStateWithRetries(
+ CompletableFuture<ValueState<Tuple2<Integer, Long>>> future = getKvState(
+ deadline,
client,
jobId,
queryableState.getQueryableStateName(),
key,
BasicTypeInfo.INT_TYPE_INFO,
valueState,
- QUERY_RETRY_DELAY,
true,
executor);
@@ -723,14 +805,14 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
for (int key = 0; key < maxParallelism; key++) {
boolean success = false;
while (deadline.hasTimeLeft() && !success) {
- CompletableFuture<FoldingState<Tuple2<Integer, Long>, String>> future = getKvStateWithRetries(
+ CompletableFuture<FoldingState<Tuple2<Integer, Long>, String>> future = getKvState(
+ deadline,
client,
jobId,
"pumba",
key,
BasicTypeInfo.INT_TYPE_INFO,
foldingState,
- QUERY_RETRY_DELAY,
false,
executor);
@@ -814,14 +896,14 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
for (int key = 0; key < maxParallelism; key++) {
boolean success = false;
while (deadline.hasTimeLeft() && !success) {
- CompletableFuture<ReducingState<Tuple2<Integer, Long>>> future = getKvStateWithRetries(
+ CompletableFuture<ReducingState<Tuple2<Integer, Long>>> future = getKvState(
+ deadline,
client,
jobId,
"jungle",
key,
BasicTypeInfo.INT_TYPE_INFO,
reducingState,
- QUERY_RETRY_DELAY,
false,
executor);
@@ -923,14 +1005,14 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
for (int key = 0; key < maxParallelism; key++) {
boolean success = false;
while (deadline.hasTimeLeft() && !success) {
- CompletableFuture<MapState<Integer, Tuple2<Integer, Long>>> future = getKvStateWithRetries(
+ CompletableFuture<MapState<Integer, Tuple2<Integer, Long>>> future = getKvState(
+ deadline,
client,
jobId,
"timon-queryable",
key,
BasicTypeInfo.INT_TYPE_INFO,
mapStateDescriptor,
- QUERY_RETRY_DELAY,
false,
executor);
@@ -1028,14 +1110,14 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
for (int key = 0; key < maxParallelism; key++) {
boolean success = false;
while (deadline.hasTimeLeft() && !success) {
- CompletableFuture<ListState<Long>> future = getKvStateWithRetries(
+ final CompletableFuture<ListState<Long>> future = getKvState(
+ deadline,
client,
jobId,
"list-queryable",
key,
BasicTypeInfo.INT_TYPE_INFO,
listStateDescriptor,
- QUERY_RETRY_DELAY,
false,
executor);
@@ -1130,14 +1212,14 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
for (int key = 0; key < maxParallelism; key++) {
boolean success = false;
while (deadline.hasTimeLeft() && !success) {
- CompletableFuture<AggregatingState<Tuple2<Integer, Long>, String>> future = getKvStateWithRetries(
+ CompletableFuture<AggregatingState<Tuple2<Integer, Long>, String>> future = getKvState(
+ deadline,
client,
jobId,
"aggr-queryable",
key,
BasicTypeInfo.INT_TYPE_INFO,
aggrStateDescriptor,
- QUERY_RETRY_DELAY,
false,
executor);
@@ -1372,84 +1454,62 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
///// General Utility Methods //////
- private static <K, S extends State, V> CompletableFuture<S> getKvStateWithRetries(
+ private static <K, S extends State, V> CompletableFuture<S> getKvState(
+ final Deadline deadline,
final QueryableStateClient client,
final JobID jobId,
final String queryName,
final K key,
final TypeInformation<K> keyTypeInfo,
final StateDescriptor<S, V> stateDescriptor,
- final Time retryDelay,
final boolean failForUnknownKeyOrNamespace,
- final ScheduledExecutor executor) {
- return retryWithDelay(
- () -> client.getKvState(jobId, queryName, key, VoidNamespace.INSTANCE, keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE, stateDescriptor),
- NO_OF_RETRIES,
- retryDelay,
- executor,
- failForUnknownKeyOrNamespace);
- }
-
- private static <T> CompletableFuture<T> retryWithDelay(
- final Supplier<CompletableFuture<T>> operation,
- final int retries,
- final Time retryDelay,
- final ScheduledExecutor scheduledExecutor,
- final boolean failIfUnknownKeyOrNamespace) {
-
- final CompletableFuture<T> resultFuture = new CompletableFuture<>();
-
- retryWithDelay(
- resultFuture,
- operation,
- retries,
- retryDelay,
- scheduledExecutor,
- failIfUnknownKeyOrNamespace);
+ final ScheduledExecutor executor) throws InterruptedException {
+ final CompletableFuture<S> resultFuture = new CompletableFuture<>();
+ getKvStateIgnoringCertainExceptions(
+ deadline, resultFuture, client, jobId, queryName, key, keyTypeInfo,
+ stateDescriptor, failForUnknownKeyOrNamespace, executor);
return resultFuture;
}
- public static <T> void retryWithDelay(
- final CompletableFuture<T> resultFuture,
- final Supplier<CompletableFuture<T>> operation,
- final int retries,
- final Time retryDelay,
- final ScheduledExecutor scheduledExecutor,
- final boolean failIfUnknownKeyOrNamespace) {
+ private static <K, S extends State, V> void getKvStateIgnoringCertainExceptions(
+ final Deadline deadline,
+ final CompletableFuture<S> resultFuture,
+ final QueryableStateClient client,
+ final JobID jobId,
+ final String queryName,
+ final K key,
+ final TypeInformation<K> keyTypeInfo,
+ final StateDescriptor<S, V> stateDescriptor,
+ final boolean failForUnknownKeyOrNamespace,
+ final ScheduledExecutor executor) throws InterruptedException {
if (!resultFuture.isDone()) {
- final CompletableFuture<T> operationResultFuture = operation.get();
- operationResultFuture.whenCompleteAsync(
- (t, throwable) -> {
- if (throwable != null) {
- if (throwable.getCause() instanceof CancellationException) {
- resultFuture.completeExceptionally(new FutureUtils.RetryException("Operation future was cancelled.", throwable.getCause()));
- } else if (throwable.getCause() instanceof AssertionError ||
- (failIfUnknownKeyOrNamespace && throwable.getCause() instanceof UnknownKeyOrNamespaceException)) {
- resultFuture.completeExceptionally(throwable.getCause());
- } else {
- if (retries > 0) {
- final ScheduledFuture<?> scheduledFuture = scheduledExecutor.schedule(
- () -> retryWithDelay(resultFuture, operation, retries - 1, retryDelay, scheduledExecutor, failIfUnknownKeyOrNamespace),
- retryDelay.toMilliseconds(),
- TimeUnit.MILLISECONDS);
-
- resultFuture.whenComplete(
- (innerT, innerThrowable) -> scheduledFuture.cancel(false));
- } else {
- resultFuture.completeExceptionally(new FutureUtils.RetryException("Could not complete the operation. Number of retries " +
- "has been exhausted.", throwable));
- }
- }
- } else {
- resultFuture.complete(t);
+ Thread.sleep(100L);
+ CompletableFuture<S> expected = client.getKvState(jobId, queryName, key, VoidNamespace.INSTANCE, keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE, stateDescriptor);
+ expected.whenCompleteAsync((result, throwable) -> {
+ if (throwable != null) {
+ if (
+ throwable.getCause() instanceof CancellationException ||
+ throwable.getCause() instanceof AssertionError ||
+ (failForUnknownKeyOrNamespace && throwable.getCause() instanceof UnknownKeyOrNamespaceException)
+ ) {
+ resultFuture.completeExceptionally(throwable.getCause());
+ } else if (deadline.hasTimeLeft()) {
+ try {
+ getKvStateIgnoringCertainExceptions(
+ deadline, resultFuture, client, jobId, queryName, key, keyTypeInfo,
+ stateDescriptor, failForUnknownKeyOrNamespace, executor);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
}
- },
- scheduledExecutor);
+ }
+ } else {
+ resultFuture.complete(result);
+ }
+ }, executor);
- resultFuture.whenComplete(
- (t, throwable) -> operationResultFuture.cancel(false));
+ resultFuture.whenComplete((result, throwable) -> expected.cancel(false));
}
}
@@ -1468,14 +1528,14 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
for (int key = 0; key < maxParallelism; key++) {
boolean success = false;
while (deadline.hasTimeLeft() && !success) {
- CompletableFuture<ValueState<Tuple2<Integer, Long>>> future = getKvStateWithRetries(
+ CompletableFuture<ValueState<Tuple2<Integer, Long>>> future = getKvState(
+ deadline,
client,
jobId,
queryableStateName,
key,
BasicTypeInfo.INT_TYPE_INFO,
stateDescriptor,
- QUERY_RETRY_DELAY,
false,
executor);