You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by jb...@apache.org on 2015/09/10 01:37:35 UTC

svn commit: r1702132 - in /lucene/dev/trunk/solr: ./ core/src/java/org/apache/solr/handler/ core/src/test/org/apache/solr/handler/ solrj/src/java/org/apache/solr/client/solrj/io/stream/ solrj/src/test/org/apache/solr/client/solrj/io/stream/

Author: jbernste
Date: Wed Sep  9 23:37:35 2015
New Revision: 1702132

URL: http://svn.apache.org/r1702132
Log:
SOLR-7903: Add the FacetStream to the Streaming API and wire it into the SQLHandler

Added:
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java   (with props)
Modified:
    lucene/dev/trunk/solr/CHANGES.txt
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/SQLHandler.java
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/handler/TestSQLHandler.java
    lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java

Modified: lucene/dev/trunk/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/CHANGES.txt?rev=1702132&r1=1702131&r2=1702132&view=diff
==============================================================================
--- lucene/dev/trunk/solr/CHANGES.txt (original)
+++ lucene/dev/trunk/solr/CHANGES.txt Wed Sep  9 23:37:35 2015
@@ -65,6 +65,8 @@ New Features
 
 * SOLR-7707: Add StreamExpression Support to RollupStream (Dennis Gove, Joel Bernstein)
 
+* SOLR-7903: Add the FacetStream to the Streaming API and wire it into the SQLHandler (Joel Bernstein)
+
 Optimizations
 ----------------------
 * SOLR-7876: Speed up queries and operations that use many terms when timeAllowed has not been

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/SQLHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/SQLHandler.java?rev=1702132&r1=1702131&r2=1702132&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/SQLHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/SQLHandler.java Wed Sep  9 23:37:35 2015
@@ -35,6 +35,7 @@ import org.apache.solr.client.solrj.io.c
 import org.apache.solr.client.solrj.io.comp.MultipleFieldComparator;
 import org.apache.solr.client.solrj.io.comp.StreamComparator;
 import org.apache.solr.client.solrj.io.stream.CloudSolrStream;
+import org.apache.solr.client.solrj.io.stream.FacetStream;
 import org.apache.solr.client.solrj.io.stream.ParallelStream;
 import org.apache.solr.client.solrj.io.stream.RankStream;
 import org.apache.solr.client.solrj.io.stream.RollupStream;
@@ -87,6 +88,7 @@ public class SQLHandler extends RequestH
     int numWorkers = params.getInt("numWorkers", 1);
     String workerCollection = params.get("workerCollection", defaultWorkerCollection);
     String workerZkhost = params.get("workerZkhost",defaultZkhost);
+    String mode = params.get("aggregationMode", "map_reduce");
     StreamContext context = new StreamContext();
     try {
 
@@ -94,7 +96,7 @@ public class SQLHandler extends RequestH
         throw new Exception("sql parameter cannot be null");
       }
 
-      TupleStream tupleStream = SQLTupleStreamParser.parse(sql, numWorkers, workerCollection, workerZkhost);
+      TupleStream tupleStream = SQLTupleStreamParser.parse(sql, numWorkers, workerCollection, workerZkhost, AggregationMode.getMode(mode));
       context.numWorkers = numWorkers;
       context.setSolrClientCache(StreamHandler.clientCache);
       tupleStream.setStreamContext(context);
@@ -126,7 +128,8 @@ public class SQLHandler extends RequestH
     public static TupleStream parse(String sql,
                                     int numWorkers,
                                     String workerCollection,
-                                    String workerZkhost) throws IOException {
+                                    String workerZkhost,
+                                    AggregationMode aggregationMode) throws IOException {
       SqlParser parser = new SqlParser();
       Statement statement = parser.createStatement(sql);
 
@@ -137,7 +140,11 @@ public class SQLHandler extends RequestH
       TupleStream sqlStream = null;
 
       if(sqlVistor.groupByQuery) {
-        sqlStream = doGroupByWithAggregates(sqlVistor, numWorkers, workerCollection, workerZkhost);
+        if(aggregationMode == AggregationMode.FACET) {
+          sqlStream = doGroupByWithAggregatesFacets(sqlVistor);
+        } else {
+          sqlStream = doGroupByWithAggregates(sqlVistor, numWorkers, workerCollection, workerZkhost);
+        }
       } else {
         sqlStream = doSelect(sqlVistor);
       }
@@ -232,6 +239,56 @@ public class SQLHandler extends RequestH
     return tupleStream;
   }
 
+  /**
+   * Builds the stream for a group by query in facet aggregation mode: a FacetStream over the table's collection,
+   * wrapped in a HavingStream and/or a LimitStream when the query has a HAVING clause or a LIMIT.
+   *
+   * @throws IOException if the query selects no aggregate function
+   */
+  private static TupleStream doGroupByWithAggregatesFacets(SQLVisitor sqlVisitor) throws IOException {
+    Set<String> fieldSet = new HashSet<>();
+    Bucket[] buckets = getBuckets(sqlVisitor.groupBy, fieldSet);
+    Metric[] metrics = getMetrics(sqlVisitor.fields, fieldSet);
+    if(metrics.length == 0) {
+      throw new IOException("Group by queries must include atleast one aggregate function.");
+    }
+
+    TableSpec tableSpec = new TableSpec(sqlVisitor.table, defaultZkhost);
+    String zkHost = tableSpec.zkHost;
+    String collection = tableSpec.collection;
+
+    Map<String, String> params = new HashMap<>();
+    params.put(CommonParams.Q, sqlVisitor.query);
+
+    //Without an explicit LIMIT the FacetStream is created with a limit of 100.
+    int limit = sqlVisitor.limit > 0 ? sqlVisitor.limit : 100;
+
+    FieldComparator[] sorts;
+    if(sqlVisitor.sorts == null) {
+      //No ORDER BY: use an ascending "index" sort for every bucket.
+      sorts = new FieldComparator[buckets.length];
+      for(int i=0; i<sorts.length; i++) {
+        sorts[i] = new FieldComparator("index", ComparatorOrder.ASCENDING);
+      }
+    } else {
+      sorts = getComps(sqlVisitor.sorts);
+    }
+
+    TupleStream tupleStream = new FacetStream(zkHost, collection, params, buckets, metrics, sorts, limit);
+
+    //NOTE(review): the FacetStream is already cut to limit before HAVING filters, so LIMIT+HAVING may drop matches - confirm.
+    if(sqlVisitor.havingExpression != null) {
+      tupleStream = new HavingStream(tupleStream, sqlVisitor.havingExpression);
+    }
+
+    //The query's LIMIT is applied last, after any HAVING filter.
+    if(sqlVisitor.limit > 0) {
+      tupleStream = new LimitStream(tupleStream, sqlVisitor.limit);
+    }
+
+    return tupleStream;
+  }
+
   private static TupleStream doSelect(SQLVisitor sqlVisitor) throws IOException {
     List<String> fields = sqlVisitor.fields;
     StringBuilder flbuf = new StringBuilder();
@@ -408,6 +465,20 @@ public class SQLHandler extends RequestH
     }
   }
 
+  /** Builds one FieldComparator per sort item, in the order the items are listed. */
+  private static FieldComparator[] getComps(List<SortItem> sortItems) {
+    FieldComparator[] comparators = new FieldComparator[sortItems.size()];
+    int pos = 0;
+    for(SortItem item : sortItems) {
+      ComparatorOrder order = ascDescComp(item.getOrdering().toString());
+      String field = stripQuotes(item.getSortKey().toString());
+      comparators[pos++] = new FieldComparator(field, order);
+    }
+
+    return comparators;
+  }
+
+
   private static String fields(Set<String> fieldSet) {
     StringBuilder buf = new StringBuilder();
     boolean comma = false;
@@ -778,6 +849,22 @@ public class SQLHandler extends RequestH
     }
   }
 
+  /** Selects how parse() builds group by queries: FACET uses doGroupByWithAggregatesFacets, MAP_REDUCE doGroupByWithAggregates. */
+  public static enum AggregationMode {
+    MAP_REDUCE,
+    FACET;
+
+    /** Maps "facet" or "map_reduce" (any case) to its mode; any other value, including null, raises an IOException. */
+    public static AggregationMode getMode(String mode) throws IOException{
+      if("facet".equalsIgnoreCase(mode)) {
+        return FACET;
+      } else if("map_reduce".equalsIgnoreCase(mode)) {
+        return MAP_REDUCE;
+      }
+      throw new IOException("Invalid aggregation mode:"+mode);
+    }
+  }
+
   public static class HavingStream extends TupleStream {
 
     private TupleStream stream;

Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/handler/TestSQLHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/handler/TestSQLHandler.java?rev=1702132&r1=1702131&r2=1702132&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/handler/TestSQLHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/handler/TestSQLHandler.java Wed Sep  9 23:37:35 2015
@@ -88,8 +88,10 @@ public class TestSQLHandler extends Abst
     testPredicate();
     testBasicSelect();
     testBasicGrouping();
+    testBasicGroupingFacets();
     testSQLException();
     testTimeSeriesGrouping();
+    testTimeSeriesGroupingFacet();
     testParallelBasicGrouping();
     testParallelTimeSeriesGrouping();
   }
@@ -519,6 +521,148 @@ public class TestSQLHandler extends Abst
     }
   }
 
+  private void testBasicGroupingFacets() throws Exception {
+    try {
+
+      CloudJettyRunner jetty = this.cloudJettys.get(0);
+
+      del("*:*");
+
+      commit();
+
+      indexr("id", "1", "text", "XXXX XXXX", "str_s", "a", "field_i", "7");
+      indexr("id", "2", "text", "XXXX XXXX", "str_s", "b", "field_i", "8");
+      indexr("id", "3", "text", "XXXX XXXX", "str_s", "a", "field_i", "20");
+      indexr("id", "4", "text", "XXXX XXXX", "str_s", "b", "field_i", "11");
+      indexr("id", "5", "text", "XXXX XXXX", "str_s", "c", "field_i", "30");
+      indexr("id", "6", "text", "XXXX XXXX", "str_s", "c", "field_i", "40");
+      indexr("id", "7", "text", "XXXX XXXX", "str_s", "c", "field_i", "50");
+      indexr("id", "8", "text", "XXXX XXXX", "str_s", "c", "field_i", "60");
+      commit();
+      Map params = new HashMap();
+      params.put(CommonParams.QT, "/sql");
+      params.put("aggregationMode", "facet");
+      params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from collection1 where text='XXXX' group by str_s order by sum(field_i) asc limit 2");
+
+      SolrStream solrStream = new SolrStream(jetty.url, params);
+      List<Tuple> tuples = getTuples(solrStream);
+
+      //Only two results because of the limit.
+      assert(tuples.size() == 2);
+
+      Tuple tuple = null;
+
+      tuple = tuples.get(0);
+      assert(tuple.get("str_s").equals("b"));
+      assert(tuple.getDouble("count(*)") == 2);
+      assert(tuple.getDouble("sum(field_i)") == 19);
+      assert(tuple.getDouble("min(field_i)") == 8);
+      assert(tuple.getDouble("max(field_i)") == 11);
+      assert(tuple.getDouble("avg(field_i)") == 9.5D);
+
+      tuple = tuples.get(1);
+      assert(tuple.get("str_s").equals("a"));
+      assert(tuple.getDouble("count(*)") == 2);
+      assert(tuple.getDouble("sum(field_i)") == 27);
+      assert(tuple.getDouble("min(field_i)") == 7);
+      assert(tuple.getDouble("max(field_i)") == 20);
+      assert(tuple.getDouble("avg(field_i)") == 13.5D);
+
+      params = new HashMap();
+      params.put(CommonParams.QT, "/sql");
+      params.put("aggregationMode", "facet");
+      params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from collection1 where (text='XXXX' AND NOT text='XXXX XXX') group by str_s order by str_s desc");
+
+      solrStream = new SolrStream(jetty.url, params);
+      tuples = getTuples(solrStream);
+
+      //The sort by and order by match and no limit is applied. All the Tuples should be returned in
+      //this scenario.
+
+      assert(tuples.size() == 3);
+
+      tuple = tuples.get(0);
+      assert(tuple.get("str_s").equals("c"));
+      assert(tuple.getDouble("count(*)") == 4);
+      assert(tuple.getDouble("sum(field_i)") == 180);
+      assert(tuple.getDouble("min(field_i)") == 30);
+      assert(tuple.getDouble("max(field_i)") == 60);
+      assert(tuple.getDouble("avg(field_i)") == 45);
+
+      tuple = tuples.get(1);
+      assert(tuple.get("str_s").equals("b"));
+      assert(tuple.getDouble("count(*)") == 2);
+      assert(tuple.getDouble("sum(field_i)") == 19);
+      assert(tuple.getDouble("min(field_i)") == 8);
+      assert(tuple.getDouble("max(field_i)") == 11);
+      assert(tuple.getDouble("avg(field_i)") == 9.5D);
+
+      tuple = tuples.get(2);
+      assert(tuple.get("str_s").equals("a"));
+      assert(tuple.getDouble("count(*)") == 2);
+      assert(tuple.getDouble("sum(field_i)") == 27);
+      assert(tuple.getDouble("min(field_i)") == 7);
+      assert(tuple.getDouble("max(field_i)") == 20);
+      assert(tuple.getDouble("avg(field_i)") == 13.5D);
+
+
+      params = new HashMap();
+      params.put(CommonParams.QT, "/sql");
+      params.put("aggregationMode", "facet");
+      params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from collection1 where text='XXXX' group by str_s having sum(field_i) = 19");
+
+      solrStream = new SolrStream(jetty.url, params);
+      tuples = getTuples(solrStream);
+
+      assert(tuples.size() == 1);
+
+      tuple = tuples.get(0);
+      assert(tuple.get("str_s").equals("b"));
+      assert(tuple.getDouble("count(*)") == 2);
+      assert(tuple.getDouble("sum(field_i)") == 19);
+      assert(tuple.getDouble("min(field_i)") == 8);
+      assert(tuple.getDouble("max(field_i)") == 11);
+      assert(tuple.getDouble("avg(field_i)") == 9.5D);
+
+      params = new HashMap();
+      params.put(CommonParams.QT, "/sql");
+      params.put("aggregationMode", "facet");
+      params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from collection1 where text='XXXX' group by str_s having ((sum(field_i) = 19) AND (min(field_i) = 8))");
+
+      solrStream = new SolrStream(jetty.url, params);
+      tuples = getTuples(solrStream);
+
+      //Only one result because of the having clause.
+      assert(tuples.size() == 1);
+
+      tuple = tuples.get(0);
+      assert(tuple.get("str_s").equals("b"));
+      assert(tuple.getDouble("count(*)") == 2);
+      assert(tuple.getDouble("sum(field_i)") == 19);
+      assert(tuple.getDouble("min(field_i)") == 8);
+      assert(tuple.getDouble("max(field_i)") == 11);
+      assert(tuple.getDouble("avg(field_i)") == 9.5D);
+
+      params = new HashMap();
+      params.put(CommonParams.QT, "/sql");
+      params.put("aggregationMode", "facet");
+      params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from collection1 where text='XXXX' group by str_s having ((sum(field_i) = 19) AND (min(field_i) = 100))");
+
+      solrStream = new SolrStream(jetty.url, params);
+      tuples = getTuples(solrStream);
+
+      assert(tuples.size() == 0);
+
+
+    } finally {
+      delete();
+    }
+  }
+
+
+
+
+
   private void testParallelBasicGrouping() throws Exception {
     try {
 
@@ -664,6 +808,8 @@ public class TestSQLHandler extends Abst
     }
   }
 
+
+
   private void testTimeSeriesGrouping() throws Exception {
     try {
 
@@ -781,6 +927,126 @@ public class TestSQLHandler extends Abst
     }
   }
 
+
+  private void testTimeSeriesGroupingFacet() throws Exception {
+    try {
+
+      CloudJettyRunner jetty = this.cloudJettys.get(0);
+
+      del("*:*");
+
+      commit();
+
+      indexr("id", "1", "year_i", "2015", "month_i", "11", "day_i", "7", "item_i", "5");
+      indexr("id", "2", "year_i", "2015", "month_i", "11", "day_i", "7", "item_i", "10");
+      indexr("id", "3", "year_i", "2015", "month_i", "11", "day_i", "8", "item_i", "30");
+      indexr("id", "4", "year_i", "2015", "month_i", "11", "day_i", "8", "item_i", "12");
+      indexr("id", "5", "year_i", "2015", "month_i", "10", "day_i", "1", "item_i", "4");
+      indexr("id", "6", "year_i", "2015", "month_i", "10", "day_i", "3", "item_i", "5");
+      indexr("id", "7", "year_i", "2014", "month_i", "4", "day_i", "4", "item_i", "6");
+      indexr("id", "8", "year_i", "2014", "month_i", "4", "day_i", "2", "item_i", "1");
+
+      commit();
+      Map params = new HashMap();
+      params.put(CommonParams.QT, "/sql");
+      params.put("aggregationMode", "facet");
+      params.put("sql", "select year_i, sum(item_i) from collection1 group by year_i order by year_i desc");
+
+      SolrStream solrStream = new SolrStream(jetty.url, params);
+      List<Tuple> tuples = getTuples(solrStream);
+
+      //Two results, one per distinct year_i value.
+      assert(tuples.size() == 2);
+
+      Tuple tuple = null;
+
+      tuple = tuples.get(0);
+      assert(tuple.getLong("year_i") == 2015);
+      assert(tuple.getDouble("sum(item_i)") == 66);
+
+      tuple = tuples.get(1);
+      assert(tuple.getLong("year_i") == 2014);
+      assert(tuple.getDouble("sum(item_i)") == 7);
+
+
+      params = new HashMap();
+      params.put(CommonParams.QT, "/sql");
+      params.put("aggregationMode", "facet");
+      params.put("sql", "select year_i, month_i, sum(item_i) from collection1 group by year_i, month_i order by year_i desc, month_i desc");
+
+      solrStream = new SolrStream(jetty.url, params);
+      tuples = getTuples(solrStream);
+
+      //Three results, one per distinct year_i, month_i combination.
+      assert(tuples.size() == 3);
+
+      tuple = tuples.get(0);
+      assert(tuple.getLong("year_i") == 2015);
+      assert(tuple.getLong("month_i") == 11);
+      assert(tuple.getDouble("sum(item_i)") == 57);
+
+      tuple = tuples.get(1);
+      assert(tuple.getLong("year_i") == 2015);
+      assert(tuple.getLong("month_i") == 10);
+      assert(tuple.getDouble("sum(item_i)") == 9);
+
+      tuple = tuples.get(2);
+      assert(tuple.getLong("year_i") == 2014);
+      assert(tuple.getLong("month_i") == 4);
+      assert(tuple.getDouble("sum(item_i)") == 7);
+
+
+      params = new HashMap();
+      params.put(CommonParams.QT, "/sql");
+      params.put("aggregationMode", "facet");
+      params.put("sql", "select year_i, month_i, day_i, sum(item_i) from collection1 group by year_i, month_i, day_i order by year_i desc, month_i desc, day_i desc");
+
+      solrStream = new SolrStream(jetty.url, params);
+      tuples = getTuples(solrStream);
+
+      //Six results, one per distinct year_i, month_i, day_i combination.
+      assert(tuples.size() == 6);
+
+      tuple = tuples.get(0);
+      assert(tuple.getLong("year_i") == 2015);
+      assert(tuple.getLong("month_i") == 11);
+      assert(tuple.getLong("day_i") == 8);
+      assert(tuple.getDouble("sum(item_i)") == 42);
+
+      tuple = tuples.get(1);
+      assert(tuple.getLong("year_i") == 2015);
+      assert(tuple.getLong("month_i") == 11);
+      assert(tuple.getLong("day_i") == 7);
+      assert(tuple.getDouble("sum(item_i)") == 15);
+
+      tuple = tuples.get(2);
+      assert(tuple.getLong("year_i") == 2015);
+      assert(tuple.getLong("month_i") == 10);
+      assert(tuple.getLong("day_i") == 3);
+      assert(tuple.getDouble("sum(item_i)") == 5);
+
+      tuple = tuples.get(3);
+      assert(tuple.getLong("year_i") == 2015);
+      assert(tuple.getLong("month_i") == 10);
+      assert(tuple.getLong("day_i") == 1);
+      assert(tuple.getDouble("sum(item_i)") == 4);
+
+      tuple = tuples.get(4);
+      assert(tuple.getLong("year_i") == 2014);
+      assert(tuple.getLong("month_i") == 4);
+      assert(tuple.getLong("day_i") == 4);
+      assert(tuple.getDouble("sum(item_i)") == 6);
+
+      tuple = tuples.get(5);
+      assert(tuple.getLong("year_i") == 2014);
+      assert(tuple.getLong("month_i") == 4);
+      assert(tuple.getLong("day_i") == 2);
+      assert(tuple.getDouble("sum(item_i)") == 1);
+    } finally {
+      delete();
+    }
+  }
+
   private void testParallelTimeSeriesGrouping() throws Exception {
     try {
 
@@ -919,7 +1185,6 @@ public class TestSQLHandler extends Abst
     return tuples;
   }
 
-
   protected Tuple getTuple(TupleStream tupleStream) throws IOException {
     tupleStream.open();
     Tuple t = tupleStream.read();

Added: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java?rev=1702132&view=auto
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java (added)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java Wed Sep  9 23:37:35 2015
@@ -0,0 +1,318 @@
+package org.apache.solr.client.solrj.io.stream;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Collections;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.io.SolrClientCache;
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.comp.FieldComparator;
+import org.apache.solr.client.solrj.io.comp.MultipleFieldComparator;
+import org.apache.solr.client.solrj.io.comp.StreamComparator;
+import org.apache.solr.client.solrj.io.stream.metrics.Bucket;
+import org.apache.solr.client.solrj.io.stream.metrics.Metric;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+
+/**
+ *  The FacetStream abstracts the output from the JSON facet API as a Stream of Tuples. This provides an alternative to the
+ *  RollupStream which uses Map/Reduce to perform aggregations. open() runs the facet request and buffers the sorted
+ *  Tuples; read() then returns at most "limit" of them, followed by Tuples that carry the "EOF" field.
+ **/
+
+public class FacetStream extends TupleStream  {
+
+  private static final long serialVersionUID = 1;
+
+  private Bucket[] buckets;
+  private Metric[] metrics;
+  private int limit;
+  private FieldComparator[] sorts;
+  private List<Tuple> tuples = new ArrayList<>();
+  private int index;
+  private String zkHost;
+  private Map<String, String> props;
+  private String collection;
+  protected transient SolrClientCache cache;
+  protected transient CloudSolrClient cloudSolrClient;
+
+  /**
+   * @param zkHost ZooKeeper address used to obtain the CloudSolrClient
+   * @param collection collection the facet request is sent to
+   * @param props base request parameters; "json.facet" and "rows" are added to them in open()
+   * @param buckets fields to facet on, one nested terms facet per bucket
+   * @param metrics aggregations reported for each bucket
+   * @param sorts either one sort per bucket or a single sort that is applied at every level
+   * @param limit facet limit sent at each level, and the maximum number of Tuples read() returns
+   */
+  public FacetStream(String zkHost,
+                     String collection,
+                     Map<String, String> props,
+                     Bucket[] buckets,
+                     Metric[] metrics,
+                     FieldComparator[] sorts,
+                     int limit) {
+    this.zkHost  = zkHost;
+    this.props   = props;
+    this.buckets = buckets;
+    this.metrics = metrics;
+    this.limit   = limit;
+    this.collection = collection;
+    this.sorts = sorts;
+  }
+
+  /** Takes the SolrClientCache from the context; open() uses it, when present, to obtain the client. */
+  public void setStreamContext(StreamContext context) {
+    cache = context.getSolrClientCache();
+  }
+
+  /** A FacetStream wraps no other streams, so the returned list is always empty. */
+  public List<TupleStream> children() {
+    return new ArrayList<>();
+  }
+
+  /**
+   * Runs the JSON facet request and buffers the resulting Tuples, ordered by the stream sort.
+   *
+   * @throws IOException if the sorts do not fit the buckets or the request fails
+   */
+  public void open() throws IOException {
+    if(cache != null) {
+      cloudSolrClient = cache.getCloudSolrClient(zkHost);
+    } else {
+      cloudSolrClient = new CloudSolrClient(zkHost);
+    }
+
+    //Start from an empty buffer so that opening the stream again does not return Tuples twice.
+    tuples.clear();
+    index = 0;
+
+    FieldComparator[] adjustedSorts = adjustSorts(buckets, sorts);
+    String json = getJsonFacetString(buckets, metrics, adjustedSorts, limit);
+
+    ModifiableSolrParams params = getParams(this.props);
+    params.add("json.facet", json);
+    params.add("rows", "0");
+
+    QueryRequest request = new QueryRequest(params);
+    try {
+      NamedList response = cloudSolrClient.request(request, collection);
+      getTuples(response, buckets, metrics);
+      Collections.sort(tuples, getStreamSort());
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  /** Closes the client only when this stream created it itself; a client obtained from the cache is left open. */
+  public void close() throws IOException {
+    //cloudSolrClient is still null when open() was never called.
+    if(cache == null && cloudSolrClient != null) {
+      cloudSolrClient.close();
+    }
+  }
+
+  /** Returns the next buffered Tuple, or a Tuple with the "EOF" field set once the buffer or the limit is used up. */
+  public Tuple read() throws IOException {
+    if(index < tuples.size() && index < limit) {
+      return tuples.get(index++);
+    }
+
+    Map<String, Object> fields = new HashMap<>();
+    fields.put("EOF", true);
+    return new Tuple(fields);
+  }
+
+  /** Copies the given properties into a new ModifiableSolrParams instance. */
+  private ModifiableSolrParams getParams(Map<String, String> props) {
+    ModifiableSolrParams params = new ModifiableSolrParams();
+    for(Map.Entry<String, String> entry : props.entrySet()) {
+      params.add(entry.getKey(), entry.getValue());
+    }
+    return params;
+  }
+
+  /** Builds the complete "json.facet" parameter value: the nested bucket facets wrapped in one JSON object. */
+  private String getJsonFacetString(Bucket[] _buckets, Metric[] _metrics, FieldComparator[] _sorts, int _limit) {
+    StringBuilder buf = new StringBuilder();
+    appendJson(buf, _buckets, _metrics, _sorts, _limit, 0);
+    return "{"+buf.toString()+"}";
+  }
+
+  /**
+   * Expands the requested sorts to one sort per bucket level.
+   *
+   * @throws IOException if the number of sorts is neither one nor the number of buckets
+   */
+  private FieldComparator[] adjustSorts(Bucket[] _buckets, FieldComparator[] _sorts) throws IOException {
+    if(_buckets.length == _sorts.length) {
+      return _sorts;
+    } else if(_sorts.length == 1) {
+      FieldComparator[] adjustedSorts = new FieldComparator[_buckets.length];
+      if (_sorts[0].getFieldName().contains("(")) {
+        //It's a metric sort so apply the same sort criteria at each level.
+        for (int i = 0; i < adjustedSorts.length; i++) {
+          adjustedSorts[i] = _sorts[0];
+        }
+      } else {
+        //It's an index sort so apply an index sort at each level.
+        for (int i = 0; i < adjustedSorts.length; i++) {
+          adjustedSorts[i] = new FieldComparator(_buckets[i].toString(), _sorts[0].getOrder());
+        }
+      }
+      return adjustedSorts;
+    } else {
+      throw new IOException("If multiple sorts are specified there must be a sort for each bucket.");
+    }
+  }
+
+  /** Appends the terms facet for the bucket at "level", recursing so that each further bucket is nested inside it. */
+  private void appendJson(StringBuilder buf,
+                          Bucket[] _buckets,
+                          Metric[] _metrics,
+                          FieldComparator[] _sorts,
+                          int _limit,
+                          int level) {
+    buf.append('"');
+    buf.append(_buckets[level].toString());
+    buf.append('"');
+    buf.append(":{");
+    buf.append("\"type\":\"terms\"");
+    buf.append(",\"field\":\""+_buckets[level].toString()+"\"");
+    buf.append(",\"limit\":"+_limit);
+    buf.append(",\"sort\":{\""+getFacetSort(_sorts[level].getFieldName(), _metrics)+"\":\""+_sorts[level].getOrder()+"\"}");
+
+    buf.append(",\"facet\":{");
+    //Count metrics are not requested here (fillTuples reads each bucket's "count"); the others become facet_0, facet_1, ...
+    int metricCount = 0;
+    for(Metric metric : _metrics) {
+      String identifier = metric.getIdentifier();
+      if(!identifier.startsWith("count(")) {
+        if(metricCount>0) {
+          buf.append(",");
+        }
+        buf.append("\"facet_" + metricCount + "\":\"" +identifier+"\"");
+        ++metricCount;
+      }
+    }
+    ++level;
+    if(level < _buckets.length) {
+      if(metricCount>0) {
+        buf.append(",");
+      }
+      appendJson(buf, _buckets, _metrics, _sorts, _limit, level);
+    }
+    buf.append("}}");
+  }
+
+  /**
+   * Translates a sort field into a JSON facet sort key: "count" for a count sort, "facet_N" for the N-th
+   * non-count metric (the names assigned in appendJson), or "index" when the field is not one of the metrics.
+   */
+  private String getFacetSort(String id, Metric[] _metrics) {
+    //facetIndex only counts the non-count metrics, so it must not be used as an index into _metrics.
+    int facetIndex = 0;
+    for(Metric metric : _metrics) {
+      String identifier = metric.getIdentifier();
+      if(identifier.startsWith("count(")) {
+        if(id.startsWith("count(")) {
+          return "count";
+        }
+      } else {
+        if (id.equals(identifier)) {
+          return "facet_" + facetIndex;
+        }
+        ++facetIndex;
+      }
+    }
+    return "index";
+  }
+
+  /** Flattens the "facets" section of the response into the Tuple buffer. */
+  private void getTuples(NamedList response, Bucket[] buckets, Metric[] metrics) {
+    NamedList facets = (NamedList)response.get("facets");
+    fillTuples(0, tuples, new Tuple(new HashMap()), facets, buckets, metrics);
+  }
+
+  /**
+   * Walks the nested bucket lists depth first. Every level adds its bucket value to a copy of the Tuple built so far;
+   * the innermost level also adds the metric values and appends the finished Tuple to the list.
+   */
+  private void fillTuples(int level,
+                          List<Tuple> tuples,
+                          Tuple currentTuple,
+                          NamedList facets,
+                          Bucket[] _buckets,
+                          Metric[] _metrics) {
+
+    String bucketName = _buckets[level].toString();
+    NamedList nl = (NamedList)facets.get(bucketName);
+    //The response may carry no facet for this bucket (presumably when no documents match); then there is nothing to add.
+    if(nl == null) {
+      return;
+    }
+    List allBuckets = (List)nl.get("buckets");
+    for(int b=0; b<allBuckets.size(); b++) {
+      NamedList bucket = (NamedList)allBuckets.get(b);
+      Object val = bucket.get("val");
+      Tuple t = currentTuple.clone();
+      t.put(bucketName, val);
+      int nextLevel = level+1;
+      if(nextLevel<_buckets.length) {
+        //t is already a private copy of currentTuple, so it is handed down without cloning it again.
+        fillTuples(nextLevel, tuples, t, bucket, _buckets, _metrics);
+      } else {
+        int m = 0;
+        for(Metric metric : _metrics) {
+          String identifier = metric.getIdentifier();
+          if(!identifier.startsWith("count(")) {
+            //Read the value as a Number: a direct (double) cast fails unless it was boxed as a Double.
+            Object value = bucket.get("facet_"+m);
+            if(value != null) {
+              t.put(identifier, ((Number)value).doubleValue());
+            }
+            ++m;
+          } else {
+            t.put("count(*)", ((Number)bucket.get("count")).longValue());
+          }
+        }
+        tuples.add(t);
+      }
+    }
+  }
+
+  public int getCost() {
+    return 0;
+  }
+
+  /** Returns the comparator built from the sorts given to the constructor; open() uses it to order the buffer. */
+  @Override
+  public StreamComparator getStreamSort() {
+    if(sorts.length > 1) {
+      return new MultipleFieldComparator(sorts);
+    } else {
+      return sorts[0];
+    }
+  }
+}
\ No newline at end of file

Modified: lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java?rev=1702132&r1=1702131&r2=1702132&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java (original)
+++ lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java Wed Sep  9 23:37:35 2015
@@ -575,6 +575,584 @@ public class StreamingTest extends Abstr
     assert(t.getException().contains("undefined field:"));
   }
 
+  /**
+   * Verifies single-level faceting with FacetStream.
+   *
+   * Ten documents are indexed into three a_s buckets (hello0, hello3, hello4). The
+   * sum/min/max/avg/count metrics of every bucket are then checked under four sort
+   * orders: sum(a_i) ascending, sum(a_i) descending, a_s descending and a_s ascending.
+   * The index is cleared again once all checks have passed.
+   */
+  private void testFacetStream() throws Exception {
+
+    indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1");
+    indexr(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2");
+    indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
+    indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
+    indexr(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5");
+    indexr(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6");
+    indexr(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7");
+    indexr(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8");
+    indexr(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9");
+    indexr(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10");
+
+    commit();
+
+    String zkHost = zkServer.getZkAddress();
+
+    Map paramsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f", "sort", "a_s asc");
+
+    Bucket[] buckets =  {new Bucket("a_s")};
+
+    Metric[] metrics = {new SumMetric("a_i"),
+                        new SumMetric("a_f"),
+                        new MinMetric("a_i"),
+                        new MinMetric("a_f"),
+                        new MaxMetric("a_i"),
+                        new MaxMetric("a_f"),
+                        new MeanMetric("a_i"),
+                        new MeanMetric("a_f"),
+                        new CountMetric()};
+
+    FieldComparator[] sorts = {new FieldComparator("sum(a_i)",
+                                                   ComparatorOrder.ASCENDING)};
+
+    //Sort by sum(a_i) ascending: 15 (hello4) < 17 (hello0) < 38 (hello3).
+
+    assertFacetTuples(new FacetStream(zkHost, "collection1", paramsA, buckets, metrics, sorts, 100),
+                      "hello4", "hello0", "hello3");
+
+    //Reverse the Sort.
+
+    sorts[0] = new FieldComparator("sum(a_i)", ComparatorOrder.DESCENDING);
+
+    assertFacetTuples(new FacetStream(zkHost, "collection1", paramsA, buckets, metrics, sorts, 100),
+                      "hello3", "hello0", "hello4");
+
+    //Test index sort, descending by bucket value.
+
+    sorts[0] = new FieldComparator("a_s", ComparatorOrder.DESCENDING);
+
+    assertFacetTuples(new FacetStream(zkHost, "collection1", paramsA, buckets, metrics, sorts, 100),
+                      "hello4", "hello3", "hello0");
+
+    //Test index sort, ascending by bucket value.
+
+    sorts[0] = new FieldComparator("a_s", ComparatorOrder.ASCENDING);
+
+    assertFacetTuples(new FacetStream(zkHost, "collection1", paramsA, buckets, metrics, sorts, 100),
+                      "hello0", "hello3", "hello4");
+
+    del("*:*");
+    commit();
+  }
+
+  /**
+   * Reads all tuples from the given stream and checks that the a_s buckets arrive in
+   * exactly the expected order, each carrying the metric values of the testFacetStream
+   * fixture. assertTrue is used instead of the assert keyword so the checks still run
+   * when the JVM is started without -ea.
+   */
+  private void assertFacetTuples(FacetStream facetStream, String... expectedBuckets) throws Exception {
+    //Must stay in the same order as the values returned by expectedFacetMetrics.
+    String[] metricNames = {"sum(a_i)", "sum(a_f)",
+                            "min(a_i)", "min(a_f)",
+                            "max(a_i)", "max(a_f)",
+                            "avg(a_i)", "avg(a_f)",
+                            "count(*)"};
+
+    List<Tuple> tuples = getTuples(facetStream);
+
+    assertTrue("Unexpected number of tuples: " + tuples.size(), tuples.size() == expectedBuckets.length);
+
+    for(int i=0; i<expectedBuckets.length; i++) {
+      Tuple tuple = tuples.get(i);
+      String bucket = tuple.getString("a_s");
+      assertTrue("Unexpected bucket at position " + i + ": " + bucket, bucket.equals(expectedBuckets[i]));
+
+      double[] expected = expectedFacetMetrics(expectedBuckets[i]);
+      for(int m=0; m<metricNames.length; m++) {
+        Double actual = tuple.getDouble(metricNames[m]);
+        assertTrue("Unexpected " + metricNames[m] + " for bucket " + bucket + ": " + actual,
+                   actual.doubleValue() == expected[m]);
+      }
+    }
+  }
+
+  /**
+   * Returns the expected metrics for one a_s bucket of the testFacetStream fixture, in the
+   * order sum(a_i), sum(a_f), min(a_i), min(a_f), max(a_i), max(a_f), avg(a_i), avg(a_f),
+   * count(*).
+   */
+  private double[] expectedFacetMetrics(String bucket) {
+    if("hello0".equals(bucket)) {
+      return new double[] {17.0D, 18.0D, 0.0D, 1.0D, 14.0D, 10.0D, 4.25D, 4.5D, 4.0D};
+    } else if("hello3".equals(bucket)) {
+      return new double[] {38.0D, 26.0D, 3.0D, 3.0D, 13.0D, 9.0D, 9.5D, 6.5D, 4.0D};
+    } else if("hello4".equals(bucket)) {
+      return new double[] {15.0D, 11.0D, 4.0D, 4.0D, 11.0D, 7.0D, 7.5D, 5.5D, 2.0D};
+    }
+    throw new IllegalArgumentException("Unexpected bucket: " + bucket);
+  }
+
+
+  /**
+   * Verifies two-level (nested) faceting with FacetStream.
+   *
+   * Ten documents are indexed into level1_s/level2_s bucket pairs. The sum(a_i) and
+   * count(*) of all six pairs are checked first with both levels sorted by sum(a_i)
+   * descending, then with both levels sorted by their bucket value descending.
+   * The index is cleared again once all checks have passed.
+   */
+  private void testSubFacetStream() throws Exception {
+
+    indexr(id, "0", "level1_s", "hello0", "level2_s", "a", "a_i", "0", "a_f", "1");
+    indexr(id, "2", "level1_s", "hello0", "level2_s", "a", "a_i", "2", "a_f", "2");
+    indexr(id, "3", "level1_s", "hello3", "level2_s", "a", "a_i", "3", "a_f", "3");
+    indexr(id, "4", "level1_s", "hello4", "level2_s", "a", "a_i", "4", "a_f", "4");
+    indexr(id, "1", "level1_s", "hello0", "level2_s", "b", "a_i", "1", "a_f", "5");
+    indexr(id, "5", "level1_s", "hello3", "level2_s", "b", "a_i", "10", "a_f", "6");
+    indexr(id, "6", "level1_s", "hello4", "level2_s", "b", "a_i", "11", "a_f", "7");
+    indexr(id, "7", "level1_s", "hello3", "level2_s", "b", "a_i", "12", "a_f", "8");
+    indexr(id, "8", "level1_s", "hello3", "level2_s", "b", "a_i", "13", "a_f", "9");
+    indexr(id, "9", "level1_s", "hello0", "level2_s", "b", "a_i", "14", "a_f", "10");
+
+    commit();
+
+    String zkHost = zkServer.getZkAddress();
+
+    Map paramsA = mapParams("q","*:*","fl","a_i,a_f");
+
+    Bucket[] buckets =  {new Bucket("level1_s"), new Bucket("level2_s")};
+
+    Metric[] metrics = {new SumMetric("a_i"),
+                        new CountMetric()};
+
+    FieldComparator[] sorts = {new FieldComparator("sum(a_i)", ComparatorOrder.DESCENDING), new FieldComparator("sum(a_i)", ComparatorOrder.DESCENDING)};
+
+    //Both levels sorted by sum(a_i) descending.
+
+    FacetStream facetStream = new FacetStream(
+        zkHost,
+        "collection1",
+        paramsA,
+        buckets,
+        metrics,
+        sorts,
+        100);
+
+    List<Tuple> tuples = getTuples(facetStream);
+    assertTrue("Unexpected number of tuples: " + tuples.size(), tuples.size() == 6);
+
+    assertSubFacetTuple(tuples.get(0), "hello3", "b", 35, 3);
+    assertSubFacetTuple(tuples.get(1), "hello0", "b", 15, 2);
+    assertSubFacetTuple(tuples.get(2), "hello4", "b", 11, 1);
+    assertSubFacetTuple(tuples.get(3), "hello4", "a", 4, 1);
+    assertSubFacetTuple(tuples.get(4), "hello3", "a", 3, 1);
+    assertSubFacetTuple(tuples.get(5), "hello0", "a", 2, 2);
+
+    //Both levels sorted by bucket value descending.
+
+    sorts[0] =  new FieldComparator("level1_s", ComparatorOrder.DESCENDING );
+    sorts[1] =  new FieldComparator("level2_s", ComparatorOrder.DESCENDING );
+    facetStream = new FacetStream(
+        zkHost,
+        "collection1",
+        paramsA,
+        buckets,
+        metrics,
+        sorts,
+        100);
+
+    tuples = getTuples(facetStream);
+    assertTrue("Unexpected number of tuples: " + tuples.size(), tuples.size() == 6);
+
+    assertSubFacetTuple(tuples.get(0), "hello4", "b", 11, 1);
+    assertSubFacetTuple(tuples.get(1), "hello4", "a", 4, 1);
+    assertSubFacetTuple(tuples.get(2), "hello3", "b", 35, 3);
+    assertSubFacetTuple(tuples.get(3), "hello3", "a", 3, 1);
+    assertSubFacetTuple(tuples.get(4), "hello0", "b", 15, 2);
+    assertSubFacetTuple(tuples.get(5), "hello0", "a", 2, 2);
+
+    del("*:*");
+    commit();
+  }
+
+  /**
+   * Checks one tuple of the testSubFacetStream fixture: its level1_s and level2_s bucket
+   * values, its sum(a_i) (compared as a long) and its count(*) (compared as a double).
+   */
+  private void assertSubFacetTuple(Tuple tuple, String level1, String level2, long sum, double count) {
+    String bucket1 = tuple.getString("level1_s");
+    String bucket2 = tuple.getString("level2_s");
+    Double sumi = tuple.getDouble("sum(a_i)");
+    Double actualCount = tuple.getDouble("count(*)");
+
+    String pair = bucket1 + "/" + bucket2;
+    assertTrue("Unexpected level1_s bucket: " + pair, bucket1.equals(level1));
+    assertTrue("Unexpected level2_s bucket: " + pair, bucket2.equals(level2));
+    assertTrue("Unexpected sum(a_i) for " + pair + ": " + sumi, sumi.longValue() == sum);
+    assertTrue("Unexpected count(*) for " + pair + ": " + actualCount, actualCount.doubleValue() == count);
+  }
+
   private void testRollupStream() throws Exception {
 
     indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1");
@@ -1121,6 +1699,8 @@ public class StreamingTest extends Abstr
     testReducerStream();
     testRollupStream();
     testZeroReducerStream();
+    testFacetStream();
+    testSubFacetStream();
     //testExceptionStream();
     testParallelEOF();
     testParallelUniqueStream();