You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/11/02 10:31:38 UTC

flink git commit: [FLINK-4398] [query] Fix race in KvStateServerHandlerTest

Repository: flink
Updated Branches:
  refs/heads/master 2b369d37f -> b4bf99dfc


[FLINK-4398] [query] Fix race in KvStateServerHandlerTest

Successful request are reported asynchronously after the Netty channel
write has succeeded. The test checked the number of successful requests
too early.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b4bf99df
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b4bf99df
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b4bf99df

Branch: refs/heads/master
Commit: b4bf99dfc47a078ed7aaabe92477ae59039d20b1
Parents: 2b369d3
Author: Ufuk Celebi <uc...@apache.org>
Authored: Wed Nov 2 11:30:22 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Wed Nov 2 11:30:23 2016 +0100

----------------------------------------------------------------------
 .../flink/runtime/query/netty/KvStateServerHandler.java      | 4 +++-
 .../flink/runtime/query/netty/KvStateServerHandlerTest.java  | 8 ++++++++
 2 files changed, 11 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b4bf99df/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServerHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServerHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServerHandler.java
index 8201708..8542099 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServerHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServerHandler.java
@@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.Objects;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
 
 /**
  * This handler dispatches asynchronous tasks, which query {@link KvState}
@@ -286,7 +287,8 @@ class KvStateServerHandler extends ChannelInboundHandlerAdapter {
 
 			@Override
 			public void operationComplete(ChannelFuture future) throws Exception {
-				long durationMillis = (System.nanoTime() - creationNanos) / 1_000_000;
+				long durationNanos = System.nanoTime() - creationNanos;
+				long durationMillis = TimeUnit.MILLISECONDS.convert(durationNanos, TimeUnit.NANOSECONDS);
 
 				if (future.isSuccess()) {
 					stats.reportSuccessfulRequest(durationMillis);

http://git-wip-us.apache.org/repos/asf/flink/blob/b4bf99df/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
index 463d166..348d4d9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
@@ -52,6 +52,7 @@ import org.junit.Test;
 
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import static org.junit.Assert.assertEquals;
@@ -150,6 +151,13 @@ public class KvStateServerHandlerTest extends TestLogger {
 		assertEquals(expectedValue, actualValue);
 
 		assertEquals(stats.toString(), 1, stats.getNumRequests());
+
+		// Wait for async successful request report
+		long deadline = System.nanoTime() + TimeUnit.NANOSECONDS.convert(30, TimeUnit.SECONDS);
+		while (stats.getNumSuccessful() != 1 && System.nanoTime() <= deadline) {
+			Thread.sleep(10);
+		}
+
 		assertEquals(stats.toString(), 1, stats.getNumSuccessful());
 	}