You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jk...@apache.org on 2014/04/16 19:20:30 UTC

git commit: KAFKA-1359: Ensure all topic/server metrics registered at once.

Repository: kafka
Updated Branches:
  refs/heads/trunk 9a6f7113e -> ec075c5a8


KAFKA-1359: Ensure all topic/server metrics registered at once.


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ec075c5a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ec075c5a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ec075c5a

Branch: refs/heads/trunk
Commit: ec075c5a853e4168ff30cf133493588671aa2fac
Parents: 9a6f711
Author: Jay Kreps <ja...@gmail.com>
Authored: Wed Apr 16 10:19:18 2014 -0700
Committer: Jay Kreps <ja...@gmail.com>
Committed: Wed Apr 16 10:19:18 2014 -0700

----------------------------------------------------------------------
 .../clients/producer/internals/Sender.java      | 104 +++++++++++--------
 .../apache/kafka/common/network/Selector.java   |  90 ++++++++++------
 2 files changed, 118 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ec075c5a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 855ae84..9f2b2e9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -774,27 +774,35 @@ public class Sender implements Runnable {
 
         public SenderMetrics(Metrics metrics) {
             this.metrics = metrics;
+
             this.batchSizeSensor = metrics.sensor("batch-size");
+            this.batchSizeSensor.add("batch-size-avg", "The average number of bytes sent per partition per-request.", new Avg());
+
             this.queueTimeSensor = metrics.sensor("queue-time");
+            this.queueTimeSensor.add("record-queue-time-avg",
+                "The average time in ms record batches spent in the record accumulator.",
+                new Avg());
+            this.queueTimeSensor.add("record-queue-time-max",
+                "The maximum time in ms record batches spent in the record accumulator.",
+                new Max());
+
             this.requestTimeSensor = metrics.sensor("request-time");
-            this.recordsPerRequestSensor = metrics.sensor("records-per-request");
-            this.retrySensor = metrics.sensor("record-retries");
-            this.errorSensor = metrics.sensor("errors");
-            this.maxRecordSizeSensor = metrics.sensor("record-size-max");
-            this.batchSizeSensor.add("batch-size-avg", "The average number of bytes sent per partition per-request.", new Avg());
-            this.retrySensor.add("record-retry-rate", "The average per-second number of retried record sends", new Rate());
             this.requestTimeSensor.add("request-latency-avg", "The average request latency in ms", new Avg());
             this.requestTimeSensor.add("request-latency-max", "The maximum request latency in ms", new Max());
-            this.queueTimeSensor.add("record-queue-time-avg",
-                                     "The average time in ms record batches spent in the record accumulator.",
-                                     new Avg());
-            this.queueTimeSensor.add("record-queue-time-max",
-                                     "The maximum time in ms record batches spent in the record accumulator.",
-                                     new Max());
-            this.errorSensor.add("record-error-rate", "The average per-second number of record sends that resulted in errors", new Rate());
+
+            this.recordsPerRequestSensor = metrics.sensor("records-per-request");
             this.recordsPerRequestSensor.add("record-send-rate", "The average number of records sent per second.", new Rate());
             this.recordsPerRequestSensor.add("records-per-request-avg", "The average number of records per request.", new Avg());
+
+            this.retrySensor = metrics.sensor("record-retries");
+            this.retrySensor.add("record-retry-rate", "The average per-second number of retried record sends", new Rate());
+
+            this.errorSensor = metrics.sensor("errors");
+            this.errorSensor.add("record-error-rate", "The average per-second number of record sends that resulted in errors", new Rate());
+
+            this.maxRecordSizeSensor = metrics.sensor("record-size-max");
             this.maxRecordSizeSensor.add("record-size-max", "The maximum record size", new Max());
+
             this.metrics.addMetric("requests-in-flight", "The current number of in-flight requests awaiting a response.", new Measurable() {
                 public double measure(MetricConfig config, long now) {
                     return inFlightRequests.totalInFlightRequests();
@@ -807,32 +815,53 @@ public class Sender implements Runnable {
             });
         }
 
+        public void maybeRegisterTopicMetrics(String topic) {
+            // if one sensor of the metrics has been registered for the topic,
+            // then all other sensors should have been registered; and vice versa
+            String topicRecordsCountName = "topic." + topic + ".records-per-batch";
+            Sensor topicRecordCount = this.metrics.getSensor(topicRecordsCountName);
+            if (topicRecordCount == null) {
+                topicRecordCount = this.metrics.sensor(topicRecordsCountName);
+                topicRecordCount.add("topic." + topic + ".record-send-rate", new Rate());
+
+                String topicByteRateName = "topic." + topic + ".bytes";
+                Sensor topicByteRate = this.metrics.sensor(topicByteRateName);
+                topicByteRate.add("topic." + topic + ".byte-rate", new Rate());
+
+                String topicRetryName = "topic." + topic + ".record-retries";
+                Sensor topicRetrySensor = this.metrics.sensor(topicRetryName);
+                topicRetrySensor.add("topic." + topic + ".record-retry-rate", new Rate());
+
+                String topicErrorName = "topic." + topic + ".record-errors";
+                Sensor topicErrorSensor = this.metrics.sensor(topicErrorName);
+                topicErrorSensor.add("topic." + topic + ".record-error-rate", new Rate());
+            }
+        }
+
         public void updateProduceRequestMetrics(List<InFlightRequest> requests) {
             long ns = time.nanoseconds();
             for (int i = 0; i < requests.size(); i++) {
                 InFlightRequest request = requests.get(i);
                 int records = 0;
+
                 if (request.batches != null) {
                     for (RecordBatch batch : request.batches.values()) {
 
-                        // per-topic record count
-                        String topicRecordsCountName = "topic." + batch.topicPartition.topic() + ".records-per-batch";
-                        Sensor topicRecordCount = this.metrics.getSensor(topicRecordsCountName);
-                        if (topicRecordCount == null) {
-                            topicRecordCount = this.metrics.sensor(topicRecordsCountName);
-                            topicRecordCount.add("topic." + batch.topicPartition.topic() + ".record-send-rate", new Rate());
-                        }
+                        // register all per-topic metrics at once
+                        String topic = batch.topicPartition.topic();
+                        maybeRegisterTopicMetrics(topic);
+
+                        // per-topic record send rate
+                        String topicRecordsCountName = "topic." + topic + ".records-per-batch";
+                        Sensor topicRecordCount = Utils.notNull(this.metrics.getSensor(topicRecordsCountName));
                         topicRecordCount.record(batch.recordCount);
 
-                        // per-topic bytes-per-second
-                        String topicByteRateName = "topic." + batch.topicPartition.topic() + ".bytes";
-                        Sensor topicByteRate = this.metrics.getSensor(topicByteRateName);
-                        if (topicByteRate == null) {
-                            topicByteRate = this.metrics.sensor(topicByteRateName);
-                            topicByteRate.add("topic." + batch.topicPartition.topic() + ".byte-rate", new Rate());
-                        }
+                        // per-topic bytes send rate
+                        String topicByteRateName = "topic." + topic + ".bytes";
+                        Sensor topicByteRate = Utils.notNull(this.metrics.getSensor(topicByteRateName));
                         topicByteRate.record(batch.records.sizeInBytes());
 
+                        // global metrics
                         this.batchSizeSensor.record(batch.records.sizeInBytes(), ns);
                         this.queueTimeSensor.record(batch.drained - batch.created, ns);
                         this.maxRecordSizeSensor.record(batch.maxRecordSize, ns);
@@ -847,35 +876,22 @@ public class Sender implements Runnable {
             this.retrySensor.record(count);
             String topicRetryName = "topic." + topic + ".record-retries";
             Sensor topicRetrySensor = this.metrics.getSensor(topicRetryName);
-            if (topicRetrySensor == null) {
-                topicRetrySensor = this.metrics.sensor(topicRetryName);
-                topicRetrySensor.add("topic." + topic + ".record-retry-rate", new Rate());
-            }
-            topicRetrySensor.record(count);
+            if (topicRetrySensor != null) topicRetrySensor.record(count);
         }
 
         public void recordErrors(String topic, int count) {
             this.errorSensor.record(count);
             String topicErrorName = "topic." + topic + ".record-errors";
             Sensor topicErrorSensor = this.metrics.getSensor(topicErrorName);
-            if (topicErrorSensor == null) {
-                topicErrorSensor = this.metrics.sensor(topicErrorName);
-                topicErrorSensor.add("topic." + topic + ".record-error-rate", new Rate());
-            }
-            topicErrorSensor.record(count);
+            if (topicErrorSensor != null) topicErrorSensor.record(count);
         }
 
         public void recordLatency(int node, long latency, long nowNs) {
             this.requestTimeSensor.record(latency, nowNs);
             if (node >= 0) {
-                String nodeTimeName = "server." + node + ".latency";
+                String nodeTimeName = "node-" + node + ".latency";
                 Sensor nodeRequestTime = this.metrics.getSensor(nodeTimeName);
-                if (nodeRequestTime == null) {
-                    nodeRequestTime = this.metrics.sensor(nodeTimeName);
-                    nodeRequestTime.add("node-" + node + ".latency-avg", new Avg());
-                    nodeRequestTime.add("node-" + node + ".latency-max", new Max());
-                }
-                nodeRequestTime.record(latency, nowNs);
+                if (nodeRequestTime != null) nodeRequestTime.record(latency, nowNs);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ec075c5a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index 558f8b4..6027cb2 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -221,6 +221,10 @@ public class Selector implements Selectable {
 
                 Transmissions transmissions = transmissions(key);
                 SocketChannel channel = channel(key);
+
+                // register all per-broker metrics at once
+                sensors.maybeRegisterNodeMetrics(transmissions.id);
+
                 try {
                     /* complete any connections that have finished their handshake */
                     if (key.isConnectable()) {
@@ -401,33 +405,41 @@ public class Selector implements Selectable {
 
         public SelectorMetrics(Metrics metrics) {
             this.metrics = metrics;
+
             this.connectionClosed = this.metrics.sensor("connections-closed");
+            this.connectionClosed.add("connection-close-rate", "Connections closed per second in the window.", new Rate());
+
             this.connectionCreated = this.metrics.sensor("connections-created");
+            this.connectionCreated.add("connection-creation-rate", "New connections established per second in the window.", new Rate());
+
             this.bytesTransferred = this.metrics.sensor("bytes-sent-received");
-            this.bytesSent = this.metrics.sensor("bytes-sent", bytesTransferred);
-            this.bytesReceived = this.metrics.sensor("bytes-received", bytesTransferred);
-            this.selectTime = this.metrics.sensor("select-time");
-            this.ioTime = this.metrics.sensor("io-time");
             bytesTransferred.add("network-io-rate",
-                                 "The average number of network operations (reads or writes) on all connections per second.",
-                                 new Rate(new Count()));
+                "The average number of network operations (reads or writes) on all connections per second.",
+                new Rate(new Count()));
+
+            this.bytesSent = this.metrics.sensor("bytes-sent", bytesTransferred);
             this.bytesSent.add("outgoing-byte-rate", "The average number of outgoing bytes sent per second to all servers.", new Rate());
             this.bytesSent.add("request-rate", "The average number of requests sent per second.", new Rate(new Count()));
             this.bytesSent.add("request-size-avg", "The average size of all requests in the window..", new Avg());
             this.bytesSent.add("request-size-max", "The maximum size of any request sent in the window.", new Max());
+
+            this.bytesReceived = this.metrics.sensor("bytes-received", bytesTransferred);
             this.bytesReceived.add("incoming-byte-rate", "Bytes/second read off all sockets", new Rate());
             this.bytesReceived.add("response-rate", "Responses received sent per second.", new Rate(new Count()));
-            this.connectionCreated.add("connection-creation-rate", "New connections established per second in the window.", new Rate());
-            this.connectionClosed.add("connection-close-rate", "Connections closed per second in the window.", new Rate());
+
+            this.selectTime = this.metrics.sensor("select-time");
             this.selectTime.add("select-rate",
-                                "Number of times the I/O layer checked for new I/O to perform per second",
-                                new Rate(new Count()));
+                "Number of times the I/O layer checked for new I/O to perform per second",
+                new Rate(new Count()));
             this.selectTime.add("io-wait-time-ns-avg",
-                                "The average length of time the I/O thread speant waiting for a socket ready for reads or writes in nanoseconds.",
-                                new Avg());
+                "The average length of time the I/O thread spent waiting for a socket ready for reads or writes in nanoseconds.",
+                new Avg());
             this.selectTime.add("io-wait-ratio", "The fraction of time the I/O thread spent waiting.", new Rate(TimeUnit.NANOSECONDS));
+
+            this.ioTime = this.metrics.sensor("io-time");
             this.ioTime.add("io-time-ns-avg", "The average length of time for I/O per select call in nanoseconds.", new Avg());
             this.ioTime.add("io-ratio", "The fraction of time the I/O thread spent doing I/O", new Rate(TimeUnit.NANOSECONDS));
+
             this.metrics.addMetric("connection-count", "The current number of active connections.", new Measurable() {
                 public double measure(MetricConfig config, long now) {
                     return keys.size();
@@ -435,35 +447,49 @@ public class Selector implements Selectable {
             });
         }
 
+        public void maybeRegisterNodeMetrics(int node) {
+            if (node >= 0) {
+                // if one sensor of the metrics has been registered for the node,
+                // then all other sensors should have been registered; and vice versa
+                String nodeRequestName = "node-" + node + ".bytes-sent";
+                Sensor nodeRequest = this.metrics.getSensor(nodeRequestName);
+                if (nodeRequest == null) {
+                    nodeRequest = this.metrics.sensor(nodeRequestName);
+                    nodeRequest.add("node-" + node + ".outgoing-byte-rate", new Rate());
+                    nodeRequest.add("node-" + node + ".request-rate", "The average number of requests sent per second.", new Rate(new Count()));
+                    nodeRequest.add("node-" + node + ".request-size-avg", "The average size of all requests in the window..", new Avg());
+                    nodeRequest.add("node-" + node + ".request-size-max", "The maximum size of any request sent in the window.", new Max());
+
+                    String nodeResponseName = "node-" + node + ".bytes-received";
+                    Sensor nodeResponse = this.metrics.sensor(nodeResponseName);
+                    nodeResponse.add("node-" + node + ".incoming-byte-rate", new Rate());
+                    nodeResponse.add("node-" + node + ".response-rate",
+                        "The average number of responses received per second.",
+                        new Rate(new Count()));
+
+                    String nodeTimeName = "node-" + node + ".latency";
+                    Sensor nodeRequestTime = this.metrics.sensor(nodeTimeName);
+                    nodeRequestTime.add("node-" + node + ".request-latency-avg", new Avg());
+                    nodeRequestTime.add("node-" + node + ".request-latency-max", new Max());
+                }
+            }
+        }
+
         public void recordBytesSent(int node, int bytes) {
             this.bytesSent.record(bytes);
             if (node >= 0) {
-                String name = "node-" + node + ".bytes-sent";
-                Sensor sensor = this.metrics.getSensor(name);
-                if (sensor == null) {
-                    sensor = this.metrics.sensor(name);
-                    sensor.add("node-" + node + ".outgoing-byte-rate", new Rate());
-                    sensor.add("node-" + node + ".request-rate", "The average number of requests sent per second.", new Rate(new Count()));
-                    sensor.add("node-" + node + ".request-size-avg", "The average size of all requests in the window..", new Avg());
-                    sensor.add("node-" + node + ".request-size-max", "The maximum size of any request sent in the window.", new Max());
-                }
-                sensor.record(bytes);
+                String nodeRequestName = "node-" + node + ".bytes-sent";
+                Sensor nodeRequest = this.metrics.getSensor(nodeRequestName);
+                if (nodeRequest != null) nodeRequest.record(bytes);
             }
         }
 
         public void recordBytesReceived(int node, int bytes) {
             this.bytesReceived.record(bytes);
             if (node >= 0) {
-                String name = "node-" + node + ".bytes-received";
-                Sensor sensor = this.metrics.getSensor(name);
-                if (sensor == null) {
-                    sensor = this.metrics.sensor(name);
-                    sensor.add("node-" + node + ".incoming-byte-rate", new Rate());
-                    sensor.add("node-" + node + ".response-rate",
-                               "The average number of responses received per second.",
-                               new Rate(new Count()));
-                }
-                sensor.record(bytes);
+                String nodeRequestName = "node-" + node + ".bytes-received";
+                Sensor nodeRequest = this.metrics.getSensor(nodeRequestName);
+                if (nodeRequest != null) nodeRequest.record(bytes);
             }
         }
     }