You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by dp...@apache.org on 2015/11/07 23:08:57 UTC
svn commit: r1713190 - in /lucene/dev/trunk/solr: CHANGES.txt
solrj/src/java/org/apache/solr/client/solrj/io/stream/MergeStream.java
solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
Author: dpgove
Date: Sat Nov 7 22:08:56 2015
New Revision: 1713190
URL: http://svn.apache.org/viewvc?rev=1713190&view=rev
Log:
SOLR-7938: MergeStream now supports merging more than 2 streams together
Modified:
lucene/dev/trunk/solr/CHANGES.txt
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/MergeStream.java
lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
Modified: lucene/dev/trunk/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/CHANGES.txt?rev=1713190&r1=1713189&r2=1713190&view=diff
==============================================================================
--- lucene/dev/trunk/solr/CHANGES.txt (original)
+++ lucene/dev/trunk/solr/CHANGES.txt Sat Nov 7 22:08:56 2015
@@ -83,6 +83,7 @@ New Features
* SOLR-6273: Cross Data Center Replication. Active/passive replication for separate
SolrClouds hosted on separate data centers. (Renaud Delbru, Yonik Seeley via Erick Erickson)
+* SOLR-7938: MergeStream now supports merging more than 2 streams together (Dennis Gove)
Optimizations
----------------------
Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/MergeStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/MergeStream.java?rev=1713190&r1=1713189&r2=1713190&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/MergeStream.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/MergeStream.java Sat Nov 7 22:08:56 2015
@@ -32,21 +32,22 @@ import org.apache.solr.client.solrj.io.s
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
/**
-* Unions streamA with streamB ordering the Tuples based on a Comparator.
-* Both streams must be sorted by the fields being compared.
+* Merges two or more streams together ordering the Tuples based on a Comparator.
+* All streams must be sorted by the fields being compared - this will be validated on construction.
**/
-
-
public class MergeStream extends TupleStream implements Expressible {
private static final long serialVersionUID = 1;
- private PushBackStream streamA;
- private PushBackStream streamB;
+ private PushBackStream[] streams;
private StreamComparator comp;
public MergeStream(TupleStream streamA, TupleStream streamB, StreamComparator comp) throws IOException {
- init(streamA, streamB, comp);
+ init(comp, streamA, streamB);
+ }
+
+ public MergeStream(StreamComparator comp, TupleStream ... streams) throws IOException {
+ init(comp, streams);
}
public MergeStream(StreamExpression expression,StreamFactory factory) throws IOException {
@@ -59,29 +60,39 @@ public class MergeStream extends TupleSt
throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - unknown operands found", expression));
}
- if(2 != streamExpressions.size()){
- throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting two streams but found %d (must be PushBackStream types)",expression, streamExpressions.size()));
+ if(streamExpressions.size() < 2){
+ throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting at least two streams but found %d (must be PushBackStream types)",expression, streamExpressions.size()));
}
if(null == onExpression || !(onExpression.getParameter() instanceof StreamExpressionValue)){
throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting single 'on' parameter listing fields to merge on but didn't find one",expression));
}
- init( factory.constructStream(streamExpressions.get(0)),
- factory.constructStream(streamExpressions.get(1)),
- factory.constructComparator(((StreamExpressionValue)onExpression.getParameter()).getValue(), FieldComparator.class)
+ TupleStream[] streams = new TupleStream[streamExpressions.size()];
+ for(int idx = 0; idx < streamExpressions.size(); ++idx){
+ streams[idx] = factory.constructStream(streamExpressions.get(idx));
+ }
+
+ init( factory.constructComparator(((StreamExpressionValue)onExpression.getParameter()).getValue(), FieldComparator.class),
+ streams
);
}
- private void init(TupleStream streamA, TupleStream streamB, StreamComparator comp) throws IOException {
- this.streamA = new PushBackStream(streamA);
- this.streamB = new PushBackStream(streamB);
- this.comp = comp;
-
- // streamA and streamB must both be sorted so that comp can be derived from
- if(!comp.isDerivedFrom(streamA.getStreamSort()) || !comp.isDerivedFrom(streamB.getStreamSort())){
- throw new IOException("Invalid MergeStream - both substream comparators (sort) must be a superset of this stream's comparator.");
+ private void init(StreamComparator comp, TupleStream ... streams) throws IOException {
+
+ // Every stream must be sorted such that comp can be derived from that stream's sort
+ for(TupleStream stream : streams){
+ if(!comp.isDerivedFrom(stream.getStreamSort())){
+ throw new IOException("Invalid MergeStream - all substream comparators (sort) must be a superset of this stream's comparator.");
+ }
+ }
+
+ // Convert to PushBack streams so we can push back tuples
+ this.streams = new PushBackStream[streams.length];
+ for(int idx = 0; idx < streams.length; ++idx){
+ this.streams[idx] = new PushBackStream(streams[idx]);
}
+ this.comp = comp;
}
@Override
@@ -90,8 +101,9 @@ public class MergeStream extends TupleSt
StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
// streams
- expression.addParameter(streamA.toExpression(factory));
- expression.addParameter(streamB.toExpression(factory));
+ for(PushBackStream stream : streams){
+ expression.addParameter(stream.toExpression(factory));
+ }
// on
expression.addParameter(new StreamExpressionNamedParameter("on",comp.toExpression(factory)));
@@ -100,54 +112,101 @@ public class MergeStream extends TupleSt
}
public void setStreamContext(StreamContext context) {
- this.streamA.setStreamContext(context);
- this.streamB.setStreamContext(context);
+ for(PushBackStream stream : streams){
+ stream.setStreamContext(context);
+ }
}
public List<TupleStream> children() {
List<TupleStream> l = new ArrayList();
- l.add(streamA);
- l.add(streamB);
+ for(PushBackStream stream : streams){
+ l.add(stream);
+ }
return l;
}
public void open() throws IOException {
- streamA.open();
- streamB.open();
+ for(PushBackStream stream : streams){
+ stream.open();
+ }
}
public void close() throws IOException {
- streamA.close();
- streamB.close();
+ for(PushBackStream stream : streams){
+ stream.close();
+ }
}
public Tuple read() throws IOException {
- Tuple a = streamA.read();
- Tuple b = streamB.read();
-
- if(a.EOF && b.EOF) {
- return a;
- }
-
- if(a.EOF) {
- streamA.pushBack(a);
- return b;
- }
-
- if(b.EOF) {
- streamB.pushBack(b);
- return a;
- }
-
- int c = comp.compare(a,b);
-
- if(c < 0) {
- streamB.pushBack(b);
- return a;
- } else {
- streamA.pushBack(a);
- return b;
- }
+
+ // might be able to optimize this by sorting the streams based on the next to read tuple from each.
+ // if we can ensure the sort of the streams and update it in less than linear time then there would
+ // be some performance gain. But, assuming the # of streams is kinda small then this might not be
+ // worth it
+
+ Tuple minimum = null;
+ PushBackStream minimumStream = null;
+ for(PushBackStream stream : streams){
+ Tuple current = stream.read();
+
+ if(current.EOF){
+ stream.pushBack(current);
+ continue;
+ }
+
+ if(null == minimum){
+ minimum = current;
+ minimumStream = stream;
+ continue;
+ }
+
+ if(comp.compare(current, minimum) < 0){
+ // Push back on its stream
+ minimumStream.pushBack(minimum);
+
+ minimum = current;
+ minimumStream = stream;
+ continue;
+ }
+ else{
+ stream.pushBack(current);
+ }
+ }
+
+ // If all EOF then min will be null, else min is the current minimum
+ if(null == minimum){
+ // return EOF; it doesn't matter which stream's EOF because we're done
+ return streams[0].read();
+ }
+
+ return minimum;
+
+// Tuple a = streamA.read();
+// Tuple b = streamB.read();
+//
+// if(a.EOF && b.EOF) {
+// return a;
+// }
+//
+// if(a.EOF) {
+// streamA.pushBack(a);
+// return b;
+// }
+//
+// if(b.EOF) {
+// streamB.pushBack(b);
+// return a;
+// }
+//
+// int c = comp.compare(a,b);
+//
+// if(c < 0) {
+// streamB.pushBack(b);
+// return a;
+// } else {
+// streamA.pushBack(a);
+// return b;
+// }
}
/** Return the stream sort - ie, the order in which records are returned */
Modified: lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java?rev=1713190&r1=1713189&r2=1713190&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java (original)
+++ lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java Sat Nov 7 22:08:56 2015
@@ -341,6 +341,17 @@ public class StreamExpressionTest extend
assert(tuples.size() == 5);
assertOrder(tuples, 0,2,1,3,4);
+ // full factory w/multi streams
+ stream = factory.constructStream("merge("
+ + "search(collection1, q=\"id:(0 4)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
+ + "search(collection1, q=\"id:(1)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
+ + "search(collection1, q=\"id:(2)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
+ + "on=\"a_f asc\")");
+ tuples = getTuples(stream);
+
+ assert(tuples.size() == 4);
+ assertOrder(tuples, 0,2,1,4);
+
del("*:*");
commit();
}