Posted to commits@lucene.apache.org by jb...@apache.org on 2015/09/10 01:37:35 UTC
svn commit: r1702132 - in /lucene/dev/trunk/solr: ./
core/src/java/org/apache/solr/handler/ core/src/test/org/apache/solr/handler/
solrj/src/java/org/apache/solr/client/solrj/io/stream/
solrj/src/test/org/apache/solr/client/solrj/io/stream/
Author: jbernste
Date: Wed Sep 9 23:37:35 2015
New Revision: 1702132
URL: http://svn.apache.org/r1702132
Log:
SOLR-7903: Add the FacetStream to the Streaming API and wire it into the SQLHandler
Added:
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java (with props)
Modified:
lucene/dev/trunk/solr/CHANGES.txt
lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/SQLHandler.java
lucene/dev/trunk/solr/core/src/test/org/apache/solr/handler/TestSQLHandler.java
lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
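The new aggregationMode parameter selects between the existing map/reduce aggregation path and the new facet path, which pushes GROUP BY aggregations down to the JSON facet API. A minimal client-side sketch of the new mode (the base URL is a placeholder; the parameter names and SolrStream usage mirror the tests in this commit):

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.stream.SolrStream;
import org.apache.solr.common.params.CommonParams;

public class FacetModeExample {
  public static void main(String[] args) throws IOException {
    Map<String, String> params = new HashMap<>();
    params.put(CommonParams.QT, "/sql");
    params.put("aggregationMode", "facet"); // "map_reduce" is the default
    params.put("sql", "select str_s, sum(field_i) from collection1 group by str_s");

    // Placeholder URL; any node hosting the collection will do.
    SolrStream stream = new SolrStream("http://localhost:8983/solr/collection1", params);
    try {
      stream.open();
      // Read tuples until the EOF tuple arrives.
      for (Tuple t = stream.read(); !t.EOF; t = stream.read()) {
        System.out.println(t.get("str_s") + " -> " + t.getDouble("sum(field_i)"));
      }
    } finally {
      stream.close();
    }
  }
}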
Modified: lucene/dev/trunk/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/CHANGES.txt?rev=1702132&r1=1702131&r2=1702132&view=diff
==============================================================================
--- lucene/dev/trunk/solr/CHANGES.txt (original)
+++ lucene/dev/trunk/solr/CHANGES.txt Wed Sep 9 23:37:35 2015
@@ -65,6 +65,8 @@ New Features
* SOLR-7707: Add StreamExpression Support to RollupStream (Dennis Gove, Joel Bernstein)
+* SOLR-7903: Add the FacetStream to the Streaming API and wire it into the SQLHandler (Joel Bernstein)
+
Optimizations
----------------------
* SOLR-7876: Speed up queries and operations that use many terms when timeAllowed has not been
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/SQLHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/SQLHandler.java?rev=1702132&r1=1702131&r2=1702132&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/SQLHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/SQLHandler.java Wed Sep 9 23:37:35 2015
@@ -35,6 +35,7 @@ import org.apache.solr.client.solrj.io.c
import org.apache.solr.client.solrj.io.comp.MultipleFieldComparator;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.stream.CloudSolrStream;
+import org.apache.solr.client.solrj.io.stream.FacetStream;
import org.apache.solr.client.solrj.io.stream.ParallelStream;
import org.apache.solr.client.solrj.io.stream.RankStream;
import org.apache.solr.client.solrj.io.stream.RollupStream;
@@ -87,6 +88,7 @@ public class SQLHandler extends RequestH
int numWorkers = params.getInt("numWorkers", 1);
String workerCollection = params.get("workerCollection", defaultWorkerCollection);
String workerZkhost = params.get("workerZkhost",defaultZkhost);
+ String mode = params.get("aggregationMode", "map_reduce");
StreamContext context = new StreamContext();
try {
@@ -94,7 +96,7 @@ public class SQLHandler extends RequestH
throw new Exception("sql parameter cannot be null");
}
- TupleStream tupleStream = SQLTupleStreamParser.parse(sql, numWorkers, workerCollection, workerZkhost);
+ TupleStream tupleStream = SQLTupleStreamParser.parse(sql, numWorkers, workerCollection, workerZkhost, AggregationMode.getMode(mode));
context.numWorkers = numWorkers;
context.setSolrClientCache(StreamHandler.clientCache);
tupleStream.setStreamContext(context);
@@ -126,7 +128,8 @@ public class SQLHandler extends RequestH
public static TupleStream parse(String sql,
int numWorkers,
String workerCollection,
- String workerZkhost) throws IOException {
+ String workerZkhost,
+ AggregationMode aggregationMode) throws IOException {
SqlParser parser = new SqlParser();
Statement statement = parser.createStatement(sql);
@@ -137,7 +140,11 @@ public class SQLHandler extends RequestH
TupleStream sqlStream = null;
if(sqlVistor.groupByQuery) {
- sqlStream = doGroupByWithAggregates(sqlVistor, numWorkers, workerCollection, workerZkhost);
+ if(aggregationMode == AggregationMode.FACET) {
+ sqlStream = doGroupByWithAggregatesFacets(sqlVistor);
+ } else {
+ sqlStream = doGroupByWithAggregates(sqlVistor, numWorkers, workerCollection, workerZkhost);
+ }
} else {
sqlStream = doSelect(sqlVistor);
}
@@ -232,6 +239,56 @@ public class SQLHandler extends RequestH
return tupleStream;
}
+ private static TupleStream doGroupByWithAggregatesFacets(SQLVisitor sqlVisitor) throws IOException {
+
+ Set<String> fieldSet = new HashSet();
+ Bucket[] buckets = getBuckets(sqlVisitor.groupBy, fieldSet);
+ Metric[] metrics = getMetrics(sqlVisitor.fields, fieldSet);
+ if(metrics.length == 0) {
+ throw new IOException("Group by queries must include atleast one aggregate function.");
+ }
+
+ TableSpec tableSpec = new TableSpec(sqlVisitor.table, defaultZkhost);
+
+ String zkHost = tableSpec.zkHost;
+ String collection = tableSpec.collection;
+ Map<String, String> params = new HashMap();
+
+ params.put(CommonParams.Q, sqlVisitor.query);
+
+ int limit = sqlVisitor.limit > 0 ? sqlVisitor.limit : 100;
+
+ FieldComparator[] sorts = null;
+
+ if(sqlVisitor.sorts == null) {
+ sorts = new FieldComparator[buckets.length];
+ for(int i=0; i<sorts.length; i++) {
+ sorts[i] = new FieldComparator("index", ComparatorOrder.ASCENDING);
+ }
+ } else {
+ sorts = getComps(sqlVisitor.sorts);
+ }
+
+ TupleStream tupleStream = new FacetStream(zkHost,
+ collection,
+ params,
+ buckets,
+ metrics,
+ sorts,
+ limit);
+
+ if(sqlVisitor.havingExpression != null) {
+ tupleStream = new HavingStream(tupleStream, sqlVisitor.havingExpression);
+ }
+
+ if(sqlVisitor.limit > 0) {
+ tupleStream = new LimitStream(tupleStream, sqlVisitor.limit);
+ }
+
+ return tupleStream;
+ }
+
private static TupleStream doSelect(SQLVisitor sqlVisitor) throws IOException {
List<String> fields = sqlVisitor.fields;
StringBuilder flbuf = new StringBuilder();
@@ -408,6 +465,20 @@ public class SQLHandler extends RequestH
}
}
+ private static FieldComparator[] getComps(List<SortItem> sortItems) {
+ FieldComparator[] comps = new FieldComparator[sortItems.size()];
+ for(int i=0; i<sortItems.size(); i++) {
+ SortItem sortItem = sortItems.get(i);
+ String ordering = sortItem.getOrdering().toString();
+ ComparatorOrder comparatorOrder = ascDescComp(ordering);
+ String sortKey = sortItem.getSortKey().toString();
+ comps[i] = new FieldComparator(stripQuotes(sortKey), comparatorOrder);
+ }
+
+ return comps;
+ }
+
+
private static String fields(Set<String> fieldSet) {
StringBuilder buf = new StringBuilder();
boolean comma = false;
@@ -778,6 +849,22 @@ public class SQLHandler extends RequestH
}
}
+ public static enum AggregationMode {
+
+ MAP_REDUCE,
+ FACET;
+
+ public static AggregationMode getMode(String mode) throws IOException {
+ if(mode.equalsIgnoreCase("facet")) {
+ return FACET;
+ } else if(mode.equalsIgnoreCase("map_reduce")) {
+ return MAP_REDUCE;
+ } else {
+ throw new IOException("Invalid aggregation mode:"+mode);
+ }
+ }
+ }
+
public static class HavingStream extends TupleStream {
private TupleStream stream;
Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/handler/TestSQLHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/handler/TestSQLHandler.java?rev=1702132&r1=1702131&r2=1702132&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/handler/TestSQLHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/handler/TestSQLHandler.java Wed Sep 9 23:37:35 2015
@@ -88,8 +88,10 @@ public class TestSQLHandler extends Abst
testPredicate();
testBasicSelect();
testBasicGrouping();
+ testBasicGroupingFacets();
testSQLException();
testTimeSeriesGrouping();
+ testTimeSeriesGroupingFacet();
testParallelBasicGrouping();
testParallelTimeSeriesGrouping();
}
@@ -519,6 +521,148 @@ public class TestSQLHandler extends Abst
}
}
+ private void testBasicGroupingFacets() throws Exception {
+ try {
+
+ CloudJettyRunner jetty = this.cloudJettys.get(0);
+
+ del("*:*");
+
+ commit();
+
+ indexr("id", "1", "text", "XXXX XXXX", "str_s", "a", "field_i", "7");
+ indexr("id", "2", "text", "XXXX XXXX", "str_s", "b", "field_i", "8");
+ indexr("id", "3", "text", "XXXX XXXX", "str_s", "a", "field_i", "20");
+ indexr("id", "4", "text", "XXXX XXXX", "str_s", "b", "field_i", "11");
+ indexr("id", "5", "text", "XXXX XXXX", "str_s", "c", "field_i", "30");
+ indexr("id", "6", "text", "XXXX XXXX", "str_s", "c", "field_i", "40");
+ indexr("id", "7", "text", "XXXX XXXX", "str_s", "c", "field_i", "50");
+ indexr("id", "8", "text", "XXXX XXXX", "str_s", "c", "field_i", "60");
+ commit();
+ Map params = new HashMap();
+ params.put(CommonParams.QT, "/sql");
+ params.put("aggregationMode", "facet");
+ params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from collection1 where text='XXXX' group by str_s order by sum(field_i) asc limit 2");
+
+ SolrStream solrStream = new SolrStream(jetty.url, params);
+ List<Tuple> tuples = getTuples(solrStream);
+
+ //Only two results because of the limit.
+ assert(tuples.size() == 2);
+
+ Tuple tuple = null;
+
+ tuple = tuples.get(0);
+ assert(tuple.get("str_s").equals("b"));
+ assert(tuple.getDouble("count(*)") == 2);
+ assert(tuple.getDouble("sum(field_i)") == 19);
+ assert(tuple.getDouble("min(field_i)") == 8);
+ assert(tuple.getDouble("max(field_i)") == 11);
+ assert(tuple.getDouble("avg(field_i)") == 9.5D);
+
+ tuple = tuples.get(1);
+ assert(tuple.get("str_s").equals("a"));
+ assert(tuple.getDouble("count(*)") == 2);
+ assert(tuple.getDouble("sum(field_i)") == 27);
+ assert(tuple.getDouble("min(field_i)") == 7);
+ assert(tuple.getDouble("max(field_i)") == 20);
+ assert(tuple.getDouble("avg(field_i)") == 13.5D);
+
+ params = new HashMap();
+ params.put(CommonParams.QT, "/sql");
+ params.put("aggregationMode", "facet");
+ params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from collection1 where (text='XXXX' AND NOT text='XXXX XXX') group by str_s order by str_s desc");
+
+ solrStream = new SolrStream(jetty.url, params);
+ tuples = getTuples(solrStream);
+
+ //No limit is applied, so all the tuples should be returned in this scenario.
+
+ assert(tuples.size() == 3);
+
+ tuple = tuples.get(0);
+ assert(tuple.get("str_s").equals("c"));
+ assert(tuple.getDouble("count(*)") == 4);
+ assert(tuple.getDouble("sum(field_i)") == 180);
+ assert(tuple.getDouble("min(field_i)") == 30);
+ assert(tuple.getDouble("max(field_i)") == 60);
+ assert(tuple.getDouble("avg(field_i)") == 45);
+
+ tuple = tuples.get(1);
+ assert(tuple.get("str_s").equals("b"));
+ assert(tuple.getDouble("count(*)") == 2);
+ assert(tuple.getDouble("sum(field_i)") == 19);
+ assert(tuple.getDouble("min(field_i)") == 8);
+ assert(tuple.getDouble("max(field_i)") == 11);
+ assert(tuple.getDouble("avg(field_i)") == 9.5D);
+
+ tuple = tuples.get(2);
+ assert(tuple.get("str_s").equals("a"));
+ assert(tuple.getDouble("count(*)") == 2);
+ assert(tuple.getDouble("sum(field_i)") == 27);
+ assert(tuple.getDouble("min(field_i)") == 7);
+ assert(tuple.getDouble("max(field_i)") == 20);
+ assert(tuple.getDouble("avg(field_i)") == 13.5D);
+
+
+ params = new HashMap();
+ params.put(CommonParams.QT, "/sql");
+ params.put("aggregationMode", "facet");
+ params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from collection1 where text='XXXX' group by str_s having sum(field_i) = 19");
+
+ solrStream = new SolrStream(jetty.url, params);
+ tuples = getTuples(solrStream);
+
+ assert(tuples.size() == 1);
+
+ tuple = tuples.get(0);
+ assert(tuple.get("str_s").equals("b"));
+ assert(tuple.getDouble("count(*)") == 2);
+ assert(tuple.getDouble("sum(field_i)") == 19);
+ assert(tuple.getDouble("min(field_i)") == 8);
+ assert(tuple.getDouble("max(field_i)") == 11);
+ assert(tuple.getDouble("avg(field_i)") == 9.5D);
+
+ params = new HashMap();
+ params.put(CommonParams.QT, "/sql");
+ params.put("aggregationMode", "facet");
+ params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from collection1 where text='XXXX' group by str_s having ((sum(field_i) = 19) AND (min(field_i) = 8))");
+
+ solrStream = new SolrStream(jetty.url, params);
+ tuples = getTuples(solrStream);
+
+ //Only one result because of the HAVING clause.
+ assert(tuples.size() == 1);
+
+ tuple = tuples.get(0);
+ assert(tuple.get("str_s").equals("b"));
+ assert(tuple.getDouble("count(*)") == 2);
+ assert(tuple.getDouble("sum(field_i)") == 19);
+ assert(tuple.getDouble("min(field_i)") == 8);
+ assert(tuple.getDouble("max(field_i)") == 11);
+ assert(tuple.getDouble("avg(field_i)") == 9.5D);
+
+ params = new HashMap();
+ params.put(CommonParams.QT, "/sql");
+ params.put("aggregationMode", "facet");
+ params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from collection1 where text='XXXX' group by str_s having ((sum(field_i) = 19) AND (min(field_i) = 100))");
+
+ solrStream = new SolrStream(jetty.url, params);
+ tuples = getTuples(solrStream);
+
+ assert(tuples.size() == 0);
+
+
+ } finally {
+ delete();
+ }
+ }
+
private void testParallelBasicGrouping() throws Exception {
try {
@@ -664,6 +808,8 @@ public class TestSQLHandler extends Abst
}
}
+
+
private void testTimeSeriesGrouping() throws Exception {
try {
@@ -781,6 +927,126 @@ public class TestSQLHandler extends Abst
}
}
+
+ private void testTimeSeriesGroupingFacet() throws Exception {
+ try {
+
+ CloudJettyRunner jetty = this.cloudJettys.get(0);
+
+ del("*:*");
+
+ commit();
+
+ indexr("id", "1", "year_i", "2015", "month_i", "11", "day_i", "7", "item_i", "5");
+ indexr("id", "2", "year_i", "2015", "month_i", "11", "day_i", "7", "item_i", "10");
+ indexr("id", "3", "year_i", "2015", "month_i", "11", "day_i", "8", "item_i", "30");
+ indexr("id", "4", "year_i", "2015", "month_i", "11", "day_i", "8", "item_i", "12");
+ indexr("id", "5", "year_i", "2015", "month_i", "10", "day_i", "1", "item_i", "4");
+ indexr("id", "6", "year_i", "2015", "month_i", "10", "day_i", "3", "item_i", "5");
+ indexr("id", "7", "year_i", "2014", "month_i", "4", "day_i", "4", "item_i", "6");
+ indexr("id", "8", "year_i", "2014", "month_i", "4", "day_i", "2", "item_i", "1");
+
+ commit();
+ Map params = new HashMap();
+ params.put(CommonParams.QT, "/sql");
+ params.put("aggregationMode", "facet");
+ params.put("sql", "select year_i, sum(item_i) from collection1 group by year_i order by year_i desc");
+
+ SolrStream solrStream = new SolrStream(jetty.url, params);
+ List<Tuple> tuples = getTuples(solrStream);
+
+ //Two results, one for each year.
+ assert(tuples.size() == 2);
+
+ Tuple tuple = null;
+
+ tuple = tuples.get(0);
+ assert(tuple.getLong("year_i") == 2015);
+ assert(tuple.getDouble("sum(item_i)") == 66);
+
+ tuple = tuples.get(1);
+ assert(tuple.getLong("year_i") == 2014);
+ assert(tuple.getDouble("sum(item_i)") == 7);
+
+
+ params = new HashMap();
+ params.put(CommonParams.QT, "/sql");
+ params.put("aggregationMode", "facet");
+ params.put("sql", "select year_i, month_i, sum(item_i) from collection1 group by year_i, month_i order by year_i desc, month_i desc");
+
+ solrStream = new SolrStream(jetty.url, params);
+ tuples = getTuples(solrStream);
+
+ //Three results, one for each year/month combination.
+ assert(tuples.size() == 3);
+
+ tuple = tuples.get(0);
+ assert(tuple.getLong("year_i") == 2015);
+ assert(tuple.getLong("month_i") == 11);
+ assert(tuple.getDouble("sum(item_i)") == 57);
+
+ tuple = tuples.get(1);
+ assert(tuple.getLong("year_i") == 2015);
+ assert(tuple.getLong("month_i") == 10);
+ assert(tuple.getDouble("sum(item_i)") == 9);
+
+ tuple = tuples.get(2);
+ assert(tuple.getLong("year_i") == 2014);
+ assert(tuple.getLong("month_i") == 4);
+ assert(tuple.getDouble("sum(item_i)") == 7);
+
+
+ params = new HashMap();
+ params.put(CommonParams.QT, "/sql");
+ params.put("aggregationMode", "facet");
+ params.put("sql", "select year_i, month_i, day_i, sum(item_i) from collection1 group by year_i, month_i, day_i order by year_i desc, month_i desc, day_i desc");
+
+ solrStream = new SolrStream(jetty.url, params);
+ tuples = getTuples(solrStream);
+
+ //Six results, one for each year/month/day combination.
+ assert(tuples.size() == 6);
+
+ tuple = tuples.get(0);
+ assert(tuple.getLong("year_i") == 2015);
+ assert(tuple.getLong("month_i") == 11);
+ assert(tuple.getLong("day_i") == 8);
+ assert(tuple.getDouble("sum(item_i)") == 42);
+
+ tuple = tuples.get(1);
+ assert(tuple.getLong("year_i") == 2015);
+ assert(tuple.getLong("month_i") == 11);
+ assert(tuple.getLong("day_i") == 7);
+ assert(tuple.getDouble("sum(item_i)") == 15);
+
+ tuple = tuples.get(2);
+ assert(tuple.getLong("year_i") == 2015);
+ assert(tuple.getLong("month_i") == 10);
+ assert(tuple.getLong("day_i") == 3);
+ assert(tuple.getDouble("sum(item_i)") == 5);
+
+ tuple = tuples.get(3);
+ assert(tuple.getLong("year_i") == 2015);
+ assert(tuple.getLong("month_i") == 10);
+ assert(tuple.getLong("day_i") == 1);
+ assert(tuple.getDouble("sum(item_i)") == 4);
+
+ tuple = tuples.get(4);
+ assert(tuple.getLong("year_i") == 2014);
+ assert(tuple.getLong("month_i") == 4);
+ assert(tuple.getLong("day_i") == 4);
+ assert(tuple.getDouble("sum(item_i)") == 6);
+
+ tuple = tuples.get(5);
+ assert(tuple.getLong("year_i") == 2014);
+ assert(tuple.getLong("month_i") == 4);
+ assert(tuple.getLong("day_i") == 2);
+ assert(tuple.getDouble("sum(item_i)") == 1);
+ } finally {
+ delete();
+ }
+ }
+
private void testParallelTimeSeriesGrouping() throws Exception {
try {
@@ -919,7 +1185,6 @@ public class TestSQLHandler extends Abst
return tuples;
}
-
protected Tuple getTuple(TupleStream tupleStream) throws IOException {
tupleStream.open();
Tuple t = tupleStream.read();
Added: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java?rev=1702132&view=auto
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java (added)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java Wed Sep 9 23:37:35 2015
@@ -0,0 +1,286 @@
+package org.apache.solr.client.solrj.io.stream;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Collections;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.io.SolrClientCache;
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.comp.FieldComparator;
+import org.apache.solr.client.solrj.io.comp.MultipleFieldComparator;
+import org.apache.solr.client.solrj.io.comp.StreamComparator;
+import org.apache.solr.client.solrj.io.stream.metrics.Bucket;
+import org.apache.solr.client.solrj.io.stream.metrics.Metric;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+
+/**
+ * The FacetStream abstracts the output from the JSON facet API as a Stream of Tuples. This provides an alternative to the
+ * RollupStream, which uses Map/Reduce to perform aggregations.
+ **/
+
+public class FacetStream extends TupleStream {
+
+ private static final long serialVersionUID = 1;
+
+ private Bucket[] buckets;
+ private Metric[] metrics;
+ private int limit;
+ private FieldComparator[] sorts;
+ private List<Tuple> tuples = new ArrayList();
+ private int index;
+ private String zkHost;
+ private Map<String, String> props;
+ private String collection;
+ protected transient SolrClientCache cache;
+ protected transient CloudSolrClient cloudSolrClient;
+
+ public FacetStream(String zkHost,
+ String collection,
+ Map<String, String> props,
+ Bucket[] buckets,
+ Metric[] metrics,
+ FieldComparator[] sorts,
+ int limit) {
+ this.zkHost = zkHost;
+ this.props = props;
+ this.buckets = buckets;
+ this.metrics = metrics;
+ this.limit = limit;
+ this.collection = collection;
+ this.sorts = sorts;
+ }
+
+ public void setStreamContext(StreamContext context) {
+ cache = context.getSolrClientCache();
+ }
+
+ public List<TupleStream> children() {
+ List<TupleStream> l = new ArrayList();
+ return l;
+ }
+
+ public void open() throws IOException {
+ if(cache != null) {
+ cloudSolrClient = cache.getCloudSolrClient(zkHost);
+ } else {
+ cloudSolrClient = new CloudSolrClient(zkHost);
+ }
+
+ FieldComparator[] adjustedSorts = adjustSorts(buckets, sorts);
+ String json = getJsonFacetString(buckets, metrics, adjustedSorts, limit);
+
+ ModifiableSolrParams params = getParams(this.props);
+ params.add("json.facet", json);
+ params.add("rows", "0");
+
+ QueryRequest request = new QueryRequest(params);
+ try {
+ NamedList response = cloudSolrClient.request(request, collection);
+ getTuples(response, buckets, metrics);
+ Collections.sort(tuples, getStreamSort());
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ public void close() throws IOException {
+ if(cache == null) {
+ cloudSolrClient.close();
+ }
+ }
+
+ public Tuple read() throws IOException {
+ if(index < tuples.size() && index < limit) {
+ Tuple tuple = tuples.get(index);
+ ++index;
+ return tuple;
+ } else {
+ Map fields = new HashMap();
+ fields.put("EOF", true);
+ Tuple tuple = new Tuple(fields);
+ return tuple;
+ }
+ }
+
+ private ModifiableSolrParams getParams(Map<String, String> props) {
+ ModifiableSolrParams params = new ModifiableSolrParams();
+ for(String key : props.keySet()) {
+ String value = props.get(key);
+ params.add(key, value);
+ }
+ return params;
+ }
+
+ private String getJsonFacetString(Bucket[] _buckets, Metric[] _metrics, FieldComparator[] _sorts, int _limit) {
+ StringBuilder buf = new StringBuilder();
+ appendJson(buf, _buckets, _metrics, _sorts, _limit, 0);
+ return "{"+buf.toString()+"}";
+ }
+
+ private FieldComparator[] adjustSorts(Bucket[] _buckets, FieldComparator[] _sorts) throws IOException {
+ if(_buckets.length == _sorts.length) {
+ return _sorts;
+ } else if(_sorts.length == 1) {
+ FieldComparator[] adjustedSorts = new FieldComparator[_buckets.length];
+ if (_sorts[0].getFieldName().contains("(")) {
+ //It's a metric sort, so apply the same sort criteria at each level.
+ for (int i = 0; i < adjustedSorts.length; i++) {
+ adjustedSorts[i] = _sorts[0];
+ }
+ } else {
+ //It's an index sort, so apply an index sort at each level.
+ for (int i = 0; i < adjustedSorts.length; i++) {
+ adjustedSorts[i] = new FieldComparator(_buckets[i].toString(), _sorts[0].getOrder());
+ }
+ }
+ return adjustedSorts;
+ } else {
+ throw new IOException("If multiple sorts are specified there must be a sort for each bucket.");
+ }
+ }
+
+ private void appendJson(StringBuilder buf,
+ Bucket[] _buckets,
+ Metric[] _metrics,
+ FieldComparator[] _sorts,
+ int _limit,
+ int level) {
+ buf.append('"');
+ buf.append(_buckets[level].toString());
+ buf.append('"');
+ buf.append(":{");
+ buf.append("\"type\":\"terms\"");
+ buf.append(",\"field\":\""+_buckets[level].toString()+"\"");
+ buf.append(",\"limit\":"+_limit);
+ buf.append(",\"sort\":{\""+getFacetSort(_sorts[level].getFieldName(), _metrics)+"\":\""+_sorts[level].getOrder()+"\"}");
+
+ buf.append(",\"facet\":{");
+ int metricCount = 0;
+ for(Metric metric : _metrics) {
+ String identifier = metric.getIdentifier();
+ if(!identifier.startsWith("count(")) {
+ if(metricCount>0) {
+ buf.append(",");
+ }
+ buf.append("\"facet_" + metricCount + "\":\"" +identifier+"\"");
+ ++metricCount;
+ }
+ }
+ ++level;
+ if(level < _buckets.length) {
+ if(metricCount>0) {
+ buf.append(",");
+ }
+ appendJson(buf, _buckets, _metrics, _sorts, _limit, level);
+ }
+ buf.append("}}");
+ }
+
+ private String getFacetSort(String id, Metric[] _metrics) {
+ int index = 0;
+ for(Metric metric : _metrics) {
+ if(metric.getIdentifier().startsWith("count(")) {
+ if(id.startsWith("count(")) {
+ return "count";
+ }
+ } else {
+ if (id.equals(_metrics[index].getIdentifier())) {
+ return "facet_" + index;
+ }
+ ++index;
+ }
+ }
+ return "index";
+ }
+
+ private void getTuples(NamedList response,
+ Bucket[] buckets,
+ Metric[] metrics) {
+
+ Tuple tuple = new Tuple(new HashMap());
+ NamedList facets = (NamedList)response.get("facets");
+ fillTuples(0,
+ tuples,
+ tuple,
+ facets,
+ buckets,
+ metrics);
+
+ }
+
+ private void fillTuples(int level,
+ List<Tuple> tuples,
+ Tuple currentTuple,
+ NamedList facets,
+ Bucket[] _buckets,
+ Metric[] _metrics) {
+
+ String bucketName = _buckets[level].toString();
+ NamedList nl = (NamedList)facets.get(bucketName);
+ List allBuckets = (List)nl.get("buckets");
+ for(int b=0; b<allBuckets.size(); b++) {
+ NamedList bucket = (NamedList)allBuckets.get(b);
+ Object val = bucket.get("val");
+ Tuple t = currentTuple.clone();
+ t.put(bucketName, val);
+ int nextLevel = level+1;
+ if(nextLevel<_buckets.length) {
+ fillTuples(nextLevel,
+ tuples,
+ t.clone(),
+ bucket,
+ _buckets,
+ _metrics);
+ } else {
+ int m = 0;
+ for(Metric metric : _metrics) {
+ String identifier = metric.getIdentifier();
+ if(!identifier.startsWith("count(")) {
+ double d = (double)bucket.get("facet_"+m);
+ t.put(identifier, d);
+ ++m;
+ } else {
+ long l = (long)bucket.get("count");
+ t.put("count(*)", l);
+ }
+ }
+ tuples.add(t);
+ }
+ }
+ }
+
+ public int getCost() {
+ return 0;
+ }
+
+ @Override
+ public StreamComparator getStreamSort() {
+ if(sorts.length > 1) {
+ return new MultipleFieldComparator(sorts);
+ } else {
+ return sorts[0];
+ }
+ }
+}
\ No newline at end of file
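For reference, getJsonFacetString/appendJson above recursively nest one "terms" facet per bucket, mapping each non-count metric to a facet_0, facet_1, ... sub-aggregation (count(*) rides along as the built-in bucket count). For buckets {year_i, month_i}, a single sum(item_i) metric, a descending metric sort, and a limit of 100, the generated json.facet parameter comes out roughly as follows (pretty-printed here; the code emits it on one line, and the order token comes from ComparatorOrder's string form):

{
  "year_i": {
    "type": "terms",
    "field": "year_i",
    "limit": 100,
    "sort": { "facet_0": "desc" },
    "facet": {
      "facet_0": "sum(item_i)",
      "month_i": {
        "type": "terms",
        "field": "month_i",
        "limit": 100,
        "sort": { "facet_0": "desc" },
        "facet": { "facet_0": "sum(item_i)" }
      }
    }
  }
}

fillTuples then walks the response depth-first, cloning the parent tuple at each level, so each emitted Tuple carries the full bucket path plus its metrics.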
Modified: lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java?rev=1702132&r1=1702131&r2=1702132&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java (original)
+++ lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java Wed Sep 9 23:37:35 2015
@@ -575,6 +575,584 @@ public class StreamingTest extends Abstr
assert(t.getException().contains("undefined field:"));
}
+ private void testFacetStream() throws Exception {
+
+ indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1");
+ indexr(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2");
+ indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
+ indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
+ indexr(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5");
+ indexr(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6");
+ indexr(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7");
+ indexr(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8");
+ indexr(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9");
+ indexr(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10");
+
+ commit();
+
+ String zkHost = zkServer.getZkAddress();
+
+ Map paramsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f", "sort", "a_s asc");
+
+ Bucket[] buckets = {new Bucket("a_s")};
+
+ Metric[] metrics = {new SumMetric("a_i"),
+ new SumMetric("a_f"),
+ new MinMetric("a_i"),
+ new MinMetric("a_f"),
+ new MaxMetric("a_i"),
+ new MaxMetric("a_f"),
+ new MeanMetric("a_i"),
+ new MeanMetric("a_f"),
+ new CountMetric()};
+
+ FieldComparator[] sorts = {new FieldComparator("sum(a_i)",
+ ComparatorOrder.ASCENDING)};
+
+ FacetStream facetStream = new FacetStream(zkHost,
+ "collection1",
+ paramsA,
+ buckets,
+ metrics,
+ sorts,
+ 100);
+
+ List<Tuple> tuples = getTuples(facetStream);
+
+ assert(tuples.size() == 3);
+
+ //Test Long and Double Sums
+
+ Tuple tuple = tuples.get(0);
+ String bucket = tuple.getString("a_s");
+ Double sumi = tuple.getDouble("sum(a_i)");
+ Double sumf = tuple.getDouble("sum(a_f)");
+ Double mini = tuple.getDouble("min(a_i)");
+ Double minf = tuple.getDouble("min(a_f)");
+ Double maxi = tuple.getDouble("max(a_i)");
+ Double maxf = tuple.getDouble("max(a_f)");
+ Double avgi = tuple.getDouble("avg(a_i)");
+ Double avgf = tuple.getDouble("avg(a_f)");
+ Double count = tuple.getDouble("count(*)");
+
+ assertTrue(bucket.equals("hello4"));
+ assertTrue(sumi.longValue() == 15);
+ assertTrue(sumf.doubleValue() == 11.0D);
+ assertTrue(mini.doubleValue() == 4.0D);
+ assertTrue(minf.doubleValue() == 4.0D);
+ assertTrue(maxi.doubleValue() == 11.0D);
+ assertTrue(maxf.doubleValue() == 7.0D);
+ assertTrue(avgi.doubleValue() == 7.5D);
+ assertTrue(avgf.doubleValue() == 5.5D);
+ assertTrue(count.doubleValue() == 2);
+
+ tuple = tuples.get(1);
+ bucket = tuple.getString("a_s");
+ sumi = tuple.getDouble("sum(a_i)");
+ sumf = tuple.getDouble("sum(a_f)");
+ mini = tuple.getDouble("min(a_i)");
+ minf = tuple.getDouble("min(a_f)");
+ maxi = tuple.getDouble("max(a_i)");
+ maxf = tuple.getDouble("max(a_f)");
+ avgi = tuple.getDouble("avg(a_i)");
+ avgf = tuple.getDouble("avg(a_f)");
+ count = tuple.getDouble("count(*)");
+
+ assertTrue(bucket.equals("hello0"));
+ assertTrue(sumi.doubleValue() == 17.0D);
+ assertTrue(sumf.doubleValue() == 18.0D);
+ assertTrue(mini.doubleValue() == 0.0D);
+ assertTrue(minf.doubleValue() == 1.0D);
+ assertTrue(maxi.doubleValue() == 14.0D);
+ assertTrue(maxf.doubleValue() == 10.0D);
+ assertTrue(avgi.doubleValue() == 4.25D);
+ assertTrue(avgf.doubleValue() == 4.5D);
+ assertTrue(count.doubleValue() == 4);
+
+ tuple = tuples.get(2);
+ bucket = tuple.getString("a_s");
+ sumi = tuple.getDouble("sum(a_i)");
+ sumf = tuple.getDouble("sum(a_f)");
+ mini = tuple.getDouble("min(a_i)");
+ minf = tuple.getDouble("min(a_f)");
+ maxi = tuple.getDouble("max(a_i)");
+ maxf = tuple.getDouble("max(a_f)");
+ avgi = tuple.getDouble("avg(a_i)");
+ avgf = tuple.getDouble("avg(a_f)");
+ count = tuple.getDouble("count(*)");
+
+ assertTrue(bucket.equals("hello3"));
+ assertTrue(sumi.doubleValue() == 38.0D);
+ assertTrue(sumf.doubleValue() == 26.0D);
+ assertTrue(mini.doubleValue() == 3.0D);
+ assertTrue(minf.doubleValue() == 3.0D);
+ assertTrue(maxi.doubleValue() == 13.0D);
+ assertTrue(maxf.doubleValue() == 9.0D);
+ assertTrue(avgi.doubleValue() == 9.5D);
+ assertTrue(avgf.doubleValue() == 6.5D);
+ assertTrue(count.doubleValue() == 4);
+
+
+ //Reverse the Sort.
+
+ sorts[0] = new FieldComparator("sum(a_i)", ComparatorOrder.DESCENDING);
+
+ facetStream = new FacetStream(zkHost,
+ "collection1",
+ paramsA,
+ buckets,
+ metrics,
+ sorts,
+ 100);
+
+ tuples = getTuples(facetStream);
+
+ assert(tuples.size() == 3);
+
+ //Test Long and Double Sums
+
+ tuple = tuples.get(0);
+ bucket = tuple.getString("a_s");
+ sumi = tuple.getDouble("sum(a_i)");
+ sumf = tuple.getDouble("sum(a_f)");
+ mini = tuple.getDouble("min(a_i)");
+ minf = tuple.getDouble("min(a_f)");
+ maxi = tuple.getDouble("max(a_i)");
+ maxf = tuple.getDouble("max(a_f)");
+ avgi = tuple.getDouble("avg(a_i)");
+ avgf = tuple.getDouble("avg(a_f)");
+ count = tuple.getDouble("count(*)");
+
+ assertTrue(bucket.equals("hello3"));
+ assertTrue(sumi.doubleValue() == 38.0D);
+ assertTrue(sumf.doubleValue() == 26.0D);
+ assertTrue(mini.doubleValue() == 3.0D);
+ assertTrue(minf.doubleValue() == 3.0D);
+ assertTrue(maxi.doubleValue() == 13.0D);
+ assertTrue(maxf.doubleValue() == 9.0D);
+ assertTrue(avgi.doubleValue() == 9.5D);
+ assertTrue(avgf.doubleValue() == 6.5D);
+ assertTrue(count.doubleValue() == 4);
+
+ tuple = tuples.get(1);
+ bucket = tuple.getString("a_s");
+ sumi = tuple.getDouble("sum(a_i)");
+ sumf = tuple.getDouble("sum(a_f)");
+ mini = tuple.getDouble("min(a_i)");
+ minf = tuple.getDouble("min(a_f)");
+ maxi = tuple.getDouble("max(a_i)");
+ maxf = tuple.getDouble("max(a_f)");
+ avgi = tuple.getDouble("avg(a_i)");
+ avgf = tuple.getDouble("avg(a_f)");
+ count = tuple.getDouble("count(*)");
+
+ assertTrue(bucket.equals("hello0"));
+ assertTrue(sumi.doubleValue() == 17.0D);
+ assertTrue(sumf.doubleValue() == 18.0D);
+ assertTrue(mini.doubleValue() == 0.0D);
+ assertTrue(minf.doubleValue() == 1.0D);
+ assertTrue(maxi.doubleValue() == 14.0D);
+ assertTrue(maxf.doubleValue() == 10.0D);
+ assertTrue(avgi.doubleValue() == 4.25D);
+ assertTrue(avgf.doubleValue() == 4.5D);
+ assertTrue(count.doubleValue() == 4);
+
+ tuple = tuples.get(2);
+ bucket = tuple.getString("a_s");
+ sumi = tuple.getDouble("sum(a_i)");
+ sumf = tuple.getDouble("sum(a_f)");
+ mini = tuple.getDouble("min(a_i)");
+ minf = tuple.getDouble("min(a_f)");
+ maxi = tuple.getDouble("max(a_i)");
+ maxf = tuple.getDouble("max(a_f)");
+ avgi = tuple.getDouble("avg(a_i)");
+ avgf = tuple.getDouble("avg(a_f)");
+ count = tuple.getDouble("count(*)");
+
+ assertTrue(bucket.equals("hello4"));
+ assertTrue(sumi.longValue() == 15);
+ assertTrue(sumf.doubleValue() == 11.0D);
+ assertTrue(mini.doubleValue() == 4.0D);
+ assertTrue(minf.doubleValue() == 4.0D);
+ assertTrue(maxi.doubleValue() == 11.0D);
+ assertTrue(maxf.doubleValue() == 7.0D);
+ assertTrue(avgi.doubleValue() == 7.5D);
+ assertTrue(avgf.doubleValue() == 5.5D);
+ assertTrue(count.doubleValue() == 2);
+
+
+ //Test index sort (descending).
+
+ sorts[0] = new FieldComparator("a_s", ComparatorOrder.DESCENDING);
+
+
+ facetStream = new FacetStream(zkHost,
+ "collection1",
+ paramsA,
+ buckets,
+ metrics,
+ sorts,
+ 100);
+
+ tuples = getTuples(facetStream);
+
+ assert(tuples.size() == 3);
+
+
+ tuple = tuples.get(0);
+ bucket = tuple.getString("a_s");
+ sumi = tuple.getDouble("sum(a_i)");
+ sumf = tuple.getDouble("sum(a_f)");
+ mini = tuple.getDouble("min(a_i)");
+ minf = tuple.getDouble("min(a_f)");
+ maxi = tuple.getDouble("max(a_i)");
+ maxf = tuple.getDouble("max(a_f)");
+ avgi = tuple.getDouble("avg(a_i)");
+ avgf = tuple.getDouble("avg(a_f)");
+ count = tuple.getDouble("count(*)");
+
+
+ assertTrue(bucket.equals("hello4"));
+ assertTrue(sumi.longValue() == 15);
+ assertTrue(sumf.doubleValue() == 11.0D);
+ assertTrue(mini.doubleValue() == 4.0D);
+ assertTrue(minf.doubleValue() == 4.0D);
+ assertTrue(maxi.doubleValue() == 11.0D);
+ assertTrue(maxf.doubleValue() == 7.0D);
+ assertTrue(avgi.doubleValue() == 7.5D);
+ assertTrue(avgf.doubleValue() == 5.5D);
+ assertTrue(count.doubleValue() == 2);
+
+
+ tuple = tuples.get(1);
+ bucket = tuple.getString("a_s");
+ sumi = tuple.getDouble("sum(a_i)");
+ sumf = tuple.getDouble("sum(a_f)");
+ mini = tuple.getDouble("min(a_i)");
+ minf = tuple.getDouble("min(a_f)");
+ maxi = tuple.getDouble("max(a_i)");
+ maxf = tuple.getDouble("max(a_f)");
+ avgi = tuple.getDouble("avg(a_i)");
+ avgf = tuple.getDouble("avg(a_f)");
+ count = tuple.getDouble("count(*)");
+
+ assertTrue(bucket.equals("hello3"));
+ assertTrue(sumi.doubleValue() == 38.0D);
+ assertTrue(sumf.doubleValue() == 26.0D);
+ assertTrue(mini.doubleValue() == 3.0D);
+ assertTrue(minf.doubleValue() == 3.0D);
+ assertTrue(maxi.doubleValue() == 13.0D);
+ assertTrue(maxf.doubleValue() == 9.0D);
+ assertTrue(avgi.doubleValue() == 9.5D);
+ assertTrue(avgf.doubleValue() == 6.5D);
+ assertTrue(count.doubleValue() == 4);
+
+
+ tuple = tuples.get(2);
+ bucket = tuple.getString("a_s");
+ sumi = tuple.getDouble("sum(a_i)");
+ sumf = tuple.getDouble("sum(a_f)");
+ mini = tuple.getDouble("min(a_i)");
+ minf = tuple.getDouble("min(a_f)");
+ maxi = tuple.getDouble("max(a_i)");
+ maxf = tuple.getDouble("max(a_f)");
+ avgi = tuple.getDouble("avg(a_i)");
+ avgf = tuple.getDouble("avg(a_f)");
+ count = tuple.getDouble("count(*)");
+
+ assertTrue(bucket.equals("hello0"));
+ assertTrue(sumi.doubleValue() == 17.0D);
+ assertTrue(sumf.doubleValue() == 18.0D);
+ assertTrue(mini.doubleValue() == 0.0D);
+ assertTrue(minf.doubleValue() == 1.0D);
+ assertTrue(maxi.doubleValue() == 14.0D);
+ assertTrue(maxf.doubleValue() == 10.0D);
+ assertTrue(avgi.doubleValue() == 4.25D);
+ assertTrue(avgf.doubleValue() == 4.5D);
+ assertTrue(count.doubleValue() == 4);
+
+ //Test index sort (ascending).
+
+ sorts[0] = new FieldComparator("a_s", ComparatorOrder.ASCENDING);
+
+ facetStream = new FacetStream(zkHost,
+ "collection1",
+ paramsA,
+ buckets,
+ metrics,
+ sorts,
+ 100);
+
+ tuples = getTuples(facetStream);
+
+ assert(tuples.size() == 3);
+
+
+ tuple = tuples.get(0);
+ bucket = tuple.getString("a_s");
+ sumi = tuple.getDouble("sum(a_i)");
+ sumf = tuple.getDouble("sum(a_f)");
+ mini = tuple.getDouble("min(a_i)");
+ minf = tuple.getDouble("min(a_f)");
+ maxi = tuple.getDouble("max(a_i)");
+ maxf = tuple.getDouble("max(a_f)");
+ avgi = tuple.getDouble("avg(a_i)");
+ avgf = tuple.getDouble("avg(a_f)");
+ count = tuple.getDouble("count(*)");
+
+ assertTrue(bucket.equals("hello0"));
+ assertTrue(sumi.doubleValue() == 17.0D);
+ assertTrue(sumf.doubleValue() == 18.0D);
+ assertTrue(mini.doubleValue() == 0.0D);
+ assertTrue(minf.doubleValue() == 1.0D);
+ assertTrue(maxi.doubleValue() == 14.0D);
+ assertTrue(maxf.doubleValue() == 10.0D);
+ assertTrue(avgi.doubleValue() == 4.25D);
+ assertTrue(avgf.doubleValue() == 4.5D);
+ assertTrue(count.doubleValue() == 4);
+
+
+ tuple = tuples.get(1);
+ bucket = tuple.getString("a_s");
+ sumi = tuple.getDouble("sum(a_i)");
+ sumf = tuple.getDouble("sum(a_f)");
+ mini = tuple.getDouble("min(a_i)");
+ minf = tuple.getDouble("min(a_f)");
+ maxi = tuple.getDouble("max(a_i)");
+ maxf = tuple.getDouble("max(a_f)");
+ avgi = tuple.getDouble("avg(a_i)");
+ avgf = tuple.getDouble("avg(a_f)");
+ count = tuple.getDouble("count(*)");
+
+ assertTrue(bucket.equals("hello3"));
+ assertTrue(sumi.doubleValue() == 38.0D);
+ assertTrue(sumf.doubleValue() == 26.0D);
+ assertTrue(mini.doubleValue() == 3.0D);
+ assertTrue(minf.doubleValue() == 3.0D);
+ assertTrue(maxi.doubleValue() == 13.0D);
+ assertTrue(maxf.doubleValue() == 9.0D);
+ assertTrue(avgi.doubleValue() == 9.5D);
+ assertTrue(avgf.doubleValue() == 6.5D);
+ assertTrue(count.doubleValue() == 4);
+
+
+ tuple = tuples.get(2);
+ bucket = tuple.getString("a_s");
+ sumi = tuple.getDouble("sum(a_i)");
+ sumf = tuple.getDouble("sum(a_f)");
+ mini = tuple.getDouble("min(a_i)");
+ minf = tuple.getDouble("min(a_f)");
+ maxi = tuple.getDouble("max(a_i)");
+ maxf = tuple.getDouble("max(a_f)");
+ avgi = tuple.getDouble("avg(a_i)");
+ avgf = tuple.getDouble("avg(a_f)");
+ count = tuple.getDouble("count(*)");
+
+ assertTrue(bucket.equals("hello4"));
+ assertTrue(sumi.longValue() == 15);
+ assertTrue(sumf.doubleValue() == 11.0D);
+ assertTrue(mini.doubleValue() == 4.0D);
+ assertTrue(minf.doubleValue() == 4.0D);
+ assertTrue(maxi.doubleValue() == 11.0D);
+ assertTrue(maxf.doubleValue() == 7.0D);
+ assertTrue(avgi.doubleValue() == 7.5D);
+ assertTrue(avgf.doubleValue() == 5.5D);
+ assertTrue(count.doubleValue() == 2);
+
+ del("*:*");
+ commit();
+ }
+
+
+ private void testSubFacetStream() throws Exception {
+
+ indexr(id, "0", "level1_s", "hello0", "level2_s", "a", "a_i", "0", "a_f", "1");
+ indexr(id, "2", "level1_s", "hello0", "level2_s", "a", "a_i", "2", "a_f", "2");
+ indexr(id, "3", "level1_s", "hello3", "level2_s", "a", "a_i", "3", "a_f", "3");
+ indexr(id, "4", "level1_s", "hello4", "level2_s", "a", "a_i", "4", "a_f", "4");
+ indexr(id, "1", "level1_s", "hello0", "level2_s", "b", "a_i", "1", "a_f", "5");
+ indexr(id, "5", "level1_s", "hello3", "level2_s", "b", "a_i", "10", "a_f", "6");
+ indexr(id, "6", "level1_s", "hello4", "level2_s", "b", "a_i", "11", "a_f", "7");
+ indexr(id, "7", "level1_s", "hello3", "level2_s", "b", "a_i", "12", "a_f", "8");
+ indexr(id, "8", "level1_s", "hello3", "level2_s", "b", "a_i", "13", "a_f", "9");
+ indexr(id, "9", "level1_s", "hello0", "level2_s", "b", "a_i", "14", "a_f", "10");
+
+ commit();
+
+ String zkHost = zkServer.getZkAddress();
+
+ Map paramsA = mapParams("q","*:*","fl","a_i,a_f");
+
+ Bucket[] buckets = {new Bucket("level1_s"), new Bucket("level2_s")};
+
+ Metric[] metrics = {new SumMetric("a_i"),
+ new CountMetric()};
+
+ FieldComparator[] sorts = {new FieldComparator("sum(a_i)", ComparatorOrder.DESCENDING), new FieldComparator("sum(a_i)", ComparatorOrder.DESCENDING)};
+
+
+ FacetStream facetStream = new FacetStream(
+ zkHost,
+ "collection1",
+ paramsA,
+ buckets,
+ metrics,
+ sorts,
+ 100);
+
+ List<Tuple> tuples = getTuples(facetStream);
+ assert(tuples.size() == 6);
+
+ Tuple tuple = tuples.get(0);
+ String bucket1 = tuple.getString("level1_s");
+ String bucket2 = tuple.getString("level2_s");
+ Double sumi = tuple.getDouble("sum(a_i)");
+ Double count = tuple.getDouble("count(*)");
+
+ assertTrue(bucket1.equals("hello3"));
+ assertTrue(bucket2.equals("b"));
+ assertTrue(sumi.longValue() == 35);
+ assertTrue(count.doubleValue() == 3);
+
+ tuple = tuples.get(1);
+ bucket1 = tuple.getString("level1_s");
+ bucket2 = tuple.getString("level2_s");
+ sumi = tuple.getDouble("sum(a_i)");
+ count = tuple.getDouble("count(*)");
+
+ assertTrue(bucket1.equals("hello0"));
+ assertTrue(bucket2.equals("b"));
+ assertTrue(sumi.longValue() == 15);
+ assertTrue(count.doubleValue() == 2);
+
+ tuple = tuples.get(2);
+ bucket1 = tuple.getString("level1_s");
+ bucket2 = tuple.getString("level2_s");
+ sumi = tuple.getDouble("sum(a_i)");
+ count = tuple.getDouble("count(*)");
+
+ assertTrue(bucket1.equals("hello4"));
+ assertTrue(bucket2.equals("b"));
+ assertTrue(sumi.longValue() == 11);
+ assertTrue(count.doubleValue() == 1);
+
+ tuple = tuples.get(3);
+ bucket1 = tuple.getString("level1_s");
+ bucket2 = tuple.getString("level2_s");
+ sumi = tuple.getDouble("sum(a_i)");
+ count = tuple.getDouble("count(*)");
+
+ assertTrue(bucket1.equals("hello4"));
+ assertTrue(bucket2.equals("a"));
+ assertTrue(sumi.longValue() == 4);
+ assertTrue(count.doubleValue() == 1);
+
+ tuple = tuples.get(4);
+ bucket1 = tuple.getString("level1_s");
+ bucket2 = tuple.getString("level2_s");
+ sumi = tuple.getDouble("sum(a_i)");
+ count = tuple.getDouble("count(*)");
+
+ assertTrue(bucket1.equals("hello3"));
+ assertTrue(bucket2.equals("a"));
+ assertTrue(sumi.longValue() == 3);
+ assertTrue(count.doubleValue() == 1);
+
+ tuple = tuples.get(5);
+ bucket1 = tuple.getString("level1_s");
+ bucket2 = tuple.getString("level2_s");
+ sumi = tuple.getDouble("sum(a_i)");
+ count = tuple.getDouble("count(*)");
+
+ assertTrue(bucket1.equals("hello0"));
+ assertTrue(bucket2.equals("a"));
+ assertTrue(sumi.longValue() == 2);
+ assertTrue(count.doubleValue() == 2);
+
+ sorts[0] = new FieldComparator("level1_s", ComparatorOrder.DESCENDING );
+ sorts[1] = new FieldComparator("level2_s", ComparatorOrder.DESCENDING );
+ facetStream = new FacetStream(
+ zkHost,
+ "collection1",
+ paramsA,
+ buckets,
+ metrics,
+ sorts,
+ 100);
+
+ tuples = getTuples(facetStream);
+ assert(tuples.size() == 6);
+
+ tuple = tuples.get(0);
+ bucket1 = tuple.getString("level1_s");
+ bucket2 = tuple.getString("level2_s");
+ sumi = tuple.getDouble("sum(a_i)");
+ count = tuple.getDouble("count(*)");
+
+ assertTrue(bucket1.equals("hello4"));
+ assertTrue(bucket2.equals("b"));
+ assertTrue(sumi.longValue() == 11);
+ assertTrue(count.doubleValue() == 1);
+
+ tuple = tuples.get(1);
+ bucket1 = tuple.getString("level1_s");
+ bucket2 = tuple.getString("level2_s");
+ sumi = tuple.getDouble("sum(a_i)");
+ count = tuple.getDouble("count(*)");
+
+ assertTrue(bucket1.equals("hello4"));
+ assertTrue(bucket2.equals("a"));
+ assertTrue(sumi.longValue() == 4);
+ assertTrue(count.doubleValue() == 1);
+
+ tuple = tuples.get(2);
+ bucket1 = tuple.getString("level1_s");
+ bucket2 = tuple.getString("level2_s");
+ sumi = tuple.getDouble("sum(a_i)");
+ count = tuple.getDouble("count(*)");
+
+ assertTrue(bucket1.equals("hello3"));
+ assertTrue(bucket2.equals("b"));
+ assertTrue(sumi.longValue() == 35);
+ assertTrue(count.doubleValue() == 3);
+
+ tuple = tuples.get(3);
+ bucket1 = tuple.getString("level1_s");
+ bucket2 = tuple.getString("level2_s");
+ sumi = tuple.getDouble("sum(a_i)");
+ count = tuple.getDouble("count(*)");
+
+ assertTrue(bucket1.equals("hello3"));
+ assertTrue(bucket2.equals("a"));
+ assertTrue(sumi.longValue() == 3);
+ assertTrue(count.doubleValue() == 1);
+
+ tuple = tuples.get(4);
+ bucket1 = tuple.getString("level1_s");
+ bucket2 = tuple.getString("level2_s");
+ sumi = tuple.getDouble("sum(a_i)");
+ count = tuple.getDouble("count(*)");
+
+ assertTrue(bucket1.equals("hello0"));
+ assertTrue(bucket2.equals("b"));
+ assertTrue(sumi.longValue() == 15);
+ assertTrue(count.doubleValue() == 2);
+
+ tuple = tuples.get(5);
+ bucket1 = tuple.getString("level1_s");
+ bucket2 = tuple.getString("level2_s");
+ sumi = tuple.getDouble("sum(a_i)");
+ count = tuple.getDouble("count(*)");
+
+ assertTrue(bucket1.equals("hello0"));
+ assertTrue(bucket2.equals("a"));
+ assertTrue(sumi.longValue() == 2);
+ assertTrue(count.doubleValue() == 2);
+
+ del("*:*");
+ commit();
+ }
+
private void testRollupStream() throws Exception {
indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1");
@@ -1121,6 +1699,8 @@ public class StreamingTest extends Abstr
testReducerStream();
testRollupStream();
testZeroReducerStream();
+ testFacetStream();
+ testSubFacetStream();
//testExceptionStream();
testParallelEOF();
testParallelUniqueStream();