You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@solr.apache.org by an...@apache.org on 2022/07/08 16:03:22 UTC

[solr-sandbox] branch crossdc-wip updated: Consumer: Add initial metrics (#29)

This is an automated email from the ASF dual-hosted git repository.

anshum pushed a commit to branch crossdc-wip
in repository https://gitbox.apache.org/repos/asf/solr-sandbox.git


The following commit(s) were added to refs/heads/crossdc-wip by this push:
     new 032749f  Consumer: Add initial metrics (#29)
032749f is described below

commit 032749fde2d85ae96298616ab8fbffc0ac1fdce2
Author: Patrik Greco <pg...@apple.com>
AuthorDate: Fri Jul 8 18:03:18 2022 +0200

    Consumer: Add initial metrics (#29)
    
    This change adds initial support for Dropwizard Metrics to the Consumer.
    This allows users to send the Consumer's metrics to any of a number of
    reporting backends.
---
 crossdc-consumer/build.gradle                      |  1 +
 .../crossdc/consumer/KafkaCrossDcConsumer.java     |  8 +++++++
 .../messageprocessor/SolrMessageProcessor.java     | 25 +++++++++++++++++-----
 3 files changed, 29 insertions(+), 5 deletions(-)

diff --git a/crossdc-consumer/build.gradle b/crossdc-consumer/build.gradle
index f9df355..466bbfa 100644
--- a/crossdc-consumer/build.gradle
+++ b/crossdc-consumer/build.gradle
@@ -32,6 +32,7 @@ dependencies {
     implementation group: 'org.apache.solr', name: 'solr-solrj', version: '8.11.2'
     implementation project(path: ':crossdc-commons', configuration: 'shadow')
 
+    implementation 'io.dropwizard.metrics:metrics-core:4.2.9'
     implementation 'org.slf4j:slf4j-api:1.7.36'
     implementation 'org.eclipse.jetty:jetty-http:9.4.41.v20210516'
     implementation 'org.eclipse.jetty:jetty-server:9.4.41.v20210516'
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
index 4bd8585..95f98cc 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
@@ -1,5 +1,7 @@
 package org.apache.solr.crossdc.consumer;
 
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.SharedMetricRegistries;
 import org.apache.kafka.clients.consumer.*;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.SerializationException;
@@ -26,6 +28,8 @@ import java.util.Properties;
 public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
+  private final MetricRegistry metrics = SharedMetricRegistries.getOrCreate("metrics");
+
   private final KafkaConsumer<String, MirroredSolrRequest> consumer;
   private final KafkaMirroringSink kafkaMirroringSink;
 
@@ -134,6 +138,7 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
                 if (log.isTraceEnabled()) {
                   log.trace("result=failed-resubmit");
                 }
+                metrics.counter("failed-resubmit").inc();
                 kafkaMirroringSink.submit(record.value());
                 break;
               case HANDLED:
@@ -141,14 +146,17 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
                 if (log.isTraceEnabled()) {
                   log.trace("result=handled");
                 }
+                metrics.counter("handled").inc();
                 break;
               case NOT_HANDLED_SHUTDOWN:
                 if (log.isTraceEnabled()) {
                   log.trace("result=nothandled_shutdown");
                 }
+                metrics.counter("nothandled_shutdown").inc();
               case FAILED_RETRY:
                 log.error("Unexpected response while processing request. We never expect {}.",
                     result.status().toString());
+                metrics.counter("failed-retry").inc();
                 break;
               default:
                 if (log.isTraceEnabled()) {
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/SolrMessageProcessor.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/SolrMessageProcessor.java
index a7493aa..b1a428e 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/SolrMessageProcessor.java
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/SolrMessageProcessor.java
@@ -16,6 +16,8 @@
  */
 package org.apache.solr.crossdc.messageprocessor;
 
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.SharedMetricRegistries;
 import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.client.solrj.request.UpdateRequest;
@@ -47,6 +49,9 @@ import java.util.concurrent.TimeUnit;
  */
 public class SolrMessageProcessor extends MessageProcessor implements IQueueHandler<MirroredSolrRequest>  {
     private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+    private final MetricRegistry metrics = SharedMetricRegistries.getOrCreate("metrics");
+
     final CloudSolrClient client;
 
     private static final String VERSION_FIELD = "_version_";
@@ -180,9 +185,12 @@ public class SolrMessageProcessor extends MessageProcessor implements IQueueHand
         }
 
         if (status != 0) {
+            metrics.counter("processedErrors").inc();
             throw new SolrException(SolrException.ErrorCode.getErrorCode(status), "response=" + response);
         }
 
+        metrics.counter("processed").inc();
+
         result = new Result<>(ResultStatus.HANDLED);
         return result;
     }
@@ -192,13 +200,19 @@ public class SolrMessageProcessor extends MessageProcessor implements IQueueHand
             final StringBuilder rmsg = new StringBuilder(64);
             rmsg.append("Submitting update request");
             if(((UpdateRequest) request).getDeleteById() != null) {
-                rmsg.append(" numDeleteByIds=").append(((UpdateRequest) request).getDeleteById().size());
+                final int numDeleteByIds = ((UpdateRequest) request).getDeleteById().size();
+                metrics.counter("numDeleteByIds").inc(numDeleteByIds);
+                rmsg.append(" numDeleteByIds=").append(numDeleteByIds);
             }
             if(((UpdateRequest) request).getDocuments() != null) {
-                rmsg.append(" numUpdates=").append(((UpdateRequest) request).getDocuments().size());
+                final int numUpdates = ((UpdateRequest) request).getDocuments().size();
+                metrics.counter("numUpdates").inc(numUpdates);
+                rmsg.append(" numUpdates=").append(numUpdates);
             }
             if(((UpdateRequest) request).getDeleteQuery() != null) {
-                rmsg.append(" numDeleteByQuery=").append(((UpdateRequest) request).getDeleteQuery().size());
+                final int numDeleteByQuery = ((UpdateRequest) request).getDeleteQuery().size();
+                metrics.counter("numDeleteByQuery").inc(numDeleteByQuery);
+                rmsg.append(" numDeleteByQuery=").append(numDeleteByQuery);
             }
             log.info(rmsg.toString());
         }
@@ -256,8 +270,9 @@ public class SolrMessageProcessor extends MessageProcessor implements IQueueHand
         // Only record the latency of the first attempt, essentially measuring the latency from submitting on the
         // primary side until the request is eligible to be consumed on the buddy side (or vice versa).
         if (mirroredSolrRequest.getAttempt() == 1) {
-            log.debug("First attempt latency = {}",
-                    System.currentTimeMillis() - TimeUnit.NANOSECONDS.toMillis(mirroredSolrRequest.getSubmitTimeNanos()));
+            final long latency = System.currentTimeMillis() - TimeUnit.NANOSECONDS.toMillis(mirroredSolrRequest.getSubmitTimeNanos());
+            log.debug("First attempt latency = {}", latency);
+            metrics.timer("latency").update(latency, TimeUnit.MILLISECONDS);
         }
     }