You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2021/12/28 14:26:43 UTC

[iotdb] branch rel/0.12 updated: [To rel/0.12] [IOTDB-2219] Bug fix: query in-memory data is incorrect in cluster mode (#4648)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.12 by this push:
     new ffc73ea  [To rel/0.12] [IOTDB-2219] Bug fix: query in-memory data is incorrect in cluster mode (#4648)
ffc73ea is described below

commit ffc73ea06ec74b2a12a364a4fe6cd37052d615f4
Author: liuminghui233 <36...@users.noreply.github.com>
AuthorDate: Tue Dec 28 22:26:13 2021 +0800

    [To rel/0.12] [IOTDB-2219] Bug fix: query in-memory data is incorrect in cluster mode (#4648)
---
 .../query/ClusterDataQueryExecutorTest.java        | 77 ++++++++++++++++++++++
 .../query/reader/ClusterReaderFactoryTest.java     |  3 +-
 .../db/query/control/QueryResourceManager.java     |  5 +-
 3 files changed, 80 insertions(+), 5 deletions(-)

diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutorTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutorTest.java
index 034a1a6..17a325b 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutorTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutorTest.java
@@ -22,10 +22,18 @@ package org.apache.iotdb.cluster.query;
 import org.apache.iotdb.cluster.common.TestUtils;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
+import org.apache.iotdb.db.qp.executor.PlanExecutor;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
 import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
+import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
 import org.apache.iotdb.db.query.control.QueryResourceManager;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.Field;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
 import org.apache.iotdb.tsfile.read.expression.IExpression;
 import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
 import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
@@ -35,12 +43,16 @@ import org.apache.iotdb.tsfile.read.filter.operator.AndFilter;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 
 public class ClusterDataQueryExecutorTest extends BaseQueryTest {
@@ -155,4 +167,69 @@ public class ClusterDataQueryExecutorTest extends BaseQueryTest {
       QueryResourceManager.getInstance().endQuery(context.getQueryId());
     }
   }
+
+  @Test // regression test for IOTDB-2219: query in-memory data is incorrect in cluster mode
+  public void testQueryInMemory()
+      throws IOException, StorageEngineException, IllegalPathException, QueryProcessException,
+          StorageGroupNotSetException {
+    PlanExecutor planExecutor = new PlanExecutor();
+    PartialPath[] paths =
+        new PartialPath[] {
+          new PartialPath(TestUtils.getTestSg(100), TestUtils.getTestMeasurement(0)),
+          new PartialPath(TestUtils.getTestSg(100), TestUtils.getTestMeasurement(1)),
+          new PartialPath(TestUtils.getTestSg(100), TestUtils.getTestMeasurement(2)),
+        };
+    String[] measurements =
+        new String[] {
+          TestUtils.getTestMeasurement(0),
+          TestUtils.getTestMeasurement(1),
+          TestUtils.getTestMeasurement(2)
+        };
+    MeasurementMNode[] schemas =
+        new MeasurementMNode[] {
+          TestUtils.getTestMeasurementMNode(0),
+          TestUtils.getTestMeasurementMNode(1),
+          TestUtils.getTestMeasurementMNode(2)
+        };
+    TSDataType[] dataTypes =
+        new TSDataType[] {TSDataType.DOUBLE, TSDataType.DOUBLE, TSDataType.DOUBLE};
+    Object[] values = new Object[] {1.0, 2.0, 3.0};
+
+    // create storage group getTestSg(100); the same path is used as the device id below
+    SetStorageGroupPlan setStorageGroupPlan = new SetStorageGroupPlan();
+    setStorageGroupPlan.setPath(new PartialPath(TestUtils.getTestSg(100)));
+    planExecutor.setStorageGroup(setStorageGroupPlan);
+
+    // insert one row at time 0; this test issues no flush, so it presumably stays in memory
+    InsertRowPlan insertPlan = new InsertRowPlan();
+    insertPlan.setDeviceId(new PartialPath(TestUtils.getTestSg(100)));
+    insertPlan.setMeasurements(measurements);
+    insertPlan.setMeasurementMNodes(schemas);
+    insertPlan.setDataTypes(dataTypes);
+    insertPlan.setNeedInferType(true);
+    insertPlan.setTime(0);
+    insertPlan.setValues(values);
+
+    planExecutor.processNonQuery(insertPlan);
+
+    // query the three series back; expect exactly one row holding the inserted values in order
+    RawDataQueryPlan queryPlan = new RawDataQueryPlan();
+    queryPlan.setDeduplicatedPaths(Arrays.asList(paths));
+    queryPlan.setDeduplicatedDataTypes(Arrays.asList(dataTypes));
+    queryExecutor = new ClusterDataQueryExecutor(queryPlan, testMetaMember);
+    RemoteQueryContext context =
+        new RemoteQueryContext(QueryResourceManager.getInstance().assignQueryId(true));
+    try {
+      QueryDataSet dataSet = queryExecutor.executeWithoutValueFilter(context);
+      RowRecord record = dataSet.next();
+      List<Field> fields = record.getFields();
+      Assert.assertEquals(values.length, fields.size());
+      for (int i = 0; i < values.length; i++) {
+        Assert.assertEquals(String.valueOf(values[i]), fields.get(i).getStringValue());
+      }
+      assertFalse(dataSet.hasNext());
+    } finally {
+      QueryResourceManager.getInstance().endQuery(context.getQueryId());
+    }
+  }
 }
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactoryTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactoryTest.java
index 11d2a31..dd3ccc4 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactoryTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactoryTest.java
@@ -36,6 +36,7 @@ import java.io.IOException;
 import java.util.Collections;
 
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 
 public class ClusterReaderFactoryTest extends BaseQueryTest {
 
@@ -72,7 +73,7 @@ public class ClusterReaderFactoryTest extends BaseQueryTest {
                   context,
                   dataGroupMemberMap.get(TestUtils.getNode(10)),
                   true);
-      assertNotNull(seriesReader);
+      assertNull(seriesReader);
     } finally {
       QueryResourceManager.getInstance().endQuery(context.getQueryId());
       StorageEngine.getInstance().setTTL(new PartialPath(TestUtils.getTestSg(0)), Long.MAX_VALUE);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
index 54598cf..858d7ef 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
@@ -145,15 +145,12 @@ public class QueryResourceManager {
         && cachedQueryDataSourcesMap.get(queryId).containsKey(storageGroupPath)) {
       cachedQueryDataSource = cachedQueryDataSourcesMap.get(queryId).get(storageGroupPath);
     } else {
-      // QueryDataSource is not cached earlier in cluster mode
+      // QueryDataSource is never cached in cluster mode
       StorageGroupProcessor processor =
           StorageEngine.getInstance().getProcessor(selectedPath.getDevicePath());
       cachedQueryDataSource =
           processor.query(
               Collections.singletonList(selectedPath), context, filePathsManager, timeFilter);
-      cachedQueryDataSourcesMap
-          .computeIfAbsent(queryId, k -> new HashMap<>())
-          .put(storageGroupPath, cachedQueryDataSource);
     }
 
     // construct QueryDataSource for selectedPath