You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@solr.apache.org by jb...@apache.org on 2021/06/03 13:57:25 UTC

[solr] branch main updated: SOLR-15197: Support temporal graph queries with daily windows

This is an automated email from the ASF dual-hosted git repository.

jbernste pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr.git


The following commit(s) were added to refs/heads/main by this push:
     new 4c76bcc  SOLR-15197: Support temporal graph queries with daily windows
4c76bcc is described below

commit 4c76bcc65017a1ff20dd64eedd63d4976a38a554
Author: Joel Bernstein <jo...@Joel-Bernsteins-MacBook-Pro-2.local>
AuthorDate: Thu Jun 3 09:48:19 2021 -0400

    SOLR-15197: Support temporal graph queries with daily windows
---
 .../client/solrj/io/graph/GatherNodesStream.java   | 63 ++++++++++++++++---
 .../client/solrj/io/graph/GraphExpressionTest.java | 70 +++++++++++++++++-----
 2 files changed, 109 insertions(+), 24 deletions(-)

diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/graph/GatherNodesStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/graph/GatherNodesStream.java
index 77a01de..e573235 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/graph/GatherNodesStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/graph/GatherNodesStream.java
@@ -78,7 +78,11 @@ public class GatherNodesStream extends TupleStream implements Expressible {
   private  SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssX", Locale.ENGLISH);
   private Set<String> windowSet;
   private int window = Integer.MIN_VALUE;
-  private int lag = 1;
+  private int lag = 0;
+  private static final int TEN_SECOND_INTERVAL = 0;
+  private static final int DAY_INTERVAL = 1;
+  private int interval = TEN_SECOND_INTERVAL;
+
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   public GatherNodesStream(String zkHost,
@@ -105,7 +109,8 @@ public class GatherNodesStream extends TupleStream implements Expressible {
         scatter,
         maxDocFreq,
         Integer.MIN_VALUE,
-    1);
+    1,
+        TEN_SECOND_INTERVAL);
   }
 
   public GatherNodesStream(StreamExpression expression, StreamFactory factory) throws IOException {
@@ -205,13 +210,21 @@ public class GatherNodesStream extends TupleStream implements Expressible {
 
     StreamExpressionNamedParameter windowExpression = factory.getNamedOperand(expression, "window");
     int timeWindow = Integer.MIN_VALUE;
+    int intervalParam = -1;
 
     if(windowExpression != null) {
-      timeWindow = Integer.parseInt(((StreamExpressionValue) windowExpression.getParameter()).getValue());
+      String windowValue = ((StreamExpressionValue) windowExpression.getParameter()).getValue();
+      if(windowValue.contains("DAY")) {
+        intervalParam = DAY_INTERVAL;
+        timeWindow = Integer.parseInt(windowValue.split("DAY")[0]);
+      } else {
+        intervalParam = TEN_SECOND_INTERVAL;
+        timeWindow = Integer.parseInt(windowValue);
+      }
     }
 
     StreamExpressionNamedParameter lagExpression = factory.getNamedOperand(expression, "lag");
-    int timeLag = 1;
+    int timeLag = 0;
 
     if(lagExpression != null) {
       timeLag = Integer.parseInt(((StreamExpressionValue) lagExpression.getParameter()).getValue());
@@ -268,7 +281,8 @@ public class GatherNodesStream extends TupleStream implements Expressible {
          scatter,
          docFreq,
          timeWindow,
-         timeLag);
+         timeLag,
+         intervalParam);
   }
 
   @SuppressWarnings({"unchecked"})
@@ -284,7 +298,8 @@ public class GatherNodesStream extends TupleStream implements Expressible {
                     Set<Traversal.Scatter> scatter,
                     int maxDocFreq,
                     int window,
-                    int lag) {
+                    int lag,
+                    int interval) {
     this.zkHost = zkHost;
     this.collection = collection;
     this.tupleStream = tupleStream;
@@ -297,6 +312,7 @@ public class GatherNodesStream extends TupleStream implements Expressible {
     this.scatter = scatter;
     this.maxDocFreq = maxDocFreq;
     this.window = window;
+    this.interval = interval;
 
     if(window > Integer.MIN_VALUE) {
       windowSet = new HashSet<>();
@@ -547,7 +563,7 @@ public class GatherNodesStream extends TupleStream implements Expressible {
       Instant instant = date.toInstant();
 
       for (int i = 0; i < size; i++) {
-        Instant windowInstant = instant.minus(10 * (i + lag), ChronoUnit.SECONDS);
+        Instant windowInstant = instant.minus(10 * ((i+1) + lag), ChronoUnit.SECONDS);
         String windowString = windowInstant.toString();
         windowString = windowString.substring(0, 18) + "0Z";
         window[i] = windowString;
@@ -560,6 +576,26 @@ public class GatherNodesStream extends TupleStream implements Expressible {
     }
   }
 
+  private String[] getDayWindow(int size, int lag, String start) { // Returns `size` day-bucket strings for the days preceding `start`, shifted back by `lag` days.
+    try {
+      String[] window = new String[size];
+      Date date = this.dateFormat.parse(start); // start must match the dateFormat field's pattern; a mismatch lands in the catch below
+      Instant instant = date.toInstant();
+
+      for (int i = 0; i < size; i++) {
+        Instant windowInstant = instant.minus(1 * ((i+1) + lag), ChronoUnit.DAYS); // i=0 is (lag + 1) days before start, so start's own day is never part of the window
+        String windowString = windowInstant.toString();
+        windowString = windowString.substring(0, 10) + "T00:00:00Z"; // keep the first 10 chars (presumably the yyyy-MM-dd date part) and pin the time to midnight Z
+        window[i] = windowString;
+      }
+
+      return window;
+    } catch(ParseException e) {
+      log.warn("Unparseable date:{}", String.valueOf(start)); // best effort: log and return an empty window instead of throwing
+      return new String[0];
+    }
+  }
+
   public void close() throws IOException {
     tupleStream.close();
   }
@@ -612,7 +648,7 @@ public class GatherNodesStream extends TupleStream implements Expressible {
             }
           }
 
-          if(windowSet == null || (lag == 1 && !windowSet.contains(String.valueOf(value)))) {
+          if(windowSet == null || (lag == 0 && !windowSet.contains(String.valueOf(value)))) {
             joinBatch.add(value);
           }
 
@@ -625,7 +661,16 @@ public class GatherNodesStream extends TupleStream implements Expressible {
             * We derive the window and add it to the join values below.
             */
 
-            String[] timeWindow = getTenSecondWindow(window, lag, value);
+            String[] timeWindow = null;
+
+            if(this.interval == DAY_INTERVAL) {
+              //Derive the daily window
+              timeWindow = getDayWindow(window, lag, value);
+            } else {
+              //Derive the default ten second window
+              timeWindow = getTenSecondWindow(window, lag, value);
+            }
+
             for(String windowString : timeWindow) {
               if(!windowSet.contains(windowString)) {
                 /*
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/graph/GraphExpressionTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/graph/GraphExpressionTest.java
index 055f2f5..afe74fe 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/graph/GraphExpressionTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/graph/GraphExpressionTest.java
@@ -235,18 +235,18 @@ public class GraphExpressionTest extends SolrCloudTestCase {
   public void testGatherNodesStream() throws Exception {
 
     new UpdateRequest()
-        .add(id, "0", "basket_s", "basket1", "product_s", "product1", "price_f", "20", "time_ten_seconds_s", "2020-09-24T18:23:50Z")
-        .add(id, "1", "basket_s", "basket1", "product_s", "product3", "price_f", "30", "time_ten_seconds_s", "2020-09-24T18:23:40Z")
-        .add(id, "2", "basket_s", "basket1", "product_s", "product5", "price_f", "1", "time_ten_seconds_s", "2020-09-24T18:23:30Z")
-        .add(id, "3", "basket_s", "basket2", "product_s", "product1", "price_f", "2", "time_ten_seconds_s", "2020-09-24T18:23:20Z")
-        .add(id, "4", "basket_s", "basket2", "product_s", "product6", "price_f", "5", "time_ten_seconds_s", "2020-09-24T18:23:10Z")
-        .add(id, "5", "basket_s", "basket2", "product_s", "product7", "price_f", "10", "time_ten_seconds_s", "2020-09-24T18:23:00Z")
-        .add(id, "6", "basket_s", "basket3", "product_s", "product4", "price_f", "20", "time_ten_seconds_s", "2020-09-24T18:22:50Z")
-        .add(id, "7", "basket_s", "basket3", "product_s", "product3", "price_f", "10", "time_ten_seconds_s", "2020-09-24T18:22:40Z")
-        .add(id, "8", "basket_s", "basket3", "product_s", "product1", "price_f", "10", "time_ten_seconds_s", "2020-09-24T18:22:30Z")
-        .add(id, "9", "basket_s", "basket4", "product_s", "product4", "price_f", "40", "time_ten_seconds_s", "2020-09-24T18:22:20Z")
-        .add(id, "10", "basket_s", "basket4", "product_s", "product3", "price_f", "10", "time_ten_seconds_s", "2020-09-24T18:22:10Z")
-        .add(id, "11", "basket_s", "basket4", "product_s", "product1", "price_f", "10", "time_ten_seconds_s", "2020-09-24T18:22:00Z")
+        .add(id, "0", "basket_s", "basket1", "product_s", "product1", "price_f", "20", "time_ten_seconds_s", "2020-09-24T18:23:50Z", "day_s", "2020-09-24T00:00:00Z")
+        .add(id, "1", "basket_s", "basket1", "product_s", "product3", "price_f", "30", "time_ten_seconds_s", "2020-09-24T18:23:40Z", "day_s", "2020-09-23T00:00:00Z")
+        .add(id, "2", "basket_s", "basket1", "product_s", "product5", "price_f", "1", "time_ten_seconds_s", "2020-09-24T18:23:30Z", "day_s", "2020-09-22T00:00:00Z")
+        .add(id, "3", "basket_s", "basket2", "product_s", "product1", "price_f", "2", "time_ten_seconds_s", "2020-09-24T18:23:20Z", "day_s", "2020-09-21T00:00:00Z")
+        .add(id, "4", "basket_s", "basket2", "product_s", "product6", "price_f", "5", "time_ten_seconds_s", "2020-09-24T18:23:10Z","day_s", "2020-09-20T00:00:00Z")
+        .add(id, "5", "basket_s", "basket2", "product_s", "product7", "price_f", "10", "time_ten_seconds_s", "2020-09-24T18:23:00Z", "day_s", "2020-09-19T00:00:00Z")
+        .add(id, "6", "basket_s", "basket3", "product_s", "product4", "price_f", "20", "time_ten_seconds_s", "2020-09-24T18:22:50Z", "day_s", "2020-09-18T00:00:00Z")
+        .add(id, "7", "basket_s", "basket3", "product_s", "product3", "price_f", "10", "time_ten_seconds_s", "2020-09-24T18:22:40Z", "day_s", "2020-09-17T00:00:00Z")
+        .add(id, "8", "basket_s", "basket3", "product_s", "product1", "price_f", "10", "time_ten_seconds_s", "2020-09-24T18:22:30Z", "day_s", "2020-09-16T00:00:00Z")
+        .add(id, "9", "basket_s", "basket4", "product_s", "product4", "price_f", "40", "time_ten_seconds_s", "2020-09-24T18:22:20Z", "day_s", "2020-09-15T00:00:00Z")
+        .add(id, "10", "basket_s", "basket4", "product_s", "product3", "price_f", "10", "time_ten_seconds_s", "2020-09-24T18:22:10Z", "day_s", "2020-09-14T00:00:00Z")
+        .add(id, "11", "basket_s", "basket4", "product_s", "product1", "price_f", "10", "time_ten_seconds_s", "2020-09-24T18:22:00Z", "day_s", "2020-09-13T00:00:00Z")
         .commit(cluster.getSolrClient(), COLLECTION);
 
     List<Tuple> tuples = null;
@@ -409,9 +409,50 @@ public class GraphExpressionTest extends SolrCloudTestCase {
     tuples = getTuples(stream);
 
     Collections.sort(tuples, new FieldComparator("node", ComparatorOrder.ASCENDING));
+
     assertTrue(tuples.size() == 2);
-    assertTrue(tuples.get(0).getString("node").equals("3"));
-    assertTrue(tuples.get(1).getString("node").equals("4"));
+    assertTrue(tuples.get(0).getString("node").equals("4"));
+    assertTrue(tuples.get(1).getString("node").equals("5"));
+
+
+    // Test DAY window without lag
+
+
+    expr = "nodes(collection1, random(collection1, q=\"id:(1 2)\", fl=\"day_s\"), walk=\"day_s->day_s\", gather=\"id\", window=\"3DAYS\")";
+
+    stream = (GatherNodesStream)factory.constructStream(expr);
+
+    context = new StreamContext();
+    context.setSolrClientCache(cache);
+    stream.setStreamContext(context);
+    tuples = getTuples(stream);
+
+    Collections.sort(tuples, new FieldComparator("node", ComparatorOrder.ASCENDING));
+    assertTrue(tuples.size() == 5);
+    assertTrue(tuples.get(0).getString("node").equals("1"));
+    assertTrue(tuples.get(1).getString("node").equals("2"));
+    assertTrue(tuples.get(2).getString("node").equals("3"));
+    assertTrue(tuples.get(3).getString("node").equals("4"));
+    assertTrue(tuples.get(4).getString("node").equals("5"));
+
+
+    // Test Day window with lag.
+
+    expr = "nodes(collection1, random(collection1, q=\"id:(1)\", fl=\"day_s\"), walk=\"day_s->day_s\", gather=\"id\", window=\"2DAYS\", lag=\"2\")";
+
+    stream = (GatherNodesStream)factory.constructStream(expr);
+
+    context = new StreamContext();
+    context.setSolrClientCache(cache);
+    stream.setStreamContext(context);
+    tuples = getTuples(stream);
+
+    Collections.sort(tuples, new FieldComparator("node", ComparatorOrder.ASCENDING));
+    assertTrue(tuples.size() == 2);
+    assertTrue(tuples.get(0).getString("node").equals("4"));
+    assertTrue(tuples.get(1).getString("node").equals("5"));
+
+
     cache.close();
   }
 
@@ -678,7 +719,6 @@ public class GraphExpressionTest extends SolrCloudTestCase {
     assertTrue(tuples.get(1).getString("node").equals("jim"));
     assertTrue(tuples.get(1).getLong("level").equals(1L));
     List<String> ancestors = tuples.get(1).getStrings("ancestors");
-    System.out.println("##################### Ancestors:"+ancestors);
     assert(ancestors.size() == 1);
     assert(ancestors.get(0).equals("bill"));