You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by jb...@apache.org on 2016/03/21 18:07:28 UTC

[2/2] lucene-solr:branch_6_0: SOLR-8878: Allow the DaemonStream run rate to be controlled by the internal stream

SOLR-8878: Allow the DaemonStream run rate to be controlled by the internal stream


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/39c1858a
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/39c1858a
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/39c1858a

Branch: refs/heads/branch_6_0
Commit: 39c1858adc267131c9de07ca6f087dbbdf58c848
Parents: c49e761
Author: jbernste <jb...@apache.org>
Authored: Sun Mar 20 22:04:14 2016 -0400
Committer: jbernste <jb...@apache.org>
Committed: Mon Mar 21 12:57:18 2016 -0400

----------------------------------------------------------------------
 .../client/solrj/io/stream/DaemonStream.java    | 35 +++++++-----
 .../client/solrj/io/stream/TopicStream.java     | 16 +++++-
 .../client/solrj/io/stream/StreamingTest.java   | 56 +++++++++++++++++++-
 3 files changed, 92 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/39c1858a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DaemonStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DaemonStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DaemonStream.java
index edafd7e..a0aabe3 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DaemonStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DaemonStream.java
@@ -73,7 +73,7 @@ public class DaemonStream extends TupleStream implements Expressible {
     }
 
     if(runExpression == null) {
-      throw new IOException("Invalid expression runInterval parameter expected");
+      runInterval = 2000;
     } else {
       runInterval = Long.parseLong(((StreamExpressionValue) runExpression.getParameter()).getValue());
     }
@@ -243,7 +243,7 @@ public class DaemonStream extends TupleStream implements Expressible {
       OUTER:
       while (!getShutdown()) {
         long now = new Date().getTime();
-        if((now-lastRun) > this.runInterval) {
+        if ((now - lastRun) > this.runInterval) {
           lastRun = now;
           try {
             tupleStream.open();
@@ -252,25 +252,31 @@ public class DaemonStream extends TupleStream implements Expressible {
               Tuple tuple = tupleStream.read();
               if (tuple.EOF) {
                 errors = 0; // Reset errors on successful run.
+                if (tuple.fields.containsKey("sleepMillis")) {
+                  this.sleepMillis = tuple.getLong("sleepMillis");
+                  this.runInterval = -1;
+                }
                 break INNER;
               } else if (!eatTuples) {
                 try {
                   queue.put(tuple);
-                } catch(InterruptedException e) {
+                } catch (InterruptedException e) {
                   break OUTER;
                 }
               }
             }
           } catch (IOException e) {
+            e.printStackTrace();
             exception = e;
-            logger.error("Error in DaemonStream:"+id, e);
+            logger.error("Error in DaemonStream:" + id, e);
             ++errors;
-            if(errors > 100) {
-              logger.error("Too many consectutive errors. Stopping DaemonStream:"+id);
+            if (errors > 100) {
+              logger.error("Too many consectutive errors. Stopping DaemonStream:" + id);
               break OUTER;
             }
           } catch (Throwable t) {
-            logger.error("Fatal Error in DaemonStream:"+id, t);
+            t.printStackTrace();
+            logger.error("Fatal Error in DaemonStream:" + id, t);
             //For anything other then IOException break out of the loop and shutdown the thread.
             break OUTER;
           } finally {
@@ -279,18 +285,21 @@ public class DaemonStream extends TupleStream implements Expressible {
             } catch (IOException e1) {
               if (exception == null) {
                 exception = e1;
-                logger.error("Error in DaemonStream:"+id, e1);
+                logger.error("Error in DaemonStream:" + id, e1);
                 break OUTER;
               }
             }
           }
         }
         incrementIterations();
-        try {
-          Thread.sleep(sleepMillis);
-        } catch (InterruptedException e) {
-          logger.error("Error in DaemonStream:"+id, e);
-          break OUTER;
+
+        if (sleepMillis > 0) {
+          try {
+            Thread.sleep(sleepMillis);
+          } catch (InterruptedException e) {
+            logger.error("Error in DaemonStream:" + id, e);
+            break OUTER;
+          }
         }
       }
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/39c1858a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TopicStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TopicStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TopicStream.java
index 3b7aa90..a52e8f9 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TopicStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TopicStream.java
@@ -65,6 +65,7 @@ public class TopicStream extends CloudSolrStream implements Expressible  {
   private static final long serialVersionUID = 1;
 
   private long count;
+  private int runCount;
   private String id;
   protected long checkpointEvery;
 
@@ -98,6 +99,9 @@ public class TopicStream extends CloudSolrStream implements Expressible  {
     this.checkpointEvery = checkpointEvery;
     this.id = id;
     this.comp = new FieldComparator("_version_", ComparatorOrder.ASCENDING);
+    if(!params.containsKey("rows")) {
+      params.put("rows", "500");
+    }
   }
 
   public TopicStream(StreamExpression expression, StreamFactory factory) throws IOException{
@@ -257,6 +261,7 @@ public class TopicStream extends CloudSolrStream implements Expressible  {
   }
 
   public void close() throws IOException {
+    runCount = 0;
     try {
       persistCheckpoints();
     } finally {
@@ -277,10 +282,17 @@ public class TopicStream extends CloudSolrStream implements Expressible  {
     Tuple tuple = _read();
 
     if(tuple.EOF) {
+      if(runCount > 0) {
+        tuple.put("sleepMillis", 0);
+      } else {
+        tuple.put("sleepMillis", 1000);
+      }
+
       return tuple;
     }
 
     ++count;
+    ++runCount;
     if(checkpointEvery > -1 && (count % checkpointEvery) == 0) {
       persistCheckpoints();
     }
@@ -427,7 +439,9 @@ public class TopicStream extends CloudSolrStream implements Expressible  {
       params.put("distrib", "false"); // We are the aggregator.
       String fl = params.get("fl");
       params.put("sort", "_version_ asc");
-      fl += ",_version_";
+      if(!fl.contains("_version_")) {
+        fl += ",_version_";
+      }
       params.put("fl", fl);
 
       Random random = new Random();

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/39c1858a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
index cec2c52..165029f 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import org.apache.lucene.util.LuceneTestCase;
 import org.apache.lucene.util.LuceneTestCase.Slow;
 import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.SolrClientCache;
 import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
 import org.apache.solr.client.solrj.io.comp.MultipleFieldComparator;
 import org.apache.solr.client.solrj.io.comp.FieldComparator;
@@ -226,7 +227,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
     attachStreamFactory(pstream);
     List<Tuple> tuples = getTuples(pstream);
     assert(tuples.size() == 5);
-    assertOrder(tuples, 0,1,3,4,6);
+    assertOrder(tuples, 0, 1, 3, 4, 6);
 
     //Test the eofTuples
 
@@ -1369,7 +1370,59 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
   }
 
 
+  private void testDaemonTopicStream() throws Exception {
 
+    String zkHost = zkServer.getZkAddress();
+
+    StreamContext context = new StreamContext();
+    SolrClientCache cache = new SolrClientCache();
+    context.setSolrClientCache(cache);
+
+    Map params = new HashMap();
+    params.put("q","a_s:hello0");
+    params.put("rows", "500");
+    params.put("fl", "id");
+
+    TopicStream topicStream = new TopicStream(zkHost, "collection1", "collection1", "50000000", 1000000, params);
+
+    DaemonStream daemonStream = new DaemonStream(topicStream, "daemon1", 1000, 500);
+    daemonStream.setStreamContext(context);
+
+    daemonStream.open();
+
+    indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1");
+    indexr(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2");
+    indexr(id, "3", "a_s", "hello0", "a_i", "3", "a_f", "3");
+    indexr(id, "4", "a_s", "hello0", "a_i", "4", "a_f", "4");
+    indexr(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5");
+
+    commit();
+
+
+    for(int i=0; i<5; i++) {
+      daemonStream.read();
+    }
+
+
+    indexr(id, "5", "a_s", "hello0", "a_i", "4", "a_f", "4");
+    indexr(id, "6", "a_s", "hello0", "a_i", "4", "a_f", "4");
+
+    commit();
+
+    for(int i=0; i<2; i++) {
+      daemonStream.read();
+    }
+
+    daemonStream.shutdown();
+
+    Tuple tuple = daemonStream.read();
+
+    assertTrue(tuple.EOF);
+    daemonStream.close();
+    cache.close();
+    del("*:*");
+    commit();
+  }
 
   private void testParallelRollupStream() throws Exception {
 
@@ -1799,6 +1852,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
     testSubFacetStream();
     testStatsStream();
     //testExceptionStream();
+    testDaemonTopicStream();
     testParallelEOF();
     testParallelUniqueStream();
     testParallelRankStream();