You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2017/11/17 11:03:19 UTC
[1/6] flink git commit: [FLINK-8063][QS] QS client does not retry
when an UnknownKvStateLocation is thrown.
Repository: flink
Updated Branches:
refs/heads/release-1.4 42e24413b -> 3753ae251
[FLINK-8063][QS] QS client does not retry when an UnknownKvStateLocation is thrown.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d0324e34
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d0324e34
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d0324e34
Branch: refs/heads/release-1.4
Commit: d0324e34a06e7374179d1627a4a3653d07f1c614
Parents: 42e2441
Author: kkloudas <kk...@gmail.com>
Authored: Tue Nov 14 15:05:45 2017 +0100
Committer: kkloudas <kk...@gmail.com>
Committed: Fri Nov 17 10:37:18 2017 +0100
----------------------------------------------------------------------
.../network/AbstractServerHandler.java | 2 +-
.../client/proxy/KvStateClientProxyHandler.java | 11 +-
.../itcases/AbstractQueryableStateTestBase.java | 230 ++++++++++++-------
3 files changed, 150 insertions(+), 93 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d0324e34/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java
index 9e02291..7e71a11 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java
@@ -262,7 +262,7 @@ public abstract class AbstractServerHandler<REQ extends MessageBody, RESP extend
try {
stats.reportFailedRequest();
- final String errMsg = "Failed request " + requestId + ". Caused by: " + ExceptionUtils.stringifyException(t);
+ final String errMsg = "Failed request " + requestId + "." + System.lineSeparator() + " Caused by: " + ExceptionUtils.stringifyException(t);
final ByteBuf err = MessageSerializer.serializeRequestFailure(ctx.alloc(), requestId, new RuntimeException(errMsg));
ctx.writeAndFlush(err);
} catch (IOException io) {
http://git-wip-us.apache.org/repos/asf/flink/blob/d0324e34/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
index 73ef7f3..af33701 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
@@ -37,7 +37,6 @@ import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.query.KvStateClientProxy;
import org.apache.flink.runtime.query.KvStateLocation;
import org.apache.flink.runtime.query.KvStateMessage;
-import org.apache.flink.runtime.query.UnknownKvStateLocation;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.util.Preconditions;
@@ -48,7 +47,6 @@ import org.slf4j.LoggerFactory;
import java.net.ConnectException;
import java.net.InetSocketAddress;
-import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -133,12 +131,11 @@ public class KvStateClientProxyHandler extends AbstractServerHandler<KvStateRequ
operationFuture.whenCompleteAsync(
(t, throwable) -> {
if (throwable != null) {
- if (throwable instanceof CancellationException) {
- result.completeExceptionally(throwable);
- } else if (throwable.getCause() instanceof UnknownKvStateIdException ||
+ if (
+ throwable.getCause() instanceof UnknownKvStateIdException ||
throwable.getCause() instanceof UnknownKvStateKeyGroupLocationException ||
- throwable.getCause() instanceof UnknownKvStateLocation ||
- throwable.getCause() instanceof ConnectException) {
+ throwable.getCause() instanceof ConnectException
+ ) {
// These failures are likely to be caused by out-of-sync
// KvStateLocation. Therefore we retry this query and
http://git-wip-us.apache.org/repos/asf/flink/blob/d0324e34/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
index b4bae9c..c1cbb61 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
@@ -37,7 +37,6 @@ import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
@@ -89,12 +88,10 @@ import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongArray;
-import java.util.function.Supplier;
import scala.concurrent.Await;
import scala.concurrent.duration.Deadline;
@@ -103,15 +100,14 @@ import scala.reflect.ClassTag$;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
/**
* Base class for queryable state integration tests with a configurable state backend.
*/
public abstract class AbstractQueryableStateTestBase extends TestLogger {
- private static final int NO_OF_RETRIES = 100;
private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(10000L, TimeUnit.SECONDS);
- private static final Time QUERY_RETRY_DELAY = Time.milliseconds(100L);
private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(4);
private final ScheduledExecutor executor = new ScheduledExecutorServiceAdapter(executorService);
@@ -229,14 +225,14 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
allNonZero = false;
}
- CompletableFuture<ReducingState<Tuple2<Integer, Long>>> result = getKvStateWithRetries(
+ CompletableFuture<ReducingState<Tuple2<Integer, Long>>> result = getKvState(
+ deadline,
client,
jobId,
queryName,
key,
BasicTypeInfo.INT_TYPE_INFO,
reducingState,
- QUERY_RETRY_DELAY,
false,
executor);
@@ -284,7 +280,7 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
*
* <b>NOTE: </b> This test is only in the non-HA variant of the tests because
* in the HA mode we use the actual JM code which does not recognize the
- * {@code NotifyWhenJobStatus} message. *
+ * {@code NotifyWhenJobStatus} message.
*/
@Test
public void testDuplicateRegistrationFailsJob() throws Exception {
@@ -439,6 +435,92 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
}
/**
+ * Tests that the correct exception is thrown if the query
+ * contains a wrong queryable state name.
+ */
+ @Test
+ public void testWrongQueryableStateName() throws Exception {
+ // Config
+ final Deadline deadline = TEST_TIMEOUT.fromNow();
+
+ final long numElements = 1024L;
+
+ JobID jobId = null;
+ try {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStateBackend(stateBackend);
+ env.setParallelism(maxParallelism);
+ // Very important, because cluster is shared between tests and we
+ // don't explicitly check that all slots are available before
+ // submitting.
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
+
+ DataStream<Tuple2<Integer, Long>> source = env
+ .addSource(new TestAscendingValueSource(numElements));
+
+ // Value state
+ ValueStateDescriptor<Tuple2<Integer, Long>> valueState =
+ new ValueStateDescriptor<>("any", source.getType());
+
+ source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+ private static final long serialVersionUID = 7662520075515707428L;
+
+ @Override
+ public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
+ return value.f0;
+ }
+ }).asQueryableState("hakuna", valueState);
+
+ // Submit the job graph
+ JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+ jobId = jobGraph.getJobID();
+
+ CompletableFuture<TestingJobManagerMessages.JobStatusIs> runningFuture = FutureUtils.toJava(
+ cluster.getLeaderGateway(deadline.timeLeft())
+ .ask(new TestingJobManagerMessages.NotifyWhenJobStatus(jobId, JobStatus.RUNNING), deadline.timeLeft())
+ .mapTo(ClassTag$.MODULE$.<TestingJobManagerMessages.JobStatusIs>apply(TestingJobManagerMessages.JobStatusIs.class)));
+
+ cluster.submitJobDetached(jobGraph);
+
+ // expect the job to be running
+ TestingJobManagerMessages.JobStatusIs jobStatus =
+ runningFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+ assertEquals(JobStatus.RUNNING, jobStatus.state());
+
+ CompletableFuture<ValueState<Tuple2<Integer, Long>>> future = client.getKvState(
+ jobId,
+ "wrong-hankuna", // this is the wrong name.
+ 0,
+ VoidNamespace.INSTANCE,
+ BasicTypeInfo.INT_TYPE_INFO,
+ VoidNamespaceTypeInfo.INSTANCE,
+ valueState);
+
+ try {
+ future.get();
+ fail(); // by now the query must have failed.
+ } catch (ExecutionException e) {
+ Assert.assertTrue(e.getCause() instanceof RuntimeException);
+ Assert.assertTrue(e.getCause().getMessage().contains(
+ "UnknownKvStateLocation: No KvStateLocation found for KvState instance with name 'wrong-hankuna'."));
+ } catch (Exception ignored) {
+ fail("Unexpected type of exception.");
+ }
+
+ } finally {
+ // Free cluster resources
+ if (jobId != null) {
+ CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster
+ .getLeaderGateway(deadline.timeLeft())
+ .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
+ .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
+
+ cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+ }
+ }
+ }
+
+ /**
* Similar tests as {@link #testValueState()} but before submitting the
* job, we already issue one request which fails.
*/
@@ -572,14 +654,14 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
// Now query
int key = 0;
- CompletableFuture<ValueState<Tuple2<Integer, Long>>> future = getKvStateWithRetries(
+ CompletableFuture<ValueState<Tuple2<Integer, Long>>> future = getKvState(
+ deadline,
client,
jobId,
queryableState.getQueryableStateName(),
key,
BasicTypeInfo.INT_TYPE_INFO,
valueState,
- QUERY_RETRY_DELAY,
true,
executor);
@@ -723,14 +805,14 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
for (int key = 0; key < maxParallelism; key++) {
boolean success = false;
while (deadline.hasTimeLeft() && !success) {
- CompletableFuture<FoldingState<Tuple2<Integer, Long>, String>> future = getKvStateWithRetries(
+ CompletableFuture<FoldingState<Tuple2<Integer, Long>, String>> future = getKvState(
+ deadline,
client,
jobId,
"pumba",
key,
BasicTypeInfo.INT_TYPE_INFO,
foldingState,
- QUERY_RETRY_DELAY,
false,
executor);
@@ -814,14 +896,14 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
for (int key = 0; key < maxParallelism; key++) {
boolean success = false;
while (deadline.hasTimeLeft() && !success) {
- CompletableFuture<ReducingState<Tuple2<Integer, Long>>> future = getKvStateWithRetries(
+ CompletableFuture<ReducingState<Tuple2<Integer, Long>>> future = getKvState(
+ deadline,
client,
jobId,
"jungle",
key,
BasicTypeInfo.INT_TYPE_INFO,
reducingState,
- QUERY_RETRY_DELAY,
false,
executor);
@@ -923,14 +1005,14 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
for (int key = 0; key < maxParallelism; key++) {
boolean success = false;
while (deadline.hasTimeLeft() && !success) {
- CompletableFuture<MapState<Integer, Tuple2<Integer, Long>>> future = getKvStateWithRetries(
+ CompletableFuture<MapState<Integer, Tuple2<Integer, Long>>> future = getKvState(
+ deadline,
client,
jobId,
"timon-queryable",
key,
BasicTypeInfo.INT_TYPE_INFO,
mapStateDescriptor,
- QUERY_RETRY_DELAY,
false,
executor);
@@ -1028,14 +1110,14 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
for (int key = 0; key < maxParallelism; key++) {
boolean success = false;
while (deadline.hasTimeLeft() && !success) {
- CompletableFuture<ListState<Long>> future = getKvStateWithRetries(
+ final CompletableFuture<ListState<Long>> future = getKvState(
+ deadline,
client,
jobId,
"list-queryable",
key,
BasicTypeInfo.INT_TYPE_INFO,
listStateDescriptor,
- QUERY_RETRY_DELAY,
false,
executor);
@@ -1130,14 +1212,14 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
for (int key = 0; key < maxParallelism; key++) {
boolean success = false;
while (deadline.hasTimeLeft() && !success) {
- CompletableFuture<AggregatingState<Tuple2<Integer, Long>, String>> future = getKvStateWithRetries(
+ CompletableFuture<AggregatingState<Tuple2<Integer, Long>, String>> future = getKvState(
+ deadline,
client,
jobId,
"aggr-queryable",
key,
BasicTypeInfo.INT_TYPE_INFO,
aggrStateDescriptor,
- QUERY_RETRY_DELAY,
false,
executor);
@@ -1372,84 +1454,62 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
///// General Utility Methods //////
- private static <K, S extends State, V> CompletableFuture<S> getKvStateWithRetries(
+ private static <K, S extends State, V> CompletableFuture<S> getKvState(
+ final Deadline deadline,
final QueryableStateClient client,
final JobID jobId,
final String queryName,
final K key,
final TypeInformation<K> keyTypeInfo,
final StateDescriptor<S, V> stateDescriptor,
- final Time retryDelay,
final boolean failForUnknownKeyOrNamespace,
- final ScheduledExecutor executor) {
- return retryWithDelay(
- () -> client.getKvState(jobId, queryName, key, VoidNamespace.INSTANCE, keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE, stateDescriptor),
- NO_OF_RETRIES,
- retryDelay,
- executor,
- failForUnknownKeyOrNamespace);
- }
-
- private static <T> CompletableFuture<T> retryWithDelay(
- final Supplier<CompletableFuture<T>> operation,
- final int retries,
- final Time retryDelay,
- final ScheduledExecutor scheduledExecutor,
- final boolean failIfUnknownKeyOrNamespace) {
-
- final CompletableFuture<T> resultFuture = new CompletableFuture<>();
-
- retryWithDelay(
- resultFuture,
- operation,
- retries,
- retryDelay,
- scheduledExecutor,
- failIfUnknownKeyOrNamespace);
+ final ScheduledExecutor executor) throws InterruptedException {
+ final CompletableFuture<S> resultFuture = new CompletableFuture<>();
+ getKvStateIgnoringCertainExceptions(
+ deadline, resultFuture, client, jobId, queryName, key, keyTypeInfo,
+ stateDescriptor, failForUnknownKeyOrNamespace, executor);
return resultFuture;
}
- public static <T> void retryWithDelay(
- final CompletableFuture<T> resultFuture,
- final Supplier<CompletableFuture<T>> operation,
- final int retries,
- final Time retryDelay,
- final ScheduledExecutor scheduledExecutor,
- final boolean failIfUnknownKeyOrNamespace) {
+ private static <K, S extends State, V> void getKvStateIgnoringCertainExceptions(
+ final Deadline deadline,
+ final CompletableFuture<S> resultFuture,
+ final QueryableStateClient client,
+ final JobID jobId,
+ final String queryName,
+ final K key,
+ final TypeInformation<K> keyTypeInfo,
+ final StateDescriptor<S, V> stateDescriptor,
+ final boolean failForUnknownKeyOrNamespace,
+ final ScheduledExecutor executor) throws InterruptedException {
if (!resultFuture.isDone()) {
- final CompletableFuture<T> operationResultFuture = operation.get();
- operationResultFuture.whenCompleteAsync(
- (t, throwable) -> {
- if (throwable != null) {
- if (throwable.getCause() instanceof CancellationException) {
- resultFuture.completeExceptionally(new FutureUtils.RetryException("Operation future was cancelled.", throwable.getCause()));
- } else if (throwable.getCause() instanceof AssertionError ||
- (failIfUnknownKeyOrNamespace && throwable.getCause() instanceof UnknownKeyOrNamespaceException)) {
- resultFuture.completeExceptionally(throwable.getCause());
- } else {
- if (retries > 0) {
- final ScheduledFuture<?> scheduledFuture = scheduledExecutor.schedule(
- () -> retryWithDelay(resultFuture, operation, retries - 1, retryDelay, scheduledExecutor, failIfUnknownKeyOrNamespace),
- retryDelay.toMilliseconds(),
- TimeUnit.MILLISECONDS);
-
- resultFuture.whenComplete(
- (innerT, innerThrowable) -> scheduledFuture.cancel(false));
- } else {
- resultFuture.completeExceptionally(new FutureUtils.RetryException("Could not complete the operation. Number of retries " +
- "has been exhausted.", throwable));
- }
- }
- } else {
- resultFuture.complete(t);
+ Thread.sleep(100L);
+ CompletableFuture<S> expected = client.getKvState(jobId, queryName, key, VoidNamespace.INSTANCE, keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE, stateDescriptor);
+ expected.whenCompleteAsync((result, throwable) -> {
+ if (throwable != null) {
+ if (
+ throwable.getCause() instanceof CancellationException ||
+ throwable.getCause() instanceof AssertionError ||
+ (failForUnknownKeyOrNamespace && throwable.getCause() instanceof UnknownKeyOrNamespaceException)
+ ) {
+ resultFuture.completeExceptionally(throwable.getCause());
+ } else if (deadline.hasTimeLeft()) {
+ try {
+ getKvStateIgnoringCertainExceptions(
+ deadline, resultFuture, client, jobId, queryName, key, keyTypeInfo,
+ stateDescriptor, failForUnknownKeyOrNamespace, executor);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
}
- },
- scheduledExecutor);
+ }
+ } else {
+ resultFuture.complete(result);
+ }
+ }, executor);
- resultFuture.whenComplete(
- (t, throwable) -> operationResultFuture.cancel(false));
+ resultFuture.whenComplete((result, throwable) -> expected.cancel(false));
}
}
@@ -1468,14 +1528,14 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
for (int key = 0; key < maxParallelism; key++) {
boolean success = false;
while (deadline.hasTimeLeft() && !success) {
- CompletableFuture<ValueState<Tuple2<Integer, Long>>> future = getKvStateWithRetries(
+ CompletableFuture<ValueState<Tuple2<Integer, Long>>> future = getKvState(
+ deadline,
client,
jobId,
queryableStateName,
key,
BasicTypeInfo.INT_TYPE_INFO,
stateDescriptor,
- QUERY_RETRY_DELAY,
false,
executor);
[6/6] flink git commit: [FLINK-8057][QS] Change error message in
KvStateRegistry.registerKvState().
Posted by kk...@apache.org.
[FLINK-8057][QS] Change error message in KvStateRegistry.registerKvState().
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3753ae25
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3753ae25
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3753ae25
Branch: refs/heads/release-1.4
Commit: 3753ae2517fbc940c05ea54e3eb0a960fecdf879
Parents: 1a68d75
Author: kkloudas <kk...@gmail.com>
Authored: Fri Nov 17 09:26:10 2017 +0100
Committer: kkloudas <kk...@gmail.com>
Committed: Fri Nov 17 11:21:18 2017 +0100
----------------------------------------------------------------------
.../flink/runtime/query/KvStateRegistry.java | 23 ++++++--------------
1 file changed, 7 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/3753ae25/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
index af19d81..ed1f92e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
@@ -45,7 +45,7 @@ public class KvStateRegistry {
new ConcurrentHashMap<>();
/** Registry listener to be notified on registration/unregistration. */
- private final AtomicReference<KvStateRegistryListener> listener = new AtomicReference<>();
+ private final AtomicReference<KvStateRegistryListener> listenerRef = new AtomicReference<>();
/**
* Registers a listener with the registry.
@@ -54,7 +54,7 @@ public class KvStateRegistry {
* @throws IllegalStateException If there is a registered listener
*/
public void registerListener(KvStateRegistryListener listener) {
- if (!this.listener.compareAndSet(null, listener)) {
+ if (!listenerRef.compareAndSet(null, listener)) {
throw new IllegalStateException("Listener already registered.");
}
}
@@ -63,20 +63,10 @@ public class KvStateRegistry {
* Unregisters the listener with the registry.
*/
public void unregisterListener() {
- listener.set(null);
+ listenerRef.set(null);
}
/**
- * Registers the KvState instance identified by the given 4-tuple of JobID,
- * JobVertexID, key group index, and registration name.
- *
- * @param kvStateId KvStateID to identify the KvState instance
- * @param kvState KvState instance to register
- * @throws IllegalStateException If there is a KvState instance registered
- * with the same ID.
- */
-
- /**
* Registers the KvState instance and returns the assigned ID.
*
* @param jobId JobId the KvState instance belongs to
@@ -96,7 +86,7 @@ public class KvStateRegistry {
KvStateID kvStateId = new KvStateID();
if (registeredKvStates.putIfAbsent(kvStateId, kvState) == null) {
- KvStateRegistryListener listener = this.listener.get();
+ final KvStateRegistryListener listener = listenerRef.get();
if (listener != null) {
listener.notifyKvStateRegistered(
jobId,
@@ -108,7 +98,8 @@ public class KvStateRegistry {
return kvStateId;
} else {
- throw new IllegalStateException(kvStateId + " is already registered.");
+ throw new IllegalStateException(
+ "State \"" + registrationName + " \"(id=" + kvStateId + ") appears registered although it should not.");
}
}
@@ -127,7 +118,7 @@ public class KvStateRegistry {
KvStateID kvStateId) {
if (registeredKvStates.remove(kvStateId) != null) {
- KvStateRegistryListener listener = this.listener.get();
+ final KvStateRegistryListener listener = listenerRef.get();
if (listener != null) {
listener.notifyKvStateUnregistered(
jobId,
[5/6] flink git commit: [FLINK-8059][QS] QS client throws
FlinkJobNotFoundException for queries with unknown jobIds.
Posted by kk...@apache.org.
[FLINK-8059][QS] QS client throws FlinkJobNotFoundException for queries with unknown jobIds.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1a68d752
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1a68d752
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1a68d752
Branch: refs/heads/release-1.4
Commit: 1a68d7527932b12bd2cb392c7c7781023756bf0c
Parents: 12b0c58
Author: kkloudas <kk...@gmail.com>
Authored: Thu Nov 16 17:45:49 2017 +0100
Committer: kkloudas <kk...@gmail.com>
Committed: Fri Nov 17 11:20:55 2017 +0100
----------------------------------------------------------------------
.../itcases/AbstractQueryableStateTestBase.java | 32 +++++++++++++++-----
.../flink/runtime/jobmanager/JobManager.scala | 4 +--
.../runtime/jobmanager/JobManagerTest.java | 5 +--
3 files changed, 29 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/1a68d752/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
index a789dbd..65e9bb5 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
@@ -276,10 +276,6 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
/**
* Tests that duplicate query registrations fail the job at the JobManager.
- *
- * <b>NOTE: </b> This test is only in the non-HA variant of the tests because
- * in the HA mode we use the actual JM code which does not recognize the
- * {@code NotifyWhenJobStatus} message.
*/
@Test
public void testDuplicateRegistrationFailsJob() throws Exception {
@@ -435,10 +431,10 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
/**
* Tests that the correct exception is thrown if the query
- * contains a wrong queryable state name.
+ * contains a wrong jobId or wrong queryable state name.
*/
@Test
- public void testWrongQueryableStateName() throws Exception {
+ public void testWrongJobIdAndWrongQueryableStateName() throws Exception {
// Config
final Deadline deadline = TEST_TIMEOUT.fromNow();
@@ -486,7 +482,27 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
runningFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
assertEquals(JobStatus.RUNNING, jobStatus.state());
- CompletableFuture<ValueState<Tuple2<Integer, Long>>> future = client.getKvState(
+ final JobID wrongJobId = new JobID();
+
+ CompletableFuture<ValueState<Tuple2<Integer, Long>>> unknownJobFuture = client.getKvState(
+ wrongJobId, // this is the wrong job id
+ "hankuna",
+ 0,
+ BasicTypeInfo.INT_TYPE_INFO,
+ valueState);
+
+ try {
+ unknownJobFuture.get();
+ fail(); // by now the query must have failed.
+ } catch (ExecutionException e) {
+ Assert.assertTrue(e.getCause() instanceof RuntimeException);
+ Assert.assertTrue(e.getCause().getMessage().contains(
+ "FlinkJobNotFoundException: Could not find Flink job (" + wrongJobId + ")"));
+ } catch (Exception ignored) {
+ fail("Unexpected type of exception.");
+ }
+
+ CompletableFuture<ValueState<Tuple2<Integer, Long>>> unknownQSName = client.getKvState(
jobId,
"wrong-hankuna", // this is the wrong name.
0,
@@ -494,7 +510,7 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
valueState);
try {
- future.get();
+ unknownQSName.get();
fail(); // by now the query must have failed.
} catch (ExecutionException e) {
Assert.assertTrue(e.getCause() instanceof RuntimeException);
http://git-wip-us.apache.org/repos/asf/flink/blob/1a68d752/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 4fb1196..f57637a 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -70,7 +70,7 @@ import org.apache.flink.runtime.messages.TaskMessages.UpdateTaskExecutionState
import org.apache.flink.runtime.messages.accumulators._
import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, AcknowledgeCheckpoint, DeclineCheckpoint}
import org.apache.flink.runtime.messages.webmonitor.{InfoMessage, _}
-import org.apache.flink.runtime.messages.{Acknowledge, StackTrace}
+import org.apache.flink.runtime.messages.{Acknowledge, FlinkJobNotFoundException, StackTrace}
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup
import org.apache.flink.runtime.metrics.util.MetricUtils
import org.apache.flink.runtime.metrics.{MetricRegistryConfiguration, MetricRegistryImpl, MetricRegistry => FlinkMetricRegistry}
@@ -1503,7 +1503,7 @@ class JobManager(
}
case None =>
- sender() ! Status.Failure(new IllegalStateException(s"Job ${msg.getJobId} not found"))
+ sender() ! Status.Failure(new FlinkJobNotFoundException(msg.getJobId))
}
// TaskManager KvState registration
http://git-wip-us.apache.org/repos/asf/flink/blob/1a68d752/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index a697aae..6a02d1f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -63,6 +63,7 @@ import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
import org.apache.flink.runtime.jobmanager.JobManagerHARecoveryTest.BlockingStatefulInvokable;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
import org.apache.flink.runtime.messages.JobManagerMessages.CancellationFailure;
import org.apache.flink.runtime.messages.JobManagerMessages.CancellationResponse;
@@ -672,7 +673,7 @@ public class JobManagerTest extends TestLogger {
try {
Await.result(lookupFuture, deadline.timeLeft());
fail("Did not throw expected Exception");
- } catch (IllegalStateException ignored) {
+ } catch (FlinkJobNotFoundException ignored) {
// Expected
}
@@ -735,7 +736,7 @@ public class JobManagerTest extends TestLogger {
try {
Await.result(lookupFuture, deadline.timeLeft());
fail("Did not throw expected Exception");
- } catch (IllegalStateException ignored) {
+ } catch (FlinkJobNotFoundException ignored) {
// Expected
}
[2/6] flink git commit: [FLINK-8062][QS] Make getKvState() with namespace private.
Posted by kk...@apache.org.
[FLINK-8062][QS] Make getKvState() with namespace private.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/96b350ad
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/96b350ad
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/96b350ad
Branch: refs/heads/release-1.4
Commit: 96b350ad91a1f248d0a3c616d1e59638013892be
Parents: d0324e3
Author: kkloudas <kk...@gmail.com>
Authored: Wed Nov 15 15:32:42 2017 +0100
Committer: kkloudas <kk...@gmail.com>
Committed: Fri Nov 17 11:18:47 2017 +0100
----------------------------------------------------------------------
.../flink/queryablestate/client/QueryableStateClient.java | 3 +--
.../itcases/AbstractQueryableStateTestBase.java | 7 +------
2 files changed, 2 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/96b350ad/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
index 304505a..03e02e1 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
@@ -186,8 +186,7 @@ public class QueryableStateClient {
* @param stateDescriptor The {@link StateDescriptor} of the state we want to query.
* @return Future holding the immutable {@link State} object containing the result.
*/
- @PublicEvolving
- public <K, N, S extends State, V> CompletableFuture<S> getKvState(
+ private <K, N, S extends State, V> CompletableFuture<S> getKvState(
final JobID jobId,
final String queryableStateName,
final K key,
http://git-wip-us.apache.org/repos/asf/flink/blob/96b350ad/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
index c1cbb61..a789dbd 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
@@ -47,7 +47,6 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.queryablestate.client.QueryableStateClient;
import org.apache.flink.queryablestate.client.VoidNamespace;
import org.apache.flink.queryablestate.client.VoidNamespaceSerializer;
-import org.apache.flink.queryablestate.client.VoidNamespaceTypeInfo;
import org.apache.flink.queryablestate.exceptions.UnknownKeyOrNamespaceException;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
@@ -491,9 +490,7 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
jobId,
"wrong-hankuna", // this is the wrong name.
0,
- VoidNamespace.INSTANCE,
BasicTypeInfo.INT_TYPE_INFO,
- VoidNamespaceTypeInfo.INSTANCE,
valueState);
try {
@@ -572,9 +569,7 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
jobId,
queryableState.getQueryableStateName(),
0,
- VoidNamespace.INSTANCE,
BasicTypeInfo.INT_TYPE_INFO,
- VoidNamespaceTypeInfo.INSTANCE,
valueState);
cluster.submitJobDetached(jobGraph);
@@ -1486,7 +1481,7 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
if (!resultFuture.isDone()) {
Thread.sleep(100L);
- CompletableFuture<S> expected = client.getKvState(jobId, queryName, key, VoidNamespace.INSTANCE, keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE, stateDescriptor);
+ CompletableFuture<S> expected = client.getKvState(jobId, queryName, key, keyTypeInfo, stateDescriptor);
expected.whenCompleteAsync((result, throwable) -> {
if (throwable != null) {
if (
[3/6] flink git commit: [FLINK-8065][QS] Improve error message when client already shut down.
Posted by kk...@apache.org.
[FLINK-8065][QS] Improve error message when client already shut down.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6314e486
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6314e486
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6314e486
Branch: refs/heads/release-1.4
Commit: 6314e4861df3100e8edd666f00e062c128f6e09f
Parents: 96b350a
Author: kkloudas <kk...@gmail.com>
Authored: Wed Nov 15 15:38:36 2017 +0100
Committer: kkloudas <kk...@gmail.com>
Committed: Fri Nov 17 11:19:08 2017 +0100
----------------------------------------------------------------------
.../main/java/org/apache/flink/queryablestate/network/Client.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6314e486/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
index 13d34fb..e21145b 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
@@ -133,7 +133,7 @@ public class Client<REQ extends MessageBody, RESP extends MessageBody> {
public CompletableFuture<RESP> sendRequest(final InetSocketAddress serverAddress, final REQ request) {
if (shutDown.get()) {
- return FutureUtils.getFailedFuture(new IllegalStateException("Shut down"));
+ return FutureUtils.getFailedFuture(new IllegalStateException(clientName + " is already shut down."));
}
EstablishedConnection connection = establishedConnections.get(serverAddress);
[4/6] flink git commit: [FLINK-8055][QS] Deduplicate logging messages about QS start.
Posted by kk...@apache.org.
[FLINK-8055][QS] Deduplicate logging messages about QS start.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/12b0c58f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/12b0c58f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/12b0c58f
Branch: refs/heads/release-1.4
Commit: 12b0c58f6780376ac2da0f02c6e7eb8a24ab8a13
Parents: 6314e48
Author: kkloudas <kk...@gmail.com>
Authored: Thu Nov 16 17:02:16 2017 +0100
Committer: kkloudas <kk...@gmail.com>
Committed: Fri Nov 17 11:19:35 2017 +0100
----------------------------------------------------------------------
.../network/AbstractServerBase.java | 20 ++++++++++----------
.../flink/queryablestate/network/Client.java | 20 ++++++++++++++------
.../server/KvStateServerImpl.java | 5 -----
.../HAAbstractQueryableStateTestBase.java | 2 +-
.../network/AbstractServerTest.java | 2 +-
.../network/KvStateServerHandlerTest.java | 2 +-
.../runtime/io/network/NetworkEnvironment.java | 2 --
7 files changed, 27 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/12b0c58f/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
index 07ca26d..82a05f2 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
@@ -60,7 +60,7 @@ import java.util.concurrent.TimeUnit;
@Internal
public abstract class AbstractServerBase<REQ extends MessageBody, RESP extends MessageBody> {
- private static final Logger LOG = LoggerFactory.getLogger(AbstractServerBase.class);
+ protected final Logger log = LoggerFactory.getLogger(getClass());
/** AbstractServerBase config: low water mark. */
private static final int LOW_WATER_MARK = 8 * 1024;
@@ -180,16 +180,16 @@ public abstract class AbstractServerBase<REQ extends MessageBody, RESP extends M
*/
public void start() throws Throwable {
Preconditions.checkState(serverAddress == null,
- "The " + serverName + " already running @ " + serverAddress + '.');
+ serverName + " is already running @ " + serverAddress + '.');
Iterator<Integer> portIterator = bindPortRange.iterator();
while (portIterator.hasNext() && !attemptToBind(portIterator.next())) {}
if (serverAddress != null) {
- LOG.info("Started the {} @ {}.", serverName, serverAddress);
+ log.info("Started {} @ {}.", serverName, serverAddress);
} else {
- LOG.info("Unable to start the {}. All ports in provided range are occupied.", serverName);
- throw new FlinkRuntimeException("Unable to start the " + serverName + ". All ports in provided range are occupied.");
+ log.info("Unable to start {}. All ports in provided range are occupied.", serverName);
+ throw new FlinkRuntimeException("Unable to start " + serverName + ". All ports in provided range are occupied.");
}
}
@@ -203,7 +203,7 @@ public abstract class AbstractServerBase<REQ extends MessageBody, RESP extends M
* @throws Exception If something goes wrong during the bind operation.
*/
private boolean attemptToBind(final int port) throws Throwable {
- LOG.debug("Attempting to start server {} on port {}.", serverName, port);
+ log.debug("Attempting to start {} on port {}.", serverName, port);
this.queryExecutor = createQueryExecutor();
this.handler = initializeHandler();
@@ -250,7 +250,7 @@ public abstract class AbstractServerBase<REQ extends MessageBody, RESP extends M
throw future.cause();
} catch (BindException e) {
- LOG.debug("Failed to start server {} on port {}: {}.", serverName, port, e.getMessage());
+ log.debug("Failed to start {} on port {}: {}.", serverName, port, e.getMessage());
shutdown();
}
// any other type of exception we let it bubble up.
@@ -261,7 +261,7 @@ public abstract class AbstractServerBase<REQ extends MessageBody, RESP extends M
* Shuts down the server and all related thread pools.
*/
public void shutdown() {
- LOG.info("Shutting down server {} @ {}", serverName, serverAddress);
+ log.info("Shutting down {} @ {}", serverName, serverAddress);
if (handler != null) {
handler.shutdown();
@@ -311,7 +311,7 @@ public abstract class AbstractServerBase<REQ extends MessageBody, RESP extends M
}
@VisibleForTesting
- public boolean isExecutorShutdown() {
- return queryExecutor.isShutdown();
+ public boolean isEventGroupShutdown() {
+ return bootstrap == null || bootstrap.group().isTerminated();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/12b0c58f/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
index e21145b..12286fa 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
@@ -19,6 +19,7 @@
package org.apache.flink.queryablestate.network;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.queryablestate.FutureUtils;
import org.apache.flink.queryablestate.network.messages.MessageBody;
import org.apache.flink.queryablestate.network.messages.MessageSerializer;
@@ -282,12 +283,14 @@ public class Client<REQ extends MessageBody, RESP extends MessageBody> {
while (!queuedRequests.isEmpty()) {
final PendingRequest pending = queuedRequests.poll();
- established.sendRequest(pending.request)
- .thenAccept(resp -> pending.complete(resp))
- .exceptionally(throwable -> {
- pending.completeExceptionally(throwable);
- return null;
- });
+ established.sendRequest(pending.request).whenComplete(
+ (response, throwable) -> {
+ if (throwable != null) {
+ pending.completeExceptionally(throwable);
+ } else {
+ pending.complete(response);
+ }
+ });
}
// Publish the channel for the general public
@@ -533,4 +536,9 @@ public class Client<REQ extends MessageBody, RESP extends MessageBody> {
}
}
}
+
+ @VisibleForTesting
+ public boolean isEventGroupShutdown() {
+ return bootstrap == null || bootstrap.group().isTerminated();
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/12b0c58f/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java
index fe07687..3a37a3a 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java
@@ -29,9 +29,6 @@ import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.query.KvStateServer;
import org.apache.flink.util.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Iterator;
@@ -42,8 +39,6 @@ import java.util.Iterator;
@Internal
public class KvStateServerImpl extends AbstractServerBase<KvStateInternalRequest, KvStateResponse> implements KvStateServer {
- private static final Logger LOG = LoggerFactory.getLogger(KvStateServerImpl.class);
-
/** The {@link KvStateRegistry} to query for state instances. */
private final KvStateRegistry kvStateRegistry;
http://git-wip-us.apache.org/repos/asf/flink/blob/12b0c58f/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
index fc4b2bc..79809b3 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
@@ -65,7 +65,7 @@ public abstract class HAAbstractQueryableStateTestBase extends AbstractQueryable
config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
cluster = new TestingCluster(config, false);
- cluster.start();
+ cluster.start(true);
client = new QueryableStateClient("localhost", proxyPortRangeStart);
http://git-wip-us.apache.org/repos/asf/flink/blob/12b0c58f/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java
index 2775cd4..3d2ed40 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java
@@ -58,7 +58,7 @@ public class AbstractServerTest {
// the expected exception along with the adequate message
expectedEx.expect(FlinkRuntimeException.class);
- expectedEx.expectMessage("Unable to start the Test Server 2. All ports in provided range are occupied.");
+ expectedEx.expectMessage("Unable to start Test Server 2. All ports in provided range are occupied.");
TestServer server1 = null;
TestServer server2 = null;
http://git-wip-us.apache.org/repos/asf/flink/blob/12b0c58f/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
index 041544d..7b301ed 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
@@ -391,7 +391,7 @@ public class KvStateServerHandlerTest extends TestLogger {
localTestServer.start();
localTestServer.shutdown();
- assertTrue(localTestServer.isExecutorShutdown());
+ assertTrue(localTestServer.getQueryExecutor().isTerminated());
MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer =
new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer());
http://git-wip-us.apache.org/repos/asf/flink/blob/12b0c58f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index 4fffacd..71d0386 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -310,7 +310,6 @@ public class NetworkEnvironment {
if (kvStateServer != null) {
try {
kvStateServer.start();
- LOG.info("Started the Queryable State Data Server @ {}", kvStateServer.getServerAddress());
} catch (Throwable ie) {
kvStateServer.shutdown();
kvStateServer = null;
@@ -321,7 +320,6 @@ public class NetworkEnvironment {
if (kvStateProxy != null) {
try {
kvStateProxy.start();
- LOG.info("Started the Queryable State Client Proxy @ {}", kvStateProxy.getServerAddress());
} catch (Throwable ie) {
kvStateProxy.shutdown();
kvStateProxy = null;