You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2017/01/05 12:24:01 UTC

[2/3] lucene-solr:jira/solr-9856: SOLR-9856 Add meters for buffered and replayed ops. Unit test.

SOLR-9856 Add meters for buffered and replayed ops. Unit test.


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/b4153890
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/b4153890
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/b4153890

Branch: refs/heads/jira/solr-9856
Commit: b4153890c7b05320a2936586900db295cfbe1826
Parents: 5d2513d
Author: Andrzej Bialecki <ab...@apache.org>
Authored: Thu Jan 5 13:22:39 2017 +0100
Committer: Andrzej Bialecki <ab...@apache.org>
Committed: Thu Jan 5 13:22:39 2017 +0100

----------------------------------------------------------------------
 .../java/org/apache/solr/update/UpdateLog.java  | 47 ++++++++++++--------
 .../org/apache/solr/search/TestRecovery.java    | 40 ++++++++++++++++-
 2 files changed, 68 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b4153890/solr/core/src/java/org/apache/solr/update/UpdateLog.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateLog.java b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
index 4fdd211..6286810 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateLog.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
@@ -41,6 +41,7 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Meter;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.lucene.util.BytesRef;
 import org.apache.solr.common.SolrException;
@@ -190,9 +191,12 @@ public class UpdateLog implements PluginInfoInitialized {
   int startingOperation;  // last operation in the logs on startup
 
   // metrics
-  protected Gauge<Integer> bufferingDocsGauge;
+  protected Gauge<Integer> bufferedOpsGauge;
   protected Gauge<Integer> replayLogsCountGauge;
-  protected Gauge<Long> replayDocsCountGauge;
+  protected Gauge<Long> replayBytesGauge;
+  protected Gauge<Integer> stateGauge;
+  protected Meter applyingBufferedOpsMeter;
+  protected Meter replayOpsMeter;
 
   public static class LogPtr {
     final long pointer;
@@ -343,40 +347,40 @@ public class UpdateLog implements PluginInfoInitialized {
     }
     SolrMetricManager metricManager = core.getCoreDescriptor().getCoreContainer().getMetricManager();
     String registry = core.getCoreMetricManager().getRegistryName();
-    bufferingDocsGauge = () -> {
+    bufferedOpsGauge = () -> {
       if (tlog == null) {
-        System.err.println("no tlog");
         return 0;
       } else if (state == State.APPLYING_BUFFERED) {
-        System.err.println("APPLYING_BUFFERED: " + recoveryInfo + ", tlog " + tlog.numRecords);
-        return tlog.numRecords() - recoveryInfo.adds - recoveryInfo.deleteByQuery - recoveryInfo.deletes;
+        // numRecords counts header as a record
+        return tlog.numRecords() - 1 - recoveryInfo.adds - recoveryInfo.deleteByQuery - recoveryInfo.deletes - recoveryInfo.errors;
       } else if (state == State.BUFFERING) {
-        System.err.println("BUFFERING: " + tlog.numRecords());
-        return tlog.numRecords();
+        // numRecords counts header as a record
+        return tlog.numRecords() - 1;
       } else {
         return 0;
       }
     };
     replayLogsCountGauge = () -> logs.size();
-    replayDocsCountGauge = () -> {
+    replayBytesGauge = () -> {
       if (state == State.REPLAYING) {
         synchronized(this) {
-          long processed = recoveryInfo.adds + recoveryInfo.deletes + recoveryInfo.deleteByQuery;
-          long totalDocs = 0;
+          long totalBytesToProcess = 0;
           for (TransactionLog log : logs) {
-            totalDocs += log.numRecords();
+            totalBytesToProcess += log.getLogSize();
           }
-          System.err.println("REPLAYING: " + totalDocs + " - " + processed);
-          return totalDocs - processed;
+          return totalBytesToProcess;
         }
       } else {
-        System.err.println("not REPLAYING");
         return 0L;
       }
     };
-    metricManager.register(registry, bufferingDocsGauge, true, "buffering.docs", SolrInfoMBean.Category.UPDATEHANDLER.toString(), "tlog");
-    metricManager.register(registry, bufferingDocsGauge, true, "replying.logs", SolrInfoMBean.Category.UPDATEHANDLER.toString(), "tlog");
-    metricManager.register(registry, bufferingDocsGauge, true, "replying.docs", SolrInfoMBean.Category.UPDATEHANDLER.toString(), "tlog");
+    metricManager.register(registry, bufferedOpsGauge, true, "ops", SolrInfoMBean.Category.UPDATEHANDLER.toString(), "tlog", "buffered");
+    metricManager.register(registry, replayLogsCountGauge, true, "logs", SolrInfoMBean.Category.UPDATEHANDLER.toString(), "tlog", "replay", "remaining");
+    metricManager.register(registry, replayBytesGauge, true, "bytes", SolrInfoMBean.Category.UPDATEHANDLER.toString(), "tlog", "replay", "remaining");
+    applyingBufferedOpsMeter = metricManager.meter(registry, "ops", SolrInfoMBean.Category.UPDATEHANDLER.toString(), "tlog", "applying_buffered");
+    replayOpsMeter = metricManager.meter(registry, "ops", SolrInfoMBean.Category.UPDATEHANDLER.toString(), "tlog", "replay");
+    stateGauge = () -> state.ordinal();
+    metricManager.register(registry, stateGauge, true, "state", SolrInfoMBean.Category.UPDATEHANDLER.toString(), "tlog");
   }
 
   /**
@@ -1470,6 +1474,13 @@ public class UpdateLog implements PluginInfoInitialized {
               loglog.error("REPLAY_ERR: Exception replaying log", rsp.getException());
               throw rsp.getException();
             }
+            if (state == State.REPLAYING) {
+              replayOpsMeter.mark();
+            } else if (state == State.APPLYING_BUFFERED) {
+              applyingBufferedOpsMeter.mark();
+            } else {
+              // XXX should not happen?
+            }
           } catch (IOException ex) {
             recoveryInfo.errors++;
             loglog.warn("REYPLAY_ERR: IOException reading log", ex);

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b4153890/solr/core/src/test/org/apache/solr/search/TestRecovery.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/search/TestRecovery.java b/solr/core/src/test/org/apache/solr/search/TestRecovery.java
index 12d3ec3..60fd1f7 100644
--- a/solr/core/src/test/org/apache/solr/search/TestRecovery.java
+++ b/solr/core/src/test/org/apache/solr/search/TestRecovery.java
@@ -19,6 +19,11 @@ package org.apache.solr.search;
 
 import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
 
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.Metric;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.solr.metrics.SolrMetricManager;
 import org.noggit.ObjectBuilder;
 import org.apache.solr.SolrTestCaseJ4;
 import org.apache.solr.request.SolrQueryRequest;
@@ -55,7 +60,7 @@ public class TestRecovery extends SolrTestCaseJ4 {
 
   // TODO: fix this test to not require FSDirectory
   static String savedFactory;
-  
+
   @BeforeClass
   public static void beforeClass() throws Exception {
     savedFactory = System.getProperty("solr.DirectoryFactory");
@@ -85,6 +90,12 @@ public class TestRecovery extends SolrTestCaseJ4 {
   }
 
 
+  private Map<String, Metric> getMetrics() { // all metrics in the current core's registry, keyed by dotted name (e.g. "UPDATEHANDLER.tlog.state")
+    SolrMetricManager manager = h.getCoreContainer().getMetricManager();
+    MetricRegistry registry = manager.registry(h.getCore().getCoreMetricManager().getRegistryName());
+    return registry.getMetrics(); // NOTE(review): callers treat this as a live view of the registry; confirm against the MetricRegistry impl in use
+  }
+
   @Test
   public void testLogReplay() throws Exception {
     try {
@@ -120,6 +131,9 @@ public class TestRecovery extends SolrTestCaseJ4 {
 
       h.close();
       createCore();
+
+      Map<String, Metric> metrics = getMetrics(); // live map view
+
       // Solr should kick this off now
       // h.getCore().getUpdateHandler().getUpdateLog().recoverFromLog();
 
@@ -130,6 +144,15 @@ public class TestRecovery extends SolrTestCaseJ4 {
       // make sure we can still access versions after a restart
       assertJQ(req("qt","/get", "getVersions",""+versions.size()),"/versions==" + versions);
 
+      assertEquals(UpdateLog.State.REPLAYING, h.getCore().getUpdateHandler().getUpdateLog().getState());
+      // check metrics
+      Gauge<Integer> state = (Gauge<Integer>)metrics.get("UPDATEHANDLER.tlog.state");
+      assertEquals(UpdateLog.State.REPLAYING.ordinal(), state.getValue().intValue());
+      Gauge<Integer> replayingLogs = (Gauge<Integer>)metrics.get("UPDATEHANDLER.tlog.replay.remaining.logs");
+      assertEquals(1, replayingLogs.getValue().intValue());
+      Gauge<Long> replayingDocs = (Gauge<Long>)metrics.get("UPDATEHANDLER.tlog.replay.remaining.bytes");
+      assertEquals(209L, replayingDocs.getValue().longValue());
+
       // unblock recovery
       logReplay.release(1000);
 
@@ -141,6 +164,10 @@ public class TestRecovery extends SolrTestCaseJ4 {
 
       assertJQ(req("q","*:*") ,"/response/numFound==3");
 
+      Meter replayDocs = (Meter)metrics.get("UPDATEHANDLER.tlog.replay.ops");
+      assertEquals(5L, replayDocs.getCount());
+      assertEquals(UpdateLog.State.ACTIVE.ordinal(), state.getValue().intValue());
+
       // make sure we can still access versions after recovery
       assertJQ(req("qt","/get", "getVersions",""+versions.size()) ,"/versions==" + versions);
 
@@ -208,15 +235,20 @@ public class TestRecovery extends SolrTestCaseJ4 {
       clearIndex();
       assertU(commit());
 
+      Map<String, Metric> metrics = getMetrics();
+
       assertEquals(UpdateLog.State.ACTIVE, ulog.getState());
       ulog.bufferUpdates();
       assertEquals(UpdateLog.State.BUFFERING, ulog.getState());
+
       Future<UpdateLog.RecoveryInfo> rinfoFuture = ulog.applyBufferedUpdates();
       assertTrue(rinfoFuture == null);
       assertEquals(UpdateLog.State.ACTIVE, ulog.getState());
 
       ulog.bufferUpdates();
       assertEquals(UpdateLog.State.BUFFERING, ulog.getState());
+      Gauge<Integer> state = (Gauge<Integer>)metrics.get("UPDATEHANDLER.tlog.state");
+      assertEquals(UpdateLog.State.BUFFERING.ordinal(), state.getValue().intValue());
 
       // simulate updates from a leader
       updateJ(jsonAdd(sdoc("id","B1", "_version_","1010")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
@@ -248,6 +280,8 @@ public class TestRecovery extends SolrTestCaseJ4 {
           ,"=={'doc':null}"
       );
 
+      Gauge<Integer> bufferedOps = (Gauge<Integer>)metrics.get("UPDATEHANDLER.tlog.buffered.ops");
+      assertEquals(6, bufferedOps.getValue().intValue());
 
       rinfoFuture = ulog.applyBufferedUpdates();
       assertTrue(rinfoFuture != null);
@@ -259,6 +293,8 @@ public class TestRecovery extends SolrTestCaseJ4 {
       UpdateLog.RecoveryInfo rinfo = rinfoFuture.get();
       assertEquals(UpdateLog.State.ACTIVE, ulog.getState());
 
+      Meter applyingBuffered = (Meter)metrics.get("UPDATEHANDLER.tlog.applying_buffered.ops");
+      assertEquals(6L, applyingBuffered.getCount());
 
       assertJQ(req("qt","/get", "getVersions","6")
           ,"=={'versions':[-2010,1030,1020,-1017,1015,1010]}"
@@ -325,6 +361,8 @@ public class TestRecovery extends SolrTestCaseJ4 {
       assertEquals(1, recInfo.deleteByQuery);
 
       assertEquals(UpdateLog.State.ACTIVE, ulog.getState()); // leave each test method in a good state
+
+      assertEquals(0, bufferedOps.getValue().intValue());
     } finally {
       DirectUpdateHandler2.commitOnClose = true;
       UpdateLog.testing_logReplayHook = null;