You are viewing a plain text version of this content. The canonical link to it was a hyperlink on the original page and is not preserved in this plain-text copy.
Posted to commits@lucene.apache.org by th...@apache.org on 2021/07/06 21:10:22 UTC

[lucene-solr] branch branch_8x updated: SOLR-15499: StatsStream implement ParallelMetricsRollup to allow for tiered computation of SQL metrics (#2529)

This is an automated email from the ASF dual-hosted git repository.

thelabdude pushed a commit to branch branch_8x
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/branch_8x by this push:
     new ea640e7  SOLR-15499: StatsStream implement ParallelMetricsRollup to allow for tiered computation of SQL metrics (#2529)
ea640e7 is described below

commit ea640e7ecb5f3b7425422f684927252472647305
Author: Timothy Potter <th...@gmail.com>
AuthorDate: Tue Jul 6 15:10:02 2021 -0600

    SOLR-15499: StatsStream implement ParallelMetricsRollup to allow for tiered computation of SQL metrics (#2529)
---
 solr/CHANGES.txt                                   |  3 +
 .../solr/client/solrj/io/stream/FacetStream.java   |  4 +-
 .../solrj/io/stream/ParallelMetricsRollup.java     |  9 +++
 .../solr/client/solrj/io/stream/StatsStream.java   | 76 ++++++++++++++++++----
 .../stream/ParallelFacetStreamOverAliasTest.java   | 57 ++++++++++++++--
 5 files changed, 129 insertions(+), 20 deletions(-)

diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index df56eb7..a9966a4 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -29,6 +29,9 @@ Improvements
 
 * SOLR-15475: Implement COUNT and APPROX_COUNT_DISTINCT aggregation functions for Parallel SQL (Timothy Potter)
 
+* SOLR-15499: StatsStream implement ParallelMetricsRollup to allow for tiered computation of SQL metrics
+  over collection aliases backed by many collections, potentially with many shards in each (Timothy Potter)
+
 Optimizations
 ---------------------
 * SOLR-15433: Replace transient core cache LRU by Caffeine cache. (Bruno Roustant)
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java
index 0f42c61..f463f92 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java
@@ -66,7 +66,7 @@ public class FacetStream extends TupleStream implements Expressible, ParallelMet
   private static final long serialVersionUID = 1;
 
   // allow client apps to disable the auto-plist via system property if they want to turn it off globally
-  private static final boolean defaultTieredEnabled =
+  static final boolean defaultTieredEnabled =
       Boolean.parseBoolean(System.getProperty("solr.facet.stream.tiered", "false"));
 
   static final String TIERED_PARAM = "tiered";
@@ -937,7 +937,7 @@ public class FacetStream extends TupleStream implements Expressible, ParallelMet
    * @return A mapping of fields produced by the rollup stream to their output name.
    */
   protected Map<String, String> getRollupSelectFields(Metric[] rollupMetrics) {
-    Map<String, String> map = new HashMap<>();
+    Map<String, String> map = new HashMap<>(rollupMetrics.length * 2);
     for (Bucket b : buckets) {
       String key = b.toString();
       map.put(key, key);
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelMetricsRollup.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelMetricsRollup.java
index 5674f2e..187500e 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelMetricsRollup.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelMetricsRollup.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Optional;
 
+import org.apache.solr.client.solrj.io.stream.metrics.CountDistinctMetric;
 import org.apache.solr.client.solrj.io.stream.metrics.CountMetric;
 import org.apache.solr.client.solrj.io.stream.metrics.MaxMetric;
 import org.apache.solr.client.solrj.io.stream.metrics.MeanMetric;
@@ -91,15 +92,19 @@ public interface ParallelMetricsRollup {
       if (next instanceof SumMetric) {
         // sum of sums
         nextRollup = new SumMetric(next.getIdentifier());
+        nextRollup.outputLong = next.outputLong;
       } else if (next instanceof MinMetric) {
         // min of mins
         nextRollup = new MinMetric(next.getIdentifier());
+        nextRollup.outputLong = next.outputLong;
       } else if (next instanceof MaxMetric) {
         // max of max
         nextRollup = new MaxMetric(next.getIdentifier());
+        nextRollup.outputLong = next.outputLong;
       } else if (next instanceof CountMetric) {
         // sum of counts
         nextRollup = new SumMetric(next.getIdentifier());
+        nextRollup.outputLong = next.outputLong;
         count = (CountMetric) next;
       } else if (next instanceof MeanMetric) {
         // WeightedSumMetric must have a count to compute the weighted avg. rollup from ...
@@ -118,6 +123,10 @@ public interface ParallelMetricsRollup {
         } else {
           return Optional.empty(); // can't properly rollup mean metrics w/o a count (reqd by WeightedSumMetric)
         }
+      } else if (next instanceof CountDistinctMetric) {
+        // rollup of count distinct is the max across the tiers
+        nextRollup = new MaxMetric(next.getIdentifier());
+        nextRollup.outputLong = next.outputLong;
       } else {
         return Optional.empty(); // can't parallelize this expr!
       }
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StatsStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StatsStream.java
index eb9e0ff..aa6ab91 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StatsStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StatsStream.java
@@ -17,17 +17,19 @@
 package org.apache.solr.client.solrj.io.stream;
 
 import java.io.IOException;
-
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.stream.Collectors;
 
 import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.impl.ClusterStateProvider;
 import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.io.SolrClientCache;
 import org.apache.solr.client.solrj.io.Tuple;
@@ -41,6 +43,7 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParamete
 import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
 import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
 import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.apache.solr.client.solrj.io.stream.metrics.Bucket;
 import org.apache.solr.client.solrj.io.stream.metrics.CountMetric;
 import org.apache.solr.client.solrj.io.stream.metrics.Metric;
 import org.apache.solr.client.solrj.request.QueryRequest;
@@ -48,14 +51,18 @@ import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.common.util.NamedList;
 
+import static org.apache.solr.client.solrj.io.stream.FacetStream.TIERED_PARAM;
+import static org.apache.solr.client.solrj.io.stream.FacetStream.defaultTieredEnabled;
+
 /**
  * @since 6.6.0
  */
-public class StatsStream extends TupleStream implements Expressible  {
+public class StatsStream extends TupleStream implements Expressible, ParallelMetricsRollup  {
 
   private static final long serialVersionUID = 1;
 
-
+  // use a single "*" rollup bucket
+  private static final Bucket[] STATS_BUCKET = new Bucket[]{new Bucket("*")};
 
   private Metric[] metrics;
   private Tuple tuple;
@@ -66,6 +73,7 @@ public class StatsStream extends TupleStream implements Expressible  {
   protected transient SolrClientCache cache;
   protected transient CloudSolrClient cloudSolrClient;
   private StreamContext context;
+  protected transient TupleStream parallelizedStream;
 
   public StatsStream(String zkHost,
                      String collection,
@@ -213,20 +221,33 @@ public class StatsStream extends TupleStream implements Expressible  {
 
   public void open() throws IOException {
 
+    @SuppressWarnings({"unchecked"})
+    Map<String, List<String>> shardsMap = (Map<String, List<String>>)context.get("shards");
+
+    // Parallelize the stats stream across multiple collections for an alias using plist if possible
+    if (shardsMap == null && params.getBool(TIERED_PARAM, defaultTieredEnabled)) {
+      ClusterStateProvider clusterStateProvider = cache.getCloudSolrClient(zkHost).getClusterStateProvider();
+      final List<String> resolved = clusterStateProvider != null ? clusterStateProvider.resolveAlias(collection) : null;
+      if (resolved != null && resolved.size() > 1) {
+        Optional<TupleStream> maybeParallelize = openParallelStream(context, resolved, metrics);
+        if (maybeParallelize.isPresent()) {
+          this.parallelizedStream = maybeParallelize.get();
+          return; // we're using a plist to parallelize the facet operation
+        } // else, there's a metric that we can't rollup over the plist results safely ... no plist for you!
+      }
+    }
+
     String json = getJsonFacetString(metrics);
 
     ModifiableSolrParams paramsLoc = new ModifiableSolrParams(params);
     paramsLoc.set("json.facet", json);
     paramsLoc.set("rows", "0");
 
-    @SuppressWarnings({"unchecked"})
-    Map<String, List<String>> shardsMap = (Map<String, List<String>>)context.get("shards");
     if(shardsMap == null) {
       QueryRequest request = new QueryRequest(paramsLoc, SolrRequest.METHOD.POST);
       cloudSolrClient = cache.getCloudSolrClient(zkHost);
       try {
-        @SuppressWarnings({"rawtypes"})
-        NamedList response = cloudSolrClient.request(request, collection);
+        NamedList<?> response = cloudSolrClient.request(request, collection);
         getTuples(response, metrics);
       } catch (Exception e) {
         throw new IOException(e);
@@ -243,8 +264,7 @@ public class StatsStream extends TupleStream implements Expressible  {
 
       QueryRequest request = new QueryRequest(paramsLoc, SolrRequest.METHOD.POST);
       try {
-        @SuppressWarnings({"rawtypes"})
-        NamedList response = client.request(request);
+        NamedList<?> response = client.request(request);
         getTuples(response, metrics);
       } catch (Exception e) {
         throw new IOException(e);
@@ -268,6 +288,10 @@ public class StatsStream extends TupleStream implements Expressible  {
   }
 
   public Tuple read() throws IOException {
+    if (parallelizedStream != null) {
+      return parallelizedStream.read();
+    }
+
     if(index == 0) {
       ++index;
       return tuple;
@@ -306,17 +330,16 @@ public class StatsStream extends TupleStream implements Expressible  {
     }
   }
 
-  private void getTuples(@SuppressWarnings({"rawtypes"})NamedList response,
+  private void getTuples(NamedList<?> response,
                          Metric[] metrics) {
 
     this.tuple = new Tuple();
-    @SuppressWarnings({"rawtypes"})
-    NamedList facets = (NamedList)response.get("facets");
+    NamedList<?> facets = (NamedList<?>)response.get("facets");
     fillTuple(tuple, facets, metrics);
   }
 
   private void fillTuple(Tuple t,
-                         @SuppressWarnings({"rawtypes"})NamedList nl,
+                         NamedList<?> nl,
                          Metric[] _metrics) {
 
     if(nl == null) {
@@ -355,4 +378,31 @@ public class StatsStream extends TupleStream implements Expressible  {
   public StreamComparator getStreamSort() {
     return null;
   }
+
+  @Override
+  public TupleStream[] parallelize(List<String> partitions) throws IOException {
+    final ModifiableSolrParams withoutTieredParam = new ModifiableSolrParams(params);
+    withoutTieredParam.remove(TIERED_PARAM); // each individual request is not tiered
+
+    TupleStream[] streams = new TupleStream[partitions.size()];
+    for (int p = 0; p < streams.length; p++) {
+      streams[p] = new StatsStream(zkHost, partitions.get(p), withoutTieredParam, metrics);
+    }
+    return streams;
+  }
+
+  @Override
+  public TupleStream getSortedRollupStream(ParallelListStream plist, Metric[] rollupMetrics) throws IOException {
+    return new SelectStream(new HashRollupStream(plist, STATS_BUCKET, rollupMetrics), getRollupSelectFields(rollupMetrics));
+  }
+
+  // Map the rollup metric to the original metric name so that we can project out the correct field names in the tuple
+  protected Map<String, String> getRollupSelectFields(Metric[] rollupMetrics) {
+    Map<String, String> map = new HashMap<>(rollupMetrics.length * 2);
+    for (Metric m : rollupMetrics) {
+      String[] cols = m.getColumns();
+      map.put(m.getIdentifier(), cols != null && cols.length > 0 ? cols[0] : "*");
+    }
+    return map;
+  }
 }
\ No newline at end of file
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/ParallelFacetStreamOverAliasTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/ParallelFacetStreamOverAliasTest.java
index d47b735..7195ca4 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/ParallelFacetStreamOverAliasTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/ParallelFacetStreamOverAliasTest.java
@@ -40,15 +40,25 @@ import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.io.SolrClientCache;
 import org.apache.solr.client.solrj.io.Tuple;
 import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.apache.solr.client.solrj.io.stream.metrics.CountDistinctMetric;
+import org.apache.solr.client.solrj.io.stream.metrics.CountMetric;
+import org.apache.solr.client.solrj.io.stream.metrics.MaxMetric;
+import org.apache.solr.client.solrj.io.stream.metrics.MeanMetric;
 import org.apache.solr.client.solrj.io.stream.metrics.Metric;
+import org.apache.solr.client.solrj.io.stream.metrics.MinMetric;
+import org.apache.solr.client.solrj.io.stream.metrics.SumMetric;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.handler.SolrDefaultStreamFactory;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import static org.apache.solr.client.solrj.io.stream.FacetStream.TIERED_PARAM;
+
 /**
  * Verify auto-plist with rollup over a facet expression when using collection alias over multiple collections.
  */
@@ -63,6 +73,8 @@ public class ParallelFacetStreamOverAliasTest extends SolrCloudTestCase {
   private static final int NUM_DOCS_PER_COLLECTION = 40;
   private static final int NUM_SHARDS_PER_COLLECTION = 4;
   private static final int CARDINALITY = 10;
+  private static final int BUCKET_SIZE_LIMIT = Math.max(CARDINALITY * 2, 100);
+
   private static final RandomGenerator rand = new JDKRandomGenerator(5150);
   private static List<String> listOfCollections;
   private static SolrClientCache solrClientCache;
@@ -169,7 +181,7 @@ public class ParallelFacetStreamOverAliasTest extends SolrCloudTestCase {
         "  q=\"*:*\", \n" +
         "  buckets=\"a_i\", \n" +
         "  bucketSorts=\"a_i asc\", \n" +
-        "  bucketSizeLimit=100, \n" +
+        "  bucketSizeLimit=" + BUCKET_SIZE_LIMIT + ", \n" +
         "  sum(a_d), avg(a_d), min(a_d), max(a_d), count(*)\n" +
         ")\n";
 
@@ -191,7 +203,7 @@ public class ParallelFacetStreamOverAliasTest extends SolrCloudTestCase {
         "  q=\"*:*\", \n" +
         "  buckets=\"a_i,b_i\", \n" + /* two dimensions here ~ doubles the number of tuples */
         "  bucketSorts=\"sum(a_d) desc\", \n" +
-        "  bucketSizeLimit=100, \n" +
+        "  bucketSizeLimit=" + BUCKET_SIZE_LIMIT + ", \n" +
         "  sum(a_d), avg(a_d), min(a_d), max(a_d), count(*)\n" +
         ")\n";
 
@@ -200,7 +212,6 @@ public class ParallelFacetStreamOverAliasTest extends SolrCloudTestCase {
 
   @Test
   public void testParallelFacetSortByDimensions() throws Exception {
-
     // notice we're sorting the stream by a metric, but internally, that doesn't work for parallelization
     // so the rollup has to sort by dimensions and then apply a final re-sort once the parallel streams are merged
     String facetExprTmpl = "" +
@@ -210,13 +221,49 @@ public class ParallelFacetStreamOverAliasTest extends SolrCloudTestCase {
         "  q=\"*:*\", \n" +
         "  buckets=\"a_i,b_i\", \n" +
         "  bucketSorts=\"a_i asc, b_i asc\", \n" +
-        "  bucketSizeLimit=100, \n" +
+        "  bucketSizeLimit=" + BUCKET_SIZE_LIMIT + ", \n" +
         "  sum(a_d), avg(a_d), min(a_d), max(a_d), count(*)\n" +
         ")\n";
 
     compareTieredStreamWithNonTiered(facetExprTmpl, 2);
   }
 
+  @Test
+  public void testParallelStats() throws Exception {
+    Metric[] metrics = new Metric[]{
+        new CountMetric(),
+        new CountDistinctMetric("a_i"),
+        new SumMetric("b_i"),
+        new MinMetric("a_i"),
+        new MaxMetric("a_i"),
+        new MeanMetric("a_d")
+    };
+
+    String zkHost = cluster.getZkServer().getZkAddress();
+    StreamContext streamContext = new StreamContext();
+    streamContext.setSolrClientCache(solrClientCache);
+
+    ModifiableSolrParams solrParams = new ModifiableSolrParams();
+    solrParams.add(CommonParams.Q, "*:*");
+    solrParams.add(TIERED_PARAM, "true");
+
+    // tiered stats stream
+    StatsStream statsStream = new StatsStream(zkHost, ALIAS_NAME, solrParams, metrics);
+    statsStream.setStreamContext(streamContext);
+    List<Tuple> tieredTuples = getTuples(statsStream);
+    assertEquals(1, tieredTuples.size());
+    assertNotNull(statsStream.parallelizedStream);
+
+    solrParams = new ModifiableSolrParams();
+    solrParams.add(CommonParams.Q, "*:*");
+    solrParams.add(TIERED_PARAM, "false");
+    statsStream = new StatsStream(zkHost, ALIAS_NAME, solrParams, metrics);
+    statsStream.setStreamContext(streamContext);
+    // tiered should match non-tiered results
+    assertListOfTuplesEquals(tieredTuples, getTuples(statsStream));
+    assertNull(statsStream.parallelizedStream);
+  }
+
   // execute the provided expression with tiered=true and compare to results of tiered=false
   private void compareTieredStreamWithNonTiered(String facetExprTmpl, int dims) throws IOException {
     String facetExpr = String.format(Locale.US, facetExprTmpl, ALIAS_NAME, "true");
@@ -249,7 +296,7 @@ public class ParallelFacetStreamOverAliasTest extends SolrCloudTestCase {
     assertTrue(stream instanceof FacetStream);
     FacetStream facetStream = (FacetStream) stream;
     TupleStream[] parallelStreams = facetStream.parallelize(listOfCollections);
-    assertEquals(2, parallelStreams.length);
+    assertEquals(NUM_COLLECTIONS, parallelStreams.length);
     assertTrue(parallelStreams[0] instanceof FacetStream);
 
     Optional<Metric[]> rollupMetrics = facetStream.getRollupMetrics(facetStream.getMetrics().toArray(new Metric[0]));