You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ho...@apache.org on 2016/02/25 02:57:24 UTC

[04/50] [abbrv] lucene-solr git commit: SOLR-8588: Add TopicStream to the streaming API to support publish/subscribe messaging

SOLR-8588: Add TopicStream to the streaming API to support publish/subscribe messaging


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/b2475bf9
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/b2475bf9
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/b2475bf9

Branch: refs/heads/jira/SOLR-445
Commit: b2475bf9fdc59c02454f730a6cc4916cff03f862
Parents: 3124a4d
Author: jbernste <jb...@apache.org>
Authored: Fri Feb 19 20:03:06 2016 -0500
Committer: jbernste <jb...@apache.org>
Committed: Fri Feb 19 20:03:43 2016 -0500

----------------------------------------------------------------------
 .../org/apache/solr/handler/StreamHandler.java  |  11 +-
 .../solr/client/solrj/io/stream/SolrStream.java |  24 +
 .../client/solrj/io/stream/TopicStream.java     | 463 +++++++++++++++++++
 .../solr/collection1/conf/schema-streaming.xml  |   1 +
 .../solrj/io/stream/StreamExpressionTest.java   | 180 ++++++-
 .../stream/StreamExpressionToExpessionTest.java |  20 +-
 6 files changed, 682 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b2475bf9/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
index de38c9b..113fa93 100644
--- a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
@@ -107,8 +107,10 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware {
       .withFunctionName("intersect", IntersectStream.class)
       .withFunctionName("complement", ComplementStream.class)
          .withFunctionName("daemon", DaemonStream.class)
-      
-      // metrics
+         .withFunctionName("topic", TopicStream.class)
+
+
+    // metrics
       .withFunctionName("min", MinMetric.class)
       .withFunctionName("max", MaxMetric.class)
       .withFunctionName("avg", MeanMetric.class)
@@ -121,9 +123,8 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware {
       
       // stream reduction operations
       .withFunctionName("group", GroupOperation.class)
-      .withFunctionName("distinct", DistinctOperation.class)
-      ;
-    
+      .withFunctionName("distinct", DistinctOperation.class);
+
     // This pulls all the overrides and additions from the config
     Object functionMappingsObj = initArgs.get("streamFunctions");
     if(null != functionMappingsObj){

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b2475bf9/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java
index abd98c8..1c149cc 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java
@@ -17,6 +17,7 @@
 package org.apache.solr.client.solrj.io.stream;
 
 import java.io.IOException;
+import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
 import java.util.Map;
 import java.util.HashMap;
@@ -29,12 +30,17 @@ import org.apache.solr.client.solrj.io.Tuple;
 import org.apache.solr.client.solrj.io.comp.StreamComparator;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.SolrParams;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
 *  Queries a single Solr instance and maps SolrDocs to a Stream of Tuples.
 **/
 
 public class SolrStream extends TupleStream {
 
+  private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
   private static final long serialVersionUID = 1;
 
   private String baseUrl;
@@ -46,6 +52,8 @@ public class SolrStream extends TupleStream {
   private transient JSONTupleStream jsonTupleStream;
   private transient HttpSolrClient client;
   private transient SolrClientCache cache;
+  private String slice;
+  private long checkpoint = -1;
 
   public SolrStream(String baseUrl, Map params) {
     this.baseUrl = baseUrl;
@@ -76,6 +84,7 @@ public class SolrStream extends TupleStream {
 
   public void open() throws IOException {
 
+
     if(cache == null) {
       client = new HttpSolrClient(baseUrl);
     } else {
@@ -97,6 +106,14 @@ public class SolrStream extends TupleStream {
     this.trace = trace;
   }
 
+  public void setSlice(String slice) {
+    this.slice = slice;
+  }
+
+  public void setCheckpoint(long checkpoint) {
+    this.checkpoint = checkpoint;
+  }
+
   private SolrParams loadParams(Map params) throws IOException {
     ModifiableSolrParams solrParams = new ModifiableSolrParams();
     if(params.containsKey("partitionKeys")) {
@@ -110,6 +127,10 @@ public class SolrStream extends TupleStream {
       }
     }
 
+    if(checkpoint > 0) {
+      solrParams.add("fq", "{!frange cost=100 incl=false l="+checkpoint+"}_version_");
+    }
+
     Iterator<Map.Entry> it = params.entrySet().iterator();
     while(it.hasNext()) {
       Map.Entry entry = it.next();
@@ -166,6 +187,9 @@ public class SolrStream extends TupleStream {
 
         if (trace) {
           fields.put("_CORE_", this.baseUrl);
+          if(slice != null) {
+            fields.put("_SLICE_", slice);
+          }
         }
 
         if (fieldMappings != null) {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b2475bf9/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TopicStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TopicStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TopicStream.java
new file mode 100644
index 0000000..3b7aa90
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TopicStream.java
@@ -0,0 +1,463 @@
+package org.apache.solr.client.solrj.io.stream;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Random;
+import java.util.Set;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.TreeSet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
+import org.apache.solr.client.solrj.io.comp.FieldComparator;
+import org.apache.solr.client.solrj.io.stream.expr.Expressible;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.common.SolrDocument;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.ZkCoreNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.SolrjNamedThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TopicStream extends CloudSolrStream implements Expressible  {
+
+  private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+
+  private static final long serialVersionUID = 1;
+
+  private long count;
+  private String id;
+  protected long checkpointEvery;
+
+  private Map<String, Long> checkpoints = new HashMap();
+  private String checkpointCollection;
+
+  public TopicStream(String zkHost,
+                     String checkpointCollection,
+                     String collection,
+                     String id,
+                     long checkpointEvery,
+                     Map<String, String> params) {
+    init(zkHost,
+         checkpointCollection,
+         collection,
+         id,
+         checkpointEvery,
+         params);
+  }
+
+  private void init(String zkHost,
+                    String checkpointCollection,
+                    String collection,
+                    String id,
+                    long checkpointEvery,
+                    Map<String, String> params) {
+    this.zkHost  = zkHost;
+    this.params  = params;
+    this.collection = collection;
+    this.checkpointCollection = checkpointCollection;
+    this.checkpointEvery = checkpointEvery;
+    this.id = id;
+    this.comp = new FieldComparator("_version_", ComparatorOrder.ASCENDING);
+  }
+
+  public TopicStream(StreamExpression expression, StreamFactory factory) throws IOException{
+    // grab all parameters out
+    String checkpointCollectionName = factory.getValueOperand(expression, 0);
+    String collectionName = factory.getValueOperand(expression, 1);
+    List<StreamExpressionNamedParameter> namedParams = factory.getNamedOperands(expression);
+    StreamExpressionNamedParameter zkHostExpression = factory.getNamedOperand(expression, "zkHost");
+
+    StreamExpressionNamedParameter idParam = factory.getNamedOperand(expression, "id");
+    if(null == idParam) {
+      throw new IOException("invalid TopicStream id cannot be null");
+    }
+
+    StreamExpressionNamedParameter flParam = factory.getNamedOperand(expression, "fl");
+
+    if(null == flParam) {
+      throw new IOException("invalid TopicStream fl cannot be null");
+    }
+
+    long checkpointEvery = -1;
+    StreamExpressionNamedParameter checkpointEveryParam = factory.getNamedOperand(expression, "checkpointEvery");
+
+    if(checkpointEveryParam != null) {
+      checkpointEvery = Long.parseLong(((StreamExpressionValue) checkpointEveryParam.getParameter()).getValue());
+    }
+
+    //  Checkpoint Collection Name
+    if(null == checkpointCollectionName){
+      throw new IOException(String.format(Locale.ROOT,"invalid expression %s - checkpointCollectionName expected as first operand",expression));
+    }
+
+    // Collection Name
+    if(null == collectionName){
+      throw new IOException(String.format(Locale.ROOT,"invalid expression %s - collectionName expected as second operand",expression));
+    }
+
+    // Named parameters - passed directly to solr as solrparams
+    if(0 == namedParams.size()){
+      throw new IOException(String.format(Locale.ROOT,"invalid expression %s - at least one named parameter expected. eg. 'q=*:*'",expression));
+    }
+
+    Map<String,String> params = new HashMap<String,String>();
+    for(StreamExpressionNamedParameter namedParam : namedParams){
+      if(!namedParam.getName().equals("zkHost") &&
+          !namedParam.getName().equals("id") &&
+          !namedParam.getName().equals("checkpointEvery")) {
+        params.put(namedParam.getName(), namedParam.getParameter().toString().trim());
+      }
+    }
+
+    // zkHost, optional - if not provided, look it up in the factory (per-collection zkHost first, then the default)
+    String zkHost = null;
+    if(null == zkHostExpression){
+      zkHost = factory.getCollectionZkHost(collectionName);
+      if(zkHost == null) {
+        zkHost = factory.getDefaultZkHost();
+      }
+    }
+    else if(zkHostExpression.getParameter() instanceof StreamExpressionValue){
+      zkHost = ((StreamExpressionValue)zkHostExpression.getParameter()).getValue();
+    }
+    if(null == zkHost){
+      throw new IOException(String.format(Locale.ROOT,"invalid expression %s - zkHost not found for collection '%s'",expression,collectionName));
+    }
+
+    // We've got all the required items
+    init(zkHost,
+        checkpointCollectionName,
+        collectionName,
+        ((StreamExpressionValue) idParam.getParameter()).getValue(),
+        checkpointEvery,
+        params);
+  }
+
+  @Override
+  public StreamExpressionParameter toExpression(StreamFactory factory) throws IOException {
+    // function name
+    StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
+
+    expression.addParameter(checkpointCollection);
+    // collection
+    expression.addParameter(collection);
+
+    for(Entry<String,String> param : params.entrySet()) {
+      String value = param.getValue();
+
+      // SOLR-8409: This is a special case where the params contain a " character
+      // Do note that in any other BASE streams with parameters where a " might come into play
+      // that this same replacement needs to take place.
+      value = value.replace("\"", "\\\"");
+
+      expression.addParameter(new StreamExpressionNamedParameter(param.getKey(), value));
+    }
+
+    // zkHost
+    expression.addParameter(new StreamExpressionNamedParameter("zkHost", zkHost));
+    expression.addParameter(new StreamExpressionNamedParameter("id", id));
+    expression.addParameter(new StreamExpressionNamedParameter("checkpointEvery", Long.toString(checkpointEvery)));
+
+    return expression;
+  }
+
+  public List<TupleStream> children() {
+    List<TupleStream> l =  new ArrayList();
+    return l;
+  }
+
+  public void open() throws IOException {
+    this.tuples = new TreeSet();
+    this.solrStreams = new ArrayList();
+    this.eofTuples = Collections.synchronizedMap(new HashMap());
+
+    if(cache != null) {
+      cloudSolrClient = cache.getCloudSolrClient(zkHost);
+    } else {
+      cloudSolrClient = new CloudSolrClient(zkHost);
+      this.cloudSolrClient.connect();
+    }
+
+    if(checkpoints.size() == 0) {
+      getPersistedCheckpoints();
+      if(checkpoints.size() == 0) {
+        getCheckpoints();
+      }
+    }
+
+    constructStreams();
+    openStreams();
+  }
+
+
+  private void openStreams() throws IOException {
+
+    ExecutorService service = ExecutorUtil.newMDCAwareCachedThreadPool(new SolrjNamedThreadFactory("TopicStream"));
+    try {
+      List<Future<TupleWrapper>> futures = new ArrayList();
+      for (TupleStream solrStream : solrStreams) {
+        StreamOpener so = new StreamOpener((SolrStream) solrStream, comp);
+        Future<TupleWrapper> future = service.submit(so);
+        futures.add(future);
+      }
+
+      try {
+        for (Future<TupleWrapper> f : futures) {
+          TupleWrapper w = f.get();
+          if (w != null) {
+            tuples.add(w);
+          }
+        }
+      } catch (Exception e) {
+        throw new IOException(e);
+      }
+    } finally {
+      service.shutdown();
+    }
+  }
+
+  public void close() throws IOException {
+    try {
+      persistCheckpoints();
+    } finally {
+
+      if(solrStreams != null) {
+        for (TupleStream solrStream : solrStreams) {
+          solrStream.close();
+        }
+      }
+
+      if (cache == null) {
+        cloudSolrClient.close();
+      }
+    }
+  }
+
+  public Tuple read() throws IOException {
+    Tuple tuple = _read();
+
+    if(tuple.EOF) {
+      return tuple;
+    }
+
+    ++count;
+    if(checkpointEvery > -1 && (count % checkpointEvery) == 0) {
+      persistCheckpoints();
+    }
+
+    long version = tuple.getLong("_version_");
+    String slice = tuple.getString("_SLICE_");
+    checkpoints.put(slice, version);
+
+    tuple.remove("_SLICE_");
+    tuple.remove("_CORE_");
+
+    return tuple;
+  }
+
+  public int getCost() {
+    return 0;
+  }
+
+  private void getCheckpoints() throws IOException {
+    this.checkpoints = new HashMap();
+    ZkStateReader zkStateReader = cloudSolrClient.getZkStateReader();
+    ClusterState clusterState = zkStateReader.getClusterState();
+    Collection<Slice> slices = clusterState.getActiveSlices(collection);
+
+    for(Slice slice : slices) {
+      String sliceName = slice.getName();
+      long checkpoint = getCheckpoint(slice, clusterState.getLiveNodes());
+      this.checkpoints.put(sliceName, checkpoint);
+    }
+  }
+
+  //Gets the highest version number for the slice.
+  private long getCheckpoint(Slice slice, Set<String> liveNodes) throws IOException {
+    Collection<Replica> replicas = slice.getReplicas();
+    long checkpoint = -1;
+    Map params = new HashMap();
+    params.put("q","*:*");
+    params.put("sort", "_version_ desc");
+    params.put("distrib", "false");
+    params.put("rows", 1);
+    for(Replica replica : replicas) {
+      if(replica.getState() == Replica.State.ACTIVE && liveNodes.contains(replica.getNodeName())) {
+        String coreUrl = replica.getCoreUrl();
+        SolrStream solrStream = new SolrStream(coreUrl, params);
+
+        if(streamContext != null) {
+          solrStream.setStreamContext(streamContext);
+        }
+
+        try {
+          solrStream.open();
+          Tuple tuple = solrStream.read();
+          if(tuple.EOF) {
+            return 0;
+          } else {
+            checkpoint = tuple.getLong("_version_");
+          }
+          break;
+        } finally {
+          solrStream.close();
+        }
+      }
+    }
+    return checkpoint;
+  }
+
+  private void persistCheckpoints() throws IOException{
+
+    UpdateRequest request = new UpdateRequest();
+    request.setParam("collection", checkpointCollection);
+    SolrInputDocument doc = new SolrInputDocument();
+    doc.addField("id", id);
+
+    for(Map.Entry<String, Long> entry : checkpoints.entrySet()) {
+      doc.addField("checkpoint_ss", entry.getKey()+"~"+entry.getValue());
+    }
+
+    request.add(doc);
+    try {
+      cloudSolrClient.request(request);
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  private void getPersistedCheckpoints() throws IOException {
+
+    ZkStateReader zkStateReader = cloudSolrClient.getZkStateReader();
+    ClusterState clusterState = zkStateReader.getClusterState();
+    Collection<Slice> slices = clusterState.getActiveSlices(checkpointCollection);
+    Set<String> liveNodes = clusterState.getLiveNodes();
+    OUTER:
+    for(Slice slice : slices) {
+      Collection<Replica> replicas = slice.getReplicas();
+      for(Replica replica : replicas) {
+        if(replica.getState() == Replica.State.ACTIVE && liveNodes.contains(replica.getNodeName())){
+
+
+          HttpSolrClient httpClient = cache.getHttpSolrClient(replica.getCoreUrl());
+          try {
+
+            SolrDocument doc = httpClient.getById(id);
+            if(doc != null) {
+              List<String> checkpoints = (List<String>)doc.getFieldValue("checkpoint_ss");
+              for (String checkpoint : checkpoints) {
+                String[] pair = checkpoint.split("~");
+                this.checkpoints.put(pair[0], Long.parseLong(pair[1]));
+              }
+            }
+          }catch (Exception e) {
+            throw new IOException(e);
+          }
+          break OUTER;
+        }
+      }
+    }
+  }
+
+  protected void constructStreams() throws IOException {
+
+    try {
+
+      ZkStateReader zkStateReader = cloudSolrClient.getZkStateReader();
+      ClusterState clusterState = zkStateReader.getClusterState();
+      Set<String> liveNodes = clusterState.getLiveNodes();
+      //System.out.println("Connected to zk an got cluster state.");
+
+      Collection<Slice> slices = clusterState.getActiveSlices(this.collection);
+
+      if(slices == null) {
+        //Try case insensitive match
+        for(String col : clusterState.getCollections()) {
+          if(col.equalsIgnoreCase(collection)) {
+            slices = clusterState.getActiveSlices(col);
+            break;
+          }
+        }
+
+        if(slices == null) {
+          throw new Exception("Collection not found:" + this.collection);
+        }
+      }
+
+      params.put("distrib", "false"); // We are the aggregator.
+      String fl = params.get("fl");
+      params.put("sort", "_version_ asc");
+      fl += ",_version_";
+      params.put("fl", fl);
+
+      Random random = new Random();
+
+      for(Slice slice : slices) {
+        Map localParams = new HashMap();
+        localParams.putAll(params);
+        long checkpoint = checkpoints.get(slice.getName());
+
+        Collection<Replica> replicas = slice.getReplicas();
+        List<Replica> shuffler = new ArrayList();
+        for(Replica replica : replicas) {
+          if(replica.getState() == Replica.State.ACTIVE && liveNodes.contains(replica.getNodeName()))
+            shuffler.add(replica);
+        }
+
+        Replica rep = shuffler.get(random.nextInt(shuffler.size()));
+        ZkCoreNodeProps zkProps = new ZkCoreNodeProps(rep);
+        String url = zkProps.getCoreUrl();
+        SolrStream solrStream = new SolrStream(url, localParams);
+        solrStream.setSlice(slice.getName());
+        solrStream.setCheckpoint(checkpoint);
+        solrStream.setTrace(true);
+        if(streamContext != null) {
+          solrStream.setStreamContext(streamContext);
+        }
+        solrStreams.add(solrStream);
+      }
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b2475bf9/solr/solrj/src/test-files/solrj/solr/collection1/conf/schema-streaming.xml
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test-files/solrj/solr/collection1/conf/schema-streaming.xml b/solr/solrj/src/test-files/solrj/solr/collection1/conf/schema-streaming.xml
index 25a9bc0..7a7ee52 100644
--- a/solr/solrj/src/test-files/solrj/solr/collection1/conf/schema-streaming.xml
+++ b/solr/solrj/src/test-files/solrj/solr/collection1/conf/schema-streaming.xml
@@ -513,6 +513,7 @@
     <dynamicField name="*_i1"  type="int"    indexed="true" stored="true" multiValued="false"/>
 
     <dynamicField name="*_s"  type="string"  indexed="true"  stored="true"/>
+    <dynamicField name="*_ss" type="string"  indexed="true"  stored="true" multiValued="true"/>
     <dynamicField name="*_s1"  type="string"  indexed="true"  stored="true" multiValued="false"/>
     <dynamicField name="*_l"  type="long"   indexed="true"  stored="true"/>
     <dynamicField name="*_l1"  type="long"   indexed="true"  stored="true" multiValued="false"/>

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b2475bf9/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
index 6fa1a22..465369b 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import org.apache.lucene.util.LuceneTestCase;
 import org.apache.lucene.util.LuceneTestCase.Slow;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.io.SolrClientCache;
 import org.apache.solr.client.solrj.io.Tuple;
 import org.apache.solr.client.solrj.io.ops.ConcatOperation;
 import org.apache.solr.client.solrj.io.ops.GroupOperation;
@@ -136,6 +137,7 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase {
     testRollupStream();
     testStatsStream();
     testNulls();
+    testTopicStream();
     testDaemonStream();
     testParallelUniqueStream();
     testParallelReducerStream();
@@ -224,8 +226,8 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase {
     tuples = getTuples(stream);
 
     assert(tuples.size() == 5);
-    assertOrder(tuples, 0,2,1,3,4);
-    assertLong(tuples.get(0),"a_i", 0);
+    assertOrder(tuples, 0, 2, 1, 3, 4);
+    assertLong(tuples.get(0), "a_i", 0);
 
     // Basic w/aliases
     expression = StreamExpressionParser.parse("search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\", aliases=\"a_i=alias.a_i, a_s=name\", zkHost=" + zkServer.getZkAddress() + ")");
@@ -233,8 +235,8 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase {
     tuples = getTuples(stream);
 
     assert(tuples.size() == 5);
-    assertOrder(tuples, 0,2,1,3,4);
-    assertLong(tuples.get(0),"alias.a_i", 0);
+    assertOrder(tuples, 0, 2, 1, 3, 4);
+    assertLong(tuples.get(0), "alias.a_i", 0);
     assertString(tuples.get(0), "name", "hello0");
 
     // Basic filtered test
@@ -243,7 +245,7 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase {
     tuples = getTuples(stream);
 
     assert(tuples.size() == 3);
-    assertOrder(tuples, 0,3,4);
+    assertOrder(tuples, 0, 3, 4);
     assertLong(tuples.get(1), "a_i", 3);
     
     del("*:*");
@@ -394,7 +396,7 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase {
     tuples = getTuples(stream);
     
     assert(tuples.size() == 4);
-    assertOrder(tuples, 0,1,3,4);
+    assertOrder(tuples, 0, 1, 3, 4);
 
     // Basic test desc
     expression = StreamExpressionParser.parse("merge("
@@ -405,7 +407,7 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase {
     tuples = getTuples(stream);
     
     assert(tuples.size() == 4);
-    assertOrder(tuples, 4,3,1,0);
+    assertOrder(tuples, 4, 3, 1, 0);
     
     // Basic w/multi comp
     expression = StreamExpressionParser.parse("merge("
@@ -565,9 +567,9 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase {
     
     // basic w/spaces
     expression = StreamExpressionParser.parse("reduce("
-                                              +       "search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_s asc, a_f       asc\"),"
-                                              +       "by=\"a_s\"," +
-                                                      "group(sort=\"a_i asc\", n=\"2\"))");
+        + "search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_s asc, a_f       asc\"),"
+        + "by=\"a_s\"," +
+        "group(sort=\"a_i asc\", n=\"2\"))");
     stream = factory.constructStream(expression);
     tuples = getTuples(stream);
 
@@ -2217,7 +2219,163 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase {
     del("*:*");
     commit();
   }
-  
+
+  private void testTopicStream() throws Exception{
+    indexr(id, "0", "a_s", "hello", "a_i", "0", "a_f", "1");
+    indexr(id, "2", "a_s", "hello", "a_i", "2", "a_f", "2");
+    indexr(id, "3", "a_s", "hello", "a_i", "3", "a_f", "3");
+    indexr(id, "4", "a_s", "hello", "a_i", "4", "a_f", "4");
+    indexr(id, "1", "a_s", "hello", "a_i", "1", "a_f", "5");
+    indexr(id, "5", "a_s", "hello", "a_i", "10", "a_f", "6");
+    indexr(id, "6", "a_s", "hello", "a_i", "11", "a_f", "7");
+    indexr(id, "7", "a_s", "hello", "a_i", "12", "a_f", "8");
+    indexr(id, "8", "a_s", "hello", "a_i", "13", "a_f", "9");
+    indexr(id, "9", "a_s", "hello", "a_i", "14", "a_f", "10");
+
+    commit();
+
+    StreamFactory factory = new StreamFactory()
+        .withCollectionZkHost("collection1", zkServer.getZkAddress())
+        .withFunctionName("topic", TopicStream.class)
+        .withFunctionName("search", CloudSolrStream.class)
+        .withFunctionName("daemon", DaemonStream.class);
+
+    StreamExpression expression;
+    TupleStream stream;
+    List<Tuple> tuples;
+
+    SolrClientCache cache = new SolrClientCache();
+
+    try {
+      //Store checkpoints in the same index as the main documents. This is perfectly valid
+      expression = StreamExpressionParser.parse("topic(collection1, collection1, q=\"a_s:hello\", fl=\"id\", id=\"1000000\", checkpointEvery=3)");
+
+      stream = factory.constructStream(expression);
+      StreamContext context = new StreamContext();
+      context.setSolrClientCache(cache);
+      stream.setStreamContext(context);
+      tuples = getTuples(stream);
+
+      //Should be zero because the checkpoints will be set to the highest version on the shards.
+      assertEquals(tuples.size(), 0);
+
+      commit();
+      //Now check to see if the checkpoints are present
+
+              expression = StreamExpressionParser.parse("search(collection1, q=\"id:1000000\", fl=\"id, checkpoint_ss, _version_\", sort=\"id asc\")");
+              stream = factory.constructStream(expression);
+              context = new StreamContext();
+              context.setSolrClientCache(cache);
+              stream.setStreamContext(context);
+              tuples = getTuples(stream);
+              assertEquals(tuples.size(), 1);
+              List<String> checkpoints = tuples.get(0).getStrings("checkpoint_ss");
+              assertEquals(checkpoints.size(), 2);
+              Long version1 = tuples.get(0).getLong("_version_");
+
+      //Index a few more documents
+      indexr(id, "10", "a_s", "hello", "a_i", "13", "a_f", "9");
+      indexr(id, "11", "a_s", "hello", "a_i", "14", "a_f", "10");
+
+      commit();
+
+      expression = StreamExpressionParser.parse("topic(collection1, collection1, fl=\"id\", q=\"a_s:hello\", id=\"1000000\", checkpointEvery=2)");
+
+      stream = factory.constructStream(expression);
+      context = new StreamContext();
+      context.setSolrClientCache(cache);
+      stream.setStreamContext(context);
+
+      try {
+        stream.open();
+        Tuple tuple1 = stream.read();
+        assertEquals((long) tuple1.getLong("id"), 10l);
+        commit();
+
+                // Checkpoint should not have changed.
+                expression = StreamExpressionParser.parse("search(collection1, q=\"id:1000000\", fl=\"id, checkpoint_ss, _version_\", sort=\"id asc\")");
+                TupleStream cstream = factory.constructStream(expression);
+                context = new StreamContext();
+                context.setSolrClientCache(cache);
+                cstream.setStreamContext(context);
+                tuples = getTuples(cstream);
+
+                assertEquals(tuples.size(), 1);
+                checkpoints = tuples.get(0).getStrings("checkpoint_ss");
+                assertEquals(checkpoints.size(), 2);
+                Long version2 = tuples.get(0).getLong("_version_");
+                assertEquals(version1, version2);
+
+        Tuple tuple2 = stream.read();
+        commit();
+        assertEquals((long) tuple2.getLong("id"), 11l);
+
+                //Checkpoint should have changed.
+                expression = StreamExpressionParser.parse("search(collection1, q=\"id:1000000\", fl=\"id, checkpoint_ss, _version_\", sort=\"id asc\")");
+                cstream = factory.constructStream(expression);
+                context = new StreamContext();
+                context.setSolrClientCache(cache);
+                cstream.setStreamContext(context);
+                tuples = getTuples(cstream);
+
+                assertEquals(tuples.size(), 1);
+                checkpoints = tuples.get(0).getStrings("checkpoint_ss");
+                assertEquals(checkpoints.size(), 2);
+                Long version3 = tuples.get(0).getLong("_version_");
+                assertTrue(version3 > version2);
+
+        Tuple tuple3 = stream.read();
+        assertTrue(tuple3.EOF);
+      } finally {
+        stream.close();
+      }
+
+      //Test with the DaemonStream
+
+      DaemonStream dstream = null;
+      try {
+        expression = StreamExpressionParser.parse("daemon(topic(collection1, collection1, fl=\"id\", q=\"a_s:hello\", id=\"1000000\", checkpointEvery=2), id=\"test\", runInterval=\"1000\", queueSize=\"9\")");
+        dstream = (DaemonStream) factory.constructStream(expression);
+        context = new StreamContext();
+        context.setSolrClientCache(cache);
+        dstream.setStreamContext(context);
+
+        //Index a few more documents
+        indexr(id, "12", "a_s", "hello", "a_i", "13", "a_f", "9");
+        indexr(id, "13", "a_s", "hello", "a_i", "14", "a_f", "10");
+        commit();
+
+        //Start reading from the DaemonStream
+        Tuple tuple = null;
+
+        dstream.open();
+        tuple = dstream.read();
+        assertEquals(12, (long) tuple.getLong(id));
+        tuple = dstream.read();
+        assertEquals(13, (long) tuple.getLong(id));
+        commit(); // We want to see if the version has been updated after reading two tuples
+
+        //Index a few more documents
+        indexr(id, "14", "a_s", "hello", "a_i", "13", "a_f", "9");
+        indexr(id, "15", "a_s", "hello", "a_i", "14", "a_f", "10");
+        commit();
+
+        //Read from the same DaemonStream stream
+
+        tuple = dstream.read();
+        assertEquals(14, (long) tuple.getLong(id));
+        tuple = dstream.read(); // This should trigger a checkpoint as it's the 4th read from the stream.
+        assertEquals(15, (long) tuple.getLong(id));
+      } finally {
+        dstream.close();
+      }
+    } finally {
+      cache.close();
+      del("*:*");
+      commit();
+    }
+  }
+
   private void testUpdateStream() throws Exception {
     CloudSolrClient destinationCollectionClient = createCloudClient("destinationCollection");
     createCollection("destinationCollection", destinationCollectionClient, 2, 2);

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b2475bf9/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionToExpessionTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionToExpessionTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionToExpessionTest.java
index 93f8a6a..63baa01 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionToExpessionTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionToExpessionTest.java
@@ -61,6 +61,7 @@ public class StreamExpressionToExpessionTest extends LuceneTestCase {
                     .withFunctionName("max", MaxMetric.class)
                     .withFunctionName("avg", MeanMetric.class)
                     .withFunctionName("daemon", DaemonStream.class)
+                    .withFunctionName("topic", TopicStream.class)
                     ;
   }
     
@@ -120,7 +121,24 @@ public class StreamExpressionToExpessionTest extends LuceneTestCase {
     assertTrue(expressionString.contains("queueSize=100"));
     assertTrue(expressionString.contains("runInterval=1000"));
   }
-  
+
+  @Test
+  public void testTopicStream() throws Exception {
+
+    TopicStream stream;
+    String expressionString;
+
+    // Basic test
+    stream = new TopicStream(StreamExpressionParser.parse("topic(collection2, collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", id=\"blah\", checkpointEvery=1000)"), factory);
+    expressionString = stream.toExpression(factory).toString();
+    assertTrue(expressionString.contains("topic(collection2,collection1"));
+    assertTrue(expressionString.contains("q=\"*:*\""));
+    assertTrue(expressionString.contains("fl=\"id,a_s,a_i,a_f\""));
+    assertTrue(expressionString.contains("id=blah"));
+    assertTrue(expressionString.contains("checkpointEvery=1000"));
+  }
+
+
   @Test
   public void testStatsStream() throws Exception {