You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2021/01/21 01:08:47 UTC
[iotdb] branch master updated: Replace TreeSet with TimeSelector
(heap-based) for better raw query performance (#2495)
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 060f4fc Replace TreeSet with TimeSelector (heap-based) for better raw query performance (#2495)
060f4fc is described below
commit 060f4fc14abfb7f5a018d47e0cff1f49cb13766c
Author: Steve Yurong Su <st...@outlook.com>
AuthorDate: Thu Jan 21 09:08:17 2021 +0800
Replace TreeSet with TimeSelector (heap-based) for better raw query performance (#2495)
Replace TreeSet with TimeSelector (heap-based) for better raw query performance
---
.../dataset/RawQueryDataSetWithoutValueFilter.java | 8 +-
.../db/query/dataset/UDTFAlignByTimeDataSet.java | 6 +-
.../iotdb/db/utils/datastructure/TimeSelector.java | 155 +++++++++++++++
.../db/utils/datastructure/TimeSelectorTest.java | 217 +++++++++++++++++++++
4 files changed, 378 insertions(+), 8 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
index 6b035f8..317c6c4 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
@@ -22,9 +22,7 @@ package org.apache.iotdb.db.query.dataset;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
-import java.util.TreeSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.iotdb.db.concurrent.WrappedRunnable;
@@ -33,6 +31,7 @@ import org.apache.iotdb.db.query.control.QueryTimeManager;
import org.apache.iotdb.db.query.pool.QueryTaskPoolManager;
import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader;
import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
+import org.apache.iotdb.db.utils.datastructure.TimeSelector;
import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -126,7 +125,7 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet implements
protected List<ManagedSeriesReader> seriesReaderList;
- protected TreeSet<Long> timeHeap;
+ protected TimeSelector timeHeap;
// Blocking queue list for each batch reader
private final BlockingQueue<BatchData>[] blockingQueueArray;
@@ -174,8 +173,7 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet implements
}
private void init() throws IOException, InterruptedException {
- timeHeap = new TreeSet<>(
- super.ascending ? Long::compareTo : Collections.reverseOrder());
+ timeHeap = new TimeSelector(seriesReaderList.size() << 1, ascending);
for (int i = 0; i < seriesReaderList.size(); i++) {
ManagedSeriesReader reader = seriesReaderList.get(i);
reader.setHasRemaining(true);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFAlignByTimeDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFAlignByTimeDataSet.java
index a6c6c12..a8c02f0 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFAlignByTimeDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFAlignByTimeDataSet.java
@@ -23,7 +23,6 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
-import java.util.TreeSet;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
@@ -32,6 +31,7 @@ import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader;
import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
+import org.apache.iotdb.db.utils.datastructure.TimeSelector;
import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -43,7 +43,7 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
public class UDTFAlignByTimeDataSet extends UDTFDataSet implements DirectAlignByTimeDataSet {
- protected TreeSet<Long> timeHeap;
+ protected TimeSelector timeHeap;
/**
* execute with value filter
@@ -69,7 +69,7 @@ public class UDTFAlignByTimeDataSet extends UDTFDataSet implements DirectAlignBy
}
protected void initTimeHeap() throws IOException, QueryProcessException {
- timeHeap = new TreeSet<>();
+ timeHeap = new TimeSelector(transformers.length << 1, true);
for (LayerPointReader reader : transformers) {
if (reader.next()) {
timeHeap.add(reader.currentTime());
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TimeSelector.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TimeSelector.java
new file mode 100644
index 0000000..2ccbbe1
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TimeSelector.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.utils.datastructure;
+
+import java.util.Arrays;
+
/**
 * A binary heap of primitive {@code long} timestamps, intended as a boxing-free replacement for
 * {@code TreeSet<Long>} when merging the time columns of several series.
 *
 * <p>The selector has set semantics like the {@code TreeSet} it replaces: {@link #pollFirst()}
 * returns the smallest (ascending) or largest (descending) stored time and discards every stored
 * copy of it, so each distinct time is returned once. {@link #add(long)} rejects a time equal to
 * any ancestor on its insertion path, which catches most duplicates cheaply; copies that slip past
 * that check (e.g. one sitting in a different subtree) are removed by {@code pollFirst()} together
 * with the copy it returns.
 *
 * <p>This class is not thread-safe.
 */
public class TimeSelector {

  private static final int MIN_DEFAULT_CAPACITY = 8;

  /** {@code true}: {@link #pollFirst()} yields the minimum; {@code false}: the maximum. */
  private final boolean ascending;

  /** Heap storage; slots {@code [0, heapSize)} are live, anything beyond is scratch. */
  private long[] timeHeap;

  private int heapSize;

  /**
   * @param defaultCapacity initial size of the backing array; values below 8 are raised to 8
   * @param isAscending {@code true} to poll times in ascending order, {@code false} for descending
   */
  public TimeSelector(int defaultCapacity, boolean isAscending) {
    this.ascending = isAscending;
    this.timeHeap = new long[Math.max(defaultCapacity, MIN_DEFAULT_CAPACITY)];
    this.heapSize = 0;
  }

  /** Returns {@code true} if there is no time left to poll. */
  public boolean isEmpty() {
    return heapSize == 0;
  }

  /**
   * Inserts {@code time}. If an equal time is found on the insertion path the call is a no-op;
   * otherwise the time is stored (a duplicate elsewhere in the heap is tolerated and collapsed by
   * {@link #pollFirst()}).
   *
   * @param time the timestamp to insert; any {@code long}, including {@link Long#MIN_VALUE}
   */
  public void add(long time) {
    // Pass 1 (read-only): find the slot where `time` comes to rest, bailing out if it meets an
    // equal ancestor. Nothing has been written yet, so a rejected duplicate leaves the heap intact.
    int target = heapSize;
    while (target > 0) {
      int parentIndex = (target - 1) >>> 1;
      long parent = timeHeap[parentIndex];
      if (parent == time) {
        return;
      }
      if (!precedes(time, parent)) {
        break;
      }
      target = parentIndex;
    }

    // Pass 2: shift the ancestors between the new leaf slot and `target` down by one level.
    int index = heapSize;
    while (index > target) {
      int parentIndex = (index - 1) >>> 1;
      timeHeap[index] = timeHeap[parentIndex];
      index = parentIndex;
    }
    timeHeap[target] = time;

    ++heapSize;
    checkExpansion();
  }

  /**
   * Removes and returns the first time: the minimum when ascending, the maximum otherwise. Every
   * stored copy of that time is discarded.
   *
   * @return the first time in this selector's order
   * @throws java.util.NoSuchElementException if the selector is empty
   */
  public long pollFirst() {
    if (heapSize == 0) {
      throw new java.util.NoSuchElementException("TimeSelector is empty");
    }
    long first = timeHeap[0];
    // Copies of `first` that add() did not catch are also extreme values, so they surface at the
    // root one after another; drop them now so the same time is never returned twice.
    do {
      removeRoot();
    } while (heapSize > 0 && timeHeap[0] == first);
    return first;
  }

  /** Removes the root by moving the last live element to the top and sifting it down. */
  private void removeRoot() {
    --heapSize;
    if (heapSize > 0) {
      siftDown(timeHeap[heapSize]);
    }
  }

  /** Places {@code element} starting at the root, moving preceding children up as needed. */
  private void siftDown(long element) {
    int index = 0;
    int firstLeaf = heapSize >>> 1; // nodes at or beyond this index have no children
    while (index < firstLeaf) {
      int child = (index << 1) + 1;
      int right = child + 1;
      if (right < heapSize && precedes(timeHeap[right], timeHeap[child])) {
        child = right;
      }
      if (!precedes(timeHeap[child], element)) {
        break;
      }
      timeHeap[index] = timeHeap[child];
      index = child;
    }
    timeHeap[index] = element;
  }

  /** Returns {@code true} if {@code a} must be polled strictly before {@code b}. */
  private boolean precedes(long a, long b) {
    return ascending ? a < b : a > b;
  }

  /** Doubles the backing array once it is full, so the next add() always has a free slot. */
  private void checkExpansion() {
    if (heapSize == timeHeap.length) {
      timeHeap = Arrays.copyOf(timeHeap, timeHeap.length << 1);
    }
  }

  /** Returns the live heap slots in array (not sorted) order, for debugging. */
  @Override
  public String toString() {
    return Arrays.toString(Arrays.copyOf(timeHeap, heapSize));
  }
}
diff --git a/server/src/test/java/org/apache/iotdb/db/utils/datastructure/TimeSelectorTest.java b/server/src/test/java/org/apache/iotdb/db/utils/datastructure/TimeSelectorTest.java
new file mode 100644
index 0000000..88d6376
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/utils/datastructure/TimeSelectorTest.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.utils.datastructure;
+
+import java.util.Collections;
+import java.util.Random;
+import java.util.TreeSet;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TimeSelectorTest {
+
+ private static final long DEFAULT_ITERATION_TIMES = 10000;
+
+ private TreeSet<Long> selector1;
+ private TimeSelector selector2;
+
+ @Before
+ public void setUp() {
+ selector1 = new TreeSet<>();
+ selector2 = new TimeSelector(8, true);
+ }
+
+ @Test
+ public void testEmpty() {
+ Assert.assertTrue(selector2.isEmpty());
+ }
+
+ @Test
+ public void testAdd1() {
+ selector1.add(0L);
+ selector2.add(0L);
+ assertEquals();
+ }
+
+ @Test
+ public void testAdd2() {
+ for (long i = 0; i < DEFAULT_ITERATION_TIMES; ++i) {
+ selector1.add(0L);
+ selector2.add(0L);
+ }
+ assertEquals();
+ }
+
+ @Test
+ public void testAdd3() {
+ for (long i = 0; i < DEFAULT_ITERATION_TIMES; ++i) {
+ selector1.add(i);
+ selector2.add(i);
+ }
+ assertEquals();
+ }
+
+ @Test
+ public void testAdd4() {
+ for (long i = 0; i < DEFAULT_ITERATION_TIMES; ++i) {
+ selector1.add(DEFAULT_ITERATION_TIMES - i);
+ selector2.add(DEFAULT_ITERATION_TIMES - i);
+ }
+ assertEquals();
+ }
+
+ @Test
+ public void testAdd5() {
+ for (long i = 0; i < DEFAULT_ITERATION_TIMES; ++i) {
+ selector1.add(i);
+ selector1.add(DEFAULT_ITERATION_TIMES - i);
+ selector1.add(i);
+ selector1.add(DEFAULT_ITERATION_TIMES - i);
+
+ selector2.add(i);
+ selector2.add(DEFAULT_ITERATION_TIMES - i);
+ selector2.add(i);
+ selector2.add(DEFAULT_ITERATION_TIMES - i);
+ }
+ assertEquals();
+ }
+
+ @Test
+ public void testAdd6() {
+ for (long i = 0; i < DEFAULT_ITERATION_TIMES; ++i) {
+ selector1.add(i);
+ selector1.add(i + 2);
+ selector1.add(i + 1);
+ selector1.add(i - 1);
+ selector1.add(i);
+
+ selector2.add(i);
+ selector2.add(i + 2);
+ selector2.add(i + 1);
+ selector2.add(i - 1);
+ selector2.add(i);
+ }
+ assertEquals();
+ }
+
+ @Test
+ public void testAdd7() {
+ Random random = new Random();
+ for (long i = 0; i < DEFAULT_ITERATION_TIMES; ++i) {
+ long nextRandomLong = random.nextLong();
+ selector1.add(nextRandomLong);
+ selector2.add(nextRandomLong);
+ }
+ assertEquals();
+ }
+
+ @Test
+ public void testAddAndPoll1() {
+ for (long i = 0; i < DEFAULT_ITERATION_TIMES; ++i) {
+ selector1.add(i);
+ selector2.add(i);
+ if (i % 3 == 0) {
+ selector1.pollFirst();
+ selector2.pollFirst();
+ }
+ }
+ assertEquals();
+ }
+
+ @Test
+ public void testAddAndPoll2() {
+ for (long i = 0; i < DEFAULT_ITERATION_TIMES; ++i) {
+ selector1.add(DEFAULT_ITERATION_TIMES - i);
+ selector1.add(i);
+
+ selector2.add(DEFAULT_ITERATION_TIMES - i);
+ selector2.add(i);
+
+ if (i % 3 == 0) {
+ selector1.pollFirst();
+ selector2.pollFirst();
+ }
+ }
+ assertEquals();
+ }
+
+ @Test
+ public void testAddAndPoll3() {
+ for (long i = 0; i < DEFAULT_ITERATION_TIMES; ++i) {
+ selector1.add(i);
+ selector1.add(i + 2);
+ selector1.pollFirst();
+ selector1.add(i + 1);
+ selector1.add(i - 1);
+ selector1.pollFirst();
+ selector1.add(i);
+ selector1.pollFirst();
+
+ selector2.add(i);
+ selector2.add(i + 2);
+ selector2.pollFirst();
+ selector2.add(i + 1);
+ selector2.add(i - 1);
+ selector2.pollFirst();
+ selector2.add(i);
+ selector2.pollFirst();
+ }
+ assertEquals();
+ }
+
+ @Test
+ public void testDescending() {
+ selector1 = new TreeSet<>(Collections.reverseOrder());
+ selector2 = new TimeSelector(8, false);
+
+ for (long i = 0; i < DEFAULT_ITERATION_TIMES; ++i) {
+ selector1.add(i);
+ selector1.add(i + 2);
+ selector1.pollFirst();
+ selector1.add(i + 1);
+ selector1.add(i - 1);
+ selector1.pollFirst();
+ selector1.add(i);
+ selector1.pollFirst();
+
+ selector2.add(i);
+ selector2.add(i + 2);
+ selector2.pollFirst();
+ selector2.add(i + 1);
+ selector2.add(i - 1);
+ selector2.pollFirst();
+ selector2.add(i);
+ selector2.pollFirst();
+ }
+ assertEquals();
+ }
+
+ private void assertEquals() {
+ Assert.assertFalse(selector2.isEmpty());
+ Assert.assertFalse(selector2.isEmpty()); // on purpose
+ while (!selector1.isEmpty()) {
+ Assert.assertFalse(selector2.isEmpty());
+ Assert.assertEquals((long) selector1.pollFirst(), selector2.pollFirst());
+ }
+ Assert.assertTrue(selector2.isEmpty());
+ Assert.assertTrue(selector2.isEmpty()); // on purpose
+ }
+}