You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2019/02/12 10:56:05 UTC

[lucene-solr] branch branch_8x updated: SOLR-13155: Add command-line option for testing autoscaling configurations.

This is an automated email from the ASF dual-hosted git repository.

ab pushed a commit to branch branch_8x
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/branch_8x by this push:
     new 98cea54  SOLR-13155: Add command-line option for testing autoscaling configurations.
98cea54 is described below

commit 98cea5440f618ec2e3e63cf3f595ed34519a7d20
Author: Andrzej Bialecki <ab...@apache.org>
AuthorDate: Tue Feb 12 11:53:15 2019 +0100

    SOLR-13155: Add command-line option for testing autoscaling configurations.
---
 solr/CHANGES.txt                                   |   2 +
 solr/bin/solr                                      |  19 +-
 .../java/org/apache/solr/util/RedactionUtils.java  |  29 ++
 .../src/java/org/apache/solr/util/SolrCLI.java     | 297 +++++++++++++++++++++
 .../solr/client/solrj/cloud/autoscaling/Row.java   |   4 +
 5 files changed, 350 insertions(+), 1 deletion(-)

diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index a4406ad..00236d2 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -183,6 +183,8 @@ New Features
 
 * SOLR-13147: Add movingMAD Stream Evaluator (Joel Bernstein)
 
+* SOLR-13155: Add command-line option for testing autoscaling configurations. (ab)
+
 Bug Fixes
 ----------------------
 
diff --git a/solr/bin/solr b/solr/bin/solr
index e3d0d4e..a61d493 100755
--- a/solr/bin/solr
+++ b/solr/bin/solr
@@ -312,7 +312,7 @@ function print_usage() {
   if [ -z "$CMD" ]; then
     echo ""
     echo "Usage: solr COMMAND OPTIONS"
-    echo "       where COMMAND is one of: start, stop, restart, status, healthcheck, create, create_core, create_collection, delete, version, zk, auth, assert, config"
+    echo "       where COMMAND is one of: start, stop, restart, status, healthcheck, create, create_core, create_collection, delete, version, zk, auth, assert, config, autoscaling"
     echo ""
     echo "  Standalone server example (start Solr running in the background on port 8984):"
     echo ""
@@ -632,6 +632,17 @@ function print_usage() {
     echo ""
     echo "  -V                                     Enable more verbose output."
     echo ""
+  elif [ "$CMD" == "autoscaling" ]; then
+    echo ""
+    echo "Usage: solr autoscaling [-z zkHost] [-a <autoscaling.json.file>] [-s] [-d] [-n] [-r]"
+    echo ""
+    echo "  Calculate autoscaling policy suggestions and diagnostic information, using either the deployed"
+    echo "  autoscaling configuration or the one supplied on the command line. This calculation takes place"
+    echo "  on the client-side without affecting the running cluster except for fetching the node and replica"
+    echo "  metrics from the cluster. For detailed usage instructions, do:"
+    echo ""
+    echo "    bin/solr autoscaling -help"
+    echo ""
   fi
 } # end print_usage
 
@@ -1343,6 +1354,12 @@ if [[ "$SCRIPT_CMD" == "zk" ]]; then
   exit $?
 fi
 
+
+# Dispatch the "autoscaling" command to run_tool and exit with its status.
+# "$@" is quoted so that arguments containing whitespace or glob characters
+# (e.g. a path passed with -a) are forwarded as single, unmodified words.
+if [[ "$SCRIPT_CMD" == "autoscaling" ]]; then
+  run_tool autoscaling "$@"
+  exit $?
+fi
+
 if [[ "$SCRIPT_CMD" == "auth" ]]; then
 
   VERBOSE=""
diff --git a/solr/core/src/java/org/apache/solr/util/RedactionUtils.java b/solr/core/src/java/org/apache/solr/util/RedactionUtils.java
index afa2abf..2661a28 100644
--- a/solr/core/src/java/org/apache/solr/util/RedactionUtils.java
+++ b/solr/core/src/java/org/apache/solr/util/RedactionUtils.java
@@ -17,6 +17,10 @@
 
 package org.apache.solr.util;
 
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.TreeSet;
 import java.util.regex.Pattern;
 
 public class RedactionUtils {
@@ -47,5 +51,30 @@ public class RedactionUtils {
     RedactionUtils.redactSystemProperty = redactSystemProperty;
   }
 
+  /**
+   * Replace actual names found in a string with meaningless randomized names.
+   * <p>Longer names are replaced before shorter ones, so a name that is a prefix (or any other
+   * substring) of a longer name cannot break up the longer name and leave part of it
+   * un-redacted. Names are matched literally (not as regular expressions); <code>null</code>
+   * and empty names are ignored.</p>
+   * @param names actual names
+   * @param redactionPrefix prefix to use for redacted names
+   * @param data string to redact
+   * @return redacted string where all actual names have been replaced.
+   */
+  public static String redactNames(Collection<String> names, String redactionPrefix, String data) {
+    // order by decreasing length; ties are broken lexicographically so the result is deterministic
+    Set<String> uniqueNames = new TreeSet<String>((a, b) -> {
+      int byLength = Integer.compare(b.length(), a.length());
+      return byLength != 0 ? byLength : a.compareTo(b);
+    });
+    for (String name : names) {
+      // an empty name would match between every pair of characters
+      if (name != null && !name.isEmpty()) {
+        uniqueNames.add(name);
+      }
+    }
+    Set<Integer> uniqueCode = new HashSet<>();
+    // minimal(ish) hash
+    int codeShift = 0;
+    int codeSpace = names.size();
+    for (String name : uniqueNames) {
+      int code = Math.abs(name.hashCode() % codeSpace);
+      // on collision widen the code space and retry until an unused code is found
+      while (uniqueCode.contains(code)) {
+        codeShift++;
+        codeSpace = names.size() << codeShift;
+        code = Math.abs(name.hashCode() % codeSpace);
+      }
+      uniqueCode.add(code);
+      // literal replacement: neither the name nor the replacement is interpreted as a regex
+      data = data.replace(name, redactionPrefix + Integer.toString(code, Character.MAX_RADIX));
+    }
+    return data;
+  }
 
 }
diff --git a/solr/core/src/java/org/apache/solr/util/SolrCLI.java b/solr/core/src/java/org/apache/solr/util/SolrCLI.java
index 03aa5f8..a892a35 100755
--- a/solr/core/src/java/org/apache/solr/util/SolrCLI.java
+++ b/solr/core/src/java/org/apache/solr/util/SolrCLI.java
@@ -19,12 +19,14 @@ package org.apache.solr.util;
 import javax.net.ssl.SSLPeerUnverifiedException;
 import java.io.Console;
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.PrintStream;
 import java.lang.invoke.MethodHandles;
 import java.net.ConnectException;
+import java.net.MalformedURLException;
 import java.net.Socket;
 import java.net.SocketException;
 import java.net.URL;
@@ -43,6 +45,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Enumeration;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Locale;
@@ -50,9 +53,11 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Scanner;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import java.util.zip.ZipEntry;
@@ -72,6 +77,7 @@ import org.apache.commons.exec.Executor;
 import org.apache.commons.exec.OS;
 import org.apache.commons.exec.environment.EnvironmentUtils;
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.BooleanUtils;
 import org.apache.commons.lang.SystemUtils;
 import org.apache.http.HttpEntity;
@@ -93,15 +99,24 @@ import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.SolrQuery;
 import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.cloud.DistributedQueue;
+import org.apache.solr.client.solrj.cloud.DistributedQueueFactory;
+import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
+import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
+import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
+import org.apache.solr.client.solrj.cloud.autoscaling.Row;
+import org.apache.solr.client.solrj.cloud.autoscaling.Suggester;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.client.solrj.impl.HttpClientUtil;
 import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.impl.HttpSolrClient.Builder;
+import org.apache.solr.client.solrj.impl.SolrClientCloudManager;
 import org.apache.solr.client.solrj.impl.ZkClientClusterStateProvider;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.client.solrj.request.ContentStreamUpdateRequest;
 import org.apache.solr.client.solrj.response.CollectionAdminResponse;
 import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.common.MapWriter;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
@@ -117,6 +132,8 @@ import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.util.ContentStreamBase;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.StrUtils;
+import org.apache.solr.common.util.TimeSource;
+import org.apache.solr.common.util.Utils;
 import org.apache.solr.security.Sha256AuthenticationProvider;
 import org.apache.solr.util.configuration.SSLConfigurationsFactory;
 import org.noggit.CharArr;
@@ -392,6 +409,8 @@ public class SolrCLI {
       return new UtilsTool();
     else if ("auth".equals(toolType))
       return new AuthTool();
+    else if ("autoscaling".equals(toolType))
+      return new AutoscalingTool();
 
     // If you add a built-in tool to this class, add it here to avoid
     // classpath scanning
@@ -410,6 +429,7 @@ public class SolrCLI {
     formatter.printHelp("healthcheck", getToolOptions(new HealthcheckTool()));
     formatter.printHelp("status", getToolOptions(new StatusTool()));
     formatter.printHelp("api", getToolOptions(new ApiTool()));
+    formatter.printHelp("autoscaling", getToolOptions(new AutoscalingTool()));
     formatter.printHelp("create_collection", getToolOptions(new CreateCollectionTool()));
     formatter.printHelp("create_core", getToolOptions(new CreateCoreTool()));
     formatter.printHelp("create", getToolOptions(new CreateTool()));
@@ -832,6 +852,283 @@ public class SolrCLI {
   }
   
 
+  /**
+   * Command-line tool that calculates autoscaling suggestions and diagnostics, using either the
+   * autoscaling configuration deployed in the cluster or one read from a local file, and prints
+   * the selected sections (suggestions, diagnostics, cluster state, statistics) as JSON,
+   * optionally with node and collection names redacted.
+   */
+  public static class AutoscalingTool extends SolrCloudTool {
+    static final String NODE_REDACTION_PREFIX = "N_";
+    static final String COLL_REDACTION_PREFIX = "COLL_";
+
+    public AutoscalingTool() {
+      this(System.out);
+    }
+
+    public AutoscalingTool(PrintStream stdout) {
+      super(stdout);
+    }
+
+    @Override
+    public Option[] getOptions() {
+      return new Option[] {
+          OptionBuilder
+              .withArgName("HOST")
+              .hasArg()
+              .isRequired(false)
+              .withDescription("Address of the Zookeeper ensemble; defaults to: "+ZK_HOST)
+              .create("zkHost"),
+          OptionBuilder
+              .withArgName("CONFIG")
+              .hasArg()
+              .isRequired(false)
+              .withDescription("Autoscaling config file, defaults to the one deployed in the cluster.")
+              .withLongOpt("config")
+              .create("a"),
+          OptionBuilder
+              .withDescription("Show calculated suggestions")
+              .withLongOpt("suggestions")
+              .create("s"),
+          OptionBuilder
+              .withDescription("Show ClusterState (collections layout)")
+              .withLongOpt("clusterState")
+              .create("c"),
+          OptionBuilder
+              .withDescription("Show calculated diagnostics")
+              .withLongOpt("diagnostics")
+              .create("d"),
+          OptionBuilder
+              .withDescription("Show sorted nodes with diagnostics")
+              .withLongOpt("sortedNodes")
+              .create("n"),
+          OptionBuilder
+              .withDescription("Redact node and collection names (original names will be consistently randomized)")
+              .withLongOpt("redact")
+              .create("r"),
+          OptionBuilder
+              .withDescription("Show summarized collection & node statistics.")
+              .create("stats"),
+          OptionBuilder
+              .withDescription("Turn on all options to get all available information.")
+              .create("all")
+
+      };
+    }
+
+    @Override
+    public String getName() {
+      return "autoscaling";
+    }
+
+    /**
+     * Calculates suggestions and diagnostics, builds the sections selected on the command line
+     * and prints them as a single JSON document to <code>stdout</code>.
+     * @param cloudSolrClient client used to access the cluster
+     * @param cli parsed command line, see {@link #getOptions()}
+     */
+    @Override
+    protected void runCloudTool(CloudSolrClient cloudSolrClient, CommandLine cli) throws Exception {
+      // queues are not expected to be used by this tool, so any attempt to use them fails loudly
+      DistributedQueueFactory dummyFactory = new DistributedQueueFactory() {
+        @Override
+        public DistributedQueue makeQueue(String path) throws IOException {
+          throw new UnsupportedOperationException("makeQueue");
+        }
+
+        @Override
+        public void removeQueue(String path) throws IOException {
+          throw new UnsupportedOperationException("removeQueue");
+        }
+      };
+      try (SolrClientCloudManager clientCloudManager = new SolrClientCloudManager(dummyFactory, cloudSolrClient)) {
+        AutoScalingConfig config = null;
+        HashSet<String> liveNodes = new HashSet<>();
+        String configFile = cli.getOptionValue("a");
+        if (configFile != null) {
+          log.info("- reading autoscaling config from " + configFile);
+          // close the file as soon as its content has been read
+          try (InputStream is = new FileInputStream(configFile)) {
+            config = new AutoScalingConfig(IOUtils.toByteArray(is));
+          }
+        } else {
+          log.info("- reading autoscaling config from the cluster.");
+          config = clientCloudManager.getDistribStateManager().getAutoScalingConfig();
+        }
+        log.info("- calculating suggestions...");
+        long start = TimeSource.NANO_TIME.getTimeNs();
+        // collect live node names for optional redaction
+        liveNodes.addAll(clientCloudManager.getClusterStateProvider().getLiveNodes());
+        List<Suggester.SuggestionInfo> suggestions = PolicyHelper.getSuggestions(config, clientCloudManager);
+        long end = TimeSource.NANO_TIME.getTimeNs();
+        log.info("  (took " + TimeUnit.NANOSECONDS.toMillis(end - start) + " ms)");
+        log.info("- calculating diagnostics...");
+        start = TimeSource.NANO_TIME.getTimeNs();
+        // update the live nodes
+        liveNodes.addAll(clientCloudManager.getClusterStateProvider().getLiveNodes());
+        Policy.Session session = config.getPolicy().createSession(clientCloudManager);
+        MapWriter mw = PolicyHelper.getDiagnostics(session);
+        Map<String, Object> diagnostics = new LinkedHashMap<>();
+        mw.toMap(diagnostics);
+        end = TimeSource.NANO_TIME.getTimeNs();
+        log.info("  (took " + TimeUnit.NANOSECONDS.toMillis(end - start) + " ms)");
+        boolean withSuggestions = cli.hasOption("s");
+        // sorted nodes are part of the diagnostics section, so -n implies -d
+        boolean withDiagnostics = cli.hasOption("d") || cli.hasOption("n");
+        boolean withSortedNodes = cli.hasOption("n");
+        boolean withClusterState = cli.hasOption("c");
+        boolean withStats = cli.hasOption("stats");
+        boolean redact = cli.hasOption("r");
+        if (cli.hasOption("all")) {
+          withSuggestions = true;
+          withDiagnostics = true;
+          withSortedNodes = true;
+          withClusterState = true;
+          withStats = true;
+        }
+        // prepare to redact also host names / IPs in base_url and other properties
+        Set<String> redactNames = new HashSet<>();
+        for (String nodeName : liveNodes) {
+          String urlString = Utils.getBaseUrlForNodeName(nodeName, "http");
+          try {
+            URL u = new URL(urlString);
+            // protocol format
+            redactNames.add(u.getHost() + ":" + u.getPort());
+            // node name format
+            redactNames.add(u.getHost() + "_" + u.getPort() + "_");
+          } catch (MalformedURLException e) {
+            log.warn("Invalid URL for node name " + nodeName + ", replacing including protocol and path", e);
+            redactNames.add(urlString);
+            redactNames.add(Utils.getBaseUrlForNodeName(nodeName, "https"));
+          }
+        }
+        // redact collection names too
+        Set<String> redactCollections = new HashSet<>();
+        ClusterState clusterState = clientCloudManager.getClusterStateProvider().getClusterState();
+        clusterState.forEachCollection(coll -> redactCollections.add(coll.getName()));
+        // show suggestions by default when neither suggestions nor diagnostics were requested
+        if (!withSuggestions && !withDiagnostics) {
+          withSuggestions = true;
+        }
+        Map<String, Object> results = new LinkedHashMap<>();
+        if (withClusterState) {
+          Map<String, Object> map = new LinkedHashMap<>();
+          map.put("znodeVersion", clusterState.getZNodeVersion());
+          map.put("liveNodes", new TreeSet<>(clusterState.getLiveNodes()));
+          map.put("collections", clusterState.getCollectionsMap());
+          results.put("CLUSTERSTATE", map);
+        }
+        if (withStats) {
+          Map<String, Map<String, Number>> collStats = new TreeMap<>();
+          clusterState.forEachCollection(coll -> {
+            Map<String, Number> perColl = collStats.computeIfAbsent(coll.getName(), n -> new LinkedHashMap<>());
+            AtomicInteger numCores = new AtomicInteger();
+            // node name -> shard name -> number of replicas of that shard on the node
+            HashMap<String, Map<String, AtomicInteger>> nodes = new HashMap<>();
+            coll.getSlices().forEach(s -> {
+              numCores.addAndGet(s.getReplicas().size());
+              s.getReplicas().forEach(r -> {
+                nodes.computeIfAbsent(r.getNodeName(), n -> new HashMap<>())
+                    .computeIfAbsent(s.getName(), slice -> new AtomicInteger()).incrementAndGet();
+              });
+            });
+            int maxCoresPerNode = 0;
+            int minCoresPerNode = 0;
+            int maxActualShardsPerNode = 0;
+            int minActualShardsPerNode = 0;
+            int maxShardReplicasPerNode = 0;
+            int minShardReplicasPerNode = 0;
+            // minimums stay at 0 when the collection has no replicas at all
+            if (!nodes.isEmpty()) {
+              minCoresPerNode = Integer.MAX_VALUE;
+              minActualShardsPerNode = Integer.MAX_VALUE;
+              minShardReplicasPerNode = Integer.MAX_VALUE;
+              for (Map<String, AtomicInteger> counts : nodes.values()) {
+                int total = counts.values().stream().mapToInt(c -> c.get()).sum();
+                for (AtomicInteger count : counts.values()) {
+                  if (count.get() > maxShardReplicasPerNode) {
+                    maxShardReplicasPerNode = count.get();
+                  }
+                  if (count.get() < minShardReplicasPerNode) {
+                    minShardReplicasPerNode = count.get();
+                  }
+                }
+                if (total > maxCoresPerNode) {
+                  maxCoresPerNode = total;
+                }
+                if (total < minCoresPerNode) {
+                  minCoresPerNode = total;
+                }
+                if (counts.size() > maxActualShardsPerNode) {
+                  maxActualShardsPerNode = counts.size();
+                }
+                if (counts.size() < minActualShardsPerNode) {
+                  minActualShardsPerNode = counts.size();
+                }
+              }
+            }
+            perColl.put("activeShards", coll.getActiveSlices().size());
+            perColl.put("inactiveShards", coll.getSlices().size() - coll.getActiveSlices().size());
+            perColl.put("rf", coll.getReplicationFactor());
+            perColl.put("maxShardsPerNode", coll.getMaxShardsPerNode());
+            perColl.put("maxActualShardsPerNode", maxActualShardsPerNode);
+            perColl.put("minActualShardsPerNode", minActualShardsPerNode);
+            perColl.put("maxShardReplicasPerNode", maxShardReplicasPerNode);
+            perColl.put("minShardReplicasPerNode", minShardReplicasPerNode);
+            perColl.put("numCores", numCores.get());
+            perColl.put("numNodes", nodes.size());
+            perColl.put("maxCoresPerNode", maxCoresPerNode);
+            perColl.put("minCoresPerNode", minCoresPerNode);
+          });
+          Map<String, Map<String, Object>> nodeStats = new TreeMap<>();
+          for (Row row : session.getSortedNodes()) {
+            Map<String, Object> nodeStat = nodeStats.computeIfAbsent(row.node, n -> new LinkedHashMap<>());
+            nodeStat.put("isLive", row.isLive());
+            nodeStat.put("freedisk", row.getVal("freedisk", 0));
+            nodeStat.put("totaldisk", row.getVal("totaldisk", 0));
+            nodeStat.put("cores", row.getVal("cores", 0));
+            Map<String, Map<String, Map<String, Object>>> collReplicas = new TreeMap<>();
+            row.forEachReplica(ri -> {
+              // NOTE(review): assumes the core name starts with "<collection>_" - confirm
+              Map<String, Object> perReplica = collReplicas.computeIfAbsent(ri.getCollection(), c -> new TreeMap<>())
+                  .computeIfAbsent(ri.getCore().substring(ri.getCollection().length() + 1), core -> new LinkedHashMap<>());
+              perReplica.put("INDEX.sizeInGB", ri.getVariable("INDEX.sizeInGB"));
+              perReplica.put("coreNode", ri.getName());
+              if (ri.getBool("leader", false)) {
+                perReplica.put("leader", true);
+                // "avgShardSize" first accumulates the total size of leader replicas,
+                // it is turned into an average after all rows have been processed
+                Double totalSize = (Double)collStats.computeIfAbsent(ri.getCollection(), c -> new HashMap<>())
+                    .computeIfAbsent("avgShardSize", size -> 0.0);
+                Number riSize = (Number)ri.getVariable("INDEX.sizeInGB");
+                if (riSize != null) {
+                  totalSize += riSize.doubleValue();
+                  collStats.get(ri.getCollection()).put("avgShardSize", totalSize);
+                  Double max = (Double)collStats.get(ri.getCollection()).get("maxShardSize");
+                  if (max == null) max = 0.0;
+                  if (riSize.doubleValue() > max) {
+                    collStats.get(ri.getCollection()).put("maxShardSize", riSize.doubleValue());
+                  }
+                  Double min = (Double)collStats.get(ri.getCollection()).get("minShardSize");
+                  if (min == null) min = Double.MAX_VALUE;
+                  if (riSize.doubleValue() < min) {
+                    collStats.get(ri.getCollection()).put("minShardSize", riSize.doubleValue());
+                  }
+                }
+              }
+              nodeStat.put("replicas", collReplicas);
+            });
+          }
+
+          // calculate average per shard
+          for (Map<String, Number> perColl : collStats.values()) {
+            Double avg = (Double)perColl.get("avgShardSize");
+            Number activeShards = perColl.get("activeShards");
+            // only divide by a known, positive shard count: a missing count would throw an NPE
+            // and a zero count would yield Infinity / NaN; otherwise the accumulated total is kept
+            if (avg != null && activeShards != null && activeShards.doubleValue() > 0) {
+              perColl.put("avgShardSize", avg / activeShards.doubleValue());
+            }
+          }
+          Map<String, Object> stats = new LinkedHashMap<>();
+          results.put("STATISTICS", stats);
+          stats.put("nodeStats", nodeStats);
+          stats.put("collectionStats", collStats);
+        }
+        if (withSuggestions) {
+          results.put("SUGGESTIONS", suggestions);
+        }
+        if (!withSortedNodes) {
+          diagnostics.remove("sortedNodes");
+        }
+        if (withDiagnostics) {
+          results.put("DIAGNOSTICS", diagnostics);
+        }
+        String data = Utils.toJSONString(results);
+        if (redact) {
+          data = RedactionUtils.redactNames(redactCollections, COLL_REDACTION_PREFIX, data);
+          data = RedactionUtils.redactNames(redactNames, NODE_REDACTION_PREFIX, data);
+        }
+        stdout.println(data);
+      }
+    }
+  }
+
   /**
    * Get the status of a Solr server.
    */
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Row.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Row.java
index 52e6169..2cc48ea 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Row.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Row.java
@@ -351,6 +351,10 @@ public class Row implements MapWriter {
     return cells;
   }
 
+  /**
+   * Returns the current value of this row's <code>isLive</code> flag.
+   */
+  public boolean isLive() {
+    return this.isLive;
+  }
+
   public void forEachReplica(Consumer<ReplicaInfo> consumer) {
     forEachReplica(collectionVsShardVsReplicas, consumer);
   }