You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by le...@apache.org on 2022/02/14 04:06:45 UTC

[iotdb] 11/32: update deleteTest

This is an automated email from the ASF dual-hosted git repository.

leirui pushed a commit to branch research/M4-visualization
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 8ebfeafc92f87766f33e4ed3eae7e5f4964380f8
Author: Lei Rui <10...@qq.com>
AuthorDate: Thu Feb 3 16:54:50 2022 +0800

    update deleteTest
---
 README.md                                          |   4 +-
 .../iotdb/queryExp/QuerySyntheticData1VaryTqe.java | 110 ++++++++++++++
 .../iotdb/writeData/WriteSyntheticData1.java       |   5 +-
 .../writeData/WriteSyntheticDataWithDeletes.java   |  63 ++++++++
 .../iotdb/db/integration/m4/MyDebugTest1.java      | 163 +++++++++++++++++++++
 5 files changed, 340 insertions(+), 5 deletions(-)

diff --git a/README.md b/README.md
index 81c0d2f..4baf990 100644
--- a/README.md
+++ b/README.md
@@ -23,7 +23,7 @@ enable_unseq_compaction=false                           # unseq compaction is di
 
 **1.2 写数据程序:WriteSyntheticData1.java**
 
-时间点等间隔1ms、总时长10000s、值正态分布模拟、double类型的数据,使用默认压缩方法。
+时间点等间隔1ms、总时长10000s(从0到9999s的10000000个点)、值正态分布模拟、double类型的数据,使用默认压缩方法。
 无乱序,无删除。每个chunk默认参数是装10000个点,所以每个chunk就是10s的数据量。
 使用默认压缩方法,每个chunk大约70KB,所以一共是1000个10KB的chunks也就是总共70MB左右的一个顺序tsfile。
 
@@ -146,7 +146,7 @@ a=10
 for((i=0;i<a;i++)) do
     ./start-server.sh &
     sleep 3s
-    java -jar /data3/ruilei/rl/synData1_testspace/varyW/QuerySyntheticData1VaryW-0.12.4.jar $1 $2
+    java -jar /data3/ruilei/rl/synData1_testspace/vary_w/QuerySyntheticData1VaryW-0.12.4.jar $1 $2
     ./stop-server.sh
     echo 3 | sudo tee /proc/sys/vm/drop_caches
     sleep 3s
diff --git a/example/session/src/main/java/org/apache/iotdb/queryExp/QuerySyntheticData1VaryTqe.java b/example/session/src/main/java/org/apache/iotdb/queryExp/QuerySyntheticData1VaryTqe.java
new file mode 100644
index 0000000..baf1f02
--- /dev/null
+++ b/example/session/src/main/java/org/apache/iotdb/queryExp/QuerySyntheticData1VaryTqe.java
@@ -0,0 +1,110 @@
+package org.apache.iotdb.queryExp;
+
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.session.SessionDataSet;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+
+import org.apache.thrift.TException;
+
+/**
+ * !!!!!!!Before query data, make sure check the following server parameters:
+ *
+ * <p>system_dir=/data3/ruilei/iotdb-server-0.12.4/synData1/system
+ * data_dirs=/data3/ruilei/iotdb-server-0.12.4/synData1/data
+ * wal_dir=/data3/ruilei/iotdb-server-0.12.4/synData1/wal timestamp_precision=ms
+ * unseq_tsfile_size=1073741824 # maximum size of unseq TsFile is 1024^3 Bytes
+ * seq_tsfile_size=1073741824 # maximum size of seq TsFile is 1024^3 Bytes
+ * avg_series_point_number_threshold=10000 # each chunk contains 10000 data points
+ * compaction_strategy=NO_COMPACTION # compaction between levels is disabled
+ * enable_unseq_compaction=false # unseq compaction is disabled
+ */
+public class QuerySyntheticData1VaryTqe {
+
+  // * (1) min_time(%s), max_time(%s), first_value(%s), last_value(%s), min_value(%s), max_value(%s)
+  //       => Don't change the sequence of the above six aggregates!
+  // * (2) group by ([tqs,tqe),IntervalLength) => Make sure (tqe-tqs) is divisible by
+  // IntervalLength!
+  // * (3) NOTE the time unit of interval. Update for different datasets!!!!!!!!!!!
+  private static final String queryFormat =
+      "select min_time(%s), max_time(%s), first_value(%s), last_value(%s), min_value(%s), max_value(%s) "
+          + "from %s "
+          + "group by ([%d, %d), %dms)";
+
+  private static final String queryFormat_UDF =
+      "select M4(%1$s,'tqs'='%3$d','tqe'='%4$d','w'='%5$d') from %2$s where time>=%3$d and time<%4$d";
+
+  public static Session session;
+
+  public static void main(String[] args)
+      throws IoTDBConnectionException, StatementExecutionException, TException {
+    // fix parameters for synthetic data1
+    String measurement = "s0";
+    String device = "root.vehicle.d0";
+    // fixed interval num
+    int intervalNum = 20;
+    // fix tqs
+    long minTime = 0L;
+    // 实验自变量1:tqe
+    long maxTime = Long.parseLong(args[0]);
+    // 实验自变量2:方法
+    // 1: MAC, 2: MOC, 3: CPV
+    int approach = Integer.parseInt(args[1]);
+    if (approach != 1 && approach != 2 && approach != 3) {
+      throw new TException("Wrong input parameter approach!");
+    }
+    if (approach != 1) {
+      // MOC and CPV sql are the same sql: queryFormat.
+      // Set the server parameter in iotdb-engine.properties: enable_CPV=true for CPV, false for
+      // MOC.
+      if (approach == 2) { // MOC
+        System.out.println(
+            "MAKE SURE you have set the enable_CPV as false in `iotdb-engine.properties` for MOC!");
+      } else { // CPV
+        System.out.println(
+            "MAKE SURE you have set the enable_CPV as true in `iotdb-engine.properties` for CPV!");
+      }
+    }
+
+    session = new Session("127.0.0.1", 6667, "root", "root");
+    session.open(false);
+    session.setFetchSize(100000); // this is important. Set it big to avoid multiple fetch.
+
+    long interval = (long) Math.ceil((double) (maxTime - minTime) / intervalNum);
+    maxTime = minTime + interval * intervalNum;
+
+    String sql;
+    if (approach == 1) { // MAC UDF
+      sql =
+          String.format(queryFormat_UDF, measurement, device, minTime, maxTime, intervalNum); // MAC
+    } else {
+      // MOC and CPV sql use the same sql queryFormat.
+      sql =
+          String.format(
+              queryFormat,
+              measurement,
+              measurement,
+              measurement,
+              measurement,
+              measurement,
+              measurement,
+              device,
+              minTime,
+              maxTime,
+              interval);
+    }
+
+    SessionDataSet dataSet;
+    dataSet = session.executeQueryStatement(sql);
+    while (dataSet.hasNext()) {
+      RowRecord r = dataSet.next();
+    }
+    session.executeNonQueryStatement("clear cache");
+    dataSet = session.executeFinish();
+    String info = dataSet.getFinishResult();
+    System.out.println(info);
+    dataSet.closeOperationHandle();
+    session.close();
+  }
+}
diff --git a/example/session/src/main/java/org/apache/iotdb/writeData/WriteSyntheticData1.java b/example/session/src/main/java/org/apache/iotdb/writeData/WriteSyntheticData1.java
index 93d8bf5..e72acc3 100644
--- a/example/session/src/main/java/org/apache/iotdb/writeData/WriteSyntheticData1.java
+++ b/example/session/src/main/java/org/apache/iotdb/writeData/WriteSyntheticData1.java
@@ -5,7 +5,6 @@ import org.apache.iotdb.rpc.StatementExecutionException;
 import org.apache.iotdb.session.Session;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
-import java.io.IOException;
 import java.util.Collections;
 
 public class WriteSyntheticData1 {
@@ -27,12 +26,12 @@ public class WriteSyntheticData1 {
   public static final String measurements = "s0";
 
   public static void main(String[] args)
-      throws IOException, IoTDBConnectionException, StatementExecutionException {
+      throws IoTDBConnectionException, StatementExecutionException {
 
     Session session = new Session("127.0.0.1", 6667, "root", "root");
     session.open(false);
 
-    for (long timestamp = 1; timestamp <= 10000000; timestamp++) {
+    for (long timestamp = 0; timestamp < 10000000; timestamp++) {
       double value = Math.random();
       session.insertRecord(
           device,
diff --git a/example/session/src/main/java/org/apache/iotdb/writeData/WriteSyntheticDataWithDeletes.java b/example/session/src/main/java/org/apache/iotdb/writeData/WriteSyntheticDataWithDeletes.java
new file mode 100644
index 0000000..2a24c6d
--- /dev/null
+++ b/example/session/src/main/java/org/apache/iotdb/writeData/WriteSyntheticDataWithDeletes.java
@@ -0,0 +1,63 @@
+package org.apache.iotdb.writeData;
+
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+
+public class WriteSyntheticDataWithDeletes {
+
+  /**
+   * !!!!!!!Before writing data, make sure check the following server parameters:
+   *
+   * <p>system_dir data_dirs wal_dir timestamp_precision=ms unseq_tsfile_size=1073741824 # maximum
+   * size of unseq TsFile is 1024^3 Bytes seq_tsfile_size=1073741824 # maximum size of seq TsFile is
+   * 1024^3 Bytes avg_series_point_number_threshold=10000 # each chunk contains 10000 data points
+   * compaction_strategy=NO_COMPACTION # compaction between levels is disabled
+   * enable_unseq_compaction=false # unseq compaction is disabled
+   */
+  public static final String device = "root.vehicle.d0";
+
+  public static final String measurements = "s0";
+
+  public static void main(String[] args)
+      throws IoTDBConnectionException, StatementExecutionException {
+    // 实验自变量1:每隔deleteFreq时间(本实验时间单位ms)就执行一次删除
+    long deleteFreq = Long.parseLong(args[0]);
+    // 实验自变量2:每次删除的时间长度
+    long deleteLen = Long.parseLong(args[1]);
+
+    Session session = new Session("127.0.0.1", 6667, "root", "root");
+    session.open(false);
+
+    List<String> paths = new ArrayList<>();
+    paths.add(device + "." + measurements);
+
+    for (long timestamp = 0; timestamp < 10000000; timestamp++) {
+      if (timestamp != 0 && timestamp % deleteFreq == 0) {
+        // [timestamp-deleteFreq, timestamp-1]内随机取一个删除时间起点
+        long min = timestamp - deleteFreq;
+        long max = timestamp - 1;
+        long deleteStartTime = min + (((long) (new Random().nextDouble() * (max - min))));
+        // delete length 5000ms, half the time length of a chunk
+        long deleteEndTime = deleteStartTime + deleteLen;
+        session.deleteData(paths, deleteStartTime, deleteEndTime);
+      }
+
+      double value = Math.random();
+      session.insertRecord(
+          device,
+          timestamp,
+          Collections.singletonList(measurements),
+          Collections.singletonList(TSDataType.DOUBLE),
+          value);
+    }
+    session.executeNonQueryStatement("flush");
+    session.close();
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/m4/MyDebugTest1.java b/server/src/test/java/org/apache/iotdb/db/integration/m4/MyDebugTest1.java
new file mode 100644
index 0000000..a36a912
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/integration/m4/MyDebugTest1.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.integration.m4;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.jdbc.Config;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.Locale;
+
+import static org.junit.Assert.fail;
+
+public class MyDebugTest1 {
+
+  private static final String TIMESTAMP_STR = "Time";
+
+  private static String[] creationSqls =
+      new String[] {
+        "SET STORAGE GROUP TO root.vehicle.d0",
+        "CREATE TIMESERIES root.vehicle.d0.s0 WITH DATATYPE=DOUBLE, ENCODING=RLE",
+      };
+
+  private final String d0s0 = "root.vehicle.d0.s0";
+
+  private static final String insertTemplate =
+      "INSERT INTO root.vehicle.d0(timestamp,s0)" + " VALUES(%d,%f)";
+
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+  private static boolean originalEnableCPV;
+  private static CompactionStrategy originalCompactionStrategy;
+  private static int originalAvgSeriesPointNumberThreshold;
+  private static long originalSeqTsFileSize;
+  private static long originalUnSeqTsFileSize;
+
+  @Before
+  public void setUp() throws Exception {
+    EnvironmentUtils.envSetUp();
+    Class.forName(Config.JDBC_DRIVER_NAME);
+
+    originalEnableCPV = config.isEnableCPV();
+    originalCompactionStrategy = config.getCompactionStrategy();
+    originalAvgSeriesPointNumberThreshold = config.getAvgSeriesPointNumberThreshold();
+    originalSeqTsFileSize = config.getSeqTsFileSize();
+    originalUnSeqTsFileSize = config.getUnSeqTsFileSize();
+
+    config.setCompactionStrategy(CompactionStrategy.NO_COMPACTION);
+
+    config.setSeqTsFileSize(1024 * 1024 * 1024); // 1G
+    config.setUnSeqTsFileSize(1024 * 1024 * 1024); // 1G
+    config.setAvgSeriesPointNumberThreshold(10); // this step cannot be omitted
+
+    config.setEnableCPV(false);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    EnvironmentUtils.cleanEnv();
+    config.setCompactionStrategy(originalCompactionStrategy);
+    config.setAvgSeriesPointNumberThreshold(originalAvgSeriesPointNumberThreshold);
+    config.setEnableCPV(originalEnableCPV);
+    config.setSeqTsFileSize(originalSeqTsFileSize);
+    config.setUnSeqTsFileSize(originalUnSeqTsFileSize);
+  }
+
+  @Test
+  public void test1() {
+    // 现象:为什么synData1写成[0,10000000)反而更奇怪了?想象中的MOC在m=1k的时候每个interval刚好一个chunk从而不需要load任何chunk的画面没有出现
+    prepareData1();
+
+    String[] res =
+        new String[] {
+          "0,1,20,5,20,5[1],30[10]",
+          "25,25,45,8,30,8[25],40[30]",
+          "50,52,54,8,18,8[52],18[54]",
+          "75,null,null,null,null,null,null"
+        };
+    try (Connection connection =
+            DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+      boolean hasResultSet =
+          statement.execute(
+              "SELECT min_time(s0), max_time(s0), first_value(s0), last_value(s0), min_value(s0), max_value(s0)"
+                  + " FROM root.vehicle.d0 group by ([0,100),10ms)");
+
+      Assert.assertTrue(hasResultSet);
+      try (ResultSet resultSet = statement.getResultSet()) {
+        int i = 0;
+        while (resultSet.next()) {
+          String ans =
+              resultSet.getString(TIMESTAMP_STR)
+                  + ","
+                  + resultSet.getString(String.format("min_time(%s)", d0s0))
+                  + ","
+                  + resultSet.getString(String.format("max_time(%s)", d0s0))
+                  + ","
+                  + resultSet.getString(String.format("first_value(%s)", d0s0))
+                  + ","
+                  + resultSet.getString(String.format("last_value(%s)", d0s0))
+                  + ","
+                  + resultSet.getString(String.format("min_value(%s)", d0s0))
+                  + ","
+                  + resultSet.getString(String.format("max_value(%s)", d0s0));
+          System.out.println(ans);
+          //          Assert.assertEquals(res[i++], ans);
+        }
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  private static void prepareData1() {
+    // data:
+    // https://user-images.githubusercontent.com/33376433/151985070-73158010-8ba0-409d-a1c1-df69bad1aaee.png
+    try (Connection connection =
+            DriverManager.getConnection(
+                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+
+      for (String sql : creationSqls) {
+        statement.execute(sql);
+      }
+
+      for (int i = 0; i < 100; i++) {
+        statement.execute(String.format(Locale.ENGLISH, insertTemplate, i, Math.random()));
+      }
+
+      statement.execute("FLUSH");
+
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+}