You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by th...@apache.org on 2021/07/06 20:24:42 UTC

[lucene-solr] branch branch_8x updated: SOLR-15475: Implement COUNT and APPROX_COUNT_DISTINCT aggregation functions for Parallel SQL (#2528)

This is an automated email from the ASF dual-hosted git repository.

thelabdude pushed a commit to branch branch_8x
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/branch_8x by this push:
     new c412ca3  SOLR-15475: Implement COUNT and APPROX_COUNT_DISTINCT aggregation functions for Parallel SQL (#2528)
c412ca3 is described below

commit c412ca3e6ef608a05f97b21354f7967ff1947822
Author: Timothy Potter <th...@gmail.com>
AuthorDate: Tue Jul 6 14:24:26 2021 -0600

    SOLR-15475: Implement COUNT and APPROX_COUNT_DISTINCT aggregation functions for Parallel SQL (#2528)
---
 solr/CHANGES.txt                                   |   4 +-
 .../org/apache/solr/handler/sql/SolrAggregate.java |  38 +-
 .../java/org/apache/solr/handler/sql/SolrRel.java  |   5 +-
 .../org/apache/solr/handler/sql/SolrTable.java     | 481 +++++++++++----------
 .../org/apache/solr/handler/TestSQLHandler.java    |  40 ++
 .../solr-ref-guide/src/parallel-sql-interface.adoc |  45 +-
 .../io/stream/metrics/CountDistinctMetric.java     |  15 +-
 .../solrj/io/stream/metrics/CountMetric.java       |   1 +
 8 files changed, 364 insertions(+), 265 deletions(-)

diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index d42c384..df56eb7 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -23,10 +23,12 @@ Improvements
 
 * SOLR-15456: Get field type info from luke for custom fields instead of defaulting to String in Parallel SQL (Timothy Potter)
 
-* SOLR-15461: SOLR-15461: Upgrade Apache Calcite to 1.27.0. (Mark Miller, Timothy Potter)
+* SOLR-15461: Upgrade Apache Calcite to 1.27.0. (Mark Miller, Timothy Potter)
 
 * SOLR-15489: Implement OFFSET & FETCH for LIMIT SQL queries (Timothy Potter)
 
+* SOLR-15475: Implement COUNT and APPROX_COUNT_DISTINCT aggregation functions for Parallel SQL (Timothy Potter)
+
 Optimizations
 ---------------------
 * SOLR-15433: Replace transient core cache LRU by Caffeine cache. (Bruno Roustant)
diff --git a/solr/core/src/java/org/apache/solr/handler/sql/SolrAggregate.java b/solr/core/src/java/org/apache/solr/handler/sql/SolrAggregate.java
index 6ae02e0..e1c6a6f 100644
--- a/solr/core/src/java/org/apache/solr/handler/sql/SolrAggregate.java
+++ b/solr/core/src/java/org/apache/solr/handler/sql/SolrAggregate.java
@@ -16,6 +16,10 @@
  */
 package org.apache.solr.handler.sql;
 
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
@@ -27,7 +31,8 @@ import org.apache.calcite.sql.fun.SqlStdOperatorTable;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.calcite.util.Pair;
 
-import java.util.*;
+import static org.apache.solr.client.solrj.io.stream.metrics.CountDistinctMetric.APPROX_COUNT_DISTINCT;
+import static org.apache.solr.client.solrj.io.stream.metrics.CountDistinctMetric.COUNT_DISTINCT;
 
 /**
  * Implementation of {@link org.apache.calcite.rel.core.Aggregate} relational expression in Solr.
@@ -42,6 +47,13 @@ class SolrAggregate extends Aggregate implements SolrRel {
       SqlStdOperatorTable.AVG
   );
 
+  // Returns the Solr agg metric identifier (includes column) for the SQL metric
+  static String solrAggMetricId(String metric, String column) {
+    // CountDistinctMetric's getIdentifier returns "countDist" but all others return a lowercased value
+    String funcName = COUNT_DISTINCT.equals(metric) ? COUNT_DISTINCT : metric.toLowerCase(Locale.ROOT);
+    return String.format(Locale.ROOT, "%s(%s)", funcName, column);
+  }
+
   SolrAggregate(
       RelOptCluster cluster,
       RelTraitSet traitSet,
@@ -72,25 +84,19 @@ class SolrAggregate extends Aggregate implements SolrRel {
     implementor.visitChild(0, getInput());
 
     final List<String> inNames = SolrRules.solrFieldNames(getInput().getRowType());
-
-
-    for(Pair<AggregateCall, String> namedAggCall : getNamedAggCalls()) {
+    for (Pair<AggregateCall, String> namedAggCall : getNamedAggCalls()) {
 
       AggregateCall aggCall = namedAggCall.getKey();
-
       Pair<String, String> metric = toSolrMetric(implementor, aggCall, inNames);
-      implementor.addReverseAggMapping(namedAggCall.getValue(), metric.getKey().toLowerCase(Locale.ROOT)+"("+metric.getValue()+")");
-      implementor.addMetricPair(namedAggCall.getValue(), metric.getKey(), metric.getValue());
-      /*
-      if(aggCall.getName() == null) {
-        System.out.println("AGG:"+namedAggCall.getValue()+":"+ aggCall.getAggregation().getName() + "(" + inNames.get(aggCall.getArgList().get(0)) + ")");
-        implementor.addFieldMapping(namedAggCall.getValue(),
-          aggCall.getAggregation().getName() + "(" + inNames.get(aggCall.getArgList().get(0)) + ")");
-      }
-      */
+
+      boolean isDistinct = SqlStdOperatorTable.COUNT.equals(aggCall.getAggregation()) && aggCall.isDistinct();
+      // map the SQL COUNT to either countDist or hll for distinct ops, otherwise, the metric names map over directly
+      String metricKey = isDistinct ? (aggCall.isApproximate() ? APPROX_COUNT_DISTINCT : COUNT_DISTINCT) : metric.getKey();
+      implementor.addReverseAggMapping(namedAggCall.getValue(), solrAggMetricId(metricKey, metric.getValue()));
+      implementor.addMetricPair(namedAggCall.getValue(), metricKey, metric.getValue());
     }
 
-    for(int group : getGroupSet()) {
+    for (int group : getGroupSet()) {
       String inName = inNames.get(group);
       implementor.addBucket(inName);
     }
@@ -108,7 +114,7 @@ class SolrAggregate extends Aggregate implements SolrRel {
       case 1:
         String inName = inNames.get(args.get(0));
         String name = implementor.fieldMappings.getOrDefault(inName, inName);
-        if(SUPPORTED_AGGREGATIONS.contains(aggregation)) {
+        if (SUPPORTED_AGGREGATIONS.contains(aggregation)) {
           return new Pair<>(aggregation.getName(), name);
         }
       default:
diff --git a/solr/core/src/java/org/apache/solr/handler/sql/SolrRel.java b/solr/core/src/java/org/apache/solr/handler/sql/SolrRel.java
index 3467e4a..db34c43 100644
--- a/solr/core/src/java/org/apache/solr/handler/sql/SolrRel.java
+++ b/solr/core/src/java/org/apache/solr/handler/sql/SolrRel.java
@@ -23,6 +23,8 @@ import org.apache.calcite.util.Pair;
 
 import java.util.*;
 
+import static org.apache.solr.handler.sql.SolrAggregate.solrAggMetricId;
+
 /**
  * Relational expression that uses Solr calling convention.
  */
@@ -84,9 +86,8 @@ interface SolrRel extends RelNode {
       column = this.fieldMappings.getOrDefault(column, column);
       this.metricPairs.add(new Pair<>(metric, column));
 
-      String metricIdentifier = metric.toLowerCase(Locale.ROOT) + "(" + column + ")";
       if(outName != null) {
-        this.addFieldMapping(outName, metricIdentifier, true);
+        this.addFieldMapping(outName, solrAggMetricId(metric, column), true);
       }
     }
 
diff --git a/solr/core/src/java/org/apache/solr/handler/sql/SolrTable.java b/solr/core/src/java/org/apache/solr/handler/sql/SolrTable.java
index 8397e35..ec7f4db 100644
--- a/solr/core/src/java/org/apache/solr/handler/sql/SolrTable.java
+++ b/solr/core/src/java/org/apache/solr/handler/sql/SolrTable.java
@@ -16,8 +16,24 @@
  */
 package org.apache.solr.handler.sql;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
+
 import org.apache.calcite.adapter.java.AbstractQueryableTable;
-import org.apache.calcite.linq4j.*;
+import org.apache.calcite.linq4j.AbstractEnumerable;
+import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.linq4j.Enumerator;
+import org.apache.calcite.linq4j.QueryProvider;
+import org.apache.calcite.linq4j.Queryable;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptTable;
 import org.apache.calcite.rel.RelNode;
@@ -36,7 +52,6 @@ import org.apache.solr.client.solrj.io.eq.FieldEqualitor;
 import org.apache.solr.client.solrj.io.eq.MultipleFieldEqualitor;
 import org.apache.solr.client.solrj.io.eq.StreamEqualitor;
 import org.apache.solr.client.solrj.io.eval.AndEvaluator;
-import org.apache.solr.client.solrj.io.eval.RecursiveBooleanEvaluator;
 import org.apache.solr.client.solrj.io.eval.EqualToEvaluator;
 import org.apache.solr.client.solrj.io.eval.GreaterThanEqualToEvaluator;
 import org.apache.solr.client.solrj.io.eval.GreaterThanEvaluator;
@@ -45,16 +60,33 @@ import org.apache.solr.client.solrj.io.eval.LessThanEvaluator;
 import org.apache.solr.client.solrj.io.eval.NotEvaluator;
 import org.apache.solr.client.solrj.io.eval.OrEvaluator;
 import org.apache.solr.client.solrj.io.eval.RawValueEvaluator;
-import org.apache.solr.client.solrj.io.stream.*;
+import org.apache.solr.client.solrj.io.eval.RecursiveBooleanEvaluator;
+import org.apache.solr.client.solrj.io.stream.CloudSolrStream;
+import org.apache.solr.client.solrj.io.stream.FacetStream;
+import org.apache.solr.client.solrj.io.stream.HavingStream;
+import org.apache.solr.client.solrj.io.stream.ParallelStream;
+import org.apache.solr.client.solrj.io.stream.RankStream;
+import org.apache.solr.client.solrj.io.stream.RollupStream;
+import org.apache.solr.client.solrj.io.stream.SortStream;
+import org.apache.solr.client.solrj.io.stream.StatsStream;
+import org.apache.solr.client.solrj.io.stream.StreamContext;
+import org.apache.solr.client.solrj.io.stream.TupleStream;
+import org.apache.solr.client.solrj.io.stream.UniqueStream;
 import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser;
 import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
-import org.apache.solr.client.solrj.io.stream.metrics.*;
+import org.apache.solr.client.solrj.io.stream.metrics.Bucket;
+import org.apache.solr.client.solrj.io.stream.metrics.CountDistinctMetric;
+import org.apache.solr.client.solrj.io.stream.metrics.CountMetric;
+import org.apache.solr.client.solrj.io.stream.metrics.MaxMetric;
+import org.apache.solr.client.solrj.io.stream.metrics.MeanMetric;
+import org.apache.solr.client.solrj.io.stream.metrics.Metric;
+import org.apache.solr.client.solrj.io.stream.metrics.MinMetric;
+import org.apache.solr.client.solrj.io.stream.metrics.SumMetric;
 import org.apache.solr.common.params.CommonParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
 
-import java.io.IOException;
-import java.util.*;
-import java.util.stream.Collectors;
+import static org.apache.solr.client.solrj.io.stream.metrics.CountDistinctMetric.APPROX_COUNT_DISTINCT;
+import static org.apache.solr.client.solrj.io.stream.metrics.CountDistinctMetric.COUNT_DISTINCT;
 import static org.apache.solr.common.params.CommonParams.SORT;
 
 /**
@@ -73,6 +105,115 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
     this.collection = collection;
   }
 
+  private static StreamComparator bucketSortComp(List<Bucket> buckets, Map<String, String> dirs) {
+    FieldComparator[] comps = new FieldComparator[buckets.size()];
+    for (int i = 0; i < buckets.size(); i++) {
+      ComparatorOrder comparatorOrder = ComparatorOrder.fromString(dirs.get(buckets.get(i).toString()));
+      String sortKey = buckets.get(i).toString();
+      comps[i] = new FieldComparator(sortKey, comparatorOrder);
+    }
+
+    if (comps.length == 1) {
+      return comps[0];
+    } else {
+      return new MultipleFieldComparator(comps);
+    }
+  }
+
+  private static StreamComparator bucketSortComp(Bucket[] buckets, String dir) {
+    FieldComparator[] comps = new FieldComparator[buckets.length];
+    for (int i = 0; i < buckets.length; i++) {
+      ComparatorOrder comparatorOrder = ascDescComp(dir);
+      String sortKey = buckets[i].toString();
+      comps[i] = new FieldComparator(sortKey, comparatorOrder);
+    }
+
+    if (comps.length == 1) {
+      return comps[0];
+    } else {
+      return new MultipleFieldComparator(comps);
+    }
+  }
+
+  private static String getSortDirection(List<Pair<String, String>> orders) {
+    if (orders != null && orders.size() > 0) {
+      for (Pair<String, String> item : orders) {
+        return item.getValue();
+      }
+    }
+
+    return "asc";
+  }
+
+  private static String bucketSort(Bucket[] buckets, String dir) {
+    StringBuilder buf = new StringBuilder();
+    boolean comma = false;
+    for (Bucket bucket : buckets) {
+      if (comma) {
+        buf.append(",");
+      }
+      buf.append(bucket.toString()).append(" ").append(dir);
+      comma = true;
+    }
+
+    return buf.toString();
+  }
+
+  private static String getPartitionKeys(Bucket[] buckets) {
+    StringBuilder buf = new StringBuilder();
+    boolean comma = false;
+    for (Bucket bucket : buckets) {
+      if (comma) {
+        buf.append(",");
+      }
+      buf.append(bucket.toString());
+      comma = true;
+    }
+    return buf.toString();
+  }
+
+  private static boolean sortsEqual(Bucket[] buckets, String direction, List<Pair<String, String>> orders) {
+
+    if (buckets.length != orders.size()) {
+      return false;
+    }
+
+    for (int i = 0; i < buckets.length; i++) {
+      Bucket bucket = buckets[i];
+      Pair<String, String> order = orders.get(i);
+      if (!bucket.toString().equals(order.getKey())) {
+        return false;
+      }
+
+      if (!order.getValue().toLowerCase(Locale.ROOT).contains(direction.toLowerCase(Locale.ROOT))) {
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  private static FieldComparator[] getComps(List<Pair<String, String>> orders) {
+    FieldComparator[] comps = new FieldComparator[orders.size()];
+    for (int i = 0; i < orders.size(); i++) {
+      Pair<String, String> sortItem = orders.get(i);
+      String ordering = sortItem.getValue();
+      ComparatorOrder comparatorOrder = ascDescComp(ordering);
+      String sortKey = sortItem.getKey();
+      comps[i] = new FieldComparator(sortKey, comparatorOrder);
+    }
+
+    return comps;
+  }
+
+  private static ComparatorOrder ascDescComp(String s) {
+    if (s.toLowerCase(Locale.ROOT).contains("desc")) {
+      return ComparatorOrder.DESCENDING;
+    } else {
+      return ComparatorOrder.ASCENDING;
+    }
+  }
+
   public String toString() {
     return "SolrTable {" + collection + "}";
   }
@@ -83,17 +224,18 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
     }
     return protoRowType.apply(typeFactory);
   }
-  
+
   private Enumerable<Object> query(final Properties properties) {
     return query(properties, Collections.emptyList(), null, Collections.emptyList(), Collections.emptyList(),
         Collections.emptyList(), null, null, null, null);
   }
 
-  /** Executes a Solr query on the underlying table.
+  /**
+   * Executes a Solr query on the underlying table.
    *
    * @param properties Connections properties
-   * @param fields List of fields to project
-   * @param query A string for the query
+   * @param fields     List of fields to project
+   * @param query      A string for the query
    * @return Enumerator of results
    */
   private Enumerable<Object> query(final Properties properties,
@@ -115,7 +257,7 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
     if (query == null) {
       q = DEFAULT_QUERY;
     } else {
-      if(negative) {
+      if (negative) {
         q = DEFAULT_QUERY + " AND " + query;
       } else {
         q = query;
@@ -128,30 +270,30 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
       if (metricPairs.isEmpty() && buckets.isEmpty()) {
         tupleStream = handleSelect(zk, collection, q, fields, orders, limit, offset);
       } else {
-        if(buckets.isEmpty()) {
+        if (buckets.isEmpty()) {
           tupleStream = handleStats(zk, collection, q, metricPairs, fields);
         } else {
-          if(mapReduce) {
+          if (mapReduce) {
             tupleStream = handleGroupByMapReduce(zk,
-                                                 collection,
-                                                 properties,
-                                                 fields,
-                                                 q,
-                                                 orders,
-                                                 buckets,
-                                                 metricPairs,
-                                                 limit,
-                                                 havingPredicate);
+                collection,
+                properties,
+                fields,
+                q,
+                orders,
+                buckets,
+                metricPairs,
+                limit,
+                havingPredicate);
           } else {
             tupleStream = handleGroupByFacet(zk,
-                                             collection,
-                                             fields,
-                                             q,
-                                             orders,
-                                             buckets,
-                                             metricPairs,
-                                             limit,
-                                             havingPredicate);
+                collection,
+                fields,
+                q,
+                orders,
+                buckets,
+                metricPairs,
+                limit,
+                havingPredicate);
           }
         }
       }
@@ -173,36 +315,6 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
     };
   }
 
-  private static StreamComparator bucketSortComp(List<Bucket> buckets, Map<String,String> dirs) {
-    FieldComparator[] comps = new FieldComparator[buckets.size()];
-    for(int i=0; i<buckets.size(); i++) {
-      ComparatorOrder comparatorOrder = ComparatorOrder.fromString(dirs.get(buckets.get(i).toString()));
-      String sortKey = buckets.get(i).toString();
-      comps[i] = new FieldComparator(sortKey, comparatorOrder);
-    }
-
-    if(comps.length == 1) {
-      return comps[0];
-    } else {
-      return new MultipleFieldComparator(comps);
-    }
-  }
-
-  private static StreamComparator bucketSortComp(Bucket[] buckets, String dir) {
-    FieldComparator[] comps = new FieldComparator[buckets.length];
-    for(int i=0; i<buckets.length; i++) {
-      ComparatorOrder comparatorOrder = ascDescComp(dir);
-      String sortKey = buckets[i].toString();
-      comps[i] = new FieldComparator(sortKey, comparatorOrder);
-    }
-
-    if(comps.length == 1) {
-      return comps[0];
-    } else {
-      return new MultipleFieldComparator(comps);
-    }
-  }
-
   private String getSortDirection(Map.Entry<String, String> order) {
     String direction = order.getValue();
     return direction == null ? "asc" : direction;
@@ -210,7 +322,7 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
 
   private StreamComparator getComp(List<? extends Map.Entry<String, String>> orders) {
     FieldComparator[] comps = new FieldComparator[orders.size()];
-    for(int i = 0; i < orders.size(); i++) {
+    for (int i = 0; i < orders.size(); i++) {
       Map.Entry<String, String> order = orders.get(i);
       String direction = getSortDirection(order);
       ComparatorOrder comparatorOrder = ComparatorOrder.fromString(direction);
@@ -218,7 +330,7 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
       comps[i] = new FieldComparator(sortKey, comparatorOrder);
     }
 
-    if(comps.length == 1) {
+    if (comps.length == 1) {
       return comps[0];
     } else {
       return new MultipleFieldComparator(comps);
@@ -228,7 +340,7 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
   private List<Metric> buildMetrics(List<Pair<String, String>> metricPairs, boolean ifEmptyCount) {
     List<Metric> metrics = new ArrayList<>(metricPairs.size());
     metrics.addAll(metricPairs.stream().map(this::getMetric).collect(Collectors.toList()));
-    if(metrics.size() == 0 && ifEmptyCount) {
+    if (metrics.size() == 0 && ifEmptyCount) {
       metrics.add(new CountMetric());
     }
     return metrics;
@@ -236,6 +348,10 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
 
   private Metric getMetric(Pair<String, String> metricPair) {
     switch (metricPair.getKey()) {
+      case COUNT_DISTINCT:
+        return new CountDistinctMetric(metricPair.getValue());
+      case APPROX_COUNT_DISTINCT:
+        return new CountDistinctMetric(metricPair.getValue(), true);
       case "COUNT":
         return new CountMetric(metricPair.getValue());
       case "SUM":
@@ -265,13 +381,13 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
     params.add(CommonParams.Q, query);
 
     //Validate the fields
-    for(Map.Entry<String, Class> entry : fields) {
+    for (Map.Entry<String, Class> entry : fields) {
       String fname = entry.getKey();
-      if(limit == null && "score".equals(fname)) {
+      if (limit == null && "score".equals(fname)) {
         throw new IOException("score is not a valid field for unlimited queries.");
       }
 
-      if(fname.contains("*")) {
+      if (fname.contains("*")) {
         throw new IOException("* is not supported for column selection.");
       }
     }
@@ -283,7 +399,7 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
     } else {
       if (limit == null) {
         params.add(SORT, "_version_ desc");
-        fl = fl+",_version_";
+        fl = fl + ",_version_";
       } else {
         params.add(SORT, "score desc");
         if (!fl.contains("score")) {
@@ -293,11 +409,11 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
     }
 
     params.add(CommonParams.FL, fl);
-    
+
     if (offset != null && limit == null) {
       throw new IOException("OFFSET without LIMIT not supported by Solr! Specify desired limit using 'FETCH NEXT <LIMIT> ROWS ONLY'");
     }
-    
+
     if (limit != null) {
       int limitInt = Integer.parseInt(limit);
       // if there's an offset, then we need to fetch offset + limit rows from each shard and then sort accordingly
@@ -323,8 +439,8 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
 
   private String getSort(List<Pair<String, String>> orders) {
     StringBuilder buf = new StringBuilder();
-    for(Pair<String, String> pair : orders) {
-      if(buf.length() > 0) {
+    for (Pair<String, String> pair : orders) {
+      if (buf.length() > 0) {
         buf.append(",");
       }
       buf.append(pair.getKey()).append(" ").append(pair.getValue());
@@ -342,9 +458,9 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
   @SuppressWarnings({"rawtypes"})
   private String getFields(List<Map.Entry<String, Class>> fields) {
     StringBuilder buf = new StringBuilder();
-    for(Map.Entry<String, Class> field : fields) {
+    for (Map.Entry<String, Class> field : fields) {
 
-      if(buf.length() > 0) {
+      if (buf.length() > 0) {
         buf.append(",");
       }
 
@@ -356,9 +472,9 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
 
   private String getFields(Set<String> fieldSet) {
     StringBuilder buf = new StringBuilder();
-    for(String field : fieldSet) {
+    for (String field : fieldSet) {
 
-      if(buf.length() > 0) {
+      if (buf.length() > 0) {
         buf.append(",");
       }
 
@@ -369,18 +485,17 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
     return buf.toString();
   }
 
-
   @SuppressWarnings({"unchecked", "rawtypes"})
   private Set<String> getFieldSet(Metric[] metrics, List<Map.Entry<String, Class>> fields) {
     HashSet set = new HashSet<>();
-    for(Metric metric : metrics) {
-      for(String column : metric.getColumns()) {
+    for (Metric metric : metrics) {
+      for (String column : metric.getColumns()) {
         set.add(column);
       }
     }
 
-    for(Map.Entry<String, Class> field : fields) {
-      if(field.getKey().indexOf('(') == -1) {
+    for (Map.Entry<String, Class> field : fields) {
+      if (field.getKey().indexOf('(') == -1) {
         set.add(field.getKey());
       }
     }
@@ -388,64 +503,6 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
     return set;
   }
 
-  private static String getSortDirection(List<Pair<String, String>> orders) {
-    if(orders != null && orders.size() > 0) {
-      for(Pair<String,String> item : orders) {
-        return item.getValue();
-      }
-    }
-
-    return "asc";
-  }
-
-  private static String bucketSort(Bucket[] buckets, String dir) {
-    StringBuilder buf = new StringBuilder();
-    boolean comma = false;
-    for(Bucket bucket : buckets) {
-      if(comma) {
-        buf.append(",");
-      }
-      buf.append(bucket.toString()).append(" ").append(dir);
-      comma = true;
-    }
-
-    return buf.toString();
-  }
-
-  private static String getPartitionKeys(Bucket[] buckets) {
-    StringBuilder buf = new StringBuilder();
-    boolean comma = false;
-    for(Bucket bucket : buckets) {
-      if(comma) {
-        buf.append(",");
-      }
-      buf.append(bucket.toString());
-      comma = true;
-    }
-    return buf.toString();
-  }
-
-  private static boolean sortsEqual(Bucket[] buckets, String direction, List<Pair<String, String>> orders) {
-
-    if(buckets.length != orders.size()) {
-      return false;
-    }
-
-    for(int i=0; i< buckets.length; i++) {
-      Bucket bucket = buckets[i];
-      Pair<String, String> order = orders.get(i);
-      if(!bucket.toString().equals(order.getKey())) {
-        return false;
-      }
-
-      if(!order.getValue().toLowerCase(Locale.ROOT).contains(direction.toLowerCase(Locale.ROOT))) {
-        return false;
-      }
-    }
-
-    return true;
-  }
-
   @SuppressWarnings({"rawtypes"})
   private TupleStream handleGroupByMapReduce(String zk,
                                              String collection,
@@ -459,7 +516,7 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
                                              final String havingPredicate) throws IOException {
 
     Map<String, Class> fmap = new HashMap<>();
-    for(Map.Entry<String, Class> entry : fields) {
+    for (Map.Entry<String, Class> entry : fields) {
       fmap.put(entry.getKey(), entry.getValue());
     }
 
@@ -468,12 +525,12 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
     Bucket[] buckets = buildBuckets(_buckets, fields);
     Metric[] metrics = buildMetrics(metricPairs, false).toArray(new Metric[0]);
 
-    if(metrics.length == 0) {
+    if (metrics.length == 0) {
       return handleSelectDistinctMapReduce(zk, collection, properties, fields, query, orders, buckets, limit);
     } else {
-      for(Metric metric : metrics) {
+      for (Metric metric : metrics) {
         Class c = fmap.get(metric.getIdentifier());
-        if(Long.class.equals(c)) {
+        if (Long.class.equals(c)) {
           metric.outputLong = true;
         }
       }
@@ -481,7 +538,7 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
 
     Set<String> fieldSet = getFieldSet(metrics, fields);
 
-    if(metrics.length == 0) {
+    if (metrics.length == 0) {
       throw new IOException("Group by queries must include atleast one aggregate function.");
     }
 
@@ -497,7 +554,7 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
     //Always use the /export handler for Group By Queries because it requires exporting full result sets.
     params.set(CommonParams.QT, "/export");
 
-    if(numWorkers > 1) {
+    if (numWorkers > 1) {
       params.set("partitionKeys", getPartitionKeys(buckets));
     }
 
@@ -528,17 +585,16 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
         .withFunctionName("having", HavingStream.class)
         .withFunctionName("gteq", GreaterThanEqualToEvaluator.class);
 
-    if(havingPredicate != null) {
-      RecursiveBooleanEvaluator booleanOperation = (RecursiveBooleanEvaluator)factory.constructEvaluator(StreamExpressionParser.parse(havingPredicate));
+    if (havingPredicate != null) {
+      RecursiveBooleanEvaluator booleanOperation = (RecursiveBooleanEvaluator) factory.constructEvaluator(StreamExpressionParser.parse(havingPredicate));
       tupleStream = new HavingStream(tupleStream, booleanOperation);
     }
 
-    if(numWorkers > 1) {
+    if (numWorkers > 1) {
       // Do the rollups in parallel
       // Maintain the sort of the Tuples coming from the workers.
       StreamComparator comp = bucketSortComp(buckets, sortDirection);
-      @SuppressWarnings("resource")
-      final ParallelStream parallelStream = new ParallelStream(zk, collection, tupleStream, numWorkers, comp);
+      @SuppressWarnings("resource") final ParallelStream parallelStream = new ParallelStream(zk, collection, tupleStream, numWorkers, comp);
 
 
       parallelStream.setStreamFactory(factory);
@@ -548,8 +604,8 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
     //TODO: Currently we are not pushing down the having clause.
     //      We need to push down the having clause to ensure that LIMIT does not cut off records prior to the having filter.
 
-    if(orders != null && orders.size() > 0) {
-      if(!sortsEqual(buckets, sortDirection, orders)) {
+    if (orders != null && orders.size() > 0) {
+      if (!sortsEqual(buckets, sortDirection, orders)) {
         int lim = (limit == null) ? 100 : Integer.parseInt(limit);
         StreamComparator comp = getComp(orders);
         //Rank the Tuples
@@ -559,13 +615,13 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
       } else {
        // Sort is the same as the underlying stream
         // Only need to limit the result, not Rank the result
-        if(limit != null) {
+        if (limit != null) {
           tupleStream = new LimitStream(tupleStream, Integer.parseInt(limit));
         }
       }
     } else {
       //No order by, check for limit
-      if(limit != null) {
+      if (limit != null) {
         tupleStream = new LimitStream(tupleStream, Integer.parseInt(limit));
       }
     }
@@ -577,10 +633,10 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
   private Bucket[] buildBuckets(List<String> buckets, List<Map.Entry<String, Class>> fields) {
     Bucket[] bucketsArray = new Bucket[buckets.size()];
 
-    int i=0;
-    for(Map.Entry<String,Class> field : fields) {
+    int i = 0;
+    for (Map.Entry<String, Class> field : fields) {
       String fieldName = field.getKey();
-      if(buckets.contains(fieldName)) {
+      if (buckets.contains(fieldName)) {
         bucketsArray[i++] = new Bucket(fieldName);
       }
     }
@@ -601,7 +657,7 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
 
 
     Map<String, Class> fmap = new HashMap<>();
-    for(Map.Entry<String, Class> f : fields) {
+    for (Map.Entry<String, Class> f : fields) {
       fmap.put(f.getKey(), f.getValue());
     }
 
@@ -610,13 +666,13 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
 
     Bucket[] buckets = buildBuckets(bucketFields, fields);
     Metric[] metrics = buildMetrics(metricPairs, true).toArray(new Metric[0]);
-    if(metrics.length == 0) {
+    if (metrics.length == 0) {
       metrics = new Metric[1];
       metrics[0] = new CountMetric();
     } else {
-      for(Metric metric : metrics) {
+      for (Metric metric : metrics) {
         Class c = fmap.get(metric.getIdentifier());
-        if(Long.class.equals(c)) {
+        if (Long.class.equals(c)) {
           metric.outputLong = true;
         }
       }
@@ -626,25 +682,24 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
 
     FieldComparator[] sorts = null;
 
-    if(orders == null || orders.size() == 0) {
+    if (orders == null || orders.size() == 0) {
       sorts = new FieldComparator[buckets.length];
-      for(int i=0; i<sorts.length; i++) {
+      for (int i = 0; i < sorts.length; i++) {
         sorts[i] = new FieldComparator("index", ComparatorOrder.ASCENDING);
       }
     } else {
       sorts = getComps(orders);
     }
 
-    int overfetch = (int)(limit * 1.25);
+    int overfetch = (int) (limit * 1.25);
 
     TupleStream tupleStream = new FacetStream(zkHost,
-                                              collection,
-                                              solrParams,
-                                              buckets,
-                                              metrics,
-                                              sorts,
-                                              overfetch);
-
+        collection,
+        solrParams,
+        buckets,
+        metrics,
+        sorts,
+        overfetch);
 
 
     StreamFactory factory = new StreamFactory()
@@ -666,13 +721,12 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
         .withFunctionName("lteq", LessThanEqualToEvaluator.class)
         .withFunctionName("gteq", GreaterThanEqualToEvaluator.class);
 
-    if(havingPredicate != null) {
-      RecursiveBooleanEvaluator booleanOperation = (RecursiveBooleanEvaluator)factory.constructEvaluator(StreamExpressionParser.parse(havingPredicate));
+    if (havingPredicate != null) {
+      RecursiveBooleanEvaluator booleanOperation = (RecursiveBooleanEvaluator) factory.constructEvaluator(StreamExpressionParser.parse(havingPredicate));
       tupleStream = new HavingStream(tupleStream, booleanOperation);
     }
 
-    if(lim != null)
-    {
+    if (lim != null) {
       tupleStream = new LimitStream(tupleStream, limit);
     }
 
@@ -687,7 +741,7 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
                                                     final String query,
                                                     final List<Pair<String, String>> orders,
                                                     final Bucket[] buckets,
-                                                    final String limit) throws IOException{
+                                                    final String limit) throws IOException {
 
     int numWorkers = Integer.parseInt(properties.getProperty("numWorkers", "1"));
 
@@ -697,16 +751,16 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
     StreamEqualitor ecomp = null;
     StreamComparator comp = null;
 
-    if(orders != null && orders.size() > 0) {
+    if (orders != null && orders.size() > 0) {
       StreamComparator[] adjustedSorts = adjustSorts(orders, buckets);
       // Because of the way adjustSorts works we know that each FieldComparator has a single
       // field name. For this reason we can just look at the leftFieldName
       FieldEqualitor[] fieldEqualitors = new FieldEqualitor[adjustedSorts.length];
       StringBuilder buf = new StringBuilder();
-      for(int i=0; i<adjustedSorts.length; i++) {
-        FieldComparator fieldComparator = (FieldComparator)adjustedSorts[i];
+      for (int i = 0; i < adjustedSorts.length; i++) {
+        FieldComparator fieldComparator = (FieldComparator) adjustedSorts[i];
         fieldEqualitors[i] = new FieldEqualitor(fieldComparator.getLeftFieldName());
-        if(i>0) {
+        if (i > 0) {
           buf.append(",");
         }
         buf.append(fieldComparator.getLeftFieldName()).append(" ").append(fieldComparator.getOrder().toString());
@@ -714,7 +768,7 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
 
       sort = buf.toString();
 
-      if(adjustedSorts.length == 1) {
+      if (adjustedSorts.length == 1) {
         ecomp = fieldEqualitors[0];
         comp = adjustedSorts[0];
       } else {
@@ -725,10 +779,10 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
       StringBuilder sortBuf = new StringBuilder();
       FieldEqualitor[] equalitors = new FieldEqualitor[buckets.length];
       StreamComparator[] streamComparators = new StreamComparator[buckets.length];
-      for(int i=0; i<buckets.length; i++) {
+      for (int i = 0; i < buckets.length; i++) {
         equalitors[i] = new FieldEqualitor(buckets[i].toString());
         streamComparators[i] = new FieldComparator(buckets[i].toString(), ComparatorOrder.ASCENDING);
-        if(i>0) {
+        if (i > 0) {
           sortBuf.append(',');
         }
         sortBuf.append(buckets[i].toString()).append(" asc");
@@ -736,7 +790,7 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
 
       sort = sortBuf.toString();
 
-      if(equalitors.length == 1) {
+      if (equalitors.length == 1) {
         ecomp = equalitors[0];
         comp = streamComparators[0];
       } else {
@@ -753,7 +807,7 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
     //Always use the /export handler for Distinct Queries because it requires exporting full result sets.
     params.set(CommonParams.QT, "/export");
 
-    if(numWorkers > 1) {
+    if (numWorkers > 1) {
       params.set("partitionKeys", getPartitionKeys(buckets));
     }
 
@@ -764,11 +818,10 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
     CloudSolrStream cstream = new CloudSolrStream(zkHost, collection, params);
     tupleStream = new UniqueStream(cstream, ecomp);
 
-    if(numWorkers > 1) {
+    if (numWorkers > 1) {
       // Do the unique in parallel
       // Maintain the sort of the Tuples coming from the workers.
-      @SuppressWarnings("resource")
-      final ParallelStream parallelStream = new ParallelStream(zkHost, collection, tupleStream, numWorkers, comp);
+      @SuppressWarnings("resource") final ParallelStream parallelStream = new ParallelStream(zkHost, collection, tupleStream, numWorkers, comp);
 
       StreamFactory factory = new StreamFactory()
           .withFunctionName("search", CloudSolrStream.class)
@@ -779,41 +832,40 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
       tupleStream = parallelStream;
     }
 
-    if(limit != null) {
+    if (limit != null) {
       tupleStream = new LimitStream(tupleStream, Integer.parseInt(limit));
     }
 
     return tupleStream;
   }
 
-
   private StreamComparator[] adjustSorts(List<Pair<String, String>> orders, Bucket[] buckets) throws IOException {
     List<FieldComparator> adjustedSorts = new ArrayList<>();
     Set<String> bucketFields = new HashSet<>();
     Set<String> sortFields = new HashSet<>();
 
     ComparatorOrder comparatorOrder = ComparatorOrder.ASCENDING;
-    for(Pair<String, String> order : orders) {
+    for (Pair<String, String> order : orders) {
       sortFields.add(order.getKey());
       adjustedSorts.add(new FieldComparator(order.getKey(), ascDescComp(order.getValue())));
       comparatorOrder = ascDescComp(order.getValue());
     }
 
-    for(Bucket bucket : buckets) {
+    for (Bucket bucket : buckets) {
       bucketFields.add(bucket.toString());
     }
 
-    for(String sf : sortFields) {
-      if(!bucketFields.contains(sf)) {
+    for (String sf : sortFields) {
+      if (!bucketFields.contains(sf)) {
         throw new IOException("All sort fields must be in the field list.");
       }
     }
 
     //Add sort fields if needed
-    if(sortFields.size() < buckets.length) {
-      for(Bucket bucket : buckets) {
+    if (sortFields.size() < buckets.length) {
+      for (Bucket bucket : buckets) {
         String b = bucket.toString();
-        if(!sortFields.contains(b)) {
+        if (!sortFields.contains(b)) {
           adjustedSorts.add(new FieldComparator(bucket.toString(), comparatorOrder));
         }
       }
@@ -831,7 +883,7 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
 
 
     Map<String, Class> fmap = new HashMap<>();
-    for(Map.Entry<String, Class> entry : fields) {
+    for (Map.Entry<String, Class> entry : fields) {
       fmap.put(entry.getKey(), entry.getValue());
     }
 
@@ -839,9 +891,9 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
     solrParams.add(CommonParams.Q, query);
     Metric[] metrics = buildMetrics(metricPairs, false).toArray(new Metric[0]);
 
-    for(Metric metric : metrics) {
+    for (Metric metric : metrics) {
       Class c = fmap.get(metric.getIdentifier());
-      if(Long.class.equals(c)) {
+      if (Long.class.equals(c)) {
         metric.outputLong = true;
       }
     }
@@ -865,8 +917,7 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
     }
 
     public Enumerator<T> enumerator() {
-      @SuppressWarnings("unchecked")
-      final Enumerable<T> enumerable = (Enumerable<T>) getTable().query(getProperties());
+      @SuppressWarnings("unchecked") final Enumerable<T> enumerable = (Enumerable<T>) getTable().query(getProperties());
       return enumerable.enumerator();
     }
 
@@ -878,35 +929,15 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
       return schema.unwrap(SolrSchema.class).properties;
     }
 
-    /** Called via code-generation.
+    /**
+     * Called via code-generation.
      *
      * @see SolrMethod#SOLR_QUERYABLE_QUERY
      */
-    @SuppressWarnings({"rawtypes","UnusedDeclaration"})
+    @SuppressWarnings({"rawtypes", "UnusedDeclaration"})
     public Enumerable<Object> query(List<Map.Entry<String, Class>> fields, String query, List<Pair<String, String>> order,
                                     List<String> buckets, List<Pair<String, String>> metricPairs, String limit, String negativeQuery, String havingPredicate, String offset) {
       return getTable().query(getProperties(), fields, query, order, buckets, metricPairs, limit, negativeQuery, havingPredicate, offset);
     }
   }
-
-  private static FieldComparator[] getComps(List<Pair<String, String>> orders) {
-    FieldComparator[] comps = new FieldComparator[orders.size()];
-    for(int i=0; i<orders.size(); i++) {
-      Pair<String,String> sortItem = orders.get(i);
-      String ordering = sortItem.getValue();
-      ComparatorOrder comparatorOrder = ascDescComp(ordering);
-      String sortKey = sortItem.getKey();
-      comps[i] = new FieldComparator(sortKey, comparatorOrder);
-    }
-
-    return comps;
-  }
-
-  private static ComparatorOrder ascDescComp(String s) {
-    if(s.toLowerCase(Locale.ROOT).contains("desc")) {
-      return ComparatorOrder.DESCENDING;
-    } else {
-      return ComparatorOrder.ASCENDING;
-    }
-  }
 }
diff --git a/solr/core/src/test/org/apache/solr/handler/TestSQLHandler.java b/solr/core/src/test/org/apache/solr/handler/TestSQLHandler.java
index 4c67a3b..c8689dc 100644
--- a/solr/core/src/test/org/apache/solr/handler/TestSQLHandler.java
+++ b/solr/core/src/test/org/apache/solr/handler/TestSQLHandler.java
@@ -2227,4 +2227,44 @@ public class TestSQLHandler extends SolrCloudTestCase {
     // Solr doesn't support OFFSET w/o LIMIT
     expectThrows(IOException.class, () -> expectResults("SELECT id FROM $ALIAS ORDER BY id DESC OFFSET 5", 5));
   }
+
+  @Test
+  public void testCountDistinct() throws Exception {
+    UpdateRequest updateRequest = new UpdateRequest();
+    final int cardinality = 5;
+    final int maxDocs = 100; // keep this an even # b/c we divide by 2 in this test
+    final String padFmt = "%03d";
+    for (int i = 0; i < maxDocs; i++) {
+      updateRequest = addDocForDistinctTests(i, updateRequest, cardinality, padFmt);
+    }
+    updateRequest.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
+
+    List<Tuple> tuples = expectResults("SELECT COUNT(1) AS total_rows, COUNT(distinct str_s) AS distinct_str, MIN(str_s) AS min_str, MAX(str_s) AS max_str FROM $ALIAS", 1);
+    Tuple firstRow = tuples.get(0);
+    assertEquals(maxDocs, (long) firstRow.getLong("total_rows"));
+    assertEquals(cardinality, (long) firstRow.getLong("distinct_str"));
+
+    String expectedMin = String.format(Locale.ROOT, padFmt, 0);
+    String expectedMax = String.format(Locale.ROOT, padFmt, cardinality - 1); // max is card-1
+    assertEquals(expectedMin, firstRow.getString("min_str"));
+    assertEquals(expectedMax, firstRow.getString("max_str"));
+
+    tuples = expectResults("SELECT DISTINCT str_s FROM $ALIAS ORDER BY str_s ASC", cardinality);
+    for (int t = 0; t < tuples.size(); t++) {
+      assertEquals(String.format(Locale.ROOT, padFmt, t), tuples.get(t).getString("str_s"));
+    }
+
+    tuples = expectResults("SELECT APPROX_COUNT_DISTINCT(distinct str_s) AS approx_distinct FROM $ALIAS", 1);
+    firstRow = tuples.get(0);
+    assertEquals(cardinality, (long) firstRow.getLong("approx_distinct"));
+
+    tuples = expectResults("SELECT country_s, COUNT(*) AS count_per_bucket FROM $ALIAS GROUP BY country_s", 2);
+    assertEquals(maxDocs/2L, (long)tuples.get(0).getLong("count_per_bucket"));
+    assertEquals(maxDocs/2L, (long)tuples.get(1).getLong("count_per_bucket"));
+  }
+
+  private UpdateRequest addDocForDistinctTests(int id, UpdateRequest updateRequest, int cardinality, String padFmt) {
+    String country = id % 2 == 0 ? "US" : "CA";
+    return updateRequest.add("id", String.valueOf(id), "str_s", String.format(Locale.ROOT, padFmt, id % cardinality), "country_s", country);
+  }
 }
diff --git a/solr/solr-ref-guide/src/parallel-sql-interface.adoc b/solr/solr-ref-guide/src/parallel-sql-interface.adoc
index 640608f..5f8ff0b 100644
--- a/solr/solr-ref-guide/src/parallel-sql-interface.adoc
+++ b/solr/solr-ref-guide/src/parallel-sql-interface.adoc
@@ -256,14 +256,14 @@ The parallel SQL interface supports and pushes down most common SQL operators, s
 |IN |Specify multiple values (shorthand for multiple OR clauses) |`fielda IN (10,20,30)` |`(fielda:10 OR fielda:20 OR fielda:30)`
 |LIKE |Wildcard match on string or text fields |`fielda LIKE 'day%'` |`{!complexphrase}fielda:"day*"`
 |BETWEEN |Range match |`fielda BETWEEN 2 AND 4` |`fielda: [2 TO 4]`
-|IS NULL |Match columns with null value |`fielda IS NULL` |`(*:* -field:*)`
+|IS NULL |Match columns with null value |`fielda IS NULL` |+++(*:* -field:*)+++
 |IS NOT NULL |Match columns with value |`fielda IS NOT NULL` |`field:*`
 |===
 
 * IN, LIKE, BETWEEN support the NOT keyword to find rows where the condition is not true, such as `fielda NOT LIKE 'day%'`
 * String literals must be wrapped in single-quotes; double-quotes indicate database objects and not a string literal.
 * A simplistic LIKE can be used with an asterisk wildcard, such as `field = 'sam*'`; this is Solr specific and not part of the SQL standard.
-* When performing ANDed range queries over a multi-valued field, Apache Calcite short-circuits to zero results if the ANDed predicates appear to be disjoint sets. For example, +++b_is \<= 2 AND b_is >= 5+++ appears to Calcite to be disjoint sets, which they are from a single-valued field perspective. However, this may not be the case with multi-valued fields, as Solr might match documents. The work-around is to use Solr query syntax directly inside of an equals expression wrapped in pare [...]
+* When performing ANDed range queries over a multi-valued field, Apache Calcite short-circuits to zero results if the ANDed predicates appear to be disjoint sets. For example, +++b_is \<= 2 AND b_is >= 5+++ appears to Calcite to be disjoint sets, which they are from a single-valued field perspective. However, this may not be the case with multi-valued fields, as Solr might match documents. The work-around is to use Solr query syntax directly inside of an equals expression wrapped in pa [...]
 
 === ORDER BY Clause
 
@@ -316,15 +316,18 @@ SELECT distinct fieldA as fa, fieldB as fb FROM tableA ORDER BY fa desc, fb desc
 
 === Statistical Functions
 
-The SQL interface supports simple statistics calculated on numeric fields. The supported functions are `count(*)`, `min`, `max`, `sum`, and `avg`.
+The SQL interface supports simple statistics calculated on numeric fields.
+The supported functions are `COUNT(*)`, `COUNT(DISTINCT field)`, `APPROX_COUNT_DISTINCT(field)`, `MIN`, `MAX`, `SUM`, and `AVG`.
 
 Because these functions never require data to be shuffled, the aggregations are pushed down into the search engine and are generated by the <<the-stats-component.adoc#,StatsComponent>>.
 
 [source,sql]
 ----
-SELECT count(*) as count, sum(fieldB) as sum FROM tableA WHERE fieldC = 'Hello'
+SELECT COUNT(*) as count, SUM(fieldB) as sum FROM tableA WHERE fieldC = 'Hello'
 ----
 
+The `APPROX_COUNT_DISTINCT` metric uses Solr's HyperLogLog (hll) statistical function to compute an approximate cardinality for the given field and should be used when query performance is important and an exact count is not needed.
+
 === GROUP BY Aggregations
 
 The SQL interface also supports `GROUP BY` aggregate queries.
@@ -337,8 +340,13 @@ Here is a basic example of a GROUP BY query that requests aggregations:
 
 [source,sql]
 ----
-SELECT fieldA as fa, fieldB as fb, count(*) as count, sum(fieldC) as sum, avg(fieldY) as avg FROM tableA WHERE fieldC = 'term1 term2'
-GROUP BY fa, fb HAVING sum > 1000 ORDER BY sum asc LIMIT 100
+  SELECT fieldA as fa, fieldB as fb, COUNT(*) as count, SUM(fieldC) as sum, AVG(fieldY) as avg
+    FROM tableA
+   WHERE fieldC = 'term1 term2'
+GROUP BY fa, fb
+  HAVING sum > 1000
+ORDER BY sum asc
+   LIMIT 100
 ----
 
 Let's break this down into pieces:
@@ -347,27 +355,30 @@ Let's break this down into pieces:
 
 The Column Identifiers can contain both fields in the Solr index and aggregate functions. The supported aggregate functions are:
 
-* `count(*)`: Counts the number of records over a set of buckets.
-* `sum(field)`: Sums a numeric field over over a set of buckets.
-* `avg(field)`: Averages a numeric field over a set of buckets.
-* `min(field)`: Returns the min value of a numeric field over a set of buckets.
-* `max:(field)`: Returns the max value of a numerics over a set of buckets.
+* `COUNT(*)`: Counts the number of records over a set of buckets.
+* `SUM(field)`: Sums a numeric field over a set of buckets.
+* `AVG(field)`: Averages a numeric field over a set of buckets.
+* `MIN(field)`: Returns the min value of a numeric field over a set of buckets.
+* `MAX(field)`: Returns the max value of a numeric field over a set of buckets.
 
 The non-function fields in the field list determine the fields to calculate the aggregations over.
 
+Computing the number of distinct values for a specific field within each group using `COUNT(DISTINCT <field>)` is not currently supported by Solr;
+only `COUNT(*)` can be computed for each GROUP BY dimension.
+
 === HAVING Clause
 
 The `HAVING` clause may contain any function listed in the field list. Complex `HAVING` clauses such as this are supported:
 
 [source,sql]
 ----
-SELECT fieldA, fieldB, count(*), sum(fieldC), avg(fieldY)
-FROM tableA
-WHERE fieldC = 'term1 term2'
+  SELECT fieldA, fieldB, COUNT(*), SUM(fieldC), AVG(fieldY)
+    FROM tableA
+   WHERE fieldC = 'term1 term2'
 GROUP BY fieldA, fieldB
-HAVING ((sum(fieldC) > 1000) AND (avg(fieldY) <= 10))
-ORDER BY sum(fieldC) asc
-LIMIT 100
+  HAVING ((SUM(fieldC) > 1000) AND (AVG(fieldY) <= 10))
+ORDER BY SUM(fieldC) ASC
+   LIMIT 100
 ----
 
 == Best Practices
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/metrics/CountDistinctMetric.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/metrics/CountDistinctMetric.java
index 623fc22..e9e9bc3 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/metrics/CountDistinctMetric.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/metrics/CountDistinctMetric.java
@@ -26,10 +26,17 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
 
 public class CountDistinctMetric extends Metric {
 
+    public static final String COUNT_DISTINCT = "countDist";
+    public static final String APPROX_COUNT_DISTINCT = "hll";
+
     private String columnName;
 
     public CountDistinctMetric(String columnName){
-        init("countDist", columnName);
+        this(columnName, false);
+    }
+
+    public CountDistinctMetric(String columnName, boolean isApproximate){
+        init(isApproximate ? APPROX_COUNT_DISTINCT : COUNT_DISTINCT, columnName);
     }
 
     public CountDistinctMetric(StreamExpression expression, StreamFactory factory) throws IOException{
@@ -58,7 +65,7 @@ public class CountDistinctMetric extends Metric {
     }
 
     public Metric newInstance() {
-        return new MeanMetric(columnName, outputLong);
+        return new CountDistinctMetric(columnName);
     }
 
     public String[] getColumns() {
@@ -66,8 +73,8 @@ public class CountDistinctMetric extends Metric {
     }
 
     public Number getValue() {
-       //No op for now
-       return null;
+        //No op for now
+        return null;
     }
 
     @Override
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/metrics/CountMetric.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/metrics/CountMetric.java
index 093b95e..681e3ca 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/metrics/CountMetric.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/metrics/CountMetric.java
@@ -58,6 +58,7 @@ public class CountMetric extends Metric {
   private void init(String functionName, String columnName){
     this.columnName = columnName;
     this.isAllColumns = "*".equals(this.columnName);
+    this.outputLong = true;
     setFunctionName(functionName);
     setIdentifier(functionName, "(", columnName, ")");
   }