You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2017/12/06 15:11:48 UTC
[1/3] flink git commit: [FLINK-7975][QS] Wait for QS client to
shutdown.
Repository: flink
Updated Branches:
refs/heads/master 5221a70e0 -> a3fd548e9
[FLINK-7975][QS] Wait for QS client to shutdown.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5760677b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5760677b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5760677b
Branch: refs/heads/master
Commit: 5760677b3bb26245ca4816548833da0257ec7c7a
Parents: 5221a70
Author: kkloudas <kk...@gmail.com>
Authored: Thu Nov 9 19:21:43 2017 +0100
Committer: kkloudas <kk...@gmail.com>
Committed: Wed Dec 6 14:33:16 2017 +0100
----------------------------------------------------------------------
.../client/QueryableStateClient.java | 30 +++-
.../flink/queryablestate/network/Client.java | 171 +++++++++++++------
.../queryablestate/network/ClientTest.java | 88 ++++++++--
.../query/AbstractQueryableStateOperator.java | 2 +
4 files changed, 215 insertions(+), 76 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/5760677b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
index 7abf6bc..f1c69ed 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
@@ -108,9 +108,33 @@ public class QueryableStateClient {
new DisabledKvStateRequestStats());
}
- /** Shuts down the client. */
- public void shutdown() {
- client.shutdown();
+ /**
+ * Shuts down the client and returns a {@link CompletableFuture} that
+ * will be completed when the shutdown process is completed.
+ *
+ * <p>If an exception is thrown for any reason, then the returned future
+ * will be completed exceptionally with that exception.
+ *
+ * @return A {@link CompletableFuture} for further handling of the
+ * shutdown result.
+ */
+ public CompletableFuture<?> shutdownAndHandle() {
+ return client.shutdown();
+ }
+
+ /**
+ * Shuts down the client and waits until shutdown is completed.
+ *
+ * <p>If an exception is thrown, a warning is logged containing
+ * the exception message.
+ */
+ public void shutdownAndWait() {
+ try {
+ client.shutdown().get();
+ LOG.info("The Queryable State Client was shutdown successfully.");
+ } catch (Exception e) {
+ LOG.warn("The Queryable State Client shutdown failed: ", e);
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/5760677b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
index 12286fa..364f835 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
@@ -42,15 +42,19 @@ import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChann
import org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.net.InetSocketAddress;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -64,6 +68,8 @@ import java.util.concurrent.atomic.AtomicReference;
@Internal
public class Client<REQ extends MessageBody, RESP extends MessageBody> {
+ private static final Logger LOG = LoggerFactory.getLogger(Client.class);
+
/** The name of the client. Used for logging and stack traces.*/
private final String clientName;
@@ -82,8 +88,8 @@ public class Client<REQ extends MessageBody, RESP extends MessageBody> {
/** Pending connections. */
private final Map<InetSocketAddress, PendingConnection> pendingConnections = new ConcurrentHashMap<>();
- /** Atomic shut down flag. */
- private final AtomicBoolean shutDown = new AtomicBoolean();
+ /** Atomic shut down future. */
+ private final AtomicReference<CompletableFuture<Void>> clientShutdownFuture = new AtomicReference<>(null);
/**
* Creates a client with the specified number of event loop threads.
@@ -133,7 +139,7 @@ public class Client<REQ extends MessageBody, RESP extends MessageBody> {
}
public CompletableFuture<RESP> sendRequest(final InetSocketAddress serverAddress, final REQ request) {
- if (shutDown.get()) {
+ if (clientShutdownFuture.get() != null) {
return FutureUtils.getFailedFuture(new IllegalStateException(clientName + " is already shut down."));
}
@@ -166,28 +172,57 @@ public class Client<REQ extends MessageBody, RESP extends MessageBody> {
* Shuts down the client and closes all connections.
*
* <p>After a call to this method, all returned futures will be failed.
+ *
+ * @return A {@link CompletableFuture} that will be completed when the shutdown process is done.
*/
- public void shutdown() {
- if (shutDown.compareAndSet(false, true)) {
+ public CompletableFuture<Void> shutdown() {
+ final CompletableFuture<Void> newShutdownFuture = new CompletableFuture<>();
+ if (clientShutdownFuture.compareAndSet(null, newShutdownFuture)) {
+
+ final List<CompletableFuture<Void>> connectionFutures = new ArrayList<>();
+
for (Map.Entry<InetSocketAddress, EstablishedConnection> conn : establishedConnections.entrySet()) {
if (establishedConnections.remove(conn.getKey(), conn.getValue())) {
- conn.getValue().close();
+ connectionFutures.add(conn.getValue().close());
}
}
for (Map.Entry<InetSocketAddress, PendingConnection> conn : pendingConnections.entrySet()) {
if (pendingConnections.remove(conn.getKey()) != null) {
- conn.getValue().close();
+ connectionFutures.add(conn.getValue().close());
}
}
- if (bootstrap != null) {
- EventLoopGroup group = bootstrap.group();
- if (group != null) {
- group.shutdownGracefully(0L, 10L, TimeUnit.SECONDS);
+ CompletableFuture.allOf(
+ connectionFutures.toArray(new CompletableFuture<?>[connectionFutures.size()])
+ ).whenComplete((result, throwable) -> {
+
+ if (throwable != null) {
+ LOG.warn("Problem while shutting down the connections at the {}: {}", clientName, throwable);
}
- }
+
+ if (bootstrap != null) {
+ EventLoopGroup group = bootstrap.group();
+ if (group != null && !group.isShutdown()) {
+ group.shutdownGracefully(0L, 0L, TimeUnit.MILLISECONDS)
+ .addListener(finished -> {
+ if (finished.isSuccess()) {
+ newShutdownFuture.complete(null);
+ } else {
+ newShutdownFuture.completeExceptionally(finished.cause());
+ }
+ });
+ } else {
+ newShutdownFuture.complete(null);
+ }
+ } else {
+ newShutdownFuture.complete(null);
+ }
+ });
+
+ return newShutdownFuture;
}
+ return clientShutdownFuture.get();
}
/**
@@ -209,8 +244,8 @@ public class Client<REQ extends MessageBody, RESP extends MessageBody> {
/** The established connection after the connect succeeds. */
private EstablishedConnection established;
- /** Closed flag. */
- private boolean closed;
+ /** Atomic shut down future. */
+ private final AtomicReference<CompletableFuture<Void>> connectionShutdownFuture = new AtomicReference<>(null);
/** Failure cause if something goes wrong. */
private Throwable failureCause;
@@ -250,7 +285,7 @@ public class Client<REQ extends MessageBody, RESP extends MessageBody> {
synchronized (connectLock) {
if (failureCause != null) {
return FutureUtils.getFailedFuture(failureCause);
- } else if (closed) {
+ } else if (connectionShutdownFuture.get() != null) {
return FutureUtils.getFailedFuture(new ClosedChannelException());
} else {
if (established != null) {
@@ -272,7 +307,7 @@ public class Client<REQ extends MessageBody, RESP extends MessageBody> {
*/
private void handInChannel(Channel channel) {
synchronized (connectLock) {
- if (closed || failureCause != null) {
+ if (connectionShutdownFuture.get() != null || failureCause != null) {
// Close the channel and we are done. Any queued requests
// are removed on the close/failure call and after that no
// new ones can be enqueued.
@@ -300,7 +335,7 @@ public class Client<REQ extends MessageBody, RESP extends MessageBody> {
// Check shut down for possible race with shut down. We
// don't want any lingering connections after shut down,
// which can happen if we don't check this here.
- if (shutDown.get()) {
+ if (clientShutdownFuture.get() != null) {
if (establishedConnections.remove(serverAddress, established)) {
established.close();
}
@@ -312,32 +347,40 @@ public class Client<REQ extends MessageBody, RESP extends MessageBody> {
/**
* Close the connecting channel with a ClosedChannelException.
*/
- private void close() {
- close(new ClosedChannelException());
+ private CompletableFuture<Void> close() {
+ return close(new ClosedChannelException());
}
/**
* Close the connecting channel with an Exception (can be {@code null})
* or forward to the established channel.
*/
- private void close(Throwable cause) {
- synchronized (connectLock) {
- if (!closed) {
+ private CompletableFuture<Void> close(Throwable cause) {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ if (connectionShutdownFuture.compareAndSet(null, future)) {
+ synchronized (connectLock) {
if (failureCause == null) {
failureCause = cause;
}
if (established != null) {
- established.close();
+ established.close().whenComplete((result, throwable) -> {
+ if (throwable != null) {
+ future.completeExceptionally(throwable);
+ } else {
+ future.complete(null);
+ }
+ });
} else {
PendingRequest pending;
while ((pending = queuedRequests.poll()) != null) {
pending.completeExceptionally(cause);
}
+ future.complete(null);
}
- closed = true;
}
}
+ return connectionShutdownFuture.get();
}
@Override
@@ -347,7 +390,7 @@ public class Client<REQ extends MessageBody, RESP extends MessageBody> {
"serverAddress=" + serverAddress +
", queuedRequests=" + queuedRequests.size() +
", established=" + (established != null) +
- ", closed=" + closed +
+ ", closed=" + (connectionShutdownFuture.get() != null) +
'}';
}
}
@@ -383,8 +426,8 @@ public class Client<REQ extends MessageBody, RESP extends MessageBody> {
/** Current request number used to assign unique request IDs. */
private final AtomicLong requestCount = new AtomicLong();
- /** Reference to a failure that was reported by the channel. */
- private final AtomicReference<Throwable> failureCause = new AtomicReference<>();
+ /** Atomic shut down future. */
+ private final AtomicReference<CompletableFuture<Void>> connectionShutdownFuture = new AtomicReference<>(null);
/**
* Creates an established connection with the given channel.
@@ -412,8 +455,8 @@ public class Client<REQ extends MessageBody, RESP extends MessageBody> {
/**
* Close the channel with a ClosedChannelException.
*/
- void close() {
- close(new ClosedChannelException());
+ CompletableFuture<Void> close() {
+ return close(new ClosedChannelException());
}
/**
@@ -422,20 +465,33 @@ public class Client<REQ extends MessageBody, RESP extends MessageBody> {
* @param cause The cause to close the channel with.
* @return Channel close future
*/
- private boolean close(Throwable cause) {
- if (failureCause.compareAndSet(null, cause)) {
- channel.close();
- stats.reportInactiveConnection();
+ private CompletableFuture<Void> close(final Throwable cause) {
+ final CompletableFuture<Void> shutdownFuture = new CompletableFuture<>();
- for (long requestId : pendingRequests.keySet()) {
- TimestampedCompletableFuture pending = pendingRequests.remove(requestId);
- if (pending != null && pending.completeExceptionally(cause)) {
- stats.reportFailedRequest();
+ if (connectionShutdownFuture.compareAndSet(null, shutdownFuture)) {
+ channel.close().addListener(finished -> {
+ stats.reportInactiveConnection();
+ for (long requestId : pendingRequests.keySet()) {
+ TimestampedCompletableFuture pending = pendingRequests.remove(requestId);
+ if (pending != null && pending.completeExceptionally(cause)) {
+ stats.reportFailedRequest();
+ }
}
- }
- return true;
+
+ // when finishing, if netty successfully closes the channel, then the provided exception is used
+ // as the reason for the closing. If there was something wrong at the netty side, then that exception
+ // is prioritized over the provided one.
+ if (finished.isSuccess()) {
+ shutdownFuture.completeExceptionally(cause);
+ } else {
+ LOG.warn("Something went wrong when trying to close connection due to : ", cause);
+ shutdownFuture.completeExceptionally(finished.cause());
+ }
+ });
}
- return false;
+
+ // in case we had a race condition, return the winner of the race.
+ return connectionShutdownFuture.get();
}
/**
@@ -464,16 +520,22 @@ public class Client<REQ extends MessageBody, RESP extends MessageBody> {
}
});
- // Check failure for possible race. We don't want any lingering
+ // Check for possible race. We don't want any lingering
// promises after a failure, which can happen if we don't check
// this here. Note that close is treated as a failure as well.
- Throwable failure = failureCause.get();
- if (failure != null) {
- // Remove from pending requests to guard against concurrent
- // removal and to make sure that we only count it once as failed.
+ CompletableFuture<Void> clShutdownFuture = clientShutdownFuture.get();
+ if (clShutdownFuture != null) {
TimestampedCompletableFuture pending = pendingRequests.remove(requestId);
- if (pending != null && pending.completeExceptionally(failure)) {
- stats.reportFailedRequest();
+ if (pending != null) {
+ clShutdownFuture.whenComplete((ignored, throwable) -> {
+ if (throwable != null && pending.completeExceptionally(throwable)) {
+ stats.reportFailedRequest();
+ } else {
+ // we arrive here if the client shutdown future completed normally, or if the pending request
+ // was already completed. In any case, we make sure the pending request is completed exceptionally.
+ pending.completeExceptionally(new ClosedChannelException());
+ }
+ });
}
}
} catch (Throwable t) {
@@ -486,27 +548,25 @@ public class Client<REQ extends MessageBody, RESP extends MessageBody> {
@Override
public void onRequestResult(long requestId, RESP response) {
TimestampedCompletableFuture pending = pendingRequests.remove(requestId);
- if (pending != null && pending.complete(response)) {
+ if (pending != null && !pending.isDone()) {
long durationMillis = (System.nanoTime() - pending.getTimestamp()) / 1_000_000L;
stats.reportSuccessfulRequest(durationMillis);
+ pending.complete(response);
}
}
@Override
public void onRequestFailure(long requestId, Throwable cause) {
TimestampedCompletableFuture pending = pendingRequests.remove(requestId);
- if (pending != null && pending.completeExceptionally(cause)) {
+ if (pending != null && !pending.isDone()) {
stats.reportFailedRequest();
+ pending.completeExceptionally(cause);
}
}
@Override
public void onFailure(Throwable cause) {
- if (close(cause)) {
- // Remove from established channels, otherwise future
- // requests will be handled by this failed channel.
- establishedConnections.remove(serverAddress, this);
- }
+ close(cause).handle((cancelled, ignored) -> establishedConnections.remove(serverAddress, this));
}
@Override
@@ -516,7 +576,6 @@ public class Client<REQ extends MessageBody, RESP extends MessageBody> {
", channel=" + channel +
", pendingRequests=" + pendingRequests.size() +
", requestCount=" + requestCount +
- ", failureCause=" + failureCause +
'}';
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5760677b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
index 1fa4deb..8638efa 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
@@ -40,6 +40,7 @@ import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.NetUtils;
import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
@@ -54,7 +55,9 @@ import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
-import org.junit.AfterClass;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -95,15 +98,20 @@ public class ClientTest {
private static final Logger LOG = LoggerFactory.getLogger(ClientTest.class);
+ private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(20L, TimeUnit.SECONDS);
+
// Thread pool for client bootstrap (shared between tests)
- private static final NioEventLoopGroup NIO_GROUP = new NioEventLoopGroup();
+ private NioEventLoopGroup nioGroup;
- private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(10L, TimeUnit.SECONDS);
+ @Before
+ public void setUp() throws Exception {
+ nioGroup = new NioEventLoopGroup();
+ }
- @AfterClass
- public static void tearDown() throws Exception {
- if (NIO_GROUP != null) {
- NIO_GROUP.shutdownGracefully();
+ @After
+ public void tearDown() throws Exception {
+ if (nioGroup != null) {
+ nioGroup.shutdownGracefully();
}
}
@@ -218,7 +226,24 @@ public class ClientTest {
assertEquals(expectedRequests, stats.getNumFailed());
} finally {
if (client != null) {
- client.shutdown();
+ Exception exc = null;
+ try {
+
+ // todo: a plain get() here was running into this problem:
+ // https://github.com/netty/netty/issues/4357.
+ // This is why we now wait for the shutdown with a timeout and
+ // then check that the event loop group was actually shut down.
+
+ client.shutdown().get(10L, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ exc = e;
+ LOG.error("An exception occurred while shutting down netty.", e);
+ }
+
+ Assert.assertTrue(
+ ExceptionUtils.stringifyException(exc),
+ client.isEventGroupShutdown()
+ );
}
if (serverChannel != null) {
@@ -265,7 +290,12 @@ public class ClientTest {
}
} finally {
if (client != null) {
- client.shutdown();
+ try {
+ client.shutdown().get(10L, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ Assert.assertTrue(client.isEventGroupShutdown());
}
assertEquals("Channel leak", 0L, stats.getNumConnections());
@@ -366,7 +396,12 @@ public class ClientTest {
}
if (client != null) {
- client.shutdown();
+ try {
+ client.shutdown().get(10L, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ Assert.assertTrue(client.isEventGroupShutdown());
}
assertEquals("Channel leak", 0L, stats.getNumConnections());
@@ -467,7 +502,12 @@ public class ClientTest {
assertEquals(2L, stats.getNumFailed());
} finally {
if (client != null) {
- client.shutdown();
+ try {
+ client.shutdown().get(10L, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ Assert.assertTrue(client.isEventGroupShutdown());
}
if (serverChannel != null) {
@@ -548,7 +588,12 @@ public class ClientTest {
assertEquals(1L, stats.getNumFailed());
} finally {
if (client != null) {
- client.shutdown();
+ try {
+ client.shutdown().get(10L, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ Assert.assertTrue(client.isEventGroupShutdown());
}
if (serverChannel != null) {
@@ -661,7 +706,7 @@ public class ClientTest {
Collections.shuffle(random);
// Dispatch queries
- List<Future<KvStateResponse>> futures = new ArrayList<>(batchSize);
+ List<CompletableFuture<KvStateResponse>> futures = new ArrayList<>(batchSize);
for (int j = 0; j < batchSize; j++) {
int targetServer = random.get(j) % numServers;
@@ -700,8 +745,12 @@ public class ClientTest {
LOG.info("Number of requests {}/100_000", numRequests);
}
- // Shut down
- client.shutdown();
+ try {
+ client.shutdown().get(10L, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ Assert.assertTrue(client.isEventGroupShutdown());
for (Future<Void> future : taskFutures) {
try {
@@ -739,7 +788,12 @@ public class ClientTest {
}
} finally {
if (client != null) {
- client.shutdown();
+ try {
+ client.shutdown().get(10L, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ Assert.assertTrue(client.isEventGroupShutdown());
}
for (int i = 0; i < numServers; i++) {
@@ -761,7 +815,7 @@ public class ClientTest {
// Bind address and port
.localAddress(InetAddress.getLocalHost(), 0)
// NIO server channels
- .group(NIO_GROUP)
+ .group(nioGroup)
.channel(NioServerSocketChannel.class)
// See initializer for pipeline details
.childHandler(new ChannelInitializer<SocketChannel>() {
http://git-wip-us.apache.org/repos/asf/flink/blob/5760677b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/query/AbstractQueryableStateOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/query/AbstractQueryableStateOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/query/AbstractQueryableStateOperator.java
index 7522a61..5ca9c1e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/query/AbstractQueryableStateOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/query/AbstractQueryableStateOperator.java
@@ -36,6 +36,8 @@ abstract class AbstractQueryableStateOperator<S extends State, IN>
extends AbstractStreamOperator<IN>
implements OneInputStreamOperator<IN, IN> {
+ private static final long serialVersionUID = 7842489558298787382L;
+
/** State descriptor for the queryable state instance. */
protected final StateDescriptor<? extends S, ?> stateDescriptor;
[3/3] flink git commit: [FLINK-7880][QS] Wait for proper resource
cleanup after each ITCase.
Posted by kk...@apache.org.
[FLINK-7880][QS] Wait for proper resource cleanup after each ITCase.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a3fd548e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a3fd548e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a3fd548e
Branch: refs/heads/master
Commit: a3fd548e9c76c67609bbf159d5fb743d756450b1
Parents: 74d052b
Author: kkloudas <kk...@gmail.com>
Authored: Wed Dec 6 14:32:46 2017 +0100
Committer: kkloudas <kk...@gmail.com>
Committed: Wed Dec 6 14:33:28 2017 +0100
----------------------------------------------------------------------
.../itcases/AbstractQueryableStateTestBase.java | 1073 ++++++++----------
.../HAAbstractQueryableStateTestBase.java | 22 +-
.../HAQueryableStateRocksDBBackendITCase.java | 2 -
.../NonHAAbstractQueryableStateTestBase.java | 11 +-
...NonHAQueryableStateRocksDBBackendITCase.java | 2 -
5 files changed, 476 insertions(+), 634 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a3fd548e/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
index 65e9bb5..5a28367 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
@@ -73,6 +73,7 @@ import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
import java.util.ArrayList;
@@ -92,7 +93,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongArray;
-import scala.concurrent.Await;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;
import scala.reflect.ClassTag$;
@@ -159,52 +159,40 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
@Test
@SuppressWarnings("unchecked")
public void testQueryableState() throws Exception {
- // Config
+
final Deadline deadline = TEST_TIMEOUT.fromNow();
final int numKeys = 256;
- JobID jobId = null;
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStateBackend(stateBackend);
+ env.setParallelism(maxParallelism);
+ // Very important, because cluster is shared between tests and we
+ // don't explicitly check that all slots are available before
+ // submitting.
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
- try {
- //
- // Test program
- //
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setStateBackend(stateBackend);
- env.setParallelism(maxParallelism);
- // Very important, because cluster is shared between tests and we
- // don't explicitly check that all slots are available before
- // submitting.
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
-
- DataStream<Tuple2<Integer, Long>> source = env
- .addSource(new TestKeyRangeSource(numKeys));
-
- // Reducing state
- ReducingStateDescriptor<Tuple2<Integer, Long>> reducingState = new ReducingStateDescriptor<>(
- "any-name",
- new SumReduce(),
- source.getType());
-
- final String queryName = "hakuna-matata";
-
- source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
- private static final long serialVersionUID = 7143749578983540352L;
-
- @Override
- public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
- return value.f0;
- }
- }).asQueryableState(queryName, reducingState);
+ DataStream<Tuple2<Integer, Long>> source = env.addSource(new TestKeyRangeSource(numKeys));
- // Submit the job graph
- JobGraph jobGraph = env.getStreamGraph().getJobGraph();
- cluster.submitJobDetached(jobGraph);
+ ReducingStateDescriptor<Tuple2<Integer, Long>> reducingState = new ReducingStateDescriptor<>(
+ "any-name", new SumReduce(), source.getType());
+
+ final String queryName = "hakuna-matata";
+
+ source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+ private static final long serialVersionUID = 7143749578983540352L;
+
+ @Override
+ public Integer getKey(Tuple2<Integer, Long> value) {
+ return value.f0;
+ }
+ }).asQueryableState(queryName, reducingState);
- //
- // Start querying
- //
- jobId = jobGraph.getJobID();
+ try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(cluster, env, deadline)) {
+
+ final JobID jobId = autoCancellableJob.getJobId();
+ final JobGraph jobGraph = autoCancellableJob.getJobGraph();
+
+ cluster.submitJobDetached(jobGraph);
final AtomicLongArray counts = new AtomicLongArray(numKeys);
@@ -261,16 +249,6 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
long count = counts.get(i);
assertTrue("Count at position " + i + " is " + count, count > 0);
}
- } finally {
- // Free cluster resources
- if (jobId != null) {
- CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster
- .getLeaderGateway(deadline.timeLeft())
- .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
- .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
-
- cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
- }
}
}
@@ -282,91 +260,94 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
final Deadline deadline = TEST_TIMEOUT.fromNow();
final int numKeys = 256;
- JobID jobId = null;
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStateBackend(stateBackend);
+ env.setParallelism(maxParallelism);
+ // Very important, because cluster is shared between tests and we
+ // don't explicitly check that all slots are available before
+ // submitting.
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
- try {
- //
- // Test program
- //
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setStateBackend(stateBackend);
- env.setParallelism(maxParallelism);
- // Very important, because cluster is shared between tests and we
- // don't explicitly check that all slots are available before
- // submitting.
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
-
- DataStream<Tuple2<Integer, Long>> source = env
- .addSource(new TestKeyRangeSource(numKeys));
-
- // Reducing state
- ReducingStateDescriptor<Tuple2<Integer, Long>> reducingState = new ReducingStateDescriptor<>(
- "any-name",
- new SumReduce(),
- source.getType());
-
- final String queryName = "duplicate-me";
-
- final QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState =
- source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
- private static final long serialVersionUID = -4126824763829132959L;
-
- @Override
- public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
- return value.f0;
- }
- }).asQueryableState(queryName, reducingState);
+ DataStream<Tuple2<Integer, Long>> source = env.addSource(new TestKeyRangeSource(numKeys));
- final QueryableStateStream<Integer, Tuple2<Integer, Long>> duplicate =
- source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
- private static final long serialVersionUID = -6265024000462809436L;
+ // Reducing state
+ ReducingStateDescriptor<Tuple2<Integer, Long>> reducingState = new ReducingStateDescriptor<>(
+ "any-name",
+ new SumReduce(),
+ source.getType());
- @Override
- public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
- return value.f0;
- }
- }).asQueryableState(queryName);
+ final String queryName = "duplicate-me";
- // Submit the job graph
- JobGraph jobGraph = env.getStreamGraph().getJobGraph();
- jobId = jobGraph.getJobID();
+ final QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState =
+ source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+ private static final long serialVersionUID = -4126824763829132959L;
- CompletableFuture<TestingJobManagerMessages.JobStatusIs> failedFuture = FutureUtils.toJava(
- cluster.getLeaderGateway(deadline.timeLeft())
- .ask(new TestingJobManagerMessages.NotifyWhenJobStatus(jobId, JobStatus.FAILED), deadline.timeLeft())
- .mapTo(ClassTag$.MODULE$.<TestingJobManagerMessages.JobStatusIs>apply(TestingJobManagerMessages.JobStatusIs.class)));
+ @Override
+ public Integer getKey(Tuple2<Integer, Long> value) {
+ return value.f0;
+ }
+ }).asQueryableState(queryName, reducingState);
- cluster.submitJobDetached(jobGraph);
+ final QueryableStateStream<Integer, Tuple2<Integer, Long>> duplicate =
+ source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+ private static final long serialVersionUID = -6265024000462809436L;
- TestingJobManagerMessages.JobStatusIs jobStatus =
+ @Override
+ public Integer getKey(Tuple2<Integer, Long> value) {
+ return value.f0;
+ }
+ }).asQueryableState(queryName);
+
+ // Submit the job graph
+ final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+ final JobID jobId = jobGraph.getJobID();
+
+ final CompletableFuture<TestingJobManagerMessages.JobStatusIs> failedFuture =
+ notifyWhenJobStatusIs(jobId, JobStatus.FAILED, deadline);
+
+ final CompletableFuture<TestingJobManagerMessages.JobStatusIs> cancellationFuture =
+ notifyWhenJobStatusIs(jobId, JobStatus.CANCELED, deadline);
+
+ cluster.submitJobDetached(jobGraph);
+
+ try {
+ final TestingJobManagerMessages.JobStatusIs jobStatus =
failedFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+
assertEquals(JobStatus.FAILED, jobStatus.state());
+ } catch (Exception e) {
+
+ // we end up here if waiting for the FAILED status throws (e.g. on timeout), i.e. the job (wrongly) did not fail.
+ // in this case, and given that the mini-cluster is shared with other tests,
+ // we cancel the job and wait for the cancellation so that the resources are freed.
- // Get the job and check the cause
- JobManagerMessages.JobFound jobFound = FutureUtils.toJava(
- cluster.getLeaderGateway(deadline.timeLeft())
- .ask(new JobManagerMessages.RequestJob(jobId), deadline.timeLeft())
- .mapTo(ClassTag$.MODULE$.<JobManagerMessages.JobFound>apply(JobManagerMessages.JobFound.class)))
- .get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
-
- String failureCause = jobFound.executionGraph().getFailureCause().getExceptionAsString();
-
- assertTrue("Not instance of SuppressRestartsException", failureCause.startsWith("org.apache.flink.runtime.execution.SuppressRestartsException"));
- int causedByIndex = failureCause.indexOf("Caused by: ");
- String subFailureCause = failureCause.substring(causedByIndex + "Caused by: ".length());
- assertTrue("Not caused by IllegalStateException", subFailureCause.startsWith("java.lang.IllegalStateException"));
- assertTrue("Exception does not contain registration name", subFailureCause.contains(queryName));
- } finally {
- // Free cluster resources
if (jobId != null) {
- scala.concurrent.Future<CancellationSuccess> cancellation = cluster
- .getLeaderGateway(deadline.timeLeft())
+ cluster.getLeaderGateway(deadline.timeLeft())
.ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
- .mapTo(ClassTag$.MODULE$.<JobManagerMessages.CancellationSuccess>apply(JobManagerMessages.CancellationSuccess.class));
+ .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class));
- Await.ready(cancellation, deadline.timeLeft());
+ cancellationFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
}
+
+ // and we re-throw the exception.
+ throw e;
}
+
+ // Get the job and check the cause
+ JobManagerMessages.JobFound jobFound = FutureUtils.toJava(
+ cluster.getLeaderGateway(deadline.timeLeft())
+ .ask(new JobManagerMessages.RequestJob(jobId), deadline.timeLeft())
+ .mapTo(ClassTag$.MODULE$.<JobManagerMessages.JobFound>apply(JobManagerMessages.JobFound.class)))
+ .get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+
+ String failureCause = jobFound.executionGraph().getFailureCause().getExceptionAsString();
+
+ assertEquals(JobStatus.FAILED, jobFound.executionGraph().getState());
+ assertTrue("Not instance of SuppressRestartsException", failureCause.startsWith("org.apache.flink.runtime.execution.SuppressRestartsException"));
+ int causedByIndex = failureCause.indexOf("Caused by: ");
+ String subFailureCause = failureCause.substring(causedByIndex + "Caused by: ".length());
+ assertTrue("Not caused by IllegalStateException", subFailureCause.startsWith("java.lang.IllegalStateException"));
+ assertTrue("Exception does not contain registration name", subFailureCause.contains(queryName));
}
/**
@@ -377,55 +358,40 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
*/
@Test
public void testValueState() throws Exception {
- // Config
- final Deadline deadline = TEST_TIMEOUT.fromNow();
+ final Deadline deadline = TEST_TIMEOUT.fromNow();
final long numElements = 1024L;
- JobID jobId = null;
- try {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setStateBackend(stateBackend);
- env.setParallelism(maxParallelism);
- // Very important, because cluster is shared between tests and we
- // don't explicitly check that all slots are available before
- // submitting.
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
-
- DataStream<Tuple2<Integer, Long>> source = env
- .addSource(new TestAscendingValueSource(numElements));
-
- // Value state
- ValueStateDescriptor<Tuple2<Integer, Long>> valueState = new ValueStateDescriptor<>(
- "any",
- source.getType());
-
- source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
- private static final long serialVersionUID = 7662520075515707428L;
-
- @Override
- public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
- return value.f0;
- }
- }).asQueryableState("hakuna", valueState);
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStateBackend(stateBackend);
+ env.setParallelism(maxParallelism);
+ // Very important, because cluster is shared between tests and we
+ // don't explicitly check that all slots are available before
+ // submitting.
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
- // Submit the job graph
- JobGraph jobGraph = env.getStreamGraph().getJobGraph();
- jobId = jobGraph.getJobID();
+ DataStream<Tuple2<Integer, Long>> source = env.addSource(new TestAscendingValueSource(numElements));
- cluster.submitJobDetached(jobGraph);
+ // Value state
+ ValueStateDescriptor<Tuple2<Integer, Long>> valueState = new ValueStateDescriptor<>("any", source.getType());
- executeValueQuery(deadline, client, jobId, "hakuna", valueState, numElements);
- } finally {
- // Free cluster resources
- if (jobId != null) {
- CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster
- .getLeaderGateway(deadline.timeLeft())
- .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
- .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
+ source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+ private static final long serialVersionUID = 7662520075515707428L;
- cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+ @Override
+ public Integer getKey(Tuple2<Integer, Long> value) {
+ return value.f0;
}
+ }).asQueryableState("hakuna", valueState);
+
+ try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(cluster, env, deadline)) {
+
+ final JobID jobId = autoCancellableJob.getJobId();
+ final JobGraph jobGraph = autoCancellableJob.getJobGraph();
+
+ cluster.submitJobDetached(jobGraph);
+
+ executeValueQuery(deadline, client, jobId, "hakuna", valueState, numElements);
}
}
@@ -434,48 +400,36 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
* contains a wrong jobId or wrong queryable state name.
*/
@Test
+ @Ignore
public void testWrongJobIdAndWrongQueryableStateName() throws Exception {
- // Config
- final Deadline deadline = TEST_TIMEOUT.fromNow();
+ final Deadline deadline = TEST_TIMEOUT.fromNow();
final long numElements = 1024L;
- JobID jobId = null;
- try {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setStateBackend(stateBackend);
- env.setParallelism(maxParallelism);
- // Very important, because cluster is shared between tests and we
- // don't explicitly check that all slots are available before
- // submitting.
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
-
- DataStream<Tuple2<Integer, Long>> source = env
- .addSource(new TestAscendingValueSource(numElements));
-
- // Value state
- ValueStateDescriptor<Tuple2<Integer, Long>> valueState =
- new ValueStateDescriptor<>("any", source.getType());
-
- source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
- private static final long serialVersionUID = 7662520075515707428L;
-
- @Override
- public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
- return value.f0;
- }
- }).asQueryableState("hakuna", valueState);
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStateBackend(stateBackend);
+ env.setParallelism(maxParallelism);
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
- // Submit the job graph
- JobGraph jobGraph = env.getStreamGraph().getJobGraph();
- jobId = jobGraph.getJobID();
+ DataStream<Tuple2<Integer, Long>> source = env.addSource(new TestAscendingValueSource(numElements));
+ ValueStateDescriptor<Tuple2<Integer, Long>> valueState = new ValueStateDescriptor<>("any", source.getType());
- CompletableFuture<TestingJobManagerMessages.JobStatusIs> runningFuture = FutureUtils.toJava(
- cluster.getLeaderGateway(deadline.timeLeft())
- .ask(new TestingJobManagerMessages.NotifyWhenJobStatus(jobId, JobStatus.RUNNING), deadline.timeLeft())
- .mapTo(ClassTag$.MODULE$.<TestingJobManagerMessages.JobStatusIs>apply(TestingJobManagerMessages.JobStatusIs.class)));
+ source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+ private static final long serialVersionUID = 7662520075515707428L;
- cluster.submitJobDetached(jobGraph);
+ @Override
+ public Integer getKey(Tuple2<Integer, Long> value) {
+ return value.f0;
+ }
+ }).asQueryableState("hakuna", valueState);
+
+ try (AutoCancellableJob closableJobGraph = new AutoCancellableJob(cluster, env, deadline)) {
+
+ // register to be notified when the job is running.
+ CompletableFuture<TestingJobManagerMessages.JobStatusIs> runningFuture =
+ notifyWhenJobStatusIs(closableJobGraph.getJobId(), JobStatus.RUNNING, deadline);
+
+ cluster.submitJobDetached(closableJobGraph.getJobGraph());
// expect for the job to be running
TestingJobManagerMessages.JobStatusIs jobStatus =
@@ -486,49 +440,38 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
CompletableFuture<ValueState<Tuple2<Integer, Long>>> unknownJobFuture = client.getKvState(
wrongJobId, // this is the wrong job id
- "hankuna",
+ "hakuna",
0,
BasicTypeInfo.INT_TYPE_INFO,
valueState);
try {
- unknownJobFuture.get();
- fail(); // by now the job must have failed.
+ unknownJobFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+ fail(); // by now the request must have failed.
} catch (ExecutionException e) {
- Assert.assertTrue(e.getCause() instanceof RuntimeException);
- Assert.assertTrue(e.getCause().getMessage().contains(
+ Assert.assertTrue("GOT: " + e.getCause().getMessage(), e.getCause() instanceof RuntimeException);
+ Assert.assertTrue("GOT: " + e.getCause().getMessage(), e.getCause().getMessage().contains(
"FlinkJobNotFoundException: Could not find Flink job (" + wrongJobId + ")"));
- } catch (Exception ignored) {
- fail("Unexpected type of exception.");
+ } catch (Exception f) {
+ fail("Unexpected type of exception: " + f.getMessage());
}
CompletableFuture<ValueState<Tuple2<Integer, Long>>> unknownQSName = client.getKvState(
- jobId,
- "wrong-hankuna", // this is the wrong name.
+ closableJobGraph.getJobId(),
+ "wrong-hakuna", // this is the wrong name.
0,
BasicTypeInfo.INT_TYPE_INFO,
valueState);
try {
- unknownQSName.get();
- fail(); // by now the job must have failed.
+ unknownQSName.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+ fail(); // by now the request must have failed.
} catch (ExecutionException e) {
- Assert.assertTrue(e.getCause() instanceof RuntimeException);
- Assert.assertTrue(e.getCause().getMessage().contains(
- "UnknownKvStateLocation: No KvStateLocation found for KvState instance with name 'wrong-hankuna'."));
- } catch (Exception ignored) {
- fail("Unexpected type of exception.");
- }
-
- } finally {
- // Free cluster resources
- if (jobId != null) {
- CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster
- .getLeaderGateway(deadline.timeLeft())
- .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
- .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
-
- cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+ Assert.assertTrue("GOT: " + e.getCause().getMessage(), e.getCause() instanceof RuntimeException);
+ Assert.assertTrue("GOT: " + e.getCause().getMessage(), e.getCause().getMessage().contains(
+ "UnknownKvStateLocation: No KvStateLocation found for KvState instance with name 'wrong-hakuna'."));
+ } catch (Exception f) {
+ fail("Unexpected type of exception: " + f.getMessage());
}
}
}
@@ -539,50 +482,44 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
*/
@Test
public void testQueryNonStartedJobState() throws Exception {
- // Config
- final Deadline deadline = TEST_TIMEOUT.fromNow();
+ final Deadline deadline = TEST_TIMEOUT.fromNow();
final long numElements = 1024L;
- JobID jobId = null;
- try {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setStateBackend(stateBackend);
- env.setParallelism(maxParallelism);
- // Very important, because cluster is shared between tests and we
- // don't explicitly check that all slots are available before
- // submitting.
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
-
- DataStream<Tuple2<Integer, Long>> source = env
- .addSource(new TestAscendingValueSource(numElements));
-
- // Value state
- ValueStateDescriptor<Tuple2<Integer, Long>> valueState = new ValueStateDescriptor<>(
- "any",
- source.getType(),
- null);
-
- QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState =
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStateBackend(stateBackend);
+ env.setParallelism(maxParallelism);
+ // Very important, because cluster is shared between tests and we
+ // don't explicitly check that all slots are available before
+ // submitting.
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
+
+ DataStream<Tuple2<Integer, Long>> source = env.addSource(new TestAscendingValueSource(numElements));
+
+ ValueStateDescriptor<Tuple2<Integer, Long>> valueState = new ValueStateDescriptor<>(
+ "any", source.getType(), null);
+
+ QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState =
source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+
private static final long serialVersionUID = 7480503339992214681L;
@Override
- public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
+ public Integer getKey(Tuple2<Integer, Long> value) {
return value.f0;
}
}).asQueryableState("hakuna", valueState);
- // Submit the job graph
- JobGraph jobGraph = env.getStreamGraph().getJobGraph();
- jobId = jobGraph.getJobID();
+ try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(cluster, env, deadline)) {
+
+ final JobID jobId = autoCancellableJob.getJobId();
+ final JobGraph jobGraph = autoCancellableJob.getJobGraph();
- // Now query
long expected = numElements;
// query once
client.getKvState(
- jobId,
+ autoCancellableJob.getJobId(),
queryableState.getQueryableStateName(),
0,
BasicTypeInfo.INT_TYPE_INFO,
@@ -591,16 +528,6 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
cluster.submitJobDetached(jobGraph);
executeValueQuery(deadline, client, jobId, "hakuna", valueState, expected);
- } finally {
- // Free cluster resources
- if (jobId != null) {
- CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster
- .getLeaderGateway(deadline.timeLeft())
- .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
- .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
-
- cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
- }
}
}
@@ -615,51 +542,37 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
@Test(expected = UnknownKeyOrNamespaceException.class)
public void testValueStateDefault() throws Throwable {
- // Config
final Deadline deadline = TEST_TIMEOUT.fromNow();
-
final long numElements = 1024L;
- JobID jobId = null;
- try {
- StreamExecutionEnvironment env =
- StreamExecutionEnvironment.getExecutionEnvironment();
- env.setStateBackend(stateBackend);
- env.setParallelism(maxParallelism);
- // Very important, because cluster is shared between tests and we
- // don't explicitly check that all slots are available before
- // submitting.
- env.setRestartStrategy(RestartStrategies
- .fixedDelayRestart(Integer.MAX_VALUE, 1000L));
-
- DataStream<Tuple2<Integer, Long>> source = env
- .addSource(new TestAscendingValueSource(numElements));
-
- // Value state
- ValueStateDescriptor<Tuple2<Integer, Long>> valueState =
- new ValueStateDescriptor<>(
- "any",
- source.getType(),
- Tuple2.of(0, 1337L));
-
- // only expose key "1"
- QueryableStateStream<Integer, Tuple2<Integer, Long>>
- queryableState =
- source.keyBy(
- new KeySelector<Tuple2<Integer, Long>, Integer>() {
- private static final long serialVersionUID = 4509274556892655887L;
-
- @Override
- public Integer getKey(
- Tuple2<Integer, Long> value) throws
- Exception {
- return 1;
- }
- }).asQueryableState("hakuna", valueState);
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStateBackend(stateBackend);
+ env.setParallelism(maxParallelism);
+ // Very important, because cluster is shared between tests and we
+ // don't explicitly check that all slots are available before
+ // submitting.
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
- // Submit the job graph
- JobGraph jobGraph = env.getStreamGraph().getJobGraph();
- jobId = jobGraph.getJobID();
+ DataStream<Tuple2<Integer, Long>> source = env.addSource(new TestAscendingValueSource(numElements));
+
+ ValueStateDescriptor<Tuple2<Integer, Long>> valueState = new ValueStateDescriptor<>(
+ "any", source.getType(), Tuple2.of(0, 1337L));
+
+ // only expose key "1"
+ QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState = source.keyBy(
+ new KeySelector<Tuple2<Integer, Long>, Integer>() {
+ private static final long serialVersionUID = 4509274556892655887L;
+
+ @Override
+ public Integer getKey(Tuple2<Integer, Long> value) {
+ return 1;
+ }
+ }).asQueryableState("hakuna", valueState);
+
+ try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(cluster, env, deadline)) {
+
+ final JobID jobId = autoCancellableJob.getJobId();
+ final JobGraph jobGraph = autoCancellableJob.getJobGraph();
cluster.submitJobDetached(jobGraph);
@@ -683,17 +596,6 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
// exception in an ExecutionException.
throw e.getCause();
}
- } finally {
-
- // Free cluster resources
- if (jobId != null) {
- CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster
- .getLeaderGateway(deadline.timeLeft())
- .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
- .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
-
- cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
- }
}
}
@@ -707,55 +609,41 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
*/
@Test
public void testValueStateShortcut() throws Exception {
- // Config
- final Deadline deadline = TEST_TIMEOUT.fromNow();
+ final Deadline deadline = TEST_TIMEOUT.fromNow();
final long numElements = 1024L;
- JobID jobId = null;
- try {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setStateBackend(stateBackend);
- env.setParallelism(maxParallelism);
- // Very important, because cluster is shared between tests and we
- // don't explicitly check that all slots are available before
- // submitting.
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
-
- DataStream<Tuple2<Integer, Long>> source = env
- .addSource(new TestAscendingValueSource(numElements));
-
- // Value state shortcut
- QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState =
- source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
- private static final long serialVersionUID = 9168901838808830068L;
-
- @Override
- public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
- return value.f0;
- }
- }).asQueryableState("matata");
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStateBackend(stateBackend);
+ env.setParallelism(maxParallelism);
+ // Very important, because cluster is shared between tests and we
+ // don't explicitly check that all slots are available before
+ // submitting.
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
- // Submit the job graph
- JobGraph jobGraph = env.getStreamGraph().getJobGraph();
- jobId = jobGraph.getJobID();
+ DataStream<Tuple2<Integer, Long>> source = env.addSource(new TestAscendingValueSource(numElements));
- cluster.submitJobDetached(jobGraph);
+ // Value state shortcut
+ final QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState =
+ source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+ private static final long serialVersionUID = 9168901838808830068L;
- final ValueStateDescriptor<Tuple2<Integer, Long>> stateDesc =
- (ValueStateDescriptor<Tuple2<Integer, Long>>) queryableState.getStateDescriptor();
- executeValueQuery(deadline, client, jobId, "matata", stateDesc, numElements);
- } finally {
+ @Override
+ public Integer getKey(Tuple2<Integer, Long> value) {
+ return value.f0;
+ }
+ }).asQueryableState("matata");
- // Free cluster resources
- if (jobId != null) {
- CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(
- cluster.getLeaderGateway(deadline.timeLeft())
- .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
- .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
+ final ValueStateDescriptor<Tuple2<Integer, Long>> stateDesc =
+ (ValueStateDescriptor<Tuple2<Integer, Long>>) queryableState.getStateDescriptor();
- cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
- }
+ try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(cluster, env, deadline)) {
+
+ final JobID jobId = autoCancellableJob.getJobId();
+ final JobGraph jobGraph = autoCancellableJob.getJobGraph();
+
+ cluster.submitJobDetached(jobGraph);
+ executeValueQuery(deadline, client, jobId, "matata", stateDesc, numElements);
}
}
@@ -768,50 +656,40 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
*/
@Test
public void testFoldingState() throws Exception {
- // Config
- final Deadline deadline = TEST_TIMEOUT.fromNow();
+ final Deadline deadline = TEST_TIMEOUT.fromNow();
final int numElements = 1024;
- JobID jobId = null;
- try {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setStateBackend(stateBackend);
- env.setParallelism(maxParallelism);
- // Very important, because cluster is shared between tests and we
- // don't explicitly check that all slots are available before
- // submitting.
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
-
- DataStream<Tuple2<Integer, Long>> source = env
- .addSource(new TestAscendingValueSource(numElements));
-
- // Folding state
- FoldingStateDescriptor<Tuple2<Integer, Long>, String> foldingState =
- new FoldingStateDescriptor<>(
- "any",
- "0",
- new SumFold(),
- StringSerializer.INSTANCE);
-
- QueryableStateStream<Integer, String> queryableState =
- source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
- private static final long serialVersionUID = -842809958106747539L;
-
- @Override
- public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
- return value.f0;
- }
- }).asQueryableState("pumba", foldingState);
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStateBackend(stateBackend);
+ env.setParallelism(maxParallelism);
+ // Very important, because cluster is shared between tests and we
+ // don't explicitly check that all slots are available before
+ // submitting.
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
+
+ DataStream<Tuple2<Integer, Long>> source = env.addSource(new TestAscendingValueSource(numElements));
+
+ FoldingStateDescriptor<Tuple2<Integer, Long>, String> foldingState = new FoldingStateDescriptor<>(
+ "any", "0", new SumFold(), StringSerializer.INSTANCE);
- // Submit the job graph
- JobGraph jobGraph = env.getStreamGraph().getJobGraph();
- jobId = jobGraph.getJobID();
+ source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+ private static final long serialVersionUID = -842809958106747539L;
+
+ @Override
+ public Integer getKey(Tuple2<Integer, Long> value) {
+ return value.f0;
+ }
+ }).asQueryableState("pumba", foldingState);
+
+ try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(cluster, env, deadline)) {
+
+ final JobID jobId = autoCancellableJob.getJobId();
+ final JobGraph jobGraph = autoCancellableJob.getJobGraph();
cluster.submitJobDetached(jobGraph);
- // Now query
- String expected = Integer.toString(numElements * (numElements + 1) / 2);
+ final String expected = Integer.toString(numElements * (numElements + 1) / 2);
for (int key = 0; key < maxParallelism; key++) {
boolean success = false;
@@ -840,16 +718,6 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
assertTrue("Did not succeed query", success);
}
- } finally {
- // Free cluster resources
- if (jobId != null) {
- CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster
- .getLeaderGateway(deadline.timeLeft())
- .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
- .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
-
- cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
- }
}
}
@@ -861,48 +729,40 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
*/
@Test
public void testReducingState() throws Exception {
- // Config
- final Deadline deadline = TEST_TIMEOUT.fromNow();
+ final Deadline deadline = TEST_TIMEOUT.fromNow();
final long numElements = 1024L;
- JobID jobId = null;
- try {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setStateBackend(stateBackend);
- env.setParallelism(maxParallelism);
- // Very important, because cluster is shared between tests and we
- // don't explicitly check that all slots are available before
- // submitting.
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
-
- DataStream<Tuple2<Integer, Long>> source = env
- .addSource(new TestAscendingValueSource(numElements));
-
- // Reducing state
- ReducingStateDescriptor<Tuple2<Integer, Long>> reducingState =
- new ReducingStateDescriptor<>(
- "any",
- new SumReduce(),
- source.getType());
-
- source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
- private static final long serialVersionUID = 8470749712274833552L;
-
- @Override
- public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
- return value.f0;
- }
- }).asQueryableState("jungle", reducingState);
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStateBackend(stateBackend);
+ env.setParallelism(maxParallelism);
+ // Very important, because cluster is shared between tests and we
+ // don't explicitly check that all slots are available before
+ // submitting.
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
+
+ DataStream<Tuple2<Integer, Long>> source = env.addSource(new TestAscendingValueSource(numElements));
- // Submit the job graph
- JobGraph jobGraph = env.getStreamGraph().getJobGraph();
- jobId = jobGraph.getJobID();
+ ReducingStateDescriptor<Tuple2<Integer, Long>> reducingState = new ReducingStateDescriptor<>(
+ "any", new SumReduce(), source.getType());
+
+ source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+ private static final long serialVersionUID = 8470749712274833552L;
+
+ @Override
+ public Integer getKey(Tuple2<Integer, Long> value) {
+ return value.f0;
+ }
+ }).asQueryableState("jungle", reducingState);
+
+ try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(cluster, env, deadline)) {
+
+ final JobID jobId = autoCancellableJob.getJobId();
+ final JobGraph jobGraph = autoCancellableJob.getJobGraph();
cluster.submitJobDetached(jobGraph);
- // Now query
- long expected = numElements * (numElements + 1L) / 2L;
+ final long expected = numElements * (numElements + 1L) / 2L;
for (int key = 0; key < maxParallelism; key++) {
boolean success = false;
@@ -931,16 +791,6 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
assertTrue("Did not succeed query", success);
}
- } finally {
- // Free cluster resources
- if (jobId != null) {
- CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster
- .getLeaderGateway(deadline.timeLeft())
- .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
- .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
-
- cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
- }
}
}
@@ -952,66 +802,60 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
*/
@Test
public void testMapState() throws Exception {
- // Config
- final Deadline deadline = TEST_TIMEOUT.fromNow();
+ final Deadline deadline = TEST_TIMEOUT.fromNow();
final long numElements = 1024L;
- JobID jobId = null;
- try {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setStateBackend(stateBackend);
- env.setParallelism(maxParallelism);
- // Very important, because cluster is shared between tests and we
- // don't explicitly check that all slots are available before
- // submitting.
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
-
- DataStream<Tuple2<Integer, Long>> source = env
- .addSource(new TestAscendingValueSource(numElements));
-
- final MapStateDescriptor<Integer, Tuple2<Integer, Long>> mapStateDescriptor = new MapStateDescriptor<>(
- "timon",
- BasicTypeInfo.INT_TYPE_INFO,
- source.getType());
- mapStateDescriptor.setQueryable("timon-queryable");
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStateBackend(stateBackend);
+ env.setParallelism(maxParallelism);
+ // Very important, because cluster is shared between tests and we
+ // don't explicitly check that all slots are available before
+ // submitting.
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
- source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
- private static final long serialVersionUID = 8470749712274833552L;
+ DataStream<Tuple2<Integer, Long>> source = env.addSource(new TestAscendingValueSource(numElements));
- @Override
- public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
- return value.f0;
- }
- }).process(new ProcessFunction<Tuple2<Integer, Long>, Object>() {
- private static final long serialVersionUID = -805125545438296619L;
+ final MapStateDescriptor<Integer, Tuple2<Integer, Long>> mapStateDescriptor = new MapStateDescriptor<>(
+ "timon", BasicTypeInfo.INT_TYPE_INFO, source.getType());
+ mapStateDescriptor.setQueryable("timon-queryable");
- private transient MapState<Integer, Tuple2<Integer, Long>> mapState;
+ source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+ private static final long serialVersionUID = 8470749712274833552L;
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- mapState = getRuntimeContext().getMapState(mapStateDescriptor);
- }
+ @Override
+ public Integer getKey(Tuple2<Integer, Long> value) {
+ return value.f0;
+ }
+ }).process(new ProcessFunction<Tuple2<Integer, Long>, Object>() {
+ private static final long serialVersionUID = -805125545438296619L;
- @Override
- public void processElement(Tuple2<Integer, Long> value, Context ctx, Collector<Object> out) throws Exception {
- Tuple2<Integer, Long> v = mapState.get(value.f0);
- if (v == null) {
- v = new Tuple2<>(value.f0, 0L);
- }
- mapState.put(value.f0, new Tuple2<>(v.f0, v.f1 + value.f1));
+ private transient MapState<Integer, Tuple2<Integer, Long>> mapState;
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ mapState = getRuntimeContext().getMapState(mapStateDescriptor);
+ }
+
+ @Override
+ public void processElement(Tuple2<Integer, Long> value, Context ctx, Collector<Object> out) throws Exception {
+ Tuple2<Integer, Long> v = mapState.get(value.f0);
+ if (v == null) {
+ v = new Tuple2<>(value.f0, 0L);
}
- });
+ mapState.put(value.f0, new Tuple2<>(v.f0, v.f1 + value.f1));
+ }
+ });
+
+ try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(cluster, env, deadline)) {
- // Submit the job graph
- JobGraph jobGraph = env.getStreamGraph().getJobGraph();
- jobId = jobGraph.getJobID();
+ final JobID jobId = autoCancellableJob.getJobId();
+ final JobGraph jobGraph = autoCancellableJob.getJobGraph();
cluster.submitJobDetached(jobGraph);
- // Now query
- long expected = numElements * (numElements + 1L) / 2L;
+ final long expected = numElements * (numElements + 1L) / 2L;
for (int key = 0; key < maxParallelism; key++) {
boolean success = false;
@@ -1039,16 +883,6 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
assertTrue("Did not succeed query", success);
}
- } finally {
- // Free cluster resources
- if (jobId != null) {
- CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster
- .getLeaderGateway(deadline.timeLeft())
- .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
- .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
-
- cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
- }
}
}
@@ -1061,62 +895,56 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
*/
@Test
public void testListState() throws Exception {
- // Config
- final Deadline deadline = TEST_TIMEOUT.fromNow();
+ final Deadline deadline = TEST_TIMEOUT.fromNow();
final long numElements = 1024L;
- JobID jobId = null;
- try {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setStateBackend(stateBackend);
- env.setParallelism(maxParallelism);
- // Very important, because cluster is shared between tests and we
- // don't explicitly check that all slots are available before
- // submitting.
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
-
- DataStream<Tuple2<Integer, Long>> source = env
- .addSource(new TestAscendingValueSource(numElements));
-
- final ListStateDescriptor<Long> listStateDescriptor = new ListStateDescriptor<Long>(
- "list",
- BasicTypeInfo.LONG_TYPE_INFO);
- listStateDescriptor.setQueryable("list-queryable");
-
- source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
- private static final long serialVersionUID = 8470749712274833552L;
-
- @Override
- public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
- return value.f0;
- }
- }).process(new ProcessFunction<Tuple2<Integer, Long>, Object>() {
- private static final long serialVersionUID = -805125545438296619L;
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStateBackend(stateBackend);
+ env.setParallelism(maxParallelism);
+ // Very important, because cluster is shared between tests and we
+ // don't explicitly check that all slots are available before
+ // submitting.
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
- private transient ListState<Long> listState;
+ DataStream<Tuple2<Integer, Long>> source = env.addSource(new TestAscendingValueSource(numElements));
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- listState = getRuntimeContext().getListState(listStateDescriptor);
- }
+ final ListStateDescriptor<Long> listStateDescriptor = new ListStateDescriptor<Long>(
+ "list", BasicTypeInfo.LONG_TYPE_INFO);
+ listStateDescriptor.setQueryable("list-queryable");
- @Override
- public void processElement(Tuple2<Integer, Long> value, Context ctx, Collector<Object> out) throws Exception {
- listState.add(value.f1);
- }
- });
+ source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+ private static final long serialVersionUID = 8470749712274833552L;
- // Submit the job graph
- JobGraph jobGraph = env.getStreamGraph().getJobGraph();
- jobId = jobGraph.getJobID();
+ @Override
+ public Integer getKey(Tuple2<Integer, Long> value) {
+ return value.f0;
+ }
+ }).process(new ProcessFunction<Tuple2<Integer, Long>, Object>() {
+ private static final long serialVersionUID = -805125545438296619L;
- cluster.submitJobDetached(jobGraph);
+ private transient ListState<Long> listState;
- // Now query
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ listState = getRuntimeContext().getListState(listStateDescriptor);
+ }
- Map<Integer, Set<Long>> results = new HashMap<>();
+ @Override
+ public void processElement(Tuple2<Integer, Long> value, Context ctx, Collector<Object> out) throws Exception {
+ listState.add(value.f1);
+ }
+ });
+
+ try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(cluster, env, deadline)) {
+
+ final JobID jobId = autoCancellableJob.getJobId();
+ final JobGraph jobGraph = autoCancellableJob.getJobGraph();
+
+ cluster.submitJobDetached(jobGraph);
+
+ final Map<Integer, Set<Long>> results = new HashMap<>();
for (int key = 0; key < maxParallelism; key++) {
boolean success = false;
@@ -1159,66 +987,48 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
}
}
- } finally {
- // Free cluster resources
- if (jobId != null) {
- CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster
- .getLeaderGateway(deadline.timeLeft())
- .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
- .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
-
- cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
- }
}
}
@Test
public void testAggregatingState() throws Exception {
- // Config
- final Deadline deadline = TEST_TIMEOUT.fromNow();
+ final Deadline deadline = TEST_TIMEOUT.fromNow();
final long numElements = 1024L;
- JobID jobId = null;
- try {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setStateBackend(stateBackend);
- env.setParallelism(maxParallelism);
- // Very important, because cluster is shared between tests and we
- // don't explicitly check that all slots are available before
- // submitting.
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
-
- DataStream<Tuple2<Integer, Long>> source = env
- .addSource(new TestAscendingValueSource(numElements));
-
- final AggregatingStateDescriptor<Tuple2<Integer, Long>, String, String> aggrStateDescriptor =
- new AggregatingStateDescriptor<>(
- "aggregates",
- new SumAggr(),
- String.class);
- aggrStateDescriptor.setQueryable("aggr-queryable");
-
- source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
- private static final long serialVersionUID = 8470749712274833552L;
-
- @Override
- public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
- return value.f0;
- }
- }).transform(
- "TestAggregatingOperator",
- BasicTypeInfo.STRING_TYPE_INFO,
- new AggregatingTestOperator(aggrStateDescriptor)
- );
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStateBackend(stateBackend);
+ env.setParallelism(maxParallelism);
+ // Very important, because cluster is shared between tests and we
+ // don't explicitly check that all slots are available before
+ // submitting.
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
- // Submit the job graph
- JobGraph jobGraph = env.getStreamGraph().getJobGraph();
- jobId = jobGraph.getJobID();
+ DataStream<Tuple2<Integer, Long>> source = env.addSource(new TestAscendingValueSource(numElements));
- cluster.submitJobDetached(jobGraph);
+ final AggregatingStateDescriptor<Tuple2<Integer, Long>, String, String> aggrStateDescriptor =
+ new AggregatingStateDescriptor<>("aggregates", new SumAggr(), String.class);
+ aggrStateDescriptor.setQueryable("aggr-queryable");
- // Now query
+ source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+ private static final long serialVersionUID = 8470749712274833552L;
+
+ @Override
+ public Integer getKey(Tuple2<Integer, Long> value) {
+ return value.f0;
+ }
+ }).transform(
+ "TestAggregatingOperator",
+ BasicTypeInfo.STRING_TYPE_INFO,
+ new AggregatingTestOperator(aggrStateDescriptor)
+ );
+
+ try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(cluster, env, deadline)) {
+
+ final JobID jobId = autoCancellableJob.getJobId();
+ final JobGraph jobGraph = autoCancellableJob.getJobGraph();
+
+ cluster.submitJobDetached(jobGraph);
for (int key = 0; key < maxParallelism; key++) {
boolean success = false;
@@ -1246,16 +1056,6 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
assertTrue("Did not succeed query", success);
}
- } finally {
- // Free cluster resources
- if (jobId != null) {
- CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster
- .getLeaderGateway(deadline.timeLeft())
- .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
- .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
-
- cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
- }
}
}
@@ -1316,7 +1116,6 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
notifyAll();
}
}
-
}
/**
@@ -1465,6 +1264,60 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
///// General Utility Methods //////
+ /**
+ * Wraps the {@link JobGraph} built from the given environment and, when closed (e.g. at the end of a try-with-resources block), asks the leader to cancel the job and then waits,
+ * bounded by the test deadline, for the job-status notification requested in the constructor. It does not submit the job itself; callers submit the graph returned by {@link #getJobGraph()}.
+ */
+ private static class AutoCancellableJob implements AutoCloseable {
+
+ private final FlinkMiniCluster cluster;
+ private final Deadline deadline;
+ private final JobGraph jobGraph;
+
+ private final JobID jobId;
+ private final CompletableFuture<TestingJobManagerMessages.JobStatusIs> cancellationFuture; // requested in the constructor, awaited in close()
+
+ AutoCancellableJob(final FlinkMiniCluster cluster, final StreamExecutionEnvironment env, final Deadline deadline) {
+ Preconditions.checkNotNull(env);
+
+ this.cluster = Preconditions.checkNotNull(cluster);
+ this.jobGraph = env.getStreamGraph().getJobGraph(); // the job graph is derived once from the environment's stream graph
+ this.deadline = Preconditions.checkNotNull(deadline);
+
+ this.jobId = jobGraph.getJobID();
+ this.cancellationFuture = notifyWhenJobStatusIs(jobId, JobStatus.CANCELED, deadline); // requested up front; in the tests shown the job is only submitted after construction
+ }
+
+ JobGraph getJobGraph() {
+ return jobGraph;
+ }
+
+ JobID getJobId() {
+ return jobId;
+ }
+
+ @Override
+ public void close() throws Exception {
+ // Free cluster resources: send a CancelJob request to the leader, then wait for the CANCELED notification.
+ if (jobId != null) { // NOTE(review): jobId is assigned unconditionally in the constructor; this guard mirrors the removed finally blocks - confirm whether getJobID() can return null
+ cluster.getLeaderGateway(deadline.timeLeft())
+ .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
+ .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)); // the reply future is not kept; completion is observed through cancellationFuture below
+
+ cancellationFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); // blocks for at most the time left on the deadline; get() throws if the future has not completed by then
+ }
+ }
+ }
+
+ private static CompletableFuture<TestingJobManagerMessages.JobStatusIs> notifyWhenJobStatusIs(
+ final JobID jobId, final JobStatus status, final Deadline deadline) {
+ // Sends a NotifyWhenJobStatus(jobId, status) request to the current leader and returns the JobStatusIs reply as a Java CompletableFuture.
+ return FutureUtils.toJava(
+ cluster.getLeaderGateway(deadline.timeLeft()) // 'cluster' is not a parameter; it must resolve to a static member of the enclosing test class (declared outside this view)
+ .ask(new TestingJobManagerMessages.NotifyWhenJobStatus(jobId, status), deadline.timeLeft()) // the time left on the deadline bounds both the gateway lookup and the ask
+ .mapTo(ClassTag$.MODULE$.<TestingJobManagerMessages.JobStatusIs>apply(TestingJobManagerMessages.JobStatusIs.class)));
+ }
+
private static <K, S extends State, V> CompletableFuture<S> getKvState(
final Deadline deadline,
final QueryableStateClient client,
http://git-wip-us.apache.org/repos/asf/flink/blob/a3fd548e/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
index b9ce7c2..8767b52 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
@@ -31,6 +31,8 @@ import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.rules.TemporaryFolder;
+import java.io.IOException;
+
import static org.junit.Assert.fail;
/**
@@ -79,19 +81,13 @@ public abstract class HAAbstractQueryableStateTestBase extends AbstractQueryable
}
@AfterClass
- public static void tearDown() {
- if (cluster != null) {
- cluster.stop();
- cluster.awaitTermination();
- }
+ public static void tearDown() throws IOException {
+ client.shutdownAndWait(); // new order: client first, then the cluster, then ZooKeeper
- try {
- zkServer.stop();
- zkServer.close();
- client.shutdownAndWait();
- } catch (Throwable e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ cluster.stop(); // NOTE(review): the null check and try/catch of the removed code are gone - if an earlier step throws, the remaining shutdown steps are skipped
+ cluster.awaitTermination();
+
+ zkServer.stop();
+ zkServer.close();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a3fd548e/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
index 18b167f..cae02e2 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
@@ -22,14 +22,12 @@ import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.junit.BeforeClass;
-import org.junit.Ignore;
import org.junit.Rule;
import org.junit.rules.TemporaryFolder;
/**
* Several integration tests for queryable state using the {@link RocksDBStateBackend}.
*/
-@Ignore
public class HAQueryableStateRocksDBBackendITCase extends HAAbstractQueryableStateTestBase {
@Rule
http://git-wip-us.apache.org/repos/asf/flink/blob/a3fd548e/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java
index a5e24b2..2686a29 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java
@@ -67,12 +67,9 @@ public abstract class NonHAAbstractQueryableStateTestBase extends AbstractQuerya
@AfterClass
public static void tearDown() {
- try {
- cluster.stop();
- client.shutdownAndWait();
- } catch (Throwable e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ client.shutdownAndWait(); // NOTE(review): no longer wrapped in try/catch - if this throws, the cluster below is never stopped
+
+ cluster.stop();
+ cluster.awaitTermination(); // newly added call after stop(); presumably blocks until the cluster has terminated - confirm against FlinkMiniCluster
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a3fd548e/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java
index 39fbe9e..7778a94 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java
@@ -22,14 +22,12 @@ import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.junit.BeforeClass;
-import org.junit.Ignore;
import org.junit.Rule;
import org.junit.rules.TemporaryFolder;
/**
* Several integration tests for queryable state using the {@link RocksDBStateBackend}.
*/
-@Ignore
public class NonHAQueryableStateRocksDBBackendITCase extends NonHAAbstractQueryableStateTestBase {
@Rule
[2/3] flink git commit: [FLINK-7974][QS] Wait for QS abstract server
to shutdown.
Posted by kk...@apache.org.
[FLINK-7974][QS] Wait for QS abstract server to shutdown.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/74d052bb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/74d052bb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/74d052bb
Branch: refs/heads/master
Commit: 74d052bb045031363652116ab8226d8ac00e0cd0
Parents: 5760677
Author: kkloudas <kk...@gmail.com>
Authored: Thu Nov 9 19:30:29 2017 +0100
Committer: kkloudas <kk...@gmail.com>
Committed: Wed Dec 6 14:33:22 2017 +0100
----------------------------------------------------------------------
.../client/program/rest/RestClusterClient.java | 3 +-
.../org/apache/flink/util/ExecutorUtils.java | 77 +++++++++++
.../MesosApplicationMasterRunner.java | 3 +-
.../network/AbstractServerBase.java | 95 ++++++++++---
.../network/AbstractServerHandler.java | 11 +-
.../client/proxy/KvStateClientProxyHandler.java | 34 +++--
.../client/proxy/KvStateClientProxyImpl.java | 8 +-
.../server/KvStateServerHandler.java | 4 +-
.../server/KvStateServerImpl.java | 8 +-
.../HAAbstractQueryableStateTestBase.java | 5 +-
.../NonHAAbstractQueryableStateTestBase.java | 4 +-
.../network/AbstractServerTest.java | 135 +++++++++++--------
.../flink/runtime/concurrent/Executors.java | 49 +------
.../runtime/taskexecutor/TaskManagerRunner.java | 6 +-
.../flink/runtime/jobmanager/JobManager.scala | 6 +-
.../runtime/minicluster/FlinkMiniCluster.scala | 6 +-
.../slotmanager/SlotProtocolTest.java | 3 +-
.../handler/legacy/ExecutionGraphCacheTest.java | 4 +-
.../flink/yarn/YarnApplicationMasterRunner.java | 3 +-
19 files changed, 306 insertions(+), 158 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/74d052bb/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index e21e94b..8c0462d 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -48,6 +48,7 @@ import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointMessagePar
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerHeaders;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerResponseBody;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.util.ExecutorUtils;
import javax.annotation.Nullable;
@@ -90,7 +91,7 @@ public class RestClusterClient extends ClusterClient {
log.error("An error occurred during the client shutdown.", e);
}
this.restClient.shutdown(Time.seconds(5));
- org.apache.flink.runtime.concurrent.Executors.gracefulShutdown(5, TimeUnit.SECONDS, this.executorService);
+ ExecutorUtils.gracefulShutdown(5, TimeUnit.SECONDS, this.executorService);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/74d052bb/flink-core/src/main/java/org/apache/flink/util/ExecutorUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/ExecutorUtils.java b/flink-core/src/main/java/org/apache/flink/util/ExecutorUtils.java
new file mode 100644
index 0000000..d98bdd2
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/ExecutorUtils.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Utility methods for {@link java.util.concurrent.Executor Executors}; this class provides the graceful shutdown of {@link ExecutorService ExecutorServices}.
+ */
+public class ExecutorUtils {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ExecutorUtils.class);
+
+ /**
+ * Gracefully shuts down the given {@link ExecutorService ExecutorServices}. An orderly shutdown is first requested on all of them;
+ * the call then waits, for at most the given timeout in total (not per service), until they have terminated. Any ExecutorService that has
+ * not terminated within that time is shut down hard via {@link ExecutorService#shutdownNow()}. If the calling thread is interrupted while waiting, the current and all remaining services are shut down hard and the interrupt flag is restored.
+ *
+ * @param timeout total time to wait for the termination of all ExecutorServices; with a non-positive value no waiting takes place and all services are shut down hard
+ * @param unit time unit of the timeout
+ * @param executorServices the ExecutorServices to shut down
+ */
+ public static void gracefulShutdown(long timeout, TimeUnit unit, ExecutorService... executorServices) {
+ for (ExecutorService executorService: executorServices) {
+ executorService.shutdown(); // request an orderly shutdown everywhere before starting to wait on any single service
+ }
+
+ boolean wasInterrupted = false;
+ final long endTime = unit.toMillis(timeout) + System.currentTimeMillis(); // absolute wall-clock deadline shared by all services; NOTE(review): this sum can overflow for very large timeouts (toMillis saturates at Long.MAX_VALUE)
+ long timeLeft = unit.toMillis(timeout);
+ boolean hasTimeLeft = timeLeft > 0L;
+
+ for (ExecutorService executorService: executorServices) {
+ if (wasInterrupted || !hasTimeLeft) { // budget used up or thread interrupted: do not wait any longer
+ executorService.shutdownNow();
+ } else {
+ try {
+ if (!executorService.awaitTermination(timeLeft, TimeUnit.MILLISECONDS)) {
+ LOG.warn("ExecutorService did not terminate in time. Shutting it down now.");
+ executorService.shutdownNow();
+ }
+ } catch (InterruptedException e) {
+ LOG.warn("Interrupted while shutting down executor services. Shutting all " +
+ "remaining ExecutorServices down now.", e);
+ executorService.shutdownNow();
+
+ wasInterrupted = true;
+
+ Thread.currentThread().interrupt(); // restore the interrupt flag so the caller can observe it
+ }
+
+ timeLeft = endTime - System.currentTimeMillis(); // recompute the remaining budget before handling the next service
+ hasTimeLeft = timeLeft > 0L;
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/74d052bb/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
index 93eb3c6..544150b 100755
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
@@ -52,6 +52,7 @@ import org.apache.flink.runtime.util.SignalHandler;
import org.apache.flink.runtime.webmonitor.WebMonitor;
import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaJobManagerRetriever;
import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever;
+import org.apache.flink.util.ExecutorUtils;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
@@ -439,7 +440,7 @@ public class MesosApplicationMasterRunner {
}
}
- org.apache.flink.runtime.concurrent.Executors.gracefulShutdown(
+ ExecutorUtils.gracefulShutdown(
AkkaUtils.getTimeout(config).toMillis(),
TimeUnit.MILLISECONDS,
futureExecutor,
http://git-wip-us.apache.org/repos/asf/flink/blob/74d052bb/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
index 82a05f2..d5afeb3 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
@@ -21,6 +21,7 @@ package org.apache.flink.queryablestate.network;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.queryablestate.network.messages.MessageBody;
+import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
@@ -45,10 +46,12 @@ import java.net.InetSocketAddress;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
/**
* The base class for every server in the queryable state module.
@@ -83,6 +86,9 @@ public abstract class AbstractServerBase<REQ extends MessageBody, RESP extends M
/** The number of threads to be used for query serving. */
private final int numQueryThreads;
+ /** Atomic shut down future. */
+ private final AtomicReference<CompletableFuture<Void>> serverShutdownFuture = new AtomicReference<>(null);
+
/** Netty's ServerBootstrap. */
private ServerBootstrap bootstrap;
@@ -179,8 +185,8 @@ public abstract class AbstractServerBase<REQ extends MessageBody, RESP extends M
* @throws Exception If something goes wrong during the bind operation.
*/
public void start() throws Throwable {
- Preconditions.checkState(serverAddress == null,
- serverName + " is already running @ " + serverAddress + '.');
+ Preconditions.checkState(serverAddress == null && serverShutdownFuture.get() == null,
+ serverName + " is already running @ " + serverAddress + ". ");
Iterator<Integer> portIterator = bindPortRange.iterator();
while (portIterator.hasNext() && !attemptToBind(portIterator.next())) {}
@@ -251,7 +257,22 @@ public abstract class AbstractServerBase<REQ extends MessageBody, RESP extends M
throw future.cause();
} catch (BindException e) {
log.debug("Failed to start {} on port {}: {}.", serverName, port, e.getMessage());
- shutdown();
+ try {
+ // We shut down the server and reset the shutdown future every time, because after a
+ // failed bind attemptToBind() is called again for the next port, and a future that
+ // is still set would interfere with later shutdown attempts.
+
+ shutdownServer()
+ .whenComplete((ignoredV, ignoredT) -> serverShutdownFuture.getAndSet(null))
+ .get();
+ } catch (Exception r) {
+
+ // Calling get() here has been seen to run into this problem:
+ // https://github.com/netty/netty/issues/4357.
+ // In that case we only log a warning instead of failing the bind attempt.
+
+ log.warn("Problem while shutting down {}: {}", serverName, r.getMessage());
+ }
}
// any other type of exception we let it bubble up.
return false;
@@ -259,26 +280,62 @@ public abstract class AbstractServerBase<REQ extends MessageBody, RESP extends M
/**
* Shuts down the server and all related thread pools.
+ * @return A {@link CompletableFuture} that will be completed upon termination of the shutdown process.
*/
- public void shutdown() {
- log.info("Shutting down {} @ {}", serverName, serverAddress);
-
- if (handler != null) {
- handler.shutdown();
- handler = null;
- }
-
- if (queryExecutor != null) {
- queryExecutor.shutdown();
- }
+ public CompletableFuture<Void> shutdownServer() {
+ CompletableFuture<Void> shutdownFuture = new CompletableFuture<>();
+ if (serverShutdownFuture.compareAndSet(null, shutdownFuture)) {
+ log.info("Shutting down {} @ {}", serverName, serverAddress);
+
+ final CompletableFuture<Void> groupShutdownFuture = new CompletableFuture<>();
+ if (bootstrap != null) {
+ EventLoopGroup group = bootstrap.group();
+ if (group != null && !group.isShutdown()) {
+ group.shutdownGracefully(0L, 0L, TimeUnit.MILLISECONDS)
+ .addListener(finished -> {
+ if (finished.isSuccess()) {
+ groupShutdownFuture.complete(null);
+ } else {
+ groupShutdownFuture.completeExceptionally(finished.cause());
+ }
+ });
+ } else {
+ groupShutdownFuture.complete(null);
+ }
+ } else {
+ groupShutdownFuture.complete(null);
+ }
- if (bootstrap != null) {
- EventLoopGroup group = bootstrap.group();
- if (group != null) {
- group.shutdownGracefully(0L, 10L, TimeUnit.SECONDS);
+ final CompletableFuture<Void> handlerShutdownFuture = new CompletableFuture<>();
+ if (handler == null) {
+ handlerShutdownFuture.complete(null);
+ } else {
+ handler.shutdown().whenComplete((result, throwable) -> {
+ if (throwable != null) {
+ handlerShutdownFuture.completeExceptionally(throwable);
+ } else {
+ handlerShutdownFuture.complete(null);
+ }
+ });
}
+
+ final CompletableFuture<Void> queryExecShutdownFuture = CompletableFuture.runAsync(() -> {
+ if (queryExecutor != null) {
+ ExecutorUtils.gracefulShutdown(10L, TimeUnit.MINUTES, queryExecutor);
+ }
+ });
+
+ CompletableFuture.allOf(
+ queryExecShutdownFuture, groupShutdownFuture, handlerShutdownFuture
+ ).whenComplete((result, throwable) -> {
+ if (throwable != null) {
+ shutdownFuture.completeExceptionally(throwable);
+ } else {
+ shutdownFuture.complete(null);
+ }
+ });
}
- serverAddress = null;
+ return serverShutdownFuture.get();
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/74d052bb/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java
index 7e71a11..a514723 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java
@@ -183,9 +183,16 @@ public abstract class AbstractServerHandler<REQ extends MessageBody, RESP extend
public abstract CompletableFuture<RESP> handleRequest(final long requestId, final REQ request);
/**
- * Shuts down any handler specific resources, e.g. thread pools etc.
+ * Shuts down any handler-specific resources (e.g. thread pools) and returns
+ * a {@link CompletableFuture}.
+ *
+ * <p>If an exception is thrown during the shutdown process, then that exception
+ * will be included in the returned future.
+ *
+ * @return A {@link CompletableFuture} that will be completed when the shutdown
+ * process actually finishes.
*/
- public abstract void shutdown();
+ public abstract CompletableFuture<Void> shutdown();
/**
* Task to execute the actual query against the state instance.
http://git-wip-us.apache.org/repos/asf/flink/blob/74d052bb/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
index af33701..29ee0d7 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
@@ -33,7 +33,9 @@ import org.apache.flink.queryablestate.network.messages.MessageSerializer;
import org.apache.flink.queryablestate.network.stats.DisabledKvStateRequestStats;
import org.apache.flink.queryablestate.network.stats.KvStateRequestStats;
import org.apache.flink.queryablestate.server.KvStateServerImpl;
+import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.query.KvStateClientProxy;
import org.apache.flink.runtime.query.KvStateLocation;
import org.apache.flink.runtime.query.KvStateMessage;
@@ -42,6 +44,7 @@ import org.apache.flink.util.Preconditions;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
+import akka.dispatch.OnComplete;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -141,6 +144,7 @@ public class KvStateClientProxyHandler extends AbstractServerHandler<KvStateRequ
// KvStateLocation. Therefore we retry this query and
// force look up the location.
+ LOG.debug("Retrying after failing to retrieve state due to: {}.", throwable.getCause().getMessage());
executeActionAsync(result, request, true);
} else {
result.completeExceptionally(throwable);
@@ -203,20 +207,34 @@ public class KvStateClientProxyHandler extends AbstractServerHandler<KvStateRequ
LOG.debug("Retrieving location for state={} of job={} from the job manager.", jobId, queryableStateName);
+ final CompletableFuture<KvStateLocation> location = new CompletableFuture<>();
+ lookupCache.put(cacheKey, location);
return proxy.getJobManagerFuture().thenComposeAsync(
jobManagerGateway -> {
final Object msg = new KvStateMessage.LookupKvStateLocation(jobId, queryableStateName);
- final CompletableFuture<KvStateLocation> locationFuture = FutureUtils.toJava(
- jobManagerGateway.ask(msg, FiniteDuration.apply(1000L, TimeUnit.MILLISECONDS))
- .mapTo(ClassTag$.MODULE$.<KvStateLocation>apply(KvStateLocation.class)));
-
- lookupCache.put(cacheKey, locationFuture);
- return locationFuture;
+ jobManagerGateway.ask(msg, FiniteDuration.apply(1000L, TimeUnit.MILLISECONDS))
+ .mapTo(ClassTag$.MODULE$.<KvStateLocation>apply(KvStateLocation.class))
+ .onComplete(new OnComplete<KvStateLocation>() {
+
+ @Override
+ public void onComplete(Throwable failure, KvStateLocation loc) throws Throwable {
+ if (failure != null) {
+ if (failure instanceof FlinkJobNotFoundException) {
+ // if the jobId was wrong, remove the entry from the cache.
+ lookupCache.remove(cacheKey);
+ }
+ location.completeExceptionally(failure);
+ } else {
+ location.complete(loc);
+ }
+ }
+ }, Executors.directExecutionContext());
+ return location;
}, queryExecutor);
}
@Override
- public void shutdown() {
- kvStateClient.shutdown();
+ public CompletableFuture<Void> shutdown() {
+ return kvStateClient.shutdown();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/74d052bb/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java
index 6fcaf40..aa5e7b6 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java
@@ -35,6 +35,7 @@ import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
/**
* The default implementation of the {@link KvStateClientProxy}.
@@ -96,7 +97,12 @@ public class KvStateClientProxyImpl extends AbstractServerBase<KvStateRequest, K
@Override
public void shutdown() {
- super.shutdown();
+ try {
+ shutdownServer().get(10L, TimeUnit.SECONDS);
+ log.info("{} was shutdown successfully.", getServerName());
+ } catch (Exception e) {
+ log.warn("{} shutdown failed: {}", getServerName(), e);
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/74d052bb/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java
index 476f153..18a2944 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java
@@ -101,7 +101,7 @@ public class KvStateServerHandler extends AbstractServerHandler<KvStateInternalR
}
@Override
- public void shutdown() {
- // do nothing
+ public CompletableFuture<Void> shutdown() {
+ return CompletableFuture.completedFuture(null);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/74d052bb/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java
index 3a37a3a..0720268 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java
@@ -32,6 +32,7 @@ import org.apache.flink.util.Preconditions;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Iterator;
+import java.util.concurrent.TimeUnit;
/**
* The default implementation of the {@link KvStateServer}.
@@ -101,6 +102,11 @@ public class KvStateServerImpl extends AbstractServerBase<KvStateInternalRequest
@Override
public void shutdown() {
- super.shutdown();
+ try {
+ shutdownServer().get(10L, TimeUnit.SECONDS);
+ log.info("{} was shutdown successfully.", getServerName());
+ } catch (Exception e) {
+ log.warn("{} shutdown failed: {}", getServerName(), e);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/74d052bb/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
index 79809b3..b9ce7c2 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
@@ -88,11 +88,10 @@ public abstract class HAAbstractQueryableStateTestBase extends AbstractQueryable
try {
zkServer.stop();
zkServer.close();
- } catch (Exception e) {
+ client.shutdownAndWait();
+ } catch (Throwable e) {
e.printStackTrace();
fail(e.getMessage());
}
-
- client.shutdown();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/74d052bb/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java
index 6945cca..a5e24b2 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java
@@ -69,10 +69,10 @@ public abstract class NonHAAbstractQueryableStateTestBase extends AbstractQuerya
public static void tearDown() {
try {
cluster.stop();
- } catch (Exception e) {
+ client.shutdownAndWait();
+ } catch (Throwable e) {
e.printStackTrace();
fail(e.getMessage());
}
- client.shutdown();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/74d052bb/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java
index 3d2ed40..103c638 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java
@@ -22,7 +22,9 @@ import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.queryablestate.network.messages.MessageBody;
import org.apache.flink.queryablestate.network.messages.MessageDeserializer;
import org.apache.flink.queryablestate.network.messages.MessageSerializer;
+import org.apache.flink.queryablestate.network.stats.AtomicKvStateRequestStats;
import org.apache.flink.queryablestate.network.stats.DisabledKvStateRequestStats;
+import org.apache.flink.queryablestate.network.stats.KvStateRequestStats;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
@@ -37,6 +39,7 @@ import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@@ -60,22 +63,15 @@ public class AbstractServerTest {
expectedEx.expect(FlinkRuntimeException.class);
expectedEx.expectMessage("Unable to start Test Server 2. All ports in provided range are occupied.");
- TestServer server1 = null;
- TestServer server2 = null;
- try {
+ List<Integer> portList = new ArrayList<>();
+ portList.add(7777);
- server1 = startServer("Test Server 1", 7777);
- Assert.assertEquals(7777L, server1.getServerAddress().getPort());
+ try (TestServer server1 = new TestServer("Test Server 1", new DisabledKvStateRequestStats(), portList.iterator())) {
+ server1.start();
- server2 = startServer("Test Server 2", 7777);
- } finally {
-
- if (server1 != null) {
- server1.shutdown();
- }
-
- if (server2 != null) {
- server2.shutdown();
+ try (TestServer server2 = new TestServer("Test Server 2", new DisabledKvStateRequestStats(),
+ Collections.singletonList(server1.getServerAddress().getPort()).iterator())) {
+ server2.start();
}
}
}
@@ -86,69 +82,81 @@ public class AbstractServerTest {
*/
@Test
public void testPortRangeSuccess() throws Throwable {
- TestServer server1 = null;
- TestServer server2 = null;
- Client<TestMessage, TestMessage> client = null;
- try {
- server1 = startServer("Test Server 1", 7777, 7778, 7779);
- Assert.assertEquals(7777L, server1.getServerAddress().getPort());
-
- server2 = startServer("Test Server 2", 7777, 7778, 7779);
- Assert.assertEquals(7778L, server2.getServerAddress().getPort());
-
- client = new Client<>(
- "Test Client",
- 1,
- new MessageSerializer<>(new TestMessage.TestMessageDeserializer(), new TestMessage.TestMessageDeserializer()),
- new DisabledKvStateRequestStats());
+ // this is shared between the two servers.
+ AtomicKvStateRequestStats serverStats = new AtomicKvStateRequestStats();
+ AtomicKvStateRequestStats clientStats = new AtomicKvStateRequestStats();
+
+ List<Integer> portList = new ArrayList<>();
+ portList.add(7777);
+ portList.add(7778);
+ portList.add(7779);
+
+ try (
+ TestServer server1 = new TestServer("Test Server 1", serverStats, portList.iterator());
+ TestServer server2 = new TestServer("Test Server 2", serverStats, portList.iterator());
+ TestClient client = new TestClient(
+ "Test Client",
+ 1,
+ new MessageSerializer<>(new TestMessage.TestMessageDeserializer(), new TestMessage.TestMessageDeserializer()),
+ clientStats
+ )
+ ) {
+ server1.start();
+ Assert.assertTrue(server1.getServerAddress().getPort() >= 7777 && server1.getServerAddress().getPort() <= 7779);
+
+ server2.start();
+ Assert.assertTrue(server2.getServerAddress().getPort() >= 7777 && server2.getServerAddress().getPort() <= 7779);
TestMessage response1 = client.sendRequest(server1.getServerAddress(), new TestMessage("ping")).join();
Assert.assertEquals(server1.getServerName() + "-ping", response1.getMessage());
TestMessage response2 = client.sendRequest(server2.getServerAddress(), new TestMessage("pong")).join();
Assert.assertEquals(server2.getServerName() + "-pong", response2.getMessage());
- } finally {
- if (server1 != null) {
- server1.shutdown();
- }
-
- if (server2 != null) {
- server2.shutdown();
- }
+ // the client connects to both servers and the stats object is shared.
+ Assert.assertEquals(2L, serverStats.getNumConnections());
- if (client != null) {
- client.shutdown();
- }
+ Assert.assertEquals(2L, clientStats.getNumConnections());
+ Assert.assertEquals(0L, clientStats.getNumFailed());
+ Assert.assertEquals(2L, clientStats.getNumSuccessful());
+ Assert.assertEquals(2L, clientStats.getNumRequests());
}
+
+ Assert.assertEquals(0L, clientStats.getNumConnections());
+ Assert.assertEquals(0L, clientStats.getNumFailed());
+ Assert.assertEquals(2L, clientStats.getNumSuccessful());
+ Assert.assertEquals(2L, clientStats.getNumRequests());
}
- /**
- * Initializes a {@link TestServer} with the given port range.
- * @param serverName the name of the server.
- * @param ports a range of ports.
- * @return A test server with the given name.
- */
- private TestServer startServer(String serverName, int... ports) throws Throwable {
- List<Integer> portList = new ArrayList<>(ports.length);
- for (int p : ports) {
- portList.add(p);
+ private static class TestClient extends Client<TestMessage, TestMessage> implements AutoCloseable {
+
+ TestClient(
+ String clientName,
+ int numEventLoopThreads,
+ MessageSerializer<TestMessage, TestMessage> serializer,
+ KvStateRequestStats stats) {
+ super(clientName, numEventLoopThreads, serializer, stats);
}
- final TestServer server = new TestServer(serverName, portList.iterator());
- server.start();
- return server;
+ @Override
+ public void close() throws Exception {
+ shutdown().join();
+ Assert.assertTrue(isEventGroupShutdown());
+ }
}
/**
* A server that receives a {@link TestMessage test message} and returns another test
* message containing the same string as the request with the name of the server prepended.
*/
- private class TestServer extends AbstractServerBase<TestMessage, TestMessage> {
+ private static class TestServer extends AbstractServerBase<TestMessage, TestMessage> implements AutoCloseable {
- protected TestServer(String name, Iterator<Integer> bindPort) throws UnknownHostException {
+ private final KvStateRequestStats requestStats;
+
+ TestServer(String name, KvStateRequestStats stats, Iterator<Integer> bindPort) throws UnknownHostException {
super(name, InetAddress.getLocalHost(), bindPort, 1, 1);
+ this.requestStats = stats;
}
@Override
@@ -156,7 +164,7 @@ public class AbstractServerTest {
return new AbstractServerHandler<TestMessage, TestMessage>(
this,
new MessageSerializer<>(new TestMessage.TestMessageDeserializer(), new TestMessage.TestMessageDeserializer()),
- new DisabledKvStateRequestStats()) {
+ requestStats) {
@Override
public CompletableFuture<TestMessage> handleRequest(long requestId, TestMessage request) {
@@ -165,11 +173,22 @@ public class AbstractServerTest {
}
@Override
- public void shutdown() {
- // do nothing
+ public CompletableFuture<Void> shutdown() {
+ return CompletableFuture.completedFuture(null);
}
};
}
+
+ @Override
+ public void close() throws Exception {
+ shutdownServer().get();
+ if (requestStats instanceof AtomicKvStateRequestStats) {
+ AtomicKvStateRequestStats stats = (AtomicKvStateRequestStats) requestStats;
+ Assert.assertEquals(0L, stats.getNumConnections());
+ }
+ Assert.assertTrue(getQueryExecutor().isTerminated());
+ Assert.assertTrue(isEventGroupShutdown());
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/74d052bb/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
index 04cdce7..703ac4e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
@@ -22,14 +22,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
+
import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
import scala.concurrent.ExecutionContext;
/**
- * Collection of {@link Executor} implementations
+ * Collection of {@link Executor} implementations.
*/
public class Executors {
@@ -94,48 +93,4 @@ public class Executors {
return this;
}
}
-
- /**
- * Gracefully shutdown the given {@link ExecutorService}. The call waits the given timeout that
- * all ExecutorServices terminate. If the ExecutorServices do not terminate in this time,
- * they will be shut down hard.
- *
- * @param timeout to wait for the termination of all ExecutorServices
- * @param unit of the timeout
- * @param executorServices to shut down
- */
- public static void gracefulShutdown(long timeout, TimeUnit unit, ExecutorService... executorServices) {
- for (ExecutorService executorService: executorServices) {
- executorService.shutdown();
- }
-
- boolean wasInterrupted = false;
- final long endTime = unit.toMillis(timeout) + System.currentTimeMillis();
- long timeLeft = unit.toMillis(timeout);
- boolean hasTimeLeft = timeLeft > 0L;
-
- for (ExecutorService executorService: executorServices) {
- if (wasInterrupted || !hasTimeLeft) {
- executorService.shutdownNow();
- } else {
- try {
- if (!executorService.awaitTermination(timeLeft, TimeUnit.MILLISECONDS)) {
- LOG.warn("ExecutorService did not terminate in time. Shutting it down now.");
- executorService.shutdownNow();
- }
- } catch (InterruptedException e) {
- LOG.warn("Interrupted while shutting down executor services. Shutting all " +
- "remaining ExecutorServices down now.", e);
- executorService.shutdownNow();
-
- wasInterrupted = true;
-
- Thread.currentThread().interrupt();
- }
-
- timeLeft = endTime - System.currentTimeMillis();
- hasTimeLeft = timeLeft > 0L;
- }
- }
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/74d052bb/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index a24daf0..7258e52 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -25,7 +25,6 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
@@ -47,6 +46,7 @@ import org.apache.flink.runtime.util.JvmShutdownSafeguard;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.runtime.util.SignalHandler;
import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.Preconditions;
import akka.actor.ActorSystem;
@@ -88,7 +88,7 @@ public class TaskManagerRunner implements FatalErrorHandler {
private final MetricRegistryImpl metricRegistry;
- /** Executor used to run future callbacks */
+ /** Executor used to run future callbacks. */
private final ExecutorService executor;
private final TaskExecutor taskManager;
@@ -165,7 +165,7 @@ public class TaskManagerRunner implements FatalErrorHandler {
exception = ExceptionUtils.firstOrSuppressed(e, exception);
}
- Executors.gracefulShutdown(timeout.toMilliseconds(), TimeUnit.MILLISECONDS, executor);
+ ExecutorUtils.gracefulShutdown(timeout.toMilliseconds(), TimeUnit.MILLISECONDS, executor);
if (exception != null) {
throw exception;
http://git-wip-us.apache.org/repos/asf/flink/blob/74d052bb/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 7799a78..5f82159 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -45,7 +45,7 @@ import org.apache.flink.runtime.clusterframework.{BootstrapTools, FlinkResourceM
import org.apache.flink.runtime.clusterframework.messages._
import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager
import org.apache.flink.runtime.clusterframework.types.ResourceID
-import org.apache.flink.runtime.concurrent.{FutureUtils, ScheduledExecutorServiceAdapter, Executors => FlinkExecutors}
+import org.apache.flink.runtime.concurrent.{FutureUtils, ScheduledExecutorServiceAdapter}
import org.apache.flink.runtime.execution.SuppressRestartsException
import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders.ResolveOrder
import org.apache.flink.runtime.execution.librarycache.{BlobLibraryCacheManager, LibraryCacheManager}
@@ -85,7 +85,7 @@ import org.apache.flink.runtime.util._
import org.apache.flink.runtime.webmonitor.retriever.impl.{AkkaJobManagerRetriever, AkkaQueryServiceRetriever}
import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils}
import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages}
-import org.apache.flink.util.{FlinkException, InstantiationUtil, NetUtils, SerializedThrowable}
+import org.apache.flink.util._
import scala.collection.JavaConverters._
import scala.collection.mutable
@@ -2060,7 +2060,7 @@ object JobManager {
LOG.warn("Could not properly shut down the metric registry.", t)
}
- FlinkExecutors.gracefulShutdown(
+ ExecutorUtils.gracefulShutdown(
timeout.toMillis,
TimeUnit.MILLISECONDS,
futureExecutor,
http://git-wip-us.apache.org/repos/asf/flink/blob/74d052bb/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 227b854..5554061 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -32,7 +32,7 @@ import org.apache.flink.configuration._
import org.apache.flink.core.fs.Path
import org.apache.flink.runtime.akka.{AkkaJobManagerGateway, AkkaUtils}
import org.apache.flink.runtime.client.{JobClient, JobExecutionException}
-import org.apache.flink.runtime.concurrent.{FutureUtils, ScheduledExecutorServiceAdapter, Executors => FlinkExecutors}
+import org.apache.flink.runtime.concurrent.{FutureUtils, ScheduledExecutorServiceAdapter}
import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders
import org.apache.flink.runtime.highavailability.{HighAvailabilityServices, HighAvailabilityServicesUtils}
import org.apache.flink.runtime.instance.{ActorGateway, AkkaActorGateway}
@@ -44,7 +44,7 @@ import org.apache.flink.runtime.metrics.{MetricRegistryConfiguration, MetricRegi
import org.apache.flink.runtime.util.{ExecutorThreadFactory, Hardware}
import org.apache.flink.runtime.webmonitor.retriever.impl.{AkkaJobManagerRetriever, AkkaQueryServiceRetriever}
import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils}
-import org.apache.flink.util.NetUtils
+import org.apache.flink.util.{ExecutorUtils, NetUtils}
import org.slf4j.LoggerFactory
import scala.concurrent.duration.{Duration, FiniteDuration}
@@ -440,7 +440,7 @@ abstract class FlinkMiniCluster(
isRunning = false
- FlinkExecutors.gracefulShutdown(
+ ExecutorUtils.gracefulShutdown(
timeout.toMillis,
TimeUnit.MILLISECONDS,
futureExecutor,
http://git-wip-us.apache.org/repos/asf/flink/blob/74d052bb/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
index 97942ea..79e38df 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.SlotStatus;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
@@ -64,7 +65,7 @@ public class SlotProtocolTest extends TestLogger {
@AfterClass
public static void afterClass() {
- Executors.gracefulShutdown(timeout, TimeUnit.MILLISECONDS, scheduledExecutorService);
+ ExecutorUtils.gracefulShutdown(timeout, TimeUnit.MILLISECONDS, scheduledExecutorService);
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/74d052bb/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java
index 1a8ea84..ecadaa5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
-import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
@@ -33,6 +32,7 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
@@ -233,7 +233,7 @@ public class ExecutionGraphCacheTest extends TestLogger {
verify(jobManagerGateway, times(1)).requestJob(eq(jobId), any(Time.class));
} finally {
- Executors.gracefulShutdown(5000L, TimeUnit.MILLISECONDS, executor);
+ ExecutorUtils.gracefulShutdown(5000L, TimeUnit.MILLISECONDS, executor);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/74d052bb/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index 3bdc2ac..279981a 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -51,6 +51,7 @@ import org.apache.flink.runtime.util.SignalHandler;
import org.apache.flink.runtime.webmonitor.WebMonitor;
import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaJobManagerRetriever;
import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever;
+import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
@@ -472,7 +473,7 @@ public class YarnApplicationMasterRunner {
}
}
- org.apache.flink.runtime.concurrent.Executors.gracefulShutdown(
+ ExecutorUtils.gracefulShutdown(
AkkaUtils.getTimeout(config).toMillis(),
TimeUnit.MILLISECONDS,
futureExecutor,