You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by le...@apache.org on 2023/02/03 15:50:21 UTC
[iotdb] 01/28: stepRegress write
This is an automated email from the ASF dual-hosted git repository.
leirui pushed a commit to branch research/M4-visualization
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 05c9ab13cf79958881537008144b8e95099a6bc7
Author: Lei Rui <10...@qq.com>
AuthorDate: Tue Jan 24 17:18:43 2023 +0800
stepRegress write
---
.../dataset/groupby/LocalGroupByExecutor4CPV.java | 64 ++--
tsfile/pom.xml | 5 +
.../file/metadata/statistics/Statistics.java | 176 ++++++++--
.../file/metadata/statistics/StepRegress.java | 383 +++++++++++++++++++++
.../statistics/TimeExactOrderStatistics.java | 193 +++++++++++
5 files changed, 753 insertions(+), 68 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor4CPV.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor4CPV.java
index 59feabe4e1..68ab1b055a 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor4CPV.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor4CPV.java
@@ -19,6 +19,14 @@
package org.apache.iotdb.db.query.dataset.groupby;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Set;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
@@ -28,7 +36,6 @@ import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.query.filter.TsFileFilter;
import org.apache.iotdb.db.query.reader.series.SeriesReader;
-import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader;
import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader.MergeReaderPriority;
import org.apache.iotdb.db.utils.FileLoaderUtils;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
@@ -51,15 +58,6 @@ import org.apache.iotdb.tsfile.read.reader.page.PageReader;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.Map;
-import java.util.Set;
-
/**
* Sql format: SELECT min_time(s0), max_time(s0), first_value(s0), last_value(s0), min_value(s0),
* max_value(s0) ROM root.xx group by ([tqs,tqe),IntervalLength). Requirements: (1) Don't change the
@@ -84,7 +82,7 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor {
private TSDataType tsDataType;
- private PriorityMergeReader mergeReader;
+// private PriorityMergeReader mergeReader;
public LocalGroupByExecutor4CPV(
PartialPath path,
@@ -97,7 +95,7 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor {
throws StorageEngineException, QueryProcessException {
this.tsDataType = dataType;
- this.mergeReader = new PriorityMergeReader();
+// this.mergeReader = new PriorityMergeReader();
// get all data sources
QueryDataSource queryDataSource =
@@ -199,9 +197,9 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor {
/**
* @param curStartTime closed
- * @param curEndTime open
- * @param startTime closed
- * @param endTime open
+ * @param curEndTime open
+ * @param startTime closed
+ * @param endTime open
*/
@Override
public List<AggregateResult> calcResult(
@@ -226,7 +224,9 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor {
return results;
}
- /** 对BatchData应用deletes操作,获得更新的BatchData和statistics赋值到chunkSuit4CPV中 */
+ /**
+ * 对BatchData应用deletes操作,获得更新的BatchData和statistics赋值到chunkSuit4CPV中
+ */
private void updateBatchData(ChunkSuit4CPV chunkSuit4CPV, TSDataType dataType) {
if (chunkSuit4CPV.getBatchData() != null) {
BatchData batchData1 = BatchDataFactory.createBatchData(dataType, true, false);
@@ -323,8 +323,8 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor {
new Comparator<ChunkSuit4CPV>() { // TODO double check the sort order logic for version
public int compare(ChunkSuit4CPV o1, ChunkSuit4CPV o2) {
return new MergeReaderPriority(
- o2.getChunkMetadata().getVersion(),
- o2.getChunkMetadata().getOffsetOfChunkHeader())
+ o2.getChunkMetadata().getVersion(),
+ o2.getChunkMetadata().getOffsetOfChunkHeader())
.compareTo(
new MergeReaderPriority(
o1.getChunkMetadata().getVersion(),
@@ -411,7 +411,7 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor {
.get(4) // TODO check: minTimestamp, maxTimestamp, firstValue, lastValue,
// minValue[bottomTimestamp], maxValue[topTimestamp]
.updateResultUsingValues(
- new long[] {candidateTimestamp}, 1, new Object[] {candidateValue});
+ new long[]{candidateTimestamp}, 1, new Object[]{candidateValue});
// TODO check updateResult
return; // 计算结束
} else { // 是被overlap,则partial scan所有这些overlap的块
@@ -455,7 +455,7 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor {
.get(4) // TODO check: minTimestamp, maxTimestamp, firstValue, lastValue,
// minValue[bottomTimestamp], maxValue[topTimestamp]
.updateResultUsingValues(
- new long[] {candidateTimestamp}, 1, new Object[] {candidateValue});
+ new long[]{candidateTimestamp}, 1, new Object[]{candidateValue});
// TODO check updateResult
return; // 计算结束
} else { // 找到这样的点,于是标记candidate point所在块为lazy
@@ -521,8 +521,8 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor {
new Comparator<ChunkSuit4CPV>() { // TODO double check the sort order logic for version
public int compare(ChunkSuit4CPV o1, ChunkSuit4CPV o2) {
return new MergeReaderPriority(
- o2.getChunkMetadata().getVersion(),
- o2.getChunkMetadata().getOffsetOfChunkHeader())
+ o2.getChunkMetadata().getVersion(),
+ o2.getChunkMetadata().getOffsetOfChunkHeader())
.compareTo(
new MergeReaderPriority(
o1.getChunkMetadata().getVersion(),
@@ -609,7 +609,7 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor {
.get(5) // TODO check: minTimestamp, maxTimestamp, firstValue, lastValue,
// minValue[bottomTimestamp], maxValue[topTimestamp]
.updateResultUsingValues(
- new long[] {candidateTimestamp}, 1, new Object[] {candidateValue});
+ new long[]{candidateTimestamp}, 1, new Object[]{candidateValue});
// TODO check updateResult
return; // 计算结束
} else { // 是被overlap,则partial scan所有这些overlap的块
@@ -653,7 +653,7 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor {
.get(5) // TODO check: minTimestamp, maxTimestamp, firstValue, lastValue,
// minValue[bottomTimestamp], maxValue[topTimestamp]
.updateResultUsingValues(
- new long[] {candidateTimestamp}, 1, new Object[] {candidateValue});
+ new long[]{candidateTimestamp}, 1, new Object[]{candidateValue});
// TODO check updateResult
return; // 计算结束
} else { // 找到这样的点,于是标记candidate point所在块为lazy
@@ -701,8 +701,8 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor {
return res;
} else {
return new MergeReaderPriority(
- o2.getChunkMetadata().getVersion(),
- o2.getChunkMetadata().getOffsetOfChunkHeader())
+ o2.getChunkMetadata().getVersion(),
+ o2.getChunkMetadata().getOffsetOfChunkHeader())
.compareTo(
new MergeReaderPriority(
o1.getChunkMetadata().getVersion(),
@@ -767,11 +767,11 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor {
results
.get(0)
.updateResultUsingValues(
- new long[] {candidateTimestamp}, 1, new Object[] {candidateValue});
+ new long[]{candidateTimestamp}, 1, new Object[]{candidateValue});
results
.get(2)
.updateResultUsingValues(
- new long[] {candidateTimestamp}, 1, new Object[] {candidateValue});
+ new long[]{candidateTimestamp}, 1, new Object[]{candidateValue});
return;
}
}
@@ -798,8 +798,8 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor {
return res;
} else {
return new MergeReaderPriority(
- o2.getChunkMetadata().getVersion(),
- o2.getChunkMetadata().getOffsetOfChunkHeader())
+ o2.getChunkMetadata().getVersion(),
+ o2.getChunkMetadata().getOffsetOfChunkHeader())
.compareTo(
new MergeReaderPriority(
o1.getChunkMetadata().getVersion(),
@@ -864,11 +864,11 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor {
results
.get(1)
.updateResultUsingValues(
- new long[] {candidateTimestamp}, 1, new Object[] {candidateValue});
+ new long[]{candidateTimestamp}, 1, new Object[]{candidateValue});
results
.get(3)
.updateResultUsingValues(
- new long[] {candidateTimestamp}, 1, new Object[] {candidateValue});
+ new long[]{candidateTimestamp}, 1, new Object[]{candidateValue});
return;
}
}
diff --git a/tsfile/pom.xml b/tsfile/pom.xml
index f26c930f1a..dcddf19cf9 100644
--- a/tsfile/pom.xml
+++ b/tsfile/pom.xml
@@ -37,6 +37,11 @@
<tsfile.ut.skip>${tsfile.test.skip}</tsfile.ut.skip>
</properties>
<dependencies>
+ <dependency>
+ <groupId>org.eclipse.collections</groupId>
+ <artifactId>eclipse-collections</artifactId>
+ <version>10.4.0</version>
+ </dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java
index 947805c660..397fed5e5c 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java
@@ -18,27 +18,26 @@
*/
package org.apache.iotdb.tsfile.file.metadata.statistics;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.Objects;
import org.apache.iotdb.tsfile.exception.filter.StatisticsClassException;
import org.apache.iotdb.tsfile.exception.write.UnknownColumnTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
-
+import org.eclipse.collections.impl.list.mutable.primitive.DoubleArrayList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.util.Objects;
-
/**
* This class is used for recording statistic information of each measurement in a delta file. While
* writing processing, the processor records the statistics information. Statistics includes
- * maximum, minimum and null value count up to version 0.0.1.<br>
- * Each data type extends this Statistic as super class.<br>
+ * maximum, minimum and null value count up to version 0.0.1.<br> Each data type extends this
+ * Statistic as super class.<br>
* <br>
* For the statistics in the Unseq file TimeSeriesMetadata, only firstValue, lastValue, startTime
* and endTime can be used.</br>
@@ -51,13 +50,19 @@ public abstract class Statistics<T> {
*/
protected boolean isEmpty = true;
- /** number of time-value points */
+ /**
+ * number of time-value points
+ */
private int count = 0;
private long startTime = Long.MAX_VALUE;
private long endTime = Long.MIN_VALUE;
- /** @author Yuyuan Kang */
+ private StepRegress stepRegress = new StepRegress();
+
+ /**
+ * @author Yuyuan Kang
+ */
final String OPERATION_NOT_SUPPORT_FORMAT = "%s statistics does not support operation: %s";
/**
@@ -119,34 +124,59 @@ public abstract class Statistics<T> {
byteLen += ReadWriteForEncodingUtils.writeUnsignedVarInt(count, outputStream);
byteLen += ReadWriteIOUtils.write(startTime, outputStream);
byteLen += ReadWriteIOUtils.write(endTime, outputStream);
+ // TODO serialize stepRegress
+ byteLen += serializeStepRegress(outputStream);
// value statistics of different data type
byteLen += serializeStats(outputStream);
return byteLen;
}
+ /**
+  * Learns the step-regression model and writes it to the stream in the order: slope K, the
+  * number of segment keys m, then the interior keys t2..tm-1. The first and last keys are not
+  * written; per the format's convention they are taken to be startTime and endTime.
+  *
+  * @param outputStream destination stream
+  * @return the sum of the values returned by the individual write calls (byte length)
+  * @throws IOException if learning or writing fails
+  */
+ int serializeStepRegress(OutputStream outputStream) throws IOException {
+   stepRegress.learn(); // TODO ensure executed once and only once
+   int written = ReadWriteIOUtils.write(stepRegress.getSlope(), outputStream); // K
+   DoubleArrayList keys = stepRegress.getSegmentKeys();
+   int keyCount = keys.size();
+   written += ReadWriteIOUtils.write(keyCount, outputStream); // m
+   int idx = 1; // skip t1
+   while (idx < keyCount - 1) { // stop before tm
+     written += ReadWriteIOUtils.write(keys.get(idx), outputStream);
+     idx++;
+   }
+   return written;
+ }
+
abstract int serializeStats(OutputStream outputStream) throws IOException;
- /** read data from the inputStream. */
+ /**
+ * read data from the inputStream.
+ */
public abstract void deserialize(InputStream inputStream) throws IOException;
public abstract void deserialize(ByteBuffer byteBuffer);
// public abstract void setMinMaxFromBytes(byte[] minBytes, byte[] maxBytes);
- /** @author Yuyuan Kang */
+ /**
+ * @author Yuyuan Kang
+ */
public abstract MinMaxInfo<T> getMinInfo();
- /** @author Yuyuan Kang */
+ /**
+ * @author Yuyuan Kang
+ */
public abstract MinMaxInfo<T> getMaxInfo();
public abstract T getMinValue();
public abstract T getMaxValue();
- /** @author Yuyuan Kang */
+ /**
+ * @author Yuyuan Kang
+ */
public abstract long getBottomTimestamp();
- /** @author Yuyuan Kang */
+ /**
+ * @author Yuyuan Kang
+ */
public abstract long getTopTimestamp();
public abstract T getFirstValue();
@@ -193,6 +223,9 @@ public abstract class Statistics<T> {
// must be sure no overlap between two statistics
this.count += stats.count;
mergeStatisticsValue(stats);
+ // TODO M4-LSM assumes that there is always only one page in a chunk
+ // TODO M4-LSM if there are more than one chunk in a time series, then access each chunkMetadata anyway
+ this.stepRegress = stats.stepRegress;
isEmpty = false;
} else {
String thisClass = this.getClass().toString();
@@ -212,9 +245,12 @@ public abstract class Statistics<T> {
}
count++;
updateStats(value);
+ updateStepRegress(time);
}
- /** @author Yuyuan Kang */
+ /**
+ * @author Yuyuan Kang
+ */
public void update(long time, int value) {
if (time < this.startTime) {
startTime = time;
@@ -224,9 +260,12 @@ public abstract class Statistics<T> {
}
count++;
updateStats(value, time);
+ updateStepRegress(time);
}
- /** @author Yuyuan Kang */
+ /**
+ * @author Yuyuan Kang
+ */
public void update(long time, long value) {
if (time < this.startTime) {
startTime = time;
@@ -236,9 +275,12 @@ public abstract class Statistics<T> {
}
count++;
updateStats(value, time);
+ updateStepRegress(time);
}
- /** @author Yuyuan Kang */
+ /**
+ * @author Yuyuan Kang
+ */
public void update(long time, float value) {
if (time < this.startTime) {
startTime = time;
@@ -248,9 +290,12 @@ public abstract class Statistics<T> {
}
count++;
updateStats(value, time);
+ updateStepRegress(time);
}
- /** @author Yuyuan Kang */
+ /**
+ * @author Yuyuan Kang
+ */
public void update(long time, double value) {
if (time < this.startTime) {
startTime = time;
@@ -260,6 +305,7 @@ public abstract class Statistics<T> {
}
count++;
updateStats(value, time);
+ updateStepRegress(time);
}
public void update(long time, Binary value) {
@@ -271,6 +317,7 @@ public abstract class Statistics<T> {
}
count++;
updateStats(value);
+ updateStepRegress(time);
}
public void update(long[] time, boolean[] values, int batchSize) {
@@ -282,9 +329,12 @@ public abstract class Statistics<T> {
}
count += batchSize;
updateStats(values, batchSize);
+ updateStepRegress(time, batchSize);
}
- /** @author Yuyuan Kang */
+ /**
+ * @author Yuyuan Kang
+ */
public void update(long[] time, int[] values, int batchSize) {
if (time[0] < startTime) {
startTime = time[0];
@@ -294,9 +344,12 @@ public abstract class Statistics<T> {
}
count += batchSize;
updateStats(values, time, batchSize);
+ updateStepRegress(time, batchSize);
}
- /** @author Yuyuan Kang */
+ /**
+ * @author Yuyuan Kang
+ */
public void update(long[] time, long[] values, int batchSize) {
if (time[0] < startTime) {
startTime = time[0];
@@ -306,9 +359,12 @@ public abstract class Statistics<T> {
}
count += batchSize;
updateStats(values, time, batchSize);
+ updateStepRegress(time, batchSize);
}
- /** @author Yuyuan Kang */
+ /**
+ * @author Yuyuan Kang
+ */
public void update(long[] time, float[] values, int batchSize) {
if (time[0] < startTime) {
startTime = time[0];
@@ -318,9 +374,12 @@ public abstract class Statistics<T> {
}
count += batchSize;
updateStats(values, time, batchSize);
+ updateStepRegress(time, batchSize);
}
- /** @author Yuyuan Kang */
+ /**
+ * @author Yuyuan Kang
+ */
public void update(long[] time, double[] values, int batchSize) {
if (time[0] < startTime) {
startTime = time[0];
@@ -330,6 +389,7 @@ public abstract class Statistics<T> {
}
count += batchSize;
updateStats(values, time, batchSize);
+ updateStepRegress(time, batchSize);
}
public void update(long[] time, Binary[] values, int batchSize) {
@@ -341,6 +401,7 @@ public abstract class Statistics<T> {
}
count += batchSize;
updateStats(values, batchSize);
+ updateStepRegress(time, batchSize);
}
protected abstract void mergeStatisticsValue(Statistics stats);
@@ -353,32 +414,54 @@ public abstract class Statistics<T> {
isEmpty = empty;
}
- /** @author Yuyuan Kang */
+ /**
+ * @author Yuyuan Kang
+ */
public abstract void updateMinInfo(T val, long timestamp);
- /** @author Yuyuan Kang */
+ /**
+ * @author Yuyuan Kang
+ */
public abstract void updateMaxInfo(T val, long timestamp);
void updateStats(boolean value) {
throw new UnsupportedOperationException();
}
- /** @author Yuyuan Kang */
+ /**
+  * Feeds one timestamp into the step-regression model of this statistics object.
+  *
+  * @param timestamp timestamp of the point that was just recorded
+  */
+ void updateStepRegress(long timestamp) {
+   this.stepRegress.insert(timestamp);
+ }
+
+ /**
+  * Feeds the first {@code batchSize} timestamps of the array, in index order, into the
+  * step-regression model by delegating to the single-timestamp overload.
+  *
+  * @param timestamps timestamps of the batch
+  * @param batchSize number of leading entries of {@code timestamps} to use
+  */
+ void updateStepRegress(long[] timestamps, int batchSize) {
+   int idx = 0;
+   while (idx < batchSize) {
+     updateStepRegress(timestamps[idx]);
+     idx++;
+   }
+ }
+
+ /**
+ * @author Yuyuan Kang
+ */
void updateStats(int value, long timestamp) {
throw new UnsupportedOperationException();
}
- /** @author Yuyuan Kang */
+ /**
+ * @author Yuyuan Kang
+ */
void updateStats(long value, long timestamp) {
throw new UnsupportedOperationException();
}
- /** @author Yuyuan Kang */
+ /**
+ * @author Yuyuan Kang
+ */
void updateStats(float value, long timestamp) {
throw new UnsupportedOperationException();
}
- /** @author Yuyuan Kang */
+ /**
+ * @author Yuyuan Kang
+ */
void updateStats(double value, long timestamp) {
throw new UnsupportedOperationException();
}
@@ -391,22 +474,30 @@ public abstract class Statistics<T> {
throw new UnsupportedOperationException();
}
- /** @author Yuyuan Kang */
+ /**
+ * @author Yuyuan Kang
+ */
void updateStats(int[] values, long[] timestamps, int batchSize) {
throw new UnsupportedOperationException();
}
- /** @author Yuyuan Kang */
+ /**
+ * @author Yuyuan Kang
+ */
void updateStats(long[] values, long[] timestamps, int batchSize) {
throw new UnsupportedOperationException();
}
- /** @author Yuyuan Kang */
+ /**
+ * @author Yuyuan Kang
+ */
void updateStats(float[] values, long[] timestamps, int batchSize) {
throw new UnsupportedOperationException();
}
- /** @author Yuyuan Kang */
+ /**
+ * @author Yuyuan Kang
+ */
void updateStats(double[] values, long[] timestamps, int batchSize) {
throw new UnsupportedOperationException();
}
@@ -416,10 +507,10 @@ public abstract class Statistics<T> {
}
/**
- * @author Yuyuan Kang This method with two parameters is only used by {@code unsequence} which
- * updates/inserts/deletes timestamp.
* @param min min timestamp
* @param max max timestamp
+ * @author Yuyuan Kang This method with two parameters is only used by {@code unsequence} which
+ * updates/inserts/deletes timestamp.
*/
public void updateStats(long min, long bottomTimestamp, long max, long topTimestamp) {
throw new UnsupportedOperationException();
@@ -441,11 +532,24 @@ public abstract class Statistics<T> {
statistics.setCount(ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
statistics.setStartTime(ReadWriteIOUtils.readLong(buffer));
statistics.setEndTime(ReadWriteIOUtils.readLong(buffer));
+ statistics.deserializeStepRegress(buffer); // TODO
statistics.deserialize(buffer);
statistics.isEmpty = false;
return statistics;
}
+ /**
+  * Reads the step-regression model from the buffer: slope K, key count m, then m-2 interior
+  * keys. The first and last segment keys are not stored in the buffer; they are filled in from
+  * this object's startTime and endTime, so those must already be set when this is called.
+  *
+  * @param byteBuffer buffer positioned at the serialized step-regression data
+  */
+ void deserializeStepRegress(ByteBuffer byteBuffer) {
+   double slope = ReadWriteIOUtils.readDouble(byteBuffer); // K
+   this.stepRegress.setSlope(slope);
+   int keyCount = ReadWriteIOUtils.readInt(byteBuffer); // m
+   DoubleArrayList keys = new DoubleArrayList();
+   keys.add(this.startTime); // t1
+   for (int remaining = keyCount - 2; remaining > 0; remaining--) { // t2,t3,...,tm-1
+     keys.add(ReadWriteIOUtils.readDouble(byteBuffer));
+   }
+   keys.add(this.endTime); // tm
+   this.stepRegress.setSegmentKeys(keys);
+ }
+
public long getStartTime() {
return startTime;
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/StepRegress.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/StepRegress.java
new file mode 100644
index 0000000000..96d2736fb3
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/StepRegress.java
@@ -0,0 +1,383 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.file.metadata.statistics;
+
+import java.io.IOException;
+import java.util.Arrays;
+import org.eclipse.collections.impl.list.mutable.primitive.DoubleArrayList;
+import org.eclipse.collections.impl.list.mutable.primitive.IntArrayList;
+import org.eclipse.collections.impl.list.mutable.primitive.LongArrayList;
+
+public class StepRegress {
+
+ private double slope;
+
+ // when learning parameters, we first determine segmentIntercepts and then determine segmentKeys;
+ // when using functions, we read segmentKeys and then infer segmentIntercepts.
+ // fix that the first segment is always tilt,
+ // so for indexes starting from 0, even id is tilt, odd id is level.
+ private DoubleArrayList segmentIntercepts = new DoubleArrayList(); // b1,b2,...,bm-1
+
+ // fix that the first segment [t1,t2) is always tilt,
+ // so t2=t1 in fact means that the first status is level
+ private DoubleArrayList segmentKeys = new DoubleArrayList(); // t1,t2,...,tm
+ // TODO deal with the last key tm
+
+ private LongArrayList timestamps = new LongArrayList(); // Pi.t
+ private LongArrayList intervals = new LongArrayList(); // Pi+1.t-Pi.t
+
+ /**
+  * Kind of the interval between two consecutive points. The declaration order matters: the
+  * ordinals (tilt = 0, level = 1) are what gets stored in intervalsType.
+  */
+ enum IntervalType { tilt, level }
+
+ private IntArrayList intervalsType = new IntArrayList();
+ private long previousTimestamp = -1;
+
+ private double mean = 0; // mean of intervals
+ private double stdDev = 0; // standard deviation of intervals
+ private long count = 0;
+ private double sumX2 = 0.0;
+ private double sumX1 = 0.0;
+
+ private double median = 0; // median of intervals
+ private double mad = 0; // median absolute deviation of intervals
+ TimeExactOrderStatistics statistics = new TimeExactOrderStatistics();
+
+ /**
+  * Loads one data point: records its timestamp and, for every point after the first, the
+  * interval to the previous point, updating the running sums used for mean/stdDev and the
+  * order statistics used for median/mad along the way.
+  *
+  * <p>Whether a previous point exists is decided from the number of recorded timestamps rather
+  * than from a sentinel value of previousTimestamp, so that a previous timestamp of 0 (or a
+  * negative one) still produces an interval. This keeps intervals.size() equal to
+  * timestamps.size() - 1, which learn() relies on when it reads timestamps.get(i + 1).
+  *
+  * @param timestamp timestamp of the point being loaded
+  */
+ public void insert(long timestamp) {
+   boolean hasPrevious = timestamps.size() > 0;
+   timestamps.add(timestamp); // record
+   if (hasPrevious) {
+     long delta = timestamp - previousTimestamp;
+     intervals.add(delta); // record
+     // prepare for mean and stdDev
+     count++;
+     sumX1 += delta;
+     // square in double arithmetic: delta * delta in long can overflow for large intervals
+     sumX2 += (double) delta * delta;
+     // prepare for median and mad
+     statistics.insert(delta);
+   }
+   previousTimestamp = timestamp;
+ }
+
+ /**
+  * Prepares a learning pass: resets the learned segment information, computes mean, stdDev,
+  * median and mad of the recorded intervals, sets the slope to 1/median, and seeds the first
+  * segment with key t1 (the first timestamp) and intercept b1 so that the first tilt line
+  * passes through (t1, 1).
+  *
+  * <p>The segment lists are re-created here because learn() only ever appends to them; without
+  * the reset a second call to learn() would stack a second model on top of the first one.
+  * Requires at least one recorded timestamp (timestamps.get(0) is read).
+  */
+ private void initForLearn() {
+   // start from a clean model so that learn() can safely be executed more than once
+   this.segmentKeys = new DoubleArrayList();
+   this.segmentIntercepts = new DoubleArrayList();
+   this.intervalsType = new IntArrayList();
+
+   this.mean = getMean();
+   this.stdDev = getStdDev();
+   this.median = getMedian();
+   this.mad = getMad();
+   this.slope = 1 / this.median;
+   this.segmentKeys.add(timestamps.get(0)); // t1
+   this.segmentIntercepts.add(1 - slope * timestamps.get(0)); // b1
+ }
+
+ /**
+ * Learn the parameters of the step regression function for the loaded data.
+ *
+ * <p>After initForLearn() has seeded the first (tilt) segment, every recorded interval is
+ * classified as level or tilt and its type ordinal is appended to intervalsType. Whenever the
+ * type changes, a new segment is started: its intercept is chosen so that the new function runs
+ * through the left endpoint (t,pos) of the interval, and its key is the intersection with the
+ * previous segment's function. The last interval is post-processed so that the final function
+ * runs through the last point (timestamps.getLast(), timestamps.size()). Finally the last
+ * timestamp is appended as the closing key tm and the intercept order is validated.
+ *
+ * @throws IOException if checkOrder finds the learned intercepts out of order
+ */
+ public void learn() throws IOException {
+ initForLearn();
+
+ // index (in segmentIntercepts) of the most recently started tilt segment
+ int tiltLatestSegmentID = 0;
+ IntervalType previousIntervalType = IntervalType.tilt;
+
+ for (int i = 0; i < intervals.size(); i++) {
+ long delta = intervals.get(i);
+
+ // the current point (t,pos) focused, where t is the left endpoint of the current interval.
+ long t = timestamps.get(i);
+ int pos = i + 1;
+ // the next point (nextT,nextPos), where nextT is the right endpoint of the current interval.
+ long nextT = timestamps.get(i + 1);
+ int nextPos = i + 2;
+
+ // 1) determine the type of the current interval
+ // level condition: big interval && the right endpoint of the interval is under the latest tilt line.
+ // Note the right endpoint, not the left endpoint.
+ // "the right endpoint of the interval is under the latest tilt line" is to ensure the
+ // monotonically decreasing order of tilt intercepts (the last interval running through the last point
+ // is handled using post-processing to avoid disorder of tilt intercepts)
+ boolean isLevel = isBigInterval(delta) && (nextPos < slope * nextT + segmentIntercepts.get(
+ tiltLatestSegmentID));
+ // to avoid TLTLTLTL... causing trivial segments, add extra rule for tilt
+ if (!isLevel) {
+ if (previousIntervalType == IntervalType.level) { // when previous interval is level
+ if (i < intervals.size() - 1) { // when having next interval
+ long nextDelta = intervals.get(i + 1);
+ if (isBigInterval(nextDelta) && (nextPos + 1
+ < slope * timestamps.get(i + 2) + segmentIntercepts.get(
+ tiltLatestSegmentID))) { // when next interval is also level
+ isLevel = true; // then fix type from tilt to level, LTL=>LLL
+ }
+ }
+ }
+ }
+
+ // 2) determine if starting a new segment
+ if (isLevel) {
+ intervalsType.add(IntervalType.level.ordinal());
+ if (previousIntervalType == IntervalType.tilt) { // else do nothing, still level
+ // [[[transition from tilt to level]]]
+ previousIntervalType = IntervalType.level;
+ // 3) to determine the intercept, let the level function run through (t,pos)
+ double intercept = pos; // b2i=pos
+ // 4) to determine the segment key, let the level function and the previous tilt function intersect
+ segmentKeys.add((intercept - segmentIntercepts.getLast()) / slope); // x2i=(b2i-b2i-1)/K
+ // then add intercept to segmentIntercepts, do not change the order of codes here
+ segmentIntercepts.add(
+ intercept); // TODO debug if the first status is actually level works
+ }
+ // deal with the last interval to make sure the last point is hit
+ // TODO create examples to debug this
+ if (i == intervals.size() - 1) {
+ // 3) to determine the intercept, let the level function run through (timestamps.getLast(),timestamps.size())
+ double intercept = timestamps.size(); // b2i=pos
+ // 4) to determine the segment key, let the level function and the previous tilt function intersect
+ // Note that this is a rewrite (set) of the last entry instead of an add.
+ // Note that this is not getLast: the previous tilt intercept is the second-to-last entry,
+ // because the last entry is the level intercept being rewritten.
+ segmentKeys.set(segmentKeys.size() - 1,
+ (intercept - segmentIntercepts.get(segmentIntercepts.size() - 2))
+ / slope); // x2i=(b2i-b2i-1)/K TODO debug here not getLast!
+ // then write intercept to segmentIntercepts, do not change the order of codes here
+ // Note that this is a rewrite (set) of the last entry instead of an add.
+ segmentIntercepts.set(segmentIntercepts.size() - 1, intercept);
+ }
+ } else {
+ intervalsType.add(IntervalType.tilt.ordinal());
+ if (previousIntervalType == IntervalType.level) { // else do nothing, still tilt
+ // [[[transition from level to tilt]]]
+ previousIntervalType = IntervalType.tilt;
+ // 3) to determine the intercept, let the tilt function run through (t,pos)
+ double intercept = pos - slope * t; // b2i+1=pos-K*t
+ // 4) to determine the segment key, let the new tilt function and the previous level function intersect
+ segmentKeys.add((segmentIntercepts.getLast() - intercept) / slope); // x2i+1=(b2i-b2i+1)/K
+ // then add intercept to segmentIntercepts, do not change the order of codes here
+ segmentIntercepts.add(intercept);
+ // remember to update tiltLatestSegmentID
+ tiltLatestSegmentID += 2;
+ }
+ // deal with the last interval to make sure the last point is hit
+ // TODO create examples to debug this
+ if (i == intervals.size() - 1) {
+ if (segmentIntercepts.size() == 1) { // all TTTTTT, only one segment info
+ // remove all segment info, and directly connect the first and the last point
+ this.slope =
+ (timestamps.size() - 1.0) / (timestamps.getLast() - timestamps.getFirst());
+ this.segmentKeys = new DoubleArrayList();
+ this.segmentIntercepts = new DoubleArrayList();
+ this.segmentKeys.add(timestamps.get(0)); // t1
+ this.segmentIntercepts.add(1 - slope * timestamps.get(0)); // b1
+ } else {
+ // 3) to determine the intercept, let the tilt function run through (timestamps.getLast(),timestamps.size())
+ double intercept = timestamps.size() - slope * timestamps.getLast(); // b2i+1=pos-K*t
+ // 4) to determine the segment key, let the new tilt function and the previous level function intersect
+ // Note that this is a rewrite (set) of the last entry instead of an add.
+ // Note that this is not getLast: the second-to-last intercept is used.
+ segmentKeys.set(segmentKeys.size() - 1,
+ (segmentIntercepts.get(segmentIntercepts.size() - 2) - intercept)
+ / slope); // x2i+1=(b2i-b2i+1)/K TODO debug here not getLast!
+ // then write intercept to segmentIntercepts, do not change the order of codes here
+ // Note that this is a rewrite (set) of the last entry instead of an add.
+ segmentIntercepts.set(segmentIntercepts.size() - 1, intercept);
+
+ // now check to remove possible disorders
+ // search from back to front to find the first tilt intercept that is equal to or larger than the current intercept
+ int start = segmentIntercepts.size() - 3; // TODO debug
+ // TODO consider only one T
+ boolean equals = false;
+ for (; start >= 0; start -= 2) {
+ // note the step is 2, only tilt intercept, no level intercept
+ // NOTE(review): exact == comparison of doubles; confirm that this is intended
+ if (segmentIntercepts.get(start) == intercept) {
+ equals = true;
+ break;
+ }
+ if (segmentIntercepts.get(start) > intercept) {
+ equals = false;
+ break;
+ }
+ }
+ if (start < 0) { // TODO bug consider when start<0, i.e., not found: connecting directly
+ // remove all segment info, and directly connect the first and the last point
+ this.slope =
+ (timestamps.size() - 1.0) / (timestamps.getLast() - timestamps.getFirst());
+ this.segmentKeys = new DoubleArrayList();
+ this.segmentIntercepts = new DoubleArrayList();
+ this.segmentKeys.add(timestamps.get(0)); // t1
+ this.segmentIntercepts.add(1 - slope * timestamps.get(0)); // b1
+ } else if (start < segmentIntercepts.size() - 3) {
+ if (!equals) {
+ // remove all segment information after start+1 id, i.e., remove from start+2~end
+ // note that the level after start tilt is kept since equals=false.
+ segmentIntercepts = DoubleArrayList.newListWith(
+ Arrays.copyOfRange(segmentIntercepts.toArray(), 0, start + 2));
+ segmentKeys = DoubleArrayList.newListWith(
+ Arrays.copyOfRange(segmentKeys.toArray(), 0, start + 2));
+
+ // Add new segment info for TL&T
+ // 4) to determine the segment key, let the new tilt function and the kept level function intersect
+ // Note that here is add and getLast again!
+ segmentKeys.add(
+ (segmentIntercepts.getLast() - intercept) / slope); // x2i+1=(b2i-b2i+1)/K
+ // then add intercept to segmentIntercepts, do not change the order of codes here
+ // Note that here is add and getLast again!
+ segmentIntercepts.add(intercept);
+ } else {
+ // remove all segment information after start id, i.e., remove from start+1~end
+ // note that the level after start tilt is NOT kept since equals==true
+ segmentIntercepts = DoubleArrayList.newListWith(
+ Arrays.copyOfRange(segmentIntercepts.toArray(), 0, start + 1));
+ segmentKeys = DoubleArrayList.newListWith(
+ Arrays.copyOfRange(segmentKeys.toArray(), 0, start + 1));
+ // TODO debug the first status is level, b1
+ }
+ }
+ // otherwise start==segmentIntercepts.size()-3 means result is already ready, no disorder to handle
+ }
+ }
+ }
+ }
+ // close the key list with the last timestamp
+ segmentKeys.add(timestamps.getLast()); // tm
+
+ checkOrder(segmentIntercepts);
+ }
+
+  /**
+   * Verifies that the segment intercepts are consistently ordered.
+   *
+   * <p>Ids start from 0 and the first status is fixed to be tilt, so even ids hold tilt
+   * intercepts, which must be strictly decreasing, and odd ids hold level intercepts, which must
+   * be strictly increasing.
+   *
+   * @param segmentIntercepts intercepts to validate, alternating tilt and level, tilt first
+   * @throws IOException if a tilt intercept is not smaller than the previous tilt intercept, or a
+   *     level intercept is not larger than the previous level intercept
+   */
+  private void checkOrder(DoubleArrayList segmentIntercepts) throws IOException {
+    // Infinite sentinels let the first intercept of each kind always pass. Note that
+    // Double.MIN_VALUE is the smallest POSITIVE double, so it must not be used as a lower bound:
+    // it would reject a first level intercept that is zero or negative.
+    double tiltIntercept = Double.POSITIVE_INFINITY;
+    double levelIntercept = Double.NEGATIVE_INFINITY;
+    for (int i = 0; i < segmentIntercepts.size(); i++) {
+      double intercept = segmentIntercepts.get(i);
+      if (i % 2 == 0) {
+        // even id: tilt intercept, must be strictly smaller than the previous tilt intercept
+        if (intercept >= tiltIntercept) {
+          throw new IOException(String.format("disorder of tilt intercepts!: i=%s", i));
+        }
+        tiltIntercept = intercept;
+      } else {
+        // odd id: level intercept, must be strictly larger than the previous level intercept
+        if (intercept <= levelIntercept) {
+          throw new IOException(String.format("disorder of level intercepts!: i=%s", i));
+        }
+        levelIntercept = intercept;
+      }
+    }
+  }
+
+  /**
+   * Tells whether an interval counts as "big", i.e. exceeds the mean by more than three standard
+   * deviations.
+   *
+   * @param interval the interval to classify
+   * @return true if {@code interval} is strictly greater than {@code mean + 3 * stdDev}
+   */
+  private boolean isBigInterval(long interval) {
+    final int sigmaMultiplier = 3;
+    return this.mean + sigmaMultiplier * this.stdDev < interval;
+  }
+
+  /** @return the median as reported by the underlying {@code statistics} object */
+  public double getMedian() {
+    final double median = this.statistics.getMedian();
+    return median;
+  }
+
+  /** @return the MAD as reported by the underlying {@code statistics} object */
+  public double getMad() {
+    final double mad = this.statistics.getMad();
+    return mad;
+  }
+
+  /** @return the sample mean, computed as {@code sumX1 / count} */
+  public double getMean() {
+    return this.sumX1 / this.count;
+  }
+
+  /**
+   * Computes the sample standard deviation from the running sums.
+   *
+   * <p>The population standard deviation {@code sqrt(sumX2 / count - (sumX1 / count)^2)} is
+   * computed first, then its square is rescaled by {@code count / (count - 1)}.
+   *
+   * @return the sample standard deviation
+   */
+  public double getStdDev() {
+    double meanOfSquares = this.sumX2 / this.count;
+    double meanValue = this.sumX1 / this.count;
+    double populationStd = Math.sqrt(meanOfSquares - Math.pow(meanValue, 2));
+    double populationVariance = Math.pow(populationStd, 2);
+    return Math.sqrt(populationVariance * this.count / (this.count - 1));
+  }
+
+  /** @return the list of segment intercepts held by this object (not a copy) */
+  public DoubleArrayList getSegmentIntercepts() {
+    return this.segmentIntercepts;
+  }
+
+  /** @return the current slope */
+  public double getSlope() {
+    return this.slope;
+  }
+
+  /**
+   * Overwrites the slope.
+   *
+   * @param slope the new slope value
+   */
+  public void setSlope(final double slope) {
+    this.slope = slope;
+  }
+
+  /**
+   * Replaces the segment keys; the given list is stored as-is, without copying.
+   *
+   * @param segmentKeys the new segment keys
+   */
+  public void setSegmentKeys(final DoubleArrayList segmentKeys) {
+    this.segmentKeys = segmentKeys;
+  }
+
+  /** @return the list of segment keys held by this object (not a copy) */
+  public DoubleArrayList getSegmentKeys() {
+    return this.segmentKeys;
+  }
+
+  /** @return the list of interval types held by this object (not a copy) */
+  public IntArrayList getIntervalsType() {
+    return this.intervalsType;
+  }
+
+  /** @return the list of intervals held by this object (not a copy) */
+  public LongArrayList getIntervals() {
+    return this.intervals;
+  }
+
+  /** @return the list of timestamps held by this object (not a copy) */
+  public LongArrayList getTimestamps() {
+    return this.timestamps;
+  }
+
+  /**
+   * Reconstructs the m-1 intercepts b1,b2,...,bm-1 from the slope K and the m segment keys
+   * t1,t2,...,tm. The last key tm only bounds the iteration and is never read.
+   *
+   * <p>With 0-based ids, even ids are tilt intercepts and odd ids are level intercepts:
+   *
+   * <ul>
+   *   <li>id 0: {@code 1 - K * t1}
+   *   <li>odd id i: {@code K * key[i] + (previous intercept)}
+   *   <li>even id i &gt; 0: {@code (previous tilt intercept) - K * (key[i] - key[i - 1])}
+   * </ul>
+   *
+   * @param slope the slope K shared by all tilt segments
+   * @param segmentKeys the segment keys t1..tm; must contain at least one element
+   * @return a new list with the inferred intercepts
+   */
+  public static DoubleArrayList inferInterceptsFromSegmentKeys(double slope,
+      DoubleArrayList segmentKeys) {
+    DoubleArrayList intercepts = new DoubleArrayList();
+    double firstKey = segmentKeys.get(0);
+    intercepts.add(1 - slope * firstKey); // b1=1-K*t1
+    int end = segmentKeys.size() - 1; // the last key is not used
+    for (int idx = 1; idx < end; idx++) {
+      double next;
+      if (idx % 2 != 0) {
+        // level: take the value of the preceding tilt at this key
+        double previous = intercepts.getLast();
+        next = slope * segmentKeys.get(idx) + previous;
+      } else {
+        // tilt: shift the previous tilt intercept (two positions back) by the level's length
+        double previousTilt = intercepts.get(intercepts.size() - 2);
+        next = previousTilt - slope * (segmentKeys.get(idx) - segmentKeys.get(idx - 1));
+      }
+      intercepts.add(next);
+    }
+    return intercepts;
+  }
+
+  /**
+   * Evaluates the step regression function at {@code t}.
+   *
+   * <p>Segment {@code i} (0-based) ends at {@code segmentKeys[i + 1]}; the first segment whose
+   * right end is not smaller than {@code t} is selected. Since the first status is fixed to be
+   * tilt, even segments evaluate to {@code slope * t + intercept} and odd segments (level)
+   * evaluate to {@code intercept}.
+   *
+   * @param t input, expected between the first and the last segment key (both inclusive)
+   * @return the value of the step regression function f(t)
+   * @throws IOException if {@code t} is smaller than the first or larger than the last segment key
+   */
+  public double infer(double t) throws IOException {
+    boolean beforeFirstKey = t < segmentKeys.get(0);
+    if (beforeFirstKey || t > segmentKeys.getLast()) {
+      String message =
+          String.format(
+              "t out of range. input within [%s,%s]", segmentKeys.get(0), segmentKeys.getLast());
+      throw new IOException(message);
+    }
+    // advance while t lies beyond the right end of the current segment
+    int segment = 0;
+    while (segment < segmentKeys.size() - 1 && !(t <= segmentKeys.get(segment + 1))) {
+      segment++;
+    }
+    // for indexes starting from 0, even id is tilt and odd id is level
+    boolean tilt = segment % 2 == 0;
+    double intercept = segmentIntercepts.get(segment);
+    return tilt ? slope * t + intercept : intercept;
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/TimeExactOrderStatistics.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/TimeExactOrderStatistics.java
new file mode 100644
index 0000000000..58e2fa0bf6
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/TimeExactOrderStatistics.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.file.metadata.statistics;
+
+import java.util.NoSuchElementException;
+import org.eclipse.collections.impl.list.mutable.primitive.DoubleArrayList;
+import org.eclipse.collections.impl.list.mutable.primitive.FloatArrayList;
+import org.eclipse.collections.impl.list.mutable.primitive.IntArrayList;
+import org.eclipse.collections.impl.list.mutable.primitive.LongArrayList;
+
+/**
+ * Util for computing exact order statistics: median, MAD (median absolute deviation) and
+ * percentile.
+ *
+ * <p>An instance accumulates {@code long} timestamps through {@link #insert(long)}. The static
+ * helpers work directly on Eclipse Collections primitive lists; the median and percentile helpers
+ * sort the list they are given in place.
+ */
+public class TimeExactOrderStatistics {
+
+  private final LongArrayList longArrayList;
+
+  public TimeExactOrderStatistics() {
+    longArrayList = new LongArrayList();
+  }
+
+  /**
+   * Records one timestamp.
+   *
+   * @param timestamp the value to add
+   */
+  public void insert(long timestamp) {
+    longArrayList.add(timestamp);
+  }
+
+  /**
+   * @return the median of the inserted timestamps
+   * @throws NoSuchElementException if nothing has been inserted
+   */
+  public double getMedian() {
+    return getMedian(longArrayList);
+  }
+
+  /**
+   * @return the median absolute deviation of the inserted timestamps
+   * @throws NoSuchElementException if nothing has been inserted
+   */
+  public double getMad() {
+    return getMad(longArrayList);
+  }
+
+  /**
+   * @param phi the requested fraction, expected in [0, 1]
+   * @return the percentile of the inserted timestamps, rendered as a decimal string
+   * @throws NoSuchElementException if nothing has been inserted
+   */
+  public String getPercentile(double phi) {
+    return Long.toString(getPercentile(longArrayList, phi));
+  }
+
+  /**
+   * Maps a fraction to a position in a sorted list of {@code size} elements.
+   *
+   * <p>{@code ceil(size * phi)} equals {@code size} when {@code phi} is 1.0, which is one past the
+   * last valid index, so the result is capped at {@code size - 1}.
+   */
+  private static int percentileIndex(int size, double phi) {
+    return Math.min(size - 1, (int) Math.ceil(size * phi));
+  }
+
+  /**
+   * Computes the median; sorts {@code nums} in place.
+   *
+   * @param nums non-empty list of values
+   * @return the middle element, or the mean of the two middle elements for an even size
+   * @throws NoSuchElementException if {@code nums} is empty
+   */
+  public static double getMedian(FloatArrayList nums) {
+    if (nums.isEmpty()) {
+      throw new NoSuchElementException();
+    }
+    nums.sortThis();
+    int mid = nums.size() / 2;
+    if (nums.size() % 2 == 0) {
+      return (nums.get(mid) + nums.get(mid - 1)) / 2.0;
+    }
+    return nums.get(mid);
+  }
+
+  /**
+   * Computes the median absolute deviation, i.e. the median of {@code |x - median|}; sorts {@code
+   * nums} in place.
+   *
+   * @param nums non-empty list of values
+   * @return the median absolute deviation
+   * @throws NoSuchElementException if {@code nums} is empty
+   */
+  public static double getMad(FloatArrayList nums) {
+    if (nums.isEmpty()) {
+      throw new NoSuchElementException();
+    }
+    double median = getMedian(nums);
+    // set() only overwrites existing positions, so the list is pre-filled to the final size
+    DoubleArrayList dal = DoubleArrayList.newWithNValues(nums.size(), 0);
+    for (int i = 0; i < nums.size(); ++i) {
+      dal.set(i, Math.abs(nums.get(i) - median));
+    }
+    return getMedian(dal);
+  }
+
+  /**
+   * Returns the element at position {@code ceil(size * phi)} of the sorted list, capped at the
+   * last element; sorts {@code nums} in place.
+   *
+   * @param nums non-empty list of values
+   * @param phi the requested fraction, expected in [0, 1]
+   * @return the selected element
+   * @throws NoSuchElementException if {@code nums} is empty
+   */
+  public static float getPercentile(FloatArrayList nums, double phi) {
+    if (nums.isEmpty()) {
+      throw new NoSuchElementException();
+    }
+    nums.sortThis();
+    return nums.get(percentileIndex(nums.size(), phi));
+  }
+
+  /**
+   * Computes the median; sorts {@code nums} in place.
+   *
+   * @param nums non-empty list of values
+   * @return the middle element, or the mean of the two middle elements for an even size
+   * @throws NoSuchElementException if {@code nums} is empty
+   */
+  public static double getMedian(DoubleArrayList nums) {
+    if (nums.isEmpty()) {
+      throw new NoSuchElementException();
+    }
+    nums.sortThis();
+    int mid = nums.size() / 2;
+    if (nums.size() % 2 == 0) {
+      return (nums.get(mid) + nums.get(mid - 1)) / 2.0;
+    }
+    return nums.get(mid);
+  }
+
+  /**
+   * Computes the median absolute deviation, i.e. the median of {@code |x - median|}; sorts {@code
+   * nums} in place.
+   *
+   * @param nums non-empty list of values
+   * @return the median absolute deviation
+   * @throws NoSuchElementException if {@code nums} is empty
+   */
+  public static double getMad(DoubleArrayList nums) {
+    if (nums.isEmpty()) {
+      throw new NoSuchElementException();
+    }
+    double median = getMedian(nums);
+    // set() only overwrites existing positions, so the list is pre-filled to the final size
+    DoubleArrayList dal = DoubleArrayList.newWithNValues(nums.size(), 0);
+    for (int i = 0; i < nums.size(); ++i) {
+      dal.set(i, Math.abs(nums.get(i) - median));
+    }
+    return getMedian(dal);
+  }
+
+  /**
+   * Returns the element at position {@code ceil(size * phi)} of the sorted list, capped at the
+   * last element; sorts {@code nums} in place.
+   *
+   * @param nums non-empty list of values
+   * @param phi the requested fraction, expected in [0, 1]
+   * @return the selected element
+   * @throws NoSuchElementException if {@code nums} is empty
+   */
+  public static double getPercentile(DoubleArrayList nums, double phi) {
+    if (nums.isEmpty()) {
+      throw new NoSuchElementException();
+    }
+    nums.sortThis();
+    return nums.get(percentileIndex(nums.size(), phi));
+  }
+
+  /**
+   * Computes the median; sorts {@code nums} in place.
+   *
+   * @param nums non-empty list of values
+   * @return the middle element, or the mean of the two middle elements for an even size
+   * @throws NoSuchElementException if {@code nums} is empty
+   */
+  public static double getMedian(IntArrayList nums) {
+    if (nums.isEmpty()) {
+      throw new NoSuchElementException();
+    }
+    nums.sortThis();
+    int mid = nums.size() / 2;
+    if (nums.size() % 2 == 0) {
+      // widen to long before adding so that two large ints cannot overflow
+      return ((long) nums.get(mid) + nums.get(mid - 1)) / 2.0;
+    }
+    return nums.get(mid);
+  }
+
+  /**
+   * Computes the median absolute deviation, i.e. the median of {@code |x - median|}; sorts {@code
+   * nums} in place.
+   *
+   * @param nums non-empty list of values
+   * @return the median absolute deviation
+   * @throws NoSuchElementException if {@code nums} is empty
+   */
+  public static double getMad(IntArrayList nums) {
+    if (nums.isEmpty()) {
+      throw new NoSuchElementException();
+    }
+    double median = getMedian(nums);
+    // set() only overwrites existing positions, so the list is pre-filled to the final size
+    DoubleArrayList dal = DoubleArrayList.newWithNValues(nums.size(), 0);
+    for (int i = 0; i < nums.size(); ++i) {
+      dal.set(i, Math.abs(nums.get(i) - median));
+    }
+    return getMedian(dal);
+  }
+
+  /**
+   * Returns the element at position {@code ceil(size * phi)} of the sorted list, capped at the
+   * last element; sorts {@code nums} in place.
+   *
+   * @param nums non-empty list of values
+   * @param phi the requested fraction, expected in [0, 1]
+   * @return the selected element
+   * @throws NoSuchElementException if {@code nums} is empty
+   */
+  public static int getPercentile(IntArrayList nums, double phi) {
+    if (nums.isEmpty()) {
+      throw new NoSuchElementException();
+    }
+    nums.sortThis();
+    return nums.get(percentileIndex(nums.size(), phi));
+  }
+
+  /**
+   * Computes the median; sorts {@code nums} in place.
+   *
+   * @param nums non-empty list of values
+   * @return the middle element, or the mean of the two middle elements for an even size
+   * @throws NoSuchElementException if {@code nums} is empty
+   */
+  public static double getMedian(LongArrayList nums) {
+    if (nums.isEmpty()) {
+      throw new NoSuchElementException();
+    }
+    nums.sortThis();
+    int mid = nums.size() / 2;
+    if (nums.size() % 2 == 0) {
+      return (nums.get(mid) + nums.get(mid - 1)) / 2.0;
+    }
+    return nums.get(mid);
+  }
+
+  /**
+   * Computes the median absolute deviation, i.e. the median of {@code |x - median|}; sorts {@code
+   * nums} in place.
+   *
+   * @param nums non-empty list of values
+   * @return the median absolute deviation
+   * @throws NoSuchElementException if {@code nums} is empty
+   */
+  public static double getMad(LongArrayList nums) {
+    if (nums.isEmpty()) {
+      throw new NoSuchElementException();
+    }
+    double median = getMedian(nums);
+    // set() only overwrites existing positions, so the list is pre-filled to the final size
+    DoubleArrayList dal = DoubleArrayList.newWithNValues(nums.size(), 0);
+    for (int i = 0; i < nums.size(); ++i) {
+      dal.set(i, Math.abs(nums.get(i) - median));
+    }
+    return getMedian(dal);
+  }
+
+  /**
+   * Returns the element at position {@code ceil(size * phi)} of the sorted list, capped at the
+   * last element; sorts {@code nums} in place.
+   *
+   * @param nums non-empty list of values
+   * @param phi the requested fraction, expected in [0, 1]
+   * @return the selected element
+   * @throws NoSuchElementException if {@code nums} is empty
+   */
+  public static long getPercentile(LongArrayList nums, double phi) {
+    if (nums.isEmpty()) {
+      throw new NoSuchElementException();
+    }
+    nums.sortThis();
+    return nums.get(percentileIndex(nums.size(), phi));
+  }
+}