You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@solr.apache.org by ab...@apache.org on 2023/11/27 11:41:57 UTC

(solr-sandbox) branch main updated: Producer metrics (#87)

This is an automated email from the ASF dual-hosted git repository.

ab pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr-sandbox.git


The following commit(s) were added to refs/heads/main by this push:
     new 47e941a  Producer metrics (#87)
47e941a is described below

commit 47e941aaf61ba73e95294027392913e517c931ed
Author: Marcin Górski <ma...@gmail.com>
AuthorDate: Mon Nov 27 12:41:51 2023 +0100

    Producer metrics (#87)
---
 .../update/processor/MirroringUpdateProcessor.java |  21 ++-
 .../MirroringUpdateRequestProcessorFactory.java    |   5 +-
 .../solr/update/processor/ProducerMetrics.java     |  70 +++++++
 .../crossdc/KafkaRequestMirroringHandlerTest.java  |   2 +-
 .../processor/MirroringUpdateProcessorTest.java    | 205 ++++++++++++---------
 5 files changed, 214 insertions(+), 89 deletions(-)

diff --git a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java
index 0fcdbd5..c23b670 100644
--- a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java
+++ b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java
@@ -44,6 +44,7 @@ public class MirroringUpdateProcessor extends UpdateRequestProcessor {
    */
   private final boolean doMirroring;
   final RequestMirroringHandler requestMirroringHandler;
+  final ProducerMetrics producerMetrics;
 
   /**
    * The mirrored request starts as null, gets created and appended to at each process() call,
@@ -75,13 +76,15 @@ public class MirroringUpdateProcessor extends UpdateRequestProcessor {
    */
   private DistributedUpdateProcessor.DistribPhase distribPhase;
 
-  public MirroringUpdateProcessor(final UpdateRequestProcessor next, boolean doMirroring,
+  public MirroringUpdateProcessor(final UpdateRequestProcessor next,
+                                  boolean doMirroring,
       final boolean indexUnmirrorableDocs,
       final boolean mirrorCommits,
       final long maxMirroringBatchSizeBytes,
       final SolrParams mirroredReqParams,
       final DistributedUpdateProcessor.DistribPhase distribPhase,
-      final RequestMirroringHandler requestMirroringHandler) {
+      final RequestMirroringHandler requestMirroringHandler,
+      final ProducerMetrics producerMetrics) {
     super(next);
     this.doMirroring = doMirroring;
     this.indexUnmirrorableDocs = indexUnmirrorableDocs;
@@ -90,7 +93,7 @@ public class MirroringUpdateProcessor extends UpdateRequestProcessor {
     this.mirrorParams = mirroredReqParams;
     this.distribPhase = distribPhase;
     this.requestMirroringHandler = requestMirroringHandler;
-
+    this.producerMetrics = producerMetrics;
     // Find the downstream distributed update processor
 
   }
@@ -107,18 +110,26 @@ public class MirroringUpdateProcessor extends UpdateRequestProcessor {
     doc.removeField(CommonParams.VERSION_FIELD); // strip internal doc version
     final long estimatedDocSizeInBytes = ObjectSizeEstimator.estimate(doc);
     log.info("estimated doc size is {} bytes, max size is {}", estimatedDocSizeInBytes, maxMirroringDocSizeBytes);
+    producerMetrics.getDocumentSize().update(estimatedDocSizeInBytes);
     final boolean tooLargeForKafka = estimatedDocSizeInBytes > maxMirroringDocSizeBytes;
     if (tooLargeForKafka && !indexUnmirrorableDocs) {
       throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Update exceeds the doc-size limit and is unmirrorable. id="
 
           + cmd.getPrintableId() + " doc size=" + estimatedDocSizeInBytes + " maxDocSize=" + maxMirroringDocSizeBytes);
     } else if (tooLargeForKafka) {
+      producerMetrics.getDocumentTooLarge().inc();
       log.warn(
           "Skipping mirroring of doc {} as it exceeds the doc-size limit ({} bytes) and is unmirrorable. doc size={}",
           cmd.getPrintableId(), maxMirroringDocSizeBytes, estimatedDocSizeInBytes);
     }
 
-    super.processAdd(cmd); // let this throw to prevent mirroring invalid reqs
+    try {
+      super.processAdd(cmd); // let this throw to prevent mirroring invalid reqs
+      producerMetrics.getLocal().inc();
+    } catch (IOException exception) {
+      producerMetrics.getLocalError().inc();
+      throw exception;
+    }
 
     // submit only from the leader shards so we mirror each doc once
     boolean isLeader = isLeader(cmd.getReq(),  cmd.getIndexedIdStr(), null, cmd.getSolrInputDocument());
@@ -128,8 +139,10 @@ public class MirroringUpdateProcessor extends UpdateRequestProcessor {
 
       try {
         requestMirroringHandler.mirror(mirrorRequest);
+        producerMetrics.getSubmitted().inc();
       } catch (Exception e) {
         log.error("mirror submit failed", e);
+        producerMetrics.getSubmitError().inc();
         throw new SolrException(SERVER_ERROR, "mirror submit failed", e);
       }
     }
diff --git a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
index 80ce2a1..250ed26 100644
--- a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
+++ b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
@@ -66,7 +66,7 @@ public class MirroringUpdateRequestProcessorFactory extends UpdateRequestProcess
 
     /** This is instantiated in inform(SolrCore) and then shared by all processor instances - visible for testing */
     private volatile KafkaRequestMirroringHandler mirroringHandler;
-
+    private volatile ProducerMetrics producerMetrics;
 
     private boolean enabled = true;
 
@@ -190,6 +190,7 @@ public class MirroringUpdateRequestProcessorFactory extends UpdateRequestProcess
         Closer closer = new Closer(sink);
         core.addCloseHook(new MyCloseHook(closer));
 
+        producerMetrics = new ProducerMetrics(core.getSolrMetricsContext().getChildContext(this), core);
         mirroringHandler = new KafkaRequestMirroringHandler(sink);
     }
 
@@ -250,7 +251,7 @@ public class MirroringUpdateRequestProcessorFactory extends UpdateRequestProcess
         }
 
         return new MirroringUpdateProcessor(next, doMirroring, indexUnmirrorableDocs, mirrorCommits, maxMirroringBatchSizeBytes, mirroredParams,
-                DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM)), doMirroring ? mirroringHandler : null);
+                DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM)), doMirroring ? mirroringHandler : null, producerMetrics);
     }
 
     public static class NoOpUpdateRequestProcessor extends UpdateRequestProcessor {
diff --git a/crossdc-producer/src/main/java/org/apache/solr/update/processor/ProducerMetrics.java b/crossdc-producer/src/main/java/org/apache/solr/update/processor/ProducerMetrics.java
new file mode 100644
index 0000000..007fce2
--- /dev/null
+++ b/crossdc-producer/src/main/java/org/apache/solr/update/processor/ProducerMetrics.java
@@ -0,0 +1,70 @@
+package org.apache.solr.update.processor;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Histogram;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.metrics.SolrMetricsContext;
+
+/**
+ * Metrics presented for each SolrCore using `crossdc.producer.` path.
+ */
+public class ProducerMetrics {
+
+    private final Counter local;
+    private final Counter localError;
+    private final Counter submitted;
+    private final Counter submitError;
+    private final Histogram documentSize;
+    private final Counter documentTooLarge;
+
+    public ProducerMetrics(SolrMetricsContext solrMetricsContext, SolrCore solrCore) {
+        this.local = solrMetricsContext.counter(solrCore, "local", "crossdc", "producer");
+        this.localError = solrMetricsContext.counter(solrCore, "local", "crossdc", "producer", "errors");
+        this.submitted = solrMetricsContext.counter(solrCore, "submitted", "crossdc", "producer");
+        this.submitError = solrMetricsContext.counter(solrCore, "submit", "crossdc", "producer", "errors");
+        this.documentSize = solrMetricsContext.histogram(solrCore, "documentSize", "crossdc", "producer");
+        this.documentTooLarge = solrMetricsContext.counter(solrCore, "documentTooLarge", "crossdc", "producer", "errors");
+    }
+
+    /**
+     * Counter representing the number of local documents processed successfully.
+     */
+    public Counter getLocal() {
+        return this.local;
+    }
+
+    /**
+     * Counter representing the number of local documents processed with error.
+     */
+    public Counter getLocalError() {
+        return this.localError;
+    }
+
+    /**
+     * Counter representing the number of documents submitted to the Kafka topic.
+     */
+    public Counter getSubmitted() {
+        return this.submitted;
+    }
+
+    /**
+     * Counter representing the number of documents that were not submitted to the Kafka topic because of exception during execution.
+     */
+    public Counter getSubmitError() {
+        return this.submitError;
+    }
+
+    /**
+     * Histogram of the processed document size.
+     */
+    public Histogram getDocumentSize() {
+        return this.documentSize;
+    }
+
+    /**
+     * Counter representing the number of documents that were too large to send to the Kafka topic.
+     */
+    public Counter getDocumentTooLarge() {
+        return this.documentTooLarge;
+    }
+}
diff --git a/crossdc-producer/src/test/java/org/apache/solr/crossdc/KafkaRequestMirroringHandlerTest.java b/crossdc-producer/src/test/java/org/apache/solr/crossdc/KafkaRequestMirroringHandlerTest.java
index 1b2dd12..596020b 100644
--- a/crossdc-producer/src/test/java/org/apache/solr/crossdc/KafkaRequestMirroringHandlerTest.java
+++ b/crossdc-producer/src/test/java/org/apache/solr/crossdc/KafkaRequestMirroringHandlerTest.java
@@ -18,7 +18,7 @@ import static org.mockito.Mockito.verify;
 public class KafkaRequestMirroringHandlerTest {
 
     @Mock
-    KafkaMirroringSink kafkaMirroringSink;
+    private KafkaMirroringSink kafkaMirroringSink;
 
     @Test
     public void testCheckDeadLetterQueueMessageExecution() throws MirroringException {
diff --git a/crossdc-producer/src/test/java/org/apache/solr/update/processor/MirroringUpdateProcessorTest.java b/crossdc-producer/src/test/java/org/apache/solr/update/processor/MirroringUpdateProcessorTest.java
index 78abb00..5f81f30 100644
--- a/crossdc-producer/src/test/java/org/apache/solr/update/processor/MirroringUpdateProcessorTest.java
+++ b/crossdc-producer/src/test/java/org/apache/solr/update/processor/MirroringUpdateProcessorTest.java
@@ -1,5 +1,7 @@
 package org.apache.solr.update.processor;
 
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Histogram;
 import org.apache.solr.SolrTestCaseJ4;
 import org.apache.solr.client.solrj.SolrQuery;
 import org.apache.solr.client.solrj.impl.HttpSolrClient;
@@ -15,6 +17,7 @@ import org.apache.solr.common.params.UpdateParams;
 import org.apache.solr.core.CoreContainer;
 import org.apache.solr.core.CoreDescriptor;
 import org.apache.solr.core.SolrCore;
+import org.apache.solr.metrics.SolrMetricsContext;
 import org.apache.solr.request.SolrQueryRequestBase;
 import org.apache.solr.schema.IndexSchema;
 import org.apache.solr.update.AddUpdateCommand;
@@ -23,11 +26,17 @@ import org.apache.solr.update.DeleteUpdateCommand;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
-import org.mockito.Mockito;
 
 import java.io.IOException;
 import java.util.Map;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
 public class MirroringUpdateProcessorTest extends SolrTestCaseJ4 {
 
     private UpdateRequestProcessor next;
@@ -40,11 +49,12 @@ public class MirroringUpdateProcessorTest extends SolrTestCaseJ4 {
     UpdateRequest requestMock;
     private UpdateRequestProcessor nextProcessor;
     private SolrCore core;
-    private HttpSolrClient.Builder builder = Mockito.mock(HttpSolrClient.Builder.class);
-    private HttpSolrClient client = Mockito.mock(HttpSolrClient.class);
+    private HttpSolrClient.Builder builder = mock(HttpSolrClient.Builder.class);
+    private HttpSolrClient client = mock(HttpSolrClient.class);
     private CloudDescriptor cloudDesc;
     private ZkStateReader zkStateReader;
     private Replica replica;
+    private ProducerMetrics producerMetrics;
 
     @Before
     public void setUp() throws Exception {
@@ -52,16 +62,16 @@ public class MirroringUpdateProcessorTest extends SolrTestCaseJ4 {
         addUpdateCommand = new AddUpdateCommand(req);
         addUpdateCommand.solrDoc = new SolrInputDocument();
         addUpdateCommand.solrDoc.addField("id", "test");
-        req = Mockito.mock(SolrQueryRequestBase.class);
-        Mockito.when(req.getParams()).thenReturn(new ModifiableSolrParams());
+        req = mock(SolrQueryRequestBase.class);
+        when(req.getParams()).thenReturn(new ModifiableSolrParams());
 
-        requestMock = Mockito.mock(UpdateRequest.class);
+        requestMock = mock(UpdateRequest.class);
         addUpdateCommand.setReq(req);
 
-        nextProcessor = Mockito.mock(UpdateRequestProcessor.class);
+        nextProcessor = mock(UpdateRequestProcessor.class);
 
-        IndexSchema schema = Mockito.mock(IndexSchema.class);
-        Mockito.when(req.getSchema()).thenReturn(schema);
+        IndexSchema schema = mock(IndexSchema.class);
+        when(req.getSchema()).thenReturn(schema);
 
         deleteUpdateCommand = new DeleteUpdateCommand(req);
         deleteUpdateCommand.query = "*:*";
@@ -75,8 +85,36 @@ public class MirroringUpdateProcessorTest extends SolrTestCaseJ4 {
         commitUpdateCommand.openSearcher = true;
         commitUpdateCommand.waitSearcher = true;
 
-        next = Mockito.mock(UpdateRequestProcessor.class);
-        requestMirroringHandler = Mockito.mock(RequestMirroringHandler.class);
+        producerMetrics = spy(new ProducerMetrics(mock(SolrMetricsContext.class), mock(SolrCore.class)) {
+            private final Counter counterMock = mock(Counter.class);
+
+            public Counter getLocal() {
+                return counterMock;
+            }
+
+            public Counter getLocalError() {
+                return counterMock;
+            }
+
+            public Counter getSubmitted() {
+                return counterMock;
+            }
+
+            public Counter getDocumentTooLarge() {
+                return counterMock;
+            }
+
+            public Counter getSubmitError() {
+                return counterMock;
+            }
+
+            public Histogram getDocumentSize() {
+                return mock(Histogram.class);
+            }
+        });
+
+        next = mock(UpdateRequestProcessor.class);
+        requestMirroringHandler = mock(RequestMirroringHandler.class);
         processor =
                 new MirroringUpdateProcessor(
                         next,
@@ -86,47 +124,48 @@ public class MirroringUpdateProcessorTest extends SolrTestCaseJ4 {
                         1000L,
                         new ModifiableSolrParams(),
                         DistributedUpdateProcessor.DistribPhase.NONE,
-                        requestMirroringHandler) {
+                        requestMirroringHandler,
+                        producerMetrics) {
                     UpdateRequest createMirrorRequest() {
                         return requestMock;
                     }
                 };
 
-        core = Mockito.mock(SolrCore.class);
-        CoreDescriptor coreDesc = Mockito.mock(CoreDescriptor.class);
-        cloudDesc = Mockito.mock(CloudDescriptor.class);
-        Mockito.when(cloudDesc.getShardId()).thenReturn("shard1");
-        CoreContainer coreContainer = Mockito.mock(CoreContainer.class);
-        ZkController zkController = Mockito.mock(ZkController.class);
-        ClusterState clusterState = Mockito.mock(ClusterState.class);
-        DocCollection docCollection = Mockito.mock(DocCollection.class);
-        DocRouter docRouter = Mockito.mock(DocRouter.class);
-        Slice slice = Mockito.mock(Slice.class);
-        Mockito.when(slice.getName()).thenReturn("shard1");
-        zkStateReader = Mockito.mock(ZkStateReader.class);
-        replica = Mockito.mock(Replica.class);
-
-        Mockito.when(replica.getName()).thenReturn("replica1");
-        Mockito.when(zkStateReader.getLeaderRetry(Mockito.any(), Mockito.any()))
+        core = mock(SolrCore.class);
+        CoreDescriptor coreDesc = mock(CoreDescriptor.class);
+        cloudDesc = mock(CloudDescriptor.class);
+        when(cloudDesc.getShardId()).thenReturn("shard1");
+        CoreContainer coreContainer = mock(CoreContainer.class);
+        ZkController zkController = mock(ZkController.class);
+        ClusterState clusterState = mock(ClusterState.class);
+        DocCollection docCollection = mock(DocCollection.class);
+        DocRouter docRouter = mock(DocRouter.class);
+        Slice slice = mock(Slice.class);
+        when(slice.getName()).thenReturn("shard1");
+        zkStateReader = mock(ZkStateReader.class);
+        replica = mock(Replica.class);
+
+        when(replica.getName()).thenReturn("replica1");
+        when(zkStateReader.getLeaderRetry(any(), any()))
                 .thenReturn(replica);
-        Mockito.when(zkController.getZkStateReader()).thenReturn(zkStateReader);
-        Mockito.when(coreDesc.getCloudDescriptor()).thenReturn(cloudDesc);
-        Mockito.when(clusterState.getCollection(Mockito.any())).thenReturn(docCollection);
-        Mockito.when(docCollection.getRouter()).thenReturn(docRouter);
-        Mockito.when(
+        when(zkController.getZkStateReader()).thenReturn(zkStateReader);
+        when(coreDesc.getCloudDescriptor()).thenReturn(cloudDesc);
+        when(clusterState.getCollection(any())).thenReturn(docCollection);
+        when(docCollection.getRouter()).thenReturn(docRouter);
+        when(
                         docRouter.getTargetSlice(
-                                Mockito.any(),
-                                Mockito.any(),
-                                Mockito.any(),
-                                Mockito.any(),
-                                Mockito.any()))
+                                any(),
+                                any(),
+                                any(),
+                                any(),
+                                any()))
                 .thenReturn(slice);
-        Mockito.when(docCollection.getSlicesMap()).thenReturn(Map.of("shard1", slice));
-        Mockito.when(zkController.getClusterState()).thenReturn(clusterState);
-        Mockito.when(coreContainer.getZkController()).thenReturn(zkController);
-        Mockito.when(core.getCoreContainer()).thenReturn(coreContainer);
-        Mockito.when(core.getCoreDescriptor()).thenReturn(coreDesc);
-        Mockito.when(req.getCore()).thenReturn(core);
+        when(docCollection.getSlicesMap()).thenReturn(Map.of("shard1", slice));
+        when(zkController.getClusterState()).thenReturn(clusterState);
+        when(coreContainer.getZkController()).thenReturn(zkController);
+        when(core.getCoreContainer()).thenReturn(coreContainer);
+        when(core.getCoreDescriptor()).thenReturn(coreDesc);
+        when(req.getCore()).thenReturn(core);
     }
 
     /**
@@ -137,7 +176,7 @@ public class MirroringUpdateProcessorTest extends SolrTestCaseJ4 {
     public void processDeleteWhenDistribPhaseIsNoneAndDeleteByIdIsFalse() {
         try {
             processor.processDelete(deleteUpdateCommand);
-            Mockito.verify(requestMirroringHandler, Mockito.times(1)).mirror(requestMock);
+            verify(requestMirroringHandler, times(1)).mirror(requestMock);
         } catch (Exception e) {
             fail("IOException should not be thrown");
         }
@@ -150,9 +189,9 @@ public class MirroringUpdateProcessorTest extends SolrTestCaseJ4 {
     @Test
     public void processAddWhenDocSizeWithinLimitAndNodeIsLeader() {
         try {
-            Mockito.when(cloudDesc.getCoreNodeName()).thenReturn("replica1");
+            when(cloudDesc.getCoreNodeName()).thenReturn("replica1");
             processor.processAdd(addUpdateCommand);
-            Mockito.verify(requestMirroringHandler, Mockito.times(1)).mirror(requestMock);
+            verify(requestMirroringHandler, times(1)).mirror(requestMock);
         } catch (IOException e) {
             fail("IOException should not be thrown");
         } catch (Exception e) {
@@ -167,10 +206,10 @@ public class MirroringUpdateProcessorTest extends SolrTestCaseJ4 {
     @Test
     public void processDeleteWhenNodeIsLeaderAndDeleteByIdIsTrue() {
         try {
-            Mockito.when(cloudDesc.getCoreNodeName()).thenReturn("replica1");
+            when(cloudDesc.getCoreNodeName()).thenReturn("replica1");
             deleteUpdateCommand.setId("test");
             processor.processDelete(deleteUpdateCommand);
-            Mockito.verify(requestMirroringHandler, Mockito.times(1)).mirror(requestMock);
+            verify(requestMirroringHandler, times(1)).mirror(requestMock);
         } catch (Exception e) {
             fail("IOException should not be thrown");
         }
@@ -180,10 +219,10 @@ public class MirroringUpdateProcessorTest extends SolrTestCaseJ4 {
     public void processCommitOnlyAnotherShard() {
         try {
             // should skip if processing in other shard than the first
-            Mockito.when(cloudDesc.getShardId()).thenReturn("shard2");
+            when(cloudDesc.getShardId()).thenReturn("shard2");
             processor.processCommit(commitUpdateCommand);
-            Mockito.verify(next).processCommit(commitUpdateCommand);
-            Mockito.verify(requestMirroringHandler, Mockito.times(0)).mirror(requestMock);
+            verify(next).processCommit(commitUpdateCommand);
+            verify(requestMirroringHandler, times(0)).mirror(requestMock);
         } catch (Exception e) {
             fail("IOException should not be thrown: " + e);
         }
@@ -193,11 +232,11 @@ public class MirroringUpdateProcessorTest extends SolrTestCaseJ4 {
     public void processCommitOnlyNonLeader() {
         try {
             // should skip if processing on non-leader replica
-            Mockito.when(replica.getName()).thenReturn("foobar");
-            Mockito.when(cloudDesc.getCoreNodeName()).thenReturn("replica1");
+            when(replica.getName()).thenReturn("foobar");
+            when(cloudDesc.getCoreNodeName()).thenReturn("replica1");
             processor.processCommit(commitUpdateCommand);
-            Mockito.verify(next).processCommit(commitUpdateCommand);
-            Mockito.verify(requestMirroringHandler, Mockito.times(0)).mirror(requestMock);
+            verify(next).processCommit(commitUpdateCommand);
+            verify(requestMirroringHandler, times(0)).mirror(requestMock);
         } catch (Exception e) {
             fail("IOException should not be thrown: " + e);
         }
@@ -206,7 +245,7 @@ public class MirroringUpdateProcessorTest extends SolrTestCaseJ4 {
     @Test
     public void processCommitOnlyLeader() {
         try {
-            Mockito.when(cloudDesc.getCoreNodeName()).thenReturn("replica1");
+            when(cloudDesc.getCoreNodeName()).thenReturn("replica1");
             // don't override createMirrorRequest, call actual method
             processor =
                 new MirroringUpdateProcessor(
@@ -217,11 +256,12 @@ public class MirroringUpdateProcessorTest extends SolrTestCaseJ4 {
                     1000L,
                     new ModifiableSolrParams(),
                     DistributedUpdateProcessor.DistribPhase.NONE,
-                    requestMirroringHandler);
+                    requestMirroringHandler,
+                        producerMetrics);
             ArgumentCaptor<UpdateRequest> captor = ArgumentCaptor.forClass(UpdateRequest.class);
             processor.processCommit(commitUpdateCommand);
-            Mockito.verify(next).processCommit(commitUpdateCommand);
-            Mockito.verify(requestMirroringHandler, Mockito.times(1)).mirror(captor.capture());
+            verify(next).processCommit(commitUpdateCommand);
+            verify(requestMirroringHandler, times(1)).mirror(captor.capture());
             UpdateRequest req = captor.getValue();
             assertNotNull(req.getParams());
             SolrParams params = req.getParams();
@@ -241,7 +281,7 @@ public class MirroringUpdateProcessorTest extends SolrTestCaseJ4 {
     @Test
     public void processCommitNoMirroring() {
         try {
-            Mockito.when(cloudDesc.getCoreNodeName()).thenReturn("replica1");
+            when(cloudDesc.getCoreNodeName()).thenReturn("replica1");
             // don't override createMirrorRequest, call actual method
             processor =
                 new MirroringUpdateProcessor(
@@ -252,10 +292,11 @@ public class MirroringUpdateProcessorTest extends SolrTestCaseJ4 {
                     1000L,
                     new ModifiableSolrParams(),
                     DistributedUpdateProcessor.DistribPhase.NONE,
-                    requestMirroringHandler);
+                    requestMirroringHandler,
+                        producerMetrics);
             processor.processCommit(commitUpdateCommand);
-            Mockito.verify(next).processCommit(commitUpdateCommand);
-            Mockito.verify(requestMirroringHandler, Mockito.times(0)).mirror(requestMock);
+            verify(next).processCommit(commitUpdateCommand);
+            verify(requestMirroringHandler, times(0)).mirror(requestMock);
         } catch (Exception e) {
             fail("Exception should not be thrown: " + e);
         }
@@ -263,7 +304,7 @@ public class MirroringUpdateProcessorTest extends SolrTestCaseJ4 {
 
     @Test
     public void testProcessAddWithinLimit() throws Exception {
-        Mockito.when(cloudDesc.getCoreNodeName()).thenReturn("replica1");
+        when(cloudDesc.getCoreNodeName()).thenReturn("replica1");
         SolrInputDocument doc = new SolrInputDocument();
         doc.addField("id", "1");
         AddUpdateCommand cmd = new AddUpdateCommand(req);
@@ -271,8 +312,8 @@ public class MirroringUpdateProcessorTest extends SolrTestCaseJ4 {
         cmd.commitWithin = 1000;
         cmd.overwrite = true;
         processor.processAdd(cmd);
-        Mockito.verify(next).processAdd(cmd);
-        Mockito.verify(requestMirroringHandler).mirror(requestMock);
+        verify(next).processAdd(cmd);
+        verify(requestMirroringHandler).mirror(requestMock);
     }
 
     @Test
@@ -283,45 +324,45 @@ public class MirroringUpdateProcessorTest extends SolrTestCaseJ4 {
         solrInputDocument.addField("large_field", "Test ".repeat(10000));
         addUpdateCommand.solrDoc = solrInputDocument;
 
-        Mockito.when(req.getCore()).thenReturn(core);
-        Mockito.when(req.getCore().getCoreDescriptor()).thenReturn(Mockito.mock(CoreDescriptor.class));
-        Mockito.when(req.getCore().getCoreDescriptor().getCloudDescriptor()).thenReturn(Mockito.mock(CloudDescriptor.class));
-        Mockito.when(req.getCore().getCoreContainer()).thenReturn(Mockito.mock(CoreContainer.class));
-        Mockito.when(req.getCore().getCoreContainer().getZkController()).thenReturn(Mockito.mock(ZkController.class));
-        Mockito.when(req.getCore().getCoreContainer().getZkController().getClusterState()).thenReturn(Mockito.mock(ClusterState.class));
+        when(req.getCore()).thenReturn(core);
+        when(req.getCore().getCoreDescriptor()).thenReturn(mock(CoreDescriptor.class));
+        when(req.getCore().getCoreDescriptor().getCloudDescriptor()).thenReturn(mock(CloudDescriptor.class));
+        when(req.getCore().getCoreContainer()).thenReturn(mock(CoreContainer.class));
+        when(req.getCore().getCoreContainer().getZkController()).thenReturn(mock(ZkController.class));
+        when(req.getCore().getCoreContainer().getZkController().getClusterState()).thenReturn(mock(ClusterState.class));
 
         SolrParams mirrorParams = new ModifiableSolrParams();
         MirroringUpdateProcessor mirroringUpdateProcessorWithLimit = new MirroringUpdateProcessor(nextProcessor, true, false, // indexUnmirrorableDocs set to false
-                true, 50000, mirrorParams, DistributedUpdateProcessor.DistribPhase.NONE, requestMirroringHandler);
+                true, 50000, mirrorParams, DistributedUpdateProcessor.DistribPhase.NONE, requestMirroringHandler, producerMetrics);
 
         assertThrows(SolrException.class, () -> mirroringUpdateProcessorWithLimit.processAdd(addUpdateCommand));
     }
 
     @Test
     public void testProcessAddLeader() throws Exception {
-        Mockito.when(cloudDesc.getCoreNodeName()).thenReturn("replica1");
+        when(cloudDesc.getCoreNodeName()).thenReturn("replica1");
         processor.processAdd(addUpdateCommand);
-        Mockito.verify(requestMirroringHandler, Mockito.times(1)).mirror(Mockito.any());
+        verify(requestMirroringHandler, times(1)).mirror(any());
     }
 
     @Test
     public void testProcessAddNotLeader() throws Exception {
-        Mockito.when(cloudDesc.getCoreNodeName()).thenReturn("replica2");
+        when(cloudDesc.getCoreNodeName()).thenReturn("replica2");
         processor.processAdd(addUpdateCommand);
-        Mockito.verify(requestMirroringHandler, Mockito.times(0)).mirror(Mockito.any());
+        verify(requestMirroringHandler, times(0)).mirror(any());
     }
 
     @Test
     public void testProcessDelete() throws Exception {
-        Mockito.when(cloudDesc.getCoreNodeName()).thenReturn("replica1");
+        when(cloudDesc.getCoreNodeName()).thenReturn("replica1");
         processor.processDelete(deleteUpdateCommand);
-        Mockito.verify(requestMirroringHandler, Mockito.times(1)).mirror(Mockito.any());
+        verify(requestMirroringHandler, times(1)).mirror(any());
     }
 
     @Test
     public void testProcessDBQResults() throws Exception {
-        Mockito.when(cloudDesc.getCoreNodeName()).thenReturn("replica1");
-        Mockito.when(builder.build()).thenReturn(client);
+        when(cloudDesc.getCoreNodeName()).thenReturn("replica1");
+        when(builder.build()).thenReturn(client);
         SolrInputDocument doc = new SolrInputDocument();
         doc.addField("id", "test");
         addUpdateCommand.solrDoc = doc;