You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@solr.apache.org by jb...@apache.org on 2021/06/09 14:20:18 UTC

[solr] branch main updated: SOLR-15197: Add WEEKDAY windows and forward and backward looking windows.

This is an automated email from the ASF dual-hosted git repository.

jbernste pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr.git


The following commit(s) were added to refs/heads/main by this push:
     new 73963ca  SOLR-15197: Add WEEKDAY windows and forward and backward looking windows.
73963ca is described below

commit 73963cafe452ed571b43a9d4bd9524d9b53d5421
Author: Joel Bernstein <jb...@apache.org>
AuthorDate: Wed Jun 9 10:19:35 2021 -0400

    SOLR-15197: Add WEEKDAY windows and forward and backward looking windows.
---
 .../client/solrj/io/graph/GatherNodesStream.java   |  86 ++++++++++----
 .../client/solrj/io/graph/GraphExpressionTest.java | 126 +++++++++++++++++++--
 2 files changed, 179 insertions(+), 33 deletions(-)

diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/graph/GatherNodesStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/graph/GatherNodesStream.java
index e573235..08b2ee3 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/graph/GatherNodesStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/graph/GatherNodesStream.java
@@ -21,7 +21,9 @@ import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
+import java.time.DayOfWeek;
 import java.time.Instant;
+import java.time.ZoneId;
 import java.time.temporal.ChronoUnit;
 import java.util.*;
 import java.util.concurrent.Callable;
@@ -81,6 +83,7 @@ public class GatherNodesStream extends TupleStream implements Expressible {
   private int lag = 0;
   private static final int TEN_SECOND_INTERVAL = 0;
   private static final int DAY_INTERVAL = 1;
+  private static final int WEEK_DAY_INTERVAL = 2;
   private int interval = TEN_SECOND_INTERVAL;
 
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -214,7 +217,10 @@ public class GatherNodesStream extends TupleStream implements Expressible {
 
     if(windowExpression != null) {
       String windowValue = ((StreamExpressionValue) windowExpression.getParameter()).getValue();
-      if(windowValue.contains("DAY")) {
+      if(windowValue.contains("WEEKDAY")) {
+        intervalParam = WEEK_DAY_INTERVAL;
+        timeWindow = Integer.parseInt(windowValue.split("WEEKDAY")[0]);
+      } else if (windowValue.contains("DAY")) {
         intervalParam = DAY_INTERVAL;
         timeWindow = Integer.parseInt(windowValue.split("DAY")[0]);
       } else {
@@ -555,21 +561,23 @@ public class GatherNodesStream extends TupleStream implements Expressible {
     }
   }
 
-
   private String[] getTenSecondWindow(int size, int lag, String start) {
     try {
-      String[] window = new String[size];
+      List<String> windowList = new ArrayList<>();
       Date date = this.dateFormat.parse(start);
       Instant instant = date.toInstant();
-
-      for (int i = 0; i < size; i++) {
-        Instant windowInstant = instant.minus(10 * ((i+1) + lag), ChronoUnit.SECONDS);
+      int collect = Math.abs(size)+lag;
+      int i = -1;
+      while (windowList.size() < collect) {
+        ++i;
+        Instant windowInstant = size > 0 ? instant.plus(10*i, ChronoUnit.SECONDS) : instant.minus(10*i, ChronoUnit.SECONDS);
         String windowString = windowInstant.toString();
         windowString = windowString.substring(0, 18) + "0Z";
-        window[i] = windowString;
+        windowList.add(windowString);
       }
 
-      return window;
+      List<String> laggedWindow = windowList.subList(lag, windowList.size());
+      return laggedWindow.toArray(new String[laggedWindow.size()]);
     } catch(ParseException e) {
       log.warn("Unparseable date:{}", String.valueOf(start));
       return new String[0];
@@ -578,18 +586,51 @@ public class GatherNodesStream extends TupleStream implements Expressible {
 
   private String[] getDayWindow(int size, int lag, String start) {
     try {
-      String[] window = new String[size];
+      List<String> windowList = new ArrayList<>();
       Date date = this.dateFormat.parse(start);
       Instant instant = date.toInstant();
+      int collect = Math.abs(size)+lag;
+      int i = -1;
+      while (windowList.size() < collect) {
+        ++i;
+        Instant windowInstant = size > 0 ? instant.plus(i, ChronoUnit.DAYS) : instant.minus(i, ChronoUnit.DAYS);
+        String windowString = windowInstant.toString();
+        windowString = windowString.substring(0, 10) + "T00:00:00Z";
+        windowList.add(windowString);
+      }
+
+      List<String> laggedWindow = windowList.subList(lag, windowList.size());
+      return laggedWindow.toArray(new String[laggedWindow.size()]);
+    } catch(ParseException e) {
+      log.warn("Unparseable date:{}", String.valueOf(start));
+      return new String[0];
+    }
+  }
+
+  private String[] getWeekDayWindow(int size, int lag, String start) {
+    try {
+      List<String> windowList = new ArrayList<>();
+      Date date = this.dateFormat.parse(start);
+      Instant instant = date.toInstant();
+      int collect = Math.abs(size)+lag;
+      int i = -1;
+      while (windowList.size() < collect) {
+        ++i;
+        Instant windowInstant = size > 0 ? instant.plus(i, ChronoUnit.DAYS) : instant.minus(i, ChronoUnit.DAYS);
+        DayOfWeek dayOfWeek = windowInstant.atZone(ZoneId.of("UTC")).getDayOfWeek();
+
+        if(dayOfWeek.getValue() > 5) {
+          //Skip weekend
+          continue;
+        }
 
-      for (int i = 0; i < size; i++) {
-        Instant windowInstant = instant.minus(1 * ((i+1) + lag), ChronoUnit.DAYS);
         String windowString = windowInstant.toString();
         windowString = windowString.substring(0, 10) + "T00:00:00Z";
-        window[i] = windowString;
+        windowList.add(windowString);
       }
 
-      return window;
+      List<String> laggedWindow = windowList.subList(lag, windowList.size());
+      return laggedWindow.toArray(new String[laggedWindow.size()]);
     } catch(ParseException e) {
       log.warn("Unparseable date:{}", String.valueOf(start));
       return new String[0];
@@ -648,12 +689,11 @@ public class GatherNodesStream extends TupleStream implements Expressible {
             }
           }
 
-          if(windowSet == null || (lag == 0 && !windowSet.contains(String.valueOf(value)))) {
+          if(windowSet == null) {
             joinBatch.add(value);
           }
 
           if(window > Integer.MIN_VALUE && value != null) {
-            windowSet.add(value);
 
             /*
             * A time window has been set.
@@ -663,12 +703,16 @@ public class GatherNodesStream extends TupleStream implements Expressible {
 
             String[] timeWindow = null;
 
-            if(this.interval == DAY_INTERVAL) {
-              //Derive the daily window
-              timeWindow = getDayWindow(window, lag, value);
-            } else {
-              //Dervie the default ten second window
-              timeWindow = getTenSecondWindow(window, lag, value);
+            switch(this.interval) {
+              case WEEK_DAY_INTERVAL:
+                timeWindow = getWeekDayWindow(window, lag, value);
+                break;
+              case DAY_INTERVAL:
+                timeWindow = getDayWindow(window, lag, value);
+                break;
+              default:
+                timeWindow = getTenSecondWindow(window, lag, value);
+                break;
             }
 
             for(String windowString : timeWindow) {
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/graph/GraphExpressionTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/graph/GraphExpressionTest.java
index afe74fe..d0b1e9e 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/graph/GraphExpressionTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/graph/GraphExpressionTest.java
@@ -379,7 +379,7 @@ public class GraphExpressionTest extends SolrCloudTestCase {
 
     //Test the window without lag
 
-    expr = "nodes(collection1, random(collection1, q=\"id:(1 2)\", fl=\"time_ten_seconds_s\"), walk=\"time_ten_seconds_s->time_ten_seconds_s\", gather=\"id\", window=\"3\")";
+    expr = "nodes(collection1, random(collection1, q=\"id:(1 2)\", fl=\"time_ten_seconds_s\"), walk=\"time_ten_seconds_s->time_ten_seconds_s\", gather=\"id\", window=\"-3\")";
 
     stream = (GatherNodesStream)factory.constructStream(expr);
 
@@ -389,17 +389,48 @@ public class GraphExpressionTest extends SolrCloudTestCase {
     tuples = getTuples(stream);
 
     Collections.sort(tuples, new FieldComparator("node", ComparatorOrder.ASCENDING));
-    assertTrue(tuples.size() == 5);
+    assertTrue(tuples.size() == 4);
     assertTrue(tuples.get(0).getString("node").equals("1"));
     assertTrue(tuples.get(1).getString("node").equals("2"));
     assertTrue(tuples.get(2).getString("node").equals("3"));
     assertTrue(tuples.get(3).getString("node").equals("4"));
-    assertTrue(tuples.get(4).getString("node").equals("5"));
+
+
+    expr = "nodes(collection1, random(collection1, q=\"id:(6)\", fl=\"time_ten_seconds_s\"), walk=\"time_ten_seconds_s->time_ten_seconds_s\", gather=\"id\", window=\"3\")";
+
+    stream = (GatherNodesStream)factory.constructStream(expr);
+
+    context = new StreamContext();
+    context.setSolrClientCache(cache);
+    stream.setStreamContext(context);
+    tuples = getTuples(stream);
+
+    Collections.sort(tuples, new FieldComparator("node", ComparatorOrder.ASCENDING));
+    assertTrue(tuples.size() == 3);
+    assertTrue(tuples.get(0).getString("node").equals("4"));
+    assertTrue(tuples.get(1).getString("node").equals("5"));
+    assertTrue(tuples.get(2).getString("node").equals("6"));
+
+
+    expr = "nodes(collection1, random(collection1, q=\"id:(6)\", fl=\"time_ten_seconds_s\"), walk=\"time_ten_seconds_s->time_ten_seconds_s\", gather=\"id\", window=\"3\", lag=\"1\")";
+
+    stream = (GatherNodesStream)factory.constructStream(expr);
+
+    context = new StreamContext();
+    context.setSolrClientCache(cache);
+    stream.setStreamContext(context);
+    tuples = getTuples(stream);
+
+    Collections.sort(tuples, new FieldComparator("node", ComparatorOrder.ASCENDING));
+    assertTrue(tuples.size() == 3);
+    assertTrue(tuples.get(0).getString("node").equals("3"));
+    assertTrue(tuples.get(1).getString("node").equals("4"));
+    assertTrue(tuples.get(2).getString("node").equals("5"));
 
 
     //Test window with lag
 
-    expr = "nodes(collection1, random(collection1, q=\"id:(1)\", fl=\"time_ten_seconds_s\"), walk=\"time_ten_seconds_s->time_ten_seconds_s\", gather=\"id\", window=\"2\", lag=\"2\")";
+    expr = "nodes(collection1, random(collection1, q=\"id:(1)\", fl=\"time_ten_seconds_s\"), walk=\"time_ten_seconds_s->time_ten_seconds_s\", gather=\"id\", window=\"-2\", lag=\"2\")";
 
     stream = (GatherNodesStream)factory.constructStream(expr);
 
@@ -411,14 +442,14 @@ public class GraphExpressionTest extends SolrCloudTestCase {
     Collections.sort(tuples, new FieldComparator("node", ComparatorOrder.ASCENDING));
 
     assertTrue(tuples.size() == 2);
-    assertTrue(tuples.get(0).getString("node").equals("4"));
-    assertTrue(tuples.get(1).getString("node").equals("5"));
+    assertTrue(tuples.get(0).getString("node").equals("3"));
+    assertTrue(tuples.get(1).getString("node").equals("4"));
 
 
     // Test DAY window without lag
 
 
-    expr = "nodes(collection1, random(collection1, q=\"id:(1 2)\", fl=\"day_s\"), walk=\"day_s->day_s\", gather=\"id\", window=\"3DAYS\")";
+    expr = "nodes(collection1, random(collection1, q=\"id:(1 2)\", fl=\"day_s\"), walk=\"day_s->day_s\", gather=\"id\", window=\"-3DAYS\")";
 
     stream = (GatherNodesStream)factory.constructStream(expr);
 
@@ -428,17 +459,16 @@ public class GraphExpressionTest extends SolrCloudTestCase {
     tuples = getTuples(stream);
 
     Collections.sort(tuples, new FieldComparator("node", ComparatorOrder.ASCENDING));
-    assertTrue(tuples.size() == 5);
+    assertTrue(tuples.size() == 4);
     assertTrue(tuples.get(0).getString("node").equals("1"));
     assertTrue(tuples.get(1).getString("node").equals("2"));
     assertTrue(tuples.get(2).getString("node").equals("3"));
     assertTrue(tuples.get(3).getString("node").equals("4"));
-    assertTrue(tuples.get(4).getString("node").equals("5"));
 
 
     // Test Day window with lag.
 
-    expr = "nodes(collection1, random(collection1, q=\"id:(1)\", fl=\"day_s\"), walk=\"day_s->day_s\", gather=\"id\", window=\"2DAYS\", lag=\"2\")";
+    expr = "nodes(collection1, random(collection1, q=\"id:(1)\", fl=\"day_s\"), walk=\"day_s->day_s\", gather=\"id\", window=\"-2DAYS\", lag=\"2\")";
 
     stream = (GatherNodesStream)factory.constructStream(expr);
 
@@ -449,9 +479,81 @@ public class GraphExpressionTest extends SolrCloudTestCase {
 
     Collections.sort(tuples, new FieldComparator("node", ComparatorOrder.ASCENDING));
     assertTrue(tuples.size() == 2);
-    assertTrue(tuples.get(0).getString("node").equals("4"));
-    assertTrue(tuples.get(1).getString("node").equals("5"));
+    assertTrue(tuples.get(0).getString("node").equals("3"));
+    assertTrue(tuples.get(1).getString("node").equals("4"));
+
+
+    // Test Week Day
+
+    expr = "nodes(collection1, random(collection1, q=\"id:(3)\", fl=\"day_s\"), walk=\"day_s->day_s\", gather=\"id\", window=\"-2WEEKDAYS\")";
+
+    stream = (GatherNodesStream)factory.constructStream(expr);
+
+    context = new StreamContext();
+    context.setSolrClientCache(cache);
+    stream.setStreamContext(context);
+    tuples = getTuples(stream);
+
+    Collections.sort(tuples, new FieldComparator("node", ComparatorOrder.ASCENDING));
+
+
+    assertTrue(tuples.size() == 2);
+    assertTrue(tuples.get(0).getString("node").equals("3"));
+    assertTrue(tuples.get(1).getString("node").equals("6"));
+
+
+    // Test Week Day with lag
+
+    expr = "nodes(collection1, random(collection1, q=\"id:(3)\", fl=\"day_s\"), walk=\"day_s->day_s\", gather=\"id\", window=\"-2WEEKDAYS\", lag=\"1\")";
+
+    stream = (GatherNodesStream)factory.constructStream(expr);
+
+    context = new StreamContext();
+    context.setSolrClientCache(cache);
+    stream.setStreamContext(context);
+    tuples = getTuples(stream);
+
+    Collections.sort(tuples, new FieldComparator("node", ComparatorOrder.ASCENDING));
+
+    assertTrue(tuples.size() == 2);
+    assertTrue(tuples.get(0).getString("node").equals("6"));
+    assertTrue(tuples.get(1).getString("node").equals("7"));
+
+
+    // Test positive week day
+
+    expr = "nodes(collection1, random(collection1, q=\"id:(6)\", fl=\"day_s\"), walk=\"day_s->day_s\", gather=\"id\", window=\"2WEEKDAYS\")";
+
+    stream = (GatherNodesStream)factory.constructStream(expr);
 
+    context = new StreamContext();
+    context.setSolrClientCache(cache);
+    stream.setStreamContext(context);
+    tuples = getTuples(stream);
+
+    Collections.sort(tuples, new FieldComparator("node", ComparatorOrder.ASCENDING));
+
+    assertTrue(tuples.size() == 2);
+    assertTrue(tuples.get(0).getString("node").equals("3"));
+    assertTrue(tuples.get(1).getString("node").equals("6"));
+
+
+    // Test positive Week Day with lag
+
+    expr = "nodes(collection1, random(collection1, q=\"id:(6)\", fl=\"day_s\"), walk=\"day_s->day_s\", gather=\"id\", window=\"2WEEKDAYS\", lag=\"1\")";
+
+    stream = (GatherNodesStream)factory.constructStream(expr);
+
+    context = new StreamContext();
+    context.setSolrClientCache(cache);
+    stream.setStreamContext(context);
+    tuples = getTuples(stream);
+
+    Collections.sort(tuples, new FieldComparator("node", ComparatorOrder.ASCENDING));
+
+    assertTrue(tuples.size() == 2);
+    assertTrue(tuples.get(0).getString("node").equals("2"));
+    assertTrue(tuples.get(1).getString("node").equals("3"));
 
     cache.close();
   }