You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2021/03/19 09:15:07 UTC
[iotdb] branch master updated: Optimize cluster query (#2859)
This is an automated email from the ASF dual-hosted git repository.
xiangweiwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new c9026e2 Optimize cluster query (#2859)
c9026e2 is described below
commit c9026e2f29fe18c08fe9d7ad3fbb541dd14f5913
Author: Xiangwei Wei <34...@users.noreply.github.com>
AuthorDate: Fri Mar 19 17:14:39 2021 +0800
Optimize cluster query (#2859)
---
.../iotdb/cluster/query/LocalQueryExecutor.java | 15 +-
.../iotdb/cluster/query/reader/EmptyReader.java | 3 +-
.../cluster/query/reader/MergedReaderByTime.java | 8 +-
.../reader/RemoteSeriesReaderByTimestamp.java | 40 +++--
.../iotdb/cluster/server/DataClusterServer.java | 13 +-
.../cluster/server/service/DataAsyncService.java | 10 +-
.../cluster/server/service/DataSyncService.java | 5 +-
.../iotdb/cluster/common/TestAsyncDataClient.java | 9 +-
.../cluster/common/TestManagedSeriesReader.java | 23 +--
.../query/manage/ClusterQueryManagerTest.java | 2 +-
.../reader/RemoteSeriesReaderByTimestampTest.java | 68 +++++---
.../cluster/server/member/DataGroupMemberTest.java | 30 ++--
.../cluster/server/member/MetaGroupMemberTest.java | 8 +-
.../db/query/aggregation/AggregateResult.java | 4 +
.../db/query/aggregation/impl/AvgAggrResult.java | 15 +-
.../db/query/aggregation/impl/CountAggrResult.java | 18 ++-
.../aggregation/impl/FirstValueAggrResult.java | 26 +++-
.../aggregation/impl/FirstValueDescAggrResult.java | 20 ++-
.../aggregation/impl/LastValueAggrResult.java | 26 ++--
.../aggregation/impl/LastValueDescAggrResult.java | 37 +++--
.../query/aggregation/impl/MaxTimeAggrResult.java | 20 ++-
.../aggregation/impl/MaxTimeDescAggrResult.java | 31 +++-
.../query/aggregation/impl/MaxValueAggrResult.java | 18 ++-
.../query/aggregation/impl/MinTimeAggrResult.java | 24 ++-
.../aggregation/impl/MinTimeDescAggrResult.java | 17 +-
.../query/aggregation/impl/MinValueAggrResult.java | 18 ++-
.../db/query/aggregation/impl/SumAggrResult.java | 15 +-
.../dataset/RawQueryDataSetWithValueFilter.java | 172 ++++++++++++++-------
.../db/query/executor/AggregationExecutor.java | 87 +++++++----
.../adapter/ByTimestampReaderAdapter.java | 48 +++---
.../reader/chunk/DiskChunkReaderByTimestamp.java | 55 +++----
.../db/query/reader/series/IReaderByTimestamp.java | 2 +-
.../reader/series/SeriesReaderByTimestamp.java | 17 +-
.../org/apache/iotdb/db/service/TSServiceImpl.java | 5 +-
.../reader/series/SeriesReaderByTimestampTest.java | 12 +-
thrift/src/main/thrift/cluster.thrift | 4 +-
.../query/dataset/DataSetWithTimeGenerator.java | 2 +-
.../tsfile/read/query/dataset/QueryDataSet.java | 5 +
.../read/query/timegenerator/TimeGenerator.java | 52 +++++--
.../read/query/timegenerator/node/AndNode.java | 18 +--
.../read/query/timegenerator/node/OrNode.java | 42 ++---
.../tsfile/read/reader/FakedTimeGenerator.java | 15 +-
42 files changed, 706 insertions(+), 353 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
index 028b3fc..24c7847 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
@@ -102,24 +102,19 @@ public class LocalQueryExecutor {
return ((CMManager) IoTDB.metaManager);
}
- /**
- * Return the data of the reader whose id is "readerId", using timestamps in "timeBuffer".
- *
- * @param readerId
- * @param time
- */
- public ByteBuffer fetchSingleSeriesByTimestamp(long readerId, long time)
+ /** Return the data of the reader whose id is "readerId", using the timestamps in "timestamps". */
+ public ByteBuffer fetchSingleSeriesByTimestamps(long readerId, long[] timestamps, int length)
throws ReaderNotFoundException, IOException {
IReaderByTimestamp reader = dataGroupMember.getQueryManager().getReaderByTimestamp(readerId);
if (reader == null) {
throw new ReaderNotFoundException(readerId);
}
- Object value = reader.getValueInTimestamp(time);
- if (value != null) {
+ Object[] values = reader.getValuesInTimestamps(timestamps, length);
+ if (values != null) {
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
- SerializeUtils.serializeObject(value, dataOutputStream);
+ SerializeUtils.serializeObjects(values, dataOutputStream);
return ByteBuffer.wrap(byteArrayOutputStream.toByteArray());
} else {
return ByteBuffer.allocate(0);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/EmptyReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/EmptyReader.java
index 933a9ab..3b29a69 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/EmptyReader.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/EmptyReader.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.reader.IPointReader;
import org.apache.iotdb.tsfile.utils.Pair;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -155,7 +156,7 @@ public class EmptyReader extends BaseManagedSeriesReader
}
@Override
- public Object getValueInTimestamp(long timestamp) {
+ public Object[] getValuesInTimestamps(long[] timestamps, int length) throws IOException {
return null;
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/MergedReaderByTime.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/MergedReaderByTime.java
index 2db6a09..802355c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/MergedReaderByTime.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/MergedReaderByTime.java
@@ -33,12 +33,12 @@ public class MergedReaderByTime implements IReaderByTimestamp {
}
@Override
- public Object getValueInTimestamp(long timestamp) throws IOException {
+ public Object[] getValuesInTimestamps(long[] timestamps, int length) throws IOException {
for (IReaderByTimestamp innerReader : innerReaders) {
if (innerReader != null) {
- Object valueInTimestamp = innerReader.getValueInTimestamp(timestamp);
- if (valueInTimestamp != null) {
- return valueInTimestamp;
+ Object[] results = innerReader.getValuesInTimestamps(timestamps, length);
+ if (results != null) {
+ return results;
}
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestamp.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestamp.java
index 2b68ef4..d077f02 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestamp.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestamp.java
@@ -33,6 +33,8 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
public class RemoteSeriesReaderByTimestamp implements IReaderByTimestamp {
@@ -49,37 +51,42 @@ public class RemoteSeriesReaderByTimestamp implements IReaderByTimestamp {
}
@Override
- public Object getValueInTimestamp(long timestamp) throws IOException {
+ public Object[] getValuesInTimestamps(long[] timestamps, int length) throws IOException {
if (!sourceInfo.checkCurClient()) {
return null;
}
ByteBuffer result;
if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
- result = fetchResultAsync(timestamp);
+ result = fetchResultAsync(timestamps, length);
} else {
- result = fetchResultSync(timestamp);
+ result = fetchResultSync(timestamps, length);
}
- return SerializeUtils.deserializeObject(result);
+ return SerializeUtils.deserializeObjects(result);
}
@SuppressWarnings("java:S2274") // enable timeout
- private ByteBuffer fetchResultAsync(long timestamp) throws IOException {
+ private ByteBuffer fetchResultAsync(long[] timestamps, int length) throws IOException {
+ // convert long[] to List<Long>, which is used for thrift
+ List<Long> timestampList = new ArrayList<>(length);
+ for (int i = 0; i < length; i++) {
+ timestampList.add(timestamps[i]);
+ }
synchronized (fetchResult) {
fetchResult.set(null);
try {
sourceInfo
.getCurAsyncClient(RaftServer.getReadOperationTimeoutMS())
- .fetchSingleSeriesByTimestamp(
- sourceInfo.getHeader(), sourceInfo.getReaderId(), timestamp, handler);
+ .fetchSingleSeriesByTimestamps(
+ sourceInfo.getHeader(), sourceInfo.getReaderId(), timestampList, handler);
fetchResult.wait(RaftServer.getReadOperationTimeoutMS());
} catch (TException e) {
// try other node
- if (!sourceInfo.switchNode(true, timestamp)) {
+ if (!sourceInfo.switchNode(true, timestamps[0])) {
return null;
}
- return fetchResultAsync(timestamp);
+ return fetchResultAsync(timestamps, length);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.warn("Query {} interrupted", sourceInfo);
@@ -89,18 +96,23 @@ public class RemoteSeriesReaderByTimestamp implements IReaderByTimestamp {
return fetchResult.get();
}
- private ByteBuffer fetchResultSync(long timestamp) throws IOException {
+ private ByteBuffer fetchResultSync(long[] timestamps, int length) throws IOException {
SyncDataClient curSyncClient = null;
+ // convert long[] to List<Long>, which is used for thrift
+ List<Long> timestampList = new ArrayList<>(length);
+ for (int i = 0; i < length; i++) {
+ timestampList.add(timestamps[i]);
+ }
try {
curSyncClient = sourceInfo.getCurSyncClient(RaftServer.getReadOperationTimeoutMS());
- return curSyncClient.fetchSingleSeriesByTimestamp(
- sourceInfo.getHeader(), sourceInfo.getReaderId(), timestamp);
+ return curSyncClient.fetchSingleSeriesByTimestamps(
+ sourceInfo.getHeader(), sourceInfo.getReaderId(), timestampList);
} catch (TException e) {
// try other node
- if (!sourceInfo.switchNode(true, timestamp)) {
+ if (!sourceInfo.switchNode(true, timestamps[0])) {
return null;
}
- return fetchResultSync(timestamp);
+ return fetchResultSync(timestamps, length);
} finally {
if (curSyncClient != null) {
ClientUtils.putBackSyncClient(curSyncClient);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
index 012569e..03a78a9 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
@@ -384,12 +384,15 @@ public class DataClusterServer extends RaftServer
}
@Override
- public void fetchSingleSeriesByTimestamp(
- Node header, long readerId, long time, AsyncMethodCallback<ByteBuffer> resultHandler) {
+ public void fetchSingleSeriesByTimestamps(
+ Node header,
+ long readerId,
+ List<Long> timestamps,
+ AsyncMethodCallback<ByteBuffer> resultHandler) {
DataAsyncService service =
getDataAsyncService(header, resultHandler, "Fetch by timestamp:" + readerId);
if (service != null) {
- service.fetchSingleSeriesByTimestamp(header, readerId, time, resultHandler);
+ service.fetchSingleSeriesByTimestamps(header, readerId, timestamps, resultHandler);
}
}
@@ -737,9 +740,9 @@ public class DataClusterServer extends RaftServer
}
@Override
- public ByteBuffer fetchSingleSeriesByTimestamp(Node header, long readerId, long timestamp)
+ public ByteBuffer fetchSingleSeriesByTimestamps(Node header, long readerId, List<Long> timestamps)
throws TException {
- return getDataSyncService(header).fetchSingleSeriesByTimestamp(header, readerId, timestamp);
+ return getDataSyncService(header).fetchSingleSeriesByTimestamps(header, readerId, timestamps);
}
@Override
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java
index 41a2689..99fe858 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java
@@ -207,13 +207,17 @@ public class DataAsyncService extends BaseAsyncService implements TSDataService.
}
@Override
- public void fetchSingleSeriesByTimestamp(
- Node header, long readerId, long timestamp, AsyncMethodCallback<ByteBuffer> resultHandler) {
+ public void fetchSingleSeriesByTimestamps(
+ Node header,
+ long readerId,
+ List<Long> timestamps,
+ AsyncMethodCallback<ByteBuffer> resultHandler) {
try {
resultHandler.onComplete(
dataGroupMember
.getLocalQueryExecutor()
- .fetchSingleSeriesByTimestamp(readerId, timestamp));
+ .fetchSingleSeriesByTimestamps(
+ readerId, timestamps.stream().mapToLong(k -> k).toArray(), timestamps.size()));
} catch (ReaderNotFoundException | IOException e) {
resultHandler.onError(e);
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
index 555a0bc..b3c5610 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
@@ -210,12 +210,13 @@ public class DataSyncService extends BaseSyncService implements TSDataService.If
}
@Override
- public ByteBuffer fetchSingleSeriesByTimestamp(Node header, long readerId, long timestamp)
+ public ByteBuffer fetchSingleSeriesByTimestamps(Node header, long readerId, List<Long> timestamps)
throws TException {
try {
return dataGroupMember
.getLocalQueryExecutor()
- .fetchSingleSeriesByTimestamp(readerId, timestamp);
+ .fetchSingleSeriesByTimestamps(
+ readerId, timestamps.stream().mapToLong(k -> k).toArray(), timestamps.size());
} catch (ReaderNotFoundException | IOException e) {
throw new TException(e);
}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncDataClient.java b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncDataClient.java
index 661dfbf..5b2797a 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncDataClient.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncDataClient.java
@@ -99,12 +99,15 @@ public class TestAsyncDataClient extends AsyncDataClient {
}
@Override
- public void fetchSingleSeriesByTimestamp(
- Node header, long readerId, long time, AsyncMethodCallback<ByteBuffer> resultHandler) {
+ public void fetchSingleSeriesByTimestamps(
+ Node header,
+ long readerId,
+ List<Long> timestamps,
+ AsyncMethodCallback<ByteBuffer> resultHandler) {
new Thread(
() ->
new DataAsyncService(dataGroupMemberMap.get(header))
- .fetchSingleSeriesByTimestamp(header, readerId, time, resultHandler))
+ .fetchSingleSeriesByTimestamps(header, readerId, timestamps, resultHandler))
.start();
}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestManagedSeriesReader.java b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestManagedSeriesReader.java
index 5ac3f64..cbd9bb6 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestManagedSeriesReader.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestManagedSeriesReader.java
@@ -57,17 +57,22 @@ public class TestManagedSeriesReader implements ManagedSeriesReader, IReaderByTi
}
@Override
- public Object getValueInTimestamp(long timestamp) {
- while (batchData.hasCurrent()) {
- long currTime = batchData.currentTime();
- if (currTime == timestamp) {
- return batchData.currentValue();
- } else if (currTime > timestamp) {
- break;
+ public Object[] getValuesInTimestamps(long[] timestamps, int length) {
+ Object[] results = new Object[length];
+ for (int i = 0; i < length; i++) {
+ while (batchData.hasCurrent()) {
+ long currTime = batchData.currentTime();
+ if (currTime == timestamps[i]) {
+ results[i] = batchData.currentValue();
+ break;
+ } else if (currTime > timestamps[i]) {
+ results[i] = null;
+ break;
+ }
+ batchData.next();
}
- batchData.next();
}
- return null;
+ return results;
}
@Override
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/manage/ClusterQueryManagerTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/manage/ClusterQueryManagerTest.java
index 01a7b8b..35a404e 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/manage/ClusterQueryManagerTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/manage/ClusterQueryManagerTest.java
@@ -79,7 +79,7 @@ public class ClusterQueryManagerTest {
@Test
public void testRegisterReaderByTime() {
- IReaderByTimestamp reader = timestamp -> null;
+ IReaderByTimestamp reader = (timestamp, length) -> null;
long id = queryManager.registerReaderByTime(reader);
assertSame(reader, queryManager.getReaderByTimestamp(id));
}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestampTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestampTest.java
index a81dc66..ef2772e 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestampTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestampTest.java
@@ -45,11 +45,11 @@ import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;
public class RemoteSeriesReaderByTimestampTest {
@@ -68,10 +68,10 @@ public class RemoteSeriesReaderByTimestampTest {
public AsyncDataClient getAsyncDataClient(Node node, int timeout) throws IOException {
return new AsyncDataClient(null, null, node, null) {
@Override
- public void fetchSingleSeriesByTimestamp(
+ public void fetchSingleSeriesByTimestamps(
Node header,
long readerId,
- long time,
+ List<Long> timestamps,
AsyncMethodCallback<ByteBuffer> resultHandler)
throws TException {
if (failedNodes.contains(node)) {
@@ -83,24 +83,24 @@ public class RemoteSeriesReaderByTimestampTest {
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
DataOutputStream dataOutputStream =
new DataOutputStream(byteArrayOutputStream);
- boolean isNull = true;
- while (batchData.hasCurrent()) {
- long currentTime = batchData.currentTime();
- Object value = batchData.currentValue();
- if (currentTime == time) {
- SerializeUtils.serializeObject(value, dataOutputStream);
+ Object[] results = new Object[timestamps.size()];
+ for (int i = 0; i < timestamps.size(); i++) {
+ while (batchData.hasCurrent()) {
+ long currentTime = batchData.currentTime();
+ if (currentTime == timestamps.get(i)) {
+ results[i] = batchData.currentValue();
+ batchData.next();
+ break;
+ } else if (currentTime > timestamps.get(i)) {
+ results[i] = null;
+ break;
+ }
+ // time < timestamp, continue
batchData.next();
- isNull = false;
- break;
- } else if (currentTime > time) {
- break;
}
- // time < timestamp, continue
- batchData.next();
- }
- if (isNull) {
- SerializeUtils.serializeObject(null, dataOutputStream);
}
+ SerializeUtils.serializeObjects(results, dataOutputStream);
+
resultHandler.onComplete(
ByteBuffer.wrap(byteArrayOutputStream.toByteArray()));
})
@@ -146,10 +146,16 @@ public class RemoteSeriesReaderByTimestampTest {
RemoteSeriesReaderByTimestamp reader = new RemoteSeriesReaderByTimestamp(sourceInfo);
+ long[] times = new long[100];
for (int i = 0; i < 100; i++) {
- assertEquals(i * 1.0, reader.getValueInTimestamp(i));
+ times[i] = i;
}
- assertNull(reader.getValueInTimestamp(101));
+ Object[] results = reader.getValuesInTimestamps(times, times.length);
+ for (int i = 0; i < 100; i++) {
+ assertEquals(i * 1.0, results[i]);
+ }
+ times[0] = 101;
+ assertEquals(null, reader.getValuesInTimestamps(times, 1)[0]);
} finally {
QueryResourceManager.getInstance().endQuery(context.getQueryId());
}
@@ -180,8 +186,13 @@ public class RemoteSeriesReaderByTimestampTest {
+ (endTime - startTime));
// normal read
assertEquals(TestUtils.getNode(0), sourceInfo.getCurrentNode());
+ long[] times = new long[50];
+ for (int i = 0; i < 50; i++) {
+ times[i] = i;
+ }
+ Object[] results = reader.getValuesInTimestamps(times, 50);
for (int i = 0; i < 50; i++) {
- assertEquals(i * 1.0, reader.getValueInTimestamp(i));
+ assertEquals(i * 1.0, results[i]);
}
endTime = System.currentTimeMillis();
@@ -191,14 +202,22 @@ public class RemoteSeriesReaderByTimestampTest {
+ (endTime - startTime));
failedNodes.add(TestUtils.getNode(0));
for (int i = 50; i < 80; i++) {
- assertEquals(i * 1.0, reader.getValueInTimestamp(i));
+ times[i - 50] = i;
+ }
+ results = reader.getValuesInTimestamps(times, 30);
+ for (int i = 50; i < 80; i++) {
+ assertEquals(i * 1.0, results[i - 50]);
}
assertEquals(TestUtils.getNode(1), sourceInfo.getCurrentNode());
// a bad client, change to another node again
failedNodes.add(TestUtils.getNode(1));
for (int i = 80; i < 90; i++) {
- assertEquals(i * 1.0, reader.getValueInTimestamp(i));
+ times[i - 80] = i;
+ }
+ results = reader.getValuesInTimestamps(times, 10);
+ for (int i = 80; i < 90; i++) {
+ assertEquals(i * 1.0, results[i - 80]);
}
assertEquals(TestUtils.getNode(2), sourceInfo.getCurrentNode());
@@ -211,7 +230,8 @@ public class RemoteSeriesReaderByTimestampTest {
failedNodes.add(TestUtils.getNode(2));
try {
- reader.getValueInTimestamp(90);
+ times[0] = 90;
+ reader.getValuesInTimestamps(times, 1);
fail();
} catch (IOException e) {
assertEquals("no available client.", e.getMessage());
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
index 84d069a..1a52082 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
@@ -803,11 +803,15 @@ public class DataGroupMemberTest extends MemberTest {
AtomicReference<ByteBuffer> dataResult = new AtomicReference<>();
GenericHandler<ByteBuffer> dataHandler = new GenericHandler<>(TestUtils.getNode(0), dataResult);
+ List<Long> timestamps = new ArrayList<>(5);
for (int i = 5; i < 10; i++) {
- new DataAsyncService(dataGroupMember)
- .fetchSingleSeriesByTimestamp(TestUtils.getNode(0), readerId, i, dataHandler);
- Object value = SerializeUtils.deserializeObject(dataResult.get());
- assertEquals(i * 1.0, (Double) value, 0.00001);
+ timestamps.add((long) i);
+ }
+ new DataAsyncService(dataGroupMember)
+ .fetchSingleSeriesByTimestamps(TestUtils.getNode(0), readerId, timestamps, dataHandler);
+ Object[] values = SerializeUtils.deserializeObjects(dataResult.get());
+ for (int i = 5; i < 10; i++) {
+ assertEquals(i * 1.0, (Double) values[i - 5], 0.00001);
}
new DataAsyncService(dataGroupMember)
@@ -861,11 +865,15 @@ public class DataGroupMemberTest extends MemberTest {
AtomicReference<ByteBuffer> dataResult = new AtomicReference<>();
GenericHandler<ByteBuffer> dataHandler = new GenericHandler<>(TestUtils.getNode(0), dataResult);
+ List<Long> timestamps = new ArrayList<>(4);
for (int i = 5; i < 9; i++) {
- new DataAsyncService(dataGroupMember)
- .fetchSingleSeriesByTimestamp(TestUtils.getNode(0), readerId, i, dataHandler);
- Object value = SerializeUtils.deserializeObject(dataResult.get());
- assertEquals(i * 1.0, (Double) value, 0.00001);
+ timestamps.add((long) i);
+ }
+ new DataAsyncService(dataGroupMember)
+ .fetchSingleSeriesByTimestamps(TestUtils.getNode(0), readerId, timestamps, dataHandler);
+ Object[] values = SerializeUtils.deserializeObjects(dataResult.get());
+ for (int i = 5; i < 9; i++) {
+ assertEquals(i * 1.0, (Double) values[i - 5], 0.00001);
}
new DataAsyncService(dataGroupMember)
@@ -896,11 +904,13 @@ public class DataGroupMemberTest extends MemberTest {
public void testFetchWithoutQuery() {
System.out.println("Start testFetchWithoutQuery()");
AtomicReference<Exception> result = new AtomicReference<>();
+ List<Long> timestamps = new ArrayList<>(1);
+ timestamps.add((long) 0);
new DataAsyncService(dataGroupMember)
- .fetchSingleSeriesByTimestamp(
+ .fetchSingleSeriesByTimestamps(
TestUtils.getNode(0),
0,
- 0,
+ timestamps,
new AsyncMethodCallback<ByteBuffer>() {
@Override
public void onComplete(ByteBuffer buffer) {}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
index 08914d1..e4e6b25 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
@@ -881,6 +881,10 @@ public class MetaGroupMemberTest extends MemberTest {
try {
ClusterReaderFactory readerFactory = new ClusterReaderFactory(testMetaMember);
+ long[] times = new long[10];
+ for (int i = 0; i < 10; i++) {
+ times[i] = i;
+ }
for (int i = 0; i < 10; i++) {
IReaderByTimestamp readerByTimestamp =
readerFactory.getReaderByTimestamp(
@@ -889,8 +893,10 @@ public class MetaGroupMemberTest extends MemberTest {
TSDataType.DOUBLE,
context,
true);
+
+ Object[] values = readerByTimestamp.getValuesInTimestamps(times, 10);
for (int j = 0; j < 10; j++) {
- assertEquals(j * 1.0, (double) readerByTimestamp.getValueInTimestamp(j), 0.00001);
+ assertEquals(j * 1.0, (double) values[j], 0.00001);
}
}
} finally {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java
index 7b94a93..8016be7 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java
@@ -35,6 +35,7 @@ import java.nio.ByteBuffer;
public abstract class AggregateResult {
+ public static final int TIME_LENGTH_FOR_FIRST_VALUE = 100;
private final AggregationType aggregationType;
protected TSDataType resultDataType;
@@ -94,6 +95,9 @@ public abstract class AggregateResult {
public abstract void updateResultUsingTimestamps(
long[] timestamps, int length, IReaderByTimestamp dataReader) throws IOException;
+ /** This method updates the aggregation result using the given, already-computed values. */
+ public abstract void updateResultUsingValues(long[] timestamps, int length, Object[] values);
+
/**
* Judge if aggregation results have been calculated. In other words, if the aggregated result
* does not need to compute the remaining data, it returns true.
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java
index 237fa08..23842cf 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java
@@ -102,10 +102,19 @@ public class AvgAggrResult extends AggregateResult {
@Override
public void updateResultUsingTimestamps(
long[] timestamps, int length, IReaderByTimestamp dataReader) throws IOException {
+ Object[] values = dataReader.getValuesInTimestamps(timestamps, length);
for (int i = 0; i < length; i++) {
- Object value = dataReader.getValueInTimestamp(timestamps[i]);
- if (value != null) {
- updateAvg(seriesDataType, value);
+ if (values[i] != null) {
+ updateAvg(seriesDataType, values[i]);
+ }
+ }
+ }
+
+ @Override
+ public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+ for (int i = 0; i < length; i++) {
+ if (values[i] != null) {
+ updateAvg(seriesDataType, values[i]);
}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java
index adf0069..7b40fda 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java
@@ -74,9 +74,23 @@ public class CountAggrResult extends AggregateResult {
public void updateResultUsingTimestamps(
long[] timestamps, int length, IReaderByTimestamp dataReader) throws IOException {
int cnt = 0;
+ Object[] values = dataReader.getValuesInTimestamps(timestamps, length);
for (int i = 0; i < length; i++) {
- Object value = dataReader.getValueInTimestamp(timestamps[i]);
- if (value != null) {
+ if (values[i] != null) {
+ cnt++;
+ }
+ }
+
+ long preValue = getLongValue();
+ preValue += cnt;
+ setLongValue(preValue);
+ }
+
+ @Override
+ public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+ int cnt = 0;
+ for (int i = 0; i < length; i++) {
+ if (values[i] != null) {
cnt++;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java
index 0f51dc3..56ce8c5 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java
@@ -94,11 +94,31 @@ public class FirstValueAggrResult extends AggregateResult {
if (hasFinalResult()) {
return;
}
+ int currentPos = 0;
+ long[] timesForFirstValue = new long[TIME_LENGTH_FOR_FIRST_VALUE];
+ while (currentPos < length) {
+ int timeLength = Math.min(length - currentPos, TIME_LENGTH_FOR_FIRST_VALUE);
+ System.arraycopy(timestamps, currentPos, timesForFirstValue, 0, timeLength);
+ Object[] values = dataReader.getValuesInTimestamps(timesForFirstValue, timeLength);
+ for (int i = 0; i < timeLength; i++) {
+ if (values[i] != null) {
+ setValue(values[i]);
+ timestamp = timesForFirstValue[i];
+ return;
+ }
+ }
+ currentPos += timeLength;
+ }
+ }
+ @Override
+ public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+ if (hasFinalResult()) {
+ return;
+ }
for (int i = 0; i < length; i++) {
- Object value = dataReader.getValueInTimestamp(timestamps[i]);
- if (value != null) {
- setValue(value);
+ if (values[i] != null) {
+ setValue(values[i]);
timestamp = timestamps[i];
break;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueDescAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueDescAggrResult.java
index daf5a56..8eae923 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueDescAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueDescAggrResult.java
@@ -53,11 +53,23 @@ public class FirstValueDescAggrResult extends FirstValueAggrResult {
@Override
public void updateResultUsingTimestamps(
long[] timestamps, int length, IReaderByTimestamp dataReader) throws IOException {
- for (int i = 0; i < length; i++) {
- Object value = dataReader.getValueInTimestamp(timestamps[i]);
- if (value != null) {
- setValue(value);
+ Object[] values = dataReader.getValuesInTimestamps(timestamps, length);
+ for (int i = length - 1; i >= 0; i--) {
+ if (values[i] != null) {
+ setValue(values[i]);
timestamp = timestamps[i];
+ return;
+ }
+ }
+ }
+
+ @Override
+ public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+ for (int i = length - 1; i >= 0; i--) {
+ if (values[i] != null) {
+ setValue(values[i]);
+ timestamp = timestamps[i];
+ return;
}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java
index 0726d7a..ad06ace 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java
@@ -85,18 +85,24 @@ public class LastValueAggrResult extends AggregateResult {
@Override
public void updateResultUsingTimestamps(
long[] timestamps, int length, IReaderByTimestamp dataReader) throws IOException {
- long time = Long.MIN_VALUE;
- Object lastVal = null;
- for (int i = 0; i < length; i++) {
- Object value = dataReader.getValueInTimestamp(timestamps[i]);
- if (value != null) {
- time = timestamps[i];
- lastVal = value;
+ Object[] values = dataReader.getValuesInTimestamps(timestamps, length);
+ for (int i = length - 1; i >= 0; i--) {
+ if (values[i] != null) {
+ timestamp = timestamps[i];
+ setValue(values[i]);
+ return;
}
}
- if (time != Long.MIN_VALUE) {
- setValue(lastVal);
- timestamp = time;
+ }
+
+ @Override
+ public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+ for (int i = length - 1; i >= 0; i--) {
+ if (values[i] != null) {
+ timestamp = timestamps[i];
+ setValue(values[i]);
+ return;
+ }
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueDescAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueDescAggrResult.java
index 15af763..587c7a3 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueDescAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueDescAggrResult.java
@@ -68,19 +68,34 @@ public class LastValueDescAggrResult extends LastValueAggrResult {
if (hasFinalResult()) {
return;
}
- long time = Long.MIN_VALUE;
- Object lastVal = null;
- for (int i = 0; i < length; i++) {
- Object value = dataReader.getValueInTimestamp(timestamps[i]);
- if (value != null) {
- time = timestamps[i];
- lastVal = value;
- break;
+ int currentPos = 0;
+ long[] timesForFirstValue = new long[TIME_LENGTH_FOR_FIRST_VALUE];
+ while (currentPos < length) {
+ int timeLength = Math.min(length - currentPos, TIME_LENGTH_FOR_FIRST_VALUE);
+ System.arraycopy(timestamps, currentPos, timesForFirstValue, 0, timeLength);
+ Object[] values = dataReader.getValuesInTimestamps(timesForFirstValue, timeLength);
+ for (int i = 0; i < timeLength; i++) {
+ if (values[i] != null) {
+ setValue(values[i]);
+ timestamp = timesForFirstValue[i];
+ return;
+ }
}
+ currentPos += timeLength;
}
- if (time != Long.MIN_VALUE) {
- setValue(lastVal);
- timestamp = time;
+ }
+
+ @Override
+ public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+ if (hasFinalResult()) {
+ return;
+ }
+ for (int i = 0; i < length; i++) {
+ if (values[i] != null) {
+ timestamp = timestamps[i];
+ setValue(values[i]);
+ return;
+ }
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrResult.java
index 548b249..fad90de 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrResult.java
@@ -66,16 +66,22 @@ public class MaxTimeAggrResult extends AggregateResult {
@Override
public void updateResultUsingTimestamps(
long[] timestamps, int length, IReaderByTimestamp dataReader) throws IOException {
- long time = -1;
- for (int i = 0; i < length; i++) {
- Object value = dataReader.getValueInTimestamp(timestamps[i]);
- if (value != null) {
- time = timestamps[i];
+ Object[] values = dataReader.getValuesInTimestamps(timestamps, length);
+ for (int i = length - 1; i >= 0; i--) {
+ if (values[i] != null) {
+ updateMaxTimeResult(timestamps[i]);
+ return;
}
}
+ }
- if (time != -1) {
- updateMaxTimeResult(time);
+ @Override
+ public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+ for (int i = length - 1; i >= 0; i--) {
+ if (values[i] != null) {
+ updateMaxTimeResult(timestamps[i]);
+ return;
+ }
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeDescAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeDescAggrResult.java
index e29a211..a69dcc9 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeDescAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeDescAggrResult.java
@@ -52,17 +52,32 @@ public class MaxTimeDescAggrResult extends MaxTimeAggrResult {
if (hasFinalResult()) {
return;
}
- long time = -1;
- for (int i = 0; i < length; i++) {
- Object value = dataReader.getValueInTimestamp(timestamps[i]);
- if (value != null) {
- time = timestamps[i];
- break;
+ int currentPos = 0;
+ long[] timesForFirstValue = new long[TIME_LENGTH_FOR_FIRST_VALUE];
+ while (currentPos < length) {
+ int timeLength = Math.min(length - currentPos, TIME_LENGTH_FOR_FIRST_VALUE);
+ System.arraycopy(timestamps, currentPos, timesForFirstValue, 0, timeLength);
+ Object[] values = dataReader.getValuesInTimestamps(timesForFirstValue, timeLength);
+ for (int i = 0; i < timeLength; i++) {
+ if (values[i] != null) {
+ updateMaxTimeResult(timesForFirstValue[i]);
+ return;
+ }
}
+ currentPos += timeLength;
}
+ }
- if (time != -1) {
- updateMaxTimeResult(time);
+ @Override
+ public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+ if (hasFinalResult()) {
+ return;
+ }
+ for (int i = 0; i < length; i++) {
+ if (values[i] != null) {
+ updateMaxTimeResult(timestamps[i]);
+ return;
+ }
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrResult.java
index cd00df2..34d9622 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrResult.java
@@ -72,13 +72,21 @@ public class MaxValueAggrResult extends AggregateResult {
public void updateResultUsingTimestamps(
long[] timestamps, int length, IReaderByTimestamp dataReader) throws IOException {
Comparable<Object> maxVal = null;
+ Object[] values = dataReader.getValuesInTimestamps(timestamps, length);
for (int i = 0; i < length; i++) {
- Object value = dataReader.getValueInTimestamp(timestamps[i]);
- if (value == null) {
- continue;
+ if (values[i] != null && (maxVal == null || maxVal.compareTo(values[i]) < 0)) {
+ maxVal = (Comparable<Object>) values[i];
}
- if (maxVal == null || maxVal.compareTo(value) < 0) {
- maxVal = (Comparable<Object>) value;
+ }
+ updateResult(maxVal);
+ }
+
+ @Override
+ public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+ Comparable<Object> maxVal = null;
+ for (int i = 0; i < length; i++) {
+ if (values[i] != null && (maxVal == null || maxVal.compareTo(values[i]) < 0)) {
+ maxVal = (Comparable<Object>) values[i];
}
}
updateResult(maxVal);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrResult.java
index a0fbabd..37883d5 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrResult.java
@@ -74,9 +74,29 @@ public class MinTimeAggrResult extends AggregateResult {
if (hasFinalResult()) {
return;
}
+ int currentPos = 0;
+ long[] timesForFirstValue = new long[TIME_LENGTH_FOR_FIRST_VALUE];
+ while (currentPos < length) {
+ int timeLength = Math.min(length - currentPos, TIME_LENGTH_FOR_FIRST_VALUE);
+ System.arraycopy(timestamps, currentPos, timesForFirstValue, 0, timeLength);
+ Object[] values = dataReader.getValuesInTimestamps(timesForFirstValue, timeLength);
+ for (int i = 0; i < timeLength; i++) {
+ if (values[i] != null) {
+ setLongValue(timesForFirstValue[i]);
+ return;
+ }
+ }
+ currentPos += timeLength;
+ }
+ }
+
+ @Override
+ public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+ if (hasFinalResult()) {
+ return;
+ }
for (int i = 0; i < length; i++) {
- Object value = dataReader.getValueInTimestamp(timestamps[i]);
- if (value != null) {
+ if (values[i] != null) {
setLongValue(timestamps[i]);
return;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeDescAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeDescAggrResult.java
index 1bafc56..6b45f14 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeDescAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeDescAggrResult.java
@@ -43,10 +43,21 @@ public class MinTimeDescAggrResult extends MinTimeAggrResult {
@Override
public void updateResultUsingTimestamps(
long[] timestamps, int length, IReaderByTimestamp dataReader) throws IOException {
- for (int i = 0; i < length; i++) {
- Object value = dataReader.getValueInTimestamp(timestamps[i]);
- if (value != null) {
+ Object[] values = dataReader.getValuesInTimestamps(timestamps, length);
+ for (int i = length - 1; i >= 0; i--) {
+ if (values[i] != null) {
setLongValue(timestamps[i]);
+ return;
+ }
+ }
+ }
+
+ @Override
+ public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+ for (int i = length - 1; i >= 0; i--) {
+ if (values[i] != null) {
+ setLongValue(timestamps[i]);
+ return;
}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrResult.java
index 8b17d75..eefbbb1 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrResult.java
@@ -67,13 +67,21 @@ public class MinValueAggrResult extends AggregateResult {
public void updateResultUsingTimestamps(
long[] timestamps, int length, IReaderByTimestamp dataReader) throws IOException {
Comparable<Object> minVal = null;
+ Object[] values = dataReader.getValuesInTimestamps(timestamps, length);
for (int i = 0; i < length; i++) {
- Object value = dataReader.getValueInTimestamp(timestamps[i]);
- if (value == null) {
- continue;
+ if (values[i] != null && (minVal == null || minVal.compareTo(values[i]) > 0)) {
+ minVal = (Comparable<Object>) values[i];
}
- if (minVal == null || minVal.compareTo(value) > 0) {
- minVal = (Comparable<Object>) value;
+ }
+ updateResult(minVal);
+ }
+
+ @Override
+ public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+ Comparable<Object> minVal = null;
+ for (int i = 0; i < length; i++) {
+ if (values[i] != null && (minVal == null || minVal.compareTo(values[i]) > 0)) {
+ minVal = (Comparable<Object>) values[i];
}
}
updateResult(minVal);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrResult.java
index 475e953..8a11502 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrResult.java
@@ -80,10 +80,19 @@ public class SumAggrResult extends AggregateResult {
@Override
public void updateResultUsingTimestamps(
long[] timestamps, int length, IReaderByTimestamp dataReader) throws IOException {
+ Object[] values = dataReader.getValuesInTimestamps(timestamps, length);
for (int i = 0; i < length; i++) {
- Object value = dataReader.getValueInTimestamp(timestamps[i]);
- if (value != null) {
- updateSum(value);
+ if (values[i] != null) {
+ updateSum(values[i]);
+ }
+ }
+ }
+
+ @Override
+ public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+ for (int i = 0; i < length; i++) {
+ if (values[i] != null) {
+ updateSum(values[i]);
}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithValueFilter.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithValueFilter.java
index bf48f10..0a02f94 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithValueFilter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithValueFilter.java
@@ -35,9 +35,10 @@ public class RawQueryDataSetWithValueFilter extends QueryDataSet implements UDFI
private final List<IReaderByTimestamp> seriesReaderByTimestampList;
private final List<Boolean> cached;
- private boolean hasCachedRow;
- private RowRecord cachedRowRecord;
- private Object[] cachedRowInObjects;
+ private List<RowRecord> cachedRowRecords = new ArrayList<>();
+
+ /** Used for UDF. */
+ private List<Object[]> cachedRowInObjects = new ArrayList<>();
/**
* constructor of EngineDataSetWithValueFilter.
@@ -64,61 +65,85 @@ public class RawQueryDataSetWithValueFilter extends QueryDataSet implements UDFI
@Override
public boolean hasNextWithoutConstraint() throws IOException {
- if (hasCachedRow) {
+ if (!cachedRowRecords.isEmpty()) {
return true;
}
- return cacheRowRecord();
+ return cacheRowRecords();
}
+ /** @return the first record of cached rows or null if there is no more data */
@Override
public RowRecord nextWithoutConstraint() throws IOException {
- if (!hasCachedRow && !cacheRowRecord()) {
+ if (cachedRowRecords.isEmpty() && !cacheRowRecords()) {
return null;
}
- hasCachedRow = false;
- return cachedRowRecord;
+ return cachedRowRecords.remove(cachedRowRecords.size() - 1);
}
/**
- * Cache row record
+ * Cache row records
*
* @return if there has next row record.
*/
- private boolean cacheRowRecord() throws IOException {
- while (timeGenerator.hasNext()) {
- boolean hasField = false;
- long timestamp = timeGenerator.next();
- RowRecord rowRecord = new RowRecord(timestamp);
-
- for (int i = 0; i < seriesReaderByTimestampList.size(); i++) {
- Object value;
- // get value from readers in time generator
- if (cached.get(i)) {
- value = timeGenerator.getValue(paths.get(i), timestamp);
- } else {
- // get value from series reader without filter
- IReaderByTimestamp reader = seriesReaderByTimestampList.get(i);
- value = reader.getValueInTimestamp(timestamp);
- }
- if (value == null) {
- rowRecord.addField(null);
+ private boolean cacheRowRecords() throws IOException {
+ int cachedTimeCnt = 0;
+ long[] cachedTimeArray = new long[fetchSize];
+ // TODO: LIMIT constraint
+ // 1. fill time array from time Generator
+ while (timeGenerator.hasNext() && cachedTimeCnt < fetchSize) {
+ cachedTimeArray[cachedTimeCnt++] = timeGenerator.next();
+ }
+ if (cachedTimeCnt == 0) {
+ return false;
+ }
+ RowRecord[] rowRecords = new RowRecord[cachedTimeCnt];
+ for (int i = 0; i < cachedTimeCnt; i++) {
+ rowRecords[i] = new RowRecord(cachedTimeArray[i]);
+ }
+
+ boolean[] hasField = new boolean[cachedTimeCnt];
+ // 2. fetch results of each time series using time array
+ for (int i = 0; i < seriesReaderByTimestampList.size(); i++) {
+ Object[] results;
+ // get value from readers in time generator
+ if (cached.get(i)) {
+ results = timeGenerator.getValues(paths.get(i));
+ } else {
+ results =
+ seriesReaderByTimestampList
+ .get(i)
+ .getValuesInTimestamps(cachedTimeArray, cachedTimeCnt);
+ }
+
+ // 3. use values in results to fill row record
+ for (int j = 0; j < cachedTimeCnt; j++) {
+ if (results[j] == null) {
+ rowRecords[j].addField(null);
} else {
- hasField = true;
- rowRecord.addField(value, dataTypes.get(i));
+ hasField[j] = true;
+ rowRecords[j].addField(results[j], dataTypes.get(i));
}
}
- if (hasField) {
- hasCachedRow = true;
- cachedRowRecord = rowRecord;
- break;
+ }
+ // 4. remove rowRecord if all values in one timestamp are null
+ // traverse in reversed order to get element efficiently
+ for (int i = cachedTimeCnt - 1; i >= 0; i--) {
+ if (hasField[i]) {
+ cachedRowRecords.add(rowRecords[i]);
}
}
- return hasCachedRow;
+
+ // 5. check whether there is next row record
+ if (cachedRowRecords.isEmpty() && timeGenerator.hasNext()) {
+ // Note: This may lead to a deep stack if many rowRecords are empty
+ return cacheRowRecords();
+ }
+ return !cachedRowRecords.isEmpty();
}
@Override
public boolean hasNextRowInObjects() throws IOException {
- if (hasCachedRow) {
+ if (!cachedRowInObjects.isEmpty()) {
return true;
}
return cacheRowInObjects();
@@ -126,40 +151,67 @@ public class RawQueryDataSetWithValueFilter extends QueryDataSet implements UDFI
@Override
public Object[] nextRowInObjects() throws IOException {
- if (!hasCachedRow && !cacheRowInObjects()) {
+ if (cachedRowInObjects.isEmpty() && !cacheRowInObjects()) {
// values + timestamp
return new Object[seriesReaderByTimestampList.size() + 1];
}
- hasCachedRow = false;
- return cachedRowInObjects;
+
+ return cachedRowInObjects.remove(cachedRowInObjects.size() - 1);
}
private boolean cacheRowInObjects() throws IOException {
- int seriesNumber = seriesReaderByTimestampList.size();
- while (timeGenerator.hasNext()) {
- boolean hasField = false;
-
- Object[] rowInObjects = new Object[seriesNumber + 1];
- long timestamp = timeGenerator.next();
- rowInObjects[seriesNumber] = timestamp;
-
- for (int i = 0; i < seriesNumber; i++) {
- Object value =
- cached.get(i)
- ? timeGenerator.getValue(paths.get(i), timestamp)
- : seriesReaderByTimestampList.get(i).getValueInTimestamp(timestamp);
- if (value != null) {
- hasField = true;
- rowInObjects[i] = value;
- }
+ int cachedTimeCnt = 0;
+ long[] cachedTimeArray = new long[fetchSize];
+
+ // TODO: LIMIT constraint
+ // 1. fill time array from time Generator
+ while (timeGenerator.hasNext() && cachedTimeCnt < fetchSize) {
+ cachedTimeArray[cachedTimeCnt++] = timeGenerator.next();
+ }
+ if (cachedTimeCnt == 0) {
+ return false;
+ }
+
+ Object[][] rowsInObject = new Object[cachedTimeCnt][seriesReaderByTimestampList.size() + 1];
+ for (int i = 0; i < cachedTimeCnt; i++) {
+ rowsInObject[i][seriesReaderByTimestampList.size()] = cachedTimeArray[i];
+ }
+
+ boolean[] hasField = new boolean[cachedTimeCnt];
+ // 2. fetch results of each time series using time array
+ for (int i = 0; i < seriesReaderByTimestampList.size(); i++) {
+ Object[] results;
+ // get value from readers in time generator
+ if (cached.get(i)) {
+ results = timeGenerator.getValues(paths.get(i));
+ } else {
+ results =
+ seriesReaderByTimestampList
+ .get(i)
+ .getValuesInTimestamps(cachedTimeArray, cachedTimeCnt);
}
- if (hasField) {
- hasCachedRow = true;
- cachedRowInObjects = rowInObjects;
- break;
+ // 3. use values in results to fill row record
+ for (int j = 0; j < cachedTimeCnt; j++) {
+ if (results[j] != null) {
+ hasField[j] = true;
+ rowsInObject[j][i] = results[j];
+ }
+ }
+ }
+ // 4. remove rowRecord if all values in one timestamp are null
+ // traverse in reversed order to get element efficiently
+ for (int i = cachedTimeCnt - 1; i >= 0; i--) {
+ if (hasField[i]) {
+ cachedRowInObjects.add(rowsInObject[i]);
}
}
- return hasCachedRow;
+
+ // 5. check whether there is next row record
+ if (cachedRowInObjects.isEmpty() && timeGenerator.hasNext()) {
+ // Note: This may lead to a deep stack if many rowRecords are empty
+ return cacheRowInObjects();
+ }
+ return !cachedRowInObjects.isEmpty();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
index 4c54ac9..2416f4d 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
+import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
import org.apache.iotdb.db.query.aggregation.AggregateResult;
import org.apache.iotdb.db.query.context.QueryContext;
@@ -58,8 +59,11 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
+import static org.apache.iotdb.tsfile.read.query.executor.ExecutorWithTimeGenerator.markFilterdPaths;
+
@SuppressWarnings("java:S1135") // ignore todos
public class AggregationExecutor {
@@ -336,27 +340,23 @@ public class AggregationExecutor {
*/
public QueryDataSet executeWithValueFilter(QueryContext context, AggregationPlan queryPlan)
throws StorageEngineException, IOException, QueryProcessException {
- int index = 0;
- for (; index < aggregations.size(); index++) {
- String aggregationFunc = aggregations.get(index);
- if (!aggregationFunc.equals(IoTDBConstant.MAX_TIME)
- && !aggregationFunc.equals(IoTDBConstant.LAST_VALUE)) {
- break;
- }
- }
- if (index >= aggregations.size()) {
- queryPlan.setAscending(false);
- this.ascending = false;
- }
+ optimizeLastElementFunc(queryPlan);
+
TimeGenerator timestampGenerator = getTimeGenerator(context, queryPlan);
- List<IReaderByTimestamp> readersOfSelectedSeries = new ArrayList<>();
+ // group by path name
+ Map<PartialPath, List<Integer>> pathToAggrIndexesMap =
+ groupAggregationsBySeries(selectedSeries);
+ Map<IReaderByTimestamp, List<Integer>> readerToAggrIndexesMap = new HashMap<>();
List<StorageGroupProcessor> list = StorageEngine.getInstance().mergeLock(selectedSeries);
try {
for (int i = 0; i < selectedSeries.size(); i++) {
PartialPath path = selectedSeries.get(i);
- IReaderByTimestamp seriesReaderByTimestamp =
- getReaderByTime(path, queryPlan, dataTypes.get(i), context);
- readersOfSelectedSeries.add(seriesReaderByTimestamp);
+ List<Integer> indexes = pathToAggrIndexesMap.remove(path);
+ if (indexes != null) {
+ IReaderByTimestamp seriesReaderByTimestamp =
+ getReaderByTime(path, queryPlan, dataTypes.get(i), context);
+ readerToAggrIndexesMap.put(seriesReaderByTimestamp, indexes);
+ }
}
} finally {
StorageEngine.getInstance().mergeUnLock(list);
@@ -364,15 +364,30 @@ public class AggregationExecutor {
List<AggregateResult> aggregateResults = new ArrayList<>();
for (int i = 0; i < selectedSeries.size(); i++) {
- TSDataType type = dataTypes.get(i);
AggregateResult result =
- AggregateResultFactory.getAggrResultByName(aggregations.get(i), type, ascending);
+ AggregateResultFactory.getAggrResultByName(
+ aggregations.get(i), dataTypes.get(i), ascending);
aggregateResults.add(result);
}
- aggregateWithValueFilter(aggregateResults, timestampGenerator, readersOfSelectedSeries);
+ aggregateWithValueFilter(aggregateResults, timestampGenerator, readerToAggrIndexesMap);
return constructDataSet(aggregateResults, queryPlan);
}
+ private void optimizeLastElementFunc(QueryPlan queryPlan) {
+ int index = 0;
+ for (; index < aggregations.size(); index++) {
+ String aggregationFunc = aggregations.get(index);
+ if (!aggregationFunc.equals(IoTDBConstant.MAX_TIME)
+ && !aggregationFunc.equals(IoTDBConstant.LAST_VALUE)) {
+ break;
+ }
+ }
+ if (index >= aggregations.size()) {
+ queryPlan.setAscending(false);
+ this.ascending = false;
+ }
+ }
+
protected TimeGenerator getTimeGenerator(QueryContext context, RawDataQueryPlan queryPlan)
throws StorageEngineException {
return new ServerTimeGenerator(expression, context, queryPlan);
@@ -395,8 +410,11 @@ public class AggregationExecutor {
private void aggregateWithValueFilter(
List<AggregateResult> aggregateResults,
TimeGenerator timestampGenerator,
- List<IReaderByTimestamp> readersOfSelectedSeries)
+ Map<IReaderByTimestamp, List<Integer>> readerToAggrIndexesMap)
throws IOException {
+ List<Boolean> cached =
+ markFilterdPaths(
+ expression, new ArrayList<>(selectedSeries), timestampGenerator.hasOrNode());
while (timestampGenerator.hasNext()) {
@@ -411,11 +429,26 @@ public class AggregationExecutor {
}
// calculate part of the aggregate result
- for (int i = 0; i < readersOfSelectedSeries.size(); i++) {
- aggregateResults
- .get(i)
- .updateResultUsingTimestamps(
- timeArray, timeArrayLength, readersOfSelectedSeries.get(i));
+ for (Entry<IReaderByTimestamp, List<Integer>> entry : readerToAggrIndexesMap.entrySet()) {
+ int pathId = entry.getValue().get(0);
+ // values are already cached in timeGenerator
+ if (cached.get(pathId)) {
+ Object[] values = timestampGenerator.getValues(selectedSeries.get(pathId));
+ for (Integer i : entry.getValue()) {
+ aggregateResults.get(i).updateResultUsingValues(timeArray, timeArrayLength, values);
+ }
+ } else {
+ if (entry.getValue().size() == 1) {
+ aggregateResults
+ .get(entry.getValue().get(0))
+ .updateResultUsingTimestamps(timeArray, timeArrayLength, entry.getKey());
+ } else {
+ Object[] values = entry.getKey().getValuesInTimestamps(timeArray, timeArrayLength);
+ for (Integer i : entry.getValue()) {
+ aggregateResults.get(i).updateResultUsingValues(timeArray, timeArrayLength, values);
+ }
+ }
+ }
}
}
}
@@ -474,9 +507,7 @@ public class AggregationExecutor {
Map<PartialPath, List<Integer>> pathToAggrIndexesMap = new HashMap<>();
for (int i = 0; i < selectedSeries.size(); i++) {
PartialPath series = selectedSeries.get(i);
- List<Integer> indexList =
- pathToAggrIndexesMap.computeIfAbsent(series, key -> new ArrayList<>());
- indexList.add(i);
+ pathToAggrIndexesMap.computeIfAbsent(series, key -> new ArrayList<>()).add(i);
}
return pathToAggrIndexesMap;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/externalsort/adapter/ByTimestampReaderAdapter.java b/server/src/main/java/org/apache/iotdb/db/query/externalsort/adapter/ByTimestampReaderAdapter.java
index ba482c4..ffb8f62 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/externalsort/adapter/ByTimestampReaderAdapter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/externalsort/adapter/ByTimestampReaderAdapter.java
@@ -32,34 +32,42 @@ public class ByTimestampReaderAdapter implements IReaderByTimestamp {
// only cache the first point that >= timestamp
private boolean hasCached;
private TimeValuePair pair;
+ private long currentTime = Long.MIN_VALUE;
public ByTimestampReaderAdapter(IPointReader pointReader) {
this.pointReader = pointReader;
}
@Override
- public Object getValueInTimestamp(long timestamp) throws IOException {
- if (hasCached) {
- if (pair.getTimestamp() < timestamp) {
- hasCached = false;
- } else if (pair.getTimestamp() == timestamp) {
- hasCached = false;
- return pair.getValue().getValue();
- } else {
- return null;
- }
- }
+ public Object[] getValuesInTimestamps(long[] timestamps, int length) throws IOException {
+ Object[] result = new Object[length];
- while (pointReader.hasNextTimeValuePair()) {
- pair = pointReader.nextTimeValuePair();
- if (pair.getTimestamp() == timestamp) {
- return pair.getValue().getValue();
- } else if (pair.getTimestamp() > timestamp) {
- hasCached = true;
- return null;
+ for (int i = 0; i < length; i++) {
+ if (timestamps[i] < currentTime) {
+ throw new IOException("time must be increasing when use ReaderByTimestamp");
+ }
+ currentTime = timestamps[i];
+ // search cache
+ if (hasCached && pair.getTimestamp() >= currentTime) {
+ if (pair.getTimestamp() == currentTime) {
+ hasCached = false;
+ result[i] = pair.getValue().getValue();
+ }
+ continue;
+ }
+ // search reader
+ while (pointReader.hasNextTimeValuePair()) {
+ pair = pointReader.nextTimeValuePair();
+ if (pair.getTimestamp() == currentTime) {
+ result[i] = pair.getValue().getValue();
+ break;
+ } else if (pair.getTimestamp() > currentTime) {
+ hasCached = true;
+ result[i] = null;
+ break;
+ }
}
}
-
- return null;
+ return result;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/DiskChunkReaderByTimestamp.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/DiskChunkReaderByTimestamp.java
index bc87cf7..0d24c06 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/DiskChunkReaderByTimestamp.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/DiskChunkReaderByTimestamp.java
@@ -34,46 +34,49 @@ public class DiskChunkReaderByTimestamp implements IReaderByTimestamp {
private ChunkReaderByTimestamp chunkReaderByTimestamp;
private BatchData data;
+ private long currentTime = Long.MIN_VALUE;
public DiskChunkReaderByTimestamp(ChunkReaderByTimestamp chunkReaderByTimestamp) {
this.chunkReaderByTimestamp = chunkReaderByTimestamp;
}
@Override
- public Object getValueInTimestamp(long timestamp) throws IOException {
+ public Object[] getValuesInTimestamps(long[] timestamps, int length) throws IOException {
+ Object[] result = new Object[length];
- if (!hasNext()) {
- return null;
- }
-
- while (data != null) {
- Object value = data.getValueInTimestamp(timestamp);
- if (value != null) {
- return value;
+ for (int i = 0; i < length; i++) {
+ if (timestamps[i] < currentTime) {
+ throw new IOException("time must be increasing when use ReaderByTimestamp");
}
- if (data.hasCurrent()) {
- return null;
- } else {
- chunkReaderByTimestamp.setCurrentTimestamp(timestamp);
- if (chunkReaderByTimestamp.hasNextSatisfiedPage()) {
- data = chunkReaderByTimestamp.nextPageData();
- } else {
- return null;
+ currentTime = timestamps[i];
+ while (hasNext()) {
+ data = next();
+ if (data.getMinTimestamp() > currentTime) {
+ result[i] = null;
+ break;
+ }
+ result[i] = data.getValueInTimestamp(currentTime);
+ // fill cache
+ if (!data.hasCurrent() && chunkReaderByTimestamp.hasNextSatisfiedPage()) {
+ data = next();
}
}
}
+ return result;
+ }
- return null;
+ private boolean hasCacheData() {
+ return data != null && data.hasCurrent();
}
- private boolean hasNext() throws IOException {
- if (data != null && data.hasCurrent()) {
- return true;
- }
- if (chunkReaderByTimestamp != null && chunkReaderByTimestamp.hasNextSatisfiedPage()) {
- data = chunkReaderByTimestamp.nextPageData();
- return true;
+ private boolean hasNext() {
+ return hasCacheData() || chunkReaderByTimestamp.hasNextSatisfiedPage();
+ }
+
+ private BatchData next() throws IOException {
+ if (hasCacheData()) {
+ return data;
}
- return false;
+ return chunkReaderByTimestamp.nextPageData();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/IReaderByTimestamp.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/IReaderByTimestamp.java
index 57d4813..0db7bab 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/IReaderByTimestamp.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/IReaderByTimestamp.java
@@ -34,7 +34,7 @@ public interface IReaderByTimestamp {
* guarantee of correctness with any other way of calling. For example, DO NOT call this method
* twice with the same timestamp.
*/
- Object getValueInTimestamp(long timestamp) throws IOException;
+ Object[] getValuesInTimestamps(long[] timestamps, int length) throws IOException;
/**
* Returns whether there is no more data in reader.
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestamp.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestamp.java
index f09b980..69d3248 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestamp.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestamp.java
@@ -67,13 +67,22 @@ public class SeriesReaderByTimestamp implements IReaderByTimestamp {
}
@Override
- public Object getValueInTimestamp(long timestamp) throws IOException {
- seriesReader.setTimeFilter(timestamp);
- if ((batchData == null || !hasAvailableData(batchData, timestamp)) && !hasNext(timestamp)) {
+ public Object[] getValuesInTimestamps(long[] timestamps, int length) throws IOException {
+ if (length <= 0) {
return null;
}
+ Object[] results = new Object[length];
+ seriesReader.setTimeFilter(timestamps[0]);
+ for (int i = 0; i < length; i++) {
+ if ((batchData == null || !hasAvailableData(batchData, timestamps[i]))
+ && !hasNext(timestamps[i])) {
+ // there is no more data
+ break;
+ }
+ results[i] = batchData.getValueInTimestamp(timestamps[i]);
+ }
- return batchData.getValueInTimestamp(timestamp);
+ return results;
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 4e99e90..6aac0c7 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -645,7 +645,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
resp = getQueryColumnHeaders(plan, username);
}
// create and cache dataset
- QueryDataSet newDataSet = createQueryDataSet(queryId, plan);
+ QueryDataSet newDataSet = createQueryDataSet(queryId, plan, fetchSize);
if (plan instanceof ShowPlan || plan instanceof AuthorPlan) {
resp = getListDataSetHeaders(newDataSet);
} else if (plan instanceof UDFPlan) {
@@ -1022,12 +1022,13 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
}
/** create QueryDataSet and buffer it for fetchResults */
- private QueryDataSet createQueryDataSet(long queryId, PhysicalPlan physicalPlan)
+ private QueryDataSet createQueryDataSet(long queryId, PhysicalPlan physicalPlan, int fetchSize)
throws QueryProcessException, QueryFilterOptimizationException, StorageEngineException,
IOException, MetadataException, SQLException, TException, InterruptedException {
QueryContext context = genQueryContext(queryId);
QueryDataSet queryDataSet = executor.processQuery(physicalPlan, context);
+ queryDataSet.setFetchSize(fetchSize);
queryId2DataSet.put(queryId, queryDataSet);
return queryDataSet;
}
diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestampTest.java b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestampTest.java
index 4e71afa..d78d6d4 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestampTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestampTest.java
@@ -80,14 +80,18 @@ public class SeriesReaderByTimestampTest {
null,
true);
+ long timestamps[] = new long[500];
+ for (int i = 0; i < 500; i++) {
+ timestamps[i] = i;
+ }
+ Object[] values = seriesReader.getValuesInTimestamps(timestamps, timestamps.length);
for (int time = 0; time < 500; time++) {
- Integer value = (Integer) seriesReader.getValueInTimestamp(time);
if (time < 200) {
- Assert.assertEquals(time + 20000, value.intValue());
+ Assert.assertEquals(time + 20000, values[time]);
} else if (time < 260 || (time >= 300 && time < 380) || (time >= 400)) {
- Assert.assertEquals(time + 10000, value.intValue());
+ Assert.assertEquals(time + 10000, values[time]);
} else {
- Assert.assertEquals(time, value.intValue());
+ Assert.assertEquals(time, values[time]);
}
}
}
diff --git a/thrift/src/main/thrift/cluster.thrift b/thrift/src/main/thrift/cluster.thrift
index 7c0e506..a8d3baf 100644
--- a/thrift/src/main/thrift/cluster.thrift
+++ b/thrift/src/main/thrift/cluster.thrift
@@ -344,12 +344,12 @@ service TSDataService extends RaftService {
long querySingleSeriesByTimestamp(1:SingleSeriesQueryRequest request)
/**
- * Fetch one value at given timestamp using the resultSetId generated by
+ * Fetch values at given timestamps using the resultSetId generated by
* querySingleSeriesByTimestamp.
* @return a ByteBuffer containing the serialized values or an empty buffer if there
* are no more results.
**/
- binary fetchSingleSeriesByTimestamp(1:Node header, 2:long readerId, 3:long timestamp)
+ binary fetchSingleSeriesByTimestamps(1:Node header, 2:long readerId, 3:list<long> timestamps)
/**
* Find the local query established for the remote query and release all its resource.
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/DataSetWithTimeGenerator.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/DataSetWithTimeGenerator.java
index 8985bcc..7d911c4 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/DataSetWithTimeGenerator.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/DataSetWithTimeGenerator.java
@@ -72,7 +72,7 @@ public class DataSetWithTimeGenerator extends QueryDataSet {
// get value from readers in time generator
if (cached.get(i)) {
- Object value = timeGenerator.getValue(paths.get(i), timestamp);
+ Object value = timeGenerator.getValue(paths.get(i));
rowRecord.addField(value, dataTypes.get(i));
continue;
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/QueryDataSet.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/QueryDataSet.java
index eb7a206..c68a0e0 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/QueryDataSet.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/QueryDataSet.java
@@ -33,6 +33,7 @@ public abstract class QueryDataSet {
protected int rowLimit = 0; // rowLimit > 0 means the LIMIT constraint exists
protected int rowOffset = 0;
protected int alreadyReturnedRowNum = 0;
+ protected int fetchSize = 10000;
protected boolean ascending;
public QueryDataSet() {}
@@ -81,6 +82,10 @@ public abstract class QueryDataSet {
return nextWithoutConstraint();
}
+ public void setFetchSize(int fetchSize) {
+ this.fetchSize = fetchSize;
+ }
+
public abstract RowRecord nextWithoutConstraint() throws IOException;
public List<Path> getPaths() {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/TimeGenerator.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/TimeGenerator.java
index 78c0686..0da8b94 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/TimeGenerator.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/TimeGenerator.java
@@ -31,7 +31,9 @@ import org.apache.iotdb.tsfile.read.query.timegenerator.node.OrNode;
import org.apache.iotdb.tsfile.read.reader.IBatchReader;
import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
/**
* All SingleSeriesExpression involved in a IExpression will be transferred to a TimeGenerator tree
@@ -41,7 +43,8 @@ import java.util.*;
*/
public abstract class TimeGenerator {
- private HashMap<Path, List<LeafNode>> leafCache = new HashMap<>();
+ private HashMap<Path, List<LeafNode>> leafNodeCache = new HashMap<>();
+ private HashMap<Path, List<Object>> leafValuesCache;
protected Node operatorNode;
private boolean hasOrNode;
@@ -50,18 +53,43 @@ public abstract class TimeGenerator {
}
public long next() throws IOException {
+ if (!hasOrNode) {
+ if (leafValuesCache == null) {
+ leafValuesCache = new HashMap<>();
+ }
+ leafNodeCache.forEach(
+ (path, nodes) ->
+ leafValuesCache
+ .computeIfAbsent(path, k -> new ArrayList<>())
+ .add(nodes.get(0).currentValue()));
+ }
return operatorNode.next();
}
- public Object getValue(Path path, long time) {
- for (LeafNode leafNode : leafCache.get(path)) {
- if (!leafNode.currentTimeIs(time)) {
- continue;
- }
- return leafNode.currentValue();
+ /** ATTENTION: this method should only be used when there is no `OR` node */
+ public Object[] getValues(Path path) throws IOException {
+ if (hasOrNode) {
+ throw new IOException(
+ "getValues() method should not be invoked when there is OR operator in where clause");
}
+ if (leafValuesCache.get(path) == null) {
+ throw new IOException(
+ "getValues() method should not be invoked by non-existent path in where clause");
+ }
+ return leafValuesCache.remove(path).toArray();
+ }
- return null;
+ /** ATTENTION: this method should only be used when there is no `OR` node */
+ public Object getValue(Path path) throws IOException {
+ if (hasOrNode) {
+ throw new IOException(
+ "getValue() method should not be invoked when there is OR operator in where clause");
+ }
+ if (leafValuesCache.get(path) == null) {
+ throw new IOException(
+ "getValue() method should not be invoked by non-existent path in where clause");
+ }
+ return leafValuesCache.get(path).remove(0);
}
public void constructNode(IExpression expression) throws IOException {
@@ -76,13 +104,9 @@ public abstract class TimeGenerator {
IBatchReader seriesReader = generateNewBatchReader(singleSeriesExp);
Path path = singleSeriesExp.getSeriesPath();
- if (!leafCache.containsKey(path)) {
- leafCache.put(path, new ArrayList<>());
- }
-
// wrap the current reader in a leaf node and put it into leafNodeCache
LeafNode leafNode = new LeafNode(seriesReader);
- leafCache.get(path).add(leafNode);
+ leafNodeCache.computeIfAbsent(path, p -> new ArrayList<>()).add(leafNode);
return leafNode;
} else {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/node/AndNode.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/node/AndNode.java
index b90aafb..eff83b0 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/node/AndNode.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/node/AndNode.java
@@ -26,8 +26,8 @@ public class AndNode implements Node {
private Node leftChild;
private Node rightChild;
- private long cachedValue;
- private boolean hasCachedValue;
+ private long cachedTime;
+ private boolean hasCachedTime;
private boolean ascending = true;
/**
@@ -39,20 +39,20 @@ public class AndNode implements Node {
public AndNode(Node leftChild, Node rightChild) {
this.leftChild = leftChild;
this.rightChild = rightChild;
- this.hasCachedValue = false;
+ this.hasCachedTime = false;
}
public AndNode(Node leftChild, Node rightChild, boolean ascending) {
this.leftChild = leftChild;
this.rightChild = rightChild;
- this.hasCachedValue = false;
+ this.hasCachedTime = false;
this.ascending = ascending;
}
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
@Override
public boolean hasNext() throws IOException {
- if (hasCachedValue) {
+ if (hasCachedTime) {
return true;
}
if (leftChild.hasNext() && rightChild.hasNext()) {
@@ -69,8 +69,8 @@ public class AndNode implements Node {
long rightValue = rightChild.next();
while (true) {
if (leftValue == rightValue) {
- this.hasCachedValue = true;
- this.cachedValue = leftValue;
+ this.hasCachedTime = true;
+ this.cachedTime = leftValue;
return true;
}
if (seekRight.test(leftValue, rightValue)) {
@@ -92,8 +92,8 @@ public class AndNode implements Node {
@Override
public long next() throws IOException {
if (hasNext()) {
- hasCachedValue = false;
- return cachedValue;
+ hasCachedTime = false;
+ return cachedTime;
}
throw new IOException("no more data");
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/node/OrNode.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/node/OrNode.java
index 6205030..a133cbb 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/node/OrNode.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/node/OrNode.java
@@ -25,55 +25,55 @@ public class OrNode implements Node {
private Node leftChild;
private Node rightChild;
- private boolean hasCachedLeftValue;
- private long cachedLeftValue;
- private boolean hasCachedRightValue;
- private long cachedRightValue;
+ private boolean hasCachedLeftTime;
+ private long cachedLeftTime;
+ private boolean hasCachedRightTime;
+ private long cachedRightTime;
private boolean ascending = true;
public OrNode(Node leftChild, Node rightChild) {
this.leftChild = leftChild;
this.rightChild = rightChild;
- this.hasCachedLeftValue = false;
- this.hasCachedRightValue = false;
+ this.hasCachedLeftTime = false;
+ this.hasCachedRightTime = false;
}
public OrNode(Node leftChild, Node rightChild, boolean ascending) {
this.leftChild = leftChild;
this.rightChild = rightChild;
- this.hasCachedLeftValue = false;
- this.hasCachedRightValue = false;
+ this.hasCachedLeftTime = false;
+ this.hasCachedRightTime = false;
this.ascending = ascending;
}
@Override
public boolean hasNext() throws IOException {
- if (hasCachedLeftValue || hasCachedRightValue) {
+ if (hasCachedLeftTime || hasCachedRightTime) {
return true;
}
return leftChild.hasNext() || rightChild.hasNext();
}
private boolean hasLeftValue() throws IOException {
- return hasCachedLeftValue || leftChild.hasNext();
+ return hasCachedLeftTime || leftChild.hasNext();
}
private long getLeftValue() throws IOException {
- if (hasCachedLeftValue) {
- hasCachedLeftValue = false;
- return cachedLeftValue;
+ if (hasCachedLeftTime) {
+ hasCachedLeftTime = false;
+ return cachedLeftTime;
}
return leftChild.next();
}
private boolean hasRightValue() throws IOException {
- return hasCachedRightValue || rightChild.hasNext();
+ return hasCachedRightTime || rightChild.hasNext();
}
private long getRightValue() throws IOException {
- if (hasCachedRightValue) {
- hasCachedRightValue = false;
- return cachedRightValue;
+ if (hasCachedRightTime) {
+ hasCachedRightTime = false;
+ return cachedRightTime;
}
return rightChild.next();
}
@@ -99,12 +99,12 @@ public class OrNode implements Node {
private long popAndFillNextCache(boolean popLeft, boolean popRight, long left, long right) {
if (popLeft) {
- hasCachedRightValue = true;
- cachedRightValue = right;
+ hasCachedRightTime = true;
+ cachedRightTime = right;
return left;
} else if (popRight) {
- hasCachedLeftValue = true;
- cachedLeftValue = left;
+ hasCachedLeftTime = true;
+ cachedLeftTime = left;
return right;
} else {
return left;
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/read/reader/FakedTimeGenerator.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/read/reader/FakedTimeGenerator.java
index 9b848dc..85b30c0 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/read/reader/FakedTimeGenerator.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/read/reader/FakedTimeGenerator.java
@@ -35,19 +35,18 @@ public class FakedTimeGenerator extends TimeGenerator {
public FakedTimeGenerator() throws IOException {
- // or(and(d1.s1, d2.s2), d2.s2)
+ // and(and(d1.s1, d2.s2), d2.s2)
IExpression expression =
- BinaryExpression.or(
+ BinaryExpression.and(
BinaryExpression.and(
new SingleSeriesExpression(
new Path("d1", "s1"),
- FilterFactory.and(TimeFilter.gtEq(1L), TimeFilter.ltEq(5L))),
+ FilterFactory.and(TimeFilter.gtEq(3L), TimeFilter.ltEq(8L))),
new SingleSeriesExpression(
new Path("d2", "s2"),
FilterFactory.and(TimeFilter.gtEq(1L), TimeFilter.ltEq(10L)))),
new SingleSeriesExpression(
- new Path("d2", "s2"),
- FilterFactory.and(TimeFilter.gtEq(11L), TimeFilter.ltEq(15L))));
+ new Path("d2", "s2"), FilterFactory.and(TimeFilter.gtEq(2L), TimeFilter.ltEq(6L))));
super.constructNode(expression);
}
@@ -68,10 +67,10 @@ public class FakedTimeGenerator extends TimeGenerator {
Path path = new Path("d1", "s1");
long count = 0;
while (fakedTimeGenerator.hasNext()) {
- long time = fakedTimeGenerator.next();
- fakedTimeGenerator.getValue(path, time);
+ fakedTimeGenerator.next();
+ fakedTimeGenerator.getValue(path);
count++;
}
- Assert.assertEquals(10L, count);
+ Assert.assertEquals(4L, count);
}
}