You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@lucene.apache.org by th...@apache.org on 2021/07/06 21:10:22 UTC
[lucene-solr] branch branch_8x updated: SOLR-15499: StatsStream
implement ParallelMetricsRollup to allow for tiered computation of SQL
metrics (#2529)
This is an automated email from the ASF dual-hosted git repository.
thelabdude pushed a commit to branch branch_8x
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/branch_8x by this push:
new ea640e7 SOLR-15499: StatsStream implement ParallelMetricsRollup to allow for tiered computation of SQL metrics (#2529)
ea640e7 is described below
commit ea640e7ecb5f3b7425422f684927252472647305
Author: Timothy Potter <th...@gmail.com>
AuthorDate: Tue Jul 6 15:10:02 2021 -0600
SOLR-15499: StatsStream implement ParallelMetricsRollup to allow for tiered computation of SQL metrics (#2529)
---
solr/CHANGES.txt | 3 +
.../solr/client/solrj/io/stream/FacetStream.java | 4 +-
.../solrj/io/stream/ParallelMetricsRollup.java | 9 +++
.../solr/client/solrj/io/stream/StatsStream.java | 76 ++++++++++++++++++----
.../stream/ParallelFacetStreamOverAliasTest.java | 57 ++++++++++++++--
5 files changed, 129 insertions(+), 20 deletions(-)
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index df56eb7..a9966a4 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -29,6 +29,9 @@ Improvements
* SOLR-15475: Implement COUNT and APPROX_COUNT_DISTINCT aggregation functions for Parallel SQL (Timothy Potter)
+* SOLR-15499: StatsStream implements ParallelMetricsRollup to allow for tiered computation of SQL metrics
+  over collection aliases backed by many collections, potentially with many shards in each (Timothy Potter)
+
Optimizations
---------------------
* SOLR-15433: Replace transient core cache LRU by Caffeine cache. (Bruno Roustant)
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java
index 0f42c61..f463f92 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java
@@ -66,7 +66,7 @@ public class FacetStream extends TupleStream implements Expressible, ParallelMet
private static final long serialVersionUID = 1;
// allow client apps to disable the auto-plist via system property if they want to turn it off globally
- private static final boolean defaultTieredEnabled =
+ static final boolean defaultTieredEnabled =
Boolean.parseBoolean(System.getProperty("solr.facet.stream.tiered", "false"));
static final String TIERED_PARAM = "tiered";
@@ -937,7 +937,7 @@ public class FacetStream extends TupleStream implements Expressible, ParallelMet
* @return A mapping of fields produced by the rollup stream to their output name.
*/
protected Map<String, String> getRollupSelectFields(Metric[] rollupMetrics) {
- Map<String, String> map = new HashMap<>();
+ Map<String, String> map = new HashMap<>(rollupMetrics.length * 2);
for (Bucket b : buckets) {
String key = b.toString();
map.put(key, key);
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelMetricsRollup.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelMetricsRollup.java
index 5674f2e..187500e 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelMetricsRollup.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelMetricsRollup.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.List;
import java.util.Optional;
+import org.apache.solr.client.solrj.io.stream.metrics.CountDistinctMetric;
import org.apache.solr.client.solrj.io.stream.metrics.CountMetric;
import org.apache.solr.client.solrj.io.stream.metrics.MaxMetric;
import org.apache.solr.client.solrj.io.stream.metrics.MeanMetric;
@@ -91,15 +92,19 @@ public interface ParallelMetricsRollup {
if (next instanceof SumMetric) {
// sum of sums
nextRollup = new SumMetric(next.getIdentifier());
+ nextRollup.outputLong = next.outputLong;
} else if (next instanceof MinMetric) {
// min of mins
nextRollup = new MinMetric(next.getIdentifier());
+ nextRollup.outputLong = next.outputLong;
} else if (next instanceof MaxMetric) {
// max of max
nextRollup = new MaxMetric(next.getIdentifier());
+ nextRollup.outputLong = next.outputLong;
} else if (next instanceof CountMetric) {
// sum of counts
nextRollup = new SumMetric(next.getIdentifier());
+ nextRollup.outputLong = next.outputLong;
count = (CountMetric) next;
} else if (next instanceof MeanMetric) {
// WeightedSumMetric must have a count to compute the weighted avg. rollup from ...
@@ -118,6 +123,10 @@ public interface ParallelMetricsRollup {
} else {
return Optional.empty(); // can't properly rollup mean metrics w/o a count (reqd by WeightedSumMetric)
}
+ } else if (next instanceof CountDistinctMetric) {
+ // rollup of count distinct is the max across the tiers
+ nextRollup = new MaxMetric(next.getIdentifier());
+ nextRollup.outputLong = next.outputLong;
} else {
return Optional.empty(); // can't parallelize this expr!
}
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StatsStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StatsStream.java
index eb9e0ff..aa6ab91 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StatsStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StatsStream.java
@@ -17,17 +17,19 @@
package org.apache.solr.client.solrj.io.stream;
import java.io.IOException;
-
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.impl.ClusterStateProvider;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.io.Tuple;
@@ -41,6 +43,7 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParamete
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.apache.solr.client.solrj.io.stream.metrics.Bucket;
import org.apache.solr.client.solrj.io.stream.metrics.CountMetric;
import org.apache.solr.client.solrj.io.stream.metrics.Metric;
import org.apache.solr.client.solrj.request.QueryRequest;
@@ -48,14 +51,18 @@ import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.NamedList;
+import static org.apache.solr.client.solrj.io.stream.FacetStream.TIERED_PARAM;
+import static org.apache.solr.client.solrj.io.stream.FacetStream.defaultTieredEnabled;
+
/**
* @since 6.6.0
*/
-public class StatsStream extends TupleStream implements Expressible {
+public class StatsStream extends TupleStream implements Expressible, ParallelMetricsRollup {
private static final long serialVersionUID = 1;
-
+ // use a single "*" rollup bucket
+ private static final Bucket[] STATS_BUCKET = new Bucket[]{new Bucket("*")};
private Metric[] metrics;
private Tuple tuple;
@@ -66,6 +73,7 @@ public class StatsStream extends TupleStream implements Expressible {
protected transient SolrClientCache cache;
protected transient CloudSolrClient cloudSolrClient;
private StreamContext context;
+ protected transient TupleStream parallelizedStream;
public StatsStream(String zkHost,
String collection,
@@ -213,20 +221,33 @@ public class StatsStream extends TupleStream implements Expressible {
public void open() throws IOException {
+ @SuppressWarnings({"unchecked"})
+ Map<String, List<String>> shardsMap = (Map<String, List<String>>)context.get("shards");
+
+ // Parallelize the stats stream across multiple collections for an alias using plist if possible
+ if (shardsMap == null && params.getBool(TIERED_PARAM, defaultTieredEnabled)) {
+ ClusterStateProvider clusterStateProvider = cache.getCloudSolrClient(zkHost).getClusterStateProvider();
+ final List<String> resolved = clusterStateProvider != null ? clusterStateProvider.resolveAlias(collection) : null;
+ if (resolved != null && resolved.size() > 1) {
+ Optional<TupleStream> maybeParallelize = openParallelStream(context, resolved, metrics);
+ if (maybeParallelize.isPresent()) {
+ this.parallelizedStream = maybeParallelize.get();
+ return; // we're using a plist to parallelize the facet operation
+ } // else, there's a metric that we can't rollup over the plist results safely ... no plist for you!
+ }
+ }
+
String json = getJsonFacetString(metrics);
ModifiableSolrParams paramsLoc = new ModifiableSolrParams(params);
paramsLoc.set("json.facet", json);
paramsLoc.set("rows", "0");
- @SuppressWarnings({"unchecked"})
- Map<String, List<String>> shardsMap = (Map<String, List<String>>)context.get("shards");
if(shardsMap == null) {
QueryRequest request = new QueryRequest(paramsLoc, SolrRequest.METHOD.POST);
cloudSolrClient = cache.getCloudSolrClient(zkHost);
try {
- @SuppressWarnings({"rawtypes"})
- NamedList response = cloudSolrClient.request(request, collection);
+ NamedList<?> response = cloudSolrClient.request(request, collection);
getTuples(response, metrics);
} catch (Exception e) {
throw new IOException(e);
@@ -243,8 +264,7 @@ public class StatsStream extends TupleStream implements Expressible {
QueryRequest request = new QueryRequest(paramsLoc, SolrRequest.METHOD.POST);
try {
- @SuppressWarnings({"rawtypes"})
- NamedList response = client.request(request);
+ NamedList<?> response = client.request(request);
getTuples(response, metrics);
} catch (Exception e) {
throw new IOException(e);
@@ -268,6 +288,10 @@ public class StatsStream extends TupleStream implements Expressible {
}
public Tuple read() throws IOException {
+ if (parallelizedStream != null) {
+ return parallelizedStream.read();
+ }
+
if(index == 0) {
++index;
return tuple;
@@ -306,17 +330,16 @@ public class StatsStream extends TupleStream implements Expressible {
}
}
- private void getTuples(@SuppressWarnings({"rawtypes"})NamedList response,
+ private void getTuples(NamedList<?> response,
Metric[] metrics) {
this.tuple = new Tuple();
- @SuppressWarnings({"rawtypes"})
- NamedList facets = (NamedList)response.get("facets");
+ NamedList<?> facets = (NamedList<?>)response.get("facets");
fillTuple(tuple, facets, metrics);
}
private void fillTuple(Tuple t,
- @SuppressWarnings({"rawtypes"})NamedList nl,
+ NamedList<?> nl,
Metric[] _metrics) {
if(nl == null) {
@@ -355,4 +378,31 @@ public class StatsStream extends TupleStream implements Expressible {
public StreamComparator getStreamSort() {
return null;
}
+
+ @Override
+ public TupleStream[] parallelize(List<String> partitions) throws IOException {
+ final ModifiableSolrParams withoutTieredParam = new ModifiableSolrParams(params);
+ withoutTieredParam.remove(TIERED_PARAM); // each individual request is not tiered
+
+ TupleStream[] streams = new TupleStream[partitions.size()];
+ for (int p = 0; p < streams.length; p++) {
+ streams[p] = new StatsStream(zkHost, partitions.get(p), withoutTieredParam, metrics);
+ }
+ return streams;
+ }
+
+ @Override
+ public TupleStream getSortedRollupStream(ParallelListStream plist, Metric[] rollupMetrics) throws IOException {
+ return new SelectStream(new HashRollupStream(plist, STATS_BUCKET, rollupMetrics), getRollupSelectFields(rollupMetrics));
+ }
+
+ // Map the rollup metric to the original metric name so that we can project out the correct field names in the tuple
+ protected Map<String, String> getRollupSelectFields(Metric[] rollupMetrics) {
+ Map<String, String> map = new HashMap<>(rollupMetrics.length * 2);
+ for (Metric m : rollupMetrics) {
+ String[] cols = m.getColumns();
+ map.put(m.getIdentifier(), cols != null && cols.length > 0 ? cols[0] : "*");
+ }
+ return map;
+ }
}
\ No newline at end of file
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/ParallelFacetStreamOverAliasTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/ParallelFacetStreamOverAliasTest.java
index d47b735..7195ca4 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/ParallelFacetStreamOverAliasTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/ParallelFacetStreamOverAliasTest.java
@@ -40,15 +40,25 @@ import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.apache.solr.client.solrj.io.stream.metrics.CountDistinctMetric;
+import org.apache.solr.client.solrj.io.stream.metrics.CountMetric;
+import org.apache.solr.client.solrj.io.stream.metrics.MaxMetric;
+import org.apache.solr.client.solrj.io.stream.metrics.MeanMetric;
import org.apache.solr.client.solrj.io.stream.metrics.Metric;
+import org.apache.solr.client.solrj.io.stream.metrics.MinMetric;
+import org.apache.solr.client.solrj.io.stream.metrics.SumMetric;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.handler.SolrDefaultStreamFactory;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
+import static org.apache.solr.client.solrj.io.stream.FacetStream.TIERED_PARAM;
+
/**
* Verify auto-plist with rollup over a facet expression when using collection alias over multiple collections.
*/
@@ -63,6 +73,8 @@ public class ParallelFacetStreamOverAliasTest extends SolrCloudTestCase {
private static final int NUM_DOCS_PER_COLLECTION = 40;
private static final int NUM_SHARDS_PER_COLLECTION = 4;
private static final int CARDINALITY = 10;
+ private static final int BUCKET_SIZE_LIMIT = Math.max(CARDINALITY * 2, 100);
+
private static final RandomGenerator rand = new JDKRandomGenerator(5150);
private static List<String> listOfCollections;
private static SolrClientCache solrClientCache;
@@ -169,7 +181,7 @@ public class ParallelFacetStreamOverAliasTest extends SolrCloudTestCase {
" q=\"*:*\", \n" +
" buckets=\"a_i\", \n" +
" bucketSorts=\"a_i asc\", \n" +
- " bucketSizeLimit=100, \n" +
+ " bucketSizeLimit=" + BUCKET_SIZE_LIMIT + ", \n" +
" sum(a_d), avg(a_d), min(a_d), max(a_d), count(*)\n" +
")\n";
@@ -191,7 +203,7 @@ public class ParallelFacetStreamOverAliasTest extends SolrCloudTestCase {
" q=\"*:*\", \n" +
" buckets=\"a_i,b_i\", \n" + /* two dimensions here ~ doubles the number of tuples */
" bucketSorts=\"sum(a_d) desc\", \n" +
- " bucketSizeLimit=100, \n" +
+ " bucketSizeLimit=" + BUCKET_SIZE_LIMIT + ", \n" +
" sum(a_d), avg(a_d), min(a_d), max(a_d), count(*)\n" +
")\n";
@@ -200,7 +212,6 @@ public class ParallelFacetStreamOverAliasTest extends SolrCloudTestCase {
@Test
public void testParallelFacetSortByDimensions() throws Exception {
-
// notice we're sorting the stream by a metric, but internally, that doesn't work for parallelization
// so the rollup has to sort by dimensions and then apply a final re-sort once the parallel streams are merged
String facetExprTmpl = "" +
@@ -210,13 +221,49 @@ public class ParallelFacetStreamOverAliasTest extends SolrCloudTestCase {
" q=\"*:*\", \n" +
" buckets=\"a_i,b_i\", \n" +
" bucketSorts=\"a_i asc, b_i asc\", \n" +
- " bucketSizeLimit=100, \n" +
+ " bucketSizeLimit=" + BUCKET_SIZE_LIMIT + ", \n" +
" sum(a_d), avg(a_d), min(a_d), max(a_d), count(*)\n" +
")\n";
compareTieredStreamWithNonTiered(facetExprTmpl, 2);
}
+ @Test
+ public void testParallelStats() throws Exception {
+ Metric[] metrics = new Metric[]{
+ new CountMetric(),
+ new CountDistinctMetric("a_i"),
+ new SumMetric("b_i"),
+ new MinMetric("a_i"),
+ new MaxMetric("a_i"),
+ new MeanMetric("a_d")
+ };
+
+ String zkHost = cluster.getZkServer().getZkAddress();
+ StreamContext streamContext = new StreamContext();
+ streamContext.setSolrClientCache(solrClientCache);
+
+ ModifiableSolrParams solrParams = new ModifiableSolrParams();
+ solrParams.add(CommonParams.Q, "*:*");
+ solrParams.add(TIERED_PARAM, "true");
+
+ // tiered stats stream
+ StatsStream statsStream = new StatsStream(zkHost, ALIAS_NAME, solrParams, metrics);
+ statsStream.setStreamContext(streamContext);
+ List<Tuple> tieredTuples = getTuples(statsStream);
+ assertEquals(1, tieredTuples.size());
+ assertNotNull(statsStream.parallelizedStream);
+
+ solrParams = new ModifiableSolrParams();
+ solrParams.add(CommonParams.Q, "*:*");
+ solrParams.add(TIERED_PARAM, "false");
+ statsStream = new StatsStream(zkHost, ALIAS_NAME, solrParams, metrics);
+ statsStream.setStreamContext(streamContext);
+ // tiered should match non-tiered results
+ assertListOfTuplesEquals(tieredTuples, getTuples(statsStream));
+ assertNull(statsStream.parallelizedStream);
+ }
+
// execute the provided expression with tiered=true and compare to results of tiered=false
private void compareTieredStreamWithNonTiered(String facetExprTmpl, int dims) throws IOException {
String facetExpr = String.format(Locale.US, facetExprTmpl, ALIAS_NAME, "true");
@@ -249,7 +296,7 @@ public class ParallelFacetStreamOverAliasTest extends SolrCloudTestCase {
assertTrue(stream instanceof FacetStream);
FacetStream facetStream = (FacetStream) stream;
TupleStream[] parallelStreams = facetStream.parallelize(listOfCollections);
- assertEquals(2, parallelStreams.length);
+ assertEquals(NUM_COLLECTIONS, parallelStreams.length);
assertTrue(parallelStreams[0] instanceof FacetStream);
Optional<Metric[]> rollupMetrics = facetStream.getRollupMetrics(facetStream.getMetrics().toArray(new Metric[0]));