You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2021/08/24 09:59:21 UTC

[iotdb] branch rel/0.12 updated: [To rel/0.12] [IOTDB-1584] Doesn't support order by time desc in cluster mode (#3810)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.12 by this push:
     new 9e6ea86  [To rel/0.12] [IOTDB-1584] Doesn't support order by time desc in cluster mode (#3810)
9e6ea86 is described below

commit 9e6ea864f001f05dc84225da115d2ce7fa366e1e
Author: Xiangwei Wei <34...@users.noreply.github.com>
AuthorDate: Tue Aug 24 17:59:01 2021 +0800

    [To rel/0.12] [IOTDB-1584] Doesn't support order by time desc in cluster mode (#3810)
---
 RELEASE_NOTES.md                                   |  1 +
 .../cluster/query/reader/ClusterReaderFactory.java | 12 ++++++--
 ...er.java => ManagedDescPriorityMergeReader.java} |  8 ++++--
 ...Reader.java => ManagedPriorityMergeReader.java} |  4 +--
 .../cluster/query/reader/mult/MultBatchReader.java |  5 ++--
 .../cluster/server/member/DataGroupMemberTest.java |  2 ++
 .../org/apache/iotdb/db/utils/SerializeUtils.java  |  5 +++-
 .../apache/iotdb/tsfile/read/common/BatchData.java | 33 ++++++++++++++++++++++
 .../tsfile/read/common/DescReadBatchData.java      |  5 +++-
 .../tsfile/read/common/DescReadWriteBatchData.java |  1 +
 10 files changed, 65 insertions(+), 11 deletions(-)

diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md
index e210d5d..b533460 100644
--- a/RELEASE_NOTES.md
+++ b/RELEASE_NOTES.md
@@ -74,6 +74,7 @@
 * [IOTDB-1556] Abort auto create device when meet exception in setStorageGroup
 * [IOTDB-1574] Deleted file handler leak
 * [IOTDB-1580] Error result of order by time desc when enable time partition
+* [IOTDB-1584] Doesn't support order by time desc in cluster mode
 * [ISSUE-3116] Bug when using natural month unit in time interval in group by query
 * [ISSUE-3316] Query result with the same time range is inconsistent in group by query
 * [ISSUE-3436] Fix query result not right after deleting multiple time interval of one timeseries
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
index e3e0d53..60a84d8 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
@@ -62,6 +62,7 @@ import org.apache.iotdb.db.query.reader.series.SeriesRawDataBatchReader;
 import org.apache.iotdb.db.query.reader.series.SeriesRawDataPointReader;
 import org.apache.iotdb.db.query.reader.series.SeriesReader;
 import org.apache.iotdb.db.query.reader.series.SeriesReaderByTimestamp;
+import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader;
 import org.apache.iotdb.db.utils.SerializeUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -364,7 +365,12 @@ public class ClusterReaderFactory {
         metaGroupMember.getName(),
         path,
         partitionGroups.size());
-    ManagedMergeReader mergeReader = new ManagedMergeReader(dataType);
+    PriorityMergeReader mergeReader;
+    if (ascending) {
+      mergeReader = new ManagedPriorityMergeReader(dataType);
+    } else {
+      mergeReader = new ManagedDescPriorityMergeReader(dataType);
+    }
     try {
       // build a reader for each group and merge them
       for (PartitionGroup partitionGroup : partitionGroups) {
@@ -383,7 +389,9 @@ public class ClusterReaderFactory {
     } catch (IOException | QueryProcessException e) {
       throw new StorageEngineException(e);
     }
-    return mergeReader;
+    // mergeReader is always a ManagedPriorityMergeReader or a ManagedDescPriorityMergeReader,
+    // both of which implement ManagedSeriesReader, so this cast is safe.
+    return (ManagedSeriesReader) mergeReader;
   }
 
   /**
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ManagedMergeReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ManagedDescPriorityMergeReader.java
similarity index 88%
copy from cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ManagedMergeReader.java
copy to cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ManagedDescPriorityMergeReader.java
index e54dede..653dea4 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ManagedMergeReader.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ManagedDescPriorityMergeReader.java
@@ -20,16 +20,18 @@
 package org.apache.iotdb.cluster.query.reader;
 
 import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader;
-import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader;
+import org.apache.iotdb.db.query.reader.universal.DescPriorityMergeReader;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
 import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.reader.IPointReader;
 
 import java.io.IOException;
 import java.util.NoSuchElementException;
 
 @SuppressWarnings("common-java:DuplicatedBlocks")
-public class ManagedMergeReader extends PriorityMergeReader implements ManagedSeriesReader {
+public class ManagedDescPriorityMergeReader extends DescPriorityMergeReader
+    implements ManagedSeriesReader, IPointReader {
 
   private static final int BATCH_SIZE = 4096;
 
@@ -39,7 +41,7 @@ public class ManagedMergeReader extends PriorityMergeReader implements ManagedSe
   private BatchData batchData;
   private TSDataType dataType;
 
-  public ManagedMergeReader(TSDataType dataType) {
+  public ManagedDescPriorityMergeReader(TSDataType dataType) {
     this.dataType = dataType;
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ManagedMergeReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ManagedPriorityMergeReader.java
similarity index 94%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ManagedMergeReader.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ManagedPriorityMergeReader.java
index e54dede..e57f4d9 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ManagedMergeReader.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ManagedPriorityMergeReader.java
@@ -29,7 +29,7 @@ import java.io.IOException;
 import java.util.NoSuchElementException;
 
 @SuppressWarnings("common-java:DuplicatedBlocks")
-public class ManagedMergeReader extends PriorityMergeReader implements ManagedSeriesReader {
+public class ManagedPriorityMergeReader extends PriorityMergeReader implements ManagedSeriesReader {
 
   private static final int BATCH_SIZE = 4096;
 
@@ -39,7 +39,7 @@ public class ManagedMergeReader extends PriorityMergeReader implements ManagedSe
   private BatchData batchData;
   private TSDataType dataType;
 
-  public ManagedMergeReader(TSDataType dataType) {
+  public ManagedPriorityMergeReader(TSDataType dataType) {
     this.dataType = dataType;
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/MultBatchReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/MultBatchReader.java
index dd7f21b..dc47deb 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/MultBatchReader.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/MultBatchReader.java
@@ -41,7 +41,7 @@ public class MultBatchReader implements IMultBatchReader {
   @Override
   public boolean hasNextBatch() throws IOException {
     for (IBatchReader reader : pathBatchReaders.values()) {
-      if (reader.hasNextBatch()) {
+      if (reader != null && reader.hasNextBatch()) {
         return true;
       }
     }
@@ -50,7 +50,8 @@ public class MultBatchReader implements IMultBatchReader {
 
   @Override
   public boolean hasNextBatch(String fullPath) throws IOException {
-    return pathBatchReaders.get(fullPath).hasNextBatch();
+    IBatchReader reader = pathBatchReaders.get(fullPath);
+    return reader != null && reader.hasNextBatch();
   }
 
   @Override
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
index 5d138fa..782b9bc 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
@@ -722,6 +722,7 @@ public class DataGroupMemberTest extends BaseMember {
     request.setDataTypeOrdinal(TSDataType.DOUBLE.ordinal());
     request.setRequester(TestUtils.getNode(1));
     request.setQueryId(0);
+    request.setAscending(true);
     Filter filter = TimeFilter.gtEq(5);
     ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
     DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
@@ -784,6 +785,7 @@ public class DataGroupMemberTest extends BaseMember {
     request.setDataTypeOrdinal(TSDataType.DOUBLE.ordinal());
     request.setRequester(TestUtils.getNode(1));
     request.setQueryId(0);
+    request.setAscending(true);
     Filter filter = new AndFilter(TimeFilter.gtEq(5), ValueFilter.ltEq(8.0));
     ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
     DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/SerializeUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/SerializeUtils.java
index cb881e0..c2ccae9 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/SerializeUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/SerializeUtils.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
 import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.BatchData.BatchDataType;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -123,6 +124,7 @@ public class SerializeUtils {
       TSDataType dataType = batchData.getDataType();
       outputStream.writeInt(length);
       outputStream.write(dataType.ordinal());
+      outputStream.write(batchData.getBatchDataType().ordinal());
       switch (dataType) {
         case BOOLEAN:
           for (int i = 0; i < length; i++) {
@@ -175,7 +177,7 @@ public class SerializeUtils {
 
     int length = buffer.getInt();
     TSDataType dataType = TSDataType.values()[buffer.get()];
-    BatchData batchData = new BatchData(dataType);
+    BatchData batchData = BatchDataType.deserialize(buffer.get(), dataType);
     switch (dataType) {
       case INT32:
         for (int i = 0; i < length; i++) {
@@ -212,6 +214,7 @@ public class SerializeUtils {
         }
         break;
     }
+    batchData.resetBatchData();
     return batchData;
   }
 
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
index c2a7d3b..ab07c0a 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
@@ -57,6 +57,8 @@ public class BatchData {
 
   protected TSDataType dataType;
 
+  protected BatchDataType batchDataType = BatchDataType.Ordinary;
+
   // outer list index for read
   protected int readCurListIndex;
   // inner array index for read
@@ -162,6 +164,10 @@ public class BatchData {
     return dataType;
   }
 
+  public BatchDataType getBatchDataType() {
+    return batchDataType;
+  }
+
   /**
    * initialize batch data.
    *
@@ -599,4 +605,31 @@ public class BatchData {
   public BatchData flip() {
     return this;
   }
+
+  public enum BatchDataType {
+    Ordinary,
+    DescRead,
+    DescReadWrite;
+
+    BatchDataType() {}
+
+    /**
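+     * Creates a new BatchData of the class matching the serialized BatchDataType ordinal.
+     *
+     * @param type serialized ordinal: 0 = Ordinary, 1 = DescRead, 2 = DescReadWrite
+     * @return a new BatchData, DescReadBatchData or DescReadWriteBatchData for dataType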
+     */
+    public static BatchData deserialize(byte type, TSDataType dataType) {
+      switch (type) {
+        case 0:
+          return new BatchData(dataType);
+        case 1:
+          return new DescReadBatchData(dataType);
+        case 2:
+          return new DescReadWriteBatchData(dataType);
+        default:
+          throw new IllegalArgumentException("Invalid input: " + type);
+      }
+    }
+  }
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/DescReadBatchData.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/DescReadBatchData.java
index 9f7d539..ebfc204 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/DescReadBatchData.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/DescReadBatchData.java
@@ -28,10 +28,13 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
  */
 public class DescReadBatchData extends BatchData {
 
-  public DescReadBatchData() {}
+  public DescReadBatchData() {
+    batchDataType = BatchDataType.DescRead;
+  }
 
   public DescReadBatchData(TSDataType dataType) {
     super(dataType);
+    batchDataType = BatchDataType.DescRead;
   }
 
   @Override
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/DescReadWriteBatchData.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/DescReadWriteBatchData.java
index c299534..ed6e6c2 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/DescReadWriteBatchData.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/DescReadWriteBatchData.java
@@ -34,6 +34,7 @@ public class DescReadWriteBatchData extends DescReadBatchData {
 
   public DescReadWriteBatchData(TSDataType dataType) {
     super();
+    this.batchDataType = BatchDataType.DescReadWrite;
     this.dataType = dataType;
     this.readCurListIndex = 0;
     this.readCurArrayIndex = 0;