You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by jb...@apache.org on 2015/10/09 23:34:44 UTC

svn commit: r1707819 - in /lucene/dev/trunk/solr: ./ core/src/java/org/apache/solr/handler/ core/src/test/org/apache/solr/handler/ solrj/src/java/org/apache/solr/client/solrj/io/ solrj/src/java/org/apache/solr/client/solrj/io/stream/

Author: jbernste
Date: Fri Oct  9 21:34:43 2015
New Revision: 1707819

URL: http://svn.apache.org/viewvc?rev=1707819&view=rev
Log:
SOLR-8086: Add support for SELECT DISTINCT queries to the SQL interface

Added:
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/EditStream.java   (with props)
Modified:
    lucene/dev/trunk/solr/CHANGES.txt
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/SQLHandler.java
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/handler/TestSQLHandler.java
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java

Modified: lucene/dev/trunk/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/CHANGES.txt?rev=1707819&r1=1707818&r2=1707819&view=diff
==============================================================================
--- lucene/dev/trunk/solr/CHANGES.txt (original)
+++ lucene/dev/trunk/solr/CHANGES.txt Fri Oct  9 21:34:43 2015
@@ -71,6 +71,8 @@ New Features
 
 * SOLR-8038: Add the StatsStream to the Streaming API and wire it into the SQLHandler (Joel Bernstein)
 
+* SOLR-8086: Add support for SELECT DISTINCT queries to the SQL interface (Joel Bernstein) 
+
 * SOLR-7543: Basic graph traversal query
   Example: {!graph from="node_id" to="edge_id"}id:doc_1
   (Kevin Watters, yonik)

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/SQLHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/SQLHandler.java?rev=1707819&r1=1707818&r2=1707819&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/SQLHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/SQLHandler.java Fri Oct  9 21:34:43 2015
@@ -33,15 +33,20 @@ import org.apache.solr.client.solrj.io.c
 import org.apache.solr.client.solrj.io.comp.FieldComparator;
 import org.apache.solr.client.solrj.io.comp.MultipleFieldComparator;
 import org.apache.solr.client.solrj.io.comp.StreamComparator;
+import org.apache.solr.client.solrj.io.eq.FieldEqualitor;
+import org.apache.solr.client.solrj.io.eq.MultipleFieldEqualitor;
+import org.apache.solr.client.solrj.io.eq.StreamEqualitor;
 import org.apache.solr.client.solrj.io.stream.CloudSolrStream;
 import org.apache.solr.client.solrj.io.stream.FacetStream;
 import org.apache.solr.client.solrj.io.stream.ParallelStream;
 import org.apache.solr.client.solrj.io.stream.RankStream;
+import org.apache.solr.client.solrj.io.stream.EditStream;
 import org.apache.solr.client.solrj.io.stream.RollupStream;
 import org.apache.solr.client.solrj.io.stream.StatsStream;
 import org.apache.solr.client.solrj.io.stream.StreamContext;
 import org.apache.solr.client.solrj.io.stream.TupleStream;
 import org.apache.solr.client.solrj.io.stream.ExceptionStream;
+import org.apache.solr.client.solrj.io.stream.UniqueStream;
 import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
 import org.apache.solr.client.solrj.io.stream.metrics.*;
 import org.apache.solr.common.SolrException;
@@ -66,6 +71,12 @@ public class SQLHandler extends RequestH
 
   private static String defaultZkhost = null;
   private static String defaultWorkerCollection = null;
+  private static List<String> remove;
+
+  static {
+    remove = new ArrayList();
+    remove.add("count(*)");
+  }
 
   private Logger logger = LoggerFactory.getLogger(SQLHandler.class);
 
@@ -144,6 +155,12 @@ public class SQLHandler extends RequestH
         } else {
           sqlStream = doGroupByWithAggregates(sqlVistor, numWorkers, workerCollection, workerZkhost);
         }
+      } else if(sqlVistor.isDistinct) {
+        if(aggregationMode == AggregationMode.FACET) {
+          sqlStream = doSelectDistinctFacets(sqlVistor);
+        } else {
+          sqlStream = doSelectDistinct(sqlVistor, numWorkers, workerCollection, workerZkhost);
+        }
       } else {
         sqlStream = doSelect(sqlVistor);
       }
@@ -238,6 +255,200 @@ public class SQLHandler extends RequestH
     return tupleStream;
   }
 
+  private static TupleStream doSelectDistinct(SQLVisitor sqlVisitor,
+                                              int numWorkers,
+                                              String workerCollection,
+                                              String workerZkHost) throws IOException {
+
+    Set<String> fieldSet = new HashSet();
+    Bucket[] buckets = getBuckets(sqlVisitor.fields, fieldSet);
+    Metric[] metrics = getMetrics(sqlVisitor.fields, fieldSet);
+
+    if(metrics.length > 0) {
+      throw new IOException("Select Distinct queries cannot include aggregate functions.");
+    }
+
+    String fl = fields(fieldSet);
+
+    String sort = null;
+    StreamEqualitor ecomp = null;
+    StreamComparator comp = null;
+
+    if(sqlVisitor.sorts != null && sqlVisitor.sorts.size() > 0) {
+      StreamComparator[] adjustedSorts = adjustSorts(sqlVisitor.sorts, buckets);
+      FieldEqualitor[] fieldEqualitors = new FieldEqualitor[adjustedSorts.length];
+      StringBuilder buf = new StringBuilder();
+      for(int i=0; i<adjustedSorts.length; i++) {
+        FieldComparator fieldComparator = (FieldComparator)adjustedSorts[i];
+        fieldEqualitors[i] = new FieldEqualitor(fieldComparator.getFieldName());
+        if(i>0) {
+          buf.append(",");
+        }
+        buf.append(fieldComparator.getFieldName()).append(" ").append(fieldComparator.getOrder().toString());
+      }
+
+      sort = buf.toString();
+
+      if(adjustedSorts.length == 1) {
+        ecomp = fieldEqualitors[0];
+        comp = adjustedSorts[0];
+      } else {
+        ecomp = new MultipleFieldEqualitor(fieldEqualitors);
+        comp = new MultipleFieldComparator(adjustedSorts);
+      }
+    } else {
+      StringBuilder sortBuf = new StringBuilder();
+      FieldEqualitor[] equalitors = new FieldEqualitor[buckets.length];
+      StreamComparator[] streamComparators = new StreamComparator[buckets.length];
+      for(int i=0; i<buckets.length; i++) {
+        equalitors[i] = new FieldEqualitor(buckets[i].toString());
+        streamComparators[i] = new FieldComparator(buckets[i].toString(), ComparatorOrder.ASCENDING);
+        if(i>0) {
+          sortBuf.append(',');
+        }
+        sortBuf.append(buckets[i].toString()).append(" asc");
+      }
+
+      sort = sortBuf.toString();
+
+      if(equalitors.length == 1) {
+        ecomp = equalitors[0];
+        comp = streamComparators[0];
+      } else {
+        ecomp = new MultipleFieldEqualitor(equalitors);
+        comp = new MultipleFieldComparator(streamComparators);
+      }
+    }
+
+    TableSpec tableSpec = new TableSpec(sqlVisitor.table, defaultZkhost);
+
+    String zkHost = tableSpec.zkHost;
+    String collection = tableSpec.collection;
+    Map<String, String> params = new HashMap();
+
+    params.put(CommonParams.FL, fl);
+    params.put(CommonParams.Q, sqlVisitor.query);
+    //Always use the /export handler for Distinct Queries because it requires exporting full result sets.
+    params.put(CommonParams.QT, "/export");
+
+    if(numWorkers > 1) {
+      params.put("partitionKeys", getPartitionKeys(buckets));
+    }
+
+    params.put("sort", sort);
+
+    TupleStream tupleStream = null;
+
+    CloudSolrStream cstream = new CloudSolrStream(zkHost, collection, params);
+    tupleStream = new UniqueStream(cstream, ecomp);
+
+    if(numWorkers > 1) {
+      // Do the unique in parallel
+      // Maintain the sort of the Tuples coming from the workers.
+      ParallelStream parallelStream = new ParallelStream(workerZkHost, workerCollection, tupleStream, numWorkers, comp);
+
+      StreamFactory factory = new StreamFactory()
+          .withFunctionName("search", CloudSolrStream.class)
+          .withFunctionName("parallel", ParallelStream.class)
+          .withFunctionName("unique", UniqueStream.class);
+
+      parallelStream.setStreamFactory(factory);
+      parallelStream.setObjectSerialize(false);
+      tupleStream = parallelStream;
+    }
+
+    if(sqlVisitor.limit > 0) {
+      tupleStream = new LimitStream(tupleStream, sqlVisitor.limit);
+    }
+
+    return tupleStream;
+  }
+
+  private static StreamComparator[] adjustSorts(List<SortItem> sorts, Bucket[] buckets) throws IOException {
+    List<FieldComparator> adjustedSorts = new ArrayList();
+    Set<String> bucketFields = new HashSet();
+    Set<String> sortFields = new HashSet();
+
+    for(SortItem sortItem : sorts) {
+      sortFields.add(stripSingleQuotes(stripQuotes(sortItem.getSortKey().toString())));
+      adjustedSorts.add(new FieldComparator(stripSingleQuotes(stripQuotes(sortItem.getSortKey().toString())),
+                                            ascDescComp(sortItem.getOrdering().toString())));
+    }
+
+    for(Bucket bucket : buckets) {
+      bucketFields.add(bucket.toString());
+    }
+
+    for(SortItem sortItem : sorts) {
+      String sortField = stripSingleQuotes(stripQuotes(sortItem.getSortKey().toString()));
+      if(!bucketFields.contains(sortField)) {
+        throw new IOException("All sort fields must be in the field list.");
+      }
+    }
+
+    //Add sort fields if needed
+    if(sorts.size() < buckets.length) {
+      for(Bucket bucket : buckets) {
+        String b = bucket.toString();
+        if(!sortFields.contains(b)) {
+          adjustedSorts.add(new FieldComparator(bucket.toString(), ComparatorOrder.ASCENDING));
+        }
+      }
+    }
+
+    return adjustedSorts.toArray(new FieldComparator[adjustedSorts.size()]);
+  }
+
+  private static TupleStream doSelectDistinctFacets(SQLVisitor sqlVisitor) throws IOException {
+
+    Set<String> fieldSet = new HashSet();
+    Bucket[] buckets = getBuckets(sqlVisitor.fields, fieldSet);
+    Metric[] metrics = getMetrics(sqlVisitor.fields, fieldSet);
+
+    if(metrics.length > 0) {
+      throw new IOException("Select Distinct queries cannot include aggregate functions.");
+    }
+
+    TableSpec tableSpec = new TableSpec(sqlVisitor.table, defaultZkhost);
+
+    String zkHost = tableSpec.zkHost;
+    String collection = tableSpec.collection;
+    Map<String, String> params = new HashMap();
+
+    params.put(CommonParams.Q, sqlVisitor.query);
+
+    int limit = sqlVisitor.limit > 0 ? sqlVisitor.limit : 100;
+
+    FieldComparator[] sorts = null;
+
+    if(sqlVisitor.sorts == null) {
+      sorts = new FieldComparator[buckets.length];
+      for(int i=0; i<sorts.length; i++) {
+        sorts[i] = new FieldComparator("index", ComparatorOrder.ASCENDING);
+      }
+    } else {
+      StreamComparator[] comps = adjustSorts(sqlVisitor.sorts, buckets);
+      sorts = new FieldComparator[comps.length];
+      for(int i=0; i<comps.length; i++) {
+        sorts[i] = (FieldComparator)comps[i];
+      }
+    }
+
+    TupleStream tupleStream = new FacetStream(zkHost,
+                                              collection,
+                                              params,
+                                              buckets,
+                                              metrics,
+                                              sorts,
+                                              limit);
+
+    if(sqlVisitor.limit > 0) {
+      tupleStream = new LimitStream(tupleStream, sqlVisitor.limit);
+    }
+
+    return new EditStream(tupleStream, remove);
+  }
+
   private static TupleStream doGroupByWithAggregatesFacets(SQLVisitor sqlVisitor) throws IOException {
 
     Set<String> fieldSet = new HashSet();
@@ -344,7 +555,7 @@ public class SQLHandler extends RequestH
         if (comma) {
           siBuf.append(",");
         }
-        siBuf.append(stripQuotes(sortItem.getSortKey().toString()) + " " + ascDesc(sortItem.getOrdering().toString()));
+        siBuf.append(stripSingleQuotes(stripQuotes(sortItem.getSortKey().toString())) + " " + ascDesc(sortItem.getOrdering().toString()));
       }
     } else {
       if(sqlVisitor.limit < 0) {
@@ -388,7 +599,7 @@ public class SQLHandler extends RequestH
     for(int i=0; i< buckets.length; i++) {
       Bucket bucket = buckets[i];
       SortItem sortItem = sortItems.get(i);
-      if(!bucket.toString().equals(stripQuotes(sortItem.getSortKey().toString()))) {
+      if(!bucket.toString().equals(stripSingleQuotes(stripQuotes(sortItem.getSortKey().toString())))) {
         return false;
       }
 
@@ -453,7 +664,7 @@ public class SQLHandler extends RequestH
   public static String getSortDirection(List<SortItem> sorts) {
     if(sorts != null && sorts.size() > 0) {
       for(SortItem item : sorts) {
-        return ascDesc(stripQuotes(item.getOrdering().toString()));
+        return ascDesc(stripSingleQuotes(stripQuotes(item.getOrdering().toString())));
       }
     }
 
@@ -482,7 +693,7 @@ public class SQLHandler extends RequestH
       String ordering = sortItem.getOrdering().toString();
       ComparatorOrder comparatorOrder = ascDescComp(ordering);
       String sortKey = sortItem.getSortKey().toString();
-      comps[i] = new FieldComparator(stripQuotes(sortKey), comparatorOrder);
+      comps[i] = new FieldComparator(stripSingleQuotes(stripQuotes(sortKey)), comparatorOrder);
     }
 
     if(comps.length == 1) {
@@ -499,7 +710,7 @@ public class SQLHandler extends RequestH
       String ordering = sortItem.getOrdering().toString();
       ComparatorOrder comparatorOrder = ascDescComp(ordering);
       String sortKey = sortItem.getSortKey().toString();
-      comps[i] = new FieldComparator(stripQuotes(sortKey), comparatorOrder);
+      comps[i] = new FieldComparator(stripSingleQuotes(stripQuotes(sortKey)), comparatorOrder);
     }
 
     return comps;
@@ -653,7 +864,7 @@ public class SQLHandler extends RequestH
         value = '"'+value+'"';
       }
 
-      buf.append('(').append(stripQuotes(field) + ":" + value).append(')');
+      buf.append('(').append(stripQuotes(stripSingleQuotes(field)) + ":" + value).append(')');
       return null;
     }
   }
@@ -668,6 +879,7 @@ public class SQLHandler extends RequestH
     public int limit = -1;
     public boolean groupByQuery;
     public Expression havingExpression;
+    public boolean isDistinct;
 
     public SQLVisitor(StringBuilder builder) {
       this.builder = builder;
@@ -731,8 +943,7 @@ public class SQLHandler extends RequestH
         this.groupByQuery = true;
         List<Expression> groups = node.getGroupBy();
         for(Expression group : groups) {
-          groupBy.add(stripQuotes(group.toString()));
-
+          groupBy.add(stripSingleQuotes(stripQuotes(group.toString())));
         }
       }
 
@@ -756,14 +967,14 @@ public class SQLHandler extends RequestH
     protected Void visitComparisonExpression(ComparisonExpression node, Integer index) {
       String field = node.getLeft().toString();
       String value = node.getRight().toString();
-      query = stripQuotes(field)+":"+stripQuotes(value);
+      query = stripSingleQuotes(stripQuotes(field))+":"+stripQuotes(value);
       return null;
     }
 
     protected Void visitSelect(Select node, Integer indent) {
       this.append(indent.intValue(), "SELECT");
       if(node.isDistinct()) {
-
+        this.isDistinct = true;
       }
 
       if(node.getSelectItems().size() > 1) {
@@ -781,7 +992,7 @@ public class SQLHandler extends RequestH
     }
 
     protected Void visitSingleColumn(SingleColumn node, Integer indent) {
-      fields.add(stripQuotes(ExpressionFormatter.formatExpression(node.getExpression())));
+      fields.add(stripSingleQuotes(stripQuotes(ExpressionFormatter.formatExpression(node.getExpression()))));
 
       if(node.getAlias().isPresent()) {
       }
@@ -794,7 +1005,7 @@ public class SQLHandler extends RequestH
     }
 
     protected Void visitTable(Table node, Integer indent) {
-      this.table = node.getName().toString();
+      this.table = stripSingleQuotes(node.getName().toString());
       return null;
     }
 
@@ -892,7 +1103,7 @@ public class SQLHandler extends RequestH
     }
   }
 
-  public static class HavingStream extends TupleStream {
+  private static class HavingStream extends TupleStream {
 
     private TupleStream stream;
     private HavingVisitor havingVisitor;

Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/handler/TestSQLHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/handler/TestSQLHandler.java?rev=1707819&r1=1707818&r2=1707819&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/handler/TestSQLHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/handler/TestSQLHandler.java Fri Oct  9 21:34:43 2015
@@ -28,14 +28,7 @@ import com.facebook.presto.sql.tree.Stat
 import org.apache.solr.client.solrj.io.Tuple;
 import org.apache.solr.client.solrj.io.stream.ExceptionStream;
 import org.apache.solr.client.solrj.io.stream.SolrStream;
-import org.apache.solr.client.solrj.io.stream.StatsStream;
 import org.apache.solr.client.solrj.io.stream.TupleStream;
-import org.apache.solr.client.solrj.io.stream.metrics.CountMetric;
-import org.apache.solr.client.solrj.io.stream.metrics.MaxMetric;
-import org.apache.solr.client.solrj.io.stream.metrics.MeanMetric;
-import org.apache.solr.client.solrj.io.stream.metrics.Metric;
-import org.apache.solr.client.solrj.io.stream.metrics.MinMetric;
-import org.apache.solr.client.solrj.io.stream.metrics.SumMetric;
 import org.apache.solr.cloud.AbstractFullDistribZkTestBase;
 import org.apache.solr.common.params.CommonParams;
 import org.junit.After;
@@ -95,13 +88,17 @@ public class TestSQLHandler extends Abst
     waitForRecoveriesToFinish(false);
     testPredicate();
     testBasicSelect();
+    testStringLiteralFields();
     testBasicGrouping();
     testBasicGroupingFacets();
+    testSelectDistinct();
+    testSelectDistinctFacets();
     testAggregatesWithoutGrouping();
     testSQLException();
     testTimeSeriesGrouping();
     testTimeSeriesGroupingFacet();
     testParallelBasicGrouping();
+    testParallelSelectDistinct();
     testParallelTimeSeriesGrouping();
   }
 
@@ -124,6 +121,15 @@ public class TestSQLHandler extends Abst
 
     assert(sqlVistor.query.equals("(c:\"d\")"));
 
+
+    //Upper case
+    parser = new SqlParser();
+    sql = "select a from b where ('CcC' = 'D')";
+    statement = parser.createStatement(sql);
+    sqlVistor = new SQLHandler.SQLVisitor(new StringBuilder());
+    sqlVistor.process(statement, new Integer(0));
+    assert(sqlVistor.query.equals("(CcC:\"D\")"));
+
     //Phrase
     parser = new SqlParser();
     sql = "select a from b where (c = 'd d')";
@@ -200,11 +206,11 @@ public class TestSQLHandler extends Abst
 
     // Complex Lucene/Solr Query
     parser = new SqlParser();
-    sql = "select a from b where ((c = '[0 TO 100]') OR ((l = '(z*)') AND (m = '(j OR (k NOT s))')))";
+    sql = "select a from b where (('c' = '[0 TO 100]') OR ((l = '(z*)') AND ('M' = '(j OR (k NOT s))')))";
     statement = parser.createStatement(sql);
     sqlVistor = new SQLHandler.SQLVisitor(new StringBuilder());
     sqlVistor.process(statement, new Integer(0));
-    assert(sqlVistor.query.equals("((c:[0 TO 100]) OR ((l:(z*)) AND (m:(j OR (k NOT s)))))"));
+    assert(sqlVistor.query.equals("((c:[0 TO 100]) OR ((l:(z*)) AND (M:(j OR (k NOT s)))))"));
   }
 
   private void testBasicSelect() throws Exception {
@@ -216,18 +222,18 @@ public class TestSQLHandler extends Abst
 
       commit();
 
-      indexr("id", "1", "text", "XXXX XXXX", "str_s", "a", "field_i", "7");
-      indexr("id", "2", "text", "XXXX XXXX", "str_s", "b", "field_i", "8");
-      indexr("id", "3", "text", "XXXX XXXX", "str_s", "a", "field_i", "20");
-      indexr("id", "4", "text", "XXXX XXXX", "str_s", "b", "field_i", "11");
-      indexr("id", "5", "text", "XXXX XXXX", "str_s", "c", "field_i", "30");
-      indexr("id", "6", "text", "XXXX XXXX", "str_s", "c", "field_i", "40");
-      indexr("id", "7", "text", "XXXX XXXX", "str_s", "c", "field_i", "50");
-      indexr("id", "8", "text", "XXXX XXXX", "str_s", "c", "field_i", "60");
+      indexDoc(sdoc("id", "1", "text", "XXXX XXXX", "str_s", "a", "field_i", "7"));
+      indexDoc(sdoc("id", "2", "text", "XXXX XXXX", "str_s", "b", "field_i", "8"));
+      indexDoc(sdoc("id", "3", "text", "XXXX XXXX", "str_s", "a", "field_i", "20"));
+      indexDoc(sdoc("id", "4", "text", "XXXX XXXX", "str_s", "b", "field_i", "11"));
+      indexDoc(sdoc("id", "5", "text", "XXXX XXXX", "str_s", "c", "field_i", "30"));
+      indexDoc(sdoc("id", "6", "text", "XXXX XXXX", "str_s", "c", "field_i", "40"));
+      indexDoc(sdoc("id", "7", "text", "XXXX XXXX", "str_s", "c", "field_i", "50"));
+      indexDoc(sdoc("id", "8", "text", "XXXX XXXX", "str_s", "c", "field_i", "60"));
       commit();
       Map params = new HashMap();
       params.put(CommonParams.QT, "/sql");
-      params.put("sql", "select id, field_i, str_s from collection1 where text='XXXX' order by field_i desc");
+      params.put("sql", "select 'id', field_i, str_s from collection1 where 'text'='XXXX' order by field_i desc");
 
       SolrStream solrStream = new SolrStream(jetty.url, params);
       List<Tuple> tuples = getTuples(solrStream);
@@ -319,6 +325,81 @@ public class TestSQLHandler extends Abst
     }
   }
 
+
+  private void testStringLiteralFields() throws Exception {
+    try {
+
+      CloudJettyRunner jetty = this.cloudJettys.get(0);
+
+      del("*:*");
+
+      commit();
+
+      indexDoc(sdoc("id", "1", "Text_t", "XXXX XXXX", "str_s", "a", "Field_i", "7"));
+      indexDoc(sdoc("id", "2", "Text_t", "XXXX XXXX", "str_s", "b", "Field_i", "8"));
+      indexDoc(sdoc("id", "3", "Text_t", "XXXX XXXX", "str_s", "a", "Field_i", "20"));
+      indexDoc(sdoc("id", "4", "Text_t", "XXXX XXXX", "str_s", "b", "Field_i", "11"));
+      indexDoc(sdoc("id", "5", "Text_t", "XXXX XXXX", "str_s", "c", "Field_i", "30"));
+      indexDoc(sdoc("id", "6", "Text_t", "XXXX XXXX", "str_s", "c", "Field_i", "40"));
+      indexDoc(sdoc("id", "7", "Text_t", "XXXX XXXX", "str_s", "c", "Field_i", "50"));
+      indexDoc(sdoc("id", "8", "Text_t", "XXXX XXXX", "str_s", "c", "Field_i", "60"));
+      commit();
+      Map params = new HashMap();
+      params.put(CommonParams.QT, "/sql");
+      params.put("sql", "select id, 'Field_i', str_s from Collection1 where 'Text_t'='XXXX' order by 'Field_i' desc");
+
+      SolrStream solrStream = new SolrStream(jetty.url, params);
+      List<Tuple> tuples = getTuples(solrStream);
+
+      assert(tuples.size() == 8);
+
+      Tuple tuple = null;
+
+      tuple = tuples.get(0);
+      assert(tuple.getLong("id") == 8);
+      assert(tuple.getLong("Field_i") == 60);
+      assert(tuple.get("str_s").equals("c"));
+
+      tuple = tuples.get(1);
+      assert(tuple.getLong("id") == 7);
+      assert(tuple.getLong("Field_i") == 50);
+      assert(tuple.get("str_s").equals("c"));
+
+      tuple = tuples.get(2);
+      assert(tuple.getLong("id") == 6);
+      assert(tuple.getLong("Field_i") == 40);
+      assert(tuple.get("str_s").equals("c"));
+
+      tuple = tuples.get(3);
+      assert(tuple.getLong("id") == 5);
+      assert(tuple.getLong("Field_i") == 30);
+      assert(tuple.get("str_s").equals("c"));
+
+      tuple = tuples.get(4);
+      assert(tuple.getLong("id") == 3);
+      assert(tuple.getLong("Field_i") == 20);
+      assert(tuple.get("str_s").equals("a"));
+
+      tuple = tuples.get(5);
+      assert(tuple.getLong("id") == 4);
+      assert(tuple.getLong("Field_i") == 11);
+      assert(tuple.get("str_s").equals("b"));
+
+      tuple = tuples.get(6);
+      assert(tuple.getLong("id") == 2);
+      assert(tuple.getLong("Field_i") == 8);
+      assert(tuple.get("str_s").equals("b"));
+
+      tuple = tuples.get(7);
+      assert(tuple.getLong("id") == 1);
+      assert(tuple.getLong("Field_i") == 7);
+      assert(tuple.get("str_s").equals("a"));
+
+    } finally {
+      delete();
+    }
+  }
+
   private void testSQLException() throws Exception {
     try {
 
@@ -328,14 +409,14 @@ public class TestSQLHandler extends Abst
 
       commit();
 
-      indexr("id", "1", "text", "XXXX XXXX", "str_s", "a", "field_i", "7");
-      indexr("id", "2", "text", "XXXX XXXX", "str_s", "b", "field_i", "8");
-      indexr("id", "3", "text", "XXXX XXXX", "str_s", "a", "field_i", "20");
-      indexr("id", "4", "text", "XXXX XXXX", "str_s", "b", "field_i", "11");
-      indexr("id", "5", "text", "XXXX XXXX", "str_s", "c", "field_i", "30");
-      indexr("id", "6", "text", "XXXX XXXX", "str_s", "c", "field_i", "40");
-      indexr("id", "7", "text", "XXXX XXXX", "str_s", "c", "field_i", "50");
-      indexr("id", "8", "text", "XXXX XXXX", "str_s", "c", "field_i", "60");
+      indexDoc(sdoc("id", "1", "text", "XXXX XXXX", "str_s", "a", "field_i", "7"));
+      indexDoc(sdoc("id", "2", "text", "XXXX XXXX", "str_s", "b", "field_i", "8"));
+      indexDoc(sdoc("id", "3", "text", "XXXX XXXX", "str_s", "a", "field_i", "20"));
+      indexDoc(sdoc("id", "4", "text", "XXXX XXXX", "str_s", "b", "field_i", "11"));
+      indexDoc(sdoc("id", "5", "text", "XXXX XXXX", "str_s", "c", "field_i", "30"));
+      indexDoc(sdoc("id", "6", "text", "XXXX XXXX", "str_s", "c", "field_i", "40"));
+      indexDoc(sdoc("id", "7", "text", "XXXX XXXX", "str_s", "c", "field_i", "50"));
+      indexDoc(sdoc("id", "8", "text", "XXXX XXXX", "str_s", "c", "field_i", "60"));
       commit();
 
       Map params = new HashMap();
@@ -417,7 +498,7 @@ public class TestSQLHandler extends Abst
       commit();
       Map params = new HashMap();
       params.put(CommonParams.QT, "/sql");
-      params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from collection1 where text='XXXX' group by str_s order by sum(field_i) asc limit 2");
+      params.put("sql", "select str_s, 'count(*)', sum(field_i), min(field_i), max(field_i), avg(field_i) from collection1 where text='XXXX' group by 'str_s' order by 'sum(field_i)' asc limit 2");
 
       SolrStream solrStream = new SolrStream(jetty.url, params);
       List<Tuple> tuples = getTuples(solrStream);
@@ -530,7 +611,8 @@ public class TestSQLHandler extends Abst
     }
   }
 
-  private void testBasicGroupingFacets() throws Exception {
+
+  private void testSelectDistinctFacets() throws Exception {
     try {
 
       CloudJettyRunner jetty = this.cloudJettys.get(0);
@@ -539,128 +621,166 @@ public class TestSQLHandler extends Abst
 
       commit();
 
-      indexr("id", "1", "text", "XXXX XXXX", "str_s", "a", "field_i", "7");
-      indexr("id", "2", "text", "XXXX XXXX", "str_s", "b", "field_i", "8");
+      indexr("id", "1", "text", "XXXX XXXX", "str_s", "a", "field_i", "1");
+      indexr("id", "2", "text", "XXXX XXXX", "str_s", "b", "field_i", "2");
       indexr("id", "3", "text", "XXXX XXXX", "str_s", "a", "field_i", "20");
-      indexr("id", "4", "text", "XXXX XXXX", "str_s", "b", "field_i", "11");
+      indexr("id", "4", "text", "XXXX XXXX", "str_s", "b", "field_i", "2");
       indexr("id", "5", "text", "XXXX XXXX", "str_s", "c", "field_i", "30");
-      indexr("id", "6", "text", "XXXX XXXX", "str_s", "c", "field_i", "40");
+      indexr("id", "6", "text", "XXXX XXXX", "str_s", "c", "field_i", "30");
       indexr("id", "7", "text", "XXXX XXXX", "str_s", "c", "field_i", "50");
       indexr("id", "8", "text", "XXXX XXXX", "str_s", "c", "field_i", "60");
       commit();
       Map params = new HashMap();
       params.put(CommonParams.QT, "/sql");
       params.put("aggregationMode", "facet");
-      params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from collection1 where text='XXXX' group by str_s order by sum(field_i) asc limit 2");
+      params.put("sql", "select distinct 'str_s', 'field_i' from collection1 order by 'str_s' asc, 'field_i' asc");
 
       SolrStream solrStream = new SolrStream(jetty.url, params);
       List<Tuple> tuples = getTuples(solrStream);
 
-      //Only two results because of the limit.
-      assert(tuples.size() == 2);
+      assert(tuples.size() == 6);
 
       Tuple tuple = null;
 
       tuple = tuples.get(0);
-      assert(tuple.get("str_s").equals("b"));
-      assert(tuple.getDouble("count(*)") == 2);
-      assert(tuple.getDouble("sum(field_i)") == 19);
-      assert(tuple.getDouble("min(field_i)") == 8);
-      assert(tuple.getDouble("max(field_i)") == 11);
-      assert(tuple.getDouble("avg(field_i)") == 9.5D);
+      assert(tuple.get("str_s").equals("a"));
+      assert(tuple.getLong("field_i") == 1);
 
       tuple = tuples.get(1);
       assert(tuple.get("str_s").equals("a"));
-      assert(tuple.getDouble("count(*)") == 2);
-      assert(tuple.getDouble("sum(field_i)") == 27);
-      assert(tuple.getDouble("min(field_i)") == 7);
-      assert(tuple.getDouble("max(field_i)") == 20);
-      assert(tuple.getDouble("avg(field_i)") == 13.5D);
+      assert(tuple.getLong("field_i") == 20);
+
+      tuple = tuples.get(2);
+      assert(tuple.get("str_s").equals("b"));
+      assert(tuple.getLong("field_i") == 2);
+
+      tuple = tuples.get(3);
+      assert(tuple.get("str_s").equals("c"));
+      assert(tuple.getLong("field_i") == 30);
+
+      tuple = tuples.get(4);
+      assert(tuple.get("str_s").equals("c"));
+      assert(tuple.getLong("field_i") == 50);
+
+      tuple = tuples.get(5);
+      assert(tuple.get("str_s").equals("c"));
+      assert(tuple.getLong("field_i") == 60);
+
 
+      //reverse the sort
       params = new HashMap();
       params.put(CommonParams.QT, "/sql");
       params.put("aggregationMode", "facet");
-      params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from collection1 where (text='XXXX' AND NOT text='XXXX XXX') group by str_s order by str_s desc");
+      params.put("sql", "select distinct str_s, field_i from collection1 order by str_s desc, field_i desc");
 
       solrStream = new SolrStream(jetty.url, params);
       tuples = getTuples(solrStream);
 
-      //The sort by and order by match and no limit is applied. All the Tuples should be returned in
-      //this scenario.
-
-      assert(tuples.size() == 3);
+      assert(tuples.size() == 6);
 
       tuple = tuples.get(0);
       assert(tuple.get("str_s").equals("c"));
-      assert(tuple.getDouble("count(*)") == 4);
-      assert(tuple.getDouble("sum(field_i)") == 180);
-      assert(tuple.getDouble("min(field_i)") == 30);
-      assert(tuple.getDouble("max(field_i)") == 60);
-      assert(tuple.getDouble("avg(field_i)") == 45);
+      assert(tuple.getLong("field_i") == 60);
 
       tuple = tuples.get(1);
-      assert(tuple.get("str_s").equals("b"));
-      assert(tuple.getDouble("count(*)") == 2);
-      assert(tuple.getDouble("sum(field_i)") == 19);
-      assert(tuple.getDouble("min(field_i)") == 8);
-      assert(tuple.getDouble("max(field_i)") == 11);
-      assert(tuple.getDouble("avg(field_i)") == 9.5D);
+      assert(tuple.get("str_s").equals("c"));
+      assert(tuple.getLong("field_i") == 50);
+
 
       tuple = tuples.get(2);
+      assert(tuple.get("str_s").equals("c"));
+      assert(tuple.getLong("field_i") == 30);
+
+      tuple = tuples.get(3);
+      assert(tuple.get("str_s").equals("b"));
+      assert(tuple.getLong("field_i") == 2);
+
+
+      tuple = tuples.get(4);
       assert(tuple.get("str_s").equals("a"));
-      assert(tuple.getDouble("count(*)") == 2);
-      assert(tuple.getDouble("sum(field_i)") == 27);
-      assert(tuple.getDouble("min(field_i)") == 7);
-      assert(tuple.getDouble("max(field_i)") == 20);
-      assert(tuple.getDouble("avg(field_i)") == 13.5D);
+      assert(tuple.getLong("field_i") == 20);
 
+      tuple = tuples.get(5);
+      assert(tuple.get("str_s").equals("a"));
+      assert(tuple.getLong("field_i") == 1);
 
+
+      //test with limit
       params = new HashMap();
       params.put(CommonParams.QT, "/sql");
       params.put("aggregationMode", "facet");
-      params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from collection1 where text='XXXX' group by str_s having sum(field_i) = 19");
+      params.put("sql", "select distinct str_s, field_i from collection1 order by str_s desc, field_i desc limit 2");
 
       solrStream = new SolrStream(jetty.url, params);
       tuples = getTuples(solrStream);
 
-      assert(tuples.size() == 1);
+      assert(tuples.size() == 2);
 
       tuple = tuples.get(0);
-      assert(tuple.get("str_s").equals("b"));
-      assert(tuple.getDouble("count(*)") == 2);
-      assert(tuple.getDouble("sum(field_i)") == 19);
-      assert(tuple.getDouble("min(field_i)") == 8);
-      assert(tuple.getDouble("max(field_i)") == 11);
-      assert(tuple.getDouble("avg(field_i)") == 9.5D);
+      assert(tuple.get("str_s").equals("c"));
+      assert(tuple.getLong("field_i") == 60);
 
-      params = new HashMap();
+      tuple = tuples.get(1);
+      assert(tuple.get("str_s").equals("c"));
+      assert(tuple.getLong("field_i") == 50);
+
+
+      // Test without a sort. Sort should be asc by default.
+
+      new HashMap();
       params.put(CommonParams.QT, "/sql");
       params.put("aggregationMode", "facet");
-      params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from collection1 where text='XXXX' group by str_s having ((sum(field_i) = 19) AND (min(field_i) = 8))");
+      params.put("sql", "select distinct str_s, field_i from collection1");
 
       solrStream = new SolrStream(jetty.url, params);
       tuples = getTuples(solrStream);
 
-      //Only two results because of the limit.
-      assert(tuples.size() == 1);
+      assert(tuples.size() == 6);
 
       tuple = tuples.get(0);
+      assert(tuple.get("str_s").equals("a"));
+      assert(tuple.getLong("field_i") == 1);
+
+      tuple = tuples.get(1);
+      assert(tuple.get("str_s").equals("a"));
+      assert(tuple.getLong("field_i") == 20);
+
+      tuple = tuples.get(2);
       assert(tuple.get("str_s").equals("b"));
-      assert(tuple.getDouble("count(*)") == 2);
-      assert(tuple.getDouble("sum(field_i)") == 19);
-      assert(tuple.getDouble("min(field_i)") == 8);
-      assert(tuple.getDouble("max(field_i)") == 11);
-      assert(tuple.getDouble("avg(field_i)") == 9.5D);
+      assert(tuple.getLong("field_i") == 2);
 
-      params = new HashMap();
+      tuple = tuples.get(3);
+      assert(tuple.get("str_s").equals("c"));
+      assert(tuple.getLong("field_i") == 30);
+
+      tuple = tuples.get(4);
+      assert(tuple.get("str_s").equals("c"));
+      assert(tuple.getLong("field_i") == 50);
+
+      tuple = tuples.get(5);
+      assert(tuple.get("str_s").equals("c"));
+      assert(tuple.getLong("field_i") == 60);
+
+
+      // Test with a predicate.
+
+      new HashMap();
       params.put(CommonParams.QT, "/sql");
       params.put("aggregationMode", "facet");
-      params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from collection1 where text='XXXX' group by str_s having ((sum(field_i) = 19) AND (min(field_i) = 100))");
+      params.put("sql", "select distinct str_s, field_i from collection1 where str_s = 'a'");
 
       solrStream = new SolrStream(jetty.url, params);
       tuples = getTuples(solrStream);
 
-      assert(tuples.size() == 0);
+      assert(tuples.size() == 2);
+
+      tuple = tuples.get(0);
+      assert(tuple.get("str_s").equals("a"));
+      assert (tuple.getLong("field_i") == 1);
+
+      tuple = tuples.get(1);
+      assert(tuple.get("str_s").equals("a"));
+      assert(tuple.getLong("field_i") == 20);
 
 
     } finally {
@@ -669,10 +789,7 @@ public class TestSQLHandler extends Abst
   }
 
 
-
-
-
-  private void testParallelBasicGrouping() throws Exception {
+  private void testSelectDistinct() throws Exception {
     try {
 
       CloudJettyRunner jetty = this.cloudJettys.get(0);
@@ -681,43 +798,532 @@ public class TestSQLHandler extends Abst
 
       commit();
 
-      indexr("id", "1", "text", "XXXX XXXX", "str_s", "a", "field_i", "7");
-      indexr("id", "2", "text", "XXXX XXXX", "str_s", "b", "field_i", "8");
+      indexr("id", "1", "text", "XXXX XXXX", "str_s", "a", "field_i", "1");
+      indexr("id", "2", "text", "XXXX XXXX", "str_s", "b", "field_i", "2");
       indexr("id", "3", "text", "XXXX XXXX", "str_s", "a", "field_i", "20");
-      indexr("id", "4", "text", "XXXX XXXX", "str_s", "b", "field_i", "11");
+      indexr("id", "4", "text", "XXXX XXXX", "str_s", "b", "field_i", "2");
       indexr("id", "5", "text", "XXXX XXXX", "str_s", "c", "field_i", "30");
-      indexr("id", "6", "text", "XXXX XXXX", "str_s", "c", "field_i", "40");
+      indexr("id", "6", "text", "XXXX XXXX", "str_s", "c", "field_i", "30");
       indexr("id", "7", "text", "XXXX XXXX", "str_s", "c", "field_i", "50");
       indexr("id", "8", "text", "XXXX XXXX", "str_s", "c", "field_i", "60");
       commit();
       Map params = new HashMap();
       params.put(CommonParams.QT, "/sql");
-      params.put("numWorkers", "2");
-      params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from collection1 where text='XXXX' group by str_s order by sum(field_i) asc limit 2");
+      params.put("sql", "select distinct 'str_s', 'field_i' from collection1 order by 'str_s' asc, 'field_i' asc");
 
       SolrStream solrStream = new SolrStream(jetty.url, params);
       List<Tuple> tuples = getTuples(solrStream);
 
-      //Only two results because of the limit.
-      assert(tuples.size() == 2);
+      assert(tuples.size() == 6);
 
       Tuple tuple = null;
 
       tuple = tuples.get(0);
-      assert(tuple.get("str_s").equals("b"));
-      assert(tuple.getDouble("count(*)") == 2);
-      assert(tuple.getDouble("sum(field_i)") == 19);
-      assert(tuple.getDouble("min(field_i)") == 8);
-      assert(tuple.getDouble("max(field_i)") == 11);
-      assert(tuple.getDouble("avg(field_i)") == 9.5D);
+      assert(tuple.get("str_s").equals("a"));
+      assert(tuple.getLong("field_i") == 1);
 
       tuple = tuples.get(1);
       assert(tuple.get("str_s").equals("a"));
-      assert(tuple.getDouble("count(*)") == 2);
-      assert(tuple.getDouble("sum(field_i)") == 27);
-      assert(tuple.getDouble("min(field_i)") == 7);
-      assert(tuple.getDouble("max(field_i)") == 20);
-      assert(tuple.getDouble("avg(field_i)") == 13.5D);
+      assert(tuple.getLong("field_i") == 20);
+
+      tuple = tuples.get(2);
+      assert(tuple.get("str_s").equals("b"));
+      assert(tuple.getLong("field_i") == 2);
+
+      tuple = tuples.get(3);
+      assert(tuple.get("str_s").equals("c"));
+      assert(tuple.getLong("field_i") == 30);
+
+      tuple = tuples.get(4);
+      assert(tuple.get("str_s").equals("c"));
+      assert(tuple.getLong("field_i") == 50);
+
+      tuple = tuples.get(5);
+      assert(tuple.get("str_s").equals("c"));
+      assert(tuple.getLong("field_i") == 60);
+
+
+      //reverse the sort
+      params = new HashMap();
+      params.put(CommonParams.QT, "/sql");
+      params.put("sql", "select distinct str_s, field_i from collection1 order by str_s desc, field_i desc");
+
+      solrStream = new SolrStream(jetty.url, params);
+      tuples = getTuples(solrStream);
+
+      assert(tuples.size() == 6);
+
+      tuple = tuples.get(0);
+      assert(tuple.get("str_s").equals("c"));
+      assert(tuple.getLong("field_i") == 60);
+
+      tuple = tuples.get(1);
+      assert(tuple.get("str_s").equals("c"));
+      assert(tuple.getLong("field_i") == 50);
+
+
+      tuple = tuples.get(2);
+      assert(tuple.get("str_s").equals("c"));
+      assert(tuple.getLong("field_i") == 30);
+
+      tuple = tuples.get(3);
+      assert(tuple.get("str_s").equals("b"));
+      assert(tuple.getLong("field_i") == 2);
+
+
+      tuple = tuples.get(4);
+      assert(tuple.get("str_s").equals("a"));
+      assert(tuple.getLong("field_i") == 20);
+
+      tuple = tuples.get(5);
+      assert(tuple.get("str_s").equals("a"));
+      assert(tuple.getLong("field_i") == 1);
+
+
+      //test with limit
+      params = new HashMap();
+      params.put(CommonParams.QT, "/sql");
+      params.put("sql", "select distinct str_s, field_i from collection1 order by str_s desc, field_i desc limit 2");
+
+      solrStream = new SolrStream(jetty.url, params);
+      tuples = getTuples(solrStream);
+
+      assert(tuples.size() == 2);
+
+      tuple = tuples.get(0);
+      assert(tuple.get("str_s").equals("c"));
+      assert(tuple.getLong("field_i") == 60);
+
+      tuple = tuples.get(1);
+      assert(tuple.get("str_s").equals("c"));
+      assert(tuple.getLong("field_i") == 50);
+
+
+      // Test without a sort. Sort should be asc by default.
+
+      new HashMap();
+      params.put(CommonParams.QT, "/sql");
+      params.put("sql", "select distinct str_s, field_i from collection1");
+
+      solrStream = new SolrStream(jetty.url, params);
+      tuples = getTuples(solrStream);
+
+      assert(tuples.size() == 6);
+
+      tuple = tuples.get(0);
+      assert(tuple.get("str_s").equals("a"));
+      assert(tuple.getLong("field_i") == 1);
+
+      tuple = tuples.get(1);
+      assert(tuple.get("str_s").equals("a"));
+      assert(tuple.getLong("field_i") == 20);
+
+      tuple = tuples.get(2);
+      assert(tuple.get("str_s").equals("b"));
+      assert(tuple.getLong("field_i") == 2);
+
+      tuple = tuples.get(3);
+      assert(tuple.get("str_s").equals("c"));
+      assert(tuple.getLong("field_i") == 30);
+
+      tuple = tuples.get(4);
+      assert(tuple.get("str_s").equals("c"));
+      assert(tuple.getLong("field_i") == 50);
+
+      tuple = tuples.get(5);
+      assert(tuple.get("str_s").equals("c"));
+      assert(tuple.getLong("field_i") == 60);
+
+      // Test with a predicate.
+
+      new HashMap();
+      params.put(CommonParams.QT, "/sql");
+      params.put("sql", "select distinct str_s, field_i from collection1 where str_s = 'a'");
+
+      solrStream = new SolrStream(jetty.url, params);
+      tuples = getTuples(solrStream);
+
+      assert(tuples.size() == 2);
+
+      tuple = tuples.get(0);
+      assert(tuple.get("str_s").equals("a"));
+      assert(tuple.getLong("field_i") == 1);
+
+      tuple = tuples.get(1);
+      assert(tuple.get("str_s").equals("a"));
+      assert(tuple.getLong("field_i") == 20);
+
+
+    } finally {
+      delete();
+    }
+  }
+
+  private void testParallelSelectDistinct() throws Exception {
+    try {
+
+      CloudJettyRunner jetty = this.cloudJettys.get(0);
+
+      del("*:*");
+
+      commit();
+
+      indexr("id", "1", "text", "XXXX XXXX", "str_s", "a", "field_i", "1");
+      indexr("id", "2", "text", "XXXX XXXX", "str_s", "b", "field_i", "2");
+      indexr("id", "3", "text", "XXXX XXXX", "str_s", "a", "field_i", "20");
+      indexr("id", "4", "text", "XXXX XXXX", "str_s", "b", "field_i", "2");
+      indexr("id", "5", "text", "XXXX XXXX", "str_s", "c", "field_i", "30");
+      indexr("id", "6", "text", "XXXX XXXX", "str_s", "c", "field_i", "30");
+      indexr("id", "7", "text", "XXXX XXXX", "str_s", "c", "field_i", "50");
+      indexr("id", "8", "text", "XXXX XXXX", "str_s", "c", "field_i", "60");
+      commit();
+      Map params = new HashMap();
+      params.put(CommonParams.QT, "/sql");
+      params.put("numWorkers", "2");
+      params.put("sql", "select distinct str_s, field_i from collection1 order by str_s asc, field_i asc");
+
+      SolrStream solrStream = new SolrStream(jetty.url, params);
+      List<Tuple> tuples = getTuples(solrStream);
+
+      assert(tuples.size() == 6);
+
+      Tuple tuple = null;
+
+      tuple = tuples.get(0);
+      assert(tuple.get("str_s").equals("a"));
+      assert(tuple.getLong("field_i") == 1);
+
+      tuple = tuples.get(1);
+      assert(tuple.get("str_s").equals("a"));
+      assert(tuple.getLong("field_i") == 20);
+
+      tuple = tuples.get(2);
+      assert(tuple.get("str_s").equals("b"));
+      assert(tuple.getLong("field_i") == 2);
+
+      tuple = tuples.get(3);
+      assert(tuple.get("str_s").equals("c"));
+      assert(tuple.getLong("field_i") == 30);
+
+      tuple = tuples.get(4);
+      assert(tuple.get("str_s").equals("c"));
+      assert(tuple.getLong("field_i") == 50);
+
+      tuple = tuples.get(5);
+      assert(tuple.get("str_s").equals("c"));
+      assert(tuple.getLong("field_i") == 60);
+
+
+      //reverse the sort
+      params = new HashMap();
+      params.put(CommonParams.QT, "/sql");
+      params.put("numWorkers", "2");
+      params.put("sql", "select distinct str_s, field_i from collection1 order by str_s desc, field_i desc");
+
+      solrStream = new SolrStream(jetty.url, params);
+      tuples = getTuples(solrStream);
+
+      assert(tuples.size() == 6);
+
+      tuple = tuples.get(0);
+      assert(tuple.get("str_s").equals("c"));
+      assert(tuple.getLong("field_i") == 60);
+
+      tuple = tuples.get(1);
+      assert(tuple.get("str_s").equals("c"));
+      assert(tuple.getLong("field_i") == 50);
+
+
+      tuple = tuples.get(2);
+      assert(tuple.get("str_s").equals("c"));
+      assert(tuple.getLong("field_i") == 30);
+
+      tuple = tuples.get(3);
+      assert(tuple.get("str_s").equals("b"));
+      assert(tuple.getLong("field_i") == 2);
+
+
+      tuple = tuples.get(4);
+      assert(tuple.get("str_s").equals("a"));
+      assert(tuple.getLong("field_i") == 20);
+
+      tuple = tuples.get(5);
+      assert(tuple.get("str_s").equals("a"));
+      assert(tuple.getLong("field_i") == 1);
+
+
+      //test with limit
+      params = new HashMap();
+      params.put(CommonParams.QT, "/sql");
+      params.put("numWorkers", "2");
+      params.put("sql", "select distinct str_s, field_i from collection1 order by str_s desc, field_i desc limit 2");
+
+      solrStream = new SolrStream(jetty.url, params);
+      tuples = getTuples(solrStream);
+
+      assert(tuples.size() == 2);
+
+      tuple = tuples.get(0);
+      assert(tuple.get("str_s").equals("c"));
+      assert(tuple.getLong("field_i") == 60);
+
+      tuple = tuples.get(1);
+      assert(tuple.get("str_s").equals("c"));
+      assert(tuple.getLong("field_i") == 50);
+
+
+      // Test without a sort. Sort should be asc by default.
+
+      new HashMap();
+      params.put(CommonParams.QT, "/sql");
+      params.put("numWorkers", "2");
+      params.put("sql", "select distinct str_s, field_i from collection1");
+
+      solrStream = new SolrStream(jetty.url, params);
+      tuples = getTuples(solrStream);
+
+      assert(tuples.size() == 6);
+
+      tuple = tuples.get(0);
+      assert(tuple.get("str_s").equals("a"));
+      assert(tuple.getLong("field_i") == 1);
+
+      tuple = tuples.get(1);
+      assert(tuple.get("str_s").equals("a"));
+      assert(tuple.getLong("field_i") == 20);
+
+      tuple = tuples.get(2);
+      assert(tuple.get("str_s").equals("b"));
+      assert(tuple.getLong("field_i") == 2);
+
+      tuple = tuples.get(3);
+      assert(tuple.get("str_s").equals("c"));
+      assert(tuple.getLong("field_i") == 30);
+
+      tuple = tuples.get(4);
+      assert(tuple.get("str_s").equals("c"));
+      assert(tuple.getLong("field_i") == 50);
+
+      tuple = tuples.get(5);
+      assert(tuple.get("str_s").equals("c"));
+      assert(tuple.getLong("field_i") == 60);
+
+      // Test with a predicate.
+
+      new HashMap();
+      params.put(CommonParams.QT, "/sql");
+      params.put("numWorkers", "2");
+      params.put("sql", "select distinct str_s, field_i from collection1 where str_s = 'a'");
+
+      solrStream = new SolrStream(jetty.url, params);
+      tuples = getTuples(solrStream);
+
+      assert(tuples.size() == 2);
+
+      tuple = tuples.get(0);
+      assert(tuple.get("str_s").equals("a"));
+      assert(tuple.getLong("field_i") == 1);
+
+      tuple = tuples.get(1);
+      assert(tuple.get("str_s").equals("a"));
+      assert(tuple.getLong("field_i") == 20);
+
+    } finally {
+      delete();
+    }
+  }
+
+
+
+  private void testBasicGroupingFacets() throws Exception {
+    try {
+
+      CloudJettyRunner jetty = this.cloudJettys.get(0);
+
+      del("*:*");
+
+      commit();
+
+      indexr("id", "1", "text", "XXXX XXXX", "str_s", "a", "field_i", "7");
+      indexr("id", "2", "text", "XXXX XXXX", "str_s", "b", "field_i", "8");
+      indexr("id", "3", "text", "XXXX XXXX", "str_s", "a", "field_i", "20");
+      indexr("id", "4", "text", "XXXX XXXX", "str_s", "b", "field_i", "11");
+      indexr("id", "5", "text", "XXXX XXXX", "str_s", "c", "field_i", "30");
+      indexr("id", "6", "text", "XXXX XXXX", "str_s", "c", "field_i", "40");
+      indexr("id", "7", "text", "XXXX XXXX", "str_s", "c", "field_i", "50");
+      indexr("id", "8", "text", "XXXX XXXX", "str_s", "c", "field_i", "60");
+      commit();
+      Map params = new HashMap();
+      params.put(CommonParams.QT, "/sql");
+      params.put("aggregationMode", "facet");
+      params.put("sql", "select 'str_s', 'count(*)', sum(field_i), min(field_i), max(field_i), avg(field_i) from collection1 where text='XXXX' group by 'str_s' order by 'sum(field_i)' asc limit 2");
+
+      SolrStream solrStream = new SolrStream(jetty.url, params);
+      List<Tuple> tuples = getTuples(solrStream);
+
+      //Only two results because of the limit.
+      assert(tuples.size() == 2);
+
+      Tuple tuple = null;
+
+      tuple = tuples.get(0);
+      assert(tuple.get("str_s").equals("b"));
+      assert(tuple.getDouble("count(*)") == 2);
+      assert(tuple.getDouble("sum(field_i)") == 19);
+      assert(tuple.getDouble("min(field_i)") == 8);
+      assert(tuple.getDouble("max(field_i)") == 11);
+      assert(tuple.getDouble("avg(field_i)") == 9.5D);
+
+      tuple = tuples.get(1);
+      assert(tuple.get("str_s").equals("a"));
+      assert(tuple.getDouble("count(*)") == 2);
+      assert(tuple.getDouble("sum(field_i)") == 27);
+      assert(tuple.getDouble("min(field_i)") == 7);
+      assert(tuple.getDouble("max(field_i)") == 20);
+      assert(tuple.getDouble("avg(field_i)") == 13.5D);
+
+      params = new HashMap();
+      params.put(CommonParams.QT, "/sql");
+      params.put("aggregationMode", "facet");
+      params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from collection1 where (text='XXXX' AND NOT text='XXXX XXX') group by str_s order by str_s desc");
+
+      solrStream = new SolrStream(jetty.url, params);
+      tuples = getTuples(solrStream);
+
+      //The sort by and order by match and no limit is applied. All the Tuples should be returned in
+      //this scenario.
+
+      assert(tuples.size() == 3);
+
+      tuple = tuples.get(0);
+      assert(tuple.get("str_s").equals("c"));
+      assert(tuple.getDouble("count(*)") == 4);
+      assert(tuple.getDouble("sum(field_i)") == 180);
+      assert(tuple.getDouble("min(field_i)") == 30);
+      assert(tuple.getDouble("max(field_i)") == 60);
+      assert(tuple.getDouble("avg(field_i)") == 45);
+
+      tuple = tuples.get(1);
+      assert(tuple.get("str_s").equals("b"));
+      assert(tuple.getDouble("count(*)") == 2);
+      assert(tuple.getDouble("sum(field_i)") == 19);
+      assert(tuple.getDouble("min(field_i)") == 8);
+      assert(tuple.getDouble("max(field_i)") == 11);
+      assert(tuple.getDouble("avg(field_i)") == 9.5D);
+
+      tuple = tuples.get(2);
+      assert(tuple.get("str_s").equals("a"));
+      assert(tuple.getDouble("count(*)") == 2);
+      assert(tuple.getDouble("sum(field_i)") == 27);
+      assert(tuple.getDouble("min(field_i)") == 7);
+      assert(tuple.getDouble("max(field_i)") == 20);
+      assert(tuple.getDouble("avg(field_i)") == 13.5D);
+
+
+      params = new HashMap();
+      params.put(CommonParams.QT, "/sql");
+      params.put("aggregationMode", "facet");
+      params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from collection1 where text='XXXX' group by str_s having sum(field_i) = 19");
+
+      solrStream = new SolrStream(jetty.url, params);
+      tuples = getTuples(solrStream);
+
+      assert(tuples.size() == 1);
+
+      tuple = tuples.get(0);
+      assert(tuple.get("str_s").equals("b"));
+      assert(tuple.getDouble("count(*)") == 2);
+      assert(tuple.getDouble("sum(field_i)") == 19);
+      assert(tuple.getDouble("min(field_i)") == 8);
+      assert(tuple.getDouble("max(field_i)") == 11);
+      assert(tuple.getDouble("avg(field_i)") == 9.5D);
+
+      params = new HashMap();
+      params.put(CommonParams.QT, "/sql");
+      params.put("aggregationMode", "facet");
+      params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from collection1 where text='XXXX' group by str_s having ((sum(field_i) = 19) AND (min(field_i) = 8))");
+
+      solrStream = new SolrStream(jetty.url, params);
+      tuples = getTuples(solrStream);
+
+      //Only two results because of the limit.
+      assert(tuples.size() == 1);
+
+      tuple = tuples.get(0);
+      assert(tuple.get("str_s").equals("b"));
+      assert(tuple.getDouble("count(*)") == 2);
+      assert(tuple.getDouble("sum(field_i)") == 19);
+      assert(tuple.getDouble("min(field_i)") == 8);
+      assert(tuple.getDouble("max(field_i)") == 11);
+      assert(tuple.getDouble("avg(field_i)") == 9.5D);
+
+      params = new HashMap();
+      params.put(CommonParams.QT, "/sql");
+      params.put("aggregationMode", "facet");
+      params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from collection1 where text='XXXX' group by str_s having ((sum(field_i) = 19) AND (min(field_i) = 100))");
+
+      solrStream = new SolrStream(jetty.url, params);
+      tuples = getTuples(solrStream);
+
+      assert(tuples.size() == 0);
+
+
+    } finally {
+      delete();
+    }
+  }
+
+
+
+
+
+  private void testParallelBasicGrouping() throws Exception {
+    try {
+
+      CloudJettyRunner jetty = this.cloudJettys.get(0);
+
+      del("*:*");
+
+      commit();
+
+      indexr("id", "1", "text", "XXXX XXXX", "str_s", "a", "field_i", "7");
+      indexr("id", "2", "text", "XXXX XXXX", "str_s", "b", "field_i", "8");
+      indexr("id", "3", "text", "XXXX XXXX", "str_s", "a", "field_i", "20");
+      indexr("id", "4", "text", "XXXX XXXX", "str_s", "b", "field_i", "11");
+      indexr("id", "5", "text", "XXXX XXXX", "str_s", "c", "field_i", "30");
+      indexr("id", "6", "text", "XXXX XXXX", "str_s", "c", "field_i", "40");
+      indexr("id", "7", "text", "XXXX XXXX", "str_s", "c", "field_i", "50");
+      indexr("id", "8", "text", "XXXX XXXX", "str_s", "c", "field_i", "60");
+      commit();
+      Map params = new HashMap();
+      params.put(CommonParams.QT, "/sql");
+      params.put("numWorkers", "2");
+      params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from collection1 where text='XXXX' group by str_s order by sum(field_i) asc limit 2");
+
+      SolrStream solrStream = new SolrStream(jetty.url, params);
+      List<Tuple> tuples = getTuples(solrStream);
+
+      //Only two results because of the limit.
+      assert(tuples.size() == 2);
+
+      Tuple tuple = null;
+
+      tuple = tuples.get(0);
+      assert(tuple.get("str_s").equals("b"));
+      assert(tuple.getDouble("count(*)") == 2);
+      assert(tuple.getDouble("sum(field_i)") == 19);
+      assert(tuple.getDouble("min(field_i)") == 8);
+      assert(tuple.getDouble("max(field_i)") == 11);
+      assert(tuple.getDouble("avg(field_i)") == 9.5D);
+
+      tuple = tuples.get(1);
+      assert(tuple.get("str_s").equals("a"));
+      assert(tuple.getDouble("count(*)") == 2);
+      assert(tuple.getDouble("sum(field_i)") == 27);
+      assert(tuple.getDouble("min(field_i)") == 7);
+      assert(tuple.getDouble("max(field_i)") == 20);
+      assert(tuple.getDouble("avg(field_i)") == 13.5D);
+
 
       params = new HashMap();
       params.put(CommonParams.QT, "/sql");

Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java?rev=1707819&r1=1707818&r2=1707819&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java Fri Oct  9 21:34:43 2015
@@ -63,6 +63,10 @@ public class Tuple implements Cloneable
     this.fields.put(key, value);
   }
 
+  public void remove(Object key) {
+    this.fields.remove(key);
+  }
+
   public String getString(Object key) {
     return this.fields.get(key).toString();
   }

Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java?rev=1707819&r1=1707818&r2=1707819&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java Fri Oct  9 21:34:43 2015
@@ -286,12 +286,25 @@ public class CloudSolrStream extends Tup
 
       ZkStateReader zkStateReader = cloudSolrClient.getZkStateReader();
       ClusterState clusterState = zkStateReader.getClusterState();
+
       //System.out.println("Connected to zk an got cluster state.");
 
       Collection<Slice> slices = clusterState.getActiveSlices(this.collection);
 
       if(slices == null) {
-        throw new Exception("Collection not found:"+this.collection);
+
+        String colLower = this.collection.toLowerCase(Locale.getDefault());
+        //Try case insensitive match
+        for(String col : clusterState.getCollections()) {
+          if(col.toLowerCase(Locale.getDefault()).equals(colLower)) {
+            slices = clusterState.getActiveSlices(col);
+            break;
+          }
+        }
+
+        if(slices == null) {
+          throw new Exception("Collection not found:" + this.collection);
+        }
       }
 
       params.put("distrib","false"); // We are the aggregator.

Added: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/EditStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/EditStream.java?rev=1707819&view=auto
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/EditStream.java (added)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/EditStream.java Fri Oct  9 21:34:43 2015
@@ -0,0 +1,76 @@
+package org.apache.solr.client.solrj.io.stream;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.comp.StreamComparator;
+
+public class EditStream extends TupleStream {
+
+  private static final long serialVersionUID = 1;
+  private TupleStream stream;
+  private List<String> remove;
+
+  public EditStream(TupleStream stream, List<String> remove) {
+    this.stream = stream;
+    this.remove = remove;
+  }
+
+  public void setStreamContext(StreamContext context) {
+    this.stream.setStreamContext(context);
+  }
+
+  public List<TupleStream> children() {
+    List<TupleStream> l = new ArrayList();
+    l.add(stream);
+    return l;
+  }
+
+  public void open() throws IOException {
+    stream.open();
+  }
+
+  public void close() throws IOException {
+    stream.close();
+  }
+
+  public Tuple read() throws IOException {
+    Tuple tuple = stream.read();
+    if(tuple.EOF) {
+      return tuple;
+    } else {
+      for(String key : remove) {
+        tuple.remove(key);
+      }
+
+      return tuple;
+    }
+  }
+
+  public StreamComparator getStreamSort() {
+    return stream.getStreamSort();
+  }
+
+  public int getCost() {
+    return 0;
+  }
+}
\ No newline at end of file