You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2017/11/17 10:06:48 UTC
[1/5] flink git commit: [FLINK-8062][QS] Make getKvState() with namespace private.
Repository: flink
Updated Branches:
refs/heads/master a0838de79 -> a4d869759
[FLINK-8062][QS] Make getKvState() with namespace private.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ff7e3cf6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ff7e3cf6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ff7e3cf6
Branch: refs/heads/master
Commit: ff7e3cf6749a6b6bc898fde871c36661c8629c23
Parents: a0838de
Author: kkloudas <kk...@gmail.com>
Authored: Wed Nov 15 15:32:42 2017 +0100
Committer: kkloudas <kk...@gmail.com>
Committed: Fri Nov 17 10:46:08 2017 +0100
----------------------------------------------------------------------
.../flink/queryablestate/client/QueryableStateClient.java | 3 +--
.../itcases/AbstractQueryableStateTestBase.java | 7 +------
2 files changed, 2 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ff7e3cf6/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
index 304505a..03e02e1 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
@@ -186,8 +186,7 @@ public class QueryableStateClient {
* @param stateDescriptor The {@link StateDescriptor} of the state we want to query.
* @return Future holding the immutable {@link State} object containing the result.
*/
- @PublicEvolving
- public <K, N, S extends State, V> CompletableFuture<S> getKvState(
+ private <K, N, S extends State, V> CompletableFuture<S> getKvState(
final JobID jobId,
final String queryableStateName,
final K key,
http://git-wip-us.apache.org/repos/asf/flink/blob/ff7e3cf6/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
index c1cbb61..a789dbd 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
@@ -47,7 +47,6 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.queryablestate.client.QueryableStateClient;
import org.apache.flink.queryablestate.client.VoidNamespace;
import org.apache.flink.queryablestate.client.VoidNamespaceSerializer;
-import org.apache.flink.queryablestate.client.VoidNamespaceTypeInfo;
import org.apache.flink.queryablestate.exceptions.UnknownKeyOrNamespaceException;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
@@ -491,9 +490,7 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
jobId,
"wrong-hankuna", // this is the wrong name.
0,
- VoidNamespace.INSTANCE,
BasicTypeInfo.INT_TYPE_INFO,
- VoidNamespaceTypeInfo.INSTANCE,
valueState);
try {
@@ -572,9 +569,7 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
jobId,
queryableState.getQueryableStateName(),
0,
- VoidNamespace.INSTANCE,
BasicTypeInfo.INT_TYPE_INFO,
- VoidNamespaceTypeInfo.INSTANCE,
valueState);
cluster.submitJobDetached(jobGraph);
@@ -1486,7 +1481,7 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
if (!resultFuture.isDone()) {
Thread.sleep(100L);
- CompletableFuture<S> expected = client.getKvState(jobId, queryName, key, VoidNamespace.INSTANCE, keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE, stateDescriptor);
+ CompletableFuture<S> expected = client.getKvState(jobId, queryName, key, keyTypeInfo, stateDescriptor);
expected.whenCompleteAsync((result, throwable) -> {
if (throwable != null) {
if (
[2/5] flink git commit: [FLINK-8055][QS] Deduplicate logging messages about QS start.
Posted by kk...@apache.org.
[FLINK-8055][QS] Deduplicate logging messages about QS start.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5e059e96
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5e059e96
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5e059e96
Branch: refs/heads/master
Commit: 5e059e968633c4292734ebed209fa1b3c30529a1
Parents: 75c1454
Author: kkloudas <kk...@gmail.com>
Authored: Thu Nov 16 17:02:16 2017 +0100
Committer: kkloudas <kk...@gmail.com>
Committed: Fri Nov 17 10:46:09 2017 +0100
----------------------------------------------------------------------
.../network/AbstractServerBase.java | 20 ++++++++++----------
.../flink/queryablestate/network/Client.java | 20 ++++++++++++++------
.../server/KvStateServerImpl.java | 5 -----
.../HAAbstractQueryableStateTestBase.java | 2 +-
.../network/AbstractServerTest.java | 2 +-
.../network/KvStateServerHandlerTest.java | 2 +-
.../runtime/io/network/NetworkEnvironment.java | 2 --
7 files changed, 27 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/5e059e96/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
index 07ca26d..82a05f2 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
@@ -60,7 +60,7 @@ import java.util.concurrent.TimeUnit;
@Internal
public abstract class AbstractServerBase<REQ extends MessageBody, RESP extends MessageBody> {
- private static final Logger LOG = LoggerFactory.getLogger(AbstractServerBase.class);
+ protected final Logger log = LoggerFactory.getLogger(getClass());
/** AbstractServerBase config: low water mark. */
private static final int LOW_WATER_MARK = 8 * 1024;
@@ -180,16 +180,16 @@ public abstract class AbstractServerBase<REQ extends MessageBody, RESP extends M
*/
public void start() throws Throwable {
Preconditions.checkState(serverAddress == null,
- "The " + serverName + " already running @ " + serverAddress + '.');
+ serverName + " is already running @ " + serverAddress + '.');
Iterator<Integer> portIterator = bindPortRange.iterator();
while (portIterator.hasNext() && !attemptToBind(portIterator.next())) {}
if (serverAddress != null) {
- LOG.info("Started the {} @ {}.", serverName, serverAddress);
+ log.info("Started {} @ {}.", serverName, serverAddress);
} else {
- LOG.info("Unable to start the {}. All ports in provided range are occupied.", serverName);
- throw new FlinkRuntimeException("Unable to start the " + serverName + ". All ports in provided range are occupied.");
+ log.info("Unable to start {}. All ports in provided range are occupied.", serverName);
+ throw new FlinkRuntimeException("Unable to start " + serverName + ". All ports in provided range are occupied.");
}
}
@@ -203,7 +203,7 @@ public abstract class AbstractServerBase<REQ extends MessageBody, RESP extends M
* @throws Exception If something goes wrong during the bind operation.
*/
private boolean attemptToBind(final int port) throws Throwable {
- LOG.debug("Attempting to start server {} on port {}.", serverName, port);
+ log.debug("Attempting to start {} on port {}.", serverName, port);
this.queryExecutor = createQueryExecutor();
this.handler = initializeHandler();
@@ -250,7 +250,7 @@ public abstract class AbstractServerBase<REQ extends MessageBody, RESP extends M
throw future.cause();
} catch (BindException e) {
- LOG.debug("Failed to start server {} on port {}: {}.", serverName, port, e.getMessage());
+ log.debug("Failed to start {} on port {}: {}.", serverName, port, e.getMessage());
shutdown();
}
// any other type of exception we let it bubble up.
@@ -261,7 +261,7 @@ public abstract class AbstractServerBase<REQ extends MessageBody, RESP extends M
* Shuts down the server and all related thread pools.
*/
public void shutdown() {
- LOG.info("Shutting down server {} @ {}", serverName, serverAddress);
+ log.info("Shutting down {} @ {}", serverName, serverAddress);
if (handler != null) {
handler.shutdown();
@@ -311,7 +311,7 @@ public abstract class AbstractServerBase<REQ extends MessageBody, RESP extends M
}
@VisibleForTesting
- public boolean isExecutorShutdown() {
- return queryExecutor.isShutdown();
+ public boolean isEventGroupShutdown() {
+ return bootstrap == null || bootstrap.group().isTerminated();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5e059e96/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
index e21145b..12286fa 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
@@ -19,6 +19,7 @@
package org.apache.flink.queryablestate.network;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.queryablestate.FutureUtils;
import org.apache.flink.queryablestate.network.messages.MessageBody;
import org.apache.flink.queryablestate.network.messages.MessageSerializer;
@@ -282,12 +283,14 @@ public class Client<REQ extends MessageBody, RESP extends MessageBody> {
while (!queuedRequests.isEmpty()) {
final PendingRequest pending = queuedRequests.poll();
- established.sendRequest(pending.request)
- .thenAccept(resp -> pending.complete(resp))
- .exceptionally(throwable -> {
- pending.completeExceptionally(throwable);
- return null;
- });
+ established.sendRequest(pending.request).whenComplete(
+ (response, throwable) -> {
+ if (throwable != null) {
+ pending.completeExceptionally(throwable);
+ } else {
+ pending.complete(response);
+ }
+ });
}
// Publish the channel for the general public
@@ -533,4 +536,9 @@ public class Client<REQ extends MessageBody, RESP extends MessageBody> {
}
}
}
+
+ @VisibleForTesting
+ public boolean isEventGroupShutdown() {
+ return bootstrap == null || bootstrap.group().isTerminated();
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5e059e96/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java
index fe07687..3a37a3a 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java
@@ -29,9 +29,6 @@ import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.query.KvStateServer;
import org.apache.flink.util.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Iterator;
@@ -42,8 +39,6 @@ import java.util.Iterator;
@Internal
public class KvStateServerImpl extends AbstractServerBase<KvStateInternalRequest, KvStateResponse> implements KvStateServer {
- private static final Logger LOG = LoggerFactory.getLogger(KvStateServerImpl.class);
-
/** The {@link KvStateRegistry} to query for state instances. */
private final KvStateRegistry kvStateRegistry;
http://git-wip-us.apache.org/repos/asf/flink/blob/5e059e96/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
index fc4b2bc..79809b3 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
@@ -65,7 +65,7 @@ public abstract class HAAbstractQueryableStateTestBase extends AbstractQueryable
config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
cluster = new TestingCluster(config, false);
- cluster.start();
+ cluster.start(true);
client = new QueryableStateClient("localhost", proxyPortRangeStart);
http://git-wip-us.apache.org/repos/asf/flink/blob/5e059e96/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java
index 2775cd4..3d2ed40 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java
@@ -58,7 +58,7 @@ public class AbstractServerTest {
// the expected exception along with the adequate message
expectedEx.expect(FlinkRuntimeException.class);
- expectedEx.expectMessage("Unable to start the Test Server 2. All ports in provided range are occupied.");
+ expectedEx.expectMessage("Unable to start Test Server 2. All ports in provided range are occupied.");
TestServer server1 = null;
TestServer server2 = null;
http://git-wip-us.apache.org/repos/asf/flink/blob/5e059e96/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
index 041544d..7b301ed 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
@@ -391,7 +391,7 @@ public class KvStateServerHandlerTest extends TestLogger {
localTestServer.start();
localTestServer.shutdown();
- assertTrue(localTestServer.isExecutorShutdown());
+ assertTrue(localTestServer.getQueryExecutor().isTerminated());
MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer =
new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer());
http://git-wip-us.apache.org/repos/asf/flink/blob/5e059e96/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index 4fffacd..71d0386 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -310,7 +310,6 @@ public class NetworkEnvironment {
if (kvStateServer != null) {
try {
kvStateServer.start();
- LOG.info("Started the Queryable State Data Server @ {}", kvStateServer.getServerAddress());
} catch (Throwable ie) {
kvStateServer.shutdown();
kvStateServer = null;
@@ -321,7 +320,6 @@ public class NetworkEnvironment {
if (kvStateProxy != null) {
try {
kvStateProxy.start();
- LOG.info("Started the Queryable State Client Proxy @ {}", kvStateProxy.getServerAddress());
} catch (Throwable ie) {
kvStateProxy.shutdown();
kvStateProxy = null;
[5/5] flink git commit: [FLINK-8057][QS] Change error message in KvStateRegistry.registerKvState()
Posted by kk...@apache.org.
[FLINK-8057][QS] Change error message in KvStateRegistry.registerKvState()
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a4d86975
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a4d86975
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a4d86975
Branch: refs/heads/master
Commit: a4d86975967942054d1bd466641e9c835fb014ac
Parents: 2fe078f
Author: kkloudas <kk...@gmail.com>
Authored: Fri Nov 17 09:26:10 2017 +0100
Committer: kkloudas <kk...@gmail.com>
Committed: Fri Nov 17 10:46:11 2017 +0100
----------------------------------------------------------------------
.../flink/runtime/query/KvStateRegistry.java | 23 ++++++--------------
1 file changed, 7 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a4d86975/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
index af19d81..ed1f92e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
@@ -45,7 +45,7 @@ public class KvStateRegistry {
new ConcurrentHashMap<>();
/** Registry listener to be notified on registration/unregistration. */
- private final AtomicReference<KvStateRegistryListener> listener = new AtomicReference<>();
+ private final AtomicReference<KvStateRegistryListener> listenerRef = new AtomicReference<>();
/**
* Registers a listener with the registry.
@@ -54,7 +54,7 @@ public class KvStateRegistry {
* @throws IllegalStateException If there is a registered listener
*/
public void registerListener(KvStateRegistryListener listener) {
- if (!this.listener.compareAndSet(null, listener)) {
+ if (!listenerRef.compareAndSet(null, listener)) {
throw new IllegalStateException("Listener already registered.");
}
}
@@ -63,20 +63,10 @@ public class KvStateRegistry {
* Unregisters the listener with the registry.
*/
public void unregisterListener() {
- listener.set(null);
+ listenerRef.set(null);
}
/**
- * Registers the KvState instance identified by the given 4-tuple of JobID,
- * JobVertexID, key group index, and registration name.
- *
- * @param kvStateId KvStateID to identify the KvState instance
- * @param kvState KvState instance to register
- * @throws IllegalStateException If there is a KvState instance registered
- * with the same ID.
- */
-
- /**
* Registers the KvState instance and returns the assigned ID.
*
* @param jobId JobId the KvState instance belongs to
@@ -96,7 +86,7 @@ public class KvStateRegistry {
KvStateID kvStateId = new KvStateID();
if (registeredKvStates.putIfAbsent(kvStateId, kvState) == null) {
- KvStateRegistryListener listener = this.listener.get();
+ final KvStateRegistryListener listener = listenerRef.get();
if (listener != null) {
listener.notifyKvStateRegistered(
jobId,
@@ -108,7 +98,8 @@ public class KvStateRegistry {
return kvStateId;
} else {
- throw new IllegalStateException(kvStateId + " is already registered.");
+ throw new IllegalStateException(
+ "State \"" + registrationName + " \"(id=" + kvStateId + ") appears registered although it should not.");
}
}
@@ -127,7 +118,7 @@ public class KvStateRegistry {
KvStateID kvStateId) {
if (registeredKvStates.remove(kvStateId) != null) {
- KvStateRegistryListener listener = this.listener.get();
+ final KvStateRegistryListener listener = listenerRef.get();
if (listener != null) {
listener.notifyKvStateUnregistered(
jobId,
[4/5] flink git commit: [FLINK-8059][QS] QS client throws FlinkJobNotFoundException for queries with unknown jobIds
Posted by kk...@apache.org.
[FLINK-8059][QS] QS client throws FlinkJobNotFoundException for queries with unknown jobIds
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2fe078f3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2fe078f3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2fe078f3
Branch: refs/heads/master
Commit: 2fe078f3927595cbc3c5de6635a710494e0f34b4
Parents: 5e059e9
Author: kkloudas <kk...@gmail.com>
Authored: Thu Nov 16 17:45:49 2017 +0100
Committer: kkloudas <kk...@gmail.com>
Committed: Fri Nov 17 10:46:10 2017 +0100
----------------------------------------------------------------------
.../itcases/AbstractQueryableStateTestBase.java | 32 +++++++++++++++-----
.../flink/runtime/jobmanager/JobManager.scala | 4 +--
.../runtime/jobmanager/JobManagerTest.java | 5 +--
3 files changed, 29 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2fe078f3/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
index a789dbd..65e9bb5 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
@@ -276,10 +276,6 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
/**
* Tests that duplicate query registrations fail the job at the JobManager.
- *
- * <b>NOTE: </b> This test is only in the non-HA variant of the tests because
- * in the HA mode we use the actual JM code which does not recognize the
- * {@code NotifyWhenJobStatus} message.
*/
@Test
public void testDuplicateRegistrationFailsJob() throws Exception {
@@ -435,10 +431,10 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
/**
* Tests that the correct exception is thrown if the query
- * contains a wrong queryable state name.
+ * contains a wrong jobId or wrong queryable state name.
*/
@Test
- public void testWrongQueryableStateName() throws Exception {
+ public void testWrongJobIdAndWrongQueryableStateName() throws Exception {
// Config
final Deadline deadline = TEST_TIMEOUT.fromNow();
@@ -486,7 +482,27 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
runningFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
assertEquals(JobStatus.RUNNING, jobStatus.state());
- CompletableFuture<ValueState<Tuple2<Integer, Long>>> future = client.getKvState(
+ final JobID wrongJobId = new JobID();
+
+ CompletableFuture<ValueState<Tuple2<Integer, Long>>> unknownJobFuture = client.getKvState(
+ wrongJobId, // this is the wrong job id
+ "hankuna",
+ 0,
+ BasicTypeInfo.INT_TYPE_INFO,
+ valueState);
+
+ try {
+ unknownJobFuture.get();
+ fail(); // by now the job must have failed.
+ } catch (ExecutionException e) {
+ Assert.assertTrue(e.getCause() instanceof RuntimeException);
+ Assert.assertTrue(e.getCause().getMessage().contains(
+ "FlinkJobNotFoundException: Could not find Flink job (" + wrongJobId + ")"));
+ } catch (Exception ignored) {
+ fail("Unexpected type of exception.");
+ }
+
+ CompletableFuture<ValueState<Tuple2<Integer, Long>>> unknownQSName = client.getKvState(
jobId,
"wrong-hankuna", // this is the wrong name.
0,
@@ -494,7 +510,7 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
valueState);
try {
- future.get();
+ unknownQSName.get();
fail(); // by now the job must have failed.
} catch (ExecutionException e) {
Assert.assertTrue(e.getCause() instanceof RuntimeException);
http://git-wip-us.apache.org/repos/asf/flink/blob/2fe078f3/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 95a3fd5..c12db23 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -70,7 +70,7 @@ import org.apache.flink.runtime.messages.TaskMessages.UpdateTaskExecutionState
import org.apache.flink.runtime.messages.accumulators._
import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, AcknowledgeCheckpoint, DeclineCheckpoint}
import org.apache.flink.runtime.messages.webmonitor.{InfoMessage, _}
-import org.apache.flink.runtime.messages.{Acknowledge, StackTrace}
+import org.apache.flink.runtime.messages.{Acknowledge, FlinkJobNotFoundException, StackTrace}
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup
import org.apache.flink.runtime.metrics.util.MetricUtils
import org.apache.flink.runtime.metrics.{MetricRegistryConfiguration, MetricRegistryImpl, MetricRegistry => FlinkMetricRegistry}
@@ -1503,7 +1503,7 @@ class JobManager(
}
case None =>
- sender() ! Status.Failure(new IllegalStateException(s"Job ${msg.getJobId} not found"))
+ sender() ! Status.Failure(new FlinkJobNotFoundException(msg.getJobId))
}
// TaskManager KvState registration
http://git-wip-us.apache.org/repos/asf/flink/blob/2fe078f3/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index a697aae..6a02d1f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -63,6 +63,7 @@ import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
import org.apache.flink.runtime.jobmanager.JobManagerHARecoveryTest.BlockingStatefulInvokable;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
import org.apache.flink.runtime.messages.JobManagerMessages.CancellationFailure;
import org.apache.flink.runtime.messages.JobManagerMessages.CancellationResponse;
@@ -672,7 +673,7 @@ public class JobManagerTest extends TestLogger {
try {
Await.result(lookupFuture, deadline.timeLeft());
fail("Did not throw expected Exception");
- } catch (IllegalStateException ignored) {
+ } catch (FlinkJobNotFoundException ignored) {
// Expected
}
@@ -735,7 +736,7 @@ public class JobManagerTest extends TestLogger {
try {
Await.result(lookupFuture, deadline.timeLeft());
fail("Did not throw expected Exception");
- } catch (IllegalStateException ignored) {
+ } catch (FlinkJobNotFoundException ignored) {
// Expected
}
[3/5] flink git commit: [FLINK-8065][QS] Improve error message when client already shut down.
Posted by kk...@apache.org.
[FLINK-8065][QS] Improve error message when client already shut down.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/75c14541
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/75c14541
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/75c14541
Branch: refs/heads/master
Commit: 75c14541fdc52d5446b179e8e660b8a4fd90310c
Parents: ff7e3cf
Author: kkloudas <kk...@gmail.com>
Authored: Wed Nov 15 15:38:36 2017 +0100
Committer: kkloudas <kk...@gmail.com>
Committed: Fri Nov 17 10:46:09 2017 +0100
----------------------------------------------------------------------
.../main/java/org/apache/flink/queryablestate/network/Client.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/75c14541/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
index 13d34fb..e21145b 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
@@ -133,7 +133,7 @@ public class Client<REQ extends MessageBody, RESP extends MessageBody> {
public CompletableFuture<RESP> sendRequest(final InetSocketAddress serverAddress, final REQ request) {
if (shutDown.get()) {
- return FutureUtils.getFailedFuture(new IllegalStateException("Shut down"));
+ return FutureUtils.getFailedFuture(new IllegalStateException(clientName + " is already shut down."));
}
EstablishedConnection connection = establishedConnections.get(serverAddress);