You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by dp...@apache.org on 2015/11/11 02:45:05 UTC
svn commit: r1713753 - in /lucene/dev/trunk/solr: ./
core/src/java/org/apache/solr/handler/
solrj/src/java/org/apache/solr/client/solrj/io/
solrj/src/java/org/apache/solr/client/solrj/io/comp/
solrj/src/java/org/apache/solr/client/solrj/io/eq/ solrj/sr...
Author: dpgove
Date: Wed Nov 11 01:45:04 2015
New Revision: 1713753
URL: http://svn.apache.org/viewvc?rev=1713753&view=rev
Log:
SOLR-7584: Adds Inner and LeftOuter Joins to the Streaming API and Streaming Expressions (Dennis Gove, Corey Wu)
Added:
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/BiJoinStream.java (with props)
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/InnerJoinStream.java (with props)
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/JoinStream.java (with props)
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/LeftOuterJoinStream.java (with props)
Modified:
lucene/dev/trunk/solr/CHANGES.txt
lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/SQLHandler.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/FieldComparator.java
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/eq/Equalitor.java
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/eq/FieldEqualitor.java
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/eq/MultipleFieldEqualitor.java
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ReducerStream.java
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java
lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
Modified: lucene/dev/trunk/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/CHANGES.txt?rev=1713753&r1=1713752&r2=1713753&view=diff
==============================================================================
--- lucene/dev/trunk/solr/CHANGES.txt (original)
+++ lucene/dev/trunk/solr/CHANGES.txt Wed Nov 11 01:45:04 2015
@@ -93,6 +93,8 @@ New Features
* SOLR-8268: StatsStream now implements the Expressible interface (Dennis Gove)
+* SOLR-7584: Adds Inner and LeftOuter Joins to the Streaming API and Streaming Expressions (Dennis Gove, Corey Wu)
+
Optimizations
----------------------
* SOLR-7876: Speed up queries and operations that use many terms when timeAllowed has not been
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/SQLHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/SQLHandler.java?rev=1713753&r1=1713752&r2=1713753&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/SQLHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/SQLHandler.java Wed Nov 11 01:45:04 2015
@@ -276,15 +276,17 @@ public class SQLHandler extends RequestH
if(sqlVisitor.sorts != null && sqlVisitor.sorts.size() > 0) {
StreamComparator[] adjustedSorts = adjustSorts(sqlVisitor.sorts, buckets);
+ // Because of the way adjustSorts works, we know that each FieldComparator has a single
+ // field name. For this reason we can just look at the leftFieldName.
FieldEqualitor[] fieldEqualitors = new FieldEqualitor[adjustedSorts.length];
StringBuilder buf = new StringBuilder();
for(int i=0; i<adjustedSorts.length; i++) {
FieldComparator fieldComparator = (FieldComparator)adjustedSorts[i];
- fieldEqualitors[i] = new FieldEqualitor(fieldComparator.getFieldName());
+ fieldEqualitors[i] = new FieldEqualitor(fieldComparator.getLeftFieldName());
if(i>0) {
buf.append(",");
}
- buf.append(fieldComparator.getFieldName()).append(" ").append(fieldComparator.getOrder().toString());
+ buf.append(fieldComparator.getLeftFieldName()).append(" ").append(fieldComparator.getOrder().toString());
}
sort = buf.toString();
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/StreamHandler.java?rev=1713753&r1=1713752&r2=1713753&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/StreamHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/StreamHandler.java Wed Nov 11 01:45:04 2015
@@ -31,6 +31,8 @@ import org.apache.solr.client.solrj.io.T
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.stream.CloudSolrStream;
import org.apache.solr.client.solrj.io.stream.ExceptionStream;
+import org.apache.solr.client.solrj.io.stream.InnerJoinStream;
+import org.apache.solr.client.solrj.io.stream.LeftOuterJoinStream;
import org.apache.solr.client.solrj.io.stream.MergeStream;
import org.apache.solr.client.solrj.io.stream.ParallelStream;
import org.apache.solr.client.solrj.io.stream.RankStream;
@@ -100,6 +102,8 @@ public class StreamHandler extends Reque
.withFunctionName("parallel", ParallelStream.class)
.withFunctionName("rollup", RollupStream.class)
.withFunctionName("stats", StatsStream.class)
+ .withFunctionName("innerJoin", InnerJoinStream.class)
+ .withFunctionName("leftOuterJoin", LeftOuterJoinStream.class)
// metrics
.withFunctionName("min", MinMetric.class)
Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java?rev=1713753&r1=1713752&r2=1713753&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java Wed Nov 11 01:45:04 2015
@@ -141,4 +141,8 @@ public class Tuple implements Cloneable
Tuple clone = new Tuple(m);
return clone;
}
+
+ public void merge(Tuple other){
+ fields.putAll(other.getMap());
+ }
}
\ No newline at end of file
Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/FieldComparator.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/FieldComparator.java?rev=1713753&r1=1713752&r2=1713753&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/FieldComparator.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/FieldComparator.java Wed Nov 11 01:45:04 2015
@@ -17,11 +17,7 @@
package org.apache.solr.client.solrj.io.comp;
-import java.io.Serializable;
-import java.util.Comparator;
-
import org.apache.solr.client.solrj.io.Tuple;
-import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
@@ -33,28 +29,55 @@ public class FieldComparator implements
private static final long serialVersionUID = 1;
- private String fieldName;
+ private String leftFieldName;
+ private String rightFieldName;
private final ComparatorOrder order;
private ComparatorLambda comparator;
- public FieldComparator(String fieldName, ComparatorOrder order) {
- this.fieldName = fieldName;
+ public FieldComparator(String fieldName, ComparatorOrder order){
+ leftFieldName = fieldName;
+ rightFieldName = fieldName;
+ this.order = order;
+ assignComparator();
+ }
+
+ public FieldComparator(String leftFieldName, String rightFieldName, ComparatorOrder order) {
+ this.leftFieldName = leftFieldName;
+ this.rightFieldName = rightFieldName;
this.order = order;
assignComparator();
}
- public String getFieldName(){
- return fieldName;
+ public void setLeftFieldName(String leftFieldName){
+ this.leftFieldName = leftFieldName;
+ }
+ public String getLeftFieldName(){
+ return leftFieldName;
+ }
+
+ public void setRightFieldName(String rightFieldName){
+ this.rightFieldName = rightFieldName;
+ }
+ public String getRightFieldName(){
+ return rightFieldName;
}
public ComparatorOrder getOrder(){
return order;
}
+ public boolean hasDifferentFieldNames(){
+ return !leftFieldName.equals(rightFieldName);
+ }
+
public StreamExpressionParameter toExpression(StreamFactory factory){
StringBuilder sb = new StringBuilder();
- sb.append(fieldName);
+ sb.append(leftFieldName);
+ if(!leftFieldName.equals(rightFieldName)){
+ sb.append("=");
+ sb.append(rightFieldName);
+ }
sb.append(" ");
sb.append(order);
@@ -76,8 +99,8 @@ public class FieldComparator implements
comparator = new ComparatorLambda() {
@Override
public int compare(Tuple leftTuple, Tuple rightTuple) {
- Comparable leftComp = (Comparable)leftTuple.get(fieldName);
- Comparable rightComp = (Comparable)rightTuple.get(fieldName);
+ Comparable leftComp = (Comparable)leftTuple.get(leftFieldName);
+ Comparable rightComp = (Comparable)rightTuple.get(rightFieldName);
if(leftComp == rightComp){ return 0; } // if both null then they are equal. if both are same ref then are equal
if(null == leftComp){ return 1; }
@@ -92,8 +115,8 @@ public class FieldComparator implements
comparator = new ComparatorLambda() {
@Override
public int compare(Tuple leftTuple, Tuple rightTuple) {
- Comparable leftComp = (Comparable)leftTuple.get(fieldName);
- Comparable rightComp = (Comparable)rightTuple.get(fieldName);
+ Comparable leftComp = (Comparable)leftTuple.get(leftFieldName);
+ Comparable rightComp = (Comparable)rightTuple.get(rightFieldName);
if(leftComp == rightComp){ return 0; } // if both null then they are equal. if both are same ref then are equal
if(null == leftComp){ return -1; }
@@ -114,7 +137,7 @@ public class FieldComparator implements
if(null == base){ return false; }
if(base instanceof FieldComparator){
FieldComparator baseComp = (FieldComparator)base;
- return fieldName.equals(baseComp.fieldName) && order == baseComp.order;
+ return (leftFieldName.equals(baseComp.leftFieldName) || rightFieldName.equals(baseComp.rightFieldName)) && order == baseComp.order;
}
else if(base instanceof MultipleFieldComparator){
// must equal the first one
Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/eq/Equalitor.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/eq/Equalitor.java?rev=1713753&r1=1713752&r2=1713753&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/eq/Equalitor.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/eq/Equalitor.java Wed Nov 11 01:45:04 2015
@@ -27,4 +27,5 @@ package org.apache.solr.client.solrj.io.
*/
public interface Equalitor<T> {
public boolean test(T left, T right);
+
}
Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/eq/FieldEqualitor.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/eq/FieldEqualitor.java?rev=1713753&r1=1713752&r2=1713753&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/eq/FieldEqualitor.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/eq/FieldEqualitor.java Wed Nov 11 01:45:04 2015
@@ -76,6 +76,14 @@ public class FieldEqualitor implements S
return 0 == leftComp.compareTo(rightComp);
}
+ public String getLeftFieldName(){
+ return leftFieldName;
+ }
+
+ public String getRightFieldName(){
+ return rightFieldName;
+ }
+
@Override
public boolean isDerivedFrom(StreamEqualitor base){
if(null == base){ return false; }
@@ -99,7 +107,7 @@ public class FieldEqualitor implements S
if(null == base){ return false; }
if(base instanceof FieldComparator){
FieldComparator baseComp = (FieldComparator)base;
- return leftFieldName.equals(baseComp.getFieldName()) && rightFieldName.equals(baseComp.getFieldName());
+ return leftFieldName.equals(baseComp.getLeftFieldName()) || rightFieldName.equals(baseComp.getRightFieldName());
}
else if(base instanceof MultipleFieldComparator){
// must equal the first one
@@ -111,11 +119,4 @@ public class FieldEqualitor implements S
return false;
}
-
- public String getLeftFieldName(){
- return leftFieldName;
- }
- public String getRightFieldName(){
- return rightFieldName;
- }
}
\ No newline at end of file
Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/eq/MultipleFieldEqualitor.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/eq/MultipleFieldEqualitor.java?rev=1713753&r1=1713752&r2=1713753&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/eq/MultipleFieldEqualitor.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/eq/MultipleFieldEqualitor.java Wed Nov 11 01:45:04 2015
@@ -46,16 +46,6 @@ public class MultipleFieldEqualitor impl
return eqs;
}
- public boolean test(Tuple t1, Tuple t2) {
- for(Equalitor<Tuple> eq : eqs) {
- if(!eq.test(t1, t2)){
- return false;
- }
- }
-
- return true;
- }
-
@Override
public StreamExpressionParameter toExpression(StreamFactory factory) throws IOException {
StringBuilder sb = new StringBuilder();
@@ -108,5 +98,16 @@ public class MultipleFieldEqualitor impl
}
return false;
+
+ }
+
+ public boolean test(Tuple t1, Tuple t2) {
+ for(Equalitor<Tuple> eq : eqs) {
+ if(!eq.test(t1, t2)){
+ return false;
+ }
+ }
+
+ return true;
}
}
\ No newline at end of file
Added: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/BiJoinStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/BiJoinStream.java?rev=1713753&view=auto
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/BiJoinStream.java (added)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/BiJoinStream.java Wed Nov 11 01:45:04 2015
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.client.solrj.io.stream;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
+import org.apache.solr.client.solrj.io.comp.FieldComparator;
+import org.apache.solr.client.solrj.io.comp.MultipleFieldComparator;
+import org.apache.solr.client.solrj.io.comp.StreamComparator;
+import org.apache.solr.client.solrj.io.eq.Equalitor;
+import org.apache.solr.client.solrj.io.eq.FieldEqualitor;
+import org.apache.solr.client.solrj.io.eq.MultipleFieldEqualitor;
+import org.apache.solr.client.solrj.io.eq.StreamEqualitor;
+import org.apache.solr.client.solrj.io.stream.expr.Expressible;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+
+/**
+ * Joins leftStream with rightStream based on an Equalitor. Both streams must be sorted by the fields being joined on.
+ * Resulting stream is sorted by the equalitor.
+ **/
+
+public abstract class BiJoinStream extends JoinStream implements Expressible {
+
+ protected PushBackStream leftStream;
+ protected PushBackStream rightStream;
+
+ // This is used to determine whether we should iterate the left or right side (depending on stream order).
+ // It is built from the incoming equalitor and streams' comparators.
+ protected StreamComparator iterationComparator;
+ protected StreamComparator leftStreamComparator, rightStreamComparator;
+
+ public BiJoinStream(TupleStream leftStream, TupleStream rightStream, StreamEqualitor eq) throws IOException {
+ super(eq, leftStream, rightStream);
+ init();
+ }
+
+ public BiJoinStream(StreamExpression expression, StreamFactory factory) throws IOException {
+ super(expression, factory);
+ init();
+ }
+
+ private void init() throws IOException {
+
+ // Validates all incoming streams for tuple order
+ validateTupleOrder();
+
+ leftStream = getStream(0);
+ rightStream = getStream(1);
+
+ // iterationComparator combines the equalitor (left/right field names) with the left stream's comp (sort order). This is
+ // done by grabbing the first N parts of the comp, where N is the number of parts in the equalitor. Because we've already
+ // validated tuple order (the comps), we know this can be done.
+ iterationComparator = createIterationComparator(eq, leftStream.getStreamSort());
+ leftStreamComparator = createSideComparator(eq, leftStream.getStreamSort());
+ rightStreamComparator = createSideComparator(eq, rightStream.getStreamSort());
+ }
+
+ protected void validateTupleOrder() throws IOException {
+ if (!isValidTupleOrder()) {
+ throw new IOException(
+ "Invalid JoinStream - all incoming stream comparators (sort) must be a superset of this stream's equalitor.");
+ }
+ }
+
+ private StreamComparator createIterationComparator(StreamEqualitor eq, StreamComparator comp) throws IOException {
+ if (eq instanceof MultipleFieldEqualitor && comp instanceof MultipleFieldComparator) {
+ // we know the comp is at least as long as the eq because we've already validated the tuple order
+ StreamComparator[] compoundComps = new StreamComparator[((MultipleFieldEqualitor) eq).getEqs().length];
+ for (int idx = 0; idx < compoundComps.length; ++idx) {
+ StreamEqualitor sourceEqualitor = ((MultipleFieldEqualitor) eq).getEqs()[idx];
+ StreamComparator sourceComparator = ((MultipleFieldComparator) comp).getComps()[idx];
+
+ if (sourceEqualitor instanceof FieldEqualitor && sourceComparator instanceof FieldComparator) {
+ FieldEqualitor fieldEqualitor = (FieldEqualitor) sourceEqualitor;
+ FieldComparator fieldComparator = (FieldComparator) sourceComparator;
+ compoundComps[idx] = new FieldComparator(fieldEqualitor.getLeftFieldName(),
+ fieldEqualitor.getRightFieldName(), fieldComparator.getOrder());
+ } else {
+ throw new IOException("Failed to create an iteration comparator");
+ }
+ }
+ return new MultipleFieldComparator(compoundComps);
+ } else if (comp instanceof MultipleFieldComparator) {
+ StreamEqualitor sourceEqualitor = eq;
+ StreamComparator sourceComparator = ((MultipleFieldComparator) comp).getComps()[0];
+
+ if (sourceEqualitor instanceof FieldEqualitor && sourceComparator instanceof FieldComparator) {
+ FieldEqualitor fieldEqualitor = (FieldEqualitor) sourceEqualitor;
+ FieldComparator fieldComparator = (FieldComparator) sourceComparator;
+ return new FieldComparator(fieldEqualitor.getLeftFieldName(), fieldEqualitor.getRightFieldName(),
+ fieldComparator.getOrder());
+ } else {
+ throw new IOException("Failed to create an iteration comparator");
+ }
+ } else {
+ StreamEqualitor sourceEqualitor = eq;
+ StreamComparator sourceComparator = comp;
+
+ if (sourceEqualitor instanceof FieldEqualitor && sourceComparator instanceof FieldComparator) {
+ FieldEqualitor fieldEqualitor = (FieldEqualitor) sourceEqualitor;
+ FieldComparator fieldComparator = (FieldComparator) sourceComparator;
+ return new FieldComparator(fieldEqualitor.getLeftFieldName(), fieldEqualitor.getRightFieldName(),
+ fieldComparator.getOrder());
+ } else {
+ throw new IOException("Failed to create an iteration comparator");
+ }
+ }
+ }
+
+ private StreamComparator createSideComparator(StreamEqualitor eq, StreamComparator comp) throws IOException {
+ if (eq instanceof MultipleFieldEqualitor && comp instanceof MultipleFieldComparator) {
+ // we know the comp is at least as long as the eq because we've already validated the tuple order
+ StreamComparator[] compoundComps = new StreamComparator[((MultipleFieldEqualitor) eq).getEqs().length];
+ for (int idx = 0; idx < compoundComps.length; ++idx) {
+ StreamComparator sourceComparator = ((MultipleFieldComparator) comp).getComps()[idx];
+
+ if (sourceComparator instanceof FieldComparator) {
+ FieldComparator fieldComparator = (FieldComparator) sourceComparator;
+ compoundComps[idx] = new FieldComparator(fieldComparator.getLeftFieldName(),
+ fieldComparator.getRightFieldName(), fieldComparator.getOrder());
+ } else {
+ throw new IOException("Failed to create an side comparator");
+ }
+ }
+ return new MultipleFieldComparator(compoundComps);
+ } else if (comp instanceof MultipleFieldComparator) {
+ StreamComparator sourceComparator = ((MultipleFieldComparator) comp).getComps()[0];
+
+ if (sourceComparator instanceof FieldComparator) {
+ FieldComparator fieldComparator = (FieldComparator) sourceComparator;
+ return new FieldComparator(fieldComparator.getLeftFieldName(), fieldComparator.getRightFieldName(),
+ fieldComparator.getOrder());
+ } else {
+ throw new IOException("Failed to create an side comparator");
+ }
+ } else {
+ StreamComparator sourceComparator = comp;
+
+ if (sourceComparator instanceof FieldComparator) {
+ FieldComparator fieldComparator = (FieldComparator) sourceComparator;
+ return new FieldComparator(fieldComparator.getLeftFieldName(), fieldComparator.getRightFieldName(),
+ fieldComparator.getOrder());
+ } else {
+ throw new IOException("Failed to create an side comparator");
+ }
+ }
+ }
+}
Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java?rev=1713753&r1=1713752&r2=1713753&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java Wed Nov 11 01:45:04 2015
@@ -407,9 +407,9 @@ public class CloudSolrStream extends Tup
protected class TupleWrapper implements Comparable<TupleWrapper> {
private Tuple tuple;
private SolrStream stream;
- private Comparator comp;
+ private StreamComparator comp;
- public TupleWrapper(SolrStream stream, Comparator comp) {
+ public TupleWrapper(SolrStream stream, StreamComparator comp) {
this.stream = stream;
this.comp = comp;
}
@@ -449,9 +449,9 @@ public class CloudSolrStream extends Tup
protected class StreamOpener implements Callable<TupleWrapper> {
private SolrStream stream;
- private Comparator<Tuple> comp;
+ private StreamComparator comp;
- public StreamOpener(SolrStream stream, Comparator<Tuple> comp) {
+ public StreamOpener(SolrStream stream, StreamComparator comp) {
this.stream = stream;
this.comp = comp;
}
Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java?rev=1713753&r1=1713752&r2=1713753&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java Wed Nov 11 01:45:04 2015
@@ -62,7 +62,7 @@ public class FacetStream extends TupleSt
Bucket[] buckets,
Metric[] metrics,
FieldComparator[] sorts,
- int limit) {
+ int limit) throws IOException {
this.zkHost = zkHost;
this.props = props;
this.buckets = buckets;
@@ -70,6 +70,15 @@ public class FacetStream extends TupleSt
this.limit = limit;
this.collection = collection;
this.sorts = sorts;
+
+ // In a facet world it only makes sense for each sorter to refer to a single field name.
+ // Because FieldComparator allows for distinct left and right field names, we need to validate
+ // that they are the same.
+ for(FieldComparator sort : sorts){
+ if(sort.hasDifferentFieldNames()){
+ throw new IOException("Invalid FacetStream - all sorts must be constructed with a single field name.");
+ }
+ }
}
public void setStreamContext(StreamContext context) {
@@ -144,7 +153,7 @@ public class FacetStream extends TupleSt
return _sorts;
} else if(_sorts.length == 1) {
FieldComparator[] adjustedSorts = new FieldComparator[_buckets.length];
- if (_sorts[0].getFieldName().contains("(")) {
+ if (_sorts[0].getLeftFieldName().contains("(")) {
//Its a metric sort so apply the same sort criteria at each level.
for (int i = 0; i < adjustedSorts.length; i++) {
adjustedSorts[i] = _sorts[0];
@@ -174,7 +183,7 @@ public class FacetStream extends TupleSt
buf.append("\"type\":\"terms\"");
buf.append(",\"field\":\""+_buckets[level].toString()+"\"");
buf.append(",\"limit\":"+_limit);
- buf.append(",\"sort\":{\""+getFacetSort(_sorts[level].getFieldName(), _metrics)+"\":\""+_sorts[level].getOrder()+"\"}");
+ buf.append(",\"sort\":{\""+getFacetSort(_sorts[level].getLeftFieldName(), _metrics)+"\":\""+_sorts[level].getOrder()+"\"}");
buf.append(",\"facet\":{");
int metricCount = 0;
Added: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/InnerJoinStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/InnerJoinStream.java?rev=1713753&view=auto
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/InnerJoinStream.java (added)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/InnerJoinStream.java Wed Nov 11 01:45:04 2015
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.client.solrj.io.stream;
+
+import java.io.IOException;
+import java.util.LinkedList;
+
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.comp.StreamComparator;
+import org.apache.solr.client.solrj.io.eq.StreamEqualitor;
+import org.apache.solr.client.solrj.io.stream.expr.Expressible;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+
+/**
+ * Joins leftStream with rightStream based on an Equalitor. Both streams must be sorted by the fields being joined on.
+ * Resulting stream is sorted by the equalitor.
+ **/
+
+public class InnerJoinStream extends BiJoinStream implements Expressible {
+
+ private LinkedList<Tuple> joinedTuples = new LinkedList<Tuple>();
+ private LinkedList<Tuple> leftTupleGroup = new LinkedList<Tuple>();
+ private LinkedList<Tuple> rightTupleGroup = new LinkedList<Tuple>();
+
+ public InnerJoinStream(TupleStream leftStream, TupleStream rightStream, StreamEqualitor eq) throws IOException {
+ super(leftStream, rightStream, eq);
+ }
+
+ public InnerJoinStream(StreamExpression expression, StreamFactory factory) throws IOException {
+ super(expression, factory);
+ }
+
+ public Tuple read() throws IOException {
+ // if we've already figured out the next joined tuple then just return it
+ if (joinedTuples.size() > 0) {
+ return joinedTuples.removeFirst();
+ }
+
+ // keep going until we find something to return or either side (left or right) is exhausted
+ while (true) {
+ if (0 == leftTupleGroup.size()) {
+ Tuple firstMember = loadEqualTupleGroup(leftStream, leftTupleGroup, leftStreamComparator);
+
+ // if first member of group is EOF then we're done
+ if (firstMember.EOF) {
+ return firstMember;
+ }
+ }
+
+ if (0 == rightTupleGroup.size()) {
+ Tuple firstMember = loadEqualTupleGroup(rightStream, rightTupleGroup, rightStreamComparator);
+
+ // if first member of group is EOF then we're done
+ if (firstMember.EOF) {
+ return firstMember;
+ }
+ }
+
+ // At this point we know both left and right groups have at least 1 member
+ if (eq.test(leftTupleGroup.get(0), rightTupleGroup.get(0))) {
+ // The groups are equal. Join them together and build the joinedTuples
+ for (Tuple left : leftTupleGroup) {
+ for (Tuple right : rightTupleGroup) {
+ Tuple clone = left.clone();
+ clone.merge(right);
+ joinedTuples.add(clone);
+ }
+ }
+
+ // Cause each to advance next time we need to look
+ leftTupleGroup.clear();
+ rightTupleGroup.clear();
+
+ return joinedTuples.removeFirst();
+ } else {
+ int c = iterationComparator.compare(leftTupleGroup.get(0), rightTupleGroup.get(0));
+ if (c < 0) {
+ // advance left
+ leftTupleGroup.clear();
+ } else {
+ // advance right
+ rightTupleGroup.clear();
+ }
+
+ }
+ }
+ }
+
+ @Override
+ public StreamComparator getStreamSort() {
+ return iterationComparator;
+ }
+}
Added: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/JoinStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/JoinStream.java?rev=1713753&view=auto
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/JoinStream.java (added)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/JoinStream.java Wed Nov 11 01:45:04 2015
@@ -0,0 +1,196 @@
+package org.apache.solr.client.solrj.io.stream;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Locale;
+
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.comp.StreamComparator;
+import org.apache.solr.client.solrj.io.eq.FieldEqualitor;
+import org.apache.solr.client.solrj.io.eq.StreamEqualitor;
+import org.apache.solr.client.solrj.io.stream.expr.Expressible;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Defines a JoinStream which can hold N streams, all joined with the same equalitor. Each child stream is wrapped in a
+ * {@link PushBackStream} so that a tuple read past the end of an equal-tuple group can be handed back to the stream.
+ */
+public abstract class JoinStream extends TupleStream implements Expressible {
+
+  private static final long serialVersionUID = 1;
+  private List<PushBackStream> streams;
+  protected StreamEqualitor eq;
+
+  public JoinStream(StreamEqualitor eq, TupleStream first, TupleStream second, TupleStream... others) {
+    this.streams = new ArrayList<PushBackStream>();
+
+    this.eq = eq;
+
+    this.streams.add(new PushBackStream(first));
+    this.streams.add(new PushBackStream(second));
+
+    for (TupleStream other : others) {
+      this.streams.add(new PushBackStream(other));
+    }
+  }
+
+  protected abstract void validateTupleOrder() throws IOException;
+
+  /**
+   * Builds the join from an expression containing at least two stream operands and a single {@code on} parameter
+   * whose value lists the fields to join on.
+   *
+   * @throws IOException if the expression has unknown operands, fewer than two streams, or no usable 'on' value
+   */
+  public JoinStream(StreamExpression expression, StreamFactory factory) throws IOException {
+    // grab all parameters out
+    List<StreamExpression> streamExpressions = factory.getExpressionOperandsRepresentingTypes(expression,
+        Expressible.class, TupleStream.class);
+    StreamExpressionNamedParameter onExpression = factory.getNamedOperand(expression, "on");
+
+    // validate expression contains only what we want: the stream operands plus the single 'on' parameter
+    if (expression.getParameters().size() != streamExpressions.size() + 1) {
+      throw new IOException(String.format(Locale.ROOT, "Invalid expression %s - unknown operands found", expression));
+    }
+
+    // operands may be any expressible TupleStream; they are wrapped in PushBackStreams below
+    if (streamExpressions.size() < 2) {
+      throw new IOException(String.format(Locale.ROOT,
+          "Invalid expression %s - expecting at least two streams but found %d (must be TupleStream types)",
+          expression, streamExpressions.size()));
+    }
+
+    this.streams = new ArrayList<PushBackStream>();
+    for (StreamExpression streamExpression : streamExpressions) {
+      this.streams.add(new PushBackStream(factory.constructStream(streamExpression)));
+    }
+
+    if (null == onExpression || !(onExpression.getParameter() instanceof StreamExpressionValue)) {
+      throw new IOException(String.format(Locale.ROOT,
+          "Invalid expression %s - expecting single 'on' parameter listing fields to join on but didn't find one",
+          expression));
+    }
+
+    this.eq = factory.constructEqualitor(((StreamExpressionValue) onExpression.getParameter()).getValue(),
+        FieldEqualitor.class);
+  }
+
+  @Override
+  public StreamExpression toExpression(StreamFactory factory) throws IOException {
+    // function name
+    StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
+
+    // streams
+    for (PushBackStream stream : streams) {
+      expression.addParameter(stream.toExpression(factory));
+    }
+
+    // on
+    if (eq instanceof Expressible) {
+      expression.addParameter(new StreamExpressionNamedParameter("on", ((Expressible) eq).toExpression(factory)));
+    } else {
+      throw new IOException(
+          "This JoinStream contains a non-expressible equalitor - it cannot be converted to an expression");
+    }
+
+    return expression;
+  }
+
+  public void setStreamContext(StreamContext context) {
+    for (PushBackStream stream : streams) {
+      stream.setStreamContext(context);
+    }
+  }
+
+  public void open() throws IOException {
+    for (PushBackStream stream : streams) {
+      stream.open();
+    }
+  }
+
+  /**
+   * Closes every child stream. A failure closing one stream does not stop the remaining streams from being closed;
+   * the first IOException is rethrown with any later ones attached as suppressed exceptions.
+   */
+  public void close() throws IOException {
+    IOException failure = null;
+    for (PushBackStream stream : streams) {
+      try {
+        stream.close();
+      } catch (IOException e) {
+        if (null == failure) {
+          failure = e;
+        } else {
+          failure.addSuppressed(e);
+        }
+      }
+    }
+    if (null != failure) {
+      throw failure;
+    }
+  }
+
+  /** Returns a new list holding the (push-back wrapped) child streams in join order. */
+  public List<TupleStream> children() {
+    return new ArrayList<TupleStream>(streams);
+  }
+
+  /**
+   * Returns the (push-back wrapped) child stream at the given position.
+   *
+   * @throws IllegalArgumentException if {@code idx} is negative or not less than the number of streams
+   */
+  public PushBackStream getStream(int idx) {
+    if (idx >= 0 && idx < streams.size()) {
+      return streams.get(idx);
+    }
+
+    throw new IllegalArgumentException(String.format(Locale.ROOT,"Stream idx=%d doesn't exist. Number of streams is %d", idx,
+        streams.size()));
+  }
+
+  /**
+   * Validates that the equalitor is derivable from the sort of each child stream. If it is, every stream's comparator
+   * is compatible with every other stream's comparator on the join fields.
+   */
+  protected boolean isValidTupleOrder() {
+    for (TupleStream stream : streams) {
+      if (!eq.isDerivedFrom(stream.getStreamSort())) {
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  /**
+   * Loads {@code group} with the next run of tuples from {@code stream} that compare equal (per
+   * {@code groupComparator}) to the first tuple read. At least one tuple is always read. If that first tuple is EOF
+   * the group stays empty and the EOF is consumed (not pushed back); otherwise the group holds one or more tuples and
+   * the first tuple that ends the run (which may be EOF) is pushed back onto the stream.
+   *
+   * @param stream the stream to read from
+   * @param group the list to fill - should be empty
+   * @param groupComparator decides whether a subsequent tuple belongs to the same group as the first
+   * @return the first tuple read, which is EOF when the stream had no more tuples
+   */
+  protected Tuple loadEqualTupleGroup(PushBackStream stream, LinkedList<Tuple> group, StreamComparator groupComparator)
+      throws IOException {
+    Tuple firstMember = stream.read();
+    if (firstMember.EOF) {
+      return firstMember;
+    }
+
+    // first in group, implicitly a member
+    group.add(firstMember);
+
+    // keep reading until a tuple outside the group (or EOF) shows up, then hand it back to the stream
+    while (true) {
+      Tuple nMember = stream.read();
+      if (nMember.EOF || 0 != groupComparator.compare(firstMember, nMember)) {
+        stream.pushBack(nMember);
+        return firstMember;
+      }
+      group.add(nMember);
+    }
+  }
+
+  public int getCost() {
+    return 0;
+  }
+
+}
Added: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/LeftOuterJoinStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/LeftOuterJoinStream.java?rev=1713753&view=auto
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/LeftOuterJoinStream.java (added)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/LeftOuterJoinStream.java Wed Nov 11 01:45:04 2015
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.client.solrj.io.stream;
+
+import java.io.IOException;
+import java.util.LinkedList;
+
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.comp.FieldComparator;
+import org.apache.solr.client.solrj.io.comp.MultipleFieldComparator;
+import org.apache.solr.client.solrj.io.comp.StreamComparator;
+import org.apache.solr.client.solrj.io.eq.FieldEqualitor;
+import org.apache.solr.client.solrj.io.eq.MultipleFieldEqualitor;
+import org.apache.solr.client.solrj.io.eq.StreamEqualitor;
+import org.apache.solr.client.solrj.io.stream.expr.Expressible;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+
+/**
+ * Left outer join of leftStream with rightStream based on an Equalitor. Both streams must be sorted by the fields being
+ * joined on. Every left tuple is emitted at least once: merged with each equal right tuple when there are any, and on
+ * its own otherwise. Output order follows the iteration comparator returned by {@link #getStreamSort()}.
+ **/
+
+public class LeftOuterJoinStream extends BiJoinStream implements Expressible {
+
+  private final LinkedList<Tuple> joinedTuples = new LinkedList<Tuple>();
+  private final LinkedList<Tuple> leftTupleGroup = new LinkedList<Tuple>();
+  private final LinkedList<Tuple> rightTupleGroup = new LinkedList<Tuple>();
+
+  public LeftOuterJoinStream(TupleStream leftStream, TupleStream rightStream, StreamEqualitor eq) throws IOException {
+    super(leftStream, rightStream, eq);
+  }
+
+  public LeftOuterJoinStream(StreamExpression expression, StreamFactory factory) throws IOException {
+    super(expression, factory);
+  }
+
+  /**
+   * Returns the next joined tuple, an unmatched left tuple, or the left stream's EOF tuple once the left side is
+   * exhausted. When one left/right group pair yields several joined tuples they are buffered in joinedTuples.
+   */
+  public Tuple read() throws IOException {
+    // if we've already figured out the next joined tuple then just return it
+    if (!joinedTuples.isEmpty()) {
+      return joinedTuples.removeFirst();
+    }
+
+    // keep going until we find something to return or left stream is empty
+    while (true) {
+      if (leftTupleGroup.isEmpty()) {
+        Tuple firstMember = loadEqualTupleGroup(leftStream, leftTupleGroup, leftStreamComparator);
+
+        // if first member of group is EOF then we're done
+        if (firstMember.EOF) {
+          return firstMember;
+        }
+      }
+
+      if (rightTupleGroup.isEmpty()) {
+        // Load the right tuple group, but don't end if it's EOF
+        Tuple rightFirstMember = loadEqualTupleGroup(rightStream, rightTupleGroup, rightStreamComparator);
+        if (rightFirstMember.EOF) {
+          // loadEqualTupleGroup consumed the EOF without pushing it back; return it to the stream so later calls
+          // see EOF again instead of reading past the end of the underlying right stream
+          rightStream.pushBack(rightFirstMember);
+        }
+      }
+
+      // Groups never contain EOF, so an empty right group means the right stream is exhausted:
+      // just return the next element from the left group unmatched
+      if (rightTupleGroup.isEmpty()) {
+        return leftTupleGroup.removeFirst();
+      }
+
+      // At this point we know both left and right groups have at least 1 member
+      if (eq.test(leftTupleGroup.get(0), rightTupleGroup.get(0))) {
+        // The groups are equal. Join em together and build the joinedTuples
+        for (Tuple left : leftTupleGroup) {
+          for (Tuple right : rightTupleGroup) {
+            Tuple clone = left.clone();
+            clone.merge(right);
+            joinedTuples.add(clone);
+          }
+        }
+
+        // Cause each to advance next time we need to look
+        leftTupleGroup.clear();
+        rightTupleGroup.clear();
+
+        return joinedTuples.removeFirst();
+      } else {
+        int c = iterationComparator.compare(leftTupleGroup.get(0), rightTupleGroup.get(0));
+        if (c < 0) {
+          // If there's no match, we still advance the left stream while returning every element.
+          // Because it's a left-outer join we still return the left tuple if no match on right.
+          return leftTupleGroup.removeFirst();
+        } else {
+          // advance right
+          rightTupleGroup.clear();
+        }
+      }
+    }
+  }
+
+  @Override
+  public StreamComparator getStreamSort() {
+    return iterationComparator;
+  }
+}
Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ReducerStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ReducerStream.java?rev=1713753&r1=1713752&r2=1713753&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ReducerStream.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ReducerStream.java Wed Nov 11 01:45:04 2015
@@ -80,7 +80,7 @@ public class ReducerStream extends Tuple
}
else{
FieldComparator fComp = (FieldComparator)comp;
- return new FieldEqualitor(fComp.getFieldName());
+ return new FieldEqualitor(fComp.getLeftFieldName(), fComp.getRightFieldName());
}
}
@@ -101,8 +101,6 @@ public class ReducerStream extends Tuple
throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting single 'by' parameter listing fields to group by but didn't find one",expression));
}
- // Reducing is always done over equality, so always use an EqualTo comparator
-
init(factory.constructStream(streamExpressions.get(0)),
factory.constructEqualitor(((StreamExpressionValue)byExpression.getParameter()).getValue(), FieldEqualitor.class)
);
Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java?rev=1713753&r1=1713752&r2=1713753&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java Wed Nov 11 01:45:04 2015
@@ -204,10 +204,45 @@ public class StreamFactory implements Se
}
return new MultipleFieldComparator(comps);
}
+ else if(comparatorString.contains("=")){
+ // expected format is "left=right order"
+ String[] parts = comparatorString.split("[ =]");
+
+ if(parts.length < 3){
+ throw new IOException(String.format(Locale.ROOT,"Invalid comparator expression %s - expecting 'left=right order'",comparatorString));
+ }
+
+ String leftFieldName = null;
+ String rightFieldName = null;
+ String order = null;
+ for(String part : parts){
+ // skip empty
+ if(null == part || 0 == part.trim().length()){ continue; }
+
+ // assign each in order
+ if(null == leftFieldName){
+ leftFieldName = part.trim();
+ }
+ else if(null == rightFieldName){
+ rightFieldName = part.trim();
+ }
+ else if(null == order){
+ order = part.trim();
+ break; // we're done, stop looping
+ }
+ }
+
+ if(null == leftFieldName || null == rightFieldName || null == order){
+ throw new IOException(String.format(Locale.ROOT,"Invalid comparator expression %s - expecting 'left=right order'",comparatorString));
+ }
+
+ return (StreamComparator)createInstance(comparatorType, new Class[]{ String.class, String.class, ComparatorOrder.class }, new Object[]{ leftFieldName, rightFieldName, ComparatorOrder.fromString(order) });
+ }
else{
+ // expected format is "field order"
String[] parts = comparatorString.split(" ");
if(2 != parts.length){
- throw new IOException(String.format(Locale.ROOT,"Invalid comparator expression %s - expecting fieldName and order",comparatorString));
+ throw new IOException(String.format(Locale.ROOT,"Invalid comparator expression %s - expecting 'field order'",comparatorString));
}
String fieldName = parts[0].trim();
@@ -254,7 +289,12 @@ public class StreamFactory implements Se
return ctor.newInstance(params);
} catch (NoSuchMethodException | SecurityException | InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
- throw new IOException(String.format(Locale.ROOT,"Unable to construct instance of %s", clazz.getName()),e);
+ if(null != e.getMessage()){
+ throw new IOException(String.format(Locale.ROOT,"Unable to construct instance of %s caused by %s", clazz.getName(), e.getMessage()),e);
+ }
+ else{
+ throw new IOException(String.format(Locale.ROOT,"Unable to construct instance of %s", clazz.getName()),e);
+ }
}
}
Modified: lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java?rev=1713753&r1=1713752&r2=1713753&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java (original)
+++ lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java Wed Nov 11 01:45:04 2015
@@ -29,11 +29,9 @@ import org.apache.solr.client.solrj.io.T
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
-import org.apache.solr.client.solrj.io.stream.metrics.Bucket;
import org.apache.solr.client.solrj.io.stream.metrics.CountMetric;
import org.apache.solr.client.solrj.io.stream.metrics.MaxMetric;
import org.apache.solr.client.solrj.io.stream.metrics.MeanMetric;
-import org.apache.solr.client.solrj.io.stream.metrics.Metric;
import org.apache.solr.client.solrj.io.stream.metrics.MinMetric;
import org.apache.solr.client.solrj.io.stream.metrics.SumMetric;
import org.apache.solr.cloud.AbstractFullDistribZkTestBase;
@@ -132,6 +130,8 @@ public class StreamExpressionTest extend
testParallelRankStream();
testParallelMergeStream();
testParallelRollupStream();
+ testInnerJoinStream();
+ testLeftOuterJoinStream();
}
private void testCloudSolrStream() throws Exception {
@@ -411,6 +411,18 @@ public class StreamExpressionTest extend
assert(tuples.size() == 4);
assertOrder(tuples, 0,1,3,4);
+
+ // full factory, switch order
+ stream = factory.constructStream("top("
+ + "n=4,"
+ + "unique("
+ + "search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f desc, a_i desc\"),"
+ + "over=\"a_f\"),"
+ + "sort=\"a_f asc\")");
+ tuples = getTuples(stream);
+
+ assert(tuples.size() == 4);
+ assertOrder(tuples, 2,1,3,4);
del("*:*");
commit();
@@ -996,6 +1008,152 @@ public class StreamExpressionTest extend
commit();
}
+  private void testInnerJoinStream() throws Exception {
+    // ids 1 and 15 share every join key, so each ordered left-side sort ends with 'id asc' to keep assertOrder deterministic
+    indexr(id, "1", "side_s", "left", "join1_i", "0", "join2_s", "a", "ident_s", "left_1"); // 8, 9
+    indexr(id, "15", "side_s", "left", "join1_i", "0", "join2_s", "a", "ident_s", "left_1"); // 8, 9
+    indexr(id, "2", "side_s", "left", "join1_i", "0", "join2_s", "b", "ident_s", "left_2");
+    indexr(id, "3", "side_s", "left", "join1_i", "1", "join2_s", "a", "ident_s", "left_3"); // 10
+    indexr(id, "4", "side_s", "left", "join1_i", "1", "join2_s", "b", "ident_s", "left_4"); // 11
+    indexr(id, "5", "side_s", "left", "join1_i", "1", "join2_s", "c", "ident_s", "left_5"); // 12
+    indexr(id, "6", "side_s", "left", "join1_i", "2", "join2_s", "d", "ident_s", "left_6");
+    indexr(id, "7", "side_s", "left", "join1_i", "3", "join2_s", "e", "ident_s", "left_7"); // 14
+
+    indexr(id, "8", "side_s", "right", "join1_i", "0", "join2_s", "a", "ident_s", "right_1", "join3_i", "0"); // 1,15
+    indexr(id, "9", "side_s", "right", "join1_i", "0", "join2_s", "a", "ident_s", "right_2", "join3_i", "0"); // 1,15
+    indexr(id, "10", "side_s", "right", "join1_i", "1", "join2_s", "a", "ident_s", "right_3", "join3_i", "1"); // 3
+    indexr(id, "11", "side_s", "right", "join1_i", "1", "join2_s", "b", "ident_s", "right_4", "join3_i", "1"); // 4
+    indexr(id, "12", "side_s", "right", "join1_i", "1", "join2_s", "c", "ident_s", "right_5", "join3_i", "1"); // 5
+    indexr(id, "13", "side_s", "right", "join1_i", "2", "join2_s", "dad", "ident_s", "right_6", "join3_i", "2");
+    indexr(id, "14", "side_s", "right", "join1_i", "3", "join2_s", "e", "ident_s", "right_7", "join3_i", "3"); // 7
+    commit();
+
+    StreamExpression expression;
+    TupleStream stream;
+    List<Tuple> tuples;
+
+    StreamFactory factory = new StreamFactory()
+      .withCollectionZkHost("collection1", zkServer.getZkAddress())
+      .withFunctionName("search", CloudSolrStream.class)
+      .withFunctionName("innerJoin", InnerJoinStream.class);
+
+    // Basic test
+    expression = StreamExpressionParser.parse("innerJoin("
+                                                + "search(collection1, q=\"side_s:left\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"join1_i asc, join2_s asc, id asc\"),"
+                                                + "search(collection1, q=\"side_s:right\", fl=\"join1_i,join2_s,ident_s\", sort=\"join1_i asc, join2_s asc\"),"
+                                                + "on=\"join1_i=join1_i, join2_s=join2_s\")");
+    stream = new InnerJoinStream(expression, factory);
+    tuples = getTuples(stream);
+    assert(tuples.size() == 8);
+    assertOrder(tuples, 1,1,15,15,3,4,5,7);
+
+    // Basic desc (id asc breaks the tie between left ids 1 and 15)
+    expression = StreamExpressionParser.parse("innerJoin("
+                                                + "search(collection1, q=\"side_s:left\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"join1_i desc, join2_s asc, id asc\"),"
+                                                + "search(collection1, q=\"side_s:right\", fl=\"join1_i,join2_s,ident_s\", sort=\"join1_i desc, join2_s asc\"),"
+                                                + "on=\"join1_i=join1_i, join2_s=join2_s\")");
+    stream = new InnerJoinStream(expression, factory);
+    tuples = getTuples(stream);
+    assert(tuples.size() == 8);
+    assertOrder(tuples, 7,3,4,5,1,1,15,15);
+
+    // Results in both searches, no join matches
+    expression = StreamExpressionParser.parse("innerJoin("
+                                                + "search(collection1, q=\"side_s:left\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"ident_s asc\"),"
+                                                + "search(collection1, q=\"side_s:right\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"ident_s asc\", aliases=\"id=right.id, join1_i=right.join1_i, join2_s=right.join2_s, ident_s=right.ident_s\"),"
+                                                + "on=\"ident_s=right.ident_s\")");
+    stream = new InnerJoinStream(expression, factory);
+    tuples = getTuples(stream);
+    assert(tuples.size() == 0);
+
+    // Differing field names
+    expression = StreamExpressionParser.parse("innerJoin("
+                                                + "search(collection1, q=\"side_s:left\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"join1_i asc, join2_s asc, id asc\"),"
+                                                + "search(collection1, q=\"side_s:right\", fl=\"join3_i,join2_s,ident_s\", sort=\"join3_i asc, join2_s asc\", aliases=\"join3_i=aliasesField\"),"
+                                                + "on=\"join1_i=aliasesField, join2_s=join2_s\")");
+    stream = new InnerJoinStream(expression, factory);
+    tuples = getTuples(stream);
+
+    assert(tuples.size() == 8);
+    assertOrder(tuples, 1,1,15,15,3,4,5,7);
+
+    del("*:*");
+    commit();
+  }
+
+  private void testLeftOuterJoinStream() throws Exception {
+    // ids 1 and 15 share every join key and ident_s, so each left-side sort ends with 'id asc' to keep assertOrder deterministic
+    indexr(id, "1", "side_s", "left", "join1_i", "0", "join2_s", "a", "ident_s", "left_1"); // 8, 9
+    indexr(id, "15", "side_s", "left", "join1_i", "0", "join2_s", "a", "ident_s", "left_1"); // 8, 9
+    indexr(id, "2", "side_s", "left", "join1_i", "0", "join2_s", "b", "ident_s", "left_2");
+    indexr(id, "3", "side_s", "left", "join1_i", "1", "join2_s", "a", "ident_s", "left_3"); // 10
+    indexr(id, "4", "side_s", "left", "join1_i", "1", "join2_s", "b", "ident_s", "left_4"); // 11
+    indexr(id, "5", "side_s", "left", "join1_i", "1", "join2_s", "c", "ident_s", "left_5"); // 12
+    indexr(id, "6", "side_s", "left", "join1_i", "2", "join2_s", "d", "ident_s", "left_6");
+    indexr(id, "7", "side_s", "left", "join1_i", "3", "join2_s", "e", "ident_s", "left_7"); // 14
+
+    indexr(id, "8", "side_s", "right", "join1_i", "0", "join2_s", "a", "ident_s", "right_1", "join3_i", "0"); // 1,15
+    indexr(id, "9", "side_s", "right", "join1_i", "0", "join2_s", "a", "ident_s", "right_2", "join3_i", "0"); // 1,15
+    indexr(id, "10", "side_s", "right", "join1_i", "1", "join2_s", "a", "ident_s", "right_3", "join3_i", "1"); // 3
+    indexr(id, "11", "side_s", "right", "join1_i", "1", "join2_s", "b", "ident_s", "right_4", "join3_i", "1"); // 4
+    indexr(id, "12", "side_s", "right", "join1_i", "1", "join2_s", "c", "ident_s", "right_5", "join3_i", "1"); // 5
+    indexr(id, "13", "side_s", "right", "join1_i", "2", "join2_s", "dad", "ident_s", "right_6", "join3_i", "2");
+    indexr(id, "14", "side_s", "right", "join1_i", "3", "join2_s", "e", "ident_s", "right_7", "join3_i", "3"); // 7
+    commit();
+
+    StreamExpression expression;
+    TupleStream stream;
+    List<Tuple> tuples;
+
+    StreamFactory factory = new StreamFactory()
+      .withCollectionZkHost("collection1", zkServer.getZkAddress())
+      .withFunctionName("search", CloudSolrStream.class)
+      .withFunctionName("leftOuterJoin", LeftOuterJoinStream.class);
+
+    // Basic test
+    expression = StreamExpressionParser.parse("leftOuterJoin("
+                                                + "search(collection1, q=\"side_s:left\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"join1_i asc, join2_s asc, id asc\"),"
+                                                + "search(collection1, q=\"side_s:right\", fl=\"join1_i,join2_s,ident_s\", sort=\"join1_i asc, join2_s asc\"),"
+                                                + "on=\"join1_i=join1_i, join2_s=join2_s\")");
+    stream = new LeftOuterJoinStream(expression, factory);
+    tuples = getTuples(stream);
+    assert(tuples.size() == 10);
+    assertOrder(tuples, 1,1,15,15,2,3,4,5,6,7);
+
+    // Basic desc (id asc breaks the tie between left ids 1 and 15)
+    expression = StreamExpressionParser.parse("leftOuterJoin("
+                                                + "search(collection1, q=\"side_s:left\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"join1_i desc, join2_s asc, id asc\"),"
+                                                + "search(collection1, q=\"side_s:right\", fl=\"join1_i,join2_s,ident_s\", sort=\"join1_i desc, join2_s asc\"),"
+                                                + "on=\"join1_i=join1_i, join2_s=join2_s\")");
+    stream = new LeftOuterJoinStream(expression, factory);
+    tuples = getTuples(stream);
+    assert(tuples.size() == 10);
+    assertOrder(tuples, 7,6,3,4,5,1,1,15,15,2);
+
+    // Results in both searches, no join matches (id asc breaks the ident_s tie between ids 1 and 15)
+    expression = StreamExpressionParser.parse("leftOuterJoin("
+                                                + "search(collection1, q=\"side_s:left\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"ident_s asc, id asc\"),"
+                                                + "search(collection1, q=\"side_s:right\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"ident_s asc\", aliases=\"id=right.id, join1_i=right.join1_i, join2_s=right.join2_s, ident_s=right.ident_s\"),"
+                                                + "on=\"ident_s=right.ident_s\")");
+    stream = new LeftOuterJoinStream(expression, factory);
+    tuples = getTuples(stream);
+    assert(tuples.size() == 8);
+    assertOrder(tuples, 1,15,2,3,4,5,6,7);
+
+    // Differing field names
+    expression = StreamExpressionParser.parse("leftOuterJoin("
+                                                + "search(collection1, q=\"side_s:left\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"join1_i asc, join2_s asc, id asc\"),"
+                                                + "search(collection1, q=\"side_s:right\", fl=\"join3_i,join2_s,ident_s\", sort=\"join3_i asc, join2_s asc\", aliases=\"join3_i=aliasesField\"),"
+                                                + "on=\"join1_i=aliasesField, join2_s=join2_s\")");
+    stream = new LeftOuterJoinStream(expression, factory);
+    tuples = getTuples(stream);
+    assert(tuples.size() == 10);
+    assertOrder(tuples, 1,1,15,15,2,3,4,5,6,7);
+
+    del("*:*");
+    commit();
+  }
+
protected List<Tuple> getTuples(TupleStream tupleStream) throws IOException {
tupleStream.open();
List<Tuple> tuples = new ArrayList<Tuple>();