You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@lucene.apache.org by ab...@apache.org on 2017/01/05 12:24:01 UTC
[2/3] lucene-solr:jira/solr-9856: SOLR-9856 Add meters for buffered and replayed ops. Unit test.
SOLR-9856 Add meters for buffered and replayed ops. Unit test.
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/b4153890
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/b4153890
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/b4153890
Branch: refs/heads/jira/solr-9856
Commit: b4153890c7b05320a2936586900db295cfbe1826
Parents: 5d2513d
Author: Andrzej Bialecki <ab...@apache.org>
Authored: Thu Jan 5 13:22:39 2017 +0100
Committer: Andrzej Bialecki <ab...@apache.org>
Committed: Thu Jan 5 13:22:39 2017 +0100
----------------------------------------------------------------------
.../java/org/apache/solr/update/UpdateLog.java | 47 ++++++++++++--------
.../org/apache/solr/search/TestRecovery.java | 40 ++++++++++++++++-
2 files changed, 68 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b4153890/solr/core/src/java/org/apache/solr/update/UpdateLog.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateLog.java b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
index 4fdd211..6286810 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateLog.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
@@ -41,6 +41,7 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Meter;
import org.apache.hadoop.fs.FileSystem;
import org.apache.lucene.util.BytesRef;
import org.apache.solr.common.SolrException;
@@ -190,9 +191,12 @@ public class UpdateLog implements PluginInfoInitialized {
int startingOperation; // last operation in the logs on startup
// metrics
- protected Gauge<Integer> bufferingDocsGauge;
+ protected Gauge<Integer> bufferedOpsGauge;
protected Gauge<Integer> replayLogsCountGauge;
- protected Gauge<Long> replayDocsCountGauge;
+ protected Gauge<Long> replayBytesGauge;
+ protected Gauge<Integer> stateGauge;
+ protected Meter applyingBufferedOpsMeter;
+ protected Meter replayOpsMeter;
public static class LogPtr {
final long pointer;
@@ -343,40 +347,40 @@ public class UpdateLog implements PluginInfoInitialized {
}
SolrMetricManager metricManager = core.getCoreDescriptor().getCoreContainer().getMetricManager();
String registry = core.getCoreMetricManager().getRegistryName();
- bufferingDocsGauge = () -> {
+ bufferedOpsGauge = () -> {
if (tlog == null) {
- System.err.println("no tlog");
return 0;
} else if (state == State.APPLYING_BUFFERED) {
- System.err.println("APPLYING_BUFFERED: " + recoveryInfo + ", tlog " + tlog.numRecords);
- return tlog.numRecords() - recoveryInfo.adds - recoveryInfo.deleteByQuery - recoveryInfo.deletes;
+ // numRecords counts header as a record
+ return tlog.numRecords() - 1 - recoveryInfo.adds - recoveryInfo.deleteByQuery - recoveryInfo.deletes - recoveryInfo.errors;
} else if (state == State.BUFFERING) {
- System.err.println("BUFFERING: " + tlog.numRecords());
- return tlog.numRecords();
+ // numRecords counts header as a record
+ return tlog.numRecords() - 1;
} else {
return 0;
}
};
replayLogsCountGauge = () -> logs.size();
- replayDocsCountGauge = () -> {
+ replayBytesGauge = () -> {
if (state == State.REPLAYING) {
synchronized(this) {
- long processed = recoveryInfo.adds + recoveryInfo.deletes + recoveryInfo.deleteByQuery;
- long totalDocs = 0;
+ long totalBytesToProcess = 0;
for (TransactionLog log : logs) {
- totalDocs += log.numRecords();
+ totalBytesToProcess += log.getLogSize();
}
- System.err.println("REPLAYING: " + totalDocs + " - " + processed);
- return totalDocs - processed;
+ return totalBytesToProcess;
}
} else {
- System.err.println("not REPLAYING");
return 0L;
}
};
- metricManager.register(registry, bufferingDocsGauge, true, "buffering.docs", SolrInfoMBean.Category.UPDATEHANDLER.toString(), "tlog");
- metricManager.register(registry, bufferingDocsGauge, true, "replying.logs", SolrInfoMBean.Category.UPDATEHANDLER.toString(), "tlog");
- metricManager.register(registry, bufferingDocsGauge, true, "replying.docs", SolrInfoMBean.Category.UPDATEHANDLER.toString(), "tlog");
+ metricManager.register(registry, bufferedOpsGauge, true, "ops", SolrInfoMBean.Category.UPDATEHANDLER.toString(), "tlog", "buffered");
+ metricManager.register(registry, replayLogsCountGauge, true, "logs", SolrInfoMBean.Category.UPDATEHANDLER.toString(), "tlog", "replay", "remaining");
+ metricManager.register(registry, replayBytesGauge, true, "bytes", SolrInfoMBean.Category.UPDATEHANDLER.toString(), "tlog", "replay", "remaining");
+ applyingBufferedOpsMeter = metricManager.meter(registry, "ops", SolrInfoMBean.Category.UPDATEHANDLER.toString(), "tlog", "applying_buffered");
+ replayOpsMeter = metricManager.meter(registry, "ops", SolrInfoMBean.Category.UPDATEHANDLER.toString(), "tlog", "replay");
+ stateGauge = () -> state.ordinal();
+ metricManager.register(registry, stateGauge, true, "state", SolrInfoMBean.Category.UPDATEHANDLER.toString(), "tlog");
}
/**
@@ -1470,6 +1474,13 @@ public class UpdateLog implements PluginInfoInitialized {
loglog.error("REPLAY_ERR: Exception replaying log", rsp.getException());
throw rsp.getException();
}
+ if (state == State.REPLAYING) {
+ replayOpsMeter.mark();
+ } else if (state == State.APPLYING_BUFFERED) {
+ applyingBufferedOpsMeter.mark();
+ } else {
+ // XXX should not happen?
+ }
} catch (IOException ex) {
recoveryInfo.errors++;
loglog.warn("REYPLAY_ERR: IOException reading log", ex);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b4153890/solr/core/src/test/org/apache/solr/search/TestRecovery.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/search/TestRecovery.java b/solr/core/src/test/org/apache/solr/search/TestRecovery.java
index 12d3ec3..60fd1f7 100644
--- a/solr/core/src/test/org/apache/solr/search/TestRecovery.java
+++ b/solr/core/src/test/org/apache/solr/search/TestRecovery.java
@@ -19,6 +19,11 @@ package org.apache.solr.search;
import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.Metric;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.solr.metrics.SolrMetricManager;
import org.noggit.ObjectBuilder;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.request.SolrQueryRequest;
@@ -55,7 +60,7 @@ public class TestRecovery extends SolrTestCaseJ4 {
// TODO: fix this test to not require FSDirectory
static String savedFactory;
-
+
@BeforeClass
public static void beforeClass() throws Exception {
savedFactory = System.getProperty("solr.DirectoryFactory");
@@ -85,6 +90,12 @@ public class TestRecovery extends SolrTestCaseJ4 {
}
+ private Map<String, Metric> getMetrics() {
+ SolrMetricManager manager = h.getCoreContainer().getMetricManager();
+ MetricRegistry registry = manager.registry(h.getCore().getCoreMetricManager().getRegistryName());
+ return registry.getMetrics();
+ }
+
@Test
public void testLogReplay() throws Exception {
try {
@@ -120,6 +131,9 @@ public class TestRecovery extends SolrTestCaseJ4 {
h.close();
createCore();
+
+ Map<String, Metric> metrics = getMetrics(); // live map view
+
// Solr should kick this off now
// h.getCore().getUpdateHandler().getUpdateLog().recoverFromLog();
@@ -130,6 +144,15 @@ public class TestRecovery extends SolrTestCaseJ4 {
// make sure we can still access versions after a restart
assertJQ(req("qt","/get", "getVersions",""+versions.size()),"/versions==" + versions);
+ assertEquals(UpdateLog.State.REPLAYING, h.getCore().getUpdateHandler().getUpdateLog().getState());
+ // check metrics
+ Gauge<Integer> state = (Gauge<Integer>)metrics.get("UPDATEHANDLER.tlog.state");
+ assertEquals(UpdateLog.State.REPLAYING.ordinal(), state.getValue().intValue());
+ Gauge<Integer> replayingLogs = (Gauge<Integer>)metrics.get("UPDATEHANDLER.tlog.replay.remaining.logs");
+ assertEquals(1, replayingLogs.getValue().intValue());
+ Gauge<Long> replayingDocs = (Gauge<Long>)metrics.get("UPDATEHANDLER.tlog.replay.remaining.bytes");
+ assertEquals(209L, replayingDocs.getValue().longValue());
+
// unblock recovery
logReplay.release(1000);
@@ -141,6 +164,10 @@ public class TestRecovery extends SolrTestCaseJ4 {
assertJQ(req("q","*:*") ,"/response/numFound==3");
+ Meter replayDocs = (Meter)metrics.get("UPDATEHANDLER.tlog.replay.ops");
+ assertEquals(5L, replayDocs.getCount());
+ assertEquals(UpdateLog.State.ACTIVE.ordinal(), state.getValue().intValue());
+
// make sure we can still access versions after recovery
assertJQ(req("qt","/get", "getVersions",""+versions.size()) ,"/versions==" + versions);
@@ -208,15 +235,20 @@ public class TestRecovery extends SolrTestCaseJ4 {
clearIndex();
assertU(commit());
+ Map<String, Metric> metrics = getMetrics();
+
assertEquals(UpdateLog.State.ACTIVE, ulog.getState());
ulog.bufferUpdates();
assertEquals(UpdateLog.State.BUFFERING, ulog.getState());
+
Future<UpdateLog.RecoveryInfo> rinfoFuture = ulog.applyBufferedUpdates();
assertTrue(rinfoFuture == null);
assertEquals(UpdateLog.State.ACTIVE, ulog.getState());
ulog.bufferUpdates();
assertEquals(UpdateLog.State.BUFFERING, ulog.getState());
+ Gauge<Integer> state = (Gauge<Integer>)metrics.get("UPDATEHANDLER.tlog.state");
+ assertEquals(UpdateLog.State.BUFFERING.ordinal(), state.getValue().intValue());
// simulate updates from a leader
updateJ(jsonAdd(sdoc("id","B1", "_version_","1010")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
@@ -248,6 +280,8 @@ public class TestRecovery extends SolrTestCaseJ4 {
,"=={'doc':null}"
);
+ Gauge<Integer> bufferedOps = (Gauge<Integer>)metrics.get("UPDATEHANDLER.tlog.buffered.ops");
+ assertEquals(6, bufferedOps.getValue().intValue());
rinfoFuture = ulog.applyBufferedUpdates();
assertTrue(rinfoFuture != null);
@@ -259,6 +293,8 @@ public class TestRecovery extends SolrTestCaseJ4 {
UpdateLog.RecoveryInfo rinfo = rinfoFuture.get();
assertEquals(UpdateLog.State.ACTIVE, ulog.getState());
+ Meter applyingBuffered = (Meter)metrics.get("UPDATEHANDLER.tlog.applying_buffered.ops");
+ assertEquals(6L, applyingBuffered.getCount());
assertJQ(req("qt","/get", "getVersions","6")
,"=={'versions':[-2010,1030,1020,-1017,1015,1010]}"
@@ -325,6 +361,8 @@ public class TestRecovery extends SolrTestCaseJ4 {
assertEquals(1, recInfo.deleteByQuery);
assertEquals(UpdateLog.State.ACTIVE, ulog.getState()); // leave each test method in a good state
+
+ assertEquals(0, bufferedOps.getValue().intValue());
} finally {
DirectUpdateHandler2.commitOnClose = true;
UpdateLog.testing_logReplayHook = null;