You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by le...@apache.org on 2022/02/14 04:06:45 UTC
[iotdb] 11/32: update deleteTest
This is an automated email from the ASF dual-hosted git repository.
leirui pushed a commit to branch research/M4-visualization
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 8ebfeafc92f87766f33e4ed3eae7e5f4964380f8
Author: Lei Rui <10...@qq.com>
AuthorDate: Thu Feb 3 16:54:50 2022 +0800
update deleteTest
---
README.md | 4 +-
.../iotdb/queryExp/QuerySyntheticData1VaryTqe.java | 110 ++++++++++++++
.../iotdb/writeData/WriteSyntheticData1.java | 5 +-
.../writeData/WriteSyntheticDataWithDeletes.java | 63 ++++++++
.../iotdb/db/integration/m4/MyDebugTest1.java | 163 +++++++++++++++++++++
5 files changed, 340 insertions(+), 5 deletions(-)
diff --git a/README.md b/README.md
index 81c0d2f..4baf990 100644
--- a/README.md
+++ b/README.md
@@ -23,7 +23,7 @@ enable_unseq_compaction=false # unseq compaction is di
**1.2 写数据程序:WriteSyntheticData1.java**
-时间点等间隔1ms、总时长10000s、值正态分布模拟、double类型的数据,使用默认压缩方法。
+时间点等间隔1ms、总时长10000s(从0到9999s的10000000个点)、值正态分布模拟、double类型的数据,使用默认压缩方法。
无乱序,无删除。每个chunk默认参数是装10000个点,所以每个chunk就是10s的数据量。
使用默认压缩方法,每个chunk大约70KB,所以一共是1000个10KB的chunks也就是总共70MB左右的一个顺序tsfile。
@@ -146,7 +146,7 @@ a=10
for((i=0;i<a;i++)) do
./start-server.sh &
sleep 3s
- java -jar /data3/ruilei/rl/synData1_testspace/varyW/QuerySyntheticData1VaryW-0.12.4.jar $1 $2
+ java -jar /data3/ruilei/rl/synData1_testspace/vary_w/QuerySyntheticData1VaryW-0.12.4.jar $1 $2
./stop-server.sh
echo 3 | sudo tee /proc/sys/vm/drop_caches
sleep 3s
diff --git a/example/session/src/main/java/org/apache/iotdb/queryExp/QuerySyntheticData1VaryTqe.java b/example/session/src/main/java/org/apache/iotdb/queryExp/QuerySyntheticData1VaryTqe.java
new file mode 100644
index 0000000..baf1f02
--- /dev/null
+++ b/example/session/src/main/java/org/apache/iotdb/queryExp/QuerySyntheticData1VaryTqe.java
@@ -0,0 +1,110 @@
+package org.apache.iotdb.queryExp;
+
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.session.SessionDataSet;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+
+import org.apache.thrift.TException;
+
+/**
+ * !!!!!!!Before query data, make sure check the following server parameters:
+ *
+ * <p>system_dir=/data3/ruilei/iotdb-server-0.12.4/synData1/system
+ * data_dirs=/data3/ruilei/iotdb-server-0.12.4/synData1/data
+ * wal_dir=/data3/ruilei/iotdb-server-0.12.4/synData1/wal timestamp_precision=ms
+ * unseq_tsfile_size=1073741824 # maximum size of unseq TsFile is 1024^3 Bytes
+ * seq_tsfile_size=1073741824 # maximum size of seq TsFile is 1024^3 Bytes
+ * avg_series_point_number_threshold=10000 # each chunk contains 10000 data points
+ * compaction_strategy=NO_COMPACTION # compaction between levels is disabled
+ * enable_unseq_compaction=false # unseq compaction is disabled
+ */
+public class QuerySyntheticData1VaryTqe {
+
+  // * (1) min_time(%s), max_time(%s), first_value(%s), last_value(%s), min_value(%s), max_value(%s)
+  // => Don't change the sequence of the above six aggregates!
+  // * (2) group by ([tqs,tqe),IntervalLength) => Make sure (tqe-tqs) is divisible by
+  // IntervalLength!
+  // * (3) NOTE the time unit of interval. Update for different datasets!!!!!!!!!!!
+  private static final String queryFormat =
+      "select min_time(%s), max_time(%s), first_value(%s), last_value(%s), min_value(%s), max_value(%s) "
+          + "from %s "
+          + "group by ([%d, %d), %dms)";
+
+  // Positional arguments: %1$s measurement, %2$s device, %3$d tqs, %4$d tqe, %5$d value of 'w'.
+  private static final String queryFormat_UDF =
+      "select M4(%1$s,'tqs'='%3$d','tqe'='%4$d','w'='%5$d') from %2$s where time>=%3$d and time<%4$d";
+
+  public static Session session;
+
+  /**
+   * Runs a single query over {@code root.vehicle.d0.s0} on the time range {@code [0, tqe)} split
+   * into 20 intervals, drains the result, clears the server cache and prints the text returned by
+   * {@code getFinishResult()}.
+   *
+   * @param args {@code args[0]} is tqe (the query end time, exclusive); {@code args[1]} selects the
+   *     approach: 1 = MAC (M4 UDF), 2 = MOC, 3 = CPV
+   * @throws IllegalArgumentException if fewer than two arguments are given or tqe is not greater
+   *     than tqs
+   * @throws TException if the approach is not 1, 2 or 3
+   */
+  public static void main(String[] args)
+      throws IoTDBConnectionException, StatementExecutionException, TException {
+    if (args.length < 2) {
+      throw new IllegalArgumentException(
+          "Expected two arguments: <tqe> <approach (1: MAC, 2: MOC, 3: CPV)>");
+    }
+
+    // fix parameters for synthetic data1
+    String measurement = "s0";
+    String device = "root.vehicle.d0";
+    // fixed interval num
+    int intervalNum = 20;
+    // fix tqs
+    long minTime = 0L;
+    // Experiment independent variable 1: tqe
+    long maxTime = Long.parseLong(args[0]);
+    if (maxTime <= minTime) {
+      // Otherwise the computed interval below would be 0 ms (or negative).
+      throw new IllegalArgumentException(
+          "tqe must be greater than tqs (" + minTime + "), but was " + maxTime);
+    }
+    // Experiment independent variable 2: approach
+    // 1: MAC, 2: MOC, 3: CPV
+    int approach = Integer.parseInt(args[1]);
+    if (approach != 1 && approach != 2 && approach != 3) {
+      throw new TException("Wrong input parameter approach!");
+    }
+    // MOC and CPV sql are the same sql: queryFormat.
+    // Set the server parameter in iotdb-engine.properties: enable_CPV=true for CPV, false for
+    // MOC.
+    if (approach == 2) { // MOC
+      System.out.println(
+          "MAKE SURE you have set the enable_CPV as false in `iotdb-engine.properties` for MOC!");
+    } else if (approach == 3) { // CPV
+      System.out.println(
+          "MAKE SURE you have set the enable_CPV as true in `iotdb-engine.properties` for CPV!");
+    }
+
+    // Round the interval up and stretch tqe so that (tqe - tqs) is an exact multiple of it.
+    long interval = (long) Math.ceil((double) (maxTime - minTime) / intervalNum);
+    maxTime = minTime + interval * intervalNum;
+
+    String sql;
+    if (approach == 1) { // MAC UDF
+      sql =
+          String.format(queryFormat_UDF, measurement, device, minTime, maxTime, intervalNum); // MAC
+    } else {
+      // MOC and CPV sql use the same sql queryFormat.
+      sql =
+          String.format(
+              queryFormat,
+              measurement,
+              measurement,
+              measurement,
+              measurement,
+              measurement,
+              measurement,
+              device,
+              minTime,
+              maxTime,
+              interval);
+    }
+
+    session = new Session("127.0.0.1", 6667, "root", "root");
+    session.open(false);
+    try {
+      session.setFetchSize(100000); // this is important. Set it big to avoid multiple fetch.
+
+      SessionDataSet dataSet = session.executeQueryStatement(sql);
+      while (dataSet.hasNext()) {
+        // Rows are only drained so the whole result is fetched; their content is not used.
+        dataSet.next();
+      }
+      session.executeNonQueryStatement("clear cache");
+      // NOTE(review): as in the original, the query data set above is not explicitly closed
+      // before being replaced -- confirm whether that is intentional for the measurement.
+      dataSet = session.executeFinish();
+      String info = dataSet.getFinishResult();
+      System.out.println(info);
+      dataSet.closeOperationHandle();
+    } finally {
+      // Release the connection even when the query fails.
+      session.close();
+    }
+  }
+}
diff --git a/example/session/src/main/java/org/apache/iotdb/writeData/WriteSyntheticData1.java b/example/session/src/main/java/org/apache/iotdb/writeData/WriteSyntheticData1.java
index 93d8bf5..e72acc3 100644
--- a/example/session/src/main/java/org/apache/iotdb/writeData/WriteSyntheticData1.java
+++ b/example/session/src/main/java/org/apache/iotdb/writeData/WriteSyntheticData1.java
@@ -5,7 +5,6 @@ import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.Session;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import java.io.IOException;
import java.util.Collections;
public class WriteSyntheticData1 {
@@ -27,12 +26,12 @@ public class WriteSyntheticData1 {
public static final String measurements = "s0";
public static void main(String[] args)
- throws IOException, IoTDBConnectionException, StatementExecutionException {
+ throws IoTDBConnectionException, StatementExecutionException {
Session session = new Session("127.0.0.1", 6667, "root", "root");
session.open(false);
- for (long timestamp = 1; timestamp <= 10000000; timestamp++) {
+ for (long timestamp = 0; timestamp < 10000000; timestamp++) {
double value = Math.random();
session.insertRecord(
device,
diff --git a/example/session/src/main/java/org/apache/iotdb/writeData/WriteSyntheticDataWithDeletes.java b/example/session/src/main/java/org/apache/iotdb/writeData/WriteSyntheticDataWithDeletes.java
new file mode 100644
index 0000000..2a24c6d
--- /dev/null
+++ b/example/session/src/main/java/org/apache/iotdb/writeData/WriteSyntheticDataWithDeletes.java
@@ -0,0 +1,63 @@
+package org.apache.iotdb.writeData;
+
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * Writes 10000000 points (timestamps 0 to 9999999, random double values) to {@code
+ * root.vehicle.d0.s0}, issuing a delete over a randomly placed time range every {@code
+ * deleteFreq} timestamps.
+ *
+ * <p>!!!!!!!Before writing data, make sure to check the following server parameters:
+ *
+ * <p>system_dir data_dirs wal_dir timestamp_precision=ms unseq_tsfile_size=1073741824 # maximum
+ * size of unseq TsFile is 1024^3 Bytes seq_tsfile_size=1073741824 # maximum size of seq TsFile is
+ * 1024^3 Bytes avg_series_point_number_threshold=10000 # each chunk contains 10000 data points
+ * compaction_strategy=NO_COMPACTION # compaction between levels is disabled
+ * enable_unseq_compaction=false # unseq compaction is disabled
+ */
+public class WriteSyntheticDataWithDeletes {
+
+  public static final String device = "root.vehicle.d0";
+
+  public static final String measurements = "s0";
+
+  /**
+   * Entry point.
+   *
+   * @param args {@code args[0]} is deleteFreq, the number of timestamps (ms in this experiment)
+   *     between two deletes; {@code args[1]} is deleteLen, the time length covered by each delete
+   * @throws IllegalArgumentException if fewer than two arguments are given, deleteFreq is not
+   *     positive, or deleteLen is negative
+   */
+  public static void main(String[] args)
+      throws IoTDBConnectionException, StatementExecutionException {
+    if (args.length < 2) {
+      throw new IllegalArgumentException("Expected two arguments: <deleteFreq> <deleteLen>");
+    }
+    // Experiment independent variable 1: a delete is executed every deleteFreq time units (ms in
+    // this experiment)
+    long deleteFreq = Long.parseLong(args[0]);
+    // Experiment independent variable 2: the time length of each delete
+    long deleteLen = Long.parseLong(args[1]);
+    if (deleteFreq <= 0) {
+      // deleteFreq is used as a modulus below; 0 would throw ArithmeticException.
+      throw new IllegalArgumentException("deleteFreq must be positive, but was " + deleteFreq);
+    }
+    if (deleteLen < 0) {
+      throw new IllegalArgumentException("deleteLen must not be negative, but was " + deleteLen);
+    }
+
+    Session session = new Session("127.0.0.1", 6667, "root", "root");
+    session.open(false);
+    try {
+      List<String> paths = new ArrayList<>();
+      paths.add(device + "." + measurements);
+
+      // One generator for the whole run instead of a new one per delete.
+      Random random = new Random();
+
+      for (long timestamp = 0; timestamp < 10000000; timestamp++) {
+        if (timestamp != 0 && timestamp % deleteFreq == 0) {
+          // Pick the delete start time uniformly from [timestamp-deleteFreq, timestamp-1].
+          // That closed range holds exactly deleteFreq values, so the offset is drawn from
+          // [0, deleteFreq-1]; Math.min guards against floating-point rounding at the top end.
+          long min = timestamp - deleteFreq;
+          long offset = Math.min(deleteFreq - 1, (long) (random.nextDouble() * deleteFreq));
+          long deleteStartTime = min + offset;
+          long deleteEndTime = deleteStartTime + deleteLen;
+          session.deleteData(paths, deleteStartTime, deleteEndTime);
+        }
+
+        double value = Math.random();
+        session.insertRecord(
+            device,
+            timestamp,
+            Collections.singletonList(measurements),
+            Collections.singletonList(TSDataType.DOUBLE),
+            value);
+      }
+      session.executeNonQueryStatement("flush");
+    } finally {
+      // Release the connection even when a write or delete fails.
+      session.close();
+    }
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/m4/MyDebugTest1.java b/server/src/test/java/org/apache/iotdb/db/integration/m4/MyDebugTest1.java
new file mode 100644
index 0000000..a36a912
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/integration/m4/MyDebugTest1.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.integration.m4;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.jdbc.Config;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.Locale;
+
+import static org.junit.Assert.fail;
+
+/**
+ * Debugging aid: writes 100 random points to {@code root.vehicle.d0.s0}, runs the six-aggregate
+ * group-by query over {@code [0,100)} in 10ms buckets with CPV disabled, and prints every result
+ * row. The inserted values are random, so the rows are printed rather than asserted.
+ */
+public class MyDebugTest1 {
+
+  private static final String TIMESTAMP_STR = "Time";
+
+  private static final String[] creationSqls =
+      new String[] {
+        "SET STORAGE GROUP TO root.vehicle.d0",
+        "CREATE TIMESERIES root.vehicle.d0.s0 WITH DATATYPE=DOUBLE, ENCODING=RLE",
+      };
+
+  private final String d0s0 = "root.vehicle.d0.s0";
+
+  private static final String insertTemplate =
+      "INSERT INTO root.vehicle.d0(timestamp,s0)" + " VALUES(%d,%f)";
+
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+  // Values of the shared config captured in setUp() and restored in tearDown().
+  private static boolean originalEnableCPV;
+  private static CompactionStrategy originalCompactionStrategy;
+  private static int originalAvgSeriesPointNumberThreshold;
+  private static long originalSeqTsFileSize;
+  private static long originalUnSeqTsFileSize;
+
+  /** Starts the test environment, remembers the current config values and applies the test ones. */
+  @Before
+  public void setUp() throws Exception {
+    EnvironmentUtils.envSetUp();
+    Class.forName(Config.JDBC_DRIVER_NAME);
+
+    originalEnableCPV = config.isEnableCPV();
+    originalCompactionStrategy = config.getCompactionStrategy();
+    originalAvgSeriesPointNumberThreshold = config.getAvgSeriesPointNumberThreshold();
+    originalSeqTsFileSize = config.getSeqTsFileSize();
+    originalUnSeqTsFileSize = config.getUnSeqTsFileSize();
+
+    config.setCompactionStrategy(CompactionStrategy.NO_COMPACTION);
+
+    config.setSeqTsFileSize(1024 * 1024 * 1024); // 1G
+    config.setUnSeqTsFileSize(1024 * 1024 * 1024); // 1G
+    config.setAvgSeriesPointNumberThreshold(10); // this step cannot be omitted
+
+    config.setEnableCPV(false);
+  }
+
+  /** Cleans the test environment and restores the config values captured in {@link #setUp()}. */
+  @After
+  public void tearDown() throws Exception {
+    try {
+      EnvironmentUtils.cleanEnv();
+    } finally {
+      // Restore the shared config even if cleaning the environment fails, so that the
+      // overrides made in setUp() do not leak into other tests.
+      config.setCompactionStrategy(originalCompactionStrategy);
+      config.setAvgSeriesPointNumberThreshold(originalAvgSeriesPointNumberThreshold);
+      config.setEnableCPV(originalEnableCPV);
+      config.setSeqTsFileSize(originalSeqTsFileSize);
+      config.setUnSeqTsFileSize(originalUnSeqTsFileSize);
+    }
+  }
+
+  /** Runs the group-by query and prints one comma-separated line per result row. */
+  @Test
+  public void test1() {
+    // Observation: why does writing synData1 as [0,10000000) look even stranger? The expected
+    // picture -- for MOC with m=1k every interval holds exactly one chunk, so no chunk needs to
+    // be loaded -- did not show up.
+    prepareData1();
+
+    try (Connection connection =
+            DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+      boolean hasResultSet =
+          statement.execute(
+              "SELECT min_time(s0), max_time(s0), first_value(s0), last_value(s0), min_value(s0), max_value(s0)"
+                  + " FROM root.vehicle.d0 group by ([0,100),10ms)");
+
+      Assert.assertTrue(hasResultSet);
+      try (ResultSet resultSet = statement.getResultSet()) {
+        while (resultSet.next()) {
+          String ans =
+              resultSet.getString(TIMESTAMP_STR)
+                  + ","
+                  + resultSet.getString(String.format("min_time(%s)", d0s0))
+                  + ","
+                  + resultSet.getString(String.format("max_time(%s)", d0s0))
+                  + ","
+                  + resultSet.getString(String.format("first_value(%s)", d0s0))
+                  + ","
+                  + resultSet.getString(String.format("last_value(%s)", d0s0))
+                  + ","
+                  + resultSet.getString(String.format("min_value(%s)", d0s0))
+                  + ","
+                  + resultSet.getString(String.format("max_value(%s)", d0s0));
+          // Printed for manual inspection only: the inserted values are random.
+          System.out.println(ans);
+        }
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  /**
+   * Creates the storage group and time series, inserts 100 points with timestamps 0..99 and
+   * random values, then flushes. Fails the calling test if any statement throws.
+   */
+  private static void prepareData1() {
+    // data:
+    // https://user-images.githubusercontent.com/33376433/151985070-73158010-8ba0-409d-a1c1-df69bad1aaee.png
+    try (Connection connection =
+            DriverManager.getConnection(
+                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+
+      for (String sql : creationSqls) {
+        statement.execute(sql);
+      }
+
+      for (int i = 0; i < 100; i++) {
+        statement.execute(String.format(Locale.ENGLISH, insertTemplate, i, Math.random()));
+      }
+
+      statement.execute("FLUSH");
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      // Do not let the test continue against a database that was not populated.
+      fail(e.getMessage());
+    }
+  }
+}