You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@solr.apache.org by ab...@apache.org on 2023/11/27 11:41:57 UTC
(solr-sandbox) branch main updated: Producer metrics (#87)
This is an automated email from the ASF dual-hosted git repository.
ab pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr-sandbox.git
The following commit(s) were added to refs/heads/main by this push:
new 47e941a Producer metrics (#87)
47e941a is described below
commit 47e941aaf61ba73e95294027392913e517c931ed
Author: Marcin Górski <ma...@gmail.com>
AuthorDate: Mon Nov 27 12:41:51 2023 +0100
Producer metrics (#87)
---
.../update/processor/MirroringUpdateProcessor.java | 21 ++-
.../MirroringUpdateRequestProcessorFactory.java | 5 +-
.../solr/update/processor/ProducerMetrics.java | 70 +++++++
.../crossdc/KafkaRequestMirroringHandlerTest.java | 2 +-
.../processor/MirroringUpdateProcessorTest.java | 205 ++++++++++++---------
5 files changed, 214 insertions(+), 89 deletions(-)
diff --git a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java
index 0fcdbd5..c23b670 100644
--- a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java
+++ b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java
@@ -44,6 +44,7 @@ public class MirroringUpdateProcessor extends UpdateRequestProcessor {
*/
private final boolean doMirroring;
final RequestMirroringHandler requestMirroringHandler;
+ final ProducerMetrics producerMetrics;
/**
* The mirrored request starts as null, gets created and appended to at each process() call,
@@ -75,13 +76,15 @@ public class MirroringUpdateProcessor extends UpdateRequestProcessor {
*/
private DistributedUpdateProcessor.DistribPhase distribPhase;
- public MirroringUpdateProcessor(final UpdateRequestProcessor next, boolean doMirroring,
+ public MirroringUpdateProcessor(final UpdateRequestProcessor next,
+ boolean doMirroring,
final boolean indexUnmirrorableDocs,
final boolean mirrorCommits,
final long maxMirroringBatchSizeBytes,
final SolrParams mirroredReqParams,
final DistributedUpdateProcessor.DistribPhase distribPhase,
- final RequestMirroringHandler requestMirroringHandler) {
+ final RequestMirroringHandler requestMirroringHandler,
+ final ProducerMetrics producerMetrics) {
super(next);
this.doMirroring = doMirroring;
this.indexUnmirrorableDocs = indexUnmirrorableDocs;
@@ -90,7 +93,7 @@ public class MirroringUpdateProcessor extends UpdateRequestProcessor {
this.mirrorParams = mirroredReqParams;
this.distribPhase = distribPhase;
this.requestMirroringHandler = requestMirroringHandler;
-
+ this.producerMetrics = producerMetrics;
// Find the downstream distributed update processor
}
@@ -107,18 +110,26 @@ public class MirroringUpdateProcessor extends UpdateRequestProcessor {
doc.removeField(CommonParams.VERSION_FIELD); // strip internal doc version
final long estimatedDocSizeInBytes = ObjectSizeEstimator.estimate(doc);
log.info("estimated doc size is {} bytes, max size is {}", estimatedDocSizeInBytes, maxMirroringDocSizeBytes);
+ producerMetrics.getDocumentSize().update(estimatedDocSizeInBytes);
final boolean tooLargeForKafka = estimatedDocSizeInBytes > maxMirroringDocSizeBytes;
if (tooLargeForKafka && !indexUnmirrorableDocs) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Update exceeds the doc-size limit and is unmirrorable. id="
+ cmd.getPrintableId() + " doc size=" + estimatedDocSizeInBytes + " maxDocSize=" + maxMirroringDocSizeBytes);
} else if (tooLargeForKafka) {
+ producerMetrics.getDocumentTooLarge().inc();
log.warn(
"Skipping mirroring of doc {} as it exceeds the doc-size limit ({} bytes) and is unmirrorable. doc size={}",
cmd.getPrintableId(), maxMirroringDocSizeBytes, estimatedDocSizeInBytes);
}
- super.processAdd(cmd); // let this throw to prevent mirroring invalid reqs
+ try {
+ super.processAdd(cmd); // let this throw to prevent mirroring invalid reqs
+ producerMetrics.getLocal().inc();
+ } catch (IOException exception) {
+ producerMetrics.getLocalError().inc();
+ throw exception;
+ }
// submit only from the leader shards so we mirror each doc once
boolean isLeader = isLeader(cmd.getReq(), cmd.getIndexedIdStr(), null, cmd.getSolrInputDocument());
@@ -128,8 +139,10 @@ public class MirroringUpdateProcessor extends UpdateRequestProcessor {
try {
requestMirroringHandler.mirror(mirrorRequest);
+ producerMetrics.getSubmitted().inc();
} catch (Exception e) {
log.error("mirror submit failed", e);
+ producerMetrics.getSubmitError().inc();
throw new SolrException(SERVER_ERROR, "mirror submit failed", e);
}
}
diff --git a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
index 80ce2a1..250ed26 100644
--- a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
+++ b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
@@ -66,7 +66,7 @@ public class MirroringUpdateRequestProcessorFactory extends UpdateRequestProcess
/** This is instantiated in inform(SolrCore) and then shared by all processor instances - visible for testing */
private volatile KafkaRequestMirroringHandler mirroringHandler;
-
+ private volatile ProducerMetrics producerMetrics;
private boolean enabled = true;
@@ -190,6 +190,7 @@ public class MirroringUpdateRequestProcessorFactory extends UpdateRequestProcess
Closer closer = new Closer(sink);
core.addCloseHook(new MyCloseHook(closer));
+ producerMetrics = new ProducerMetrics(core.getSolrMetricsContext().getChildContext(this), core);
mirroringHandler = new KafkaRequestMirroringHandler(sink);
}
@@ -250,7 +251,7 @@ public class MirroringUpdateRequestProcessorFactory extends UpdateRequestProcess
}
return new MirroringUpdateProcessor(next, doMirroring, indexUnmirrorableDocs, mirrorCommits, maxMirroringBatchSizeBytes, mirroredParams,
- DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM)), doMirroring ? mirroringHandler : null);
+ DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM)), doMirroring ? mirroringHandler : null, producerMetrics);
}
public static class NoOpUpdateRequestProcessor extends UpdateRequestProcessor {
diff --git a/crossdc-producer/src/main/java/org/apache/solr/update/processor/ProducerMetrics.java b/crossdc-producer/src/main/java/org/apache/solr/update/processor/ProducerMetrics.java
new file mode 100644
index 0000000..007fce2
--- /dev/null
+++ b/crossdc-producer/src/main/java/org/apache/solr/update/processor/ProducerMetrics.java
@@ -0,0 +1,70 @@
+package org.apache.solr.update.processor;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Histogram;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.metrics.SolrMetricsContext;
+
+/**
+ * Producer metrics for one SolrCore, registered via the given SolrMetricsContext under {@code crossdc}, {@code producer} (error counters add {@code errors}).
+ */
+public class ProducerMetrics {
+
+ private final Counter local;
+ private final Counter localError;
+ private final Counter submitted;
+ private final Counter submitError;
+ private final Histogram documentSize;
+ private final Counter documentTooLarge;
+
+ public ProducerMetrics(SolrMetricsContext solrMetricsContext, SolrCore solrCore) {
+ this.local = solrMetricsContext.counter(solrCore, "local", "crossdc", "producer");
+ this.localError = solrMetricsContext.counter(solrCore, "local", "crossdc", "producer", "errors");
+ this.submitted = solrMetricsContext.counter(solrCore, "submitted", "crossdc", "producer");
+ this.submitError = solrMetricsContext.counter(solrCore, "submit", "crossdc", "producer", "errors");
+ this.documentSize = solrMetricsContext.histogram(solrCore, "documentSize", "crossdc", "producer");
+ this.documentTooLarge = solrMetricsContext.counter(solrCore, "documentTooLarge", "crossdc", "producer", "errors");
+ }
+
+ /**
+ * Counter of documents processed locally with success; MirroringUpdateProcessor increments it once the downstream {@code processAdd} call returns.
+ */
+ public Counter getLocal() {
+ return this.local;
+ }
+
+ /**
+ * Counter of documents whose local processing failed; MirroringUpdateProcessor increments it only when the downstream {@code processAdd} throws an IOException.
+ */
+ public Counter getLocalError() {
+ return this.localError;
+ }
+
+ /**
+ * Counter representing the number of documents submitted to the Kafka topic; incremented after the mirroring handler's {@code mirror} call returns without throwing.
+ */
+ public Counter getSubmitted() {
+ return this.submitted;
+ }
+
+ /**
+ * Counter of documents that were not submitted to the Kafka topic because the mirroring handler's {@code mirror} call threw an exception.
+ */
+ public Counter getSubmitError() {
+ return this.submitError;
+ }
+
+ /**
+ * Histogram of the estimated size in bytes of each processed document, updated before the size limit is checked.
+ */
+ public Histogram getDocumentSize() {
+ return this.documentSize;
+ }
+
+ /**
+ * Counter of documents too large to send to the Kafka topic that were still indexed locally; oversized documents rejected with an exception are not counted here.
+ */
+ public Counter getDocumentTooLarge() {
+ return this.documentTooLarge;
+ }
+}
diff --git a/crossdc-producer/src/test/java/org/apache/solr/crossdc/KafkaRequestMirroringHandlerTest.java b/crossdc-producer/src/test/java/org/apache/solr/crossdc/KafkaRequestMirroringHandlerTest.java
index 1b2dd12..596020b 100644
--- a/crossdc-producer/src/test/java/org/apache/solr/crossdc/KafkaRequestMirroringHandlerTest.java
+++ b/crossdc-producer/src/test/java/org/apache/solr/crossdc/KafkaRequestMirroringHandlerTest.java
@@ -18,7 +18,7 @@ import static org.mockito.Mockito.verify;
public class KafkaRequestMirroringHandlerTest {
@Mock
- KafkaMirroringSink kafkaMirroringSink;
+ private KafkaMirroringSink kafkaMirroringSink;
@Test
public void testCheckDeadLetterQueueMessageExecution() throws MirroringException {
diff --git a/crossdc-producer/src/test/java/org/apache/solr/update/processor/MirroringUpdateProcessorTest.java b/crossdc-producer/src/test/java/org/apache/solr/update/processor/MirroringUpdateProcessorTest.java
index 78abb00..5f81f30 100644
--- a/crossdc-producer/src/test/java/org/apache/solr/update/processor/MirroringUpdateProcessorTest.java
+++ b/crossdc-producer/src/test/java/org/apache/solr/update/processor/MirroringUpdateProcessorTest.java
@@ -1,5 +1,7 @@
package org.apache.solr.update.processor;
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Histogram;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
@@ -15,6 +17,7 @@ import org.apache.solr.common.params.UpdateParams;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.core.SolrCore;
+import org.apache.solr.metrics.SolrMetricsContext;
import org.apache.solr.request.SolrQueryRequestBase;
import org.apache.solr.schema.IndexSchema;
import org.apache.solr.update.AddUpdateCommand;
@@ -23,11 +26,17 @@ import org.apache.solr.update.DeleteUpdateCommand;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
-import org.mockito.Mockito;
import java.io.IOException;
import java.util.Map;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
public class MirroringUpdateProcessorTest extends SolrTestCaseJ4 {
private UpdateRequestProcessor next;
@@ -40,11 +49,12 @@ public class MirroringUpdateProcessorTest extends SolrTestCaseJ4 {
UpdateRequest requestMock;
private UpdateRequestProcessor nextProcessor;
private SolrCore core;
- private HttpSolrClient.Builder builder = Mockito.mock(HttpSolrClient.Builder.class);
- private HttpSolrClient client = Mockito.mock(HttpSolrClient.class);
+ private HttpSolrClient.Builder builder = mock(HttpSolrClient.Builder.class);
+ private HttpSolrClient client = mock(HttpSolrClient.class);
private CloudDescriptor cloudDesc;
private ZkStateReader zkStateReader;
private Replica replica;
+ private ProducerMetrics producerMetrics;
@Before
public void setUp() throws Exception {
@@ -52,16 +62,16 @@ public class MirroringUpdateProcessorTest extends SolrTestCaseJ4 {
addUpdateCommand = new AddUpdateCommand(req);
addUpdateCommand.solrDoc = new SolrInputDocument();
addUpdateCommand.solrDoc.addField("id", "test");
- req = Mockito.mock(SolrQueryRequestBase.class);
- Mockito.when(req.getParams()).thenReturn(new ModifiableSolrParams());
+ req = mock(SolrQueryRequestBase.class);
+ when(req.getParams()).thenReturn(new ModifiableSolrParams());
- requestMock = Mockito.mock(UpdateRequest.class);
+ requestMock = mock(UpdateRequest.class);
addUpdateCommand.setReq(req);
- nextProcessor = Mockito.mock(UpdateRequestProcessor.class);
+ nextProcessor = mock(UpdateRequestProcessor.class);
- IndexSchema schema = Mockito.mock(IndexSchema.class);
- Mockito.when(req.getSchema()).thenReturn(schema);
+ IndexSchema schema = mock(IndexSchema.class);
+ when(req.getSchema()).thenReturn(schema);
deleteUpdateCommand = new DeleteUpdateCommand(req);
deleteUpdateCommand.query = "*:*";
@@ -75,8 +85,36 @@ public class MirroringUpdateProcessorTest extends SolrTestCaseJ4 {
commitUpdateCommand.openSearcher = true;
commitUpdateCommand.waitSearcher = true;
- next = Mockito.mock(UpdateRequestProcessor.class);
- requestMirroringHandler = Mockito.mock(RequestMirroringHandler.class);
+ producerMetrics = spy(new ProducerMetrics(mock(SolrMetricsContext.class), mock(SolrCore.class)) {
+ private final Counter counterMock = mock(Counter.class);
+
+ public Counter getLocal() {
+ return counterMock;
+ }
+
+ public Counter getLocalError() {
+ return counterMock;
+ }
+
+ public Counter getSubmitted() {
+ return counterMock;
+ }
+
+ public Counter getDocumentTooLarge() {
+ return counterMock;
+ }
+
+ public Counter getSubmitError() {
+ return counterMock;
+ }
+
+ public Histogram getDocumentSize() {
+ return mock(Histogram.class);
+ }
+ });
+
+ next = mock(UpdateRequestProcessor.class);
+ requestMirroringHandler = mock(RequestMirroringHandler.class);
processor =
new MirroringUpdateProcessor(
next,
@@ -86,47 +124,48 @@ public class MirroringUpdateProcessorTest extends SolrTestCaseJ4 {
1000L,
new ModifiableSolrParams(),
DistributedUpdateProcessor.DistribPhase.NONE,
- requestMirroringHandler) {
+ requestMirroringHandler,
+ producerMetrics) {
UpdateRequest createMirrorRequest() {
return requestMock;
}
};
- core = Mockito.mock(SolrCore.class);
- CoreDescriptor coreDesc = Mockito.mock(CoreDescriptor.class);
- cloudDesc = Mockito.mock(CloudDescriptor.class);
- Mockito.when(cloudDesc.getShardId()).thenReturn("shard1");
- CoreContainer coreContainer = Mockito.mock(CoreContainer.class);
- ZkController zkController = Mockito.mock(ZkController.class);
- ClusterState clusterState = Mockito.mock(ClusterState.class);
- DocCollection docCollection = Mockito.mock(DocCollection.class);
- DocRouter docRouter = Mockito.mock(DocRouter.class);
- Slice slice = Mockito.mock(Slice.class);
- Mockito.when(slice.getName()).thenReturn("shard1");
- zkStateReader = Mockito.mock(ZkStateReader.class);
- replica = Mockito.mock(Replica.class);
-
- Mockito.when(replica.getName()).thenReturn("replica1");
- Mockito.when(zkStateReader.getLeaderRetry(Mockito.any(), Mockito.any()))
+ core = mock(SolrCore.class);
+ CoreDescriptor coreDesc = mock(CoreDescriptor.class);
+ cloudDesc = mock(CloudDescriptor.class);
+ when(cloudDesc.getShardId()).thenReturn("shard1");
+ CoreContainer coreContainer = mock(CoreContainer.class);
+ ZkController zkController = mock(ZkController.class);
+ ClusterState clusterState = mock(ClusterState.class);
+ DocCollection docCollection = mock(DocCollection.class);
+ DocRouter docRouter = mock(DocRouter.class);
+ Slice slice = mock(Slice.class);
+ when(slice.getName()).thenReturn("shard1");
+ zkStateReader = mock(ZkStateReader.class);
+ replica = mock(Replica.class);
+
+ when(replica.getName()).thenReturn("replica1");
+ when(zkStateReader.getLeaderRetry(any(), any()))
.thenReturn(replica);
- Mockito.when(zkController.getZkStateReader()).thenReturn(zkStateReader);
- Mockito.when(coreDesc.getCloudDescriptor()).thenReturn(cloudDesc);
- Mockito.when(clusterState.getCollection(Mockito.any())).thenReturn(docCollection);
- Mockito.when(docCollection.getRouter()).thenReturn(docRouter);
- Mockito.when(
+ when(zkController.getZkStateReader()).thenReturn(zkStateReader);
+ when(coreDesc.getCloudDescriptor()).thenReturn(cloudDesc);
+ when(clusterState.getCollection(any())).thenReturn(docCollection);
+ when(docCollection.getRouter()).thenReturn(docRouter);
+ when(
docRouter.getTargetSlice(
- Mockito.any(),
- Mockito.any(),
- Mockito.any(),
- Mockito.any(),
- Mockito.any()))
+ any(),
+ any(),
+ any(),
+ any(),
+ any()))
.thenReturn(slice);
- Mockito.when(docCollection.getSlicesMap()).thenReturn(Map.of("shard1", slice));
- Mockito.when(zkController.getClusterState()).thenReturn(clusterState);
- Mockito.when(coreContainer.getZkController()).thenReturn(zkController);
- Mockito.when(core.getCoreContainer()).thenReturn(coreContainer);
- Mockito.when(core.getCoreDescriptor()).thenReturn(coreDesc);
- Mockito.when(req.getCore()).thenReturn(core);
+ when(docCollection.getSlicesMap()).thenReturn(Map.of("shard1", slice));
+ when(zkController.getClusterState()).thenReturn(clusterState);
+ when(coreContainer.getZkController()).thenReturn(zkController);
+ when(core.getCoreContainer()).thenReturn(coreContainer);
+ when(core.getCoreDescriptor()).thenReturn(coreDesc);
+ when(req.getCore()).thenReturn(core);
}
/**
@@ -137,7 +176,7 @@ public class MirroringUpdateProcessorTest extends SolrTestCaseJ4 {
public void processDeleteWhenDistribPhaseIsNoneAndDeleteByIdIsFalse() {
try {
processor.processDelete(deleteUpdateCommand);
- Mockito.verify(requestMirroringHandler, Mockito.times(1)).mirror(requestMock);
+ verify(requestMirroringHandler, times(1)).mirror(requestMock);
} catch (Exception e) {
fail("IOException should not be thrown");
}
@@ -150,9 +189,9 @@ public class MirroringUpdateProcessorTest extends SolrTestCaseJ4 {
@Test
public void processAddWhenDocSizeWithinLimitAndNodeIsLeader() {
try {
- Mockito.when(cloudDesc.getCoreNodeName()).thenReturn("replica1");
+ when(cloudDesc.getCoreNodeName()).thenReturn("replica1");
processor.processAdd(addUpdateCommand);
- Mockito.verify(requestMirroringHandler, Mockito.times(1)).mirror(requestMock);
+ verify(requestMirroringHandler, times(1)).mirror(requestMock);
} catch (IOException e) {
fail("IOException should not be thrown");
} catch (Exception e) {
@@ -167,10 +206,10 @@ public class MirroringUpdateProcessorTest extends SolrTestCaseJ4 {
@Test
public void processDeleteWhenNodeIsLeaderAndDeleteByIdIsTrue() {
try {
- Mockito.when(cloudDesc.getCoreNodeName()).thenReturn("replica1");
+ when(cloudDesc.getCoreNodeName()).thenReturn("replica1");
deleteUpdateCommand.setId("test");
processor.processDelete(deleteUpdateCommand);
- Mockito.verify(requestMirroringHandler, Mockito.times(1)).mirror(requestMock);
+ verify(requestMirroringHandler, times(1)).mirror(requestMock);
} catch (Exception e) {
fail("IOException should not be thrown");
}
@@ -180,10 +219,10 @@ public class MirroringUpdateProcessorTest extends SolrTestCaseJ4 {
public void processCommitOnlyAnotherShard() {
try {
// should skip if processing in other shard than the first
- Mockito.when(cloudDesc.getShardId()).thenReturn("shard2");
+ when(cloudDesc.getShardId()).thenReturn("shard2");
processor.processCommit(commitUpdateCommand);
- Mockito.verify(next).processCommit(commitUpdateCommand);
- Mockito.verify(requestMirroringHandler, Mockito.times(0)).mirror(requestMock);
+ verify(next).processCommit(commitUpdateCommand);
+ verify(requestMirroringHandler, times(0)).mirror(requestMock);
} catch (Exception e) {
fail("IOException should not be thrown: " + e);
}
@@ -193,11 +232,11 @@ public class MirroringUpdateProcessorTest extends SolrTestCaseJ4 {
public void processCommitOnlyNonLeader() {
try {
// should skip if processing on non-leader replica
- Mockito.when(replica.getName()).thenReturn("foobar");
- Mockito.when(cloudDesc.getCoreNodeName()).thenReturn("replica1");
+ when(replica.getName()).thenReturn("foobar");
+ when(cloudDesc.getCoreNodeName()).thenReturn("replica1");
processor.processCommit(commitUpdateCommand);
- Mockito.verify(next).processCommit(commitUpdateCommand);
- Mockito.verify(requestMirroringHandler, Mockito.times(0)).mirror(requestMock);
+ verify(next).processCommit(commitUpdateCommand);
+ verify(requestMirroringHandler, times(0)).mirror(requestMock);
} catch (Exception e) {
fail("IOException should not be thrown: " + e);
}
@@ -206,7 +245,7 @@ public class MirroringUpdateProcessorTest extends SolrTestCaseJ4 {
@Test
public void processCommitOnlyLeader() {
try {
- Mockito.when(cloudDesc.getCoreNodeName()).thenReturn("replica1");
+ when(cloudDesc.getCoreNodeName()).thenReturn("replica1");
// don't override createMirrorRequest, call actual method
processor =
new MirroringUpdateProcessor(
@@ -217,11 +256,12 @@ public class MirroringUpdateProcessorTest extends SolrTestCaseJ4 {
1000L,
new ModifiableSolrParams(),
DistributedUpdateProcessor.DistribPhase.NONE,
- requestMirroringHandler);
+ requestMirroringHandler,
+ producerMetrics);
ArgumentCaptor<UpdateRequest> captor = ArgumentCaptor.forClass(UpdateRequest.class);
processor.processCommit(commitUpdateCommand);
- Mockito.verify(next).processCommit(commitUpdateCommand);
- Mockito.verify(requestMirroringHandler, Mockito.times(1)).mirror(captor.capture());
+ verify(next).processCommit(commitUpdateCommand);
+ verify(requestMirroringHandler, times(1)).mirror(captor.capture());
UpdateRequest req = captor.getValue();
assertNotNull(req.getParams());
SolrParams params = req.getParams();
@@ -241,7 +281,7 @@ public class MirroringUpdateProcessorTest extends SolrTestCaseJ4 {
@Test
public void processCommitNoMirroring() {
try {
- Mockito.when(cloudDesc.getCoreNodeName()).thenReturn("replica1");
+ when(cloudDesc.getCoreNodeName()).thenReturn("replica1");
// don't override createMirrorRequest, call actual method
processor =
new MirroringUpdateProcessor(
@@ -252,10 +292,11 @@ public class MirroringUpdateProcessorTest extends SolrTestCaseJ4 {
1000L,
new ModifiableSolrParams(),
DistributedUpdateProcessor.DistribPhase.NONE,
- requestMirroringHandler);
+ requestMirroringHandler,
+ producerMetrics);
processor.processCommit(commitUpdateCommand);
- Mockito.verify(next).processCommit(commitUpdateCommand);
- Mockito.verify(requestMirroringHandler, Mockito.times(0)).mirror(requestMock);
+ verify(next).processCommit(commitUpdateCommand);
+ verify(requestMirroringHandler, times(0)).mirror(requestMock);
} catch (Exception e) {
fail("Exception should not be thrown: " + e);
}
@@ -263,7 +304,7 @@ public class MirroringUpdateProcessorTest extends SolrTestCaseJ4 {
@Test
public void testProcessAddWithinLimit() throws Exception {
- Mockito.when(cloudDesc.getCoreNodeName()).thenReturn("replica1");
+ when(cloudDesc.getCoreNodeName()).thenReturn("replica1");
SolrInputDocument doc = new SolrInputDocument();
doc.addField("id", "1");
AddUpdateCommand cmd = new AddUpdateCommand(req);
@@ -271,8 +312,8 @@ public class MirroringUpdateProcessorTest extends SolrTestCaseJ4 {
cmd.commitWithin = 1000;
cmd.overwrite = true;
processor.processAdd(cmd);
- Mockito.verify(next).processAdd(cmd);
- Mockito.verify(requestMirroringHandler).mirror(requestMock);
+ verify(next).processAdd(cmd);
+ verify(requestMirroringHandler).mirror(requestMock);
}
@Test
@@ -283,45 +324,45 @@ public class MirroringUpdateProcessorTest extends SolrTestCaseJ4 {
solrInputDocument.addField("large_field", "Test ".repeat(10000));
addUpdateCommand.solrDoc = solrInputDocument;
- Mockito.when(req.getCore()).thenReturn(core);
- Mockito.when(req.getCore().getCoreDescriptor()).thenReturn(Mockito.mock(CoreDescriptor.class));
- Mockito.when(req.getCore().getCoreDescriptor().getCloudDescriptor()).thenReturn(Mockito.mock(CloudDescriptor.class));
- Mockito.when(req.getCore().getCoreContainer()).thenReturn(Mockito.mock(CoreContainer.class));
- Mockito.when(req.getCore().getCoreContainer().getZkController()).thenReturn(Mockito.mock(ZkController.class));
- Mockito.when(req.getCore().getCoreContainer().getZkController().getClusterState()).thenReturn(Mockito.mock(ClusterState.class));
+ when(req.getCore()).thenReturn(core);
+ when(req.getCore().getCoreDescriptor()).thenReturn(mock(CoreDescriptor.class));
+ when(req.getCore().getCoreDescriptor().getCloudDescriptor()).thenReturn(mock(CloudDescriptor.class));
+ when(req.getCore().getCoreContainer()).thenReturn(mock(CoreContainer.class));
+ when(req.getCore().getCoreContainer().getZkController()).thenReturn(mock(ZkController.class));
+ when(req.getCore().getCoreContainer().getZkController().getClusterState()).thenReturn(mock(ClusterState.class));
SolrParams mirrorParams = new ModifiableSolrParams();
MirroringUpdateProcessor mirroringUpdateProcessorWithLimit = new MirroringUpdateProcessor(nextProcessor, true, false, // indexUnmirrorableDocs set to false
- true, 50000, mirrorParams, DistributedUpdateProcessor.DistribPhase.NONE, requestMirroringHandler);
+ true, 50000, mirrorParams, DistributedUpdateProcessor.DistribPhase.NONE, requestMirroringHandler, producerMetrics);
assertThrows(SolrException.class, () -> mirroringUpdateProcessorWithLimit.processAdd(addUpdateCommand));
}
@Test
public void testProcessAddLeader() throws Exception {
- Mockito.when(cloudDesc.getCoreNodeName()).thenReturn("replica1");
+ when(cloudDesc.getCoreNodeName()).thenReturn("replica1");
processor.processAdd(addUpdateCommand);
- Mockito.verify(requestMirroringHandler, Mockito.times(1)).mirror(Mockito.any());
+ verify(requestMirroringHandler, times(1)).mirror(any());
}
@Test
public void testProcessAddNotLeader() throws Exception {
- Mockito.when(cloudDesc.getCoreNodeName()).thenReturn("replica2");
+ when(cloudDesc.getCoreNodeName()).thenReturn("replica2");
processor.processAdd(addUpdateCommand);
- Mockito.verify(requestMirroringHandler, Mockito.times(0)).mirror(Mockito.any());
+ verify(requestMirroringHandler, times(0)).mirror(any());
}
@Test
public void testProcessDelete() throws Exception {
- Mockito.when(cloudDesc.getCoreNodeName()).thenReturn("replica1");
+ when(cloudDesc.getCoreNodeName()).thenReturn("replica1");
processor.processDelete(deleteUpdateCommand);
- Mockito.verify(requestMirroringHandler, Mockito.times(1)).mirror(Mockito.any());
+ verify(requestMirroringHandler, times(1)).mirror(any());
}
@Test
public void testProcessDBQResults() throws Exception {
- Mockito.when(cloudDesc.getCoreNodeName()).thenReturn("replica1");
- Mockito.when(builder.build()).thenReturn(client);
+ when(cloudDesc.getCoreNodeName()).thenReturn("replica1");
+ when(builder.build()).thenReturn(client);
SolrInputDocument doc = new SolrInputDocument();
doc.addField("id", "test");
addUpdateCommand.solrDoc = doc;