You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@solr.apache.org by jb...@apache.org on 2021/06/03 13:57:25 UTC
[solr] branch main updated: SOLR-15197: Support temporal graph queries with daily windows
This is an automated email from the ASF dual-hosted git repository.
jbernste pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr.git
The following commit(s) were added to refs/heads/main by this push:
new 4c76bcc SOLR-15197: Support temporal graph queries with daily windows
4c76bcc is described below
commit 4c76bcc65017a1ff20dd64eedd63d4976a38a554
Author: Joel Bernstein <jo...@Joel-Bernsteins-MacBook-Pro-2.local>
AuthorDate: Thu Jun 3 09:48:19 2021 -0400
SOLR-15197: Support temporal graph queries with daily windows
---
.../client/solrj/io/graph/GatherNodesStream.java | 63 ++++++++++++++++---
.../client/solrj/io/graph/GraphExpressionTest.java | 70 +++++++++++++++++-----
2 files changed, 109 insertions(+), 24 deletions(-)
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/graph/GatherNodesStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/graph/GatherNodesStream.java
index 77a01de..e573235 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/graph/GatherNodesStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/graph/GatherNodesStream.java
@@ -78,7 +78,11 @@ public class GatherNodesStream extends TupleStream implements Expressible {
private SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssX", Locale.ENGLISH);
private Set<String> windowSet;
private int window = Integer.MIN_VALUE;
- private int lag = 1;
+ private int lag = 0;
+ private static final int TEN_SECOND_INTERVAL = 0;
+ private static final int DAY_INTERVAL = 1;
+ private int interval = TEN_SECOND_INTERVAL;
+
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public GatherNodesStream(String zkHost,
@@ -105,7 +109,8 @@ public class GatherNodesStream extends TupleStream implements Expressible {
scatter,
maxDocFreq,
Integer.MIN_VALUE,
- 1);
+ 1,
+ TEN_SECOND_INTERVAL);
}
public GatherNodesStream(StreamExpression expression, StreamFactory factory) throws IOException {
@@ -205,13 +210,21 @@ public class GatherNodesStream extends TupleStream implements Expressible {
StreamExpressionNamedParameter windowExpression = factory.getNamedOperand(expression, "window");
int timeWindow = Integer.MIN_VALUE;
+ int intervalParam = -1;
if(windowExpression != null) {
- timeWindow = Integer.parseInt(((StreamExpressionValue) windowExpression.getParameter()).getValue());
+ String windowValue = ((StreamExpressionValue) windowExpression.getParameter()).getValue();
+ if(windowValue.contains("DAY")) {
+ intervalParam = DAY_INTERVAL;
+ timeWindow = Integer.parseInt(windowValue.split("DAY")[0]);
+ } else {
+ intervalParam = TEN_SECOND_INTERVAL;
+ timeWindow = Integer.parseInt(windowValue);
+ }
}
StreamExpressionNamedParameter lagExpression = factory.getNamedOperand(expression, "lag");
- int timeLag = 1;
+ int timeLag = 0;
if(lagExpression != null) {
timeLag = Integer.parseInt(((StreamExpressionValue) lagExpression.getParameter()).getValue());
@@ -268,7 +281,8 @@ public class GatherNodesStream extends TupleStream implements Expressible {
scatter,
docFreq,
timeWindow,
- timeLag);
+ timeLag,
+ intervalParam);
}
@SuppressWarnings({"unchecked"})
@@ -284,7 +298,8 @@ public class GatherNodesStream extends TupleStream implements Expressible {
Set<Traversal.Scatter> scatter,
int maxDocFreq,
int window,
- int lag) {
+ int lag,
+ int interval) {
this.zkHost = zkHost;
this.collection = collection;
this.tupleStream = tupleStream;
@@ -297,6 +312,7 @@ public class GatherNodesStream extends TupleStream implements Expressible {
this.scatter = scatter;
this.maxDocFreq = maxDocFreq;
this.window = window;
+ this.interval = interval;
if(window > Integer.MIN_VALUE) {
windowSet = new HashSet<>();
@@ -547,7 +563,7 @@ public class GatherNodesStream extends TupleStream implements Expressible {
Instant instant = date.toInstant();
for (int i = 0; i < size; i++) {
- Instant windowInstant = instant.minus(10 * (i + lag), ChronoUnit.SECONDS);
+ Instant windowInstant = instant.minus(10 * ((i+1) + lag), ChronoUnit.SECONDS);
String windowString = windowInstant.toString();
windowString = windowString.substring(0, 18) + "0Z";
window[i] = windowString;
@@ -560,6 +576,26 @@ public class GatherNodesStream extends TupleStream implements Expressible {
}
}
+ private String[] getDayWindow(int size, int lag, String start) {
+ try {
+ String[] window = new String[size];
+ Date date = this.dateFormat.parse(start);
+ Instant instant = date.toInstant();
+
+ for (int i = 0; i < size; i++) {
+ Instant windowInstant = instant.minus(1 * ((i+1) + lag), ChronoUnit.DAYS);
+ String windowString = windowInstant.toString();
+ windowString = windowString.substring(0, 10) + "T00:00:00Z";
+ window[i] = windowString;
+ }
+
+ return window;
+ } catch(ParseException e) {
+ log.warn("Unparseable date:{}", String.valueOf(start));
+ return new String[0];
+ }
+ }
+
public void close() throws IOException {
tupleStream.close(); // Closes the wrapped source tuple stream.
}
@@ -612,7 +648,7 @@ public class GatherNodesStream extends TupleStream implements Expressible {
}
}
- if(windowSet == null || (lag == 1 && !windowSet.contains(String.valueOf(value)))) {
+ if(windowSet == null || (lag == 0 && !windowSet.contains(String.valueOf(value)))) {
joinBatch.add(value);
}
@@ -625,7 +661,16 @@ public class GatherNodesStream extends TupleStream implements Expressible {
* We derive the window and add it to the join values below.
*/
- String[] timeWindow = getTenSecondWindow(window, lag, value);
+ String[] timeWindow = null;
+
+ if(this.interval == DAY_INTERVAL) {
+ //Derive the daily window
+ timeWindow = getDayWindow(window, lag, value);
+ } else {
+ //Derive the default ten-second window
+ timeWindow = getTenSecondWindow(window, lag, value);
+ }
+
for(String windowString : timeWindow) {
if(!windowSet.contains(windowString)) {
/*
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/graph/GraphExpressionTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/graph/GraphExpressionTest.java
index 055f2f5..afe74fe 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/graph/GraphExpressionTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/graph/GraphExpressionTest.java
@@ -235,18 +235,18 @@ public class GraphExpressionTest extends SolrCloudTestCase {
public void testGatherNodesStream() throws Exception {
new UpdateRequest()
- .add(id, "0", "basket_s", "basket1", "product_s", "product1", "price_f", "20", "time_ten_seconds_s", "2020-09-24T18:23:50Z")
- .add(id, "1", "basket_s", "basket1", "product_s", "product3", "price_f", "30", "time_ten_seconds_s", "2020-09-24T18:23:40Z")
- .add(id, "2", "basket_s", "basket1", "product_s", "product5", "price_f", "1", "time_ten_seconds_s", "2020-09-24T18:23:30Z")
- .add(id, "3", "basket_s", "basket2", "product_s", "product1", "price_f", "2", "time_ten_seconds_s", "2020-09-24T18:23:20Z")
- .add(id, "4", "basket_s", "basket2", "product_s", "product6", "price_f", "5", "time_ten_seconds_s", "2020-09-24T18:23:10Z")
- .add(id, "5", "basket_s", "basket2", "product_s", "product7", "price_f", "10", "time_ten_seconds_s", "2020-09-24T18:23:00Z")
- .add(id, "6", "basket_s", "basket3", "product_s", "product4", "price_f", "20", "time_ten_seconds_s", "2020-09-24T18:22:50Z")
- .add(id, "7", "basket_s", "basket3", "product_s", "product3", "price_f", "10", "time_ten_seconds_s", "2020-09-24T18:22:40Z")
- .add(id, "8", "basket_s", "basket3", "product_s", "product1", "price_f", "10", "time_ten_seconds_s", "2020-09-24T18:22:30Z")
- .add(id, "9", "basket_s", "basket4", "product_s", "product4", "price_f", "40", "time_ten_seconds_s", "2020-09-24T18:22:20Z")
- .add(id, "10", "basket_s", "basket4", "product_s", "product3", "price_f", "10", "time_ten_seconds_s", "2020-09-24T18:22:10Z")
- .add(id, "11", "basket_s", "basket4", "product_s", "product1", "price_f", "10", "time_ten_seconds_s", "2020-09-24T18:22:00Z")
+ .add(id, "0", "basket_s", "basket1", "product_s", "product1", "price_f", "20", "time_ten_seconds_s", "2020-09-24T18:23:50Z", "day_s", "2020-09-24T00:00:00Z")
+ .add(id, "1", "basket_s", "basket1", "product_s", "product3", "price_f", "30", "time_ten_seconds_s", "2020-09-24T18:23:40Z", "day_s", "2020-09-23T00:00:00Z")
+ .add(id, "2", "basket_s", "basket1", "product_s", "product5", "price_f", "1", "time_ten_seconds_s", "2020-09-24T18:23:30Z", "day_s", "2020-09-22T00:00:00Z")
+ .add(id, "3", "basket_s", "basket2", "product_s", "product1", "price_f", "2", "time_ten_seconds_s", "2020-09-24T18:23:20Z", "day_s", "2020-09-21T00:00:00Z")
+ .add(id, "4", "basket_s", "basket2", "product_s", "product6", "price_f", "5", "time_ten_seconds_s", "2020-09-24T18:23:10Z","day_s", "2020-09-20T00:00:00Z")
+ .add(id, "5", "basket_s", "basket2", "product_s", "product7", "price_f", "10", "time_ten_seconds_s", "2020-09-24T18:23:00Z", "day_s", "2020-09-19T00:00:00Z")
+ .add(id, "6", "basket_s", "basket3", "product_s", "product4", "price_f", "20", "time_ten_seconds_s", "2020-09-24T18:22:50Z", "day_s", "2020-09-18T00:00:00Z")
+ .add(id, "7", "basket_s", "basket3", "product_s", "product3", "price_f", "10", "time_ten_seconds_s", "2020-09-24T18:22:40Z", "day_s", "2020-09-17T00:00:00Z")
+ .add(id, "8", "basket_s", "basket3", "product_s", "product1", "price_f", "10", "time_ten_seconds_s", "2020-09-24T18:22:30Z", "day_s", "2020-09-16T00:00:00Z")
+ .add(id, "9", "basket_s", "basket4", "product_s", "product4", "price_f", "40", "time_ten_seconds_s", "2020-09-24T18:22:20Z", "day_s", "2020-09-15T00:00:00Z")
+ .add(id, "10", "basket_s", "basket4", "product_s", "product3", "price_f", "10", "time_ten_seconds_s", "2020-09-24T18:22:10Z", "day_s", "2020-09-14T00:00:00Z")
+ .add(id, "11", "basket_s", "basket4", "product_s", "product1", "price_f", "10", "time_ten_seconds_s", "2020-09-24T18:22:00Z", "day_s", "2020-09-13T00:00:00Z")
.commit(cluster.getSolrClient(), COLLECTION);
List<Tuple> tuples = null;
@@ -409,9 +409,50 @@ public class GraphExpressionTest extends SolrCloudTestCase {
tuples = getTuples(stream);
Collections.sort(tuples, new FieldComparator("node", ComparatorOrder.ASCENDING));
+
assertTrue(tuples.size() == 2);
- assertTrue(tuples.get(0).getString("node").equals("3"));
- assertTrue(tuples.get(1).getString("node").equals("4"));
+ assertTrue(tuples.get(0).getString("node").equals("4"));
+ assertTrue(tuples.get(1).getString("node").equals("5"));
+
+
+ // Test DAY window without lag
+
+
+ expr = "nodes(collection1, random(collection1, q=\"id:(1 2)\", fl=\"day_s\"), walk=\"day_s->day_s\", gather=\"id\", window=\"3DAYS\")";
+
+ stream = (GatherNodesStream)factory.constructStream(expr);
+
+ context = new StreamContext();
+ context.setSolrClientCache(cache);
+ stream.setStreamContext(context);
+ tuples = getTuples(stream);
+
+ Collections.sort(tuples, new FieldComparator("node", ComparatorOrder.ASCENDING));
+ assertTrue(tuples.size() == 5);
+ assertTrue(tuples.get(0).getString("node").equals("1"));
+ assertTrue(tuples.get(1).getString("node").equals("2"));
+ assertTrue(tuples.get(2).getString("node").equals("3"));
+ assertTrue(tuples.get(3).getString("node").equals("4"));
+ assertTrue(tuples.get(4).getString("node").equals("5"));
+
+
+ // Test Day window with lag.
+
+ expr = "nodes(collection1, random(collection1, q=\"id:(1)\", fl=\"day_s\"), walk=\"day_s->day_s\", gather=\"id\", window=\"2DAYS\", lag=\"2\")";
+
+ stream = (GatherNodesStream)factory.constructStream(expr);
+
+ context = new StreamContext();
+ context.setSolrClientCache(cache);
+ stream.setStreamContext(context);
+ tuples = getTuples(stream);
+
+ Collections.sort(tuples, new FieldComparator("node", ComparatorOrder.ASCENDING));
+ assertTrue(tuples.size() == 2);
+ assertTrue(tuples.get(0).getString("node").equals("4"));
+ assertTrue(tuples.get(1).getString("node").equals("5"));
+
+
cache.close();
}
@@ -678,7 +719,6 @@ public class GraphExpressionTest extends SolrCloudTestCase {
assertTrue(tuples.get(1).getString("node").equals("jim"));
assertTrue(tuples.get(1).getLong("level").equals(1L));
List<String> ancestors = tuples.get(1).getStrings("ancestors");
- System.out.println("##################### Ancestors:"+ancestors);
assert(ancestors.size() == 1);
assert(ancestors.get(0).equals("bill"));