You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@solr.apache.org by th...@apache.org on 2021/07/02 22:09:25 UTC
[solr] branch main updated: SOLR-15499: StatsStream implement
ParallelMetricsRollup to allow for tiered computation for collection
aliases (#197)
This is an automated email from the ASF dual-hosted git repository.
thelabdude pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr.git
The following commit(s) were added to refs/heads/main by this push:
new 98c091d SOLR-15499: StatsStream implement ParallelMetricsRollup to allow for tiered computation for collection aliases (#197)
98c091d is described below
commit 98c091dd759c5ad6135ee0a61a6572df0de0119e
Author: Timothy Potter <th...@gmail.com>
AuthorDate: Fri Jul 2 16:09:19 2021 -0600
SOLR-15499: StatsStream implement ParallelMetricsRollup to allow for tiered computation for collection aliases (#197)
---
solr/CHANGES.txt | 3 ++
.../solr/client/solrj/io/stream/FacetStream.java | 4 +-
.../solrj/io/stream/ParallelMetricsRollup.java | 9 ++++
.../solr/client/solrj/io/stream/StatsStream.java | 63 ++++++++++++++++++++--
.../stream/ParallelFacetStreamOverAliasTest.java | 57 ++++++++++++++++++--
5 files changed, 124 insertions(+), 12 deletions(-)
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 86ea9a2..0f707b0 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -155,6 +155,9 @@ when told to. The admin UI now tells it to. (Nazerke Seidan, David Smiley)
* SOLR-15475: Implement COUNT and APPROX_COUNT_DISTINCT aggregation functions for Parallel SQL (Timothy Potter)
+* SOLR-15499: StatsStream implements ParallelMetricsRollup to allow for tiered computation of SQL metrics
+ over collection aliases backed by many collections, each potentially with many shards (Timothy Potter)
+
Other Changes
----------------------
* SOLR-14656: Autoscaling framework removed (Ishan Chattopadhyaya, noble, Ilan Ginzburg)
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java
index fd66fd5..f2ac698 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java
@@ -66,7 +66,7 @@ public class FacetStream extends TupleStream implements Expressible, ParallelMet
private static final long serialVersionUID = 1;
// allow client apps to disable the auto-plist via system property if they want to turn it off globally
- private static final boolean defaultTieredEnabled =
+ static final boolean defaultTieredEnabled =
Boolean.parseBoolean(System.getProperty("solr.facet.stream.tiered", "true"));
static final String TIERED_PARAM = "tiered";
@@ -932,7 +932,7 @@ public class FacetStream extends TupleStream implements Expressible, ParallelMet
* @return A mapping of fields produced by the rollup stream to their output name.
*/
protected Map<String, String> getRollupSelectFields(Metric[] rollupMetrics) {
- Map<String, String> map = new HashMap<>();
+ Map<String, String> map = new HashMap<>(rollupMetrics.length * 2);
for (Bucket b : buckets) {
String key = b.toString();
map.put(key, key);
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelMetricsRollup.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelMetricsRollup.java
index 0d0f1c3..75ff8f4 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelMetricsRollup.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelMetricsRollup.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.List;
import java.util.Optional;
+import org.apache.solr.client.solrj.io.stream.metrics.CountDistinctMetric;
import org.apache.solr.client.solrj.io.stream.metrics.CountMetric;
import org.apache.solr.client.solrj.io.stream.metrics.MaxMetric;
import org.apache.solr.client.solrj.io.stream.metrics.MeanMetric;
@@ -91,15 +92,19 @@ public interface ParallelMetricsRollup {
if (next instanceof SumMetric) {
// sum of sums
nextRollup = new SumMetric(next.getIdentifier());
+ nextRollup.outputLong = next.outputLong;
} else if (next instanceof MinMetric) {
// min of mins
nextRollup = new MinMetric(next.getIdentifier());
+ nextRollup.outputLong = next.outputLong;
} else if (next instanceof MaxMetric) {
// max of max
nextRollup = new MaxMetric(next.getIdentifier());
+ nextRollup.outputLong = next.outputLong;
} else if (next instanceof CountMetric) {
// sum of counts
nextRollup = new SumMetric(next.getIdentifier());
+ nextRollup.outputLong = next.outputLong;
count = (CountMetric) next;
} else if (next instanceof MeanMetric) {
// WeightedSumMetric must have a count to compute the weighted avg. rollup from ...
@@ -118,6 +123,10 @@ public interface ParallelMetricsRollup {
} else {
return Optional.empty(); // can't properly rollup mean metrics w/o a count (reqd by WeightedSumMetric)
}
+ } else if (next instanceof CountDistinctMetric) {
+ // rollup of count distinct is the max across the tiers
+ nextRollup = new MaxMetric(next.getIdentifier());
+ nextRollup.outputLong = next.outputLong;
} else {
return Optional.empty(); // can't parallelize this expr!
}
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StatsStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StatsStream.java
index 778ee82..ddc0276 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StatsStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StatsStream.java
@@ -17,17 +17,19 @@
package org.apache.solr.client.solrj.io.stream;
import java.io.IOException;
-
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.impl.ClusterStateProvider;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.io.Tuple;
@@ -41,6 +43,7 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParamete
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.apache.solr.client.solrj.io.stream.metrics.Bucket;
import org.apache.solr.client.solrj.io.stream.metrics.CountMetric;
import org.apache.solr.client.solrj.io.stream.metrics.Metric;
import org.apache.solr.client.solrj.request.QueryRequest;
@@ -48,14 +51,18 @@ import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.NamedList;
+import static org.apache.solr.client.solrj.io.stream.FacetStream.TIERED_PARAM;
+import static org.apache.solr.client.solrj.io.stream.FacetStream.defaultTieredEnabled;
+
/**
* @since 6.6.0
*/
-public class StatsStream extends TupleStream implements Expressible {
+public class StatsStream extends TupleStream implements Expressible, ParallelMetricsRollup {
private static final long serialVersionUID = 1;
-
+ // use a single "*" rollup bucket
+ private static final Bucket[] STATS_BUCKET = new Bucket[]{new Bucket("*")};
private Metric[] metrics;
private Tuple tuple;
@@ -66,6 +73,7 @@ public class StatsStream extends TupleStream implements Expressible {
protected transient SolrClientCache cache;
protected transient CloudSolrClient cloudSolrClient;
private StreamContext context;
+ protected transient TupleStream parallelizedStream;
public StatsStream(String zkHost,
String collection,
@@ -213,14 +221,28 @@ public class StatsStream extends TupleStream implements Expressible {
public void open() throws IOException {
+ @SuppressWarnings({"unchecked"})
+ Map<String, List<String>> shardsMap = (Map<String, List<String>>)context.get("shards");
+
+ // Parallelize the stats stream across multiple collections for an alias using plist if possible
+ if (shardsMap == null && params.getBool(TIERED_PARAM, defaultTieredEnabled)) {
+ ClusterStateProvider clusterStateProvider = cache.getCloudSolrClient(zkHost).getClusterStateProvider();
+ final List<String> resolved = clusterStateProvider != null ? clusterStateProvider.resolveAlias(collection) : null;
+ if (resolved != null && resolved.size() > 1) {
+ Optional<TupleStream> maybeParallelize = openParallelStream(context, resolved, metrics);
+ if (maybeParallelize.isPresent()) {
+ this.parallelizedStream = maybeParallelize.get();
+ return; // we're using a plist to parallelize the facet operation
+ } // else, there's a metric that we can't rollup over the plist results safely ... no plist for you!
+ }
+ }
+
String json = getJsonFacetString(metrics);
ModifiableSolrParams paramsLoc = new ModifiableSolrParams(params);
paramsLoc.set("json.facet", json);
paramsLoc.set("rows", "0");
- @SuppressWarnings({"unchecked"})
- Map<String, List<String>> shardsMap = (Map<String, List<String>>)context.get("shards");
if(shardsMap == null) {
QueryRequest request = new QueryRequest(paramsLoc, SolrRequest.METHOD.POST);
cloudSolrClient = cache.getCloudSolrClient(zkHost);
@@ -266,6 +288,10 @@ public class StatsStream extends TupleStream implements Expressible {
}
public Tuple read() throws IOException {
+ if (parallelizedStream != null) {
+ return parallelizedStream.read();
+ }
+
if(index == 0) {
++index;
return tuple;
@@ -352,4 +378,31 @@ public class StatsStream extends TupleStream implements Expressible {
public StreamComparator getStreamSort() {
return null;
}
+
+ @Override
+ public TupleStream[] parallelize(List<String> partitions) throws IOException {
+ final ModifiableSolrParams withoutTieredParam = new ModifiableSolrParams(params);
+ withoutTieredParam.remove(TIERED_PARAM); // each individual request is not tiered
+
+ TupleStream[] streams = new TupleStream[partitions.size()];
+ for (int p = 0; p < streams.length; p++) {
+ streams[p] = new StatsStream(zkHost, partitions.get(p), withoutTieredParam, metrics);
+ }
+ return streams;
+ }
+
+ @Override
+ public TupleStream getSortedRollupStream(ParallelListStream plist, Metric[] rollupMetrics) throws IOException {
+ return new SelectStream(new HashRollupStream(plist, STATS_BUCKET, rollupMetrics), getRollupSelectFields(rollupMetrics));
+ }
+
+ // Map the rollup metric to the original metric name so that we can project out the correct field names in the tuple
+ protected Map<String, String> getRollupSelectFields(Metric[] rollupMetrics) {
+ Map<String, String> map = new HashMap<>(rollupMetrics.length * 2);
+ for (Metric m : rollupMetrics) {
+ String[] cols = m.getColumns();
+ map.put(m.getIdentifier(), cols != null && cols.length > 0 ? cols[0] : "*");
+ }
+ return map;
+ }
}
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/ParallelFacetStreamOverAliasTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/ParallelFacetStreamOverAliasTest.java
index aede95b..639929f 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/ParallelFacetStreamOverAliasTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/ParallelFacetStreamOverAliasTest.java
@@ -40,15 +40,25 @@ import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.apache.solr.client.solrj.io.stream.metrics.CountDistinctMetric;
+import org.apache.solr.client.solrj.io.stream.metrics.CountMetric;
+import org.apache.solr.client.solrj.io.stream.metrics.MaxMetric;
+import org.apache.solr.client.solrj.io.stream.metrics.MeanMetric;
import org.apache.solr.client.solrj.io.stream.metrics.Metric;
+import org.apache.solr.client.solrj.io.stream.metrics.MinMetric;
+import org.apache.solr.client.solrj.io.stream.metrics.SumMetric;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.handler.SolrDefaultStreamFactory;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
+import static org.apache.solr.client.solrj.io.stream.FacetStream.TIERED_PARAM;
+
/**
* Verify auto-plist with rollup over a facet expression when using collection alias over multiple collections.
*/
@@ -63,6 +73,8 @@ public class ParallelFacetStreamOverAliasTest extends SolrCloudTestCase {
private static final int NUM_DOCS_PER_COLLECTION = 40;
private static final int NUM_SHARDS_PER_COLLECTION = 4;
private static final int CARDINALITY = 10;
+ private static final int BUCKET_SIZE_LIMIT = Math.max(CARDINALITY * 2, 100);
+
private static final RandomGenerator rand = new JDKRandomGenerator(5150);
private static List<String> listOfCollections;
private static SolrClientCache solrClientCache;
@@ -166,7 +178,7 @@ public class ParallelFacetStreamOverAliasTest extends SolrCloudTestCase {
" q=\"*:*\", \n" +
" buckets=\"a_i\", \n" +
" bucketSorts=\"a_i asc\", \n" +
- " bucketSizeLimit=100, \n" +
+ " bucketSizeLimit=" + BUCKET_SIZE_LIMIT + ", \n" +
" sum(a_d), avg(a_d), min(a_d), max(a_d), count(*)\n" +
")\n";
@@ -188,7 +200,7 @@ public class ParallelFacetStreamOverAliasTest extends SolrCloudTestCase {
" q=\"*:*\", \n" +
" buckets=\"a_i,b_i\", \n" + /* two dimensions here ~ doubles the number of tuples */
" bucketSorts=\"sum(a_d) desc\", \n" +
- " bucketSizeLimit=100, \n" +
+ " bucketSizeLimit=" + BUCKET_SIZE_LIMIT + ", \n" +
" sum(a_d), avg(a_d), min(a_d), max(a_d), count(*)\n" +
")\n";
@@ -197,7 +209,6 @@ public class ParallelFacetStreamOverAliasTest extends SolrCloudTestCase {
@Test
public void testParallelFacetSortByDimensions() throws Exception {
-
// notice we're sorting the stream by a metric, but internally, that doesn't work for parallelization
// so the rollup has to sort by dimensions and then apply a final re-sort once the parallel streams are merged
String facetExprTmpl = "" +
@@ -207,13 +218,49 @@ public class ParallelFacetStreamOverAliasTest extends SolrCloudTestCase {
" q=\"*:*\", \n" +
" buckets=\"a_i,b_i\", \n" +
" bucketSorts=\"a_i asc, b_i asc\", \n" +
- " bucketSizeLimit=100, \n" +
+ " bucketSizeLimit=" + BUCKET_SIZE_LIMIT + ", \n" +
" sum(a_d), avg(a_d), min(a_d), max(a_d), count(*)\n" +
")\n";
compareTieredStreamWithNonTiered(facetExprTmpl, 2);
}
+ @Test
+ public void testParallelStats() throws Exception {
+ Metric[] metrics = new Metric[]{
+ new CountMetric(),
+ new CountDistinctMetric("a_i"),
+ new SumMetric("b_i"),
+ new MinMetric("a_i"),
+ new MaxMetric("a_i"),
+ new MeanMetric("a_d")
+ };
+
+ String zkHost = cluster.getZkServer().getZkAddress();
+ StreamContext streamContext = new StreamContext();
+ streamContext.setSolrClientCache(solrClientCache);
+
+ ModifiableSolrParams solrParams = new ModifiableSolrParams();
+ solrParams.add(CommonParams.Q, "*:*");
+ solrParams.add(TIERED_PARAM, "true");
+
+ // tiered stats stream
+ StatsStream statsStream = new StatsStream(zkHost, ALIAS_NAME, solrParams, metrics);
+ statsStream.setStreamContext(streamContext);
+ List<Tuple> tieredTuples = getTuples(statsStream);
+ assertEquals(1, tieredTuples.size());
+ assertNotNull(statsStream.parallelizedStream);
+
+ solrParams = new ModifiableSolrParams();
+ solrParams.add(CommonParams.Q, "*:*");
+ solrParams.add(TIERED_PARAM, "false");
+ statsStream = new StatsStream(zkHost, ALIAS_NAME, solrParams, metrics);
+ statsStream.setStreamContext(streamContext);
+ // tiered should match non-tiered results
+ assertListOfTuplesEquals(tieredTuples, getTuples(statsStream));
+ assertNull(statsStream.parallelizedStream);
+ }
+
// execute the provided expression with tiered=true and compare to results of tiered=false
private void compareTieredStreamWithNonTiered(String facetExprTmpl, int dims) throws IOException {
String facetExpr = String.format(Locale.US, facetExprTmpl, ALIAS_NAME, "true");
@@ -246,7 +293,7 @@ public class ParallelFacetStreamOverAliasTest extends SolrCloudTestCase {
assertTrue(stream instanceof FacetStream);
FacetStream facetStream = (FacetStream) stream;
TupleStream[] parallelStreams = facetStream.parallelize(listOfCollections);
- assertEquals(2, parallelStreams.length);
+ assertEquals(NUM_COLLECTIONS, parallelStreams.length);
assertTrue(parallelStreams[0] instanceof FacetStream);
Optional<Metric[]> rollupMetrics = facetStream.getRollupMetrics(facetStream.getMetrics().toArray(new Metric[0]));