You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by jb...@apache.org on 2016/03/11 21:30:27 UTC

lucene-solr git commit: SOLR-8832: Faulty DaemonStream shutdown procedures

Repository: lucene-solr
Updated Branches:
  refs/heads/master 50c413e86 -> 007d41c9f


SOLR-8832: Faulty DaemonStream shutdown procedures


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/007d41c9
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/007d41c9
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/007d41c9

Branch: refs/heads/master
Commit: 007d41c9f5073ee796dc35168d397e7a5b501997
Parents: 50c413e
Author: jbernste <jb...@apache.org>
Authored: Fri Mar 11 15:34:31 2016 -0500
Committer: jbernste <jb...@apache.org>
Committed: Fri Mar 11 15:35:51 2016 -0500

----------------------------------------------------------------------
 .../org/apache/solr/client/solrj/io/stream/DaemonStream.java | 6 +++++-
 .../solr/client/solrj/io/stream/StreamExpressionTest.java    | 8 ++++++--
 2 files changed, 11 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/007d41c9/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DaemonStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DaemonStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DaemonStream.java
index 2f65394..edafd7e 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DaemonStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DaemonStream.java
@@ -171,11 +171,16 @@ public class DaemonStream extends TupleStream implements Expressible {
     this.tupleStream.setStreamContext(streamContext);
   }
 
+  public void shutdown() {
+    streamRunner.setShutdown(true);
+  }
+
   public void close() {
     if(closed) {
       return;
     }
     streamRunner.setShutdown(true);
+    this.closed = true;
   }
 
   public List<TupleStream> children() {
@@ -226,7 +231,6 @@ public class DaemonStream extends TupleStream implements Expressible {
 
     public synchronized void setShutdown(boolean shutdown) {
       this.shutdown = shutdown;
-      interrupt(); //We could be blocked on the queue or sleeping
     }
 
     public synchronized boolean getShutdown() {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/007d41c9/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
index 465369b..e7f57c1 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
@@ -623,7 +623,7 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase {
     DaemonStream daemonStream;
 
     expression = StreamExpressionParser.parse("daemon(rollup("
-        + "search(collection1, q=*:*, fl=\"a_i,a_s\", sort=\"a_s asc\"),"
+        + "search(collection1, q=\"*:*\", fl=\"a_i,a_s\", sort=\"a_s asc\"),"
         + "over=\"a_s\","
         + "sum(a_i)"
         + "), id=\"test\", runInterval=\"1000\", queueSize=\"9\")");
@@ -2366,13 +2366,17 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase {
         assertEquals(14, (long) tuple.getLong(id));
         tuple = dstream.read(); // This should trigger a checkpoint as it's the 4th read from the stream.
         assertEquals(15, (long) tuple.getLong(id));
+
+        dstream.shutdown();
+        tuple = dstream.read();
+        assertTrue(tuple.EOF);
       } finally {
         dstream.close();
       }
     } finally {
-      cache.close();
       del("*:*");
       commit();
+      cache.close();
     }
   }