You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by jb...@apache.org on 2016/07/17 05:34:36 UTC
[2/2] lucene-solr:branch_6x: SOLR-9240:Support running the topic()
Streaming Expression in parallel mode.
SOLR-9240:Support running the topic() Streaming Expression in parallel mode.
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/c6df1868
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/c6df1868
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/c6df1868
Branch: refs/heads/branch_6x
Commit: c6df1868a0cdf79b4d4f8b8cd5fc58cf9794d6dc
Parents: 5932f52
Author: jbernste <jb...@apache.org>
Authored: Mon Jul 11 20:10:27 2016 -0400
Committer: jbernste <jb...@apache.org>
Committed: Sat Jul 16 23:25:08 2016 -0400
----------------------------------------------------------------------
.../client/solrj/io/stream/ParallelStream.java | 2 +-
.../client/solrj/io/stream/TopicStream.java | 41 +++-
.../solr/configsets/streaming/conf/schema.xml | 2 +-
.../solr/client/solrj/io/sql/JdbcTest.java | 4 +-
.../solrj/io/stream/StreamExpressionTest.java | 185 ++++++++++++++++++-
.../client/solrj/io/stream/StreamingTest.java | 19 +-
6 files changed, 231 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c6df1868/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java
index 779cc31..3125ff0 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java
@@ -101,7 +101,7 @@ public class ParallelStream extends CloudSolrStream implements Expressible {
// Workers
if(null == workersParam || null == workersParam.getParameter() || !(workersParam.getParameter() instanceof StreamExpressionValue)){
- throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting a single 'workersParam' parameter of type positive integer but didn't find one",expression));
+ throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting a single 'workers' parameter of type positive integer but didn't find one",expression));
}
String workersStr = ((StreamExpressionValue)workersParam.getParameter()).getValue();
int workersInt = 0;
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c6df1868/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TopicStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TopicStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TopicStream.java
index 30c6f59..c4343c6 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TopicStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TopicStream.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@@ -74,9 +75,9 @@ public class TopicStream extends CloudSolrStream implements Expressible {
private int runCount;
private String id;
protected long checkpointEvery;
-
private Map<String, Long> checkpoints = new HashMap<String, Long>();
private String checkpointCollection;
+ private long initialCheckpoint = -1;
// Use TopicStream that takes a SolrParams
@Deprecated
@@ -84,12 +85,14 @@ public class TopicStream extends CloudSolrStream implements Expressible {
String checkpointCollection,
String collection,
String id,
+ long initialCheckpoint,
long checkpointEvery,
Map<String, String> params) {
init(zkHost,
checkpointCollection,
collection,
id,
+ initialCheckpoint,
checkpointEvery,
new MapSolrParams(params));
}
@@ -98,12 +101,14 @@ public class TopicStream extends CloudSolrStream implements Expressible {
String checkpointCollection,
String collection,
String id,
+ long initialCheckpoint,
long checkpointEvery,
SolrParams params) {
init(zkHost,
checkpointCollection,
collection,
id,
+ initialCheckpoint,
checkpointEvery,
params);
}
@@ -113,6 +118,7 @@ public class TopicStream extends CloudSolrStream implements Expressible {
String checkpointCollection,
String collection,
String id,
+ long initialCheckpoint,
long checkpointEvery,
SolrParams params) {
this.zkHost = zkHost;
@@ -121,11 +127,13 @@ public class TopicStream extends CloudSolrStream implements Expressible {
if(mParams.getParams("rows") == null) {
mParams.set("rows", "500");
}
+
this.params = mParams;
this.collection = collection;
this.checkpointCollection = checkpointCollection;
this.checkpointEvery = checkpointEvery;
this.id = id;
+ this.initialCheckpoint = initialCheckpoint;
this.comp = new FieldComparator("_version_", ComparatorOrder.ASCENDING);
}
@@ -147,6 +155,13 @@ public class TopicStream extends CloudSolrStream implements Expressible {
throw new IOException("invalid TopicStream fl cannot be null");
}
+ long initialCheckpoint = -1;
+ StreamExpressionNamedParameter initialCheckpointParam = factory.getNamedOperand(expression, "initialCheckpoint");
+
+ if(initialCheckpointParam != null) {
+ initialCheckpoint = Long.parseLong(((StreamExpressionValue) initialCheckpointParam.getParameter()).getValue());
+ }
+
long checkpointEvery = -1;
StreamExpressionNamedParameter checkpointEveryParam = factory.getNamedOperand(expression, "checkpointEvery");
@@ -198,6 +213,7 @@ public class TopicStream extends CloudSolrStream implements Expressible {
checkpointCollectionName,
collectionName,
((StreamExpressionValue) idParam.getParameter()).getValue(),
+ initialCheckpoint,
checkpointEvery,
params);
}
@@ -226,6 +242,9 @@ public class TopicStream extends CloudSolrStream implements Expressible {
// zkHost
expression.addParameter(new StreamExpressionNamedParameter("zkHost", zkHost));
expression.addParameter(new StreamExpressionNamedParameter("id", id));
+ if(initialCheckpoint > -1) {
+ expression.addParameter(new StreamExpressionNamedParameter("initialCheckpoint", Long.toString(initialCheckpoint)));
+ }
expression.addParameter(new StreamExpressionNamedParameter("checkpointEvery", Long.toString(checkpointEvery)));
return expression;
@@ -279,6 +298,11 @@ public class TopicStream extends CloudSolrStream implements Expressible {
this.solrStreams = new ArrayList();
this.eofTuples = Collections.synchronizedMap(new HashMap());
+ if(checkpoints.size() == 0 && streamContext.numWorkers > 1) {
+ //Each worker must maintain its own checkpoints
+ this.id = this.id+"_"+streamContext.workerID;
+ }
+
if(streamContext.getSolrClientCache() != null) {
cloudSolrClient = streamContext.getSolrClientCache().getCloudSolrClient(zkHost);
} else {
@@ -385,7 +409,13 @@ public class TopicStream extends CloudSolrStream implements Expressible {
for(Slice slice : slices) {
String sliceName = slice.getName();
- long checkpoint = getCheckpoint(slice, clusterState.getLiveNodes());
+ long checkpoint = 0;
+ if(initialCheckpoint > -1) {
+ checkpoint = initialCheckpoint;
+ } else {
+ checkpoint = getCheckpoint(slice, clusterState.getLiveNodes());
+ }
+
this.checkpoints.put(sliceName, checkpoint);
}
}
@@ -405,7 +435,9 @@ public class TopicStream extends CloudSolrStream implements Expressible {
SolrStream solrStream = new SolrStream(coreUrl, params);
if(streamContext != null) {
- solrStream.setStreamContext(streamContext);
+ StreamContext localContext = new StreamContext();
+ localContext.setSolrClientCache(streamContext.getSolrClientCache());
+ solrStream.setStreamContext(localContext);
}
try {
@@ -502,6 +534,9 @@ public class TopicStream extends CloudSolrStream implements Expressible {
throw new Exception("Collection not found:" + this.collection);
}
}
+
+
+ Iterator<String> iterator = params.getParameterNamesIterator();
ModifiableSolrParams mParams = new ModifiableSolrParams(params);
mParams.set("distrib", "false"); // We are the aggregator.
String fl = mParams.get("fl");
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c6df1868/solr/solrj/src/test-files/solrj/solr/configsets/streaming/conf/schema.xml
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test-files/solrj/solr/configsets/streaming/conf/schema.xml b/solr/solrj/src/test-files/solrj/solr/configsets/streaming/conf/schema.xml
index 34ecdcb..e7f2772 100644
--- a/solr/solrj/src/test-files/solrj/solr/configsets/streaming/conf/schema.xml
+++ b/solr/solrj/src/test-files/solrj/solr/configsets/streaming/conf/schema.xml
@@ -387,7 +387,7 @@
-->
- <field name="id" type="int" indexed="true" stored="true" multiValued="false" required="false"/>
+ <field name="id" type="string" indexed="true" stored="true" multiValued="false" required="false"/>
<field name="signatureField" type="string" indexed="true" stored="false"/>
<field name="s_multi" type="string" indexed="true" stored="true" docValues="true" multiValued="true"/>
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c6df1868/solr/solrj/src/test/org/apache/solr/client/solrj/io/sql/JdbcTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/sql/JdbcTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/sql/JdbcTest.java
index a031e58..41f3309 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/sql/JdbcTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/sql/JdbcTest.java
@@ -615,13 +615,13 @@ public class JdbcTest extends SolrCloudTestCase {
assertEquals("my_float_col".length(), resultSetMetaData.getColumnDisplaySize(4));
assertEquals("testnull_i".length(), resultSetMetaData.getColumnDisplaySize(5));
- assertEquals("Long", resultSetMetaData.getColumnTypeName(1));
+ assertEquals("String", resultSetMetaData.getColumnTypeName(1));
assertEquals("Long", resultSetMetaData.getColumnTypeName(2));
assertEquals("String", resultSetMetaData.getColumnTypeName(3));
assertEquals("Double", resultSetMetaData.getColumnTypeName(4));
assertEquals("Long", resultSetMetaData.getColumnTypeName(5));
- assertEquals(Types.DOUBLE, resultSetMetaData.getColumnType(1));
+ assertEquals(Types.VARCHAR, resultSetMetaData.getColumnType(1));
assertEquals(Types.DOUBLE, resultSetMetaData.getColumnType(2));
assertEquals(Types.VARCHAR, resultSetMetaData.getColumnType(3));
assertEquals(Types.DOUBLE, resultSetMetaData.getColumnType(4));
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c6df1868/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
index b1da1c6..4af565a 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
@@ -261,7 +261,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
tuples = getTuples(stream);
assertEquals(5, tuples.size());
- assertOrder(tuples, 0,2,1,3,4);
+ assertOrder(tuples, 0, 2, 1, 3, 4);
}
@Test
@@ -1548,7 +1548,9 @@ public class StreamExpressionTest extends SolrCloudTestCase {
stream = new HashJoinStream(expression, factory);
tuples = getTuples(stream);
assertEquals(17, tuples.size());
- assertOrder(tuples, 1, 1, 2, 2, 15, 15, 3, 3, 3, 4, 4, 4, 5, 5, 5, 6, 7);
+
+ //Does a lexical sort
+ assertOrder(tuples, 1, 1, 15, 15, 2, 2, 3, 3, 3, 4, 4, 4, 5, 5, 5, 6, 7);
}
@@ -2526,6 +2528,138 @@ public class StreamExpressionTest extends SolrCloudTestCase {
}
}
+
+ @Test
+ public void testParallelTopicStream() throws Exception {
+ // Runs topic() wrapped in parallel() with two workers: first run, incremental run, then runs with initialCheckpoint.
+ new UpdateRequest()
+ .add(id, "0", "a_s", "hello", "a_i", "0", "a_f", "1")
+ .add(id, "2", "a_s", "hello", "a_i", "2", "a_f", "2")
+ .add(id, "3", "a_s", "hello", "a_i", "3", "a_f", "3")
+ .add(id, "4", "a_s", "hello", "a_i", "4", "a_f", "4")
+ .add(id, "1", "a_s", "hello", "a_i", "1", "a_f", "5")
+ .add(id, "5", "a_s", "hello", "a_i", "10", "a_f", "6")
+ .add(id, "6", "a_s", "hello", "a_i", "11", "a_f", "7")
+ .add(id, "7", "a_s", "hello", "a_i", "12", "a_f", "8")
+ .add(id, "8", "a_s", "hello", "a_i", "13", "a_f", "9")
+ .add(id, "9", "a_s", "hello", "a_i", "14", "a_f", "10")
+ .commit(cluster.getSolrClient(), COLLECTION);
+
+ StreamFactory factory = new StreamFactory()
+ .withCollectionZkHost("collection1", cluster.getZkServer().getZkAddress())
+ .withFunctionName("topic", TopicStream.class)
+ .withFunctionName("search", CloudSolrStream.class)
+ .withFunctionName("parallel", ParallelStream.class)
+ .withFunctionName("daemon", DaemonStream.class);
+
+ StreamExpression expression;
+ TupleStream stream;
+ List<Tuple> tuples;
+
+ SolrClientCache cache = new SolrClientCache();
+
+ try {
+ //Store checkpoints in the same index as the main documents. This is perfectly valid.
+ expression = StreamExpressionParser.parse("parallel(collection1, " +
+ "workers=\"2\", " +
+ "sort=\"_version_ asc\"," +
+ "topic(collection1, " +
+ "collection1, " +
+ "q=\"a_s:hello\", " +
+ "fl=\"id\", " +
+ "id=\"1000000\", " +
+ "partitionKeys=\"id\"))");
+
+ stream = factory.constructStream(expression);
+ StreamContext context = new StreamContext();
+ context.setSolrClientCache(cache);
+ stream.setStreamContext(context);
+ tuples = getTuples(stream);
+
+ //Should be zero because the checkpoints will be set to the highest version on the shards.
+ assertEquals(tuples.size(), 0);
+
+ cluster.getSolrClient().commit("collection1");
+ //Now check to see if the checkpoints are present: one checkpoint document per worker is expected.
+
+ expression = StreamExpressionParser.parse("search(collection1, q=\"id:1000000*\", fl=\"id, checkpoint_ss, _version_\", sort=\"id asc\")");
+
+ stream = factory.constructStream(expression);
+ context = new StreamContext();
+ context.setSolrClientCache(cache);
+ stream.setStreamContext(context);
+ tuples = getTuples(stream);
+ assertEquals(tuples.size(), 2);
+ List<String> checkpoints = tuples.get(0).getStrings("checkpoint_ss");
+ assertEquals(checkpoints.size(), 2);
+ String id1 = tuples.get(0).getString("id");
+ String id2 = tuples.get(1).getString("id");
+ assertTrue(id1.equals("1000000_0"));
+ assertTrue(id2.equals("1000000_1"));
+
+ //Index a few more documents; the next run of topic 1000000 should return only these two.
+ new UpdateRequest()
+ .add(id, "10", "a_s", "hello", "a_i", "13", "a_f", "9")
+ .add(id, "11", "a_s", "hello", "a_i", "14", "a_f", "10")
+ .commit(cluster.getSolrClient(), COLLECTION);
+
+ expression = StreamExpressionParser.parse("parallel(collection1, " +
+ "workers=\"2\", " +
+ "sort=\"_version_ asc\"," +
+ "topic(collection1, " +
+ "collection1, " +
+ "q=\"a_s:hello\", " +
+ "fl=\"id\", " +
+ "id=\"1000000\", " +
+ "partitionKeys=\"id\"))");
+
+ stream = factory.constructStream(expression);
+ context = new StreamContext();
+ context.setSolrClientCache(cache);
+ stream.setStreamContext(context);
+
+ assertTopicRun(stream, "10", "11");
+
+ //Test with initial checkpoint. This should pull all documents indexed so far (ids 0 through 11).
+
+ expression = StreamExpressionParser.parse("parallel(collection1, " +
+ "workers=\"2\", " +
+ "sort=\"_version_ asc\"," +
+ "topic(collection1, " +
+ "collection1, " +
+ "q=\"a_s:hello\", " +
+ "fl=\"id\", " +
+ "id=\"2000000\", " +
+ "initialCheckpoint=\"0\", " +
+ "partitionKeys=\"id\"))");
+
+ stream = factory.constructStream(expression);
+ context = new StreamContext();
+ context.setSolrClientCache(cache);
+ stream.setStreamContext(context);
+ assertTopicRun(stream, "0","1","2","3","4","5","6","7","8","9","10","11");
+
+ //Index two more documents (ids 12 and 13) after the initialCheckpoint run
+ //so the next run of the same topic has new data to pick up.
+ new UpdateRequest()
+ .add(id, "12", "a_s", "hello", "a_i", "13", "a_f", "9")
+ .add(id, "13", "a_s", "hello", "a_i", "14", "a_f", "10")
+ .commit(cluster.getSolrClient(), COLLECTION);
+
+ //Run the same topic again including the initialCheckpoint. It should start where it left off.
+ //initialCheckpoint should be ignored for all but the first run.
+ stream = factory.constructStream(expression);
+ context = new StreamContext();
+ context.setSolrClientCache(cache);
+ stream.setStreamContext(context);
+ assertTopicRun(stream, "12","13");
+ } finally {
+ cache.close();
+ }
+ }
+
+
+
@Test
public void testUpdateStream() throws Exception {
@@ -3031,9 +3165,9 @@ public class StreamExpressionTest extends SolrCloudTestCase {
int i = 0;
for(int val : ids) {
Tuple t = tuples.get(i);
- Long tip = (Long)t.get(fieldName);
- if(tip.intValue() != val) {
- throw new Exception("Found value:"+tip.intValue()+" expecting:"+val);
+ String tip = t.getString(fieldName);
+ if(!tip.equals(Integer.toString(val))) {
+ throw new Exception("Found value:"+tip+" expecting:"+val);
}
++i;
}
@@ -3119,9 +3253,9 @@ public class StreamExpressionTest extends SolrCloudTestCase {
int i=0;
for(int val : ids) {
Map t = maps.get(i);
- Long tip = (Long)t.get("id");
- if(tip.intValue() != val) {
- throw new Exception("Found value:"+tip.intValue()+" expecting:"+val);
+ String tip = (String)t.get("id");
+ if(!tip.equals(Integer.toString(val))) {
+ throw new Exception("Found value:"+tip+" expecting:"+val);
}
++i;
}
@@ -3145,4 +3279,39 @@ public class StreamExpressionTest extends SolrCloudTestCase {
return true;
}
+ /**
+  * Opens and drains {@code stream}, checking one run of a topic.
+  *
+  * Fails if a tuple's "id" is not among {@code idArray}, if a tuple's
+  * "_version_" is lower than that of the tuple read before it, or if the
+  * number of tuples read differs from the number of expected ids. The
+  * stream is always closed, even when a check fails.
+  *
+  * @param stream the stream to open, read to EOF and close
+  * @param idArray the ids expected in this run, in any order
+  * @throws Exception if any of the checks above fails or the stream throws
+  */
+ private void assertTopicRun(TupleStream stream, String... idArray) throws Exception {
+ long version = -1;
+ int count = 0;
+ List<String> ids = new ArrayList<>();
+ for(String id : idArray) {
+ ids.add(id);
+ }
+
+ try {
+ stream.open();
+ while (true) {
+ Tuple tuple = stream.read();
+ if (tuple.EOF) {
+ break;
+ }
+
+ ++count;
+ String id = tuple.getString("id");
+ if (!ids.contains(id)) {
+ throw new Exception("Expecting id in topic run not found:" + id);
+ }
+
+ long v = tuple.getLong("_version_");
+ if (v < version) {
+ throw new Exception("Out of order version in topic run:" + v);
+ }
+ // Remember the last version seen; without this the ordering check above can never fire.
+ version = v;
+ }
+ } finally {
+ stream.close();
+ }
+
+ if(count != ids.size()) {
+ throw new Exception("Wrong count in topic run:"+count);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c6df1868/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
index 9685b74..0da6750 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
@@ -1315,7 +1315,12 @@ public class StreamingTest extends SolrCloudTestCase {
SolrParams sParams = mapParams("q", "a_s:hello0", "rows", "500", "fl", "id");
- TopicStream topicStream = new TopicStream(zkHost, COLLECTION, COLLECTION, "50000000", 1000000, sParams);
+ TopicStream topicStream = new TopicStream(zkHost,
+ COLLECTION,
+ COLLECTION,
+ "50000000",
+ -1,
+ 1000000, sParams);
DaemonStream daemonStream = new DaemonStream(topicStream, "daemon1", 1000, 500);
daemonStream.setStreamContext(context);
@@ -1895,9 +1900,9 @@ public class StreamingTest extends SolrCloudTestCase {
int i = 0;
for(int val : ids) {
Tuple t = tuples.get(i);
- Long tip = (Long)t.get("id");
- if(tip.intValue() != val) {
- throw new Exception("Found value:"+tip.intValue()+" expecting:"+val);
+ String tip = (String)t.get("id");
+ if(!tip.equals(Integer.toString(val))) {
+ throw new Exception("Found value:"+tip+" expecting:"+val);
}
++i;
}
@@ -1926,9 +1931,9 @@ public class StreamingTest extends SolrCloudTestCase {
int i=0;
for(int val : ids) {
Map t = maps.get(i);
- Long tip = (Long)t.get("id");
- if(tip.intValue() != val) {
- throw new Exception("Found value:"+tip.intValue()+" expecting:"+val);
+ String tip = (String)t.get("id");
+ if(!tip.equals(Integer.toString(val))) {
+ throw new Exception("Found value:"+tip+" expecting:"+val);
}
++i;
}