You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2021/01/21 01:08:47 UTC

[iotdb] branch master updated: Replace TreeSet with TimeSelector (heap-based) for better raw query performance (#2495)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 060f4fc  Replace TreeSet with TimeSelector (heap-based) for better raw query performance (#2495)
060f4fc is described below

commit 060f4fc14abfb7f5a018d47e0cff1f49cb13766c
Author: Steve Yurong Su <st...@outlook.com>
AuthorDate: Thu Jan 21 09:08:17 2021 +0800

    Replace TreeSet with TimeSelector (heap-based) for better raw query performance (#2495)
    
    Replace TreeSet with TimeSelector (heap-based) for better raw query performance
---
 .../dataset/RawQueryDataSetWithoutValueFilter.java |   8 +-
 .../db/query/dataset/UDTFAlignByTimeDataSet.java   |   6 +-
 .../iotdb/db/utils/datastructure/TimeSelector.java | 155 +++++++++++++++
 .../db/utils/datastructure/TimeSelectorTest.java   | 217 +++++++++++++++++++++
 4 files changed, 378 insertions(+), 8 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
index 6b035f8..317c6c4 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
@@ -22,9 +22,7 @@ package org.apache.iotdb.db.query.dataset;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
-import java.util.TreeSet;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import org.apache.iotdb.db.concurrent.WrappedRunnable;
@@ -33,6 +31,7 @@ import org.apache.iotdb.db.query.control.QueryTimeManager;
 import org.apache.iotdb.db.query.pool.QueryTaskPoolManager;
 import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader;
 import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
+import org.apache.iotdb.db.utils.datastructure.TimeSelector;
 import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -126,7 +125,7 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet implements
 
   protected List<ManagedSeriesReader> seriesReaderList;
 
-  protected TreeSet<Long> timeHeap;
+  protected TimeSelector timeHeap;
 
   // Blocking queue list for each batch reader
   private final BlockingQueue<BatchData>[] blockingQueueArray;
@@ -174,8 +173,7 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet implements
   }
 
   private void init() throws IOException, InterruptedException {
-    timeHeap = new TreeSet<>(
-        super.ascending ? Long::compareTo : Collections.reverseOrder());
+    timeHeap = new TimeSelector(seriesReaderList.size() << 1, ascending);
     for (int i = 0; i < seriesReaderList.size(); i++) {
       ManagedSeriesReader reader = seriesReaderList.get(i);
       reader.setHasRemaining(true);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFAlignByTimeDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFAlignByTimeDataSet.java
index a6c6c12..a8c02f0 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFAlignByTimeDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFAlignByTimeDataSet.java
@@ -23,7 +23,6 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.TreeSet;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
@@ -32,6 +31,7 @@ import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
 import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader;
 import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
 import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
+import org.apache.iotdb.db.utils.datastructure.TimeSelector;
 import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -43,7 +43,7 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 public class UDTFAlignByTimeDataSet extends UDTFDataSet implements DirectAlignByTimeDataSet {
 
-  protected TreeSet<Long> timeHeap;
+  protected TimeSelector timeHeap;
 
   /**
    * execute with value filter
@@ -69,7 +69,7 @@ public class UDTFAlignByTimeDataSet extends UDTFDataSet implements DirectAlignBy
   }
 
   protected void initTimeHeap() throws IOException, QueryProcessException {
-    timeHeap = new TreeSet<>();
+    timeHeap = new TimeSelector(transformers.length << 1, true);
     for (LayerPointReader reader : transformers) {
       if (reader.next()) {
         timeHeap.add(reader.currentTime());
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TimeSelector.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TimeSelector.java
new file mode 100644
index 0000000..2ccbbe1
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TimeSelector.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.utils.datastructure;
+
+import java.util.Arrays;
+import java.util.NoSuchElementException;
+
+/**
+ * A binary heap of primitive {@code long} timestamps that hands out each distinct timestamp once:
+ * smallest first when created with {@code isAscending == true}, largest first otherwise.
+ *
+ * <p>Duplicates are suppressed in two places. {@link #add(long)} drops a value that is equal to
+ * one of the ancestors it is compared with while being inserted. {@link #isEmpty()} and
+ * {@link #pollFirst()} discard head values that are equal to the value returned by the previous
+ * poll. Together these guarantee that two consecutive polls never return the same timestamp.
+ *
+ * <p>This class performs no synchronization.
+ */
+public class TimeSelector {
+
+  private static final int MIN_DEFAULT_CAPACITY = 8;
+
+  private final boolean ascending;
+
+  /** Backing array of the heap; only the first {@code heapSize} slots are live. */
+  private long[] timeHeap;
+  private int heapSize;
+
+  /** Value returned by the latest {@link #pollFirst()}; only meaningful if {@code hasLastTime}. */
+  private long lastTime;
+
+  /**
+   * Whether {@link #pollFirst()} has returned a value yet. A separate flag is used instead of a
+   * sentinel value in {@code lastTime} so that every {@code long}, including
+   * {@code Long.MIN_VALUE}, can be stored and polled.
+   */
+  private boolean hasLastTime;
+
+  /**
+   * Creates an empty selector.
+   *
+   * @param defaultCapacity initial capacity of the backing array; values below 8 are raised to 8
+   * @param isAscending {@code true} to poll the smallest timestamp first, {@code false} to poll
+   *     the largest timestamp first
+   */
+  public TimeSelector(int defaultCapacity, boolean isAscending) {
+    this.ascending = isAscending;
+    timeHeap = new long[Math.max(defaultCapacity, MIN_DEFAULT_CAPACITY)];
+    heapSize = 0;
+    lastTime = Long.MIN_VALUE;
+    hasLastTime = false;
+  }
+
+  /**
+   * Tells whether {@link #pollFirst()} has nothing left to return. As a side effect, head values
+   * equal to the previously polled timestamp are removed from the heap.
+   *
+   * @return {@code true} if no timestamp other than stale duplicates remains
+   */
+  public boolean isEmpty() {
+    discardPolledDuplicates();
+    return heapSize == 0;
+  }
+
+  /**
+   * Inserts a timestamp. The value is silently dropped if it equals one of the ancestors it is
+   * compared with on its way up the heap; duplicates that slip through are filtered on poll.
+   *
+   * @param time the timestamp to insert
+   */
+  public void add(long time) {
+    if (percolateUp(heapSize, time)) {
+      ++heapSize;
+      checkExpansion();
+    }
+  }
+
+  /**
+   * Removes and returns the next timestamp in iteration order, skipping values equal to the one
+   * returned by the previous call.
+   *
+   * @return the next distinct timestamp
+   * @throws NoSuchElementException if the selector is empty (see {@link #isEmpty()})
+   */
+  public long pollFirst() {
+    if (isEmpty()) {
+      throw new NoSuchElementException("TimeSelector is empty");
+    }
+
+    // isEmpty() has purged stale duplicates, so the head differs from the previous result.
+    long first = timeHeap[0];
+    removeFirst();
+
+    lastTime = first;
+    hasLastTime = true;
+    return first;
+  }
+
+  /** Drops head values that equal the timestamp returned by the previous poll. */
+  private void discardPolledDuplicates() {
+    while (hasLastTime && heapSize != 0 && timeHeap[0] == lastTime) {
+      removeFirst();
+    }
+  }
+
+  /** Removes the head by moving the last live element to the root and sifting it down. */
+  private void removeFirst() {
+    --heapSize;
+    if (heapSize != 0) {
+      percolateDown(0, timeHeap[heapSize]);
+    }
+  }
+
+  /** Doubles the backing array once it is full, so that slot {@code heapSize} always exists. */
+  private void checkExpansion() {
+    if (heapSize == timeHeap.length) {
+      timeHeap = Arrays.copyOf(timeHeap, timeHeap.length << 1);
+    }
+  }
+
+  /** Returns whether {@code a} must be polled strictly before {@code b}. */
+  private boolean precedes(long a, long b) {
+    return ascending ? a < b : b < a;
+  }
+
+  /**
+   * Places {@code element} into the heap, starting from the free slot {@code index}.
+   *
+   * @return {@code false}, leaving the heap untouched, if an ancestor on the insertion path
+   *     equals {@code element}; {@code true} if the element was stored
+   */
+  private boolean percolateUp(int index, long element) {
+    // First pass: locate the final slot without mutating, so a duplicate needs no rollback.
+    int slot = index;
+    while (slot > 0) {
+      int parentIndex = (slot - 1) >>> 1;
+      long parent = timeHeap[parentIndex];
+      if (parent == element) {
+        return false;
+      }
+      if (!precedes(element, parent)) {
+        break;
+      }
+      slot = parentIndex;
+    }
+
+    // Second pass: shift the overtaken ancestors one level down and fill the hole.
+    int hole = index;
+    while (hole != slot) {
+      int parentIndex = (hole - 1) >>> 1;
+      timeHeap[hole] = timeHeap[parentIndex];
+      hole = parentIndex;
+    }
+    timeHeap[slot] = element;
+    return true;
+  }
+
+  /**
+   * Sifts {@code element} down from the hole at {@code index} until neither live child must be
+   * polled before it, then stores it there.
+   */
+  private void percolateDown(int index, long element) {
+    int hole = index;
+    while (true) {
+      int child = (hole << 1) + 1;
+      if (child >= heapSize) {
+        break; // no children
+      }
+      int right = child + 1;
+      if (right < heapSize && precedes(timeHeap[right], timeHeap[child])) {
+        child = right;
+      }
+      if (!precedes(timeHeap[child], element)) {
+        break;
+      }
+      timeHeap[hole] = timeHeap[child];
+      hole = child;
+    }
+    timeHeap[hole] = element;
+  }
+
+  /** Returns the live part of the heap in array (not polling) order. */
+  @Override
+  public String toString() {
+    return Arrays.toString(Arrays.copyOf(this.timeHeap, heapSize));
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/utils/datastructure/TimeSelectorTest.java b/server/src/test/java/org/apache/iotdb/db/utils/datastructure/TimeSelectorTest.java
new file mode 100644
index 0000000..88d6376
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/utils/datastructure/TimeSelectorTest.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.utils.datastructure;
+
+import java.util.Collections;
+import java.util.Random;
+import java.util.TreeSet;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Checks {@link TimeSelector} against a {@link TreeSet} of boxed longs used as the reference
+ * implementation: the same operations are applied to both and the remaining content is then
+ * drained and compared element by element.
+ */
+public class TimeSelectorTest {
+
+  private static final long DEFAULT_ITERATION_TIMES = 10000;
+
+  /** Fixed seed so that a failure of the randomized test can be reproduced. */
+  private static final long RANDOM_SEED = 20210121L;
+
+  /** Reference implementation. */
+  private TreeSet<Long> selector1;
+  /** Implementation under test. */
+  private TimeSelector selector2;
+
+  @Before
+  public void setUp() {
+    selector1 = new TreeSet<>();
+    selector2 = new TimeSelector(8, true);
+  }
+
+  @Test
+  public void testEmpty() {
+    Assert.assertTrue(selector2.isEmpty());
+  }
+
+  @Test
+  public void testAdd1() {
+    addToBoth(0L);
+    assertSameContent();
+  }
+
+  @Test
+  public void testAdd2() {
+    for (long i = 0; i < DEFAULT_ITERATION_TIMES; ++i) {
+      addToBoth(0L);
+    }
+    assertSameContent();
+  }
+
+  @Test
+  public void testAdd3() {
+    for (long i = 0; i < DEFAULT_ITERATION_TIMES; ++i) {
+      addToBoth(i);
+    }
+    assertSameContent();
+  }
+
+  @Test
+  public void testAdd4() {
+    for (long i = 0; i < DEFAULT_ITERATION_TIMES; ++i) {
+      addToBoth(DEFAULT_ITERATION_TIMES - i);
+    }
+    assertSameContent();
+  }
+
+  @Test
+  public void testAdd5() {
+    for (long i = 0; i < DEFAULT_ITERATION_TIMES; ++i) {
+      addToBoth(i);
+      addToBoth(DEFAULT_ITERATION_TIMES - i);
+      addToBoth(i);
+      addToBoth(DEFAULT_ITERATION_TIMES - i);
+    }
+    assertSameContent();
+  }
+
+  @Test
+  public void testAdd6() {
+    for (long i = 0; i < DEFAULT_ITERATION_TIMES; ++i) {
+      addToBoth(i);
+      addToBoth(i + 2);
+      addToBoth(i + 1);
+      addToBoth(i - 1);
+      addToBoth(i);
+    }
+    assertSameContent();
+  }
+
+  @Test
+  public void testAdd7() {
+    Random random = new Random(RANDOM_SEED);
+    for (long i = 0; i < DEFAULT_ITERATION_TIMES; ++i) {
+      addToBoth(random.nextLong());
+    }
+    assertSameContent();
+  }
+
+  @Test
+  public void testAddAndPoll1() {
+    for (long i = 0; i < DEFAULT_ITERATION_TIMES; ++i) {
+      addToBoth(i);
+      if (i % 3 == 0) {
+        pollFromBoth();
+      }
+    }
+    assertSameContent();
+  }
+
+  @Test
+  public void testAddAndPoll2() {
+    for (long i = 0; i < DEFAULT_ITERATION_TIMES; ++i) {
+      addToBoth(DEFAULT_ITERATION_TIMES - i);
+      addToBoth(i);
+      if (i % 3 == 0) {
+        pollFromBoth();
+      }
+    }
+    assertSameContent();
+  }
+
+  @Test
+  public void testAddAndPoll3() {
+    interleaveAddsAndPolls();
+    assertSameContent();
+  }
+
+  @Test
+  public void testDescending() {
+    selector1 = new TreeSet<>(Collections.reverseOrder());
+    selector2 = new TimeSelector(8, false);
+
+    interleaveAddsAndPolls();
+    assertSameContent();
+  }
+
+  /** Adds the same timestamp to the reference set and to the selector under test. */
+  private void addToBoth(long time) {
+    selector1.add(time);
+    selector2.add(time);
+  }
+
+  /** Polls both collections once; the polled values are intentionally not compared here. */
+  private void pollFromBoth() {
+    selector1.pollFirst();
+    selector2.pollFirst();
+  }
+
+  /** Runs the overlapping add/poll sequence shared by the ascending and descending tests. */
+  private void interleaveAddsAndPolls() {
+    for (long i = 0; i < DEFAULT_ITERATION_TIMES; ++i) {
+      addToBoth(i);
+      addToBoth(i + 2);
+      pollFromBoth();
+      addToBoth(i + 1);
+      addToBoth(i - 1);
+      pollFromBoth();
+      addToBoth(i);
+      pollFromBoth();
+    }
+  }
+
+  /**
+   * Drains both collections and asserts they yield the same timestamps in the same order.
+   * {@code isEmpty()} is deliberately called twice in a row to check that calling it repeatedly
+   * does not change its answer.
+   */
+  private void assertSameContent() {
+    Assert.assertFalse(selector2.isEmpty());
+    Assert.assertFalse(selector2.isEmpty()); // on purpose
+    while (!selector1.isEmpty()) {
+      Assert.assertFalse(selector2.isEmpty());
+      Assert.assertEquals((long) selector1.pollFirst(), selector2.pollFirst());
+    }
+    Assert.assertTrue(selector2.isEmpty());
+    Assert.assertTrue(selector2.isEmpty()); // on purpose
+  }
+}