You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this plain-text rendering.
Posted to commits@lucene.apache.org by ab...@apache.org on 2020/06/08 14:01:12 UTC

[lucene-solr] branch jira/solr-14470-2 updated: SOLR-14470: Fix a merge mistake that left an earlier version here than on master.

This is an automated email from the ASF dual-hosted git repository.

ab pushed a commit to branch jira/solr-14470-2
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/jira/solr-14470-2 by this push:
     new dc3b4df  SOLR-14470: Fix a merge mistake that left an earlier version here than on master.
dc3b4df is described below

commit dc3b4df843b6c6d712f38116fa17149fb33ea640
Author: Andrzej Bialecki <ab...@apache.org>
AuthorDate: Mon Jun 8 15:55:20 2020 +0200

    SOLR-14470: Fix a merge mistake that left an earlier version here than on master.
---
 .../solr/client/solrj/io/stream/StatsStream.java   | 268 ++++++++++-----------
 1 file changed, 132 insertions(+), 136 deletions(-)

diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StatsStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StatsStream.java
index f86feeb..c05fc3e 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StatsStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StatsStream.java
@@ -19,7 +19,7 @@ package org.apache.solr.client.solrj.io.stream;
 import java.io.IOException;
 
 import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -41,9 +41,9 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParamete
 import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
 import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
 import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.apache.solr.client.solrj.io.stream.metrics.CountMetric;
 import org.apache.solr.client.solrj.io.stream.metrics.Metric;
 import org.apache.solr.client.solrj.request.QueryRequest;
-import org.apache.solr.common.SolrDocumentList;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.common.util.NamedList;
@@ -55,53 +55,61 @@ public class StatsStream extends TupleStream implements Expressible  {
 
   private static final long serialVersionUID = 1;
 
+
+
   private Metric[] metrics;
-  private String zkHost;
   private Tuple tuple;
+  private int index;
+  private String zkHost;
   private SolrParams params;
   private String collection;
-  private boolean done;
-  private boolean doCount;
-  private Map<String, Metric> metricMap;
   protected transient SolrClientCache cache;
   protected transient CloudSolrClient cloudSolrClient;
-  protected StreamContext streamContext;
+  private StreamContext context;
 
   public StatsStream(String zkHost,
-                     String collection,
-                     SolrParams params,
-                     Metric[] metrics) {
-    init(zkHost, collection, params, metrics);
-  }
-
-  private void init(String zkHost, String collection, SolrParams params, Metric[] metrics) {
-    this.zkHost  = zkHost;
-    this.params = params;
-    this.metrics = metrics;
-    this.collection = collection;
-    metricMap = new HashMap();
-    for(Metric metric : metrics) {
-      metricMap.put(metric.getIdentifier(), metric);
-    }
+                          String collection,
+                          SolrParams params,
+                          Metric[] metrics
+                          ) throws IOException {
+    init(collection, params, metrics, zkHost);
   }
 
   public StatsStream(StreamExpression expression, StreamFactory factory) throws IOException{
     // grab all parameters out
     String collectionName = factory.getValueOperand(expression, 0);
+
+    if(collectionName.indexOf('"') > -1) {
+      collectionName = collectionName.replaceAll("\"", "").replaceAll(" ", "");
+    }
+
     List<StreamExpressionNamedParameter> namedParams = factory.getNamedOperands(expression);
-    List<StreamExpression> metricExpressions = factory.getExpressionOperandsRepresentingTypes(expression, Metric.class);
-    StreamExpressionNamedParameter zkHostExpression = factory.getNamedOperand(expression, "zkHost");
 
+    StreamExpressionNamedParameter zkHostExpression = factory.getNamedOperand(expression, "zkHost");
+    List<StreamExpression> metricExpressions = factory.getExpressionOperandsRepresentingTypes(expression, Expressible.class, Metric.class);
 
     // Collection Name
     if(null == collectionName){
       throw new IOException(String.format(Locale.ROOT,"invalid expression %s - collectionName expected as first operand",expression));
     }
 
+    // Construct the metrics
+    Metric[] metrics = null;
+    if(metricExpressions.size() > 0) {
+      metrics = new Metric[metricExpressions.size()];
+      for(int idx = 0; idx < metricExpressions.size(); ++idx){
+        metrics[idx] = factory.constructMetric(metricExpressions.get(idx));
+      }
+    } else {
+      metrics = new Metric[1];
+      metrics[0] = new CountMetric();
+    }
+
+    // pull out known named params
     ModifiableSolrParams params = new ModifiableSolrParams();
     for(StreamExpressionNamedParameter namedParam : namedParams){
       if(!namedParam.getName().equals("zkHost")){
-        params.set(namedParam.getName(), namedParam.getParameter().toString().trim());
+        params.add(namedParam.getName(), namedParam.getParameter().toString().trim());
       }
     }
 
@@ -120,46 +128,51 @@ public class StatsStream extends TupleStream implements Expressible  {
       zkHost = ((StreamExpressionValue)zkHostExpression.getParameter()).getValue();
     }
 
-    /*
-    if(null == zkHost){
-      throw new IOException(String.format(Locale.ROOT,"invalid expression %s - zkHost not found for collection '%s'",expression,collectionName));
-    }
-    */
+    // We've got all the required items
+    init(collectionName, params, metrics, zkHost);
+  }
 
-    // metrics, optional - if not provided then why are you using this?
-    Metric[] metrics = new Metric[metricExpressions.size()];
-    for(int idx = 0; idx < metricExpressions.size(); ++idx){
-      metrics[idx] = factory.constructMetric(metricExpressions.get(idx));
-    }
+  public String getCollection() {
+    return this.collection;
+  }
 
-    // We've got all the required items
-    init(zkHost, collectionName, params, metrics);
+  private void init(String collection,
+                    SolrParams params,
+                    Metric[] metrics,
+                    String zkHost) throws IOException {
+    this.zkHost  = zkHost;
+    this.collection = collection;
+    this.metrics = metrics;
+    this.params = params;
   }
 
   @Override
   public StreamExpressionParameter toExpression(StreamFactory factory) throws IOException {
-    // functionName(collectionName, param1, param2, ..., paramN, sort="comp", sum(fieldA), avg(fieldB))
-
     // function name
     StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
-
     // collection
-    expression.addParameter(collection);
+    if(collection.indexOf(',') > -1) {
+      expression.addParameter("\""+collection+"\"");
+    } else {
+      expression.addParameter(collection);
+    }
 
     // parameters
-    ModifiableSolrParams mParams = new ModifiableSolrParams(params);
-    for (Entry<String, String[]> param : mParams.getMap().entrySet()) {
-      expression.addParameter(new StreamExpressionNamedParameter(param.getKey(), String.join(",", param.getValue())));
-    }
+    ModifiableSolrParams tmpParams = new ModifiableSolrParams(params);
 
-    // zkHost
-    expression.addParameter(new StreamExpressionNamedParameter("zkHost", zkHost));
+    for (Entry<String, String[]> param : tmpParams.getMap().entrySet()) {
+      expression.addParameter(new StreamExpressionNamedParameter(param.getKey(),
+          String.join(",", param.getValue())));
+    }
 
     // metrics
     for(Metric metric : metrics){
       expression.addParameter(metric.toExpression(factory));
     }
 
+    // zkHost
+    expression.addParameter(new StreamExpressionNamedParameter("zkHost", zkHost));
+
     return expression;
   }
 
@@ -173,45 +186,46 @@ public class StatsStream extends TupleStream implements Expressible  {
     explanation.setExpressionType(ExpressionType.STREAM_SOURCE);
     explanation.setExpression(toExpression(factory).toString());
 
+    // child is a datastore so add it at this point
     StreamExplanation child = new StreamExplanation(getStreamNodeId() + "-datastore");
-    child.setFunctionName(String.format(Locale.ROOT, "solr (worker ? of ?)"));
-      // TODO: fix this so we know the # of workers - check with Joel about a Stat's ability to be in a
-      // parallel stream.
+    child.setFunctionName(String.format(Locale.ROOT, "solr (%s)", collection));
+    // TODO: fix this so we know the # of workers - check with Joel about a Topic's ability to be in a
+    // parallel stream.
 
     child.setImplementingClass("Solr/Lucene");
     child.setExpressionType(ExpressionType.DATASTORE);
-    ModifiableSolrParams mParams = new ModifiableSolrParams(params);
-    child.setExpression(mParams.getMap().entrySet().stream().map(e -> String.format(Locale.ROOT, "%s=%s", e.getKey(), e.getValue())).collect(Collectors.joining(",")));
+
+    child.setExpression(params.stream().map(e -> String.format(Locale.ROOT, "%s=%s", e.getKey(), Arrays.toString(e.getValue()))).collect(Collectors.joining(",")));
+
     explanation.addChild(child);
 
     return explanation;
   }
 
   public void setStreamContext(StreamContext context) {
-    streamContext = context;
+    this.context = context;
     cache = context.getSolrClientCache();
   }
 
   public List<TupleStream> children() {
-    return new ArrayList<>();
+    return new ArrayList();
   }
 
   public void open() throws IOException {
-    ModifiableSolrParams paramsLoc = new ModifiableSolrParams(this.params);
-    addStats(paramsLoc, metrics);
-    paramsLoc.set("stats", "true");
+
+    String json = getJsonFacetString(metrics);
+
+    ModifiableSolrParams paramsLoc = new ModifiableSolrParams(params);
+    paramsLoc.set("json.facet", json);
     paramsLoc.set("rows", "0");
-    if (streamContext.isLocal()) {
-      paramsLoc.set("distrib", "false");
-    }
 
-    Map<String, List<String>> shardsMap = (Map<String, List<String>>)streamContext.get("shards");
+    Map<String, List<String>> shardsMap = (Map<String, List<String>>)context.get("shards");
     if(shardsMap == null) {
       QueryRequest request = new QueryRequest(paramsLoc, SolrRequest.METHOD.POST);
-      CloudSolrClient cloudSolrClient = cache.getCloudSolrClient(zkHost);
+      cloudSolrClient = cache.getCloudSolrClient(zkHost);
       try {
         NamedList response = cloudSolrClient.request(request, collection);
-        this.tuple = getTuple(response);
+        getTuples(response, metrics);
       } catch (Exception e) {
         throw new IOException(e);
       }
@@ -228,7 +242,7 @@ public class StatsStream extends TupleStream implements Expressible  {
       QueryRequest request = new QueryRequest(paramsLoc, SolrRequest.METHOD.POST);
       try {
         NamedList response = client.request(request);
-        this.tuple = getTuple(response);
+        getTuples(response, metrics);
       } catch (Exception e) {
         throw new IOException(e);
       }
@@ -251,106 +265,88 @@ public class StatsStream extends TupleStream implements Expressible  {
   }
 
   public Tuple read() throws IOException {
-    if(!done) {
-      done = true;
+    if(index == 0) {
+      ++index;
       return tuple;
     } else {
       return Tuple.EOF();
     }
   }
 
-  public StreamComparator getStreamSort() {
-    return null;
+  private String getJsonFacetString(Metric[] _metrics) {
+    StringBuilder buf = new StringBuilder();
+    appendJson(buf, _metrics);
+    return "{"+buf.toString()+"}";
   }
 
-  private void addStats(ModifiableSolrParams params, Metric[] _metrics) {
-    Map<String, List<String>> m = new HashMap<>();
+  private void appendJson(StringBuilder buf,
+                          Metric[] _metrics) {
+    
+    int metricCount = 0;
     for(Metric metric : _metrics) {
-      String metricId = metric.getIdentifier();
-      if(metricId.contains("(")) {
-        metricId = metricId.substring(0, metricId.length()-1);
-        String[] parts = metricId.split("\\(");
-        String function = parts[0];
-        String column = parts[1];
-        List<String> stats = m.get(column);
-
-        if(stats == null) {
-          stats = new ArrayList<>();
+      String identifier = metric.getIdentifier();
+      if(!identifier.startsWith("count(")) {
+        if(metricCount>0) {
+          buf.append(",");
         }
-
-        if(!column.equals("*")) {
-          m.put(column, stats);
-        }
-
-        if(function.equals("min")) {
-          stats.add("min");
-        } else if(function.equals("max")) {
-          stats.add("max");
-        } else if(function.equals("sum")) {
-          stats.add("sum");
-        } else if(function.equals("avg")) {
-          stats.add("mean");
-        } else if(function.equals("count")) {
-          this.doCount = true;
+        if(identifier.startsWith("per(")) {
+          buf.append("\"facet_").append(metricCount).append("\":\"").append(identifier.replaceFirst("per", "percentile")).append('"');
+        } else if(identifier.startsWith("std(")) {
+          buf.append("\"facet_").append(metricCount).append("\":\"").append(identifier.replaceFirst("std", "stddev")).append('"');
+        } else {
+          buf.append("\"facet_").append(metricCount).append("\":\"").append(identifier).append('"');
         }
+        ++metricCount;
       }
     }
+  }
 
-    for(Entry<String, List<String>> entry : m.entrySet()) {
-      StringBuilder buf = new StringBuilder();
-      List<String> stats = entry.getValue();
-      buf.append("{!");
-
-      for(String stat : stats) {
-        buf.append(stat).append("=").append("true ");
-      }
+  private void getTuples(NamedList response,
+                         Metric[] metrics) {
 
-      buf.append("}").append(entry.getKey());
-      params.add("stats.field", buf.toString());
-    }
+    this.tuple = new Tuple();
+    NamedList facets = (NamedList)response.get("facets");
+    fillTuple(tuple, facets, metrics);
   }
 
-  private Tuple getTuple(NamedList response) {
-    Tuple tuple = new Tuple();
-    SolrDocumentList solrDocumentList = (SolrDocumentList) response.get("response");
-
-    long count = solrDocumentList.getNumFound();
+  private void fillTuple(Tuple t,
+                         NamedList nl,
+                         Metric[] _metrics) {
 
-    if(doCount) {
-      tuple.put("count(*)", count);
+    if(nl == null) {
+      return;
     }
 
-    if(count != 0) {
-      NamedList stats = (NamedList)response.get("stats");
-      NamedList statsFields = (NamedList)stats.get("stats_fields");
-
-      for(int i=0; i<statsFields.size(); i++) {
-        String field = statsFields.getName(i);
-        NamedList theStats = (NamedList)statsFields.getVal(i);
-        for(int s=0; s<theStats.size(); s++) {
-          addStat(tuple, field, theStats.getName(s), theStats.getVal(s));
+    int m = 0;
+    for(Metric metric : _metrics) {
+      String identifier = metric.getIdentifier();
+      if(!identifier.startsWith("count(")) {
+        if(nl.get("facet_"+m) != null) {
+          Object d = nl.get("facet_" + m);
+          if(d instanceof Number) {
+            if (metric.outputLong) {
+              t.put(identifier, Math.round(((Number)d).doubleValue()));
+            } else {
+              t.put(identifier, ((Number)d).doubleValue());
+            }
+          } else {
+            t.put(identifier, d);
+          }
         }
+        ++m;
+      } else {
+        long l = ((Number)nl.get("count")).longValue();
+        t.put("count(*)", l);
       }
     }
-    return tuple;
   }
 
   public int getCost() {
     return 0;
   }
 
-  private void addStat(Tuple tuple, String field, String stat, Object val) {
-    if(stat.equals("mean")) {
-      String name = "avg("+field+")";
-      Metric m = metricMap.get(name);
-      if(m.outputLong) {
-        Number num = (Number) val;
-        tuple.put(name, Math.round(num.doubleValue()));
-      } else {
-        tuple.put(name, val);
-      }
-    } else {
-      tuple.put(stat+"("+field+")", val);
-    }
+  @Override
+  public StreamComparator getStreamSort() {
+    return null;
   }
 }