You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2021/08/24 09:59:21 UTC
[iotdb] branch rel/0.12 updated: [To rel/0.12] [IOTDB-1584] Doesn't
support order by time desc in cluster mode (#3810)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.12 by this push:
new 9e6ea86 [To rel/0.12] [IOTDB-1584] Doesn't support order by time desc in cluster mode (#3810)
9e6ea86 is described below
commit 9e6ea864f001f05dc84225da115d2ce7fa366e1e
Author: Xiangwei Wei <34...@users.noreply.github.com>
AuthorDate: Tue Aug 24 17:59:01 2021 +0800
[To rel/0.12] [IOTDB-1584] Doesn't support order by time desc in cluster mode (#3810)
---
RELEASE_NOTES.md | 1 +
.../cluster/query/reader/ClusterReaderFactory.java | 12 ++++++--
...er.java => ManagedDescPriorityMergeReader.java} | 8 ++++--
...Reader.java => ManagedPriorityMergeReader.java} | 4 +--
.../cluster/query/reader/mult/MultBatchReader.java | 5 ++--
.../cluster/server/member/DataGroupMemberTest.java | 2 ++
.../org/apache/iotdb/db/utils/SerializeUtils.java | 5 +++-
.../apache/iotdb/tsfile/read/common/BatchData.java | 33 ++++++++++++++++++++++
.../tsfile/read/common/DescReadBatchData.java | 5 +++-
.../tsfile/read/common/DescReadWriteBatchData.java | 1 +
10 files changed, 65 insertions(+), 11 deletions(-)
diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md
index e210d5d..b533460 100644
--- a/RELEASE_NOTES.md
+++ b/RELEASE_NOTES.md
@@ -74,6 +74,7 @@
* [IOTDB-1556] Abort auto create device when meet exception in setStorageGroup
* [IOTDB-1574] Deleted file handler leak
* [IOTDB-1580] Error result of order by time desc when enable time partition
+* [IOTDB-1584] Doesn't support order by time desc in cluster mode
* [ISSUE-3116] Bug when using natural month unit in time interval in group by query
* [ISSUE-3316] Query result with the same time range is inconsistent in group by query
* [ISSUE-3436] Fix query result not right after deleting multiple time interval of one timeseries
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
index e3e0d53..60a84d8 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
@@ -62,6 +62,7 @@ import org.apache.iotdb.db.query.reader.series.SeriesRawDataBatchReader;
import org.apache.iotdb.db.query.reader.series.SeriesRawDataPointReader;
import org.apache.iotdb.db.query.reader.series.SeriesReader;
import org.apache.iotdb.db.query.reader.series.SeriesReaderByTimestamp;
+import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader;
import org.apache.iotdb.db.utils.SerializeUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -364,7 +365,12 @@ public class ClusterReaderFactory {
metaGroupMember.getName(),
path,
partitionGroups.size());
- ManagedMergeReader mergeReader = new ManagedMergeReader(dataType);
+ PriorityMergeReader mergeReader;
+ if (ascending) {
+ mergeReader = new ManagedPriorityMergeReader(dataType);
+ } else {
+ mergeReader = new ManagedDescPriorityMergeReader(dataType);
+ }
try {
// build a reader for each group and merge them
for (PartitionGroup partitionGroup : partitionGroups) {
@@ -383,7 +389,9 @@ public class ClusterReaderFactory {
} catch (IOException | QueryProcessException e) {
throw new StorageEngineException(e);
}
- return mergeReader;
+ // mergeReader is always a ManagedPriorityMergeReader or a ManagedDescPriorityMergeReader;
+ // both implement ManagedSeriesReader, so this cast is safe.
+ return (ManagedSeriesReader) mergeReader;
}
/**
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ManagedMergeReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ManagedDescPriorityMergeReader.java
similarity index 88%
copy from cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ManagedMergeReader.java
copy to cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ManagedDescPriorityMergeReader.java
index e54dede..653dea4 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ManagedMergeReader.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ManagedDescPriorityMergeReader.java
@@ -20,16 +20,18 @@
package org.apache.iotdb.cluster.query.reader;
import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader;
-import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader;
+import org.apache.iotdb.db.query.reader.universal.DescPriorityMergeReader;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.reader.IPointReader;
import java.io.IOException;
import java.util.NoSuchElementException;
@SuppressWarnings("common-java:DuplicatedBlocks")
-public class ManagedMergeReader extends PriorityMergeReader implements ManagedSeriesReader {
+public class ManagedDescPriorityMergeReader extends DescPriorityMergeReader
+ implements ManagedSeriesReader, IPointReader {
private static final int BATCH_SIZE = 4096;
@@ -39,7 +41,7 @@ public class ManagedMergeReader extends PriorityMergeReader implements ManagedSe
private BatchData batchData;
private TSDataType dataType;
- public ManagedMergeReader(TSDataType dataType) {
+ public ManagedDescPriorityMergeReader(TSDataType dataType) {
this.dataType = dataType;
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ManagedMergeReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ManagedPriorityMergeReader.java
similarity index 94%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ManagedMergeReader.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ManagedPriorityMergeReader.java
index e54dede..e57f4d9 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ManagedMergeReader.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ManagedPriorityMergeReader.java
@@ -29,7 +29,7 @@ import java.io.IOException;
import java.util.NoSuchElementException;
@SuppressWarnings("common-java:DuplicatedBlocks")
-public class ManagedMergeReader extends PriorityMergeReader implements ManagedSeriesReader {
+public class ManagedPriorityMergeReader extends PriorityMergeReader implements ManagedSeriesReader {
private static final int BATCH_SIZE = 4096;
@@ -39,7 +39,7 @@ public class ManagedMergeReader extends PriorityMergeReader implements ManagedSe
private BatchData batchData;
private TSDataType dataType;
- public ManagedMergeReader(TSDataType dataType) {
+ public ManagedPriorityMergeReader(TSDataType dataType) {
this.dataType = dataType;
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/MultBatchReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/MultBatchReader.java
index dd7f21b..dc47deb 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/MultBatchReader.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/MultBatchReader.java
@@ -41,7 +41,7 @@ public class MultBatchReader implements IMultBatchReader {
@Override
public boolean hasNextBatch() throws IOException {
for (IBatchReader reader : pathBatchReaders.values()) {
- if (reader.hasNextBatch()) {
+ if (reader != null && reader.hasNextBatch()) {
return true;
}
}
@@ -50,7 +50,8 @@ public class MultBatchReader implements IMultBatchReader {
@Override
public boolean hasNextBatch(String fullPath) throws IOException {
- return pathBatchReaders.get(fullPath).hasNextBatch();
+ IBatchReader reader = pathBatchReaders.get(fullPath);
+ return reader != null && reader.hasNextBatch();
}
@Override
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
index 5d138fa..782b9bc 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
@@ -722,6 +722,7 @@ public class DataGroupMemberTest extends BaseMember {
request.setDataTypeOrdinal(TSDataType.DOUBLE.ordinal());
request.setRequester(TestUtils.getNode(1));
request.setQueryId(0);
+ request.setAscending(true);
Filter filter = TimeFilter.gtEq(5);
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
@@ -784,6 +785,7 @@ public class DataGroupMemberTest extends BaseMember {
request.setDataTypeOrdinal(TSDataType.DOUBLE.ordinal());
request.setRequester(TestUtils.getNode(1));
request.setQueryId(0);
+ request.setAscending(true);
Filter filter = new AndFilter(TimeFilter.gtEq(5), ValueFilter.ltEq(8.0));
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/SerializeUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/SerializeUtils.java
index cb881e0..c2ccae9 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/SerializeUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/SerializeUtils.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.BatchData.BatchDataType;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -123,6 +124,7 @@ public class SerializeUtils {
TSDataType dataType = batchData.getDataType();
outputStream.writeInt(length);
outputStream.write(dataType.ordinal());
+ outputStream.write(batchData.getBatchDataType().ordinal());
switch (dataType) {
case BOOLEAN:
for (int i = 0; i < length; i++) {
@@ -175,7 +177,7 @@ public class SerializeUtils {
int length = buffer.getInt();
TSDataType dataType = TSDataType.values()[buffer.get()];
- BatchData batchData = new BatchData(dataType);
+ BatchData batchData = BatchDataType.deserialize(buffer.get(), dataType);
switch (dataType) {
case INT32:
for (int i = 0; i < length; i++) {
@@ -212,6 +214,7 @@ public class SerializeUtils {
}
break;
}
+ batchData.resetBatchData();
return batchData;
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
index c2a7d3b..ab07c0a 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
@@ -57,6 +57,8 @@ public class BatchData {
protected TSDataType dataType;
+ protected BatchDataType batchDataType = BatchDataType.Ordinary;
+
// outer list index for read
protected int readCurListIndex;
// inner array index for read
@@ -162,6 +164,10 @@ public class BatchData {
return dataType;
}
+ public BatchDataType getBatchDataType() {
+ return batchDataType;
+ }
+
/**
* initialize batch data.
*
@@ -599,4 +605,31 @@ public class BatchData {
public BatchData flip() {
return this;
}
+
+ public enum BatchDataType {
+ Ordinary,
+ DescRead,
+ DescReadWrite;
+
+ BatchDataType() {}
+
+ /**
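+ * Create an empty BatchData of the subclass that matches the given serialized batch type.
+ *
+ * @param type ordinal of the BatchDataType (0: Ordinary, 1: DescRead, 2: DescReadWrite)
+ * @return a new BatchData, DescReadBatchData or DescReadWriteBatchData built with dataType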
+ */
+ public static BatchData deserialize(byte type, TSDataType dataType) {
+ switch (type) {
+ case 0:
+ return new BatchData(dataType);
+ case 1:
+ return new DescReadBatchData(dataType);
+ case 2:
+ return new DescReadWriteBatchData(dataType);
+ default:
+ throw new IllegalArgumentException("Invalid input: " + type);
+ }
+ }
+ }
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/DescReadBatchData.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/DescReadBatchData.java
index 9f7d539..ebfc204 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/DescReadBatchData.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/DescReadBatchData.java
@@ -28,10 +28,13 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
*/
public class DescReadBatchData extends BatchData {
- public DescReadBatchData() {}
+ public DescReadBatchData() {
+ batchDataType = BatchDataType.DescRead;
+ }
public DescReadBatchData(TSDataType dataType) {
super(dataType);
+ batchDataType = BatchDataType.DescRead;
}
@Override
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/DescReadWriteBatchData.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/DescReadWriteBatchData.java
index c299534..ed6e6c2 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/DescReadWriteBatchData.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/DescReadWriteBatchData.java
@@ -34,6 +34,7 @@ public class DescReadWriteBatchData extends DescReadBatchData {
public DescReadWriteBatchData(TSDataType dataType) {
super();
+ this.batchDataType = BatchDataType.DescReadWrite;
this.dataType = dataType;
this.readCurListIndex = 0;
this.readCurArrayIndex = 0;