You are viewing a plain-text version of this content. (The link to the canonical version is not preserved in this plain-text copy.)
Posted to commits@lucene.apache.org by is...@apache.org on 2022/07/18 06:40:18 UTC

[lucene-solr] branch jira/solr15138-with-zk-metrics created (now 5d4a26d442f)

This is an automated email from the ASF dual-hosted git repository.

ishan pushed a change to branch jira/solr15138-with-zk-metrics
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


      at 5d4a26d442f Implement metrics for Zookeeper client

This branch includes the following new commits:

     new 5d4a26d442f Implement metrics for Zookeeper client

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[lucene-solr] 01/01: Implement metrics for Zookeeper client

Posted by is...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ishan pushed a commit to branch jira/solr15138-with-zk-metrics
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit 5d4a26d442fdb862d2c7f13f54fb0ea3b399f10c
Author: Ubuntu <ro...@ubuntu>
AuthorDate: Mon Jul 18 12:09:32 2022 +0530

    Implement metrics for Zookeeper client
---
 .../java/org/apache/solr/core/CoreContainer.java   |   3 +
 .../src/java/org/apache/solr/core/ZkContainer.java |  19 ++-
 .../org/apache/solr/common/cloud/SolrZkClient.java | 175 ++++++++++++++++++---
 .../org/apache/solr/common/util/JavaBinCodec.java  |   8 +
 .../org/apache/solr/common/util/TextWriter.java    |   6 +
 5 files changed, 188 insertions(+), 23 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index f255d6cc369..a1c2909faf2 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -729,6 +729,9 @@ public class CoreContainer {
 
     zkSys.initZooKeeper(this, cfg.getCloudConfig());
     if (isZooKeeperAware()) {
+      if (zkSys.getZkMetricsProducer() != null) { // register the SolrZkClient metrics under "zkClient"
+        zkSys.getZkMetricsProducer().initializeMetrics(solrMetricsContext, "zkClient");
+      }
       pkiAuthenticationSecurityBuilder = new PKIAuthenticationPlugin(this, zkSys.getZkController().getNodeName(),
           (PublicKeyHandler) containerHandlers.get(PublicKeyHandler.PATH));
       // use deprecated API for back-compat, remove in 9.0
diff --git a/solr/core/src/java/org/apache/solr/core/ZkContainer.java b/solr/core/src/java/org/apache/solr/core/ZkContainer.java
index 989c24be128..d864f7bbb29 100644
--- a/solr/core/src/java/org/apache/solr/core/ZkContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/ZkContainer.java
@@ -41,6 +41,9 @@ import org.apache.solr.common.cloud.ZooKeeperException;
 import org.apache.solr.common.util.ExecutorUtil;
 import org.apache.solr.common.util.SolrNamedThreadFactory;
 import org.apache.solr.logging.MDCLoggingContext;
+import org.apache.solr.metrics.MetricsMap;
+import org.apache.solr.metrics.SolrMetricProducer;
+import org.apache.solr.metrics.SolrMetricsContext;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -69,7 +72,9 @@ public class ZkContainer {
   
   // see ZkController.zkRunOnly
   private boolean zkRunOnly = Boolean.getBoolean("zkRunOnly"); // expert
-  
+  // Publishes the SolrZkClient metrics; only assigned once a ZkController is available, so it may be null.
+  private SolrMetricProducer metricProducer ;
+
   public ZkContainer() {
     
   }
@@ -166,6 +171,15 @@ public class ZkContainer {
         if(boostrapConf) {
           ZkController.bootstrapConf(zkController.getZkClient(), cc);
         }
+        MetricsMap metricsMap = new MetricsMap(zkController.getZkClient().getMetrics());
+        metricProducer = new SolrMetricProducer() {
+          @Override
+          public void initializeMetrics(SolrMetricsContext parentContext, String scope) {
+            SolrMetricsContext ctx = parentContext.getChildContext(this);
+            ctx.gauge(null, metricsMap,
+                true, scope, null, SolrInfoBean.Category.CONTAINER.toString());
+          }
+        };
 
       } catch (InterruptedException e) {
         // Restore the interrupted status
@@ -263,6 +277,9 @@ public class ZkContainer {
     }
     
   }
+  public SolrMetricProducer getZkMetricsProducer() {
+    return this.metricProducer;
+  }
 
   public ExecutorService getCoreZkRegisterExecutorService() {
     return coreZkRegister;
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
index 8ad1eaf2820..e293ebe02d1 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
@@ -26,6 +26,8 @@ import java.nio.file.Path;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
 import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.function.Predicate;
@@ -33,11 +35,14 @@ import java.util.regex.Pattern;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.common.MapWriter;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.StringUtils;
+import org.apache.solr.common.annotation.JsonProperty;
 import org.apache.solr.common.cloud.ConnectionManager.IsClosed;
 import org.apache.solr.common.util.ExecutorUtil;
 import org.apache.solr.common.util.ObjectReleaseTracker;
+import org.apache.solr.common.util.ReflectMapWriter;
 import org.apache.solr.common.util.SolrNamedThreadFactory;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -55,6 +60,9 @@ import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.zookeeper.Watcher.WatcherType.Children;
+import static org.apache.zookeeper.Watcher.WatcherType.Data;
+
 /**
  *
  * All Solr ZooKeeper interactions should go through this class rather than
@@ -75,6 +83,13 @@ public class SolrZkClient implements Closeable {
 
   private ZkCmdExecutor zkCmdExecutor;
 
+  private final ZkMetrics metrics = new ZkMetrics();
+
+  /** Returns a {@link MapWriter} that writes the current values of this client's operation counters. */
+  public MapWriter getMetrics() {
+    return metrics::writeMap;
+  }
+
   private final ExecutorService zkCallbackExecutor =
       ExecutorUtil.newMDCAwareCachedThreadPool(new SolrNamedThreadFactory("zkCallback"));
   private final ExecutorService zkConnManagerCallbackExecutor =
@@ -141,7 +156,7 @@ public class SolrZkClient implements Closeable {
     this.zkClientTimeout = zkClientTimeout;
     // we must retry at least as long as the session timeout
     zkCmdExecutor = new ZkCmdExecutor(zkClientTimeout, new IsClosed() {
-      
+
       @Override
       public boolean isClosed() {
         return SolrZkClient.this.isClosed();
@@ -149,7 +164,7 @@ public class SolrZkClient implements Closeable {
     });
     connManager = new ConnectionManager("ZooKeeperConnection Watcher:"
         + zkServerAddress, this, zkServerAddress, strat, onReconnect, beforeReconnect, new IsClosed() {
-          
+
           @Override
           public boolean isClosed() {
             return SolrZkClient.this.isClosed();
@@ -259,6 +274,7 @@ public class SolrZkClient implements Closeable {
     } else {
       keeper.delete(path, version);
     }
+    metrics.deletes.increment();
   }
 
   /**
@@ -267,9 +283,14 @@ public class SolrZkClient implements Closeable {
    * calling {@link #exists(String, org.apache.zookeeper.Watcher, boolean)} or
    * {@link #getData(String, org.apache.zookeeper.Watcher, org.apache.zookeeper.data.Stat, boolean)}.
    */
+  public Watcher wrapWatcher(final Watcher watcher, Watcher.WatcherType type) {
+    if (watcher == null || watcher instanceof ProcessWatchWithExecutor) return watcher;
+    return new ProcessWatchWithExecutor(watcher, type);
+  }
+
   public Watcher wrapWatcher(final Watcher watcher) {
     if (watcher == null || watcher instanceof ProcessWatchWithExecutor) return watcher;
-    return new ProcessWatchWithExecutor(watcher);
+    return new ProcessWatchWithExecutor(watcher, Watcher.WatcherType.Data);
   }
 
   /**
@@ -291,11 +312,14 @@ public class SolrZkClient implements Closeable {
    */
   public Stat exists(final String path, final Watcher watcher, boolean retryOnConnLoss)
       throws KeeperException, InterruptedException {
+    Stat result = null;
     if (retryOnConnLoss) {
-      return zkCmdExecutor.retryOperation(() -> keeper.exists(path, wrapWatcher(watcher)));
+      result = zkCmdExecutor.retryOperation(() -> keeper.exists(path, wrapWatcher(watcher)));
     } else {
-      return keeper.exists(path, wrapWatcher(watcher));
+      result = keeper.exists(path, wrapWatcher(watcher));
     }
+    metrics.existsChecks.increment();
+    return result;
   }
 
   /**
@@ -303,11 +327,15 @@ public class SolrZkClient implements Closeable {
    */
   public Boolean exists(final String path, boolean retryOnConnLoss)
       throws KeeperException, InterruptedException {
+    // existsChecks counts completed calls, so it is incremented once, after the request below
+    Boolean result = null;
     if (retryOnConnLoss) {
-      return zkCmdExecutor.retryOperation(() -> keeper.exists(path, null) != null);
+      result = zkCmdExecutor.retryOperation(() -> keeper.exists(path, null) != null);
     } else {
-      return keeper.exists(path, null) != null;
+      result = keeper.exists(path, null) != null;
     }
+    metrics.existsChecks.increment();
+    return result;
   }
 
   /**
@@ -315,11 +343,18 @@ public class SolrZkClient implements Closeable {
    */
   public List<String> getChildren(final String path, final Watcher watcher, boolean retryOnConnLoss)
       throws KeeperException, InterruptedException {
+    List<String> result = null;
     if (retryOnConnLoss) {
-      return zkCmdExecutor.retryOperation(() -> keeper.getChildren(path, wrapWatcher(watcher)));
+      result = zkCmdExecutor.retryOperation(() -> keeper.getChildren(path, wrapWatcher(watcher, Children)));
     } else {
-      return keeper.getChildren(path, wrapWatcher(watcher));
+      result = keeper.getChildren(path, wrapWatcher(watcher, Children));
     }
+
+    metrics.childFetches.increment();
+    if (result != null) {
+      metrics.cumulativeChildrenFetched.add(result.size());
+    }
+    return result;
   }
 
   /**
@@ -327,11 +362,17 @@ public class SolrZkClient implements Closeable {
    */
   public List<String> getChildren(final String path, final Watcher watcher,Stat stat, boolean retryOnConnLoss)
       throws KeeperException, InterruptedException {
+    List<String> result = null;
     if (retryOnConnLoss) {
-      return zkCmdExecutor.retryOperation(() -> keeper.getChildren(path, wrapWatcher(watcher) , stat));
+      result = zkCmdExecutor.retryOperation(() -> keeper.getChildren(path, wrapWatcher(watcher, Children), stat));
     } else {
-      return keeper.getChildren(path, wrapWatcher(watcher), stat);
+      result = keeper.getChildren(path, wrapWatcher(watcher, Children), stat);
+    }
+    metrics.childFetches.increment();
+    if (result != null) {
+      metrics.cumulativeChildrenFetched.add(result.size());
     }
+    return result;
   }
 
   /**
@@ -339,11 +380,17 @@ public class SolrZkClient implements Closeable {
    */
   public byte[] getData(final String path, final Watcher watcher, final Stat stat, boolean retryOnConnLoss)
       throws KeeperException, InterruptedException {
+    byte[] result = null;
     if (retryOnConnLoss) {
-      return zkCmdExecutor.retryOperation(() -> keeper.getData(path, wrapWatcher(watcher), stat));
+      result = zkCmdExecutor.retryOperation(() -> keeper.getData(path, wrapWatcher(watcher), stat));
     } else {
-      return keeper.getData(path, wrapWatcher(watcher), stat);
+      result = keeper.getData(path, wrapWatcher(watcher), stat);
     }
+    metrics.reads.increment();
+    if (result != null) {
+      metrics.bytesRead.add(result.length);
+    }
+    return result;
   }
 
   /**
@@ -351,11 +398,17 @@ public class SolrZkClient implements Closeable {
    */
   public Stat setData(final String path, final byte data[], final int version, boolean retryOnConnLoss)
       throws KeeperException, InterruptedException {
+    Stat result = null;
     if (retryOnConnLoss) {
-      return zkCmdExecutor.retryOperation(() -> keeper.setData(path, data, version));
+      result = zkCmdExecutor.retryOperation(() -> keeper.setData(path, data, version));
     } else {
-      return keeper.setData(path, data, version);
+      result = keeper.setData(path, data, version);
+    }
+    metrics.writes.increment();
+    if (data != null) {
+      metrics.bytesWritten.add(data.length);
     }
+    return result;
   }
 
   public void atomicUpdate(String path, Function<byte[], byte[]> editor) throws KeeperException, InterruptedException {
@@ -400,13 +453,20 @@ public class SolrZkClient implements Closeable {
   public String create(final String path, final byte[] data,
       final CreateMode createMode, boolean retryOnConnLoss) throws KeeperException,
       InterruptedException {
+    String result = null;
     if (retryOnConnLoss) {
-      return zkCmdExecutor.retryOperation(() -> keeper.create(path, data, zkACLProvider.getACLsToAdd(path),
+      result = zkCmdExecutor.retryOperation(() -> keeper.create(path, data, zkACLProvider.getACLsToAdd(path),
           createMode));
     } else {
       List<ACL> acls = zkACLProvider.getACLsToAdd(path);
-      return keeper.create(path, data, acls, createMode);
+      result = keeper.create(path, data, acls, createMode);
+    }
+    metrics.writes.increment();
+    if (data != null) {
+      metrics.bytesWritten.add(data.length);
     }
+    return result;
+
   }
 
   /**
@@ -504,6 +564,10 @@ public class SolrZkClient implements Closeable {
   public void makePath(String path, byte[] data, CreateMode createMode,
       Watcher watcher, boolean failOnExists, boolean retryOnConnLoss, int skipPathParts) throws KeeperException, InterruptedException {
     log.debug("makePath: {}", path);
+    metrics.writes.increment();
+    if (data != null) {
+      metrics.bytesWritten.add(data.length);
+    }
     boolean retry = true;
 
     if (path.startsWith("/")) {
@@ -591,11 +655,17 @@ public class SolrZkClient implements Closeable {
   }
 
   public List<OpResult> multi(final Iterable<Op> ops, boolean retryOnConnLoss) throws InterruptedException, KeeperException  {
+    List<OpResult> result = null;
     if (retryOnConnLoss) {
-      return zkCmdExecutor.retryOperation(() -> keeper.multi(ops));
+      result = zkCmdExecutor.retryOperation(() -> keeper.multi(ops));
     } else {
-      return keeper.multi(ops);
+      result = keeper.multi(ops);
+    }
+    metrics.multiOps.increment();
+    if (result != null) {
+      metrics.cumulativeMultiOps.add(result.size());
     }
+    return result;
   }
 
   /**
@@ -852,22 +922,40 @@ public class SolrZkClient implements Closeable {
    */
   private final class ProcessWatchWithExecutor implements Watcher { // see below for why final.
     private final Watcher watcher;
+    private final Watcher.WatcherType type;
 
-    ProcessWatchWithExecutor(Watcher watcher) {
+    ProcessWatchWithExecutor(Watcher watcher, WatcherType type) {
       if (watcher == null) {
         throw new IllegalArgumentException("Watcher must not be null");
       }
       this.watcher = watcher;
+      this.type = (watcher instanceof ConnectionManager) ? null : type; // connection watcher: not counted
+      if (this.type == Data) {
+        metrics.dataWatches.incrementAndGet();
+      } else if (this.type == Children) {
+        metrics.childrenWatches.incrementAndGet();
+      }
     }
 
     @Override
     public void process(final WatchedEvent event) {
+      if (type == Data && event.getType() != Event.EventType.None) {
+        metrics.dataWatches.decrementAndGet(); // only node events consume a one-shot watch
+      } else if (type == Children && event.getType() != Event.EventType.None) {
+        metrics.childrenWatches.decrementAndGet();
+      }
       log.debug("Submitting job to respond to event {}", event);
       try {
         if (watcher instanceof ConnectionManager) {
-          zkConnManagerCallbackExecutor.submit(() -> watcher.process(event));
+          zkConnManagerCallbackExecutor.submit(() -> {
+            metrics.watchesFired.increment();
+            watcher.process(event);
+          });
         } else {
-          zkCallbackExecutor.submit(() -> watcher.process(event));
+          zkCallbackExecutor.submit(() -> {
+            metrics.watchesFired.increment();
+            watcher.process(event);
+          });
         }
       } catch (RejectedExecutionException e) {
         // If not a graceful shutdown
@@ -896,4 +984,47 @@ public class SolrZkClient implements Closeable {
       return false;
     }
   }
+
+  /**
+   * Operation counters for this client, exposed through {@link SolrZkClient#getMetrics()}.
+   * All fields are public because ReflectMapWriter requires them to be; the client's instance is
+   * held in a private field and only the client updates it.
+   */
+  public static class ZkMetrics implements ReflectMapWriter {
+    /** Watch events handed to the wrapped watcher on a callback executor. */
+    @JsonProperty
+    public final LongAdder watchesFired = new LongAdder();
+    /** getData calls that completed without an exception. */
+    @JsonProperty
+    public final LongAdder reads = new LongAdder();
+    /** Completed create and setData calls, plus every makePath invocation. */
+    @JsonProperty
+    public final LongAdder writes = new LongAdder();
+    /** Total bytes returned by completed getData calls. */
+    @JsonProperty
+    public final LongAdder bytesRead = new LongAdder();
+    /** Total bytes of data passed to completed create/setData calls and to makePath. */
+    @JsonProperty
+    public final LongAdder bytesWritten = new LongAdder();
+    /** Completed multi() calls, and the total number of OpResults they returned. */
+    @JsonProperty
+    public final LongAdder multiOps = new LongAdder();
+    @JsonProperty
+    public final LongAdder cumulativeMultiOps = new LongAdder();
+    /** Completed getChildren calls, and the total number of child names they returned. */
+    @JsonProperty
+    public final LongAdder childFetches = new LongAdder();
+    @JsonProperty
+    public final LongAdder cumulativeChildrenFetched = new LongAdder();
+    /** Completed exists and delete calls. */
+    @JsonProperty
+    public final LongAdder existsChecks = new LongAdder();
+    @JsonProperty
+    public final LongAdder deletes = new LongAdder();
+    /** Data/children watchers wrapped but not yet fired; approximate (retries re-wrap). */
+    @JsonProperty
+    public final AtomicLong dataWatches = new AtomicLong();
+    @JsonProperty
+    public final AtomicLong childrenWatches = new AtomicLong();
+  }
 }
diff --git a/solr/solrj/src/java/org/apache/solr/common/util/JavaBinCodec.java b/solr/solrj/src/java/org/apache/solr/common/util/JavaBinCodec.java
index 0f6648e250d..0053a812411 100644
--- a/solr/solrj/src/java/org/apache/solr/common/util/JavaBinCodec.java
+++ b/solr/solrj/src/java/org/apache/solr/common/util/JavaBinCodec.java
@@ -35,6 +35,8 @@ import java.util.Map.Entry;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAccumulator;
+import java.util.concurrent.atomic.LongAdder;
 import java.util.function.BiConsumer;
 import java.util.function.Function;
 import java.util.function.Predicate;
@@ -1071,6 +1073,12 @@ public class JavaBinCodec implements PushWriter {
         daos.writeByte(SHORT);
         daos.writeShort(((Short) val).intValue());
         return true;
+      } else if (val instanceof LongAdder) {
+        writeLong(((LongAdder) val).longValue()); // tagged write, unlike a bare daos.writeLong (cf. SHORT above)
+        return true;
+      } else if (val instanceof LongAccumulator) {
+        writeLong(((LongAccumulator) val).longValue());
+        return true;
       }
       return false;
 
diff --git a/solr/solrj/src/java/org/apache/solr/common/util/TextWriter.java b/solr/solrj/src/java/org/apache/solr/common/util/TextWriter.java
index f6b2c9e4cd3..7812da89f5b 100644
--- a/solr/solrj/src/java/org/apache/solr/common/util/TextWriter.java
+++ b/solr/solrj/src/java/org/apache/solr/common/util/TextWriter.java
@@ -29,6 +29,8 @@ import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAccumulator;
+import java.util.concurrent.atomic.LongAdder;
 
 import org.apache.solr.common.EnumFieldValue;
 import org.apache.solr.common.IteratorWriter;
@@ -144,6 +146,10 @@ public interface TextWriter extends PushWriter {
       writeInt(name, ((AtomicInteger) val).get());
     } else if (val instanceof AtomicLong) {
       writeLong(name, ((AtomicLong) val).get());
+    } else if (val instanceof LongAdder) {
+      writeLong(name, Long.toString(((LongAdder) val).sum()));
+    } else if (val instanceof LongAccumulator) {
+      writeLong(name, Long.toString(((LongAccumulator) val).get()));
     } else {
       // default... for debugging only
       writeStr(name, val.getClass().getName() + ':' + val.toString(), true);