You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by jb...@apache.org on 2016/10/27 01:24:53 UTC
[1/3] lucene-solr:branch_6x: SOLR-9533: Reload core config when a
core is reloaded
Repository: lucene-solr
Updated Branches:
refs/heads/branch_6x bbc318947 -> 0eb62c01b
SOLR-9533: Reload core config when a core is reloaded
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/9758603b
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/9758603b
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/9758603b
Branch: refs/heads/branch_6x
Commit: 9758603b81dfc3cf880bc8d40d5692177b61ae17
Parents: bbc3189
Author: Joel Bernstein <jb...@apache.org>
Authored: Wed Oct 26 17:38:13 2016 -0400
Committer: Joel Bernstein <jb...@apache.org>
Committed: Wed Oct 26 20:46:00 2016 -0400
----------------------------------------------------------------------
.../src/java/org/apache/solr/core/SolrCore.java | 4 +-
.../solr/core/TestCorePropertiesReload.java | 71 ++++++++++++++++++++
2 files changed, 74 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9758603b/solr/core/src/java/org/apache/solr/core/SolrCore.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/core/SolrCore.java b/solr/core/src/java/org/apache/solr/core/SolrCore.java
index 9c06acc..7130c23 100644
--- a/solr/core/src/java/org/apache/solr/core/SolrCore.java
+++ b/solr/core/src/java/org/apache/solr/core/SolrCore.java
@@ -585,9 +585,11 @@ public final class SolrCore implements SolrInfoMBean, Closeable {
boolean success = false;
SolrCore core = null;
try {
+ CoreDescriptor cd = new CoreDescriptor(coreDescriptor.getName(), coreDescriptor);
+ cd.loadExtraProperties(); //Reload the extra properties
core = new SolrCore(getName(), getDataDir(), coreConfig.getSolrConfig(),
coreConfig.getIndexSchema(), coreConfig.getProperties(),
- coreDescriptor, updateHandler, solrDelPolicy, currentCore);
+ cd, updateHandler, solrDelPolicy, currentCore);
// we open a new IndexWriter to pick up the latest config
core.getUpdateHandler().getSolrCoreState().newIndexWriter(core, false);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9758603b/solr/core/src/test/org/apache/solr/core/TestCorePropertiesReload.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/core/TestCorePropertiesReload.java b/solr/core/src/test/org/apache/solr/core/TestCorePropertiesReload.java
new file mode 100644
index 0000000..bb7aaa0
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/core/TestCorePropertiesReload.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.core;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.util.Properties;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.solr.SolrTestCaseJ4;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Checks that a core reload picks up changes made to
+ * <code>collection1/conf/solrcore.properties</code> after the core was first loaded.
+ */
+public class TestCorePropertiesReload extends SolrTestCaseJ4 {
+
+  private final File solrHomeDirectory = createTempDir().toFile();
+
+  /**
+   * Copies the test Solr home into a temp directory, writes the initial
+   * properties file and starts the core from that directory.
+   *
+   * Called explicitly from the test rather than through a JUnit lifecycle annotation.
+   */
+  public void setMeUp() throws Exception {
+    FileUtils.copyDirectory(new File(TEST_HOME()), solrHomeDirectory);
+    Properties props = new Properties();
+    props.setProperty("test", "Before reload");
+    writeProperties(props);
+    initCore("solrconfig.xml", "schema.xml", solrHomeDirectory.getAbsolutePath());
+  }
+
+  @Test
+  public void testPropertiesReload() throws Exception {
+    setMeUp();
+    SolrCore core = h.getCore();
+    CoreDescriptor coreDescriptor = core.getCoreDescriptor();
+    // assertEquals reports both values on failure and does not NPE when the property is absent
+    assertEquals("Before reload", coreDescriptor.getCoreProperty("test", null));
+
+    //Re-write the properties file
+    Properties props = new Properties();
+    props.setProperty("test", "After reload");
+    writeProperties(props);
+
+    h.reload();
+    // The reload creates a new core, so the descriptor has to be fetched again
+    core = h.getCore();
+    coreDescriptor = core.getCoreDescriptor();
+
+    assertEquals("After reload", coreDescriptor.getCoreProperty("test", null));
+  }
+
+  /**
+   * Writes <code>props</code> to <code>collection1/conf/solrcore.properties</code> under the
+   * temporary Solr home, replacing any existing file.
+   *
+   * @param props the properties to store
+   * @throws Exception if the file cannot be opened or written
+   */
+  private void writeProperties(Properties props) throws Exception {
+    File confDir = new File(new File(solrHomeDirectory, "collection1"), "conf");
+    // try-with-resources: if opening the file fails there is nothing to close, so the
+    // original failure is reported instead of a NullPointerException from the cleanup.
+    try (FileWriter out = new FileWriter(new File(confDir, "solrcore.properties"))) {
+      props.store(out, "Reload Test");
+    }
+  }
+}
[3/3] lucene-solr:branch_6x: SOLR-9533: Fix precommit
Posted by jb...@apache.org.
SOLR-9533: Fix precommit
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/0eb62c01
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/0eb62c01
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/0eb62c01
Branch: refs/heads/branch_6x
Commit: 0eb62c01b29e13b360b97aa28a991e4b5e694aa7
Parents: 67938c2
Author: Joel Bernstein <jb...@apache.org>
Authored: Wed Oct 26 20:08:28 2016 -0400
Committer: Joel Bernstein <jb...@apache.org>
Committed: Wed Oct 26 20:48:38 2016 -0400
----------------------------------------------------------------------
.../org/apache/solr/core/TestCorePropertiesReload.java | 11 +++++++----
1 file changed, 7 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0eb62c01/solr/core/src/test/org/apache/solr/core/TestCorePropertiesReload.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/core/TestCorePropertiesReload.java b/solr/core/src/test/org/apache/solr/core/TestCorePropertiesReload.java
index bb7aaa0..6e1f768 100644
--- a/solr/core/src/test/org/apache/solr/core/TestCorePropertiesReload.java
+++ b/solr/core/src/test/org/apache/solr/core/TestCorePropertiesReload.java
@@ -17,12 +17,14 @@
package org.apache.solr.core;
import java.io.File;
-import java.io.FileWriter;
+import java.io.FileOutputStream;
+import java.io.OutputStreamWriter;
+import java.io.BufferedWriter;
+import java.io.Writer;
import java.util.Properties;
import org.apache.commons.io.FileUtils;
import org.apache.solr.SolrTestCaseJ4;
-import org.junit.BeforeClass;
import org.junit.Test;
public class TestCorePropertiesReload extends SolrTestCaseJ4 {
@@ -59,11 +61,12 @@ public class TestCorePropertiesReload extends SolrTestCaseJ4 {
}
private void writeProperties(Properties props) throws Exception {
- FileWriter out = null;
+ Writer out = null;
try {
File confDir = new File(new File(solrHomeDirectory, "collection1"), "conf");
- out = new FileWriter(new File(confDir, "solrcore.properties"));
+ out = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(new File(confDir, "solrcore.properties")), "UTF8"));
props.store(out, "Reload Test");
+
} finally {
out.close();
}
[2/3] lucene-solr:branch_6x: SOLR-9559: Add ExecutorStream to execute
stored Streaming Expressions
Posted by jb...@apache.org.
SOLR-9559: Add ExecutorStream to execute stored Streaming Expressions
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/67938c2b
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/67938c2b
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/67938c2b
Branch: refs/heads/branch_6x
Commit: 67938c2bab9011e1049763368897645a1bf9209f
Parents: 9758603
Author: Joel Bernstein <jb...@apache.org>
Authored: Wed Oct 26 17:39:59 2016 -0400
Committer: Joel Bernstein <jb...@apache.org>
Committed: Wed Oct 26 20:48:06 2016 -0400
----------------------------------------------------------------------
.../org/apache/solr/handler/StreamHandler.java | 32 +--
.../client/solrj/io/stream/ExecutorStream.java | 224 +++++++++++++++++++
.../solrj/io/stream/StreamExpressionTest.java | 152 ++++++++++++-
3 files changed, 372 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/67938c2b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
index 0b42192..6e00efc 100644
--- a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
@@ -35,36 +35,7 @@ import org.apache.solr.client.solrj.io.ops.ConcatOperation;
import org.apache.solr.client.solrj.io.ops.DistinctOperation;
import org.apache.solr.client.solrj.io.ops.GroupOperation;
import org.apache.solr.client.solrj.io.ops.ReplaceOperation;
-import org.apache.solr.client.solrj.io.stream.CloudSolrStream;
-import org.apache.solr.client.solrj.io.stream.CommitStream;
-import org.apache.solr.client.solrj.io.stream.ComplementStream;
-import org.apache.solr.client.solrj.io.stream.DaemonStream;
-import org.apache.solr.client.solrj.io.stream.ExceptionStream;
-import org.apache.solr.client.solrj.io.stream.FacetStream;
-import org.apache.solr.client.solrj.io.stream.FeaturesSelectionStream;
-import org.apache.solr.client.solrj.io.stream.FetchStream;
-import org.apache.solr.client.solrj.io.stream.HashJoinStream;
-import org.apache.solr.client.solrj.io.stream.InnerJoinStream;
-import org.apache.solr.client.solrj.io.stream.IntersectStream;
-import org.apache.solr.client.solrj.io.stream.JDBCStream;
-import org.apache.solr.client.solrj.io.stream.LeftOuterJoinStream;
-import org.apache.solr.client.solrj.io.stream.MergeStream;
-import org.apache.solr.client.solrj.io.stream.ModelStream;
-import org.apache.solr.client.solrj.io.stream.OuterHashJoinStream;
-import org.apache.solr.client.solrj.io.stream.ParallelStream;
-import org.apache.solr.client.solrj.io.stream.RankStream;
-import org.apache.solr.client.solrj.io.stream.ReducerStream;
-import org.apache.solr.client.solrj.io.stream.RollupStream;
-import org.apache.solr.client.solrj.io.stream.ScoreNodesStream;
-import org.apache.solr.client.solrj.io.stream.SelectStream;
-import org.apache.solr.client.solrj.io.stream.SortStream;
-import org.apache.solr.client.solrj.io.stream.StatsStream;
-import org.apache.solr.client.solrj.io.stream.StreamContext;
-import org.apache.solr.client.solrj.io.stream.TextLogitStream;
-import org.apache.solr.client.solrj.io.stream.TopicStream;
-import org.apache.solr.client.solrj.io.stream.TupleStream;
-import org.apache.solr.client.solrj.io.stream.UniqueStream;
-import org.apache.solr.client.solrj.io.stream.UpdateStream;
+import org.apache.solr.client.solrj.io.stream.*;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
@@ -168,6 +139,7 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
.withFunctionName("model", ModelStream.class)
.withFunctionName("classify", ClassifyStream.class)
.withFunctionName("fetch", FetchStream.class)
+ .withFunctionName("executor", ExecutorStream.class)
// metrics
.withFunctionName("min", MinMetric.class)
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/67938c2b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ExecutorStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ExecutorStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ExecutorStream.java
new file mode 100644
index 0000000..6765f72
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ExecutorStream.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.client.solrj.io.stream;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.comp.StreamComparator;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
+import org.apache.solr.client.solrj.io.stream.expr.Expressible;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.SolrjNamedThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The executor function wraps a stream of Tuples containing Streaming Expressions
+ * and executes them in parallel. Sample syntax:
+ *
+ * <pre>
+ * executor(threads=10, topic(storedExpressions, q="*:*", fl="expr_s, id", id="topic1"))
+ * </pre>
+ *
+ * The Streaming Expression to execute is taken from the <code>expr_s</code> field of each Tuple.
+ * The <code>id</code> field is only used to identify the Tuple when an execution error is logged.
+ * The Tuples produced by the executed expressions are read to completion and discarded;
+ * this stream itself only ever returns the EOF Tuple of the wrapped stream.
+ */
+public class ExecutorStream extends TupleStream implements Expressible {
+
+  private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private TupleStream stream;
+
+  private int threads;
+
+  private ExecutorService executorService;
+  private StreamFactory streamFactory;
+  private StreamContext streamContext;
+
+  /**
+   * Builds the stream from an <code>executor(...)</code> expression.
+   *
+   * @param expression the expression; must contain exactly one stream operand and may contain
+   *                   a <code>threads</code> named operand (defaults to 6)
+   * @param factory    factory used to construct the wrapped stream and, later, the stored expressions
+   * @throws IOException if the expression does not contain exactly one stream operand
+   */
+  public ExecutorStream(StreamExpression expression, StreamFactory factory) throws IOException {
+    // grab all parameters out
+    List<StreamExpression> streamExpressions = factory.getExpressionOperandsRepresentingTypes(expression, Expressible.class, TupleStream.class);
+    StreamExpressionNamedParameter threadsParam = factory.getNamedOperand(expression, "threads");
+
+    int threads = 6;
+
+    if(threadsParam != null) {
+      threads = Integer.parseInt(((StreamExpressionValue)threadsParam.getParameter()).getValue());
+    }
+
+    if(1 != streamExpressions.size()){
+      throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting a single stream but found %d",expression, streamExpressions.size()));
+    }
+
+    TupleStream stream = factory.constructStream(streamExpressions.get(0));
+    init(stream, threads, factory);
+  }
+
+  private void init(TupleStream tupleStream, int threads, StreamFactory factory) throws IOException{
+    this.threads = threads;
+    this.stream = tupleStream;
+    this.streamFactory = factory;
+  }
+
+  @Override
+  public StreamExpression toExpression(StreamFactory factory) throws IOException {
+    return toExpression(factory, true);
+  }
+
+  private StreamExpression toExpression(StreamFactory factory, boolean includeStreams) throws IOException {
+
+    // function name
+    StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
+    expression.addParameter(new StreamExpressionNamedParameter("threads", Integer.toString(threads)));
+
+    // stream
+    if(includeStreams) {
+      if (stream instanceof Expressible) {
+        expression.addParameter(((Expressible) stream).toExpression(factory));
+      } else {
+        throw new IOException("The ExecutorStream contains a non-expressible TupleStream - it cannot be converted to an expression");
+      }
+    }
+
+    return expression;
+  }
+
+  @Override
+  public Explanation toExplanation(StreamFactory factory) throws IOException {
+
+    return new StreamExplanation(getStreamNodeId().toString())
+        .withChildren(new Explanation[]{
+            stream.toExplanation(factory)
+        })
+        .withFunctionName(factory.getFunctionName(this.getClass()))
+        .withImplementingClass(this.getClass().getName())
+        .withExpressionType(ExpressionType.STREAM_DECORATOR)
+        .withExpression(toExpression(factory, false).toString());
+  }
+
+  @Override
+  public void setStreamContext(StreamContext streamContext) {
+    this.streamContext = streamContext;
+    this.stream.setStreamContext(streamContext);
+  }
+
+  public List<TupleStream> children() {
+    List<TupleStream> l = new ArrayList<>();
+    l.add(stream);
+    return l;
+  }
+
+  /** Creates the worker pool and opens the wrapped stream. */
+  @Override
+  public void open() throws IOException {
+    executorService = ExecutorUtil.newMDCAwareFixedThreadPool(threads, new SolrjNamedThreadFactory("ExecutorStream"));
+    stream.open();
+  }
+
+  /**
+   * Closes the wrapped stream, then shuts the worker pool down and waits for
+   * already submitted expressions to finish.
+   */
+  @Override
+  public void close() throws IOException {
+    try {
+      stream.close();
+    } finally {
+      // Shut the pool down even when closing the wrapped stream fails; the pool is
+      // null when open() was never called.
+      if (executorService != null) {
+        executorService.shutdown();
+        try {
+          executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
+        } catch(InterruptedException e) {
+          logger.error("Interrupted while waiting for termination", e);
+          // keep the interrupt visible to the caller
+          Thread.currentThread().interrupt();
+        }
+      }
+    }
+  }
+
+  /**
+   * Drains the wrapped stream, submitting one task per Tuple to the worker pool.
+   *
+   * @return the EOF Tuple of the wrapped stream
+   * @throws IOException if the wrapped stream fails or the thread is interrupted while queueing
+   */
+  @Override
+  public Tuple read() throws IOException {
+    // Hand-off queue between this thread and the tasks: put() blocks once 10000 Tuples
+    // are waiting to be picked up.
+    ArrayBlockingQueue<Tuple> queue = new ArrayBlockingQueue<>(10000);
+    while(true) {
+      Tuple tuple = stream.read();
+      if (tuple.EOF) {
+        return tuple;
+      }
+
+      try {
+        queue.put(tuple);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new IOException(e);
+      }
+      executorService.execute(new StreamTask(queue, streamFactory, streamContext));
+    }
+  }
+
+  @Override
+  public StreamComparator getStreamSort(){
+    return stream.getStreamSort();
+  }
+
+  public int getCost() {
+    return 0;
+  }
+
+  /**
+   * Takes one Tuple from the queue, constructs the stream described by its
+   * <code>expr_s</code> field and reads it to EOF. Failures while running the
+   * expression are logged, not rethrown.
+   */
+  public static class StreamTask implements Runnable {
+
+    private final ArrayBlockingQueue<Tuple> queue;
+    private final StreamFactory streamFactory;
+    private final StreamContext streamContext;
+
+    /**
+     * @param queue         queue holding the Tuple this task will execute
+     * @param streamFactory factory used to construct the stored expression
+     * @param streamContext parent context; only its SolrClientCache and ModelCache are
+     *                      copied into a fresh context for this task
+     */
+    public StreamTask(ArrayBlockingQueue<Tuple> queue, StreamFactory streamFactory, StreamContext streamContext) {
+      this.queue = queue;
+      this.streamFactory = streamFactory;
+      this.streamContext = new StreamContext();
+      this.streamContext.setSolrClientCache(streamContext.getSolrClientCache());
+      this.streamContext.setModelCache(streamContext.getModelCache());
+    }
+
+    @Override
+    public void run() {
+      Tuple tuple;
+      try {
+        tuple = queue.take();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e);
+      }
+
+      String expr = tuple.getString("expr_s");
+      Object id = tuple.get("id");
+      TupleStream stream = null;
+
+      try {
+        stream = streamFactory.constructStream(expr);
+        stream.setStreamContext(streamContext);
+        stream.open();
+        while (true) {
+          Tuple t = stream.read();
+          if (t.EOF) {
+            break;
+          }
+        }
+      } catch (Exception e) {
+        logger.error("Executor Error: id="+id+" expr_s="+expr, e);
+      } finally {
+        // stream is still null when constructStream failed; nothing to close then
+        if (stream != null) {
+          try {
+            stream.close();
+          } catch (Exception e1) {
+            logger.error("Executor Error", e1);
+          }
+        }
+      }
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/67938c2b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
index 7b5777d..106368e 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
@@ -256,7 +256,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
tuples = getTuples(stream);
assertEquals(4, tuples.size());
- assertOrder(tuples, 4,3,1,0);
+ assertOrder(tuples, 4, 3, 1, 0);
// Basic w/ multi comp
sParams.set("q2", "search(" + COLLECTION + ", q=\"id:(1 2)\", fl=\"id,a_s,a_i,a_f\", sort=${mySort})");
@@ -522,14 +522,14 @@ public class StreamExpressionTest extends SolrCloudTestCase {
// Basic test
expression = StreamExpressionParser.parse("top("
- + "n=3,"
- + "search(" + COLLECTION + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\"),"
- + "sort=\"a_f asc, a_i asc\")");
+ + "n=3,"
+ + "search(" + COLLECTION + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\"),"
+ + "sort=\"a_f asc, a_i asc\")");
stream = new RankStream(expression, factory);
tuples = getTuples(stream);
assert(tuples.size() == 3);
- assertOrder(tuples, 0,2,1);
+ assertOrder(tuples, 0, 2, 1);
// Basic test desc
expression = StreamExpressionParser.parse("top("
@@ -3794,7 +3794,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
paramsLoc = new ModifiableSolrParams();
paramsLoc.set("expr", expr);
- paramsLoc.set("qt","/stream");
+ paramsLoc.set("qt", "/stream");
SolrStream classifyStream = new SolrStream(url, paramsLoc);
Map<String, Double> idToLabel = getIdToLabel(classifyStream, "probability_d");
assertEquals(idToLabel.size(), 2);
@@ -3866,6 +3866,146 @@ public class StreamExpressionTest extends SolrCloudTestCase {
CollectionAdminRequest.deleteCollection("checkpointCollection").process(cluster.getSolrClient());
}
+  /**
+   * Stores 500 update(...) expressions in the workQueue collection, runs them through
+   * executor(...) and verifies that every mainCorpus document ended up in destination.
+   */
+  @Test
+  public void testExecutorStream() throws Exception {
+    CollectionAdminRequest.createCollection("workQueue", "conf", 2, 1).process(cluster.getSolrClient());
+    AbstractDistribZkTestBase.waitForRecoveriesToFinish("workQueue", cluster.getSolrClient().getZkStateReader(),
+        false, true, TIMEOUT);
+    CollectionAdminRequest.createCollection("mainCorpus", "conf", 2, 1).process(cluster.getSolrClient());
+    AbstractDistribZkTestBase.waitForRecoveriesToFinish("mainCorpus", cluster.getSolrClient().getZkStateReader(),
+        false, true, TIMEOUT);
+    CollectionAdminRequest.createCollection("destination", "conf", 2, 1).process(cluster.getSolrClient());
+    AbstractDistribZkTestBase.waitForRecoveriesToFinish("destination", cluster.getSolrClient().getZkStateReader(),
+        false, true, TIMEOUT);
+
+    SolrClientCache clientCache = new SolrClientCache();
+    SolrStream solrStream = null;
+    try {
+      UpdateRequest workRequest = new UpdateRequest();
+      UpdateRequest dataRequest = new UpdateRequest();
+
+      for (int i = 0; i < 500; i++) {
+        workRequest.add(id, String.valueOf(i), "expr_s", "update(destination, batchSize=50, search(mainCorpus, q=id:"+i+", rows=1, sort=\"id asc\", fl=\"id, body_t, field_i\"))");
+        dataRequest.add(id, String.valueOf(i), "body_t", "hello world "+i, "field_i", Integer.toString(i));
+      }
+
+      workRequest.commit(cluster.getSolrClient(), "workQueue");
+      dataRequest.commit(cluster.getSolrClient(), "mainCorpus");
+
+      String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString() + "/destination";
+
+      StreamFactory factory = new StreamFactory()
+          .withCollectionZkHost("workQueue", cluster.getZkServer().getZkAddress())
+          .withCollectionZkHost("mainCorpus", cluster.getZkServer().getZkAddress())
+          .withCollectionZkHost("destination", cluster.getZkServer().getZkAddress())
+          .withFunctionName("search", CloudSolrStream.class)
+          .withFunctionName("executor", ExecutorStream.class)
+          .withFunctionName("update", UpdateStream.class);
+
+      String executorExpression = "executor(threads=3, search(workQueue, q=\"*:*\", fl=\"id, expr_s\", rows=1000, sort=\"id desc\"))";
+      TupleStream executorStream = factory.constructStream(executorExpression);
+
+      StreamContext context = new StreamContext();
+      context.setSolrClientCache(clientCache);
+      executorStream.setStreamContext(context);
+      getTuples(executorStream);
+      //Destination collection should now contain all the records in the main corpus.
+      cluster.getSolrClient().commit("destination");
+      ModifiableSolrParams paramsLoc = new ModifiableSolrParams();
+      paramsLoc.set("expr", "search(destination, q=\"*:*\", fl=\"id, body_t, field_i\", rows=1000, sort=\"field_i asc\")");
+      paramsLoc.set("qt","/stream");
+
+      solrStream = new SolrStream(url, paramsLoc);
+      List<Tuple> tuples = getTuples(solrStream);
+      assertEquals(500, tuples.size());
+      for(int i=0; i<500; i++) {
+        Tuple tuple = tuples.get(i);
+        long ivalue = tuple.getLong("field_i");
+        String body = tuple.getString("body_t");
+        assertEquals(i, ivalue);
+        assertEquals("hello world "+i, body);
+      }
+    } finally {
+      // Clean up even when an assertion fails: the collections live on the cluster shared by
+      // this test class, and a leftover one makes later createCollection calls fail.
+      if (solrStream != null) {
+        solrStream.close();
+      }
+      clientCache.close();
+      CollectionAdminRequest.deleteCollection("workQueue").process(cluster.getSolrClient());
+      CollectionAdminRequest.deleteCollection("mainCorpus").process(cluster.getSolrClient());
+      CollectionAdminRequest.deleteCollection("destination").process(cluster.getSolrClient());
+    }
+  }
+
+
+  /**
+   * Same scenario as the single-node executor test, but the executor(...) expression is
+   * wrapped in parallel(...) with two workers partitioning the work queue by id.
+   */
+  @Test
+  public void testParallelExecutorStream() throws Exception {
+    CollectionAdminRequest.createCollection("workQueue", "conf", 2, 1).process(cluster.getSolrClient());
+    AbstractDistribZkTestBase.waitForRecoveriesToFinish("workQueue", cluster.getSolrClient().getZkStateReader(),
+        false, true, TIMEOUT);
+    CollectionAdminRequest.createCollection("mainCorpus", "conf", 2, 1).process(cluster.getSolrClient());
+    AbstractDistribZkTestBase.waitForRecoveriesToFinish("mainCorpus", cluster.getSolrClient().getZkStateReader(),
+        false, true, TIMEOUT);
+    CollectionAdminRequest.createCollection("destination", "conf", 2, 1).process(cluster.getSolrClient());
+    AbstractDistribZkTestBase.waitForRecoveriesToFinish("destination", cluster.getSolrClient().getZkStateReader(),
+        false, true, TIMEOUT);
+
+    SolrClientCache clientCache = new SolrClientCache();
+    SolrStream solrStream = null;
+    try {
+      UpdateRequest workRequest = new UpdateRequest();
+      UpdateRequest dataRequest = new UpdateRequest();
+
+      for (int i = 0; i < 500; i++) {
+        workRequest.add(id, String.valueOf(i), "expr_s", "update(destination, batchSize=50, search(mainCorpus, q=id:"+i+", rows=1, sort=\"id asc\", fl=\"id, body_t, field_i\"))");
+        dataRequest.add(id, String.valueOf(i), "body_t", "hello world "+i, "field_i", Integer.toString(i));
+      }
+
+      workRequest.commit(cluster.getSolrClient(), "workQueue");
+      dataRequest.commit(cluster.getSolrClient(), "mainCorpus");
+
+      String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString() + "/destination";
+
+      StreamFactory factory = new StreamFactory()
+          .withCollectionZkHost("workQueue", cluster.getZkServer().getZkAddress())
+          .withCollectionZkHost("mainCorpus", cluster.getZkServer().getZkAddress())
+          .withCollectionZkHost("destination", cluster.getZkServer().getZkAddress())
+          .withFunctionName("search", CloudSolrStream.class)
+          .withFunctionName("executor", ExecutorStream.class)
+          .withFunctionName("parallel", ParallelStream.class)
+          .withFunctionName("update", UpdateStream.class);
+
+      // NOTE(review): queueSize=100 is passed to executor(...), but the ExecutorStream
+      // constructor added in this commit only reads the "threads" operand - confirm
+      // whether queueSize is meant to have any effect.
+      String executorExpression = "parallel(workQueue, workers=2, sort=\"EOF asc\", executor(threads=3, queueSize=100, search(workQueue, q=\"*:*\", fl=\"id, expr_s\", rows=1000, partitionKeys=id, sort=\"id desc\")))";
+      TupleStream executorStream = factory.constructStream(executorExpression);
+
+      StreamContext context = new StreamContext();
+      context.setSolrClientCache(clientCache);
+      executorStream.setStreamContext(context);
+      getTuples(executorStream);
+      //Destination collection should now contain all the records in the main corpus.
+      cluster.getSolrClient().commit("destination");
+      ModifiableSolrParams paramsLoc = new ModifiableSolrParams();
+      paramsLoc.set("expr", "search(destination, q=\"*:*\", fl=\"id, body_t, field_i\", rows=1000, sort=\"field_i asc\")");
+      paramsLoc.set("qt","/stream");
+
+      solrStream = new SolrStream(url, paramsLoc);
+      List<Tuple> tuples = getTuples(solrStream);
+      assertEquals(500, tuples.size());
+      for(int i=0; i<500; i++) {
+        Tuple tuple = tuples.get(i);
+        long ivalue = tuple.getLong("field_i");
+        String body = tuple.getString("body_t");
+        assertEquals(i, ivalue);
+        assertEquals("hello world " + i, body);
+      }
+    } finally {
+      // Clean up even when an assertion fails: the collections live on the cluster shared by
+      // this test class, and a leftover one makes later createCollection calls fail.
+      if (solrStream != null) {
+        solrStream.close();
+      }
+      clientCache.close();
+      CollectionAdminRequest.deleteCollection("workQueue").process(cluster.getSolrClient());
+      CollectionAdminRequest.deleteCollection("mainCorpus").process(cluster.getSolrClient());
+      CollectionAdminRequest.deleteCollection("destination").process(cluster.getSolrClient());
+    }
+  }
+
+
+
private Map<String,Double> getIdToLabel(TupleStream stream, String outField) throws IOException {
Map<String, Double> idToLabel = new HashMap<>();
List<Tuple> tuples = getTuples(stream);