You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@solr.apache.org by jb...@apache.org on 2021/06/09 14:20:18 UTC
[solr] branch main updated: SOLR-15197: Add WEEKDAY windows and
forward and backward looking windows.
This is an automated email from the ASF dual-hosted git repository.
jbernste pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr.git
The following commit(s) were added to refs/heads/main by this push:
new 73963ca SOLR-15197: Add WEEKDAY windows and forward and backward looking windows.
73963ca is described below
commit 73963cafe452ed571b43a9d4bd9524d9b53d5421
Author: Joel Bernstein <jb...@apache.org>
AuthorDate: Wed Jun 9 10:19:35 2021 -0400
SOLR-15197: Add WEEKDAY windows and forward and backward looking windows.
---
.../client/solrj/io/graph/GatherNodesStream.java | 86 ++++++++++----
.../client/solrj/io/graph/GraphExpressionTest.java | 126 +++++++++++++++++++--
2 files changed, 179 insertions(+), 33 deletions(-)
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/graph/GatherNodesStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/graph/GatherNodesStream.java
index e573235..08b2ee3 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/graph/GatherNodesStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/graph/GatherNodesStream.java
@@ -21,7 +21,9 @@ import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.text.ParseException;
import java.text.SimpleDateFormat;
+import java.time.DayOfWeek;
import java.time.Instant;
+import java.time.ZoneId;
import java.time.temporal.ChronoUnit;
import java.util.*;
import java.util.concurrent.Callable;
@@ -81,6 +83,7 @@ public class GatherNodesStream extends TupleStream implements Expressible {
private int lag = 0;
private static final int TEN_SECOND_INTERVAL = 0;
private static final int DAY_INTERVAL = 1;
+ private static final int WEEK_DAY_INTERVAL = 2;
private int interval = TEN_SECOND_INTERVAL;
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -214,7 +217,10 @@ public class GatherNodesStream extends TupleStream implements Expressible {
if(windowExpression != null) {
String windowValue = ((StreamExpressionValue) windowExpression.getParameter()).getValue();
- if(windowValue.contains("DAY")) {
+ if(windowValue.contains("WEEKDAY")) {
+ intervalParam = WEEK_DAY_INTERVAL;
+ timeWindow = Integer.parseInt(windowValue.split("WEEKDAY")[0]);
+ } else if (windowValue.contains("DAY")) {
intervalParam = DAY_INTERVAL;
timeWindow = Integer.parseInt(windowValue.split("DAY")[0]);
} else {
@@ -555,21 +561,23 @@ public class GatherNodesStream extends TupleStream implements Expressible {
}
}
-
private String[] getTenSecondWindow(int size, int lag, String start) {
try {
- String[] window = new String[size];
+ List<String> windowList = new ArrayList<>();
Date date = this.dateFormat.parse(start);
Instant instant = date.toInstant();
-
- for (int i = 0; i < size; i++) {
- Instant windowInstant = instant.minus(10 * ((i+1) + lag), ChronoUnit.SECONDS);
+ int collect = Math.abs(size)+lag;
+ int i = -1;
+ while (windowList.size() < collect) {
+ ++i;
+ Instant windowInstant = size > 0 ? instant.plus(10*i, ChronoUnit.SECONDS) : instant.minus(10*i, ChronoUnit.SECONDS);
String windowString = windowInstant.toString();
windowString = windowString.substring(0, 18) + "0Z";
- window[i] = windowString;
+ windowList.add(windowString);
}
- return window;
+ List<String> laggedWindow = windowList.subList(lag, windowList.size());
+ return laggedWindow.toArray(new String[laggedWindow.size()]);
} catch(ParseException e) {
log.warn("Unparseable date:{}", String.valueOf(start));
return new String[0];
@@ -578,18 +586,51 @@ public class GatherNodesStream extends TupleStream implements Expressible {
private String[] getDayWindow(int size, int lag, String start) {
try {
- String[] window = new String[size];
+ List<String> windowList = new ArrayList<>();
Date date = this.dateFormat.parse(start);
Instant instant = date.toInstant();
+ int collect = Math.abs(size)+lag;
+ int i = -1;
+ while (windowList.size() < collect) {
+ ++i;
+ Instant windowInstant = size > 0 ? instant.plus(i, ChronoUnit.DAYS) : instant.minus(i, ChronoUnit.DAYS);
+ String windowString = windowInstant.toString();
+ windowString = windowString.substring(0, 10) + "T00:00:00Z";
+ windowList.add(windowString);
+ }
+
+ List<String> laggedWindow = windowList.subList(lag, windowList.size());
+ return laggedWindow.toArray(new String[laggedWindow.size()]);
+ } catch(ParseException e) {
+ log.warn("Unparseable date:{}", String.valueOf(start));
+ return new String[0];
+ }
+ }
+
+ private String[] getWeekDayWindow(int size, int lag, String start) {
+ try {
+ List<String> windowList = new ArrayList<>();
+ Date date = this.dateFormat.parse(start);
+ Instant instant = date.toInstant();
+ int collect = Math.abs(size)+lag;
+ int i = -1;
+ while (windowList.size() < collect) {
+ ++i;
+ Instant windowInstant = size > 0 ? instant.plus(i, ChronoUnit.DAYS) : instant.minus(i, ChronoUnit.DAYS);
+ DayOfWeek dayOfWeek = windowInstant.atZone(ZoneId.of("UTC")).getDayOfWeek();
+
+ if(dayOfWeek.getValue() > 5) {
+ //Skip weekend
+ continue;
+ }
- for (int i = 0; i < size; i++) {
- Instant windowInstant = instant.minus(1 * ((i+1) + lag), ChronoUnit.DAYS);
String windowString = windowInstant.toString();
windowString = windowString.substring(0, 10) + "T00:00:00Z";
- window[i] = windowString;
+ windowList.add(windowString);
}
- return window;
+ List<String> laggedWindow = windowList.subList(lag, windowList.size());
+ return laggedWindow.toArray(new String[laggedWindow.size()]);
} catch(ParseException e) {
log.warn("Unparseable date:{}", String.valueOf(start));
return new String[0];
@@ -648,12 +689,11 @@ public class GatherNodesStream extends TupleStream implements Expressible {
}
}
- if(windowSet == null || (lag == 0 && !windowSet.contains(String.valueOf(value)))) {
+ if(windowSet == null) {
joinBatch.add(value);
}
if(window > Integer.MIN_VALUE && value != null) {
- windowSet.add(value);
/*
* A time window has been set.
@@ -663,12 +703,16 @@ public class GatherNodesStream extends TupleStream implements Expressible {
String[] timeWindow = null;
- if(this.interval == DAY_INTERVAL) {
- //Derive the daily window
- timeWindow = getDayWindow(window, lag, value);
- } else {
- //Dervie the default ten second window
- timeWindow = getTenSecondWindow(window, lag, value);
+ switch(this.interval) {
+ case WEEK_DAY_INTERVAL:
+ timeWindow = getWeekDayWindow(window, lag, value);
+ break;
+ case DAY_INTERVAL:
+ timeWindow = getDayWindow(window, lag, value);
+ break;
+ default:
+ timeWindow = getTenSecondWindow(window, lag, value);
+ break;
}
for(String windowString : timeWindow) {
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/graph/GraphExpressionTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/graph/GraphExpressionTest.java
index afe74fe..d0b1e9e 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/graph/GraphExpressionTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/graph/GraphExpressionTest.java
@@ -379,7 +379,7 @@ public class GraphExpressionTest extends SolrCloudTestCase {
//Test the window without lag
- expr = "nodes(collection1, random(collection1, q=\"id:(1 2)\", fl=\"time_ten_seconds_s\"), walk=\"time_ten_seconds_s->time_ten_seconds_s\", gather=\"id\", window=\"3\")";
+ expr = "nodes(collection1, random(collection1, q=\"id:(1 2)\", fl=\"time_ten_seconds_s\"), walk=\"time_ten_seconds_s->time_ten_seconds_s\", gather=\"id\", window=\"-3\")";
stream = (GatherNodesStream)factory.constructStream(expr);
@@ -389,17 +389,48 @@ public class GraphExpressionTest extends SolrCloudTestCase {
tuples = getTuples(stream);
Collections.sort(tuples, new FieldComparator("node", ComparatorOrder.ASCENDING));
- assertTrue(tuples.size() == 5);
+ assertTrue(tuples.size() == 4);
assertTrue(tuples.get(0).getString("node").equals("1"));
assertTrue(tuples.get(1).getString("node").equals("2"));
assertTrue(tuples.get(2).getString("node").equals("3"));
assertTrue(tuples.get(3).getString("node").equals("4"));
- assertTrue(tuples.get(4).getString("node").equals("5"));
+
+
+ expr = "nodes(collection1, random(collection1, q=\"id:(6)\", fl=\"time_ten_seconds_s\"), walk=\"time_ten_seconds_s->time_ten_seconds_s\", gather=\"id\", window=\"3\")";
+
+ stream = (GatherNodesStream)factory.constructStream(expr);
+
+ context = new StreamContext();
+ context.setSolrClientCache(cache);
+ stream.setStreamContext(context);
+ tuples = getTuples(stream);
+
+ Collections.sort(tuples, new FieldComparator("node", ComparatorOrder.ASCENDING));
+ assertTrue(tuples.size() == 3);
+ assertTrue(tuples.get(0).getString("node").equals("4"));
+ assertTrue(tuples.get(1).getString("node").equals("5"));
+ assertTrue(tuples.get(2).getString("node").equals("6"));
+
+
+ expr = "nodes(collection1, random(collection1, q=\"id:(6)\", fl=\"time_ten_seconds_s\"), walk=\"time_ten_seconds_s->time_ten_seconds_s\", gather=\"id\", window=\"3\", lag=\"1\")";
+
+ stream = (GatherNodesStream)factory.constructStream(expr);
+
+ context = new StreamContext();
+ context.setSolrClientCache(cache);
+ stream.setStreamContext(context);
+ tuples = getTuples(stream);
+
+ Collections.sort(tuples, new FieldComparator("node", ComparatorOrder.ASCENDING));
+ assertTrue(tuples.size() == 3);
+ assertTrue(tuples.get(0).getString("node").equals("3"));
+ assertTrue(tuples.get(1).getString("node").equals("4"));
+ assertTrue(tuples.get(2).getString("node").equals("5"));
//Test window with lag
- expr = "nodes(collection1, random(collection1, q=\"id:(1)\", fl=\"time_ten_seconds_s\"), walk=\"time_ten_seconds_s->time_ten_seconds_s\", gather=\"id\", window=\"2\", lag=\"2\")";
+ expr = "nodes(collection1, random(collection1, q=\"id:(1)\", fl=\"time_ten_seconds_s\"), walk=\"time_ten_seconds_s->time_ten_seconds_s\", gather=\"id\", window=\"-2\", lag=\"2\")";
stream = (GatherNodesStream)factory.constructStream(expr);
@@ -411,14 +442,14 @@ public class GraphExpressionTest extends SolrCloudTestCase {
Collections.sort(tuples, new FieldComparator("node", ComparatorOrder.ASCENDING));
assertTrue(tuples.size() == 2);
- assertTrue(tuples.get(0).getString("node").equals("4"));
- assertTrue(tuples.get(1).getString("node").equals("5"));
+ assertTrue(tuples.get(0).getString("node").equals("3"));
+ assertTrue(tuples.get(1).getString("node").equals("4"));
// Test DAY window without lag
- expr = "nodes(collection1, random(collection1, q=\"id:(1 2)\", fl=\"day_s\"), walk=\"day_s->day_s\", gather=\"id\", window=\"3DAYS\")";
+ expr = "nodes(collection1, random(collection1, q=\"id:(1 2)\", fl=\"day_s\"), walk=\"day_s->day_s\", gather=\"id\", window=\"-3DAYS\")";
stream = (GatherNodesStream)factory.constructStream(expr);
@@ -428,17 +459,16 @@ public class GraphExpressionTest extends SolrCloudTestCase {
tuples = getTuples(stream);
Collections.sort(tuples, new FieldComparator("node", ComparatorOrder.ASCENDING));
- assertTrue(tuples.size() == 5);
+ assertTrue(tuples.size() == 4);
assertTrue(tuples.get(0).getString("node").equals("1"));
assertTrue(tuples.get(1).getString("node").equals("2"));
assertTrue(tuples.get(2).getString("node").equals("3"));
assertTrue(tuples.get(3).getString("node").equals("4"));
- assertTrue(tuples.get(4).getString("node").equals("5"));
// Test Day window with lag.
- expr = "nodes(collection1, random(collection1, q=\"id:(1)\", fl=\"day_s\"), walk=\"day_s->day_s\", gather=\"id\", window=\"2DAYS\", lag=\"2\")";
+ expr = "nodes(collection1, random(collection1, q=\"id:(1)\", fl=\"day_s\"), walk=\"day_s->day_s\", gather=\"id\", window=\"-2DAYS\", lag=\"2\")";
stream = (GatherNodesStream)factory.constructStream(expr);
@@ -449,9 +479,81 @@ public class GraphExpressionTest extends SolrCloudTestCase {
Collections.sort(tuples, new FieldComparator("node", ComparatorOrder.ASCENDING));
assertTrue(tuples.size() == 2);
- assertTrue(tuples.get(0).getString("node").equals("4"));
- assertTrue(tuples.get(1).getString("node").equals("5"));
+ assertTrue(tuples.get(0).getString("node").equals("3"));
+ assertTrue(tuples.get(1).getString("node").equals("4"));
+
+
+ // Test Week Day
+
+ expr = "nodes(collection1, random(collection1, q=\"id:(3)\", fl=\"day_s\"), walk=\"day_s->day_s\", gather=\"id\", window=\"-2WEEKDAYS\")";
+
+ stream = (GatherNodesStream)factory.constructStream(expr);
+
+ context = new StreamContext();
+ context.setSolrClientCache(cache);
+ stream.setStreamContext(context);
+ tuples = getTuples(stream);
+
+ Collections.sort(tuples, new FieldComparator("node", ComparatorOrder.ASCENDING));
+
+
+ assertTrue(tuples.size() == 2);
+ assertTrue(tuples.get(0).getString("node").equals("3"));
+ assertTrue(tuples.get(1).getString("node").equals("6"));
+
+
+ // Test Week Day with lag
+
+ expr = "nodes(collection1, random(collection1, q=\"id:(3)\", fl=\"day_s\"), walk=\"day_s->day_s\", gather=\"id\", window=\"-2WEEKDAYS\", lag=\"1\")";
+
+ stream = (GatherNodesStream)factory.constructStream(expr);
+
+ context = new StreamContext();
+ context.setSolrClientCache(cache);
+ stream.setStreamContext(context);
+ tuples = getTuples(stream);
+
+ Collections.sort(tuples, new FieldComparator("node", ComparatorOrder.ASCENDING));
+
+ assertTrue(tuples.size() == 2);
+ assertTrue(tuples.get(0).getString("node").equals("6"));
+ assertTrue(tuples.get(1).getString("node").equals("7"));
+
+
+ // Test positive week day
+
+ expr = "nodes(collection1, random(collection1, q=\"id:(6)\", fl=\"day_s\"), walk=\"day_s->day_s\", gather=\"id\", window=\"2WEEKDAYS\")";
+
+ stream = (GatherNodesStream)factory.constructStream(expr);
+ context = new StreamContext();
+ context.setSolrClientCache(cache);
+ stream.setStreamContext(context);
+ tuples = getTuples(stream);
+
+ Collections.sort(tuples, new FieldComparator("node", ComparatorOrder.ASCENDING));
+
+ assertTrue(tuples.size() == 2);
+ assertTrue(tuples.get(0).getString("node").equals("3"));
+ assertTrue(tuples.get(1).getString("node").equals("6"));
+
+
+ // Test positive Week Day with lag
+
+ expr = "nodes(collection1, random(collection1, q=\"id:(6)\", fl=\"day_s\"), walk=\"day_s->day_s\", gather=\"id\", window=\"2WEEKDAYS\", lag=\"1\")";
+
+ stream = (GatherNodesStream)factory.constructStream(expr);
+
+ context = new StreamContext();
+ context.setSolrClientCache(cache);
+ stream.setStreamContext(context);
+ tuples = getTuples(stream);
+
+ Collections.sort(tuples, new FieldComparator("node", ComparatorOrder.ASCENDING));
+
+ assertTrue(tuples.size() == 2);
+ assertTrue(tuples.get(0).getString("node").equals("2"));
+ assertTrue(tuples.get(1).getString("node").equals("3"));
cache.close();
}