You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jk...@apache.org on 2014/03/28 04:53:54 UTC
[1/2] KAFKA-1251: Add metrics to the producer.
Repository: kafka
Updated Branches:
refs/heads/trunk 9bc47bc13 -> 23d7fc470
http://git-wip-us.apache.org/repos/asf/kafka/blob/23d7fc47/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
index 12c9500..a2b7722 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
@@ -70,6 +70,7 @@ public class SenderTest {
REQUEST_TIMEOUT_MS,
SEND_BUFFER_SIZE,
RECEIVE_BUFFER_SIZE,
+ metrics,
time);
@Before
@@ -114,6 +115,7 @@ public class SenderTest {
REQUEST_TIMEOUT_MS,
SEND_BUFFER_SIZE,
RECEIVE_BUFFER_SIZE,
+ new Metrics(),
time);
Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null);
RequestSend request1 = completeSend(sender);
http://git-wip-us.apache.org/repos/asf/kafka/blob/23d7fc47/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
index fdd8914..9ff73f4 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
@@ -1,18 +1,14 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.common.metrics;
@@ -22,25 +18,16 @@ import static org.junit.Assert.fail;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
-
import org.apache.kafka.common.Metric;
-import org.apache.kafka.common.metrics.JmxReporter;
-import org.apache.kafka.common.metrics.Measurable;
-import org.apache.kafka.common.metrics.MetricConfig;
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.metrics.MetricsReporter;
-import org.apache.kafka.common.metrics.Quota;
-import org.apache.kafka.common.metrics.QuotaViolationException;
-import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Count;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Min;
import org.apache.kafka.common.metrics.stats.Percentile;
import org.apache.kafka.common.metrics.stats.Percentiles;
+import org.apache.kafka.common.metrics.stats.Percentiles.BucketSizing;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.metrics.stats.Total;
-import org.apache.kafka.common.metrics.stats.Percentiles.BucketSizing;
import org.apache.kafka.common.utils.MockTime;
import org.junit.Test;
@@ -154,9 +141,10 @@ public class MetricsTest {
public void testOldDataHasNoEffect() {
Max max = new Max();
long windowMs = 100;
- MetricConfig config = new MetricConfig().timeWindow(windowMs, TimeUnit.MILLISECONDS);
+ int samples = 2;
+ MetricConfig config = new MetricConfig().timeWindow(windowMs, TimeUnit.MILLISECONDS).samples(samples);
max.record(config, 50, time.nanoseconds());
- time.sleep(windowMs);
+ time.sleep(samples * windowMs);
assertEquals(Double.NEGATIVE_INFINITY, max.measure(config, time.nanoseconds()), EPS);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/23d7fc47/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
index 90e2dcf..99856e9 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
@@ -1,18 +1,14 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.common.network;
@@ -27,16 +23,12 @@ import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.ByteBuffer;
-import java.nio.channels.UnresolvedAddressException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-
-import org.apache.kafka.common.network.NetworkReceive;
-import org.apache.kafka.common.network.NetworkSend;
-import org.apache.kafka.common.network.Selectable;
-import org.apache.kafka.common.network.Selector;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
@@ -58,7 +50,7 @@ public class SelectorTest {
public void setup() throws Exception {
this.server = new EchoServer();
this.server.start();
- this.selector = new Selector();
+ this.selector = new Selector(new Metrics(), new MockTime());
}
@After
http://git-wip-us.apache.org/repos/asf/kafka/blob/23d7fc47/clients/src/test/java/org/apache/kafka/test/MetricsBench.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/MetricsBench.java b/clients/src/test/java/org/apache/kafka/test/MetricsBench.java
index 7239b4a..9d98c11 100644
--- a/clients/src/test/java/org/apache/kafka/test/MetricsBench.java
+++ b/clients/src/test/java/org/apache/kafka/test/MetricsBench.java
@@ -1,18 +1,14 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.test;
@@ -27,7 +23,6 @@ import org.apache.kafka.common.metrics.stats.Percentile;
import org.apache.kafka.common.metrics.stats.Percentiles;
import org.apache.kafka.common.metrics.stats.Percentiles.BucketSizing;
-
public class MetricsBench {
public static void main(String[] args) {
@@ -48,7 +43,7 @@ public class MetricsBench {
}
long start = System.nanoTime();
for (int i = 0; i < iters; i++)
- child.record(i);
+ parent.record(i);
double ellapsed = (System.nanoTime() - start) / (double) iters;
System.out.println(String.format("%.2f ns per metric recording.", ellapsed));
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/23d7fc47/clients/src/test/java/org/apache/kafka/test/Microbenchmarks.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/Microbenchmarks.java b/clients/src/test/java/org/apache/kafka/test/Microbenchmarks.java
index 46cf86e..6aab854 100644
--- a/clients/src/test/java/org/apache/kafka/test/Microbenchmarks.java
+++ b/clients/src/test/java/org/apache/kafka/test/Microbenchmarks.java
@@ -1,18 +1,14 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.test;
@@ -24,11 +20,11 @@ import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.kafka.common.utils.CopyOnWriteMap;
import org.apache.kafka.common.utils.SystemTime;
-
public class Microbenchmarks {
public static void main(String[] args) throws Exception {
@@ -112,6 +108,43 @@ public class Microbenchmarks {
t1.join();
t2.join();
+ System.out.println("Testing locks");
+ done.set(false);
+ final ReentrantLock lock2 = new ReentrantLock();
+ Thread t3 = new Thread() {
+ public void run() {
+ time.sleep(1);
+ int counter = 0;
+ long start = time.nanoseconds();
+ for (int i = 0; i < iters; i++) {
+ lock2.lock();
+ counter++;
+ lock2.unlock();
+ }
+ System.out.println("lock: " + ((System.nanoTime() - start) / iters));
+ System.out.println(counter);
+ done.set(true);
+ }
+ };
+
+ Thread t4 = new Thread() {
+ public void run() {
+ int counter = 0;
+ while (!done.get()) {
+ time.sleep(1);
+ lock2.lock();
+ counter++;
+ lock2.unlock();
+ }
+ System.out.println("Counter: " + counter);
+ }
+ };
+
+ t3.start();
+ t4.start();
+ t3.join();
+ t4.join();
+
Map<String, Integer> values = new HashMap<String, Integer>();
for (int i = 0; i < 100; i++)
values.put(Integer.toString(i), i);
http://git-wip-us.apache.org/repos/asf/kafka/blob/23d7fc47/config/log4j.properties
----------------------------------------------------------------------
diff --git a/config/log4j.properties b/config/log4j.properties
index baa698b..9502254 100644
--- a/config/log4j.properties
+++ b/config/log4j.properties
@@ -41,7 +41,7 @@ log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.cleanerAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.cleanerAppender.DatePattern='.'yyyy-MM-dd-HH
-log4j.appender.cleanerAppender.File=log-cleaner.log
+log4j.appender.cleanerAppender.File=${kafka.logs.dir}/log-cleaner.log
log4j.appender.cleanerAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.cleanerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
[2/2] git commit: KAFKA-1251: Add metrics to the producer.
Posted by jk...@apache.org.
KAFKA-1251: Add metrics to the producer.
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/23d7fc47
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/23d7fc47
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/23d7fc47
Branch: refs/heads/trunk
Commit: 23d7fc470638c4dffa5ca005ef2e3d34c14dc92e
Parents: 9bc47bc
Author: Jay Kreps <ja...@gmail.com>
Authored: Tue Mar 18 17:38:56 2014 -0700
Committer: Jay Kreps <ja...@gmail.com>
Committed: Thu Mar 27 20:52:42 2014 -0700
----------------------------------------------------------------------
.../kafka/clients/producer/KafkaProducer.java | 46 ++--
.../kafka/clients/producer/ProducerConfig.java | 15 +-
.../internals/FutureRecordMetadata.java | 25 +--
.../producer/internals/RecordAccumulator.java | 37 ++--
.../clients/producer/internals/RecordBatch.java | 6 +-
.../clients/producer/internals/Sender.java | 218 ++++++++++++++++---
.../clients/tools/ProducerPerformance.java | 39 ++--
.../apache/kafka/common/metrics/Metrics.java | 61 ++++--
.../org/apache/kafka/common/metrics/Sensor.java | 45 ++--
.../kafka/common/metrics/stats/Percentiles.java | 2 +-
.../apache/kafka/common/metrics/stats/Rate.java | 42 ++--
.../kafka/common/metrics/stats/SampledStat.java | 49 +++--
.../kafka/common/network/ByteBufferSend.java | 30 +--
.../kafka/common/network/NetworkReceive.java | 24 +-
.../apache/kafka/common/network/Selector.java | 109 +++++++++-
.../kafka/common/utils/CopyOnWriteMap.java | 26 +--
.../kafka/clients/producer/MetadataTest.java | 24 +-
.../clients/producer/RecordAccumulatorTest.java | 8 +-
.../kafka/clients/producer/SenderTest.java | 2 +
.../kafka/common/metrics/MetricsTest.java | 40 ++--
.../kafka/common/network/SelectorTest.java | 34 ++-
.../org/apache/kafka/test/MetricsBench.java | 27 +--
.../org/apache/kafka/test/Microbenchmarks.java | 63 ++++--
config/log4j.properties | 2 +-
24 files changed, 643 insertions(+), 331 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/23d7fc47/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 1ff9174..8c1c575 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -39,12 +39,15 @@ import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -70,6 +73,7 @@ public class KafkaProducer implements Producer {
private final Metrics metrics;
private final Thread ioThread;
private final CompressionType compressionType;
+ private final Sensor errors;
/**
* A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings
@@ -91,9 +95,14 @@ public class KafkaProducer implements Producer {
private KafkaProducer(ProducerConfig config) {
log.trace("Starting the Kafka producer");
- this.metrics = new Metrics(new MetricConfig(),
- Collections.singletonList((MetricsReporter) new JmxReporter("kafka.producer.")),
- new SystemTime());
+ Time time = new SystemTime();
+ MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES))
+ .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS),
+ TimeUnit.MILLISECONDS);
+ String clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
+ String jmxPrefix = "kafka.producer." + (clientId.length() > 0 ? clientId + "." : "");
+ List<MetricsReporter> reporters = Collections.singletonList((MetricsReporter) new JmxReporter(jmxPrefix));
+ this.metrics = new Metrics(metricConfig, reporters, time);
this.partitioner = new Partitioner();
this.metadataFetchTimeoutMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
this.metadata = new Metadata(config.getLong(ProducerConfig.METADATA_FETCH_BACKOFF_CONFIG),
@@ -107,13 +116,13 @@ public class KafkaProducer implements Producer {
config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG),
metrics,
- new SystemTime());
+ time);
List<InetSocketAddress> addresses = parseAndValidateAddresses(config.getList(ProducerConfig.BROKER_LIST_CONFIG));
- this.metadata.update(Cluster.bootstrap(addresses), System.currentTimeMillis());
- this.sender = new Sender(new Selector(),
+ this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds());
+ this.sender = new Sender(new Selector(this.metrics, time),
this.metadata,
this.accumulator,
- config.getString(ProducerConfig.CLIENT_ID_CONFIG),
+ clientId,
config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
(short) config.getInt(ProducerConfig.REQUIRED_ACKS_CONFIG),
@@ -121,9 +130,14 @@ public class KafkaProducer implements Producer {
config.getInt(ProducerConfig.REQUEST_TIMEOUT_CONFIG),
config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
+ this.metrics,
new SystemTime());
this.ioThread = new KafkaThread("kafka-producer-network-thread", this.sender, true);
this.ioThread.start();
+
+ this.errors = this.metrics.sensor("errors");
+ this.errors.add("message-error-rate", "The average number of errors per second returned to the client.", new Rate());
+
config.logUnused();
log.debug("Kafka producer started");
}
@@ -223,7 +237,8 @@ public class KafkaProducer implements Producer {
try {
Cluster cluster = metadata.fetch(record.topic(), this.metadataFetchTimeoutMs);
int partition = partitioner.partition(record, cluster);
- ensureValidSize(record.key(), record.value());
+ int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(record.key(), record.value());
+ ensureValidRecordSize(serializedSize);
TopicPartition tp = new TopicPartition(record.topic(), partition);
log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
FutureRecordMetadata future = accumulator.append(tp, record.key(), record.value(), compressionType, callback);
@@ -235,24 +250,25 @@ public class KafkaProducer implements Producer {
log.debug("Exception occurred during message send:", e);
if (callback != null)
callback.onCompletion(null, e);
+ this.errors.record();
return new FutureFailure(e);
} catch (InterruptedException e) {
+ this.errors.record();
throw new KafkaException(e);
}
}
/**
- * Check that this key-value pair will have a serialized size small enough
+ * Validate that the record size isn't too large
*/
- private void ensureValidSize(byte[] key, byte[] value) {
- int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(key, value);
- if (serializedSize > this.maxRequestSize)
- throw new RecordTooLargeException("The message is " + serializedSize
+ private void ensureValidRecordSize(int size) {
+ if (size > this.maxRequestSize)
+ throw new RecordTooLargeException("The message is " + size
+ " bytes when serialized which is larger than the maximum request size you have configured with the "
+ ProducerConfig.MAX_REQUEST_SIZE_CONFIG
+ " configuration.");
- if (serializedSize > this.totalMemorySize)
- throw new RecordTooLargeException("The message is " + serializedSize
+ if (size > this.totalMemorySize)
+ throw new RecordTooLargeException("The message is " + size
+ " bytes when serialized which is larger than the total memory buffer you have configured with the "
+ ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG
+ " configuration.");
http://git-wip-us.apache.org/repos/asf/kafka/blob/23d7fc47/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 48706ba..259c14b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -141,6 +141,17 @@ public class ProducerConfig extends AbstractConfig {
public static final String COMPRESSION_TYPE_CONFIG = "compression.type";
/**
+ * The window size for a single metrics sample in ms. Defaults to 30 seconds.
+ */
+ public static final String METRICS_SAMPLE_WINDOW_MS = "metrics.sample.window.ms";
+
+ /**
+ * The number of samples used when reporting metrics. Defaults to two. So by default we use two 30 second windows,
+ * so metrics are computed over up to 60 seconds.
+ */
+ public static final String METRICS_NUM_SAMPLES = "metrics.num.samples";
+
+ /**
* Should we register the Kafka metrics as JMX mbeans?
*/
public static final String ENABLE_JMX_CONFIG = "enable.jmx";
@@ -166,7 +177,9 @@ public class ProducerConfig extends AbstractConfig {
.define(MAX_RETRIES_CONFIG, Type.INT, 0, between(0, Integer.MAX_VALUE), "")
.define(RETRY_BACKOFF_MS_CONFIG, Type.LONG, 100L, atLeast(0L), "blah blah")
.define(COMPRESSION_TYPE_CONFIG, Type.STRING, "none", "blah blah")
- .define(ENABLE_JMX_CONFIG, Type.BOOLEAN, true, "");
+ .define(ENABLE_JMX_CONFIG, Type.BOOLEAN, true, "")
+ .define(METRICS_SAMPLE_WINDOW_MS, Type.LONG, 30000, atLeast(0), "")
+ .define(METRICS_NUM_SAMPLES, Type.INT, 2, atLeast(1), "");
}
ProducerConfig(Map<? extends Object, ? extends Object> props) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/23d7fc47/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
index aec31c3..4a2da41 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
@@ -1,18 +1,14 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.clients.producer.internals;
@@ -23,7 +19,6 @@ import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.producer.RecordMetadata;
-
/**
* The future result of a record send
*/
http://git-wip-us.apache.org/repos/asf/kafka/blob/23d7fc47/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 50bf95f..ffd13ff 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -28,8 +28,8 @@ import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
-import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.utils.CopyOnWriteMap;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
@@ -90,27 +90,32 @@ public final class RecordAccumulator {
}
private void registerMetrics(Metrics metrics) {
- metrics.addMetric("blocked_threads",
- "The number of user threads blocked waiting for buffer memory to enqueue their records",
- new Measurable() {
+ metrics.addMetric("waiting-threads",
+ "The number of user threads blocked waiting for buffer memory to enqueue their records",
+ new Measurable() {
public double measure(MetricConfig config, long now) {
return free.queued();
}
});
- metrics.addMetric("buffer_total_bytes",
- "The total amount of buffer memory that is available (not currently used for buffering records).",
- new Measurable() {
+ metrics.addMetric("buffer-total-bytes",
+ "The maximum amount of buffer memory the client can use (whether or not it is currently used).",
+ new Measurable() {
public double measure(MetricConfig config, long now) {
return free.totalMemory();
}
});
- metrics.addMetric("buffer_available_bytes",
- "The total amount of buffer memory that is available (not currently used for buffering records).",
- new Measurable() {
- public double measure(MetricConfig config, long now) {
- return free.availableMemory();
- }
- });
+ metrics.addMetric("buffer-available-bytes",
+ "The total amount of buffer memory that is not being used (either unallocated or in the free list).",
+ new Measurable() {
+ public double measure(MetricConfig config, long now) {
+ return free.availableMemory();
+ }
+ });
+ metrics.addMetric("ready-partitions", "The number of topic-partitions with buffered data ready to be sent.", new Measurable() {
+ public double measure(MetricConfig config, long now) {
+ return ready(now).size();
+ }
+ });
}
/**
@@ -226,10 +231,11 @@ public final class RecordAccumulator {
*
* @param partitions The list of partitions to drain
* @param maxSize The maximum number of bytes to drain
+ * @param now The current unix time
* @return A list of {@link RecordBatch} for partitions specified with total size less than the requested maxSize.
* TODO: There may be a starvation issue due to iteration order
*/
- public List<RecordBatch> drain(List<TopicPartition> partitions, int maxSize) {
+ public List<RecordBatch> drain(List<TopicPartition> partitions, int maxSize, long now) {
if (partitions.isEmpty())
return Collections.emptyList();
int size = 0;
@@ -252,6 +258,7 @@ public final class RecordAccumulator {
batch.records.close();
size += batch.records.sizeInBytes();
ready.add(batch);
+ batch.drained = now;
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/23d7fc47/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
index 35f1d7a..94157f7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
@@ -18,6 +18,7 @@ import java.util.List;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,8 +32,10 @@ public final class RecordBatch {
private static final Logger log = LoggerFactory.getLogger(RecordBatch.class);
public int recordCount = 0;
+ public int maxRecordSize = 0;
public volatile int attempts = 0;
public final long created;
+ public long drained;
public long lastAttempt;
public final MemoryRecords records;
public final TopicPartition topicPartition;
@@ -58,6 +61,7 @@ public final class RecordBatch {
return null;
} else {
this.records.append(0L, key, value);
+ this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value));
FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount);
if (callback != null)
thunks.add(new Thunk(callback, future));
@@ -70,7 +74,7 @@ public final class RecordBatch {
* Complete the request
*
* @param baseOffset The base offset of the messages assigned by the server
- * @param exception The exception returned or null if no exception
+ * @param exception The exception that occurred (or null if the request was successful)
*/
public void done(long baseOffset, RuntimeException exception) {
this.produceFuture.done(topicPartition, baseOffset, exception);
http://git-wip-us.apache.org/repos/asf/kafka/blob/23d7fc47/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 565331d..d89813e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -24,13 +24,20 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InvalidMetadataException;
-import org.apache.kafka.common.errors.NetworkException;
import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.network.NetworkSend;
import org.apache.kafka.common.network.Selectable;
@@ -108,6 +115,9 @@ public class Sender implements Runnable {
/* true while the sender thread is still running */
private volatile boolean running;
+ /* metrics */
+ private final SenderMetrics sensors;
+
public Sender(Selectable selector,
Metadata metadata,
RecordAccumulator accumulator,
@@ -119,6 +129,7 @@ public class Sender implements Runnable {
int requestTimeout,
int socketSendBuffer,
int socketReceiveBuffer,
+ Metrics metrics,
Time time) {
this.nodeStates = new NodeStates(reconnectBackoffMs);
this.accumulator = accumulator;
@@ -137,6 +148,7 @@ public class Sender implements Runnable {
this.metadataFetchInProgress = false;
this.time = time;
this.metadataFetchNodeIndex = new Random().nextInt();
+ this.sensors = new SenderMetrics(metrics);
}
/**
@@ -191,8 +203,9 @@ public class Sender implements Runnable {
List<TopicPartition> sendable = processReadyPartitions(cluster, ready, now);
// create produce requests
- List<RecordBatch> batches = this.accumulator.drain(sendable, this.maxRequestSize);
- List<InFlightRequest> requests = collate(cluster, batches);
+ List<RecordBatch> batches = this.accumulator.drain(sendable, this.maxRequestSize, now);
+ List<InFlightRequest> requests = collate(cluster, batches, now);
+ sensors.updateProduceRequestMetrics(requests);
if (ready.size() > 0) {
log.trace("Partitions with complete batches: {}", ready);
@@ -215,8 +228,8 @@ public class Sender implements Runnable {
// handle responses, connections, and disconnections
handleSends(this.selector.completedSends());
- handleResponses(this.selector.completedReceives(), time.milliseconds());
- handleDisconnects(this.selector.disconnected(), time.milliseconds());
+ handleResponses(this.selector.completedReceives(), now);
+ handleDisconnects(this.selector.disconnected(), now);
handleConnects(this.selector.connected());
}
@@ -234,7 +247,7 @@ public class Sender implements Runnable {
if (nodeStates.isConnected(node.id())) {
Set<String> topics = metadata.topics();
this.metadataFetchInProgress = true;
- InFlightRequest metadataRequest = metadataRequest(node.id(), topics);
+ InFlightRequest metadataRequest = metadataRequest(now, node.id(), topics);
log.debug("Sending metadata request {} to node {}", metadataRequest, node.id());
sends.add(metadataRequest.request);
this.inFlightRequests.add(metadataRequest);
@@ -349,16 +362,9 @@ public class Sender implements Runnable {
ApiKeys requestKey = ApiKeys.forId(request.request.header().apiKey());
switch (requestKey) {
case PRODUCE:
- for (RecordBatch batch : request.batches.values()) {
- if (canRetry(batch, Errors.NETWORK_EXCEPTION)) {
- log.warn("Destination node disconnected for topic-partition {}, retrying ({} attempts left).",
- batch.topicPartition, this.retries - batch.attempts - 1);
- this.accumulator.reenqueue(batch, now);
- } else {
- batch.done(-1L, new NetworkException("The server disconnected unexpectedly without sending a response."));
- this.accumulator.deallocate(batch);
- }
- }
+ int correlation = request.request.header().correlationId();
+ for (RecordBatch batch : request.batches.values())
+ completeBatch(batch, Errors.NETWORK_EXCEPTION, -1L, correlation, now);
break;
case METADATA:
metadataFetchInProgress = false;
@@ -408,6 +414,7 @@ public class Sender implements Runnable {
* Handle responses from the server
*/
private void handleResponses(List<NetworkReceive> receives, long now) {
+ long ns = time.nanoseconds();
for (NetworkReceive receive : receives) {
int source = receive.source();
InFlightRequest req = inFlightRequests.nextCompleted(source);
@@ -420,12 +427,14 @@ public class Sender implements Runnable {
handleProduceResponse(req, req.request.header(), body, now);
} else if (req.request.header().apiKey() == ApiKeys.METADATA.id) {
log.trace("Received metadata response response from node {} with correlation id {}", source, req.request.header()
- .correlationId());
+ .correlationId());
handleMetadataResponse(req.request.header(), body, now);
} else {
throw new IllegalStateException("Unexpected response type: " + req.request.header().apiKey());
}
+ this.sensors.recordLatency(receive.source(), now - req.created, ns);
}
+
}
private void handleMetadataResponse(RequestHeader header, Struct body, long now) {
@@ -453,21 +462,39 @@ public class Sender implements Runnable {
if (error.exception() instanceof InvalidMetadataException)
metadata.forceUpdate();
RecordBatch batch = request.batches.get(tp);
- if (canRetry(batch, error)) {
- // retry
- log.warn("Got error produce response with correlation id {} on topic-partition {}, retrying ({} attempts left). Error: {}",
- header.correlationId(), batch.topicPartition, this.retries - batch.attempts - 1, error);
- this.accumulator.reenqueue(batch, now);
- } else {
- // tell the user the result of their request
- batch.done(response.baseOffset, error.exception());
- this.accumulator.deallocate(batch);
- }
+ completeBatch(batch, error, response.baseOffset, header.correlationId(), now);
}
}
}
/**
+ * Complete or retry the given batch of records.
+ * @param batch The record batch
+ * @param error The error (or null if none)
+ * @param baseOffset The base offset assigned to the records if successful
+ * @param correlationId The correlation id for the request
+ * @param now The current time stamp
+ */
+ private void completeBatch(RecordBatch batch, Errors error, long baseOffset, long correlationId, long now) {
+ if (error != Errors.NONE && canRetry(batch, error)) {
+ // retry
+ log.warn("Got error produce response with correlation id {} on topic-partition {}, retrying ({} attempts left). Error: {}",
+ correlationId,
+ batch.topicPartition,
+ this.retries - batch.attempts - 1,
+ error);
+ this.accumulator.reenqueue(batch, now);
+ this.sensors.recordRetries(batch.topicPartition.topic(), batch.recordCount);
+ } else {
+ // tell the user the result of their request
+ batch.done(baseOffset, error.exception());
+ this.accumulator.deallocate(batch);
+ if (error != Errors.NONE)
+ this.sensors.recordErrors(batch.topicPartition.topic(), batch.recordCount);
+ }
+ }
+
+ /**
* We can retry a send if the error is transient and the number of attempts taken is fewer than the maximum allowed
*/
private boolean canRetry(RecordBatch batch, Errors error) {
@@ -488,16 +515,16 @@ public class Sender implements Runnable {
/**
* Create a metadata request for the given topics
*/
- private InFlightRequest metadataRequest(int node, Set<String> topics) {
+ private InFlightRequest metadataRequest(long now, int node, Set<String> topics) {
MetadataRequest metadata = new MetadataRequest(new ArrayList<String>(topics));
RequestSend send = new RequestSend(node, header(ApiKeys.METADATA), metadata.toStruct());
- return new InFlightRequest(true, send, null);
+ return new InFlightRequest(now, true, send, null);
}
/**
* Collate the record batches into a list of produce requests on a per-node basis
*/
- private List<InFlightRequest> collate(Cluster cluster, List<RecordBatch> batches) {
+ private List<InFlightRequest> collate(Cluster cluster, List<RecordBatch> batches, long now) {
Map<Integer, List<RecordBatch>> collated = new HashMap<Integer, List<RecordBatch>>();
for (RecordBatch batch : batches) {
Node node = cluster.leaderFor(batch.topicPartition);
@@ -510,14 +537,14 @@ public class Sender implements Runnable {
}
List<InFlightRequest> requests = new ArrayList<InFlightRequest>(collated.size());
for (Map.Entry<Integer, List<RecordBatch>> entry : collated.entrySet())
- requests.add(produceRequest(entry.getKey(), acks, requestTimeout, entry.getValue()));
+ requests.add(produceRequest(now, entry.getKey(), acks, requestTimeout, entry.getValue()));
return requests;
}
/**
* Create a produce request from the given record batches
*/
- private InFlightRequest produceRequest(int destination, short acks, int timeout, List<RecordBatch> batches) {
+ private InFlightRequest produceRequest(long now, int destination, short acks, int timeout, List<RecordBatch> batches) {
Map<TopicPartition, RecordBatch> batchesByPartition = new HashMap<TopicPartition, RecordBatch>();
Map<String, List<RecordBatch>> batchesByTopic = new HashMap<String, List<RecordBatch>>();
for (RecordBatch batch : batches) {
@@ -552,7 +579,7 @@ public class Sender implements Runnable {
produce.set("topic_data", topicDatas.toArray());
RequestSend send = new RequestSend(destination, header(ApiKeys.PRODUCE), produce);
- return new InFlightRequest(acks != 0, send, batchesByPartition);
+ return new InFlightRequest(now, acks != 0, send, batchesByPartition);
}
private RequestHeader header(ApiKeys key) {
@@ -641,16 +668,19 @@ public class Sender implements Runnable {
* An request that hasn't been fully processed yet
*/
private static final class InFlightRequest {
+ public long created;
public boolean expectResponse;
public Map<TopicPartition, RecordBatch> batches;
public RequestSend request;
/**
+ * @param created The unix timestamp for the time at which this request was created.
* @param expectResponse Should we expect a response message or is this request complete once it is sent?
* @param request The request
* @param batches The record batches contained in the request if it is a produce request
*/
- public InFlightRequest(boolean expectResponse, RequestSend request, Map<TopicPartition, RecordBatch> batches) {
+ public InFlightRequest(long created, boolean expectResponse, RequestSend request, Map<TopicPartition, RecordBatch> batches) {
+ this.created = created;
this.batches = batches;
this.request = request;
this.expectResponse = expectResponse;
@@ -728,4 +758,124 @@ public class Sender implements Runnable {
}
}
+ /**
+ * A collection of sensors for the sender
+ */
+ private class SenderMetrics {
+
+ private final Metrics metrics;
+ public final Sensor retrySensor;
+ public final Sensor errorSensor;
+ public final Sensor queueTimeSensor;
+ public final Sensor requestTimeSensor;
+ public final Sensor recordsPerRequestSensor;
+ public final Sensor batchSizeSensor;
+ public final Sensor maxRecordSizeSensor;
+
+ public SenderMetrics(Metrics metrics) {
+ this.metrics = metrics;
+ this.batchSizeSensor = metrics.sensor("batch-size");
+ this.queueTimeSensor = metrics.sensor("queue-time");
+ this.requestTimeSensor = metrics.sensor("request-time");
+ this.recordsPerRequestSensor = metrics.sensor("records-per-request");
+ this.retrySensor = metrics.sensor("record-retries");
+ this.errorSensor = metrics.sensor("errors");
+ this.maxRecordSizeSensor = metrics.sensor("record-size-max");
+ this.batchSizeSensor.add("batch-size-avg", "The average number of bytes sent per partition per-request.", new Avg());
+ this.retrySensor.add("record-retry-rate", "The average per-second number of retried record sends", new Rate());
+ this.requestTimeSensor.add("request-latency-avg", "The average request latency in ms", new Avg());
+ this.requestTimeSensor.add("request-latency-max", "The maximum request latency in ms", new Max());
+ this.queueTimeSensor.add("record-queue-time-avg",
+ "The average time in ms record batches spent in the record accumulator.",
+ new Avg());
+ this.queueTimeSensor.add("record-queue-time-max",
+ "The maximum time in ms record batches spent in the record accumulator.",
+ new Max());
+ this.errorSensor.add("record-error-rate", "The average per-second number of record sends that resulted in errors", new Rate());
+ this.recordsPerRequestSensor.add("record-send-rate", "The average number of records sent per second.", new Rate());
+ this.recordsPerRequestSensor.add("records-per-request", "The average number of records per request.", new Avg());
+ this.maxRecordSizeSensor.add("record-size-max", "The maximum record size", new Max());
+ this.metrics.addMetric("requests-in-flight", "The current number of in-flight requests awaiting a response.", new Measurable() {
+ public double measure(MetricConfig config, long now) {
+ return inFlightRequests.totalInFlightRequests();
+ }
+ });
+ metrics.addMetric("metadata-age", "The age in seconds of the current producer metadata being used.", new Measurable() {
+ public double measure(MetricConfig config, long now) {
+ return (TimeUnit.MILLISECONDS.convert(now, TimeUnit.NANOSECONDS) - metadata.lastUpdate()) / 1000.0;
+ }
+ });
+ }
+
+ public void updateProduceRequestMetrics(List<InFlightRequest> requests) {
+ long ns = time.nanoseconds();
+ for (int i = 0; i < requests.size(); i++) {
+ InFlightRequest request = requests.get(i);
+ int records = 0;
+ if (request.batches != null) {
+ for (RecordBatch batch : request.batches.values()) {
+
+ // per-topic record count
+ String topicRecordsCountName = "topic." + batch.topicPartition.topic() + ".records-per-batch";
+ Sensor topicRecordCount = this.metrics.getSensor(topicRecordsCountName);
+ if (topicRecordCount == null) {
+ topicRecordCount = this.metrics.sensor(topicRecordsCountName);
+ topicRecordCount.add("topic." + batch.topicPartition.topic() + ".record-send-rate", new Rate());
+ }
+ topicRecordCount.record(batch.recordCount);
+
+ // per-topic bytes-per-second
+ String topicByteRateName = "topic." + batch.topicPartition.topic() + ".bytes";
+ Sensor topicByteRate = this.metrics.getSensor(topicByteRateName);
+ if (topicByteRate == null) {
+ topicByteRate = this.metrics.sensor(topicByteRateName);
+ topicByteRate.add("topic." + batch.topicPartition.topic() + ".byte-rate", new Rate());
+ }
+ topicByteRate.record(batch.records.sizeInBytes());
+
+ this.batchSizeSensor.record(batch.records.sizeInBytes(), ns);
+ this.queueTimeSensor.record(batch.drained - batch.created, ns);
+ this.maxRecordSizeSensor.record(batch.maxRecordSize, ns);
+ records += batch.recordCount;
+ }
+ this.recordsPerRequestSensor.record(records, ns);
+ }
+ }
+ }
+
+ public void recordRetries(String topic, int count) {
+ this.retrySensor.record(count);
+ String topicRetryName = "topic." + topic + ".record-retries";
+ Sensor topicRetrySensor = this.metrics.getSensor(topicRetryName);
+ if (topicRetrySensor == null) {
+ topicRetrySensor = this.metrics.sensor(topicRetryName);
+ topicRetrySensor.add("topic." + topic + ".record-retry-rate", new Rate());
+ }
+ topicRetrySensor.record(count);
+ }
+
+ public void recordErrors(String topic, int count) {
+ this.errorSensor.record(count);
+ String topicErrorName = "topic." + topic + ".record-errors";
+ Sensor topicErrorSensor = this.metrics.getSensor(topicErrorName);
+ if (topicErrorSensor == null) {
+ topicErrorSensor = this.metrics.sensor(topicErrorName);
+ topicErrorSensor.add("topic." + topic + ".record-error-rate", new Rate());
+ }
+ topicErrorSensor.record(count);
+ }
+
+ public void recordLatency(int node, long latency, long nowNs) {
+ this.requestTimeSensor.record(latency, nowNs);
+ String nodeTimeName = "server." + node + ".latency";
+ Sensor nodeRequestTime = this.metrics.getSensor(nodeTimeName);
+ if (nodeRequestTime == null) {
+ nodeRequestTime = this.metrics.sensor(nodeTimeName);
+ nodeRequestTime.add("node-" + node + ".latency-avg", new Avg());
+ nodeRequestTime.add("node-" + node + ".latency-max", new Max());
+ }
+ nodeRequestTime.record(latency, nowNs);
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/23d7fc47/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java b/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
index 05085e0..3b3fb2c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
+++ b/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
@@ -1,18 +1,14 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.clients.tools;
@@ -30,7 +26,8 @@ public class ProducerPerformance {
public static void main(String[] args) throws Exception {
if (args.length < 5) {
- System.err.println("USAGE: java " + ProducerPerformance.class.getName() + " url topic_name num_records record_size acks [compression_type]");
+ System.err.println("USAGE: java " + ProducerPerformance.class.getName()
+ + " url topic_name num_records record_size acks [compression_type]");
System.exit(1);
}
String url = args[0];
@@ -43,8 +40,8 @@ public class ProducerPerformance {
props.setProperty(ProducerConfig.BROKER_LIST_CONFIG, url);
props.setProperty(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, Integer.toString(5 * 1000));
props.setProperty(ProducerConfig.REQUEST_TIMEOUT_CONFIG, Integer.toString(Integer.MAX_VALUE));
- props.setProperty(ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG, Integer.toString(256 * 1024 * 1024));
- props.setProperty(ProducerConfig.MAX_PARTITION_SIZE_CONFIG, Integer.toString(256 * 1024));
+ props.setProperty(ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG, Integer.toString(64 * 1024 * 1024));
+ props.setProperty(ProducerConfig.MAX_PARTITION_SIZE_CONFIG, Integer.toString(64 * 1024));
if (args.length == 6)
props.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, args[5]);
@@ -65,9 +62,9 @@ public class ProducerPerformance {
for (int i = 0; i < numRecords; i++) {
long sendStart = System.currentTimeMillis();
producer.send(record, callback);
- long sendEllapsed = System.currentTimeMillis() - sendStart;
- maxLatency = Math.max(maxLatency, sendEllapsed);
- totalLatency += sendEllapsed;
+ long sendElapsed = System.currentTimeMillis() - sendStart;
+ maxLatency = Math.max(maxLatency, sendElapsed);
+ totalLatency += sendElapsed;
if (i % reportingInterval == 0) {
System.out.printf("%d max latency = %d ms, avg latency = %.5f\n",
i,
@@ -81,7 +78,7 @@ public class ProducerPerformance {
double msgsSec = 1000.0 * numRecords / (double) ellapsed;
double mbSec = msgsSec * (recordSize + Records.LOG_OVERHEAD) / (1024.0 * 1024.0);
producer.close();
- System.out.printf("%d records sent in %d ms ms. %.2f records per second (%.2f mb/sec).\n", numRecords, ellapsed, msgsSec, mbSec);
+ System.out.printf("%d records sent in %d ms. %.2f records per second (%.2f mb/sec).\n", numRecords, ellapsed, msgsSec, mbSec);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/23d7fc47/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
index 6db2dfb..49be401 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
@@ -1,32 +1,27 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.common.metrics;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import org.apache.kafka.common.utils.CopyOnWriteMap;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
-
/**
* A registry of sensors and metrics.
* <p>
@@ -67,7 +62,7 @@ public class Metrics {
* Create a metrics repository with no metric reporters and default configuration.
*/
public Metrics(Time time) {
- this(new MetricConfig(), new ArrayList<MetricsReporter>(), time);
+ this(new MetricConfig(), new ArrayList<MetricsReporter>(0), time);
}
/**
@@ -87,8 +82,8 @@ public class Metrics {
*/
public Metrics(MetricConfig defaultConfig, List<MetricsReporter> reporters, Time time) {
this.config = defaultConfig;
- this.sensors = new ConcurrentHashMap<String, Sensor>();
- this.metrics = new ConcurrentHashMap<String, KafkaMetric>();
+ this.sensors = new CopyOnWriteMap<String, Sensor>();
+ this.metrics = new CopyOnWriteMap<String, KafkaMetric>();
this.reporters = Utils.notNull(reporters);
this.time = time;
for (MetricsReporter reporter : reporters)
@@ -96,8 +91,26 @@ public class Metrics {
}
/**
- * Create a sensor with the given unique name and zero or more parent sensors. All parent sensors will receive every
- * value recorded with this sensor.
+ * Get the sensor with the given name if it exists
+ * @param name The name of the sensor
+ * @return Return the sensor or null if no such sensor exists
+ */
+ public Sensor getSensor(String name) {
+ return this.sensors.get(Utils.notNull(name));
+ }
+
+ /**
+ * Get or create a sensor with the given unique name and no parent sensors.
+ * @param name The sensor name
+ * @return The sensor
+ */
+ public Sensor sensor(String name) {
+ return sensor(name, null, (Sensor[]) null);
+ }
+
+ /**
+ * Get or create a sensor with the given unique name and zero or more parent sensors. All parent sensors will
+ * receive every value recorded with this sensor.
* @param name The name of the sensor
* @param parents The parent sensors
* @return The sensor that is created
@@ -107,15 +120,15 @@ public class Metrics {
}
/**
- * Create a sensor with the given unique name and zero or more parent sensors. All parent sensors will receive every
- * value recorded with this sensor.
+ * Get or create a sensor with the given unique name and zero or more parent sensors. All parent sensors will
+ * receive every value recorded with this sensor.
* @param name The name of the sensor
* @param config A default configuration to use for this sensor for metrics that don't have their own config
* @param parents The parent sensors
* @return The sensor that is created
*/
public synchronized Sensor sensor(String name, MetricConfig config, Sensor... parents) {
- Sensor s = this.sensors.get(Utils.notNull(name));
+ Sensor s = getSensor(name);
if (s == null) {
s = new Sensor(this, name, parents, config == null ? this.config : config, time);
this.sensors.put(name, s);
http://git-wip-us.apache.org/repos/asf/kafka/blob/23d7fc47/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
index 7e4849b..d68349b 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
@@ -1,18 +1,14 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.common.metrics;
@@ -26,7 +22,6 @@ import org.apache.kafka.common.metrics.CompoundStat.NamedMeasurable;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
-
/**
* A sensor applies a continuous sequence of numerical values to a set of associated metrics. For example a sensor on
* message size would record a sequence of message sizes using the {@link #record(double)} api and would maintain a set
@@ -46,7 +41,7 @@ public final class Sensor {
super();
this.registry = registry;
this.name = Utils.notNull(name);
- this.parents = parents;
+ this.parents = parents == null ? new Sensor[0] : parents;
this.metrics = new ArrayList<KafkaMetric>();
this.stats = new ArrayList<Stat>();
this.config = config;
@@ -86,27 +81,39 @@ public final class Sensor {
record(value, time.nanoseconds());
}
- private void record(double value, long time) {
+ /**
+ * Record a value at a known time. This method is slightly faster than {@link #record(double)} since it will reuse
+ * the time stamp.
+ * @param value The value we are recording
+ * @param time The time in nanoseconds
+ * @throws QuotaViolationException if recording this value moves a metric beyond its configured maximum or minimum
+ * bound
+ */
+ public void record(double value, long time) {
synchronized (this) {
// increment all the stats
for (int i = 0; i < this.stats.size(); i++)
this.stats.get(i).record(config, value, time);
checkQuotas(time);
-
}
for (int i = 0; i < parents.length; i++)
parents[i].record(value, time);
}
+ /**
+ * Check if we have violated our quota for any metric that has a configured quota
+ * @param time
+ */
private void checkQuotas(long time) {
for (int i = 0; i < this.metrics.size(); i++) {
KafkaMetric metric = this.metrics.get(i);
MetricConfig config = metric.config();
if (config != null) {
Quota quota = config.quota();
- if (quota != null)
+ if (quota != null) {
if (!quota.acceptable(metric.value(time)))
throw new QuotaViolationException("Metric " + metric.name() + " is in violation of its quota of " + quota.bound());
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/23d7fc47/clients/src/main/java/org/apache/kafka/common/metrics/stats/Percentiles.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Percentiles.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Percentiles.java
index 4d54916..b47ed88 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Percentiles.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Percentiles.java
@@ -74,7 +74,7 @@ public class Percentiles extends SampledStat implements CompoundStat {
}
public double value(MetricConfig config, long now, double quantile) {
- timeoutObsoleteSamples(config, now);
+ purgeObsoleteSamples(config, now);
float count = 0.0f;
for (Sample sample : this.samples)
count += sample.eventCount;
http://git-wip-us.apache.org/repos/asf/kafka/blob/23d7fc47/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java
index 3b0454f..7f5cc53 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java
@@ -1,18 +1,14 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.common.metrics.stats;
@@ -22,12 +18,11 @@ import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.metrics.MeasurableStat;
import org.apache.kafka.common.metrics.MetricConfig;
-
/**
- * The rate of the given quanitity. By default this is the total observed over a set of samples from a sampled statistic
- * divided by the ellapsed time over the sample windows. Alternative {@link SampledStat} implementations can be
- * provided, however, to record the rate of occurences (e.g. the count of values measured over the time interval) or
- * other such values.
+ * The rate of the given quantity. By default this is the total observed over a set of samples from a sampled statistic
+ * divided by the elapsed time over the sample windows. Alternative {@link SampledStat} implementations can be provided,
+ * however, to record the rate of occurrences (e.g. the count of values measured over the time interval) or other such
+ * values.
*/
public class Rate implements MeasurableStat {
@@ -42,6 +37,10 @@ public class Rate implements MeasurableStat {
this(unit, new SampledTotal());
}
+ public Rate(SampledStat stat) {
+ this(TimeUnit.SECONDS, stat);
+ }
+
public Rate(TimeUnit unit, SampledStat stat) {
this.stat = stat;
this.unit = unit;
@@ -58,8 +57,9 @@ public class Rate implements MeasurableStat {
@Override
public double measure(MetricConfig config, long now) {
- double ellapsed = convert(now - stat.oldest().lastWindow);
- return stat.measure(config, now) / ellapsed;
+ double value = stat.measure(config, now);
+ double elapsed = convert(now - stat.oldest(now).lastWindow);
+ return value / elapsed;
}
private double convert(long time) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/23d7fc47/clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java
index f8b413a..776f3a1 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java
@@ -1,18 +1,14 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.common.metrics.stats;
@@ -22,7 +18,6 @@ import java.util.List;
import org.apache.kafka.common.metrics.MeasurableStat;
import org.apache.kafka.common.metrics.MetricConfig;
-
/**
* A SampledStat records a single scalar value measured over one or more samples. Each sample is recorded over a
* configurable window. The window can be defined by number of events or ellapsed time (or both, if both are given the
@@ -72,7 +67,7 @@ public abstract class SampledStat implements MeasurableStat {
@Override
public double measure(MetricConfig config, long now) {
- timeoutObsoleteSamples(config, now);
+ purgeObsoleteSamples(config, now);
return combine(this.samples, config, now);
}
@@ -82,20 +77,28 @@ public abstract class SampledStat implements MeasurableStat {
return this.samples.get(this.current);
}
- public Sample oldest() {
- return this.samples.get((this.current + 1) % this.samples.size());
+ public Sample oldest(long now) {
+ if (samples.size() == 0)
+ this.samples.add(newSample(now));
+ Sample oldest = this.samples.get(0);
+ for (int i = 1; i < this.samples.size(); i++) {
+ Sample curr = this.samples.get(i);
+ if (curr.lastWindow < oldest.lastWindow)
+ oldest = curr;
+ }
+ return oldest;
}
protected abstract void update(Sample sample, MetricConfig config, double value, long now);
public abstract double combine(List<Sample> samples, MetricConfig config, long now);
- /* Timeout any windows that have expired in the absense of any events */
- protected void timeoutObsoleteSamples(MetricConfig config, long now) {
+ /* Timeout any windows that have expired in the absence of any events */
+ protected void purgeObsoleteSamples(MetricConfig config, long now) {
+ long expireAge = config.samples() * config.timeWindowNs();
for (int i = 0; i < samples.size(); i++) {
- int idx = (this.current + i) % samples.size();
- Sample sample = this.samples.get(idx);
- if (now - sample.lastWindow >= (i + 1) * config.timeWindowNs())
+ Sample sample = this.samples.get(i);
+ if (now - sample.lastWindow >= expireAge)
sample.reset(now);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/23d7fc47/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java b/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java
index 9305b61..6350424 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java
@@ -1,18 +1,14 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.common.network;
@@ -29,6 +25,7 @@ public class ByteBufferSend implements Send {
private final int destination;
protected final ByteBuffer[] buffers;
private int remaining;
+ private int size;
public ByteBufferSend(int destination, ByteBuffer... buffers) {
super();
@@ -36,6 +33,7 @@ public class ByteBufferSend implements Send {
this.buffers = buffers;
for (int i = 0; i < buffers.length; i++)
remaining += buffers[i].remaining();
+ this.size = remaining;
}
@Override
@@ -58,6 +56,10 @@ public class ByteBufferSend implements Send {
return this.remaining;
}
+ public int size() {
+ return this.size;
+ }
+
@Override
public long writeTo(GatheringByteChannel channel) throws IOException {
long written = channel.write(buffers);
http://git-wip-us.apache.org/repos/asf/kafka/blob/23d7fc47/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java b/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java
index 51d4892..dcc639a 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java
@@ -1,18 +1,14 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.common.network;
http://git-wip-us.apache.org/repos/asf/kafka/blob/23d7fc47/clients/src/main/java/org/apache/kafka/common/network/Selector.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index 9839632..02c0606 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -25,8 +25,18 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Count;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -68,21 +78,25 @@ public class Selector implements Selectable {
private final List<NetworkReceive> completedReceives;
private final List<Integer> disconnected;
private final List<Integer> connected;
+ private final Time time;
+ private final SelectorMetrics sensors;
/**
* Create a new selector
*/
- public Selector() {
+ public Selector(Metrics metrics, Time time) {
try {
this.selector = java.nio.channels.Selector.open();
} catch (IOException e) {
throw new KafkaException(e);
}
+ this.time = time;
this.keys = new HashMap<Integer, SelectionKey>();
this.completedSends = new ArrayList<NetworkSend>();
this.completedReceives = new ArrayList<NetworkReceive>();
this.connected = new ArrayList<Integer>();
this.disconnected = new ArrayList<Integer>();
+ this.sensors = new SelectorMetrics(metrics);
}
/**
@@ -192,7 +206,11 @@ public class Selector implements Selectable {
}
/* check ready keys */
+ long startSelect = time.nanoseconds();
int readyKeys = select(timeout);
+ long endSelect = time.nanoseconds();
+ this.sensors.selectTime.record(endSelect - startSelect, endSelect);
+
if (readyKeys > 0) {
Set<SelectionKey> keys = this.selector.selectedKeys();
Iterator<SelectionKey> iter = keys.iterator();
@@ -208,6 +226,7 @@ public class Selector implements Selectable {
channel.finishConnect();
key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
this.connected.add(transmissions.id);
+ this.sensors.connectionCreated.record();
}
/* read from any connections that have readable data */
@@ -218,6 +237,7 @@ public class Selector implements Selectable {
if (transmissions.receive.complete()) {
transmissions.receive.payload().rewind();
this.completedReceives.add(transmissions.receive);
+ this.sensors.recordBytesReceived(transmissions.id, transmissions.receive.payload().limit());
transmissions.clearReceive();
}
}
@@ -227,6 +247,7 @@ public class Selector implements Selectable {
transmissions.send.writeTo(channel);
if (transmissions.send.remaining() <= 0) {
this.completedSends.add(transmissions.send);
+ this.sensors.recordBytesSent(transmissions.id, transmissions.send.size());
transmissions.clearSend();
key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
}
@@ -241,6 +262,8 @@ public class Selector implements Selectable {
}
}
}
+ long endIo = time.nanoseconds();
+ this.sensors.ioTime.record(endIo - endSelect, endIo);
}
@Override
@@ -309,6 +332,7 @@ public class Selector implements Selectable {
} catch (IOException e) {
log.error("Exception closing connection to node {}:", trans.id, e);
}
+ this.sensors.connectionClosed.record();
}
/**
@@ -364,4 +388,87 @@ public class Selector implements Selectable {
}
}
+ private class SelectorMetrics {
+ private final Metrics metrics;
+ public final Sensor connectionClosed;
+ public final Sensor connectionCreated;
+ public final Sensor bytesTransferred;
+ public final Sensor bytesSent;
+ public final Sensor bytesReceived;
+ public final Sensor selectTime;
+ public final Sensor ioTime;
+
+ public SelectorMetrics(Metrics metrics) {
+ this.metrics = metrics;
+ this.connectionClosed = this.metrics.sensor("connections-closed");
+ this.connectionCreated = this.metrics.sensor("connections-created");
+ this.bytesTransferred = this.metrics.sensor("bytes-sent-received");
+ this.bytesSent = this.metrics.sensor("bytes-sent", bytesTransferred);
+ this.bytesReceived = this.metrics.sensor("bytes-received", bytesTransferred);
+ this.selectTime = this.metrics.sensor("select-time");
+ this.ioTime = this.metrics.sensor("io-time");
+ bytesTransferred.add("network-ops-per-second",
+ "The average number of network operations (reads or writes) on all connections per second.",
+ new Rate(new Count()));
+ this.bytesSent.add("bytes-sent-per-second", "The average number of outgoing bytes sent per second to all servers.", new Rate());
+ this.bytesSent.add("requests-sent-per-second", "The average number of requests sent per second.", new Rate(new Count()));
+ this.bytesSent.add("request-size-avg", "The average size of all requests in the window..", new Avg());
+ this.bytesSent.add("request-size-max", "The maximum size of any request sent in the window.", new Max());
+ this.bytesReceived.add("bytes-received-per-second", "Bytes/second read off all sockets", new Rate());
+ this.bytesReceived.add("responses-received-per-second", "Responses received sent per second.", new Rate(new Count()));
+ this.connectionCreated.add("connections-created-per-second",
+ "New connections established per second in the window.",
+ new Rate());
+ this.connectionClosed.add("connections-closed-per-second", "Connections closed per second in the window.", new Rate());
+ this.selectTime.add("select-calls-per-second",
+ "Number of times the I/O layer checked for new I/O to perform per second",
+ new Rate(new Count()));
+ this.selectTime.add("io-wait-time-avg-ns",
+ "The average length of time the I/O thread speant waiting for a socket ready for reads or writes in nanoseconds.",
+ new Avg());
+ this.selectTime.add("io-wait-percentage", "The fraction of time the I/O thread spent waiting.", new Rate(TimeUnit.NANOSECONDS));
+ this.ioTime.add("io-time-avg-ns", "The average length of time for I/O per select call in nanoseconds.", new Avg());
+ this.ioTime.add("io-percentage", "The fraction of time the I/O thread spent doing I/O", new Rate(TimeUnit.NANOSECONDS));
+ this.metrics.addMetric("connection-count", "The current number of active connections.", new Measurable() {
+ public double measure(MetricConfig config, long now) {
+ return keys.size();
+ }
+ });
+ }
+
+ public void recordBytesSent(int node, int bytes) {
+ this.bytesSent.record(bytes);
+ if (node >= 0) {
+ String name = "node-" + node + ".bytes-sent";
+ Sensor sensor = this.metrics.getSensor(name);
+ if (sensor == null) {
+ sensor = this.metrics.sensor(name);
+ sensor.add("node-" + node + ".bytes-sent-per-second", new Rate());
+ sensor.add("node-" + node + ".requests-sent-per-second",
+ "The average number of requests sent per second.",
+ new Rate(new Count()));
+ sensor.add("connection-" + node + ".request-size-avg", "The average size of all requests in the window..", new Avg());
+ sensor.add("node-" + node + ".request-size-max", "The maximum size of any request sent in the window.", new Max());
+ }
+ sensor.record(bytes);
+ }
+ }
+
+ public void recordBytesReceived(int node, int bytes) {
+ this.bytesReceived.record(bytes);
+ if (node >= 0) {
+ String name = "node-" + node + ".bytes-received";
+ Sensor sensor = this.metrics.getSensor(name);
+ if (sensor == null) {
+ sensor = this.metrics.sensor(name);
+ sensor.add("node-" + node + ".bytes-received-per-second", new Rate());
+ sensor.add("node-" + node + ".responses-received-per-second",
+ "The average number of responses received per second.",
+ new Rate(new Count()));
+ }
+ sensor.record(bytes);
+ }
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/23d7fc47/clients/src/main/java/org/apache/kafka/common/utils/CopyOnWriteMap.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/CopyOnWriteMap.java b/clients/src/main/java/org/apache/kafka/common/utils/CopyOnWriteMap.java
index 187d22f..9c0e81a 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/CopyOnWriteMap.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/CopyOnWriteMap.java
@@ -1,18 +1,14 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.common.utils;
@@ -24,7 +20,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentMap;
/**
- * A simple read-optimized map implementation that synchronizes only writes and does a fully copy on each modification
+ * A simple read-optimized map implementation that synchronizes only writes and does a full copy on each modification
*/
public class CopyOnWriteMap<K, V> implements ConcurrentMap<K, V> {
http://git-wip-us.apache.org/repos/asf/kafka/blob/23d7fc47/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java
index 09a5355..8b4ac0f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java
@@ -1,18 +1,14 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.clients.producer;
http://git-wip-us.apache.org/repos/asf/kafka/blob/23d7fc47/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
index ed56906..f37ab77 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
@@ -52,7 +52,7 @@ public class RecordAccumulatorTest {
}
accum.append(tp, key, value, CompressionType.NONE, null);
assertEquals("Our partition should be ready", asList(tp), accum.ready(time.milliseconds()));
- List<RecordBatch> batches = accum.drain(asList(tp), Integer.MAX_VALUE);
+ List<RecordBatch> batches = accum.drain(asList(tp), Integer.MAX_VALUE, 0);
assertEquals(1, batches.size());
RecordBatch batch = batches.get(0);
Iterator<LogEntry> iter = batch.records.iterator();
@@ -80,7 +80,7 @@ public class RecordAccumulatorTest {
assertEquals("No partitions should be ready", 0, accum.ready(time.milliseconds()).size());
time.sleep(10);
assertEquals("Our partition should be ready", asList(tp), accum.ready(time.milliseconds()));
- List<RecordBatch> batches = accum.drain(asList(tp), Integer.MAX_VALUE);
+ List<RecordBatch> batches = accum.drain(asList(tp), Integer.MAX_VALUE, 0);
assertEquals(1, batches.size());
RecordBatch batch = batches.get(0);
Iterator<LogEntry> iter = batch.records.iterator();
@@ -101,7 +101,7 @@ public class RecordAccumulatorTest {
}
assertEquals("Both partitions should be ready", 2, accum.ready(time.milliseconds()).size());
- List<RecordBatch> batches = accum.drain(partitions, 1024);
+ List<RecordBatch> batches = accum.drain(partitions, 1024, 0);
assertEquals("But due to size bound only one partition should have been retrieved", 1, batches.size());
}
@@ -131,7 +131,7 @@ public class RecordAccumulatorTest {
long now = time.milliseconds();
while (read < numThreads * msgs) {
List<TopicPartition> tps = accum.ready(now);
- List<RecordBatch> batches = accum.drain(tps, 5 * 1024);
+ List<RecordBatch> batches = accum.drain(tps, 5 * 1024, 0);
for (RecordBatch batch : batches) {
for (LogEntry entry : batch.records)
read++;