You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by jb...@apache.org on 2021/03/01 15:32:31 UTC
[lucene-solr] branch branch_8x updated: SOLR-15132: Add temporal
graph query to the nodes Streaming Expression
This is an automated email from the ASF dual-hosted git repository.
jbernste pushed a commit to branch branch_8x
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/branch_8x by this push:
new aadaad4 SOLR-15132: Add temporal graph query to the nodes Streaming Expression
aadaad4 is described below
commit aadaad4a0909a1fc66746d56d86853417ff395ef
Author: Joel Bernstein <jb...@apache.org>
AuthorDate: Fri Feb 12 15:18:54 2021 -0500
SOLR-15132: Add temporal graph query to the nodes Streaming Expression
---
.../client/solrj/io/graph/GatherNodesStream.java | 108 ++++++++++++++++++---
.../apache/solr/client/solrj/io/graph/Node.java | 2 +-
.../client/solrj/io/graph/GraphExpressionTest.java | 72 ++++++++++----
3 files changed, 147 insertions(+), 35 deletions(-)
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/graph/GatherNodesStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/graph/GatherNodesStream.java
index bd1c5c6..3f65bd8 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/graph/GatherNodesStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/graph/GatherNodesStream.java
@@ -18,14 +18,12 @@
package org.apache.solr.client.solrj.io.graph;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Set;
+import java.lang.invoke.MethodHandles;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
@@ -51,6 +49,8 @@ import org.apache.solr.client.solrj.io.stream.metrics.Metric;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.SolrNamedThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import static org.apache.solr.common.params.CommonParams.SORT;
@@ -75,6 +75,11 @@ public class GatherNodesStream extends TupleStream implements Expressible {
private Traversal traversal;
private List<Metric> metrics;
private int maxDocFreq;
+ private SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssX", Locale.ENGLISH);
+ private Set<String> windowSet;
+ private int window = Integer.MIN_VALUE;
+ private int lag = 1;
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public GatherNodesStream(String zkHost,
String collection,
@@ -98,7 +103,9 @@ public class GatherNodesStream extends TupleStream implements Expressible {
metrics,
trackTraversal,
scatter,
- maxDocFreq);
+ maxDocFreq,
+ Integer.MIN_VALUE,
+ 1);
}
public GatherNodesStream(StreamExpression expression, StreamFactory factory) throws IOException {
@@ -196,6 +203,20 @@ public class GatherNodesStream extends TupleStream implements Expressible {
useDefaultTraversal = true;
}
+ StreamExpressionNamedParameter windowExpression = factory.getNamedOperand(expression, "window");
+ int timeWindow = Integer.MIN_VALUE;
+
+ if(windowExpression != null) {
+ timeWindow = Integer.parseInt(((StreamExpressionValue) windowExpression.getParameter()).getValue());
+ }
+
+ StreamExpressionNamedParameter lagExpression = factory.getNamedOperand(expression, "lag");
+ int timeLag = 1;
+
+ if(lagExpression != null) {
+ timeLag = Integer.parseInt(((StreamExpressionValue) lagExpression.getParameter()).getValue());
+ }
+
StreamExpressionNamedParameter docFreqExpression = factory.getNamedOperand(expression, "maxDocFreq");
int docFreq = -1;
@@ -210,7 +231,10 @@ public class GatherNodesStream extends TupleStream implements Expressible {
!namedParam.getName().equals("walk") &&
!namedParam.getName().equals("scatter") &&
!namedParam.getName().equals("maxDocFreq") &&
- !namedParam.getName().equals("trackTraversal"))
+ !namedParam.getName().equals("trackTraversal") &&
+ !namedParam.getName().equals("window") &&
+ !namedParam.getName().equals("lag")
+ )
{
params.put(namedParam.getName(), namedParam.getParameter().toString().trim());
}
@@ -242,7 +266,9 @@ public class GatherNodesStream extends TupleStream implements Expressible {
metrics,
trackTraversal,
scatter,
- docFreq);
+ docFreq,
+ timeWindow,
+ timeLag);
}
@SuppressWarnings({"unchecked"})
@@ -256,7 +282,9 @@ public class GatherNodesStream extends TupleStream implements Expressible {
List<Metric> metrics,
boolean trackTraversal,
Set<Traversal.Scatter> scatter,
- int maxDocFreq) {
+ int maxDocFreq,
+ int window,
+ int lag) {
this.zkHost = zkHost;
this.collection = collection;
this.tupleStream = tupleStream;
@@ -268,6 +296,13 @@ public class GatherNodesStream extends TupleStream implements Expressible {
this.trackTraversal = trackTraversal;
this.scatter = scatter;
this.maxDocFreq = maxDocFreq;
+ this.window = window;
+
+ if(window > Integer.MIN_VALUE) {
+ windowSet = new HashSet<>();
+ }
+
+ this.lag = lag;
}
@Override
@@ -506,6 +541,26 @@ public class GatherNodesStream extends TupleStream implements Expressible {
}
+ private String[] getTenSecondWindow(int size, int lag, String start) {
+ try {
+ String[] window = new String[size];
+ Date date = this.dateFormat.parse(start);
+ Instant instant = date.toInstant();
+
+ for (int i = 0; i < size; i++) {
+ Instant windowInstant = instant.minus(10 * (i + lag), ChronoUnit.SECONDS);
+ String windowString = windowInstant.toString();
+ windowString = windowString.substring(0, 18) + "0Z";
+ window[i] = windowString;
+ }
+
+ return window;
+ } catch(ParseException e) {
+ log.warn("Unparseable date:{}", String.valueOf(start));
+ return new String[0];
+ }
+ }
+
public void close() throws IOException {
tupleStream.close();
}
@@ -558,8 +613,33 @@ public class GatherNodesStream extends TupleStream implements Expressible {
}
}
- joinBatch.add(value);
- if (joinBatch.size() == 400) {
+ if(windowSet == null || (lag == 1 && !windowSet.contains(String.valueOf(value)))) {
+ joinBatch.add(value);
+ }
+
+ if(window > Integer.MIN_VALUE && value != null) {
+ windowSet.add(value);
+
+ /*
+ * A time window has been set.
+ * The join value is expected to be an ISO formatted time stamp.
+ * We derive the window and add it to the join values below.
+ */
+
+ String[] timeWindow = getTenSecondWindow(window, lag, value);
+ for(String windowString : timeWindow) {
+ if(!windowSet.contains(windowString)) {
+ /*
+ * Time windows can overlap, so make sure we don't query for the same timestamp more then once.
+ * This would cause duplicate results if overlapping windows are collected in different threads.
+ */
+ joinBatch.add(windowString);
+ }
+ windowSet.add(windowString);
+ }
+ }
+
+ if (joinBatch.size() >= 400) {
JoinRunner joinRunner = new JoinRunner(joinBatch);
@SuppressWarnings({"rawtypes"})
Future future = threadPool.submit(joinRunner);
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/graph/Node.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/graph/Node.java
index 6b551d7..8e794bc 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/graph/Node.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/graph/Node.java
@@ -30,7 +30,7 @@ public class Node {
public Node(String id, boolean track) {
this.id=id;
if(track) {
- ancestors = new HashSet<>();
+ ancestors = new TreeSet<>();
}
}
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/graph/GraphExpressionTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/graph/GraphExpressionTest.java
index f5f757d..0a57774 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/graph/GraphExpressionTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/graph/GraphExpressionTest.java
@@ -39,13 +39,7 @@ import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
import org.apache.solr.client.solrj.io.comp.FieldComparator;
-import org.apache.solr.client.solrj.io.stream.CloudSolrStream;
-import org.apache.solr.client.solrj.io.stream.FacetStream;
-import org.apache.solr.client.solrj.io.stream.HashJoinStream;
-import org.apache.solr.client.solrj.io.stream.ScoreNodesStream;
-import org.apache.solr.client.solrj.io.stream.SortStream;
-import org.apache.solr.client.solrj.io.stream.StreamContext;
-import org.apache.solr.client.solrj.io.stream.TupleStream;
+import org.apache.solr.client.solrj.io.stream.*;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.client.solrj.io.stream.metrics.CountMetric;
import org.apache.solr.client.solrj.io.stream.metrics.MaxMetric;
@@ -250,18 +244,18 @@ public class GraphExpressionTest extends SolrCloudTestCase {
public void testGatherNodesStream() throws Exception {
new UpdateRequest()
- .add(id, "0", "basket_s", "basket1", "product_s", "product1", "price_f", "20")
- .add(id, "1", "basket_s", "basket1", "product_s", "product3", "price_f", "30")
- .add(id, "2", "basket_s", "basket1", "product_s", "product5", "price_f", "1")
- .add(id, "3", "basket_s", "basket2", "product_s", "product1", "price_f", "2")
- .add(id, "4", "basket_s", "basket2", "product_s", "product6", "price_f", "5")
- .add(id, "5", "basket_s", "basket2", "product_s", "product7", "price_f", "10")
- .add(id, "6", "basket_s", "basket3", "product_s", "product4", "price_f", "20")
- .add(id, "7", "basket_s", "basket3", "product_s", "product3", "price_f", "10")
- .add(id, "8", "basket_s", "basket3", "product_s", "product1", "price_f", "10")
- .add(id, "9", "basket_s", "basket4", "product_s", "product4", "price_f", "40")
- .add(id, "10", "basket_s", "basket4", "product_s", "product3", "price_f", "10")
- .add(id, "11", "basket_s", "basket4", "product_s", "product1", "price_f", "10")
+ .add(id, "0", "basket_s", "basket1", "product_s", "product1", "price_f", "20", "time_ten_seconds_s", "2020-09-24T18:23:50Z")
+ .add(id, "1", "basket_s", "basket1", "product_s", "product3", "price_f", "30", "time_ten_seconds_s", "2020-09-24T18:23:40Z")
+ .add(id, "2", "basket_s", "basket1", "product_s", "product5", "price_f", "1", "time_ten_seconds_s", "2020-09-24T18:23:30Z")
+ .add(id, "3", "basket_s", "basket2", "product_s", "product1", "price_f", "2", "time_ten_seconds_s", "2020-09-24T18:23:20Z")
+ .add(id, "4", "basket_s", "basket2", "product_s", "product6", "price_f", "5", "time_ten_seconds_s", "2020-09-24T18:23:10Z")
+ .add(id, "5", "basket_s", "basket2", "product_s", "product7", "price_f", "10", "time_ten_seconds_s", "2020-09-24T18:23:00Z")
+ .add(id, "6", "basket_s", "basket3", "product_s", "product4", "price_f", "20", "time_ten_seconds_s", "2020-09-24T18:22:50Z")
+ .add(id, "7", "basket_s", "basket3", "product_s", "product3", "price_f", "10", "time_ten_seconds_s", "2020-09-24T18:22:40Z")
+ .add(id, "8", "basket_s", "basket3", "product_s", "product1", "price_f", "10", "time_ten_seconds_s", "2020-09-24T18:22:30Z")
+ .add(id, "9", "basket_s", "basket4", "product_s", "product4", "price_f", "40", "time_ten_seconds_s", "2020-09-24T18:22:20Z")
+ .add(id, "10", "basket_s", "basket4", "product_s", "product3", "price_f", "10", "time_ten_seconds_s", "2020-09-24T18:22:10Z")
+ .add(id, "11", "basket_s", "basket4", "product_s", "product1", "price_f", "10", "time_ten_seconds_s", "2020-09-24T18:22:00Z")
.commit(cluster.getSolrClient(), COLLECTION);
List<Tuple> tuples = null;
@@ -276,10 +270,12 @@ public class GraphExpressionTest extends SolrCloudTestCase {
.withFunctionName("gatherNodes", GatherNodesStream.class)
.withFunctionName("nodes", GatherNodesStream.class)
.withFunctionName("search", CloudSolrStream.class)
+ .withFunctionName("random", RandomStream.class)
.withFunctionName("count", CountMetric.class)
.withFunctionName("avg", MeanMetric.class)
.withFunctionName("sum", SumMetric.class)
.withFunctionName("min", MinMetric.class)
+ .withFunctionName("sort", SortStream.class)
.withFunctionName("max", MaxMetric.class);
String expr = "nodes(collection1, " +
@@ -293,6 +289,7 @@ public class GraphExpressionTest extends SolrCloudTestCase {
Collections.sort(tuples, new FieldComparator("node", ComparatorOrder.ASCENDING));
assertTrue(tuples.size() == 4);
+
assertTrue(tuples.get(0).getString("node").equals("basket1"));
assertTrue(tuples.get(1).getString("node").equals("basket2"));
assertTrue(tuples.get(2).getString("node").equals("basket3"));
@@ -388,8 +385,43 @@ public class GraphExpressionTest extends SolrCloudTestCase {
assertTrue(tuples.get(0).getString("node").equals("basket2"));
assertTrue(tuples.get(1).getString("node").equals("basket3"));
- cache.close();
+ //Test the window without lag
+
+ expr = "nodes(collection1, random(collection1, q=\"id:(1 2)\", fl=\"time_ten_seconds_s\"), walk=\"time_ten_seconds_s->time_ten_seconds_s\", gather=\"id\", window=\"3\")";
+
+ stream = (GatherNodesStream)factory.constructStream(expr);
+
+ context = new StreamContext();
+ context.setSolrClientCache(cache);
+ stream.setStreamContext(context);
+ tuples = getTuples(stream);
+
+ Collections.sort(tuples, new FieldComparator("node", ComparatorOrder.ASCENDING));
+ assertTrue(tuples.size() == 5);
+ assertTrue(tuples.get(0).getString("node").equals("1"));
+ assertTrue(tuples.get(1).getString("node").equals("2"));
+ assertTrue(tuples.get(2).getString("node").equals("3"));
+ assertTrue(tuples.get(3).getString("node").equals("4"));
+ assertTrue(tuples.get(4).getString("node").equals("5"));
+
+
+ //Test window with lag
+
+ expr = "nodes(collection1, random(collection1, q=\"id:(1)\", fl=\"time_ten_seconds_s\"), walk=\"time_ten_seconds_s->time_ten_seconds_s\", gather=\"id\", window=\"2\", lag=\"2\")";
+
+ stream = (GatherNodesStream)factory.constructStream(expr);
+
+ context = new StreamContext();
+ context.setSolrClientCache(cache);
+ stream.setStreamContext(context);
+ tuples = getTuples(stream);
+
+ Collections.sort(tuples, new FieldComparator("node", ComparatorOrder.ASCENDING));
+ assertTrue(tuples.size() == 2);
+ assertTrue(tuples.get(0).getString("node").equals("3"));
+ assertTrue(tuples.get(1).getString("node").equals("4"));
+ cache.close();
}