You are viewing a plain-text version of this content. The canonical link to it was not preserved in this copy.
Posted to commits@iotdb.apache.org by le...@apache.org on 2023/02/03 15:50:21 UTC

[iotdb] 01/28: stepRegress write

This is an automated email from the ASF dual-hosted git repository.

leirui pushed a commit to branch research/M4-visualization
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 05c9ab13cf79958881537008144b8e95099a6bc7
Author: Lei Rui <10...@qq.com>
AuthorDate: Tue Jan 24 17:18:43 2023 +0800

    stepRegress write
---
 .../dataset/groupby/LocalGroupByExecutor4CPV.java  |  64 ++--
 tsfile/pom.xml                                     |   5 +
 .../file/metadata/statistics/Statistics.java       | 176 ++++++++--
 .../file/metadata/statistics/StepRegress.java      | 383 +++++++++++++++++++++
 .../statistics/TimeExactOrderStatistics.java       | 193 +++++++++++
 5 files changed, 753 insertions(+), 68 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor4CPV.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor4CPV.java
index 59feabe4e1..68ab1b055a 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor4CPV.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor4CPV.java
@@ -19,6 +19,14 @@
 
 package org.apache.iotdb.db.query.dataset.groupby;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Set;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
@@ -28,7 +36,6 @@ import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.QueryResourceManager;
 import org.apache.iotdb.db.query.filter.TsFileFilter;
 import org.apache.iotdb.db.query.reader.series.SeriesReader;
-import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader;
 import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader.MergeReaderPriority;
 import org.apache.iotdb.db.utils.FileLoaderUtils;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
@@ -51,15 +58,6 @@ import org.apache.iotdb.tsfile.read.reader.page.PageReader;
 import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.Map;
-import java.util.Set;
-
 /**
  * Sql format: SELECT min_time(s0), max_time(s0), first_value(s0), last_value(s0), min_value(s0),
  * max_value(s0) ROM root.xx group by ([tqs,tqe),IntervalLength). Requirements: (1) Don't change the
@@ -84,7 +82,7 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor {
 
   private TSDataType tsDataType;
 
-  private PriorityMergeReader mergeReader;
+//  private PriorityMergeReader mergeReader;
 
   public LocalGroupByExecutor4CPV(
       PartialPath path,
@@ -97,7 +95,7 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor {
       throws StorageEngineException, QueryProcessException {
 
     this.tsDataType = dataType;
-    this.mergeReader = new PriorityMergeReader();
+//    this.mergeReader = new PriorityMergeReader();
 
     // get all data sources
     QueryDataSource queryDataSource =
@@ -199,9 +197,9 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor {
 
   /**
    * @param curStartTime closed
-   * @param curEndTime open
-   * @param startTime closed
-   * @param endTime open
+   * @param curEndTime   open
+   * @param startTime    closed
+   * @param endTime      open
    */
   @Override
   public List<AggregateResult> calcResult(
@@ -226,7 +224,9 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor {
     return results;
   }
 
-  /** 对BatchData应用deletes操作,获得更新的BatchData和statistics赋值到chunkSuit4CPV中 */
+  /**
+   * Apply the delete operations to the BatchData, then assign the updated BatchData and statistics to chunkSuit4CPV.
+   */
   private void updateBatchData(ChunkSuit4CPV chunkSuit4CPV, TSDataType dataType) {
     if (chunkSuit4CPV.getBatchData() != null) {
       BatchData batchData1 = BatchDataFactory.createBatchData(dataType, true, false);
@@ -323,8 +323,8 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor {
           new Comparator<ChunkSuit4CPV>() { // TODO double check the sort order logic for version
             public int compare(ChunkSuit4CPV o1, ChunkSuit4CPV o2) {
               return new MergeReaderPriority(
-                      o2.getChunkMetadata().getVersion(),
-                      o2.getChunkMetadata().getOffsetOfChunkHeader())
+                  o2.getChunkMetadata().getVersion(),
+                  o2.getChunkMetadata().getOffsetOfChunkHeader())
                   .compareTo(
                       new MergeReaderPriority(
                           o1.getChunkMetadata().getVersion(),
@@ -411,7 +411,7 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor {
                 .get(4) // TODO check: minTimestamp, maxTimestamp, firstValue, lastValue,
                 // minValue[bottomTimestamp], maxValue[topTimestamp]
                 .updateResultUsingValues(
-                    new long[] {candidateTimestamp}, 1, new Object[] {candidateValue});
+                    new long[]{candidateTimestamp}, 1, new Object[]{candidateValue});
             // TODO check updateResult
             return; // 计算结束
           } else { // 是被overlap,则partial scan所有这些overlap的块
@@ -455,7 +455,7 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor {
                   .get(4) // TODO check: minTimestamp, maxTimestamp, firstValue, lastValue,
                   // minValue[bottomTimestamp], maxValue[topTimestamp]
                   .updateResultUsingValues(
-                      new long[] {candidateTimestamp}, 1, new Object[] {candidateValue});
+                      new long[]{candidateTimestamp}, 1, new Object[]{candidateValue});
               // TODO check updateResult
               return; // 计算结束
             } else { // 找到这样的点,于是标记candidate point所在块为lazy
@@ -521,8 +521,8 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor {
           new Comparator<ChunkSuit4CPV>() { // TODO double check the sort order logic for version
             public int compare(ChunkSuit4CPV o1, ChunkSuit4CPV o2) {
               return new MergeReaderPriority(
-                      o2.getChunkMetadata().getVersion(),
-                      o2.getChunkMetadata().getOffsetOfChunkHeader())
+                  o2.getChunkMetadata().getVersion(),
+                  o2.getChunkMetadata().getOffsetOfChunkHeader())
                   .compareTo(
                       new MergeReaderPriority(
                           o1.getChunkMetadata().getVersion(),
@@ -609,7 +609,7 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor {
                 .get(5) // TODO check: minTimestamp, maxTimestamp, firstValue, lastValue,
                 // minValue[bottomTimestamp], maxValue[topTimestamp]
                 .updateResultUsingValues(
-                    new long[] {candidateTimestamp}, 1, new Object[] {candidateValue});
+                    new long[]{candidateTimestamp}, 1, new Object[]{candidateValue});
             // TODO check updateResult
             return; // 计算结束
           } else { // 是被overlap,则partial scan所有这些overlap的块
@@ -653,7 +653,7 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor {
                   .get(5) // TODO check: minTimestamp, maxTimestamp, firstValue, lastValue,
                   // minValue[bottomTimestamp], maxValue[topTimestamp]
                   .updateResultUsingValues(
-                      new long[] {candidateTimestamp}, 1, new Object[] {candidateValue});
+                      new long[]{candidateTimestamp}, 1, new Object[]{candidateValue});
               // TODO check updateResult
               return; // 计算结束
             } else { // 找到这样的点,于是标记candidate point所在块为lazy
@@ -701,8 +701,8 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor {
                 return res;
               } else {
                 return new MergeReaderPriority(
-                        o2.getChunkMetadata().getVersion(),
-                        o2.getChunkMetadata().getOffsetOfChunkHeader())
+                    o2.getChunkMetadata().getVersion(),
+                    o2.getChunkMetadata().getOffsetOfChunkHeader())
                     .compareTo(
                         new MergeReaderPriority(
                             o1.getChunkMetadata().getVersion(),
@@ -767,11 +767,11 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor {
           results
               .get(0)
               .updateResultUsingValues(
-                  new long[] {candidateTimestamp}, 1, new Object[] {candidateValue});
+                  new long[]{candidateTimestamp}, 1, new Object[]{candidateValue});
           results
               .get(2)
               .updateResultUsingValues(
-                  new long[] {candidateTimestamp}, 1, new Object[] {candidateValue});
+                  new long[]{candidateTimestamp}, 1, new Object[]{candidateValue});
           return;
         }
       }
@@ -798,8 +798,8 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor {
                 return res;
               } else {
                 return new MergeReaderPriority(
-                        o2.getChunkMetadata().getVersion(),
-                        o2.getChunkMetadata().getOffsetOfChunkHeader())
+                    o2.getChunkMetadata().getVersion(),
+                    o2.getChunkMetadata().getOffsetOfChunkHeader())
                     .compareTo(
                         new MergeReaderPriority(
                             o1.getChunkMetadata().getVersion(),
@@ -864,11 +864,11 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor {
           results
               .get(1)
               .updateResultUsingValues(
-                  new long[] {candidateTimestamp}, 1, new Object[] {candidateValue});
+                  new long[]{candidateTimestamp}, 1, new Object[]{candidateValue});
           results
               .get(3)
               .updateResultUsingValues(
-                  new long[] {candidateTimestamp}, 1, new Object[] {candidateValue});
+                  new long[]{candidateTimestamp}, 1, new Object[]{candidateValue});
           return;
         }
       }
diff --git a/tsfile/pom.xml b/tsfile/pom.xml
index f26c930f1a..dcddf19cf9 100644
--- a/tsfile/pom.xml
+++ b/tsfile/pom.xml
@@ -37,6 +37,11 @@
         <tsfile.ut.skip>${tsfile.test.skip}</tsfile.ut.skip>
     </properties>
     <dependencies>
+        <dependency>
+            <groupId>org.eclipse.collections</groupId>
+            <artifactId>eclipse-collections</artifactId>
+            <version>10.4.0</version>
+        </dependency>
         <dependency>
             <groupId>ch.qos.logback</groupId>
             <artifactId>logback-classic</artifactId>
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java
index 947805c660..397fed5e5c 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java
@@ -18,27 +18,26 @@
  */
 package org.apache.iotdb.tsfile.file.metadata.statistics;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.Objects;
 import org.apache.iotdb.tsfile.exception.filter.StatisticsClassException;
 import org.apache.iotdb.tsfile.exception.write.UnknownColumnTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
-
+import org.eclipse.collections.impl.list.mutable.primitive.DoubleArrayList;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.util.Objects;
-
 /**
  * This class is used for recording statistic information of each measurement in a delta file. While
  * writing processing, the processor records the statistics information. Statistics includes
- * maximum, minimum and null value count up to version 0.0.1.<br>
- * Each data type extends this Statistic as super class.<br>
+ * maximum, minimum and null value count up to version 0.0.1.<br> Each data type extends this
+ * Statistic as super class.<br>
  * <br>
  * For the statistics in the Unseq file TimeSeriesMetadata, only firstValue, lastValue, startTime
  * and endTime can be used.</br>
@@ -51,13 +50,19 @@ public abstract class Statistics<T> {
    */
   protected boolean isEmpty = true;
 
-  /** number of time-value points */
+  /**
+   * number of time-value points
+   */
   private int count = 0;
 
   private long startTime = Long.MAX_VALUE;
   private long endTime = Long.MIN_VALUE;
 
-  /** @author Yuyuan Kang */
+  private StepRegress stepRegress = new StepRegress();
+
+  /**
+   * @author Yuyuan Kang
+   */
   final String OPERATION_NOT_SUPPORT_FORMAT = "%s statistics does not support operation: %s";
 
   /**
@@ -119,34 +124,59 @@ public abstract class Statistics<T> {
     byteLen += ReadWriteForEncodingUtils.writeUnsignedVarInt(count, outputStream);
     byteLen += ReadWriteIOUtils.write(startTime, outputStream);
     byteLen += ReadWriteIOUtils.write(endTime, outputStream);
+    // TODO serialize stepRegress
+    byteLen += serializeStepRegress(outputStream);
     // value statistics of different data type
     byteLen += serializeStats(outputStream);
     return byteLen;
   }
 
+  int serializeStepRegress(OutputStream outputStream) throws IOException {
+    int byteLen = 0;
+    stepRegress.learn(); // TODO ensure executed once and only once: learn() runs on every call
+    byteLen += ReadWriteIOUtils.write(stepRegress.getSlope(), outputStream); // K (slope), double
+    DoubleArrayList segmentKeys = stepRegress.getSegmentKeys();
+    // t1/tm are rebuilt from startTime/endTime on read; NOTE(review): confirm learn() appends tm
+    byteLen += ReadWriteIOUtils.write(segmentKeys.size(), outputStream); // m = number of keys, int
+    for (int i = 1; i < segmentKeys.size() - 1; i++) { // t2,t3,...,tm-1 written as doubles
+      byteLen += ReadWriteIOUtils.write(segmentKeys.get(i), outputStream);
+    }
+    return byteLen;
+  }
+
   abstract int serializeStats(OutputStream outputStream) throws IOException;
 
-  /** read data from the inputStream. */
+  /**
+   * read data from the inputStream.
+   */
   public abstract void deserialize(InputStream inputStream) throws IOException;
 
   public abstract void deserialize(ByteBuffer byteBuffer);
 
   //  public abstract void setMinMaxFromBytes(byte[] minBytes, byte[] maxBytes);
 
-  /** @author Yuyuan Kang */
+  /**
+   * @author Yuyuan Kang
+   */
   public abstract MinMaxInfo<T> getMinInfo();
 
-  /** @author Yuyuan Kang */
+  /**
+   * @author Yuyuan Kang
+   */
   public abstract MinMaxInfo<T> getMaxInfo();
 
   public abstract T getMinValue();
 
   public abstract T getMaxValue();
 
-  /** @author Yuyuan Kang */
+  /**
+   * @author Yuyuan Kang
+   */
   public abstract long getBottomTimestamp();
 
-  /** @author Yuyuan Kang */
+  /**
+   * @author Yuyuan Kang
+   */
   public abstract long getTopTimestamp();
 
   public abstract T getFirstValue();
@@ -193,6 +223,9 @@ public abstract class Statistics<T> {
       // must be sure no overlap between two statistics
       this.count += stats.count;
       mergeStatisticsValue(stats);
+      // TODO M4-LSM assumes that there is always only one page in a chunk
+      // TODO M4-LSM if there are more than one chunk in a time series, then access each chunkMetadata anyway
+      this.stepRegress = stats.stepRegress;
       isEmpty = false;
     } else {
       String thisClass = this.getClass().toString();
@@ -212,9 +245,12 @@ public abstract class Statistics<T> {
     }
     count++;
     updateStats(value);
+    updateStepRegress(time);
   }
 
-  /** @author Yuyuan Kang */
+  /**
+   * @author Yuyuan Kang
+   */
   public void update(long time, int value) {
     if (time < this.startTime) {
       startTime = time;
@@ -224,9 +260,12 @@ public abstract class Statistics<T> {
     }
     count++;
     updateStats(value, time);
+    updateStepRegress(time);
   }
 
-  /** @author Yuyuan Kang */
+  /**
+   * @author Yuyuan Kang
+   */
   public void update(long time, long value) {
     if (time < this.startTime) {
       startTime = time;
@@ -236,9 +275,12 @@ public abstract class Statistics<T> {
     }
     count++;
     updateStats(value, time);
+    updateStepRegress(time);
   }
 
-  /** @author Yuyuan Kang */
+  /**
+   * @author Yuyuan Kang
+   */
   public void update(long time, float value) {
     if (time < this.startTime) {
       startTime = time;
@@ -248,9 +290,12 @@ public abstract class Statistics<T> {
     }
     count++;
     updateStats(value, time);
+    updateStepRegress(time);
   }
 
-  /** @author Yuyuan Kang */
+  /**
+   * @author Yuyuan Kang
+   */
   public void update(long time, double value) {
     if (time < this.startTime) {
       startTime = time;
@@ -260,6 +305,7 @@ public abstract class Statistics<T> {
     }
     count++;
     updateStats(value, time);
+    updateStepRegress(time);
   }
 
   public void update(long time, Binary value) {
@@ -271,6 +317,7 @@ public abstract class Statistics<T> {
     }
     count++;
     updateStats(value);
+    updateStepRegress(time);
   }
 
   public void update(long[] time, boolean[] values, int batchSize) {
@@ -282,9 +329,12 @@ public abstract class Statistics<T> {
     }
     count += batchSize;
     updateStats(values, batchSize);
+    updateStepRegress(time, batchSize);
   }
 
-  /** @author Yuyuan Kang */
+  /**
+   * @author Yuyuan Kang
+   */
   public void update(long[] time, int[] values, int batchSize) {
     if (time[0] < startTime) {
       startTime = time[0];
@@ -294,9 +344,12 @@ public abstract class Statistics<T> {
     }
     count += batchSize;
     updateStats(values, time, batchSize);
+    updateStepRegress(time, batchSize);
   }
 
-  /** @author Yuyuan Kang */
+  /**
+   * @author Yuyuan Kang
+   */
   public void update(long[] time, long[] values, int batchSize) {
     if (time[0] < startTime) {
       startTime = time[0];
@@ -306,9 +359,12 @@ public abstract class Statistics<T> {
     }
     count += batchSize;
     updateStats(values, time, batchSize);
+    updateStepRegress(time, batchSize);
   }
 
-  /** @author Yuyuan Kang */
+  /**
+   * @author Yuyuan Kang
+   */
   public void update(long[] time, float[] values, int batchSize) {
     if (time[0] < startTime) {
       startTime = time[0];
@@ -318,9 +374,12 @@ public abstract class Statistics<T> {
     }
     count += batchSize;
     updateStats(values, time, batchSize);
+    updateStepRegress(time, batchSize);
   }
 
-  /** @author Yuyuan Kang */
+  /**
+   * @author Yuyuan Kang
+   */
   public void update(long[] time, double[] values, int batchSize) {
     if (time[0] < startTime) {
       startTime = time[0];
@@ -330,6 +389,7 @@ public abstract class Statistics<T> {
     }
     count += batchSize;
     updateStats(values, time, batchSize);
+    updateStepRegress(time, batchSize);
   }
 
   public void update(long[] time, Binary[] values, int batchSize) {
@@ -341,6 +401,7 @@ public abstract class Statistics<T> {
     }
     count += batchSize;
     updateStats(values, batchSize);
+    updateStepRegress(time, batchSize);
   }
 
   protected abstract void mergeStatisticsValue(Statistics stats);
@@ -353,32 +414,54 @@ public abstract class Statistics<T> {
     isEmpty = empty;
   }
 
-  /** @author Yuyuan Kang */
+  /**
+   * @author Yuyuan Kang
+   */
   public abstract void updateMinInfo(T val, long timestamp);
 
-  /** @author Yuyuan Kang */
+  /**
+   * @author Yuyuan Kang
+   */
   public abstract void updateMaxInfo(T val, long timestamp);
 
   void updateStats(boolean value) {
     throw new UnsupportedOperationException();
   }
 
-  /** @author Yuyuan Kang */
+  void updateStepRegress(long timestamp) { // feed one point's time into the step-regression model
+    this.stepRegress.insert(timestamp);
+  }
+
+  void updateStepRegress(long[] timestamps, int batchSize) { // only the first batchSize entries
+    for (int idx = 0; idx < batchSize; ++idx) {
+      this.updateStepRegress(timestamps[idx]);
+    }
+  }
+
+  /**
+   * @author Yuyuan Kang
+   */
   void updateStats(int value, long timestamp) {
     throw new UnsupportedOperationException();
   }
 
-  /** @author Yuyuan Kang */
+  /**
+   * @author Yuyuan Kang
+   */
   void updateStats(long value, long timestamp) {
     throw new UnsupportedOperationException();
   }
 
-  /** @author Yuyuan Kang */
+  /**
+   * @author Yuyuan Kang
+   */
   void updateStats(float value, long timestamp) {
     throw new UnsupportedOperationException();
   }
 
-  /** @author Yuyuan Kang */
+  /**
+   * @author Yuyuan Kang
+   */
   void updateStats(double value, long timestamp) {
     throw new UnsupportedOperationException();
   }
@@ -391,22 +474,30 @@ public abstract class Statistics<T> {
     throw new UnsupportedOperationException();
   }
 
-  /** @author Yuyuan Kang */
+  /**
+   * @author Yuyuan Kang
+   */
   void updateStats(int[] values, long[] timestamps, int batchSize) {
     throw new UnsupportedOperationException();
   }
 
-  /** @author Yuyuan Kang */
+  /**
+   * @author Yuyuan Kang
+   */
   void updateStats(long[] values, long[] timestamps, int batchSize) {
     throw new UnsupportedOperationException();
   }
 
-  /** @author Yuyuan Kang */
+  /**
+   * @author Yuyuan Kang
+   */
   void updateStats(float[] values, long[] timestamps, int batchSize) {
     throw new UnsupportedOperationException();
   }
 
-  /** @author Yuyuan Kang */
+  /**
+   * @author Yuyuan Kang
+   */
   void updateStats(double[] values, long[] timestamps, int batchSize) {
     throw new UnsupportedOperationException();
   }
@@ -416,10 +507,10 @@ public abstract class Statistics<T> {
   }
 
   /**
-   * @author Yuyuan Kang This method with two parameters is only used by {@code unsequence} which
-   *     updates/inserts/deletes timestamp.
    * @param min min timestamp
    * @param max max timestamp
+   * @author Yuyuan Kang This method with two parameters is only used by {@code unsequence} which
+   * updates/inserts/deletes timestamp.
    */
   public void updateStats(long min, long bottomTimestamp, long max, long topTimestamp) {
     throw new UnsupportedOperationException();
@@ -441,11 +532,24 @@ public abstract class Statistics<T> {
     statistics.setCount(ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
     statistics.setStartTime(ReadWriteIOUtils.readLong(buffer));
     statistics.setEndTime(ReadWriteIOUtils.readLong(buffer));
+    statistics.deserializeStepRegress(buffer); // TODO
     statistics.deserialize(buffer);
     statistics.isEmpty = false;
     return statistics;
   }
 
+  void deserializeStepRegress(ByteBuffer byteBuffer) {
+    this.stepRegress.setSlope(ReadWriteIOUtils.readDouble(byteBuffer)); // K (slope)
+    int m = ReadWriteIOUtils.readInt(byteBuffer); // m = number of segment keys, incl. t1 and tm
+    DoubleArrayList segmentKeys = new DoubleArrayList();
+    segmentKeys.add(this.startTime); // t1 is not stored; relies on startTime being read first
+    for (int i = 0; i < m - 2; i++) { // t2,t3,...,tm-1
+      segmentKeys.add(ReadWriteIOUtils.readDouble(byteBuffer));
+    }
+    segmentKeys.add(this.endTime); // tm; NOTE(review): also added when m < 2 - confirm intended
+    this.stepRegress.setSegmentKeys(segmentKeys);
+  }
+
   public long getStartTime() {
     return startTime;
   }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/StepRegress.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/StepRegress.java
new file mode 100644
index 0000000000..96d2736fb3
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/StepRegress.java
@@ -0,0 +1,383 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.file.metadata.statistics;
+
+import java.io.IOException;
+import java.util.Arrays;
+import org.eclipse.collections.impl.list.mutable.primitive.DoubleArrayList;
+import org.eclipse.collections.impl.list.mutable.primitive.IntArrayList;
+import org.eclipse.collections.impl.list.mutable.primitive.LongArrayList;
+
+public class StepRegress {
+
+  private double slope;
+
+  // when learning parameters, we first determine segmentIntercepts and then determine segmentKeys;
+  // when using functions, we read segmentKeys and then infer segmentIntercepts.
+  // fix that the first segment is always tilt,
+  // so for indexes starting from 0, even id is tilt, odd id is level.
+  private DoubleArrayList segmentIntercepts = new DoubleArrayList(); // b1,b2,...,bm-1
+
+  // fix that the first segment [t1,t2) is always tilt,
+  // so t2=t1 in fact means that the first status is level
+  private DoubleArrayList segmentKeys = new DoubleArrayList(); // t1,t2,...,tm
+  // TODO deal with the last key tm
+
+  private LongArrayList timestamps = new LongArrayList(); // Pi.t
+  private LongArrayList intervals = new LongArrayList(); // Pi+1.t-Pi.t
+
+  enum IntervalType { // interval kinds; intervalsType stores their ordinal() values
+    tilt, // ordinal 0
+    level; // ordinal 1
+  }
+
+  private IntArrayList intervalsType = new IntArrayList();
+  private long previousTimestamp = -1;
+
+  private double mean = 0; // mean of intervals
+  private double stdDev = 0; // standard deviation of intervals
+  private long count = 0;
+  private double sumX2 = 0.0;
+  private double sumX1 = 0.0;
+
+  private double median = 0; // median of intervals
+  private double mad = 0; // median absolute deviation of intervals
+  TimeExactOrderStatistics statistics = new TimeExactOrderStatistics();
+
+  /**
+   * Load one point: record its timestamp and, from the second point on, the interval to the
+   * previous point, accumulating what is needed for mean, std, median and mad along the way.
+   */
+  public void insert(long timestamp) {
+    timestamps.add(timestamp); // record
+    if (timestamps.size() > 1) { // every point but the first; also correct for timestamps <= 0
+      long delta = timestamp - previousTimestamp;
+      intervals.add(delta); // record; intervals[i] = timestamps[i+1] - timestamps[i]
+      // prepare for mean and stdDev
+      count++;
+      sumX1 += delta;
+      sumX2 += (double) delta * delta; // square in double: long delta * delta can overflow
+      // prepare for median and mad
+      statistics.insert(delta);
+    }
+    previousTimestamp = timestamp;
+  }
+
+  private void initForLearn() {
+    this.mean = getMean();
+    this.stdDev = getStdDev();
+    this.median = getMedian();
+    this.mad = getMad();
+    this.slope = 1 / this.median; // NOTE(review): median == 0 yields an infinite slope
+    this.segmentKeys = DoubleArrayList.newListWith(timestamps.get(0)); // t1; fresh list per learn()
+    this.segmentIntercepts = DoubleArrayList.newListWith(1 - slope * timestamps.get(0)); // b1
+  }
+
+  /**
+   * learn the parameters of the step regression function for the loaded data.
+   */
+  public void learn() throws IOException {
+    initForLearn();
+
+    int tiltLatestSegmentID = 0;
+    IntervalType previousIntervalType = IntervalType.tilt;
+
+    for (int i = 0; i < intervals.size(); i++) {
+      long delta = intervals.get(i);
+
+      // the current point (t,pos) focused, where t is the left endpoint of the current interval.
+      long t = timestamps.get(i);
+      int pos = i + 1;
+      // the next point (t,pos), where t is the right endpoint of the current interval.
+      long nextT = timestamps.get(i + 1);
+      int nextPos = i + 2;
+
+      // 1) determine the type of the current interval
+      // level condition: big interval && the right endpoint of the interval is under the latest tilt line.
+      // Note the right endpoint, not the left endpoint.
+      // "the right endpoint of the interval is under the latest tilt line" is to ensure the
+      // monotonically decreasing order of tilt intercepts (the last interval running through the last point
+      // is handled using post-processing to avoid disorder of tilt intercepts)
+      boolean isLevel = isBigInterval(delta) && (nextPos < slope * nextT + segmentIntercepts.get(
+          tiltLatestSegmentID));
+      // to avoid TLTLTLTL... causing trivial segments, add extra rule for tilt
+      if (!isLevel) {
+        if (previousIntervalType == IntervalType.level) { // when previous interval is level
+          if (i < intervals.size() - 1) { // when having next interval
+            long nextDelta = intervals.get(i + 1);
+            if (isBigInterval(nextDelta) && (nextPos + 1
+                < slope * timestamps.get(i + 2) + segmentIntercepts.get(
+                tiltLatestSegmentID))) { // when next interval is also level
+              isLevel = true; // then fix type from tilt to level, LTL=>LLL
+            }
+          }
+        }
+      }
+
+      // 2) determine if starting a new segment
+      if (isLevel) {
+        intervalsType.add(IntervalType.level.ordinal());
+        if (previousIntervalType == IntervalType.tilt) { // else do nothing, still level
+          // [[[translate from tilt to level]]]
+          previousIntervalType = IntervalType.level;
+          // 3) to determine the intercept, let the level function run through (t,pos)
+          double intercept = pos; // b2i=pos
+          // 4) to determine the segment key, let the level function and the previous tilt function intersect
+          segmentKeys.add((intercept - segmentIntercepts.getLast()) / slope); // x2i=(b2i-b2i-1)/K
+          // then add intercept to segmentIntercepts, do not change the order of codes here
+          segmentIntercepts.add(
+              intercept); // TODO debug if the first status is actually level works
+        }
+        // deal with the last interval to make sure the last point is hit
+        // TODO create examples to debug this
+        if (i == intervals.size() - 1) {
+          // 3) to determine the intercept, let the level function run through (timestamps.getLast(),timestamps.size())
+          double intercept = timestamps.size(); // b2i=pos
+          // 4) to determine the segment key, let the level function and the previous tilt function intersect
+          // Note that here is rewrite instead of add.
+          // Note taht here is not getLast
+          segmentKeys.set(segmentKeys.size() - 1,
+              (intercept - segmentIntercepts.get(segmentIntercepts.size() - 2))
+                  / slope); // x2i=(b2i-b2i-1)/K TODO debug here not getLast!
+          // then add intercept to segmentIntercepts, do not change the order of codes here
+          // Note that here is rewrite instead of add.
+          segmentIntercepts.set(segmentIntercepts.size() - 1, intercept);
+        }
+      } else {
+        intervalsType.add(IntervalType.tilt.ordinal());
+        if (previousIntervalType == IntervalType.level) { // else do nothing, still tilt
+          // [[[translate form level to tilt]]]
+          previousIntervalType = IntervalType.tilt;
+          // 3) to determine the intercept, let the tilt function run through (t,pos)
+          double intercept = pos - slope * t; // b2i+1=pos-K*t
+          // 4) to determine the segment key, let the level function and the previous tilt function intersect
+          segmentKeys.add((segmentIntercepts.getLast() - intercept) / slope); // x2i+1=(b2i-b2i+1)/K
+          // then add intercept to segmentIntercepts, do not change the order of codes here
+          segmentIntercepts.add(intercept);
+          // remember to update tiltLatestSegmentID
+          tiltLatestSegmentID += 2;
+        }
+        // deal with the last interval to make sure the last point is hit
+        // TODO create examples to debug this
+        if (i == intervals.size() - 1) {
+          if (segmentIntercepts.size() == 1) { // all TTTTTT, only one segment info
+            // remove all segment info, and directly connect the first and the last point
+            this.slope =
+                (timestamps.size() - 1.0) / (timestamps.getLast() - timestamps.getFirst());
+            this.segmentKeys = new DoubleArrayList();
+            this.segmentIntercepts = new DoubleArrayList();
+            this.segmentKeys.add(timestamps.get(0)); // t1
+            this.segmentIntercepts.add(1 - slope * timestamps.get(0)); // b1
+          } else {
+            // 3) to determine the intercept, let the tilt function run through (timestamps.getLast(),timestamps.size())
+            double intercept = timestamps.size() - slope * timestamps.getLast(); // b2i+1=pos-K*t
+            // 4) to determine the segment key, let the level function and the previous tilt function intersect
+            // Note that here is rewrite instead of add.
+            // Note taht here is not getLast
+            segmentKeys.set(segmentKeys.size() - 1,
+                (segmentIntercepts.get(segmentIntercepts.size() - 2) - intercept)
+                    / slope); // x2i+1=(b2i-b2i+1)/K TODO debug here not getLast!
+            // then add intercept to segmentIntercepts, do not change the order of codes here
+            // Note that here is rewrite instead of add.
+            segmentIntercepts.set(segmentIntercepts.size() - 1, intercept);
+
+            // now check to remove possible disorders
+            // search from back to front to find the first tilt intercept that is equal to or larger than the current intercept
+            int start = segmentIntercepts.size() - 3; // TODO debug
+            // TODO consider only one T
+            boolean equals = false;
+            for (; start >= 0; start -= 2) {
+              // note the step is 2, only tilt intercept, no level intercept
+              if (segmentIntercepts.get(start) == intercept) {
+                equals = true;
+                break;
+              }
+              if (segmentIntercepts.get(start) > intercept) {
+                equals = false;
+                break;
+              }
+            }
+            if (start < 0) { // TODO bug consider when start<0, i.e., not found: connecting directly
+              // remove all segment info, and directly connect the first and the last point
+              this.slope =
+                  (timestamps.size() - 1.0) / (timestamps.getLast() - timestamps.getFirst());
+              this.segmentKeys = new DoubleArrayList();
+              this.segmentIntercepts = new DoubleArrayList();
+              this.segmentKeys.add(timestamps.get(0)); // t1
+              this.segmentIntercepts.add(1 - slope * timestamps.get(0)); // b1
+            } else if (start < segmentIntercepts.size() - 3) {
+              if (!equals) {
+                // remove all segment information after start+1 id, i.e., remove from start+2~end
+                // note that the level after start tilt is kept since equals=false.
+                segmentIntercepts = DoubleArrayList.newListWith(
+                    Arrays.copyOfRange(segmentIntercepts.toArray(), 0, start + 2));
+                segmentKeys = DoubleArrayList.newListWith(
+                    Arrays.copyOfRange(segmentKeys.toArray(), 0, start + 2));
+
+                // Add new segment info for TL&T
+                // 4) to determine the segment key, let the level function and the previous tilt function intersect
+                // Note that here is add and getLast again!
+                segmentKeys.add(
+                    (segmentIntercepts.getLast() - intercept) / slope); // x2i+1=(b2i-b2i+1)/K
+                // then add intercept to segmentIntercepts, do not change the order of codes here
+                // Note that here is add and getLast again!
+                segmentIntercepts.add(intercept);
+              } else {
+                // remove all segment information after start id, i.e., remove from start+1~end
+                // note that the level after start tilt is NOT kept since equal==true
+                segmentIntercepts = DoubleArrayList.newListWith(
+                    Arrays.copyOfRange(segmentIntercepts.toArray(), 0, start + 1));
+                segmentKeys = DoubleArrayList.newListWith(
+                    Arrays.copyOfRange(segmentKeys.toArray(), 0, start + 1));
+                // TODO debug the first status is level, b1
+              }
+            }
+            // otherwise start==segmentIntercepts.size()-3 means result is already ready, no disorder to handle
+          }
+        }
+      }
+    }
+    segmentKeys.add(timestamps.getLast()); // tm
+
+    checkOrder(segmentIntercepts);
+  }
+
+  /**
+   * Validates the alternating monotonic order of the segment intercepts.
+   *
+   * <p>Ids start from 0 and the first status is fixed to tilt, so even ids hold tilt intercepts and
+   * odd ids hold level intercepts. Tilt intercepts must be strictly decreasing and level intercepts
+   * must be strictly increasing.
+   *
+   * @param segmentIntercepts intercepts b1,b2,... in segment order
+   * @throws IOException if either the tilt or the level subsequence is out of order
+   */
+  private void checkOrder(DoubleArrayList segmentIntercepts) throws IOException {
+    // Symmetric sentinels so that any realistic first intercept passes. Note: Double.MIN_VALUE is
+    // the smallest POSITIVE double (~4.9e-324), not the most negative one, so it cannot serve as
+    // the lower bound -- a level intercept <= 0 would then be wrongly reported as a disorder.
+    double tiltIntercept = Double.MAX_VALUE;
+    double levelIntercept = -Double.MAX_VALUE;
+    for (int i = 0; i < segmentIntercepts.size(); i++) {
+      double intercept = segmentIntercepts.get(i);
+      if (i % 2 == 0) {
+        // tilt intercepts must strictly decrease
+        if (intercept >= tiltIntercept) {
+          throw new IOException(String.format("disorder of tilt intercepts!: i=%s", i));
+        }
+        tiltIntercept = intercept;
+      } else {
+        // level intercepts must strictly increase
+        if (intercept <= levelIntercept) {
+          throw new IOException(String.format("disorder of level intercepts!: i=%s", i));
+        }
+        levelIntercept = intercept;
+      }
+    }
+  }
+
+  private boolean isBigInterval(long interval) {
+    int bigIntervalParam = 3;
+    return interval > this.mean + bigIntervalParam * this.stdDev;
+  }
+
+  /** @return the median as reported by the underlying {@code statistics} helper */
+  public double getMedian() {
+    return this.statistics.getMedian();
+  }
+
+  /** @return the median absolute deviation as reported by the underlying {@code statistics} helper */
+  public double getMad() {
+    return this.statistics.getMad();
+  }
+
+  /**
+   * Returns the sample mean, i.e. the running sum {@code sumX1} divided by {@code count}.
+   * NOTE(review): if both counters are integral types this division truncates -- confirm their
+   * declared types.
+   */
+  public double getMean() {
+    return this.sumX1 / this.count;
+  }
+
+  /**
+   * Returns the sample standard deviation: the population variance {@code sumX2/count -
+   * (sumX1/count)^2} scaled by Bessel's correction {@code count/(count-1)}, then square-rooted.
+   *
+   * <p>The one-pass variance formula can produce a tiny negative value through floating-point
+   * cancellation when all values are (nearly) equal; it is clamped to 0 so the result is 0 instead
+   * of NaN. The result is not meaningful for {@code count <= 1}.
+   */
+  public double getStdDev() { // sample standard deviation
+    double meanX = this.sumX1 / this.count;
+    double populationVariance = this.sumX2 / this.count - meanX * meanX;
+    // evaluated left to right so the multiplication by count happens in floating point
+    double sampleVariance = Math.max(0.0, populationVariance) * this.count / (this.count - 1);
+    return Math.sqrt(sampleVariance);
+  }
+
+  /** @return the intercepts b1,b2,... of the fitted segments (the live list, not a copy) */
+  public DoubleArrayList getSegmentIntercepts() {
+    return this.segmentIntercepts;
+  }
+
+  /** @return the slope K shared by every tilt segment */
+  public double getSlope() {
+    return this.slope;
+  }
+
+  /**
+   * Overrides the slope K shared by every tilt segment; intercepts are not recomputed.
+   *
+   * @param slope the new slope
+   */
+  public void setSlope(final double slope) {
+    this.slope = slope;
+  }
+
+  /**
+   * Replaces the segment keys t1,...,tm. The list is stored by reference and the intercepts are
+   * not recomputed.
+   *
+   * @param segmentKeys the new segment keys
+   */
+  public void setSegmentKeys(final DoubleArrayList segmentKeys) {
+    this.segmentKeys = segmentKeys;
+  }
+
+  /** @return the segment keys t1,...,tm (the live list, not a copy) */
+  public DoubleArrayList getSegmentKeys() {
+    return this.segmentKeys;
+  }
+
+  /** @return per-interval type codes, stored as {@code IntervalType} ordinals */
+  public IntArrayList getIntervalsType() {
+    return this.intervalsType;
+  }
+
+  /** @return the collected intervals (presumably gaps between consecutive timestamps) */
+  public LongArrayList getIntervals() {
+    return this.intervals;
+  }
+
+  /** @return the collected timestamps (the live list, not a copy) */
+  public LongArrayList getTimestamps() {
+    return this.timestamps;
+  }
+
+  /**
+   * Reconstructs the m-1 intercepts b1,...,bm-1 from the slope K and the m segment keys
+   * t1,...,tm (tm itself is not needed).
+   *
+   * <p>Segment ids alternate tilt (even) / level (odd). Continuity at each key gives:
+   * b1 = 1 - K*t1; for a level segment b2i = K*t2i + b2i-1; for a tilt segment
+   * b2i+1 = b2i-1 - K*(t2i+1 - t2i).
+   *
+   * @param slope the shared tilt slope K
+   * @param segmentKeys the segment keys t1,...,tm; must not be empty
+   * @return a new list holding the inferred intercepts
+   */
+  public static DoubleArrayList inferInterceptsFromSegmentKeys(double slope,
+      DoubleArrayList segmentKeys) {
+    DoubleArrayList intercepts = new DoubleArrayList();
+    intercepts.add(1 - slope * segmentKeys.get(0));
+    int lastKeyIdx = segmentKeys.size() - 1;
+    for (int idx = 1; idx < lastKeyIdx; idx++) {
+      double key = segmentKeys.get(idx);
+      boolean levelSegment = idx % 2 != 0;
+      if (levelSegment) {
+        // level: meets the preceding tilt line at this key
+        intercepts.add(slope * key + intercepts.get(idx - 1));
+      } else {
+        // tilt: shifted down from the previous tilt line by the width of the level in between
+        double prevTiltIntercept = intercepts.get(idx - 2);
+        intercepts.add(prevTiltIntercept - slope * (key - segmentKeys.get(idx - 1)));
+      }
+    }
+    return intercepts;
+  }
+
+  /**
+   * Evaluates the step regression function f(t).
+   *
+   * <p>The first status is fixed to tilt, so even segment ids are tilt segments (f = K*t + b) and
+   * odd ids are level segments (f = b). A t equal to a segment key belongs to the segment ending
+   * there.
+   *
+   * @param t input
+   * @return output the value of the step regression function f(t)
+   * @throws IOException if t lies outside [t1, tm]
+   */
+  public double infer(double t) throws IOException {
+    double lowerBound = segmentKeys.get(0);
+    double upperBound = segmentKeys.getLast();
+    if (t < lowerBound || t > upperBound) {
+      throw new IOException(
+          String.format("t out of range. input within [%s,%s]", lowerBound, upperBound));
+    }
+    // advance until t <= the right end of the current segment
+    int seg = 0;
+    int lastSeg = segmentKeys.size() - 1;
+    while (seg < lastSeg && !(t <= segmentKeys.get(seg + 1))) {
+      seg++;
+    }
+    double intercept = segmentIntercepts.get(seg);
+    return (seg % 2 == 0) ? slope * t + intercept : intercept;
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/TimeExactOrderStatistics.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/TimeExactOrderStatistics.java
new file mode 100644
index 0000000000..58e2fa0bf6
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/TimeExactOrderStatistics.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.file.metadata.statistics;
+
+import java.util.NoSuchElementException;
+import org.eclipse.collections.impl.list.mutable.primitive.DoubleArrayList;
+import org.eclipse.collections.impl.list.mutable.primitive.FloatArrayList;
+import org.eclipse.collections.impl.list.mutable.primitive.IntArrayList;
+import org.eclipse.collections.impl.list.mutable.primitive.LongArrayList;
+
+/**
+ * Exact (sort-based) order statistics: median, median absolute deviation (MAD) and percentiles.
+ *
+ * <p>An instance accumulates timestamps via {@link #insert(long)}; the static helpers operate on
+ * primitive lists directly. All helpers sort the list they are given IN PLACE.
+ */
+public class TimeExactOrderStatistics {
+
+  private final LongArrayList longArrayList;
+
+  public TimeExactOrderStatistics() {
+    longArrayList = new LongArrayList();
+  }
+
+  /** Appends a timestamp to the collected values. */
+  public void insert(long timestamp) {
+    longArrayList.add(timestamp);
+  }
+
+  /**
+   * @return the median of the inserted timestamps
+   * @throws NoSuchElementException if nothing has been inserted
+   */
+  public double getMedian() {
+    return getMedian(longArrayList);
+  }
+
+  /**
+   * @return the median absolute deviation of the inserted timestamps
+   * @throws NoSuchElementException if nothing has been inserted
+   */
+  public double getMad() {
+    return getMad(longArrayList);
+  }
+
+  /**
+   * @param phi quantile within [0, 1]
+   * @return the nearest-rank phi-quantile of the inserted timestamps, as a string
+   * @throws NoSuchElementException if nothing has been inserted
+   * @throws IllegalArgumentException if phi is outside [0, 1]
+   */
+  public String getPercentile(double phi) {
+    return Long.toString(getPercentile(longArrayList, phi));
+  }
+
+  /**
+   * Maps a quantile to a 0-based index into a sorted list using the nearest-rank method: the
+   * 1-based rank is ceil(size * phi), clamped to [1, size], so phi = 0 yields the minimum and
+   * phi = 1 the maximum (the previous ceil(size * phi) index was off by one and overflowed at 1).
+   */
+  private static int percentileIndex(int size, double phi) {
+    if (!(phi >= 0.0 && phi <= 1.0)) { // also rejects NaN
+      throw new IllegalArgumentException("phi must be within [0, 1], but was " + phi);
+    }
+    int rank = (int) Math.ceil(size * phi);
+    return Math.min(Math.max(rank, 1), size) - 1;
+  }
+
+  /** Median of {@code nums}; sorts {@code nums} in place. */
+  public static double getMedian(FloatArrayList nums) {
+    if (nums.isEmpty()) {
+      throw new NoSuchElementException();
+    }
+    nums.sortThis();
+    int size = nums.size();
+    if (size % 2 == 0) {
+      // widen before adding so the average is computed in double precision
+      return ((double) nums.get(size / 2 - 1) + nums.get(size / 2)) / 2.0;
+    }
+    return nums.get(size / 2);
+  }
+
+  /** Median absolute deviation of {@code nums}; sorts {@code nums} in place. */
+  public static double getMad(FloatArrayList nums) {
+    if (nums.isEmpty()) {
+      throw new NoSuchElementException();
+    }
+    double median = getMedian(nums);
+    // pre-sized capacity + add(): set(i, ...) on an empty list would throw
+    DoubleArrayList deviations = new DoubleArrayList(nums.size());
+    for (int i = 0; i < nums.size(); ++i) {
+      deviations.add(Math.abs(nums.get(i) - median));
+    }
+    return getMedian(deviations);
+  }
+
+  /** Nearest-rank phi-quantile of {@code nums}; sorts {@code nums} in place. */
+  public static float getPercentile(FloatArrayList nums, double phi) {
+    if (nums.isEmpty()) {
+      throw new NoSuchElementException();
+    }
+    int index = percentileIndex(nums.size(), phi);
+    nums.sortThis();
+    return nums.get(index);
+  }
+
+  /** Median of {@code nums}; sorts {@code nums} in place. */
+  public static double getMedian(DoubleArrayList nums) {
+    if (nums.isEmpty()) {
+      throw new NoSuchElementException();
+    }
+    nums.sortThis();
+    int size = nums.size();
+    if (size % 2 == 0) {
+      return (nums.get(size / 2 - 1) + nums.get(size / 2)) / 2.0;
+    }
+    return nums.get(size / 2);
+  }
+
+  /** Median absolute deviation of {@code nums}; sorts {@code nums} in place. */
+  public static double getMad(DoubleArrayList nums) {
+    if (nums.isEmpty()) {
+      throw new NoSuchElementException();
+    }
+    double median = getMedian(nums);
+    DoubleArrayList deviations = new DoubleArrayList(nums.size());
+    for (int i = 0; i < nums.size(); ++i) {
+      deviations.add(Math.abs(nums.get(i) - median));
+    }
+    return getMedian(deviations);
+  }
+
+  /** Nearest-rank phi-quantile of {@code nums}; sorts {@code nums} in place. */
+  public static double getPercentile(DoubleArrayList nums, double phi) {
+    if (nums.isEmpty()) {
+      throw new NoSuchElementException();
+    }
+    int index = percentileIndex(nums.size(), phi);
+    nums.sortThis();
+    return nums.get(index);
+  }
+
+  /** Median of {@code nums}; sorts {@code nums} in place. */
+  public static double getMedian(IntArrayList nums) {
+    if (nums.isEmpty()) {
+      throw new NoSuchElementException();
+    }
+    nums.sortThis();
+    int size = nums.size();
+    if (size % 2 == 0) {
+      // widen before adding: int addition of two large values would overflow
+      return ((double) nums.get(size / 2 - 1) + nums.get(size / 2)) / 2.0;
+    }
+    return nums.get(size / 2);
+  }
+
+  /** Median absolute deviation of {@code nums}; sorts {@code nums} in place. */
+  public static double getMad(IntArrayList nums) {
+    if (nums.isEmpty()) {
+      throw new NoSuchElementException();
+    }
+    double median = getMedian(nums);
+    DoubleArrayList deviations = new DoubleArrayList(nums.size());
+    for (int i = 0; i < nums.size(); ++i) {
+      deviations.add(Math.abs(nums.get(i) - median));
+    }
+    return getMedian(deviations);
+  }
+
+  /** Nearest-rank phi-quantile of {@code nums}; sorts {@code nums} in place. */
+  public static int getPercentile(IntArrayList nums, double phi) {
+    if (nums.isEmpty()) {
+      throw new NoSuchElementException();
+    }
+    int index = percentileIndex(nums.size(), phi);
+    nums.sortThis();
+    return nums.get(index);
+  }
+
+  /** Median of {@code nums}; sorts {@code nums} in place. */
+  public static double getMedian(LongArrayList nums) {
+    if (nums.isEmpty()) {
+      throw new NoSuchElementException();
+    }
+    nums.sortThis();
+    int size = nums.size();
+    if (size % 2 == 0) {
+      // widen before adding: long addition of two huge timestamps could overflow
+      return ((double) nums.get(size / 2 - 1) + nums.get(size / 2)) / 2.0;
+    }
+    return nums.get(size / 2);
+  }
+
+  /** Median absolute deviation of {@code nums}; sorts {@code nums} in place. */
+  public static double getMad(LongArrayList nums) {
+    if (nums.isEmpty()) {
+      throw new NoSuchElementException();
+    }
+    double median = getMedian(nums);
+    DoubleArrayList deviations = new DoubleArrayList(nums.size());
+    for (int i = 0; i < nums.size(); ++i) {
+      deviations.add(Math.abs(nums.get(i) - median));
+    }
+    return getMedian(deviations);
+  }
+
+  /** Nearest-rank phi-quantile of {@code nums}; sorts {@code nums} in place. */
+  public static long getPercentile(LongArrayList nums, double phi) {
+    if (nums.isEmpty()) {
+      throw new NoSuchElementException();
+    }
+    int index = percentileIndex(nums.size(), phi);
+    nums.sortThis();
+    return nums.get(index);
+  }
+}