Posted to reviews@iotdb.apache.org by GitBox <gi...@apache.org> on 2022/08/09 06:31:44 UTC

[GitHub] [iotdb] ZhanGHanG9991 opened a new pull request, #6928: [IOTDB-4073] Add SessionTimeWindowAccessStrategy in UDF

ZhanGHanG9991 opened a new pull request, #6928:
URL: https://github.com/apache/iotdb/pull/6928

   ## Description
   Add SessionTimeWindowAccessStrategy to the UDF framework.
   
   <hr>
   
   This PR has:
   - [x] been self-reviewed.
       - [ ] concurrent read
       - [ ] concurrent write
       - [ ] concurrent read and write 
   - [ ] added documentation for new or modified features or behaviors.
   - [ ] added Javadocs for most classes and all non-trivial methods. 
   - [ ] added or updated version, __license__, or notice information
   - [x] added comments explaining the "why" and the intent of the code wherever would not be obvious 
     for an unfamiliar reader.
   - [ ] added unit tests or modified existing tests to cover new code paths, ensuring the threshold 
     for code coverage.
   - [x] added integration tests.
   - [x] been tested in a test IoTDB cluster.
   
   <hr>
   
   ##### Key changed/added classes (or packages if there are too many classes) in this PR
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [iotdb] lancelly commented on a diff in pull request #6928: [IOTDB-4073] Add SessionTimeWindowAccessStrategy in UDF

Posted by GitBox <gi...@apache.org>.
lancelly commented on code in PR #6928:
URL: https://github.com/apache/iotdb/pull/6928#discussion_r945404503


##########
integration-test/src/test/java/org/apache/iotdb/db/it/udf/IoTDBUDFSessionWindowQueryIT.java:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.it.udf;
+
+import org.apache.iotdb.it.env.ConfigFactory;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.constant.UDFTestConstant;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({ClusterIT.class})
+public class IoTDBUDFSessionWindowQueryIT {
+
+  protected static final int ITERATION_TIMES = 10000;
+
+  protected static boolean enableSeqSpaceCompaction;
+  protected static boolean enableUnseqSpaceCompaction;
+  protected static boolean enableCrossSpaceCompaction;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    enableSeqSpaceCompaction = ConfigFactory.getConfig().isEnableSeqSpaceCompaction();
+    enableUnseqSpaceCompaction = ConfigFactory.getConfig().isEnableUnseqSpaceCompaction();
+    enableCrossSpaceCompaction = ConfigFactory.getConfig().isEnableCrossSpaceCompaction();
+    ConfigFactory.getConfig().setEnableSeqSpaceCompaction(false);
+    ConfigFactory.getConfig().setEnableUnseqSpaceCompaction(false);
+    ConfigFactory.getConfig().setEnableCrossSpaceCompaction(false);
+    ConfigFactory.getConfig()
+        .setUdfCollectorMemoryBudgetInMB(5)
+        .setUdfTransformerMemoryBudgetInMB(5)
+        .setUdfReaderMemoryBudgetInMB(5);
+    EnvFactory.getEnv().initBeforeClass();
+    createTimeSeries();
+    generateData();
+    registerUDF();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    EnvFactory.getEnv().cleanAfterClass();
+    ConfigFactory.getConfig().setEnableSeqSpaceCompaction(enableSeqSpaceCompaction);
+    ConfigFactory.getConfig().setEnableUnseqSpaceCompaction(enableUnseqSpaceCompaction);
+    ConfigFactory.getConfig().setEnableCrossSpaceCompaction(enableCrossSpaceCompaction);
+    ConfigFactory.getConfig()
+        .setUdfCollectorMemoryBudgetInMB(100)
+        .setUdfTransformerMemoryBudgetInMB(100)
+        .setUdfReaderMemoryBudgetInMB(100);
+  }
+
+  private static void createTimeSeries() {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      statement.execute("SET STORAGE GROUP TO root.vehicle");
+      statement.execute("CREATE TIMESERIES root.vehicle.d1.s3 with datatype=INT32,encoding=PLAIN");
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+  }
+
+  private static void generateData() {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      for (int i = 0; i < ITERATION_TIMES; ++i) {
+        if (i == 5 || i == 6 || i == 7 || i == 51 || i == 52 || i == 53 || i == 54 || i == 9996
+            || i == 9997 || i == 9998) {
+          continue;
+        }
+        statement.execute(
+            (String.format("insert into root.vehicle.d1(timestamp,s3) values(%d,%d)", i, i)));
+      }
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+  }
+
+  private static void registerUDF() {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      statement.execute(
+          "create function window_start_end as 'org.apache.iotdb.db.query.udf.example.WindowStartEnd'");
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+  }
+
+  private void testSessionTimeWindowSS(
+      String sessionGap, long[] windowStart, long[] windowEnd, Long displayBegin, Long displayEnd) {
+    String sql;
+    if (displayBegin == null) {
+      sql =
+          String.format(
+              "select window_start_end(s3, '%s'='%s', '%s'='%s') from root.vehicle.d1",
+              UDFTestConstant.ACCESS_STRATEGY_KEY,
+              UDFTestConstant.ACCESS_STRATEGY_SESSION,
+              UDFTestConstant.SESSION_GAP_KEY,
+              sessionGap);
+    } else {
+      sql =
+          String.format(
+              "select window_start_end(s3, '%s'='%s', '%s'='%s', '%s'='%s', '%s'='%s') from root.vehicle.d1",
+              UDFTestConstant.ACCESS_STRATEGY_KEY,
+              UDFTestConstant.ACCESS_STRATEGY_SESSION,
+              UDFTestConstant.DISPLAY_WINDOW_BEGIN_KEY,
+              displayBegin.longValue(),
+              UDFTestConstant.DISPLAY_WINDOW_END_KEY,
+              displayEnd.longValue(),
+              UDFTestConstant.SESSION_GAP_KEY,
+              sessionGap);
+    }
+
+    try (Connection conn = EnvFactory.getEnv().getConnection();
+        Statement statement = conn.createStatement();
+        ResultSet resultSet = statement.executeQuery(sql)) {
+      assertEquals(2, resultSet.getMetaData().getColumnCount());
+      for (int i = 0; i < windowStart.length; i++) {
+        resultSet.next();
+        Assert.assertEquals(resultSet.getLong(1), windowStart[i]);
+        Assert.assertEquals(resultSet.getLong(2), windowEnd[i]);
+      }
+    } catch (SQLException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testSessionTimeWindowSS1() {
+    String sessionGap = "2";
+    long[] windowStart = new long[] {0, 8, 55, 9999};
+    long[] windowEnd = new long[] {4, 50, 9995, 9999};
+    testSessionTimeWindowSS(sessionGap, windowStart, windowEnd, null, null);
+  }
+
+  @Test
+  public void testSessionTimeWindowSS2() {
+    String sessionGap = "5";
+    long[] windowStart = new long[] {0, 55};
+    long[] windowEnd = new long[] {50, 9999};
+    testSessionTimeWindowSS(sessionGap, windowStart, windowEnd, null, null);
+  }
+
+  @Test
+  public void testSessionTimeWindowSS3() {
+    String sessionGap = "6";
+    long[] windowStart = new long[] {0};
+    long[] windowEnd = new long[] {9999};
+    testSessionTimeWindowSS(sessionGap, windowStart, windowEnd, null, null);
+  }
+
+  @Test
+  public void testSessionTimeWindowSS4() {
+    String sessionGap = "2";
+    Long displayBegin = 1L;
+    Long displayEnd = 9993L;
+    long[] windowStart = new long[] {1, 8, 55};
+    long[] windowEnd = new long[] {4, 50, 9992};
+    testSessionTimeWindowSS(sessionGap, windowStart, windowEnd, displayBegin, displayEnd);
+  }
+
+  @Test
+  public void testSessionTimeWindowSS5() {
+    String sessionGap = "5";
+    Long displayBegin = 43L;
+    Long displayEnd = 100L;
+    long[] windowStart = new long[] {43, 55};
+    long[] windowEnd = new long[] {50, 99};
+    testSessionTimeWindowSS(sessionGap, windowStart, windowEnd, displayBegin, displayEnd);
+  }
+
+  @Test
+  public void testSessionTimeWindowSS6() {
+    String sessionGap = "1";
+    Long displayBegin = 2L;
+    Long displayEnd = 20000L;
+    ArrayList<Long> windowStart = new ArrayList<>();
+    ArrayList<Long> windowEnd = new ArrayList<>();
+    for (long i = displayBegin; i <= 9999; i++) {
+      if (i == 5 || i == 6 || i == 7 || i == 51 || i == 52 || i == 53 || i == 54 || i == 9996
+          || i == 9997 || i == 9998) {
+        continue;
+      }
+      windowStart.add(i);
+      windowEnd.add(i);
+    }
+    testSessionTimeWindowSS(
+        sessionGap,
+        windowStart.stream().mapToLong(t -> t.longValue()).toArray(),
+        windowEnd.stream().mapToLong(t -> t.longValue()).toArray(),
+        displayBegin,
+        displayEnd);
+  }
+
+  private void testSessionTimeWindowSSOutOfRange(
+      String sessionGap, Long displayBegin, Long displayEnd) {
+    String sql;
+    if (displayBegin == null) {
+      sql =
+          String.format(
+              "select window_start_end(s3, '%s'='%s', '%s'='%s') from root.vehicle.d1",
+              UDFTestConstant.ACCESS_STRATEGY_KEY,
+              UDFTestConstant.ACCESS_STRATEGY_SESSION,
+              UDFTestConstant.SESSION_GAP_KEY,
+              sessionGap);
+    } else {
+      sql =
+          String.format(
+              "select window_start_end(s3, '%s'='%s', '%s'='%s', '%s'='%s', '%s'='%s') from root.vehicle.d1",
+              UDFTestConstant.ACCESS_STRATEGY_KEY,
+              UDFTestConstant.ACCESS_STRATEGY_SESSION,
+              UDFTestConstant.DISPLAY_WINDOW_BEGIN_KEY,
+              displayBegin.longValue(),
+              UDFTestConstant.DISPLAY_WINDOW_END_KEY,
+              displayEnd.longValue(),

Review Comment:
   ```suggestion
                 displayBegin,
                 UDFTestConstant.DISPLAY_WINDOW_END_KEY,
                 displayEnd,
   ```
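
The suggestion above relies on `String.format` accepting boxed values directly: the arguments are passed as `Object` varargs, so a primitive `long` is boxed anyway and `%s` renders both forms identically. A minimal sketch of that equivalence follows; the class name, the `buildClause` helper, and the key string are hypothetical placeholders for illustration, not names taken from the PR.

```java
class FormatBoxedLongDemo {
  // Hypothetical helper that mirrors the '%s'='%s' pattern used in the test's SQL string.
  static String buildClause(String key, Object value) {
    return String.format("'%s'='%s'", key, value);
  }

  public static void main(String[] args) {
    Long displayBegin = 43L;
    // Explicit unboxing: the long is boxed again when passed as an Object argument.
    String viaLongValue = buildClause("display_window_begin", displayBegin.longValue());
    // Passing the Long directly skips the redundant round trip.
    String viaBoxed = buildClause("display_window_begin", displayBegin);
    System.out.println(viaBoxed);
    System.out.println(viaBoxed.equals(viaLongValue));
  }
}
```

Both calls yield the same string, so dropping `.longValue()` should not change the generated query.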




[GitHub] [iotdb] ZhanGHanG9991 commented on a diff in pull request #6928: [IOTDB-4073] Add SessionTimeWindowAccessStrategy in UDF

Posted by GitBox <gi...@apache.org>.
ZhanGHanG9991 commented on code in PR #6928:
URL: https://github.com/apache/iotdb/pull/6928#discussion_r945428886


##########
integration-test/src/test/java/org/apache/iotdb/db/it/udf/IoTDBUDFSessionWindowQueryIT.java:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.it.udf;
+
+import org.apache.iotdb.it.env.ConfigFactory;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.constant.UDFTestConstant;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({ClusterIT.class})
+public class IoTDBUDFSessionWindowQueryIT {
+
+  protected static final int ITERATION_TIMES = 10000;

Review Comment:
   I've changed it to 1000.




[GitHub] [iotdb] lancelly commented on a diff in pull request #6928: [IOTDB-4073] Add SessionTimeWindowAccessStrategy in UDF

Posted by GitBox <gi...@apache.org>.
lancelly commented on code in PR #6928:
URL: https://github.com/apache/iotdb/pull/6928#discussion_r945404408


##########
integration-test/src/test/java/org/apache/iotdb/db/it/udf/IoTDBUDFSessionWindowQueryIT.java:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.it.udf;
+
+import org.apache.iotdb.it.env.ConfigFactory;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.constant.UDFTestConstant;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({ClusterIT.class})
+public class IoTDBUDFSessionWindowQueryIT {
+
+  protected static final int ITERATION_TIMES = 10000;
+
+  protected static boolean enableSeqSpaceCompaction;
+  protected static boolean enableUnseqSpaceCompaction;
+  protected static boolean enableCrossSpaceCompaction;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    enableSeqSpaceCompaction = ConfigFactory.getConfig().isEnableSeqSpaceCompaction();
+    enableUnseqSpaceCompaction = ConfigFactory.getConfig().isEnableUnseqSpaceCompaction();
+    enableCrossSpaceCompaction = ConfigFactory.getConfig().isEnableCrossSpaceCompaction();
+    ConfigFactory.getConfig().setEnableSeqSpaceCompaction(false);
+    ConfigFactory.getConfig().setEnableUnseqSpaceCompaction(false);
+    ConfigFactory.getConfig().setEnableCrossSpaceCompaction(false);
+    ConfigFactory.getConfig()
+        .setUdfCollectorMemoryBudgetInMB(5)
+        .setUdfTransformerMemoryBudgetInMB(5)
+        .setUdfReaderMemoryBudgetInMB(5);
+    EnvFactory.getEnv().initBeforeClass();
+    createTimeSeries();
+    generateData();
+    registerUDF();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    EnvFactory.getEnv().cleanAfterClass();
+    ConfigFactory.getConfig().setEnableSeqSpaceCompaction(enableSeqSpaceCompaction);
+    ConfigFactory.getConfig().setEnableUnseqSpaceCompaction(enableUnseqSpaceCompaction);
+    ConfigFactory.getConfig().setEnableCrossSpaceCompaction(enableCrossSpaceCompaction);
+    ConfigFactory.getConfig()
+        .setUdfCollectorMemoryBudgetInMB(100)
+        .setUdfTransformerMemoryBudgetInMB(100)
+        .setUdfReaderMemoryBudgetInMB(100);
+  }
+
+  private static void createTimeSeries() {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      statement.execute("SET STORAGE GROUP TO root.vehicle");
+      statement.execute("CREATE TIMESERIES root.vehicle.d1.s3 with datatype=INT32,encoding=PLAIN");
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+  }
+
+  private static void generateData() {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      for (int i = 0; i < ITERATION_TIMES; ++i) {
+        if (i == 5 || i == 6 || i == 7 || i == 51 || i == 52 || i == 53 || i == 54 || i == 9996
+            || i == 9997 || i == 9998) {
+          continue;
+        }
+        statement.execute(
+            (String.format("insert into root.vehicle.d1(timestamp,s3) values(%d,%d)", i, i)));
+      }
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+  }
+
+  private static void registerUDF() {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      statement.execute(
+          "create function window_start_end as 'org.apache.iotdb.db.query.udf.example.WindowStartEnd'");
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+  }
+
+  private void testSessionTimeWindowSS(
+      String sessionGap, long[] windowStart, long[] windowEnd, Long displayBegin, Long displayEnd) {
+    String sql;
+    if (displayBegin == null) {
+      sql =
+          String.format(
+              "select window_start_end(s3, '%s'='%s', '%s'='%s') from root.vehicle.d1",
+              UDFTestConstant.ACCESS_STRATEGY_KEY,
+              UDFTestConstant.ACCESS_STRATEGY_SESSION,
+              UDFTestConstant.SESSION_GAP_KEY,
+              sessionGap);
+    } else {
+      sql =
+          String.format(
+              "select window_start_end(s3, '%s'='%s', '%s'='%s', '%s'='%s', '%s'='%s') from root.vehicle.d1",
+              UDFTestConstant.ACCESS_STRATEGY_KEY,
+              UDFTestConstant.ACCESS_STRATEGY_SESSION,
+              UDFTestConstant.DISPLAY_WINDOW_BEGIN_KEY,
+              displayBegin.longValue(),
+              UDFTestConstant.DISPLAY_WINDOW_END_KEY,
+              displayEnd.longValue(),
+              UDFTestConstant.SESSION_GAP_KEY,
+              sessionGap);
+    }
+
+    try (Connection conn = EnvFactory.getEnv().getConnection();
+        Statement statement = conn.createStatement();
+        ResultSet resultSet = statement.executeQuery(sql)) {
+      assertEquals(2, resultSet.getMetaData().getColumnCount());
+      for (int i = 0; i < windowStart.length; i++) {
+        resultSet.next();
+        Assert.assertEquals(resultSet.getLong(1), windowStart[i]);
+        Assert.assertEquals(resultSet.getLong(2), windowEnd[i]);
+      }
+    } catch (SQLException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testSessionTimeWindowSS1() {
+    String sessionGap = "2";
+    long[] windowStart = new long[] {0, 8, 55, 9999};
+    long[] windowEnd = new long[] {4, 50, 9995, 9999};
+    testSessionTimeWindowSS(sessionGap, windowStart, windowEnd, null, null);
+  }
+
+  @Test
+  public void testSessionTimeWindowSS2() {
+    String sessionGap = "5";
+    long[] windowStart = new long[] {0, 55};
+    long[] windowEnd = new long[] {50, 9999};
+    testSessionTimeWindowSS(sessionGap, windowStart, windowEnd, null, null);
+  }
+
+  @Test
+  public void testSessionTimeWindowSS3() {
+    String sessionGap = "6";
+    long[] windowStart = new long[] {0};
+    long[] windowEnd = new long[] {9999};
+    testSessionTimeWindowSS(sessionGap, windowStart, windowEnd, null, null);
+  }
+
+  @Test
+  public void testSessionTimeWindowSS4() {
+    String sessionGap = "2";
+    Long displayBegin = 1L;
+    Long displayEnd = 9993L;
+    long[] windowStart = new long[] {1, 8, 55};
+    long[] windowEnd = new long[] {4, 50, 9992};
+    testSessionTimeWindowSS(sessionGap, windowStart, windowEnd, displayBegin, displayEnd);
+  }
+
+  @Test
+  public void testSessionTimeWindowSS5() {
+    String sessionGap = "5";
+    Long displayBegin = 43L;
+    Long displayEnd = 100L;
+    long[] windowStart = new long[] {43, 55};
+    long[] windowEnd = new long[] {50, 99};
+    testSessionTimeWindowSS(sessionGap, windowStart, windowEnd, displayBegin, displayEnd);
+  }
+
+  @Test
+  public void testSessionTimeWindowSS6() {
+    String sessionGap = "1";
+    Long displayBegin = 2L;
+    Long displayEnd = 20000L;
+    ArrayList<Long> windowStart = new ArrayList<>();
+    ArrayList<Long> windowEnd = new ArrayList<>();
+    for (long i = displayBegin; i <= 9999; i++) {
+      if (i == 5 || i == 6 || i == 7 || i == 51 || i == 52 || i == 53 || i == 54 || i == 9996
+          || i == 9997 || i == 9998) {
+        continue;
+      }
+      windowStart.add(i);
+      windowEnd.add(i);
+    }
+    testSessionTimeWindowSS(
+        sessionGap,
+        windowStart.stream().mapToLong(t -> t.longValue()).toArray(),
+        windowEnd.stream().mapToLong(t -> t.longValue()).toArray(),

Review Comment:
   ```suggestion
           windowStart.stream().mapToLong(t -> t).toArray(),
           windowEnd.stream().mapToLong(t -> t).toArray(),
   ```
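
The simplification works because `mapToLong` expects a function returning a primitive `long`, so a lambda that returns a `Long` is auto-unboxed by the compiler. A small sketch, with a hypothetical class and hypothetical method names, comparing the explicit form, the suggested form, and a method reference:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class UnboxingDemo {
  // Form used in the PR: explicit unboxing inside the lambda.
  static long[] explicit(List<Long> values) {
    return values.stream().mapToLong(t -> t.longValue()).toArray();
  }

  // Suggested form: the compiler unboxes the Long returned by the lambda.
  static long[] implicit(List<Long> values) {
    return values.stream().mapToLong(t -> t).toArray();
  }

  // Equivalent method-reference form, sometimes preferred for readability.
  static long[] methodRef(List<Long> values) {
    return values.stream().mapToLong(Long::longValue).toArray();
  }

  public static void main(String[] args) {
    List<Long> windowStart = new ArrayList<>(Arrays.asList(2L, 3L, 4L, 8L));
    System.out.println(Arrays.equals(explicit(windowStart), implicit(windowStart)));
    System.out.println(Arrays.toString(methodRef(windowStart)));
  }
}
```

All three variants throw a `NullPointerException` on a `null` element, so the change does not alter behavior for the test's non-null lists.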




[GitHub] [iotdb] lancelly commented on a diff in pull request #6928: [IOTDB-4073] Add SessionTimeWindowAccessStrategy in UDF

Posted by GitBox <gi...@apache.org>.
lancelly commented on code in PR #6928:
URL: https://github.com/apache/iotdb/pull/6928#discussion_r945399392


##########
server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/MultiInputColumnIntermediateLayer.java:
##########
@@ -653,4 +654,122 @@ public RowWindow currentWindow() {
       }
     };
   }
+
+  @Override
+  protected LayerRowWindowReader constructRowSessionTimeWindowReader(
+      SessionTimeWindowAccessStrategy strategy, float memoryBudgetInMB)
+      throws QueryProcessException {
+    final long displayWindowBegin = strategy.getDisplayWindowBegin();
+    final long displayWindowEnd = strategy.getDisplayWindowEnd();
+    final long sessionTimeGap = strategy.getSessionTimeGap();
+
+    final IUDFInputDataSet udfInputDataSet = this;
+    final ElasticSerializableRowRecordList rowRecordList =
+        new ElasticSerializableRowRecordList(
+            dataTypes, queryId, memoryBudgetInMB, CACHE_BLOCK_SIZE);
+    final ElasticSerializableRowRecordListBackedMultiColumnWindow window =
+        new ElasticSerializableRowRecordListBackedMultiColumnWindow(rowRecordList);
+
+    return new LayerRowWindowReader() {
+
+      private boolean isFirstIteration = true;
+      private boolean hasAtLeastOneRow = false;
+
+      private long nextWindowTimeBegin = displayWindowBegin;
+      private long nextWindowTimeEnd = 0;
+      private int nextIndexBegin = 0;
+      private int nextIndexEnd = 1;
+
+      @Override
+      public YieldableState yield() throws IOException, QueryProcessException {
+        if (isFirstIteration) {
+          if (rowRecordList.size() == 0) {
+            final YieldableState yieldableState =
+                LayerCacheUtils.yieldRow(udfInputDataSet, rowRecordList);
+            if (yieldableState != YieldableState.YIELDABLE) {
+              return yieldableState;
+            }
+          }
+          nextWindowTimeBegin = Math.max(displayWindowBegin, rowRecordList.getTime(0));
+          hasAtLeastOneRow = rowRecordList.size() != 0;
+          isFirstIteration = false;
+        }
+
+        if (!hasAtLeastOneRow || displayWindowEnd <= nextWindowTimeBegin) {
+          return YieldableState.NOT_YIELDABLE_NO_MORE_DATA;
+        }
+
+        while (rowRecordList.getTime(rowRecordList.size() - 1) < displayWindowEnd) {
+          final YieldableState yieldableState =
+              LayerCacheUtils.yieldRow(udfInputDataSet, rowRecordList);
+          if (yieldableState == YieldableState.YIELDABLE) {
+            if (rowRecordList.getTime(rowRecordList.size() - 2) >= displayWindowBegin
+                && rowRecordList.getTime(rowRecordList.size() - 1)
+                        - rowRecordList.getTime(rowRecordList.size() - 2)
+                    >= sessionTimeGap) {
+              nextIndexEnd = rowRecordList.size() - 1;
+              break;
+            } else {
+              nextIndexEnd++;
+            }
+          } else if (yieldableState == YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA) {
+            return YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA;
+          } else if (yieldableState == YieldableState.NOT_YIELDABLE_NO_MORE_DATA) {
+            nextIndexEnd = rowRecordList.size();
+            break;
+          }
+        }
+
+        nextWindowTimeEnd = rowRecordList.getTime(nextIndexEnd - 1);
+
+        if (nextIndexBegin == nextIndexEnd) {
+          return YieldableState.NOT_YIELDABLE_NO_MORE_DATA;
+        }
+
+        // Only if encounter user set the strategy's displayWindowBegin, which will go into the for
+        // loop to find the true index of the first window begin.
+        // For other situation, we will only go into if (nextWindowTimeBegin <= tvList.getTime(i))
+        // once.
+        for (int i = nextIndexBegin; i < rowRecordList.size(); ++i) {
+          if (nextWindowTimeBegin <= rowRecordList.getTime(i)) {
+            nextIndexBegin = i;
+            break;
+          }
+          // The first window's beginning time is greater than all the timestamp of the query result
+          // set
+          if (i == rowRecordList.size() - 1) {
+            return YieldableState.NOT_YIELDABLE_NO_MORE_DATA;
+          }
+        }
+
+        window.seek(nextIndexBegin, nextIndexEnd, nextWindowTimeBegin, nextWindowTimeEnd);
+
+        return YieldableState.YIELDABLE;
+      }
+
+      @Override
+      public boolean next() throws IOException, QueryProcessException {
+        return false;
+      }
+
+      @Override
+      public void readyForNext() throws IOException, QueryProcessException {
+        if (nextIndexEnd < rowRecordList.size()) {
+          nextWindowTimeBegin = rowRecordList.getTime(nextIndexEnd);
+        }
+        rowRecordList.setEvictionUpperBound(nextIndexBegin + 1);
+        nextIndexBegin = nextIndexEnd;
+      }
+
+      @Override
+      public TSDataType[] getDataTypes() {
+        return new TSDataType[0];
+      }

Review Comment:
   Why not return `dataTypes` here instead of an empty array?




[GitHub] [iotdb] lancelly commented on a diff in pull request #6928: [IOTDB-4073] Add SessionTimeWindowAccessStrategy in UDF

Posted by GitBox <gi...@apache.org>.
lancelly commented on code in PR #6928:
URL: https://github.com/apache/iotdb/pull/6928#discussion_r945388610


##########
server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnMultiReferenceIntermediateLayer.java:
##########
@@ -542,4 +543,120 @@ public RowWindow currentWindow() {
       }
     };
   }
+
+  @Override
+  protected LayerRowWindowReader constructRowSessionTimeWindowReader(
+      SessionTimeWindowAccessStrategy strategy, float memoryBudgetInMB) {
+    final long displayWindowBegin = strategy.getDisplayWindowBegin();
+    final long displayWindowEnd = strategy.getDisplayWindowEnd();
+    final long sessionTimeGap = strategy.getSessionTimeGap();
+
+    final SafetyPile safetyPile = safetyLine.addSafetyPile();
+    final ElasticSerializableTVListBackedSingleColumnWindow window =
+        new ElasticSerializableTVListBackedSingleColumnWindow(tvList);
+
+    return new LayerRowWindowReader() {
+
+      private boolean isFirstIteration = true;
+      private boolean hasAtLeastOneRow = false;
+
+      private long nextWindowTimeBegin = displayWindowBegin;
+      private long nextWindowTimeEnd = 0;
+      private int nextIndexBegin = 0;
+      private int nextIndexEnd = 1;
+
+      @Override
+      public YieldableState yield() throws IOException, QueryProcessException {
+        if (isFirstIteration) {
+          if (tvList.size() == 0) {
+            final YieldableState yieldableState =
+                LayerCacheUtils.yieldPoint(
+                    parentLayerPointReaderDataType, parentLayerPointReader, tvList);
+            if (yieldableState != YieldableState.YIELDABLE) {
+              return yieldableState;
+            }
+          }
+          nextWindowTimeBegin = Math.max(displayWindowBegin, tvList.getTime(0));
+          hasAtLeastOneRow = tvList.size() != 0;
+          isFirstIteration = false;
+        }
+
+        if (!hasAtLeastOneRow || displayWindowEnd <= nextWindowTimeBegin) {
+          return YieldableState.NOT_YIELDABLE_NO_MORE_DATA;
+        }
+
+        while (tvList.getTime(tvList.size() - 1) < displayWindowEnd) {

Review Comment:
   If `displayWindowEnd` is used as the loop bound here, all of the data will be cached first.
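
   To make the buffering concern concrete, here is a minimal standalone sketch, not IoTDB code, of a reader that pulls timestamps lazily and stops as soon as it sees a gap of at least `sessionTimeGap`. Each call consumes only the current window plus one look-ahead point. `SessionWindowSketch`, `nextWindow`, and `splitAll` are hypothetical names, the display-window bounds are left out for brevity, and the `>=` comparison is assumed to mirror the `>= sessionTimeGap` check in this PR's MultiInputColumnIntermediateLayer reader.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PrimitiveIterator;
import java.util.stream.LongStream;

/** Hypothetical standalone sketch of lazy session-window splitting; not an IoTDB class. */
final class SessionWindowSketch {

  private final PrimitiveIterator.OfLong source; // timestamps in ascending order
  private final long sessionTimeGap;
  private boolean hasPending; // true if 'pending' holds an unconsumed timestamp
  private long pending; // first timestamp of the next window, already pulled

  SessionWindowSketch(PrimitiveIterator.OfLong source, long sessionTimeGap) {
    this.source = source;
    this.sessionTimeGap = sessionTimeGap;
    if (source.hasNext()) {
      pending = source.nextLong();
      hasPending = true;
    }
  }

  /** Returns {begin, end} of the next window, or null when the input is exhausted. */
  long[] nextWindow() {
    if (!hasPending) {
      return null;
    }
    long begin = pending;
    long end = pending;
    hasPending = false;
    while (source.hasNext()) {
      long t = source.nextLong();
      if (t - end >= sessionTimeGap) {
        // Gap reached: keep t as the start of the next window and stop pulling.
        pending = t;
        hasPending = true;
        break;
      }
      end = t;
    }
    return new long[] {begin, end};
  }

  /** Convenience helper that drains the reader; used only for the demo and checks. */
  static List<long[]> splitAll(long[] timestamps, long sessionTimeGap) {
    SessionWindowSketch reader =
        new SessionWindowSketch(LongStream.of(timestamps).iterator(), sessionTimeGap);
    List<long[]> windows = new ArrayList<>();
    for (long[] w = reader.nextWindow(); w != null; w = reader.nextWindow()) {
      windows.add(w);
    }
    return windows;
  }

  public static void main(String[] args) {
    long[] timestamps = {0, 1, 2, 3, 4, 8, 9, 10};
    for (long[] w : splitAll(timestamps, 2)) {
      System.out.println(w[0] + " - " + w[1]);
    }
  }
}
```

   With a gap of 2 the sample input splits between 4 and 8; with a gap of 5 it stays a single window.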





[GitHub] [iotdb] ZhanGHanG9991 commented on a diff in pull request #6928: [IOTDB-4073] Add SessionTimeWindowAccessStrategy in UDF

Posted by GitBox <gi...@apache.org>.
ZhanGHanG9991 commented on code in PR #6928:
URL: https://github.com/apache/iotdb/pull/6928#discussion_r945450276


##########
integration-test/src/test/java/org/apache/iotdb/db/it/udf/IoTDBUDFSessionWindowQueryIT.java:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.it.udf;
+
+import org.apache.iotdb.it.env.ConfigFactory;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.constant.UDFTestConstant;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({ClusterIT.class})
+public class IoTDBUDFSessionWindowQueryIT {
+
+  protected static final int ITERATION_TIMES = 10000;
+
+  protected static boolean enableSeqSpaceCompaction;
+  protected static boolean enableUnseqSpaceCompaction;
+  protected static boolean enableCrossSpaceCompaction;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    enableSeqSpaceCompaction = ConfigFactory.getConfig().isEnableSeqSpaceCompaction();
+    enableUnseqSpaceCompaction = ConfigFactory.getConfig().isEnableUnseqSpaceCompaction();
+    enableCrossSpaceCompaction = ConfigFactory.getConfig().isEnableCrossSpaceCompaction();
+    ConfigFactory.getConfig().setEnableSeqSpaceCompaction(false);
+    ConfigFactory.getConfig().setEnableUnseqSpaceCompaction(false);
+    ConfigFactory.getConfig().setEnableCrossSpaceCompaction(false);
+    ConfigFactory.getConfig()
+        .setUdfCollectorMemoryBudgetInMB(5)
+        .setUdfTransformerMemoryBudgetInMB(5)
+        .setUdfReaderMemoryBudgetInMB(5);
+    EnvFactory.getEnv().initBeforeClass();
+    createTimeSeries();
+    generateData();
+    registerUDF();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    EnvFactory.getEnv().cleanAfterClass();
+    ConfigFactory.getConfig().setEnableSeqSpaceCompaction(enableSeqSpaceCompaction);
+    ConfigFactory.getConfig().setEnableUnseqSpaceCompaction(enableUnseqSpaceCompaction);
+    ConfigFactory.getConfig().setEnableCrossSpaceCompaction(enableCrossSpaceCompaction);
+    ConfigFactory.getConfig()
+        .setUdfCollectorMemoryBudgetInMB(100)
+        .setUdfTransformerMemoryBudgetInMB(100)
+        .setUdfReaderMemoryBudgetInMB(100);
+  }
+
+  private static void createTimeSeries() {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      statement.execute("SET STORAGE GROUP TO root.vehicle");
+      statement.execute("CREATE TIMESERIES root.vehicle.d1.s3 with datatype=INT32,encoding=PLAIN");
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+  }
+
+  private static void generateData() {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      for (int i = 0; i < ITERATION_TIMES; ++i) {
+        if (i == 5 || i == 6 || i == 7 || i == 51 || i == 52 || i == 53 || i == 54 || i == 9996
+            || i == 9997 || i == 9998) {
+          continue;
+        }
+        statement.execute(
+            (String.format("insert into root.vehicle.d1(timestamp,s3) values(%d,%d)", i, i)));
+      }
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+  }
+
+  private static void registerUDF() {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      statement.execute(
+          "create function window_start_end as 'org.apache.iotdb.db.query.udf.example.WindowStartEnd'");
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+  }
+
+  private void testSessionTimeWindowSS(
+      String sessionGap, long[] windowStart, long[] windowEnd, Long displayBegin, Long displayEnd) {
+    String sql;
+    if (displayBegin == null) {
+      sql =
+          String.format(
+              "select window_start_end(s3, '%s'='%s', '%s'='%s') from root.vehicle.d1",
+              UDFTestConstant.ACCESS_STRATEGY_KEY,
+              UDFTestConstant.ACCESS_STRATEGY_SESSION,
+              UDFTestConstant.SESSION_GAP_KEY,
+              sessionGap);
+    } else {
+      sql =
+          String.format(
+              "select window_start_end(s3, '%s'='%s', '%s'='%s', '%s'='%s', '%s'='%s') from root.vehicle.d1",
+              UDFTestConstant.ACCESS_STRATEGY_KEY,
+              UDFTestConstant.ACCESS_STRATEGY_SESSION,
+              UDFTestConstant.DISPLAY_WINDOW_BEGIN_KEY,
+              displayBegin.longValue(),
+              UDFTestConstant.DISPLAY_WINDOW_END_KEY,
+              displayEnd.longValue(),
+              UDFTestConstant.SESSION_GAP_KEY,
+              sessionGap);
+    }
+
+    try (Connection conn = EnvFactory.getEnv().getConnection();
+        Statement statement = conn.createStatement();
+        ResultSet resultSet = statement.executeQuery(sql)) {
+      assertEquals(2, resultSet.getMetaData().getColumnCount());
+      for (int i = 0; i < windowStart.length; i++) {
+        resultSet.next();
+        Assert.assertEquals(resultSet.getLong(1), windowStart[i]);
+        Assert.assertEquals(resultSet.getLong(2), windowEnd[i]);
+      }
+    } catch (SQLException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testSessionTimeWindowSS1() {
+    String sessionGap = "2";
+    long[] windowStart = new long[] {0, 8, 55, 9999};
+    long[] windowEnd = new long[] {4, 50, 9995, 9999};
+    testSessionTimeWindowSS(sessionGap, windowStart, windowEnd, null, null);
+  }
+
+  @Test
+  public void testSessionTimeWindowSS2() {
+    String sessionGap = "5";
+    long[] windowStart = new long[] {0, 55};
+    long[] windowEnd = new long[] {50, 9999};
+    testSessionTimeWindowSS(sessionGap, windowStart, windowEnd, null, null);
+  }
+
+  @Test
+  public void testSessionTimeWindowSS3() {
+    String sessionGap = "6";
+    long[] windowStart = new long[] {0};
+    long[] windowEnd = new long[] {9999};
+    testSessionTimeWindowSS(sessionGap, windowStart, windowEnd, null, null);
+  }
+
+  @Test
+  public void testSessionTimeWindowSS4() {
+    String sessionGap = "2";
+    Long displayBegin = 1L;
+    Long displayEnd = 9993L;
+    long[] windowStart = new long[] {1, 8, 55};
+    long[] windowEnd = new long[] {4, 50, 9992};
+    testSessionTimeWindowSS(sessionGap, windowStart, windowEnd, displayBegin, displayEnd);
+  }
+
+  @Test
+  public void testSessionTimeWindowSS5() {
+    String sessionGap = "5";
+    Long displayBegin = 43L;
+    Long displayEnd = 100L;
+    long[] windowStart = new long[] {43, 55};
+    long[] windowEnd = new long[] {50, 99};
+    testSessionTimeWindowSS(sessionGap, windowStart, windowEnd, displayBegin, displayEnd);
+  }
+
+  @Test
+  public void testSessionTimeWindowSS6() {
+    String sessionGap = "1";
+    Long displayBegin = 2L;
+    Long displayEnd = 20000L;
+    ArrayList<Long> windowStart = new ArrayList<>();
+    ArrayList<Long> windowEnd = new ArrayList<>();
+    for (long i = displayBegin; i <= 9999; i++) {
+      if (i == 5 || i == 6 || i == 7 || i == 51 || i == 52 || i == 53 || i == 54 || i == 9996
+          || i == 9997 || i == 9998) {
+        continue;
+      }
+      windowStart.add(i);
+      windowEnd.add(i);
+    }
+    testSessionTimeWindowSS(
+        sessionGap,
+        windowStart.stream().mapToLong(t -> t.longValue()).toArray(),
+        windowEnd.stream().mapToLong(t -> t.longValue()).toArray(),

Review Comment:
   I've changed it.



##########
integration-test/src/test/java/org/apache/iotdb/db/it/udf/IoTDBUDFSessionWindowQueryIT.java:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.it.udf;
+
+import org.apache.iotdb.it.env.ConfigFactory;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.constant.UDFTestConstant;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({ClusterIT.class})
+public class IoTDBUDFSessionWindowQueryIT {
+
+  protected static final int ITERATION_TIMES = 10000;
+
+  protected static boolean enableSeqSpaceCompaction;
+  protected static boolean enableUnseqSpaceCompaction;
+  protected static boolean enableCrossSpaceCompaction;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    enableSeqSpaceCompaction = ConfigFactory.getConfig().isEnableSeqSpaceCompaction();
+    enableUnseqSpaceCompaction = ConfigFactory.getConfig().isEnableUnseqSpaceCompaction();
+    enableCrossSpaceCompaction = ConfigFactory.getConfig().isEnableCrossSpaceCompaction();
+    ConfigFactory.getConfig().setEnableSeqSpaceCompaction(false);
+    ConfigFactory.getConfig().setEnableUnseqSpaceCompaction(false);
+    ConfigFactory.getConfig().setEnableCrossSpaceCompaction(false);
+    ConfigFactory.getConfig()
+        .setUdfCollectorMemoryBudgetInMB(5)
+        .setUdfTransformerMemoryBudgetInMB(5)
+        .setUdfReaderMemoryBudgetInMB(5);
+    EnvFactory.getEnv().initBeforeClass();
+    createTimeSeries();
+    generateData();
+    registerUDF();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    EnvFactory.getEnv().cleanAfterClass();
+    ConfigFactory.getConfig().setEnableSeqSpaceCompaction(enableSeqSpaceCompaction);
+    ConfigFactory.getConfig().setEnableUnseqSpaceCompaction(enableUnseqSpaceCompaction);
+    ConfigFactory.getConfig().setEnableCrossSpaceCompaction(enableCrossSpaceCompaction);
+    ConfigFactory.getConfig()
+        .setUdfCollectorMemoryBudgetInMB(100)
+        .setUdfTransformerMemoryBudgetInMB(100)
+        .setUdfReaderMemoryBudgetInMB(100);
+  }
+
+  private static void createTimeSeries() {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      statement.execute("SET STORAGE GROUP TO root.vehicle");
+      statement.execute("CREATE TIMESERIES root.vehicle.d1.s3 with datatype=INT32,encoding=PLAIN");
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+  }
+
+  private static void generateData() {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      for (int i = 0; i < ITERATION_TIMES; ++i) {
+        if (i == 5 || i == 6 || i == 7 || i == 51 || i == 52 || i == 53 || i == 54 || i == 9996
+            || i == 9997 || i == 9998) {
+          continue;
+        }
+        statement.execute(
+            (String.format("insert into root.vehicle.d1(timestamp,s3) values(%d,%d)", i, i)));
+      }
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+  }
+
+  private static void registerUDF() {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      statement.execute(
+          "create function window_start_end as 'org.apache.iotdb.db.query.udf.example.WindowStartEnd'");
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+  }
+
+  private void testSessionTimeWindowSS(
+      String sessionGap, long[] windowStart, long[] windowEnd, Long displayBegin, Long displayEnd) {
+    String sql;
+    if (displayBegin == null) {
+      sql =
+          String.format(
+              "select window_start_end(s3, '%s'='%s', '%s'='%s') from root.vehicle.d1",
+              UDFTestConstant.ACCESS_STRATEGY_KEY,
+              UDFTestConstant.ACCESS_STRATEGY_SESSION,
+              UDFTestConstant.SESSION_GAP_KEY,
+              sessionGap);
+    } else {
+      sql =
+          String.format(
+              "select window_start_end(s3, '%s'='%s', '%s'='%s', '%s'='%s', '%s'='%s') from root.vehicle.d1",
+              UDFTestConstant.ACCESS_STRATEGY_KEY,
+              UDFTestConstant.ACCESS_STRATEGY_SESSION,
+              UDFTestConstant.DISPLAY_WINDOW_BEGIN_KEY,
+              displayBegin.longValue(),
+              UDFTestConstant.DISPLAY_WINDOW_END_KEY,
+              displayEnd.longValue(),
+              UDFTestConstant.SESSION_GAP_KEY,
+              sessionGap);
+    }
+
+    try (Connection conn = EnvFactory.getEnv().getConnection();
+        Statement statement = conn.createStatement();
+        ResultSet resultSet = statement.executeQuery(sql)) {
+      assertEquals(2, resultSet.getMetaData().getColumnCount());
+      for (int i = 0; i < windowStart.length; i++) {
+        resultSet.next();
+        Assert.assertEquals(resultSet.getLong(1), windowStart[i]);
+        Assert.assertEquals(resultSet.getLong(2), windowEnd[i]);
+      }
+    } catch (SQLException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testSessionTimeWindowSS1() {
+    String sessionGap = "2";
+    long[] windowStart = new long[] {0, 8, 55, 9999};
+    long[] windowEnd = new long[] {4, 50, 9995, 9999};
+    testSessionTimeWindowSS(sessionGap, windowStart, windowEnd, null, null);
+  }
+
+  @Test
+  public void testSessionTimeWindowSS2() {
+    String sessionGap = "5";
+    long[] windowStart = new long[] {0, 55};
+    long[] windowEnd = new long[] {50, 9999};
+    testSessionTimeWindowSS(sessionGap, windowStart, windowEnd, null, null);
+  }
+
+  @Test
+  public void testSessionTimeWindowSS3() {
+    String sessionGap = "6";
+    long[] windowStart = new long[] {0};
+    long[] windowEnd = new long[] {9999};
+    testSessionTimeWindowSS(sessionGap, windowStart, windowEnd, null, null);
+  }
+
+  @Test
+  public void testSessionTimeWindowSS4() {
+    String sessionGap = "2";
+    Long displayBegin = 1L;
+    Long displayEnd = 9993L;
+    long[] windowStart = new long[] {1, 8, 55};
+    long[] windowEnd = new long[] {4, 50, 9992};
+    testSessionTimeWindowSS(sessionGap, windowStart, windowEnd, displayBegin, displayEnd);
+  }
+
+  @Test
+  public void testSessionTimeWindowSS5() {
+    String sessionGap = "5";
+    Long displayBegin = 43L;
+    Long displayEnd = 100L;
+    long[] windowStart = new long[] {43, 55};
+    long[] windowEnd = new long[] {50, 99};
+    testSessionTimeWindowSS(sessionGap, windowStart, windowEnd, displayBegin, displayEnd);
+  }
+
+  @Test
+  public void testSessionTimeWindowSS6() {
+    String sessionGap = "1";
+    Long displayBegin = 2L;
+    Long displayEnd = 20000L;
+    ArrayList<Long> windowStart = new ArrayList<>();
+    ArrayList<Long> windowEnd = new ArrayList<>();
+    for (long i = displayBegin; i <= 9999; i++) {
+      if (i == 5 || i == 6 || i == 7 || i == 51 || i == 52 || i == 53 || i == 54 || i == 9996
+          || i == 9997 || i == 9998) {
+        continue;
+      }
+      windowStart.add(i);
+      windowEnd.add(i);
+    }
+    testSessionTimeWindowSS(
+        sessionGap,
+        windowStart.stream().mapToLong(t -> t.longValue()).toArray(),
+        windowEnd.stream().mapToLong(t -> t.longValue()).toArray(),
+        displayBegin,
+        displayEnd);
+  }
+
+  private void testSessionTimeWindowSSOutOfRange(
+      String sessionGap, Long displayBegin, Long displayEnd) {
+    String sql;
+    if (displayBegin == null) {
+      sql =
+          String.format(
+              "select window_start_end(s3, '%s'='%s', '%s'='%s') from root.vehicle.d1",
+              UDFTestConstant.ACCESS_STRATEGY_KEY,
+              UDFTestConstant.ACCESS_STRATEGY_SESSION,
+              UDFTestConstant.SESSION_GAP_KEY,
+              sessionGap);
+    } else {
+      sql =
+          String.format(
+              "select window_start_end(s3, '%s'='%s', '%s'='%s', '%s'='%s', '%s'='%s') from root.vehicle.d1",
+              UDFTestConstant.ACCESS_STRATEGY_KEY,
+              UDFTestConstant.ACCESS_STRATEGY_SESSION,
+              UDFTestConstant.DISPLAY_WINDOW_BEGIN_KEY,
+              displayBegin.longValue(),
+              UDFTestConstant.DISPLAY_WINDOW_END_KEY,
+              displayEnd.longValue(),

Review Comment:
   I've changed it.





[GitHub] [iotdb] ZhanGHanG9991 commented on a diff in pull request #6928: [IOTDB-4073] Add SessionTimeWindowAccessStrategy in UDF

Posted by GitBox <gi...@apache.org>.
ZhanGHanG9991 commented on code in PR #6928:
URL: https://github.com/apache/iotdb/pull/6928#discussion_r945428671


##########
server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/MultiInputColumnIntermediateLayer.java:
##########
@@ -653,4 +654,122 @@ public RowWindow currentWindow() {
       }
     };
   }
+
+  @Override
+  protected LayerRowWindowReader constructRowSessionTimeWindowReader(
+      SessionTimeWindowAccessStrategy strategy, float memoryBudgetInMB)
+      throws QueryProcessException {
+    final long displayWindowBegin = strategy.getDisplayWindowBegin();
+    final long displayWindowEnd = strategy.getDisplayWindowEnd();
+    final long sessionTimeGap = strategy.getSessionTimeGap();
+
+    final IUDFInputDataSet udfInputDataSet = this;
+    final ElasticSerializableRowRecordList rowRecordList =
+        new ElasticSerializableRowRecordList(
+            dataTypes, queryId, memoryBudgetInMB, CACHE_BLOCK_SIZE);
+    final ElasticSerializableRowRecordListBackedMultiColumnWindow window =
+        new ElasticSerializableRowRecordListBackedMultiColumnWindow(rowRecordList);
+
+    return new LayerRowWindowReader() {
+
+      private boolean isFirstIteration = true;
+      private boolean hasAtLeastOneRow = false;
+
+      private long nextWindowTimeBegin = displayWindowBegin;
+      private long nextWindowTimeEnd = 0;
+      private int nextIndexBegin = 0;
+      private int nextIndexEnd = 1;
+
+      @Override
+      public YieldableState yield() throws IOException, QueryProcessException {
+        if (isFirstIteration) {
+          if (rowRecordList.size() == 0) {
+            final YieldableState yieldableState =
+                LayerCacheUtils.yieldRow(udfInputDataSet, rowRecordList);
+            if (yieldableState != YieldableState.YIELDABLE) {
+              return yieldableState;
+            }
+          }
+          nextWindowTimeBegin = Math.max(displayWindowBegin, rowRecordList.getTime(0));
+          hasAtLeastOneRow = rowRecordList.size() != 0;
+          isFirstIteration = false;
+        }
+
+        if (!hasAtLeastOneRow || displayWindowEnd <= nextWindowTimeBegin) {
+          return YieldableState.NOT_YIELDABLE_NO_MORE_DATA;
+        }
+
+        while (rowRecordList.getTime(rowRecordList.size() - 1) < displayWindowEnd) {
+          final YieldableState yieldableState =
+              LayerCacheUtils.yieldRow(udfInputDataSet, rowRecordList);
+          if (yieldableState == YieldableState.YIELDABLE) {
+            if (rowRecordList.getTime(rowRecordList.size() - 2) >= displayWindowBegin
+                && rowRecordList.getTime(rowRecordList.size() - 1)
+                        - rowRecordList.getTime(rowRecordList.size() - 2)
+                    >= sessionTimeGap) {
+              nextIndexEnd = rowRecordList.size() - 1;
+              break;
+            } else {
+              nextIndexEnd++;
+            }
+          } else if (yieldableState == YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA) {
+            return YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA;
+          } else if (yieldableState == YieldableState.NOT_YIELDABLE_NO_MORE_DATA) {
+            nextIndexEnd = rowRecordList.size();
+            break;
+          }
+        }
+
+        nextWindowTimeEnd = rowRecordList.getTime(nextIndexEnd - 1);
+
+        if (nextIndexBegin == nextIndexEnd) {
+          return YieldableState.NOT_YIELDABLE_NO_MORE_DATA;
+        }
+
+        // Only when the user sets the strategy's displayWindowBegin does this loop need to scan
+        // forward to find the true index of the first window's beginning.
+        // In all other cases, the check nextWindowTimeBegin <= rowRecordList.getTime(i) passes on
+        // the first iteration.
+        for (int i = nextIndexBegin; i < rowRecordList.size(); ++i) {
+          if (nextWindowTimeBegin <= rowRecordList.getTime(i)) {
+            nextIndexBegin = i;
+            break;
+          }
+          // The first window's begin time is greater than every timestamp in the query result
+          // set.
+          if (i == rowRecordList.size() - 1) {
+            return YieldableState.NOT_YIELDABLE_NO_MORE_DATA;
+          }
+        }
+
+        window.seek(nextIndexBegin, nextIndexEnd, nextWindowTimeBegin, nextWindowTimeEnd);
+
+        return YieldableState.YIELDABLE;
+      }
+
+      @Override
+      public boolean next() throws IOException, QueryProcessException {
+        return false;
+      }
+
+      @Override
+      public void readyForNext() throws IOException, QueryProcessException {
+        if (nextIndexEnd < rowRecordList.size()) {
+          nextWindowTimeBegin = rowRecordList.getTime(nextIndexEnd);
+        }
+        rowRecordList.setEvictionUpperBound(nextIndexBegin + 1);
+        nextIndexBegin = nextIndexEnd;
+      }
+
+      @Override
+      public TSDataType[] getDataTypes() {
+        return new TSDataType[0];
+      }
+
+      @Override
+      public RowWindow currentWindow() {
+        return null;
+      }
+    };

Review Comment:
   I've corrected it. Thanks.



##########
server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/MultiInputColumnIntermediateLayer.java:
##########
@@ -653,4 +654,122 @@ public RowWindow currentWindow() {
       }
     };
   }
+
+  @Override
+  protected LayerRowWindowReader constructRowSessionTimeWindowReader(
+      SessionTimeWindowAccessStrategy strategy, float memoryBudgetInMB)
+      throws QueryProcessException {
+    final long displayWindowBegin = strategy.getDisplayWindowBegin();
+    final long displayWindowEnd = strategy.getDisplayWindowEnd();
+    final long sessionTimeGap = strategy.getSessionTimeGap();
+
+    final IUDFInputDataSet udfInputDataSet = this;
+    final ElasticSerializableRowRecordList rowRecordList =
+        new ElasticSerializableRowRecordList(
+            dataTypes, queryId, memoryBudgetInMB, CACHE_BLOCK_SIZE);
+    final ElasticSerializableRowRecordListBackedMultiColumnWindow window =
+        new ElasticSerializableRowRecordListBackedMultiColumnWindow(rowRecordList);
+
+    return new LayerRowWindowReader() {
+
+      private boolean isFirstIteration = true;
+      private boolean hasAtLeastOneRow = false;
+
+      private long nextWindowTimeBegin = displayWindowBegin;
+      private long nextWindowTimeEnd = 0;
+      private int nextIndexBegin = 0;
+      private int nextIndexEnd = 1;
+
+      @Override
+      public YieldableState yield() throws IOException, QueryProcessException {
+        if (isFirstIteration) {
+          if (rowRecordList.size() == 0) {
+            final YieldableState yieldableState =
+                LayerCacheUtils.yieldRow(udfInputDataSet, rowRecordList);
+            if (yieldableState != YieldableState.YIELDABLE) {
+              return yieldableState;
+            }
+          }
+          nextWindowTimeBegin = Math.max(displayWindowBegin, rowRecordList.getTime(0));
+          hasAtLeastOneRow = rowRecordList.size() != 0;
+          isFirstIteration = false;
+        }
+
+        if (!hasAtLeastOneRow || displayWindowEnd <= nextWindowTimeBegin) {
+          return YieldableState.NOT_YIELDABLE_NO_MORE_DATA;
+        }
+
+        while (rowRecordList.getTime(rowRecordList.size() - 1) < displayWindowEnd) {
+          final YieldableState yieldableState =
+              LayerCacheUtils.yieldRow(udfInputDataSet, rowRecordList);
+          if (yieldableState == YieldableState.YIELDABLE) {
+            if (rowRecordList.getTime(rowRecordList.size() - 2) >= displayWindowBegin
+                && rowRecordList.getTime(rowRecordList.size() - 1)
+                        - rowRecordList.getTime(rowRecordList.size() - 2)
+                    >= sessionTimeGap) {
+              nextIndexEnd = rowRecordList.size() - 1;
+              break;
+            } else {
+              nextIndexEnd++;
+            }
+          } else if (yieldableState == YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA) {
+            return YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA;
+          } else if (yieldableState == YieldableState.NOT_YIELDABLE_NO_MORE_DATA) {
+            nextIndexEnd = rowRecordList.size();
+            break;
+          }
+        }
+
+        nextWindowTimeEnd = rowRecordList.getTime(nextIndexEnd - 1);
+
+        if (nextIndexBegin == nextIndexEnd) {
+          return YieldableState.NOT_YIELDABLE_NO_MORE_DATA;
+        }
+
+        // Only when the user sets the strategy's displayWindowBegin does this loop need to scan
+        // forward to find the true index of the first window's beginning.
+        // In all other cases, the check nextWindowTimeBegin <= rowRecordList.getTime(i) passes on
+        // the first iteration.
+        for (int i = nextIndexBegin; i < rowRecordList.size(); ++i) {
+          if (nextWindowTimeBegin <= rowRecordList.getTime(i)) {
+            nextIndexBegin = i;
+            break;
+          }
+          // The first window's begin time is greater than every timestamp in the query result
+          // set.
+          if (i == rowRecordList.size() - 1) {
+            return YieldableState.NOT_YIELDABLE_NO_MORE_DATA;
+          }
+        }
+
+        window.seek(nextIndexBegin, nextIndexEnd, nextWindowTimeBegin, nextWindowTimeEnd);
+
+        return YieldableState.YIELDABLE;
+      }
+
+      @Override
+      public boolean next() throws IOException, QueryProcessException {
+        return false;
+      }
+
+      @Override
+      public void readyForNext() throws IOException, QueryProcessException {
+        if (nextIndexEnd < rowRecordList.size()) {
+          nextWindowTimeBegin = rowRecordList.getTime(nextIndexEnd);
+        }
+        rowRecordList.setEvictionUpperBound(nextIndexBegin + 1);
+        nextIndexBegin = nextIndexEnd;
+      }
+
+      @Override
+      public TSDataType[] getDataTypes() {
+        return new TSDataType[0];
+      }

Review Comment:
   I've corrected it. Thanks.





[GitHub] [iotdb] ZhanGHanG9991 commented on a diff in pull request #6928: [IOTDB-4073] Add SessionTimeWindowAccessStrategy in UDF

Posted by GitBox <gi...@apache.org>.
ZhanGHanG9991 commented on code in PR #6928:
URL: https://github.com/apache/iotdb/pull/6928#discussion_r945429256


##########
integration-test/src/test/java/org/apache/iotdb/db/it/udf/IoTDBUDFSessionWindowQueryIT.java:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.it.udf;
+
+import org.apache.iotdb.it.env.ConfigFactory;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.constant.UDFTestConstant;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({ClusterIT.class})
+public class IoTDBUDFSessionWindowQueryIT {
+
+  protected static final int ITERATION_TIMES = 10000;
+
+  protected static boolean enableSeqSpaceCompaction;
+  protected static boolean enableUnseqSpaceCompaction;
+  protected static boolean enableCrossSpaceCompaction;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    enableSeqSpaceCompaction = ConfigFactory.getConfig().isEnableSeqSpaceCompaction();
+    enableUnseqSpaceCompaction = ConfigFactory.getConfig().isEnableUnseqSpaceCompaction();
+    enableCrossSpaceCompaction = ConfigFactory.getConfig().isEnableCrossSpaceCompaction();
+    ConfigFactory.getConfig().setEnableSeqSpaceCompaction(false);
+    ConfigFactory.getConfig().setEnableUnseqSpaceCompaction(false);
+    ConfigFactory.getConfig().setEnableCrossSpaceCompaction(false);
+    ConfigFactory.getConfig()
+        .setUdfCollectorMemoryBudgetInMB(5)
+        .setUdfTransformerMemoryBudgetInMB(5)
+        .setUdfReaderMemoryBudgetInMB(5);
+    EnvFactory.getEnv().initBeforeClass();
+    createTimeSeries();
+    generateData();
+    registerUDF();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    EnvFactory.getEnv().cleanAfterClass();
+    ConfigFactory.getConfig().setEnableSeqSpaceCompaction(enableSeqSpaceCompaction);
+    ConfigFactory.getConfig().setEnableUnseqSpaceCompaction(enableUnseqSpaceCompaction);
+    ConfigFactory.getConfig().setEnableCrossSpaceCompaction(enableCrossSpaceCompaction);
+    ConfigFactory.getConfig()
+        .setUdfCollectorMemoryBudgetInMB(100)
+        .setUdfTransformerMemoryBudgetInMB(100)
+        .setUdfReaderMemoryBudgetInMB(100);
+  }
+
+  private static void createTimeSeries() {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      statement.execute("SET STORAGE GROUP TO root.vehicle");
+      statement.execute("CREATE TIMESERIES root.vehicle.d1.s3 with datatype=INT32,encoding=PLAIN");
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+  }
+
+  private static void generateData() {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      for (int i = 0; i < ITERATION_TIMES; ++i) {
+        if (i == 5 || i == 6 || i == 7 || i == 51 || i == 52 || i == 53 || i == 54 || i == 9996
+            || i == 9997 || i == 9998) {
+          continue;
+        }
+        statement.execute(
+            (String.format("insert into root.vehicle.d1(timestamp,s3) values(%d,%d)", i, i)));
+      }
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+  }
+
+  private static void registerUDF() {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      statement.execute(
+          "create function window_start_end as 'org.apache.iotdb.db.query.udf.example.WindowStartEnd'");
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+  }
+
+  private void testSessionTimeWindowSS(
+      String sessionGap, long[] windowStart, long[] windowEnd, Long displayBegin, Long displayEnd) {
+    String sql;
+    if (displayBegin == null) {
+      sql =
+          String.format(
+              "select window_start_end(s3, '%s'='%s', '%s'='%s') from root.vehicle.d1",
+              UDFTestConstant.ACCESS_STRATEGY_KEY,
+              UDFTestConstant.ACCESS_STRATEGY_SESSION,
+              UDFTestConstant.SESSION_GAP_KEY,
+              sessionGap);
+    } else {
+      sql =
+          String.format(
+              "select window_start_end(s3, '%s'='%s', '%s'='%s', '%s'='%s', '%s'='%s') from root.vehicle.d1",
+              UDFTestConstant.ACCESS_STRATEGY_KEY,
+              UDFTestConstant.ACCESS_STRATEGY_SESSION,
+              UDFTestConstant.DISPLAY_WINDOW_BEGIN_KEY,
+              displayBegin.longValue(),
+              UDFTestConstant.DISPLAY_WINDOW_END_KEY,
+              displayEnd.longValue(),
+              UDFTestConstant.SESSION_GAP_KEY,
+              sessionGap);
+    }
+
+    try (Connection conn = EnvFactory.getEnv().getConnection();
+        Statement statement = conn.createStatement();
+        ResultSet resultSet = statement.executeQuery(sql)) {
+      assertEquals(2, resultSet.getMetaData().getColumnCount());
+      for (int i = 0; i < windowStart.length; i++) {
+        resultSet.next();
+        Assert.assertEquals(resultSet.getLong(1), windowStart[i]);
+        Assert.assertEquals(resultSet.getLong(2), windowEnd[i]);
+      }

Review Comment:
   I've changed it.





[GitHub] [iotdb] ZhanGHanG9991 commented on a diff in pull request #6928: [IOTDB-4073] Add SessionTimeWindowAccessStrategy in UDF

Posted by GitBox <gi...@apache.org>.
ZhanGHanG9991 commented on code in PR #6928:
URL: https://github.com/apache/iotdb/pull/6928#discussion_r945428308


##########
server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnMultiReferenceIntermediateLayer.java:
##########
@@ -542,4 +543,120 @@ public RowWindow currentWindow() {
       }
     };
   }
+
+  @Override
+  protected LayerRowWindowReader constructRowSessionTimeWindowReader(
+      SessionTimeWindowAccessStrategy strategy, float memoryBudgetInMB) {
+    final long displayWindowBegin = strategy.getDisplayWindowBegin();
+    final long displayWindowEnd = strategy.getDisplayWindowEnd();
+    final long sessionTimeGap = strategy.getSessionTimeGap();
+
+    final SafetyPile safetyPile = safetyLine.addSafetyPile();
+    final ElasticSerializableTVListBackedSingleColumnWindow window =
+        new ElasticSerializableTVListBackedSingleColumnWindow(tvList);
+
+    return new LayerRowWindowReader() {
+
+      private boolean isFirstIteration = true;
+      private boolean hasAtLeastOneRow = false;
+
+      private long nextWindowTimeBegin = displayWindowBegin;
+      private long nextWindowTimeEnd = 0;
+      private int nextIndexBegin = 0;
+      private int nextIndexEnd = 1;
+
+      @Override
+      public YieldableState yield() throws IOException, QueryProcessException {
+        if (isFirstIteration) {
+          if (tvList.size() == 0) {
+            final YieldableState yieldableState =
+                LayerCacheUtils.yieldPoint(
+                    parentLayerPointReaderDataType, parentLayerPointReader, tvList);
+            if (yieldableState != YieldableState.YIELDABLE) {
+              return yieldableState;
+            }
+          }
+          nextWindowTimeBegin = Math.max(displayWindowBegin, tvList.getTime(0));
+          hasAtLeastOneRow = tvList.size() != 0;
+          isFirstIteration = false;
+        }
+
+        if (!hasAtLeastOneRow || displayWindowEnd <= nextWindowTimeBegin) {
+          return YieldableState.NOT_YIELDABLE_NO_MORE_DATA;
+        }
+
+        while (tvList.getTime(tvList.size() - 1) < displayWindowEnd) {

Review Comment:
   The `if` statement just below this `while` line contains a `break`, so the loop stops as soon as a window boundary is found. We don't need to store all the data.
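
   For illustration only (this is not the PR's code): a minimal sketch of the same early-exit idea, assuming a plain `Iterator<Long>` of ascending timestamps in place of the layer's point reader. The class name `SessionWindowSketch` and the `pointsRead` counter are hypothetical; the gap test mirrors the `>= sessionTimeGap` comparison in the diff.

   ```java
   import java.util.Iterator;

   final class SessionWindowSketch {
     private final Iterator<Long> source; // ascending timestamps (assumed)
     private final long sessionTimeGap;
     private Long pending; // first point of the next window, already read
     int pointsRead = 0; // how many points have been pulled from the source

     SessionWindowSketch(Iterator<Long> source, long sessionTimeGap) {
       this.source = source;
       this.sessionTimeGap = sessionTimeGap;
     }

     /** Returns {start, end} of the next session window, or null when no data is left. */
     long[] nextWindow() {
       if (pending == null) {
         if (!source.hasNext()) {
           return null;
         }
         pending = source.next();
         pointsRead++;
       }
       long start = pending;
       long end = start;
       pending = null;
       while (source.hasNext()) {
         long current = source.next();
         pointsRead++;
         if (current - end >= sessionTimeGap) {
           // Gap found: keep this point for the next window and stop reading.
           pending = current;
           break;
         }
         end = current;
       }
       return new long[] {start, end};
     }
   }
   ```

   With timestamps 0..4 and 8..10 and a gap of 2, the first call reads six points (0 through 8) and returns the window 0 to 4; the remaining points are only pulled when the next window is requested.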





[GitHub] [iotdb] lancelly commented on a diff in pull request #6928: [IOTDB-4073] Add SessionTimeWindowAccessStrategy in UDF

Posted by GitBox <gi...@apache.org>.
lancelly commented on code in PR #6928:
URL: https://github.com/apache/iotdb/pull/6928#discussion_r945399184


##########
server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/MultiInputColumnIntermediateLayer.java:
##########
@@ -653,4 +654,122 @@ public RowWindow currentWindow() {
       }
     };
   }
+
+  @Override
+  protected LayerRowWindowReader constructRowSessionTimeWindowReader(
+      SessionTimeWindowAccessStrategy strategy, float memoryBudgetInMB)
+      throws QueryProcessException {
+    final long displayWindowBegin = strategy.getDisplayWindowBegin();
+    final long displayWindowEnd = strategy.getDisplayWindowEnd();
+    final long sessionTimeGap = strategy.getSessionTimeGap();
+
+    final IUDFInputDataSet udfInputDataSet = this;
+    final ElasticSerializableRowRecordList rowRecordList =
+        new ElasticSerializableRowRecordList(
+            dataTypes, queryId, memoryBudgetInMB, CACHE_BLOCK_SIZE);
+    final ElasticSerializableRowRecordListBackedMultiColumnWindow window =
+        new ElasticSerializableRowRecordListBackedMultiColumnWindow(rowRecordList);
+
+    return new LayerRowWindowReader() {
+
+      private boolean isFirstIteration = true;
+      private boolean hasAtLeastOneRow = false;
+
+      private long nextWindowTimeBegin = displayWindowBegin;
+      private long nextWindowTimeEnd = 0;
+      private int nextIndexBegin = 0;
+      private int nextIndexEnd = 1;
+
+      @Override
+      public YieldableState yield() throws IOException, QueryProcessException {
+        if (isFirstIteration) {
+          if (rowRecordList.size() == 0) {
+            final YieldableState yieldableState =
+                LayerCacheUtils.yieldRow(udfInputDataSet, rowRecordList);
+            if (yieldableState != YieldableState.YIELDABLE) {
+              return yieldableState;
+            }
+          }
+          nextWindowTimeBegin = Math.max(displayWindowBegin, rowRecordList.getTime(0));
+          hasAtLeastOneRow = rowRecordList.size() != 0;
+          isFirstIteration = false;
+        }
+
+        if (!hasAtLeastOneRow || displayWindowEnd <= nextWindowTimeBegin) {
+          return YieldableState.NOT_YIELDABLE_NO_MORE_DATA;
+        }
+
+        while (rowRecordList.getTime(rowRecordList.size() - 1) < displayWindowEnd) {
+          final YieldableState yieldableState =
+              LayerCacheUtils.yieldRow(udfInputDataSet, rowRecordList);
+          if (yieldableState == YieldableState.YIELDABLE) {
+            if (rowRecordList.getTime(rowRecordList.size() - 2) >= displayWindowBegin
+                && rowRecordList.getTime(rowRecordList.size() - 1)
+                        - rowRecordList.getTime(rowRecordList.size() - 2)
+                    >= sessionTimeGap) {
+              nextIndexEnd = rowRecordList.size() - 1;
+              break;
+            } else {
+              nextIndexEnd++;
+            }
+          } else if (yieldableState == YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA) {
+            return YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA;
+          } else if (yieldableState == YieldableState.NOT_YIELDABLE_NO_MORE_DATA) {
+            nextIndexEnd = rowRecordList.size();
+            break;
+          }
+        }
+
+        nextWindowTimeEnd = rowRecordList.getTime(nextIndexEnd - 1);
+
+        if (nextIndexBegin == nextIndexEnd) {
+          return YieldableState.NOT_YIELDABLE_NO_MORE_DATA;
+        }
+
+        // Only if encounter user set the strategy's displayWindowBegin, which will go into the for
+        // loop to find the true index of the first window begin.
+        // For other situation, we will only go into if (nextWindowTimeBegin <= tvList.getTime(i))
+        // once.
+        for (int i = nextIndexBegin; i < rowRecordList.size(); ++i) {
+          if (nextWindowTimeBegin <= rowRecordList.getTime(i)) {
+            nextIndexBegin = i;
+            break;
+          }
+          // The first window's beginning time is greater than all the timestamp of the query result
+          // set
+          if (i == rowRecordList.size() - 1) {
+            return YieldableState.NOT_YIELDABLE_NO_MORE_DATA;
+          }
+        }
+
+        window.seek(nextIndexBegin, nextIndexEnd, nextWindowTimeBegin, nextWindowTimeEnd);
+
+        return YieldableState.YIELDABLE;
+      }
+
+      @Override
+      public boolean next() throws IOException, QueryProcessException {
+        return false;
+      }
+
+      @Override
+      public void readyForNext() throws IOException, QueryProcessException {
+        if (nextIndexEnd < rowRecordList.size()) {
+          nextWindowTimeBegin = rowRecordList.getTime(nextIndexEnd);
+        }
+        rowRecordList.setEvictionUpperBound(nextIndexBegin + 1);
+        nextIndexBegin = nextIndexEnd;
+      }
+
+      @Override
+      public TSDataType[] getDataTypes() {
+        return new TSDataType[0];
+      }
+
+      @Override
+      public RowWindow currentWindow() {
+        return null;
+      }
+    };

Review Comment:
   ```suggestion
         @Override
         public RowWindow currentWindow() {
           return window;
         }
       };
   ```





[GitHub] [iotdb] lancelly commented on a diff in pull request #6928: [IOTDB-4073] Add SessionTimeWindowAccessStrategy in UDF

Posted by GitBox <gi...@apache.org>.
lancelly commented on code in PR #6928:
URL: https://github.com/apache/iotdb/pull/6928#discussion_r945404311


##########
integration-test/src/test/java/org/apache/iotdb/db/it/udf/IoTDBUDFSessionWindowQueryIT.java:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.it.udf;
+
+import org.apache.iotdb.it.env.ConfigFactory;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.constant.UDFTestConstant;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({ClusterIT.class})
+public class IoTDBUDFSessionWindowQueryIT {
+
+  protected static final int ITERATION_TIMES = 10000;
+
+  protected static boolean enableSeqSpaceCompaction;
+  protected static boolean enableUnseqSpaceCompaction;
+  protected static boolean enableCrossSpaceCompaction;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    enableSeqSpaceCompaction = ConfigFactory.getConfig().isEnableSeqSpaceCompaction();
+    enableUnseqSpaceCompaction = ConfigFactory.getConfig().isEnableUnseqSpaceCompaction();
+    enableCrossSpaceCompaction = ConfigFactory.getConfig().isEnableCrossSpaceCompaction();
+    ConfigFactory.getConfig().setEnableSeqSpaceCompaction(false);
+    ConfigFactory.getConfig().setEnableUnseqSpaceCompaction(false);
+    ConfigFactory.getConfig().setEnableCrossSpaceCompaction(false);
+    ConfigFactory.getConfig()
+        .setUdfCollectorMemoryBudgetInMB(5)
+        .setUdfTransformerMemoryBudgetInMB(5)
+        .setUdfReaderMemoryBudgetInMB(5);
+    EnvFactory.getEnv().initBeforeClass();
+    createTimeSeries();
+    generateData();
+    registerUDF();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    EnvFactory.getEnv().cleanAfterClass();
+    ConfigFactory.getConfig().setEnableSeqSpaceCompaction(enableSeqSpaceCompaction);
+    ConfigFactory.getConfig().setEnableUnseqSpaceCompaction(enableUnseqSpaceCompaction);
+    ConfigFactory.getConfig().setEnableCrossSpaceCompaction(enableCrossSpaceCompaction);
+    ConfigFactory.getConfig()
+        .setUdfCollectorMemoryBudgetInMB(100)
+        .setUdfTransformerMemoryBudgetInMB(100)
+        .setUdfReaderMemoryBudgetInMB(100);
+  }
+
+  private static void createTimeSeries() {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      statement.execute("SET STORAGE GROUP TO root.vehicle");
+      statement.execute("CREATE TIMESERIES root.vehicle.d1.s3 with datatype=INT32,encoding=PLAIN");
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+  }
+
+  private static void generateData() {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      for (int i = 0; i < ITERATION_TIMES; ++i) {
+        if (i == 5 || i == 6 || i == 7 || i == 51 || i == 52 || i == 53 || i == 54 || i == 9996
+            || i == 9997 || i == 9998) {
+          continue;
+        }
+        statement.execute(
+            (String.format("insert into root.vehicle.d1(timestamp,s3) values(%d,%d)", i, i)));
+      }
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+  }
+
+  private static void registerUDF() {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      statement.execute(
+          "create function window_start_end as 'org.apache.iotdb.db.query.udf.example.WindowStartEnd'");
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+  }
+
+  private void testSessionTimeWindowSS(
+      String sessionGap, long[] windowStart, long[] windowEnd, Long displayBegin, Long displayEnd) {
+    String sql;
+    if (displayBegin == null) {
+      sql =
+          String.format(
+              "select window_start_end(s3, '%s'='%s', '%s'='%s') from root.vehicle.d1",
+              UDFTestConstant.ACCESS_STRATEGY_KEY,
+              UDFTestConstant.ACCESS_STRATEGY_SESSION,
+              UDFTestConstant.SESSION_GAP_KEY,
+              sessionGap);
+    } else {
+      sql =
+          String.format(
+              "select window_start_end(s3, '%s'='%s', '%s'='%s', '%s'='%s', '%s'='%s') from root.vehicle.d1",
+              UDFTestConstant.ACCESS_STRATEGY_KEY,
+              UDFTestConstant.ACCESS_STRATEGY_SESSION,
+              UDFTestConstant.DISPLAY_WINDOW_BEGIN_KEY,
+              displayBegin.longValue(),
+              UDFTestConstant.DISPLAY_WINDOW_END_KEY,
+              displayEnd.longValue(),
+              UDFTestConstant.SESSION_GAP_KEY,
+              sessionGap);
+    }
+
+    try (Connection conn = EnvFactory.getEnv().getConnection();
+        Statement statement = conn.createStatement();
+        ResultSet resultSet = statement.executeQuery(sql)) {
+      assertEquals(2, resultSet.getMetaData().getColumnCount());
+      for (int i = 0; i < windowStart.length; i++) {
+        resultSet.next();
+        Assert.assertEquals(resultSet.getLong(1), windowStart[i]);
+        Assert.assertEquals(resultSet.getLong(2), windowEnd[i]);
+      }

Review Comment:
   Use `while (resultSet.next())` and assert the row count (`cnt`) instead.
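
   A rough sketch of that pattern, assuming an `Iterator<long[]>` as a stand-in for the JDBC `ResultSet` so it runs without a database. The class and method names are hypothetical. The point is that the loop is driven by the rows actually returned, and the final count check catches both missing and extra rows, which the indexed `for` loop would not.

   ```java
   import java.util.Iterator;

   final class RowCountCheckSketch {

     /**
      * Walks every row the iterator yields, compares it with the expected
      * window bounds, and returns how many rows were seen.
      */
     static int checkRows(Iterator<long[]> rows, long[] windowStart, long[] windowEnd) {
       int count = 0;
       while (rows.hasNext()) { // plays the role of while (resultSet.next())
         long[] row = rows.next();
         if (count >= windowStart.length) {
           throw new AssertionError("unexpected extra row at index " + count);
         }
         if (row[0] != windowStart[count] || row[1] != windowEnd[count]) {
           throw new AssertionError("window mismatch at index " + count);
         }
         count++;
       }
       // Fewer rows than expected is also a failure.
       if (count != windowStart.length) {
         throw new AssertionError("expected " + windowStart.length + " rows but got " + count);
       }
       return count;
     }
   }
   ```

   In the real test the same shape would use `resultSet.getLong(1)` and `resultSet.getLong(2)` inside the loop, followed by `assertEquals(windowStart.length, cnt)`.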





[GitHub] [iotdb] lancelly commented on a diff in pull request #6928: [IOTDB-4073] Add SessionTimeWindowAccessStrategy in UDF

Posted by GitBox <gi...@apache.org>.
lancelly commented on code in PR #6928:
URL: https://github.com/apache/iotdb/pull/6928#discussion_r945404122


##########
integration-test/src/test/java/org/apache/iotdb/db/it/udf/IoTDBUDFSessionWindowQueryIT.java:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.it.udf;
+
+import org.apache.iotdb.it.env.ConfigFactory;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.constant.UDFTestConstant;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({ClusterIT.class})
+public class IoTDBUDFSessionWindowQueryIT {
+
+  protected static final int ITERATION_TIMES = 10000;

Review Comment:
   ITERATION_TIMES could be smaller.





[GitHub] [iotdb] JackieTien97 merged pull request #6928: [IOTDB-4073] Add SessionTimeWindowAccessStrategy in UDF

Posted by GitBox <gi...@apache.org>.
JackieTien97 merged PR #6928:
URL: https://github.com/apache/iotdb/pull/6928

