You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jk...@apache.org on 2014/04/16 19:20:30 UTC
git commit: KAFKA-1359: Ensure all topic/server metrics registered at once.
Repository: kafka
Updated Branches:
refs/heads/trunk 9a6f7113e -> ec075c5a8
KAFKA-1359: Ensure all topic/server metrics registered at once.
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ec075c5a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ec075c5a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ec075c5a
Branch: refs/heads/trunk
Commit: ec075c5a853e4168ff30cf133493588671aa2fac
Parents: 9a6f711
Author: Jay Kreps <ja...@gmail.com>
Authored: Wed Apr 16 10:19:18 2014 -0700
Committer: Jay Kreps <ja...@gmail.com>
Committed: Wed Apr 16 10:19:18 2014 -0700
----------------------------------------------------------------------
.../clients/producer/internals/Sender.java | 104 +++++++++++--------
.../apache/kafka/common/network/Selector.java | 90 ++++++++++------
2 files changed, 118 insertions(+), 76 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/ec075c5a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 855ae84..9f2b2e9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -774,27 +774,35 @@ public class Sender implements Runnable {
public SenderMetrics(Metrics metrics) {
this.metrics = metrics;
+
this.batchSizeSensor = metrics.sensor("batch-size");
+ this.batchSizeSensor.add("batch-size-avg", "The average number of bytes sent per partition per-request.", new Avg());
+
this.queueTimeSensor = metrics.sensor("queue-time");
+ this.queueTimeSensor.add("record-queue-time-avg",
+ "The average time in ms record batches spent in the record accumulator.",
+ new Avg());
+ this.queueTimeSensor.add("record-queue-time-max",
+ "The maximum time in ms record batches spent in the record accumulator.",
+ new Max());
+
this.requestTimeSensor = metrics.sensor("request-time");
- this.recordsPerRequestSensor = metrics.sensor("records-per-request");
- this.retrySensor = metrics.sensor("record-retries");
- this.errorSensor = metrics.sensor("errors");
- this.maxRecordSizeSensor = metrics.sensor("record-size-max");
- this.batchSizeSensor.add("batch-size-avg", "The average number of bytes sent per partition per-request.", new Avg());
- this.retrySensor.add("record-retry-rate", "The average per-second number of retried record sends", new Rate());
this.requestTimeSensor.add("request-latency-avg", "The average request latency in ms", new Avg());
this.requestTimeSensor.add("request-latency-max", "The maximum request latency in ms", new Max());
- this.queueTimeSensor.add("record-queue-time-avg",
- "The average time in ms record batches spent in the record accumulator.",
- new Avg());
- this.queueTimeSensor.add("record-queue-time-max",
- "The maximum time in ms record batches spent in the record accumulator.",
- new Max());
- this.errorSensor.add("record-error-rate", "The average per-second number of record sends that resulted in errors", new Rate());
+
+ this.recordsPerRequestSensor = metrics.sensor("records-per-request");
this.recordsPerRequestSensor.add("record-send-rate", "The average number of records sent per second.", new Rate());
this.recordsPerRequestSensor.add("records-per-request-avg", "The average number of records per request.", new Avg());
+
+ this.retrySensor = metrics.sensor("record-retries");
+ this.retrySensor.add("record-retry-rate", "The average per-second number of retried record sends", new Rate());
+
+ this.errorSensor = metrics.sensor("errors");
+ this.errorSensor.add("record-error-rate", "The average per-second number of record sends that resulted in errors", new Rate());
+
+ this.maxRecordSizeSensor = metrics.sensor("record-size-max");
this.maxRecordSizeSensor.add("record-size-max", "The maximum record size", new Max());
+
this.metrics.addMetric("requests-in-flight", "The current number of in-flight requests awaiting a response.", new Measurable() {
public double measure(MetricConfig config, long now) {
return inFlightRequests.totalInFlightRequests();
@@ -807,32 +815,53 @@ public class Sender implements Runnable {
});
}
+ public void maybeRegisterTopicMetrics(String topic) {
+ // if one sensor of the metrics has been registered for the topic,
+ // then all other sensors should have been registered; and vice versa
+ String topicRecordsCountName = "topic." + topic + ".records-per-batch";
+ Sensor topicRecordCount = this.metrics.getSensor(topicRecordsCountName);
+ if (topicRecordCount == null) {
+ topicRecordCount = this.metrics.sensor(topicRecordsCountName);
+ topicRecordCount.add("topic." + topic + ".record-send-rate", new Rate());
+
+ String topicByteRateName = "topic." + topic + ".bytes";
+ Sensor topicByteRate = this.metrics.sensor(topicByteRateName);
+ topicByteRate.add("topic." + topic + ".byte-rate", new Rate());
+
+ String topicRetryName = "topic." + topic + ".record-retries";
+ Sensor topicRetrySensor = this.metrics.sensor(topicRetryName);
+ topicRetrySensor.add("topic." + topic + ".record-retry-rate", new Rate());
+
+ String topicErrorName = "topic." + topic + ".record-errors";
+ Sensor topicErrorSensor = this.metrics.sensor(topicErrorName);
+ topicErrorSensor.add("topic." + topic + ".record-error-rate", new Rate());
+ }
+ }
+
public void updateProduceRequestMetrics(List<InFlightRequest> requests) {
long ns = time.nanoseconds();
for (int i = 0; i < requests.size(); i++) {
InFlightRequest request = requests.get(i);
int records = 0;
+
if (request.batches != null) {
for (RecordBatch batch : request.batches.values()) {
- // per-topic record count
- String topicRecordsCountName = "topic." + batch.topicPartition.topic() + ".records-per-batch";
- Sensor topicRecordCount = this.metrics.getSensor(topicRecordsCountName);
- if (topicRecordCount == null) {
- topicRecordCount = this.metrics.sensor(topicRecordsCountName);
- topicRecordCount.add("topic." + batch.topicPartition.topic() + ".record-send-rate", new Rate());
- }
+ // register all per-topic metrics at once
+ String topic = batch.topicPartition.topic();
+ maybeRegisterTopicMetrics(topic);
+
+ // per-topic record send rate
+ String topicRecordsCountName = "topic." + topic + ".records-per-batch";
+ Sensor topicRecordCount = Utils.notNull(this.metrics.getSensor(topicRecordsCountName));
topicRecordCount.record(batch.recordCount);
- // per-topic bytes-per-second
- String topicByteRateName = "topic." + batch.topicPartition.topic() + ".bytes";
- Sensor topicByteRate = this.metrics.getSensor(topicByteRateName);
- if (topicByteRate == null) {
- topicByteRate = this.metrics.sensor(topicByteRateName);
- topicByteRate.add("topic." + batch.topicPartition.topic() + ".byte-rate", new Rate());
- }
+ // per-topic bytes send rate
+ String topicByteRateName = "topic." + topic + ".bytes";
+ Sensor topicByteRate = Utils.notNull(this.metrics.getSensor(topicByteRateName));
topicByteRate.record(batch.records.sizeInBytes());
+ // global metrics
this.batchSizeSensor.record(batch.records.sizeInBytes(), ns);
this.queueTimeSensor.record(batch.drained - batch.created, ns);
this.maxRecordSizeSensor.record(batch.maxRecordSize, ns);
@@ -847,35 +876,22 @@ public class Sender implements Runnable {
this.retrySensor.record(count);
String topicRetryName = "topic." + topic + ".record-retries";
Sensor topicRetrySensor = this.metrics.getSensor(topicRetryName);
- if (topicRetrySensor == null) {
- topicRetrySensor = this.metrics.sensor(topicRetryName);
- topicRetrySensor.add("topic." + topic + ".record-retry-rate", new Rate());
- }
- topicRetrySensor.record(count);
+ if (topicRetrySensor != null) topicRetrySensor.record(count);
}
public void recordErrors(String topic, int count) {
this.errorSensor.record(count);
String topicErrorName = "topic." + topic + ".record-errors";
Sensor topicErrorSensor = this.metrics.getSensor(topicErrorName);
- if (topicErrorSensor == null) {
- topicErrorSensor = this.metrics.sensor(topicErrorName);
- topicErrorSensor.add("topic." + topic + ".record-error-rate", new Rate());
- }
- topicErrorSensor.record(count);
+ if (topicErrorSensor != null) topicErrorSensor.record(count);
}
public void recordLatency(int node, long latency, long nowNs) {
this.requestTimeSensor.record(latency, nowNs);
if (node >= 0) {
- String nodeTimeName = "server." + node + ".latency";
+ String nodeTimeName = "node-" + node + ".latency";
Sensor nodeRequestTime = this.metrics.getSensor(nodeTimeName);
- if (nodeRequestTime == null) {
- nodeRequestTime = this.metrics.sensor(nodeTimeName);
- nodeRequestTime.add("node-" + node + ".latency-avg", new Avg());
- nodeRequestTime.add("node-" + node + ".latency-max", new Max());
- }
- nodeRequestTime.record(latency, nowNs);
+ if (nodeRequestTime != null) nodeRequestTime.record(latency, nowNs);
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/ec075c5a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index 558f8b4..6027cb2 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -221,6 +221,10 @@ public class Selector implements Selectable {
Transmissions transmissions = transmissions(key);
SocketChannel channel = channel(key);
+
+ // register all per-broker metrics at once
+ sensors.maybeRegisterNodeMetrics(transmissions.id);
+
try {
/* complete any connections that have finished their handshake */
if (key.isConnectable()) {
@@ -401,33 +405,41 @@ public class Selector implements Selectable {
public SelectorMetrics(Metrics metrics) {
this.metrics = metrics;
+
this.connectionClosed = this.metrics.sensor("connections-closed");
+ this.connectionClosed.add("connection-close-rate", "Connections closed per second in the window.", new Rate());
+
this.connectionCreated = this.metrics.sensor("connections-created");
+ this.connectionCreated.add("connection-creation-rate", "New connections established per second in the window.", new Rate());
+
this.bytesTransferred = this.metrics.sensor("bytes-sent-received");
- this.bytesSent = this.metrics.sensor("bytes-sent", bytesTransferred);
- this.bytesReceived = this.metrics.sensor("bytes-received", bytesTransferred);
- this.selectTime = this.metrics.sensor("select-time");
- this.ioTime = this.metrics.sensor("io-time");
bytesTransferred.add("network-io-rate",
- "The average number of network operations (reads or writes) on all connections per second.",
- new Rate(new Count()));
+ "The average number of network operations (reads or writes) on all connections per second.",
+ new Rate(new Count()));
+
+ this.bytesSent = this.metrics.sensor("bytes-sent", bytesTransferred);
this.bytesSent.add("outgoing-byte-rate", "The average number of outgoing bytes sent per second to all servers.", new Rate());
this.bytesSent.add("request-rate", "The average number of requests sent per second.", new Rate(new Count()));
this.bytesSent.add("request-size-avg", "The average size of all requests in the window..", new Avg());
this.bytesSent.add("request-size-max", "The maximum size of any request sent in the window.", new Max());
+
+ this.bytesReceived = this.metrics.sensor("bytes-received", bytesTransferred);
this.bytesReceived.add("incoming-byte-rate", "Bytes/second read off all sockets", new Rate());
this.bytesReceived.add("response-rate", "Responses received sent per second.", new Rate(new Count()));
- this.connectionCreated.add("connection-creation-rate", "New connections established per second in the window.", new Rate());
- this.connectionClosed.add("connection-close-rate", "Connections closed per second in the window.", new Rate());
+
+ this.selectTime = this.metrics.sensor("select-time");
this.selectTime.add("select-rate",
- "Number of times the I/O layer checked for new I/O to perform per second",
- new Rate(new Count()));
+ "Number of times the I/O layer checked for new I/O to perform per second",
+ new Rate(new Count()));
this.selectTime.add("io-wait-time-ns-avg",
- "The average length of time the I/O thread speant waiting for a socket ready for reads or writes in nanoseconds.",
- new Avg());
+ "The average length of time the I/O thread spent waiting for a socket ready for reads or writes in nanoseconds.",
+ new Avg());
this.selectTime.add("io-wait-ratio", "The fraction of time the I/O thread spent waiting.", new Rate(TimeUnit.NANOSECONDS));
+
+ this.ioTime = this.metrics.sensor("io-time");
this.ioTime.add("io-time-ns-avg", "The average length of time for I/O per select call in nanoseconds.", new Avg());
this.ioTime.add("io-ratio", "The fraction of time the I/O thread spent doing I/O", new Rate(TimeUnit.NANOSECONDS));
+
this.metrics.addMetric("connection-count", "The current number of active connections.", new Measurable() {
public double measure(MetricConfig config, long now) {
return keys.size();
@@ -435,35 +447,49 @@ public class Selector implements Selectable {
});
}
+ public void maybeRegisterNodeMetrics(int node) {
+ if (node >= 0) {
+ // if one sensor of the metrics has been registered for the node,
+ // then all other sensors should have been registered; and vice versa
+ String nodeRequestName = "node-" + node + ".bytes-sent";
+ Sensor nodeRequest = this.metrics.getSensor(nodeRequestName);
+ if (nodeRequest == null) {
+ nodeRequest = this.metrics.sensor(nodeRequestName);
+ nodeRequest.add("node-" + node + ".outgoing-byte-rate", new Rate());
+ nodeRequest.add("node-" + node + ".request-rate", "The average number of requests sent per second.", new Rate(new Count()));
+ nodeRequest.add("node-" + node + ".request-size-avg", "The average size of all requests in the window..", new Avg());
+ nodeRequest.add("node-" + node + ".request-size-max", "The maximum size of any request sent in the window.", new Max());
+
+ String nodeResponseName = "node-" + node + ".bytes-received";
+ Sensor nodeResponse = this.metrics.sensor(nodeResponseName);
+ nodeResponse.add("node-" + node + ".incoming-byte-rate", new Rate());
+ nodeResponse.add("node-" + node + ".response-rate",
+ "The average number of responses received per second.",
+ new Rate(new Count()));
+
+ String nodeTimeName = "node-" + node + ".latency";
+ Sensor nodeRequestTime = this.metrics.sensor(nodeTimeName);
+ nodeRequestTime.add("node-" + node + ".request-latency-avg", new Avg());
+ nodeRequestTime.add("node-" + node + ".request-latency-max", new Max());
+ }
+ }
+ }
+
public void recordBytesSent(int node, int bytes) {
this.bytesSent.record(bytes);
if (node >= 0) {
- String name = "node-" + node + ".bytes-sent";
- Sensor sensor = this.metrics.getSensor(name);
- if (sensor == null) {
- sensor = this.metrics.sensor(name);
- sensor.add("node-" + node + ".outgoing-byte-rate", new Rate());
- sensor.add("node-" + node + ".request-rate", "The average number of requests sent per second.", new Rate(new Count()));
- sensor.add("node-" + node + ".request-size-avg", "The average size of all requests in the window..", new Avg());
- sensor.add("node-" + node + ".request-size-max", "The maximum size of any request sent in the window.", new Max());
- }
- sensor.record(bytes);
+ String nodeRequestName = "node-" + node + ".bytes-sent";
+ Sensor nodeRequest = this.metrics.getSensor(nodeRequestName);
+ if (nodeRequest != null) nodeRequest.record(bytes);
}
}
public void recordBytesReceived(int node, int bytes) {
this.bytesReceived.record(bytes);
if (node >= 0) {
- String name = "node-" + node + ".bytes-received";
- Sensor sensor = this.metrics.getSensor(name);
- if (sensor == null) {
- sensor = this.metrics.sensor(name);
- sensor.add("node-" + node + ".incoming-byte-rate", new Rate());
- sensor.add("node-" + node + ".response-rate",
- "The average number of responses received per second.",
- new Rate(new Count()));
- }
- sensor.record(bytes);
+ String nodeRequestName = "node-" + node + ".bytes-received";
+ Sensor nodeRequest = this.metrics.getSensor(nodeRequestName);
+ if (nodeRequest != null) nodeRequest.record(bytes);
}
}
}