You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2021/12/28 14:26:43 UTC
[iotdb] branch rel/0.12 updated: [To rel/0.12] [IOTDB-2219] Bug fix: query in-memory data is incorrect in cluster mode (#4648)
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.12 by this push:
new ffc73ea [To rel/0.12] [IOTDB-2219] Bug fix: query in-memory data is incorrect in cluster mode (#4648)
ffc73ea is described below
commit ffc73ea06ec74b2a12a364a4fe6cd37052d615f4
Author: liuminghui233 <36...@users.noreply.github.com>
AuthorDate: Tue Dec 28 22:26:13 2021 +0800
[To rel/0.12] [IOTDB-2219] Bug fix: query in-memory data is incorrect in cluster mode (#4648)
---
.../query/ClusterDataQueryExecutorTest.java | 77 ++++++++++++++++++++++
.../query/reader/ClusterReaderFactoryTest.java | 3 +-
.../db/query/control/QueryResourceManager.java | 5 +-
3 files changed, 80 insertions(+), 5 deletions(-)
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutorTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutorTest.java
index 034a1a6..17a325b 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutorTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutorTest.java
@@ -22,10 +22,18 @@ package org.apache.iotdb.cluster.query;
import org.apache.iotdb.cluster.common.TestUtils;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
+import org.apache.iotdb.db.qp.executor.PlanExecutor;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
+import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
import org.apache.iotdb.db.query.control.QueryResourceManager;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.Field;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.iotdb.tsfile.read.expression.IExpression;
import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
@@ -35,12 +43,16 @@ import org.apache.iotdb.tsfile.read.filter.operator.AndFilter;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
public class ClusterDataQueryExecutorTest extends BaseQueryTest {
@@ -155,4 +167,69 @@ public class ClusterDataQueryExecutorTest extends BaseQueryTest {
QueryResourceManager.getInstance().endQuery(context.getQueryId());
}
}
+
+ @Test // IOTDB-2219: a row inserted right before the query must be returned by the cluster query executor
+ public void testQueryInMemory()
+ throws IOException, StorageEngineException, IllegalPathException, QueryProcessException,
+ StorageGroupNotSetException {
+ PlanExecutor planExecutor = new PlanExecutor(); // used below to register the storage group and apply the insert
+ PartialPath[] paths = // series to query: measurements 0..2 under test storage group 100
+ new PartialPath[] {
+ new PartialPath(TestUtils.getTestSg(100), TestUtils.getTestMeasurement(0)),
+ new PartialPath(TestUtils.getTestSg(100), TestUtils.getTestMeasurement(1)),
+ new PartialPath(TestUtils.getTestSg(100), TestUtils.getTestMeasurement(2)),
+ };
+ String[] measurements = // measurement names written by the insert, in the same order as paths
+ new String[] {
+ TestUtils.getTestMeasurement(0),
+ TestUtils.getTestMeasurement(1),
+ TestUtils.getTestMeasurement(2)
+ };
+ MeasurementMNode[] schemas = // schema nodes handed to the insert plan, one per measurement
+ new MeasurementMNode[] {
+ TestUtils.getTestMeasurementMNode(0),
+ TestUtils.getTestMeasurementMNode(1),
+ TestUtils.getTestMeasurementMNode(2)
+ };
+ TSDataType[] dataTypes = // reused for both the insert plan and the query plan
+ new TSDataType[] {TSDataType.DOUBLE, TSDataType.DOUBLE, TSDataType.DOUBLE};
+ Object[] values = new Object[] {1.0, 2.0, 3.0}; // one value per measurement; also the expected query result
+
+ // register test storage group 100 before writing to it
+ SetStorageGroupPlan setStorageGroupPlan = new SetStorageGroupPlan();
+ setStorageGroupPlan.setPath(new PartialPath(TestUtils.getTestSg(100)));
+ planExecutor.setStorageGroup(setStorageGroupPlan);
+
+ // insert a single row at timestamp 0 carrying the three values; this test issues no flush afterwards
+ InsertRowPlan insertPlan = new InsertRowPlan();
+ insertPlan.setDeviceId(new PartialPath(TestUtils.getTestSg(100))); // the storage group path itself is used as the device id
+ insertPlan.setMeasurements(measurements);
+ insertPlan.setMeasurementMNodes(schemas);
+ insertPlan.setDataTypes(dataTypes);
+ insertPlan.setNeedInferType(true);
+ insertPlan.setTime(0);
+ insertPlan.setValues(values);
+
+ planExecutor.processNonQuery(insertPlan);
+
+ // read the three series back through the cluster query executor (no value filter)
+ RawDataQueryPlan queryPlan = new RawDataQueryPlan();
+ queryPlan.setDeduplicatedPaths(Arrays.asList(paths));
+ queryPlan.setDeduplicatedDataTypes(Arrays.asList(dataTypes));
+ queryExecutor = new ClusterDataQueryExecutor(queryPlan, testMetaMember);
+ RemoteQueryContext context =
+ new RemoteQueryContext(QueryResourceManager.getInstance().assignQueryId(true));
+ try {
+ QueryDataSet dataSet = queryExecutor.executeWithoutValueFilter(context);
+ RowRecord record = dataSet.next(); // first row returned; expected to be the row inserted above
+ List<Field> fields = record.getFields();
+ Assert.assertEquals(values.length, fields.size()); // one field per queried series
+ for (int i = 0; i < values.length; i++) {
+ Assert.assertEquals(String.valueOf(values[i]), fields.get(i).getStringValue()); // compared in string form
+ }
+ assertFalse(dataSet.hasNext()); // only one row was inserted, so no further row is expected
+ } finally {
+ QueryResourceManager.getInstance().endQuery(context.getQueryId()); // always end the query, even when an assertion fails
+ }
+ }
}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactoryTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactoryTest.java
index 11d2a31..dd3ccc4 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactoryTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactoryTest.java
@@ -36,6 +36,7 @@ import java.io.IOException;
import java.util.Collections;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
public class ClusterReaderFactoryTest extends BaseQueryTest {
@@ -72,7 +73,7 @@ public class ClusterReaderFactoryTest extends BaseQueryTest {
context,
dataGroupMemberMap.get(TestUtils.getNode(10)),
true);
- assertNotNull(seriesReader);
+ assertNull(seriesReader);
} finally {
QueryResourceManager.getInstance().endQuery(context.getQueryId());
StorageEngine.getInstance().setTTL(new PartialPath(TestUtils.getTestSg(0)), Long.MAX_VALUE);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
index 54598cf..858d7ef 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
@@ -145,15 +145,12 @@ public class QueryResourceManager {
&& cachedQueryDataSourcesMap.get(queryId).containsKey(storageGroupPath)) {
cachedQueryDataSource = cachedQueryDataSourcesMap.get(queryId).get(storageGroupPath);
} else {
- // QueryDataSource is not cached earlier in cluster mode
+ // QueryDataSource is never cached in cluster mode
StorageGroupProcessor processor =
StorageEngine.getInstance().getProcessor(selectedPath.getDevicePath());
cachedQueryDataSource =
processor.query(
Collections.singletonList(selectedPath), context, filePathsManager, timeFilter);
- cachedQueryDataSourcesMap
- .computeIfAbsent(queryId, k -> new HashMap<>())
- .put(storageGroupPath, cachedQueryDataSource);
}
// construct QueryDataSource for selectedPath