You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by th...@apache.org on 2021/07/06 20:10:35 UTC

[lucene-solr] branch branch_8x updated: SOLR-15208: Add the countDist aggregation to the stats, facet and timeseries Streaming Expressions (#2527)

This is an automated email from the ASF dual-hosted git repository.

thelabdude pushed a commit to branch branch_8x
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/branch_8x by this push:
     new b5797fc  SOLR-15208: Add the countDist aggregation to the stats, facet and timeseries Streaming Expressions (#2527)
b5797fc is described below

commit b5797fceba26a04d3e36c02a797e69835423aa2d
Author: Timothy Potter <th...@gmail.com>
AuthorDate: Tue Jul 6 14:10:21 2021 -0600

    SOLR-15208: Add the countDist aggregation to the stats, facet and timeseries Streaming Expressions (#2527)
---
 solr/CHANGES.txt                                   |  2 +
 .../java/org/apache/solr/client/solrj/io/Lang.java |  9 +--
 .../solr/client/solrj/io/stream/FacetStream.java   |  4 +-
 .../solr/client/solrj/io/stream/StatsStream.java   |  2 +
 .../client/solrj/io/stream/TimeSeriesStream.java   |  2 +
 .../io/stream/metrics/CountDistinctMetric.java     | 77 ++++++++++++++++++++++
 .../org/apache/solr/client/solrj/io/TestLang.java  |  2 +-
 .../solrj/io/stream/StreamExpressionTest.java      | 44 ++++++++-----
 8 files changed, 117 insertions(+), 25 deletions(-)

diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 9ba16e6..d42c384 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -13,6 +13,8 @@ New Features
 * SOLR-15472: New shards.preference option for preferring replicas based on their leader status, i.e.
   `shards.preference=replica.leader:false` would prefer non-leader replicas. (wei wang via Timothy Potter)
 
+* SOLR-15208: Add the countDist aggregation to the stats, facet and timeseries Streaming Expressions (Joel Bernstein)
+
 Improvements
 ---------------------
 * SOLR-15460: Implement LIKE, IS NOT NULL, IS NULL, and support wildcard * in equals string literal for Parallel SQL (Timothy Potter, Houston Putman)
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/Lang.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/Lang.java
index 3ea5492..ea27420 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/Lang.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/Lang.java
@@ -24,13 +24,7 @@ import org.apache.solr.client.solrj.io.ops.GroupOperation;
 import org.apache.solr.client.solrj.io.ops.ReplaceOperation;
 import org.apache.solr.client.solrj.io.stream.*;
 import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
-import org.apache.solr.client.solrj.io.stream.metrics.CountMetric;
-import org.apache.solr.client.solrj.io.stream.metrics.MaxMetric;
-import org.apache.solr.client.solrj.io.stream.metrics.MeanMetric;
-import org.apache.solr.client.solrj.io.stream.metrics.MinMetric;
-import org.apache.solr.client.solrj.io.stream.metrics.PercentileMetric;
-import org.apache.solr.client.solrj.io.stream.metrics.StdMetric;
-import org.apache.solr.client.solrj.io.stream.metrics.SumMetric;
+import org.apache.solr.client.solrj.io.stream.metrics.*;
 import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
 import org.apache.solr.client.solrj.io.comp.FieldComparator;
 import org.apache.solr.client.solrj.io.comp.MultipleFieldComparator;
@@ -119,6 +113,7 @@ public class Lang {
         .withFunctionName("per", PercentileMetric.class)
         .withFunctionName("std", StdMetric.class)
         .withFunctionName("count", CountMetric.class)
+        .withFunctionName("countDist", CountDistinctMetric.class)
 
             // tuple manipulation operations
         .withFunctionName("replace", ReplaceOperation.class)
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java
index 43ed11b..0f42c61 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java
@@ -634,7 +634,7 @@ public class FacetStream extends TupleStream implements Expressible, ParallelMet
 
     for(Metric metric: metrics) {
       String func = metric.getFunctionName();
-      if(!func.equals("count") && !func.equals("per") && !func.equals("std")) {
+      if(!func.equals("count") && !func.equals("per") && !func.equals("std") && !func.equals("countDist")) {
         if (!json.contains(metric.getIdentifier())) {
           return false;
         }
@@ -764,6 +764,8 @@ public class FacetStream extends TupleStream implements Expressible, ParallelMet
             buf.append("\"facet_").append(metricCount).append("\":\"").append(identifier.replaceFirst("per", "percentile")).append('"');
           } else if (identifier.startsWith("std(")) {
             buf.append("\"facet_").append(metricCount).append("\":\"").append(identifier.replaceFirst("std", "stddev")).append('"');
+          } else if (identifier.startsWith("countDist(")) {
+            buf.append("\"facet_").append(metricCount).append("\":\"").append(identifier.replaceFirst("countDist", "unique")).append('"');
           } else {
             buf.append('"').append(facetKey).append("\":\"").append(identifier).append('"');
           }
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StatsStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StatsStream.java
index 3403ae6..eb9e0ff 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StatsStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StatsStream.java
@@ -296,6 +296,8 @@ public class StatsStream extends TupleStream implements Expressible  {
           buf.append("\"facet_").append(metricCount).append("\":\"").append(identifier.replaceFirst("per", "percentile")).append('"');
         } else if(identifier.startsWith("std(")) {
           buf.append("\"facet_").append(metricCount).append("\":\"").append(identifier.replaceFirst("std", "stddev")).append('"');
+        } else if (identifier.startsWith("countDist(")) {
+          buf.append("\"facet_").append(metricCount).append("\":\"").append(identifier.replaceFirst("countDist", "unique")).append('"');
         } else {
           buf.append("\"facet_").append(metricCount).append("\":\"").append(identifier).append('"');
         }
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TimeSeriesStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TimeSeriesStream.java
index a7cbee7..c40c346 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TimeSeriesStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TimeSeriesStream.java
@@ -366,6 +366,8 @@ public class TimeSeriesStream extends TupleStream implements Expressible  {
           buf.append("\"facet_").append(metricCount).append("\":\"").append(identifier.replaceFirst("per", "percentile")).append('"');
         } else if(identifier.startsWith("std(")) {
           buf.append("\"facet_").append(metricCount).append("\":\"").append(identifier.replaceFirst("std", "stddev")).append('"');
+        }  else if (identifier.startsWith("countDist(")) {
+          buf.append("\"facet_").append(metricCount).append("\":\"").append(identifier.replaceFirst("countDist", "unique")).append('"');
         } else {
           buf.append("\"facet_").append(metricCount).append("\":\"").append(identifier).append('"');
         }
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/metrics/CountDistinctMetric.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/metrics/CountDistinctMetric.java
new file mode 100644
index 0000000..623fc22
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/metrics/CountDistinctMetric.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.io.stream.metrics;
+
+import java.io.IOException;
+import java.util.Locale;
+
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+
+public class CountDistinctMetric extends Metric {
+
+    private String columnName;
+
+    public CountDistinctMetric(String columnName){
+        init("countDist", columnName);
+    }
+
+    public CountDistinctMetric(StreamExpression expression, StreamFactory factory) throws IOException{
+        // grab all parameters out
+        String functionName = expression.getFunctionName();
+        String columnName = factory.getValueOperand(expression, 0);
+
+
+        // validate expression contains only what we want.
+        if(null == columnName){
+            throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expected %s(columnName)", expression, functionName));
+        }
+
+        init(functionName, columnName);
+    }
+
+    private void init(String functionName, String columnName){
+        this.columnName = columnName;
+        this.outputLong = true;
+        setFunctionName(functionName);
+        setIdentifier(functionName, "(", columnName, ")");
+    }
+
+    public void update(Tuple tuple) {
+        //Nop for now
+    }
+
+    public Metric newInstance() {
+        return new CountDistinctMetric(columnName); // fresh countDist metric over the same column
+    }
+
+    public String[] getColumns() {
+        return new String[]{columnName};
+    }
+
+    public Number getValue() {
+       //No op for now
+       return null;
+    }
+
+    @Override
+    public StreamExpressionParameter toExpression(StreamFactory factory) throws IOException {
+        return new StreamExpression(getFunctionName()).withParameter(columnName).withParameter(Boolean.toString(outputLong));
+    }
+}
\ No newline at end of file
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/TestLang.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/TestLang.java
index 61cc90d..739be66 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/TestLang.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/TestLang.java
@@ -85,7 +85,7 @@ public class TestLang extends SolrTestCase {
       "getSupportPoints", "pairSort", "log10", "plist", "recip", "pivot", "ltrim", "rtrim", "export",
       "zplot", "natural", "repeat", "movingMAD", "hashRollup", "noop", "var", "stddev", "recNum", "isNull",
       "notNull", "matches", "projectToBorder", "double", "long", "parseCSV", "parseTSV", "dateTime",
-       "split", "upper", "trim", "lower", "trunc", "cosine", "dbscan", "per", "std", "drill", "input"};
+       "split", "upper", "trim", "lower", "trunc", "cosine", "dbscan", "per", "std", "drill", "input", "countDist"};
 
   @Test
   public void testLang() {
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
index 356efee..62c91d3 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
@@ -43,13 +43,7 @@ import org.apache.solr.client.solrj.io.comp.FieldComparator;
 import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
 import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser;
 import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
-import org.apache.solr.client.solrj.io.stream.metrics.CountMetric;
-import org.apache.solr.client.solrj.io.stream.metrics.MaxMetric;
-import org.apache.solr.client.solrj.io.stream.metrics.MeanMetric;
-import org.apache.solr.client.solrj.io.stream.metrics.MinMetric;
-import org.apache.solr.client.solrj.io.stream.metrics.PercentileMetric;
-import org.apache.solr.client.solrj.io.stream.metrics.StdMetric;
-import org.apache.solr.client.solrj.io.stream.metrics.SumMetric;
+import org.apache.solr.client.solrj.io.stream.metrics.*;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.cloud.SolrCloudTestCase;
@@ -765,7 +759,8 @@ public class StreamExpressionTest extends SolrCloudTestCase {
     .withFunctionName("avg", MeanMetric.class)
     .withFunctionName("count", CountMetric.class)
     .withFunctionName("std", StdMetric.class)
-    .withFunctionName("per", PercentileMetric.class);
+    .withFunctionName("per", PercentileMetric.class)
+    .withFunctionName("countDist", CountDistinctMetric.class);
 
     StreamExpression expression;
     TupleStream stream;
@@ -774,7 +769,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
     SolrClientCache cache = new SolrClientCache();
     try {
       streamContext.setSolrClientCache(cache);
-      String expr = "stats(" + COLLECTIONORALIAS + ", q=*:*, sum(a_i), sum(a_f), min(a_i), min(a_f), max(a_i), max(a_f), avg(a_i), avg(a_f), std(a_i), std(a_f), per(a_i, 50), per(a_f, 50), count(*))";
+      String expr = "stats(" + COLLECTIONORALIAS + ", q=*:*, sum(a_i), sum(a_f), min(a_i), min(a_f), max(a_i), max(a_f), avg(a_i), avg(a_f), std(a_i), std(a_f), per(a_i, 50), per(a_f, 50), countDist(a_s), count(*))";
       expression = StreamExpressionParser.parse(expr);
       stream = factory.constructStream(expression);
       stream.setStreamContext(streamContext);
@@ -800,6 +795,8 @@ public class StreamExpressionTest extends SolrCloudTestCase {
       Double peri = tuple.getDouble("per(a_i,50)");
       Double perf = tuple.getDouble("per(a_f,50)");
       Double count = tuple.getDouble("count(*)");
+      Long countDist = tuple.getLong("countDist(a_s)");
+
 
       assertTrue(sumi.longValue() == 70);
       assertTrue(sumf.doubleValue() == 55.0D);
@@ -814,6 +811,8 @@ public class StreamExpressionTest extends SolrCloudTestCase {
       assertTrue(peri.doubleValue() == 7.0D);
       assertTrue(perf.doubleValue() == 5.5D);
       assertTrue(count.doubleValue() == 10);
+      assertEquals(countDist.longValue(), 3L);
+
 
 
       //Test without query
@@ -1233,7 +1232,8 @@ public class StreamExpressionTest extends SolrCloudTestCase {
       .withFunctionName("avg", MeanMetric.class)
       .withFunctionName("std", StdMetric.class)
       .withFunctionName("per", PercentileMetric.class)
-      .withFunctionName("count", CountMetric.class);
+      .withFunctionName("count", CountMetric.class)
+      .withFunctionName("countDist", CountDistinctMetric.class);
 
     // Basic test
     clause = "facet("
@@ -1250,7 +1250,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
               +   "avg(a_i), avg(a_f), "
               +   "std(a_i), std(a_f),"
               +   "per(a_i, 50), per(a_f, 50),"
-              +   "count(*)"
+              +   "count(*), countDist(a_i)"
               + ")";
 
     stream = factory.constructStream(clause);
@@ -1273,8 +1273,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
     Double stdf = tuple.getDouble("std(a_f)");
     Double peri = tuple.getDouble("per(a_i,50)");
     Double perf = tuple.getDouble("per(a_f,50)");
-
-
+    Long countDist = tuple.getLong("countDist(a_i)");
     Double count = tuple.getDouble("count(*)");
 
     assertTrue(bucket.equals("hello4"));
@@ -1291,6 +1290,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
     assertTrue(stdf.doubleValue() == 1.5D);
     assertTrue(peri.doubleValue() == 7.5D);
     assertTrue(perf.doubleValue() ==  5.5D);
+    assertEquals(countDist.longValue(), 2);
 
 
     tuple = tuples.get(1);
@@ -1308,6 +1308,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
     stdf = tuple.getDouble("std(a_f)");
     peri = tuple.getDouble("per(a_i,50)");
     perf = tuple.getDouble("per(a_f,50)");
+    countDist = tuple.getLong("countDist(a_i)");
 
 
     assertTrue(bucket.equals("hello0"));
@@ -1324,6 +1325,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
     assertTrue(stdf.doubleValue() == 3.5D);
     assertTrue(peri.doubleValue() == 1.5D);
     assertTrue(perf.doubleValue() ==  3.5D);
+    assertEquals(countDist.longValue(), 4);
 
 
     tuple = tuples.get(2);
@@ -1341,7 +1343,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
     stdf = tuple.getDouble("std(a_f)");
     peri = tuple.getDouble("per(a_i,50)");
     perf = tuple.getDouble("per(a_f,50)");
-
+    countDist = tuple.getLong("countDist(a_i)");
 
     assertTrue(bucket.equals("hello3"));
     assertTrue(sumi.doubleValue() == 38.0D);
@@ -1357,7 +1359,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
     assertTrue(stdf.doubleValue() == 2.29128784747792D);
     assertTrue(peri.doubleValue() == 11.0D);
     assertTrue(perf.doubleValue() ==  7.0D);
-
+    assertEquals(countDist.longValue(), 4);
 
     //Reverse the Sort.
 
@@ -2715,7 +2717,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
         "end=\"2017-12-01T01:00:00.000Z\", " +
         "gap=\"+1YEAR\", " +
         "field=\"test_dt\", " +
-        "count(*), sum(price_f), max(price_f), min(price_f), avg(price_f), std(price_f), per(price_f, 50))";
+        "count(*), sum(price_f), max(price_f), min(price_f), avg(price_f), std(price_f), per(price_f, 50), countDist(id))";
     ModifiableSolrParams paramsLoc = new ModifiableSolrParams();
     paramsLoc.set("expr", expr);
     paramsLoc.set("qt", "/stream");
@@ -2726,6 +2728,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
     StreamContext context = new StreamContext();
     solrStream.setStreamContext(context);
     List<Tuple> tuples = getTuples(solrStream);
+
     assertTrue(tuples.size() == 5);
 
     assertTrue(tuples.get(0).get("test_dt").equals("2013-01-01T01:00:00Z"));
@@ -2736,6 +2739,8 @@ public class StreamExpressionTest extends SolrCloudTestCase {
     assertTrue(tuples.get(0).getDouble("avg(price_f)").equals(100D));
     assertTrue(tuples.get(0).getDouble("std(price_f)").equals(0D));
     assertTrue(tuples.get(0).getDouble("per(price_f,50)").equals(100D));
+    assertTrue(tuples.get(0).getLong("countDist(id)").equals(100L));
+
 
     assertTrue(tuples.get(1).get("test_dt").equals("2014-01-01T01:00:00Z"));
     assertTrue(tuples.get(1).getLong("count(*)").equals(50L));
@@ -2745,6 +2750,8 @@ public class StreamExpressionTest extends SolrCloudTestCase {
     assertTrue(tuples.get(1).getDouble("avg(price_f)").equals(500D));
     assertTrue(tuples.get(1).getDouble("std(price_f)").equals(0D));
     assertTrue(tuples.get(1).getDouble("per(price_f,50)").equals(500D));
+    assertTrue(tuples.get(1).getLong("countDist(id)").equals(50L));
+
 
     assertTrue(tuples.get(2).get("test_dt").equals("2015-01-01T01:00:00Z"));
     assertTrue(tuples.get(2).getLong("count(*)").equals(50L));
@@ -2754,6 +2761,8 @@ public class StreamExpressionTest extends SolrCloudTestCase {
     assertTrue(tuples.get(2).getDouble("avg(price_f)").equals(300D));
     assertTrue(tuples.get(2).getDouble("std(price_f)").equals(0D));
     assertTrue(tuples.get(2).getDouble("per(price_f,50)").equals(300D));
+    assertTrue(tuples.get(2).getLong("countDist(id)").equals(50L));
+
 
     assertTrue(tuples.get(3).get("test_dt").equals("2016-01-01T01:00:00Z"));
     assertTrue(tuples.get(3).getLong("count(*)").equals(50L));
@@ -2763,6 +2772,8 @@ public class StreamExpressionTest extends SolrCloudTestCase {
     assertTrue(tuples.get(3).getDouble("avg(price_f)").equals(400D));
     assertTrue(tuples.get(3).getDouble("std(price_f)").equals(0D));
     assertTrue(tuples.get(3).getDouble("per(price_f,50)").equals(400D));
+    assertTrue(tuples.get(3).getLong("countDist(id)").equals(50L));
+
 
     assertTrue(tuples.get(4).get("test_dt").equals("2017-01-01T01:00:00Z"));
     assertEquals((long)tuples.get(4).getLong("count(*)"), 0L);
@@ -2772,6 +2783,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
     assertTrue(tuples.get(4).getDouble("avg(price_f)").equals(0D));
     assertTrue(tuples.get(4).getDouble("std(price_f)").equals(0D));
     assertTrue(tuples.get(4).getDouble("per(price_f,50)").equals(0D));
+    assertTrue(tuples.get(4).getLong("countDist(id)").equals(0L));
 
     expr = "timeseries("+COLLECTIONORALIAS+", q=\"*:*\", start=\"2013-01-01T01:00:00.000Z\", " +
         "end=\"2016-12-01T01:00:00.000Z\", " +