You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2017/02/16 13:47:07 UTC
[17/50] lucene-solr:jira/solr-9858: SOLR-8593: Refactoring and adding
aggregationMode=facet methods
SOLR-8593: Refactoring and adding aggregationMode=facet methods
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/37fdc37f
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/37fdc37f
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/37fdc37f
Branch: refs/heads/jira/solr-9858
Commit: 37fdc37fc3d88054634482d39b5774893751f91f
Parents: 05a6170
Author: Joel Bernstein <jb...@apache.org>
Authored: Thu Dec 15 16:11:12 2016 -0500
Committer: Joel Bernstein <jb...@apache.org>
Committed: Thu Dec 15 16:12:08 2016 -0500
----------------------------------------------------------------------
.../apache/solr/handler/sql/SolrAggregate.java | 3 +
.../org/apache/solr/handler/sql/SolrFilter.java | 70 ++-
.../org/apache/solr/handler/sql/SolrMethod.java | 11 +-
.../org/apache/solr/handler/sql/SolrRel.java | 5 +
.../org/apache/solr/handler/sql/SolrTable.java | 567 ++++++++++++++-----
.../handler/sql/SolrToEnumerableConverter.java | 9 +-
.../org/apache/solr/handler/TestSQLHandler.java | 21 +-
.../solr/client/solrj/io/ops/AndOperation.java | 101 ++++
.../client/solrj/io/ops/BooleanOperation.java | 24 +
.../client/solrj/io/ops/EqualsOperation.java | 70 +++
.../io/ops/GreaterThanEqualToOperation.java | 70 +++
.../solrj/io/ops/GreaterThanOperation.java | 70 +++
.../solr/client/solrj/io/ops/LeafOperation.java | 59 ++
.../solrj/io/ops/LessThanEqualToOperation.java | 70 +++
.../client/solrj/io/ops/LessThanOperation.java | 70 +++
.../solr/client/solrj/io/ops/NotOperation.java | 87 +++
.../solr/client/solrj/io/ops/OrOperation.java | 71 +++
.../client/solrj/io/stream/FacetStream.java | 1 +
.../client/solrj/io/stream/HavingStream.java | 190 +++++++
19 files changed, 1413 insertions(+), 156 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/37fdc37f/solr/core/src/java/org/apache/solr/handler/sql/SolrAggregate.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/sql/SolrAggregate.java b/solr/core/src/java/org/apache/solr/handler/sql/SolrAggregate.java
index f913585..2512099 100644
--- a/solr/core/src/java/org/apache/solr/handler/sql/SolrAggregate.java
+++ b/solr/core/src/java/org/apache/solr/handler/sql/SolrAggregate.java
@@ -21,6 +21,7 @@ import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Aggregate;
import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.sql.SqlAggFunction;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.util.ImmutableBitSet;
@@ -66,7 +67,9 @@ class SolrAggregate extends Aggregate implements SolrRel {
final List<String> inNames = SolrRules.solrFieldNames(getInput().getRowType());
+
for(Pair<AggregateCall, String> namedAggCall : getNamedAggCalls()) {
+
AggregateCall aggCall = namedAggCall.getKey();
Pair<String, String> metric = toSolrMetric(implementor, aggCall, inNames);
implementor.addMetricPair(namedAggCall.getValue(), metric.getKey(), metric.getValue());
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/37fdc37f/solr/core/src/java/org/apache/solr/handler/sql/SolrFilter.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/sql/SolrFilter.java b/solr/core/src/java/org/apache/solr/handler/sql/SolrFilter.java
index c6eb33c..5f30926 100644
--- a/solr/core/src/java/org/apache/solr/handler/sql/SolrFilter.java
+++ b/solr/core/src/java/org/apache/solr/handler/sql/SolrFilter.java
@@ -24,6 +24,7 @@ import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.util.Pair;
import java.util.ArrayList;
@@ -56,38 +57,68 @@ class SolrFilter extends Filter implements SolrRel {
Translator translator = new Translator(SolrRules.solrFieldNames(getRowType()));
String query = translator.translateMatch(condition);
implementor.addQuery(query);
+ implementor.setNegativeQuery(translator.negativeQuery);
}
/** Translates {@link RexNode} expressions into Solr query strings. */
private static class Translator {
+
private final List<String> fieldNames;
+ public boolean negativeQuery = true;
Translator(List<String> fieldNames) {
this.fieldNames = fieldNames;
}
private String translateMatch(RexNode condition) {
- return translateOr(condition);
+ if(condition.getKind().belongsTo(SqlKind.COMPARISON)) {
+ return translateComparison(condition);
+ } else if(condition.isA(SqlKind.AND)) {
+ return "("+translateAnd(condition)+")";
+ } else if(condition.isA(SqlKind.OR)) {
+ return "(" + translateOr(condition) + ")";
+ } else {
+ return null;
+ }
}
private String translateOr(RexNode condition) {
List<String> ors = new ArrayList<>();
for (RexNode node : RelOptUtil.disjunctions(condition)) {
- ors.add(translateAnd(node));
+ ors.add(translateMatch(node));
}
return String.join(" OR ", ors);
}
+
+
private String translateAnd(RexNode node0) {
- List<String> ands = new ArrayList<>();
- for (RexNode node : RelOptUtil.conjunctions(node0)) {
- ands.add(translateMatch2(node));
+ List<String> andStrings = new ArrayList();
+ List<String> notStrings = new ArrayList();
+
+ List<RexNode> ands = new ArrayList();
+ List<RexNode> nots = new ArrayList();
+ RelOptUtil.decomposeConjunction(node0, ands, nots);
+
+
+ for(RexNode node: ands) {
+ andStrings.add(translateMatch(node));
}
- return String.join(" AND ", ands);
+ String andString = String.join(" AND ", andStrings);
+
+ if(nots.size() > 0) {
+ for(RexNode node: nots) {
+ notStrings.add(translateMatch(node));
+ }
+ String notString = String.join(" NOT ", notStrings);
+ return "("+ andString +") NOT ("+notString+")";
+ } else {
+ return andString;
+ }
}
- private String translateMatch2(RexNode node) {
+ private String translateComparison(RexNode node) {
Pair<String, RexLiteral> binaryTranslated = null;
if (((RexCall) node).getOperands().size() == 2) {
binaryTranslated = translateBinary((RexCall) node);
@@ -95,19 +126,30 @@ class SolrFilter extends Filter implements SolrRel {
switch (node.getKind()) {
case NOT:
- return "-"+translateMatch2(((RexCall) node).getOperands().get(0));
+ return "-"+translateComparison(((RexCall) node).getOperands().get(0));
case EQUALS:
- return binaryTranslated.getKey() + ":" + binaryTranslated.getValue().getValue2();
+ String terms = binaryTranslated.getValue().getValue2().toString().trim();
+ if(!terms.startsWith("(")){
+ terms = "\""+terms+"\"";
+ }
+
+ String clause = binaryTranslated.getKey() + ":" + terms;
+ this.negativeQuery = false;
+ return clause;
case NOT_EQUALS:
- return "-" + binaryTranslated.getKey() + ":" + binaryTranslated.getValue().getValue2();
+ return "-(" + binaryTranslated.getKey() + ":" + binaryTranslated.getValue().getValue2()+")";
case LESS_THAN:
- return binaryTranslated.getKey() + ": [ * TO " + binaryTranslated.getValue().getValue2() + " }";
+ this.negativeQuery = false;
+ return "("+binaryTranslated.getKey() + ": [ * TO " + binaryTranslated.getValue().getValue2() + " })";
case LESS_THAN_OR_EQUAL:
- return binaryTranslated.getKey() + ": [ * TO " + binaryTranslated.getValue().getValue2() + " ]";
+ this.negativeQuery = false;
+ return "("+binaryTranslated.getKey() + ": [ * TO " + binaryTranslated.getValue().getValue2() + " ])";
case GREATER_THAN:
- return binaryTranslated.getKey() + ": { " + binaryTranslated.getValue().getValue2() + " TO * ]";
+ this.negativeQuery = false;
+ return "("+binaryTranslated.getKey() + ": { " + binaryTranslated.getValue().getValue2() + " TO * ])";
case GREATER_THAN_OR_EQUAL:
- return binaryTranslated.getKey() + ": [ " + binaryTranslated.getValue().getValue2() + " TO * ]";
+ this.negativeQuery = false;
+ return "("+binaryTranslated.getKey() + ": [ " + binaryTranslated.getValue().getValue2() + " TO * ])";
default:
throw new AssertionError("cannot translate " + node);
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/37fdc37f/solr/core/src/java/org/apache/solr/handler/sql/SolrMethod.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/sql/SolrMethod.java b/solr/core/src/java/org/apache/solr/handler/sql/SolrMethod.java
index 31c4548..4ec3fdb 100644
--- a/solr/core/src/java/org/apache/solr/handler/sql/SolrMethod.java
+++ b/solr/core/src/java/org/apache/solr/handler/sql/SolrMethod.java
@@ -25,8 +25,15 @@ import java.util.List;
* Builtin methods in the Solr adapter.
*/
enum SolrMethod {
- SOLR_QUERYABLE_QUERY(SolrTable.SolrQueryable.class, "query", List.class, String.class, List.class, List.class,
- List.class, String.class);
+ SOLR_QUERYABLE_QUERY(SolrTable.SolrQueryable.class,
+ "query",
+ List.class,
+ String.class,
+ List.class,
+ List.class,
+ List.class,
+ String.class,
+ String.class);
public final Method method;
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/37fdc37f/solr/core/src/java/org/apache/solr/handler/sql/SolrRel.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/sql/SolrRel.java b/solr/core/src/java/org/apache/solr/handler/sql/SolrRel.java
index ea22951..b7843d7 100644
--- a/solr/core/src/java/org/apache/solr/handler/sql/SolrRel.java
+++ b/solr/core/src/java/org/apache/solr/handler/sql/SolrRel.java
@@ -36,6 +36,7 @@ interface SolrRel extends RelNode {
class Implementor {
final Map<String, String> fieldMappings = new HashMap<>();
String query = null;
+ boolean negativeQuery;
String limitValue = null;
final List<Pair<String, String>> orders = new ArrayList<>();
final List<String> buckets = new ArrayList<>();
@@ -54,6 +55,10 @@ interface SolrRel extends RelNode {
this.query = query;
}
+ void setNegativeQuery(boolean negativeQuery) {
+ this.negativeQuery = negativeQuery;
+ }
+
void addOrder(String column, String direction) {
column = this.fieldMappings.getOrDefault(column, column);
this.orders.add(new Pair<>(column, direction));
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/37fdc37f/solr/core/src/java/org/apache/solr/handler/sql/SolrTable.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/sql/SolrTable.java b/solr/core/src/java/org/apache/solr/handler/sql/SolrTable.java
index e5fd88f..14e69e6 100644
--- a/solr/core/src/java/org/apache/solr/handler/sql/SolrTable.java
+++ b/solr/core/src/java/org/apache/solr/handler/sql/SolrTable.java
@@ -72,7 +72,7 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
private Enumerable<Object> query(final Properties properties) {
return query(properties, Collections.emptyList(), null, Collections.emptyList(), Collections.emptyList(),
- Collections.emptyList(), null);
+ Collections.emptyList(), null, null);
}
/** Executes a Solr query on the underlying table.
@@ -82,150 +82,58 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
* @param query A string for the query
* @return Enumerator of results
*/
- private Enumerable<Object> query(final Properties properties, final List<Map.Entry<String, Class>> fields,
- final String query, final List<Pair<String, String>> orders, final List<String> buckets,
- final List<Pair<String, String>> metricPairs, final String limit) {
+ private Enumerable<Object> query(final Properties properties,
+ final List<Map.Entry<String, Class>> fields,
+ final String query,
+ final List<Pair<String, String>> orders,
+ final List<String> buckets,
+ final List<Pair<String, String>> metricPairs,
+ final String limit,
+ final String negativeQuery) {
// SolrParams should be a ModifiableParams instead of a map
- ModifiableSolrParams solrParams = new ModifiableSolrParams();
- solrParams.add(CommonParams.OMIT_HEADER, "true");
+ boolean mapReduce = "map_reduce".equals(properties.getProperty("aggregationMode"));
+ boolean negative = Boolean.parseBoolean(negativeQuery);
+
+ String q = null;
if (query == null) {
- solrParams.add(CommonParams.Q, DEFAULT_QUERY);
+ q = DEFAULT_QUERY;
} else {
- solrParams.add(CommonParams.Q, DEFAULT_QUERY + " AND " + query);
- }
-
- // List<String> doesn't have add so must make a new ArrayList
- List<String> fieldsList = new ArrayList<>(fields.size());
- fieldsList.addAll(fields.stream().map(Map.Entry::getKey).collect(Collectors.toList()));
- LinkedHashMap<String,String> ordersMap = new LinkedHashMap<>();
- for (Pair<String,String> order : orders) {
- ordersMap.put(order.getKey(), order.getValue());
- }
- List<Metric> metrics = buildMetrics(metricPairs);
- List<Bucket> bucketsList = buckets.stream().map(Bucket::new).collect(Collectors.toList());
-
- for(int i = buckets.size()-1; i >= 0; i--) {
- if (!ordersMap.containsKey(buckets.get(i))) {
- ordersMap.put(buckets.get(i), "asc");
- }
- }
-
- boolean isReOrder = false;
-
- for(Metric metric : metrics) {
- String metricIdentifier = metric.getIdentifier();
-
- ordersMap.remove(metricIdentifier);
-
- if(fieldsList.contains(metricIdentifier)) {
- fieldsList.remove(metricIdentifier);
- isReOrder = true;
- }
-
- for(String column : metric.getColumns()) {
- if (!fieldsList.contains(column)) {
- fieldsList.add(column);
- }
-
- if (!ordersMap.containsKey(column)) {
- ordersMap.put(column, "asc");
- }
- }
- }
-
- if (ordersMap.size() < 4) {
- ordersMap.put(DEFAULT_VERSION_FIELD, "desc");
-
- // Make sure the default sort field is in the field list
- if (!fieldsList.contains(DEFAULT_VERSION_FIELD)) {
- fieldsList.add(DEFAULT_VERSION_FIELD);
- }
- }
-
- if(!ordersMap.isEmpty()) {
- List<String> orderList = new ArrayList<>(ordersMap.size());
- for(Map.Entry<String, String> order : ordersMap.entrySet()) {
- String column = order.getKey();
- if(!fieldsList.contains(column)) {
- fieldsList.add(column);
- }
- orderList.add(column + " " + order.getValue());
+ if(negative) {
+ q = DEFAULT_QUERY + " AND " + query;
+ } else {
+ q = query;
}
- solrParams.add(CommonParams.SORT, String.join(",", orderList));
- }
-
- if (fieldsList.isEmpty()) {
- solrParams.add(CommonParams.FL, "*");
- } else {
- solrParams.add(CommonParams.FL, String.join(",", fieldsList));
}
TupleStream tupleStream;
String zk = properties.getProperty("zk");
try {
- if (metrics.isEmpty() && bucketsList.isEmpty()) {
- solrParams.add(CommonParams.QT, "/export");
- if (limit != null) {
- solrParams.add(CommonParams.ROWS, limit);
- tupleStream = new LimitStream(new CloudSolrStream(zk, collection, solrParams), Integer.parseInt(limit));
- } else {
- tupleStream = new CloudSolrStream(zk, collection, solrParams);
- }
+ if (metricPairs.isEmpty() && buckets.isEmpty()) {
+ tupleStream = handleSelect(zk, collection, q, fields, orders, limit);
} else {
- Metric[] metricsArray = metrics.toArray(new Metric[metrics.size()]);
- if(bucketsList.isEmpty()) {
- solrParams.remove(CommonParams.FL);
- solrParams.remove(CommonParams.SORT);
- tupleStream = new StatsStream(zk, collection, solrParams, metricsArray);
+ if(buckets.isEmpty()) {
+ tupleStream = handleStats(zk, collection, q, metricPairs);
} else {
- solrParams.add(CommonParams.QT, "/export");
-
- int numWorkers = Integer.parseInt(properties.getProperty("numWorkers", "1"));
- if (numWorkers > 1) solrParams.add("partitionKeys",String.join(",", buckets));
-
- tupleStream = new CloudSolrStream(zk, collection, solrParams);
- tupleStream = new RollupStream(tupleStream, bucketsList.toArray(new Bucket[bucketsList.size()]), metricsArray);
-
- if(numWorkers > 1) {
- String workerZkHost = properties.getProperty("workerZkhost");
- String workerCollection = properties.getProperty("workerCollection");
- // Do the rollups in parallel
- // Maintain the sort of the Tuples coming from the workers.
- StreamComparator comp = bucketSortComp(bucketsList, ordersMap);
-
- ParallelStream parallelStream = new ParallelStream(workerZkHost, workerCollection, tupleStream, numWorkers, comp);
-
- StreamFactory factory = new StreamFactory()
- .withFunctionName("search", CloudSolrStream.class)
- .withFunctionName("parallel", ParallelStream.class)
- .withFunctionName("rollup", RollupStream.class)
- .withFunctionName("sum", SumMetric.class)
- .withFunctionName("min", MinMetric.class)
- .withFunctionName("max", MaxMetric.class)
- .withFunctionName("avg", MeanMetric.class)
- .withFunctionName("count", CountMetric.class);
-
- parallelStream.setStreamFactory(factory);
- tupleStream = parallelStream;
- isReOrder = true;
- }
-
- if (isReOrder) {
- int limitVal = limit == null ? 100 : Integer.parseInt(limit);
- StreamComparator comp = getComp(orders);
- if (orders.isEmpty() && !ordersMap.isEmpty()) {
- // default order
- comp = getComp(new ArrayList<>(ordersMap.entrySet()));
- }
- tupleStream = new RankStream(tupleStream, limitVal, comp);
+ if(mapReduce) {
+ tupleStream = handleGroupByMapReduce(zk,
+ collection,
+ properties,
+ fields,
+ q,
+ orders,
+ buckets,
+ metricPairs,
+ limit);
} else {
- // Sort is the same as the same as the underlying stream
- // Only need to limit the result, not Rank the result
- if (limit != null) {
- solrParams.add(CommonParams.ROWS, limit);
- tupleStream = new LimitStream(new CloudSolrStream(zk, collection, solrParams), Integer.parseInt(limit));
- }
+ tupleStream = handleGroupByFacet(zk,
+ collection,
+ fields,
+ q,
+ orders,
+ buckets,
+ metricPairs,
+ limit);
}
}
}
@@ -258,6 +166,21 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
}
}
+ private static StreamComparator bucketSortComp(Bucket[] buckets, String dir) {
+ FieldComparator[] comps = new FieldComparator[buckets.length];
+ for(int i=0; i<buckets.length; i++) {
+ ComparatorOrder comparatorOrder = ascDescComp(dir);
+ String sortKey = buckets[i].toString();
+ comps[i] = new FieldComparator(sortKey, comparatorOrder);
+ }
+
+ if(comps.length == 1) {
+ return comps[0];
+ } else {
+ return new MultipleFieldComparator(comps);
+ }
+ }
+
private String getSortDirection(Map.Entry<String, String> order) {
String direction = order.getValue();
return direction == null ? "asc" : direction;
@@ -283,6 +206,9 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
private List<Metric> buildMetrics(List<Pair<String, String>> metricPairs) {
List<Metric> metrics = new ArrayList<>(metricPairs.size());
metrics.addAll(metricPairs.stream().map(this::getMetric).collect(Collectors.toList()));
+ if(metrics.size() == 0) {
+ metrics.add(new CountMetric());
+ }
return metrics;
}
@@ -304,6 +230,358 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
}
}
+ private TupleStream handleSelect(String zk,
+ String collection,
+ String query,
+ List<Map.Entry<String, Class>> fields,
+ List<Pair<String, String>> orders,
+ String limit) throws IOException {
+
+ ModifiableSolrParams params = new ModifiableSolrParams();
+ params.add(CommonParams.Q, query);
+
+ if(orders.size() > 0) {
+ params.add(CommonParams.SORT, getSort(orders));
+ } else {
+ params.add(CommonParams.SORT, "_version_ desc");
+ }
+
+ if(fields.size() > 0) {
+ params.add(CommonParams.FL, getFields(fields));
+ }
+
+ if (limit != null) {
+ params.add(CommonParams.ROWS, limit);
+ return new LimitStream(new CloudSolrStream(zk, collection, params), Integer.parseInt(limit));
+ } else {
+ params.add(CommonParams.QT, "/export");
+ return new CloudSolrStream(zk, collection, params);
+ }
+ }
+
+ private String getSort(List<Pair<String, String>> orders) {
+ StringBuilder buf = new StringBuilder();
+ for(Pair<String, String> pair : orders) {
+ if(buf.length() > 0) {
+ buf.append(",");
+ }
+ buf.append(pair.getKey()).append(" ").append(pair.getValue());
+ }
+
+ return buf.toString();
+ }
+
+ private String getFields(List<Map.Entry<String, Class>> fields) {
+ StringBuilder buf = new StringBuilder();
+ boolean appendVersion = true;
+ for(Map.Entry<String, Class> field : fields) {
+
+ if(buf.length() > 0) {
+ buf.append(",");
+ }
+
+ if(field.getKey().equals("_version_")) {
+ appendVersion = false;
+ }
+
+ buf.append(field.getKey());
+ }
+
+ if(appendVersion){
+ buf.append(",_version_");
+ }
+
+ return buf.toString();
+ }
+
+ private String getFields(Set<String> fieldSet) {
+ StringBuilder buf = new StringBuilder();
+ boolean appendVersion = true;
+ for(String field : fieldSet) {
+
+ if(buf.length() > 0) {
+ buf.append(",");
+ }
+
+ if(field.equals("_version_")) {
+ appendVersion = false;
+ }
+
+ buf.append(field);
+ }
+
+ if(appendVersion){
+ buf.append(",_version_");
+ }
+
+ return buf.toString();
+ }
+
+
+ private Set<String> getFieldSet(Metric[] metrics, List<Map.Entry<String, Class>> fields) {
+ HashSet set = new HashSet();
+ for(Metric metric : metrics) {
+ for(String column : metric.getColumns()) {
+ set.add(column);
+ }
+ }
+
+ for(Map.Entry<String, Class> field : fields) {
+ if(field.getKey().indexOf('(') == -1) {
+ set.add(field.getKey());
+ }
+ }
+
+ return set;
+ }
+
+ private static String getSortDirection(List<Pair<String, String>> orders) {
+ if(orders != null && orders.size() > 0) {
+ for(Pair<String,String> item : orders) {
+ return item.getValue();
+ }
+ }
+
+ return "asc";
+ }
+
+ private static String bucketSort(Bucket[] buckets, String dir) {
+ StringBuilder buf = new StringBuilder();
+ boolean comma = false;
+ for(Bucket bucket : buckets) {
+ if(comma) {
+ buf.append(",");
+ }
+ buf.append(bucket.toString()).append(" ").append(dir);
+ comma = true;
+ }
+
+ return buf.toString();
+ }
+
+ private static String getPartitionKeys(Bucket[] buckets) {
+ StringBuilder buf = new StringBuilder();
+ boolean comma = false;
+ for(Bucket bucket : buckets) {
+ if(comma) {
+ buf.append(",");
+ }
+ buf.append(bucket.toString());
+ comma = true;
+ }
+ return buf.toString();
+ }
+
+ private static boolean sortsEqual(Bucket[] buckets, String direction, List<Pair<String, String>> orders) {
+
+ if(buckets.length != orders.size()) {
+ return false;
+ }
+
+ for(int i=0; i< buckets.length; i++) {
+ Bucket bucket = buckets[i];
+ Pair<String, String> order = orders.get(i);
+ if(!bucket.toString().equals(order.getKey())) {
+ return false;
+ }
+
+ if(!order.getValue().toLowerCase(Locale.ROOT).contains(direction.toLowerCase(Locale.ROOT))) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ private TupleStream handleGroupByMapReduce(String zk,
+ String collection,
+ Properties properties,
+ final List<Map.Entry<String, Class>> fields,
+ final String query,
+ final List<Pair<String, String>> orders,
+ final List<String> _buckets,
+ final List<Pair<String, String>> metricPairs,
+ final String limit) throws IOException {
+
+ int numWorkers = Integer.parseInt(properties.getProperty("numWorkers", "1"));
+
+ Bucket[] buckets = buildBuckets(_buckets, fields);
+ Metric[] metrics = buildMetrics(metricPairs).toArray(new Metric[0]);
+
+ Set<String> fieldSet = getFieldSet(metrics, fields);
+
+ if(metrics.length == 0) {
+ throw new IOException("Group by queries must include atleast one aggregate function.");
+ }
+
+ String fl = getFields(fieldSet);
+ String sortDirection = getSortDirection(orders);
+ String sort = bucketSort(buckets, sortDirection);
+
+ ModifiableSolrParams params = new ModifiableSolrParams();
+
+ params.set(CommonParams.FL, fl);
+ params.set(CommonParams.Q, query);
+ //Always use the /export handler for Group By Queries because it requires exporting full result sets.
+ params.set(CommonParams.QT, "/export");
+
+ if(numWorkers > 1) {
+ params.set("partitionKeys", getPartitionKeys(buckets));
+ }
+
+ params.set("sort", sort);
+
+ TupleStream tupleStream = null;
+
+ CloudSolrStream cstream = new CloudSolrStream(zk, collection, params);
+ tupleStream = new RollupStream(cstream, buckets, metrics);
+
+ if(numWorkers > 1) {
+ // Do the rollups in parallel
+ // Maintain the sort of the Tuples coming from the workers.
+ StreamComparator comp = bucketSortComp(buckets, sortDirection);
+ ParallelStream parallelStream = new ParallelStream(zk, collection, tupleStream, numWorkers, comp);
+
+ StreamFactory factory = new StreamFactory()
+ .withFunctionName("search", CloudSolrStream.class)
+ .withFunctionName("parallel", ParallelStream.class)
+ .withFunctionName("rollup", RollupStream.class)
+ .withFunctionName("sum", SumMetric.class)
+ .withFunctionName("min", MinMetric.class)
+ .withFunctionName("max", MaxMetric.class)
+ .withFunctionName("avg", MeanMetric.class)
+ .withFunctionName("count", CountMetric.class);
+
+ parallelStream.setStreamFactory(factory);
+ tupleStream = parallelStream;
+ }
+
+ //TODO: This should be done on the workers, but it won't serialize because it relies on Presto classes.
+ // Once we make this a Expressionable the problem will be solved.
+
+
+ if(orders != null && orders.size() > 0) {
+ int lim = limit == null ? 100 : Integer.parseInt(limit);
+ if(!sortsEqual(buckets, sortDirection, orders)) {
+ StreamComparator comp = getComp(orders);
+ //Rank the Tuples
+ //If parallel stream is used ALL the Rolled up tuples from the workers will be ranked
+ //Providing a true Top or Bottom.
+ tupleStream = new RankStream(tupleStream, lim, comp);
+ } else {
+ // Sort is the same as the same as the underlying stream
+ // Only need to limit the result, not Rank the result
+ if(lim > -1) {
+ tupleStream = new LimitStream(tupleStream, lim);
+ }
+ }
+ }
+
+ return tupleStream;
+ }
+
+ private Bucket[] buildBuckets(List<String> buckets, List<Map.Entry<String, Class>> fields) {
+ Bucket[] bucketsArray = new Bucket[buckets.size()];
+
+ int i=0;
+ for(Map.Entry<String,Class> field : fields) {
+ String fieldName = field.getKey();
+ if(buckets.contains(fieldName)) {
+ bucketsArray[i++] = new Bucket(fieldName);
+ }
+ }
+
+ return bucketsArray;
+ }
+
+
+ private TupleStream handleGroupByFacet(String zkHost,
+ String collection,
+ final List<Map.Entry<String, Class>> fields,
+ final String query,
+ final List<Pair<String, String>> orders,
+ final List<String> bucketFields,
+ final List<Pair<String, String>> metricPairs,
+ final String lim) throws IOException {
+
+ ModifiableSolrParams solrParams = new ModifiableSolrParams();
+ solrParams.add(CommonParams.Q, query);
+
+ Bucket[] buckets = buildBuckets(bucketFields, fields);
+ Metric[] metrics = buildMetrics(metricPairs).toArray(new Metric[0]);
+ if(metrics.length == 0) {
+ metrics = new Metric[1];
+ metrics[0] = new CountMetric();
+ }
+
+ int limit = lim != null ? Integer.parseInt(lim) : 100;
+
+ FieldComparator[] sorts = null;
+
+ if(orders == null || orders.size() == 0) {
+ sorts = new FieldComparator[buckets.length];
+ for(int i=0; i<sorts.length; i++) {
+ sorts[i] = new FieldComparator("index", ComparatorOrder.ASCENDING);
+ }
+ } else {
+ sorts = getComps(orders);
+ }
+
+ TupleStream tupleStream = new FacetStream(zkHost,
+ collection,
+ solrParams,
+ buckets,
+ metrics,
+ sorts,
+ limit);
+
+
+ if(lim != null)
+ {
+ tupleStream = new LimitStream(tupleStream, limit);
+ }
+
+ return tupleStream;
+ }
+
+ private TupleStream handleSelectDistinctMapReduce(final Properties properties,
+ final List<Map.Entry<String, Class>> fields,
+ final String query,
+ final List<Pair<String, String>> orders,
+ final List<String> buckets,
+ final List<Pair<String, String>> metricPairs,
+ final String limit) {
+
+
+
+
+
+
+ return null;
+ }
+
+ private TupleStream handleSelectDistinctFacet(final Properties properties,
+ final List<Map.Entry<String, Class>> fields,
+ final String query,
+ final List<Pair<String, String>> orders,
+ final List<String> buckets,
+ final List<Pair<String, String>> metricPairs,
+ final String limit) {
+ return null;
+ }
+
+ private TupleStream handleStats(String zk,
+ String collection,
+ String query,
+ List<Pair<String, String>> metricPairs) {
+
+
+ ModifiableSolrParams solrParams = new ModifiableSolrParams();
+ solrParams.add(CommonParams.Q, query);
+ Metric[] metrics = buildMetrics(metricPairs).toArray(new Metric[0]);
+ return new StatsStream(zk, collection, solrParams, metrics);
+ }
+
public <T> Queryable<T> asQueryable(QueryProvider queryProvider, SchemaPlus schema, String tableName) {
return new SolrQueryable<>(queryProvider, schema, this, tableName);
}
@@ -339,8 +617,29 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
*/
@SuppressWarnings("UnusedDeclaration")
public Enumerable<Object> query(List<Map.Entry<String, Class>> fields, String query, List<Pair<String, String>> order,
- List<String> buckets, List<Pair<String, String>> metricPairs, String limit) {
- return getTable().query(getProperties(), fields, query, order, buckets, metricPairs, limit);
+ List<String> buckets, List<Pair<String, String>> metricPairs, String limit, String negativeQuery) {
+ return getTable().query(getProperties(), fields, query, order, buckets, metricPairs, limit, negativeQuery);
+ }
+ }
+
+ private static FieldComparator[] getComps(List<Pair<String, String>> orders) {
+ FieldComparator[] comps = new FieldComparator[orders.size()];
+ for(int i=0; i<orders.size(); i++) {
+ Pair<String,String> sortItem = orders.get(i);
+ String ordering = sortItem.getValue();
+ ComparatorOrder comparatorOrder = ascDescComp(ordering);
+ String sortKey = sortItem.getKey();
+ comps[i] = new FieldComparator(sortKey, comparatorOrder);
+ }
+
+ return comps;
+ }
+
+ private static ComparatorOrder ascDescComp(String s) {
+ if(s.toLowerCase(Locale.ROOT).contains("desc")) {
+ return ComparatorOrder.DESCENDING;
+ } else {
+ return ComparatorOrder.ASCENDING;
}
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/37fdc37f/solr/core/src/java/org/apache/solr/handler/sql/SolrToEnumerableConverter.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/sql/SolrToEnumerableConverter.java b/solr/core/src/java/org/apache/solr/handler/sql/SolrToEnumerableConverter.java
index 6737977..f69f3d5 100644
--- a/solr/core/src/java/org/apache/solr/handler/sql/SolrToEnumerableConverter.java
+++ b/solr/core/src/java/org/apache/solr/handler/sql/SolrToEnumerableConverter.java
@@ -67,11 +67,13 @@ class SolrToEnumerableConverter extends ConverterImpl implements EnumerableRel {
constantArrayList(
Pair.zip(generateFields(SolrRules.solrFieldNames(rowType), solrImplementor.fieldMappings),
new AbstractList<Class>() {
- @Override public Class get(int index) {
+ @Override
+ public Class get(int index) {
return physType.fieldClass(index);
}
- @Override public int size() {
+ @Override
+ public int size() {
return rowType.getFieldCount();
}
}),
@@ -81,8 +83,9 @@ class SolrToEnumerableConverter extends ConverterImpl implements EnumerableRel {
final Expression buckets = list.append("buckets", constantArrayList(solrImplementor.buckets, String.class));
final Expression metricPairs = list.append("metricPairs", constantArrayList(solrImplementor.metricPairs, Pair.class));
final Expression limit = list.append("limit", Expressions.constant(solrImplementor.limitValue));
+ final Expression negativeQuery = list.append("negativeQuery", Expressions.constant(Boolean.toString(solrImplementor.negativeQuery), String.class));
Expression enumerable = list.append("enumerable", Expressions.call(table, SolrMethod.SOLR_QUERYABLE_QUERY.method,
- fields, query, orders, buckets, metricPairs, limit));
+ fields, query, orders, buckets, metricPairs, limit, negativeQuery));
Hook.QUERY_PLAN.run(query);
list.add(Expressions.return_(null, enumerable));
return implementor.result(physType, list.toBlock());
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/37fdc37f/solr/core/src/test/org/apache/solr/handler/TestSQLHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/handler/TestSQLHandler.java b/solr/core/src/test/org/apache/solr/handler/TestSQLHandler.java
index 5b92c30..605abf5 100644
--- a/solr/core/src/test/org/apache/solr/handler/TestSQLHandler.java
+++ b/solr/core/src/test/org/apache/solr/handler/TestSQLHandler.java
@@ -104,9 +104,12 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
indexDoc(sdoc("id", "7", "text", "XXXX XXXX", "str_s", "c", "field_i", "50"));
indexDoc(sdoc("id", "8", "text", "XXXX XXXX", "str_s", "c", "field_i", "60"));
commit();
-
+
+
+ System.out.println("############# testBasicSelect() ############");
+
SolrParams sParams = mapParams(CommonParams.QT, "/sql",
- "stmt", "select id, field_i, str_s from collection1 where text='XXXX' order by field_i desc");
+ "stmt", "select id, field_i, str_s from collection1 where (text='(XXXX)' OR text='XXXX') AND text='XXXX' order by field_i desc");
SolrStream solrStream = new SolrStream(jetty.url, sParams);
List<Tuple> tuples = getTuples(solrStream);
@@ -696,7 +699,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
sParams = mapParams(CommonParams.QT, "/sql",
"stmt", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), "
- + "cast(avg(1.0 * field_i) as float) from collection1 where (text='XXXX' AND NOT (text='XXXY')) "
+ + "cast(avg(1.0 * field_i) as float) from collection1 where (text='XXXX' AND NOT ((text='XXXY') AND (text='XXXY' OR text='XXXY'))) "
+ "group by str_s order by str_s desc");
solrStream = new SolrStream(jetty.url, sParams);
@@ -856,9 +859,12 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
SolrParams sParams = mapParams(CommonParams.QT, "/sql", "aggregationMode", "facet",
"stmt", "select distinct str_s, field_i from collection1 order by str_s asc, field_i asc");
+ System.out.println("######## selectDistinctFacets #######");
+
SolrStream solrStream = new SolrStream(jetty.url, sParams);
List<Tuple> tuples = getTuples(solrStream);
+ //assert(false);
assert(tuples.size() == 6);
Tuple tuple;
@@ -991,22 +997,29 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
assert(tuple.getLong("field_i") == 1);
tuple = tuples.get(1);
+
+
assert(tuple.get("str_s").equals("a"));
assert(tuple.getLong("field_i") == 20);
tuple = tuples.get(2);
+
+
assert(tuple.get("str_s").equals("b"));
assert(tuple.getLong("field_i") == 2);
tuple = tuples.get(3);
+
assert(tuple.get("str_s").equals("c"));
assert(tuple.getLong("field_i") == 30);
tuple = tuples.get(4);
+
assert(tuple.get("str_s").equals("c"));
assert(tuple.getLong("field_i") == 50);
tuple = tuples.get(5);
+
assert(tuple.get("str_s").equals("c"));
assert(tuple.getLong("field_i") == 60);
@@ -1053,6 +1066,8 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
SolrParams sParams = mapParams(CommonParams.QT, "/sql",
"stmt", "select distinct str_s, field_i from collection1 order by str_s asc, field_i asc");
+ System.out.println("##################### testSelectDistinct()");
+
TupleStream solrStream = new SolrStream(jetty.url, sParams);
List<Tuple> tuples = getTuples(solrStream);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/37fdc37f/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/AndOperation.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/AndOperation.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/AndOperation.java
new file mode 100644
index 0000000..f095f63
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/AndOperation.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.io.ops;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
+import org.apache.solr.client.solrj.io.stream.expr.Expressible;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+
+public class AndOperation implements BooleanOperation {
+
+ private static final long serialVersionUID = 1;
+ private UUID operationNodeId = UUID.randomUUID();
+
+ protected BooleanOperation leftOperand;
+ protected BooleanOperation rightOperand;
+
+ public void operate(Tuple tuple) {
+ leftOperand.operate(tuple);
+ rightOperand.operate(tuple);
+ }
+
+ public AndOperation(BooleanOperation leftOperand, BooleanOperation rightOperand) {
+ this.leftOperand = leftOperand;
+ this.rightOperand = rightOperand;
+ }
+
+ public AndOperation(StreamExpression expression, StreamFactory factory) throws IOException {
+ List<StreamExpression> operationExpressions = factory.getExpressionOperandsRepresentingTypes(expression, BooleanOperation.class);
+ if(operationExpressions != null && operationExpressions.size() == 2) {
+ StreamExpression left = operationExpressions.get(0);
+ StreamOperation leftOp = factory.constructOperation(left);
+ if(leftOp instanceof BooleanOperation) {
+ leftOperand = (BooleanOperation) leftOp;
+ } else {
+ throw new IOException("The And/Or Operation requires a BooleanOperation.");
+ }
+
+ StreamExpression right = operationExpressions.get(1);
+ StreamOperation rightOp = factory.constructOperation(right);
+ if(rightOp instanceof BooleanOperation) {
+ rightOperand = (BooleanOperation) rightOp;
+ } else {
+ throw new IOException("The And/Or Operation requires a BooleanOperation.");
+ }
+ } else {
+ throw new IOException("The And/Or Operation requires a BooleanOperations.");
+ }
+ }
+
+ public boolean evaluate() {
+ return leftOperand.evaluate() && rightOperand.evaluate();
+ }
+
+ @Override
+ public StreamExpressionParameter toExpression(StreamFactory factory) throws IOException {
+ StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
+ if(leftOperand instanceof Expressible) {
+ expression.addParameter(leftOperand.toExpression(factory));
+ } else {
+ throw new IOException("This left operand of the AndOperation contains a non-expressible operation - it cannot be converted to an expression");
+ }
+
+ if(rightOperand instanceof Expressible) {
+ expression.addParameter(rightOperand.toExpression(factory));
+ } else {
+ throw new IOException("This the right operand of the AndOperation contains a non-expressible operation - it cannot be converted to an expression");
+ }
+ return expression;
+ }
+
+ @Override
+ public Explanation toExplanation(StreamFactory factory) throws IOException {
+ return new Explanation(operationNodeId.toString())
+ .withExpressionType(ExpressionType.OPERATION)
+ .withFunctionName(factory.getFunctionName(getClass()))
+ .withImplementingClass(getClass().getName())
+ .withExpression(toExpression(factory).toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/37fdc37f/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/BooleanOperation.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/BooleanOperation.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/BooleanOperation.java
new file mode 100644
index 0000000..609e4e1
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/BooleanOperation.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.io.ops;
+
+import org.apache.solr.client.solrj.io.Tuple;
+
+
+public interface BooleanOperation extends StreamOperation {
+ public abstract boolean evaluate();
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/37fdc37f/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/EqualsOperation.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/EqualsOperation.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/EqualsOperation.java
new file mode 100644
index 0000000..b7ea17d
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/EqualsOperation.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.io.ops;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+
+public class EqualsOperation extends LeafOperation {
+
+ private static final long serialVersionUID = 1;
+ private UUID operationNodeId = UUID.randomUUID();
+
+ public void operate(Tuple tuple) {
+ this.tuple = tuple;
+ }
+
+ public EqualsOperation(String field, double val) {
+ super(field, val);
+ }
+
+ public EqualsOperation(StreamExpression expression, StreamFactory factory) throws IOException {
+ super(expression, factory);
+ }
+
+ public boolean evaluate() {
+ Double d = tuple.getDouble(field);
+
+ if(d == null) {
+ return false;
+ }
+
+ return d == val;
+ }
+
+ public StreamExpression toExpression(StreamFactory factory) throws IOException {
+ StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
+ expression.addParameter(field);
+ expression.addParameter(Double.toString(val));
+ return expression;
+ }
+
+ @Override
+ public Explanation toExplanation(StreamFactory factory) throws IOException {
+ return new Explanation(operationNodeId.toString())
+ .withExpressionType(ExpressionType.OPERATION)
+ .withFunctionName(factory.getFunctionName(getClass()))
+ .withImplementingClass(getClass().getName())
+ .withExpression(toExpression(factory).toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/37fdc37f/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/GreaterThanEqualToOperation.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/GreaterThanEqualToOperation.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/GreaterThanEqualToOperation.java
new file mode 100644
index 0000000..34bd521
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/GreaterThanEqualToOperation.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.io.ops;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+
+public class GreaterThanEqualToOperation extends LeafOperation {
+
+ private static final long serialVersionUID = 1;
+ private UUID operationNodeId = UUID.randomUUID();
+
+ public void operate(Tuple tuple) {
+ this.tuple = tuple;
+ }
+
+ public GreaterThanEqualToOperation(String field, double val) {
+ super(field, val);
+ }
+
+ public GreaterThanEqualToOperation(StreamExpression expression, StreamFactory factory) throws IOException {
+ super(expression, factory);
+ }
+
+ public boolean evaluate() {
+ Double d = tuple.getDouble(field);
+
+ if(d == null) {
+ return false;
+ }
+
+ return d >= val;
+ }
+
+ public StreamExpression toExpression(StreamFactory factory) throws IOException {
+ StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
+ expression.addParameter(field);
+ expression.addParameter(Double.toString(val));
+ return expression;
+ }
+
+ @Override
+ public Explanation toExplanation(StreamFactory factory) throws IOException {
+ return new Explanation(operationNodeId.toString())
+ .withExpressionType(ExpressionType.OPERATION)
+ .withFunctionName(factory.getFunctionName(getClass()))
+ .withImplementingClass(getClass().getName())
+ .withExpression(toExpression(factory).toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/37fdc37f/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/GreaterThanOperation.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/GreaterThanOperation.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/GreaterThanOperation.java
new file mode 100644
index 0000000..a58ad01
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/GreaterThanOperation.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.io.ops;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+
+public class GreaterThanOperation extends LeafOperation {
+
+ private static final long serialVersionUID = 1;
+ private UUID operationNodeId = UUID.randomUUID();
+
+ public void operate(Tuple tuple) {
+ this.tuple = tuple;
+ }
+
+ public GreaterThanOperation(String field, double val) {
+ super(field, val);
+ }
+
+ public GreaterThanOperation(StreamExpression expression, StreamFactory factory) throws IOException {
+ super(expression, factory);
+ }
+
+ public boolean evaluate() {
+ Double d = tuple.getDouble(field);
+
+ if(d == null) {
+ return false;
+ }
+
+ return d > val;
+ }
+
+ public StreamExpression toExpression(StreamFactory factory) throws IOException {
+ StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
+ expression.addParameter(field);
+ expression.addParameter(Double.toString(val));
+ return expression;
+ }
+
+ @Override
+ public Explanation toExplanation(StreamFactory factory) throws IOException {
+ return new Explanation(operationNodeId.toString())
+ .withExpressionType(ExpressionType.OPERATION)
+ .withFunctionName(factory.getFunctionName(getClass()))
+ .withImplementingClass(getClass().getName())
+ .withExpression(toExpression(factory).toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/37fdc37f/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/LeafOperation.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/LeafOperation.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/LeafOperation.java
new file mode 100644
index 0000000..bcd979a
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/LeafOperation.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.io.ops;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+
+public abstract class LeafOperation implements BooleanOperation {
+
+ private static final long serialVersionUID = 1;
+ private UUID operationNodeId = UUID.randomUUID();
+
+ protected String field;
+ protected Double val;
+ protected Tuple tuple;
+
+ public void operate(Tuple tuple) {
+ this.tuple = tuple;
+ }
+
+ public LeafOperation(String field, double val) {
+ this.field = field;
+ this.val = val;
+ }
+
+ public LeafOperation(StreamExpression expression, StreamFactory factory) throws IOException {
+ this.field = factory.getValueOperand(expression, 0);
+ this.val = Double.parseDouble(factory.getValueOperand(expression, 0));
+ }
+
+ @Override
+ public Explanation toExplanation(StreamFactory factory) throws IOException {
+ return new Explanation(operationNodeId.toString())
+ .withExpressionType(ExpressionType.OPERATION)
+ .withFunctionName(factory.getFunctionName(getClass()))
+ .withImplementingClass(getClass().getName())
+ .withExpression(toExpression(factory).toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/37fdc37f/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/LessThanEqualToOperation.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/LessThanEqualToOperation.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/LessThanEqualToOperation.java
new file mode 100644
index 0000000..6278f14
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/LessThanEqualToOperation.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.io.ops;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+
+public class LessThanEqualToOperation extends LeafOperation {
+
+ private static final long serialVersionUID = 1;
+ private UUID operationNodeId = UUID.randomUUID();
+
+ public void operate(Tuple tuple) {
+ this.tuple = tuple;
+ }
+
+ public LessThanEqualToOperation(String field, double val) {
+ super(field, val);
+ }
+
+ public LessThanEqualToOperation(StreamExpression expression, StreamFactory factory) throws IOException {
+ super(expression, factory);
+ }
+
+ public boolean evaluate() {
+ Double d = tuple.getDouble(field);
+
+ if(d == null) {
+ return true;
+ }
+
+ return d <= val;
+ }
+
+ public StreamExpression toExpression(StreamFactory factory) throws IOException {
+ StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
+ expression.addParameter(field);
+ expression.addParameter(Double.toString(val));
+ return expression;
+ }
+
+ @Override
+ public Explanation toExplanation(StreamFactory factory) throws IOException {
+ return new Explanation(operationNodeId.toString())
+ .withExpressionType(ExpressionType.OPERATION)
+ .withFunctionName(factory.getFunctionName(getClass()))
+ .withImplementingClass(getClass().getName())
+ .withExpression(toExpression(factory).toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/37fdc37f/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/LessThanOperation.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/LessThanOperation.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/LessThanOperation.java
new file mode 100644
index 0000000..e37bee3
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/LessThanOperation.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.io.ops;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+
+public class LessThanOperation extends LeafOperation {
+
+ private static final long serialVersionUID = 1;
+ private UUID operationNodeId = UUID.randomUUID();
+
+ public void operate(Tuple tuple) {
+ this.tuple = tuple;
+ }
+
+ public LessThanOperation(String field, double val) {
+ super(field, val);
+ }
+
+ public LessThanOperation(StreamExpression expression, StreamFactory factory) throws IOException {
+ super(expression, factory);
+ }
+
+ public boolean evaluate() {
+ Double d = tuple.getDouble(field);
+
+ if(d == null) {
+ return true;
+ }
+
+ return d < val;
+ }
+
+ public StreamExpression toExpression(StreamFactory factory) throws IOException {
+ StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
+ expression.addParameter(field);
+ expression.addParameter(Double.toString(val));
+ return expression;
+ }
+
+ @Override
+ public Explanation toExplanation(StreamFactory factory) throws IOException {
+ return new Explanation(operationNodeId.toString())
+ .withExpressionType(ExpressionType.OPERATION)
+ .withFunctionName(factory.getFunctionName(getClass()))
+ .withImplementingClass(getClass().getName())
+ .withExpression(toExpression(factory).toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/37fdc37f/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/NotOperation.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/NotOperation.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/NotOperation.java
new file mode 100644
index 0000000..c16e4b3
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/NotOperation.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.io.ops;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
+import org.apache.solr.client.solrj.io.stream.expr.Expressible;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+
+
+public class NotOperation implements BooleanOperation {
+
+ private static final long serialVersionUID = 1;
+ private UUID operationNodeId = UUID.randomUUID();
+
+ protected BooleanOperation operand;
+
+ public void operate(Tuple tuple) {
+ operand.operate(tuple);
+ }
+
+ public NotOperation(BooleanOperation operand) {
+ this.operand = operand;
+ }
+
+ public NotOperation(StreamExpression expression, StreamFactory factory) throws IOException {
+ List<StreamExpression> operationExpressions = factory.getExpressionOperandsRepresentingTypes(expression, BooleanOperation.class);
+ if(operationExpressions != null && operationExpressions.size() == 1) {
+ StreamExpression op = operationExpressions.get(0);
+ StreamOperation streamOp = factory.constructOperation(op);
+ if(op instanceof BooleanOperation) {
+ operand = (BooleanOperation) streamOp;
+ } else {
+ throw new IOException("The NotOperation requires a BooleanOperation.");
+ }
+
+ } else {
+ throw new IOException("The NotOperation requires a BooleanOperations.");
+ }
+ }
+
+ public boolean evaluate() {
+ return !operand.evaluate();
+ }
+
+ @Override
+ public StreamExpressionParameter toExpression(StreamFactory factory) throws IOException {
+ StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
+ if(operand instanceof Expressible) {
+ expression.addParameter(operand.toExpression(factory));
+ } else {
+ throw new IOException("The operand of the NotOperation contains a non-expressible operation - it cannot be converted to an expression");
+ }
+ return expression;
+ }
+
+ @Override
+ public Explanation toExplanation(StreamFactory factory) throws IOException {
+ return new Explanation(operationNodeId.toString())
+ .withExpressionType(ExpressionType.OPERATION)
+ .withFunctionName(factory.getFunctionName(getClass()))
+ .withImplementingClass(getClass().getName())
+ .withExpression(toExpression(factory).toString());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/37fdc37f/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/OrOperation.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/OrOperation.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/OrOperation.java
new file mode 100644
index 0000000..faac5cd
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/OrOperation.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.io.ops;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import org.apache.solr.client.solrj.io.stream.expr.Explanation;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
+import org.apache.solr.client.solrj.io.stream.expr.Expressible;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+
+public class OrOperation extends AndOperation {
+
+ private static final long serialVersionUID = 1;
+ private UUID operationNodeId = UUID.randomUUID();
+
+ public OrOperation(BooleanOperation leftOperand, BooleanOperation rightOperand) {
+ super(leftOperand, rightOperand);
+ }
+
+ public OrOperation(StreamExpression expression, StreamFactory factory) throws IOException {
+ super(expression, factory);
+ }
+
+ public boolean evaluate() {
+ return leftOperand.evaluate() || rightOperand.evaluate();
+ }
+
+ @Override
+ public StreamExpressionParameter toExpression(StreamFactory factory) throws IOException {
+ StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
+ if(leftOperand instanceof Expressible) {
+ expression.addParameter(leftOperand.toExpression(factory));
+ } else {
+ throw new IOException("This left operand of the OrOperation contains a non-expressible operation - it cannot be converted to an expression");
+ }
+
+ if(rightOperand instanceof Expressible) {
+ expression.addParameter(rightOperand.toExpression(factory));
+ } else {
+ throw new IOException("This the right operand of the OrOperation contains a non-expressible operation - it cannot be converted to an expression");
+ }
+ return expression;
+ }
+
+ @Override
+ public Explanation toExplanation(StreamFactory factory) throws IOException {
+ return new Explanation(operationNodeId.toString())
+ .withExpressionType(ExpressionType.OPERATION)
+ .withFunctionName(factory.getFunctionName(getClass()))
+ .withImplementingClass(getClass().getName())
+ .withExpression(toExpression(factory).toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/37fdc37f/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java
index 4e239e6..94d937d 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java
@@ -234,6 +234,7 @@ public class FacetStream extends TupleStream implements Expressible {
this.zkHost = zkHost;
this.params = params;
this.buckets = buckets;
+ System.out.println("####### Bucket count:"+buckets.length);
this.metrics = metrics;
this.bucketSizeLimit = bucketSizeLimit;
this.collection = collection;
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/37fdc37f/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/HavingStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/HavingStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/HavingStream.java
new file mode 100644
index 0000000..9a79990
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/HavingStream.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.io.stream;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.comp.FieldComparator;
+import org.apache.solr.client.solrj.io.comp.MultipleFieldComparator;
+import org.apache.solr.client.solrj.io.comp.StreamComparator;
+import org.apache.solr.client.solrj.io.eq.FieldEqualitor;
+import org.apache.solr.client.solrj.io.eq.MultipleFieldEqualitor;
+import org.apache.solr.client.solrj.io.eq.StreamEqualitor;
+import org.apache.solr.client.solrj.io.ops.BooleanOperation;
+import org.apache.solr.client.solrj.io.ops.StreamOperation;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
+import org.apache.solr.client.solrj.io.stream.expr.Expressible;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+
+/**
+ * Iterates over a TupleStream and buffers Tuples that are equal based on a comparator.
+ * This allows tuples to be grouped by common field(s).
+ *
+ * The read() method emits one tuple per group. The fields of the emitted Tuple reflect the first tuple
+ * encountered in the group.
+ *
+ * Use the Tuple.getMaps() method to return all the Tuples in the group. This method returns
+ * a list of maps (including the group head), which hold the data for each Tuple in the group.
+ *
+ * Note: The ReducerStream requires that it's underlying stream be sorted and partitioned by the same
+ * fields as it's comparator.
+ *
+ **/
+
+public class HavingStream extends TupleStream implements Expressible {
+
+ private static final long serialVersionUID = 1;
+
+ private TupleStream stream;
+ private BooleanOperation op;
+
+ private transient Tuple currentGroupHead;
+
+ public HavingStream(TupleStream stream, BooleanOperation op) throws IOException {
+ init(stream, op);
+ }
+
+
+ public HavingStream(StreamExpression expression, StreamFactory factory) throws IOException{
+ // grab all parameters out
+ List<StreamExpression> streamExpressions = factory.getExpressionOperandsRepresentingTypes(expression, Expressible.class, TupleStream.class);
+ List<StreamExpression> operationExpressions = factory.getExpressionOperandsRepresentingTypes(expression, BooleanOperation.class);
+
+ // validate expression contains only what we want.
+ if(expression.getParameters().size() != streamExpressions.size() + 2){
+ throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - unknown operands found", expression));
+ }
+
+ if(1 != streamExpressions.size()){
+ throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting a single stream but found %d",expression, streamExpressions.size()));
+ }
+
+
+ BooleanOperation booleanOperation = null;
+ if(operationExpressions != null && operationExpressions.size() == 1) {
+ StreamExpression ex = operationExpressions.get(0);
+ StreamOperation operation = factory.constructOperation(ex);
+ if(operation instanceof BooleanOperation) {
+ booleanOperation = (BooleanOperation) operation;
+ } else {
+ throw new IOException("The HavingStream requires a BooleanOperation. A StreamOperation was provided.");
+ }
+ } else {
+ throw new IOException("The HavingStream requires a BooleanOperation.");
+ }
+
+ init(factory.constructStream(streamExpressions.get(0)), booleanOperation);
+ }
+
+ private void init(TupleStream stream, BooleanOperation op) throws IOException{
+ this.stream = stream;
+ this.op = op;
+ }
+
+ @Override
+ public StreamExpression toExpression(StreamFactory factory) throws IOException{
+ return toExpression(factory, true);
+ }
+
+ private StreamExpression toExpression(StreamFactory factory, boolean includeStreams) throws IOException {
+ // function name
+ StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
+
+ // stream
+ if(includeStreams){
+ expression.addParameter(((Expressible) stream).toExpression(factory));
+ }
+ else{
+ expression.addParameter("<stream>");
+ }
+
+ if(op instanceof Expressible) {
+ expression.addParameter(op.toExpression(factory));
+ } else {
+ throw new IOException("This ReducerStream contains a non-expressible operation - it cannot be converted to an expression");
+ }
+
+ return expression;
+ }
+
+ @Override
+ public Explanation toExplanation(StreamFactory factory) throws IOException {
+
+ return new StreamExplanation(getStreamNodeId().toString())
+ .withChildren(new Explanation[]{
+ stream.toExplanation(factory)
+ })
+ .withFunctionName(factory.getFunctionName(this.getClass()))
+ .withImplementingClass(this.getClass().getName())
+ .withExpressionType(ExpressionType.STREAM_DECORATOR)
+ .withExpression(toExpression(factory, false).toString())
+ .withHelpers(new Explanation[]{
+ op.toExplanation(factory)
+ });
+ }
+
+ public void setStreamContext(StreamContext context) {
+ this.stream.setStreamContext(context);
+ }
+
+ public List<TupleStream> children() {
+ List<TupleStream> l = new ArrayList<TupleStream>();
+ l.add(stream);
+ return l;
+ }
+
+ public void open() throws IOException {
+ stream.open();
+ }
+
+ public void close() throws IOException {
+ stream.close();
+ }
+
+ public Tuple read() throws IOException {
+ while(true) {
+ Tuple tuple = stream.read();
+ if(tuple.EOF) {
+ return tuple;
+ }
+
+ op.operate(tuple);
+
+ if(op.evaluate()) {
+ return tuple;
+ }
+ }
+ }
+
+ /** Return the stream sort - ie, the order in which records are returned */
+ public StreamComparator getStreamSort(){
+ return stream.getStreamSort();
+ }
+
+ public int getCost() {
+ return 0;
+ }
+}
\ No newline at end of file