You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2021/03/11 09:01:21 UTC
[iotdb] branch clusterQueryOpt updated: implement cluster version
This is an automated email from the ASF dual-hosted git repository.
xiangweiwei pushed a commit to branch clusterQueryOpt
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/clusterQueryOpt by this push:
new 2d6773f implement cluster version
2d6773f is described below
commit 2d6773f2f640948200e37b3ba92c7201580159a4
Author: Alima777 <wx...@gmail.com>
AuthorDate: Thu Mar 11 17:00:42 2021 +0800
implement cluster version
---
.../iotdb/cluster/query/LocalQueryExecutor.java | 15 ++---
.../iotdb/cluster/query/reader/EmptyReader.java | 8 +--
.../cluster/query/reader/MergedReaderByTime.java | 4 +-
.../reader/RemoteSeriesReaderByTimestamp.java | 54 ++++++++---------
.../iotdb/cluster/server/DataClusterServer.java | 13 ++--
.../cluster/server/service/DataAsyncService.java | 10 +++-
.../cluster/server/service/DataSyncService.java | 5 +-
.../iotdb/cluster/common/TestAsyncDataClient.java | 9 ++-
.../cluster/common/TestManagedSeriesReader.java | 25 +++++---
.../query/manage/ClusterQueryManagerTest.java | 2 +-
.../reader/RemoteSeriesReaderByTimestampTest.java | 69 ++++++++++++++--------
.../cluster/server/member/DataGroupMemberTest.java | 30 ++++++----
.../cluster/server/member/MetaGroupMemberTest.java | 8 ++-
.../reader/series/SeriesReaderByTimestamp.java | 11 ++--
thrift/src/main/thrift/cluster.thrift | 4 +-
15 files changed, 157 insertions(+), 110 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
index 028b3fc..24c7847 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
@@ -102,24 +102,19 @@ public class LocalQueryExecutor {
return ((CMManager) IoTDB.metaManager);
}
- /**
- * Return the data of the reader whose id is "readerId", using timestamps in "timeBuffer".
- *
- * @param readerId
- * @param time
- */
- public ByteBuffer fetchSingleSeriesByTimestamp(long readerId, long time)
+ /** Return the data of reader "readerId" at the first "length" entries of "timestamps". */
+ public ByteBuffer fetchSingleSeriesByTimestamps(long readerId, long[] timestamps, int length)
throws ReaderNotFoundException, IOException {
IReaderByTimestamp reader = dataGroupMember.getQueryManager().getReaderByTimestamp(readerId);
if (reader == null) {
throw new ReaderNotFoundException(readerId);
}
- Object value = reader.getValueInTimestamp(time);
- if (value != null) {
+ Object[] values = reader.getValuesInTimestamps(timestamps, length);
+ if (values != null) {
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
- SerializeUtils.serializeObject(value, dataOutputStream);
+ SerializeUtils.serializeObjects(values, dataOutputStream);
return ByteBuffer.wrap(byteArrayOutputStream.toByteArray());
} else {
return ByteBuffer.allocate(0);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/EmptyReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/EmptyReader.java
index 8ead5bd..3b29a69 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/EmptyReader.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/EmptyReader.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.reader.IPointReader;
import org.apache.iotdb.tsfile.utils.Pair;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -155,12 +156,7 @@ public class EmptyReader extends BaseManagedSeriesReader
}
@Override
- public Object getValueInTimestamp(long timestamp) {
- return null;
- }
-
- @Override
- public Object[] getValuesInTimestamps(long[] timestamps) {
+ public Object[] getValuesInTimestamps(long[] timestamps, int length) throws IOException {
return null;
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/MergedReaderByTime.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/MergedReaderByTime.java
index 2db6a09..c1906e7 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/MergedReaderByTime.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/MergedReaderByTime.java
@@ -33,10 +33,10 @@ public class MergedReaderByTime implements IReaderByTimestamp {
}
@Override
- public Object getValueInTimestamp(long timestamp) throws IOException {
+ public Object[] getValuesInTimestamps(long[] timestamps, int length) throws IOException {
for (IReaderByTimestamp innerReader : innerReaders) {
if (innerReader != null) {
- Object valueInTimestamp = innerReader.getValueInTimestamp(timestamp);
+ Object[] valueInTimestamp = innerReader.getValuesInTimestamps(timestamps, length);
if (valueInTimestamp != null) {
return valueInTimestamp;
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestamp.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestamp.java
index f2d8729..d077f02 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestamp.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestamp.java
@@ -33,6 +33,8 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
public class RemoteSeriesReaderByTimestamp implements IReaderByTimestamp {
@@ -49,37 +51,42 @@ public class RemoteSeriesReaderByTimestamp implements IReaderByTimestamp {
}
@Override
- public Object getValueInTimestamp(long timestamp) throws IOException {
+ public Object[] getValuesInTimestamps(long[] timestamps, int length) throws IOException {
if (!sourceInfo.checkCurClient()) {
return null;
}
ByteBuffer result;
if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
- result = fetchResultAsync(timestamp);
+ result = fetchResultAsync(timestamps, length);
} else {
- result = fetchResultSync(timestamp);
+ result = fetchResultSync(timestamps, length);
}
- return SerializeUtils.deserializeObject(result);
+ return SerializeUtils.deserializeObjects(result);
}
@SuppressWarnings("java:S2274") // enable timeout
- private ByteBuffer fetchResultAsync(long timestamp) throws IOException {
+ private ByteBuffer fetchResultAsync(long[] timestamps, int length) throws IOException {
+ // convert long[] to List<Long>, which is used for thrift
+ List<Long> timestampList = new ArrayList<>(length);
+ for (int i = 0; i < length; i++) {
+ timestampList.add(timestamps[i]);
+ }
synchronized (fetchResult) {
fetchResult.set(null);
try {
sourceInfo
.getCurAsyncClient(RaftServer.getReadOperationTimeoutMS())
- .fetchSingleSeriesByTimestamp(
- sourceInfo.getHeader(), sourceInfo.getReaderId(), timestamp, handler);
+ .fetchSingleSeriesByTimestamps(
+ sourceInfo.getHeader(), sourceInfo.getReaderId(), timestampList, handler);
fetchResult.wait(RaftServer.getReadOperationTimeoutMS());
} catch (TException e) {
// try other node
- if (!sourceInfo.switchNode(true, timestamp)) {
+ if (!sourceInfo.switchNode(true, timestamps[0])) {
return null;
}
- return fetchResultAsync(timestamp);
+ return fetchResultAsync(timestamps, length);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.warn("Query {} interrupted", sourceInfo);
@@ -89,36 +96,27 @@ public class RemoteSeriesReaderByTimestamp implements IReaderByTimestamp {
return fetchResult.get();
}
- private ByteBuffer fetchResultSync(long timestamp) throws IOException {
+ private ByteBuffer fetchResultSync(long[] timestamps, int length) throws IOException {
SyncDataClient curSyncClient = null;
+ // convert long[] to List<Long>, which is used for thrift
+ List<Long> timestampList = new ArrayList<>(length);
+ for (int i = 0; i < length; i++) {
+ timestampList.add(timestamps[i]);
+ }
try {
curSyncClient = sourceInfo.getCurSyncClient(RaftServer.getReadOperationTimeoutMS());
- return curSyncClient.fetchSingleSeriesByTimestamp(
- sourceInfo.getHeader(), sourceInfo.getReaderId(), timestamp);
+ return curSyncClient.fetchSingleSeriesByTimestamps(
+ sourceInfo.getHeader(), sourceInfo.getReaderId(), timestampList);
} catch (TException e) {
// try other node
- if (!sourceInfo.switchNode(true, timestamp)) {
+ if (!sourceInfo.switchNode(true, timestamps[0])) {
return null;
}
- return fetchResultSync(timestamp);
+ return fetchResultSync(timestamps, length);
} finally {
if (curSyncClient != null) {
ClientUtils.putBackSyncClient(curSyncClient);
}
}
}
-
- @Override
- public Object[] getValuesInTimestamps(long[] timestamps) throws IOException {
- return null;
- }
-
- @SuppressWarnings("java:S2274") // enable timeout
- private ByteBuffer fetchResultAsync(long[] timestamps) throws IOException {
- return null;
- }
-
- private ByteBuffer fetchResultSync(long[] timestamps) throws IOException {
- return null;
- }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
index 012569e..03a78a9 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
@@ -384,12 +384,15 @@ public class DataClusterServer extends RaftServer
}
@Override
- public void fetchSingleSeriesByTimestamp(
- Node header, long readerId, long time, AsyncMethodCallback<ByteBuffer> resultHandler) {
+ public void fetchSingleSeriesByTimestamps(
+ Node header,
+ long readerId,
+ List<Long> timestamps,
+ AsyncMethodCallback<ByteBuffer> resultHandler) {
DataAsyncService service =
getDataAsyncService(header, resultHandler, "Fetch by timestamp:" + readerId);
if (service != null) {
- service.fetchSingleSeriesByTimestamp(header, readerId, time, resultHandler);
+ service.fetchSingleSeriesByTimestamps(header, readerId, timestamps, resultHandler);
}
}
@@ -737,9 +740,9 @@ public class DataClusterServer extends RaftServer
}
@Override
- public ByteBuffer fetchSingleSeriesByTimestamp(Node header, long readerId, long timestamp)
+ public ByteBuffer fetchSingleSeriesByTimestamps(Node header, long readerId, List<Long> timestamps)
throws TException {
- return getDataSyncService(header).fetchSingleSeriesByTimestamp(header, readerId, timestamp);
+ return getDataSyncService(header).fetchSingleSeriesByTimestamps(header, readerId, timestamps);
}
@Override
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java
index 41a2689..99fe858 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java
@@ -207,13 +207,17 @@ public class DataAsyncService extends BaseAsyncService implements TSDataService.
}
@Override
- public void fetchSingleSeriesByTimestamp(
- Node header, long readerId, long timestamp, AsyncMethodCallback<ByteBuffer> resultHandler) {
+ public void fetchSingleSeriesByTimestamps(
+ Node header,
+ long readerId,
+ List<Long> timestamps,
+ AsyncMethodCallback<ByteBuffer> resultHandler) {
try {
resultHandler.onComplete(
dataGroupMember
.getLocalQueryExecutor()
- .fetchSingleSeriesByTimestamp(readerId, timestamp));
+ .fetchSingleSeriesByTimestamps(
+ readerId, timestamps.stream().mapToLong(k -> k).toArray(), timestamps.size()));
} catch (ReaderNotFoundException | IOException e) {
resultHandler.onError(e);
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
index 61ae338..7e39689 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
@@ -209,12 +209,13 @@ public class DataSyncService extends BaseSyncService implements TSDataService.If
}
@Override
- public ByteBuffer fetchSingleSeriesByTimestamp(Node header, long readerId, long timestamp)
+ public ByteBuffer fetchSingleSeriesByTimestamps(Node header, long readerId, List<Long> timestamps)
throws TException {
try {
return dataGroupMember
.getLocalQueryExecutor()
- .fetchSingleSeriesByTimestamp(readerId, timestamp);
+ .fetchSingleSeriesByTimestamps(
+ readerId, timestamps.stream().mapToLong(k -> k).toArray(), timestamps.size());
} catch (ReaderNotFoundException | IOException e) {
throw new TException(e);
}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncDataClient.java b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncDataClient.java
index 661dfbf..5b2797a 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncDataClient.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncDataClient.java
@@ -99,12 +99,15 @@ public class TestAsyncDataClient extends AsyncDataClient {
}
@Override
- public void fetchSingleSeriesByTimestamp(
- Node header, long readerId, long time, AsyncMethodCallback<ByteBuffer> resultHandler) {
+ public void fetchSingleSeriesByTimestamps(
+ Node header,
+ long readerId,
+ List<Long> timestamps,
+ AsyncMethodCallback<ByteBuffer> resultHandler) {
new Thread(
() ->
new DataAsyncService(dataGroupMemberMap.get(header))
- .fetchSingleSeriesByTimestamp(header, readerId, time, resultHandler))
+ .fetchSingleSeriesByTimestamps(header, readerId, timestamps, resultHandler))
.start();
}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestManagedSeriesReader.java b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestManagedSeriesReader.java
index 5ac3f64..6f220e4 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestManagedSeriesReader.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestManagedSeriesReader.java
@@ -57,17 +57,24 @@ public class TestManagedSeriesReader implements ManagedSeriesReader, IReaderByTi
}
@Override
- public Object getValueInTimestamp(long timestamp) {
- while (batchData.hasCurrent()) {
- long currTime = batchData.currentTime();
- if (currTime == timestamp) {
- return batchData.currentValue();
- } else if (currTime > timestamp) {
- break;
+ public Object[] getValuesInTimestamps(long[] timestamps, int length) {
+ Object[] results = new Object[length];
+ boolean hasValue = false;
+ for (int i = 0; i < length; i++) {
+ while (batchData.hasCurrent()) {
+ long currTime = batchData.currentTime();
+ if (currTime == timestamps[i]) {
+ hasValue = true;
+ results[i] = batchData.currentValue();
+ break;
+ } else if (currTime > timestamps[i]) {
+ results[i] = null;
+ break;
+ }
+ batchData.next();
}
- batchData.next();
}
- return null;
+ return hasValue ? results : null;
}
@Override
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/manage/ClusterQueryManagerTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/manage/ClusterQueryManagerTest.java
index 01a7b8b..35a404e 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/manage/ClusterQueryManagerTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/manage/ClusterQueryManagerTest.java
@@ -79,7 +79,7 @@ public class ClusterQueryManagerTest {
@Test
public void testRegisterReaderByTime() {
- IReaderByTimestamp reader = timestamp -> null;
+ IReaderByTimestamp reader = (timestamp, length) -> null;
long id = queryManager.registerReaderByTime(reader);
assertSame(reader, queryManager.getReaderByTimestamp(id));
}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestampTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestampTest.java
index a81dc66..b8a4706 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestampTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestampTest.java
@@ -45,11 +45,11 @@ import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;
public class RemoteSeriesReaderByTimestampTest {
@@ -68,10 +68,10 @@ public class RemoteSeriesReaderByTimestampTest {
public AsyncDataClient getAsyncDataClient(Node node, int timeout) throws IOException {
return new AsyncDataClient(null, null, node, null) {
@Override
- public void fetchSingleSeriesByTimestamp(
+ public void fetchSingleSeriesByTimestamps(
Node header,
long readerId,
- long time,
+ List<Long> timestamps,
AsyncMethodCallback<ByteBuffer> resultHandler)
throws TException {
if (failedNodes.contains(node)) {
@@ -84,23 +84,26 @@ public class RemoteSeriesReaderByTimestampTest {
DataOutputStream dataOutputStream =
new DataOutputStream(byteArrayOutputStream);
boolean isNull = true;
- while (batchData.hasCurrent()) {
- long currentTime = batchData.currentTime();
- Object value = batchData.currentValue();
- if (currentTime == time) {
- SerializeUtils.serializeObject(value, dataOutputStream);
+ Object[] results = new Object[timestamps.size()];
+ for (int i = 0; i < timestamps.size(); i++) {
+ while (batchData.hasCurrent()) {
+ long currentTime = batchData.currentTime();
+ if (currentTime == timestamps.get(i)) {
+ results[i] = batchData.currentValue();
+ batchData.next();
+ isNull = false;
+ break;
+ } else if (currentTime > timestamps.get(i)) {
+ results[i] = null;
+ break;
+ }
+ // time < timestamp, continue
batchData.next();
- isNull = false;
- break;
- } else if (currentTime > time) {
- break;
}
- // time < timestamp, continue
- batchData.next();
- }
- if (isNull) {
- SerializeUtils.serializeObject(null, dataOutputStream);
}
+ SerializeUtils.serializeObjects(
+ isNull ? new Object[0] : results, dataOutputStream);
+
resultHandler.onComplete(
ByteBuffer.wrap(byteArrayOutputStream.toByteArray()));
})
@@ -146,10 +149,16 @@ public class RemoteSeriesReaderByTimestampTest {
RemoteSeriesReaderByTimestamp reader = new RemoteSeriesReaderByTimestamp(sourceInfo);
+ long[] times = new long[100];
for (int i = 0; i < 100; i++) {
- assertEquals(i * 1.0, reader.getValueInTimestamp(i));
+ times[i] = i;
}
- assertNull(reader.getValueInTimestamp(101));
+ Object[] results = reader.getValuesInTimestamps(times, times.length);
+ for (int i = 0; i < 100; i++) {
+ assertEquals(i * 1.0, results[i]);
+ }
+ times[0] = 101;
+ assertEquals(0, reader.getValuesInTimestamps(times, 1).length);
} finally {
QueryResourceManager.getInstance().endQuery(context.getQueryId());
}
@@ -180,8 +189,13 @@ public class RemoteSeriesReaderByTimestampTest {
+ (endTime - startTime));
// normal read
assertEquals(TestUtils.getNode(0), sourceInfo.getCurrentNode());
+ long[] times = new long[50];
+ for (int i = 0; i < 50; i++) {
+ times[i] = i;
+ }
+ Object[] results = reader.getValuesInTimestamps(times, 50);
for (int i = 0; i < 50; i++) {
- assertEquals(i * 1.0, reader.getValueInTimestamp(i));
+ assertEquals(i * 1.0, results[i]);
}
endTime = System.currentTimeMillis();
@@ -191,14 +205,22 @@ public class RemoteSeriesReaderByTimestampTest {
+ (endTime - startTime));
failedNodes.add(TestUtils.getNode(0));
for (int i = 50; i < 80; i++) {
- assertEquals(i * 1.0, reader.getValueInTimestamp(i));
+ times[i - 50] = i;
+ }
+ results = reader.getValuesInTimestamps(times, 30);
+ for (int i = 50; i < 80; i++) {
+ assertEquals(i * 1.0, results[i - 50]);
}
assertEquals(TestUtils.getNode(1), sourceInfo.getCurrentNode());
// a bad client, change to another node again
failedNodes.add(TestUtils.getNode(1));
for (int i = 80; i < 90; i++) {
- assertEquals(i * 1.0, reader.getValueInTimestamp(i));
+ times[i - 80] = i;
+ }
+ results = reader.getValuesInTimestamps(times, 10);
+ for (int i = 80; i < 90; i++) {
+ assertEquals(i * 1.0, results[i - 80]);
}
assertEquals(TestUtils.getNode(2), sourceInfo.getCurrentNode());
@@ -211,7 +233,8 @@ public class RemoteSeriesReaderByTimestampTest {
failedNodes.add(TestUtils.getNode(2));
try {
- reader.getValueInTimestamp(90);
+ times[0] = 90;
+ reader.getValuesInTimestamps(times, 1);
fail();
} catch (IOException e) {
assertEquals("no available client.", e.getMessage());
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
index 84d069a..1a52082 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
@@ -803,11 +803,15 @@ public class DataGroupMemberTest extends MemberTest {
AtomicReference<ByteBuffer> dataResult = new AtomicReference<>();
GenericHandler<ByteBuffer> dataHandler = new GenericHandler<>(TestUtils.getNode(0), dataResult);
+ List<Long> timestamps = new ArrayList<>(5);
for (int i = 5; i < 10; i++) {
- new DataAsyncService(dataGroupMember)
- .fetchSingleSeriesByTimestamp(TestUtils.getNode(0), readerId, i, dataHandler);
- Object value = SerializeUtils.deserializeObject(dataResult.get());
- assertEquals(i * 1.0, (Double) value, 0.00001);
+ timestamps.add((long) i);
+ }
+ new DataAsyncService(dataGroupMember)
+ .fetchSingleSeriesByTimestamps(TestUtils.getNode(0), readerId, timestamps, dataHandler);
+ Object[] values = SerializeUtils.deserializeObjects(dataResult.get());
+ for (int i = 5; i < 10; i++) {
+ assertEquals(i * 1.0, (Double) values[i - 5], 0.00001);
}
new DataAsyncService(dataGroupMember)
@@ -861,11 +865,15 @@ public class DataGroupMemberTest extends MemberTest {
AtomicReference<ByteBuffer> dataResult = new AtomicReference<>();
GenericHandler<ByteBuffer> dataHandler = new GenericHandler<>(TestUtils.getNode(0), dataResult);
+ List<Long> timestamps = new ArrayList<>(4);
for (int i = 5; i < 9; i++) {
- new DataAsyncService(dataGroupMember)
- .fetchSingleSeriesByTimestamp(TestUtils.getNode(0), readerId, i, dataHandler);
- Object value = SerializeUtils.deserializeObject(dataResult.get());
- assertEquals(i * 1.0, (Double) value, 0.00001);
+ timestamps.add((long) i);
+ }
+ new DataAsyncService(dataGroupMember)
+ .fetchSingleSeriesByTimestamps(TestUtils.getNode(0), readerId, timestamps, dataHandler);
+ Object[] values = SerializeUtils.deserializeObjects(dataResult.get());
+ for (int i = 5; i < 9; i++) {
+ assertEquals(i * 1.0, (Double) values[i - 5], 0.00001);
}
new DataAsyncService(dataGroupMember)
@@ -896,11 +904,13 @@ public class DataGroupMemberTest extends MemberTest {
public void testFetchWithoutQuery() {
System.out.println("Start testFetchWithoutQuery()");
AtomicReference<Exception> result = new AtomicReference<>();
+ List<Long> timestamps = new ArrayList<>(1);
+ timestamps.add((long) 0);
new DataAsyncService(dataGroupMember)
- .fetchSingleSeriesByTimestamp(
+ .fetchSingleSeriesByTimestamps(
TestUtils.getNode(0),
0,
- 0,
+ timestamps,
new AsyncMethodCallback<ByteBuffer>() {
@Override
public void onComplete(ByteBuffer buffer) {}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
index 08914d1..e4e6b25 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
@@ -881,6 +881,10 @@ public class MetaGroupMemberTest extends MemberTest {
try {
ClusterReaderFactory readerFactory = new ClusterReaderFactory(testMetaMember);
+ long[] times = new long[10];
+ for (int i = 0; i < 10; i++) {
+ times[i] = i;
+ }
for (int i = 0; i < 10; i++) {
IReaderByTimestamp readerByTimestamp =
readerFactory.getReaderByTimestamp(
@@ -889,8 +893,10 @@ public class MetaGroupMemberTest extends MemberTest {
TSDataType.DOUBLE,
context,
true);
+
+ Object[] values = readerByTimestamp.getValuesInTimestamps(times, 10);
for (int j = 0; j < 10; j++) {
- assertEquals(j * 1.0, (double) readerByTimestamp.getValueInTimestamp(j), 0.00001);
+ assertEquals(j * 1.0, (double) values[j], 0.00001);
}
}
} finally {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestamp.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestamp.java
index 00b6df3..8f0b04d 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestamp.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestamp.java
@@ -69,17 +69,18 @@ public class SeriesReaderByTimestamp implements IReaderByTimestamp {
@Override
public Object[] getValuesInTimestamps(long[] timestamp, int length) throws IOException {
Object[] results = new Object[length];
- for (int i = 0; i < length; i++) {
+ int timeIndex;
+ for (timeIndex = 0; timeIndex < length; timeIndex++) {
seriesReader.setTimeFilter(timestamp[0]);
- if ((batchData == null || !hasAvailableData(batchData, timestamp[i]))
- && !hasNext(timestamp[i])) {
+ if ((batchData == null || !hasAvailableData(batchData, timestamp[timeIndex]))
+ && !hasNext(timestamp[timeIndex])) {
// there is no more data
break;
}
- results[i] = batchData.getValueInTimestamp(timestamp[i]);
+ results[timeIndex] = batchData.getValueInTimestamp(timestamp[timeIndex]);
}
- return results;
+ return timeIndex != 0 ? results : null;
}
@Override
diff --git a/thrift/src/main/thrift/cluster.thrift b/thrift/src/main/thrift/cluster.thrift
index 7c0e506..a8d3baf 100644
--- a/thrift/src/main/thrift/cluster.thrift
+++ b/thrift/src/main/thrift/cluster.thrift
@@ -344,12 +344,12 @@ service TSDataService extends RaftService {
long querySingleSeriesByTimestamp(1:SingleSeriesQueryRequest request)
/**
- * Fetch one value at given timestamp using the resultSetId generated by
+ * Fetch values at given timestamps using the resultSetId generated by
* querySingleSeriesByTimestamp.
* @return a ByteBuffer containing the serialized values, or an empty buffer if there
* are no more results.
**/
- binary fetchSingleSeriesByTimestamp(1:Node header, 2:long readerId, 3:long timestamp)
+ binary fetchSingleSeriesByTimestamps(1:Node header, 2:long readerId, 3:list<long> timestamps)
/**
* Find the local query established for the remote query and release all its resource.