You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/03/02 06:55:36 UTC
flink git commit: [FLINK-4422] [kafka] Convert all time interval
measurements to System.nanoTime()
Repository: flink
Updated Branches:
refs/heads/master bf4eed144 -> 51b7ede28
[FLINK-4422] [kafka] Convert all time interval measurements to System.nanoTime()
This closes #3422.
This closes #3421.
This closes #3420.
This closes #3419.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/51b7ede2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/51b7ede2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/51b7ede2
Branch: refs/heads/master
Commit: 51b7ede288e06ccedfbba9f92ac361f28bb51452
Parents: bf4eed1
Author: Jin Mingjian <ji...@gmail.com>
Authored: Mon Feb 27 12:58:44 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Thu Mar 2 14:53:02 2017 +0800
----------------------------------------------------------------------
.../connectors/kafka/KafkaTestEnvironmentImpl.java | 4 ++--
.../kafka/internals/ClosableBlockingQueue.java | 8 ++++----
.../connectors/kafka/internals/KillerWatchDog.java | 4 ++--
.../connectors/kafka/KafkaTestEnvironmentImpl.java | 4 ++--
.../connectors/kafka/KafkaTestEnvironmentImpl.java | 4 ++--
.../connectors/kafka/KafkaConsumerTestBase.java | 14 +++++++-------
6 files changed, 19 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/51b7ede2/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index bc1faaf..d27e53a 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -296,7 +296,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
}
// validate that the topic has been created
- final long deadline = System.currentTimeMillis() + 30000;
+ final long deadline = System.nanoTime() + 30_000_000_000L;
do {
try {
if(secureMode) {
@@ -321,7 +321,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
}
checkZKConn.close();
}
- while (System.currentTimeMillis() < deadline);
+ while (System.nanoTime() < deadline);
fail("Test topic could not be created");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/51b7ede2/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java
index 23ff276..e31dcac 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java
@@ -355,13 +355,13 @@ public class ClosableBlockingQueue<E> {
throw new IllegalArgumentException("invalid timeout");
}
- final long deadline = System.currentTimeMillis() + timeoutMillis;
+ final long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
lock.lock();
try {
while (open && elements.isEmpty() && timeoutMillis > 0) {
nonEmpty.await(timeoutMillis, TimeUnit.MILLISECONDS);
- timeoutMillis = deadline - System.currentTimeMillis();
+ timeoutMillis = (deadline - System.nanoTime()) / 1_000_000L;
}
if (!open) {
@@ -437,13 +437,13 @@ public class ClosableBlockingQueue<E> {
throw new IllegalArgumentException("invalid timeout");
}
- final long deadline = System.currentTimeMillis() + timeoutMillis;
+ final long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
lock.lock();
try {
while (open && elements.isEmpty() && timeoutMillis > 0) {
nonEmpty.await(timeoutMillis, TimeUnit.MILLISECONDS);
- timeoutMillis = deadline - System.currentTimeMillis();
+ timeoutMillis = (deadline - System.nanoTime()) / 1_000_000L;
}
if (!open) {
http://git-wip-us.apache.org/repos/asf/flink/blob/51b7ede2/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KillerWatchDog.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KillerWatchDog.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KillerWatchDog.java
index 4d61e53..574d9f7 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KillerWatchDog.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KillerWatchDog.java
@@ -42,10 +42,10 @@ class KillerWatchDog extends Thread {
@SuppressWarnings("deprecation")
@Override
public void run() {
- final long deadline = System.currentTimeMillis() + timeout;
+ final long deadline = System.nanoTime() / 1_000_000L + timeout;
long now;
- while (toKill.isAlive() && (now = System.currentTimeMillis()) < deadline) {
+ while (toKill.isAlive() && (now = (System.nanoTime() / 1_000_000L)) < deadline) {
try {
toKill.join(deadline - now);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/51b7ede2/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 6c2672a..643ee8e 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -283,7 +283,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
creator.close();
// validate that the topic has been created
- final long deadline = System.currentTimeMillis() + 30000;
+ final long deadline = System.nanoTime() + 30_000_000_000L;
do {
try {
Thread.sleep(100);
@@ -296,7 +296,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
return;
}
}
- while (System.currentTimeMillis() < deadline);
+ while (System.nanoTime() < deadline);
fail ("Test topic could not be created");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/51b7ede2/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 99c11c4..c9ef6da 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -295,7 +295,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
LOG.info("Topic {} create request is successfully posted", topic);
// validate that the topic has been created
- final long deadline = System.currentTimeMillis() + Integer.parseInt(zkTimeout);
+ final long deadline = System.nanoTime() + Integer.parseInt(zkTimeout) * 1_000_000L;
do {
try {
if(secureMode) {
@@ -325,7 +325,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
LOG.info("topic {} has not been created yet. Will check again...", topic);
checkZKConn.close();
}
- while (System.currentTimeMillis() < deadline);
+ while (System.nanoTime() < deadline);
fail("Test topic could not be created");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/51b7ede2/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index cb8b0d0..580c507 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -233,7 +233,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
runner.start();
final Long l50 = 50L; // the final committed offset in Kafka should be 50
- final long deadline = 30000 + System.currentTimeMillis();
+ final long deadline = 30_000_000_000L + System.nanoTime();
KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler();
@@ -248,7 +248,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
Thread.sleep(100);
}
- while (System.currentTimeMillis() < deadline);
+ while (System.nanoTime() < deadline);
// cancel the job
JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
@@ -406,7 +406,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler();
final Long l50 = 50L; // the final committed offset in Kafka should be 50
- final long deadline = 30000 + System.currentTimeMillis();
+ final long deadline = 30_000_000_000L + System.nanoTime();
do {
Long o1 = kafkaOffsetHandler.getCommittedOffset(topicName, 0);
Long o2 = kafkaOffsetHandler.getCommittedOffset(topicName, 1);
@@ -418,7 +418,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
Thread.sleep(100);
}
- while (System.currentTimeMillis() < deadline);
+ while (System.nanoTime() < deadline);
// cancel the job
JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
@@ -2018,10 +2018,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
};
runner.start();
- final long deadline = System.currentTimeMillis() + 10000;
+ final long deadline = System.nanoTime() + 10_000_000_000L;
long delay;
- while (runner.isAlive() && (delay = deadline - System.currentTimeMillis()) > 0) {
- runner.join(delay);
+ while (runner.isAlive() && (delay = deadline - System.nanoTime()) > 0) {
+ runner.join(delay/1_000_000L);
}
boolean success;