You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/11/02 10:31:38 UTC
flink git commit: [FLINK-4398] [query] Fix race in
KvStateServerHandlerTest
Repository: flink
Updated Branches:
refs/heads/master 2b369d37f -> b4bf99dfc
[FLINK-4398] [query] Fix race in KvStateServerHandlerTest
Successful request are reported asynchronously after the Netty channel
write has succeeded. The test checked the number of successful requests
too early.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b4bf99df
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b4bf99df
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b4bf99df
Branch: refs/heads/master
Commit: b4bf99dfc47a078ed7aaabe92477ae59039d20b1
Parents: 2b369d3
Author: Ufuk Celebi <uc...@apache.org>
Authored: Wed Nov 2 11:30:22 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Wed Nov 2 11:30:23 2016 +0100
----------------------------------------------------------------------
.../flink/runtime/query/netty/KvStateServerHandler.java | 4 +++-
.../flink/runtime/query/netty/KvStateServerHandlerTest.java | 8 ++++++++
2 files changed, 11 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b4bf99df/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServerHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServerHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServerHandler.java
index 8201708..8542099 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServerHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServerHandler.java
@@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
/**
* This handler dispatches asynchronous tasks, which query {@link KvState}
@@ -286,7 +287,8 @@ class KvStateServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
- long durationMillis = (System.nanoTime() - creationNanos) / 1_000_000;
+ long durationNanos = System.nanoTime() - creationNanos;
+ long durationMillis = TimeUnit.MILLISECONDS.convert(durationNanos, TimeUnit.NANOSECONDS);
if (future.isSuccess()) {
stats.reportSuccessfulRequest(durationMillis);
http://git-wip-us.apache.org/repos/asf/flink/blob/b4bf99df/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
index 463d166..348d4d9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
@@ -52,6 +52,7 @@ import org.junit.Test;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static org.junit.Assert.assertEquals;
@@ -150,6 +151,13 @@ public class KvStateServerHandlerTest extends TestLogger {
assertEquals(expectedValue, actualValue);
assertEquals(stats.toString(), 1, stats.getNumRequests());
+
+ // Wait for async successful request report
+ long deadline = System.nanoTime() + TimeUnit.NANOSECONDS.convert(30, TimeUnit.SECONDS);
+ while (stats.getNumSuccessful() != 1 && System.nanoTime() <= deadline) {
+ Thread.sleep(10);
+ }
+
assertEquals(stats.toString(), 1, stats.getNumSuccessful());
}