You are viewing a plain text version of this content. The canonical link to it (a hyperlink in the original page) was not preserved in this plain-text copy.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2023/01/17 07:48:04 UTC
[iotdb] branch research/LSM-quantile updated: Add quantile summary for tsfile (#8889)
This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch research/LSM-quantile
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/research/LSM-quantile by this push:
new afe8979ffe Add quantile summary for tsfile (#8889)
afe8979ffe is described below
commit afe8979ffe04b29f309226bfe3e2f1ccae9236e1
Author: czllgzmzl <32...@users.noreply.github.com>
AuthorDate: Tue Jan 17 15:47:59 2023 +0800
Add quantile summary for tsfile (#8889)
---
client-py/SessionSyn.py | 44 ++-
client-py/SessionSynUnseq.py | 290 ++++++++++++++++++
.../resources/conf/iotdb-engine.properties | 31 +-
server/src/assembly/resources/conf/iotdb-env.bat | 2 +-
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 22 +-
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 35 +++
.../db/engine/cache/TimeSeriesMetadataCache.java | 23 +-
.../apache/iotdb/db/qp/constant/SQLConstant.java | 6 +-
.../qp/logical/crud/AggregationQueryOperator.java | 4 +
.../db/query/aggregation/AggregationType.java | 14 +-
.../db/query/aggregation/impl/CountAggrResult.java | 8 +-
...grResult.java => DDSketchSingleAggrResult.java} | 132 ++++-----
.../db/query/aggregation/impl/KLLDebugResult.java | 3 +-
...esult.java => KLLStatChunkAvailAggrResult.java} | 69 +++--
.../impl/KLLStatDebugFullReadingAggrResult.java | 3 +-
.../impl/KLLStatDebugPageDemandRateAggrResult.java | 3 +-
.../aggregation/impl/KLLStatMedianAggrResult.java | 6 +-
.../impl/KLLStatOverlapSingleAggrResult.java | 3 +-
.../aggregation/impl/KLLStatSingleAggrResult.java | 6 +-
.../impl/KLLStatSingleReadAggrResult.java | 167 +++++++----
.../impl/StrictKLLStatSingleAggrResult.java | 96 +++---
.../db/query/executor/AggregationExecutor.java | 32 +-
.../db/query/factory/AggregateResultFactory.java | 12 +
.../query/reader/series/SeriesAggregateReader.java | 19 ++
.../iotdb/db/query/reader/series/SeriesReader.java | 257 +++++++++++++---
.../org/apache/iotdb/db/utils/SchemaUtils.java | 2 +
.../apache/iotdb/db/utils/TypeInferenceUtils.java | 2 +
.../org/apache/iotdb/session/InsertCsvDataIT.java | 173 ++++++++---
.../org/apache/iotdb/session/InsertDataIT.java | 6 +-
.../apache/iotdb/session/InsertLatencyDataIT.java | 181 +++++++++---
.../{InsertDataIT.java => InsertUnseqDataIT.java} | 18 +-
.../iotdb/session/InsertUnseqLatencyDataIT.java | 327 +++++++++++++++++++++
.../java/org/apache/iotdb/session/QueryLSMIT.java | 153 ++++++++++
.../org/apache/iotdb/session/QueryLatencyIT.java | 179 +++++++++++
.../session/QuerySSTSketchWithDifferentTs.java | 137 +++++++++
.../iotdb/tsfile/common/conf/TSFileConfig.java | 34 ++-
.../tsfile/file/metadata/TimeseriesMetadata.java | 47 ++-
.../file/metadata/statistics/DoubleStatistics.java | 57 +++-
.../file/metadata/statistics/Statistics.java | 10 +-
.../iotdb/tsfile/utils/DDSketchForQuantile.java | 183 ++++++++++++
.../iotdb/tsfile/utils/KLLSketchForLSMFile.java | 193 ++++++++++++
.../iotdb/tsfile/utils/KLLSketchForQuantile.java | 12 +-
.../apache/iotdb/tsfile/utils/KLLSketchForSST.java | 104 +++++++
.../apache/iotdb/tsfile/utils/LongKLLSketch.java | 67 ++++-
.../apache/iotdb/tsfile/utils/SegTreeBySketch.java | 211 +++++++++++++
.../apache/iotdb/tsfile/write/TsFileWriter.java | 1 +
.../iotdb/tsfile/write/chunk/ChunkWriterImpl.java | 3 +
.../chunk/NonAlignedChunkGroupWriterImpl.java | 1 +
.../iotdb/tsfile/write/writer/TsFileIOWriter.java | 78 +++--
49 files changed, 3008 insertions(+), 458 deletions(-)
diff --git a/client-py/SessionSyn.py b/client-py/SessionSyn.py
index 2649c7a05d..7059cd349c 100644
--- a/client-py/SessionSyn.py
+++ b/client-py/SessionSyn.py
@@ -24,6 +24,7 @@ from iotdb.utils.IoTDBConstants import TSDataType, TSEncoding, Compressor
from iotdb.utils.Tablet import Tablet
import pandas as pd
import numpy as np
+import time
# creating session connection.
ip = "127.0.0.1"
@@ -33,12 +34,7 @@ password_ = "root"
session = Session(ip, port_, username_, password_, fetch_size=1024, zone_id="UTC+8")
session.open(False)
-grp = "tx_syn_04"
-# syn_01: 1024
-# syn_02: 512
-# syn_03: 256
-# syn_04: 128
-# syn_05: false
+grp = "bt_syn_01"
# set and delete storage groups
session.delete_storage_group("root." + grp)
@@ -55,34 +51,52 @@ print(
session.check_time_series_exists("root." + grp + ".d_01.s_01"),
)
-# df = pd.read_csv("~/LSM-Quantile/wh.csv")
+df = pd.read_csv("D:\\Study\\Lab\\iotdb\\add_quantile_to_aggregation\\test_project_2\\1_bitcoin.csv")
+data = df["bitcoin dataset"].tolist()
+# df = pd.read_csv("../../4_wh.csv")
# data = df["value"].tolist()
-# data = [[datum] for datum in data]
-df = pd.read_csv("~/LSM-Quantile/taxi.txt")
-data = (np.array(df)).tolist()
-data = [[datum[0]] for datum in data]
-data = data[:50000000]
-batch = 81920
+data = [[datum] for datum in data]
+# df = pd.read_csv("../../SpacecraftThruster.txt")
+# df = pd.read_csv("../../4_taxipredition8M.txt")
+# data = (np.array(df)).tolist()
+# data = [[datum[0]] for datum in data]
+batch = 8192
print(data[:10])
print(type(data[0]))
+print(len(data))
measurements_ = ["s_01"]
data_types_ = [
TSDataType.DOUBLE
]
-for i in range(int(len(data) / batch)):
+values_ = range(batch)
+timestamps_ = range(1<<40,(1<<40)+batch)
+tablet_ = Tablet(
+ "root." + grp + ".d_01", measurements_, data_types_, values_, timestamps_
+)
+session.insert_tablet(tablet_)
+session.execute_non_query_statement("flush")
+
+total_time = 0
+for i in range(6713):
if i % 100 == 0:
print("Iter: " + str(i))
-# insert one tablet into the database.
+ # insert one tablet into the database.
values_ = data[i * batch : (i + 1) * batch] # Non-ASCII text will cause error since bytes can only hold 0-128 nums.
timestamps_ = list(range(i * batch, (i + 1) * batch))
+ if len(timestamps_) != len(values_):
+ break
tablet_ = Tablet(
"root." + grp + ".d_01", measurements_, data_types_, values_, timestamps_
)
+ curr_time = time.time()
session.insert_tablet(tablet_)
+ total_time += (time.time() - curr_time)
# session.execute_non_query_statement("flush")
+print(total_time)
+
# close session connection.
session.close()
diff --git a/client-py/SessionSynUnseq.py b/client-py/SessionSynUnseq.py
new file mode 100644
index 0000000000..06c8a10230
--- /dev/null
+++ b/client-py/SessionSynUnseq.py
@@ -0,0 +1,290 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Uncomment the following line to use apache-iotdb module installed by pip3
+#
+
+from iotdb.Session import Session
+from iotdb.utils.IoTDBConstants import TSDataType, TSEncoding, Compressor
+from iotdb.utils.Tablet import Tablet
+import pandas as pd
+import numpy as np
+import time
+
+mu, sig = 2.0, 2.6
+grpSuf = '_sig'+str(int(sig*10))
+
+# creating session connection.
+ip = "127.0.0.1"
+port_ = "6667"
+username_ = "root"
+password_ = "root"
+session = Session(ip, port_, username_, password_, fetch_size=1024, zone_id="UTC+8")
+session.open(False)
+
+grp = "bt_syn_01"+grpSuf
+
+# set and delete storage groups
+session.delete_storage_group("root." + grp)
+session.set_storage_group("root." + grp)
+
+# setting time series.
+session.create_time_series(
+ "root." + grp + ".d_01.s_01", TSDataType.DOUBLE, TSEncoding.PLAIN, Compressor.SNAPPY
+)
+
+# checking time series
+print(
+ "s_01 expecting True, checking result: ",
+ session.check_time_series_exists("root." + grp + ".d_01.s_01"),
+)
+
+df = pd.read_csv("D:\\Study\\Lab\\iotdb\\add_quantile_to_aggregation\\test_project_2\\1_bitcoin.csv")
+data = df["bitcoin dataset"].tolist()
+# df = pd.read_csv("../../4_wh.csv")
+# data = df["value"].tolist()
+data = [[datum] for datum in data]
+# df = pd.read_csv("../../SpacecraftThruster.txt")
+# df = pd.read_csv("../../4_taxipredition8M.txt")
+# data = (np.array(df)).tolist()
+# data = [[datum[0]] for datum in data]
+batch = 8192
+print(data[:10])
+print(type(data[0]))
+print(len(data))
+
+n=6713*8192
+lat_data = []
+import random
+import math
+
+random.seed(233)
+for i in range(n):
+ lat_data.append( ( int(i+math.exp(mu + sig * random.gauss(0, 1))), i, data[i] ) )
+lat_data=sorted(lat_data,key=lambda x:x[0])
+
+
+measurements_ = ["s_01"]
+data_types_ = [
+ TSDataType.DOUBLE
+]
+
+values_ = range(batch)
+timestamps_ = range(1<<40,(1<<40)+batch)
+tablet_ = Tablet(
+ "root." + grp + ".d_01", measurements_, data_types_, values_, timestamps_
+)
+session.insert_tablet(tablet_)
+session.execute_non_query_statement("flush")
+
+total_time = 0
+for i in range(int(n/batch)):
+ if i % 100 == 0:
+ print("Iter: " + str(i))
+ # insert one tablet into the database.
+ values_ = data[i * batch : (i + 1) * batch][1]
+ timestamps_ = list(range(i * batch, (i + 1) * batch))
+ values_ = []
+ timestamps_ = []
+ for j in range(i*batch,(i+1)*batch):
+ timestamps_.append(lat_data[j][1])
+ values_.append(lat_data[j][2])
+ # print('\t\t'+str(lat_data[j][1]))
+ if len(timestamps_) != len(values_):
+ break
+ tablet_ = Tablet(
+ "root." + grp + ".d_01", measurements_, data_types_, values_, timestamps_
+ )
+ curr_time = time.time()
+ session.insert_tablet(tablet_)
+ total_time += (time.time() - curr_time)
+
+print(total_time)
+#
+#
+# # --------------------------------------------------------------------------------------------------------------------
+#
+# grp = "tx_syn_01"
+#
+# # set and delete storage groups
+# session.delete_storage_group("root." + grp)
+# session.set_storage_group("root." + grp)
+#
+# # setting time series.
+# session.create_time_series(
+# "root." + grp + ".d_01.s_01", TSDataType.DOUBLE, TSEncoding.PLAIN, Compressor.SNAPPY
+# )
+#
+# # checking time series
+# print(
+# "s_01 expecting True, checking result: ",
+# session.check_time_series_exists("root." + grp + ".d_01.s_01"),
+# )
+#
+# # df = pd.read_csv("../../1_bitcoin.csv")
+# # data = df["bitcoin dataset"].tolist()
+# # df = pd.read_csv("../../4_wh.csv")
+# # data = df["value"].tolist()
+# # data = [[datum] for datum in data]
+# # df = pd.read_csv("../../SpacecraftThruster.txt")
+# df = pd.read_csv("../../4_taxipredition8M.txt")
+# data = (np.array(df)).tolist()
+# data = [[datum[0]] for datum in data]
+# batch = 8192
+# print(data[:10])
+# print(type(data[0]))
+# print(len(data))
+#
+# measurements_ = ["s_01"]
+# data_types_ = [
+# TSDataType.DOUBLE
+# ]
+#
+# total_time = 0
+# for i in range(6713):
+# if i % 100 == 0:
+# print("Iter: " + str(i))
+# # insert one tablet into the database.
+# values_ = data[i * batch : (i + 1) * batch] # Non-ASCII text will cause error since bytes can only hold 0-128 nums.
+# timestamps_ = list(range(i * batch, (i + 1) * batch))
+# if len(timestamps_) != len(values_):
+# break
+# tablet_ = Tablet(
+# "root." + grp + ".d_01", measurements_, data_types_, values_, timestamps_
+# )
+# curr_time = time.time()
+# session.insert_tablet(tablet_)
+# total_time += (time.time() - curr_time)
+#
+# print(total_time)
+#
+# # --------------------------------------------------------------------------------------------------------------------
+#
+# grp = "th_syn_01"
+#
+# # set and delete storage groups
+# session.delete_storage_group("root." + grp)
+# session.set_storage_group("root." + grp)
+#
+# # setting time series.
+# session.create_time_series(
+# "root." + grp + ".d_01.s_01", TSDataType.DOUBLE, TSEncoding.PLAIN, Compressor.SNAPPY
+# )
+#
+# # checking time series
+# print(
+# "s_01 expecting True, checking result: ",
+# session.check_time_series_exists("root." + grp + ".d_01.s_01"),
+# )
+#
+# # df = pd.read_csv("../../1_bitcoin.csv")
+# # data = df["bitcoin dataset"].tolist()
+# # df = pd.read_csv("../../4_wh.csv")
+# # data = df["value"].tolist()
+# # data = [[datum] for datum in data]
+# df = pd.read_csv("../../SpacecraftThruster.txt")
+# # df = pd.read_csv("../../4_taxipredition8M.txt")
+# data = (np.array(df)).tolist()
+# data = [[datum[0]] for datum in data]
+# batch = 8192
+# print(data[:10])
+# print(type(data[0]))
+# print(len(data))
+#
+# measurements_ = ["s_01"]
+# data_types_ = [
+# TSDataType.DOUBLE
+# ]
+#
+# total_time = 0
+# for i in range(6713):
+# if i % 100 == 0:
+# print("Iter: " + str(i))
+# # insert one tablet into the database.
+# values_ = data[i * batch : (i + 1) * batch] # Non-ASCII text will cause error since bytes can only hold 0-128 nums.
+# timestamps_ = list(range(i * batch, (i + 1) * batch))
+# if len(timestamps_) != len(values_):
+# break
+# tablet_ = Tablet(
+# "root." + grp + ".d_01", measurements_, data_types_, values_, timestamps_
+# )
+# curr_time = time.time()
+# session.insert_tablet(tablet_)
+# total_time += (time.time() - curr_time)
+#
+# print(total_time)
+#
+# # --------------------------------------------------------------------------------------------------------------------
+#
+# grp = "wh_syn_01"
+#
+# # set and delete storage groups
+# session.delete_storage_group("root." + grp)
+# session.set_storage_group("root." + grp)
+#
+# # setting time series.
+# session.create_time_series(
+# "root." + grp + ".d_01.s_01", TSDataType.DOUBLE, TSEncoding.PLAIN, Compressor.SNAPPY
+# )
+#
+# # checking time series
+# print(
+# "s_01 expecting True, checking result: ",
+# session.check_time_series_exists("root." + grp + ".d_01.s_01"),
+# )
+#
+# # df = pd.read_csv("../../1_bitcoin.csv")
+# # data = df["bitcoin dataset"].tolist()
+# df = pd.read_csv("../../4_wh.csv")
+# data = df["value"].tolist()
+# data = [[datum] for datum in data]
+# # df = pd.read_csv("../../SpacecraftThruster.txt")
+# # df = pd.read_csv("../../4_taxipredition8M.txt")
+# # data = (np.array(df)).tolist()
+# # data = [[datum[0]] for datum in data]
+# batch = 8192
+# print(data[:10])
+# print(type(data[0]))
+# print(len(data))
+#
+# measurements_ = ["s_01"]
+# data_types_ = [
+# TSDataType.DOUBLE
+# ]
+#
+# total_time = 0
+# for i in range(6713):
+# if i % 100 == 0:
+# print("Iter: " + str(i))
+# # insert one tablet into the database.
+# values_ = data[i * batch : (i + 1) * batch] # Non-ASCII text will cause error since bytes can only hold 0-128 nums.
+# timestamps_ = list(range(i * batch, (i + 1) * batch))
+# if len(timestamps_) != len(values_):
+# break
+# tablet_ = Tablet(
+# "root." + grp + ".d_01", measurements_, data_types_, values_, timestamps_
+# )
+# curr_time = time.time()
+# session.insert_tablet(tablet_)
+# total_time += (time.time() - curr_time)
+#
+# print(total_time)
+
+session.close()
+
+print("All executions done!!")
\ No newline at end of file
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index 4a26cf2c9f..1a41fcb1e5 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -20,32 +20,45 @@
####################
### RPC Configuration
####################
+no_update=false
+only_use_page_synopsis=false
+synopsis_for_whole_chunk_when_flush=true
+sketch_size_ratio=4
+target_chunk_point_num=8192
+max_number_of_points_in_chunk=8192
+max_number_of_points_in_page=8192
+avg_series_point_number_threshold=8192
enable_synopsis=true
-synopsis_size_in_byte=1024
-aggregator_memory_in_kb=1024
+enable_SST_sketch=true
+synopsis_size_in_byte=4096
+aggregator_memory_in_kb=256
summary_type=0
quantile=0.5
-enable_seq_space_compaction=false
-enable_unseq_space_compaction=false
+enable_seq_space_compaction=true
+enable_unseq_space_compaction=true
enable_cross_space_compaction=false
-meta_data_cache_enable=true
+meta_data_cache_enable=false
+# Read memory Allocation Ratio: BloomFilterCache, ChunkCache, TimeSeriesMetadataCache, memory used for constructing QueryDataSet and Free Memory Used in Query.
+# The parameter form is a:b:c:d:e, where a, b, c, d and e are integers. for example: 1:1:1:1:1 , 1:100:200:300:400
+chunk_timeseriesmeta_free_memory_proportion=100:1:20000:30000:40000
+
aggregation_strategy=0
bloom_filter_bits_per_key=16
enable_bloom_filter=false
-max_number_of_points_in_chunk=8000
-flush_wal_threshold=4096
+flush_wal_threshold=8192
# The memory size for each series writer to pack page, default value is 64KB
# Datatype: int
-page_size_in_byte=65536
+page_size_in_byte=131072
+#65536
# The maximum number of data points in a page, default 1024*1024
# Datatype: int
-max_number_of_points_in_page=4096
+#max_number_of_points_in_page=8192
# Datatype: String
rpc_address=0.0.0.0
diff --git a/server/src/assembly/resources/conf/iotdb-env.bat b/server/src/assembly/resources/conf/iotdb-env.bat
index df6b0d6a95..e53a65e5f3 100644
--- a/server/src/assembly/resources/conf/iotdb-env.bat
+++ b/server/src/assembly/resources/conf/iotdb-env.bat
@@ -61,7 +61,7 @@ set system_memory_in_mb=%system_memory_in_mb:,=%
set /a half_=%system_memory_in_mb%/2
set /a quarter_=%half_%/2
-if %half_% GTR 1024 set half_=1024
+if %half_% GTR 2048 set half_=2048
if %quarter_% GTR 65536 set quarter_=65536
if %half_% GTR %quarter_% (
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 9d5f4791a8..a9dff4239c 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -149,7 +149,7 @@ public class IoTDBConfig {
* When a certain amount of write ahead logs is reached, they will be flushed to the disk. It is
* possible to lose at most flush_wal_threshold operations.
*/
- private int flushWalThreshold = 10000;
+ private int flushWalThreshold = 8192;
/** this variable set timestamp precision as millisecond, microsecond or nanosecond */
private String timestampPrecision = "ms";
@@ -2545,4 +2545,24 @@ public class IoTDBConfig {
public int getAggregationStrategy() {
return aggregationStrategy;
}
+
+ private boolean noUpdate = true;
+
+ public void setNoUpdate(boolean no) {
+ this.noUpdate = no;
+ }
+
+ public boolean getNoUpdate() {
+ return this.noUpdate;
+ }
+
+ private boolean onlyUsePageSynopsis = true;
+
+ public void setOnlyUsePageSynopsis(boolean only) {
+ this.onlyUsePageSynopsis = only;
+ }
+
+ public boolean getOnlyUsePageSynopsis() {
+ return this.onlyUsePageSynopsis;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index a06f2efb9f..22d25a223a 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -158,6 +158,15 @@ public class IoTDBDescriptor {
properties.getProperty(
"aggregation_strategy", Integer.toString(conf.getAggregationStrategy()))));
+ conf.setNoUpdate(
+ Boolean.parseBoolean(
+ properties.getProperty("no_update", Boolean.toString(conf.getNoUpdate()))));
+
+ conf.setOnlyUsePageSynopsis(
+ Boolean.parseBoolean(
+ properties.getProperty(
+ "only_use_page_synopsis", Boolean.toString(conf.getOnlyUsePageSynopsis()))));
+
conf.setRpcThriftCompressionEnable(
Boolean.parseBoolean(
properties.getProperty(
@@ -1013,6 +1022,24 @@ public class IoTDBDescriptor {
private void loadTsFileProps(Properties properties) {
+ TSFileDescriptor.getInstance()
+ .getConfig()
+ .setSketchSizeRatio(
+ Integer.parseInt(
+ properties.getProperty(
+ "sketch_size_ratio",
+ Integer.toString(
+ TSFileDescriptor.getInstance().getConfig().getSketchSizeRatio()))));
+ TSFileDescriptor.getInstance()
+ .getConfig()
+ .setSynopsisForWholeChunkWhenFlush(
+ Boolean.parseBoolean(
+ properties.getProperty(
+ "synopsis_for_whole_chunk_when_flush",
+ Boolean.toString(
+ TSFileDescriptor.getInstance()
+ .getConfig()
+ .getSynopsisForWholeChunkWhenFlush()))));
TSFileDescriptor.getInstance()
.getConfig()
.setEnableSynopsis(
@@ -1021,6 +1048,14 @@ public class IoTDBDescriptor {
"enable_synopsis",
Boolean.toString(
TSFileDescriptor.getInstance().getConfig().isEnableSynopsis()))));
+ TSFileDescriptor.getInstance()
+ .getConfig()
+ .setEnableSSTSketch(
+ Boolean.parseBoolean(
+ properties.getProperty(
+ "enable_SST_sketch",
+ Boolean.toString(
+ TSFileDescriptor.getInstance().getConfig().isEnableSSTSketch()))));
TSFileDescriptor.getInstance()
.getConfig()
.setEnableBloomFilter(
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/cache/TimeSeriesMetadataCache.java b/server/src/main/java/org/apache/iotdb/db/engine/cache/TimeSeriesMetadataCache.java
index 7b4a51ccdf..2e0a38702f 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/cache/TimeSeriesMetadataCache.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/cache/TimeSeriesMetadataCache.java
@@ -28,7 +28,6 @@ import org.apache.iotdb.db.service.metrics.MetricsService;
import org.apache.iotdb.db.service.metrics.Tag;
import org.apache.iotdb.db.utils.TestOnly;
import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
-import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.Path;
@@ -45,12 +44,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.lang.ref.WeakReference;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.WeakHashMap;
+import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
/**
@@ -97,13 +91,14 @@ public class TimeSeriesMetadataCache {
+ RamUsageEstimator.shallowSizeOf(value)
+ RamUsageEstimator.sizeOf(value.getMeasurementId())
+ RamUsageEstimator.shallowSizeOf(value.getStatistics())
- + (value.getChunkMetadataList().get(0) == null
- ? 0
- : ((ChunkMetadata) value.getChunkMetadataList().get(0))
- .calculateRamSize()
- + RamUsageEstimator.NUM_BYTES_OBJECT_REF)
- * value.getChunkMetadataList().size()
- + RamUsageEstimator.shallowSizeOf(value.getChunkMetadataList())))
+ + value.getDataSizeOfChunkMetaDataList()
+ /*(value.getChunkMetadataList().get(0) == null
+ ? 0
+ : ((ChunkMetadata) value.getChunkMetadataList().get(0))
+ .calculateRamSize()
+ + RamUsageEstimator.NUM_BYTES_OBJECT_REF)
+ * value.getChunkMetadataList().size()
+ + RamUsageEstimator.shallowSizeOf(value.getChunkMetadataList())*/ ))
.recordStats()
.build();
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java b/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java
index b6737a6e41..bfd6c3b423 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java
@@ -101,6 +101,8 @@ public class SQLConstant {
public static final String TDIGEST_STAT_SINGLE = "tdigest_quantile";
public static final String SAMPLING_STAT_SINGLE = "sampling_quantile";
public static final String STRICT_KLL_STAT_SINGLE = "kll_quantile";
+ public static final String DDSKETCH_SINGLE = "ddsketch_quantile";
+ public static final String CHUNK_STAT_AVAIL = "chunk_stat_available";
public static final String ALL = "all";
@@ -140,7 +142,9 @@ public class SQLConstant {
EXACT_MEDIAN_KLL_STAT_OVERLAP_SINGLE,
TDIGEST_STAT_SINGLE,
SAMPLING_STAT_SINGLE,
- STRICT_KLL_STAT_SINGLE));
+ STRICT_KLL_STAT_SINGLE,
+ DDSKETCH_SINGLE,
+ CHUNK_STAT_AVAIL));
public static final int TOK_WHERE = 23;
public static final int TOK_INSERT = 24;
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/AggregationQueryOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/AggregationQueryOperator.java
index 8e5951f73a..f887c8e70d 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/AggregationQueryOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/AggregationQueryOperator.java
@@ -148,6 +148,8 @@ public class AggregationQueryOperator extends QueryOperator {
case SQLConstant.TDIGEST_STAT_SINGLE:
case SQLConstant.SAMPLING_STAT_SINGLE:
case SQLConstant.STRICT_KLL_STAT_SINGLE:
+ case SQLConstant.DDSKETCH_SINGLE:
+ case SQLConstant.CHUNK_STAT_AVAIL:
return dataType.isNumeric();
case SQLConstant.COUNT:
case SQLConstant.MIN_TIME:
@@ -197,6 +199,8 @@ public class AggregationQueryOperator extends QueryOperator {
case SQLConstant.TDIGEST_STAT_SINGLE:
case SQLConstant.SAMPLING_STAT_SINGLE:
case SQLConstant.STRICT_KLL_STAT_SINGLE:
+ case SQLConstant.DDSKETCH_SINGLE:
+ case SQLConstant.CHUNK_STAT_AVAIL:
return dataTypes.stream().allMatch(dataTypes.get(0)::equals);
default:
return true;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregationType.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregationType.java
index e31c0d7ffb..6c4d45d142 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregationType.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregationType.java
@@ -59,7 +59,9 @@ public enum AggregationType {
EXACT_MEDIAN_KLL_STAT_OVERLAP_SINGLE,
TDIGEST_STAT_SINGLE,
SAMPLING_STAT_SINGLE,
- STRICT_KLL_STAT_SINGLE;
+ STRICT_KLL_STAT_SINGLE,
+ DDSKETCH_SINGLE,
+ CHUNK_STAT_AVAIL;
/**
* give an integer to return a data type.
@@ -137,6 +139,10 @@ public enum AggregationType {
return SAMPLING_STAT_SINGLE;
case 33:
return STRICT_KLL_STAT_SINGLE;
+ case 34:
+ return DDSKETCH_SINGLE;
+ case 35:
+ return CHUNK_STAT_AVAIL;
default:
throw new IllegalArgumentException("Invalid Aggregation Type: " + i);
}
@@ -247,6 +253,12 @@ public enum AggregationType {
case STRICT_KLL_STAT_SINGLE:
i = 33;
break;
+ case DDSKETCH_SINGLE:
+ i = 34;
+ break;
+ case CHUNK_STAT_AVAIL:
+ i = 35;
+ break;
default:
throw new IllegalArgumentException("Invalid Aggregation Type: " + this.name());
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java
index f344d55a22..2802bdf5e3 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java
@@ -46,7 +46,13 @@ public class CountAggrResult extends AggregateResult {
@Override
public void updateResultFromStatistics(Statistics statistics) {
- System.out.println("\t\t[DEBUG Count] update from statistics:" + statistics.getCount());
+ // System.out.println(
+ // "\t\t[DEBUG Count] update from statistics:"
+ // + statistics.getCount()
+ // + "\tT:"
+ // + statistics.getStartTime()
+ // + "..."
+ // + statistics.getEndTime());
setLongValue(getLongValue() + statistics.getCount());
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/KLLStatSingleReadAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/DDSketchSingleAggrResult.java
similarity index 73%
copy from server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/KLLStatSingleReadAggrResult.java
copy to server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/DDSketchSingleAggrResult.java
index 45c6dc5c93..e6f13d1d82 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/KLLStatSingleReadAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/DDSketchSingleAggrResult.java
@@ -25,9 +25,9 @@ import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
import org.apache.iotdb.db.utils.ValueIterator;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.statistics.DoubleStatistics;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
+import org.apache.iotdb.tsfile.utils.DDSketchForQuantile;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.io.IOException;
@@ -35,18 +35,15 @@ import java.io.OutputStream;
import java.nio.ByteBuffer;
import static org.apache.iotdb.tsfile.file.metadata.enums.TSDataType.DOUBLE;
-import static org.apache.iotdb.tsfile.file.metadata.statistics.Statistics.SYNOPSIS_SIZE_IN_BYTE;
-public class KLLStatSingleReadAggrResult extends AggregateResult {
+public class DDSketchSingleAggrResult extends AggregateResult {
private TSDataType seriesDataType;
private int iteration;
- private long pageKLLNum;
+ private long pageKLLNum, statNum;
private long cntL, cntR, lastL;
private long n, K1, heapN;
+ private DDSketchForQuantile DDSketch;
private boolean hasFinalResult;
- private final int pageAvgError = 50, pageMaxError = 127;
- private final int pageKLLMemoryByte = (68 + 15) * 8, pageKLLNumMemoryByte = SYNOPSIS_SIZE_IN_BYTE;
- // private List<HeapLongKLLSketch> pageKLL;
long DEBUG = 0;
private int getBitsOfDataType() {
@@ -63,9 +60,37 @@ public class KLLStatSingleReadAggrResult extends AggregateResult {
}
}
- public KLLStatSingleReadAggrResult(TSDataType seriesDataType)
- throws UnSupportedDataTypeException {
- super(DOUBLE, AggregationType.EXACT_MEDIAN_KLL_STAT_SINGLE_READ);
+ // private long approximateDataAvgError() {
+ // long dataAvgError = (long) Math.ceil(2.0 * heapN / heapKLL.getMaxMemoryNum()) + 1;
+ // return dataAvgError;
+ // }
+ //
+ // private long approximateStatAvgError() {
+ // if (SKETCH_SIZE < 0) return 0;
+ // double pageAvgError = 1.0 * TOT_SKETCH_N / TOT_SKETCH_SIZE / 3.0;
+ // double rate = 1.0 * SKETCH_SIZE * pageKLLNum / (maxMemoryByte );
+ // long pageStatAvgError;
+ // if (rate < 1.0) {
+ // pageStatAvgError = (long) Math.ceil(pageAvgError * Math.pow(pageKLLNum, 0.5));
+ // if (pageKLLNum <= 10) pageStatAvgError += pageAvgError * 3.0;
+ // } else {
+ // int memKLLNum = (maxMemoryByte ) / SKETCH_SIZE;
+ // long memErr = (long) Math.ceil(pageAvgError * Math.pow(memKLLNum, 0.5));
+ // pageStatAvgError = (long) Math.ceil(rate * 0.5 * memErr + 0.5 * memErr);
+ // }
+ // return pageStatAvgError;
+ // }
+ //
+ // private long approximateMaxError() {
+ // return 0;
+ // }
+
+ private boolean hasTwoMedians() {
+ return (n & 1) == 0;
+ }
+
+ public DDSketchSingleAggrResult(TSDataType seriesDataType) throws UnSupportedDataTypeException {
+ super(DOUBLE, AggregationType.DDSKETCH_SINGLE);
this.seriesDataType = seriesDataType;
reset();
}
@@ -111,41 +136,30 @@ public class KLLStatSingleReadAggrResult extends AggregateResult {
long dataL = dataToLong(data);
if (iteration == 0) n++;
if (cntL <= dataL && dataL <= cntR) {
- // heapKLL.update(dataL);
+ DDSketch.insert(longToResult(dataL));
heapN++;
} else if (lastL <= dataL && dataL < cntL) K1--;
}
@Override
public void startIteration() {
- heapN = 0;
+ heapN = statNum = 0;
if (iteration == 0) { // first iteration
+
+ int dataset_V = 40000, limit = maxMemoryByte / 42;
+ double DDSketch_ALPHA = Math.pow(10, Math.log10(dataset_V) / limit) - 1;
+ DDSketch = new DDSketchForQuantile(DDSketch_ALPHA, limit);
lastL = cntL = Long.MIN_VALUE;
cntR = Long.MAX_VALUE;
n = 0;
pageKLLNum = 0;
- } else {
- pageKLLNum = 0;
- System.out.println(
- "\t[KLL STAT DEBUG] start iteration "
- + iteration
- + " cntL,R:"
- + "["
- + cntL
- + ","
- + cntR
- + "]"
- + "\tlastL:"
- + lastL
- + "\tK1:"
- + K1);
}
}
@Override
public void finishIteration() {
System.out.println(
- "\t[KLL STAT DEBUG]"
+ "\t[DDSKETCH SINGLE DEBUG]"
+ "finish iteration "
+ iteration
+ " cntL,R:"
@@ -158,13 +172,21 @@ public class KLLStatSingleReadAggrResult extends AggregateResult {
+ lastL
+ "\tK1:"
+ K1);
- System.out.println("\t[KLL STAT DEBUG]" + " statNum:" + pageKLLNum);
iteration++;
if (n == 0) {
hasFinalResult = true;
return;
}
- setDoubleValue(-233.0);
+ lastL = cntL;
+
+ if (iteration == 1) { // first iteration over
+ K1 = (n + 1) >> 1;
+ }
+ long K2 = hasTwoMedians() ? (K1 + 1) : K1;
+
+ System.out.println("\t[DDSKETCH SINGLE DEBUG]" + " K1,K2:" + K1 + ", " + K2);
+ double ans = DDSketch.getQuantile(QUANTILE);
+ setDoubleValue(ans);
hasFinalResult = true;
}
@@ -193,31 +215,6 @@ public class KLLStatSingleReadAggrResult extends AggregateResult {
String.format(
"Unsupported data type in aggregation MEDIAN : %s", statistics.getType()));
}
- if (iteration == 0) {
- n += statistics.getCount();
- if (statistics.getType() == DOUBLE && ((DoubleStatistics) statistics).getKllSketchNum() > 0)
- pageKLLNum += ((DoubleStatistics) statistics).getKllSketchNum();
- }
- long minVal = dataToLong(statistics.getMinValue());
- long maxVal = dataToLong(statistics.getMaxValue());
- // System.out.println(
- // "\t[KLL STAT DEBUG] update from statistics:\t"
- // + "min,max:"
- // + minVal
- // + ","
- // + maxVal
- // + " n:"
- // + statistics.getCount());
- // out of range
- if (minVal > cntR || maxVal < lastL) return;
- if (lastL <= minVal && maxVal < cntL) {
- K1 -= statistics.getCount();
- return;
- }
- if (minVal == maxVal) { // min == max
- for (int i = 0; i < statistics.getCount(); i++) updateStatusFromData(minVal);
- return;
- }
}
@Override
@@ -228,24 +225,34 @@ public class KLLStatSingleReadAggrResult extends AggregateResult {
@Override
public void updateResultFromPageData(
IBatchDataIterator batchIterator, long minBound, long maxBound) {
+ // System.out.print("\t[KLL STAT DEBUG]\tupdateResultFromPageData:");
+ // int tmp_tot = 0;
while (batchIterator.hasNext()) {
if (batchIterator.currentTime() >= maxBound || batchIterator.currentTime() < minBound) {
break;
}
+ // System.out.print(
+ // " (" + batchIterator.currentTime() + "," + batchIterator.currentValue() + ")");
+ // tmp_tot++;
updateStatusFromData(batchIterator.currentValue());
batchIterator.next();
}
+ // System.out.println(" tot:" + tmp_tot);
}
@Override
public void updateResultUsingTimestamps(
long[] timestamps, int length, IReaderByTimestamp dataReader) throws IOException {
+ // System.out.print("\t[KLL STAT DEBUG]\tupdateResultUsingTimestamps:");
+ // int tmp_tot = 0;
Object[] values = dataReader.getValuesInTimestamps(timestamps, length);
for (int i = 0; i < length; i++) {
if (values[i] != null) {
updateStatusFromData(values[i]);
+ // tmp_tot++;
}
}
+ // System.out.println(" tot:" + tmp_tot);
}
@Override
@@ -297,6 +304,7 @@ public class KLLStatSingleReadAggrResult extends AggregateResult {
@Override
public void reset() {
super.reset();
+ DDSketch = null;
lastL = cntL = Long.MIN_VALUE;
cntR = Long.MAX_VALUE;
n = 0;
@@ -306,19 +314,7 @@ public class KLLStatSingleReadAggrResult extends AggregateResult {
@Override
public boolean canUpdateFromStatistics(Statistics statistics) {
- if ((seriesDataType == DOUBLE) && iteration == 0) {
- DoubleStatistics doubleStats = (DoubleStatistics) statistics;
- if (doubleStats.getKllSketchNum() > 0) return true;
- }
- if (iteration > 0) {
- long minVal = dataToLong(statistics.getMinValue());
- long maxVal = dataToLong(statistics.getMaxValue());
- if (minVal > cntR || maxVal < lastL) return true;
- if (lastL <= minVal && maxVal < cntL) return true;
- }
- Comparable<Object> minVal = (Comparable<Object>) statistics.getMinValue();
- Comparable<Object> maxVal = (Comparable<Object>) statistics.getMaxValue();
- return (minVal.compareTo(maxVal) == 0); // min==max
+ return false;
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/KLLDebugResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/KLLDebugResult.java
index a5eb166a97..ab44728595 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/KLLDebugResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/KLLDebugResult.java
@@ -30,7 +30,6 @@ import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
import org.apache.iotdb.tsfile.utils.HeapLongKLLSketch;
import org.apache.iotdb.tsfile.utils.KLLSketchForQuantile;
-import org.apache.iotdb.tsfile.utils.LongKLLSketch;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.io.IOException;
@@ -348,7 +347,7 @@ public class KLLDebugResult extends AggregateResult {
if (statistics.getType() == DOUBLE) {
DoubleStatistics stat = (DoubleStatistics) statistics;
if (stat.getKllSketchNum() > 0) {
- for (LongKLLSketch sketch : stat.getKllSketchList()) addSketch(sketch);
+ for (KLLSketchForQuantile sketch : stat.getKllSketchList()) addSketch(sketch);
return;
} else System.out.println("\t\t\t\t!!!!!![ERROR!] no KLL in stat!");
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/StrictKLLStatSingleAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/KLLStatChunkAvailAggrResult.java
similarity index 86%
copy from server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/StrictKLLStatSingleAggrResult.java
copy to server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/KLLStatChunkAvailAggrResult.java
index 24bfb02909..b6a660a738 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/StrictKLLStatSingleAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/KLLStatChunkAvailAggrResult.java
@@ -28,7 +28,10 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.DoubleStatistics;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
-import org.apache.iotdb.tsfile.utils.*;
+import org.apache.iotdb.tsfile.utils.HeapLongStrictKLLSketch;
+import org.apache.iotdb.tsfile.utils.KLLSketchForQuantile;
+import org.apache.iotdb.tsfile.utils.LongKLLSketch;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.io.IOException;
import java.io.OutputStream;
@@ -38,7 +41,7 @@ import java.util.List;
import static org.apache.iotdb.tsfile.file.metadata.enums.TSDataType.DOUBLE;
-public class StrictKLLStatSingleAggrResult extends AggregateResult {
+public class KLLStatChunkAvailAggrResult extends AggregateResult {
private TSDataType seriesDataType;
private int iteration;
private long pageKLLNum, statNum;
@@ -96,9 +99,9 @@ public class StrictKLLStatSingleAggrResult extends AggregateResult {
return (n & 1) == 0;
}
- public StrictKLLStatSingleAggrResult(TSDataType seriesDataType)
+ public KLLStatChunkAvailAggrResult(TSDataType seriesDataType)
throws UnSupportedDataTypeException {
- super(DOUBLE, AggregationType.STRICT_KLL_STAT_SINGLE);
+ super(DOUBLE, AggregationType.CHUNK_STAT_AVAIL);
this.seriesDataType = seriesDataType;
reset();
}
@@ -220,7 +223,7 @@ public class StrictKLLStatSingleAggrResult extends AggregateResult {
@Override
public void finishIteration() {
System.out.println(
- "\t[KLL STAT Single DEBUG]"
+ "\t[STAT CHUNK AVAIL DEBUG]"
+ "finish iteration "
+ iteration
+ " cntL,R:"
@@ -234,7 +237,7 @@ public class StrictKLLStatSingleAggrResult extends AggregateResult {
+ "\tK1:"
+ K1);
System.out.println(
- "\t[KLL STAT Single DEBUG]"
+ "\t[STAT CHUNK AVAIL DEBUG]"
+ " statNum:"
+ statNum
+ " pageKllNum:"
@@ -243,6 +246,7 @@ public class StrictKLLStatSingleAggrResult extends AggregateResult {
+ heapN);
iteration++;
if (n == 0) {
+ setDoubleValue(0.0);
hasFinalResult = true;
return;
}
@@ -253,16 +257,16 @@ public class StrictKLLStatSingleAggrResult extends AggregateResult {
}
long K2 = K1 + 1; // hasTwoMedians() ? (K1 + 1) : K1;
- System.out.println("\t[KLL STAT Single DEBUG]" + " K1,K2:" + K1 + ", " + K2);
+ System.out.println("\t[STAT CHUNK AVAIL DEBUG]" + " K1,K2:" + K1 + ", " + K2);
if (pageKLLNum == 0) { // all in heap
- System.out.println("\t[KLL STAT Single DEBUG]" + " calc by heap only. N:" + heapKLL.getN());
+ System.out.println("\t[STAT CHUNK AVAIL DEBUG]" + " calc by heap only. N:" + heapKLL.getN());
heapKLL.show();
double v1 = longToResult(heapKLL.findMinValueWithRank(K1 - 1));
// System.out.println("\t[KLL STAT DEBUG]" + "v1:" + v1);
double v2 = longToResult(heapKLL.findMinValueWithRank(K2 - 1));
double ans = 0.5 * (v1 + v2);
- setDoubleValue(ans);
+ setDoubleValue(0.0);
hasFinalResult = true;
return;
}
@@ -272,13 +276,14 @@ public class StrictKLLStatSingleAggrResult extends AggregateResult {
mergePageKLL();
heapKLL.show();
System.out.println(
- "\t[KLL STAT Single DEBUG] after merge. heapN:" + heapKLL.getN() + "\tn_true:" + n);
+ "\t[STAT CHUNK AVAIL DEBUG] after merge. heapN:" + heapKLL.getN() + "\tn_true:" + n);
double v1 = longToResult(heapKLL.findMinValueWithRank(K1 - 1));
double v2 = longToResult(heapKLL.findMinValueWithRank(K2 - 1));
double ans = 0.5 * (v1 + v2);
- setDoubleValue(ans);
+ setDoubleValue(1.0 - 1.0 * heapN / heapKLL.getN());
hasFinalResult = true;
- System.out.println("\t[KLL STAT Single DEBUG]" + " est_stats_err:" + approximateStatAvgError());
+ System.out.println(
+ "\t[STAT CHUNK AVAIL DEBUG]" + " est_stats_err:" + approximateStatAvgError());
}
@Override
@@ -304,7 +309,9 @@ public class StrictKLLStatSingleAggrResult extends AggregateResult {
// heapKLL.mergeWithTempSpace(bigger_sketch);
// } else a.set(pos0, bigger_sketch);
// }
- private void addSketch(KLLSketchForQuantile sketch) {
+ public void addSketch(KLLSketchForQuantile sketch) {
+ n += sketch.getN();
+ pageKLLNum++;
TOT_SKETCH_N += sketch.getN();
TOT_SKETCH_SIZE += sketch.getNumLen();
if (SKETCH_SIZE < 0) {
@@ -351,19 +358,30 @@ public class StrictKLLStatSingleAggrResult extends AggregateResult {
"Unsupported data type in aggregation MEDIAN : %s", statistics.getType()));
}
if (iteration == 0) {
- n += statistics.getCount();
+ // n += statistics.getCount();
// if (statistics.getType() == DOUBLE) {
// }
if (statistics.getType() == DOUBLE) {
DoubleStatistics stat = (DoubleStatistics) statistics;
if (stat.getSummaryNum() > 0) {
- pageKLLNum += stat.getSummaryNum();
+ // pageKLLNum += stat.getSummaryNum();
statNum += 1;
- for (LongKLLSketch sketch : stat.getKllSketchList()) addSketch(sketch);
- // System.out.println(
- // "\t[KLL STAT Single DEBUG] updateResultFromStatistics. pageN:"
- // + stat.getKllSketch().getN());
- // stat.getKllSketch().show();
+ for (KLLSketchForQuantile sketch : stat.getKllSketchList()) {
+ // System.out.println("\t[STRICT STAT CHUNK AVAIL DEBUG] pageTime:"+);
+ // if (sketch.getN() > 10000)
+ // System.out.println(
+ // "\t[STRICT KLL STAT DEBUG] updateResultFromStatistics\tstatN:"
+ // + sketch.getN()
+ // + "\tstatNumLen:"
+ // + sketch.getNumLen());
+
+ ((LongKLLSketch) sketch).deserializeFromBuffer();
+ addSketch(sketch);
+ }
+ // System.out.println(
+ // "\t[STAT CHUNK AVAIL DEBUG] updateResultFromStatistics. pageN:"
+ // + stat.getKllSketch().getN());
+ // stat.getKllSketch().show();
return;
} else System.out.println("\t\t\t\t!!!!!![ERROR!] no KLL in stat!");
}
@@ -491,11 +509,12 @@ public class StrictKLLStatSingleAggrResult extends AggregateResult {
public boolean canUpdateFromStatistics(Statistics statistics) {
if ((seriesDataType == DOUBLE) && iteration == 0) {
DoubleStatistics doubleStats = (DoubleStatistics) statistics;
- // System.out.println(
- // "\t[DEBUG][KLL STAT SINGLE]\tcanUseStat? count:"
- // + doubleStats.getCount()
- // + " KLLNum:"
- // + doubleStats.getKllSketchNum());
+ // if (statistics.getCount() > 10000)
+ // System.out.println(
+ // "\t[DEBUG][STRICT KLL STAT DEBUG]\tcanUseStat? count:"
+ // + doubleStats.getCount()
+ // + " summaryNum:"
+ // + doubleStats.getSummaryNum());
if (doubleStats.getSummaryNum() > 0) return true;
}
return false;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/KLLStatDebugFullReadingAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/KLLStatDebugFullReadingAggrResult.java
index 4be7500887..32167886de 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/KLLStatDebugFullReadingAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/KLLStatDebugFullReadingAggrResult.java
@@ -30,7 +30,6 @@ import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
import org.apache.iotdb.tsfile.utils.HeapLongKLLSketch;
import org.apache.iotdb.tsfile.utils.KLLSketchForQuantile;
-import org.apache.iotdb.tsfile.utils.LongKLLSketch;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.io.IOException;
@@ -385,7 +384,7 @@ public class KLLStatDebugFullReadingAggrResult extends AggregateResult {
// stat.getKllSketch().getN());
// addSketch(stat.getKllSketch(), pageKLL, pageKLLMemoryByte);
pageKLLNum += stat.getKllSketchNum();
- for (LongKLLSketch sketch : stat.getKllSketchList()) addSketch(sketch);
+ for (KLLSketchForQuantile sketch : stat.getKllSketchList()) addSketch(sketch);
// heapKLL.mergeWithTempSpace(stat.getKllSketch());
return;
} else System.out.println("\t\t\t\t!!!!!![ERROR!] no KLL in stat!");
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/KLLStatDebugPageDemandRateAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/KLLStatDebugPageDemandRateAggrResult.java
index 974089874d..955e894ddf 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/KLLStatDebugPageDemandRateAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/KLLStatDebugPageDemandRateAggrResult.java
@@ -30,7 +30,6 @@ import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
import org.apache.iotdb.tsfile.utils.HeapLongKLLSketch;
import org.apache.iotdb.tsfile.utils.KLLSketchForQuantile;
-import org.apache.iotdb.tsfile.utils.LongKLLSketch;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.io.IOException;
@@ -398,7 +397,7 @@ public class KLLStatDebugPageDemandRateAggrResult extends AggregateResult {
// stat.getKllSketch().getN());
// addSketch(stat.getKllSketch(), pageKLL, pageKLLMemoryByte);
pageKLLNum += stat.getKllSketchNum();
- for (LongKLLSketch sketch : stat.getKllSketchList()) addSketch(sketch);
+ for (KLLSketchForQuantile sketch : stat.getKllSketchList()) addSketch(sketch);
// heapKLL.mergeWithTempSpace(stat.getKllSketch());
return;
} else System.out.println("\t\t\t\t!!!!!![ERROR!] no KLL in stat!");
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/KLLStatMedianAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/KLLStatMedianAggrResult.java
index 8275defcef..499918e510 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/KLLStatMedianAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/KLLStatMedianAggrResult.java
@@ -398,7 +398,11 @@ public class KLLStatMedianAggrResult extends AggregateResult {
// if(stat.getBfNum()>1)
// System.out.println("\t\tFK\tstat:" +
// stat.getStartTime() + "..." + stat.getEndTime());
- for (LongKLLSketch sketch : stat.getKllSketchList()) addSketch(sketch);
+ for (KLLSketchForQuantile sketch : stat.getKllSketchList()) {
+
+ ((LongKLLSketch) sketch).deserializeFromBuffer();
+ addSketch(sketch);
+ }
// heapKLL.mergeWithTempSpace(stat.getKllSketch());
return;
} else System.out.println("\t\t\t\t!!!!!![ERROR!] no KLL in stat!");
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/KLLStatOverlapSingleAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/KLLStatOverlapSingleAggrResult.java
index c233815629..560cd2b5c3 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/KLLStatOverlapSingleAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/KLLStatOverlapSingleAggrResult.java
@@ -33,7 +33,6 @@ import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
import org.apache.iotdb.tsfile.read.reader.IPointReader;
import org.apache.iotdb.tsfile.utils.HeapLongKLLSketch;
import org.apache.iotdb.tsfile.utils.KLLSketchForQuantile;
-import org.apache.iotdb.tsfile.utils.LongKLLSketch;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.eclipse.collections.api.iterator.MutableLongIterator;
@@ -370,7 +369,7 @@ public class KLLStatOverlapSingleAggrResult extends AggregateResult {
DoubleStatistics stat = (DoubleStatistics) statistics;
if (stat.getKllSketchNum() > 0) {
pageKLLNum += stat.getKllSketchNum();
- for (LongKLLSketch sketch : stat.getKllSketchList()) addSketch(sketch);
+ for (KLLSketchForQuantile sketch : stat.getKllSketchList()) addSketch(sketch);
// System.out.println(
// "\t[KLL STAT Single DEBUG] updateResultFromStatistics. pageN:"
// + stat.getKllSketch().getN());
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/KLLStatSingleAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/KLLStatSingleAggrResult.java
index 98e7211912..085306245c 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/KLLStatSingleAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/KLLStatSingleAggrResult.java
@@ -361,7 +361,11 @@ public class KLLStatSingleAggrResult extends AggregateResult {
if (stat.getSummaryNum() > 0) {
pageKLLNum += stat.getSummaryNum();
statNum += 1;
- for (LongKLLSketch sketch : stat.getKllSketchList()) addSketch(sketch);
+ for (KLLSketchForQuantile sketch : stat.getKllSketchList()) {
+
+ ((LongKLLSketch) sketch).deserializeFromBuffer();
+ addSketch(sketch);
+ }
// System.out.println(
// "\t[KLL STAT Single DEBUG] updateResultFromStatistics. pageN:"
// + stat.getKllSketch().getN());
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/KLLStatSingleReadAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/KLLStatSingleReadAggrResult.java
index 45c6dc5c93..de7220af66 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/KLLStatSingleReadAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/KLLStatSingleReadAggrResult.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.query.aggregation.impl;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.query.aggregation.AggregateResult;
import org.apache.iotdb.db.query.aggregation.AggregationType;
import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
@@ -28,26 +29,34 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.DoubleStatistics;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
+import org.apache.iotdb.tsfile.utils.HeapLongKLLSketch;
+import org.apache.iotdb.tsfile.utils.KLLSketchForQuantile;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
import static org.apache.iotdb.tsfile.file.metadata.enums.TSDataType.DOUBLE;
-import static org.apache.iotdb.tsfile.file.metadata.statistics.Statistics.SYNOPSIS_SIZE_IN_BYTE;
public class KLLStatSingleReadAggrResult extends AggregateResult {
private TSDataType seriesDataType;
private int iteration;
- private long pageKLLNum;
+ private long pageKLLNum, statNum;
private long cntL, cntR, lastL;
private long n, K1, heapN;
+ private HeapLongKLLSketch heapKLL;
private boolean hasFinalResult;
- private final int pageAvgError = 50, pageMaxError = 127;
- private final int pageKLLMemoryByte = (68 + 15) * 8, pageKLLNumMemoryByte = SYNOPSIS_SIZE_IN_BYTE;
- // private List<HeapLongKLLSketch> pageKLL;
+ private List<KLLSketchForQuantile> pageKLL;
+ private int pageKLLIndex;
+ private long TOT_SKETCH_N = 0, TOT_SKETCH_SIZE = 0;
+ private int SKETCH_SIZE = -1;
+ private int pageKLLMaxIndex;
long DEBUG = 0;
+ public final boolean onlyUsePageSynopsis =
+ IoTDBDescriptor.getInstance().getConfig().getOnlyUsePageSynopsis();
private int getBitsOfDataType() {
switch (seriesDataType) {
@@ -111,21 +120,25 @@ public class KLLStatSingleReadAggrResult extends AggregateResult {
long dataL = dataToLong(data);
if (iteration == 0) n++;
if (cntL <= dataL && dataL <= cntR) {
- // heapKLL.update(dataL);
+ heapKLL.update(dataL);
heapN++;
} else if (lastL <= dataL && dataL < cntL) K1--;
}
@Override
public void startIteration() {
- heapN = 0;
+ heapN = statNum = 0;
if (iteration == 0) { // first iteration
+ heapKLL = new HeapLongKLLSketch(maxMemoryByte);
lastL = cntL = Long.MIN_VALUE;
cntR = Long.MAX_VALUE;
n = 0;
pageKLLNum = 0;
+ pageKLLIndex = 0;
} else {
+ heapKLL = new HeapLongKLLSketch(maxMemoryByte);
pageKLLNum = 0;
+ pageKLL = null;
System.out.println(
"\t[KLL STAT DEBUG] start iteration "
+ iteration
@@ -145,26 +158,15 @@ public class KLLStatSingleReadAggrResult extends AggregateResult {
@Override
public void finishIteration() {
System.out.println(
- "\t[KLL STAT DEBUG]"
+ "\t[KLL STAT SINGLE READ DEBUG]"
+ "finish iteration "
- + iteration
- + " cntL,R:"
- + "["
- + cntL
- + ","
- + cntR
- + "]"
- + "\tlastL:"
- + lastL
- + "\tK1:"
- + K1);
- System.out.println("\t[KLL STAT DEBUG]" + " statNum:" + pageKLLNum);
- iteration++;
- if (n == 0) {
- hasFinalResult = true;
- return;
- }
- setDoubleValue(-233.0);
+ + " statNum:"
+ + statNum
+ + " pageKllNum:"
+ + pageKLLNum
+ + " heapN:"
+ + heapN);
+ setDoubleValue(heapN);
hasFinalResult = true;
}
@@ -178,6 +180,43 @@ public class KLLStatSingleReadAggrResult extends AggregateResult {
return hasCandidateResult() ? getDoubleValue() : null;
}
+ // private void addSketch(KLLSketchForQuantile sketch, List<HeapLongKLLSketch> a, int baseByte) {
+ // int pos0 = 0;
+ // while (pos0 < pageKLLMaxLen && a.get(pos0) != null) pos0++;
+ // HeapLongKLLSketch bigger_sketch = new HeapLongKLLSketch(baseByte << pos0);
+ // bigger_sketch.mergeWithTempSpace(sketch);
+ // for (int i = 0; i < pos0; i++) {
+ // bigger_sketch.mergeWithTempSpace(a.get(i));
+ // a.set(i, null);
+ // }
+ // if (pos0 == pageKLLMaxLen) { // mem of pageKLL list is too large.
+ // heapKLL.mergeWithTempSpace(bigger_sketch);
+ // } else a.set(pos0, bigger_sketch);
+ // }
+ private void addSketch(KLLSketchForQuantile sketch) {
+ TOT_SKETCH_N += sketch.getN();
+ TOT_SKETCH_SIZE += sketch.getNumLen();
+ if (SKETCH_SIZE < 0) {
+ SKETCH_SIZE = sketch.getNumLen() * 8;
+ pageKLLMaxIndex = (int) Math.floor((0.5 * maxMemoryByte / SKETCH_SIZE));
+ pageKLL = new ArrayList<>(pageKLLMaxIndex);
+ for (int i = 0; i < pageKLLMaxIndex; i++) pageKLL.add(null);
+ }
+ if (pageKLLIndex < pageKLLMaxIndex) pageKLL.set(pageKLLIndex++, sketch);
+ else {
+ heapKLL.mergeWithTempSpace(pageKLL);
+ for (int i = 0; i < pageKLLMaxIndex; i++) pageKLL.set(i, null);
+ pageKLLIndex = 0;
+ pageKLL.set(pageKLLIndex++, sketch);
+ // System.out.println(
+ // "\t[KLL STAT DEBUG]\theapKLL merge pageKLLList. newN: "
+ // + heapKLL.getN()
+ // + " n_true:"
+ // + n);
+ // heapKLL.show();
+ }
+ }
+
@Override
public void updateResultFromStatistics(Statistics statistics) {
switch (statistics.getType()) {
@@ -195,28 +234,29 @@ public class KLLStatSingleReadAggrResult extends AggregateResult {
}
if (iteration == 0) {
n += statistics.getCount();
- if (statistics.getType() == DOUBLE && ((DoubleStatistics) statistics).getKllSketchNum() > 0)
- pageKLLNum += ((DoubleStatistics) statistics).getKllSketchNum();
- }
- long minVal = dataToLong(statistics.getMinValue());
- long maxVal = dataToLong(statistics.getMaxValue());
- // System.out.println(
- // "\t[KLL STAT DEBUG] update from statistics:\t"
- // + "min,max:"
- // + minVal
- // + ","
- // + maxVal
- // + " n:"
- // + statistics.getCount());
- // out of range
- if (minVal > cntR || maxVal < lastL) return;
- if (lastL <= minVal && maxVal < cntL) {
- K1 -= statistics.getCount();
- return;
- }
- if (minVal == maxVal) { // min == max
- for (int i = 0; i < statistics.getCount(); i++) updateStatusFromData(minVal);
- return;
+ // if (statistics.getType() == DOUBLE) {
+ // }
+ if (statistics.getType() == DOUBLE) {
+ DoubleStatistics stat = (DoubleStatistics) statistics;
+ if (stat.getSummaryNum() > 0) {
+ pageKLLNum += stat.getSummaryNum();
+ statNum += 1;
+ // for (LongKLLSketch sketch : stat.getKllSketchList()) addSketch(sketch);
+ System.out.println(
+ "\t[KLL STAT SINGLE READ DEBUG] updateResultFromStatistics. N:"
+ + stat.getCount()
+ + "\tT:"
+ + stat.getStartTime()
+ + "..."
+ + stat.getEndTime());
+ System.out.print("\t\t\t\tPageN:");
+ for (KLLSketchForQuantile sketch : stat.getKllSketchList())
+ System.out.print("\t" + sketch.getN());
+ System.out.println();
+ // stat.getKllSketch().show();
+ return;
+ } else System.out.println("\t\t\t\t!!!!!![ERROR!] no KLL in stat!");
+ }
}
}
@@ -228,24 +268,34 @@ public class KLLStatSingleReadAggrResult extends AggregateResult {
@Override
public void updateResultFromPageData(
IBatchDataIterator batchIterator, long minBound, long maxBound) {
+ // System.out.print("\t[KLL STAT DEBUG]\tupdateResultFromPageData:");
+ // int tmp_tot = 0;
while (batchIterator.hasNext()) {
if (batchIterator.currentTime() >= maxBound || batchIterator.currentTime() < minBound) {
break;
}
+ // System.out.print(
+ // " (" + batchIterator.currentTime() + "," + batchIterator.currentValue() + ")");
+ // tmp_tot++;
updateStatusFromData(batchIterator.currentValue());
batchIterator.next();
}
+ // System.out.println(" tot:" + tmp_tot);
}
@Override
public void updateResultUsingTimestamps(
long[] timestamps, int length, IReaderByTimestamp dataReader) throws IOException {
+ // System.out.print("\t[KLL STAT DEBUG]\tupdateResultUsingTimestamps:");
+ // int tmp_tot = 0;
Object[] values = dataReader.getValuesInTimestamps(timestamps, length);
for (int i = 0; i < length; i++) {
if (values[i] != null) {
updateStatusFromData(values[i]);
+ // tmp_tot++;
}
}
+ // System.out.println(" tot:" + tmp_tot);
}
@Override
@@ -297,28 +347,29 @@ public class KLLStatSingleReadAggrResult extends AggregateResult {
@Override
public void reset() {
super.reset();
+ heapKLL = null;
lastL = cntL = Long.MIN_VALUE;
cntR = Long.MAX_VALUE;
n = 0;
iteration = 0;
hasFinalResult = false;
+ TOT_SKETCH_N = TOT_SKETCH_SIZE = 0;
+ SKETCH_SIZE = -1;
}
@Override
public boolean canUpdateFromStatistics(Statistics statistics) {
+ System.out.println(
+ "\t\t[DEBUG single read]\tcanUpdateFromStatistics\tT:"
+ + statistics.getStartTime()
+ + "..."
+ + statistics.getEndTime());
if ((seriesDataType == DOUBLE) && iteration == 0) {
DoubleStatistics doubleStats = (DoubleStatistics) statistics;
- if (doubleStats.getKllSketchNum() > 0) return true;
- }
- if (iteration > 0) {
- long minVal = dataToLong(statistics.getMinValue());
- long maxVal = dataToLong(statistics.getMaxValue());
- if (minVal > cntR || maxVal < lastL) return true;
- if (lastL <= minVal && maxVal < cntL) return true;
+ if (onlyUsePageSynopsis ? doubleStats.getSummaryNum() == 1 : doubleStats.getSummaryNum() >= 1)
+ return true; // DEBUG
}
- Comparable<Object> minVal = (Comparable<Object>) statistics.getMinValue();
- Comparable<Object> maxVal = (Comparable<Object>) statistics.getMaxValue();
- return (minVal.compareTo(maxVal) == 0); // min==max
+ return false;
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/StrictKLLStatSingleAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/StrictKLLStatSingleAggrResult.java
index 24bfb02909..2119371f54 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/StrictKLLStatSingleAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/StrictKLLStatSingleAggrResult.java
@@ -46,11 +46,10 @@ public class StrictKLLStatSingleAggrResult extends AggregateResult {
private long n, K1, heapN;
private HeapLongStrictKLLSketch heapKLL;
private boolean hasFinalResult;
- private List<KLLSketchForQuantile> pageKLL;
- private int pageKLLIndex;
+ private List<KLLSketchForQuantile> preComputedSketch;
+ private int preComputedSketchSize = 0;
private long TOT_SKETCH_N = 0, TOT_SKETCH_SIZE = 0;
private int SKETCH_SIZE = -1;
- private int pageKLLMaxIndex;
long DEBUG = 0;
private int getBitsOfDataType() {
@@ -196,11 +195,10 @@ public class StrictKLLStatSingleAggrResult extends AggregateResult {
cntR = Long.MAX_VALUE;
n = 0;
pageKLLNum = 0;
- pageKLLIndex = 0;
} else {
heapKLL = new HeapLongStrictKLLSketch(maxMemoryByte);
pageKLLNum = 0;
- pageKLL = null;
+ preComputedSketch = null;
System.out.println(
"\t[KLL STAT DEBUG] start iteration "
+ iteration
@@ -220,7 +218,7 @@ public class StrictKLLStatSingleAggrResult extends AggregateResult {
@Override
public void finishIteration() {
System.out.println(
- "\t[KLL STAT Single DEBUG]"
+ "\t[KLL STAT SINGLE"
+ "finish iteration "
+ iteration
+ " cntL,R:"
@@ -234,10 +232,10 @@ public class StrictKLLStatSingleAggrResult extends AggregateResult {
+ "\tK1:"
+ K1);
System.out.println(
- "\t[KLL STAT Single DEBUG]"
+ "\t[KLL STAT SINGLE"
+ " statNum:"
+ statNum
- + " pageKllNum:"
+ + " summaryNum:"
+ pageKLLNum
+ " heapN:"
+ heapN);
@@ -253,9 +251,9 @@ public class StrictKLLStatSingleAggrResult extends AggregateResult {
}
long K2 = K1 + 1; // hasTwoMedians() ? (K1 + 1) : K1;
- System.out.println("\t[KLL STAT Single DEBUG]" + " K1,K2:" + K1 + ", " + K2);
+ System.out.println("\t[KLL STAT SINGLE" + " K1,K2:" + K1 + ", " + K2);
if (pageKLLNum == 0) { // all in heap
- System.out.println("\t[KLL STAT Single DEBUG]" + " calc by heap only. N:" + heapKLL.getN());
+ System.out.println("\t[KLL STAT SINGLE" + " calc by heap only. N:" + heapKLL.getN());
heapKLL.show();
double v1 = longToResult(heapKLL.findMinValueWithRank(K1 - 1));
@@ -271,14 +269,13 @@ public class StrictKLLStatSingleAggrResult extends AggregateResult {
// System.out.println("\t[KLL STAT DEBUG] remaining pageKLLSize:" + pageKLLIndex);
mergePageKLL();
heapKLL.show();
- System.out.println(
- "\t[KLL STAT Single DEBUG] after merge. heapN:" + heapKLL.getN() + "\tn_true:" + n);
+ System.out.println("\t[KLL STAT SINGLE after merge. heapN:" + heapKLL.getN() + "\tn_true:" + n);
double v1 = longToResult(heapKLL.findMinValueWithRank(K1 - 1));
double v2 = longToResult(heapKLL.findMinValueWithRank(K2 - 1));
double ans = 0.5 * (v1 + v2);
setDoubleValue(ans);
hasFinalResult = true;
- System.out.println("\t[KLL STAT Single DEBUG]" + " est_stats_err:" + approximateStatAvgError());
+ // System.out.println("\t[KLL STAT SINGLE" + " est_stats_err:" + approximateStatAvgError());
}
@Override
@@ -304,35 +301,43 @@ public class StrictKLLStatSingleAggrResult extends AggregateResult {
// heapKLL.mergeWithTempSpace(bigger_sketch);
// } else a.set(pos0, bigger_sketch);
// }
- private void addSketch(KLLSketchForQuantile sketch) {
+ public void addSketch(KLLSketchForQuantile sketch) {
+ n += sketch.getN();
+ pageKLLNum++;
TOT_SKETCH_N += sketch.getN();
TOT_SKETCH_SIZE += sketch.getNumLen();
if (SKETCH_SIZE < 0) {
SKETCH_SIZE = sketch.getNumLen() * 8;
- pageKLLMaxIndex = (int) Math.floor((0.5 * maxMemoryByte / SKETCH_SIZE));
- pageKLL = new ArrayList<>(pageKLLMaxIndex);
- for (int i = 0; i < pageKLLMaxIndex; i++) pageKLL.add(null);
+ preComputedSketch = new ArrayList<>();
+ preComputedSketchSize = 0;
}
- if (pageKLLIndex < pageKLLMaxIndex) pageKLL.set(pageKLLIndex++, sketch);
- else {
- heapKLL.mergeWithTempSpace(pageKLL);
- for (int i = 0; i < pageKLLMaxIndex; i++) pageKLL.set(i, null);
- pageKLLIndex = 0;
- pageKLL.set(pageKLLIndex++, sketch);
- // System.out.println(
- // "\t[KLL STAT DEBUG]\theapKLL merge pageKLLList. newN: "
- // + heapKLL.getN()
- // + " n_true:"
- // + n);
- // heapKLL.show();
+ preComputedSketch.add(sketch);
+ preComputedSketchSize += sketch.getNumLen() * 8;
+ if (preComputedSketchSize >= maxMemoryByte / 2) {
+ heapKLL.mergeWithTempSpace(preComputedSketch);
+ preComputedSketch.clear();
+ preComputedSketchSize = 0;
}
+ // if (pageKLLIndex < pageKLLMaxIndex) preComputedSketch.set(pageKLLIndex++, sketch);
+ // else {
+ // heapKLL.mergeWithTempSpace(preComputedSketch);
+ // for (int i = 0; i < pageKLLMaxIndex; i++) preComputedSketch.set(i, null);
+ // pageKLLIndex = 0;
+ // preComputedSketch.set(pageKLLIndex++, sketch);
+ // // System.out.println(
+ // // "\t[KLL STAT DEBUG]\theapKLL merge pageKLLList. newN: "
+ // // + heapKLL.getN()
+ // // + " n_true:"
+ // // + n);
+ // // heapKLL.show();
+ // }
}
private void mergePageKLL() {
HeapLongStrictKLLSketch tmpSketch = heapKLL;
heapKLL = new HeapLongStrictKLLSketch(maxMemoryByte);
heapKLL.mergeWithTempSpace(tmpSketch);
- heapKLL.mergeWithTempSpace(pageKLL);
+ heapKLL.mergeWithTempSpace(preComputedSketch);
}
@Override
@@ -351,19 +356,29 @@ public class StrictKLLStatSingleAggrResult extends AggregateResult {
"Unsupported data type in aggregation MEDIAN : %s", statistics.getType()));
}
if (iteration == 0) {
- n += statistics.getCount();
+ // n += statistics.getCount();
// if (statistics.getType() == DOUBLE) {
// }
if (statistics.getType() == DOUBLE) {
DoubleStatistics stat = (DoubleStatistics) statistics;
if (stat.getSummaryNum() > 0) {
- pageKLLNum += stat.getSummaryNum();
+ // pageKLLNum += stat.getSummaryNum();
statNum += 1;
- for (LongKLLSketch sketch : stat.getKllSketchList()) addSketch(sketch);
- // System.out.println(
- // "\t[KLL STAT Single DEBUG] updateResultFromStatistics. pageN:"
- // + stat.getKllSketch().getN());
- // stat.getKllSketch().show();
+ for (KLLSketchForQuantile sketch : stat.getKllSketchList()) {
+ // System.out.println("\t[STRICT KLL STAT Single DEBUG] pageTime:"+);
+ // if (sketch.getN() > 10000)
+ // System.out.println(
+ // "\t[STRICT KLL STAT DEBUG] updateResultFromStatistics\tstatN:"
+ // + sketch.getN()
+ // + "\tstatNumLen:"
+ // + sketch.getNumLen());
+ ((LongKLLSketch) sketch).deserializeFromBuffer();
+ addSketch(sketch);
+ }
+ // System.out.println(
+ // "\t[KLL STAT SINGLE updateResultFromStatistics. pageN:"
+ // + stat.getKllSketch().getN());
+ // stat.getKllSketch().show();
return;
} else System.out.println("\t\t\t\t!!!!!![ERROR!] no KLL in stat!");
}
@@ -485,17 +500,14 @@ public class StrictKLLStatSingleAggrResult extends AggregateResult {
hasFinalResult = false;
TOT_SKETCH_N = TOT_SKETCH_SIZE = 0;
SKETCH_SIZE = -1;
+ preComputedSketch = new ArrayList<>();
+ preComputedSketchSize = 0;
}
@Override
public boolean canUpdateFromStatistics(Statistics statistics) {
if ((seriesDataType == DOUBLE) && iteration == 0) {
DoubleStatistics doubleStats = (DoubleStatistics) statistics;
- // System.out.println(
- // "\t[DEBUG][KLL STAT SINGLE]\tcanUseStat? count:"
- // + doubleStats.getCount()
- // + " KLLNum:"
- // + doubleStats.getKllSketchNum());
if (doubleStats.getSummaryNum() > 0) return true;
}
return false;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
index 0c6cdcaacd..cd912d9c85 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
@@ -34,6 +34,7 @@ import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
import org.apache.iotdb.db.query.aggregation.AggregateResult;
+import org.apache.iotdb.db.query.aggregation.AggregationType;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.query.dataset.SingleDataSet;
@@ -315,7 +316,7 @@ public class AggregationExecutor {
allUseStatistics &= aggregateResult.useStatisticsIfPossible();
IAggregateReader seriesReader;
int strategy = IoTDBDescriptor.getInstance().getConfig().getAggregationStrategy();
- if (!allUseStatistics || strategy == 0)
+ if (!allUseStatistics || strategy == 0) {
seriesReader =
new SeriesAggregateReader(
seriesPath,
@@ -327,7 +328,13 @@ public class AggregationExecutor {
null,
null,
true);
- else {
+ if (ascAggregateResultList.get(0).getAggregationType()
+ == AggregationType.STRICT_KLL_STAT_SINGLE) {
+ (((SeriesAggregateReader) seriesReader).getSeriesReader()).quantileAggrResult =
+ ascAggregateResultList.get(0);
+ (((SeriesAggregateReader) seriesReader).getSeriesReader()).aggrSST = true;
+ }
+ } else {
if (strategy == 1)
seriesReader =
new SeriesAggregateReaderForStatChain(
@@ -614,9 +621,29 @@ public class AggregationExecutor {
while (seriesReader.hasNextFile()) {
aggregateResultList =
findUnfinishedAggregateResults(aggregateResultList, resultToGroupedAhead);
+ // System.out.println(
+ // "\t\t[DEBUG nextFile In AggrExe] startT:"
+ // + seriesReader.currentFileStatistics().getStartTime()
+ // + "\tN:"
+ // + seriesReader.currentFileStatistics().getCount());
// try to calc by file statistics
if (seriesReader.canUseCurrentFileStatistics()) {
Statistics fileStatistics = seriesReader.currentFileStatistics();
+ // {
+ // DoubleStatistics doubleStats = (DoubleStatistics) fileStatistics;
+ // System.out.println(
+ // "\t\t\t[aggr files from reader]\tfileStat\tN:"
+ // + doubleStats.getCount()
+ // + "\tT:"
+ // + doubleStats.getStartTime()
+ // + "..."
+ // + doubleStats.getEndTime()
+ // + "\tsummaryNum:"
+ // + doubleStats.getSummaryNum());
+ // if (doubleStats.getSummaryNum() > 0)
+ // System.out.println(
+ // "\t\t\t\t\t\tsketch numLen:" + doubleStats.getOneKllSketch().getNumLen());
+ // }
List<AggregateResult> remainingAggregateResultList =
tryToAggregateFromStatistics(aggregateResultList, fileStatistics, resultToGroupedAhead);
if (remainingAggregateResultList.isEmpty()) {
@@ -644,6 +671,7 @@ public class AggregationExecutor {
while (seriesReader.hasNextFile()) {
// cal by file statistics
// TODO
+
// if (seriesReader.canUseCurrentFileStatistics()) {
// while (seriesReader.hasNextSubSeries()) {
// Statistics fileStatistics = seriesReader.currentFileStatistics();
diff --git a/server/src/main/java/org/apache/iotdb/db/query/factory/AggregateResultFactory.java b/server/src/main/java/org/apache/iotdb/db/query/factory/AggregateResultFactory.java
index 602ca68b23..a829a6de7f 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/factory/AggregateResultFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/factory/AggregateResultFactory.java
@@ -115,6 +115,10 @@ public class AggregateResultFactory {
return new SamplingStatSingleAggrResult(dataType);
case SQLConstant.STRICT_KLL_STAT_SINGLE:
return new StrictKLLStatSingleAggrResult(dataType);
+ case SQLConstant.DDSKETCH_SINGLE:
+ return new DDSketchSingleAggrResult(dataType);
+ case SQLConstant.CHUNK_STAT_AVAIL:
+ return new KLLStatChunkAvailAggrResult(dataType);
default:
throw new IllegalArgumentException("Invalid Aggregation function: " + aggrFuncName);
}
@@ -194,6 +198,10 @@ public class AggregateResultFactory {
return new SamplingStatSingleAggrResult(dataType);
case SQLConstant.STRICT_KLL_STAT_SINGLE:
return new StrictKLLStatSingleAggrResult(dataType);
+ case SQLConstant.DDSKETCH_SINGLE:
+ return new DDSketchSingleAggrResult(dataType);
+ case SQLConstant.CHUNK_STAT_AVAIL:
+ return new KLLStatChunkAvailAggrResult(dataType);
default:
throw new IllegalArgumentException("Invalid Aggregation function: " + aggrFuncName);
}
@@ -274,6 +282,10 @@ public class AggregateResultFactory {
return new SamplingStatSingleAggrResult(dataType);
case STRICT_KLL_STAT_SINGLE:
return new StrictKLLStatSingleAggrResult(dataType);
+ case DDSKETCH_SINGLE:
+ return new DDSketchSingleAggrResult(dataType);
+ case CHUNK_STAT_AVAIL:
+ return new KLLStatChunkAvailAggrResult(dataType);
default:
throw new IllegalArgumentException("Invalid Aggregation Type: " + aggregationType.name());
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReader.java
index aef6f5562a..0d434c54c5 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReader.java
@@ -41,6 +41,12 @@ public class SeriesAggregateReader implements IAggregateReader {
private final SeriesReader seriesReader;
+ public SeriesReader getSeriesReader() {
+ return seriesReader;
+ }
+
+ // public AggregateResult quantileAggrResult;
+
public SeriesAggregateReader(
PartialPath seriesPath,
Set<String> allSensors,
@@ -100,6 +106,19 @@ public class SeriesAggregateReader implements IAggregateReader {
@Override
public boolean canUseCurrentFileStatistics() throws IOException {
Statistics fileStatistics = currentFileStatistics();
+ // if (fileStatistics.getCount() > 10000) {
+ // System.out.println(
+ // "\t\t\t\t[SeriesAggrReader checkFileStat]\toverlap:"
+ // + seriesReader.isFileOverlapped()
+ // + "\t"
+ // + fileStatistics.getStartTime()
+ // + "..."
+ // + fileStatistics.getEndTime()
+ // + "containedByT:"
+ // + containedByTimeFilter(fileStatistics)
+ // + "\tmodified:"
+ // + seriesReader.currentFileModified());
+ // }
return !seriesReader.isFileOverlapped()
&& containedByTimeFilter(fileStatistics)
&& !seriesReader.currentFileModified();
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
index 2e7eea5889..1f852f23a3 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
@@ -18,10 +18,14 @@
*/
package org.apache.iotdb.db.query.reader.series;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.metadata.idtable.IDTable;
import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.query.aggregation.AggregateResult;
+import org.apache.iotdb.db.query.aggregation.impl.KLLStatChunkAvailAggrResult;
+import org.apache.iotdb.db.query.aggregation.impl.StrictKLLStatSingleAggrResult;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryTimeManager;
import org.apache.iotdb.db.query.control.tracing.TracingManager;
@@ -37,6 +41,7 @@ import org.apache.iotdb.tsfile.file.metadata.AlignedTimeSeriesMetadata;
import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.ITimeSeriesMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.DoubleStatistics;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.common.BatchData;
@@ -45,18 +50,18 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.filter.basic.UnaryFilter;
import org.apache.iotdb.tsfile.read.reader.IAlignedPageReader;
import org.apache.iotdb.tsfile.read.reader.IPageReader;
+import org.apache.iotdb.tsfile.utils.KLLSketchForQuantile;
+import org.apache.iotdb.tsfile.utils.LongKLLSketch;
+import org.apache.iotdb.tsfile.utils.SegTreeBySketch;
import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+import it.unimi.dsi.fastutil.longs.LongArrayList;
import it.unimi.dsi.fastutil.objects.ObjectAVLTreeSet;
+import it.unimi.dsi.fastutil.objects.ObjectArrayList;
import java.io.IOException;
import java.io.Serializable;
-import java.util.Comparator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Objects;
-import java.util.PriorityQueue;
-import java.util.Set;
+import java.util.*;
import java.util.function.ToLongFunction;
import java.util.stream.Collectors;
@@ -122,6 +127,12 @@ public class SeriesReader {
protected boolean hasCachedNextOverlappedPage;
protected BatchData cachedBatchData;
+ public final boolean NoUpdate = IoTDBDescriptor.getInstance().getConfig().getNoUpdate();
+
+ public boolean aggrSST = false;
+ public AggregateResult quantileAggrResult;
+ private long unpackedTSEndTime = Long.MIN_VALUE;
+
/**
* @param seriesPath For querying aligned series, the seriesPath should be AlignedPath. All
* selected series belonging to one aligned device should be all in this one AlignedPath's
@@ -264,7 +275,11 @@ public class SeriesReader {
// init first time series metadata whose startTime is minimum
tryToUnpackAllOverlappedFilesToTimeSeriesMetadata();
}
-
+ // if (firstTimeSeriesMetadata != null) {
+ // System.out.println("\tHasNextFile:" +
+ // firstTimeSeriesMetadata.getStatistics().getStartTime());
+ // // System.out.println("\t\tunpacked other seq:" + seqTimeSeriesMetadata.size());
+ // }
return firstTimeSeriesMetadata != null;
}
@@ -272,6 +287,7 @@ public class SeriesReader {
if (firstTimeSeriesMetadata == null) {
throw new IOException("no first file");
}
+ if (NoUpdate) return false;
Statistics fileStatistics = firstTimeSeriesMetadata.getStatistics();
return !seqTimeSeriesMetadata.isEmpty()
@@ -335,7 +351,9 @@ public class SeriesReader {
return true;
}
- while (firstChunkMetadata == null && (!cachedChunkMetadata.isEmpty() || hasNextFile())) {
+ if (firstChunkMetadata == null
+ && (!cachedChunkMetadata.isEmpty()
+ || /*hasNextFile()*/ firstTimeSeriesMetadata != null)) { // TO CHECK
initFirstChunkMetadata();
}
return firstChunkMetadata != null;
@@ -344,28 +362,37 @@ public class SeriesReader {
/** construct first chunk metadata */
private void initFirstChunkMetadata() throws IOException {
if (firstTimeSeriesMetadata != null) {
+ // System.out.println("\t??? initChunkMD by firstTSMD.");
/*
* try to unpack all overlapped TimeSeriesMetadata to cachedChunkMetadata
*/
- unpackAllOverlappedTsFilesToTimeSeriesMetadata(
- orderUtils.getOverlapCheckTime(firstTimeSeriesMetadata.getStatistics()));
+
+ if (!NoUpdate) {
+ unpackAllOverlappedTsFilesToTimeSeriesMetadata(
+ orderUtils.getOverlapCheckTime(firstTimeSeriesMetadata.getStatistics()));
+ }
unpackAllOverlappedTimeSeriesMetadataToCachedChunkMetadata(
orderUtils.getOverlapCheckTime(firstTimeSeriesMetadata.getStatistics()), true);
} else {
/*
* first time series metadata is already unpacked, consume cached ChunkMetadata
*/
- while (!cachedChunkMetadata.isEmpty()) {
- firstChunkMetadata = cachedChunkMetadata.first();
- unpackAllOverlappedTsFilesToTimeSeriesMetadata(
- orderUtils.getOverlapCheckTime(firstChunkMetadata.getStatistics()));
- unpackAllOverlappedTimeSeriesMetadataToCachedChunkMetadata(
- orderUtils.getOverlapCheckTime(firstChunkMetadata.getStatistics()), false);
- if (firstChunkMetadata.equals(cachedChunkMetadata.first())) {
+ if (!NoUpdate) {
+ while (!cachedChunkMetadata.isEmpty()) {
firstChunkMetadata = cachedChunkMetadata.first();
- cachedChunkMetadata.remove(firstChunkMetadata);
- break;
+ unpackAllOverlappedTsFilesToTimeSeriesMetadata(
+ orderUtils.getOverlapCheckTime(firstChunkMetadata.getStatistics()));
+ unpackAllOverlappedTimeSeriesMetadataToCachedChunkMetadata(
+ orderUtils.getOverlapCheckTime(firstChunkMetadata.getStatistics()), false);
+ if (firstChunkMetadata.equals(cachedChunkMetadata.first())) {
+ firstChunkMetadata = cachedChunkMetadata.first();
+ cachedChunkMetadata.remove(firstChunkMetadata);
+ break;
+ }
}
+ } else {
+ firstChunkMetadata = cachedChunkMetadata.first();
+ cachedChunkMetadata.remove(firstChunkMetadata);
}
}
if (valueFilter != null
@@ -376,15 +403,100 @@ public class SeriesReader {
}
}
+ // protected void unpackAllOverlappedTimeSeriesMetadataToCachedChunkMetadata(
+ // long endpointTime, boolean init) throws IOException {
+ // if (!NoUpdate) {
+ // while (!seqTimeSeriesMetadata.isEmpty()
+ // && orderUtils.isOverlapped(endpointTime,
+ // seqTimeSeriesMetadata.get(0).getStatistics())) {
+ // unpackOneTimeSeriesMetadata(seqTimeSeriesMetadata.remove(0));
+ // }
+ // while (!unSeqTimeSeriesMetadata.isEmpty()
+ // && orderUtils.isOverlapped(
+ // endpointTime, unSeqTimeSeriesMetadata.peek().getStatistics())) {
+ // unpackOneTimeSeriesMetadata(unSeqTimeSeriesMetadata.poll());
+ // }
+ // }
+ //
+ // if (firstTimeSeriesMetadata != null
+ // && orderUtils.isOverlapped(endpointTime, firstTimeSeriesMetadata.getStatistics())) {
+ // unpackOneTimeSeriesMetadata(firstTimeSeriesMetadata);
+ // firstTimeSeriesMetadata = null;
+ // }
+ //
+ // if (init && firstChunkMetadata == null && !cachedChunkMetadata.isEmpty()) {
+ // firstChunkMetadata = cachedChunkMetadata.first();
+ // cachedChunkMetadata.remove(firstChunkMetadata);
+ // }
+ // }
protected void unpackAllOverlappedTimeSeriesMetadataToCachedChunkMetadata(
long endpointTime, boolean init) throws IOException {
- while (!seqTimeSeriesMetadata.isEmpty()
- && orderUtils.isOverlapped(endpointTime, seqTimeSeriesMetadata.get(0).getStatistics())) {
- unpackOneTimeSeriesMetadata(seqTimeSeriesMetadata.remove(0));
+ ObjectArrayList<ITimeSeriesMetadata> allTSMD = new ObjectArrayList<>();
+ if (!NoUpdate && aggrSST) {
+ while (!seqTimeSeriesMetadata.isEmpty()
+ && orderUtils.isOverlapped(endpointTime, seqTimeSeriesMetadata.get(0).getStatistics())) {
+ // unpackOneTimeSeriesMetadata(seqTimeSeriesMetadata.remove(0));
+ allTSMD.add(seqTimeSeriesMetadata.remove(0));
+ }
+ while (!unSeqTimeSeriesMetadata.isEmpty()
+ && orderUtils.isOverlapped(
+ endpointTime, unSeqTimeSeriesMetadata.peek().getStatistics())) {
+ // unpackOneTimeSeriesMetadata(unSeqTimeSeriesMetadata.poll());
+ allTSMD.add(unSeqTimeSeriesMetadata.poll());
+ }
+ if (firstTimeSeriesMetadata != null
+ && orderUtils.isOverlapped(endpointTime, firstTimeSeriesMetadata.getStatistics())) {
+ allTSMD.add(firstTimeSeriesMetadata);
+ firstTimeSeriesMetadata = null;
+ }
+ for (ITimeSeriesMetadata tsmd : allTSMD) {
+ System.out.println(
+ "\t\t\t\t[debug unpackAllOverlappedTimeSeriesMetadataToCachedChunkMetadata]\t\ttsmd_T:"
+ + tsmd.getStatistics().getStartTime()
+ + "..."
+ + tsmd.getStatistics().getEndTime()
+ + "\t\t\tunpackedTSEndTime="
+ + unpackedTSEndTime
+ + " statN="
+ + tsmd.getStatistics().getCount());
+ LongArrayList usedChunkL = new LongArrayList(), usedChunkR = new LongArrayList();
+ ObjectArrayList<ITimeSeriesMetadata> otherTSMD = new ObjectArrayList<>(allTSMD);
+ otherTSMD.remove(tsmd);
+ if (((DoubleStatistics) tsmd.getStatistics()).hasSegTreeBySketch) {
+ SegTreeBySketch segT = ((DoubleStatistics) tsmd.getStatistics()).segTreeBySketch;
+ ObjectArrayList<KLLSketchForQuantile> queriedSketch = new ObjectArrayList<>();
+ segT.range_query_in_SST_sketches(
+ queriedSketch, timeFilter, unpackedTSEndTime + 1, Long.MAX_VALUE, otherTSMD);
+ for (KLLSketchForQuantile sketch : queriedSketch) {
+ ((LongKLLSketch) sketch).deserializeFromBuffer();
+ System.out.println("\t\t\t\t\ta sketch in SegT queried. sketchN=" + sketch.getN());
+ if (quantileAggrResult instanceof StrictKLLStatSingleAggrResult)
+ ((StrictKLLStatSingleAggrResult) quantileAggrResult).addSketch(sketch);
+ if (quantileAggrResult instanceof KLLStatChunkAvailAggrResult)
+ ((KLLStatChunkAvailAggrResult) quantileAggrResult).addSketch(sketch);
+ }
+ usedChunkL.addAll(segT.queriedChunkL);
+ usedChunkR.addAll(segT.queriedChunkR);
+ }
+ unpackOneTimeSeriesMetadata(tsmd, usedChunkL, usedChunkR);
+ }
+ for (ITimeSeriesMetadata tsmd : allTSMD)
+ unpackedTSEndTime = Math.max(unpackedTSEndTime, tsmd.getStatistics().getEndTime());
+ // if (!allTSMD.isEmpty())
+ // System.out.println(
+ // "\t\t\t\t\tafter try to use sgt. unpackedTSEndTime<--" + unpackedTSEndTime);
}
- while (!unSeqTimeSeriesMetadata.isEmpty()
- && orderUtils.isOverlapped(endpointTime, unSeqTimeSeriesMetadata.peek().getStatistics())) {
- unpackOneTimeSeriesMetadata(unSeqTimeSeriesMetadata.poll());
+ if (!aggrSST) {
+
+ while (!seqTimeSeriesMetadata.isEmpty()
+ && orderUtils.isOverlapped(endpointTime, seqTimeSeriesMetadata.get(0).getStatistics())) {
+ unpackOneTimeSeriesMetadata(seqTimeSeriesMetadata.remove(0));
+ }
+ while (!unSeqTimeSeriesMetadata.isEmpty()
+ && orderUtils.isOverlapped(
+ endpointTime, unSeqTimeSeriesMetadata.peek().getStatistics())) {
+ unpackOneTimeSeriesMetadata(unSeqTimeSeriesMetadata.poll());
+ }
}
if (firstTimeSeriesMetadata != null
@@ -399,8 +511,47 @@ public class SeriesReader {
}
}
+ protected void unpackOneTimeSeriesMetadata(
+ ITimeSeriesMetadata timeSeriesMetadata, LongArrayList usedChunkL, LongArrayList usedChunkR)
+ throws IOException {
+ List<IChunkMetadata> chunkMetadataList =
+ FileLoaderUtils.loadChunkMetadataList(timeSeriesMetadata);
+ chunkMetadataList.forEach(chunkMetadata -> chunkMetadata.setSeq(timeSeriesMetadata.isSeq()));
+ int last = 0, beforeUnpack = cachedChunkMetadata.size(), skippedN = 0;
+ for (int i = 0; i < usedChunkL.size(); i++) {
+ while (last < chunkMetadataList.size()
+ && chunkMetadataList.get(last).getEndTime() < usedChunkL.getLong(i))
+ cachedChunkMetadata.add(chunkMetadataList.get(last++));
+ while (last < chunkMetadataList.size()
+ && chunkMetadataList.get(last).getEndTime() <= usedChunkR.getLong(i)) {
+ // System.out.println(
+ // "\t\t\t\t\t\ta chunkMD skipped since queried in seg. chunk T:"
+ // + chunkMetadataList.get(last).getStartTime()
+ // + "..."
+ // + chunkMetadataList.get(last).getEndTime()
+ // + "\t\tchunk_sketch_num="
+ // + ((DoubleStatistics)
+ // chunkMetadataList.get(last).getStatistics()).getSummaryNum());
+ skippedN += chunkMetadataList.get(last).getStatistics().getCount();
+ last++;
+ }
+ }
+ while (last < chunkMetadataList.size()) cachedChunkMetadata.add(chunkMetadataList.get(last++));
+ System.out.println(
+ "\t\tin the partial TS_unpacking, "
+ + (cachedChunkMetadata.size() - beforeUnpack)
+ + " of all "
+ + chunkMetadataList.size()
+ + "chunks unpacked.");
+ System.out.println("\t\t\tskippedN:" + skippedN);
+ }
+
protected void unpackOneTimeSeriesMetadata(ITimeSeriesMetadata timeSeriesMetadata)
throws IOException {
+ unpackedTSEndTime =
+ Math.max(unpackedTSEndTime, timeSeriesMetadata.getStatistics().getEndTime());
+ System.out.println(
+ "\t\t\t\t\tunpackOneTimeSeriesMetadata. unpackedTSEndTime<--" + unpackedTSEndTime);
List<IChunkMetadata> chunkMetadataList =
FileLoaderUtils.loadChunkMetadataList(timeSeriesMetadata);
chunkMetadataList.forEach(chunkMetadata -> chunkMetadata.setSeq(timeSeriesMetadata.isSeq()));
@@ -426,6 +577,7 @@ public class SeriesReader {
if (firstChunkMetadata == null) {
throw new IOException("no first chunk");
}
+ if (NoUpdate) return false;
Statistics chunkStatistics = firstChunkMetadata.getStatistics();
return !cachedChunkMetadata.isEmpty()
@@ -545,6 +697,7 @@ public class SeriesReader {
if (firstPageReader == null) {
return false;
}
+ if (NoUpdate) return false;
long endpointTime = orderUtils.getOverlapCheckTime(firstPageReader.getStatistics());
unpackAllOverlappedTsFilesToTimeSeriesMetadata(endpointTime);
@@ -570,18 +723,20 @@ public class SeriesReader {
unpackOneChunkMetaData(firstChunkMetadata);
firstChunkMetadata = null;
}
- // In case unpacking too many sequence chunks
- boolean hasMeetSeq = false;
- while (!cachedChunkMetadata.isEmpty()
- && orderUtils.isOverlapped(endpointTime, cachedChunkMetadata.first().getStatistics())) {
- if (cachedChunkMetadata.first().isSeq() && hasMeetSeq) {
- break;
- } else if (cachedChunkMetadata.first().isSeq()) {
- hasMeetSeq = true;
+ if (!NoUpdate) {
+ // In case unpacking too many sequence chunks
+ boolean hasMeetSeq = false;
+ while (!cachedChunkMetadata.isEmpty()
+ && orderUtils.isOverlapped(endpointTime, cachedChunkMetadata.first().getStatistics())) {
+ if (cachedChunkMetadata.first().isSeq() && hasMeetSeq) {
+ break;
+ } else if (cachedChunkMetadata.first().isSeq()) {
+ hasMeetSeq = true;
+ }
+ IChunkMetadata tmp = cachedChunkMetadata.first();
+ cachedChunkMetadata.remove(tmp);
+ unpackOneChunkMetaData(tmp);
}
- IChunkMetadata tmp = cachedChunkMetadata.first();
- cachedChunkMetadata.remove(tmp);
- unpackOneChunkMetaData(tmp);
}
if (init
&& firstPageReader == null
@@ -655,6 +810,7 @@ public class SeriesReader {
if (hasCachedNextOverlappedPage) {
return true;
}
+ if (NoUpdate) return false;
/*
* has a non-overlapped page in firstPageReader
@@ -749,6 +905,8 @@ public class SeriesReader {
return true;
}
+ if (NoUpdate) return false;
+
tryToPutAllDirectlyOverlappedUnseqPageReadersIntoMergeReader();
while (true) {
@@ -935,10 +1093,12 @@ public class SeriesReader {
// unpack overlapped page using current page reader
if (firstPageReader != null) {
- long overlapCheckTime = orderUtils.getOverlapCheckTime(firstPageReader.getStatistics());
- unpackAllOverlappedTsFilesToTimeSeriesMetadata(overlapCheckTime);
- unpackAllOverlappedTimeSeriesMetadataToCachedChunkMetadata(overlapCheckTime, false);
- unpackAllOverlappedChunkMetadataToPageReaders(overlapCheckTime, false);
+ if (!NoUpdate) {
+ long overlapCheckTime = orderUtils.getOverlapCheckTime(firstPageReader.getStatistics());
+ unpackAllOverlappedTsFilesToTimeSeriesMetadata(overlapCheckTime);
+ unpackAllOverlappedTimeSeriesMetadataToCachedChunkMetadata(overlapCheckTime, false);
+ unpackAllOverlappedChunkMetadataToPageReaders(overlapCheckTime, false);
+ }
// this page after unpacking must be the first page
if (firstPageReader.equals(getFirstPageReaderFromCachedReaders())) {
@@ -1032,9 +1192,11 @@ public class SeriesReader {
/*
* Fill unSequence TimeSeriesMetadata Priority Queue until it is not empty
*/
- while (unSeqTimeSeriesMetadata.isEmpty() && orderUtils.hasNextUnseqResource()) {
- unpackUnseqTsFileResource();
- }
+ if (!NoUpdate || seqTimeSeriesMetadata.isEmpty()) {
+ while (unSeqTimeSeriesMetadata.isEmpty() && orderUtils.hasNextUnseqResource()) {
+ unpackUnseqTsFileResource();
+ }
+ } // if NoUpdate, consume all seq first.
/*
* find end time of the first TimeSeriesMetadata
@@ -1057,8 +1219,10 @@ public class SeriesReader {
/*
* unpack all directly overlapped seq/unseq files with first TimeSeriesMetadata
*/
- if (endTime != -1) {
- unpackAllOverlappedTsFilesToTimeSeriesMetadata(endTime);
+ if (!NoUpdate) {
+ if (endTime != -1) {
+ unpackAllOverlappedTsFilesToTimeSeriesMetadata(endTime);
+ }
}
/*
@@ -1086,6 +1250,9 @@ public class SeriesReader {
&& !valueFilter.satisfy(firstTimeSeriesMetadata.getStatistics())) {
firstTimeSeriesMetadata = null;
}
+ // System.out.println("[tryToUnpackAllOverlappedFilesToTimeSeriesMetadata]"+
+ // "\tFirst:"+firstTimeSeriesMetadata.getStatistics().getStartTime()+
+ // "--"+firstTimeSeriesMetadata.getStatistics().getEndTime());
}
protected void unpackAllOverlappedTsFilesToTimeSeriesMetadata(long endpointTime)
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java
index d2329d019e..480bfb6d36 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java
@@ -216,6 +216,8 @@ public class SchemaUtils {
case SQLConstant.TDIGEST_STAT_SINGLE:
case SQLConstant.SAMPLING_STAT_SINGLE:
case SQLConstant.STRICT_KLL_STAT_SINGLE:
+ case SQLConstant.DDSKETCH_SINGLE:
+ case SQLConstant.CHUNK_STAT_AVAIL:
return TSDataType.DOUBLE;
case SQLConstant.LAST_VALUE:
case SQLConstant.FIRST_VALUE:
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/TypeInferenceUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/TypeInferenceUtils.java
index aaf30face6..1af961646c 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/TypeInferenceUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/TypeInferenceUtils.java
@@ -145,6 +145,8 @@ public class TypeInferenceUtils {
case SQLConstant.TDIGEST_STAT_SINGLE:
case SQLConstant.SAMPLING_STAT_SINGLE:
case SQLConstant.STRICT_KLL_STAT_SINGLE:
+ case SQLConstant.DDSKETCH_SINGLE:
+ case SQLConstant.CHUNK_STAT_AVAIL:
return TSDataType.DOUBLE;
default:
throw new IllegalArgumentException("Invalid Aggregation function: " + aggrFuncName);
diff --git a/session/src/test/java/org/apache/iotdb/session/InsertCsvDataIT.java b/session/src/test/java/org/apache/iotdb/session/InsertCsvDataIT.java
index d8d97ac787..3d3fa94150 100644
--- a/session/src/test/java/org/apache/iotdb/session/InsertCsvDataIT.java
+++ b/session/src/test/java/org/apache/iotdb/session/InsertCsvDataIT.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.write.record.Tablet;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import it.unimi.dsi.fastutil.doubles.DoubleArrayList;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -49,8 +50,8 @@ public class InsertCsvDataIT {
private static int originCompactionThreadNum;
private static final List<String> deviceList = new ArrayList<>();
private static final List<Integer> sizeList = new ArrayList<>();
- private static final int baseSize = 4096 * 20000;
- private static final int TABLET_SIZE = 4096;
+ private static final int baseSize = 8192 * 6713;
+ private static final int TABLET_SIZE = 8192;
private static final int deviceNumL = 0, deviceNumR = 1;
private static final List<String> seriesList = new ArrayList<>();
private static final List<TSDataType> dataTypeList = new ArrayList<>();
@@ -163,69 +164,145 @@ public class InsertCsvDataIT {
private static void insertDataFromTXT()
throws IoTDBConnectionException, StatementExecutionException, IOException {
- long START_TIME = new Date().getTime();
-
- String[] fileList = new String[10];
- fileList[0] = "tmp_1_55.txt";
- fileList[1] = "tmp_3_55.txt";
- fileList[2] = "tmp_0_55.txt";
- fileList[3] = "tmp_2_55.txt";
- fileList[4] = "tmp_1_60.txt";
- fileList[5] = "tmp_0_131.txt";
- fileList[6] = "tmp_1_131.txt";
- fileList[7] = "tmp_0_356.txt";
-
- for (int i = 0; i < 8; i++) {
- String series = "s" + i;
- session.createTimeseries(
- "root.real.d0." + series, TSDataType.DOUBLE, TSEncoding.PLAIN, CompressionType.SNAPPY);
+ final int TEST_CASE = 1;
+ String[] fileList = new String[10], sgName = new String[10];
+ String sketch_size = "4096T4";
+ fileList[0] = "1_bitcoin.csv";
+ sgName[0] = "root.bitcoin" + sketch_size;
+ fileList[1] = "2_SpacecraftThruster.txt";
+ sgName[1] = "root.thruster" + sketch_size;
+ fileList[2] = "3_taxipredition8M.txt";
+ sgName[2] = "root.taxi" + sketch_size;
+ fileList[3] = "4_wh.csv";
+ sgName[3] = "root.wh" + sketch_size;
+ // fileList[1] = "tmp_3_55.txt";
+ // fileList[2] = "tmp_0_55.txt";
+ // fileList[3] = "tmp_2_55.txt";
+ // fileList[4] = "tmp_1_60.txt";
+ // fileList[5] = "tmp_0_131.txt";
+ // fileList[6] = "tmp_1_131.txt";
+ // fileList[7] = "tmp_0_356.txt";
+ for (int fileID : new int[] {0}) {
+ System.out.println("\t\t" + fileList[fileID] + "\t" + sketch_size + "\t\t");
+ System.out.print("\t\t\t");
+
+ String filename = fileList[fileID];
+ String folder = "D:\\Study\\Lab\\iotdb\\add_quantile_to_aggregation\\test_project_2";
+ String filepath = folder + "\\" + filename;
+ DoubleArrayList vv = new DoubleArrayList();
+
+ for (int T = 0; T < TEST_CASE; T++) {
+ try {
+ session.executeNonQueryStatement("delete storage group " + sgName[fileID]);
+ Thread.sleep(4000);
+ } catch (Exception e) {
+ // no-op
+ }
+ File file = new File(filepath);
+ BufferedInputStream fis = null;
+ fis = new BufferedInputStream(new FileInputStream(file));
+ BufferedReader reader =
+ new BufferedReader(
+ new InputStreamReader(fis, StandardCharsets.UTF_8), 50 * 1024 * 1024);
+ reader.readLine(); // ignore first line!
+
+ long START_TIME = new Date().getTime();
+
+ String series = "s0";
+ session.createTimeseries(
+ sgName[fileID] + ".d0.s0", TSDataType.DOUBLE, TSEncoding.PLAIN, CompressionType.SNAPPY);
+ MeasurementSchema schema =
+ new MeasurementSchema(series, TSDataType.DOUBLE, TSEncoding.PLAIN);
+ List<MeasurementSchema> schemaList = new ArrayList<>();
+ schemaList.add(schema);
+
+ // Random random = new Random(233);
+
+ int chunk_num = 0;
+ String device = sgName[fileID] + ".d0";
+ long CNT_TIME = 0; // new Date().getTime();
+ long INGEST_TIME = 0;
+ while (true) {
+ vv.clear();
+ for (String tmps = reader.readLine();
+ tmps != null && vv.size() < TABLET_SIZE * 1000;
+ tmps = reader.readLine()) vv.add(Double.parseDouble(tmps));
+
+ INGEST_TIME -= new Date().getTime();
+ for (int i = 0; i < vv.size() / TABLET_SIZE; i++) {
+ Tablet tablet = new Tablet(device, schemaList, TABLET_SIZE);
+ long[] timestamps = tablet.timestamps;
+ Object[] values = tablet.values;
+ for (int j = 0; j < TABLET_SIZE; j++) {
+ int row = tablet.rowSize++;
+ timestamps[row] = CNT_TIME++;
+ ((double[]) values[0])[row] = vv.getDouble(i * TABLET_SIZE + j);
+ }
+ session.insertTablet(tablet);
+ if (++chunk_num == 6713) break;
+ }
+ INGEST_TIME += new Date().getTime();
+ if (chunk_num == 6713) break;
+ if (vv.size() < TABLET_SIZE) break;
+ }
+ System.out.print("\t" + INGEST_TIME);
+ System.out.flush();
+ }
+ System.out.println();
+ }
+ }
+
+ private static void append(int chunkToAppend)
+ throws IoTDBConnectionException, StatementExecutionException, IOException {
+ final int TEST_CASE = 1;
+ String[] fileList = new String[10], sgName = new String[10];
+ String sketch_size = "4096T32";
+ fileList[0] = "1_bitcoin.csv";
+ sgName[0] = "root.bitcoin" + sketch_size;
+ fileList[1] = "2_SpacecraftThruster.txt";
+ sgName[1] = "root.thruster" + sketch_size;
+ fileList[2] = "3_taxipredition8M.txt";
+ sgName[2] = "root.taxi" + sketch_size;
+ fileList[3] = "4_wh.csv";
+ sgName[3] = "root.wh" + sketch_size;
+ for (int fileID : new int[] {1}) {
+ System.out.println("APPEND to\t\t" + fileList[fileID] + "\t" + sketch_size + "\t\t");
+ System.out.print("\t\t\t");
+
+ String series = "s0";
MeasurementSchema schema = new MeasurementSchema(series, TSDataType.DOUBLE, TSEncoding.PLAIN);
List<MeasurementSchema> schemaList = new ArrayList<>();
schemaList.add(schema);
+ // Random random = new Random(233);
+
+ int chunk_num = 0;
+ String device = sgName[fileID] + ".d0";
+ long CNT_TIME = new Date().getTime();
+ long INGEST_TIME = 0;
Random random = new Random(233);
- String filename = fileList[i];
- String folder = "E:\\real-world data\\Kaggle";
- String filepath = folder + "\\" + filename;
- File file = new File(filepath);
- BufferedInputStream fis = null;
- fis = new BufferedInputStream(new FileInputStream(file));
- BufferedReader reader =
- new BufferedReader(new InputStreamReader(fis, StandardCharsets.UTF_8), 50 * 1024 * 1024);
- reader.readLine(); // ignore first line!
-
- String device = "root.real.d0";
- long CNT_TIME = i * real_data_series_base_time;
- String tmps;
- boolean over_flag = false;
- while (!over_flag) {
+
+ INGEST_TIME -= new Date().getTime();
+ for (int i = 0; i < chunkToAppend; i++) {
Tablet tablet = new Tablet(device, schemaList, TABLET_SIZE);
long[] timestamps = tablet.timestamps;
Object[] values = tablet.values;
for (int j = 0; j < TABLET_SIZE; j++) {
- if ((tmps = reader.readLine()) != null) {
- int row = tablet.rowSize++;
- timestamps[row] = CNT_TIME;
- ((double[]) values[0])[row] = Double.parseDouble(tmps);
- CNT_TIME++;
- } else {
- over_flag = true;
- break;
- }
- }
- if (!over_flag) {
- session.insertTablet(tablet);
+ int row = tablet.rowSize++;
+ timestamps[row] = CNT_TIME++;
+ ((double[]) values[0])[row] = random.nextGaussian();
}
+ session.insertTablet(tablet);
}
}
- System.out.println("\t\t[WRITE FINISH]:\t" + (new Date().getTime() - START_TIME));
}
@Test
public void insertDATA() {
try {
- prepareTimeSeriesData();
- // insertDataFromTXT();
+ // prepareTimeSeriesData();
+ insertDataFromTXT();
+ // append(5);
// insertDataFromTXT();
// insertDataFromTXT(3, 3, 0);
} catch (IoTDBConnectionException | StatementExecutionException | IOException e) {
diff --git a/session/src/test/java/org/apache/iotdb/session/InsertDataIT.java b/session/src/test/java/org/apache/iotdb/session/InsertDataIT.java
index 269032a4cf..7aca826d26 100644
--- a/session/src/test/java/org/apache/iotdb/session/InsertDataIT.java
+++ b/session/src/test/java/org/apache/iotdb/session/InsertDataIT.java
@@ -49,8 +49,8 @@ public class InsertDataIT {
private static int originCompactionThreadNum;
private static final List<String> deviceList = new ArrayList<>();
private static final List<Integer> sizeList = new ArrayList<>();
- private static final int baseSize = 4096 * 20000;
- private static final int TABLET_SIZE = 4096 * 20;
+ private static final int TABLET_SIZE = 8192;
+ private static final int baseSize = TABLET_SIZE * 1500; // (30 * 30 * 30 - 1);
private static final int deviceNumL = 0, deviceNumR = 1;
private static final List<String> seriesList = new ArrayList<>();
private static final List<TSDataType> dataTypeList = new ArrayList<>();
@@ -61,7 +61,7 @@ public class InsertDataIT {
@BeforeClass
public static void setUp() throws Exception {
for (int i = deviceNumL; i < deviceNumR; i++) {
- deviceList.add("root.Summary1.d" + i);
+ deviceList.add("root.test1500.SSTd" + i);
sizeList.add(baseSize * (i + 1));
}
for (int i = 0; i < series_num; i++) {
diff --git a/session/src/test/java/org/apache/iotdb/session/InsertLatencyDataIT.java b/session/src/test/java/org/apache/iotdb/session/InsertLatencyDataIT.java
index 160ff7e3e7..6b5166e5d1 100644
--- a/session/src/test/java/org/apache/iotdb/session/InsertLatencyDataIT.java
+++ b/session/src/test/java/org/apache/iotdb/session/InsertLatencyDataIT.java
@@ -35,11 +35,9 @@ import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-import java.util.Random;
+import java.io.*;
+import java.nio.charset.StandardCharsets;
+import java.util.*;
import static org.junit.Assert.fail;
@@ -49,20 +47,22 @@ public class InsertLatencyDataIT {
private static int originCompactionThreadNum;
private static final List<String> deviceList = new ArrayList<>();
private static final List<Integer> sizeList = new ArrayList<>();
- private static final int baseSize = 100000 * 200;
- private static final int TABLET_SIZE = 100000;
+ private static final int baseSize = 4096 * 64;
+ private static final int TABLET_SIZE = 4096 * 16;
private static final int device_num = 1;
private static final List<String> seriesList = new ArrayList<>();
private static final List<TSDataType> dataTypeList = new ArrayList<>();
private static final int series_num = 1;
private static final int Long_Series_Num = 0;
private static final boolean inMemory = false;
- static final double mu = 2, sig = 3;
+ static final double mu = 2, sig = 3.0;
+ static final String muS = Integer.toString((int) (Math.round(mu * 10)));
+ static final String sigS = Integer.toString((int) (Math.round(sig * 10)));
@BeforeClass
public static void setUp() throws Exception {
for (int i = 0; i < device_num; i++) {
- deviceList.add("root.disorder_16_23.d" + i);
+ deviceList.add("root.latency_" + muS + "_" + sigS + ".d" + i);
sizeList.add(baseSize * (i + 1));
}
for (int i = 0; i < series_num; i++) {
@@ -97,6 +97,12 @@ public class InsertLatencyDataIT {
for (String device : deviceList) {
for (int seriesID = START_SERIES; seriesID < series_num; seriesID++) {
String series = seriesList.get(seriesID);
+ try {
+ session.executeNonQueryStatement(
+ "delete storage group " + device.substring(0, device.length() - 3));
+ } catch (Exception e) {
+ // no-op
+ }
session.createTimeseries(
device + "." + series,
dataTypeList.get(seriesID),
@@ -114,17 +120,6 @@ public class InsertLatencyDataIT {
}
Random random = new Random(233);
- long REVERSE_TIME = 1L << 40;
-
- // List<LongLongPair> a = new ArrayList<>(baseSize);
- // for (int i = 0; i < baseSize; i++) {
- // a.add(
- // PrimitiveTuples.pair(
- // (long) i, (long) Math.round(i + Math.exp(mu + sig * random.nextGaussian()))));
- // // System.out.println("\t\t"+a.get(i).getOne()+"\t"+a.get(i).getTwo());
- // }
- // a.sort(Comparator.comparingLong(LongLongPair::getTwo));
-
long[] aa;
long[] bb;
IntArrayList cc;
@@ -140,13 +135,19 @@ public class InsertLatencyDataIT {
for (int deviceID = 0; deviceID < device_num; deviceID++) {
String device = deviceList.get(deviceID);
+
+ // ArrayList<Double> tmpList = new ArrayList<>();
+ // for (int seriesID = 0; seriesID < series_num; seriesID++) tmpList.add(-233.0);
+ // session.insertRecord(device, 1L << 40, seriesList, dataTypeList, tmpList.toArray());
+ // session.executeNonQueryStatement("flush");
+
int TABLET_NUM = (baseSize / TABLET_SIZE) * (deviceID + 1);
long TOTAL_SIZE = baseSize * (deviceID + 1);
long index = 0;
for (int i = 0; i < TABLET_NUM; i++) {
- long BASE_TIME;
- if (i == 0) BASE_TIME = REVERSE_TIME;
- else BASE_TIME = (long) (i - 1) * TABLET_SIZE + 1;
+ // long BASE_TIME;
+ // if (i == 0) BASE_TIME = REVERSE_TIME;
+ // else BASE_TIME = (long) (i - 1) * TABLET_SIZE + 1;
Tablet tablet = new Tablet(device, schemaList, TABLET_SIZE);
@@ -155,9 +156,9 @@ public class InsertLatencyDataIT {
for (long time = 0; time < TABLET_SIZE; index++, time++) {
int row = tablet.rowSize++;
- timestamps[row] = aa[cc.get((int) index)];
+ timestamps[row] = aa[cc.getInt((int) index)];
// if (index < 100) System.out.println("\t" + timestamps[row]);
- if (i == 0) timestamps[row] += 1L << 30;
+ // if (i == 0) timestamps[row] += 1L << 30;
for (int seriesID = START_SERIES; seriesID < series_num; seriesID++) {
String series = seriesList.get(seriesID);
@@ -169,23 +170,112 @@ public class InsertLatencyDataIT {
}
}
session.insertTablet(tablet);
- session.executeNonQueryStatement("flush");
+ // session.executeNonQueryStatement("flush");
}
+ session.executeNonQueryStatement("flush");
}
System.out.println(
"\t\t create designed data cost time:" + (System.currentTimeMillis() - START_TIME));
}
- static final long real_data_series_base_time = 1L << 32;
+  /**
+   * Replays a real-world data file in simulated arrival order. Point i gets timestamp i and the
+   * i-th value of the file (header line skipped), but points are sent sorted by the simulated
+   * arrival time i + exp(mu + sig * N(0, 1)), i.e. with a log-normal delay.
+   *
+   * <p>A single point at time 2^40 is written and flushed first; afterwards every tablet of
+   * TABLET_SIZE points is flushed on its own.
+   */
+  private static void insertDataFromFile()
+      throws IoTDBConnectionException, StatementExecutionException, IOException {
+    long START_TIME = System.currentTimeMillis();
+
+    Random random = new Random(233);
+    long[] aa = new long[baseSize]; // point index -> timestamp (identity)
+    long[] bb = new long[baseSize]; // point index -> simulated arrival time
+    IntArrayList cc = new IntArrayList(baseSize); // point indexes, sorted by arrival time below
+    for (int i = 0; i < baseSize; i++) {
+      cc.add(i);
+      aa[i] = i;
+      bb[i] = Math.round(i + Math.exp(mu + sig * random.nextGaussian()));
+    }
+    cc.sort((x, y) -> (Long.compare(bb[x], bb[y])));
+
+    String[] fileList = new String[10];
+    fileList[1] = "1_bitcoin.csv";
+    fileList[2] = "2_physiological_stress.txt";
+    fileList[3] = "4_taxipredition8M.txt";
+    fileList[4] = "5_wh.csv";
-  private static void insertDataFromTXT()
-      throws IoTDBConnectionException, StatementExecutionException, IOException {}
+    for (int fileID = 2; fileID <= 2; fileID++) {
+      String series = "s0";
+      String storage_group = "root.real_" + fileID + "_latency_" + muS + "_" + sigS;
+      String device = storage_group + ".d0";
+      try {
+        session.executeNonQueryStatement("delete storage group " + storage_group);
+      } catch (Exception e) {
+        // no-op: best-effort cleanup; the storage group may not exist yet
+      }
+      session.createTimeseries(
+          device + "." + series, TSDataType.DOUBLE, TSEncoding.PLAIN, CompressionType.SNAPPY);
+
+      MeasurementSchema schema = new MeasurementSchema(series, TSDataType.DOUBLE, TSEncoding.PLAIN);
+      List<MeasurementSchema> schemaList = new ArrayList<>();
+      schemaList.add(schema);
+
+      // Written and flushed first so that every later (smaller) timestamp is out-of-order data.
+      session.insertRecord(
+          device,
+          1L << 40,
+          Collections.singletonList(series),
+          Collections.singletonList(TSDataType.DOUBLE),
+          Collections.singletonList(-233.0).toArray());
+      session.executeNonQueryStatement("flush");
+      // Load the values; vv[t] becomes the value written at timestamp t.
+      String filename = fileList[fileID];
+      String folder = "F:\\real-world data\\LSM-quantile";
+      String filepath = folder + "\\" + filename;
+      File file = new File(filepath);
+      long index = 0;
+      double[] vv = new double[baseSize];
+      // try-with-resources closes the reader (and the file) even if a line fails to parse.
+      try (BufferedReader reader =
+          new BufferedReader(
+              new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8),
+              50 * 1024 * 1024)) {
+        reader.readLine(); // skip the header line
+        String tmps;
+        // At most baseSize values are read; if the file is shorter, the rest of vv stays 0.0.
+        while (index < baseSize && (tmps = reader.readLine()) != null) {
+          vv[(int) index++] = Double.parseDouble(tmps);
+        }
+      }
+
+      // Send the points in arrival order, flushing after every tablet.
+      index = 0;
+      for (int i = 0; i < baseSize / TABLET_SIZE; i++) {
+        Tablet tablet = new Tablet(device, schemaList, TABLET_SIZE);
+        long[] timestamps = tablet.timestamps;
+        Object[] values = tablet.values;
+        for (long time = 0; time < TABLET_SIZE; index++, time++) {
+          int row = tablet.rowSize++;
+          int point = cc.getInt((int) index); // next point in arrival order
+          timestamps[row] = aa[point];
+          ((double[]) values[0])[row] = vv[point];
+        }
+        session.insertTablet(tablet);
+        session.executeNonQueryStatement("flush");
+      }
+    }
+    System.out.println(
+        "\t\t create designed data cost time:" + (System.currentTimeMillis() - START_TIME));
+  }
@Test
public void insertDATA() {
try {
prepareTimeSeriesData(mu, sig);
- // insertDataFromTXT();
+ // insertDataFromFile();
// insertDataFromTXT();
// insertDataFromTXT(3, 3, 0);
} catch (IoTDBConnectionException | StatementExecutionException | IOException e) {
@@ -194,20 +284,19 @@ public class InsertLatencyDataIT {
}
}
- @Test
- public void executeStatement()
- throws IoTDBConnectionException, StatementExecutionException, IOException {
- SessionDataSet dataSet;
- dataSet = session.executeQueryStatement("show timeseries");
- while (dataSet.hasNext()) System.out.println("[DEBUG]" + dataSet.next().getFields().toString());
- long ST;
-
- ST = new Date().getTime();
- for (int i = 0; i < 1; i++)
- dataSet =
- session.executeQueryStatement(
- "select exact_median_kll_stat_single(s0) from " + deviceList.get(0));
- System.out.println("[DEBUG]" + dataSet.next().getFields().toString());
- System.out.println("\t\ttime:" + (new Date().getTime() - ST));
- }
+ // @Test
+ // public void executeStatement()
+ // throws IoTDBConnectionException, StatementExecutionException, IOException {
+ // SessionDataSet dataSet;
+ // dataSet = session.executeQueryStatement("show timeseries");
+ // while (dataSet.hasNext()) System.out.println("[DEBUG]" +
+ // dataSet.next().getFields().toString());
+ // long ST;
+ //
+ // ST = new Date().getTime();
+ // for (int i = 0; i < 1; i++)
+ // dataSet = session.executeQueryStatement("select count(s0) from " + deviceList.get(0));
+ // System.out.println("[DEBUG]" + dataSet.next().getFields().toString());
+ // System.out.println("\t\ttime:" + (new Date().getTime() - ST));
+ // }
}
diff --git a/session/src/test/java/org/apache/iotdb/session/InsertDataIT.java b/session/src/test/java/org/apache/iotdb/session/InsertUnseqDataIT.java
similarity index 93%
copy from session/src/test/java/org/apache/iotdb/session/InsertDataIT.java
copy to session/src/test/java/org/apache/iotdb/session/InsertUnseqDataIT.java
index 269032a4cf..cc9aed472a 100644
--- a/session/src/test/java/org/apache/iotdb/session/InsertDataIT.java
+++ b/session/src/test/java/org/apache/iotdb/session/InsertUnseqDataIT.java
@@ -43,14 +43,14 @@ import java.util.Random;
import static org.junit.Assert.fail;
-public class InsertDataIT {
+public class InsertUnseqDataIT {
private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
private static Session session;
private static int originCompactionThreadNum;
private static final List<String> deviceList = new ArrayList<>();
private static final List<Integer> sizeList = new ArrayList<>();
- private static final int baseSize = 4096 * 20000;
- private static final int TABLET_SIZE = 4096 * 20;
+ private static final int TABLET_SIZE = 8192;
+ private static final int baseSize = TABLET_SIZE * 6713; // (30 * 30 * 30 - 1);
private static final int deviceNumL = 0, deviceNumR = 1;
private static final List<String> seriesList = new ArrayList<>();
private static final List<TSDataType> dataTypeList = new ArrayList<>();
@@ -61,7 +61,7 @@ public class InsertDataIT {
@BeforeClass
public static void setUp() throws Exception {
for (int i = deviceNumL; i < deviceNumR; i++) {
- deviceList.add("root.Summary1.d" + i);
+ deviceList.add("root.testU1.d" + i);
sizeList.add(baseSize * (i + 1));
}
for (int i = 0; i < series_num; i++) {
@@ -117,8 +117,16 @@ public class InsertDataIT {
String device = deviceList.get(deviceID);
int TABLET_NUM = (baseSize / TABLET_SIZE) * (deviceID + 1);
long TOTAL_SIZE = baseSize * (deviceID + 1);
- for (int i = 0; i < TABLET_NUM; i++) {
+
+ // Tablet unSeqTablet = new Tablet(device, schemaList, 1);
+ // unSeqTablet.timestamps[0]=1L<<40;
+ // ((double[]) unSeqTablet.values[0])[0]=-2.33;
+ // session.insertTablet(unSeqTablet);
+ // session.executeNonQueryStatement("flush");
+
+ for (int i = -1; i < TABLET_NUM; i++) {
long BASE_TIME = (long) i * TABLET_SIZE;
+ if (i < 0) BASE_TIME = 1L << 40;
Tablet tablet = new Tablet(device, schemaList, TABLET_SIZE);
long[] timestamps = tablet.timestamps;
diff --git a/session/src/test/java/org/apache/iotdb/session/InsertUnseqLatencyDataIT.java b/session/src/test/java/org/apache/iotdb/session/InsertUnseqLatencyDataIT.java
new file mode 100644
index 0000000000..781511c5bb
--- /dev/null
+++ b/session/src/test/java/org/apache/iotdb/session/InsertUnseqLatencyDataIT.java
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.session;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.*;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import static org.junit.Assert.fail;
+
+/**
+ * Writes a DOUBLE series whose points all land in IoTDB's unsequence (out-of-order) space: a
+ * barrier batch with timestamps from 2^40 upward is inserted and flushed first, then the points
+ * with timestamps 0 .. baseSize - 1 are sent in simulated arrival order, point i arriving at
+ * i + exp(mu + sig * N(0, 1)) (a log-normal delay). Values are N(0, 1) samples in
+ * prepareTimeSeriesData, or come from a real-world data file in insertDataFromFile.
+ */
+public class InsertUnseqLatencyDataIT {
+  private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
+  private static Session session;
+  private static int originCompactionThreadNum;
+  private static final List<String> deviceList = new ArrayList<>();
+  private static final List<Integer> sizeList = new ArrayList<>();
+  private static final int TABLET_SIZE = 8192;
+  private static final int baseSize = TABLET_SIZE * 6713; // 6713;
+  private static final int device_num = 1;
+  private static final List<String> seriesList = new ArrayList<>();
+  private static final List<TSDataType> dataTypeList = new ArrayList<>();
+  private static final int series_num = 1;
+  private static final int Long_Series_Num = 0;
+  private static final boolean inMemory = false;
+  // Arrival delay is exp(mu + sig * N(0, 1)); muS / sigS (10x, rounded) go into the SG names.
+  static final double mu = 3.0, sig = 2.6; // sig: 0,1.0,2.1,2.4,2.6
+  static final String muS = Integer.toString((int) (Math.round(mu * 10)));
+  static final String sigS = Integer.toString((int) (Math.round(sig * 10)));
+
+  /**
+   * Builds the device / series lists, sets the concurrent compaction thread count to 0 (restored
+   * in tearDown) and opens the session to 127.0.0.1:6667.
+   */
+  @BeforeClass
+  public static void setUp() throws Exception {
+    System.out.println("\t\tfreeMem\t" + (Runtime.getRuntime().freeMemory()) / (1024 * 1024.0));
+    System.out.println("\t\tmaxMem\t" + (Runtime.getRuntime().maxMemory()) / (1024 * 1024.0));
+    System.out.println("\t\ttotalMem\t" + (Runtime.getRuntime().totalMemory()) / (1024 * 1024.0));
+    for (int i = 0; i < device_num; i++) {
+      deviceList.add("root.sst_latency_" + muS + "_" + sigS + ".d" + i);
+      sizeList.add(baseSize * (i + 1));
+    }
+    for (int i = 0; i < series_num; i++) {
+      seriesList.add("s" + i);
+      if (i < Long_Series_Num) dataTypeList.add(TSDataType.INT64);
+      else dataTypeList.add(TSDataType.DOUBLE);
+    }
+    originCompactionThreadNum = CONFIG.getConcurrentCompactionThread();
+    CONFIG.setConcurrentCompactionThread(0);
+    if (inMemory) EnvironmentUtils.envSetUp();
+    session = new Session("127.0.0.1", 6667, "root", "root");
+    session.open();
+    if (inMemory) {
+      prepareTimeSeriesData(mu, sig);
+      // insertDataFromTXT(5);
+    }
+  }
+
+  /** Closes the session and restores the original concurrent compaction thread count. */
+  @AfterClass
+  public static void tearDown() throws Exception {
+    session.close();
+    if (inMemory) EnvironmentUtils.cleanEnv();
+    CONFIG.setConcurrentCompactionThread(originCompactionThreadNum);
+  }
+
+  /**
+   * Recreates the configured series on every device and fills series 0 with baseSize N(0, 1)
+   * samples whose timestamps 0 .. baseSize - 1 are sent in simulated arrival order. Only the
+   * barrier batch is flushed explicitly.
+   */
+  private static void prepareTimeSeriesData(double mu, double sig)
+      throws IoTDBConnectionException, StatementExecutionException, IOException {
+    System.out.println("\t\t????" + deviceList + "||||" + seriesList);
+
+    long START_TIME = System.currentTimeMillis();
+    final int START_SERIES = 0;
+    for (String device : deviceList) {
+      // Drop the storage group once per device, before any of its series is created; doing it
+      // inside the series loop would also delete the series created in earlier iterations.
+      try {
+        session.executeNonQueryStatement(
+            "delete storage group " + device.substring(0, device.lastIndexOf('.')));
+      } catch (Exception e) {
+        // no-op: best-effort cleanup; the storage group may not exist yet
+      }
+      for (int seriesID = START_SERIES; seriesID < series_num; seriesID++) {
+        session.createTimeseries(
+            device + "." + seriesList.get(seriesID),
+            dataTypeList.get(seriesID),
+            TSEncoding.PLAIN,
+            CompressionType.SNAPPY);
+      }
+    }
+
+    List<MeasurementSchema> schemaList = new ArrayList<>();
+
+    for (int seriesID = START_SERIES; seriesID < series_num; seriesID++) {
+      schemaList.add(
+          new MeasurementSchema(
+              seriesList.get(seriesID), dataTypeList.get(seriesID), TSEncoding.PLAIN));
+    }
+
+    Random random = new Random(233);
+    long[] bb = new long[baseSize]; // point index -> simulated arrival time
+    System.out.println("\t\tfreeMem\t" + (Runtime.getRuntime().freeMemory()) / (1024 * 1024.0));
+    System.out.println("\t\tmaxMem\t" + (Runtime.getRuntime().maxMemory()) / (1024 * 1024.0));
+    System.out.println("\t\ttotalMem\t" + (Runtime.getRuntime().totalMemory()) / (1024 * 1024.0));
+    IntArrayList cc = new IntArrayList(baseSize); // point indexes, sorted by arrival time below
+    for (int i = 0; i < baseSize; i++) {
+      cc.add(i);
+      bb[i] = Math.round(i + Math.exp(mu + sig * random.nextGaussian()));
+    }
+    cc.sort((x, y) -> (Long.compare(bb[x], bb[y])));
+
+    for (int deviceID = 0; deviceID < device_num; deviceID++) {
+      String device = deviceList.get(deviceID);
+      insertUnseqBarrier(device, schemaList);
+
+      // NOTE(review): cc holds only baseSize indexes, so with device_num > 1 the later devices
+      // (TABLET_NUM grows with deviceID) would index past its end.
+      int TABLET_NUM = (baseSize / TABLET_SIZE) * (deviceID + 1);
+      long index = 0;
+      for (int i = 0; i < TABLET_NUM; i++) {
+        Tablet tablet = new Tablet(device, schemaList, TABLET_SIZE);
+        long[] timestamps = tablet.timestamps;
+        Object[] values = tablet.values;
+
+        for (long time = 0; time < TABLET_SIZE; index++, time++) {
+          int row = tablet.rowSize++;
+          timestamps[row] = cc.getInt((int) index);
+          for (int seriesID = START_SERIES; seriesID < series_num; seriesID++) {
+            if (seriesID == 0) {
+              ((double[]) values[seriesID])[row] = random.nextGaussian();
+            }
+          }
+        }
+        session.insertTablet(tablet);
+        // session.executeNonQueryStatement("flush");
+      }
+      // session.executeNonQueryStatement("flush");
+    }
+    System.out.println(
+        "\t\t create designed data cost time:" + (System.currentTimeMillis() - START_TIME));
+  }
+
+  /**
+   * Inserts TABLET_SIZE points with timestamps 2^40 .. 2^40 + TABLET_SIZE - 1 and value -2.33
+   * into the first column of {@code schemaList} (which must be DOUBLE), then flushes. The points
+   * this class writes afterwards all have smaller timestamps and so become unsequence data.
+   */
+  private static void insertUnseqBarrier(String device, List<MeasurementSchema> schemaList)
+      throws IoTDBConnectionException, StatementExecutionException {
+    Tablet unSeqTablet = new Tablet(device, schemaList, TABLET_SIZE);
+    for (int i = 0; i < TABLET_SIZE; i++) {
+      unSeqTablet.rowSize++;
+      unSeqTablet.timestamps[i] = (1L << 40) + i;
+      ((double[]) unSeqTablet.values[0])[i] = -2.33;
+    }
+    session.insertTablet(unSeqTablet);
+    session.executeNonQueryStatement("flush");
+  }
+
+  /**
+   * Replays data file {@code fileID} (1 = bitcoin, 2 = thruster, 3 = taxi, 4 = wh) into a
+   * freshly recreated storage group: point i gets timestamp i and the i-th value of the file
+   * (header line skipped), and the points are sent in simulated arrival order after the barrier
+   * batch. Only the first baseSize values are used; if the file is shorter, the rest are 0.0.
+   *
+   * @throws IllegalArgumentException if {@code fileID} is not in [1, 4]
+   */
+  private static void insertDataFromFile(int fileID)
+      throws IoTDBConnectionException, StatementExecutionException, IOException {
+    if (fileID < 1 || fileID > 4) {
+      throw new IllegalArgumentException("fileID must be in [1, 4], got " + fileID);
+    }
+    long START_TIME = System.currentTimeMillis();
+
+    Random random = new Random(233);
+    int[] bb = new int[baseSize]; // point index -> simulated arrival time
+    IntArrayList cc = new IntArrayList(baseSize); // point indexes, sorted by arrival time below
+    for (int i = 0; i < baseSize; i++) {
+      cc.add(i);
+      // Clamp before narrowing: a plain (int) cast wraps a large delay around to a negative
+      // arrival time and corrupts the order (int presumably keeps this large array small).
+      long arrival = Math.round(i + Math.exp(mu + sig * random.nextGaussian()));
+      bb[i] = (int) Math.min(arrival, Integer.MAX_VALUE);
+    }
+    cc.sort((x, y) -> (Long.compare(bb[x], bb[y])));
+
+    String[] fileList = new String[10], fileName = new String[10];
+    String folder = "D:\\Study\\Lab\\iotdb\\add_quantile_to_aggregation\\test_project_2";
+    fileList[1] = "1_bitcoin.csv";
+    fileList[2] = "2_SpacecraftThruster.txt";
+    fileList[3] = "3_taxipredition8M.txt";
+    fileList[4] = "4_wh.csv";
+    fileName[1] = "bitcoin";
+    fileName[2] = "thruster";
+    fileName[3] = "taxi";
+    fileName[4] = "wh";
+
+    String series = "s0";
+    String storage_group = "root.real_" + fileName[fileID] + "_latency_" + muS + "_" + sigS;
+    String device = storage_group + ".d0";
+    try {
+      session.executeNonQueryStatement("delete storage group " + storage_group);
+    } catch (Exception e) {
+      // no-op: best-effort cleanup; the storage group may not exist yet
+    }
+    session.createTimeseries(
+        device + "." + series, TSDataType.DOUBLE, TSEncoding.PLAIN, CompressionType.SNAPPY);
+
+    MeasurementSchema schema = new MeasurementSchema(series, TSDataType.DOUBLE, TSEncoding.PLAIN);
+    List<MeasurementSchema> schemaList = new ArrayList<>();
+    schemaList.add(schema);
+    insertUnseqBarrier(device, schemaList);
+
+    // Load the values; vv[t] becomes the value written at timestamp t.
+    long index = 0;
+    double[] vv = new double[baseSize];
+    File file = new File(folder + "\\" + fileList[fileID]);
+    // try-with-resources closes the reader (and the file) even if a line fails to parse.
+    try (BufferedReader reader =
+        new BufferedReader(
+            new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8),
+            50 * 1024 * 1024)) {
+      reader.readLine(); // skip the header line
+      String tmps;
+      while (index < baseSize && (tmps = reader.readLine()) != null) {
+        vv[(int) index++] = Double.parseDouble(tmps);
+      }
+    }
+
+    // Send the points in arrival order (not flushed here).
+    index = 0;
+    for (int i = 0; i < baseSize / TABLET_SIZE; i++) {
+      Tablet tablet = new Tablet(device, schemaList, TABLET_SIZE);
+      long[] timestamps = tablet.timestamps;
+      Object[] values = tablet.values;
+      for (long time = 0; time < TABLET_SIZE; index++, time++) {
+        int row = tablet.rowSize++;
+        int point = cc.getInt((int) index);
+        timestamps[row] = point;
+        ((double[]) values[0])[row] = vv[point];
+      }
+      session.insertTablet(tablet);
+      // session.executeNonQueryStatement("flush");
+    }
+    System.out.println(
+        "\t\t create designed data cost time:" + (System.currentTimeMillis() - START_TIME));
+  }
+
+  /**
+   * Entry point: replays data file 1 (bitcoin). Switch to prepareTimeSeriesData to write N(0, 1)
+   * values instead.
+   */
+  @Test
+  public void insertDATA() {
+    try {
+      // prepareTimeSeriesData(mu, sig);
+      insertDataFromFile(1);
+      // insertDataFromTXT();
+      // insertDataFromTXT(3, 3, 0);
+    } catch (IoTDBConnectionException | StatementExecutionException | IOException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  // @Test
+  // public void executeStatement()
+  //     throws IoTDBConnectionException, StatementExecutionException, IOException {
+  //   SessionDataSet dataSet;
+  //   dataSet = session.executeQueryStatement("show timeseries");
+  //   while (dataSet.hasNext()) System.out.println("[DEBUG]" +
+  // dataSet.next().getFields().toString());
+  //   long ST;
+  //
+  //   ST = new Date().getTime();
+  //   for (int i = 0; i < 1; i++)
+  //     dataSet = session.executeQueryStatement("select count(s0) from " + deviceList.get(0));
+  //   System.out.println("[DEBUG]" + dataSet.next().getFields().toString());
+  //   System.out.println("\t\ttime:" + (new Date().getTime() - ST));
+  // }
+}
diff --git a/session/src/test/java/org/apache/iotdb/session/QueryLSMIT.java b/session/src/test/java/org/apache/iotdb/session/QueryLSMIT.java
new file mode 100644
index 0000000000..903ff5a5fd
--- /dev/null
+++ b/session/src/test/java/org/apache/iotdb/session/QueryLSMIT.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.session;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * Latency benchmark of the kll_quantile and count aggregations on s0 of root.Summary3.d0 and
+ * root.Summary0.d0 over time ranges [0, queryN), run against a server at 127.0.0.1:6667.
+ */
+public class QueryLSMIT {
+  private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
+  private static Session session;
+  private static int originCompactionThreadNum;
+  private static final int baseSize = 4096 * 5000; // 7989 * (12518 - 1);
+  private static final int series_num = 1;
+  private static final boolean inMemory = false;
+  private static final long REVERSE_TIME = 1L << 60, UPDATE_ARRIVAL_TIME = 1L << 50;
+  private static final List<String> storageGroupList = new ArrayList<>();
+  private static final int datasetID = 1;
+  // Timed queries per (storage group, aggregation); queryN is the length of each time range.
+  int TEST_CASE = 512;
+  int queryN = 100000000, seriesN = 110000000;
+
+  /**
+   * Registers the storage groups to benchmark, sets the concurrent compaction thread count to 0
+   * (restored in tearDown) and opens the session to 127.0.0.1:6667.
+   */
+  @BeforeClass
+  public static void setUp() throws Exception {
+
+    storageGroupList.add("root.Summary" + "3");
+    storageGroupList.add("root.Summary" + "0");
+    // storageGroupList.add("root.noSum");
+    originCompactionThreadNum = CONFIG.getConcurrentCompactionThread();
+    CONFIG.setConcurrentCompactionThread(0);
+    if (inMemory) EnvironmentUtils.envSetUp();
+    session = new Session("127.0.0.1", 6667, "root", "root");
+    session.open();
+    if (inMemory) {}
+  }
+
+  /** Closes the session and restores the original concurrent compaction thread count. */
+  @AfterClass
+  public static void tearDown() throws Exception {
+    session.close();
+    if (inMemory) EnvironmentUtils.cleanEnv();
+    CONFIG.setConcurrentCompactionThread(originCompactionThreadNum);
+  }
+
+  /** Appends the time filter {@code L <= time < R} to a query body. */
+  private String getQueryStatement(String body, long L, long R) {
+    return body + " where time>=" + L + " and time<" + R;
+  }
+
+  /**
+   * Prints one row: queryN, then, for every storage group and aggregation, the mean latency (ms)
+   * of the middle half of TEST_CASE queries over [LL[t], RR[t]) = [0, queryN), measured after a
+   * warm-up round. Every result set is closed so the server can release its query resources.
+   */
+  private void testTime() throws IoTDBConnectionException, StatementExecutionException {
+    List<String> aggrList = new ArrayList<>();
+    aggrList.add("kll_quantile");
+    aggrList.add("count");
+
+    long[] LL = new long[TEST_CASE];
+    long[] RR = new long[TEST_CASE];
+    for (int i = 0; i < TEST_CASE; i++) {
+      LL[i] = 0; // alternative: a random start in [0, seriesN - queryN]
+      RR[i] = LL[i] + queryN;
+    }
+    // One queryN per row, before the latencies (matches the columns of run()'s header).
+    System.out.print(queryN);
+    for (String sg : storageGroupList) {
+      for (String aggr : aggrList) {
+        String queryBody = "select " + aggr + "(" + "s0" + ") from " + sg + ".d0";
+        // Warm up: the whole series once, then TEST_CASE / 8 + 4 range queries.
+        session.executeQueryStatement(queryBody).closeOperationHandle();
+        for (int i = 0; i < TEST_CASE / 8 + 4; i++) {
+          int q = i % TEST_CASE; // wrap so that a small TEST_CASE cannot overrun LL / RR
+          session.executeQueryStatement(getQueryStatement(queryBody, LL[q], RR[q]))
+              .closeOperationHandle();
+        }
+
+        LongArrayList tList = new LongArrayList();
+        for (int t = 0; t < TEST_CASE; t++) {
+          String sql = getQueryStatement(queryBody, LL[t], RR[t]);
+          long start = new Date().getTime();
+          SessionDataSet dataSet = session.executeQueryStatement(sql);
+          tList.add(new Date().getTime() - start);
+          // Closed outside the timed section; left open, it pins server-side query resources.
+          dataSet.closeOperationHandle();
+        }
+        tList.sort(Long::compare);
+        // Mean of the middle half of the sorted latencies (drops the fastest and slowest 25%).
+        long sum = 0, cnt = 0;
+        for (int i = TEST_CASE / 4; i < TEST_CASE * 3 / 4; cnt++, i++) sum += tList.getLong(i);
+        System.out.print("\t\t" + 1.0 * sum / cnt);
+      }
+    }
+    System.out.println();
+  }
+
+  /**
+   * Prints a header, then runs testTime (one row each) for queryN = 20M, 40M, ..., 100M, and
+   * finally the total wall-clock time in ms.
+   */
+  @Test
+  public void run() throws IoTDBConnectionException, StatementExecutionException, IOException {
+    long ST = new Date().getTime();
+    // The header's trailing note says the two count columns differ only in the underlying data.
+    System.out.println(
+        "queryN\t\ttime_lsm\t\ttime_count_lsm\t\ttime_chunk\t\ttime_count_chunk\t\t两个count只是数据不同");
+    for (int x = 20000000; x <= 100000000; x += 20000000) {
+      queryN = x;
+      testTime();
+    }
+    // testValue();
+    System.out.println("\t\tALL_TIME:" + (new Date().getTime() - ST));
+  }
+}
diff --git a/session/src/test/java/org/apache/iotdb/session/QueryLatencyIT.java b/session/src/test/java/org/apache/iotdb/session/QueryLatencyIT.java
new file mode 100644
index 0000000000..a5b223853b
--- /dev/null
+++ b/session/src/test/java/org/apache/iotdb/session/QueryLatencyIT.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.session;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Random;
+
+public class QueryLatencyIT {
+ private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
+ private static Session session;
+ private static int originCompactionThreadNum;
+ private static final int baseSize = 4096 * 5000; // 7989 * (12518 - 1);
+ private static final int series_num = 1;
+ private static final boolean inMemory = false;
+ private static final long REVERSE_TIME = 1L << 60, UPDATE_ARRIVAL_TIME = 1L << 50;
+ private static final List<String> storageGroupList = new ArrayList<>();
+ private static final int datasetID = 1;
+ int TEST_CASE = 1024;
+ int queryN = 100000, seriesN = 4096 * 5000;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ for (double mu = 2, sig = 1.5; sig <= 3.6; sig += 0.5) {
+ String muS = Integer.toString((int) (Math.round(mu * 10)));
+ String sigS = Integer.toString((int) (Math.round(sig * 10)));
+ storageGroupList.add("root.real_" + datasetID + "_latency_" + muS + "_" + sigS);
+ }
+ originCompactionThreadNum = CONFIG.getConcurrentCompactionThread();
+ CONFIG.setConcurrentCompactionThread(0);
+ if (inMemory) EnvironmentUtils.envSetUp();
+ session = new Session("127.0.0.1", 6667, "root", "root");
+ session.open();
+ if (inMemory) {}
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ session.close();
+ if (inMemory) EnvironmentUtils.cleanEnv();
+ CONFIG.setConcurrentCompactionThread(originCompactionThreadNum);
+ }
+
+ private String getQueryStatement(String body, long L, long R) {
+ return body + " where time>=" + L + " and time<" + R;
+ }
+
+ private void testTime() throws IoTDBConnectionException, StatementExecutionException {
+ List<String> aggrList = new ArrayList<>();
+ aggrList.add("kll_quantile");
+ aggrList.add("count");
+
+ SessionDataSet dataSet;
+ System.out.println("\t\t\tqueryN:" + queryN + "\tDataset:" + datasetID);
+ long[] LL = new long[TEST_CASE];
+ long[] RR = new long[TEST_CASE];
+ Random random = new Random(233);
+ for (int i = 0; i < TEST_CASE; i++) {
+ LL[i] = random.nextInt(seriesN - queryN + 1);
+ RR[i] = LL[i] + queryN;
+ // System.out.println("\t\t\t"+(LL[i])+" "+(RR[i]));
+ }
+ for (String latencyData : storageGroupList) {
+ System.out.print(
+ "\tlatency=" + latencyData.substring(latencyData.lastIndexOf("y_") + 2) + "\t\t");
+ for (String aggr : aggrList) {
+ String queryBody = "select " + aggr + "(" + "s0" + ") from " + latencyData + ".d0";
+ session.executeQueryStatement(queryBody);
+ for (int i = 0; i < TEST_CASE / 8 + 4; i++)
+ session.executeQueryStatement(getQueryStatement(queryBody, LL[i], RR[i]));
+ // warm up.
+
+ long TIME = new Date().getTime();
+ for (int t = 0; t < TEST_CASE; t++) {
+ dataSet = session.executeQueryStatement(getQueryStatement(queryBody, LL[t], RR[t]));
+ // System.out.println(getQueryStatement(queryBody,LL[t],RR[t]));
+ }
+ TIME = new Date().getTime() - TIME;
+ System.out.print("\t" + 1.0 * TIME / TEST_CASE);
+ }
+ System.out.println();
+ }
+ }
+
+ private void testValue() throws IoTDBConnectionException, StatementExecutionException {
+ List<String> aggrList = new ArrayList<>();
+ aggrList.add("exact_median_kll_stat_single_read");
+
+ SessionDataSet dataSet;
+ System.out.println("\t\t\tqueryN:" + queryN + "\tDataset:" + datasetID);
+ long[] LL = new long[TEST_CASE];
+ long[] RR = new long[TEST_CASE];
+ Random random = new Random(233);
+ for (int i = 0; i < TEST_CASE; i++) {
+ LL[i] = random.nextInt(seriesN - queryN + 1);
+ RR[i] = LL[i] + queryN;
+ // System.out.println("\t\t\t"+(LL[i])+" "+(RR[i]));
+ }
+ for (String latencyData : storageGroupList) {
+ System.out.print(
+ "\tlatency=" + latencyData.substring(latencyData.lastIndexOf("y_") + 2) + "\t\t");
+ for (String aggr : aggrList) {
+ String queryBody = "select " + aggr + "(" + "s0" + ") from " + latencyData + ".d0";
+ session.executeQueryStatement(queryBody);
+ for (int i = 0; i < TEST_CASE / 8 + 4; i++)
+ session.executeQueryStatement(getQueryStatement(queryBody, LL[i], RR[i]));
+ // warm up.
+
+ long TIME = new Date().getTime();
+ double SUM = 0;
+ for (int t = 0; t < TEST_CASE; t++) {
+ dataSet = session.executeQueryStatement(getQueryStatement(queryBody, LL[t], RR[t]));
+ String value = dataSet.next().getFields().toString();
+ value = value.substring(1, value.length() - 1);
+ SUM += Double.parseDouble(value);
+ // System.out.println(getQueryStatement(queryBody,LL[t],RR[t]));
+ }
+ TIME = new Date().getTime() - TIME;
+ System.out.print("\t" + SUM / TEST_CASE + "\t" + SUM / TEST_CASE / queryN);
+ }
+ System.out.println();
+ }
+ }
+
+ // @Test
+ // public void executeStatement()
+ // throws IoTDBConnectionException, StatementExecutionException, IOException {
+ // SessionDataSet dataSet;
+ // dataSet = session.executeQueryStatement("show timeseries");
+ // while (dataSet.hasNext()) System.out.println("[DEBUG]" +
+ // dataSet.next().getFields().toString());
+ // long ST;
+ // ST = new Date().getTime();
+ // for (int i = 0; i < 1; i++)
+ // dataSet =
+ // session.executeQueryStatement(
+ // "select exact_median_kll_stat_single(s0) from "
+ // + storageGroupList.get(0)
+ // + " where time<"
+ // + REVERSE_TIME);
+ // System.out.println("[DEBUG]" + dataSet.next().getFields().toString());
+ // System.out.println("\t\ttime:" + (new Date().getTime() - ST));
+ // }
+
+ @Test
+ public void run() throws IoTDBConnectionException, StatementExecutionException, IOException {
+ testTime();
+ testValue();
+ }
+}
diff --git a/session/src/test/java/org/apache/iotdb/session/QuerySSTSketchWithDifferentTs.java b/session/src/test/java/org/apache/iotdb/session/QuerySSTSketchWithDifferentTs.java
new file mode 100644
index 0000000000..f78112eb77
--- /dev/null
+++ b/session/src/test/java/org/apache/iotdb/session/QuerySSTSketchWithDifferentTs.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.session;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Random;
+
+public class QuerySSTSketchWithDifferentTs {
+  private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
+  private static Session session;
+  private static int originCompactionThreadNum;
+  private static final boolean inMemory = false;
+  int TEST_CASE = 64;
+  int queryN = 40000000, seriesN = 8192 * 6713;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    originCompactionThreadNum = CONFIG.getConcurrentCompactionThread();
+    CONFIG.setConcurrentCompactionThread(0);
+    if (inMemory) EnvironmentUtils.envSetUp();
+    session = new Session("127.0.0.1", 6667, "root", "root");
+    session.open();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    session.close();
+    if (inMemory) EnvironmentUtils.cleanEnv();
+    CONFIG.setConcurrentCompactionThread(originCompactionThreadNum);
+  }
+
+  private String getQueryStatement(String body, long L, long R) {
+    return body + " where time>=" + L + " and time<" + R;
+  }
+
+  /**
+   * Executes {@code sql} and immediately releases the query's server-side handle. Result sets that
+   * are never closed stay open on the server until the session ends.
+   */
+  private static void executeAndClose(String sql)
+      throws IoTDBConnectionException, StatementExecutionException {
+    session.executeQueryStatement(sql).closeOperationHandle();
+  }
+
+  /**
+   * For T in {1, 2, 4, 8, 16, 32}, runs TEST_CASE whole-series kll_quantile queries on
+   * "root.thruster4096T{T}.d0". It prints T, the median per-query latency (ms) and the average wall
+   * time per query (ms).
+   */
+  private void testTime() throws IoTDBConnectionException, StatementExecutionException {
+    String dataset = "thruster";
+    List<String> aggrList = new ArrayList<>();
+    aggrList.add("kll_quantile");
+
+    System.out.println("\t\t\tqueryN:" + queryN + "\tDataset:" + dataset);
+    // NOTE(review): LL/RR are currently not read. The timed query below scans the whole series
+    // rather than [LL[t], RR[t]); they are kept so the windowed variant can be re-enabled.
+    long[] LL = new long[TEST_CASE];
+    long[] RR = new long[TEST_CASE];
+    Random random = new Random(233);
+    for (int i = 0; i < TEST_CASE; i++) {
+      LL[i] = 0; // random.nextInt(seriesN - queryN + 1);
+      RR[i] = LL[i] + queryN;
+    }
+    long ALL_START = new Date().getTime();
+    System.out.println();
+    for (int T : new int[] {1, 2, 4, 8, 16, 32}) {
+      String sgName = "root." + dataset + "4096" + "T" + T;
+      String queryBody = "select " + "kll_quantile" + "(s0)" + " from " + sgName + ".d0";
+
+      // Warm-up; results are dropped but their server-side handles are released.
+      for (int i = 0; i < TEST_CASE / 8 + 4; i++) executeAndClose(queryBody);
+
+      // Result sets are kept until timing is done, so closing them is not measured, and are then
+      // all released instead of leaking for the rest of the session.
+      List<SessionDataSet> results = new ArrayList<>(TEST_CASE);
+      long TIME = new Date().getTime();
+      LongArrayList tArr = new LongArrayList();
+      for (int t = 0; t < TEST_CASE; t++) {
+        long tmpT = new Date().getTime();
+        SessionDataSet opened = session.executeQueryStatement(queryBody);
+        tmpT = new Date().getTime() - tmpT;
+        tArr.add(tmpT);
+        results.add(opened);
+      }
+      tArr.sort(Long::compare);
+
+      System.out.print(T + "\t\t" + 1.0 * tArr.getLong(tArr.size() / 2) + "\t");
+      TIME = new Date().getTime() - TIME;
+      System.out.print("" + 1.0 * TIME / TEST_CASE + "\n");
+      for (SessionDataSet result : results) {
+        result.closeOperationHandle();
+      }
+    }
+
+    System.out.println(
+        "\n\n\t\tTEST_CASE=" + TEST_CASE + "\t\tALL_TIME=" + (new Date().getTime() - ALL_START));
+  }
+
+ // @Test
+ // public void executeStatement()
+ // throws IoTDBConnectionException, StatementExecutionException, IOException {
+ // SessionDataSet dataSet;
+ // dataSet = session.executeQueryStatement("show timeseries");
+ // while (dataSet.hasNext()) System.out.println("[DEBUG]" +
+ // dataSet.next().getFields().toString());
+ // long ST;
+ // ST = new Date().getTime();
+ // for (int i = 0; i < 1; i++)
+ // dataSet =
+ // session.executeQueryStatement(
+ // "select exact_median_kll_stat_single(s0) from "
+ // + storageGroupList.get(0)
+ // + " where time<"
+ // + REVERSE_TIME);
+ // System.out.println("[DEBUG]" + dataSet.next().getFields().toString());
+ // System.out.println("\t\ttime:" + (new Date().getTime() - ST));
+ // }
+
+  @Test
+  public void run() throws IoTDBConnectionException, StatementExecutionException, IOException {
+    testTime();
+    // testValue();
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java
index 28c992355e..0d6baff940 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java
@@ -73,8 +73,8 @@ public class TSFileConfig implements Serializable {
private int groupSizeInByte = 128 * 1024 * 1024;
/** The memory size for each series writer to pack page, default value is 64KB. */
private int pageSizeInByte = 64 * 1024;
- /** The maximum number of data points in a page, default value is 1024 * 1024. */
- private int maxNumberOfPointsInPage = 1000;
+ /** The maximum number of data points in a page, default value is 4096. */
+ private int maxNumberOfPointsInPage = 4096;
/** The maximum degree of a metadataIndex node, default value is 256 */
private int maxDegreeOfIndexNode = 256;
/** Data type for input timestamp, TsFile supports INT64. */
@@ -422,6 +422,16 @@ public class TSFileConfig implements Serializable {
this.enableSynopsis = enableSynopsis;
}
+  /** Flag read via isEnableSSTSketch() and written via setEnableSSTSketch(); false by default. */
+  private boolean enableSSTSketch = false;
+
+  /** @return the current value of the enableSSTSketch flag */
+  public boolean isEnableSSTSketch() {
+    return this.enableSSTSketch;
+  }
+
+  /** @param enableSSTSketch new value of the enableSSTSketch flag */
+  public void setEnableSSTSketch(boolean enableSSTSketch) {
+    this.enableSSTSketch = enableSSTSketch;
+  }
+
private boolean enableBloomFilter = false;
public boolean isEnableBloomFilter() {
@@ -501,4 +511,24 @@ public class TSFileConfig implements Serializable {
public void setQuantileFile(String file) {
this.quantileFile = file;
}
+
+  /** Flag exposed through get/setSynopsisForWholeChunkWhenFlush; true by default. */
+  private boolean synopsisForWholeChunkWhenFlush = true;
+
+  /** @param x new value of synopsisForWholeChunkWhenFlush */
+  public void setSynopsisForWholeChunkWhenFlush(boolean x) {
+    synopsisForWholeChunkWhenFlush = x;
+  }
+
+  /** @return the current value of synopsisForWholeChunkWhenFlush */
+  public boolean getSynopsisForWholeChunkWhenFlush() {
+    return synopsisForWholeChunkWhenFlush;
+  }
+
+  /** Integer ratio exposed through get/setSketchSizeRatio; 1 by default. */
+  private int sketchSizeRatio = 1;
+
+  /** @param x new sketch size ratio (not validated) */
+  public void setSketchSizeRatio(int x) {
+    sketchSizeRatio = x;
+  }
+
+  /** @return the current sketch size ratio */
+  public int getSketchSizeRatio() {
+    return sketchSizeRatio;
+  }
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TimeseriesMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TimeseriesMetadata.java
index f7589de193..c791312cce 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TimeseriesMetadata.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TimeseriesMetadata.java
@@ -72,6 +72,9 @@ public class TimeseriesMetadata implements ITimeSeriesMetadata {
private ArrayList<IChunkMetadata> chunkMetadataList;
+ // Undecoded chunk metadata list bytes. deserializeFrom(buffer, true) stores a slice limited to
+ // the chunk metadata list size here instead of decoding it eagerly; getChunkMetadataList()
+ // decodes it on first use. The copy constructor hands the same instance to the copy.
+ public ByteBuffer cMDBuffer;
+ // True while cMDBuffer holds bytes not yet decoded into chunkMetadataList.
+ public boolean hasCMDBuffer = false;
+
public TimeseriesMetadata() {}
public TimeseriesMetadata(
@@ -96,7 +99,10 @@ public class TimeseriesMetadata implements ITimeSeriesMetadata {
this.dataType = timeseriesMetadata.dataType;
this.statistics = timeseriesMetadata.statistics;
this.modified = timeseriesMetadata.modified;
- this.chunkMetadataList = new ArrayList<>(timeseriesMetadata.chunkMetadataList);
+ if (timeseriesMetadata.hasCMDBuffer) {
+ this.cMDBuffer = timeseriesMetadata.cMDBuffer;
+ this.hasCMDBuffer = true;
+ } else this.chunkMetadataList = new ArrayList<>(timeseriesMetadata.getChunkMetadataList());
}
public static TimeseriesMetadata deserializeFrom(ByteBuffer buffer, boolean needChunkMetadata)
@@ -109,15 +115,18 @@ public class TimeseriesMetadata implements ITimeSeriesMetadata {
timeseriesMetaData.setDataSizeOfChunkMetaDataList(chunkMetaDataListDataSize);
timeseriesMetaData.setStatistics(Statistics.deserialize(buffer, timeseriesMetaData.dataType));
if (needChunkMetadata) {
- ByteBuffer byteBuffer = buffer.slice();
- byteBuffer.limit(chunkMetaDataListDataSize);
- timeseriesMetaData.chunkMetadataList = new ArrayList<>();
- while (byteBuffer.hasRemaining()) {
- timeseriesMetaData.chunkMetadataList.add(
- ChunkMetadata.deserializeFrom(byteBuffer, timeseriesMetaData));
- }
- // minimize the storage of an ArrayList instance.
- timeseriesMetaData.chunkMetadataList.trimToSize();
+ timeseriesMetaData.cMDBuffer = buffer.slice();
+ timeseriesMetaData.cMDBuffer.limit(chunkMetaDataListDataSize);
+ timeseriesMetaData.hasCMDBuffer = true;
+ // ByteBuffer byteBuffer = buffer.slice();
+ // byteBuffer.limit(chunkMetaDataListDataSize);
+ // timeseriesMetaData.chunkMetadataList = new ArrayList<>();
+ // while (byteBuffer.hasRemaining()) {
+ // timeseriesMetaData.chunkMetadataList.add(
+ // ChunkMetadata.deserializeFrom(byteBuffer, timeseriesMetaData));
+ // }
+ // // minimize the storage of an ArrayList instance.
+ // timeseriesMetaData.chunkMetadataList.trimToSize();
}
buffer.position(buffer.position() + chunkMetaDataListDataSize);
return timeseriesMetaData;
@@ -137,7 +146,7 @@ public class TimeseriesMetadata implements ITimeSeriesMetadata {
byteLen += ReadWriteIOUtils.write(dataType, outputStream);
byteLen +=
ReadWriteForEncodingUtils.writeUnsignedVarInt(chunkMetaDataListDataSize, outputStream);
- byteLen += statistics.serialize(outputStream);
+ byteLen += statistics.serialize(outputStream, true);
chunkMetadataListBuffer.writeTo(outputStream);
byteLen += chunkMetadataListBuffer.size();
return byteLen;
@@ -206,6 +215,20 @@ public class TimeseriesMetadata implements ITimeSeriesMetadata {
}
public List<IChunkMetadata> getChunkMetadataList() {
+ if (hasCMDBuffer) {
+ // System.out.println("\t\t\t[TSMD]\tgetCMDList from delayed buffer.");
+ chunkMetadataList = new ArrayList<>();
+ try {
+ while (cMDBuffer.hasRemaining()) {
+ chunkMetadataList.add(ChunkMetadata.deserializeFrom(cMDBuffer, this));
+ }
+ } catch (IOException e) {
+ // no-op
+ }
+ // minimize the storage of an ArrayList instance.
+ chunkMetadataList.trimToSize();
+ hasCMDBuffer = false;
+ }
return chunkMetadataList;
}
@@ -260,7 +283,7 @@ public class TimeseriesMetadata implements ITimeSeriesMetadata {
+ ", isSeq="
+ isSeq
+ ", chunkMetadataList="
- + chunkMetadataList
+ + getChunkMetadataList()
+ '}';
}
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/DoubleStatistics.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/DoubleStatistics.java
index 046f89794a..be7448c257 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/DoubleStatistics.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/DoubleStatistics.java
@@ -20,10 +20,7 @@ package org.apache.iotdb.tsfile.file.metadata.statistics;
import org.apache.iotdb.tsfile.exception.filter.StatisticsClassException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.utils.LongKLLSketch;
-import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
-import org.apache.iotdb.tsfile.utils.SamplingHeapForStatMerge;
-import org.apache.iotdb.tsfile.utils.TDigestForStatMerge;
+import org.apache.iotdb.tsfile.utils.*;
import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
@@ -48,7 +45,9 @@ public class DoubleStatistics extends Statistics<Double> {
private double sumValue;
private int summaryNum = 0;
private LongKLLSketch kllSketch = null;
- private List<LongKLLSketch> kllSketchList = null;
+ private List<KLLSketchForQuantile> kllSketchList = null;
+ // public ByteBuffer chunkSketchBuffer;
+ // public boolean hasChunkSketchBuffer = false;
private TDigestForStatMerge tDigest = null;
private List<TDigestForStatMerge> tDigestList = null;
private SamplingHeapForStatMerge sampling = null;
@@ -57,6 +56,13 @@ public class DoubleStatistics extends Statistics<Double> {
private BloomFilter<Long> bf = null;
private List<BloomFilter<Long>> bfList = null;
private MutableLongList MinTimeMaxTimeCountList = null;
+ // Set to true by setKLLSketch(). When true, serializeSketchStat converts every sketch with
+ // new LongKLLSketch(sketch) before writing it, instead of casting it to LongKLLSketch.
+ private boolean LSMFile = false;
+
+ // Written right after bfNum by serializeSketchStat. When true, segTreeBySketch is serialized
+ // next and the method returns immediately, skipping the sketch list. The ByteBuffer-based
+ // deserializer mirrors this by reading a SegTreeBySketch and returning.
+ public boolean hasSegTreeBySketch = false;
+ public SegTreeBySketch segTreeBySketch = null;
+ // private int lsmLevel = 0;
+ // public List<KLLSketchForQuantile> lsmSketch = null;
+ // private MutableLongList lsmMinTMaxT = null;
static final int DOUBLE_STATISTICS_FIXED_RAM_SIZE = 90;
@@ -170,7 +176,18 @@ public class DoubleStatistics extends Statistics<Double> {
// public LongKLLSketch getKllSketch() {
// return kllSketch;
// }
- public List<LongKLLSketch> getKllSketchList() {
+  /**
+   * Replaces this statistics' summary with a single externally built sketch. It also marks the
+   * statistics as LSM-built, so serialization converts the sketch to a LongKLLSketch.
+   */
+  public void setKLLSketch(KLLSketchForQuantile sketch) {
+    List<KLLSketchForQuantile> single = new ArrayList<>(1);
+    single.add(sketch);
+    this.LSMFile = true;
+    this.summaryNum = 1;
+    this.kllSketchList = single;
+  }
+
+  /** Returns the first sketch of the summary list (the only one after setKLLSketch). */
+  public KLLSketchForQuantile getOneKllSketch() {
+    final List<KLLSketchForQuantile> sketches = this.kllSketchList;
+    return sketches.get(0);
+  }
+
+  /** Returns the summary sketch list; the field starts out null until a summary is set. */
+ public List<KLLSketchForQuantile> getKllSketchList() {
 return kllSketchList;
 }
@@ -337,7 +354,7 @@ public class DoubleStatistics extends Statistics<Double> {
if (doubleStat.summaryNum > 0) {
this.summaryNum = 1;
if (SUMMARY_TYPE == 0) {
- this.kllSketch = doubleStat.kllSketchList.get(pageID);
+ this.kllSketch = (LongKLLSketch) (doubleStat.kllSketchList.get(pageID));
this.kllSketchList = new ArrayList<>(1);
this.kllSketchList.add(this.kllSketch);
}
@@ -493,7 +510,7 @@ public class DoubleStatistics extends Statistics<Double> {
}
@Override
- int serializeChunkMetadataStat(OutputStream outputStream) throws IOException {
+ int serializeSketchStat(OutputStream outputStream) throws IOException {
// System.out.println("\t\t\t\t\t[DEBUG DOUBLE stat] serializeStats
// hashmap:"+serializeHashMap);
int byteLen = 0;
@@ -504,10 +521,17 @@ public class DoubleStatistics extends Statistics<Double> {
byteLen += ReadWriteIOUtils.write(sumValue, outputStream);
byteLen += ReadWriteIOUtils.write(summaryNum, outputStream);
byteLen += ReadWriteIOUtils.write(bfNum, outputStream);
+ byteLen += ReadWriteIOUtils.write(hasSegTreeBySketch, outputStream);
+ if (hasSegTreeBySketch) {
+ byteLen += segTreeBySketch.serializeSegTree(outputStream);
+ return byteLen;
+ }
if (summaryNum > 0) {
if (SUMMARY_TYPE == 0)
- for (LongKLLSketch sketch : kllSketchList) {
- int tmp = sketch.serialize(outputStream);
+ for (KLLSketchForQuantile sketch : kllSketchList) {
+ LongKLLSketch diskSketch =
+ !LSMFile ? ((LongKLLSketch) sketch) : (new LongKLLSketch(sketch));
+ int tmp = diskSketch.serialize(outputStream);
byteLen += tmp;
// System.out.println("\t[DEBUG][DoubleStat serializeStats]:\tbytes:" + tmp);
// sketch.show();
@@ -554,6 +578,7 @@ public class DoubleStatistics extends Statistics<Double> {
byteLen += ReadWriteIOUtils.write(sumValue, outputStream);
byteLen += ReadWriteIOUtils.write(0, outputStream);
byteLen += ReadWriteIOUtils.write(0, outputStream);
+ byteLen += ReadWriteIOUtils.write(false, outputStream);
return byteLen;
}
@@ -566,6 +591,7 @@ public class DoubleStatistics extends Statistics<Double> {
this.sumValue = ReadWriteIOUtils.readDouble(inputStream);
this.summaryNum = ReadWriteIOUtils.readInt(inputStream);
this.bfNum = ReadWriteIOUtils.readInt(inputStream);
+ this.hasSegTreeBySketch = ReadWriteIOUtils.readBool(inputStream);
if (this.summaryNum > 0) {
if (SUMMARY_TYPE == 0) {
this.kllSketchList = new ArrayList<>(summaryNum);
@@ -601,10 +627,21 @@ public class DoubleStatistics extends Statistics<Double> {
this.sumValue = ReadWriteIOUtils.readDouble(byteBuffer);
this.summaryNum = ReadWriteIOUtils.readInt(byteBuffer);
this.bfNum = ReadWriteIOUtils.readInt(byteBuffer);
+ this.hasSegTreeBySketch = ReadWriteIOUtils.readBool(byteBuffer);
+ if (hasSegTreeBySketch) {
+ this.segTreeBySketch = new SegTreeBySketch(byteBuffer);
+ return;
+ }
if (this.summaryNum > 0) {
+ // System.out.println(
+ // "\t\t\t[Stat Deserialize]\tsketch\tsumNum=" + summaryNum + "\tstartT:" +
+ // getStartTime());
if (SUMMARY_TYPE == 0) {
this.kllSketchList = new ArrayList<>(summaryNum);
for (int i = 0; i < summaryNum; i++) this.kllSketchList.add(new LongKLLSketch(byteBuffer));
+ // for (int i = 0; i < summaryNum; i++)
+ // System.out.println("\t\t\t\t\tsketch numLen:" +
+ // this.kllSketchList.get(i).getNumLen());
}
if (SUMMARY_TYPE == 1) {
this.tDigestList = new ArrayList<>(summaryNum);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java
index 63ba0e3629..20bf4b0f2b 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java
@@ -74,6 +74,8 @@ public abstract class Statistics<T extends Serializable> {
public static int BLOOM_FILTER_SIZE = 0;
public static int PAGE_SIZE_IN_BYTE = 65536;
public static int SUMMARY_TYPE = 0;
+ // public static boolean ENABLE_LSM_SKETCH =
+ // TSFileDescriptor.getInstance().getConfig().getCompressionPerLSMLevel();
protected static double getFPP(double bitsPerKey) {
return Math.exp(-1 * bitsPerKey * Math.pow(Math.log(2.0D), 2));
@@ -181,20 +183,20 @@ public abstract class Statistics<T extends Serializable> {
return serialize(outputStream, false);
}
- public int serialize(OutputStream outputStream, boolean isChunkMetaData) throws IOException {
+ /**
+ * Writes count (unsigned varint), startTime and endTime, followed by the value statistics.
+ *
+ * @param outputStream stream to write to
+ * @param sketch if true, the value part is written by serializeSketchStat, which subclasses
+ * such as DoubleStatistics override to append their sketches; otherwise by serializeStats.
+ * The single-argument overload passes false.
+ * @return the number of bytes written
+ */
+ public int serialize(OutputStream outputStream, boolean sketch) throws IOException {
 int byteLen = 0;
 byteLen += ReadWriteForEncodingUtils.writeUnsignedVarInt(count, outputStream);
 byteLen += ReadWriteIOUtils.write(startTime, outputStream);
 byteLen += ReadWriteIOUtils.write(endTime, outputStream);
 // value statistics of different data type
- if (!isChunkMetaData) byteLen += serializeStats(outputStream);
- else byteLen += serializeChunkMetadataStat(outputStream);
+ if (!sketch) byteLen += serializeStats(outputStream);
+ else byteLen += serializeSketchStat(outputStream);
 return byteLen;
 }
abstract int serializeStats(OutputStream outputStream) throws IOException;
- int serializeChunkMetadataStat(OutputStream outputStream) throws IOException {
+ /**
+ * Writes the value statistics in the layout selected by serialize(out, true). This default
+ * writes exactly what serializeStats writes; subclasses override it to add sketch data.
+ *
+ * @return the number of bytes written
+ */
+ int serializeSketchStat(OutputStream outputStream) throws IOException {
 return serializeStats(outputStream);
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/DDSketchForQuantile.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/DDSketchForQuantile.java
new file mode 100644
index 0000000000..37747146cb
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/DDSketchForQuantile.java
@@ -0,0 +1,183 @@
+package org.apache.iotdb.tsfile.utils;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Approximate quantile sketch in the style of DDSketch: values are mapped to logarithmic buckets
+ * (bucket k of a positive value v is ceil(log_gamma(v))), with separate maps for positive and
+ * negative values plus a counter for values within MIN_POSITIVE_VALUE of zero. When the number of
+ * buckets exceeds bucket_num_limit * COEFFICIENT, the lowest buckets are collapsed so that only
+ * bucket_num_limit remain.
+ */
+public class DDSketchForQuantile implements Serializable {
+  private double alpha;
+  private double gamma;
+  private double multiplier;
+  private int bucket_num_limit;
+  private int threshold_for_compression;
+
+  private Map<Integer, Long> positive_buckets;
+  private Map<Integer, Long> negative_buckets;
+  // Values below this bound are clamped to it on insert (set by collapse()).
+  private double collapse_bound;
+  private long zero_count;
+
+  private static double MIN_POSITIVE_VALUE = 1e-6;
+  private static double COEFFICIENT = 1.5;
+  // True while `buckets` reflects the current contents (reset by every insert).
+  boolean valid_buckets = false;
+  Bucket[] buckets;
+
+  /**
+   * @param alpha relative accuracy; determines gamma = (1 + alpha) / (1 - alpha)
+   * @param bucket_num_limit number of buckets kept after a collapse (values below 2 are raised
+   *     to 2)
+   */
+  public DDSketchForQuantile(double alpha, int bucket_num_limit) {
+    this.alpha = alpha;
+    this.bucket_num_limit = Math.max(bucket_num_limit, 2);
+    // Derive everything from the clamped limit. Using the raw argument made the threshold
+    // smaller than the limit for tiny limits, and gave HashMap a negative initial capacity
+    // (IllegalArgumentException) for negative ones.
+    this.threshold_for_compression = (int) (this.bucket_num_limit * COEFFICIENT);
+
+    this.gamma = 2 * alpha / (1 - alpha) + 1;
+    this.multiplier = Math.log(Math.E) / (Math.log1p(gamma - 1));
+    this.positive_buckets = new HashMap<>((int) (this.bucket_num_limit * 0.75));
+    this.negative_buckets = new HashMap<>((int) (this.bucket_num_limit * 0.25));
+    this.zero_count = 0;
+    this.collapse_bound = -Double.MAX_VALUE;
+  }
+
+  /** Adds one value, clamping it to collapse_bound first, and collapses if the sketch is full. */
+  public void insert(double v) {
+    valid_buckets = false;
+    if (v < collapse_bound) {
+      v = collapse_bound;
+    }
+    if (v > MIN_POSITIVE_VALUE) {
+      int i = (int) Math.ceil(Math.log(v) * multiplier);
+      positive_buckets.put(i, positive_buckets.getOrDefault(i, 0L) + 1);
+    } else if (v < -MIN_POSITIVE_VALUE) {
+      int i = (int) Math.ceil(Math.log(-v) * multiplier);
+      negative_buckets.put(i, negative_buckets.getOrDefault(i, 0L) + 1);
+    } else {
+      zero_count++;
+    }
+    collapse(threshold_for_compression);
+  }
+
+  /**
+   * If more than {@code limit} buckets exist, merges the lowest-valued buckets until
+   * bucket_num_limit remain (approximately; see the zero-bucket handling below).
+   */
+  private void collapse(int limit) {
+    if (sketch_size() <= limit) {
+      return;
+    }
+    int exceed = sketch_size() - bucket_num_limit;
+    // 1) Merge the most negative buckets (largest indices) into the next remaining negative
+    //    bucket, or into the zero bucket when every negative bucket has to go.
+    Integer[] indices = negative_buckets.keySet().toArray(new Integer[0]);
+    Arrays.sort(indices);
+    long count = 0;
+    for (int i = Math.max(0, indices.length - exceed); i < indices.length; ++i) {
+      count += negative_buckets.remove(indices[i]);
+    }
+    if (count > 0) {
+      int i = indices.length - exceed - 1;
+      if (i >= 0) {
+        negative_buckets.put(indices[i], negative_buckets.get(indices[i]) + count);
+        collapse_bound = -Math.pow(gamma, indices[i]);
+      } else {
+        zero_count += count;
+        collapse_bound = 0;
+      }
+    }
+    exceed -= (indices.length - Math.max(0, indices.length - exceed));
+    if (exceed > 0) {
+      // 2) Still too many buckets: fold the zero bucket and the smallest positive buckets into
+      //    the next remaining positive bucket.
+      count = zero_count;
+      if (zero_count > 0) {
+        exceed--;
+      }
+      // The zero bucket's mass now lives in a positive bucket. Without this reset it would be
+      // counted twice, and again on every later collapse.
+      zero_count = 0;
+      indices = positive_buckets.keySet().toArray(new Integer[0]);
+      Arrays.sort(indices);
+      for (int i = exceed - 1; i >= 0; --i) {
+        count += positive_buckets.remove(indices[i]);
+      }
+      int target = indices[exceed];
+      positive_buckets.put(target, positive_buckets.get(target) + count);
+      // Bucket `target` covers (gamma^(target-1), gamma^target]. Clamp future small values to
+      // its middle: its lower edge gamma^(target-1) maps to bucket target-1 instead.
+      collapse_bound = Math.pow(gamma, target - 0.5);
+    }
+  }
+
+  // Encoded bucket ids used while sorting: positive key k -> k + DIVIDE_DELTA, zero bucket ->
+  // DIVIDE_HALF, negative key k -> k.
+  static final int DIVIDE_DELTA = 1000000000, DIVIDE_HALF = DIVIDE_DELTA / 2;
+
+  /** Lower value bound of the encoded bucket. */
+  private double getL(int index) {
+    return index > DIVIDE_HALF
+        ? Math.pow(gamma, index - DIVIDE_DELTA - 1)
+        : (index == DIVIDE_HALF ? 0 : -Math.pow(gamma, index));
+  }
+
+  /** Upper value bound of the encoded bucket. */
+  private double getR(int index) {
+    return index > DIVIDE_HALF
+        ? Math.pow(gamma, index - DIVIDE_DELTA)
+        : (index == DIVIDE_HALF ? 0 : -Math.pow(gamma, index - 1));
+  }
+
+  /** Count stored in the encoded bucket. */
+  private long getCount(int index) {
+    return index > DIVIDE_HALF
+        ? positive_buckets.get(index - DIVIDE_DELTA)
+        : (index == DIVIDE_HALF ? zero_count : negative_buckets.get(index));
+  }
+
+  /** Rebuilds `buckets`: all buckets sorted by value with running prefix counts. */
+  private void union_buckets() {
+    buckets = new Bucket[sketch_size()];
+    int i = 0;
+    for (Map.Entry<Integer, Long> e : positive_buckets.entrySet()) {
+      buckets[i++] = new Bucket(e.getKey() + DIVIDE_DELTA);
+    }
+    for (Map.Entry<Integer, Long> e : negative_buckets.entrySet()) {
+      buckets[i++] = new Bucket(e.getKey());
+    }
+    if (zero_count > 0) {
+      buckets[i] = new Bucket(DIVIDE_HALF);
+    }
+    Arrays.sort(buckets, Comparator.comparingDouble(o -> (getL(o.bucketIndex))));
+    long sum = 0;
+    for (i = 0; i < sketch_size(); i++) {
+      sum += getCount(buckets[i].bucketIndex);
+      buckets[i].prefixSum = sum;
+    }
+    valid_buckets = true;
+  }
+
+  /** Index of the first bucket whose prefix count exceeds q * (total_count - 1). */
+  private int find_p_index(Bucket[] buckets, long total_count, double q) {
+    double rank = q * (total_count - 1);
+    int tmp1 = Integer.highestOneBit(buckets.length);
+    int p = -1;
+    while (tmp1 > 0) {
+      if (p + tmp1 < buckets.length && buckets[p + tmp1].prefixSum <= rank) p += tmp1;
+      tmp1 /= 2;
+    }
+    return p + 1;
+  }
+
+  /**
+   * Returns the approximate q-quantile (0 <= q <= 1).
+   *
+   * @throws IllegalStateException if no value has been inserted
+   */
+  public double getQuantile(double q) {
+    if (!valid_buckets) union_buckets();
+    if (buckets.length == 0) {
+      throw new IllegalStateException("Cannot compute a quantile of an empty DDSketch");
+    }
+    // The last prefix sum is the total number of inserted values.
+    long total_count = buckets[buckets.length - 1].prefixSum;
+    Bucket p = buckets[find_p_index(buckets, total_count, q)];
+    if (getL(p.bucketIndex) < 0) {
+      return 2 * getL(p.bucketIndex) / (1 + gamma);
+    } else {
+      return 2 * getR(p.bucketIndex) / (1 + gamma);
+    }
+  }
+
+  /** Number of non-empty buckets, counting the zero bucket when it is non-empty. */
+  public int sketch_size() {
+    return positive_buckets.size() + negative_buckets.size() + (zero_count == 0 ? 0 : 1);
+  }
+
+  private static class Bucket {
+    public int bucketIndex;
+    public long prefixSum;
+
+    Bucket(int bucketIndex) {
+      this.bucketIndex = bucketIndex;
+      this.prefixSum = 0;
+    }
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/KLLSketchForLSMFile.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/KLLSketchForLSMFile.java
new file mode 100644
index 0000000000..4ba7a86337
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/KLLSketchForLSMFile.java
@@ -0,0 +1,193 @@
+package org.apache.iotdb.tsfile.utils;
+
+import it.unimi.dsi.fastutil.objects.ObjectArrayList;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * KLL sketch for one LSM file (SSTable), assembled from the sketches of its chunks.
+ *
+ * <p>Sub-sketches are registered with {@link #addSubSketch} and combined by {@link
+ * #compactSubSketches}; {@link #mergeWithTempSpace} merges further sketches into the result.
+ */
+public class KLLSketchForLSMFile extends KLLSketchForQuantile {
+  ObjectArrayList<KLLSketchForQuantile> subSketchList;
+
+  /**
+   * Builds a sketch holding a single sub-sketch (e.g. one built from a MemTable sorted by time).
+   *
+   * <p>The sub-sketch's item count is added to {@code N}, exactly as {@link #addSubSketch} does,
+   * so {@code N} is correct regardless of which constructor was used.
+   */
+  public KLLSketchForLSMFile(KLLSketchForQuantile subSketch) {
+    subSketchList = new ObjectArrayList<>();
+    this.N += subSketch.getN();
+    subSketchList.add(subSketch);
+  }
+
+  /** Builds an empty sketch; register sub-sketches with {@link #addSubSketch}. */
+  public KLLSketchForLSMFile() {
+    subSketchList = new ObjectArrayList<>();
+  }
+
+  /** Registers a sub-sketch for {@link #compactSubSketches} and adds its count to {@code N}. */
+  public void addSubSketch(KLLSketchForQuantile subSketch) {
+    this.N += subSketch.getN();
+    subSketchList.add(subSketch);
+  }
+
+  /**
+   * Merges the top level of every registered sub-sketch into this sketch and compacts upward.
+   *
+   * <p>All sub-sketches must have the same level count and keep their items only in their top
+   * level (checked with assertions). Their top-level items are concatenated, sorted and placed in
+   * this sketch's top level. The current top level is then compacted {@code additionalLevel}
+   * times, each time moving about half of it into a new level above. Finally {@code num} is
+   * trimmed to end at {@code levelPos[cntLevel]}.
+   *
+   * @param additionalLevel number of top-level compactions to apply
+   */
+  public void compactSubSketches(int additionalLevel) {
+    assert !subSketchList.isEmpty();
+    int subLevel = subSketchList.get(0).cntLevel;
+    int targetLevel = subLevel + additionalLevel;
+    int tmpNumLen = 0;
+    levelPos = new int[targetLevel + 1];
+    for (KLLSketchForQuantile subSketch : subSketchList) {
+      assert subSketch.cntLevel == subLevel;
+      // every level below the top one must be empty
+      assert subSketch.levelPos[0] == subSketch.levelPos[subLevel - 1];
+      tmpNumLen += subSketch.levelPos[subLevel] - subSketch.levelPos[subLevel - 1];
+    }
+    long[] tmpNum = new long[tmpNumLen * 2];
+    int cntNumLen = 0;
+    for (KLLSketchForQuantile subSketch : subSketchList) {
+      System.arraycopy(
+          subSketch.num,
+          subSketch.levelPos[subLevel - 1],
+          tmpNum,
+          cntNumLen,
+          subSketch.getLevelSize(subLevel - 1));
+      cntNumLen += subSketch.getLevelSize(subLevel - 1);
+    }
+    Arrays.sort(tmpNum, 0, cntNumLen);
+    num = tmpNum;
+    // Lower levels are empty; the top level occupies [0, tmpNumLen).
+    for (int i = 0; i < subLevel - 1; i++) levelPos[i] = 0;
+    cntLevel = subLevel;
+    levelPos[cntLevel] = tmpNumLen;
+    for (int i = 0; i < additionalLevel; i++) compactOneLevel(cntLevel - 1);
+    num = Arrays.copyOfRange(num, 0, levelPos[cntLevel]);
+  }
+
+  /**
+   * Compacts one level into the level above it, adding a new top level first if needed.
+   *
+   * <p>Level 0 is sorted first when it is not yet sorted. If the level has an odd size, its first
+   * item stays behind. The remaining items are randomly halved, merged into {@code level + 1}, and
+   * the data below is shifted up to close the gap. The boundaries of all lower levels move by the
+   * number of items removed.
+   */
+  private void compactOneLevel(int level) { // compact half of data when numToReduce is small
+    if (level == cntLevel - 1) calcLevelMaxSize(cntLevel + 1);
+    int L1 = levelPos[level], R1 = levelPos[level + 1]; // [L,R)
+    if (level == 0 && !level0Sorted) {
+      Arrays.sort(num, L1, R1);
+      level0Sorted = true;
+    }
+    L1 += (R1 - L1) & 1; // an odd leftover item stays in this level
+    if (L1 == R1) return;
+
+    randomlyHalveDownToLeft(L1, R1);
+
+    int mid = (L1 + R1) >>> 1;
+    mergeSortWithoutSpace(L1, mid, levelPos[level + 1], levelPos[level + 2]);
+    levelPos[level + 1] = mid;
+    int newP = levelPos[level + 1] - 1, oldP = L1 - 1;
+    for (int i = oldP; i >= levelPos[0]; i--) num[newP--] = num[oldP--];
+
+    levelPos[level] = levelPos[level + 1] - (L1 - levelPos[level]);
+    int numReduced = (R1 - L1) >>> 1;
+    for (int i = level - 1; i >= 0; i--) levelPos[i] += numReduced;
+  }
+
+  /**
+   * Sets the level count to {@code setLevel} and recomputes the per-level capacities.
+   *
+   * <p>Newly added levels start empty at the end of the current data. Level {@code i} gets
+   * {@code max(1, round(K * (2/3)^(cntLevel - i - 1)))} slots, where K is the largest value
+   * (found bit by bit) whose total fits in {@code maxMemoryNum}.
+   */
+  @Override
+  protected void calcLevelMaxSize(int setLevel) { // set cntLevel. cntLevel won't decrease
+    int[] tmpArr = new int[setLevel + 1];
+    int maxPos = cntLevel > 0 ? Math.max(maxMemoryNum, levelPos[cntLevel]) : maxMemoryNum;
+    for (int i = 0; i < setLevel + 1; i++) tmpArr[i] = i < cntLevel ? levelPos[i] : maxPos;
+    levelPos = tmpArr;
+    cntLevel = setLevel;
+    levelMaxSize = new int[cntLevel];
+    int newK = 0;
+    for (int addK = 1 << 28; addK > 0; addK >>>= 1) { // find a new K to fit the memory limit.
+      int need = 0;
+      for (int i = 0; i < cntLevel; i++)
+        need +=
+            Math.max(1, (int) Math.round(((newK + addK) * Math.pow(2.0 / 3, cntLevel - i - 1))));
+      if (need <= maxMemoryNum) newK += addK;
+    }
+    for (int i = 0; i < cntLevel; i++)
+      levelMaxSize[i] = Math.max(1, (int) Math.round((newK * Math.pow(2.0 / 3, cntLevel - i - 1))));
+  }
+
+  /**
+   * Merges the given sketches into this one, level by level; {@code null} entries are skipped.
+   *
+   * <p>If the combined items fit in {@code maxMemoryNum}, they are merged in place inside
+   * {@code num}. Otherwise they are merged into a temporary array and compacted with
+   * {@link #compact()} until they fit. The result is then copied back into the original array so
+   * that it ends at {@code maxMemoryNum}.
+   *
+   * @throws IllegalStateException if {@link #compact()} stops shrinking the sketch before it fits.
+   *     This class does not override {@code compact()}, and the inherited default does nothing,
+   *     so without this check the compaction loop would never terminate.
+   */
+  public void mergeWithTempSpace(List<KLLSketchForQuantile> otherList) {
+    int[] oldLevelPos = Arrays.copyOf(levelPos, cntLevel + 1);
+    int oldCntLevel = cntLevel;
+    int otherNumLen = 0;
+    long otherN = 0;
+    for (KLLSketchForQuantile another : otherList)
+      if (another != null) {
+        if (another.cntLevel > cntLevel) calcLevelMaxSize(another.cntLevel);
+        otherNumLen += another.getNumLen();
+        otherN += another.getN();
+      }
+    if (getNumLen() + otherNumLen <= maxMemoryNum) {
+      // Everything fits: rebuild each level in place, packing towards lower indices.
+      int cntPos = oldLevelPos[0] - otherNumLen;
+      for (int i = 0; i < cntLevel; i++) {
+        levelPos[i] = cntPos;
+        if (i < oldCntLevel) {
+          System.arraycopy(num, oldLevelPos[i], num, cntPos, oldLevelPos[i + 1] - oldLevelPos[i]);
+          cntPos += oldLevelPos[i + 1] - oldLevelPos[i];
+        }
+        for (KLLSketchForQuantile another : otherList)
+          if (another != null && i < another.cntLevel) {
+            System.arraycopy(
+                another.num, another.levelPos[i], num, cntPos, another.getLevelSize(i));
+            cntPos += another.getLevelSize(i);
+          }
+        Arrays.sort(num, levelPos[i], cntPos);
+      }
+      levelPos[cntLevel] = cntPos;
+      this.N += otherN;
+    } else {
+      // Too large: merge into a temporary array, compact until it fits, then copy it back.
+      long[] oldNum = num;
+      num = new long[getNumLen() + otherNumLen];
+      int numLen = 0;
+      for (int i = 0; i < cntLevel; i++) {
+        levelPos[i] = numLen;
+        if (i < oldCntLevel) {
+          System.arraycopy(
+              oldNum, oldLevelPos[i], num, numLen, oldLevelPos[i + 1] - oldLevelPos[i]);
+          numLen += oldLevelPos[i + 1] - oldLevelPos[i];
+        }
+        for (KLLSketchForQuantile another : otherList)
+          if (another != null && i < another.cntLevel) {
+            System.arraycopy(
+                another.num, another.levelPos[i], num, numLen, another.getLevelSize(i));
+            numLen += another.getLevelSize(i);
+          }
+        Arrays.sort(num, levelPos[i], numLen);
+      }
+      levelPos[cntLevel] = numLen;
+      this.N += otherN;
+      while (getNumLen() > maxMemoryNum) {
+        int sizeBefore = getNumLen();
+        compact();
+        if (getNumLen() >= sizeBefore) {
+          throw new IllegalStateException(
+              "compact() made no progress while merging: size="
+                  + sizeBefore
+                  + ", maxMemoryNum="
+                  + maxMemoryNum);
+        }
+      }
+      int newPos0 = maxMemoryNum - getNumLen();
+      System.arraycopy(num, levelPos[0], oldNum, newPos0, getNumLen());
+      // Iterate downwards so every level is shifted by the same (original) offset.
+      for (int i = cntLevel; i >= 0; i--) levelPos[i] += newPos0 - levelPos[0];
+      num = oldNum;
+    }
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/KLLSketchForQuantile.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/KLLSketchForQuantile.java
index bbc88d93c1..8bc5e19036 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/KLLSketchForQuantile.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/KLLSketchForQuantile.java
@@ -17,9 +17,11 @@ public abstract class KLLSketchForQuantile {
public KLLSketchForQuantile() {}
- protected abstract int calcMaxMemoryNum(int maxMemoryByte);
+ /**
+ * Default for sketches that are not sized from a byte budget: always returns 0.
+ * Subclasses that size their buffer from {@code maxMemoryByte} should override this.
+ */
+ protected int calcMaxMemoryNum(int maxMemoryByte) {
+ return 0;
+ }
- protected abstract void calcLevelMaxSize(int setLevel);
+ /**
+ * Default no-op. Subclasses that track per-level capacities (e.g. KLLSketchForSST,
+ * KLLSketchForLSMFile) override this to set cntLevel and recompute levelPos/levelMaxSize.
+ */
+ protected void calcLevelMaxSize(int setLevel) {}
public int getLevelSize(int level) {
return levelPos[level + 1] - levelPos[level];
@@ -55,7 +57,7 @@ public abstract class KLLSketchForQuantile {
// System.out.println("\t\t\t"+x);
}
- protected abstract void compact();
+ /**
+ * Default no-op. NOTE(review): any subclass whose code loops on compact() until the sketch fits
+ * (e.g. a "while (getNumLen() > maxMemoryNum) compact();" loop) must override this, otherwise
+ * such a loop never terminates.
+ */
+ protected void compact() {}
protected int getNextRand01() { // xor shift *
XORSHIFT ^= XORSHIFT >>> 12;
@@ -151,4 +153,8 @@ public abstract class KLLSketchForQuantile {
}
return num[L + K];
}
+
+ /**
+ * Hook with no default behavior. NOTE(review): its intended use is not visible here; subclasses
+ * presumably override it to prepare the sketch before use.
+ */
+ public void active() {
+ // intentionally empty in the base class
+ }
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/KLLSketchForSST.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/KLLSketchForSST.java
new file mode 100644
index 0000000000..6e025bb724
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/KLLSketchForSST.java
@@ -0,0 +1,104 @@
+package org.apache.iotdb.tsfile.utils;
+
+import it.unimi.dsi.fastutil.objects.ObjectArrayList;
+
+import java.util.Arrays;
+
+public class KLLSketchForSST extends KLLSketchForQuantile {
+ ObjectArrayList<KLLSketchForQuantile> subSketchList;
+
+ /** Built with other sketches */
+ public KLLSketchForSST(KLLSketchForQuantile subSketch) {
+ subSketchList = new ObjectArrayList<>();
+ subSketchList.add(subSketch);
+ }
+
+ public KLLSketchForSST() {
+ subSketchList = new ObjectArrayList<>();
+ }
+
+ public void addSubSketch(KLLSketchForQuantile subSketch) {
+ this.N += subSketch.getN();
+ subSketchList.add(subSketch);
+ }
+
+ public void compactSubSketches(int SketchSizeRatio) {
+ assert !subSketchList.isEmpty();
+ int subLevel = subSketchList.get(0).cntLevel, tmpNumLen = 0, subSizeSum = 0;
+ for (KLLSketchForQuantile sketch : subSketchList) {
+ subSizeSum += sketch.getNumLen();
+ subLevel = Math.max(subLevel, sketch.cntLevel);
+ }
+ maxMemoryNum = subSizeSum * SketchSizeRatio / subSketchList.size();
+ num = new long[subSizeSum];
+ levelPos = new int[subLevel + 1];
+ cntLevel = subLevel;
+ for (int lv = levelPos[0] = 0; lv < cntLevel; lv++) {
+ levelPos[lv + 1] = levelPos[lv];
+ for (KLLSketchForQuantile sketch : subSketchList)
+ if (sketch.cntLevel > lv && sketch.getLevelSize(lv) > 0) {
+ System.arraycopy(
+ sketch.num, sketch.levelPos[lv], num, levelPos[lv + 1], sketch.getLevelSize(lv));
+ levelPos[lv + 1] += sketch.getLevelSize(lv);
+ }
+ if (getLevelSize(lv) > 0) Arrays.sort(num, levelPos[lv], levelPos[lv + 1]);
+ }
+ level0Sorted = true;
+ // System.out.println("\t\t before compact");
+ // show();
+ for (int lv = 0; getNumLen() > maxMemoryNum; lv++) {
+ compactOneLevel(lv);
+ }
+ // show();
+ // showNum();
+ // System.out.println("\t\t compact over");
+ }
+
+ private void compactOneLevel(int level) { // compact half of data when numToReduce is small
+ if (level == cntLevel - 1) calcLevelMaxSize(cntLevel + 1);
+ int L1 = levelPos[level], R1 = levelPos[level + 1]; // [L,R)
+ // System.out.println("T_T\t"+(R1-L1));
+ if (level == 0 && !level0Sorted) {
+ Arrays.sort(num, L1, R1);
+ level0Sorted = true;
+ }
+ L1 += (R1 - L1) & 1;
+ if (L1 == R1) return;
+
+ randomlyHalveDownToLeft(L1, R1);
+
+ int mid = (L1 + R1) >>> 1;
+ mergeSortWithoutSpace(L1, mid, levelPos[level + 1], levelPos[level + 2]);
+ levelPos[level + 1] = mid;
+ int newP = levelPos[level + 1] - 1, oldP = L1 - 1;
+ for (int i = oldP; i >= levelPos[0]; i--) num[newP--] = num[oldP--];
+
+ levelPos[level] = levelPos[level + 1] - (L1 - levelPos[level]);
+ int numReduced = (R1 - L1) >>> 1;
+ for (int i = level - 1; i >= 0; i--) levelPos[i] += numReduced;
+ // if(levelPos[level+1]-levelPos[level]>levelMaxSize[level+1]){
+ // compactOneLevel(level+1);
+ // }
+ }
+
+ @Override
+ protected void calcLevelMaxSize(int setLevel) { // set cntLevel. cntLevel won't decrease
+ int[] tmpArr = new int[setLevel + 1];
+ int maxPos = cntLevel > 0 ? Math.max(maxMemoryNum, levelPos[cntLevel]) : maxMemoryNum;
+ for (int i = 0; i < setLevel + 1; i++) tmpArr[i] = i < cntLevel ? levelPos[i] : maxPos;
+ levelPos = tmpArr;
+ cntLevel = setLevel;
+ levelMaxSize = new int[cntLevel];
+ int newK = 0;
+ for (int addK = 1 << 28; addK > 0; addK >>>= 1) { // find a new K to fit the memory limit.
+ int need = 0;
+ for (int i = 0; i < cntLevel; i++)
+ need +=
+ Math.max(1, (int) Math.round(((newK + addK) * Math.pow(2.0 / 3, cntLevel - i - 1))));
+ if (need <= maxMemoryNum) newK += addK;
+ }
+ for (int i = 0; i < cntLevel; i++)
+ levelMaxSize[i] = Math.max(1, (int) Math.round((newK * Math.pow(2.0 / 3, cntLevel - i - 1))));
+ // show();
+ }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/LongKLLSketch.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/LongKLLSketch.java
index 16f933bfb2..1be9776a2e 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/LongKLLSketch.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/LongKLLSketch.java
@@ -17,9 +17,9 @@ public class LongKLLSketch extends KLLSketchForQuantile {
int maxN, maxSerializeNum;
int K;
int maxLevel;
+ ByteBuffer sketchBuffer = null;
- public LongKLLSketch(
- int maxN, int maxMemoryByte, int maxSerializeByte) { // maxN=7000 for PAGE, 1.6e6 for CHUNK
+ public LongKLLSketch(int maxN, int maxMemoryByte, int maxSerializeByte) {
this.maxN = maxN;
N = 0;
maxLevel = calcMaxLevel(maxN, maxSerializeByte);
@@ -27,6 +27,24 @@ public class LongKLLSketch extends KLLSketchForQuantile {
calcLevelMaxSize(1);
}
+ /**
+ * Wraps an existing sketch in the LongKLLSketch format, e.g. so that SegTreeBySketch can
+ * serialize its node sketches.
+ *
+ * <p>N, cntLevel and maxLevel are taken from {@code sketch}. maxN is N narrowed to int, and
+ * maxSerializeNum is Integer.MAX_VALUE. maxMemoryNum and levelMaxSize are not set here.
+ *
+ * <p>NOTE(review): {@code num} and {@code levelPos} are shared with {@code sketch}, not copied.
+ * Any later in-place modification (possibly during serialization) also changes {@code sketch};
+ * confirm this is intended.
+ */
+ public LongKLLSketch(KLLSketchForQuantile sketch) {
+ N = sketch.getN();
+ maxN = (int) N;
+ cntLevel = maxLevel = sketch.cntLevel;
+ maxSerializeNum = Integer.MAX_VALUE;
+ num = sketch.num;
+ levelPos = sketch.levelPos;
+ }
+
+ @Override
+ public void update(long x) { // signed long
+ deserializeFromBuffer();
+ if (levelPos[0] == 0) compact();
+ num[--levelPos[0]] = x;
+ N++;
+ level0Sorted = false;
+ }
+
// public int getCntLevel(){return cntLevel;}
// public int[] getLevelPos(){return levelPos;}
// public long[] getNum(){return num;}
@@ -97,7 +115,7 @@ public class LongKLLSketch extends KLLSketchForQuantile {
int numLEN = getNumLen();
System.out.println(
"\t\tCOMPACT_SIZE:"
- + (13 + (maxLevel) * (numLEN < 256 ? 1 : 2) + numLEN * 8)
+ + (13 + (maxLevel) * (numLEN < 256 ? 1 : 4) + numLEN * 8)
+ "\t//maxMemNum:"
+ maxMemoryNum
+ ",maxSeriNum:"
@@ -173,19 +191,20 @@ public class LongKLLSketch extends KLLSketchForQuantile {
}
public int serialize(OutputStream outputStream) throws IOException { // 15+1*?+8*?
+ deserializeFromBuffer();
compactBeforeSerialization(); // if N==maxN
int byteLen = 0;
byteLen += ReadWriteIOUtils.write(N, outputStream);
byteLen += ReadWriteIOUtils.write(maxN, outputStream);
byteLen += ReadWriteIOUtils.write((byte) maxLevel, outputStream);
int numLEN = getNumLen();
- byteLen += ReadWriteIOUtils.write((short) numLEN, outputStream);
+ byteLen += ReadWriteIOUtils.write(numLEN, outputStream);
if (numLEN < 256)
for (int i = 0; i < maxLevel; i++)
byteLen += ReadWriteIOUtils.write((byte) (levelPos[i + 1] - levelPos[i]), outputStream);
else
for (int i = 0; i < maxLevel; i++)
- byteLen += ReadWriteIOUtils.write((short) (levelPos[i + 1] - levelPos[i]), outputStream);
+ byteLen += ReadWriteIOUtils.write((levelPos[i + 1] - levelPos[i]), outputStream);
for (int i = levelPos[0]; i < levelPos[maxLevel]; i++)
byteLen += ReadWriteIOUtils.write(num[i], outputStream);
return byteLen;
@@ -197,11 +216,11 @@ public class LongKLLSketch extends KLLSketchForQuantile {
this.maxN = ReadWriteIOUtils.readInt(inputStream);
this.maxLevel = ReadWriteIOUtils.readByte(inputStream);
calcParameters(maxMemoryByte, maxSerializeByte);
- int numLEN = ReadWriteIOUtils.readShort(inputStream);
+ int numLEN = ReadWriteIOUtils.readInt(inputStream);
for (int i = 0, tmp = 0; i < maxLevel; i++) {
levelPos[i] = maxMemoryNum - (numLEN - tmp);
if (numLEN < 256) tmp += UnsignedBytes.toInt(ReadWriteIOUtils.readByte(inputStream));
- else tmp += ReadWriteIOUtils.readShort(inputStream);
+ else tmp += ReadWriteIOUtils.readInt(inputStream);
}
int actualLevel = maxLevel - 1;
while (levelPos[actualLevel] == levelPos[actualLevel + 1]) actualLevel--;
@@ -215,11 +234,11 @@ public class LongKLLSketch extends KLLSketchForQuantile {
this.maxN = ReadWriteIOUtils.readInt(byteBuffer);
this.maxLevel = ReadWriteIOUtils.readByte(byteBuffer);
calcParameters(maxMemoryByte, maxSerializeByte);
- int numLEN = ReadWriteIOUtils.readShort(byteBuffer);
+ int numLEN = ReadWriteIOUtils.readInt(byteBuffer);
for (int i = 0, tmp = 0; i < maxLevel; i++) {
levelPos[i] = maxMemoryNum - (numLEN - tmp);
if (numLEN < 256) tmp += UnsignedBytes.toInt(ReadWriteIOUtils.readByte(byteBuffer));
- else tmp += ReadWriteIOUtils.readShort(byteBuffer);
+ else tmp += ReadWriteIOUtils.readInt(byteBuffer);
}
int actualLevel = maxLevel - 1;
while (levelPos[actualLevel] == levelPos[actualLevel + 1]) actualLevel--;
@@ -232,7 +251,7 @@ public class LongKLLSketch extends KLLSketchForQuantile {
this.N = ReadWriteIOUtils.readLong(inputStream);
this.maxN = ReadWriteIOUtils.readInt(inputStream);
this.maxLevel = ReadWriteIOUtils.readByte(inputStream);
- int numLEN = ReadWriteIOUtils.readShort(inputStream);
+ int numLEN = ReadWriteIOUtils.readInt(inputStream);
this.maxSerializeNum = numLEN;
K = calcK(maxN, maxLevel);
@@ -246,7 +265,7 @@ public class LongKLLSketch extends KLLSketchForQuantile {
// System.out.println("\t\ttmp:"+tmp+"\t\tnumLen:"+numLEN+"\t\tmemMemNum:"+maxMemoryNum);
levelPos[i] = maxMemoryNum - (numLEN - tmp);
if (numLEN < 256) tmp += UnsignedBytes.toInt(ReadWriteIOUtils.readByte(inputStream));
- else tmp += ReadWriteIOUtils.readShort(inputStream);
+ else tmp += ReadWriteIOUtils.readInt(inputStream);
}
int actualLevel = maxLevel - 1;
while (levelPos[actualLevel] == levelPos[actualLevel + 1]) actualLevel--;
@@ -259,10 +278,26 @@ public class LongKLLSketch extends KLLSketchForQuantile {
this.N = ReadWriteIOUtils.readLong(byteBuffer);
this.maxN = ReadWriteIOUtils.readInt(byteBuffer);
this.maxLevel = ReadWriteIOUtils.readByte(byteBuffer);
+ // System.out.println("\t\t[DEBUG deserialize sketch]\tmaxLevel=" + maxLevel);
- int numLEN = ReadWriteIOUtils.readShort(byteBuffer);
+ int numLEN = ReadWriteIOUtils.readInt(byteBuffer);
maxSerializeNum = numLEN;
+ sketchBuffer = byteBuffer.slice();
+ int bytesToRead = 0;
+ if (numLEN < 256) bytesToRead = maxLevel;
+ else bytesToRead = maxLevel * 4;
+ bytesToRead += numLEN * 8;
+ sketchBuffer.limit(bytesToRead);
+ byteBuffer.position(byteBuffer.position() + bytesToRead);
+ // return;
+
+ }
+
+ public void deserializeFromBuffer() {
+ if (sketchBuffer == null) return;
+ // System.out.println("\t\tdelay deseri sketch. bytes:" + sketchBuffer.capacity());
+ int numLEN = maxSerializeNum;
K = calcK(maxN, maxLevel);
maxMemoryNum = numLEN;
num = new long[maxMemoryNum];
@@ -272,13 +307,15 @@ public class LongKLLSketch extends KLLSketchForQuantile {
for (int i = 0, tmp = 0; i < maxLevel; i++) {
levelPos[i] = maxMemoryNum - (numLEN - tmp);
- if (numLEN < 256) tmp += UnsignedBytes.toInt(ReadWriteIOUtils.readByte(byteBuffer));
- else tmp += ReadWriteIOUtils.readShort(byteBuffer);
+ if (numLEN < 256) tmp += UnsignedBytes.toInt(ReadWriteIOUtils.readByte(sketchBuffer));
+ else tmp += ReadWriteIOUtils.readInt(sketchBuffer);
}
int actualLevel = maxLevel - 1;
while (levelPos[actualLevel] == levelPos[actualLevel + 1]) actualLevel--;
calcLevelMaxSize(actualLevel + 1);
for (int i = 0; i < numLEN; i++)
- num[maxMemoryNum - numLEN + i] = ReadWriteIOUtils.readLong(byteBuffer);
+ num[maxMemoryNum - numLEN + i] = ReadWriteIOUtils.readLong(sketchBuffer);
+
+ sketchBuffer = null;
}
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/SegTreeBySketch.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/SegTreeBySketch.java
new file mode 100644
index 0000000000..83fa1e9207
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/SegTreeBySketch.java
@@ -0,0 +1,211 @@
+package org.apache.iotdb.tsfile.utils;
+
+import org.apache.iotdb.tsfile.file.metadata.ITimeSeriesMetadata;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import it.unimi.dsi.fastutil.objects.ObjectArrayList;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * A segment tree of KLL sketches over the chunk sketches of one SSTable.
+ *
+ * <p>Level 0 stands for the leaves (the chunk sketches), which are not stored here. Every node
+ * at level {@code lv >= 1} summarizes {@link #LSM_T} consecutive nodes of level {@code lv - 1}.
+ * It records the start time of its first child and the end time of its last child. Trailing
+ * nodes that do not fill a complete group of {@code LSM_T} get no parent.
+ */
+public class SegTreeBySketch {
+  int level, leafNum;
+  public LongArrayList queriedChunkL, queriedChunkR;
+  public ObjectArrayList<LongArrayList> sketchMinT, sketchMaxT;
+  public ObjectArrayList<ObjectArrayList<KLLSketchForQuantile>> sketch;
+  public static final int LSM_T = 30;
+
+  /** Prints the number of node sketches on every level to stdout. */
+  public void show() {
+    System.out.println("\t\tshowing a Seg Tree sketches for an SSTable.\tlevel=" + level);
+    for (int i = 1; i <= level; i++)
+      System.out.print("\t\t[ " + sketch.get(i).size() + " sketches ]");
+    System.out.println();
+  }
+
+  /**
+   * Builds the tree in memory from the chunk (leaf) sketches and their time ranges.
+   *
+   * <p>Each node is a {@link KLLSketchForSST} compacted with {@code sketch_size_ratio}.
+   */
+  public SegTreeBySketch(
+      ObjectArrayList<KLLSketchForQuantile> leaves,
+      LongArrayList leafMinT,
+      LongArrayList leafMaxT,
+      int sketch_size_ratio) {
+    leafNum = leaves.size();
+    int tmpNum = leaves.size();
+    level = 0;
+    sketch = new ObjectArrayList<>();
+    sketch.add(new ObjectArrayList<>());
+    sketchMinT = new ObjectArrayList<>();
+    sketchMinT.add(new LongArrayList());
+    sketchMaxT = new ObjectArrayList<>();
+    sketchMaxT.add(new LongArrayList());
+    while (tmpNum >= LSM_T) {
+      level++;
+      sketch.add(new ObjectArrayList<>());
+      sketchMinT.add(new LongArrayList());
+      sketchMaxT.add(new LongArrayList());
+      for (int i = 0; i < tmpNum / LSM_T; i++) {
+        KLLSketchForSST segSketch = new KLLSketchForSST();
+        for (int j = 0; j < LSM_T; j++)
+          segSketch.addSubSketch(
+              level == 1 ? leaves.get(i * LSM_T + j) : sketch.get(level - 1).get(i * LSM_T + j));
+        segSketch.compactSubSketches(sketch_size_ratio);
+        sketch.get(level).add(segSketch);
+        sketchMinT
+            .get(level)
+            .add(
+                level == 1
+                    ? leafMinT.getLong(i * LSM_T)
+                    : sketchMinT.get(level - 1).getLong(i * LSM_T));
+        sketchMaxT
+            .get(level)
+            .add(
+                level == 1
+                    ? leafMaxT.getLong(i * LSM_T + LSM_T - 1)
+                    : sketchMaxT.get(level - 1).getLong(i * LSM_T + LSM_T - 1));
+      }
+      tmpNum /= LSM_T;
+    }
+  }
+
+  /**
+   * Writes {@code level}, {@code leafNum} and then, level by level from 1 upwards, each node's
+   * start time, end time and sketch (serialized as a {@link LongKLLSketch}).
+   *
+   * @return the number of bytes written
+   */
+  public int serializeSegTree(OutputStream outputStream) throws IOException {
+    int byteLen = 0;
+    byteLen += ReadWriteIOUtils.write(level, outputStream);
+    byteLen += ReadWriteIOUtils.write(leafNum, outputStream); // how many chunks
+
+    for (int i = 1; i <= level; i++) {
+      for (int j = 0; j < sketch.get(i).size(); j++) {
+        byteLen += ReadWriteIOUtils.write(sketchMinT.get(i).getLong(j), outputStream);
+        byteLen += ReadWriteIOUtils.write(sketchMaxT.get(i).getLong(j), outputStream);
+        byteLen += (new LongKLLSketch(sketch.get(i).get(j))).serialize(outputStream);
+      }
+    }
+    return byteLen;
+  }
+
+  /**
+   * Reads a tree written by {@link #serializeSegTree}. Node sketches are {@link LongKLLSketch}
+   * instances read from the buffer. Their contents are presumably materialized lazily:
+   * range_query_in_node calls deserializeFromBuffer() before using a node.
+   */
+  public SegTreeBySketch(ByteBuffer byteBuffer) {
+    level = ReadWriteIOUtils.readInt(byteBuffer);
+    leafNum = ReadWriteIOUtils.readInt(byteBuffer);
+
+    sketch = new ObjectArrayList<>();
+    sketch.add(new ObjectArrayList<>());
+    sketchMinT = new ObjectArrayList<>();
+    sketchMinT.add(new LongArrayList());
+    sketchMaxT = new ObjectArrayList<>();
+    sketchMaxT.add(new LongArrayList());
+
+    for (int i = 1, tmpNum = leafNum; i <= level; i++) {
+      sketch.add(new ObjectArrayList<>());
+      sketchMinT.add(new LongArrayList());
+      sketchMaxT.add(new LongArrayList());
+      for (int j = 0; j < tmpNum / LSM_T; j++) {
+        sketchMinT.get(i).add(ReadWriteIOUtils.readLong(byteBuffer));
+        sketchMaxT.get(i).add(ReadWriteIOUtils.readLong(byteBuffer));
+        sketch.get(i).add(new LongKLLSketch(byteBuffer));
+      }
+      tmpNum /= LSM_T;
+    }
+  }
+
+  /** Whether [x, y] lies inside [L, R]. */
+  private boolean inInterval(long x, long y, long L, long R) {
+    return x >= L && y <= R;
+  }
+
+  /** Whether x lies inside [L, R]. */
+  private boolean inInterval(long x, long L, long R) {
+    return x >= L && x <= R;
+  }
+
+  /** Whether [x, y] and [L, R] intersect. */
+  private boolean overlapInterval(long x, long y, long L, long R) { // [L,R]
+    return !(y < L || x > R);
+  }
+
+  /** Whether the time filter (null means no filter) contains the whole range [L, R]. */
+  private boolean timeFilter_contains(Filter timeFilter, long L, long R) {
+    return timeFilter == null || timeFilter.containStartEndTime(L, R);
+  }
+
+  /**
+   * Collects node sketches for a query, starting from node {@code p} at level {@code lv}.
+   *
+   * <p>A node is used whole when three conditions hold: the time filter contains its range, the
+   * range lies in [otherL, otherR] (the part of this SST that other SSTs do not overlap), and it
+   * overlaps none of {@code overlappedTSMD}. Otherwise the search descends into its children.
+   * Level 0 is not stored here, so the recursion stops there.
+   */
+  private void range_query_in_node(
+      int lv,
+      int p,
+      ObjectArrayList<KLLSketchForQuantile> queriedSketch,
+      Filter timeFilter,
+      long otherL,
+      long otherR,
+      ObjectArrayList<ITimeSeriesMetadata> overlappedTSMD) {
+    if (lv <= 0) return;
+    long cntL = sketchMinT.get(lv).getLong(p), cntR = sketchMaxT.get(lv).getLong(p);
+    boolean containedInQuery = timeFilter_contains(timeFilter, cntL, cntR);
+    System.out.println(
+        "\t\t\t\t\tcnt seg node:"
+            + "lv="
+            + lv
+            + "\t"
+            + cntL
+            + "..."
+            + cntR
+            + "\t\t\t\tqueryTimeFilter:"
+            + timeFilter
+            + "\t\tcontainedInQuery:"
+            + containedInQuery);
+    if (containedInQuery && inInterval(cntL, cntR, otherL, otherR)) {
+      boolean nodeNonOverlap = true;
+      for (ITimeSeriesMetadata tsmd : overlappedTSMD) {
+        if (overlapInterval(
+            cntL, cntR, tsmd.getStatistics().getStartTime(), tsmd.getStatistics().getEndTime())) {
+          nodeNonOverlap = false;
+          break;
+        }
+      }
+      if (nodeNonOverlap) {
+        KLLSketchForQuantile nodeSketch = sketch.get(lv).get(p);
+        // Trees read from a buffer hold LongKLLSketch nodes that must be materialized first.
+        // Trees built in memory hold KLLSketchForSST nodes, which cannot be cast to LongKLLSketch.
+        if (nodeSketch instanceof LongKLLSketch) {
+          ((LongKLLSketch) nodeSketch).deserializeFromBuffer();
+        }
+        queriedSketch.add(nodeSketch);
+
+        int weight = 1; // number of leaves (chunks) covered by one node at this level
+        for (int i = 1; i <= lv; i++) weight *= LSM_T;
+        System.out.println(
+            "\t\t\tmerge with the "
+                + (p * weight)
+                + "..."
+                + (p * weight + weight - 1)
+                + " chunks in SST sketches.");
+        queriedChunkL.add(sketchMinT.get(lv).getLong(p));
+        queriedChunkR.add(sketchMaxT.get(lv).getLong(p));
+        return;
+      }
+    }
+    for (int i = 0; i < LSM_T; i++)
+      range_query_in_node(
+          lv - 1, p * LSM_T + i, queriedSketch, timeFilter, otherL, otherR, overlappedTSMD);
+  }
+
+  /**
+   * Collects into {@code queriesSketch} the node sketches usable for a query, and records their
+   * time ranges in {@link #queriedChunkL}/{@link #queriedChunkR} (reset on every call).
+   *
+   * <p>Only parentless nodes are used as starting points. At each level from the top down, the
+   * search starts after the nodes already covered by the level above.
+   *
+   * @param otherL start of the part of this SST that other SSTs do not overlap
+   * @param otherR end of that part
+   * @param overlappedTSMD metadata whose time ranges must not overlap a node used whole
+   */
+  public void range_query_in_SST_sketches(
+      ObjectArrayList<KLLSketchForQuantile> queriesSketch,
+      Filter timeFilter,
+      long otherL,
+      long otherR,
+      ObjectArrayList<ITimeSeriesMetadata> overlappedTSMD) {
+    queriedChunkL = new LongArrayList();
+    queriedChunkR = new LongArrayList();
+    int last = 0;
+    for (int i = level; i >= 1; i--) {
+      for (int j = last * LSM_T; j < sketch.get(i).size(); j++) {
+        range_query_in_node(i, j, queriesSketch, timeFilter, otherL, otherR, overlappedTSMD);
+      }
+      last = sketch.get(i).size();
+    }
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java
index f2ced038c9..8be013c71e 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java
@@ -506,6 +506,7 @@ public class TsFileWriter implements AutoCloseable {
* @throws WriteProcessException exception in write process
*/
public boolean write(Tablet tablet) throws IOException, WriteProcessException {
+ // System.out.println("\t\t[TsFileWriter write_tablet]:N=" + tablet.rowSize);
// make sure the ChunkGroupWriter for this Tablet exist
checkIsTimeseriesExist(tablet, false);
// get corresponding ChunkGroupWriter and write this Tablet
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java
index 822e0a861a..76798bd574 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java
@@ -198,6 +198,8 @@ public class ChunkWriterImpl implements IChunkWriter {
if (isSdtEncoding && isLastPoint) {
pageWriter.write(time, value);
}
+ // if (pageWriter.getPointNumber() % 4000 == 0)
+ // System.out.println("\t\t[write point] 4000 points written");
checkPageSizeAndMayOpenANewPage();
}
@@ -241,6 +243,7 @@ public class ChunkWriterImpl implements IChunkWriter {
}
pageWriter.write(timestamps, values, batchSize);
checkPageSizeAndMayOpenANewPage();
+ System.out.println("\t\t[write batch] " + batchSize + " points written");
}
public void write(long[] timestamps, Binary[] values, int batchSize) {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/NonAlignedChunkGroupWriterImpl.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/NonAlignedChunkGroupWriterImpl.java
index 8b6038e478..62c657933f 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/NonAlignedChunkGroupWriterImpl.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/NonAlignedChunkGroupWriterImpl.java
@@ -88,6 +88,7 @@ public class NonAlignedChunkGroupWriterImpl implements IChunkGroupWriter {
@Override
public int write(Tablet tablet) throws WriteProcessException {
+ System.out.println("\t\t[NonAlignedCGWriter] write tablet:N=" + tablet.rowSize);
int pointCount = 0;
List<MeasurementSchema> timeseries = tablet.getSchemas();
for (int row = 0; row < tablet.rowSize; row++) {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
index 92e64f62d3..3fbb33cc2c 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
@@ -23,37 +23,26 @@ import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.file.MetaMarker;
import org.apache.iotdb.tsfile.file.header.ChunkGroupHeader;
import org.apache.iotdb.tsfile.file.header.ChunkHeader;
-import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata;
-import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
-import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
-import org.apache.iotdb.tsfile.file.metadata.MetadataIndexConstructor;
-import org.apache.iotdb.tsfile.file.metadata.MetadataIndexNode;
-import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
-import org.apache.iotdb.tsfile.file.metadata.TsFileMetadata;
+import org.apache.iotdb.tsfile.file.metadata.*;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.file.metadata.statistics.DoubleStatistics;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.utils.BytesUtils;
-import org.apache.iotdb.tsfile.utils.PublicBAOS;
-import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.tsfile.utils.*;
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import it.unimi.dsi.fastutil.objects.ObjectArrayList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
+import java.util.*;
/**
* TsFileIOWriter is used to construct metadata and write data stored in memory to output stream.
@@ -328,6 +317,9 @@ public class TsFileIOWriter implements AutoCloseable {
*/
private void flushOneChunkMetadata(Path path, List<IChunkMetadata> chunkMetadataList)
throws IOException {
+ int chunkNum = chunkMetadataList.size(),
+ lsmLevel = (int) Math.round(Math.log(chunkNum) / Math.log(30));
+ System.out.println("\t[DEBUG flushOneChunkMetadata]\tchunkNum=" + chunkMetadataList.size());
// create TimeseriesMetaData
PublicBAOS publicBAOS = new PublicBAOS();
TSDataType dataType = chunkMetadataList.get(chunkMetadataList.size() - 1).getDataType();
@@ -343,6 +335,58 @@ public class TsFileIOWriter implements AutoCloseable {
chunkMetadataListLength += chunkMetadata.serializeTo(publicBAOS, serializeStatistic);
seriesStatistics.mergeStatistics(chunkMetadata.getStatistics());
}
+ // int addKLLLevel =
+ // lsmLevel
+ // * (int)
+ // Math.round(
+ // Math.log(30.0 /
+ // TSFileDescriptor.getInstance().getConfig().getSketchSizeRatio())
+ // / Math.log(2));
+ // if (TSFileDescriptor.getInstance().getConfig().isEnableSSTSketch()
+ // && lsmLevel > 0
+ // && addKLLLevel > 0) {
+ // KLLSketchForLSMFile lsmSketch = new KLLSketchForLSMFile();
+ // for (IChunkMetadata chunkMetadata : chunkMetadataList)
+ // lsmSketch.addSubSketch(
+ // ((DoubleStatistics) (chunkMetadata.getStatistics())).getOneKllSketch());
+ // lsmSketch.compactSubSketches(addKLLLevel);
+ // lsmSketch.show();
+ // System.out.println(
+ // "\t\t[DEBUG] compactingSketchForLSM\tN:"
+ // + lsmSketch.getN()
+ // + "\tnumLen:"
+ // + lsmSketch.getNumLen());
+ // ((DoubleStatistics) seriesStatistics).setKLLSketch(lsmSketch);
+ // }
+ if (TSFileDescriptor.getInstance().getConfig().isEnableSSTSketch() && lsmLevel > 0) {
+ ObjectArrayList<KLLSketchForQuantile> leaves =
+ new ObjectArrayList<>(chunkMetadataList.size());
+ LongArrayList leafMinT = new LongArrayList(), leafMaxT = new LongArrayList();
+ for (IChunkMetadata chunkMetadata : chunkMetadataList) {
+ ObjectArrayList<KLLSketchForQuantile> chunk_sketches = new ObjectArrayList<>();
+ int tmpNumLen = 0;
+ for (KLLSketchForQuantile chunkSketch :
+ ((DoubleStatistics) (chunkMetadata.getStatistics())).getKllSketchList()) {
+ ((LongKLLSketch) chunkSketch).deserializeFromBuffer();
+ chunk_sketches.add(chunkSketch);
+ tmpNumLen += chunkSketch.getNumLen();
+ }
+ HeapLongKLLSketch leafSketch = new HeapLongKLLSketch(tmpNumLen * 8);
+ leafSketch.mergeWithTempSpace(chunk_sketches);
+ leaves.add(leafSketch);
+ leafMinT.add(chunkMetadata.getStartTime());
+ leafMaxT.add(chunkMetadata.getEndTime());
+ }
+ SegTreeBySketch segTreeBySketch =
+ new SegTreeBySketch(
+ leaves,
+ leafMinT,
+ leafMaxT,
+ TSFileDescriptor.getInstance().getConfig().getSketchSizeRatio());
+ segTreeBySketch.show();
+ ((DoubleStatistics) seriesStatistics).segTreeBySketch = segTreeBySketch;
+ ((DoubleStatistics) seriesStatistics).hasSegTreeBySketch = true;
+ }
TimeseriesMetadata timeseriesMetadata =
new TimeseriesMetadata(