You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2020/06/08 14:01:12 UTC
[lucene-solr] branch jira/solr-14470-2 updated: SOLR-14470: Fix a merge mistake that left an earlier version here than on master.
This is an automated email from the ASF dual-hosted git repository.
ab pushed a commit to branch jira/solr-14470-2
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/jira/solr-14470-2 by this push:
new dc3b4df SOLR-14470: Fix a merge mistake that left an earlier version here than on master.
dc3b4df is described below
commit dc3b4df843b6c6d712f38116fa17149fb33ea640
Author: Andrzej Bialecki <ab...@apache.org>
AuthorDate: Mon Jun 8 15:55:20 2020 +0200
SOLR-14470: Fix a merge mistake that left an earlier version here than on master.
---
.../solr/client/solrj/io/stream/StatsStream.java | 268 ++++++++++-----------
1 file changed, 132 insertions(+), 136 deletions(-)
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StatsStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StatsStream.java
index f86feeb..c05fc3e 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StatsStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StatsStream.java
@@ -19,7 +19,7 @@ package org.apache.solr.client.solrj.io.stream;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@@ -41,9 +41,9 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParamete
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.apache.solr.client.solrj.io.stream.metrics.CountMetric;
import org.apache.solr.client.solrj.io.stream.metrics.Metric;
import org.apache.solr.client.solrj.request.QueryRequest;
-import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.NamedList;
@@ -55,53 +55,61 @@ public class StatsStream extends TupleStream implements Expressible {
private static final long serialVersionUID = 1;
+
+
private Metric[] metrics;
- private String zkHost;
private Tuple tuple;
+ private int index;
+ private String zkHost;
private SolrParams params;
private String collection;
- private boolean done;
- private boolean doCount;
- private Map<String, Metric> metricMap;
protected transient SolrClientCache cache;
protected transient CloudSolrClient cloudSolrClient;
- protected StreamContext streamContext;
+ private StreamContext context;
public StatsStream(String zkHost,
- String collection,
- SolrParams params,
- Metric[] metrics) {
- init(zkHost, collection, params, metrics);
- }
-
- private void init(String zkHost, String collection, SolrParams params, Metric[] metrics) {
- this.zkHost = zkHost;
- this.params = params;
- this.metrics = metrics;
- this.collection = collection;
- metricMap = new HashMap();
- for(Metric metric : metrics) {
- metricMap.put(metric.getIdentifier(), metric);
- }
+ String collection,
+ SolrParams params,
+ Metric[] metrics
+ ) throws IOException {
+ init(collection, params, metrics, zkHost);
}
public StatsStream(StreamExpression expression, StreamFactory factory) throws IOException{
// grab all parameters out
String collectionName = factory.getValueOperand(expression, 0);
+
+ if(collectionName.indexOf('"') > -1) {
+ collectionName = collectionName.replaceAll("\"", "").replaceAll(" ", "");
+ }
+
List<StreamExpressionNamedParameter> namedParams = factory.getNamedOperands(expression);
- List<StreamExpression> metricExpressions = factory.getExpressionOperandsRepresentingTypes(expression, Metric.class);
- StreamExpressionNamedParameter zkHostExpression = factory.getNamedOperand(expression, "zkHost");
+ StreamExpressionNamedParameter zkHostExpression = factory.getNamedOperand(expression, "zkHost");
+ List<StreamExpression> metricExpressions = factory.getExpressionOperandsRepresentingTypes(expression, Expressible.class, Metric.class);
// Collection Name
if(null == collectionName){
throw new IOException(String.format(Locale.ROOT,"invalid expression %s - collectionName expected as first operand",expression));
}
+ // Construct the metrics
+ Metric[] metrics = null;
+ if(metricExpressions.size() > 0) {
+ metrics = new Metric[metricExpressions.size()];
+ for(int idx = 0; idx < metricExpressions.size(); ++idx){
+ metrics[idx] = factory.constructMetric(metricExpressions.get(idx));
+ }
+ } else {
+ metrics = new Metric[1];
+ metrics[0] = new CountMetric();
+ }
+
+ // pull out known named params
ModifiableSolrParams params = new ModifiableSolrParams();
for(StreamExpressionNamedParameter namedParam : namedParams){
if(!namedParam.getName().equals("zkHost")){
- params.set(namedParam.getName(), namedParam.getParameter().toString().trim());
+ params.add(namedParam.getName(), namedParam.getParameter().toString().trim());
}
}
@@ -120,46 +128,51 @@ public class StatsStream extends TupleStream implements Expressible {
zkHost = ((StreamExpressionValue)zkHostExpression.getParameter()).getValue();
}
- /*
- if(null == zkHost){
- throw new IOException(String.format(Locale.ROOT,"invalid expression %s - zkHost not found for collection '%s'",expression,collectionName));
- }
- */
+ // We've got all the required items
+ init(collectionName, params, metrics, zkHost);
+ }
- // metrics, optional - if not provided then why are you using this?
- Metric[] metrics = new Metric[metricExpressions.size()];
- for(int idx = 0; idx < metricExpressions.size(); ++idx){
- metrics[idx] = factory.constructMetric(metricExpressions.get(idx));
- }
+ public String getCollection() {
+ return this.collection;
+ }
- // We've got all the required items
- init(zkHost, collectionName, params, metrics);
+ private void init(String collection,
+ SolrParams params,
+ Metric[] metrics,
+ String zkHost) throws IOException {
+ this.zkHost = zkHost;
+ this.collection = collection;
+ this.metrics = metrics;
+ this.params = params;
}
@Override
public StreamExpressionParameter toExpression(StreamFactory factory) throws IOException {
- // functionName(collectionName, param1, param2, ..., paramN, sort="comp", sum(fieldA), avg(fieldB))
-
// function name
StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
-
// collection
- expression.addParameter(collection);
+ if(collection.indexOf(',') > -1) {
+ expression.addParameter("\""+collection+"\"");
+ } else {
+ expression.addParameter(collection);
+ }
// parameters
- ModifiableSolrParams mParams = new ModifiableSolrParams(params);
- for (Entry<String, String[]> param : mParams.getMap().entrySet()) {
- expression.addParameter(new StreamExpressionNamedParameter(param.getKey(), String.join(",", param.getValue())));
- }
+ ModifiableSolrParams tmpParams = new ModifiableSolrParams(params);
- // zkHost
- expression.addParameter(new StreamExpressionNamedParameter("zkHost", zkHost));
+ for (Entry<String, String[]> param : tmpParams.getMap().entrySet()) {
+ expression.addParameter(new StreamExpressionNamedParameter(param.getKey(),
+ String.join(",", param.getValue())));
+ }
// metrics
for(Metric metric : metrics){
expression.addParameter(metric.toExpression(factory));
}
+ // zkHost
+ expression.addParameter(new StreamExpressionNamedParameter("zkHost", zkHost));
+
return expression;
}
@@ -173,45 +186,46 @@ public class StatsStream extends TupleStream implements Expressible {
explanation.setExpressionType(ExpressionType.STREAM_SOURCE);
explanation.setExpression(toExpression(factory).toString());
+ // child is a datastore so add it at this point
StreamExplanation child = new StreamExplanation(getStreamNodeId() + "-datastore");
- child.setFunctionName(String.format(Locale.ROOT, "solr (worker ? of ?)"));
- // TODO: fix this so we know the # of workers - check with Joel about a Stat's ability to be in a
- // parallel stream.
+ child.setFunctionName(String.format(Locale.ROOT, "solr (%s)", collection));
+ // TODO: fix this so we know the # of workers - check with Joel about a Topic's ability to be in a
+ // parallel stream.
child.setImplementingClass("Solr/Lucene");
child.setExpressionType(ExpressionType.DATASTORE);
- ModifiableSolrParams mParams = new ModifiableSolrParams(params);
- child.setExpression(mParams.getMap().entrySet().stream().map(e -> String.format(Locale.ROOT, "%s=%s", e.getKey(), e.getValue())).collect(Collectors.joining(",")));
+
+ child.setExpression(params.stream().map(e -> String.format(Locale.ROOT, "%s=%s", e.getKey(), Arrays.toString(e.getValue()))).collect(Collectors.joining(",")));
+
explanation.addChild(child);
return explanation;
}
public void setStreamContext(StreamContext context) {
- streamContext = context;
+ this.context = context;
cache = context.getSolrClientCache();
}
public List<TupleStream> children() {
- return new ArrayList<>();
+ return new ArrayList();
}
public void open() throws IOException {
- ModifiableSolrParams paramsLoc = new ModifiableSolrParams(this.params);
- addStats(paramsLoc, metrics);
- paramsLoc.set("stats", "true");
+
+ String json = getJsonFacetString(metrics);
+
+ ModifiableSolrParams paramsLoc = new ModifiableSolrParams(params);
+ paramsLoc.set("json.facet", json);
paramsLoc.set("rows", "0");
- if (streamContext.isLocal()) {
- paramsLoc.set("distrib", "false");
- }
- Map<String, List<String>> shardsMap = (Map<String, List<String>>)streamContext.get("shards");
+ Map<String, List<String>> shardsMap = (Map<String, List<String>>)context.get("shards");
if(shardsMap == null) {
QueryRequest request = new QueryRequest(paramsLoc, SolrRequest.METHOD.POST);
- CloudSolrClient cloudSolrClient = cache.getCloudSolrClient(zkHost);
+ cloudSolrClient = cache.getCloudSolrClient(zkHost);
try {
NamedList response = cloudSolrClient.request(request, collection);
- this.tuple = getTuple(response);
+ getTuples(response, metrics);
} catch (Exception e) {
throw new IOException(e);
}
@@ -228,7 +242,7 @@ public class StatsStream extends TupleStream implements Expressible {
QueryRequest request = new QueryRequest(paramsLoc, SolrRequest.METHOD.POST);
try {
NamedList response = client.request(request);
- this.tuple = getTuple(response);
+ getTuples(response, metrics);
} catch (Exception e) {
throw new IOException(e);
}
@@ -251,106 +265,88 @@ public class StatsStream extends TupleStream implements Expressible {
}
public Tuple read() throws IOException {
- if(!done) {
- done = true;
+ if(index == 0) {
+ ++index;
return tuple;
} else {
return Tuple.EOF();
}
}
- public StreamComparator getStreamSort() {
- return null;
+ private String getJsonFacetString(Metric[] _metrics) {
+ StringBuilder buf = new StringBuilder();
+ appendJson(buf, _metrics);
+ return "{"+buf.toString()+"}";
}
- private void addStats(ModifiableSolrParams params, Metric[] _metrics) {
- Map<String, List<String>> m = new HashMap<>();
+ private void appendJson(StringBuilder buf,
+ Metric[] _metrics) {
+
+ int metricCount = 0;
for(Metric metric : _metrics) {
- String metricId = metric.getIdentifier();
- if(metricId.contains("(")) {
- metricId = metricId.substring(0, metricId.length()-1);
- String[] parts = metricId.split("\\(");
- String function = parts[0];
- String column = parts[1];
- List<String> stats = m.get(column);
-
- if(stats == null) {
- stats = new ArrayList<>();
+ String identifier = metric.getIdentifier();
+ if(!identifier.startsWith("count(")) {
+ if(metricCount>0) {
+ buf.append(",");
}
-
- if(!column.equals("*")) {
- m.put(column, stats);
- }
-
- if(function.equals("min")) {
- stats.add("min");
- } else if(function.equals("max")) {
- stats.add("max");
- } else if(function.equals("sum")) {
- stats.add("sum");
- } else if(function.equals("avg")) {
- stats.add("mean");
- } else if(function.equals("count")) {
- this.doCount = true;
+ if(identifier.startsWith("per(")) {
+ buf.append("\"facet_").append(metricCount).append("\":\"").append(identifier.replaceFirst("per", "percentile")).append('"');
+ } else if(identifier.startsWith("std(")) {
+ buf.append("\"facet_").append(metricCount).append("\":\"").append(identifier.replaceFirst("std", "stddev")).append('"');
+ } else {
+ buf.append("\"facet_").append(metricCount).append("\":\"").append(identifier).append('"');
}
+ ++metricCount;
}
}
+ }
- for(Entry<String, List<String>> entry : m.entrySet()) {
- StringBuilder buf = new StringBuilder();
- List<String> stats = entry.getValue();
- buf.append("{!");
-
- for(String stat : stats) {
- buf.append(stat).append("=").append("true ");
- }
+ private void getTuples(NamedList response,
+ Metric[] metrics) {
- buf.append("}").append(entry.getKey());
- params.add("stats.field", buf.toString());
- }
+ this.tuple = new Tuple();
+ NamedList facets = (NamedList)response.get("facets");
+ fillTuple(tuple, facets, metrics);
}
- private Tuple getTuple(NamedList response) {
- Tuple tuple = new Tuple();
- SolrDocumentList solrDocumentList = (SolrDocumentList) response.get("response");
-
- long count = solrDocumentList.getNumFound();
+ private void fillTuple(Tuple t,
+ NamedList nl,
+ Metric[] _metrics) {
- if(doCount) {
- tuple.put("count(*)", count);
+ if(nl == null) {
+ return;
}
- if(count != 0) {
- NamedList stats = (NamedList)response.get("stats");
- NamedList statsFields = (NamedList)stats.get("stats_fields");
-
- for(int i=0; i<statsFields.size(); i++) {
- String field = statsFields.getName(i);
- NamedList theStats = (NamedList)statsFields.getVal(i);
- for(int s=0; s<theStats.size(); s++) {
- addStat(tuple, field, theStats.getName(s), theStats.getVal(s));
+ int m = 0;
+ for(Metric metric : _metrics) {
+ String identifier = metric.getIdentifier();
+ if(!identifier.startsWith("count(")) {
+ if(nl.get("facet_"+m) != null) {
+ Object d = nl.get("facet_" + m);
+ if(d instanceof Number) {
+ if (metric.outputLong) {
+ t.put(identifier, Math.round(((Number)d).doubleValue()));
+ } else {
+ t.put(identifier, ((Number)d).doubleValue());
+ }
+ } else {
+ t.put(identifier, d);
+ }
}
+ ++m;
+ } else {
+ long l = ((Number)nl.get("count")).longValue();
+ t.put("count(*)", l);
}
}
- return tuple;
}
public int getCost() {
return 0;
}
- private void addStat(Tuple tuple, String field, String stat, Object val) {
- if(stat.equals("mean")) {
- String name = "avg("+field+")";
- Metric m = metricMap.get(name);
- if(m.outputLong) {
- Number num = (Number) val;
- tuple.put(name, Math.round(num.doubleValue()));
- } else {
- tuple.put(name, val);
- }
- } else {
- tuple.put(stat+"("+field+")", val);
- }
+ @Override
+ public StreamComparator getStreamSort() {
+ return null;
}
}