You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@solr.apache.org by th...@apache.org on 2021/07/02 22:09:25 UTC

[solr] branch main updated: SOLR-15499: StatsStream implement ParallelMetricsRollup to allow for tiered computation for collection aliases (#197)

This is an automated email from the ASF dual-hosted git repository.

thelabdude pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr.git


The following commit(s) were added to refs/heads/main by this push:
     new 98c091d  SOLR-15499: StatsStream implement ParallelMetricsRollup to allow for tiered computation for collection aliases (#197)
98c091d is described below

commit 98c091dd759c5ad6135ee0a61a6572df0de0119e
Author: Timothy Potter <th...@gmail.com>
AuthorDate: Fri Jul 2 16:09:19 2021 -0600

    SOLR-15499: StatsStream implement ParallelMetricsRollup to allow for tiered computation for collection aliases (#197)
---
 solr/CHANGES.txt                                   |  3 ++
 .../solr/client/solrj/io/stream/FacetStream.java   |  4 +-
 .../solrj/io/stream/ParallelMetricsRollup.java     |  9 ++++
 .../solr/client/solrj/io/stream/StatsStream.java   | 63 ++++++++++++++++++++--
 .../stream/ParallelFacetStreamOverAliasTest.java   | 57 ++++++++++++++++++--
 5 files changed, 124 insertions(+), 12 deletions(-)

diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 86ea9a2..0f707b0 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -155,6 +155,9 @@ when told to. The admin UI now tells it to. (Nazerke Seidan, David Smiley)
 
 * SOLR-15475: Implement COUNT and APPROX_COUNT_DISTINCT aggregation functions for Parallel SQL (Timothy Potter)
 
+* SOLR-15499: StatsStream implement ParallelMetricsRollup to allow for tiered computation of SQL metrics
+  over collection aliases backed by many collections, potentially with many shards in each (Timothy Potter)
+
 Other Changes
 ----------------------
 * SOLR-14656: Autoscaling framework removed (Ishan Chattopadhyaya, noble, Ilan Ginzburg)
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java
index fd66fd5..f2ac698 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java
@@ -66,7 +66,7 @@ public class FacetStream extends TupleStream implements Expressible, ParallelMet
   private static final long serialVersionUID = 1;
 
   // allow client apps to disable the auto-plist via system property if they want to turn it off globally
-  private static final boolean defaultTieredEnabled =
+  static final boolean defaultTieredEnabled =
       Boolean.parseBoolean(System.getProperty("solr.facet.stream.tiered", "true"));
 
   static final String TIERED_PARAM = "tiered";
@@ -932,7 +932,7 @@ public class FacetStream extends TupleStream implements Expressible, ParallelMet
    * @return A mapping of fields produced by the rollup stream to their output name.
    */
   protected Map<String, String> getRollupSelectFields(Metric[] rollupMetrics) {
-    Map<String, String> map = new HashMap<>();
+    Map<String, String> map = new HashMap<>(rollupMetrics.length * 2);
     for (Bucket b : buckets) {
       String key = b.toString();
       map.put(key, key);
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelMetricsRollup.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelMetricsRollup.java
index 0d0f1c3..75ff8f4 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelMetricsRollup.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelMetricsRollup.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Optional;
 
+import org.apache.solr.client.solrj.io.stream.metrics.CountDistinctMetric;
 import org.apache.solr.client.solrj.io.stream.metrics.CountMetric;
 import org.apache.solr.client.solrj.io.stream.metrics.MaxMetric;
 import org.apache.solr.client.solrj.io.stream.metrics.MeanMetric;
@@ -91,15 +92,19 @@ public interface ParallelMetricsRollup {
       if (next instanceof SumMetric) {
         // sum of sums
         nextRollup = new SumMetric(next.getIdentifier());
+        nextRollup.outputLong = next.outputLong;
       } else if (next instanceof MinMetric) {
         // min of mins
         nextRollup = new MinMetric(next.getIdentifier());
+        nextRollup.outputLong = next.outputLong;
       } else if (next instanceof MaxMetric) {
         // max of max
         nextRollup = new MaxMetric(next.getIdentifier());
+        nextRollup.outputLong = next.outputLong;
       } else if (next instanceof CountMetric) {
         // sum of counts
         nextRollup = new SumMetric(next.getIdentifier());
+        nextRollup.outputLong = next.outputLong;
         count = (CountMetric) next;
       } else if (next instanceof MeanMetric) {
         // WeightedSumMetric must have a count to compute the weighted avg. rollup from ...
@@ -118,6 +123,10 @@ public interface ParallelMetricsRollup {
         } else {
           return Optional.empty(); // can't properly rollup mean metrics w/o a count (reqd by WeightedSumMetric)
         }
+      } else if (next instanceof CountDistinctMetric) {
+        // rollup of count distinct is the max across the tiers
+        nextRollup = new MaxMetric(next.getIdentifier());
+        nextRollup.outputLong = next.outputLong;
       } else {
         return Optional.empty(); // can't parallelize this expr!
       }
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StatsStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StatsStream.java
index 778ee82..ddc0276 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StatsStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StatsStream.java
@@ -17,17 +17,19 @@
 package org.apache.solr.client.solrj.io.stream;
 
 import java.io.IOException;
-
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.stream.Collectors;
 
 import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.impl.ClusterStateProvider;
 import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.io.SolrClientCache;
 import org.apache.solr.client.solrj.io.Tuple;
@@ -41,6 +43,7 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParamete
 import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
 import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
 import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.apache.solr.client.solrj.io.stream.metrics.Bucket;
 import org.apache.solr.client.solrj.io.stream.metrics.CountMetric;
 import org.apache.solr.client.solrj.io.stream.metrics.Metric;
 import org.apache.solr.client.solrj.request.QueryRequest;
@@ -48,14 +51,18 @@ import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.common.util.NamedList;
 
+import static org.apache.solr.client.solrj.io.stream.FacetStream.TIERED_PARAM;
+import static org.apache.solr.client.solrj.io.stream.FacetStream.defaultTieredEnabled;
+
 /**
  * @since 6.6.0
  */
-public class StatsStream extends TupleStream implements Expressible  {
+public class StatsStream extends TupleStream implements Expressible, ParallelMetricsRollup  {
 
   private static final long serialVersionUID = 1;
 
-
+  // use a single "*" rollup bucket
+  private static final Bucket[] STATS_BUCKET = new Bucket[]{new Bucket("*")};
 
   private Metric[] metrics;
   private Tuple tuple;
@@ -66,6 +73,7 @@ public class StatsStream extends TupleStream implements Expressible  {
   protected transient SolrClientCache cache;
   protected transient CloudSolrClient cloudSolrClient;
   private StreamContext context;
+  protected transient TupleStream parallelizedStream;
 
   public StatsStream(String zkHost,
                           String collection,
@@ -213,14 +221,28 @@ public class StatsStream extends TupleStream implements Expressible  {
 
   public void open() throws IOException {
 
+    @SuppressWarnings({"unchecked"})
+    Map<String, List<String>> shardsMap = (Map<String, List<String>>)context.get("shards");
+
+    // Parallelize the stats stream across multiple collections for an alias using plist if possible
+    if (shardsMap == null && params.getBool(TIERED_PARAM, defaultTieredEnabled)) {
+      ClusterStateProvider clusterStateProvider = cache.getCloudSolrClient(zkHost).getClusterStateProvider();
+      final List<String> resolved = clusterStateProvider != null ? clusterStateProvider.resolveAlias(collection) : null;
+      if (resolved != null && resolved.size() > 1) {
+        Optional<TupleStream> maybeParallelize = openParallelStream(context, resolved, metrics);
+        if (maybeParallelize.isPresent()) {
+          this.parallelizedStream = maybeParallelize.get();
+          return; // we're using a plist to parallelize the stats operation
+        } // else, there's a metric that we can't rollup over the plist results safely ... no plist for you!
+      }
+    }
+
     String json = getJsonFacetString(metrics);
 
     ModifiableSolrParams paramsLoc = new ModifiableSolrParams(params);
     paramsLoc.set("json.facet", json);
     paramsLoc.set("rows", "0");
 
-    @SuppressWarnings({"unchecked"})
-    Map<String, List<String>> shardsMap = (Map<String, List<String>>)context.get("shards");
     if(shardsMap == null) {
       QueryRequest request = new QueryRequest(paramsLoc, SolrRequest.METHOD.POST);
       cloudSolrClient = cache.getCloudSolrClient(zkHost);
@@ -266,6 +288,10 @@ public class StatsStream extends TupleStream implements Expressible  {
   }
 
   public Tuple read() throws IOException {
+    if (parallelizedStream != null) {
+      return parallelizedStream.read();
+    }
+
     if(index == 0) {
       ++index;
       return tuple;
@@ -352,4 +378,31 @@ public class StatsStream extends TupleStream implements Expressible  {
   public StreamComparator getStreamSort() {
     return null;
   }
+
+  @Override // builds one StatsStream per entry of partitions, each with the tiered param removed
+  public TupleStream[] parallelize(List<String> partitions) throws IOException {
+    final ModifiableSolrParams withoutTieredParam = new ModifiableSolrParams(params); // start from this stream's params
+    withoutTieredParam.remove(TIERED_PARAM); // each individual request is not tiered
+
+    TupleStream[] streams = new TupleStream[partitions.size()]; // streams[p] corresponds to partitions.get(p)
+    for (int p = 0; p < streams.length; p++) {
+      streams[p] = new StatsStream(zkHost, partitions.get(p), withoutTieredParam, metrics); // the partition is passed as the collection argument
+    }
+    return streams;
+  }
+
+  @Override // rolls up the plist tuples under the single STATS_BUCKET, then selects fields via getRollupSelectFields
+  public TupleStream getSortedRollupStream(ParallelListStream plist, Metric[] rollupMetrics) throws IOException {
+    return new SelectStream(new HashRollupStream(plist, STATS_BUCKET, rollupMetrics), getRollupSelectFields(rollupMetrics));
+  }
+
+  // Map each rollup metric's identifier to its first column (the original metric name), or "*" if it has no columns, so that we can project out the correct field names in the tuple
+  protected Map<String, String> getRollupSelectFields(Metric[] rollupMetrics) {
+    Map<String, String> map = new HashMap<>(rollupMetrics.length * 2); // holds at most one entry per rollup metric
+    for (Metric m : rollupMetrics) {
+      String[] cols = m.getColumns();
+      map.put(m.getIdentifier(), cols != null && cols.length > 0 ? cols[0] : "*"); // fall back to "*" when the metric reports no columns
+    }
+    return map;
+  }
 }
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/ParallelFacetStreamOverAliasTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/ParallelFacetStreamOverAliasTest.java
index aede95b..639929f 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/ParallelFacetStreamOverAliasTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/ParallelFacetStreamOverAliasTest.java
@@ -40,15 +40,25 @@ import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.io.SolrClientCache;
 import org.apache.solr.client.solrj.io.Tuple;
 import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.apache.solr.client.solrj.io.stream.metrics.CountDistinctMetric;
+import org.apache.solr.client.solrj.io.stream.metrics.CountMetric;
+import org.apache.solr.client.solrj.io.stream.metrics.MaxMetric;
+import org.apache.solr.client.solrj.io.stream.metrics.MeanMetric;
 import org.apache.solr.client.solrj.io.stream.metrics.Metric;
+import org.apache.solr.client.solrj.io.stream.metrics.MinMetric;
+import org.apache.solr.client.solrj.io.stream.metrics.SumMetric;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.handler.SolrDefaultStreamFactory;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import static org.apache.solr.client.solrj.io.stream.FacetStream.TIERED_PARAM;
+
 /**
  * Verify auto-plist with rollup over a facet expression when using collection alias over multiple collections.
  */
@@ -63,6 +73,8 @@ public class ParallelFacetStreamOverAliasTest extends SolrCloudTestCase {
   private static final int NUM_DOCS_PER_COLLECTION = 40;
   private static final int NUM_SHARDS_PER_COLLECTION = 4;
   private static final int CARDINALITY = 10;
+  private static final int BUCKET_SIZE_LIMIT = Math.max(CARDINALITY * 2, 100);
+
   private static final RandomGenerator rand = new JDKRandomGenerator(5150);
   private static List<String> listOfCollections;
   private static SolrClientCache solrClientCache;
@@ -166,7 +178,7 @@ public class ParallelFacetStreamOverAliasTest extends SolrCloudTestCase {
         "  q=\"*:*\", \n" +
         "  buckets=\"a_i\", \n" +
         "  bucketSorts=\"a_i asc\", \n" +
-        "  bucketSizeLimit=100, \n" +
+        "  bucketSizeLimit=" + BUCKET_SIZE_LIMIT + ", \n" +
         "  sum(a_d), avg(a_d), min(a_d), max(a_d), count(*)\n" +
         ")\n";
 
@@ -188,7 +200,7 @@ public class ParallelFacetStreamOverAliasTest extends SolrCloudTestCase {
         "  q=\"*:*\", \n" +
         "  buckets=\"a_i,b_i\", \n" + /* two dimensions here ~ doubles the number of tuples */
         "  bucketSorts=\"sum(a_d) desc\", \n" +
-        "  bucketSizeLimit=100, \n" +
+        "  bucketSizeLimit=" + BUCKET_SIZE_LIMIT + ", \n" +
         "  sum(a_d), avg(a_d), min(a_d), max(a_d), count(*)\n" +
         ")\n";
 
@@ -197,7 +209,6 @@ public class ParallelFacetStreamOverAliasTest extends SolrCloudTestCase {
 
   @Test
   public void testParallelFacetSortByDimensions() throws Exception {
-
     // notice we're sorting the stream by a metric, but internally, that doesn't work for parallelization
     // so the rollup has to sort by dimensions and then apply a final re-sort once the parallel streams are merged
     String facetExprTmpl = "" +
@@ -207,13 +218,49 @@ public class ParallelFacetStreamOverAliasTest extends SolrCloudTestCase {
         "  q=\"*:*\", \n" +
         "  buckets=\"a_i,b_i\", \n" +
         "  bucketSorts=\"a_i asc, b_i asc\", \n" +
-        "  bucketSizeLimit=100, \n" +
+        "  bucketSizeLimit=" + BUCKET_SIZE_LIMIT + ", \n" +
         "  sum(a_d), avg(a_d), min(a_d), max(a_d), count(*)\n" +
         ")\n";
 
     compareTieredStreamWithNonTiered(facetExprTmpl, 2);
   }
 
+  @Test // a tiered StatsStream over the alias must yield the same tuples as a non-tiered one
+  public void testParallelStats() throws Exception {
+    Metric[] metrics = new Metric[]{ // one metric of each kind exercised by this test
+        new CountMetric(),
+        new CountDistinctMetric("a_i"),
+        new SumMetric("b_i"),
+        new MinMetric("a_i"),
+        new MaxMetric("a_i"),
+        new MeanMetric("a_d")
+    };
+
+    String zkHost = cluster.getZkServer().getZkAddress();
+    StreamContext streamContext = new StreamContext(); // shared by both the tiered and non-tiered streams below
+    streamContext.setSolrClientCache(solrClientCache);
+
+    ModifiableSolrParams solrParams = new ModifiableSolrParams();
+    solrParams.add(CommonParams.Q, "*:*");
+    solrParams.add(TIERED_PARAM, "true"); // explicitly request the tiered (parallelized) path
+
+    // tiered stats stream
+    StatsStream statsStream = new StatsStream(zkHost, ALIAS_NAME, solrParams, metrics);
+    statsStream.setStreamContext(streamContext);
+    List<Tuple> tieredTuples = getTuples(statsStream); // getTuples is defined elsewhere in this test class; presumably it opens, drains and closes the stream
+    assertEquals(1, tieredTuples.size());
+    assertNotNull(statsStream.parallelizedStream); // set only when open() took the parallel path
+
+    solrParams = new ModifiableSolrParams(); // fresh params for the non-tiered baseline
+    solrParams.add(CommonParams.Q, "*:*");
+    solrParams.add(TIERED_PARAM, "false");
+    statsStream = new StatsStream(zkHost, ALIAS_NAME, solrParams, metrics);
+    statsStream.setStreamContext(streamContext);
+    // tiered should match non-tiered results
+    assertListOfTuplesEquals(tieredTuples, getTuples(statsStream));
+    assertNull(statsStream.parallelizedStream); // the non-tiered run must not have built a parallel stream
+  }
+
   // execute the provided expression with tiered=true and compare to results of tiered=false
   private void compareTieredStreamWithNonTiered(String facetExprTmpl, int dims) throws IOException {
     String facetExpr = String.format(Locale.US, facetExprTmpl, ALIAS_NAME, "true");
@@ -246,7 +293,7 @@ public class ParallelFacetStreamOverAliasTest extends SolrCloudTestCase {
     assertTrue(stream instanceof FacetStream);
     FacetStream facetStream = (FacetStream) stream;
     TupleStream[] parallelStreams = facetStream.parallelize(listOfCollections);
-    assertEquals(2, parallelStreams.length);
+    assertEquals(NUM_COLLECTIONS, parallelStreams.length);
     assertTrue(parallelStreams[0] instanceof FacetStream);
 
     Optional<Metric[]> rollupMetrics = facetStream.getRollupMetrics(facetStream.getMetrics().toArray(new Metric[0]));