You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by jb...@apache.org on 2016/03/21 18:07:27 UTC
[1/2] lucene-solr:branch_6_0: SOLR-8878: Remove debugging
Repository: lucene-solr
Updated Branches:
refs/heads/branch_6_0 c49e761c1 -> 4dad70f24
SOLR-8878: Remove debugging
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/4dad70f2
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/4dad70f2
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/4dad70f2
Branch: refs/heads/branch_6_0
Commit: 4dad70f24a0596b66896f49df1710b349dc6a6ca
Parents: 39c1858
Author: jbernste <jb...@apache.org>
Authored: Sun Mar 20 22:19:38 2016 -0400
Committer: jbernste <jb...@apache.org>
Committed: Mon Mar 21 12:57:18 2016 -0400
----------------------------------------------------------------------
.../java/org/apache/solr/client/solrj/io/stream/DaemonStream.java | 2 --
1 file changed, 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/4dad70f2/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DaemonStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DaemonStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DaemonStream.java
index a0aabe3..752ea7c 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DaemonStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DaemonStream.java
@@ -266,7 +266,6 @@ public class DaemonStream extends TupleStream implements Expressible {
}
}
} catch (IOException e) {
- e.printStackTrace();
exception = e;
logger.error("Error in DaemonStream:" + id, e);
++errors;
@@ -275,7 +274,6 @@ public class DaemonStream extends TupleStream implements Expressible {
break OUTER;
}
} catch (Throwable t) {
- t.printStackTrace();
logger.error("Fatal Error in DaemonStream:" + id, t);
//For anything other then IOException break out of the loop and shutdown the thread.
break OUTER;
[2/2] lucene-solr:branch_6_0: SOLR-8878: Allow the DaemonStream run
rate be controlled by the internal stream
Posted by jb...@apache.org.
SOLR-8878: Allow the DaemonStream run rate be controlled by the internal stream
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/39c1858a
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/39c1858a
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/39c1858a
Branch: refs/heads/branch_6_0
Commit: 39c1858adc267131c9de07ca6f087dbbdf58c848
Parents: c49e761
Author: jbernste <jb...@apache.org>
Authored: Sun Mar 20 22:04:14 2016 -0400
Committer: jbernste <jb...@apache.org>
Committed: Mon Mar 21 12:57:18 2016 -0400
----------------------------------------------------------------------
.../client/solrj/io/stream/DaemonStream.java | 35 +++++++-----
.../client/solrj/io/stream/TopicStream.java | 16 +++++-
.../client/solrj/io/stream/StreamingTest.java | 56 +++++++++++++++++++-
3 files changed, 92 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/39c1858a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DaemonStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DaemonStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DaemonStream.java
index edafd7e..a0aabe3 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DaemonStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DaemonStream.java
@@ -73,7 +73,7 @@ public class DaemonStream extends TupleStream implements Expressible {
}
if(runExpression == null) {
- throw new IOException("Invalid expression runInterval parameter expected");
+ runInterval = 2000;
} else {
runInterval = Long.parseLong(((StreamExpressionValue) runExpression.getParameter()).getValue());
}
@@ -243,7 +243,7 @@ public class DaemonStream extends TupleStream implements Expressible {
OUTER:
while (!getShutdown()) {
long now = new Date().getTime();
- if((now-lastRun) > this.runInterval) {
+ if ((now - lastRun) > this.runInterval) {
lastRun = now;
try {
tupleStream.open();
@@ -252,25 +252,31 @@ public class DaemonStream extends TupleStream implements Expressible {
Tuple tuple = tupleStream.read();
if (tuple.EOF) {
errors = 0; // Reset errors on successful run.
+ if (tuple.fields.containsKey("sleepMillis")) {
+ this.sleepMillis = tuple.getLong("sleepMillis");
+ this.runInterval = -1;
+ }
break INNER;
} else if (!eatTuples) {
try {
queue.put(tuple);
- } catch(InterruptedException e) {
+ } catch (InterruptedException e) {
break OUTER;
}
}
}
} catch (IOException e) {
+ e.printStackTrace();
exception = e;
- logger.error("Error in DaemonStream:"+id, e);
+ logger.error("Error in DaemonStream:" + id, e);
++errors;
- if(errors > 100) {
- logger.error("Too many consectutive errors. Stopping DaemonStream:"+id);
+ if (errors > 100) {
+ logger.error("Too many consectutive errors. Stopping DaemonStream:" + id);
break OUTER;
}
} catch (Throwable t) {
- logger.error("Fatal Error in DaemonStream:"+id, t);
+ t.printStackTrace();
+ logger.error("Fatal Error in DaemonStream:" + id, t);
//For anything other then IOException break out of the loop and shutdown the thread.
break OUTER;
} finally {
@@ -279,18 +285,21 @@ public class DaemonStream extends TupleStream implements Expressible {
} catch (IOException e1) {
if (exception == null) {
exception = e1;
- logger.error("Error in DaemonStream:"+id, e1);
+ logger.error("Error in DaemonStream:" + id, e1);
break OUTER;
}
}
}
}
incrementIterations();
- try {
- Thread.sleep(sleepMillis);
- } catch (InterruptedException e) {
- logger.error("Error in DaemonStream:"+id, e);
- break OUTER;
+
+ if (sleepMillis > 0) {
+ try {
+ Thread.sleep(sleepMillis);
+ } catch (InterruptedException e) {
+ logger.error("Error in DaemonStream:" + id, e);
+ break OUTER;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/39c1858a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TopicStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TopicStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TopicStream.java
index 3b7aa90..a52e8f9 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TopicStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TopicStream.java
@@ -65,6 +65,7 @@ public class TopicStream extends CloudSolrStream implements Expressible {
private static final long serialVersionUID = 1;
private long count;
+ private int runCount;
private String id;
protected long checkpointEvery;
@@ -98,6 +99,9 @@ public class TopicStream extends CloudSolrStream implements Expressible {
this.checkpointEvery = checkpointEvery;
this.id = id;
this.comp = new FieldComparator("_version_", ComparatorOrder.ASCENDING);
+ if(!params.containsKey("rows")) {
+ params.put("rows", "500");
+ }
}
public TopicStream(StreamExpression expression, StreamFactory factory) throws IOException{
@@ -257,6 +261,7 @@ public class TopicStream extends CloudSolrStream implements Expressible {
}
public void close() throws IOException {
+ runCount = 0;
try {
persistCheckpoints();
} finally {
@@ -277,10 +282,17 @@ public class TopicStream extends CloudSolrStream implements Expressible {
Tuple tuple = _read();
if(tuple.EOF) {
+ if(runCount > 0) {
+ tuple.put("sleepMillis", 0);
+ } else {
+ tuple.put("sleepMillis", 1000);
+ }
+
return tuple;
}
++count;
+ ++runCount;
if(checkpointEvery > -1 && (count % checkpointEvery) == 0) {
persistCheckpoints();
}
@@ -427,7 +439,9 @@ public class TopicStream extends CloudSolrStream implements Expressible {
params.put("distrib", "false"); // We are the aggregator.
String fl = params.get("fl");
params.put("sort", "_version_ asc");
- fl += ",_version_";
+ if(!fl.contains("_version_")) {
+ fl += ",_version_";
+ }
params.put("fl", fl);
Random random = new Random();
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/39c1858a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
index cec2c52..165029f 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
@@ -27,6 +27,7 @@ import java.util.Map;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
import org.apache.solr.client.solrj.io.comp.MultipleFieldComparator;
import org.apache.solr.client.solrj.io.comp.FieldComparator;
@@ -226,7 +227,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
attachStreamFactory(pstream);
List<Tuple> tuples = getTuples(pstream);
assert(tuples.size() == 5);
- assertOrder(tuples, 0,1,3,4,6);
+ assertOrder(tuples, 0, 1, 3, 4, 6);
//Test the eofTuples
@@ -1369,7 +1370,59 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
}
+ private void testDaemonTopicStream() throws Exception {
+ String zkHost = zkServer.getZkAddress();
+
+ StreamContext context = new StreamContext();
+ SolrClientCache cache = new SolrClientCache();
+ context.setSolrClientCache(cache);
+
+ Map params = new HashMap();
+ params.put("q","a_s:hello0");
+ params.put("rows", "500");
+ params.put("fl", "id");
+
+ TopicStream topicStream = new TopicStream(zkHost, "collection1", "collection1", "50000000", 1000000, params);
+
+ DaemonStream daemonStream = new DaemonStream(topicStream, "daemon1", 1000, 500);
+ daemonStream.setStreamContext(context);
+
+ daemonStream.open();
+
+ indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1");
+ indexr(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2");
+ indexr(id, "3", "a_s", "hello0", "a_i", "3", "a_f", "3");
+ indexr(id, "4", "a_s", "hello0", "a_i", "4", "a_f", "4");
+ indexr(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5");
+
+ commit();
+
+
+ for(int i=0; i<5; i++) {
+ daemonStream.read();
+ }
+
+
+ indexr(id, "5", "a_s", "hello0", "a_i", "4", "a_f", "4");
+ indexr(id, "6", "a_s", "hello0", "a_i", "4", "a_f", "4");
+
+ commit();
+
+ for(int i=0; i<2; i++) {
+ daemonStream.read();
+ }
+
+ daemonStream.shutdown();
+
+ Tuple tuple = daemonStream.read();
+
+ assertTrue(tuple.EOF);
+ daemonStream.close();
+ cache.close();
+ del("*:*");
+ commit();
+ }
private void testParallelRollupStream() throws Exception {
@@ -1799,6 +1852,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
testSubFacetStream();
testStatsStream();
//testExceptionStream();
+ testDaemonTopicStream();
testParallelEOF();
testParallelUniqueStream();
testParallelRankStream();