You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2015/11/12 00:55:22 UTC
kafka git commit: KAFKA-2807: Move ThroughputThrottler back to tools
jar to fix upgrade tests.
Repository: kafka
Updated Branches:
refs/heads/trunk a8ccdc615 -> c6b8de4e6
KAFKA-2807: Move ThroughputThrottler back to tools jar to fix upgrade tests.
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Reviewers: Gwen Shapira
Closes #499 from ewencp/kafka-2807-relocate-throughput-throttler
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c6b8de4e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c6b8de4e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c6b8de4e
Branch: refs/heads/trunk
Commit: c6b8de4e6806d8f9f4af57e15f2a7f4170265c42
Parents: a8ccdc6
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Authored: Wed Nov 11 15:55:12 2015 -0800
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Wed Nov 11 15:55:12 2015 -0800
----------------------------------------------------------------------
build.gradle | 45 +++---
.../kafka/common/utils/ThroughputThrottler.java | 141 -------------------
.../connect/tools/VerifiableSourceTask.java | 2 +-
settings.gradle | 2 +-
.../apache/kafka/tools/ProducerPerformance.java | 1 -
.../apache/kafka/tools/ThroughputThrottler.java | 141 +++++++++++++++++++
.../apache/kafka/tools/VerifiableProducer.java | 1 -
7 files changed, 166 insertions(+), 167 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/c6b8de4e/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 70fdbcd..0ee6c41 100644
--- a/build.gradle
+++ b/build.gradle
@@ -230,7 +230,7 @@ for ( sv in ['2_10_5', '2_11_7'] ) {
}
}
-def connectPkgs = ['connect:api', 'connect:runtime', 'connect:json', 'connect:file', 'connect:tools']
+def connectPkgs = ['connect-api', 'connect-runtime', 'connect-json', 'connect-file', 'connect-tools']
def pkgs = ['clients', 'examples', 'log4j-appender', 'tools', 'streams'] + connectPkgs
tasks.create(name: "jarConnect", dependsOn: connectPkgs.collect { it + ":jar" }) {}
@@ -321,7 +321,7 @@ project(':core') {
standardOutput = new File('docs/kafka_config.html').newOutputStream()
}
- task siteDocsTar(dependsOn: ['genProducerConfigDocs', 'genConsumerConfigDocs', 'genKafkaConfigDocs', ':connect:runtime:genConnectConfigDocs'], type: Tar) {
+ task siteDocsTar(dependsOn: ['genProducerConfigDocs', 'genConsumerConfigDocs', 'genKafkaConfigDocs', ':connect-runtime:genConnectConfigDocs'], type: Tar) {
classifier = 'site-docs'
compression = Compression.GZIP
from project.file("../docs")
@@ -342,16 +342,16 @@ project(':core') {
from(project.siteDocsTar) { into("site-docs/") }
from(project(':tools').jar) { into("libs/") }
from(project(':tools').configurations.runtime) { into("libs/") }
- from(project(':connect:api').jar) { into("libs/") }
- from(project(':connect:api').configurations.runtime) { into("libs/") }
- from(project(':connect:runtime').jar) { into("libs/") }
- from(project(':connect:runtime').configurations.runtime) { into("libs/") }
- from(project(':connect:json').jar) { into("libs/") }
- from(project(':connect:json').configurations.runtime) { into("libs/") }
- from(project(':connect:file').jar) { into("libs/") }
- from(project(':connect:file').configurations.runtime) { into("libs/") }
- from(project(':connect:tools').jar) { into("libs/") }
- from(project(':connect:tools').configurations.runtime) { into("libs/") }
+ from(project(':connect-api').jar) { into("libs/") }
+ from(project(':connect-api').configurations.runtime) { into("libs/") }
+ from(project(':connect-runtime').jar) { into("libs/") }
+ from(project(':connect-runtime').configurations.runtime) { into("libs/") }
+ from(project(':connect-json').jar) { into("libs/") }
+ from(project(':connect-json').configurations.runtime) { into("libs/") }
+ from(project(':connect-file').jar) { into("libs/") }
+ from(project(':connect-file').configurations.runtime) { into("libs/") }
+ from(project(':connect-tools').jar) { into("libs/") }
+ from(project(':connect-tools').configurations.runtime) { into("libs/") }
}
jar {
@@ -638,7 +638,7 @@ project(':log4j-appender') {
test.dependsOn('checkstyleMain', 'checkstyleTest')
}
-project(':connect:api') {
+project(':connect-api') {
apply plugin: 'checkstyle'
archivesBaseName = "connect-api"
@@ -695,12 +695,12 @@ project(':connect:api') {
test.dependsOn('checkstyleMain', 'checkstyleTest')
}
-project(':connect:json') {
+project(':connect-json') {
apply plugin: 'checkstyle'
archivesBaseName = "connect-json"
dependencies {
- compile project(':connect:api')
+ compile project(':connect-api')
compile "$slf4japi"
compile "com.fasterxml.jackson.core:jackson-databind:$jackson_version"
@@ -756,12 +756,12 @@ project(':connect:json') {
test.dependsOn('checkstyleMain', 'checkstyleTest')
}
-project(':connect:runtime') {
+project(':connect-runtime') {
apply plugin: 'checkstyle'
archivesBaseName = "connect-runtime"
dependencies {
- compile project(':connect:api')
+ compile project(':connect-api')
compile project(':clients')
compile "$slf4japi"
@@ -776,7 +776,7 @@ project(':connect:runtime') {
testCompile "$powermock_easymock"
testCompile project(':clients').sourceSets.test.output
testRuntime "$slf4jlog4j"
- testRuntime project(":connect:json")
+ testRuntime project(":connect-json")
}
task testJar(type: Jar) {
@@ -830,12 +830,12 @@ project(':connect:runtime') {
}
}
-project(':connect:file') {
+project(':connect-file') {
apply plugin: 'checkstyle'
archivesBaseName = "connect-file"
dependencies {
- compile project(':connect:api')
+ compile project(':connect-api')
compile "$slf4japi"
testCompile "$junit"
@@ -890,12 +890,13 @@ project(':connect:file') {
test.dependsOn('checkstyleMain', 'checkstyleTest')
}
-project(':connect:tools') {
+project(':connect-tools') {
apply plugin: 'checkstyle'
archivesBaseName = "connect-tools"
dependencies {
- compile project(':connect:api')
+ compile project(':connect-api')
+ compile project(':tools')
compile "$slf4japi"
compile "com.fasterxml.jackson.core:jackson-databind:$jackson_version"
http://git-wip-us.apache.org/repos/asf/kafka/blob/c6b8de4e/clients/src/main/java/org/apache/kafka/common/utils/ThroughputThrottler.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/ThroughputThrottler.java b/clients/src/main/java/org/apache/kafka/common/utils/ThroughputThrottler.java
deleted file mode 100644
index 1c63ffb..0000000
--- a/clients/src/main/java/org/apache/kafka/common/utils/ThroughputThrottler.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.common.utils;
-
-
-/**
- * This class helps producers throttle throughput.
- *
- * If targetThroughput >= 0, the resulting average throughput will be approximately
- * min(targetThroughput, maximumPossibleThroughput). If targetThroughput < 0,
- * no throttling will occur.
- *
- * To use, do this between successive send attempts:
- * <pre>
- * {@code
- * if (throttler.shouldThrottle(...)) {
- * throttler.throttle();
- * }
- * }
- * </pre>
- *
- * Note that this can be used to throttle message throughput or data throughput.
- */
-public class ThroughputThrottler {
-
- private static final long NS_PER_MS = 1000000L;
- private static final long NS_PER_SEC = 1000 * NS_PER_MS;
- private static final long MIN_SLEEP_NS = 2 * NS_PER_MS;
-
- long sleepTimeNs;
- long sleepDeficitNs = 0;
- long targetThroughput = -1;
- long startMs;
- private boolean wakeup = false;
-
- /**
- * @param targetThroughput Can be messages/sec or bytes/sec
- * @param startMs When the very first message is sent
- */
- public ThroughputThrottler(long targetThroughput, long startMs) {
- this.startMs = startMs;
- this.targetThroughput = targetThroughput;
- this.sleepTimeNs = targetThroughput > 0 ?
- NS_PER_SEC / targetThroughput :
- Long.MAX_VALUE;
- }
-
- /**
- * @param amountSoFar bytes produced so far if you want to throttle data throughput, or
- * messages produced so far if you want to throttle message throughput.
- * @param sendStartMs timestamp of the most recently sent message
- * @return
- */
- public boolean shouldThrottle(long amountSoFar, long sendStartMs) {
- if (this.targetThroughput < 0) {
- // No throttling in this case
- return false;
- }
-
- float elapsedMs = (sendStartMs - startMs) / 1000.f;
- return elapsedMs > 0 && (amountSoFar / elapsedMs) > this.targetThroughput;
- }
-
- /**
- * Occasionally blocks for small amounts of time to achieve targetThroughput.
- *
- * Note that if targetThroughput is 0, this will block extremely aggressively.
- */
- public void throttle() {
- if (targetThroughput == 0) {
- try {
- synchronized (this) {
- while (!wakeup) {
- this.wait();
- }
- }
- } catch (InterruptedException e) {
- // do nothing
- }
- return;
- }
-
- // throttle throughput by sleeping, on average,
- // (1 / this.throughput) seconds between "things sent"
- sleepDeficitNs += sleepTimeNs;
-
- // If enough sleep deficit has accumulated, sleep a little
- if (sleepDeficitNs >= MIN_SLEEP_NS) {
- long sleepStartNs = System.nanoTime();
- long currentTimeNs = sleepStartNs;
- try {
- synchronized (this) {
- long elapsed = currentTimeNs - sleepStartNs;
- long remaining = sleepDeficitNs - elapsed;
- while (!wakeup && remaining > 0) {
- long sleepMs = remaining / 1000000;
- long sleepNs = remaining - sleepMs * 1000000;
- this.wait(sleepMs, (int) sleepNs);
- elapsed = System.nanoTime() - sleepStartNs;
- remaining = sleepDeficitNs - elapsed;
- }
- wakeup = false;
- }
- sleepDeficitNs = 0;
- } catch (InterruptedException e) {
- // If sleep is cut short, reduce deficit by the amount of
- // time we actually spent sleeping
- long sleepElapsedNs = System.nanoTime() - sleepStartNs;
- if (sleepElapsedNs <= sleepDeficitNs) {
- sleepDeficitNs -= sleepElapsedNs;
- }
- }
- }
- }
-
- /**
- * Wakeup the throttler if its sleeping.
- */
- public void wakeup() {
- synchronized (this) {
- wakeup = true;
- this.notifyAll();
- }
- }
-}
-
http://git-wip-us.apache.org/repos/asf/kafka/blob/c6b8de4e/connect/tools/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceTask.java
----------------------------------------------------------------------
diff --git a/connect/tools/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceTask.java b/connect/tools/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceTask.java
index 6fee2c4..a85a0e9 100644
--- a/connect/tools/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceTask.java
+++ b/connect/tools/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceTask.java
@@ -19,11 +19,11 @@ package org.apache.kafka.connect.tools;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.kafka.common.utils.ThroughputThrottler;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
+import org.apache.kafka.tools.ThroughputThrottler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/kafka/blob/c6b8de4e/settings.gradle
----------------------------------------------------------------------
diff --git a/settings.gradle b/settings.gradle
index 2728b5b..d1543c3 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -15,4 +15,4 @@
apply from: file('scala.gradle')
include 'core', 'examples', 'clients', 'tools', 'streams', 'log4j-appender',
- 'connect:api', 'connect:runtime', 'connect:json', 'connect:file', 'connect:tools'
+ 'connect-api', 'connect-runtime', 'connect-json', 'connect-file', 'connect-tools'
http://git-wip-us.apache.org/repos/asf/kafka/blob/c6b8de4e/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
index 2a7f7b1..3a06862 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
@@ -24,7 +24,6 @@ import net.sourceforge.argparse4j.inf.ArgumentParserException;
import net.sourceforge.argparse4j.inf.Namespace;
import org.apache.kafka.clients.producer.*;
-import org.apache.kafka.common.utils.ThroughputThrottler;
public class ProducerPerformance {
http://git-wip-us.apache.org/repos/asf/kafka/blob/c6b8de4e/tools/src/main/java/org/apache/kafka/tools/ThroughputThrottler.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/tools/ThroughputThrottler.java b/tools/src/main/java/org/apache/kafka/tools/ThroughputThrottler.java
new file mode 100644
index 0000000..a3bcd2f
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/tools/ThroughputThrottler.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.tools;
+
+
+/**
+ * This class helps producers throttle throughput.
+ *
+ * If targetThroughput >= 0, the resulting average throughput will be approximately
+ * min(targetThroughput, maximumPossibleThroughput). If targetThroughput < 0,
+ * no throttling will occur.
+ *
+ * To use, do this between successive send attempts:
+ * <pre>
+ * {@code
+ * if (throttler.shouldThrottle(...)) {
+ * throttler.throttle();
+ * }
+ * }
+ * </pre>
+ *
+ * Note that this can be used to throttle message throughput or data throughput.
+ */
+public class ThroughputThrottler {
+
+ private static final long NS_PER_MS = 1000000L;
+ private static final long NS_PER_SEC = 1000 * NS_PER_MS;
+ private static final long MIN_SLEEP_NS = 2 * NS_PER_MS;
+
+ long sleepTimeNs;
+ long sleepDeficitNs = 0;
+ long targetThroughput = -1;
+ long startMs;
+ private boolean wakeup = false;
+
+ /**
+ * @param targetThroughput Can be messages/sec or bytes/sec
+ * @param startMs When the very first message is sent
+ */
+ public ThroughputThrottler(long targetThroughput, long startMs) {
+ this.startMs = startMs;
+ this.targetThroughput = targetThroughput;
+ this.sleepTimeNs = targetThroughput > 0 ?
+ NS_PER_SEC / targetThroughput :
+ Long.MAX_VALUE;
+ }
+
+ /**
+ * @param amountSoFar bytes produced so far if you want to throttle data throughput, or
+ * messages produced so far if you want to throttle message throughput.
+ * @param sendStartMs timestamp of the most recently sent message
+ * @return
+ */
+ public boolean shouldThrottle(long amountSoFar, long sendStartMs) {
+ if (this.targetThroughput < 0) {
+ // No throttling in this case
+ return false;
+ }
+
+ float elapsedMs = (sendStartMs - startMs) / 1000.f;
+ return elapsedMs > 0 && (amountSoFar / elapsedMs) > this.targetThroughput;
+ }
+
+ /**
+ * Occasionally blocks for small amounts of time to achieve targetThroughput.
+ *
+ * Note that if targetThroughput is 0, this will block extremely aggressively.
+ */
+ public void throttle() {
+ if (targetThroughput == 0) {
+ try {
+ synchronized (this) {
+ while (!wakeup) {
+ this.wait();
+ }
+ }
+ } catch (InterruptedException e) {
+ // do nothing
+ }
+ return;
+ }
+
+ // throttle throughput by sleeping, on average,
+ // (1 / this.throughput) seconds between "things sent"
+ sleepDeficitNs += sleepTimeNs;
+
+ // If enough sleep deficit has accumulated, sleep a little
+ if (sleepDeficitNs >= MIN_SLEEP_NS) {
+ long sleepStartNs = System.nanoTime();
+ long currentTimeNs = sleepStartNs;
+ try {
+ synchronized (this) {
+ long elapsed = currentTimeNs - sleepStartNs;
+ long remaining = sleepDeficitNs - elapsed;
+ while (!wakeup && remaining > 0) {
+ long sleepMs = remaining / 1000000;
+ long sleepNs = remaining - sleepMs * 1000000;
+ this.wait(sleepMs, (int) sleepNs);
+ elapsed = System.nanoTime() - sleepStartNs;
+ remaining = sleepDeficitNs - elapsed;
+ }
+ wakeup = false;
+ }
+ sleepDeficitNs = 0;
+ } catch (InterruptedException e) {
+ // If sleep is cut short, reduce deficit by the amount of
+ // time we actually spent sleeping
+ long sleepElapsedNs = System.nanoTime() - sleepStartNs;
+ if (sleepElapsedNs <= sleepDeficitNs) {
+ sleepDeficitNs -= sleepElapsedNs;
+ }
+ }
+ }
+ }
+
+ /**
+ * Wakeup the throttler if its sleeping.
+ */
+ public void wakeup() {
+ synchronized (this) {
+ wakeup = true;
+ this.notifyAll();
+ }
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/kafka/blob/c6b8de4e/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
index e8bd330..0cd90c0 100644
--- a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
@@ -41,7 +41,6 @@ import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.ArgumentParserException;
import net.sourceforge.argparse4j.inf.Namespace;
-import org.apache.kafka.common.utils.ThroughputThrottler;
/**
* Primarily intended for use with system testing, this producer prints metadata