You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by no...@apache.org on 2016/04/07 22:29:44 UTC
[12/50] [abbrv] lucene-solr:apiv2: SOLR-8888: Add shortestPath
Streaming Expression
SOLR-8888: Add shortestPath Streaming Expression
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/3500b45d
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/3500b45d
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/3500b45d
Branch: refs/heads/apiv2
Commit: 3500b45d6d28253d44e48ff8e444774a5fb3ace0
Parents: 7263491
Author: jbernste <jb...@apache.org>
Authored: Thu Mar 31 16:23:59 2016 -0400
Committer: jbernste <jb...@apache.org>
Committed: Thu Mar 31 16:24:51 2016 -0400
----------------------------------------------------------------------
.../org/apache/solr/handler/StreamHandler.java | 2 +
.../solrj/io/graph/ShortestPathStream.java | 490 +++++++++++++++++++
.../client/solrj/io/graph/package-info.java | 22 +
.../solrj/io/graph/GraphExpressionTest.java | 404 +++++++++++++++
.../solr/client/solrj/io/graph/GraphTest.java | 387 +++++++++++++++
5 files changed, 1305 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/3500b45d/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
index c7bac9f..226058e 100644
--- a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
@@ -28,6 +28,7 @@ import java.util.Map.Entry;
import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
+import org.apache.solr.client.solrj.io.graph.ShortestPathStream;
import org.apache.solr.client.solrj.io.ops.ConcatOperation;
import org.apache.solr.client.solrj.io.ops.DistinctOperation;
import org.apache.solr.client.solrj.io.ops.GroupOperation;
@@ -115,6 +116,7 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
.withFunctionName("complement", ComplementStream.class)
.withFunctionName("daemon", DaemonStream.class)
.withFunctionName("topic", TopicStream.class)
+ .withFunctionName("shortestPath", ShortestPathStream.class)
// metrics
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/3500b45d/solr/solrj/src/java/org/apache/solr/client/solrj/io/graph/ShortestPathStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/graph/ShortestPathStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/graph/ShortestPathStream.java
new file mode 100644
index 0000000..bb9b09d
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/graph/ShortestPathStream.java
@@ -0,0 +1,490 @@
+package org.apache.solr.client.solrj.io.graph;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.ArrayList;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+import org.apache.solr.client.solrj.io.eq.MultipleFieldEqualitor;
+import org.apache.solr.client.solrj.io.stream.*;
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.comp.StreamComparator;
+import org.apache.solr.client.solrj.io.eq.FieldEqualitor;
+import org.apache.solr.client.solrj.io.stream.expr.Expressible;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.SolrjNamedThreadFactory;
+
+public class ShortestPathStream extends TupleStream implements Expressible {
+
+ private static final long serialVersionUID = 1;
+
+ private String fromNode;
+ private String toNode;
+ private String fromField;
+ private String toField;
+ private int joinBatchSize;
+ private int maxDepth;
+ private String zkHost;
+ private String collection;
+ private LinkedList<Tuple> shortestPaths = new LinkedList();
+ private boolean found;
+ private StreamContext streamContext;
+ private int threads;
+ private Map queryParams;
+
+ public ShortestPathStream(String zkHost,
+ String collection,
+ String fromNode,
+ String toNode,
+ String fromField,
+ String toField,
+ Map queryParams,
+ int joinBatchSize,
+ int threads,
+ int maxDepth) {
+
+ init(zkHost,
+ collection,
+ fromNode,
+ toNode,
+ fromField,
+ toField,
+ queryParams,
+ joinBatchSize,
+ threads,
+ maxDepth);
+ }
+
+ public ShortestPathStream(StreamExpression expression, StreamFactory factory) throws IOException {
+
+ String collectionName = factory.getValueOperand(expression, 0);
+ List<StreamExpressionNamedParameter> namedParams = factory.getNamedOperands(expression);
+ StreamExpressionNamedParameter zkHostExpression = factory.getNamedOperand(expression, "zkHost");
+
+ // Collection Name
+ if(null == collectionName) {
+ throw new IOException(String.format(Locale.ROOT,"invalid expression %s - collectionName expected as first operand",expression));
+ }
+
+ String fromNode = null;
+ StreamExpressionNamedParameter fromExpression = factory.getNamedOperand(expression, "from");
+
+ if(fromExpression == null) {
+ throw new IOException(String.format(Locale.ROOT,"invalid expression %s - from param is required",expression));
+ } else {
+ fromNode = ((StreamExpressionValue)fromExpression.getParameter()).getValue();
+ }
+
+ String toNode = null;
+ StreamExpressionNamedParameter toExpression = factory.getNamedOperand(expression, "to");
+
+ if(toExpression == null) {
+ throw new IOException(String.format(Locale.ROOT,"invalid expression %s - to param is required", expression));
+ } else {
+ toNode = ((StreamExpressionValue)toExpression.getParameter()).getValue();
+ }
+
+ String fromField = null;
+ String toField = null;
+
+ StreamExpressionNamedParameter edgeExpression = factory.getNamedOperand(expression, "edge");
+
+ if(edgeExpression == null) {
+ throw new IOException(String.format(Locale.ROOT,"invalid expression %s - edge param is required", expression));
+ } else {
+ String edge = ((StreamExpressionValue)edgeExpression.getParameter()).getValue();
+ String[] fields = edge.split("=");
+ if(fields.length != 2) {
+ throw new IOException(String.format(Locale.ROOT,"invalid expression %s - edge param separated by and = and must contain two fields", expression));
+ }
+ fromField = fields[0].trim();
+ toField = fields[1].trim();
+ }
+
+ int threads = 6;
+
+ StreamExpressionNamedParameter threadsExpression = factory.getNamedOperand(expression, "threads");
+
+ if(threadsExpression != null) {
+ threads = Integer.parseInt(((StreamExpressionValue)threadsExpression.getParameter()).getValue());
+ }
+
+ int partitionSize = 250;
+
+ StreamExpressionNamedParameter partitionExpression = factory.getNamedOperand(expression, "partitionSize");
+
+ if(partitionExpression != null) {
+ partitionSize = Integer.parseInt(((StreamExpressionValue)partitionExpression.getParameter()).getValue());
+ }
+
+ int maxDepth = 0;
+
+ StreamExpressionNamedParameter depthExpression = factory.getNamedOperand(expression, "maxDepth");
+
+ if(depthExpression == null) {
+ throw new IOException(String.format(Locale.ROOT,"invalid expression %s - maxDepth param is required", expression));
+ } else {
+ maxDepth = Integer.parseInt(((StreamExpressionValue) depthExpression.getParameter()).getValue());
+ }
+
+ Map<String,String> params = new HashMap<String,String>();
+ for(StreamExpressionNamedParameter namedParam : namedParams){
+ if(!namedParam.getName().equals("zkHost") &&
+ !namedParam.getName().equals("to") &&
+ !namedParam.getName().equals("from") &&
+ !namedParam.getName().equals("edge") &&
+ !namedParam.getName().equals("maxDepth") &&
+ !namedParam.getName().equals("threads") &&
+ !namedParam.getName().equals("partitionSize"))
+ {
+ params.put(namedParam.getName(), namedParam.getParameter().toString().trim());
+ }
+ }
+
+ // zkHost, optional - if not provided then will look into factory list to get
+ String zkHost = null;
+ if(null == zkHostExpression){
+ zkHost = factory.getCollectionZkHost(collectionName);
+ if(zkHost == null) {
+ zkHost = factory.getDefaultZkHost();
+ }
+ } else if(zkHostExpression.getParameter() instanceof StreamExpressionValue) {
+ zkHost = ((StreamExpressionValue)zkHostExpression.getParameter()).getValue();
+ }
+
+ if(null == zkHost){
+ throw new IOException(String.format(Locale.ROOT,"invalid expression %s - zkHost not found for collection '%s'",expression,collectionName));
+ }
+
+ // We've got all the required items
+ init(zkHost, collectionName, fromNode, toNode, fromField, toField, params, partitionSize, threads, maxDepth);
+ }
+
+ private void init(String zkHost,
+ String collection,
+ String fromNode,
+ String toNode,
+ String fromField,
+ String toField,
+ Map queryParams,
+ int joinBatchSize,
+ int threads,
+ int maxDepth) {
+ this.zkHost = zkHost;
+ this.collection = collection;
+ this.fromNode = fromNode;
+ this.toNode = toNode;
+ this.fromField = fromField;
+ this.toField = toField;
+ this.queryParams = queryParams;
+ this.joinBatchSize = joinBatchSize;
+ this.threads = threads;
+ this.maxDepth = maxDepth;
+ }
+
+ @Override
+ public StreamExpressionParameter toExpression(StreamFactory factory) throws IOException {
+
+ StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
+
+ // collection
+ expression.addParameter(collection);
+
+ Set<Map.Entry> entries = queryParams.entrySet();
+ // parameters
+ for(Map.Entry param : entries){
+ String value = param.getValue().toString();
+
+ // SOLR-8409: This is a special case where the params contain a " character
+ // Do note that in any other BASE streams with parameters where a " might come into play
+ // that this same replacement needs to take place.
+ value = value.replace("\"", "\\\"");
+
+ expression.addParameter(new StreamExpressionNamedParameter(param.getKey().toString(), value));
+ }
+
+ expression.addParameter(new StreamExpressionNamedParameter("zkHost", zkHost));
+ expression.addParameter(new StreamExpressionNamedParameter("maxDepth", Integer.toString(maxDepth)));
+ expression.addParameter(new StreamExpressionNamedParameter("threads", Integer.toString(threads)));
+ expression.addParameter(new StreamExpressionNamedParameter("partitionSize", Integer.toString(joinBatchSize)));
+ expression.addParameter(new StreamExpressionNamedParameter("from", fromNode));
+ expression.addParameter(new StreamExpressionNamedParameter("to", toNode));
+ expression.addParameter(new StreamExpressionNamedParameter("edge", fromField+"="+toField));
+ return expression;
+ }
+
+ public void setStreamContext(StreamContext context) {
+ this.streamContext = context;
+ }
+
+ public List<TupleStream> children() {
+ List<TupleStream> l = new ArrayList();
+ return l;
+ }
+
+ public void open() throws IOException {
+
+ List<Map<String,List<String>>> allVisited = new ArrayList();
+ Map visited = new HashMap();
+ visited.put(this.fromNode, null);
+
+ allVisited.add(visited);
+ int depth = 0;
+ Map<String, List<String>> nextVisited = null;
+ List<Edge> targets = new ArrayList();
+ ExecutorService threadPool = null;
+
+ try {
+
+ threadPool = ExecutorUtil.newMDCAwareFixedThreadPool(threads, new SolrjNamedThreadFactory("ShortestPathStream"));
+
+ //Breadth first search
+ TRAVERSE:
+ while (targets.size() == 0 && depth < maxDepth) {
+ Set<String> nodes = visited.keySet();
+ Iterator<String> it = nodes.iterator();
+ nextVisited = new HashMap();
+ int batchCount = 0;
+ List<String> queryNodes = new ArrayList();
+ List<Future> futures = new ArrayList();
+ JOIN:
+ //Queue up all the batches
+ while (it.hasNext()) {
+ String node = it.next();
+ queryNodes.add(node);
+ ++batchCount;
+ if (batchCount == joinBatchSize || !it.hasNext()) {
+ try {
+ JoinRunner joinRunner = new JoinRunner(queryNodes);
+ Future<List<Edge>> future = threadPool.submit(joinRunner);
+ futures.add(future);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ batchCount = 0;
+ queryNodes = new ArrayList();
+ }
+ }
+
+ try {
+ //Process the batches as they become available
+ OUTER:
+ for (Future<List<Edge>> future : futures) {
+ List<Edge> edges = future.get();
+ INNER:
+ for (Edge edge : edges) {
+ if (toNode.equals(edge.to)) {
+ targets.add(edge);
+ if(nextVisited.containsKey(edge.to)) {
+ List<String> parents = nextVisited.get(edge.to);
+ parents.add(edge.from);
+ } else {
+ List<String> parents = new ArrayList();
+ parents.add(edge.from);
+ nextVisited.put(edge.to, parents);
+ }
+ } else {
+ if (!cycle(edge.to, allVisited)) {
+ if(nextVisited.containsKey(edge.to)) {
+ List<String> parents = nextVisited.get(edge.to);
+ parents.add(edge.from);
+ } else {
+ List<String> parents = new ArrayList();
+ parents.add(edge.from);
+ nextVisited.put(edge.to, parents);
+ }
+ }
+ }
+ }
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ allVisited.add(nextVisited);
+ visited = nextVisited;
+ ++depth;
+ }
+ } finally {
+ threadPool.shutdown();
+ }
+
+ Set<String> finalPaths = new HashSet();
+ if(targets.size() > 0) {
+ for(Edge edge : targets) {
+ List<LinkedList> paths = new ArrayList();
+ LinkedList<String> path = new LinkedList();
+ path.addFirst(edge.to);
+ paths.add(path);
+ //Walk back up the tree a collect the parent nodes.
+ INNER:
+ for (int i = allVisited.size() - 1; i >= 0; --i) {
+ Map<String, List<String>> v = allVisited.get(i);
+ Iterator<LinkedList> it = paths.iterator();
+ List newPaths = new ArrayList();
+ while(it.hasNext()) {
+ LinkedList p = it.next();
+ List<String> parents = v.get(p.peekFirst());
+ if (parents != null) {
+ for(String parent : parents) {
+ LinkedList newPath = new LinkedList();
+ newPath.addAll(p);
+ newPath.addFirst(parent);
+ newPaths.add(newPath);
+ }
+ paths = newPaths;
+ }
+ }
+ }
+
+ for(LinkedList p : paths) {
+ String s = p.toString();
+ if (!finalPaths.contains(s)){
+ Tuple shortestPath = new Tuple(new HashMap());
+ shortestPath.put("path", p);
+ shortestPaths.add(shortestPath);
+ finalPaths.add(s);
+ }
+ }
+ }
+ }
+ }
+
+ private class JoinRunner implements Callable<List<Edge>> {
+
+ private List<String> nodes;
+ private List<Edge> edges = new ArrayList();
+
+ public JoinRunner(List<String> nodes) {
+ this.nodes = nodes;
+ }
+
+ public List<Edge> call() {
+
+ Map joinParams = new HashMap();
+ String fl = fromField + "," + toField;
+
+ joinParams.putAll(queryParams);
+ joinParams.put("fl", fl);
+ joinParams.put("qt", "/export");
+ joinParams.put("sort", toField + " asc,"+fromField +" asc");
+
+ StringBuffer nodeQuery = new StringBuffer();
+
+ for(String node : nodes) {
+ nodeQuery.append(node).append(" ");
+ }
+
+ String q = fromField + ":(" + nodeQuery.toString().trim() + ")";
+
+ joinParams.put("q", q);
+ TupleStream stream = null;
+ try {
+ stream = new UniqueStream(new CloudSolrStream(zkHost, collection, joinParams), new MultipleFieldEqualitor(new FieldEqualitor(toField), new FieldEqualitor(fromField)));
+ stream.setStreamContext(streamContext);
+ stream.open();
+ BATCH:
+ while (true) {
+ Tuple tuple = stream.read();
+ if (tuple.EOF) {
+ break BATCH;
+ }
+ String _toNode = tuple.getString(toField);
+ String _fromNode = tuple.getString(fromField);
+ Edge edge = new Edge(_fromNode, _toNode);
+ edges.add(edge);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ } finally {
+ try {
+ stream.close();
+ } catch(Exception ce) {
+ throw new RuntimeException(ce);
+ }
+ }
+ return edges;
+ }
+ }
+
+ private class Edge {
+
+ private String from;
+ private String to;
+
+ public Edge(String from, String to) {
+ this.from = from;
+ this.to = to;
+ }
+ }
+
+ private boolean cycle(String node, List<Map<String,List<String>>> allVisited) {
+ //Check all visited trees for each level to see if we've encountered this node before.
+ for(Map<String, List<String>> visited : allVisited) {
+ if(visited.containsKey(node)) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ public void close() throws IOException {
+ this.found = false;
+ }
+
+ public Tuple read() throws IOException {
+ if(shortestPaths.size() > 0) {
+ found = true;
+ Tuple t = shortestPaths.removeFirst();
+ return t;
+ } else {
+ Map m = new HashMap();
+ m.put("EOF", true);
+ if(!found) {
+ m.put("sorry", "No path found");
+ }
+ return new Tuple(m);
+ }
+ }
+
+ public int getCost() {
+ return 0;
+ }
+
+ @Override
+ public StreamComparator getStreamSort() {
+ return null;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/3500b45d/solr/solrj/src/java/org/apache/solr/client/solrj/io/graph/package-info.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/graph/package-info.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/graph/package-info.java
new file mode 100644
index 0000000..b34e0dd
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/graph/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Streaming Graph Traversals
+ **/
+package org.apache.solr.client.solrj.io.graph;
+
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/3500b45d/solr/solrj/src/test/org/apache/solr/client/solrj/io/graph/GraphExpressionTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/graph/GraphExpressionTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/graph/GraphExpressionTest.java
new file mode 100644
index 0000000..db58a90
--- /dev/null
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/graph/GraphExpressionTest.java
@@ -0,0 +1,404 @@
+package org.apache.solr.client.solrj.io.graph;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.lucene.util.LuceneTestCase;
+import org.apache.lucene.util.LuceneTestCase.Slow;
+import org.apache.solr.client.solrj.io.SolrClientCache;
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.stream.*;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.apache.solr.cloud.AbstractFullDistribZkTestBase;
+import org.apache.solr.cloud.AbstractZkTestCase;
+import org.apache.solr.common.SolrInputDocument;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * All base tests will be done with CloudSolrStream. Under the covers CloudSolrStream uses SolrStream so
+ * SolrStream will get fully exercised through these tests.
+ *
+ **/
+
+@Slow
+@LuceneTestCase.SuppressCodecs({"Lucene3x", "Lucene40","Lucene41","Lucene42","Lucene45"})
+public class GraphExpressionTest extends AbstractFullDistribZkTestBase {
+
+ private static final String SOLR_HOME = getFile("solrj" + File.separator + "solr").getAbsolutePath();
+
+ static {
+ schemaString = "schema-streaming.xml";
+ }
+
+ @BeforeClass
+ public static void beforeSuperClass() {
+ AbstractZkTestCase.SOLRHOME = new File(SOLR_HOME());
+ }
+
+ @AfterClass
+ public static void afterSuperClass() {
+
+ }
+
+ protected String getCloudSolrConfig() {
+ return "solrconfig-streaming.xml";
+ }
+
+
+ @Override
+ public String getSolrHome() {
+ return SOLR_HOME;
+ }
+
+ public static String SOLR_HOME() {
+ return SOLR_HOME;
+ }
+
+ @Before
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+ // we expect this time of exception as shards go up and down...
+ //ignoreException(".*");
+
+ System.setProperty("numShards", Integer.toString(sliceCount));
+ }
+
+ @Override
+ @After
+ public void tearDown() throws Exception {
+ super.tearDown();
+ resetExceptionIgnores();
+ }
+
+ public GraphExpressionTest() {
+ super();
+ sliceCount = 2;
+ }
+
+ @Test
+ public void testAll() throws Exception{
+ assertNotNull(cloudClient);
+
+ handle.clear();
+ handle.put("timestamp", SKIPVAL);
+
+ waitForRecoveriesToFinish(false);
+
+ del("*:*");
+ commit();
+
+ testShortestPathStream();
+ }
+
+ private void testShortestPathStream() throws Exception {
+
+ indexr(id, "0", "from_s", "jim", "to_s", "mike", "predicate_s", "knows");
+ indexr(id, "1", "from_s", "jim", "to_s", "dave", "predicate_s", "knows");
+ indexr(id, "2", "from_s", "jim", "to_s", "stan", "predicate_s", "knows");
+ indexr(id, "3", "from_s", "dave", "to_s", "stan", "predicate_s", "knows");
+ indexr(id, "4", "from_s", "dave", "to_s", "bill", "predicate_s", "knows");
+ indexr(id, "5", "from_s", "dave", "to_s", "mike", "predicate_s", "knows");
+ indexr(id, "20", "from_s", "dave", "to_s", "alex", "predicate_s", "knows");
+ indexr(id, "21", "from_s", "alex", "to_s", "steve", "predicate_s", "knows");
+ indexr(id, "6", "from_s", "stan", "to_s", "alice", "predicate_s", "knows");
+ indexr(id, "7", "from_s", "stan", "to_s", "mary", "predicate_s", "knows");
+ indexr(id, "8", "from_s", "stan", "to_s", "dave", "predicate_s", "knows");
+ indexr(id, "10", "from_s", "mary", "to_s", "mike", "predicate_s", "knows");
+ indexr(id, "11", "from_s", "mary", "to_s", "max", "predicate_s", "knows");
+ indexr(id, "12", "from_s", "mary", "to_s", "jim", "predicate_s", "knows");
+ indexr(id, "13", "from_s", "mary", "to_s", "steve", "predicate_s", "knows");
+
+ commit();
+
+ List<Tuple> tuples = null;
+ Set<String> paths = null;
+ ShortestPathStream stream = null;
+ StreamContext context = new StreamContext();
+ SolrClientCache cache = new SolrClientCache();
+ context.setSolrClientCache(cache);
+
+ StreamFactory factory = new StreamFactory()
+ .withCollectionZkHost("collection1", zkServer.getZkAddress())
+ .withFunctionName("shortestPath", ShortestPathStream.class);
+
+ Map params = new HashMap();
+ params.put("fq", "predicate_s:knows");
+
+ stream = (ShortestPathStream)factory.constructStream("shortestPath(collection1, " +
+ "from=\"jim\", " +
+ "to=\"steve\"," +
+ "edge=\"from_s=to_s\"," +
+ "fq=\"predicate_s:knows\","+
+ "threads=\"3\","+
+ "partitionSize=\"3\","+
+ "maxDepth=\"6\")");
+
+ stream.setStreamContext(context);
+ paths = new HashSet();
+ tuples = getTuples(stream);
+
+ assertTrue(tuples.size() == 2);
+
+ for(Tuple tuple : tuples) {
+ paths.add(tuple.getStrings("path").toString());
+ }
+
+ assertTrue(paths.contains("[jim, dave, alex, steve]"));
+ assertTrue(paths.contains("[jim, stan, mary, steve]"));
+
+ //Test with batch size of 1
+
+ params.put("fq", "predicate_s:knows");
+
+ stream = (ShortestPathStream)factory.constructStream("shortestPath(collection1, " +
+ "from=\"jim\", " +
+ "to=\"steve\"," +
+ "edge=\"from_s=to_s\"," +
+ "fq=\"predicate_s:knows\","+
+ "threads=\"3\","+
+ "partitionSize=\"1\","+
+ "maxDepth=\"6\")");
+
+ stream.setStreamContext(context);
+ paths = new HashSet();
+ tuples = getTuples(stream);
+
+ assertTrue(tuples.size() == 2);
+
+ for(Tuple tuple : tuples) {
+ paths.add(tuple.getStrings("path").toString());
+ }
+
+ assertTrue(paths.contains("[jim, dave, alex, steve]"));
+ assertTrue(paths.contains("[jim, stan, mary, steve]"));
+
+ //Test with bad predicate
+
+
+ stream = (ShortestPathStream)factory.constructStream("shortestPath(collection1, " +
+ "from=\"jim\", " +
+ "to=\"steve\"," +
+ "edge=\"from_s=to_s\"," +
+ "fq=\"predicate_s:crap\","+
+ "threads=\"3\","+
+ "partitionSize=\"3\","+
+ "maxDepth=\"6\")");
+
+ stream.setStreamContext(context);
+ paths = new HashSet();
+ tuples = getTuples(stream);
+
+ assertTrue(tuples.size() == 0);
+
+ //Test with depth 2
+
+ stream = (ShortestPathStream)factory.constructStream("shortestPath(collection1, " +
+ "from=\"jim\", " +
+ "to=\"steve\"," +
+ "edge=\"from_s=to_s\"," +
+ "fq=\"predicate_s:knows\","+
+ "threads=\"3\","+
+ "partitionSize=\"3\","+
+ "maxDepth=\"2\")");
+
+
+ stream.setStreamContext(context);
+ tuples = getTuples(stream);
+
+ assertTrue(tuples.size() == 0);
+
+ //Take out alex
+ params.put("fq", "predicate_s:knows NOT to_s:alex");
+
+ stream = (ShortestPathStream)factory.constructStream("shortestPath(collection1, " +
+ "from=\"jim\", " +
+ "to=\"steve\"," +
+ "edge=\"from_s=to_s\"," +
+ "fq=\" predicate_s:knows NOT to_s:alex\","+
+ "threads=\"3\","+
+ "partitionSize=\"3\","+
+ "maxDepth=\"6\")");
+
+
+ stream.setStreamContext(context);
+ paths = new HashSet();
+ tuples = getTuples(stream);
+ assertTrue(tuples.size() == 1);
+
+ for(Tuple tuple : tuples) {
+ paths.add(tuple.getStrings("path").toString());
+ }
+
+ assertTrue(paths.contains("[jim, stan, mary, steve]"));
+
+ cache.close();
+ del("*:*");
+ commit();
+ }
+
+ protected List<Tuple> getTuples(TupleStream tupleStream) throws IOException {
+ tupleStream.open();
+ List<Tuple> tuples = new ArrayList<Tuple>();
+ for(Tuple t = tupleStream.read(); !t.EOF; t = tupleStream.read()) {
+ tuples.add(t);
+ }
+ tupleStream.close();
+ return tuples;
+ }
+ protected boolean assertOrder(List<Tuple> tuples, int... ids) throws Exception {
+ return assertOrderOf(tuples, "id", ids);
+ }
+ protected boolean assertOrderOf(List<Tuple> tuples, String fieldName, int... ids) throws Exception {
+ int i = 0;
+ for(int val : ids) {
+ Tuple t = tuples.get(i);
+ Long tip = (Long)t.get(fieldName);
+ if(tip.intValue() != val) {
+ throw new Exception("Found value:"+tip.intValue()+" expecting:"+val);
+ }
+ ++i;
+ }
+ return true;
+ }
+
+ protected boolean assertMapOrder(List<Tuple> tuples, int... ids) throws Exception {
+ int i = 0;
+ for(int val : ids) {
+ Tuple t = tuples.get(i);
+ List<Map> tip = t.getMaps("group");
+ int id = (int)tip.get(0).get("id");
+ if(id != val) {
+ throw new Exception("Found value:"+id+" expecting:"+val);
+ }
+ ++i;
+ }
+ return true;
+ }
+
+
+ protected boolean assertFields(List<Tuple> tuples, String ... fields) throws Exception{
+ for(Tuple tuple : tuples){
+ for(String field : fields){
+ if(!tuple.fields.containsKey(field)){
+ throw new Exception(String.format(Locale.ROOT, "Expected field '%s' not found", field));
+ }
+ }
+ }
+ return true;
+ }
+ protected boolean assertNotFields(List<Tuple> tuples, String ... fields) throws Exception{
+ for(Tuple tuple : tuples){
+ for(String field : fields){
+ if(tuple.fields.containsKey(field)){
+ throw new Exception(String.format(Locale.ROOT, "Unexpected field '%s' found", field));
+ }
+ }
+ }
+ return true;
+ }
+
+ protected boolean assertGroupOrder(Tuple tuple, int... ids) throws Exception {
+ List<?> group = (List<?>)tuple.get("tuples");
+ int i=0;
+ for(int val : ids) {
+ Map<?,?> t = (Map<?,?>)group.get(i);
+ Long tip = (Long)t.get("id");
+ if(tip.intValue() != val) {
+ throw new Exception("Found value:"+tip.intValue()+" expecting:"+val);
+ }
+ ++i;
+ }
+ return true;
+ }
+
+ public boolean assertLong(Tuple tuple, String fieldName, long l) throws Exception {
+ long lv = (long)tuple.get(fieldName);
+ if(lv != l) {
+ throw new Exception("Longs not equal:"+l+" : "+lv);
+ }
+
+ return true;
+ }
+
+ public boolean assertString(Tuple tuple, String fieldName, String expected) throws Exception {
+ String actual = (String)tuple.get(fieldName);
+
+ if( (null == expected && null != actual) ||
+ (null != expected && null == actual) ||
+ (null != expected && !expected.equals(actual))){
+ throw new Exception("Longs not equal:"+expected+" : "+actual);
+ }
+
+ return true;
+ }
+
+ protected boolean assertMaps(List<Map> maps, int... ids) throws Exception {
+ if(maps.size() != ids.length) {
+ throw new Exception("Expected id count != actual map count:"+ids.length+":"+maps.size());
+ }
+
+ int i=0;
+ for(int val : ids) {
+ Map t = maps.get(i);
+ Long tip = (Long)t.get("id");
+ if(tip.intValue() != val) {
+ throw new Exception("Found value:"+tip.intValue()+" expecting:"+val);
+ }
+ ++i;
+ }
+ return true;
+ }
+
+ private boolean assertList(List list, Object... vals) throws Exception {
+
+ if(list.size() != vals.length) {
+ throw new Exception("Lists are not the same size:"+list.size() +" : "+vals.length);
+ }
+
+ for(int i=0; i<list.size(); i++) {
+ Object a = list.get(i);
+ Object b = vals[i];
+ if(!a.equals(b)) {
+ throw new Exception("List items not equals:"+a+" : "+b);
+ }
+ }
+
+ return true;
+ }
+
+
+ @Override
+ protected void indexr(Object... fields) throws Exception {
+ SolrInputDocument doc = getDoc(fields);
+ indexDoc(doc);
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/3500b45d/solr/solrj/src/test/org/apache/solr/client/solrj/io/graph/GraphTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/graph/GraphTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/graph/GraphTest.java
new file mode 100644
index 0000000..77f04e7
--- /dev/null
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/graph/GraphTest.java
@@ -0,0 +1,387 @@
+package org.apache.solr.client.solrj.io.graph;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.lucene.util.LuceneTestCase;
+import org.apache.solr.client.solrj.io.SolrClientCache;
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.stream.StreamContext;
+import org.apache.solr.client.solrj.io.stream.TupleStream;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.apache.solr.cloud.AbstractFullDistribZkTestBase;
+import org.apache.solr.cloud.AbstractZkTestCase;
+import org.apache.solr.common.SolrInputDocument;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Set;
+import java.util.HashSet;
+/**
+ * All base tests will be done with CloudSolrStream. Under the covers CloudSolrStream uses SolrStream so
+ * SolrStream will get fully exercised through these tests.
+ *
+ **/
+
+@LuceneTestCase.Slow
+@LuceneTestCase.SuppressCodecs({"Lucene3x", "Lucene40","Lucene41","Lucene42","Lucene45"})
+public class GraphTest extends AbstractFullDistribZkTestBase {
+
+ private static final String SOLR_HOME = getFile("solrj" + File.separator + "solr").getAbsolutePath();
+ private StreamFactory streamFactory;
+
+ static {
+ schemaString = "schema-streaming.xml";
+ }
+
+ @BeforeClass
+ public static void beforeSuperClass() {
+ AbstractZkTestCase.SOLRHOME = new File(SOLR_HOME());
+ }
+
+ @AfterClass
+ public static void afterSuperClass() {
+
+ }
+
+ protected String getCloudSolrConfig() {
+ return "solrconfig-streaming.xml";
+ }
+
+
+ @Override
+ public String getSolrHome() {
+ return SOLR_HOME;
+ }
+
+ public static String SOLR_HOME() {
+ return SOLR_HOME;
+ }
+
+ @Before
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+ // we expect this time of exception as shards go up and down...
+ //ignoreException(".*");
+ //System.setProperty("export.test", "true");
+ System.setProperty("numShards", Integer.toString(sliceCount));
+ }
+
+ @Override
+ @After
+ public void tearDown() throws Exception {
+ super.tearDown();
+ resetExceptionIgnores();
+ }
+
+ public GraphTest() {
+ super();
+ sliceCount = 2;
+
+ }
+
+ private void testShortestPathStream() throws Exception {
+
+ indexr(id, "0", "from_s", "jim", "to_s", "mike", "predicate_s", "knows");
+ indexr(id, "1", "from_s", "jim", "to_s", "dave", "predicate_s", "knows");
+ indexr(id, "2", "from_s", "jim", "to_s", "stan", "predicate_s", "knows");
+ indexr(id, "3", "from_s", "dave", "to_s", "stan", "predicate_s", "knows");
+ indexr(id, "4", "from_s", "dave", "to_s", "bill", "predicate_s", "knows");
+ indexr(id, "5", "from_s", "dave", "to_s", "mike", "predicate_s", "knows");
+ indexr(id, "20", "from_s", "dave", "to_s", "alex", "predicate_s", "knows");
+ indexr(id, "21", "from_s", "alex", "to_s", "steve", "predicate_s", "knows");
+ indexr(id, "6", "from_s", "stan", "to_s", "alice", "predicate_s", "knows");
+ indexr(id, "7", "from_s", "stan", "to_s", "mary", "predicate_s", "knows");
+ indexr(id, "8", "from_s", "stan", "to_s", "dave", "predicate_s", "knows");
+ indexr(id, "10", "from_s", "mary", "to_s", "mike", "predicate_s", "knows");
+ indexr(id, "11", "from_s", "mary", "to_s", "max", "predicate_s", "knows");
+ indexr(id, "12", "from_s", "mary", "to_s", "jim", "predicate_s", "knows");
+ indexr(id, "13", "from_s", "mary", "to_s", "steve", "predicate_s", "knows");
+
+ commit();
+
+ List<Tuple> tuples = null;
+ Set<String> paths = null;
+ ShortestPathStream stream = null;
+ String zkHost = zkServer.getZkAddress();
+ StreamContext context = new StreamContext();
+ SolrClientCache cache = new SolrClientCache();
+ context.setSolrClientCache(cache);
+
+ Map params = new HashMap();
+ params.put("fq", "predicate_s:knows");
+
+ stream = new ShortestPathStream(zkHost,
+ "collection1",
+ "jim",
+ "steve",
+ "from_s",
+ "to_s",
+ params,
+ 20,
+ 3,
+ 6);
+
+
+
+ stream.setStreamContext(context);
+ paths = new HashSet();
+ tuples = getTuples(stream);
+
+ assertTrue(tuples.size() == 2);
+
+ for(Tuple tuple : tuples) {
+ paths.add(tuple.getStrings("path").toString());
+ }
+
+ assertTrue(paths.contains("[jim, dave, alex, steve]"));
+ assertTrue(paths.contains("[jim, stan, mary, steve]"));
+
+ //Test with batch size of 1
+
+ params.put("fq", "predicate_s:knows");
+
+ stream = new ShortestPathStream(zkHost,
+ "collection1",
+ "jim",
+ "steve",
+ "from_s",
+ "to_s",
+ params,
+ 1,
+ 3,
+ 6);
+
+ stream.setStreamContext(context);
+ paths = new HashSet();
+ tuples = getTuples(stream);
+
+ assertTrue(tuples.size() == 2);
+
+ for(Tuple tuple : tuples) {
+ paths.add(tuple.getStrings("path").toString());
+ }
+
+ assertTrue(paths.contains("[jim, dave, alex, steve]"));
+ assertTrue(paths.contains("[jim, stan, mary, steve]"));
+
+ //Test with bad predicate
+
+ params.put("fq", "predicate_s:crap");
+
+ stream = new ShortestPathStream(zkHost,
+ "collection1",
+ "jim",
+ "steve",
+ "from_s",
+ "to_s",
+ params,
+ 1,
+ 3,
+ 6);
+
+ stream.setStreamContext(context);
+ paths = new HashSet();
+ tuples = getTuples(stream);
+
+ assertTrue(tuples.size() == 0);
+
+ //Test with depth 2
+
+ params.put("fq", "predicate_s:knows");
+
+ stream = new ShortestPathStream(zkHost,
+ "collection1",
+ "jim",
+ "steve",
+ "from_s",
+ "to_s",
+ params,
+ 1,
+ 3,
+ 2);
+
+ stream.setStreamContext(context);
+ paths = new HashSet();
+ tuples = getTuples(stream);
+
+ assertTrue(tuples.size() == 0);
+
+
+
+ //Take out alex
+ params.put("fq", "predicate_s:knows NOT to_s:alex");
+
+ stream = new ShortestPathStream(zkHost,
+ "collection1",
+ "jim",
+ "steve",
+ "from_s",
+ "to_s",
+ params,
+ 10,
+ 3,
+ 6);
+
+ stream.setStreamContext(context);
+ paths = new HashSet();
+ tuples = getTuples(stream);
+ assertTrue(tuples.size() == 1);
+
+ for(Tuple tuple : tuples) {
+ paths.add(tuple.getStrings("path").toString());
+ }
+
+ assertTrue(paths.contains("[jim, stan, mary, steve]"));
+
+ cache.close();
+ del("*:*");
+ commit();
+ }
+
+ @Test
+ public void streamTests() throws Exception {
+ assertNotNull(cloudClient);
+
+ handle.clear();
+ handle.put("timestamp", SKIPVAL);
+
+ waitForRecoveriesToFinish(false);
+
+ del("*:*");
+
+ commit();
+
+ testShortestPathStream();
+
+ }
+
+ protected Map mapParams(String... vals) {
+ Map params = new HashMap();
+ String k = null;
+ for(String val : vals) {
+ if(k == null) {
+ k = val;
+ } else {
+ params.put(k, val);
+ k = null;
+ }
+ }
+
+ return params;
+ }
+
+ protected List<Tuple> getTuples(TupleStream tupleStream) throws IOException {
+ tupleStream.open();
+ List<Tuple> tuples = new ArrayList();
+ for(;;) {
+ Tuple t = tupleStream.read();
+ if(t.EOF) {
+ break;
+ } else {
+ tuples.add(t);
+ }
+ }
+ tupleStream.close();
+ return tuples;
+ }
+
+ protected Tuple getTuple(TupleStream tupleStream) throws IOException {
+ tupleStream.open();
+ Tuple t = tupleStream.read();
+ tupleStream.close();
+ return t;
+ }
+
+
+ protected boolean assertOrder(List<Tuple> tuples, int... ids) throws Exception {
+ int i = 0;
+ for(int val : ids) {
+ Tuple t = tuples.get(i);
+ Long tip = (Long)t.get("id");
+ if(tip.intValue() != val) {
+ throw new Exception("Found value:"+tip.intValue()+" expecting:"+val);
+ }
+ ++i;
+ }
+ return true;
+ }
+
+ protected boolean assertGroupOrder(Tuple tuple, int... ids) throws Exception {
+ List group = (List)tuple.get("tuples");
+ int i=0;
+ for(int val : ids) {
+ Map t = (Map)group.get(i);
+ Long tip = (Long)t.get("id");
+ if(tip.intValue() != val) {
+ throw new Exception("Found value:"+tip.intValue()+" expecting:"+val);
+ }
+ ++i;
+ }
+ return true;
+ }
+
+ protected boolean assertMaps(List<Map> maps, int... ids) throws Exception {
+ if(maps.size() != ids.length) {
+ throw new Exception("Expected id count != actual map count:"+ids.length+":"+maps.size());
+ }
+
+ int i=0;
+ for(int val : ids) {
+ Map t = maps.get(i);
+ Long tip = (Long)t.get("id");
+ if(tip.intValue() != val) {
+ throw new Exception("Found value:"+tip.intValue()+" expecting:"+val);
+ }
+ ++i;
+ }
+ return true;
+ }
+
+ public boolean assertLong(Tuple tuple, String fieldName, long l) throws Exception {
+ long lv = (long)tuple.get(fieldName);
+ if(lv != l) {
+ throw new Exception("Longs not equal:"+l+" : "+lv);
+ }
+
+ return true;
+ }
+
+ @Override
+ protected void indexr(Object... fields) throws Exception {
+ SolrInputDocument doc = getDoc(fields);
+ indexDoc(doc);
+ }
+
+ private void attachStreamFactory(TupleStream tupleStream) {
+ StreamContext streamContext = new StreamContext();
+ streamContext.setStreamFactory(streamFactory);
+ tupleStream.setStreamContext(streamContext);
+ }
+}
+