You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2017/02/16 13:47:07 UTC

[17/50] lucene-solr:jira/solr-9858: SOLR-8593: Refactoring and adding aggregationMode=facet methods

SOLR-8593: Refactoring and adding aggregationMode=facet methods


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/37fdc37f
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/37fdc37f
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/37fdc37f

Branch: refs/heads/jira/solr-9858
Commit: 37fdc37fc3d88054634482d39b5774893751f91f
Parents: 05a6170
Author: Joel Bernstein <jb...@apache.org>
Authored: Thu Dec 15 16:11:12 2016 -0500
Committer: Joel Bernstein <jb...@apache.org>
Committed: Thu Dec 15 16:12:08 2016 -0500

----------------------------------------------------------------------
 .../apache/solr/handler/sql/SolrAggregate.java  |   3 +
 .../org/apache/solr/handler/sql/SolrFilter.java |  70 ++-
 .../org/apache/solr/handler/sql/SolrMethod.java |  11 +-
 .../org/apache/solr/handler/sql/SolrRel.java    |   5 +
 .../org/apache/solr/handler/sql/SolrTable.java  | 567 ++++++++++++++-----
 .../handler/sql/SolrToEnumerableConverter.java  |   9 +-
 .../org/apache/solr/handler/TestSQLHandler.java |  21 +-
 .../solr/client/solrj/io/ops/AndOperation.java  | 101 ++++
 .../client/solrj/io/ops/BooleanOperation.java   |  24 +
 .../client/solrj/io/ops/EqualsOperation.java    |  70 +++
 .../io/ops/GreaterThanEqualToOperation.java     |  70 +++
 .../solrj/io/ops/GreaterThanOperation.java      |  70 +++
 .../solr/client/solrj/io/ops/LeafOperation.java |  59 ++
 .../solrj/io/ops/LessThanEqualToOperation.java  |  70 +++
 .../client/solrj/io/ops/LessThanOperation.java  |  70 +++
 .../solr/client/solrj/io/ops/NotOperation.java  |  87 +++
 .../solr/client/solrj/io/ops/OrOperation.java   |  71 +++
 .../client/solrj/io/stream/FacetStream.java     |   1 +
 .../client/solrj/io/stream/HavingStream.java    | 190 +++++++
 19 files changed, 1413 insertions(+), 156 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/37fdc37f/solr/core/src/java/org/apache/solr/handler/sql/SolrAggregate.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/sql/SolrAggregate.java b/solr/core/src/java/org/apache/solr/handler/sql/SolrAggregate.java
index f913585..2512099 100644
--- a/solr/core/src/java/org/apache/solr/handler/sql/SolrAggregate.java
+++ b/solr/core/src/java/org/apache/solr/handler/sql/SolrAggregate.java
@@ -21,6 +21,7 @@ import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Aggregate;
 import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.sql.SqlAggFunction;
 import org.apache.calcite.sql.fun.SqlStdOperatorTable;
 import org.apache.calcite.util.ImmutableBitSet;
@@ -66,7 +67,9 @@ class SolrAggregate extends Aggregate implements SolrRel {
 
     final List<String> inNames = SolrRules.solrFieldNames(getInput().getRowType());
 
+
     for(Pair<AggregateCall, String> namedAggCall : getNamedAggCalls()) {
+
       AggregateCall aggCall = namedAggCall.getKey();
       Pair<String, String> metric = toSolrMetric(implementor, aggCall, inNames);
       implementor.addMetricPair(namedAggCall.getValue(), metric.getKey(), metric.getValue());

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/37fdc37f/solr/core/src/java/org/apache/solr/handler/sql/SolrFilter.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/sql/SolrFilter.java b/solr/core/src/java/org/apache/solr/handler/sql/SolrFilter.java
index c6eb33c..5f30926 100644
--- a/solr/core/src/java/org/apache/solr/handler/sql/SolrFilter.java
+++ b/solr/core/src/java/org/apache/solr/handler/sql/SolrFilter.java
@@ -24,6 +24,7 @@ import org.apache.calcite.rex.RexCall;
 import org.apache.calcite.rex.RexInputRef;
 import org.apache.calcite.rex.RexLiteral;
 import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.util.Pair;
 
 import java.util.ArrayList;
@@ -56,38 +57,68 @@ class SolrFilter extends Filter implements SolrRel {
     Translator translator = new Translator(SolrRules.solrFieldNames(getRowType()));
     String query = translator.translateMatch(condition);
     implementor.addQuery(query);
+    implementor.setNegativeQuery(translator.negativeQuery);
   }
 
   /** Translates {@link RexNode} expressions into Solr query strings. */
   private static class Translator {
+
     private final List<String> fieldNames;
+    public boolean negativeQuery = true;
 
     Translator(List<String> fieldNames) {
       this.fieldNames = fieldNames;
     }
 
     private String translateMatch(RexNode condition) {
-      return translateOr(condition);
+      if(condition.getKind().belongsTo(SqlKind.COMPARISON)) {
+        return translateComparison(condition);
+      } else if(condition.isA(SqlKind.AND)) {
+        return "("+translateAnd(condition)+")";
+      } else if(condition.isA(SqlKind.OR)) {
+        return "(" + translateOr(condition) + ")";
+      } else {
+        return null;
+      }
     }
 
     private String translateOr(RexNode condition) {
       List<String> ors = new ArrayList<>();
       for (RexNode node : RelOptUtil.disjunctions(condition)) {
-        ors.add(translateAnd(node));
+        ors.add(translateMatch(node));
       }
       return String.join(" OR ", ors);
     }
 
+
+
     private String translateAnd(RexNode node0) {
-      List<String> ands = new ArrayList<>();
-      for (RexNode node : RelOptUtil.conjunctions(node0)) {
-        ands.add(translateMatch2(node));
+      List<String> andStrings = new ArrayList();
+      List<String> notStrings = new ArrayList();
+
+      List<RexNode> ands = new ArrayList();
+      List<RexNode> nots = new ArrayList();
+      RelOptUtil.decomposeConjunction(node0, ands, nots);
+
+
+      for(RexNode node: ands) {
+        andStrings.add(translateMatch(node));
       }
 
-      return String.join(" AND ", ands);
+      String andString = String.join(" AND ", andStrings);
+
+      if(nots.size() > 0) {
+        for(RexNode node: nots) {
+          notStrings.add(translateMatch(node));
+        }
+        String notString = String.join(" NOT ", notStrings);
+        return "("+ andString +") NOT ("+notString+")";
+      } else {
+        return andString;
+      }
     }
 
-    private String translateMatch2(RexNode node) {
+    private String translateComparison(RexNode node) {
       Pair<String, RexLiteral> binaryTranslated = null;
       if (((RexCall) node).getOperands().size() == 2) {
         binaryTranslated = translateBinary((RexCall) node);
@@ -95,19 +126,30 @@ class SolrFilter extends Filter implements SolrRel {
 
       switch (node.getKind()) {
         case NOT:
-          return "-"+translateMatch2(((RexCall) node).getOperands().get(0));
+          return "-"+translateComparison(((RexCall) node).getOperands().get(0));
         case EQUALS:
-          return binaryTranslated.getKey() + ":" + binaryTranslated.getValue().getValue2();
+          String terms = binaryTranslated.getValue().getValue2().toString().trim();
+          if(!terms.startsWith("(")){
+            terms = "\""+terms+"\"";
+          }
+
+          String clause = binaryTranslated.getKey() + ":" + terms;
+          this.negativeQuery = false;
+          return clause;
         case NOT_EQUALS:
-          return "-" + binaryTranslated.getKey() + ":" + binaryTranslated.getValue().getValue2();
+          return "-(" + binaryTranslated.getKey() + ":" + binaryTranslated.getValue().getValue2()+")";
         case LESS_THAN:
-          return binaryTranslated.getKey() + ": [ * TO " + binaryTranslated.getValue().getValue2() + " }";
+          this.negativeQuery = false;
+          return "("+binaryTranslated.getKey() + ": [ * TO " + binaryTranslated.getValue().getValue2() + " })";
         case LESS_THAN_OR_EQUAL:
-          return binaryTranslated.getKey() + ": [ * TO " + binaryTranslated.getValue().getValue2() + " ]";
+          this.negativeQuery = false;
+          return "("+binaryTranslated.getKey() + ": [ * TO " + binaryTranslated.getValue().getValue2() + " ])";
         case GREATER_THAN:
-          return binaryTranslated.getKey() + ": { " + binaryTranslated.getValue().getValue2() + " TO * ]";
+          this.negativeQuery = false;
+          return "("+binaryTranslated.getKey() + ": { " + binaryTranslated.getValue().getValue2() + " TO * ])";
         case GREATER_THAN_OR_EQUAL:
-          return binaryTranslated.getKey() + ": [ " + binaryTranslated.getValue().getValue2() + " TO * ]";
+          this.negativeQuery = false;
+          return "("+binaryTranslated.getKey() + ": [ " + binaryTranslated.getValue().getValue2() + " TO * ])";
         default:
           throw new AssertionError("cannot translate " + node);
       }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/37fdc37f/solr/core/src/java/org/apache/solr/handler/sql/SolrMethod.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/sql/SolrMethod.java b/solr/core/src/java/org/apache/solr/handler/sql/SolrMethod.java
index 31c4548..4ec3fdb 100644
--- a/solr/core/src/java/org/apache/solr/handler/sql/SolrMethod.java
+++ b/solr/core/src/java/org/apache/solr/handler/sql/SolrMethod.java
@@ -25,8 +25,15 @@ import java.util.List;
  * Builtin methods in the Solr adapter.
  */
 enum SolrMethod {
-  SOLR_QUERYABLE_QUERY(SolrTable.SolrQueryable.class, "query", List.class, String.class, List.class, List.class,
-      List.class, String.class);
+  SOLR_QUERYABLE_QUERY(SolrTable.SolrQueryable.class,
+                       "query",
+                       List.class,
+                       String.class,
+                       List.class,
+                       List.class,
+                       List.class,
+                       String.class,
+                       String.class);
 
   public final Method method;
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/37fdc37f/solr/core/src/java/org/apache/solr/handler/sql/SolrRel.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/sql/SolrRel.java b/solr/core/src/java/org/apache/solr/handler/sql/SolrRel.java
index ea22951..b7843d7 100644
--- a/solr/core/src/java/org/apache/solr/handler/sql/SolrRel.java
+++ b/solr/core/src/java/org/apache/solr/handler/sql/SolrRel.java
@@ -36,6 +36,7 @@ interface SolrRel extends RelNode {
   class Implementor {
     final Map<String, String> fieldMappings = new HashMap<>();
     String query = null;
+    boolean negativeQuery;
     String limitValue = null;
     final List<Pair<String, String>> orders = new ArrayList<>();
     final List<String> buckets = new ArrayList<>();
@@ -54,6 +55,10 @@ interface SolrRel extends RelNode {
       this.query = query;
     }
 
+    void setNegativeQuery(boolean negativeQuery) {
+      this.negativeQuery = negativeQuery;
+    }
+
     void addOrder(String column, String direction) {
       column = this.fieldMappings.getOrDefault(column, column);
       this.orders.add(new Pair<>(column, direction));

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/37fdc37f/solr/core/src/java/org/apache/solr/handler/sql/SolrTable.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/sql/SolrTable.java b/solr/core/src/java/org/apache/solr/handler/sql/SolrTable.java
index e5fd88f..14e69e6 100644
--- a/solr/core/src/java/org/apache/solr/handler/sql/SolrTable.java
+++ b/solr/core/src/java/org/apache/solr/handler/sql/SolrTable.java
@@ -72,7 +72,7 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
   
   private Enumerable<Object> query(final Properties properties) {
     return query(properties, Collections.emptyList(), null, Collections.emptyList(), Collections.emptyList(),
-        Collections.emptyList(), null);
+        Collections.emptyList(), null, null);
   }
 
   /** Executes a Solr query on the underlying table.
@@ -82,150 +82,58 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
    * @param query A string for the query
    * @return Enumerator of results
    */
-  private Enumerable<Object> query(final Properties properties, final List<Map.Entry<String, Class>> fields,
-                                   final String query, final List<Pair<String, String>> orders, final List<String> buckets,
-                                   final List<Pair<String, String>> metricPairs, final String limit) {
+  private Enumerable<Object> query(final Properties properties,
+                                   final List<Map.Entry<String, Class>> fields,
+                                   final String query,
+                                   final List<Pair<String, String>> orders,
+                                   final List<String> buckets,
+                                   final List<Pair<String, String>> metricPairs,
+                                   final String limit,
+                                   final String negativeQuery) {
     // SolrParams should be a ModifiableParams instead of a map
-    ModifiableSolrParams solrParams = new ModifiableSolrParams();
-    solrParams.add(CommonParams.OMIT_HEADER, "true");
+    boolean mapReduce = "map_reduce".equals(properties.getProperty("aggregationMode"));
+    boolean negative = Boolean.parseBoolean(negativeQuery);
+
+    String q = null;
 
     if (query == null) {
-      solrParams.add(CommonParams.Q, DEFAULT_QUERY);
+      q = DEFAULT_QUERY;
     } else {
-      solrParams.add(CommonParams.Q, DEFAULT_QUERY + " AND " + query);
-    }
-
-    // List<String> doesn't have add so must make a new ArrayList
-    List<String> fieldsList = new ArrayList<>(fields.size());
-    fieldsList.addAll(fields.stream().map(Map.Entry::getKey).collect(Collectors.toList()));
-    LinkedHashMap<String,String> ordersMap = new LinkedHashMap<>();
-    for (Pair<String,String> order : orders) {
-      ordersMap.put(order.getKey(), order.getValue());
-    }
-    List<Metric> metrics = buildMetrics(metricPairs);
-    List<Bucket> bucketsList = buckets.stream().map(Bucket::new).collect(Collectors.toList());
-
-    for(int i = buckets.size()-1; i >= 0; i--) {
-      if (!ordersMap.containsKey(buckets.get(i))) {
-        ordersMap.put(buckets.get(i), "asc");
-      }
-    }
-
-    boolean isReOrder = false;
-
-    for(Metric metric : metrics) {
-      String metricIdentifier = metric.getIdentifier();
-
-      ordersMap.remove(metricIdentifier);
-
-      if(fieldsList.contains(metricIdentifier)) {
-        fieldsList.remove(metricIdentifier);
-        isReOrder = true;
-      }
-
-      for(String column : metric.getColumns()) {
-        if (!fieldsList.contains(column)) {
-          fieldsList.add(column);
-        }
-
-        if (!ordersMap.containsKey(column)) {
-          ordersMap.put(column, "asc");
-        }
-      }
-    }
-
-    if (ordersMap.size() < 4) {
-      ordersMap.put(DEFAULT_VERSION_FIELD, "desc");
-
-      // Make sure the default sort field is in the field list
-      if (!fieldsList.contains(DEFAULT_VERSION_FIELD)) {
-        fieldsList.add(DEFAULT_VERSION_FIELD);
-      }
-    }
-
-    if(!ordersMap.isEmpty()) {
-      List<String> orderList = new ArrayList<>(ordersMap.size());
-      for(Map.Entry<String, String> order : ordersMap.entrySet()) {
-        String column = order.getKey();
-        if(!fieldsList.contains(column)) {
-          fieldsList.add(column);
-        }
-        orderList.add(column + " " + order.getValue());
+      if(negative) {
+        q = DEFAULT_QUERY + " AND " + query;
+      } else {
+        q = query;
       }
-      solrParams.add(CommonParams.SORT, String.join(",", orderList));
-    }
-
-    if (fieldsList.isEmpty()) {
-      solrParams.add(CommonParams.FL, "*");
-    } else {
-      solrParams.add(CommonParams.FL, String.join(",", fieldsList));
     }
 
     TupleStream tupleStream;
     String zk = properties.getProperty("zk");
     try {
-      if (metrics.isEmpty() && bucketsList.isEmpty()) {
-        solrParams.add(CommonParams.QT, "/export");
-        if (limit != null) {
-          solrParams.add(CommonParams.ROWS, limit);
-          tupleStream = new LimitStream(new CloudSolrStream(zk, collection, solrParams), Integer.parseInt(limit));
-        } else {
-          tupleStream = new CloudSolrStream(zk, collection, solrParams);
-        }
+      if (metricPairs.isEmpty() && buckets.isEmpty()) {
+        tupleStream = handleSelect(zk, collection, q, fields, orders, limit);
       } else {
-        Metric[] metricsArray = metrics.toArray(new Metric[metrics.size()]);
-        if(bucketsList.isEmpty()) {
-          solrParams.remove(CommonParams.FL);
-          solrParams.remove(CommonParams.SORT);
-          tupleStream = new StatsStream(zk, collection, solrParams, metricsArray);
+        if(buckets.isEmpty()) {
+          tupleStream = handleStats(zk, collection, q, metricPairs);
         } else {
-          solrParams.add(CommonParams.QT, "/export");
-
-          int numWorkers = Integer.parseInt(properties.getProperty("numWorkers", "1"));
-          if (numWorkers > 1) solrParams.add("partitionKeys",String.join(",", buckets));
-
-          tupleStream = new CloudSolrStream(zk, collection, solrParams);
-          tupleStream = new RollupStream(tupleStream, bucketsList.toArray(new Bucket[bucketsList.size()]), metricsArray);
-
-          if(numWorkers > 1) {
-            String workerZkHost = properties.getProperty("workerZkhost");
-            String workerCollection = properties.getProperty("workerCollection");
-            // Do the rollups in parallel
-            // Maintain the sort of the Tuples coming from the workers.
-            StreamComparator comp = bucketSortComp(bucketsList, ordersMap);
-
-            ParallelStream parallelStream = new ParallelStream(workerZkHost, workerCollection, tupleStream, numWorkers, comp);
-
-            StreamFactory factory = new StreamFactory()
-                .withFunctionName("search", CloudSolrStream.class)
-                .withFunctionName("parallel", ParallelStream.class)
-                .withFunctionName("rollup", RollupStream.class)
-                .withFunctionName("sum", SumMetric.class)
-                .withFunctionName("min", MinMetric.class)
-                .withFunctionName("max", MaxMetric.class)
-                .withFunctionName("avg", MeanMetric.class)
-                .withFunctionName("count", CountMetric.class);
-
-            parallelStream.setStreamFactory(factory);
-            tupleStream = parallelStream;
-            isReOrder = true;
-          }
-
-          if (isReOrder) {
-            int limitVal = limit == null ? 100 : Integer.parseInt(limit);
-            StreamComparator comp = getComp(orders);
-            if (orders.isEmpty() && !ordersMap.isEmpty()) {
-              // default order
-              comp = getComp(new ArrayList<>(ordersMap.entrySet()));
-            }
-            tupleStream = new RankStream(tupleStream, limitVal, comp);
+          if(mapReduce) {
+            tupleStream = handleGroupByMapReduce(zk,
+                                                 collection,
+                                                 properties,
+                                                 fields,
+                                                 q,
+                                                 orders,
+                                                 buckets,
+                                                 metricPairs,
+                                                 limit);
           } else {
-            // Sort is the same as the same as the underlying stream
-            // Only need to limit the result, not Rank the result
-            if (limit != null) {
-              solrParams.add(CommonParams.ROWS, limit);
-              tupleStream = new LimitStream(new CloudSolrStream(zk, collection, solrParams), Integer.parseInt(limit));
-            }
+            tupleStream = handleGroupByFacet(zk,
+                                             collection,
+                                             fields,
+                                             q,
+                                             orders,
+                                             buckets,
+                                             metricPairs,
+                                             limit);
           }
         }
       }
@@ -258,6 +166,21 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
     }
   }
 
+  private static StreamComparator bucketSortComp(Bucket[] buckets, String dir) {
+    FieldComparator[] comps = new FieldComparator[buckets.length];
+    for(int i=0; i<buckets.length; i++) {
+      ComparatorOrder comparatorOrder = ascDescComp(dir);
+      String sortKey = buckets[i].toString();
+      comps[i] = new FieldComparator(sortKey, comparatorOrder);
+    }
+
+    if(comps.length == 1) {
+      return comps[0];
+    } else {
+      return new MultipleFieldComparator(comps);
+    }
+  }
+
   private String getSortDirection(Map.Entry<String, String> order) {
     String direction = order.getValue();
     return direction == null ? "asc" : direction;
@@ -283,6 +206,9 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
   private List<Metric> buildMetrics(List<Pair<String, String>> metricPairs) {
     List<Metric> metrics = new ArrayList<>(metricPairs.size());
     metrics.addAll(metricPairs.stream().map(this::getMetric).collect(Collectors.toList()));
+    if(metrics.size() == 0) {
+      metrics.add(new CountMetric());
+    }
     return metrics;
   }
 
@@ -304,6 +230,358 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
     }
   }
 
+  private TupleStream handleSelect(String zk,
+                                   String collection,
+                                   String query,
+                                   List<Map.Entry<String, Class>> fields,
+                                   List<Pair<String, String>> orders,
+                                   String limit) throws IOException {
+
+    ModifiableSolrParams params = new ModifiableSolrParams();
+    params.add(CommonParams.Q, query);
+
+    if(orders.size() > 0) {
+      params.add(CommonParams.SORT, getSort(orders));
+    } else {
+      params.add(CommonParams.SORT, "_version_ desc");
+    }
+
+    if(fields.size() > 0) {
+      params.add(CommonParams.FL, getFields(fields));
+    }
+
+    if (limit != null) {
+      params.add(CommonParams.ROWS, limit);
+      return new LimitStream(new CloudSolrStream(zk, collection, params), Integer.parseInt(limit));
+    } else {
+      params.add(CommonParams.QT, "/export");
+      return new CloudSolrStream(zk, collection, params);
+    }
+  }
+
+  private String getSort(List<Pair<String, String>> orders) {
+    StringBuilder buf = new StringBuilder();
+    for(Pair<String, String> pair : orders) {
+      if(buf.length() > 0) {
+        buf.append(",");
+      }
+      buf.append(pair.getKey()).append(" ").append(pair.getValue());
+    }
+
+    return buf.toString();
+  }
+
+  private String getFields(List<Map.Entry<String, Class>> fields) {
+    StringBuilder buf = new StringBuilder();
+    boolean appendVersion = true;
+    for(Map.Entry<String, Class> field : fields) {
+
+      if(buf.length() > 0) {
+        buf.append(",");
+      }
+
+      if(field.getKey().equals("_version_")) {
+        appendVersion = false;
+      }
+
+      buf.append(field.getKey());
+    }
+
+    if(appendVersion){
+      buf.append(",_version_");
+    }
+
+    return buf.toString();
+  }
+
+  private String getFields(Set<String> fieldSet) {
+    StringBuilder buf = new StringBuilder();
+    boolean appendVersion = true;
+    for(String field : fieldSet) {
+
+      if(buf.length() > 0) {
+        buf.append(",");
+      }
+
+      if(field.equals("_version_")) {
+        appendVersion = false;
+      }
+
+      buf.append(field);
+    }
+
+    if(appendVersion){
+      buf.append(",_version_");
+    }
+
+    return buf.toString();
+  }
+
+
+  private Set<String> getFieldSet(Metric[] metrics, List<Map.Entry<String, Class>> fields) {
+    HashSet set = new HashSet();
+    for(Metric metric : metrics) {
+      for(String column : metric.getColumns()) {
+        set.add(column);
+      }
+    }
+
+    for(Map.Entry<String, Class> field : fields) {
+      if(field.getKey().indexOf('(') == -1) {
+        set.add(field.getKey());
+      }
+    }
+
+    return set;
+  }
+
+  private static String getSortDirection(List<Pair<String, String>> orders) {
+    if(orders != null && orders.size() > 0) {
+      for(Pair<String,String> item : orders) {
+        return item.getValue();
+      }
+    }
+
+    return "asc";
+  }
+
+  private static String bucketSort(Bucket[] buckets, String dir) {
+    StringBuilder buf = new StringBuilder();
+    boolean comma = false;
+    for(Bucket bucket : buckets) {
+      if(comma) {
+        buf.append(",");
+      }
+      buf.append(bucket.toString()).append(" ").append(dir);
+      comma = true;
+    }
+
+    return buf.toString();
+  }
+
+  private static String getPartitionKeys(Bucket[] buckets) {
+    StringBuilder buf = new StringBuilder();
+    boolean comma = false;
+    for(Bucket bucket : buckets) {
+      if(comma) {
+        buf.append(",");
+      }
+      buf.append(bucket.toString());
+      comma = true;
+    }
+    return buf.toString();
+  }
+
+  private static boolean sortsEqual(Bucket[] buckets, String direction, List<Pair<String, String>> orders) {
+
+    if(buckets.length != orders.size()) {
+      return false;
+    }
+
+    for(int i=0; i< buckets.length; i++) {
+      Bucket bucket = buckets[i];
+      Pair<String, String> order = orders.get(i);
+      if(!bucket.toString().equals(order.getKey())) {
+        return false;
+      }
+
+      if(!order.getValue().toLowerCase(Locale.ROOT).contains(direction.toLowerCase(Locale.ROOT))) {
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  private TupleStream handleGroupByMapReduce(String zk,
+                                             String collection,
+                                             Properties properties,
+                                             final List<Map.Entry<String, Class>> fields,
+                                             final String query,
+                                             final List<Pair<String, String>> orders,
+                                             final List<String> _buckets,
+                                             final List<Pair<String, String>> metricPairs,
+                                             final String limit) throws IOException {
+
+    int numWorkers = Integer.parseInt(properties.getProperty("numWorkers", "1"));
+
+    Bucket[] buckets = buildBuckets(_buckets, fields);
+    Metric[] metrics = buildMetrics(metricPairs).toArray(new Metric[0]);
+
+    Set<String> fieldSet = getFieldSet(metrics, fields);
+
+    if(metrics.length == 0) {
+      throw new IOException("Group by queries must include atleast one aggregate function.");
+    }
+
+    String fl = getFields(fieldSet);
+    String sortDirection = getSortDirection(orders);
+    String sort = bucketSort(buckets, sortDirection);
+
+    ModifiableSolrParams params = new ModifiableSolrParams();
+
+    params.set(CommonParams.FL, fl);
+    params.set(CommonParams.Q, query);
+    //Always use the /export handler for Group By Queries because it requires exporting full result sets.
+    params.set(CommonParams.QT, "/export");
+
+    if(numWorkers > 1) {
+      params.set("partitionKeys", getPartitionKeys(buckets));
+    }
+
+    params.set("sort", sort);
+
+    TupleStream tupleStream = null;
+
+    CloudSolrStream cstream = new CloudSolrStream(zk, collection, params);
+    tupleStream = new RollupStream(cstream, buckets, metrics);
+
+    if(numWorkers > 1) {
+      // Do the rollups in parallel
+      // Maintain the sort of the Tuples coming from the workers.
+      StreamComparator comp = bucketSortComp(buckets, sortDirection);
+      ParallelStream parallelStream = new ParallelStream(zk, collection, tupleStream, numWorkers, comp);
+
+      StreamFactory factory = new StreamFactory()
+          .withFunctionName("search", CloudSolrStream.class)
+          .withFunctionName("parallel", ParallelStream.class)
+          .withFunctionName("rollup", RollupStream.class)
+          .withFunctionName("sum", SumMetric.class)
+          .withFunctionName("min", MinMetric.class)
+          .withFunctionName("max", MaxMetric.class)
+          .withFunctionName("avg", MeanMetric.class)
+          .withFunctionName("count", CountMetric.class);
+
+      parallelStream.setStreamFactory(factory);
+      tupleStream = parallelStream;
+    }
+
+    //TODO: This should be done on the workers, but it won't serialize because it relies on Presto classes.
+    // Once we make this a Expressionable the problem will be solved.
+
+
+    if(orders != null && orders.size() > 0) {
+      int lim = limit == null ? 100 : Integer.parseInt(limit);
+      if(!sortsEqual(buckets, sortDirection, orders)) {
+        StreamComparator comp = getComp(orders);
+        //Rank the Tuples
+        //If parallel stream is used ALL the Rolled up tuples from the workers will be ranked
+        //Providing a true Top or Bottom.
+        tupleStream = new RankStream(tupleStream, lim, comp);
+      } else {
+        // Sort is the same as the same as the underlying stream
+        // Only need to limit the result, not Rank the result
+        if(lim > -1) {
+          tupleStream = new LimitStream(tupleStream, lim);
+        }
+      }
+    }
+
+    return tupleStream;
+  }
+
+  private Bucket[] buildBuckets(List<String> buckets, List<Map.Entry<String, Class>> fields) {
+    Bucket[] bucketsArray = new Bucket[buckets.size()];
+
+    int i=0;
+    for(Map.Entry<String,Class> field : fields) {
+      String fieldName = field.getKey();
+      if(buckets.contains(fieldName)) {
+        bucketsArray[i++] = new Bucket(fieldName);
+      }
+    }
+
+    return bucketsArray;
+  }
+
+
+  private TupleStream handleGroupByFacet(String zkHost,
+                                         String collection,
+                                         final List<Map.Entry<String, Class>> fields,
+                                         final String query,
+                                         final List<Pair<String, String>> orders,
+                                         final List<String> bucketFields,
+                                         final List<Pair<String, String>> metricPairs,
+                                         final String lim) throws IOException {
+
+    ModifiableSolrParams solrParams = new ModifiableSolrParams();
+    solrParams.add(CommonParams.Q, query);
+
+    Bucket[] buckets = buildBuckets(bucketFields, fields);
+    Metric[] metrics = buildMetrics(metricPairs).toArray(new Metric[0]);
+    if(metrics.length == 0) {
+      metrics = new Metric[1];
+      metrics[0] = new CountMetric();
+    }
+
+    int limit = lim != null ? Integer.parseInt(lim) : 100;
+
+    FieldComparator[] sorts = null;
+
+    if(orders == null || orders.size() == 0) {
+      sorts = new FieldComparator[buckets.length];
+      for(int i=0; i<sorts.length; i++) {
+        sorts[i] = new FieldComparator("index", ComparatorOrder.ASCENDING);
+      }
+    } else {
+      sorts = getComps(orders);
+    }
+
+    TupleStream tupleStream = new FacetStream(zkHost,
+                                              collection,
+                                              solrParams,
+                                              buckets,
+                                              metrics,
+                                              sorts,
+                                              limit);
+
+
+    if(lim != null)
+    {
+      tupleStream = new LimitStream(tupleStream, limit);
+    }
+
+    return tupleStream;
+  }
+
+  private TupleStream handleSelectDistinctMapReduce(final Properties properties,
+                                                    final List<Map.Entry<String, Class>> fields,
+                                                    final String query,
+                                                    final List<Pair<String, String>> orders,
+                                                    final List<String> buckets,
+                                                    final List<Pair<String, String>> metricPairs,
+                                                    final String limit) {
+
+
+
+
+
+
+    return null;
+  }
+
+  private TupleStream handleSelectDistinctFacet(final Properties properties,
+                                                final List<Map.Entry<String, Class>> fields,
+                                                final String query,
+                                                final List<Pair<String, String>> orders,
+                                                final List<String> buckets,
+                                                final List<Pair<String, String>> metricPairs,
+                                                final String limit) {
+    return null;
+  }
+
+  private TupleStream handleStats(String zk,
+                                  String collection,
+                                  String query,
+                                  List<Pair<String, String>> metricPairs) {
+
+
+    ModifiableSolrParams solrParams = new ModifiableSolrParams();
+    solrParams.add(CommonParams.Q, query);
+    Metric[] metrics = buildMetrics(metricPairs).toArray(new Metric[0]);
+    return new StatsStream(zk, collection, solrParams, metrics);
+  }
+
   public <T> Queryable<T> asQueryable(QueryProvider queryProvider, SchemaPlus schema, String tableName) {
     return new SolrQueryable<>(queryProvider, schema, this, tableName);
   }
@@ -339,8 +617,29 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
      */
     @SuppressWarnings("UnusedDeclaration")
     public Enumerable<Object> query(List<Map.Entry<String, Class>> fields, String query, List<Pair<String, String>> order,
-                                    List<String> buckets, List<Pair<String, String>> metricPairs, String limit) {
-      return getTable().query(getProperties(), fields, query, order, buckets, metricPairs, limit);
+                                    List<String> buckets, List<Pair<String, String>> metricPairs, String limit, String negativeQuery) {
+      return getTable().query(getProperties(), fields, query, order, buckets, metricPairs, limit, negativeQuery);
+    }
+  }
+
+  private static FieldComparator[] getComps(List<Pair<String, String>> orders) {
+    FieldComparator[] comps = new FieldComparator[orders.size()];
+    for(int i=0; i<orders.size(); i++) {
+      Pair<String,String> sortItem = orders.get(i);
+      String ordering = sortItem.getValue();
+      ComparatorOrder comparatorOrder = ascDescComp(ordering);
+      String sortKey = sortItem.getKey();
+      comps[i] = new FieldComparator(sortKey, comparatorOrder);
+    }
+
+    return comps;
+  }
+
+  private static ComparatorOrder ascDescComp(String s) {
+    if(s.toLowerCase(Locale.ROOT).contains("desc")) {
+      return ComparatorOrder.DESCENDING;
+    } else {
+      return ComparatorOrder.ASCENDING;
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/37fdc37f/solr/core/src/java/org/apache/solr/handler/sql/SolrToEnumerableConverter.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/sql/SolrToEnumerableConverter.java b/solr/core/src/java/org/apache/solr/handler/sql/SolrToEnumerableConverter.java
index 6737977..f69f3d5 100644
--- a/solr/core/src/java/org/apache/solr/handler/sql/SolrToEnumerableConverter.java
+++ b/solr/core/src/java/org/apache/solr/handler/sql/SolrToEnumerableConverter.java
@@ -67,11 +67,13 @@ class SolrToEnumerableConverter extends ConverterImpl implements EnumerableRel {
             constantArrayList(
                 Pair.zip(generateFields(SolrRules.solrFieldNames(rowType), solrImplementor.fieldMappings),
                     new AbstractList<Class>() {
-                      @Override public Class get(int index) {
+                      @Override
+                      public Class get(int index) {
                         return physType.fieldClass(index);
                       }
 
-                      @Override public int size() {
+                      @Override
+                      public int size() {
                         return rowType.getFieldCount();
                       }
                     }),
@@ -81,8 +83,9 @@ class SolrToEnumerableConverter extends ConverterImpl implements EnumerableRel {
     final Expression buckets = list.append("buckets", constantArrayList(solrImplementor.buckets, String.class));
     final Expression metricPairs = list.append("metricPairs", constantArrayList(solrImplementor.metricPairs, Pair.class));
     final Expression limit = list.append("limit", Expressions.constant(solrImplementor.limitValue));
+    final Expression negativeQuery = list.append("negativeQuery", Expressions.constant(Boolean.toString(solrImplementor.negativeQuery), String.class));
     Expression enumerable = list.append("enumerable", Expressions.call(table, SolrMethod.SOLR_QUERYABLE_QUERY.method,
-        fields, query, orders, buckets, metricPairs, limit));
+        fields, query, orders, buckets, metricPairs, limit, negativeQuery));
     Hook.QUERY_PLAN.run(query);
     list.add(Expressions.return_(null, enumerable));
     return implementor.result(physType, list.toBlock());

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/37fdc37f/solr/core/src/test/org/apache/solr/handler/TestSQLHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/handler/TestSQLHandler.java b/solr/core/src/test/org/apache/solr/handler/TestSQLHandler.java
index 5b92c30..605abf5 100644
--- a/solr/core/src/test/org/apache/solr/handler/TestSQLHandler.java
+++ b/solr/core/src/test/org/apache/solr/handler/TestSQLHandler.java
@@ -104,9 +104,12 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
       indexDoc(sdoc("id", "7", "text", "XXXX XXXX", "str_s", "c", "field_i", "50"));
       indexDoc(sdoc("id", "8", "text", "XXXX XXXX", "str_s", "c", "field_i", "60"));
       commit();
-      
+
+
+      System.out.println("############# testBasicSelect() ############");
+
       SolrParams sParams = mapParams(CommonParams.QT, "/sql", 
-          "stmt", "select id, field_i, str_s from collection1 where text='XXXX' order by field_i desc");
+          "stmt", "select id, field_i, str_s from collection1 where (text='(XXXX)' OR text='XXXX') AND text='XXXX' order by field_i desc");
 
       SolrStream solrStream = new SolrStream(jetty.url, sParams);
       List<Tuple> tuples = getTuples(solrStream);
@@ -696,7 +699,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
 
       sParams = mapParams(CommonParams.QT, "/sql",
         "stmt", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), "
-          + "cast(avg(1.0 * field_i) as float) from collection1 where (text='XXXX' AND NOT (text='XXXY')) "
+          + "cast(avg(1.0 * field_i) as float) from collection1 where (text='XXXX' AND NOT ((text='XXXY') AND (text='XXXY' OR text='XXXY'))) "
           + "group by str_s order by str_s desc");
 
       solrStream = new SolrStream(jetty.url, sParams);
@@ -856,9 +859,12 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
       SolrParams sParams = mapParams(CommonParams.QT, "/sql", "aggregationMode", "facet",
           "stmt", "select distinct str_s, field_i from collection1 order by str_s asc, field_i asc");
 
+      System.out.println("######## selectDistinctFacets #######");
+
       SolrStream solrStream = new SolrStream(jetty.url, sParams);
       List<Tuple> tuples = getTuples(solrStream);
 
+      //assert(false);
       assert(tuples.size() == 6);
 
       Tuple tuple;
@@ -991,22 +997,29 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
       assert(tuple.getLong("field_i") == 1);
 
       tuple = tuples.get(1);
+
+
       assert(tuple.get("str_s").equals("a"));
       assert(tuple.getLong("field_i") == 20);
 
       tuple = tuples.get(2);
+
+
       assert(tuple.get("str_s").equals("b"));
       assert(tuple.getLong("field_i") == 2);
 
       tuple = tuples.get(3);
+
       assert(tuple.get("str_s").equals("c"));
       assert(tuple.getLong("field_i") == 30);
 
       tuple = tuples.get(4);
+
       assert(tuple.get("str_s").equals("c"));
       assert(tuple.getLong("field_i") == 50);
 
       tuple = tuples.get(5);
+
       assert(tuple.get("str_s").equals("c"));
       assert(tuple.getLong("field_i") == 60);
 
@@ -1053,6 +1066,8 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
       SolrParams sParams = mapParams(CommonParams.QT, "/sql",
           "stmt", "select distinct str_s, field_i from collection1 order by str_s asc, field_i asc");
 
+      System.out.println("##################### testSelectDistinct()");
+
       TupleStream solrStream = new SolrStream(jetty.url, sParams);
       List<Tuple> tuples = getTuples(solrStream);
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/37fdc37f/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/AndOperation.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/AndOperation.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/AndOperation.java
new file mode 100644
index 0000000..f095f63
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/AndOperation.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.io.ops;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
+import org.apache.solr.client.solrj.io.stream.expr.Expressible;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+
+public class AndOperation implements BooleanOperation {
+
+  private static final long serialVersionUID = 1;
+  private UUID operationNodeId = UUID.randomUUID();
+
+  protected BooleanOperation leftOperand;
+  protected BooleanOperation rightOperand;
+
+  public void operate(Tuple tuple) {
+    leftOperand.operate(tuple);
+    rightOperand.operate(tuple);
+  }
+
+  public AndOperation(BooleanOperation leftOperand, BooleanOperation rightOperand) {
+    this.leftOperand = leftOperand;
+    this.rightOperand = rightOperand;
+  }
+
+  public AndOperation(StreamExpression expression, StreamFactory factory) throws IOException {
+      List<StreamExpression> operationExpressions = factory.getExpressionOperandsRepresentingTypes(expression, BooleanOperation.class);
+      if(operationExpressions != null && operationExpressions.size() == 2) {
+        StreamExpression left = operationExpressions.get(0);
+        StreamOperation leftOp = factory.constructOperation(left);
+        if(leftOp instanceof BooleanOperation) {
+          leftOperand = (BooleanOperation) leftOp;
+        } else {
+          throw new IOException("The And/Or Operation requires a BooleanOperation.");
+        }
+
+        StreamExpression right = operationExpressions.get(1);
+        StreamOperation rightOp = factory.constructOperation(right);
+        if(rightOp instanceof BooleanOperation) {
+          rightOperand = (BooleanOperation) rightOp;
+        } else {
+          throw new IOException("The And/Or Operation requires a BooleanOperation.");
+        }
+      } else {
+        throw new IOException("The And/Or Operation requires a BooleanOperations.");
+      }
+  }
+
+  public boolean evaluate() {
+    return leftOperand.evaluate() && rightOperand.evaluate();
+  }
+
+  @Override
+  public StreamExpressionParameter toExpression(StreamFactory factory) throws IOException {
+    StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
+    if(leftOperand instanceof Expressible) {
+      expression.addParameter(leftOperand.toExpression(factory));
+    } else {
+      throw new IOException("This left operand of the AndOperation contains a non-expressible operation - it cannot be converted to an expression");
+    }
+
+    if(rightOperand instanceof Expressible) {
+      expression.addParameter(rightOperand.toExpression(factory));
+    } else {
+      throw new IOException("This the right operand of the AndOperation contains a non-expressible operation - it cannot be converted to an expression");
+    }
+    return expression;
+  }
+
+  @Override
+  public Explanation toExplanation(StreamFactory factory) throws IOException {
+    return new Explanation(operationNodeId.toString())
+        .withExpressionType(ExpressionType.OPERATION)
+        .withFunctionName(factory.getFunctionName(getClass()))
+        .withImplementingClass(getClass().getName())
+        .withExpression(toExpression(factory).toString());
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/37fdc37f/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/BooleanOperation.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/BooleanOperation.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/BooleanOperation.java
new file mode 100644
index 0000000..609e4e1
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/BooleanOperation.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.io.ops;
+
+import org.apache.solr.client.solrj.io.Tuple;
+
+
+public interface BooleanOperation extends StreamOperation {
+  public abstract boolean evaluate();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/37fdc37f/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/EqualsOperation.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/EqualsOperation.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/EqualsOperation.java
new file mode 100644
index 0000000..b7ea17d
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/EqualsOperation.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.io.ops;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+
+public class EqualsOperation extends LeafOperation {
+
+  private static final long serialVersionUID = 1;
+  private UUID operationNodeId = UUID.randomUUID();
+
+  public void operate(Tuple tuple) {
+    this.tuple = tuple;
+  }
+
+  public EqualsOperation(String field, double val) {
+    super(field, val);
+  }
+
+  public EqualsOperation(StreamExpression expression, StreamFactory factory) throws IOException {
+    super(expression, factory);
+  }
+
+  public boolean evaluate() {
+    Double d = tuple.getDouble(field);
+
+    if(d == null) {
+      return false;
+    }
+
+    return d == val;
+  }
+
+  public StreamExpression toExpression(StreamFactory factory) throws IOException {
+    StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
+    expression.addParameter(field);
+    expression.addParameter(Double.toString(val));
+    return expression;
+  }
+
+  @Override
+  public Explanation toExplanation(StreamFactory factory) throws IOException {
+    return new Explanation(operationNodeId.toString())
+        .withExpressionType(ExpressionType.OPERATION)
+        .withFunctionName(factory.getFunctionName(getClass()))
+        .withImplementingClass(getClass().getName())
+        .withExpression(toExpression(factory).toString());
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/37fdc37f/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/GreaterThanEqualToOperation.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/GreaterThanEqualToOperation.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/GreaterThanEqualToOperation.java
new file mode 100644
index 0000000..34bd521
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/GreaterThanEqualToOperation.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.io.ops;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+
+public class GreaterThanEqualToOperation extends LeafOperation {
+
+  private static final long serialVersionUID = 1;
+  private UUID operationNodeId = UUID.randomUUID();
+
+  public void operate(Tuple tuple) {
+    this.tuple = tuple;
+  }
+
+  public GreaterThanEqualToOperation(String field, double val) {
+    super(field, val);
+  }
+
+  public GreaterThanEqualToOperation(StreamExpression expression, StreamFactory factory) throws IOException {
+    super(expression, factory);
+  }
+
+  public boolean evaluate() {
+    Double d = tuple.getDouble(field);
+
+    if(d == null) {
+      return false;
+    }
+
+    return d >= val;
+  }
+
+  public StreamExpression toExpression(StreamFactory factory) throws IOException {
+    StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
+    expression.addParameter(field);
+    expression.addParameter(Double.toString(val));
+    return expression;
+  }
+
+  @Override
+  public Explanation toExplanation(StreamFactory factory) throws IOException {
+    return new Explanation(operationNodeId.toString())
+        .withExpressionType(ExpressionType.OPERATION)
+        .withFunctionName(factory.getFunctionName(getClass()))
+        .withImplementingClass(getClass().getName())
+        .withExpression(toExpression(factory).toString());
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/37fdc37f/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/GreaterThanOperation.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/GreaterThanOperation.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/GreaterThanOperation.java
new file mode 100644
index 0000000..a58ad01
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/GreaterThanOperation.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.io.ops;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+
+public class GreaterThanOperation extends LeafOperation {
+
+  private static final long serialVersionUID = 1;
+  private UUID operationNodeId = UUID.randomUUID();
+
+  public void operate(Tuple tuple) {
+    this.tuple = tuple;
+  }
+
+  public GreaterThanOperation(String field, double val) {
+    super(field, val);
+  }
+
+  public GreaterThanOperation(StreamExpression expression, StreamFactory factory) throws IOException {
+    super(expression, factory);
+  }
+
+  public boolean evaluate() {
+    Double d = tuple.getDouble(field);
+
+    if(d == null) {
+      return false;
+    }
+
+    return d > val;
+  }
+
+  public StreamExpression toExpression(StreamFactory factory) throws IOException {
+    StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
+    expression.addParameter(field);
+    expression.addParameter(Double.toString(val));
+    return expression;
+  }
+
+  @Override
+  public Explanation toExplanation(StreamFactory factory) throws IOException {
+    return new Explanation(operationNodeId.toString())
+        .withExpressionType(ExpressionType.OPERATION)
+        .withFunctionName(factory.getFunctionName(getClass()))
+        .withImplementingClass(getClass().getName())
+        .withExpression(toExpression(factory).toString());
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/37fdc37f/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/LeafOperation.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/LeafOperation.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/LeafOperation.java
new file mode 100644
index 0000000..bcd979a
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/LeafOperation.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.io.ops;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+
+public abstract class LeafOperation implements BooleanOperation {
+
+  private static final long serialVersionUID = 1;
+  private UUID operationNodeId = UUID.randomUUID();
+
+  protected String field;
+  protected Double val;
+  protected Tuple tuple;
+
+  public void operate(Tuple tuple) {
+    this.tuple = tuple;
+  }
+
+  public LeafOperation(String field, double val) {
+    this.field = field;
+    this.val = val;
+  }
+
+  public LeafOperation(StreamExpression expression, StreamFactory factory) throws IOException {
+    this.field = factory.getValueOperand(expression, 0);
+    this.val = Double.parseDouble(factory.getValueOperand(expression, 0));
+  }
+
+  @Override
+  public Explanation toExplanation(StreamFactory factory) throws IOException {
+    return new Explanation(operationNodeId.toString())
+        .withExpressionType(ExpressionType.OPERATION)
+        .withFunctionName(factory.getFunctionName(getClass()))
+        .withImplementingClass(getClass().getName())
+        .withExpression(toExpression(factory).toString());
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/37fdc37f/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/LessThanEqualToOperation.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/LessThanEqualToOperation.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/LessThanEqualToOperation.java
new file mode 100644
index 0000000..6278f14
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/LessThanEqualToOperation.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.io.ops;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+
+public class LessThanEqualToOperation extends LeafOperation {
+
+  private static final long serialVersionUID = 1;
+  private UUID operationNodeId = UUID.randomUUID();
+
+  public void operate(Tuple tuple) {
+    this.tuple = tuple;
+  }
+
+  public LessThanEqualToOperation(String field, double val) {
+    super(field, val);
+  }
+
+  public LessThanEqualToOperation(StreamExpression expression, StreamFactory factory) throws IOException {
+    super(expression, factory);
+  }
+
+  public boolean evaluate() {
+    Double d = tuple.getDouble(field);
+
+    if(d == null) {
+      return true;
+    }
+
+    return d <= val;
+  }
+
+  public StreamExpression toExpression(StreamFactory factory) throws IOException {
+    StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
+    expression.addParameter(field);
+    expression.addParameter(Double.toString(val));
+    return expression;
+  }
+
+  @Override
+  public Explanation toExplanation(StreamFactory factory) throws IOException {
+    return new Explanation(operationNodeId.toString())
+        .withExpressionType(ExpressionType.OPERATION)
+        .withFunctionName(factory.getFunctionName(getClass()))
+        .withImplementingClass(getClass().getName())
+        .withExpression(toExpression(factory).toString());
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/37fdc37f/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/LessThanOperation.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/LessThanOperation.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/LessThanOperation.java
new file mode 100644
index 0000000..e37bee3
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/LessThanOperation.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.io.ops;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+
+public class LessThanOperation extends LeafOperation {
+
+  private static final long serialVersionUID = 1;
+  private UUID operationNodeId = UUID.randomUUID();
+
+  public void operate(Tuple tuple) {
+    this.tuple = tuple;
+  }
+
+  public LessThanOperation(String field, double val) {
+    super(field, val);
+  }
+
+  public LessThanOperation(StreamExpression expression, StreamFactory factory) throws IOException {
+    super(expression, factory);
+  }
+
+  public boolean evaluate() {
+    Double d = tuple.getDouble(field);
+
+    if(d == null) {
+      return true;
+    }
+    
+    return d < val;
+  }
+
+  public StreamExpression toExpression(StreamFactory factory) throws IOException {
+    StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
+    expression.addParameter(field);
+    expression.addParameter(Double.toString(val));
+    return expression;
+  }
+
+  @Override
+  public Explanation toExplanation(StreamFactory factory) throws IOException {
+    return new Explanation(operationNodeId.toString())
+        .withExpressionType(ExpressionType.OPERATION)
+        .withFunctionName(factory.getFunctionName(getClass()))
+        .withImplementingClass(getClass().getName())
+        .withExpression(toExpression(factory).toString());
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/37fdc37f/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/NotOperation.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/NotOperation.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/NotOperation.java
new file mode 100644
index 0000000..c16e4b3
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/NotOperation.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.io.ops;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
+import org.apache.solr.client.solrj.io.stream.expr.Expressible;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+
+
+public class NotOperation implements BooleanOperation {
+
+  private static final long serialVersionUID = 1;
+  private UUID operationNodeId = UUID.randomUUID();
+
+  protected BooleanOperation operand;
+
+  public void operate(Tuple tuple) {
+    operand.operate(tuple);
+  }
+
+  public NotOperation(BooleanOperation operand) {
+    this.operand = operand;
+  }
+
+  public NotOperation(StreamExpression expression, StreamFactory factory) throws IOException {
+    List<StreamExpression> operationExpressions = factory.getExpressionOperandsRepresentingTypes(expression, BooleanOperation.class);
+    if(operationExpressions != null && operationExpressions.size() == 1) {
+      StreamExpression op = operationExpressions.get(0);
+      StreamOperation streamOp = factory.constructOperation(op);
+      if(op instanceof BooleanOperation) {
+        operand = (BooleanOperation) streamOp;
+      } else {
+        throw new IOException("The NotOperation requires a BooleanOperation.");
+      }
+
+    } else {
+      throw new IOException("The NotOperation requires a BooleanOperations.");
+    }
+  }
+
+  public boolean evaluate() {
+    return !operand.evaluate();
+  }
+
+  @Override
+  public StreamExpressionParameter toExpression(StreamFactory factory) throws IOException {
+    StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
+    if(operand instanceof Expressible) {
+      expression.addParameter(operand.toExpression(factory));
+    } else {
+      throw new IOException("The operand of the NotOperation contains a non-expressible operation - it cannot be converted to an expression");
+    }
+    return expression;
+  }
+
+  @Override
+  public Explanation toExplanation(StreamFactory factory) throws IOException {
+    return new Explanation(operationNodeId.toString())
+        .withExpressionType(ExpressionType.OPERATION)
+        .withFunctionName(factory.getFunctionName(getClass()))
+        .withImplementingClass(getClass().getName())
+        .withExpression(toExpression(factory).toString());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/37fdc37f/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/OrOperation.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/OrOperation.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/OrOperation.java
new file mode 100644
index 0000000..faac5cd
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/OrOperation.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.io.ops;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import org.apache.solr.client.solrj.io.stream.expr.Explanation;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
+import org.apache.solr.client.solrj.io.stream.expr.Expressible;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+
+public class OrOperation extends AndOperation {
+
+  private static final long serialVersionUID = 1;
+  private UUID operationNodeId = UUID.randomUUID();
+
+  public OrOperation(BooleanOperation leftOperand, BooleanOperation rightOperand) {
+    super(leftOperand, rightOperand);
+  }
+
+  public OrOperation(StreamExpression expression, StreamFactory factory) throws IOException {
+    super(expression, factory);
+  }
+
+  public boolean evaluate() {
+    return leftOperand.evaluate() || rightOperand.evaluate();
+  }
+
+  @Override
+  public StreamExpressionParameter toExpression(StreamFactory factory) throws IOException {
+    StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
+    if(leftOperand instanceof Expressible) {
+      expression.addParameter(leftOperand.toExpression(factory));
+    } else {
+      throw new IOException("This left operand of the OrOperation contains a non-expressible operation - it cannot be converted to an expression");
+    }
+
+    if(rightOperand instanceof Expressible) {
+      expression.addParameter(rightOperand.toExpression(factory));
+    } else {
+      throw new IOException("This the right operand of the OrOperation contains a non-expressible operation - it cannot be converted to an expression");
+    }
+    return expression;
+  }
+
+  @Override
+  public Explanation toExplanation(StreamFactory factory) throws IOException {
+    return new Explanation(operationNodeId.toString())
+        .withExpressionType(ExpressionType.OPERATION)
+        .withFunctionName(factory.getFunctionName(getClass()))
+        .withImplementingClass(getClass().getName())
+        .withExpression(toExpression(factory).toString());
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/37fdc37f/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java
index 4e239e6..94d937d 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java
@@ -234,6 +234,7 @@ public class FacetStream extends TupleStream implements Expressible  {
     this.zkHost  = zkHost;
     this.params = params;
     this.buckets = buckets;
+    System.out.println("####### Bucket count:"+buckets.length);
     this.metrics = metrics;
     this.bucketSizeLimit   = bucketSizeLimit;
     this.collection = collection;

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/37fdc37f/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/HavingStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/HavingStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/HavingStream.java
new file mode 100644
index 0000000..9a79990
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/HavingStream.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.io.stream;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.comp.FieldComparator;
+import org.apache.solr.client.solrj.io.comp.MultipleFieldComparator;
+import org.apache.solr.client.solrj.io.comp.StreamComparator;
+import org.apache.solr.client.solrj.io.eq.FieldEqualitor;
+import org.apache.solr.client.solrj.io.eq.MultipleFieldEqualitor;
+import org.apache.solr.client.solrj.io.eq.StreamEqualitor;
+import org.apache.solr.client.solrj.io.ops.BooleanOperation;
+import org.apache.solr.client.solrj.io.ops.StreamOperation;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
+import org.apache.solr.client.solrj.io.stream.expr.Expressible;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+
+/**
+ *  Iterates over a TupleStream and buffers Tuples that are equal based on a comparator.
+ *  This allows tuples to be grouped by common field(s).
+ *
+ *  The read() method emits one tuple per group. The fields of the emitted Tuple reflect the first tuple
+ *  encountered in the group.
+ *
+ *  Use the Tuple.getMaps() method to return all the Tuples in the group. This method returns
+ *  a list of maps (including the group head), which hold the data for each Tuple in the group.
+ *
+ *  Note: The ReducerStream requires that it's underlying stream be sorted and partitioned by the same
+ *  fields as it's comparator.
+ *
+ **/
+
+public class HavingStream extends TupleStream implements Expressible {
+
+  private static final long serialVersionUID = 1;
+
+  private TupleStream stream;
+  private BooleanOperation op;
+
+  private transient Tuple currentGroupHead;
+
+  public HavingStream(TupleStream stream, BooleanOperation op) throws IOException {
+    init(stream, op);
+  }
+
+
+  public HavingStream(StreamExpression expression, StreamFactory factory) throws IOException{
+    // grab all parameters out
+    List<StreamExpression> streamExpressions = factory.getExpressionOperandsRepresentingTypes(expression, Expressible.class, TupleStream.class);
+    List<StreamExpression> operationExpressions = factory.getExpressionOperandsRepresentingTypes(expression, BooleanOperation.class);
+
+    // validate expression contains only what we want.
+    if(expression.getParameters().size() != streamExpressions.size() + 2){
+      throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - unknown operands found", expression));
+    }
+
+    if(1 != streamExpressions.size()){
+      throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting a single stream but found %d",expression, streamExpressions.size()));
+    }
+
+
+    BooleanOperation booleanOperation = null;
+    if(operationExpressions != null && operationExpressions.size() == 1) {
+      StreamExpression ex = operationExpressions.get(0);
+      StreamOperation operation = factory.constructOperation(ex);
+      if(operation instanceof BooleanOperation) {
+        booleanOperation = (BooleanOperation) operation;
+      } else {
+        throw new IOException("The HavingStream requires a BooleanOperation. A StreamOperation was provided.");
+      }
+    } else {
+      throw new IOException("The HavingStream requires a BooleanOperation.");
+    }
+
+    init(factory.constructStream(streamExpressions.get(0)), booleanOperation);
+  }
+
+  private void init(TupleStream stream, BooleanOperation op) throws IOException{
+    this.stream = stream;
+    this.op = op;
+  }
+
+  @Override
+  public StreamExpression toExpression(StreamFactory factory) throws IOException{
+    return toExpression(factory, true);
+  }
+
+  private StreamExpression toExpression(StreamFactory factory, boolean includeStreams) throws IOException {
+    // function name
+    StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
+
+    // stream
+    if(includeStreams){
+      expression.addParameter(((Expressible) stream).toExpression(factory));
+    }
+    else{
+      expression.addParameter("<stream>");
+    }
+
+    if(op instanceof Expressible) {
+      expression.addParameter(op.toExpression(factory));
+    } else {
+      throw new IOException("This ReducerStream contains a non-expressible operation - it cannot be converted to an expression");
+    }
+
+    return expression;
+  }
+
+  @Override
+  public Explanation toExplanation(StreamFactory factory) throws IOException {
+
+    return new StreamExplanation(getStreamNodeId().toString())
+        .withChildren(new Explanation[]{
+            stream.toExplanation(factory)
+        })
+        .withFunctionName(factory.getFunctionName(this.getClass()))
+        .withImplementingClass(this.getClass().getName())
+        .withExpressionType(ExpressionType.STREAM_DECORATOR)
+        .withExpression(toExpression(factory, false).toString())
+        .withHelpers(new Explanation[]{
+            op.toExplanation(factory)
+        });
+  }
+
+  public void setStreamContext(StreamContext context) {
+    this.stream.setStreamContext(context);
+  }
+
+  public List<TupleStream> children() {
+    List<TupleStream> l =  new ArrayList<TupleStream>();
+    l.add(stream);
+    return l;
+  }
+
+  public void open() throws IOException {
+    stream.open();
+  }
+
+  public void close() throws IOException {
+    stream.close();
+  }
+
+  public Tuple read() throws IOException {
+    while(true) {
+      Tuple tuple = stream.read();
+      if(tuple.EOF) {
+        return tuple;
+      }
+
+      op.operate(tuple);
+
+      if(op.evaluate()) {
+        return tuple;
+      }
+    }
+  }
+
+  /** Return the stream sort - ie, the order in which records are returned */
+  public StreamComparator getStreamSort(){
+    return stream.getStreamSort();
+  }
+
+  public int getCost() {
+    return 0;
+  }
+}
\ No newline at end of file