You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/03/02 06:55:36 UTC

flink git commit: [FLINK-4422] [kafka] Convert all time interval measurements to System.nanoTime()

Repository: flink
Updated Branches:
  refs/heads/master bf4eed144 -> 51b7ede28


[FLINK-4422] [kafka] Convert all time interval measurements to System.nanoTime()

This closes #3422.
This closes #3421.
This closes #3420.
This closes #3419.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/51b7ede2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/51b7ede2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/51b7ede2

Branch: refs/heads/master
Commit: 51b7ede288e06ccedfbba9f92ac361f28bb51452
Parents: bf4eed1
Author: Jin Mingjian <ji...@gmail.com>
Authored: Mon Feb 27 12:58:44 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Thu Mar 2 14:53:02 2017 +0800

----------------------------------------------------------------------
 .../connectors/kafka/KafkaTestEnvironmentImpl.java    |  4 ++--
 .../kafka/internals/ClosableBlockingQueue.java        |  8 ++++----
 .../connectors/kafka/internals/KillerWatchDog.java    |  4 ++--
 .../connectors/kafka/KafkaTestEnvironmentImpl.java    |  4 ++--
 .../connectors/kafka/KafkaTestEnvironmentImpl.java    |  4 ++--
 .../connectors/kafka/KafkaConsumerTestBase.java       | 14 +++++++-------
 6 files changed, 19 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/51b7ede2/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index bc1faaf..d27e53a 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -296,7 +296,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 		}
 
 		// validate that the topic has been created
-		final long deadline = System.currentTimeMillis() + 30000;
+		final long deadline = System.nanoTime() + 30_000_000_000L;
 		do {
 			try {
 				if(secureMode) {
@@ -321,7 +321,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 			}
 			checkZKConn.close();
 		}
-		while (System.currentTimeMillis() < deadline);
+		while (System.nanoTime() < deadline);
 		fail("Test topic could not be created");
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/51b7ede2/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java
index 23ff276..e31dcac 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java
@@ -355,13 +355,13 @@ public class ClosableBlockingQueue<E> {
 			throw new IllegalArgumentException("invalid timeout");
 		}
 		
-		final long deadline = System.currentTimeMillis() + timeoutMillis;
+		final long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
 		
 		lock.lock();
 		try {
 			while (open && elements.isEmpty() && timeoutMillis > 0) { 
 				nonEmpty.await(timeoutMillis, TimeUnit.MILLISECONDS);
-				timeoutMillis = deadline - System.currentTimeMillis();
+				timeoutMillis = (deadline - System.nanoTime()) / 1_000_000L;
 			}
 			
 			if (!open) {
@@ -437,13 +437,13 @@ public class ClosableBlockingQueue<E> {
 			throw new IllegalArgumentException("invalid timeout");
 		}
 
-		final long deadline = System.currentTimeMillis() + timeoutMillis;
+		final long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
 
 		lock.lock();
 		try {
 			while (open && elements.isEmpty() && timeoutMillis > 0) {
 				nonEmpty.await(timeoutMillis, TimeUnit.MILLISECONDS);
-				timeoutMillis = deadline - System.currentTimeMillis();
+				timeoutMillis = (deadline - System.nanoTime()) / 1_000_000L;
 			}
 
 			if (!open) {

http://git-wip-us.apache.org/repos/asf/flink/blob/51b7ede2/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KillerWatchDog.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KillerWatchDog.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KillerWatchDog.java
index 4d61e53..574d9f7 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KillerWatchDog.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KillerWatchDog.java
@@ -42,10 +42,10 @@ class KillerWatchDog extends Thread {
 	@SuppressWarnings("deprecation")
 	@Override
 	public void run() {
-		final long deadline = System.currentTimeMillis() + timeout;
+		final long deadline = System.nanoTime() / 1_000_000L + timeout;
 		long now;
 
-		while (toKill.isAlive() && (now = System.currentTimeMillis()) < deadline) {
+		while (toKill.isAlive() && (now = (System.nanoTime() / 1_000_000L)) < deadline) {
 			try {
 				toKill.join(deadline - now);
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/51b7ede2/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 6c2672a..643ee8e 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -283,7 +283,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 		creator.close();
 
 		// validate that the topic has been created
-		final long deadline = System.currentTimeMillis() + 30000;
+		final long deadline = System.nanoTime() + 30_000_000_000L;
 		do {
 			try {
 				Thread.sleep(100);
@@ -296,7 +296,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 				return;
 			}
 		}
-		while (System.currentTimeMillis() < deadline);
+		while (System.nanoTime() < deadline);
 		fail ("Test topic could not be created");
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/51b7ede2/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 99c11c4..c9ef6da 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -295,7 +295,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 		LOG.info("Topic {} create request is successfully posted", topic);
 
 		// validate that the topic has been created
-		final long deadline = System.currentTimeMillis() + Integer.parseInt(zkTimeout);
+		final long deadline = System.nanoTime() + Integer.parseInt(zkTimeout) * 1_000_000L;
 		do {
 			try {
 				if(secureMode) {
@@ -325,7 +325,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 			LOG.info("topic {} has not been created yet. Will check again...", topic);
 			checkZKConn.close();
 		}
-		while (System.currentTimeMillis() < deadline);
+		while (System.nanoTime() < deadline);
 		fail("Test topic could not be created");
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/51b7ede2/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index cb8b0d0..580c507 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -233,7 +233,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		runner.start();
 
 		final Long l50 = 50L; // the final committed offset in Kafka should be 50
-		final long deadline = 30000 + System.currentTimeMillis();
+		final long deadline = 30_000_000_000L + System.nanoTime();
 
 		KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler();
 
@@ -248,7 +248,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 
 			Thread.sleep(100);
 		}
-		while (System.currentTimeMillis() < deadline);
+		while (System.nanoTime() < deadline);
 
 		// cancel the job
 		JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
@@ -406,7 +406,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler();
 
 		final Long l50 = 50L; // the final committed offset in Kafka should be 50
-		final long deadline = 30000 + System.currentTimeMillis();
+		final long deadline = 30_000_000_000L + System.nanoTime();
 		do {
 			Long o1 = kafkaOffsetHandler.getCommittedOffset(topicName, 0);
 			Long o2 = kafkaOffsetHandler.getCommittedOffset(topicName, 1);
@@ -418,7 +418,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 
 			Thread.sleep(100);
 		}
-		while (System.currentTimeMillis() < deadline);
+		while (System.nanoTime() < deadline);
 
 		// cancel the job
 		JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
@@ -2018,10 +2018,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 			};
 			runner.start();
 			
-			final long deadline = System.currentTimeMillis() + 10000;
+			final long deadline = System.nanoTime() + 10_000_000_000L;
 			long delay;
-			while (runner.isAlive() && (delay = deadline - System.currentTimeMillis()) > 0) {
-				runner.join(delay);
+			while (runner.isAlive() && (delay = deadline - System.nanoTime()) > 0) {
+				runner.join(delay/1_000_000L);
 			}
 			
 			boolean success;