You are viewing a plain-text version of this content. The canonical link for it (the "here" hyperlink on the original archive page) was not preserved in this plain-text rendering.
Posted to commits@lucene.apache.org by dp...@apache.org on 2015/11/07 23:08:57 UTC

svn commit: r1713190 - in /lucene/dev/trunk/solr: CHANGES.txt solrj/src/java/org/apache/solr/client/solrj/io/stream/MergeStream.java solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java

Author: dpgove
Date: Sat Nov  7 22:08:56 2015
New Revision: 1713190

URL: http://svn.apache.org/viewvc?rev=1713190&view=rev
Log:
SOLR-7938: MergeStream now supports merging more than 2 streams together

Modified:
    lucene/dev/trunk/solr/CHANGES.txt
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/MergeStream.java
    lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java

Modified: lucene/dev/trunk/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/CHANGES.txt?rev=1713190&r1=1713189&r2=1713190&view=diff
==============================================================================
--- lucene/dev/trunk/solr/CHANGES.txt (original)
+++ lucene/dev/trunk/solr/CHANGES.txt Sat Nov  7 22:08:56 2015
@@ -83,6 +83,7 @@ New Features
 * SOLR-6273: Cross Data Center Replication. Active/passive replication for separate
   SolrClouds hosted on separate data centers. (Renaud Delbru, Yonik Seeley via Erick Erickson)
 
+* SOLR-7938: MergeStream now supports merging more than 2 streams together (Dennis Gove)
 
 Optimizations
 ----------------------

Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/MergeStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/MergeStream.java?rev=1713190&r1=1713189&r2=1713190&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/MergeStream.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/MergeStream.java Sat Nov  7 22:08:56 2015
@@ -32,21 +32,22 @@ import org.apache.solr.client.solrj.io.s
 import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
 
 /**
-* Unions streamA with streamB ordering the Tuples based on a Comparator.
-* Both streams must be sorted by the fields being compared.
+* Merges two or more streams together ordering the Tuples based on a Comparator.
+* All streams must be sorted by the fields being compared - this will be validated on construction.
 **/
-
-
 public class MergeStream extends TupleStream implements Expressible {
 
   private static final long serialVersionUID = 1;
 
-  private PushBackStream streamA;
-  private PushBackStream streamB;
+  private PushBackStream[] streams;
   private StreamComparator comp;
 
   public MergeStream(TupleStream streamA, TupleStream streamB, StreamComparator comp) throws IOException {
-    init(streamA, streamB, comp);
+    init(comp, streamA, streamB);
+  }
+  
+  public MergeStream(StreamComparator comp, TupleStream ... streams) throws IOException {
+    init(comp, streams);
   }
   
   public MergeStream(StreamExpression expression,StreamFactory factory) throws IOException {
@@ -59,29 +60,39 @@ public class MergeStream extends TupleSt
       throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - unknown operands found", expression));
     }
     
-    if(2 != streamExpressions.size()){
-      throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting two streams but found %d (must be PushBackStream types)",expression, streamExpressions.size()));
+    if(streamExpressions.size() < 2){
+      throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting at least two streams but found %d (must be PushBackStream types)",expression, streamExpressions.size()));
     }
 
     if(null == onExpression || !(onExpression.getParameter() instanceof StreamExpressionValue)){
       throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting single 'on' parameter listing fields to merge on but didn't find one",expression));
     }
     
-    init( factory.constructStream(streamExpressions.get(0)),
-          factory.constructStream(streamExpressions.get(1)),
-          factory.constructComparator(((StreamExpressionValue)onExpression.getParameter()).getValue(), FieldComparator.class)
+    TupleStream[] streams = new TupleStream[streamExpressions.size()];
+    for(int idx = 0; idx < streamExpressions.size(); ++idx){
+      streams[idx] = factory.constructStream(streamExpressions.get(idx));
+    }
+    
+    init( factory.constructComparator(((StreamExpressionValue)onExpression.getParameter()).getValue(), FieldComparator.class),
+          streams
         );
   }
   
-  private void init(TupleStream streamA, TupleStream streamB, StreamComparator comp) throws IOException {
-    this.streamA = new PushBackStream(streamA);
-    this.streamB = new PushBackStream(streamB);
-    this.comp = comp;
-
-    // streamA and streamB must both be sorted so that comp can be derived from
-    if(!comp.isDerivedFrom(streamA.getStreamSort()) || !comp.isDerivedFrom(streamB.getStreamSort())){
-      throw new IOException("Invalid MergeStream - both substream comparators (sort) must be a superset of this stream's comparator.");
+  private void init(StreamComparator comp, TupleStream ... streams) throws IOException {
+    
+    // All streams must both be sorted so that comp can be derived from
+    for(TupleStream stream : streams){
+      if(!comp.isDerivedFrom(stream.getStreamSort())){
+        throw new IOException("Invalid MergeStream - all substream comparators (sort) must be a superset of this stream's comparator.");
+      }
+    }
+    
+    // Convert to PushBack streams so we can push back tuples
+    this.streams = new PushBackStream[streams.length];
+    for(int idx = 0; idx < streams.length; ++idx){
+      this.streams[idx] = new PushBackStream(streams[idx]);
     }
+    this.comp = comp;
   }
   
   @Override
@@ -90,8 +101,9 @@ public class MergeStream extends TupleSt
     StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
     
     // streams
-    expression.addParameter(streamA.toExpression(factory));
-    expression.addParameter(streamB.toExpression(factory));
+    for(PushBackStream stream : streams){
+      expression.addParameter(stream.toExpression(factory));
+    }
     
     // on
     expression.addParameter(new StreamExpressionNamedParameter("on",comp.toExpression(factory)));
@@ -100,54 +112,101 @@ public class MergeStream extends TupleSt
   }
 
   public void setStreamContext(StreamContext context) {
-    this.streamA.setStreamContext(context);
-    this.streamB.setStreamContext(context);
+    for(PushBackStream stream : streams){
+      stream.setStreamContext(context);
+    }
   }
 
   public List<TupleStream> children() {
     List<TupleStream> l =  new ArrayList();
-    l.add(streamA);
-    l.add(streamB);
+    for(PushBackStream stream : streams){
+      l.add(stream);
+    }
     return l;
   }
 
   public void open() throws IOException {
-    streamA.open();
-    streamB.open();
+    for(PushBackStream stream : streams){
+      stream.open();
+    }
   }
 
   public void close() throws IOException {
-    streamA.close();
-    streamB.close();
+    for(PushBackStream stream : streams){
+      stream.close();
+    }
   }
 
   public Tuple read() throws IOException {
-    Tuple a = streamA.read();
-    Tuple b = streamB.read();
-
-    if(a.EOF && b.EOF) {
-      return a;
-    }
-
-    if(a.EOF) {
-      streamA.pushBack(a);
-      return b;
-    }
-
-    if(b.EOF) {
-      streamB.pushBack(b);
-      return a;
-    }
-
-    int c = comp.compare(a,b);
-
-    if(c < 0) {
-      streamB.pushBack(b);
-      return a;
-    } else {
-      streamA.pushBack(a);
-      return b;
-    }
+    
+    // might be able to optimize this by sorting the streams based on the next to read tuple from each.
+    // if we can ensure the sort of the streams and update it in less than linear time then there would
+    // be some performance gain. But, assuming the # of streams is kinda small then this might not be
+    // worth it
+    
+    Tuple minimum = null;
+    PushBackStream minimumStream = null;
+    for(PushBackStream stream : streams){
+      Tuple current = stream.read();
+      
+      if(current.EOF){
+        stream.pushBack(current);
+        continue;
+      }
+      
+      if(null == minimum){
+        minimum = current;
+        minimumStream = stream;
+        continue;
+      }
+      
+      if(comp.compare(current, minimum) < 0){
+        // Push back on its stream
+        minimumStream.pushBack(minimum);
+        
+        minimum = current;
+        minimumStream = stream;
+        continue;
+      }
+      else{
+        stream.pushBack(current);
+      }
+    }
+    
+    // If all EOF then min will be null, else min is the current minimum
+    if(null == minimum){
+      // return EOF, doesn't matter which cause we're done
+      return streams[0].read();
+    }
+    
+    return minimum;
+    
+//    Tuple a = streamA.read();
+//    Tuple b = streamB.read();
+//
+//    if(a.EOF && b.EOF) {
+//      return a;
+//    }
+//
+//    if(a.EOF) {
+//      streamA.pushBack(a);
+//      return b;
+//    }
+//
+//    if(b.EOF) {
+//      streamB.pushBack(b);
+//      return a;
+//    }
+//
+//    int c = comp.compare(a,b);
+//
+//    if(c < 0) {
+//      streamB.pushBack(b);
+//      return a;
+//    } else {
+//      streamA.pushBack(a);
+//      return b;
+//    }
   }
   
   /** Return the stream sort - ie, the order in which records are returned */

Modified: lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java?rev=1713190&r1=1713189&r2=1713190&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java (original)
+++ lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java Sat Nov  7 22:08:56 2015
@@ -341,6 +341,17 @@ public class StreamExpressionTest extend
     assert(tuples.size() == 5);
     assertOrder(tuples, 0,2,1,3,4);
     
+    // full factory w/multi streams
+    stream = factory.constructStream("merge("
+                                    + "search(collection1, q=\"id:(0 4)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
+                                    + "search(collection1, q=\"id:(1)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
+                                    + "search(collection1, q=\"id:(2)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
+                                    + "on=\"a_f asc\")");
+    tuples = getTuples(stream);
+    
+    assert(tuples.size() == 4);
+    assertOrder(tuples, 0,2,1,4);
+    
     del("*:*");
     commit();
   }