You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2021/06/05 14:15:05 UTC
[iotdb] branch master updated: [IOTDB-1405] Refactor MManager for
new vector interfaces (#3300)
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 11c942a [IOTDB-1405] Refactor MManager for new vector interfaces (#3300)
11c942a is described below
commit 11c942a3b5bdcf046c334d79d65b0ba06e84107d
Author: Zesong Sun <sz...@mails.tsinghua.edu.cn>
AuthorDate: Sat Jun 5 22:14:44 2021 +0800
[IOTDB-1405] Refactor MManager for new vector interfaces (#3300)
Co-authored-by: 151250176 <15...@smail.nju.edu.cn>
---
client-cpp/src/main/Session.cpp | 8 +-
.../cluster/log/applier/AsyncDataLogApplier.java | 2 +-
.../iotdb/cluster/log/applier/BaseApplier.java | 2 +-
.../iotdb/cluster/log/applier/DataLogApplier.java | 4 +-
.../apache/iotdb/cluster/metadata/CMManager.java | 18 +-
.../iotdb/cluster/query/ClusterPlanRouter.java | 12 +-
.../apache/iotdb/cluster/utils/PartitionUtils.java | 2 +-
.../org/apache/iotdb/cluster/common/IoTDBTest.java | 2 +-
.../org/apache/iotdb/cluster/common/TestUtils.java | 4 +-
.../cluster/log/applier/DataLogApplierTest.java | 8 +-
.../cluster/log/logtypes/SerializeLogTest.java | 2 +-
.../cluster/server/member/DataGroupMemberTest.java | 10 +-
.../cluster/server/member/MetaGroupMemberTest.java | 6 +-
...e.java => AlignedTimeseriesSessionExample.java} | 74 +++++-
.../org/apache/iotdb/db/conf/IoTDBConstant.java | 1 -
.../org/apache/iotdb/db/engine/StorageEngine.java | 13 +-
.../iotdb/db/engine/memtable/AbstractMemTable.java | 54 ++--
.../engine/storagegroup/StorageGroupProcessor.java | 31 +--
.../db/engine/storagegroup/TsFileProcessor.java | 17 +-
.../apache/iotdb/db/metadata/MLogTxtWriter.java | 2 +-
.../org/apache/iotdb/db/metadata/MManager.java | 122 ++++-----
.../java/org/apache/iotdb/db/metadata/MTree.java | 129 +++++-----
.../org/apache/iotdb/db/metadata/MetaUtils.java | 15 +-
.../iotdb/db/metadata/logfile/MLogWriter.java | 8 +-
.../org/apache/iotdb/db/metadata/mnode/MNode.java | 16 +-
.../iotdb/db/metadata/mnode/StorageGroupMNode.java | 26 +-
.../iotdb/db/metadata/template/Template.java | 32 +--
.../org/apache/iotdb/db/mqtt/PublishHandler.java | 2 +-
.../apache/iotdb/db/qp/executor/PlanExecutor.java | 57 ++--
.../db/qp/physical/crud/CreateTemplatePlan.java | 32 ++-
.../db/qp/physical/crud/InsertMultiTabletPlan.java | 2 +-
.../iotdb/db/qp/physical/crud/InsertPlan.java | 21 +-
.../iotdb/db/qp/physical/crud/InsertRowPlan.java | 91 +++++--
.../physical/crud/InsertRowsOfOneDevicePlan.java | 12 +-
.../iotdb/db/qp/physical/crud/InsertRowsPlan.java | 2 +-
.../db/qp/physical/crud/InsertTabletPlan.java | 34 ++-
.../physical/sys/CreateAlignedTimeSeriesPlan.java | 28 +-
.../db/qp/physical/sys/StorageGroupMNodePlan.java | 4 +-
.../org/apache/iotdb/db/service/TSServiceImpl.java | 38 +--
.../iotdb/db/sink/local/LocalIoTDBHandler.java | 2 +-
.../iotdb/db/writelog/recover/LogReplayer.java | 16 +-
.../db/engine/memtable/MemTableTestUtils.java | 6 +-
.../db/engine/memtable/PrimitiveMemTableTest.java | 9 +-
.../iotdb/db/engine/storagegroup/TTLTest.java | 4 +-
.../db/integration/IOTDBInsertAlignedValuesIT.java | 4 +
.../iotdb/db/metadata/MManagerBasicTest.java | 286 +++++----------------
.../iotdb/db/qp/physical/InsertRowPlanTest.java | 50 ++--
.../iotdb/db/qp/physical/InsertTabletPlanTest.java | 38 ++-
.../java/org/apache/iotdb/session/Session.java | 115 ++++++---
.../apache/iotdb/session/SessionConnection.java | 16 +-
.../apache/iotdb/session/IoTDBSessionSimpleIT.java | 31 +--
.../apache/iotdb/session/SessionCacheLeaderUT.java | 4 +-
.../java/org/apache/iotdb/session/SessionUT.java | 17 +-
.../test/java/org/apache/iotdb/db/sql/Cases.java | 22 +-
thrift/src/main/thrift/rpc.thrift | 25 +-
.../apache/iotdb/tsfile/write/record/Tablet.java | 56 ++--
.../write/schema/VectorMeasurementSchema.java | 93 +++----
.../iotdb/tsfile/file/metadata/utils/Utils.java | 73 ------
58 files changed, 839 insertions(+), 971 deletions(-)
diff --git a/client-cpp/src/main/Session.cpp b/client-cpp/src/main/Session.cpp
index 8cc696b..1ae80c3 100644
--- a/client-cpp/src/main/Session.cpp
+++ b/client-cpp/src/main/Session.cpp
@@ -616,12 +616,12 @@ void Session::insertRecord(string deviceId, int64_t time, vector<string>& measu
}
}
-void Session::insertRecord(string deviceId, int64_t time, vector<string>& measurements,
+void Session::insertRecord(string prefixPath, int64_t time, vector<string>& measurements,
vector<TSDataType::TSDataType>& types, vector<char*>& values)
{
shared_ptr<TSInsertRecordReq> req(new TSInsertRecordReq());
req->__set_sessionId(sessionId);
- req->__set_deviceId(deviceId);
+ req->__set_prefixPath(prefixPath);
req->__set_timestamp(time);
req->__set_measurements(measurements);
string buffer;
@@ -763,7 +763,7 @@ void Session::insertTablet(Tablet& tablet, bool sorted) {
shared_ptr<TSInsertTabletReq> request(new TSInsertTabletReq());
request->__set_sessionId(sessionId);
- request->deviceId = tablet.deviceId;
+ request->prefixPath = tablet.deviceId;
for (pair<string, TSDataType::TSDataType> schema : tablet.schemas) {
request->measurements.push_back(schema.first);
request->types.push_back(schema.second);
@@ -857,7 +857,7 @@ void Session::testInsertRecord(string deviceId, int64_t time, vector<string>& me
void Session::testInsertTablet(Tablet& tablet) {
shared_ptr<TSInsertTabletReq> request(new TSInsertTabletReq());
request->__set_sessionId(sessionId);
- request->deviceId = tablet.deviceId;
+ request->prefixPath = tablet.deviceId;
for (pair<string, TSDataType::TSDataType> schema : tablet.schemas) {
request->measurements.push_back(schema.first);
request->types.push_back(schema.second);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplier.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplier.java
index 65a5b2d..89a8519 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplier.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplier.java
@@ -159,7 +159,7 @@ public class AsyncDataLogApplier implements LogApplier {
PartialPath path = ((InsertRowsPlan) plan).getFirstDeviceId();
sgPath = IoTDB.metaManager.getStorageGroupPath(path);
} else if (plan instanceof InsertPlan) {
- PartialPath deviceId = ((InsertPlan) plan).getDeviceId();
+ PartialPath deviceId = ((InsertPlan) plan).getPrefixPath();
sgPath = IoTDB.metaManager.getStorageGroupPath(deviceId);
} else if (plan instanceof CreateTimeSeriesPlan) {
PartialPath path = ((CreateTimeSeriesPlan) plan).getPath();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/BaseApplier.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/BaseApplier.java
index 7a9c34a..94606ac 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/BaseApplier.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/BaseApplier.java
@@ -173,7 +173,7 @@ abstract class BaseApplier implements LogApplier {
private void pullTimeseriesSchema(InsertPlan plan, Node ignoredGroup)
throws QueryProcessException {
try {
- PartialPath path = plan.getDeviceId();
+ PartialPath path = plan.getPrefixPath();
((CMManager) IoTDB.metaManager)
.pullTimeSeriesSchemas(Collections.singletonList(path), ignoredGroup);
} catch (MetadataException e1) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
index 7d11d3e..542e422 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
@@ -118,7 +118,7 @@ public class DataLogApplier extends BaseApplier {
PartialPath sg;
long time = plan.getMinTime();
try {
- sg = IoTDB.metaManager.getStorageGroupPath(plan.getDeviceId());
+ sg = IoTDB.metaManager.getStorageGroupPath(plan.getPrefixPath());
} catch (StorageGroupNotSetException e) {
// the sg may not exist because the node does not catch up with the leader, retry after
// synchronization
@@ -127,7 +127,7 @@ public class DataLogApplier extends BaseApplier {
} catch (CheckConsistencyException ce) {
throw new QueryProcessException(ce.getMessage());
}
- sg = IoTDB.metaManager.getStorageGroupPath(plan.getDeviceId());
+ sg = IoTDB.metaManager.getStorageGroupPath(plan.getPrefixPath());
}
int slotId =
SlotPartitionTable.getSlotStrategy()
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
index d2700a3..bb50f44 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
@@ -440,10 +440,10 @@ public class CMManager extends MManager {
throws MetadataException, IOException {
MeasurementMNode[] measurementMNodes = new MeasurementMNode[plan.getMeasurements().length];
int nonExistSchemaIndex =
- getMNodesLocally(plan.getDeviceId(), plan.getMeasurements(), measurementMNodes);
+ getMNodesLocally(plan.getPrefixPath(), plan.getMeasurements(), measurementMNodes);
if (nonExistSchemaIndex == -1) {
plan.setMeasurementMNodes(measurementMNodes);
- return new MNode(null, plan.getDeviceId().getDevice());
+ return new MNode(null, plan.getPrefixPath().getDevice());
}
// auto-create schema in IoTDBConfig is always disabled in the cluster version, and we have
// another config in ClusterConfig to do this
@@ -566,20 +566,20 @@ public class CMManager extends MManager {
|| plan instanceof InsertRowsOfOneDevicePlan
|| plan instanceof InsertTabletPlan) {
storageGroups.addAll(
- getStorageGroups(Collections.singletonList(((InsertPlan) plan).getDeviceId())));
+ getStorageGroups(Collections.singletonList(((InsertPlan) plan).getPrefixPath())));
} else if (plan instanceof InsertRowsPlan) {
storageGroups.addAll(
getStorageGroups(
((InsertRowsPlan) plan)
.getInsertRowPlanList().stream()
- .map(InsertPlan::getDeviceId)
+ .map(InsertPlan::getPrefixPath)
.collect(Collectors.toList())));
} else if (plan instanceof InsertMultiTabletPlan) {
storageGroups.addAll(
getStorageGroups(
((InsertMultiTabletPlan) plan)
.getInsertTabletPlanList().stream()
- .map(InsertPlan::getDeviceId)
+ .map(InsertPlan::getPrefixPath)
.collect(Collectors.toList())));
} else if (plan instanceof CreateTimeSeriesPlan) {
storageGroups.addAll(
@@ -684,7 +684,7 @@ public class CMManager extends MManager {
if (!success) {
logger.error(
"create timeseries for device={} failed, plan={}",
- insertTabletPlan.getDeviceId(),
+ insertTabletPlan.getPrefixPath(),
insertTabletPlan);
}
}
@@ -700,7 +700,7 @@ public class CMManager extends MManager {
if (!success) {
logger.error(
"create timeseries for device={} failed, plan={}",
- insertRowPlan.getDeviceId(),
+ insertRowPlan.getPrefixPath(),
insertRowPlan);
}
}
@@ -724,7 +724,7 @@ public class CMManager extends MManager {
}
List<String> seriesList = new ArrayList<>();
- PartialPath deviceId = insertPlan.getDeviceId();
+ PartialPath deviceId = insertPlan.getPrefixPath();
PartialPath storageGroupName;
try {
storageGroupName =
@@ -771,7 +771,7 @@ public class CMManager extends MManager {
CreateAlignedTimeSeriesPlan plan =
new CreateAlignedTimeSeriesPlan(
- insertPlan.getDeviceId(),
+ insertPlan.getPrefixPath(),
measurements,
dataTypes,
encodings,
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java
index 5069f60..78d1fbd 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java
@@ -93,7 +93,7 @@ public class ClusterPlanRouter {
}
private PartitionGroup routePlan(InsertRowPlan plan) throws MetadataException {
- return partitionTable.partitionByPathTime(plan.getDeviceId(), plan.getTime());
+ return partitionTable.partitionByPathTime(plan.getPrefixPath(), plan.getTime());
}
private PartitionGroup routePlan(CreateTimeSeriesPlan plan) throws MetadataException {
@@ -147,7 +147,7 @@ public class ClusterPlanRouter {
private Map<PhysicalPlan, PartitionGroup> splitAndRoutePlan(InsertRowPlan plan)
throws MetadataException {
PartitionGroup partitionGroup =
- partitionTable.partitionByPathTime(plan.getDeviceId(), plan.getTime());
+ partitionTable.partitionByPathTime(plan.getPrefixPath(), plan.getTime());
return Collections.singletonMap(plan, partitionGroup);
}
@@ -165,7 +165,7 @@ public class ClusterPlanRouter {
private Map<PhysicalPlan, PartitionGroup> splitAndRoutePlan(CreateAlignedTimeSeriesPlan plan)
throws MetadataException {
- PartitionGroup partitionGroup = partitionTable.partitionByPathTime(plan.getDevicePath(), 0);
+ PartitionGroup partitionGroup = partitionTable.partitionByPathTime(plan.getPrefixPath(), 0);
return Collections.singletonMap(plan, partitionGroup);
}
@@ -190,7 +190,7 @@ public class ClusterPlanRouter {
InsertTabletPlan tmpPlan = (InsertTabletPlan) entry.getKey();
PartitionGroup tmpPg = entry.getValue();
// 1.1 the sg that the plan(actually calculated based on device) belongs to
- PartialPath tmpSgPath = IoTDB.metaManager.getStorageGroupPath(tmpPlan.getDeviceId());
+ PartialPath tmpSgPath = IoTDB.metaManager.getStorageGroupPath(tmpPlan.getPrefixPath());
Map<PartialPath, InsertMultiTabletPlan> sgPathPlanMap = pgSgPathPlanMap.get(tmpPg);
if (sgPathPlanMap == null) {
// 2.1 construct the InsertMultiTabletPlan
@@ -251,7 +251,7 @@ public class ClusterPlanRouter {
Map<PartitionGroup, InsertRowsPlan> groupPlanMap = new HashMap<>();
for (int i = 0; i < insertRowsPlan.getInsertRowPlanList().size(); i++) {
InsertRowPlan rowPlan = insertRowsPlan.getInsertRowPlanList().get(i);
- PartialPath storageGroup = getMManager().getStorageGroupPath(rowPlan.getDeviceId());
+ PartialPath storageGroup = getMManager().getStorageGroupPath(rowPlan.getPrefixPath());
PartitionGroup group = partitionTable.route(storageGroup.getFullPath(), rowPlan.getTime());
if (groupPlanMap.containsKey(group)) {
InsertRowsPlan tmpPlan = groupPlanMap.get(group);
@@ -272,7 +272,7 @@ public class ClusterPlanRouter {
@SuppressWarnings("SuspiciousSystemArraycopy")
private Map<PhysicalPlan, PartitionGroup> splitAndRoutePlan(InsertTabletPlan plan)
throws MetadataException {
- PartialPath storageGroup = getMManager().getStorageGroupPath(plan.getDeviceId());
+ PartialPath storageGroup = getMManager().getStorageGroupPath(plan.getPrefixPath());
Map<PhysicalPlan, PartitionGroup> result = new HashMap<>();
long[] times = plan.getTimes();
if (times.length == 0) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/PartitionUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/PartitionUtils.java
index 9bcd9ec..1dd6189 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/PartitionUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/PartitionUtils.java
@@ -132,7 +132,7 @@ public class PartitionUtils {
}
public static InsertTabletPlan copy(InsertTabletPlan plan, long[] times, Object[] values) {
- InsertTabletPlan newPlan = new InsertTabletPlan(plan.getDeviceId(), plan.getMeasurements());
+ InsertTabletPlan newPlan = new InsertTabletPlan(plan.getPrefixPath(), plan.getMeasurements());
newPlan.setDataTypes(plan.getDataTypes());
// according to TSServiceImpl.insertBatch(), only the deviceId, measurements, dataTypes,
// times, columns, and rowCount are need to be maintained.
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/common/IoTDBTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/common/IoTDBTest.java
index 9f6aae3..5ca4cb2 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/common/IoTDBTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/common/IoTDBTest.java
@@ -100,7 +100,7 @@ public abstract class IoTDBTest {
protected void prepareData(int sgNum, int timeOffset, int size)
throws QueryProcessException, IllegalPathException {
InsertRowPlan insertPlan = new InsertRowPlan();
- insertPlan.setDeviceId(new PartialPath(TestUtils.getTestSg(sgNum)));
+ insertPlan.setPrefixPath(new PartialPath(TestUtils.getTestSg(sgNum)));
String[] measurements = new String[10];
for (int i = 0; i < measurements.length; i++) {
measurements[i] = TestUtils.getTestMeasurement(i);
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestUtils.java b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestUtils.java
index 055e505..3b25bcd 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestUtils.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestUtils.java
@@ -288,7 +288,7 @@ public class TestUtils {
// data for raw data query and aggregation
// 10 devices (storage groups)
for (int j = 0; j < 10; j++) {
- insertPlan.setDeviceId(new PartialPath(getTestSg(j)));
+ insertPlan.setPrefixPath(new PartialPath(getTestSg(j)));
String[] measurements = new String[10];
MeasurementMNode[] mNodes = new MeasurementMNode[10];
// 10 series each device, all double
@@ -341,7 +341,7 @@ public class TestUtils {
}
// data for fill
- insertPlan.setDeviceId(new PartialPath(getTestSg(0)));
+ insertPlan.setPrefixPath(new PartialPath(getTestSg(0)));
String[] measurements = new String[] {getTestMeasurement(10)};
MeasurementMNode[] schemas = new MeasurementMNode[] {TestUtils.getTestMeasurementMNode(10)};
insertPlan.setMeasurements(measurements);
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java
index 3dbf0c3..5f8f43f 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java
@@ -257,7 +257,7 @@ public class DataLogApplierTest extends IoTDBTest {
log.setPlan(insertPlan);
// this series is already created
- insertPlan.setDeviceId(new PartialPath(TestUtils.getTestSg(1)));
+ insertPlan.setPrefixPath(new PartialPath(TestUtils.getTestSg(1)));
insertPlan.setTime(1);
insertPlan.setNeedInferType(true);
insertPlan.setMeasurements(new String[] {TestUtils.getTestMeasurement(0)});
@@ -276,7 +276,7 @@ public class DataLogApplierTest extends IoTDBTest {
assertFalse(dataSet.hasNext());
// this series is not created but can be fetched
- insertPlan.setDeviceId(new PartialPath(TestUtils.getTestSg(4)));
+ insertPlan.setPrefixPath(new PartialPath(TestUtils.getTestSg(4)));
applier.apply(log);
dataSet = query(Collections.singletonList(TestUtils.getTestSeries(4, 0)), null);
assertTrue(dataSet.hasNext());
@@ -287,14 +287,14 @@ public class DataLogApplierTest extends IoTDBTest {
assertFalse(dataSet.hasNext());
// this series does not exists any where
- insertPlan.setDeviceId(new PartialPath(TestUtils.getTestSg(5)));
+ insertPlan.setPrefixPath(new PartialPath(TestUtils.getTestSg(5)));
applier.apply(log);
assertEquals(
"org.apache.iotdb.db.exception.metadata.PathNotExistException: Path [root.test5.s0] does not exist",
log.getException().getMessage());
// this storage group is not even set
- insertPlan.setDeviceId(new PartialPath(TestUtils.getTestSg(16)));
+ insertPlan.setPrefixPath(new PartialPath(TestUtils.getTestSg(16)));
applier.apply(log);
assertEquals(
"Storage group is not set for current seriesPath: [root.test16]",
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/logtypes/SerializeLogTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/logtypes/SerializeLogTest.java
index bd64486..c8eba0f 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/logtypes/SerializeLogTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/logtypes/SerializeLogTest.java
@@ -52,7 +52,7 @@ public class SerializeLogTest {
log.setCurrLogIndex(2);
log.setCurrLogTerm(2);
InsertRowPlan plan = new InsertRowPlan();
- plan.setDeviceId(new PartialPath("root.d1"));
+ plan.setPrefixPath(new PartialPath("root.d1"));
plan.setMeasurements(new String[] {"s1", "s2", "s3"});
plan.setNeedInferType(true);
plan.setDataTypes(new TSDataType[plan.getMeasurements().length]);
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
index b6a41eb..5482354 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
@@ -498,7 +498,7 @@ public class DataGroupMemberTest extends BaseMember {
}
InsertRowPlan insertPlan = new InsertRowPlan();
- insertPlan.setDeviceId(new PartialPath(TestUtils.getTestSg(0)));
+ insertPlan.setPrefixPath(new PartialPath(TestUtils.getTestSg(0)));
insertPlan.setTime(0);
insertPlan.setMeasurements(new String[] {"s0"});
insertPlan.setNeedInferType(true);
@@ -703,7 +703,7 @@ public class DataGroupMemberTest extends BaseMember {
IllegalPathException {
System.out.println("Start testQuerySingleSeries()");
InsertRowPlan insertPlan = new InsertRowPlan();
- insertPlan.setDeviceId(new PartialPath(TestUtils.getTestSg(0)));
+ insertPlan.setPrefixPath(new PartialPath(TestUtils.getTestSg(0)));
insertPlan.setNeedInferType(true);
insertPlan.setMeasurements(new String[] {getTestMeasurement(0)});
insertPlan.setDataTypes(new TSDataType[insertPlan.getMeasurements().length]);
@@ -769,7 +769,7 @@ public class DataGroupMemberTest extends BaseMember {
IllegalPathException {
System.out.println("Start testQuerySingleSeriesWithValueFilter()");
InsertRowPlan insertPlan = new InsertRowPlan();
- insertPlan.setDeviceId(new PartialPath(TestUtils.getTestSg(0)));
+ insertPlan.setPrefixPath(new PartialPath(TestUtils.getTestSg(0)));
insertPlan.setNeedInferType(true);
insertPlan.setMeasurements(new String[] {getTestMeasurement(0)});
insertPlan.setDataTypes(new TSDataType[insertPlan.getMeasurements().length]);
@@ -835,7 +835,7 @@ public class DataGroupMemberTest extends BaseMember {
IllegalPathException {
System.out.println("Start testQuerySingleSeriesByTimestamp()");
InsertRowPlan insertPlan = new InsertRowPlan();
- insertPlan.setDeviceId(new PartialPath(TestUtils.getTestSg(0)));
+ insertPlan.setPrefixPath(new PartialPath(TestUtils.getTestSg(0)));
insertPlan.setNeedInferType(true);
insertPlan.setMeasurements(new String[] {getTestMeasurement(0)});
insertPlan.setDataTypes(new TSDataType[insertPlan.getMeasurements().length]);
@@ -901,7 +901,7 @@ public class DataGroupMemberTest extends BaseMember {
IllegalPathException {
System.out.println("Start testQuerySingleSeriesByTimestampWithValueFilter()");
InsertRowPlan insertPlan = new InsertRowPlan();
- insertPlan.setDeviceId(new PartialPath(TestUtils.getTestSg(0)));
+ insertPlan.setPrefixPath(new PartialPath(TestUtils.getTestSg(0)));
insertPlan.setNeedInferType(true);
insertPlan.setMeasurements(new String[] {getTestMeasurement(0)});
insertPlan.setDataTypes(new TSDataType[insertPlan.getMeasurements().length]);
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
index 475bef2..1e94fe7 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
@@ -538,7 +538,7 @@ public class MetaGroupMemberTest extends BaseMember {
// the operation is accepted
dummyResponse.set(Response.RESPONSE_AGREE);
InsertRowPlan insertPlan = new InsertRowPlan();
- insertPlan.setDeviceId(new PartialPath(TestUtils.getTestSg(0)));
+ insertPlan.setPrefixPath(new PartialPath(TestUtils.getTestSg(0)));
insertPlan.setNeedInferType(true);
insertPlan.setMeasurements(new String[] {TestUtils.getTestMeasurement(0)});
insertPlan.setDataTypes(new TSDataType[insertPlan.getMeasurements().length]);
@@ -856,7 +856,7 @@ public class MetaGroupMemberTest extends BaseMember {
insertPlan.setMeasurements(new String[] {TestUtils.getTestMeasurement(0)});
insertPlan.setDataTypes(new TSDataType[insertPlan.getMeasurements().length]);
for (int i = 0; i < 10; i++) {
- insertPlan.setDeviceId(new PartialPath(TestUtils.getTestSg(i)));
+ insertPlan.setPrefixPath(new PartialPath(TestUtils.getTestSg(i)));
IMeasurementSchema schema = TestUtils.getTestMeasurementSchema(0);
try {
IoTDB.metaManager.createTimeseries(
@@ -921,7 +921,7 @@ public class MetaGroupMemberTest extends BaseMember {
RaftServer.setReadOperationTimeoutMS(1000);
for (int i = 0; i < 10; i++) {
- insertPlan.setDeviceId(new PartialPath(TestUtils.getTestSg(i)));
+ insertPlan.setPrefixPath(new PartialPath(TestUtils.getTestSg(i)));
IMeasurementSchema schema = TestUtils.getTestMeasurementSchema(0);
try {
IoTDB.metaManager.createTimeseries(
diff --git a/example/session/src/main/java/org/apache/iotdb/VectorSessionExample.java b/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java
similarity index 81%
rename from example/session/src/main/java/org/apache/iotdb/VectorSessionExample.java
rename to example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java
index afeef91..37ea190 100644
--- a/example/session/src/main/java/org/apache/iotdb/VectorSessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java
@@ -36,10 +36,10 @@ import java.util.ArrayList;
import java.util.List;
@SuppressWarnings("squid:S106")
-public class VectorSessionExample {
+public class AlignedTimeseriesSessionExample {
private static Session session;
- private static final String ROOT_SG1_D1 = "root.sg_1.d1";
+ private static final String ROOT_SG1_D1_VECTOR = "root.sg_1.d1.vector";
public static void main(String[] args)
throws IoTDBConnectionException, StatementExecutionException {
@@ -50,6 +50,9 @@ public class VectorSessionExample {
session.setFetchSize(10000);
createTemplate();
+ createAlignedTimeseries();
+ insertAlignedRecord();
+
insertTabletWithAlignedTimeseriesMethod1();
insertTabletWithAlignedTimeseriesMethod2();
@@ -178,6 +181,23 @@ public class VectorSessionExample {
dataSet.closeOperationHandle();
}
+ private static void createAlignedTimeseries()
+ throws StatementExecutionException, IoTDBConnectionException {
+ List<String> measurements = new ArrayList<>();
+ for (int i = 1; i <= 2; i++) {
+ measurements.add("s" + i);
+ }
+ List<TSDataType> dataTypes = new ArrayList<>();
+ dataTypes.add(TSDataType.INT64);
+ dataTypes.add(TSDataType.INT32);
+ List<TSEncoding> encodings = new ArrayList<>();
+ for (int i = 1; i <= 2; i++) {
+ encodings.add(TSEncoding.RLE);
+ }
+ session.createAlignedTimeseries(
+ ROOT_SG1_D1_VECTOR, measurements, dataTypes, encodings, CompressionType.SNAPPY, null);
+ }
+
// be sure template is coordinate with tablet
private static void createTemplate()
throws StatementExecutionException, IoTDBConnectionException {
@@ -204,9 +224,12 @@ public class VectorSessionExample {
List<CompressionType> compressionTypeList = new ArrayList<>();
compressionTypeList.add(CompressionType.SNAPPY);
- session.createDeviceTemplate(
- "template1", measurementList, dataTypeList, encodingList, compressionTypeList);
- session.setDeviceTemplate("template1", "root.sg_1");
+ List<String> schemaList = new ArrayList<>();
+ schemaList.add("test_vector");
+
+ session.createSchemaTemplate(
+ "template1", schemaList, measurementList, dataTypeList, encodingList, compressionTypeList);
+ session.setSchemaTemplate("template1", "root.sg_1");
}
/** Method 1 for insert tablet with aligned timeseries */
@@ -217,9 +240,12 @@ public class VectorSessionExample {
List<IMeasurementSchema> schemaList = new ArrayList<>();
schemaList.add(
new VectorMeasurementSchema(
- new String[] {"s1", "s2"}, new TSDataType[] {TSDataType.INT64, TSDataType.INT32}));
+ "vector",
+ new String[] {"s1", "s2"},
+ new TSDataType[] {TSDataType.INT64, TSDataType.INT32}));
- Tablet tablet = new Tablet(ROOT_SG1_D1, schemaList);
+ Tablet tablet = new Tablet(ROOT_SG1_D1_VECTOR, schemaList);
+ tablet.setAligned(true);
long timestamp = System.currentTimeMillis();
for (long row = 0; row < 100; row++) {
@@ -257,9 +283,12 @@ public class VectorSessionExample {
List<IMeasurementSchema> schemaList = new ArrayList<>();
schemaList.add(
new VectorMeasurementSchema(
- new String[] {"s1", "s2"}, new TSDataType[] {TSDataType.INT64, TSDataType.INT32}));
+ "vector",
+ new String[] {"s1", "s2"},
+ new TSDataType[] {TSDataType.INT64, TSDataType.INT32}));
- Tablet tablet = new Tablet(ROOT_SG1_D1, schemaList);
+ Tablet tablet = new Tablet(ROOT_SG1_D1_VECTOR, schemaList);
+ tablet.setAligned(true);
long[] timestamps = tablet.timestamps;
Object[] values = tablet.values;
@@ -294,9 +323,12 @@ public class VectorSessionExample {
List<IMeasurementSchema> schemaList = new ArrayList<>();
schemaList.add(
new VectorMeasurementSchema(
- new String[] {"s1", "s2"}, new TSDataType[] {TSDataType.INT64, TSDataType.INT32}));
+ "vector",
+ new String[] {"s1", "s2"},
+ new TSDataType[] {TSDataType.INT64, TSDataType.INT32}));
- Tablet tablet = new Tablet(ROOT_SG1_D1, schemaList);
+ Tablet tablet = new Tablet(ROOT_SG1_D1_VECTOR, schemaList);
+ tablet.setAligned(true);
long[] timestamps = tablet.timestamps;
Object[] values = tablet.values;
@@ -334,4 +366,24 @@ public class VectorSessionExample {
session.executeNonQueryStatement("flush");
}
+
+ private static void insertAlignedRecord()
+ throws IoTDBConnectionException, StatementExecutionException {
+ List<String> measurements = new ArrayList<>();
+ List<TSDataType> types = new ArrayList<>();
+ measurements.add("s1");
+ measurements.add("s2");
+ measurements.add("s3");
+ types.add(TSDataType.INT64);
+ types.add(TSDataType.INT32);
+ types.add(TSDataType.INT64);
+
+ for (long time = 0; time < 100; time++) {
+ List<Object> values = new ArrayList<>();
+ values.add(1L);
+ values.add(2);
+ values.add(3L);
+ session.insertRecord(ROOT_SG1_D1_VECTOR, time, measurements, types, values, true);
+ }
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
index 29f7dd5..6f26726 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
@@ -111,7 +111,6 @@ public class IoTDBConstant {
public static final String PATH_WILDCARD = "*";
public static final String TIME = "time";
- public static final String ALIGN_TIMESERIES_PREFIX = "$#$";
// sdt parameters
public static final String LOSS = "loss";
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 93e6fcb..f5d0bff 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -539,14 +539,14 @@ public class StorageEngine implements IService {
throw new StorageEngineException(e);
}
}
- StorageGroupProcessor storageGroupProcessor = getProcessor(insertRowPlan.getDeviceId());
+ StorageGroupProcessor storageGroupProcessor = getProcessor(insertRowPlan.getPrefixPath());
try {
storageGroupProcessor.insert(insertRowPlan);
if (config.isEnableStatMonitor()) {
try {
StorageGroupMNode storageGroupMNode =
- IoTDB.metaManager.getStorageGroupNodeByPath(insertRowPlan.getDeviceId());
+ IoTDB.metaManager.getStorageGroupNodeByPath(insertRowPlan.getPrefixPath());
updateMonitorStatistics(
processorMap.get(storageGroupMNode.getPartialPath()), insertRowPlan);
} catch (MetadataException e) {
@@ -568,7 +568,7 @@ public class StorageEngine implements IService {
}
}
StorageGroupProcessor storageGroupProcessor =
- getProcessor(insertRowsOfOneDevicePlan.getDeviceId());
+ getProcessor(insertRowsOfOneDevicePlan.getPrefixPath());
// TODO monitor: update statistics
try {
@@ -592,11 +592,12 @@ public class StorageEngine implements IService {
}
StorageGroupProcessor storageGroupProcessor;
try {
- storageGroupProcessor = getProcessor(insertTabletPlan.getDeviceId());
+ storageGroupProcessor = getProcessor(insertTabletPlan.getPrefixPath());
} catch (StorageEngineException e) {
throw new StorageEngineException(
String.format(
- "Get StorageGroupProcessor of device %s " + "failed", insertTabletPlan.getDeviceId()),
+ "Get StorageGroupProcessor of device %s " + "failed",
+ insertTabletPlan.getPrefixPath()),
e);
}
@@ -605,7 +606,7 @@ public class StorageEngine implements IService {
if (config.isEnableStatMonitor()) {
try {
StorageGroupMNode storageGroupMNode =
- IoTDB.metaManager.getStorageGroupNodeByPath(insertTabletPlan.getDeviceId());
+ IoTDB.metaManager.getStorageGroupNodeByPath(insertTabletPlan.getPrefixPath());
updateMonitorStatistics(
processorMap.get(storageGroupMNode.getPartialPath()), insertTabletPlan);
} catch (MetadataException e) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
index ba62303..d00c2e8 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
@@ -117,39 +117,38 @@ public abstract class AbstractMemTable implements IMemTable {
MeasurementMNode[] measurementMNodes = insertRowPlan.getMeasurementMNodes();
int columnIndex = 0;
- for (int i = 0; i < measurementMNodes.length; i++) {
-
- if (measurementMNodes[i] != null
- && measurementMNodes[i].getSchema().getType() == TSDataType.VECTOR) {
- // write vector
- Object[] vectorValue =
- new Object[measurementMNodes[i].getSchema().getValueTSDataTypeList().size()];
- for (int j = 0; j < vectorValue.length; j++) {
- vectorValue[j] = values[columnIndex];
- columnIndex++;
- }
- memSize +=
- MemUtils.getVectorRecordSize(
- measurementMNodes[i].getSchema().getValueTSDataTypeList(),
- vectorValue,
- disableMemControl);
- write(
- insertRowPlan.getDeviceId().getFullPath(),
- measurementMNodes[i].getSchema(),
- insertRowPlan.getTime(),
- vectorValue);
- } else {
+ if (insertRowPlan.isAligned()) {
+ MeasurementMNode measurementMNode = measurementMNodes[0];
+ // write vector
+ Object[] vectorValue =
+ new Object[measurementMNode.getSchema().getValueTSDataTypeList().size()];
+ for (int j = 0; j < vectorValue.length; j++) {
+ vectorValue[j] = values[columnIndex];
+ columnIndex++;
+ }
+ memSize +=
+ MemUtils.getVectorRecordSize(
+ measurementMNode.getSchema().getValueTSDataTypeList(),
+ vectorValue,
+ disableMemControl);
+ write(
+ insertRowPlan.getPrefixPath().getFullPath(),
+ measurementMNode.getSchema(),
+ insertRowPlan.getTime(),
+ vectorValue);
+ } else {
+ for (MeasurementMNode measurementMNode : measurementMNodes) {
if (values[columnIndex] == null) {
columnIndex++;
continue;
}
memSize +=
MemUtils.getRecordSize(
- measurementMNodes[i].getSchema().getType(), values[columnIndex], disableMemControl);
+ measurementMNode.getSchema().getType(), values[columnIndex], disableMemControl);
write(
- insertRowPlan.getDeviceId().getFullPath(),
- measurementMNodes[i].getSchema(),
+ insertRowPlan.getPrefixPath().getFullPath(),
+ measurementMNode.getSchema(),
insertRowPlan.getTime(),
values[columnIndex]);
columnIndex++;
@@ -194,9 +193,9 @@ public abstract class AbstractMemTable implements IMemTable {
}
IWritableMemChunk memSeries =
createIfNotExistAndGet(
- insertTabletPlan.getDeviceId().getFullPath(),
+ insertTabletPlan.getPrefixPath().getFullPath(),
insertTabletPlan.getMeasurementMNodes()[i].getSchema());
- if (insertTabletPlan.getMeasurementMNodes()[i].getSchema().getType() == TSDataType.VECTOR) {
+ if (insertTabletPlan.isAligned()) {
VectorMeasurementSchema vectorSchema =
(VectorMeasurementSchema) insertTabletPlan.getMeasurementMNodes()[i].getSchema();
Object[] columns = new Object[vectorSchema.getValueMeasurementIdList().size()];
@@ -210,6 +209,7 @@ public abstract class AbstractMemTable implements IMemTable {
}
memSeries.write(
insertTabletPlan.getTimes(), bitMaps, columns, TSDataType.VECTOR, start, end);
+ break;
} else {
memSeries.write(
insertTabletPlan.getTimes(),
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 1e75eea..6af0a83 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -73,7 +73,6 @@ import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.fileSystem.fsFactory.FSFactory;
import org.apache.iotdb.tsfile.read.TimeValuePair;
@@ -812,7 +811,7 @@ public class StorageGroupProcessor {
insertRowPlan.getTime()
> partitionLatestFlushedTimeForEachDevice
.get(timePartitionId)
- .getOrDefault(insertRowPlan.getDeviceId().getFullPath(), Long.MIN_VALUE);
+ .getOrDefault(insertRowPlan.getPrefixPath().getFullPath(), Long.MIN_VALUE);
// is unsequence and user set config to discard out of order data
if (!isSequence
@@ -884,7 +883,8 @@ public class StorageGroupProcessor {
long lastFlushTime =
partitionLatestFlushedTimeForEachDevice
.computeIfAbsent(beforeTimePartition, id -> new HashMap<>())
- .computeIfAbsent(insertTabletPlan.getDeviceId().getFullPath(), id -> Long.MIN_VALUE);
+ .computeIfAbsent(
+ insertTabletPlan.getPrefixPath().getFullPath(), id -> Long.MIN_VALUE);
// if is sequence
boolean isSequence = false;
while (loc < insertTabletPlan.getRowCount()) {
@@ -907,7 +907,7 @@ public class StorageGroupProcessor {
partitionLatestFlushedTimeForEachDevice
.computeIfAbsent(beforeTimePartition, id -> new HashMap<>())
.computeIfAbsent(
- insertTabletPlan.getDeviceId().getFullPath(), id -> Long.MIN_VALUE);
+ insertTabletPlan.getPrefixPath().getFullPath(), id -> Long.MIN_VALUE);
isSequence = false;
}
// still in this partition
@@ -939,7 +939,7 @@ public class StorageGroupProcessor {
}
long globalLatestFlushedTime =
globalLatestFlushedTimeForEachDevice.getOrDefault(
- insertTabletPlan.getDeviceId().getFullPath(), Long.MIN_VALUE);
+ insertTabletPlan.getPrefixPath().getFullPath(), Long.MIN_VALUE);
tryToUpdateBatchInsertLastCache(insertTabletPlan, globalLatestFlushedTime);
if (!noFailure) {
@@ -1008,11 +1008,12 @@ public class StorageGroupProcessor {
if (sequence
&& latestTimeForEachDevice
.get(timePartitionId)
- .getOrDefault(insertTabletPlan.getDeviceId().getFullPath(), Long.MIN_VALUE)
+ .getOrDefault(insertTabletPlan.getPrefixPath().getFullPath(), Long.MIN_VALUE)
< insertTabletPlan.getTimes()[end - 1]) {
latestTimeForEachDevice
.get(timePartitionId)
- .put(insertTabletPlan.getDeviceId().getFullPath(), insertTabletPlan.getTimes()[end - 1]);
+ .put(
+ insertTabletPlan.getPrefixPath().getFullPath(), insertTabletPlan.getTimes()[end - 1]);
}
// check memtable size and may async try to flush the work memtable
@@ -1030,7 +1031,7 @@ public class StorageGroupProcessor {
int columnIndex = 0;
for (int i = 0; i < mNodes.length; i++) {
// Don't update cached last value for vector type
- if (mNodes[i] != null && mNodes[i].getSchema().getType() == TSDataType.VECTOR) {
+ if (mNodes[i] != null && plan.isAligned()) {
columnIndex += mNodes[i].getSchema().getValueMeasurementIdList().size();
} else {
if (plan.getColumns()[i] == null) {
@@ -1046,7 +1047,7 @@ public class StorageGroupProcessor {
} else {
// measurementMNodes[i] is null, use the path to update remote cache
IoTDB.metaManager.updateLastCache(
- plan.getDeviceId().concatNode(plan.getMeasurements()[columnIndex]),
+ plan.getPrefixPath().concatNode(plan.getMeasurements()[columnIndex]),
plan.composeLastTimeValuePair(columnIndex),
true,
latestFlushedTime,
@@ -1070,16 +1071,16 @@ public class StorageGroupProcessor {
// try to update the latest time of the device of this tsRecord
if (latestTimeForEachDevice
.get(timePartitionId)
- .getOrDefault(insertRowPlan.getDeviceId().getFullPath(), Long.MIN_VALUE)
+ .getOrDefault(insertRowPlan.getPrefixPath().getFullPath(), Long.MIN_VALUE)
< insertRowPlan.getTime()) {
latestTimeForEachDevice
.get(timePartitionId)
- .put(insertRowPlan.getDeviceId().getFullPath(), insertRowPlan.getTime());
+ .put(insertRowPlan.getPrefixPath().getFullPath(), insertRowPlan.getTime());
}
long globalLatestFlushTime =
globalLatestFlushedTimeForEachDevice.getOrDefault(
- insertRowPlan.getDeviceId().getFullPath(), Long.MIN_VALUE);
+ insertRowPlan.getPrefixPath().getFullPath(), Long.MIN_VALUE);
tryToUpdateInsertLastCache(insertRowPlan, globalLatestFlushTime);
@@ -1097,7 +1098,7 @@ public class StorageGroupProcessor {
int columnIndex = 0;
for (int i = 0; i < mNodes.length; i++) {
// Don't update cached last value for vector type
- if (mNodes[i] != null && mNodes[i].getSchema().getType() == TSDataType.VECTOR) {
+ if (mNodes[i] != null && plan.isAligned()) {
columnIndex += mNodes[i].getSchema().getValueMeasurementIdList().size();
} else {
if (plan.getValues()[columnIndex] == null) {
@@ -1112,7 +1113,7 @@ public class StorageGroupProcessor {
null, plan.composeTimeValuePair(columnIndex), true, latestFlushedTime, mNodes[i]);
} else {
IoTDB.metaManager.updateLastCache(
- plan.getDeviceId().concatNode(plan.getMeasurements()[columnIndex]),
+ plan.getPrefixPath().concatNode(plan.getMeasurements()[columnIndex]),
plan.composeTimeValuePair(columnIndex),
true,
latestFlushedTime,
@@ -2925,7 +2926,7 @@ public class StorageGroupProcessor {
plan.getTime()
> partitionLatestFlushedTimeForEachDevice
.get(timePartitionId)
- .getOrDefault(plan.getDeviceId().getFullPath(), Long.MIN_VALUE);
+ .getOrDefault(plan.getPrefixPath().getFullPath(), Long.MIN_VALUE);
}
// is unsequence and user set config to discard out of order data
if (!isSequence
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 0db373e..99d22e0 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -206,12 +206,12 @@ public class TsFileProcessor {
// update start time of this memtable
tsFileResource.updateStartTime(
- insertRowPlan.getDeviceId().getFullPath(), insertRowPlan.getTime());
+ insertRowPlan.getPrefixPath().getFullPath(), insertRowPlan.getTime());
// for sequence tsfile, we update the endTime only when the file is prepared to be closed.
// for unsequence tsfile, we have to update the endTime for each insertion.
if (!sequence) {
tsFileResource.updateEndTime(
- insertRowPlan.getDeviceId().getFullPath(), insertRowPlan.getTime());
+ insertRowPlan.getPrefixPath().getFullPath(), insertRowPlan.getTime());
}
tsFileResource.updatePlanIndexes(insertRowPlan.getIndex());
}
@@ -265,13 +265,13 @@ public class TsFileProcessor {
results[i] = RpcUtils.SUCCESS_STATUS;
}
tsFileResource.updateStartTime(
- insertTabletPlan.getDeviceId().getFullPath(), insertTabletPlan.getTimes()[start]);
+ insertTabletPlan.getPrefixPath().getFullPath(), insertTabletPlan.getTimes()[start]);
// for sequence tsfile, we update the endTime only when the file is prepared to be closed.
// for unsequence tsfile, we have to update the endTime for each insertion.
if (!sequence) {
tsFileResource.updateEndTime(
- insertTabletPlan.getDeviceId().getFullPath(), insertTabletPlan.getTimes()[end - 1]);
+ insertTabletPlan.getPrefixPath().getFullPath(), insertTabletPlan.getTimes()[end - 1]);
}
tsFileResource.updatePlanIndexes(insertTabletPlan.getIndex());
}
@@ -283,7 +283,7 @@ public class TsFileProcessor {
long memTableIncrement = 0L;
long textDataIncrement = 0L;
long chunkMetadataIncrement = 0L;
- String deviceId = insertRowPlan.getDeviceId().getFullPath();
+ String deviceId = insertRowPlan.getPrefixPath().getFullPath();
int columnIndex = 0;
for (int i = 0; i < insertRowPlan.getMeasurementMNodes().length; i++) {
// skip failed Measurements
@@ -333,14 +333,12 @@ public class TsFileProcessor {
}
long[] memIncrements = new long[3]; // memTable, text, chunk metadata
- String deviceId = insertTabletPlan.getDeviceId().getFullPath();
+ String deviceId = insertTabletPlan.getPrefixPath().getFullPath();
int columnIndex = 0;
for (int i = 0; i < insertTabletPlan.getMeasurementMNodes().length; i++) {
// for aligned timeseries
- if (insertTabletPlan.getMeasurementMNodes()[i] != null
- && insertTabletPlan.getMeasurementMNodes()[i].getSchema().getType()
- == TSDataType.VECTOR) {
+ if (insertTabletPlan.isAligned()) {
VectorMeasurementSchema vectorSchema =
(VectorMeasurementSchema) insertTabletPlan.getMeasurementMNodes()[i].getSchema();
Object[] columns = new Object[vectorSchema.getValueMeasurementIdList().size()];
@@ -348,6 +346,7 @@ public class TsFileProcessor {
columns[j] = insertTabletPlan.getColumns()[columnIndex++];
}
updateVectorMemCost(vectorSchema, deviceId, start, end, memIncrements, columns);
+ break;
}
// for non aligned
else {
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MLogTxtWriter.java b/server/src/main/java/org/apache/iotdb/db/metadata/MLogTxtWriter.java
index f1bc0b6..b7affe5 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MLogTxtWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MLogTxtWriter.java
@@ -130,7 +130,7 @@ public class MLogTxtWriter implements AutoCloseable {
String.format(
"%s,%s,%s,%s,%s,%s",
MetadataOperationType.CREATE_TIMESERIES,
- plan.getDevicePath().getFullPath(),
+ plan.getPrefixPath().getFullPath(),
plan.getMeasurements(),
plan.getDataTypes().stream().map(TSDataType::serialize),
plan.getEncodings().stream().map(TSEncoding::serialize),
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index bac2ad5..3e9af1b 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -320,7 +320,6 @@ public class MManager {
public void clear() {
try {
templateMap.clear();
- Template.clear();
this.mtree = new MTree();
this.mNodeCache.clear();
this.tagIndex.clear();
@@ -516,7 +515,7 @@ public class MManager {
}
public void createAlignedTimeSeries(
- PartialPath devicePath,
+ PartialPath prefixPath,
List<String> measurements,
List<TSDataType> dataTypes,
List<TSEncoding> encodings,
@@ -524,7 +523,7 @@ public class MManager {
throws MetadataException {
createAlignedTimeSeries(
new CreateAlignedTimeSeriesPlan(
- devicePath, measurements, dataTypes, encodings, compressor, null));
+ prefixPath, measurements, dataTypes, encodings, compressor, null));
}
/**
@@ -539,21 +538,20 @@ public class MManager {
+ "please increase MAX_HEAP_SIZE in iotdb-env.sh/bat and restart");
}
try {
- PartialPath devicePath = plan.getDevicePath();
+ PartialPath prefixPath = plan.getPrefixPath();
List<String> measurements = plan.getMeasurements();
- int alignedSize = measurements.size();
List<TSDataType> dataTypes = plan.getDataTypes();
List<TSEncoding> encodings = plan.getEncodings();
- for (int i = 0; i < alignedSize; i++) {
+ for (int i = 0; i < measurements.size(); i++) {
SchemaUtils.checkDataTypeWithEncoding(dataTypes.get(i), encodings.get(i));
}
- ensureStorageGroup(devicePath);
+ ensureStorageGroup(prefixPath);
// create time series in MTree
mtree.createAlignedTimeseries(
- devicePath, measurements, plan.getDataTypes(), plan.getEncodings(), plan.getCompressor());
+ prefixPath, measurements, plan.getDataTypes(), plan.getEncodings(), plan.getCompressor());
// update statistics and schemaDataTypeNumMap
totalSeriesNumber.addAndGet(measurements.size());
@@ -1140,7 +1138,7 @@ public class MManager {
* get MeasurementSchema or VectorMeasurementSchema which contains the measurement
*
* @param device device path
- * @param measurement measurement name, could start with "$#$"
+ * @param measurement measurement name, could be vector name
* @return MeasurementSchema or VectorMeasurementSchema
*/
public IMeasurementSchema getSeriesSchema(PartialPath device, String measurement)
@@ -1245,7 +1243,8 @@ public class MManager {
subSensorsPathList.add(path);
nodeToPartialPath.put(
node,
- new VectorPartialPath(path.getDevice() + "." + node.getName(), subSensorsPathList));
+ new VectorPartialPath(
+ path.getDevice() + "." + path.getMeasurement(), subSensorsPathList));
}
nodeToIndex.computeIfAbsent(node, k -> new ArrayList<>()).add(index);
} else {
@@ -2148,7 +2147,13 @@ public class MManager {
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
public MNode getSeriesSchemasAndReadLockDevice(InsertPlan plan)
throws MetadataException, IOException {
- PartialPath deviceId = plan.getDeviceId();
+ PartialPath prefixPath = plan.getPrefixPath();
+ PartialPath deviceId = prefixPath;
+ String vectorId = null;
+ if (plan.isAligned()) {
+ deviceId = prefixPath.getDevicePath();
+ vectorId = prefixPath.getMeasurement();
+ }
String[] measurementList = plan.getMeasurements();
MeasurementMNode[] measurementMNodes = plan.getMeasurementMNodes();
@@ -2161,46 +2166,30 @@ public class MManager {
// 2. get schema of each measurement
// if do not have measurement
MeasurementMNode measurementMNode;
- int loc = 0;
-
for (int i = 0; i < measurementList.length; i++) {
try {
String measurement = measurementList[i];
- boolean isVector = false;
- String firstMeasurementOfVector = null;
- if (measurement.contains("(") && measurement.contains(",")) {
- isVector = true;
- firstMeasurementOfVector = measurement.replace("(", "").replace(")", "").split(",")[0];
- }
- MNode child = getMNode(deviceMNode.left, isVector ? firstMeasurementOfVector : measurement);
+ MNode child = getMNode(deviceMNode.left, plan.isAligned() ? vectorId : measurement);
if (child instanceof MeasurementMNode) {
measurementMNode = (MeasurementMNode) child;
} else if (child instanceof StorageGroupMNode) {
throw new PathAlreadyExistException(deviceId + PATH_SEPARATOR + measurement);
- } else if ((measurementMNode = findTemplate(deviceMNode, measurement)) != null) {
+ } else if ((measurementMNode = findTemplate(deviceMNode, measurement, vectorId)) != null) {
// empty
} else {
if (!config.isAutoCreateSchemaEnabled()) {
throw new PathNotExistException(deviceId + PATH_SEPARATOR + measurement);
} else {
if (plan instanceof InsertRowPlan || plan instanceof InsertTabletPlan) {
- List<String> measurements =
- Arrays.asList(measurement.replace("(", "").replace(")", "").split(","));
- if (measurements.size() == 1) {
+ if (!plan.isAligned()) {
internalCreateTimeseries(
- deviceId.concatNode(measurement), plan.getDataTypes()[loc]);
+ prefixPath.concatNode(measurement), plan.getDataTypes()[i]);
measurementMNode = (MeasurementMNode) deviceMNode.left.getChild(measurement);
} else {
- int curLoc = loc;
- List<TSDataType> dataTypes = new ArrayList<>();
- for (int j = 0; j < measurements.size(); j++) {
- dataTypes.add(plan.getDataTypes()[curLoc]);
- curLoc++;
- }
- internalAlignedCreateTimeseries(deviceId, measurements, dataTypes);
- measurementMNode =
- (MeasurementMNode) deviceMNode.left.getChild(measurements.get(0));
+ internalAlignedCreateTimeseries(
+ prefixPath, Arrays.asList(measurementList), Arrays.asList(plan.getDataTypes()));
+ measurementMNode = (MeasurementMNode) deviceMNode.left.getChild(vectorId);
}
} else {
throw new MetadataException(
@@ -2216,39 +2205,34 @@ public class MManager {
TSDataType insertDataType;
DataTypeMismatchException mismatchException = null;
if (plan instanceof InsertRowPlan || plan instanceof InsertTabletPlan) {
- if (measurementList[i].contains("(") && measurementList[i].contains(",")) {
- for (int j = 0; j < measurementList[i].split(",").length; j++) {
- TSDataType dataTypeInNode =
- measurementMNode.getSchema().getValueTSDataTypeList().get(j);
- insertDataType = plan.getDataTypes()[loc];
- if (insertDataType == null) {
- insertDataType = dataTypeInNode;
- }
- if (dataTypeInNode != insertDataType) {
- mismatch = true;
- logger.warn(
- "DataType mismatch, Insert measurement {} in {} type {}, metadata tree type {}",
- measurementMNode.getSchema().getValueMeasurementIdList().get(j),
- measurementList[i],
- insertDataType,
- dataTypeInNode);
- mismatchException =
- new DataTypeMismatchException(
- measurementList[i], insertDataType, dataTypeInNode);
- break;
- }
- loc++;
+ if (plan.isAligned()) {
+ TSDataType dataTypeInNode =
+ measurementMNode.getSchema().getValueTSDataTypeList().get(i);
+ insertDataType = plan.getDataTypes()[i];
+ if (insertDataType == null) {
+ insertDataType = dataTypeInNode;
+ }
+ if (dataTypeInNode != insertDataType) {
+ mismatch = true;
+ logger.warn(
+ "DataType mismatch, Insert measurement {} in {} type {}, metadata tree type {}",
+ measurementMNode.getSchema().getValueMeasurementIdList().get(i),
+ measurementList[i],
+ insertDataType,
+ dataTypeInNode);
+ mismatchException =
+ new DataTypeMismatchException(measurementList[i], insertDataType, dataTypeInNode);
}
} else {
if (plan instanceof InsertRowPlan) {
if (!((InsertRowPlan) plan).isNeedInferType()) {
// only when InsertRowPlan's values is object[], we should check type
- insertDataType = getTypeInLoc(plan, loc);
+ insertDataType = getTypeInLoc(plan, i);
} else {
insertDataType = measurementMNode.getSchema().getType();
}
} else {
- insertDataType = getTypeInLoc(plan, loc);
+ insertDataType = getTypeInLoc(plan, i);
}
mismatch = measurementMNode.getSchema().getType() != insertDataType;
if (mismatch) {
@@ -2261,7 +2245,6 @@ public class MManager {
new DataTypeMismatchException(
measurementList[i], insertDataType, measurementMNode.getSchema().getType());
}
- loc++;
}
}
@@ -2318,15 +2301,14 @@ public class MManager {
return deviceMNode.getChild(measurementName);
}
- private MeasurementMNode findTemplate(Pair<MNode, Template> deviceMNode, String measurement)
+ private MeasurementMNode findTemplate(
+ Pair<MNode, Template> deviceMNode, String measurement, String vectorId)
throws MetadataException {
if (deviceMNode.right != null) {
Map<String, IMeasurementSchema> curTemplateMap = deviceMNode.right.getSchemaMap();
- List<String> measurements =
- Arrays.asList(measurement.replace("(", "").replace(")", "").split(","));
- String firstMeasurement = measurements.get(0);
- IMeasurementSchema schema = curTemplateMap.get(firstMeasurement);
+ String schemaName = vectorId != null ? vectorId : measurement;
+ IMeasurementSchema schema = curTemplateMap.get(schemaName);
if (!deviceMNode.left.isUseTemplate()) {
deviceMNode.left.setUseTemplate(true);
try {
@@ -2338,13 +2320,9 @@ public class MManager {
if (schema != null) {
if (schema instanceof MeasurementSchema) {
- return new MeasurementMNode(deviceMNode.left, firstMeasurement, schema, null);
+ return new MeasurementMNode(deviceMNode.left, measurement, schema, null);
} else if (schema instanceof VectorMeasurementSchema) {
- return new MeasurementMNode(
- deviceMNode.left,
- deviceMNode.right.getMeasurementNodeName(schema.getValueMeasurementIdList().get(0)),
- schema,
- null);
+ return new MeasurementMNode(deviceMNode.left, vectorId, schema, null);
}
}
return null;
@@ -2365,14 +2343,14 @@ public class MManager {
/** create aligned timeseries ignoring PathAlreadyExistException */
private void internalAlignedCreateTimeseries(
- PartialPath devicePath, List<String> measurements, List<TSDataType> dataTypes)
+ PartialPath prefixPath, List<String> measurements, List<TSDataType> dataTypes)
throws MetadataException {
List<TSEncoding> encodings = new ArrayList<>();
for (TSDataType dataType : dataTypes) {
encodings.add(getDefaultEncoding(dataType));
}
createAlignedTimeSeries(
- devicePath,
+ prefixPath,
measurements,
dataTypes,
encodings,
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java b/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java
index 711302c..efb6d2e 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java
@@ -312,13 +312,11 @@ public class MTree implements Serializable {
}
MNode cur = root;
boolean hasSetStorageGroup = false;
- StorageGroupMNode storageGroupMNode = null;
// e.g, devicePath = root.sg.d1, create internal nodes and set cur to d1 node
- for (int i = 1; i < deviceNodeNames.length; i++) {
+ for (int i = 1; i < deviceNodeNames.length - 1; i++) {
String nodeName = deviceNodeNames[i];
if (cur instanceof StorageGroupMNode) {
hasSetStorageGroup = true;
- storageGroupMNode = (StorageGroupMNode) cur;
}
if (!cur.hasChild(nodeName)) {
if (!hasSetStorageGroup) {
@@ -328,12 +326,7 @@ public class MTree implements Serializable {
}
cur = cur.getChild(nodeName);
}
- int alignedTimeseriesIndex = 0;
- if (storageGroupMNode != null) {
- alignedTimeseriesIndex = storageGroupMNode.getAlignedTimeseriesIndex();
- storageGroupMNode.addAlignedTimeseriesIndex();
- }
- String leafName = IoTDBConstant.ALIGN_TIMESERIES_PREFIX + alignedTimeseriesIndex;
+ String leafName = deviceNodeNames[deviceNodeNames.length - 1];
// synchronize check and add, we need addChild and add Alias become atomic operation
// only write on mtree will be synchronized
@@ -358,11 +351,12 @@ public class MTree implements Serializable {
compressor),
null);
cur.addChild(leafName, measurementMNode);
+
for (String measurement : measurements) {
if (child != null) {
- cur.replaceChild(measurementMNode.getName(), measurementMNode);
+ measurementMNode.replaceChild(measurement, new MNode(measurementMNode, measurement));
} else {
- cur.addChild(measurement, measurementMNode);
+ measurementMNode.addChild(measurement, new MNode(measurementMNode, measurement));
}
}
}
@@ -469,7 +463,10 @@ public class MTree implements Serializable {
for (int i = 1; i < nodeNames.length; i++) {
String childName = nodeNames[i];
cur = cur.getChild(childName);
- if (cur == null) {
+ if (cur instanceof MeasurementMNode
+ && ((MeasurementMNode) cur).getSchema() instanceof VectorMeasurementSchema) {
+ return i == nodeNames.length - 1 || cur.getChildren().containsKey(nodeNames[i + 1]);
+ } else if (cur == null) {
return false;
}
}
@@ -725,17 +722,16 @@ public class MTree implements Serializable {
upperTemplate = cur.getDeviceTemplate();
}
MNode next = cur.getChild(nodes[i]);
+ if (cur instanceof MeasurementMNode
+ && ((MeasurementMNode) cur).getSchema() instanceof VectorMeasurementSchema) {
+ return cur;
+ }
if (next == null) {
if (upperTemplate == null) {
throw new PathNotExistException(path.getFullPath(), true);
}
String realName = nodes[i];
- if (path instanceof VectorPartialPath) {
- VectorPartialPath vectorPartialPath = (VectorPartialPath) path;
- realName = vectorPartialPath.getSubSensorsPathList().get(0).getMeasurement();
- }
-
IMeasurementSchema schema = upperTemplate.getSchemaMap().get(realName);
if (schema == null) {
throw new PathNotExistException(path.getFullPath(), true);
@@ -1230,7 +1226,9 @@ public class MTree implements Serializable {
QueryContext queryContext,
Template upperTemplate)
throws MetadataException {
- if (node instanceof MeasurementMNode && nodes.length <= idx) {
+ if (node instanceof MeasurementMNode
+ && (nodes.length <= idx
+ || ((MeasurementMNode) node).getSchema() instanceof VectorMeasurementSchema)) {
if (hasLimit) {
curOffset.set(curOffset.get() + 1);
if (curOffset.get() < offset.get() || count.get().intValue() == limit.get().intValue()) {
@@ -1242,14 +1240,13 @@ public class MTree implements Serializable {
addMeasurementSchema(
node, timeseriesSchemaList, needLast, queryContext, measurementSchema, "*");
} else if (measurementSchema instanceof VectorMeasurementSchema) {
- String lastWord = nodes[nodes.length - 1];
addVectorMeasurementSchema(
node,
timeseriesSchemaList,
needLast,
queryContext,
measurementSchema,
- nodes.length == idx ? lastWord : "*");
+ idx < nodes.length ? nodes[idx] : "*");
}
if (hasLimit) {
count.set(count.get() + 1);
@@ -1263,12 +1260,7 @@ public class MTree implements Serializable {
// we should use template when all child is measurement or this node has no child
if (!nodeReg.contains(PATH_WILDCARD)) {
- MNode next = null;
- if (nodeReg.contains("(") && nodeReg.contains(",")) {
- next = node.getChildOfAlignedTimeseries(nodeReg);
- } else {
- next = node.getChild(nodeReg);
- }
+ MNode next = node.getChild(nodeReg);
if (next != null) {
findPath(
next,
@@ -1281,26 +1273,10 @@ public class MTree implements Serializable {
upperTemplate);
}
} else {
- for (MNode child : node.getDistinctMNodes()) {
- boolean continueSearch = false;
- if (child instanceof MeasurementMNode
- && ((MeasurementMNode) child).getSchema() instanceof VectorMeasurementSchema) {
- List<String> measurementsList =
- ((MeasurementMNode) child).getSchema().getValueMeasurementIdList();
- for (String measurement : measurementsList) {
- if (Pattern.matches(nodeReg.replace("*", ".*"), measurement)) {
- continueSearch = true;
- }
- }
- } else {
- if (Pattern.matches(nodeReg.replace("*", ".*"), child.getName())) {
- continueSearch = true;
- }
- }
- if (!continueSearch) {
+ for (MNode child : node.getChildren().values()) {
+ if (!Pattern.matches(nodeReg.replace("*", ".*"), child.getName())) {
continue;
}
-
findPath(
child,
nodes,
@@ -1311,7 +1287,7 @@ public class MTree implements Serializable {
queryContext,
upperTemplate);
if (hasLimit && count.get().intValue() == limit.get().intValue()) {
- break;
+ return;
}
}
}
@@ -1331,14 +1307,19 @@ public class MTree implements Serializable {
schema,
nodeReg);
} else if (schema instanceof VectorMeasurementSchema) {
- String firstNode = schema.getValueMeasurementIdList().get(0);
- addVectorMeasurementSchema(
- new MeasurementMNode(node, firstNode, schema, null),
- timeseriesSchemaList,
- needLast,
- queryContext,
- schema,
- nodeReg);
+ VectorMeasurementSchema vectorMeasurementSchema = (VectorMeasurementSchema) schema;
+ if (Pattern.matches(
+ nodeReg.replace("*", ".*"), vectorMeasurementSchema.getMeasurementId())) {
+ String firstNode = schema.getValueMeasurementIdList().get(0);
+ addVectorMeasurementSchemaForTemplate(
+ new MeasurementMNode(node, firstNode, schema, null),
+ timeseriesSchemaList,
+ needLast,
+ queryContext,
+ schema,
+ MetaUtils.getNodeRegByIdx(idx + 1, nodes),
+ vectorMeasurementSchema.getMeasurementId());
+ }
}
}
}
@@ -1379,20 +1360,42 @@ public class MTree implements Serializable {
String reg)
throws StorageGroupNotSetException, IllegalPathException {
List<String> measurements = schema.getValueMeasurementIdList();
- int measurementSize = measurements.size();
- Set<String> measurementsInReg = new HashSet<>();
- if (reg.contains("(") && reg.contains(",")) {
- measurementsInReg.addAll(MetaUtils.getMeasurementsInPartialPath(reg));
- }
- for (int i = 0; i < measurementSize; i++) {
- if (measurementsInReg.size() != 0 && !measurementsInReg.contains(measurements.get(i))) {
+ for (int i = 0; i < measurements.size(); i++) {
+ if (!Pattern.matches(reg.replace("*", ".*"), measurements.get(i))) {
continue;
}
- if (measurementsInReg.size() == 0
- && !Pattern.matches(reg.replace("*", ".*"), measurements.get(i))) {
+ PartialPath devicePath = node.getPartialPath();
+ String[] tsRow = new String[7];
+ tsRow[0] = null;
+ tsRow[1] = getStorageGroupPath(devicePath).getFullPath();
+ tsRow[2] = schema.getValueTSDataTypeList().get(i).toString();
+ tsRow[3] = schema.getValueTSEncodingList().get(i).toString();
+ tsRow[4] = schema.getCompressor().toString();
+ tsRow[5] = "-1";
+ tsRow[6] =
+ needLast ? String.valueOf(getLastTimeStamp((MeasurementMNode) node, queryContext)) : null;
+ Pair<PartialPath, String[]> temp =
+ new Pair<>(new PartialPath(devicePath.getFullPath(), measurements.get(i)), tsRow);
+ timeseriesSchemaList.add(temp);
+ }
+ }
+
+ private void addVectorMeasurementSchemaForTemplate(
+ MNode node,
+ List<Pair<PartialPath, String[]>> timeseriesSchemaList,
+ boolean needLast,
+ QueryContext queryContext,
+ IMeasurementSchema schema,
+ String reg,
+ String vectorId)
+ throws StorageGroupNotSetException, IllegalPathException {
+ List<String> measurements = schema.getValueMeasurementIdList();
+ for (int i = 0; i < measurements.size(); i++) {
+ if (!Pattern.matches(reg.replace("*", ".*"), measurements.get(i))) {
continue;
}
- PartialPath devicePath = node.getPartialPath().getDevicePath();
+ PartialPath devicePath =
+ new PartialPath(node.getPartialPath().getDevicePath().getFullPath(), vectorId);
String[] tsRow = new String[7];
tsRow[0] = null;
tsRow[1] = getStorageGroupPath(devicePath).getFullPath();
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MetaUtils.java b/server/src/main/java/org/apache/iotdb/db/metadata/MetaUtils.java
index 3d29697..35c80bc 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MetaUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MetaUtils.java
@@ -114,20 +114,7 @@ public class MetaUtils {
* @return measurement names. For example: [s1, s2, s3]
*/
public static List<String> getMeasurementsInPartialPath(PartialPath fullPath) {
- if (fullPath.getMeasurement().contains("(") && fullPath.getMeasurement().contains(",")) {
- return getMeasurementsInPartialPath(fullPath.getMeasurement());
- } else {
- return Arrays.asList(fullPath.getMeasurement());
- }
- }
-
- public static List<String> getMeasurementsInPartialPath(String measurementString) {
- String[] measurements = measurementString.replace("(", "").replace(")", "").split(",");
- List<String> measurementList = new ArrayList<>();
- for (String measurement : measurements) {
- measurementList.add(measurement.trim());
- }
- return measurementList;
+ return Arrays.asList(fullPath.getMeasurement());
}
@TestOnly
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/logfile/MLogWriter.java b/server/src/main/java/org/apache/iotdb/db/metadata/logfile/MLogWriter.java
index afde6dc..851ac89 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/logfile/MLogWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/logfile/MLogWriter.java
@@ -199,8 +199,7 @@ public class MLogWriter implements AutoCloseable {
childSize = node.getChildren().size();
}
StorageGroupMNodePlan plan =
- new StorageGroupMNodePlan(
- node.getName(), node.getDataTTL(), childSize, node.getAlignedTimeseriesIndex());
+ new StorageGroupMNodePlan(node.getName(), node.getDataTTL(), childSize);
putLog(plan);
}
@@ -438,10 +437,7 @@ public class MLogWriter implements AutoCloseable {
CompressionType.values()[Integer.parseInt(words[5])]));
case "1":
return new StorageGroupMNodePlan(
- words[1],
- Long.parseLong(words[2]),
- Integer.parseInt(words[3]),
- words.length == 5 ? Integer.parseInt(words[4]) : 0);
+ words[1], Long.parseLong(words[2]), Integer.parseInt(words[3]));
case "0":
return new MNodePlan(words[1], Integer.parseInt(words[2]));
default:
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mnode/MNode.java b/server/src/main/java/org/apache/iotdb/db/metadata/mnode/MNode.java
index 5da7dd4..dcd892f 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mnode/MNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mnode/MNode.java
@@ -108,7 +108,6 @@ public class MNode implements Serializable {
}
}
}
-
child.parent = this;
children.putIfAbsent(name, child);
}
@@ -180,7 +179,7 @@ public class MNode implements Serializable {
public MNode getChildOfAlignedTimeseries(String name) throws MetadataException {
MNode node = null;
// for aligned timeseries
- List<String> measurementList = MetaUtils.getMeasurementsInPartialPath(name);
+ List<String> measurementList = MetaUtils.getMeasurementsInPartialPath(new PartialPath(name));
for (String measurement : measurementList) {
MNode nodeOfMeasurement = getChild(measurement);
if (node == null) {
@@ -279,19 +278,6 @@ public class MNode implements Serializable {
return children;
}
- public List<MNode> getDistinctMNodes() {
- if (children == null) {
- return Collections.emptyList();
- }
- List<MNode> distinctList = new ArrayList<>();
- for (MNode child : children.values()) {
- if (!distinctList.contains(child)) {
- distinctList.add(child);
- }
- }
- return distinctList;
- }
-
public Map<String, MNode> getAliasChildren() {
if (aliasChildren == null) {
return Collections.emptyMap();
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mnode/StorageGroupMNode.java b/server/src/main/java/org/apache/iotdb/db/metadata/mnode/StorageGroupMNode.java
index cfb2a71..8453fcf 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mnode/StorageGroupMNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mnode/StorageGroupMNode.java
@@ -33,18 +33,9 @@ public class StorageGroupMNode extends MNode {
*/
private long dataTTL;
- private int alignedTimeseriesIndex;
-
public StorageGroupMNode(MNode parent, String name, long dataTTL) {
super(parent, name);
this.dataTTL = dataTTL;
- this.alignedTimeseriesIndex = 0;
- }
-
- public StorageGroupMNode(MNode parent, String name, long dataTTL, int alignedTimeseriesIndex) {
- super(parent, name);
- this.dataTTL = dataTTL;
- this.alignedTimeseriesIndex = alignedTimeseriesIndex;
}
public long getDataTTL() {
@@ -55,14 +46,6 @@ public class StorageGroupMNode extends MNode {
this.dataTTL = dataTTL;
}
- public int getAlignedTimeseriesIndex() {
- return alignedTimeseriesIndex;
- }
-
- public void addAlignedTimeseriesIndex() {
- this.alignedTimeseriesIndex++;
- }
-
@Override
public void serializeTo(MLogWriter logWriter) throws IOException {
serializeChildren(logWriter);
@@ -71,15 +54,10 @@ public class StorageGroupMNode extends MNode {
}
public static StorageGroupMNode deserializeFrom(StorageGroupMNodePlan plan) {
- return new StorageGroupMNode(
- null, plan.getName(), plan.getDataTTL(), plan.getAlignedTimeseriesIndex());
+ return new StorageGroupMNode(null, plan.getName(), plan.getDataTTL());
}
public static StorageGroupMNode deserializeFrom(String[] nodeInfo) {
- return new StorageGroupMNode(
- null,
- nodeInfo[1],
- Long.parseLong(nodeInfo[2]),
- nodeInfo.length == 4 ? Integer.parseInt(nodeInfo[3]) : 0);
+ return new StorageGroupMNode(null, nodeInfo[1], Long.parseLong(nodeInfo[2]));
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/template/Template.java b/server/src/main/java/org/apache/iotdb/db/metadata/template/Template.java
index 84bf240..6077e66 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/template/Template.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/template/Template.java
@@ -18,11 +18,9 @@
*/
package org.apache.iotdb.db.metadata.template;
-import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
import org.apache.iotdb.db.qp.physical.crud.CreateTemplatePlan;
-import org.apache.iotdb.db.utils.TestOnly;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
@@ -37,14 +35,11 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
public class Template {
- private static final AtomicLong increasingId = new AtomicLong();
+ private String name;
- String name;
-
- Map<String, IMeasurementSchema> schemaMap = new HashMap<>();
+ private Map<String, IMeasurementSchema> schemaMap = new HashMap<>();
public Template(CreateTemplatePlan plan) {
name = plan.getName();
@@ -67,7 +62,7 @@ public class Template {
curSchema =
new VectorMeasurementSchema(
- IoTDBConstant.ALIGN_TIMESERIES_PREFIX + "#" + increasingId.getAndIncrement(),
+ plan.getSchemaNames().get(i),
measurementsArray,
typeArray,
encodingArray,
@@ -83,14 +78,13 @@ public class Template {
plan.getCompressors().get(i));
}
- for (String path : plan.getMeasurements().get(i)) {
- if (schemaMap.containsKey(path)) {
- throw new IllegalArgumentException(
- "Duplicate measurement name in create template plan. Name is :" + path);
- }
-
- schemaMap.put(path, curSchema);
+ String path = plan.getSchemaNames().get(i);
+ if (schemaMap.containsKey(path)) {
+ throw new IllegalArgumentException(
+ "Duplicate measurement name in create template plan. Name is :" + path);
}
+
+ schemaMap.put(path, curSchema);
}
}
@@ -111,12 +105,8 @@ public class Template {
}
public boolean isCompatible(PartialPath path) {
- return !schemaMap.containsKey(path.getMeasurement());
- }
-
- @TestOnly
- public static void clear() {
- increasingId.set(0);
+ return !(schemaMap.containsKey(path.getMeasurement())
+ || schemaMap.containsKey(path.getDevicePath().getMeasurement()));
}
public List<MeasurementMNode> getMeasurementMNode() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mqtt/PublishHandler.java b/server/src/main/java/org/apache/iotdb/db/mqtt/PublishHandler.java
index 1e4b213..0c72b67 100644
--- a/server/src/main/java/org/apache/iotdb/db/mqtt/PublishHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mqtt/PublishHandler.java
@@ -102,7 +102,7 @@ public class PublishHandler extends AbstractInterceptHandler {
boolean status = false;
try {
- plan.setDeviceId(new PartialPath(event.getDevice()));
+ plan.setPrefixPath(new PartialPath(event.getDevice()));
status = executeNonQuery(plan);
} catch (Exception e) {
LOG.warn(
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index 7d1ced7..6588e6c 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -47,7 +47,6 @@ import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.metadata.PathNotExistException;
import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.metadata.MetaUtils;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.metadata.mnode.MNode;
import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
@@ -137,7 +136,6 @@ import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.Field;
import org.apache.iotdb.tsfile.read.common.Path;
@@ -1468,46 +1466,21 @@ public class PlanExecutor implements IPlanExecutor {
}
PartialPath path = multiPlan.getPaths().get(i);
String measurement = path.getMeasurement();
- if (measurement.contains("(") && measurement.contains(",")) {
- PartialPath devicePath = path.getDevicePath();
- List<String> measurements = MetaUtils.getMeasurementsInPartialPath(path);
- List<TSDataType> dataTypes = new ArrayList<>();
- List<TSEncoding> encodings = new ArrayList<>();
- for (int j = 0; j < measurements.size(); j++) {
- dataTypes.add(multiPlan.getDataTypes().get(dataTypeIdx));
- encodings.add(multiPlan.getEncodings().get(dataTypeIdx));
- dataTypeIdx++;
- }
- CreateAlignedTimeSeriesPlan plan =
- new CreateAlignedTimeSeriesPlan(
- devicePath,
- measurements,
- dataTypes,
- encodings,
- multiPlan.getCompressors().get(i),
- null);
- try {
- createAlignedTimeSeries(plan);
- } catch (QueryProcessException e) {
- multiPlan.getResults().put(i, RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
- }
- } else {
- CreateTimeSeriesPlan plan =
- new CreateTimeSeriesPlan(
- multiPlan.getPaths().get(i),
- multiPlan.getDataTypes().get(i),
- multiPlan.getEncodings().get(i),
- multiPlan.getCompressors().get(i),
- multiPlan.getProps() == null ? null : multiPlan.getProps().get(i),
- multiPlan.getTags() == null ? null : multiPlan.getTags().get(i),
- multiPlan.getAttributes() == null ? null : multiPlan.getAttributes().get(i),
- multiPlan.getAlias() == null ? null : multiPlan.getAlias().get(i));
- dataTypeIdx++;
- try {
- createTimeSeries(plan);
- } catch (QueryProcessException e) {
- multiPlan.getResults().put(i, RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
- }
+ CreateTimeSeriesPlan plan =
+ new CreateTimeSeriesPlan(
+ multiPlan.getPaths().get(i),
+ multiPlan.getDataTypes().get(i),
+ multiPlan.getEncodings().get(i),
+ multiPlan.getCompressors().get(i),
+ multiPlan.getProps() == null ? null : multiPlan.getProps().get(i),
+ multiPlan.getTags() == null ? null : multiPlan.getTags().get(i),
+ multiPlan.getAttributes() == null ? null : multiPlan.getAttributes().get(i),
+ multiPlan.getAlias() == null ? null : multiPlan.getAlias().get(i));
+ dataTypeIdx++;
+ try {
+ createTimeSeries(plan);
+ } catch (QueryProcessException e) {
+ multiPlan.getResults().put(i, RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
}
}
if (!multiPlan.getResults().isEmpty()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/CreateTemplatePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/CreateTemplatePlan.java
index 6dba97a..82c45b4 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/CreateTemplatePlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/CreateTemplatePlan.java
@@ -36,11 +36,20 @@ import java.util.List;
public class CreateTemplatePlan extends PhysicalPlan {
String name;
+ List<String> schemaNames;
List<List<String>> measurements;
List<List<TSDataType>> dataTypes;
List<List<TSEncoding>> encodings;
List<CompressionType> compressors;
+ public List<String> getSchemaNames() {
+ return schemaNames;
+ }
+
+ public void setSchemaNames(List<String> schemaNames) {
+ this.schemaNames = schemaNames;
+ }
+
public String getName() {
return name;
}
@@ -87,12 +96,14 @@ public class CreateTemplatePlan extends PhysicalPlan {
public CreateTemplatePlan(
String name,
+ List<String> schemaNames,
List<List<String>> measurements,
List<List<TSDataType>> dataTypes,
List<List<TSEncoding>> encodings,
List<CompressionType> compressors) {
super(false, OperatorType.CREATE_TEMPLATE);
this.name = name;
+ this.schemaNames = schemaNames;
this.measurements = measurements;
this.dataTypes = dataTypes;
this.encodings = encodings;
@@ -105,6 +116,12 @@ public class CreateTemplatePlan extends PhysicalPlan {
ReadWriteIOUtils.write(name, buffer);
+ // schema names
+ ReadWriteIOUtils.write(schemaNames.size(), buffer);
+ for (String schemaName : schemaNames) {
+ ReadWriteIOUtils.write(schemaName, buffer);
+ }
+
// measurements
ReadWriteIOUtils.write(measurements.size(), buffer);
for (List<String> measurementList : measurements) {
@@ -145,8 +162,15 @@ public class CreateTemplatePlan extends PhysicalPlan {
public void deserialize(ByteBuffer buffer) {
name = ReadWriteIOUtils.readString(buffer);
- // measurements
+ // schema names
int size = ReadWriteIOUtils.readInt(buffer);
+ schemaNames = new ArrayList<>(size);
+ for (int i = 0; i < size; i++) {
+ schemaNames.add(ReadWriteIOUtils.readString(buffer));
+ }
+
+ // measurements
+ size = ReadWriteIOUtils.readInt(buffer);
measurements = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
int listSize = ReadWriteIOUtils.readInt(buffer);
@@ -197,6 +221,12 @@ public class CreateTemplatePlan extends PhysicalPlan {
ReadWriteIOUtils.write(name, stream);
+ // schema names
+ ReadWriteIOUtils.write(schemaNames.size(), stream);
+ for (String schemaName : schemaNames) {
+ ReadWriteIOUtils.write(schemaName, stream);
+ }
+
// measurements
ReadWriteIOUtils.write(measurements.size(), stream);
for (List<String> measurementList : measurements) {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertMultiTabletPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertMultiTabletPlan.java
index 9cc807f..59d5ae2 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertMultiTabletPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertMultiTabletPlan.java
@@ -179,7 +179,7 @@ public class InsertMultiTabletPlan extends InsertPlan implements BatchPlan {
}
public PartialPath getFirstDeviceId() {
- return insertTabletPlanList.get(0).getDeviceId();
+ return insertTabletPlanList.get(0).getPrefixPath();
}
public InsertTabletPlan getInsertTabletPlan(int index) {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
index 68dcac3..c05d19a 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
@@ -32,7 +32,8 @@ import java.util.List;
public abstract class InsertPlan extends PhysicalPlan {
- protected PartialPath deviceId;
+ protected PartialPath prefixPath;
+ protected boolean isAligned;
protected String[] measurements;
// get from client
protected TSDataType[] dataTypes;
@@ -49,12 +50,12 @@ public abstract class InsertPlan extends PhysicalPlan {
super.canBeSplit = false;
}
- public PartialPath getDeviceId() {
- return deviceId;
+ public PartialPath getPrefixPath() {
+ return prefixPath;
}
- public void setDeviceId(PartialPath deviceId) {
- this.deviceId = deviceId;
+ public void setPrefixPath(PartialPath prefixPath) {
+ this.prefixPath = prefixPath;
}
public String[] getMeasurements() {
@@ -93,6 +94,14 @@ public abstract class InsertPlan extends PhysicalPlan {
return failedMeasurements == null ? 0 : failedMeasurements.size();
}
+ public boolean isAligned() {
+ return isAligned;
+ }
+
+ public void setAligned(boolean aligned) {
+ isAligned = aligned;
+ }
+
public abstract long getMinTime();
/** @param index failed measurement index */
@@ -159,7 +168,7 @@ public abstract class InsertPlan extends PhysicalPlan {
@Override
public void checkIntegrity() throws QueryProcessException {
- if (deviceId == null) {
+ if (prefixPath == null) {
throw new QueryProcessException("DeviceId is null");
}
if (measurements == null) {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
index f20b385..8134fc9 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
@@ -70,7 +70,7 @@ public class InsertRowPlan extends InsertPlan {
public InsertRowPlan(InsertRowPlan another) {
super(OperatorType.INSERT);
- this.deviceId = another.deviceId;
+ this.prefixPath = another.prefixPath;
this.time = another.time;
this.measurements = new String[another.measurements.length];
System.arraycopy(another.measurements, 0, this.measurements, 0, another.measurements.length);
@@ -81,10 +81,10 @@ public class InsertRowPlan extends InsertPlan {
}
public InsertRowPlan(
- PartialPath deviceId, long insertTime, String[] measurementList, String[] insertValues) {
+ PartialPath prefixPath, long insertTime, String[] measurementList, String[] insertValues) {
super(Operator.OperatorType.INSERT);
this.time = insertTime;
- this.deviceId = deviceId;
+ this.prefixPath = prefixPath;
this.measurements = measurementList;
this.dataTypes = new TSDataType[insertValues.length];
// We need to create an Object[] for the data type casting, because we can not set Float, Long
@@ -94,12 +94,13 @@ public class InsertRowPlan extends InsertPlan {
isNeedInferType = true;
}
+ /** should be deprecated after insertRecords() and insertRowsOfOneDevice() support vector */
public InsertRowPlan(
- PartialPath deviceId, long insertTime, String[] measurementList, ByteBuffer values)
+ PartialPath prefixPath, long insertTime, String[] measurementList, ByteBuffer values)
throws QueryProcessException {
super(Operator.OperatorType.INSERT);
this.time = insertTime;
- this.deviceId = deviceId;
+ this.prefixPath = prefixPath;
this.measurements = measurementList;
this.dataTypes = new TSDataType[measurementList.length];
this.values = new Object[measurementList.length];
@@ -107,16 +108,57 @@ public class InsertRowPlan extends InsertPlan {
isNeedInferType = false;
}
+ public InsertRowPlan(
+ PartialPath prefixPath,
+ long insertTime,
+ String[] measurementList,
+ ByteBuffer values,
+ boolean isAligned)
+ throws QueryProcessException {
+ super(Operator.OperatorType.INSERT);
+ this.time = insertTime;
+ this.prefixPath = prefixPath;
+ this.measurements = measurementList;
+ this.dataTypes = new TSDataType[measurementList.length];
+ this.values = new Object[measurementList.length];
+ this.fillValues(values);
+ isNeedInferType = false;
+ this.isAligned = isAligned;
+ }
+
@TestOnly
public InsertRowPlan(
- PartialPath deviceId,
+ PartialPath prefixPath,
long insertTime,
String[] measurements,
TSDataType[] dataTypes,
String[] insertValues) {
super(OperatorType.INSERT);
this.time = insertTime;
- this.deviceId = deviceId;
+ this.prefixPath = prefixPath;
+ this.measurements = measurements;
+ this.dataTypes = dataTypes;
+ this.values = new Object[dataTypes.length];
+ for (int i = 0; i < dataTypes.length; i++) {
+ try {
+ values[i] = CommonUtils.parseValueForTest(dataTypes[i], insertValues[i]);
+ } catch (QueryProcessException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ @TestOnly
+ public InsertRowPlan(
+ PartialPath prefixPath,
+ long insertTime,
+ String[] measurements,
+ TSDataType[] dataTypes,
+ String[] insertValues,
+ boolean isAligned) {
+ super(OperatorType.INSERT);
+ this.time = insertTime;
+ this.prefixPath = prefixPath;
this.measurements = measurements;
this.dataTypes = dataTypes;
this.values = new Object[dataTypes.length];
@@ -127,32 +169,33 @@ public class InsertRowPlan extends InsertPlan {
e.printStackTrace();
}
}
+ this.isAligned = isAligned;
}
@TestOnly
public InsertRowPlan(
- PartialPath deviceId,
+ PartialPath prefixPath,
long insertTime,
String measurement,
TSDataType type,
String insertValue) {
super(OperatorType.INSERT);
this.time = insertTime;
- this.deviceId = deviceId;
+ this.prefixPath = prefixPath;
this.measurements = new String[] {measurement};
this.dataTypes = new TSDataType[] {type};
this.values = new Object[1];
try {
values[0] = CommonUtils.parseValueForTest(dataTypes[0], insertValue);
} catch (QueryProcessException e) {
- e.printStackTrace();
+ logger.error(e.getMessage());
}
}
@TestOnly
public InsertRowPlan(TSRecord tsRecord) throws IllegalPathException {
super(OperatorType.INSERT);
- this.deviceId = new PartialPath(tsRecord.deviceId);
+ this.prefixPath = new PartialPath(tsRecord.deviceId);
this.time = tsRecord.time;
this.measurements = new String[tsRecord.dataPointList.size()];
this.measurementMNodes = new MeasurementMNode[tsRecord.dataPointList.size()];
@@ -203,11 +246,13 @@ public class InsertRowPlan extends InsertPlan {
i,
new QueryProcessException(
new PathNotExistException(
- deviceId.getFullPath() + IoTDBConstant.PATH_SEPARATOR + measurements[i])));
+ prefixPath.getFullPath()
+ + IoTDBConstant.PATH_SEPARATOR
+ + measurements[i])));
} else {
throw new QueryProcessException(
new PathNotExistException(
- deviceId.getFullPath() + IoTDBConstant.PATH_SEPARATOR + measurements[i]));
+ prefixPath.getFullPath() + IoTDBConstant.PATH_SEPARATOR + measurements[i]));
}
columnIndex++;
continue;
@@ -220,7 +265,7 @@ public class InsertRowPlan extends InsertPlan {
} catch (Exception e) {
logger.warn(
"{}.{} data type is not consistent, input {}, registered {}",
- deviceId,
+ prefixPath,
measurements[i],
values[i],
dataTypes[i]);
@@ -243,7 +288,7 @@ public class InsertRowPlan extends InsertPlan {
} catch (Exception e) {
logger.warn(
"{}.{} data type is not consistent, input {}, registered {}",
- deviceId,
+ prefixPath,
measurements[i],
values[columnIndex],
dataTypes[columnIndex]);
@@ -284,7 +329,7 @@ public class InsertRowPlan extends InsertPlan {
public List<PartialPath> getPaths() {
List<PartialPath> ret = new ArrayList<>();
for (String m : measurements) {
- PartialPath fullPath = deviceId.concatNode(m);
+ PartialPath fullPath = prefixPath.concatNode(m);
ret.add(fullPath);
}
return ret;
@@ -308,14 +353,14 @@ public class InsertRowPlan extends InsertPlan {
}
InsertRowPlan that = (InsertRowPlan) o;
return time == that.time
- && Objects.equals(deviceId, that.deviceId)
+ && Objects.equals(prefixPath, that.prefixPath)
&& Arrays.equals(measurements, that.measurements)
&& Arrays.equals(values, that.values);
}
@Override
public int hashCode() {
- return Objects.hash(deviceId, time);
+ return Objects.hash(prefixPath, time);
}
@Override
@@ -327,7 +372,7 @@ public class InsertRowPlan extends InsertPlan {
public void subSerialize(DataOutputStream stream) throws IOException {
stream.writeLong(time);
- putString(stream, deviceId.getFullPath());
+ putString(stream, prefixPath.getFullPath());
serializeMeasurementsAndValues(stream);
}
@@ -477,7 +522,7 @@ public class InsertRowPlan extends InsertPlan {
public void subSerialize(ByteBuffer buffer) {
buffer.putLong(time);
- putString(buffer, deviceId.getFullPath());
+ putString(buffer, prefixPath.getFullPath());
serializeMeasurementsAndValues(buffer);
}
@@ -505,7 +550,7 @@ public class InsertRowPlan extends InsertPlan {
@Override
public void deserialize(ByteBuffer buffer) throws IllegalPathException {
this.time = buffer.getLong();
- this.deviceId = new PartialPath(readString(buffer));
+ this.prefixPath = new PartialPath(readString(buffer));
deserializeMeasurementsAndValues(buffer);
}
@@ -532,8 +577,8 @@ public class InsertRowPlan extends InsertPlan {
@Override
public String toString() {
- return "deviceId: "
- + deviceId
+ return "prefixPath: "
+ + prefixPath
+ ", time: "
+ time
+ ", measurements: "
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java
index 15f667b..0791f39 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java
@@ -44,7 +44,7 @@ public class InsertRowsOfOneDevicePlan extends InsertPlan implements BatchPlan {
ByteBuffer[] insertValues)
throws QueryProcessException {
super(OperatorType.BATCH_INSERT_ONE_DEVICE);
- this.deviceId = deviceId;
+ this.prefixPath = deviceId;
rowPlans = new InsertRowPlan[insertTimes.length];
for (int i = 0; i < insertTimes.length; i++) {
rowPlans[i] =
@@ -96,7 +96,7 @@ public class InsertRowsOfOneDevicePlan extends InsertPlan implements BatchPlan {
public void serialize(DataOutputStream stream) throws IOException {
int type = PhysicalPlanType.BATCH_INSERT_ONE_DEVICE.ordinal();
stream.writeByte((byte) type);
- putString(stream, deviceId.getFullPath());
+ putString(stream, prefixPath.getFullPath());
stream.writeInt(rowPlans.length);
for (InsertRowPlan plan : rowPlans) {
@@ -110,7 +110,7 @@ public class InsertRowsOfOneDevicePlan extends InsertPlan implements BatchPlan {
int type = PhysicalPlanType.BATCH_INSERT_ONE_DEVICE.ordinal();
buffer.put((byte) type);
- putString(buffer, deviceId.getFullPath());
+ putString(buffer, prefixPath.getFullPath());
buffer.putInt(rowPlans.length);
for (InsertRowPlan plan : rowPlans) {
buffer.putLong(plan.getTime());
@@ -120,11 +120,11 @@ public class InsertRowsOfOneDevicePlan extends InsertPlan implements BatchPlan {
@Override
public void deserialize(ByteBuffer buffer) throws IllegalPathException {
- this.deviceId = new PartialPath(readString(buffer));
+ this.prefixPath = new PartialPath(readString(buffer));
this.rowPlans = new InsertRowPlan[buffer.getInt()];
for (int i = 0; i < rowPlans.length; i++) {
rowPlans[i] = new InsertRowPlan();
- rowPlans[i].setDeviceId(deviceId);
+ rowPlans[i].setPrefixPath(prefixPath);
rowPlans[i].setTime(buffer.getLong());
rowPlans[i].deserializeMeasurementsAndValues(buffer);
}
@@ -141,7 +141,7 @@ public class InsertRowsOfOneDevicePlan extends InsertPlan implements BatchPlan {
@Override
public String toString() {
- return "deviceId: " + deviceId + ", times: " + rowPlans.length;
+ return "deviceId: " + prefixPath + ", times: " + rowPlans.length;
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsPlan.java
index cc4685a..03d3c49 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsPlan.java
@@ -219,7 +219,7 @@ public class InsertRowsPlan extends InsertPlan implements BatchPlan {
}
public PartialPath getFirstDeviceId() {
- return insertRowPlanList.get(0).getDeviceId();
+ return insertRowPlanList.get(0).getPrefixPath();
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
index 267c28c..425bc0c 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
@@ -80,28 +80,38 @@ public class InsertTabletPlan extends InsertPlan {
super(OperatorType.BATCHINSERT);
}
- public InsertTabletPlan(PartialPath deviceId, List<String> measurements) {
+ public InsertTabletPlan(PartialPath prefixPath, List<String> measurements) {
super(OperatorType.BATCHINSERT);
- this.deviceId = deviceId;
+ this.prefixPath = prefixPath;
this.measurements = measurements.toArray(new String[0]);
this.canBeSplit = true;
}
- public InsertTabletPlan(PartialPath deviceId, String[] measurements) {
+ public InsertTabletPlan(PartialPath prefixPath, String[] measurements) {
super(OperatorType.BATCHINSERT);
- this.deviceId = deviceId;
+ this.prefixPath = prefixPath;
this.measurements = measurements;
this.canBeSplit = true;
}
- public InsertTabletPlan(PartialPath deviceId, String[] measurements, List<Integer> dataTypes) {
+ public InsertTabletPlan(PartialPath prefixPath, String[] measurements, List<Integer> dataTypes) {
super(OperatorType.BATCHINSERT);
- this.deviceId = deviceId;
+ this.prefixPath = prefixPath;
this.measurements = measurements;
setDataTypes(dataTypes);
this.canBeSplit = true;
}
+ public InsertTabletPlan(
+ PartialPath prefixPath, String[] measurements, List<Integer> dataTypes, boolean isAligned) {
+ super(OperatorType.BATCHINSERT);
+ this.prefixPath = prefixPath;
+ this.measurements = measurements;
+ setDataTypes(dataTypes);
+ this.canBeSplit = true;
+ this.isAligned = isAligned;
+ }
+
public int getStart() {
return start;
}
@@ -135,7 +145,7 @@ public class InsertTabletPlan extends InsertPlan {
}
List<PartialPath> ret = new ArrayList<>();
for (String m : measurements) {
- PartialPath fullPath = deviceId.concatNode(m);
+ PartialPath fullPath = prefixPath.concatNode(m);
ret.add(fullPath);
}
paths = ret;
@@ -150,7 +160,7 @@ public class InsertTabletPlan extends InsertPlan {
}
public void subSerialize(DataOutputStream stream) throws IOException {
- putString(stream, deviceId.getFullPath());
+ putString(stream, prefixPath.getFullPath());
writeMeasurements(stream);
writeDataTypes(stream);
writeTimes(stream);
@@ -236,7 +246,7 @@ public class InsertTabletPlan extends InsertPlan {
}
public void subSerialize(ByteBuffer buffer) {
- putString(buffer, deviceId.getFullPath());
+ putString(buffer, prefixPath.getFullPath());
writeMeasurements(buffer);
writeDataTypes(buffer);
writeTimes(buffer);
@@ -438,7 +448,7 @@ public class InsertTabletPlan extends InsertPlan {
@Override
public void deserialize(ByteBuffer buffer) throws IllegalPathException {
- this.deviceId = new PartialPath(readString(buffer));
+ this.prefixPath = new PartialPath(readString(buffer));
int measurementSize = buffer.getInt();
this.measurements = new String[measurementSize];
@@ -557,8 +567,8 @@ public class InsertTabletPlan extends InsertPlan {
@Override
public String toString() {
return "InsertTabletPlan {"
- + "deviceId:"
- + deviceId
+ + "prefixPath:"
+ + prefixPath
+ ", timesRange["
+ times[0]
+ ","
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateAlignedTimeSeriesPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateAlignedTimeSeriesPlan.java
index 5451860..14c99f1 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateAlignedTimeSeriesPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateAlignedTimeSeriesPlan.java
@@ -42,7 +42,7 @@ public class CreateAlignedTimeSeriesPlan extends PhysicalPlan {
private static final Logger logger = LoggerFactory.getLogger(CreateAlignedTimeSeriesPlan.class);
- private PartialPath devicePath;
+ private PartialPath prefixPath;
private List<String> measurements;
private List<TSDataType> dataTypes;
private List<TSEncoding> encodings;
@@ -55,14 +55,14 @@ public class CreateAlignedTimeSeriesPlan extends PhysicalPlan {
}
public CreateAlignedTimeSeriesPlan(
- PartialPath devicePath,
+ PartialPath prefixPath,
List<String> measurements,
List<TSDataType> dataTypes,
List<TSEncoding> encodings,
CompressionType compressor,
List<String> aliasList) {
super(false, Operator.OperatorType.CREATE_ALIGNED_TIMESERIES);
- this.devicePath = devicePath;
+ this.prefixPath = prefixPath;
this.measurements = measurements;
this.dataTypes = dataTypes;
this.encodings = encodings;
@@ -71,12 +71,12 @@ public class CreateAlignedTimeSeriesPlan extends PhysicalPlan {
this.canBeSplit = false;
}
- public PartialPath getDevicePath() {
- return devicePath;
+ public PartialPath getPrefixPath() {
+ return prefixPath;
}
- public void setDevicePath(PartialPath devicePath) {
- this.devicePath = devicePath;
+ public void setPrefixPath(PartialPath prefixPath) {
+ this.prefixPath = prefixPath;
}
public List<String> getMeasurements() {
@@ -123,7 +123,7 @@ public class CreateAlignedTimeSeriesPlan extends PhysicalPlan {
public String toString() {
return String.format(
"devicePath: %s, measurements: %s, dataTypes: %s, encodings: %s, compression: %s",
- devicePath, measurements, dataTypes, encodings, compressor);
+ prefixPath, measurements, dataTypes, encodings, compressor);
}
@Override
@@ -131,7 +131,7 @@ public class CreateAlignedTimeSeriesPlan extends PhysicalPlan {
List<PartialPath> paths = new ArrayList<>();
for (String measurement : measurements) {
try {
- paths.add(new PartialPath(devicePath.getFullPath(), measurement));
+ paths.add(new PartialPath(prefixPath.getFullPath(), measurement));
} catch (IllegalPathException e) {
logger.error("Failed to get paths of CreateAlignedTimeSeriesPlan. ", e);
}
@@ -142,7 +142,7 @@ public class CreateAlignedTimeSeriesPlan extends PhysicalPlan {
@Override
public void serialize(DataOutputStream stream) throws IOException {
stream.writeByte((byte) PhysicalPlanType.CREATE_ALIGNED_TIMESERIES.ordinal());
- byte[] bytes = devicePath.getFullPath().getBytes();
+ byte[] bytes = prefixPath.getFullPath().getBytes();
stream.writeInt(bytes.length);
stream.write(bytes);
@@ -173,7 +173,7 @@ public class CreateAlignedTimeSeriesPlan extends PhysicalPlan {
@Override
public void serialize(ByteBuffer buffer) {
buffer.put((byte) PhysicalPlanType.CREATE_ALIGNED_TIMESERIES.ordinal());
- byte[] bytes = devicePath.getFullPath().getBytes();
+ byte[] bytes = prefixPath.getFullPath().getBytes();
buffer.putInt(bytes.length);
buffer.put(bytes);
@@ -208,7 +208,7 @@ public class CreateAlignedTimeSeriesPlan extends PhysicalPlan {
byte[] bytes = new byte[length];
buffer.get(bytes);
- devicePath = new PartialPath(new String(bytes));
+ prefixPath = new PartialPath(new String(bytes));
int size = ReadWriteIOUtils.readInt(buffer);
measurements = new ArrayList<>();
for (int i = 0; i < size; i++) {
@@ -245,7 +245,7 @@ public class CreateAlignedTimeSeriesPlan extends PhysicalPlan {
}
CreateAlignedTimeSeriesPlan that = (CreateAlignedTimeSeriesPlan) o;
- return Objects.equals(devicePath, that.devicePath)
+ return Objects.equals(prefixPath, that.prefixPath)
&& Objects.equals(measurements, that.measurements)
&& Objects.equals(dataTypes, that.dataTypes)
&& Objects.equals(encodings, that.encodings)
@@ -254,6 +254,6 @@ public class CreateAlignedTimeSeriesPlan extends PhysicalPlan {
@Override
public int hashCode() {
- return Objects.hash(devicePath, measurements, dataTypes, encodings, compressor);
+ return Objects.hash(prefixPath, measurements, dataTypes, encodings, compressor);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/StorageGroupMNodePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/StorageGroupMNodePlan.java
index 89c00b7..c780569 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/StorageGroupMNodePlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/StorageGroupMNodePlan.java
@@ -38,13 +38,11 @@ public class StorageGroupMNodePlan extends MNodePlan {
super(false, Operator.OperatorType.STORAGE_GROUP_MNODE);
}
- public StorageGroupMNodePlan(
- String name, long dataTTL, int childSize, int alignedTimeseriesIndex) {
+ public StorageGroupMNodePlan(String name, long dataTTL, int childSize) {
super(false, Operator.OperatorType.STORAGE_GROUP_MNODE);
this.name = name;
this.dataTTL = dataTTL;
this.childSize = childSize;
- this.alignedTimeseriesIndex = alignedTimeseriesIndex;
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 19fa259..8605d26 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -96,8 +96,8 @@ import org.apache.iotdb.service.rpc.thrift.TSCancelOperationReq;
import org.apache.iotdb.service.rpc.thrift.TSCloseOperationReq;
import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq;
import org.apache.iotdb.service.rpc.thrift.TSCreateAlignedTimeseriesReq;
-import org.apache.iotdb.service.rpc.thrift.TSCreateDeviceTemplateReq;
import org.apache.iotdb.service.rpc.thrift.TSCreateMultiTimeseriesReq;
+import org.apache.iotdb.service.rpc.thrift.TSCreateSchemaTemplateReq;
import org.apache.iotdb.service.rpc.thrift.TSCreateTimeseriesReq;
import org.apache.iotdb.service.rpc.thrift.TSDeleteDataReq;
import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementReq;
@@ -123,7 +123,7 @@ import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
import org.apache.iotdb.service.rpc.thrift.TSQueryNonAlignDataSet;
import org.apache.iotdb.service.rpc.thrift.TSRawDataQueryReq;
-import org.apache.iotdb.service.rpc.thrift.TSSetDeviceTemplateReq;
+import org.apache.iotdb.service.rpc.thrift.TSSetSchemaTemplateReq;
import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneReq;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
@@ -1471,7 +1471,7 @@ public class TSServiceImpl implements TSIService.Iface {
for (int i = 0; i < req.deviceIds.size(); i++) {
InsertRowPlan plan = new InsertRowPlan();
try {
- plan.setDeviceId(new PartialPath(req.getDeviceIds().get(i)));
+ plan.setPrefixPath(new PartialPath(req.getDeviceIds().get(i)));
plan.setTime(req.getTimestamps().get(i));
addMeasurementAndValue(plan, req.getMeasurementsList().get(i), req.getValuesList().get(i));
plan.setDataTypes(new TSDataType[plan.getMeasurements().length]);
@@ -1568,15 +1568,16 @@ public class TSServiceImpl implements TSIService.Iface {
AUDIT_LOGGER.debug(
"Session {} insertRecord, device {}, time {}",
currSessionId.get(),
- req.getDeviceId(),
+ req.getPrefixPath(),
req.getTimestamp());
InsertRowPlan plan =
new InsertRowPlan(
- new PartialPath(req.getDeviceId()),
+ new PartialPath(req.getPrefixPath()),
req.getTimestamp(),
req.getMeasurements().toArray(new String[0]),
- req.values);
+ req.values,
+ req.isAligned);
TSStatus status = checkAuthority(plan, req.getSessionId());
return status != null ? status : executeNonQueryPlan(plan);
@@ -1600,7 +1601,7 @@ public class TSServiceImpl implements TSIService.Iface {
req.getTimestamp());
InsertRowPlan plan = new InsertRowPlan();
- plan.setDeviceId(new PartialPath(req.getDeviceId()));
+ plan.setPrefixPath(new PartialPath(req.getDeviceId()));
plan.setTime(req.getTimestamp());
plan.setMeasurements(req.getMeasurements().toArray(new String[0]));
plan.setDataTypes(new TSDataType[plan.getMeasurements().length]);
@@ -1647,7 +1648,7 @@ public class TSServiceImpl implements TSIService.Iface {
}
InsertTabletPlan insertTabletPlan =
- new InsertTabletPlan(new PartialPath(req.deviceId), req.measurements);
+ new InsertTabletPlan(new PartialPath(req.getPrefixPath()), req.measurements);
insertTabletPlan.setTimes(QueryDataSetUtils.readTimesFromBuffer(req.timestamps, req.size));
insertTabletPlan.setColumns(
QueryDataSetUtils.readValuesFromBuffer(
@@ -1656,6 +1657,7 @@ public class TSServiceImpl implements TSIService.Iface {
QueryDataSetUtils.readBitMapsFromBuffer(req.values, req.types.size(), req.size));
insertTabletPlan.setRowCount(req.size);
insertTabletPlan.setDataTypes(req.types);
+ insertTabletPlan.setAligned(req.isAligned);
TSStatus status = checkAuthority(insertTabletPlan, req.getSessionId());
return status != null ? status : executeNonQueryPlan(insertTabletPlan);
@@ -1805,7 +1807,7 @@ public class TSServiceImpl implements TSIService.Iface {
return createTimeseries(
new TSCreateTimeseriesReq(
req.sessionId,
- req.devicePath + "." + req.measurements.get(0),
+ req.prefixPath + "." + req.measurements.get(0),
req.dataTypes.get(0),
req.encodings.get(0),
req.compressor));
@@ -1815,7 +1817,7 @@ public class TSServiceImpl implements TSIService.Iface {
AUDIT_LOGGER.debug(
"Session-{} create aligned timeseries {}.{}",
currSessionId.get(),
- req.getDevicePath(),
+ req.getPrefixPath(),
req.getMeasurements());
}
@@ -1830,7 +1832,7 @@ public class TSServiceImpl implements TSIService.Iface {
CreateAlignedTimeSeriesPlan plan =
new CreateAlignedTimeSeriesPlan(
- new PartialPath(req.devicePath),
+ new PartialPath(req.prefixPath),
req.measurements,
dataTypes,
encodings,
@@ -1962,7 +1964,7 @@ public class TSServiceImpl implements TSIService.Iface {
}
@Override
- public TSStatus createDeviceTemplate(TSCreateDeviceTemplateReq req) throws TException {
+ public TSStatus createSchemaTemplate(TSCreateSchemaTemplateReq req) throws TException {
try {
if (!checkLogin(req.getSessionId())) {
return RpcUtils.getStatus(TSStatusCode.NOT_LOGIN_ERROR);
@@ -1970,9 +1972,10 @@ public class TSServiceImpl implements TSIService.Iface {
if (AUDIT_LOGGER.isDebugEnabled()) {
AUDIT_LOGGER.debug(
- "Session-{} create device template {}.{}.{}.{}.{}",
+ "Session-{} create device template {}.{}.{}.{}.{}.{}",
currSessionId.get(),
req.getName(),
+ req.getSchemaNames(),
req.getMeasurements(),
req.getDataTypes(),
req.getEncodings(),
@@ -2004,7 +2007,12 @@ public class TSServiceImpl implements TSIService.Iface {
CreateTemplatePlan plan =
new CreateTemplatePlan(
- req.getName(), req.getMeasurements(), dataTypes, encodings, compressionTypes);
+ req.getName(),
+ req.getSchemaNames(),
+ req.getMeasurements(),
+ dataTypes,
+ encodings,
+ compressionTypes);
TSStatus status = checkAuthority(plan, req.getSessionId());
return status != null ? status : executeNonQueryPlan(plan);
@@ -2015,7 +2023,7 @@ public class TSServiceImpl implements TSIService.Iface {
}
@Override
- public TSStatus setDeviceTemplate(TSSetDeviceTemplateReq req) throws TException {
+ public TSStatus setSchemaTemplate(TSSetSchemaTemplateReq req) throws TException {
if (!checkLogin(req.getSessionId())) {
return RpcUtils.getStatus(TSStatusCode.NOT_LOGIN_ERROR);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/sink/local/LocalIoTDBHandler.java b/server/src/main/java/org/apache/iotdb/db/sink/local/LocalIoTDBHandler.java
index 69a6537..427ab59 100644
--- a/server/src/main/java/org/apache/iotdb/db/sink/local/LocalIoTDBHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/sink/local/LocalIoTDBHandler.java
@@ -85,7 +85,7 @@ public class LocalIoTDBHandler implements Handler<LocalIoTDBConfiguration, Local
throws QueryProcessException, StorageEngineException, StorageGroupNotSetException {
InsertRowPlan plan = new InsertRowPlan();
plan.setNeedInferType(false);
- plan.setDeviceId(device);
+ plan.setPrefixPath(device);
plan.setMeasurements(measurements);
plan.setDataTypes(dataTypes);
plan.setTime(event.getTimestamp());
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java b/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java
index e9a6e5b..52fbd1f 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java
@@ -156,22 +156,22 @@ public class LogReplayer {
maxTime = ((InsertTabletPlan) plan).getMaxTime();
}
// the last chunk group may contain the same data with the logs, ignore such logs in seq file
- long lastEndTime = currentTsFileResource.getEndTime(plan.getDeviceId().getFullPath());
+ long lastEndTime = currentTsFileResource.getEndTime(plan.getPrefixPath().getFullPath());
if (lastEndTime != Long.MIN_VALUE && lastEndTime >= minTime && sequence) {
return;
}
- Long startTime = tempStartTimeMap.get(plan.getDeviceId().getFullPath());
+ Long startTime = tempStartTimeMap.get(plan.getPrefixPath().getFullPath());
if (startTime == null || startTime > minTime) {
- tempStartTimeMap.put(plan.getDeviceId().getFullPath(), minTime);
+ tempStartTimeMap.put(plan.getPrefixPath().getFullPath(), minTime);
}
- Long endTime = tempEndTimeMap.get(plan.getDeviceId().getFullPath());
+ Long endTime = tempEndTimeMap.get(plan.getPrefixPath().getFullPath());
if (endTime == null || endTime < maxTime) {
- tempEndTimeMap.put(plan.getDeviceId().getFullPath(), maxTime);
+ tempEndTimeMap.put(plan.getPrefixPath().getFullPath(), maxTime);
}
}
MeasurementMNode[] mNodes;
try {
- mNodes = IoTDB.metaManager.getMNodes(plan.getDeviceId(), plan.getMeasurements());
+ mNodes = IoTDB.metaManager.getMNodes(plan.getPrefixPath(), plan.getMeasurements());
} catch (MetadataException e) {
throw new QueryProcessException(e);
}
@@ -194,11 +194,11 @@ public class LogReplayer {
tPlan.markFailedMeasurementInsertion(
i,
new PathNotExistException(
- tPlan.getDeviceId().getFullPath()
+ tPlan.getPrefixPath().getFullPath()
+ IoTDBConstant.PATH_SEPARATOR
+ tPlan.getMeasurements()[i]));
columnIndex++;
- } else if (mNodes[i].getSchema().getType() == TSDataType.VECTOR) {
+ } else if (tPlan.isAligned()) {
List<TSDataType> datatypes = mNodes[i].getSchema().getValueTSDataTypeList();
for (int j = 0; j < datatypes.size(); j++) {
if (tPlan.getDataTypes()[columnIndex] == null) {
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableTestUtils.java b/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableTestUtils.java
index 05b6fbb..59bfa48 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableTestUtils.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableTestUtils.java
@@ -18,7 +18,6 @@
*/
package org.apache.iotdb.db.engine.memtable;
-import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
@@ -88,8 +87,7 @@ public class MemTableTestUtils {
MeasurementMNode[] mNodes = new MeasurementMNode[2];
IMeasurementSchema schema =
- new VectorMeasurementSchema(
- IoTDBConstant.ALIGN_TIMESERIES_PREFIX, measurements, dataTypes, encodings);
+ new VectorMeasurementSchema("$#$0", measurements, dataTypes, encodings);
mNodes[0] = new MeasurementMNode(null, "sensor0", schema, null);
mNodes[1] = new MeasurementMNode(null, "sensor1", schema, null);
@@ -97,6 +95,8 @@ public class MemTableTestUtils {
new InsertTabletPlan(
new PartialPath(deviceId0), new String[] {"(sensor0,sensor1)"}, dataTypesList);
+ insertTabletPlan.setAligned(true);
+
long[] times = new long[101];
Object[] columns = new Object[2];
columns[0] = new boolean[101];
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java b/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java
index 330db67..9de28d8 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java
@@ -18,7 +18,6 @@
*/
package org.apache.iotdb.db.engine.memtable;
-import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
@@ -196,7 +195,7 @@ public class PrimitiveMemTableTest {
"root.sg.device5",
"sensor1",
new VectorMeasurementSchema(
- IoTDBConstant.ALIGN_TIMESERIES_PREFIX + 0,
+ "$#$0",
new String[] {"sensor1"},
new TSDataType[] {TSDataType.INT64},
new TSEncoding[] {TSEncoding.GORILLA},
@@ -217,7 +216,7 @@ public class PrimitiveMemTableTest {
"root.sg.device5",
"$#$1",
new VectorMeasurementSchema(
- IoTDBConstant.ALIGN_TIMESERIES_PREFIX + 0,
+ "$#$0",
new String[] {"sensor0", "sensor1"},
new TSDataType[] {TSDataType.BOOLEAN, TSDataType.INT64},
new TSEncoding[] {TSEncoding.PLAIN, TSEncoding.GORILLA},
@@ -323,8 +322,7 @@ public class PrimitiveMemTableTest {
MeasurementMNode[] mNodes = new MeasurementMNode[2];
IMeasurementSchema schema =
- new VectorMeasurementSchema(
- IoTDBConstant.ALIGN_TIMESERIES_PREFIX + 0, measurements, dataTypes, encodings);
+ new VectorMeasurementSchema("$#$0", measurements, dataTypes, encodings);
mNodes[0] = new MeasurementMNode(null, "sensor0", schema, null);
mNodes[1] = new MeasurementMNode(null, "sensor1", schema, null);
@@ -348,6 +346,7 @@ public class PrimitiveMemTableTest {
insertTabletPlan.setMeasurementMNodes(mNodes);
insertTabletPlan.setStart(0);
insertTabletPlan.setEnd(100);
+ insertTabletPlan.setAligned(true);
return insertTabletPlan;
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
index f08db84..db4f812 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
@@ -153,7 +153,7 @@ public class TTLTest {
throws WriteProcessException, QueryProcessException, IllegalPathException,
TriggerExecutionException {
InsertRowPlan plan = new InsertRowPlan();
- plan.setDeviceId(new PartialPath(sg1));
+ plan.setPrefixPath(new PartialPath(sg1));
plan.setTime(System.currentTimeMillis());
plan.setMeasurements(new String[] {"s1"});
plan.setDataTypes(new TSDataType[] {TSDataType.INT64});
@@ -186,7 +186,7 @@ public class TTLTest {
throws WriteProcessException, QueryProcessException, IllegalPathException,
TriggerExecutionException {
InsertRowPlan plan = new InsertRowPlan();
- plan.setDeviceId(new PartialPath(sg1));
+ plan.setPrefixPath(new PartialPath(sg1));
plan.setTime(System.currentTimeMillis());
plan.setMeasurements(new String[] {"s1"});
plan.setDataTypes(new TSDataType[] {TSDataType.INT64});
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IOTDBInsertAlignedValuesIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IOTDBInsertAlignedValuesIT.java
index 4c15d83..b3d42d2 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IOTDBInsertAlignedValuesIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IOTDBInsertAlignedValuesIT.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.jdbc.Config;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
import java.sql.Connection;
@@ -65,6 +66,7 @@ public class IOTDBInsertAlignedValuesIT {
}
@Test
+ @Ignore // SQL of insert aligned record is not supported yet
public void testInsertAlignedValues() throws SQLException {
Statement st0 = connection.createStatement();
st0.execute(
@@ -100,6 +102,7 @@ public class IOTDBInsertAlignedValuesIT {
}
@Test
+ @Ignore // SQL of insert aligned record is not supported yet
public void testInsertAlignedNullableValues() throws SQLException {
Statement st0 = connection.createStatement();
st0.execute(
@@ -135,6 +138,7 @@ public class IOTDBInsertAlignedValuesIT {
}
@Test
+ @Ignore // SQL of insert aligned record is not supported yet
public void testUpdatingAlignedValues() throws SQLException {
Statement st0 = connection.createStatement();
st0.execute(
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/MManagerBasicTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/MManagerBasicTest.java
index 1761ab7..85422f4 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/MManagerBasicTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/MManagerBasicTest.java
@@ -263,7 +263,7 @@ public class MManagerBasicTest {
compressionType,
Collections.emptyMap());
manager.createAlignedTimeSeries(
- new PartialPath("root.laptop.d1"),
+ new PartialPath("root.laptop.d1.vector"),
Arrays.asList("s1", "s2", "s3"),
Arrays.asList(
TSDataType.valueOf("INT32"),
@@ -280,38 +280,21 @@ public class MManagerBasicTest {
assertTrue(manager.isPathExist(new PartialPath("root.laptop")));
assertTrue(manager.isPathExist(new PartialPath("root.laptop.d1")));
assertTrue(manager.isPathExist(new PartialPath("root.laptop.d1.s0")));
- assertTrue(manager.isPathExist(new PartialPath("root.laptop.d1.s1")));
- assertTrue(manager.isPathExist(new PartialPath("root.laptop.d1.s2")));
- assertTrue(manager.isPathExist(new PartialPath("root.laptop.d1.s3")));
- try {
- assertEquals(
- 1,
- manager
- .getStorageGroupNodeByStorageGroupPath(new PartialPath("root.laptop"))
- .getAlignedTimeseriesIndex());
- } catch (MetadataException e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
-
- try {
- manager.deleteTimeseries(new PartialPath("root.laptop.d1.s2"));
- } catch (MetadataException e) {
- assertEquals(
- "Not support deleting part of aligned timeseies! (Path: root.laptop.d1.s2)",
- e.getMessage());
- }
+ assertTrue(manager.isPathExist(new PartialPath("root.laptop.d1.vector")));
+ assertTrue(manager.isPathExist(new PartialPath("root.laptop.d1.vector.s1")));
+ assertTrue(manager.isPathExist(new PartialPath("root.laptop.d1.vector.s2")));
+ assertTrue(manager.isPathExist(new PartialPath("root.laptop.d1.vector.s3")));
try {
- manager.deleteTimeseries(new PartialPath("root.laptop.d1.(s2, s3)"));
+ manager.deleteTimeseries(new PartialPath("root.laptop.d1.vector.s2"));
} catch (MetadataException e) {
assertEquals(
- "Not support deleting part of aligned timeseies! (Path: root.laptop.d1.(s2, s3))",
+ "Not support deleting part of aligned timeseies! (Path: root.laptop.d1.vector.s2)",
e.getMessage());
}
try {
- manager.deleteTimeseries(new PartialPath("root.laptop.d1.(s1,s2,s3)"));
+ manager.deleteTimeseries(new PartialPath("root.laptop.d1.vector"));
} catch (MetadataException e) {
e.printStackTrace();
fail(e.getMessage());
@@ -319,9 +302,10 @@ public class MManagerBasicTest {
assertTrue(manager.isPathExist(new PartialPath("root.laptop.d1")));
assertTrue(manager.isPathExist(new PartialPath("root.laptop.d1.s0")));
- assertFalse(manager.isPathExist(new PartialPath("root.laptop.d1.s1")));
- assertFalse(manager.isPathExist(new PartialPath("root.laptop.d1.s2")));
- assertFalse(manager.isPathExist(new PartialPath("root.laptop.d1.s3")));
+ assertFalse(manager.isPathExist(new PartialPath("root.laptop.d1.vector")));
+ assertFalse(manager.isPathExist(new PartialPath("root.laptop.d1.vector.s1")));
+ assertFalse(manager.isPathExist(new PartialPath("root.laptop.d1.vector.s2")));
+ assertFalse(manager.isPathExist(new PartialPath("root.laptop.d1.vector.s3")));
try {
manager.deleteTimeseries(new PartialPath("root.laptop.d1.s0"));
@@ -334,7 +318,7 @@ public class MManagerBasicTest {
try {
manager.createAlignedTimeSeries(
- new PartialPath("root.laptop.d1"),
+ new PartialPath("root.laptop.d1.vector"),
Arrays.asList("s0", "s2", "s4"),
Arrays.asList(
TSDataType.valueOf("INT32"),
@@ -349,19 +333,10 @@ public class MManagerBasicTest {
}
assertTrue(manager.isPathExist(new PartialPath("root.laptop.d1")));
- assertTrue(manager.isPathExist(new PartialPath("root.laptop.d1.s0")));
- assertTrue(manager.isPathExist(new PartialPath("root.laptop.d1.s2")));
- assertTrue(manager.isPathExist(new PartialPath("root.laptop.d1.s4")));
- try {
- assertEquals(
- 2,
- manager
- .getStorageGroupNodeByStorageGroupPath(new PartialPath("root.laptop"))
- .getAlignedTimeseriesIndex());
- } catch (MetadataException e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ assertTrue(manager.isPathExist(new PartialPath("root.laptop.d1.vector")));
+ assertTrue(manager.isPathExist(new PartialPath("root.laptop.d1.vector.s0")));
+ assertTrue(manager.isPathExist(new PartialPath("root.laptop.d1.vector.s2")));
+ assertTrue(manager.isPathExist(new PartialPath("root.laptop.d1.vector.s4")));
}
@Test
@@ -933,8 +908,12 @@ public class MManagerBasicTest {
compressionTypes.add(CompressionType.SNAPPY);
}
+ List<String> schemaNames = new ArrayList<>();
+ schemaNames.add("s11");
+ schemaNames.add("vector");
+
return new CreateTemplatePlan(
- "template1", measurementList, dataTypeList, encodingList, compressionTypes);
+ "template1", schemaNames, measurementList, dataTypeList, encodingList, compressionTypes);
}
@Test
@@ -968,15 +947,21 @@ public class MManagerBasicTest {
compressionTypes.add(CompressionType.SNAPPY);
}
+ List<String> schemaNames = new ArrayList<>();
+ schemaNames.add("s11");
+ schemaNames.add("test_vector");
+
CreateTemplatePlan plan1 =
new CreateTemplatePlan(
"template1",
+ new ArrayList<>(schemaNames),
new ArrayList<>(measurementList),
new ArrayList<>(dataTypeList),
new ArrayList<>(encodingList),
new ArrayList<>(compressionTypes));
measurementList.add(Collections.singletonList("s12"));
+ schemaNames.add("s12");
dataTypeList.add(Collections.singletonList(TSDataType.INT64));
encodingList.add(Collections.singletonList(TSEncoding.RLE));
compressionTypes.add(CompressionType.SNAPPY);
@@ -984,6 +969,7 @@ public class MManagerBasicTest {
CreateTemplatePlan plan2 =
new CreateTemplatePlan(
"template2",
+ new ArrayList<>(schemaNames),
new ArrayList<>(measurementList),
new ArrayList<>(dataTypeList),
new ArrayList<>(encodingList),
@@ -1002,6 +988,7 @@ public class MManagerBasicTest {
CreateTemplatePlan plan3 =
new CreateTemplatePlan(
"template3",
+ new ArrayList<>(schemaNames),
new ArrayList<>(measurementList),
new ArrayList<>(dataTypeList),
new ArrayList<>(encodingList),
@@ -1023,6 +1010,7 @@ public class MManagerBasicTest {
CreateTemplatePlan plan4 =
new CreateTemplatePlan(
"template4",
+ new ArrayList<>(schemaNames),
new ArrayList<>(measurementList),
new ArrayList<>(dataTypeList),
new ArrayList<>(encodingList),
@@ -1073,7 +1061,7 @@ public class MManagerBasicTest {
CreateTimeSeriesPlan createTimeSeriesPlan2 =
new CreateTimeSeriesPlan(
- new PartialPath("root.sg1.d1.s1"),
+ new PartialPath("root.sg1.d1.vector.s1"),
TSDataType.INT32,
TSEncoding.PLAIN,
CompressionType.GZIP,
@@ -1087,7 +1075,7 @@ public class MManagerBasicTest {
fail();
} catch (Exception e) {
assertEquals(
- "Path [root.sg1.d1.s1 ( which is incompatible with template )] already exist",
+ "Path [root.sg1.d1.vector.s1 ( which is incompatible with template )] already exist",
e.getMessage());
}
}
@@ -1103,7 +1091,7 @@ public class MManagerBasicTest {
compressionType,
Collections.emptyMap());
manager.createAlignedTimeSeries(
- new PartialPath("root.laptop.d1"),
+ new PartialPath("root.laptop.d1.vector"),
Arrays.asList("s1", "s2", "s3"),
Arrays.asList(
TSDataType.valueOf("INT32"),
@@ -1122,29 +1110,29 @@ public class MManagerBasicTest {
assertEquals(1, result.size());
assertEquals("root.laptop.d1.s0", result.get(0).getName());
- // show timeseries root.laptop.d1.s1
- showTimeSeriesPlan =
- new ShowTimeSeriesPlan(
- new PartialPath("root.laptop.d1.s1"), false, null, null, 0, 0, false);
- result = manager.showTimeseries(showTimeSeriesPlan, new QueryContext());
- assertEquals(1, result.size());
- assertEquals("root.laptop.d1.s1", result.get(0).getName());
+ // show timeseries
+ // showTimeSeriesPlan =
+ // new ShowTimeSeriesPlan(new PartialPath("root"), false, null, null, 0, 0, false);
+ // result = manager.showTimeseries(showTimeSeriesPlan, new QueryContext());
+ // assertEquals(4, result.size());
- // show timeseries root.laptop.d1.(s1,s2,s3)
+ // show timeseries root.laptop.d1.vector
showTimeSeriesPlan =
new ShowTimeSeriesPlan(
- new PartialPath("root.laptop.d1.(s1,s2,s3)"), false, null, null, 0, 0, false);
+ new PartialPath("root.laptop.d1.vector"), false, null, null, 0, 0, false);
result = manager.showTimeseries(showTimeSeriesPlan, new QueryContext());
assertEquals(3, result.size());
for (int i = 0; i < result.size(); i++) {
- assertEquals("root.laptop.d1.s" + (i + 1), result.get(i).getName());
+ assertEquals("root.laptop.d1.vector.s" + (i + 1), result.get(i).getName());
}
- // show timeseries
+ // show timeseries root.laptop.d1.vector.s1
showTimeSeriesPlan =
- new ShowTimeSeriesPlan(new PartialPath("root"), false, null, null, 0, 0, false);
+ new ShowTimeSeriesPlan(
+ new PartialPath("root.laptop.d1.vector.s1"), false, null, null, 0, 0, false);
result = manager.showTimeseries(showTimeSeriesPlan, new QueryContext());
- assertEquals(4, result.size());
+ assertEquals(1, result.size());
+ assertEquals("root.laptop.d1.vector.s1", result.get(0).getName());
} catch (MetadataException e) {
e.printStackTrace();
fail(e.getMessage());
@@ -1195,9 +1183,18 @@ public class MManagerBasicTest {
compressionTypes.add(compressionType);
}
+ List<String> schemaNames = new ArrayList<>();
+ schemaNames.add("s0");
+ schemaNames.add("vector");
+
CreateTemplatePlan plan =
new CreateTemplatePlan(
- "template1", measurementList, dataTypeList, encodingList, compressionTypes);
+ "template1",
+ schemaNames,
+ measurementList,
+ dataTypeList,
+ encodingList,
+ compressionTypes);
MManager manager = IoTDB.metaManager;
try {
manager.createDeviceTemplate(plan);
@@ -1217,23 +1214,13 @@ public class MManagerBasicTest {
assertEquals(1, result.size());
assertEquals("root.laptop.d1.s0", result.get(0).getName());
- // show timeseries root.laptop.d1.s1
+ // show timeseries root.laptop.d1.vector.s1
showTimeSeriesPlan =
new ShowTimeSeriesPlan(
- new PartialPath("root.laptop.d1.s1"), false, null, null, 0, 0, false);
+ new PartialPath("root.laptop.d1.vector.s1"), false, null, null, 0, 0, false);
result = manager.showTimeseries(showTimeSeriesPlan, new QueryContext());
assertEquals(1, result.size());
- assertEquals("root.laptop.d1.s1", result.get(0).getName());
-
- // show timeseries root.laptop.d1.(s1,s2,s3)
- showTimeSeriesPlan =
- new ShowTimeSeriesPlan(
- new PartialPath("root.laptop.d1.(s1,s2,s3)"), false, null, null, 0, 0, false);
- result = manager.showTimeseries(showTimeSeriesPlan, new QueryContext());
- assertEquals(3, result.size());
- for (int i = 0; i < result.size(); i++) {
- assertEquals("root.laptop.d1.s" + (i + 1), result.get(i).getName());
- }
+ assertEquals("root.laptop.d1.vector.s1", result.get(0).getName());
// show timeseries root.laptop.d1.(s1,s2,s3)
showTimeSeriesPlan =
@@ -1241,9 +1228,10 @@ public class MManagerBasicTest {
result = manager.showTimeseries(showTimeSeriesPlan, new QueryContext());
assertEquals(4, result.size());
Set<String> set = new HashSet<>();
- for (int i = 0; i < result.size(); i++) {
- set.add("root.laptop.d1.s" + i);
+ for (int i = 1; i < result.size(); i++) {
+ set.add("root.laptop.d1.vector.s" + i);
}
+ set.add("root.laptop.d1.s0");
for (int i = 0; i < result.size(); i++) {
set.remove(result.get(i).getName());
@@ -1348,7 +1336,7 @@ public class MManagerBasicTest {
try {
manager.setStorageGroup(new PartialPath("root.laptop"));
manager.createAlignedTimeSeries(
- new PartialPath("root.laptop.d1"),
+ new PartialPath("root.laptop.d1.vector"),
Arrays.asList("s1", "s2", "s3"),
Arrays.asList(
TSDataType.valueOf("FLOAT"),
@@ -1370,18 +1358,19 @@ public class MManagerBasicTest {
InsertRowPlan insertRowPlan =
new InsertRowPlan(
- new PartialPath("root.laptop.d1"),
+ new PartialPath("root.laptop.d1.vector"),
time,
- new String[] {"(s1,s2,s3)"},
+ new String[] {"s1", "s2", "s3"},
dataTypes,
- columns);
+ columns,
+ true);
insertRowPlan.setMeasurementMNodes(
new MeasurementMNode[insertRowPlan.getMeasurements().length]);
// call getSeriesSchemasAndReadLockDevice
MNode mNode = manager.getSeriesSchemasAndReadLockDevice(insertRowPlan);
assertEquals(4, mNode.getMeasurementMNodeCount());
- assertNull(insertRowPlan.getMeasurementMNodes()[0]);
+ assertNull(insertRowPlan.getMeasurementMNodes()[1]);
assertEquals(1, insertRowPlan.getFailedMeasurementNumber());
} catch (Exception e) {
@@ -1428,143 +1417,6 @@ public class MManagerBasicTest {
}
@Test
- public void testCreateMixedTimeseriesAndInsertWithMismatchDataType() {
- MManager manager = IoTDB.metaManager;
- try {
- manager.setStorageGroup(new PartialPath("root.laptop"));
- manager.createTimeseries(
- new PartialPath("root.laptop.d1.s0"),
- TSDataType.valueOf("INT32"),
- TSEncoding.valueOf("RLE"),
- compressionType,
- Collections.emptyMap());
-
- manager.createAlignedTimeSeries(
- new PartialPath("root.laptop.d1"),
- Arrays.asList("s1", "s2", "s3"),
- Arrays.asList(
- TSDataType.valueOf("FLOAT"),
- TSDataType.valueOf("INT64"),
- TSDataType.valueOf("INT32")),
- Arrays.asList(
- TSEncoding.valueOf("RLE"), TSEncoding.valueOf("RLE"), TSEncoding.valueOf("RLE")),
- compressionType);
-
- // construct an insertRowPlan with correct data type
- long time = 1L;
- TSDataType[] dataTypes =
- new TSDataType[] {TSDataType.INT32, TSDataType.FLOAT, TSDataType.INT64, TSDataType.INT32};
-
- String[] columns = new String[4];
- columns[0] = 100 + "";
- columns[1] = 2.0 + "";
- columns[2] = 10000 + "";
- columns[3] = 200 + "";
-
- InsertRowPlan insertRowPlan =
- new InsertRowPlan(
- new PartialPath("root.laptop.d1"),
- time,
- new String[] {"s0", "(s1,s2,s3)"},
- dataTypes,
- columns);
- insertRowPlan.setMeasurementMNodes(
- new MeasurementMNode[insertRowPlan.getMeasurements().length]);
-
- // call getSeriesSchemasAndReadLockDevice
- MNode mNode = manager.getSeriesSchemasAndReadLockDevice(insertRowPlan);
- assertEquals(5, mNode.getMeasurementMNodeCount());
- assertNotNull(insertRowPlan.getMeasurementMNodes()[0]);
- assertNotNull(insertRowPlan.getMeasurementMNodes()[1]);
- assertEquals(0, insertRowPlan.getFailedMeasurementNumber());
-
- // construct an insertRowPlan with mismatched data type in non-aligned timeseries
- time = 2L;
- dataTypes =
- new TSDataType[] {TSDataType.FLOAT, TSDataType.FLOAT, TSDataType.INT64, TSDataType.INT32};
-
- columns[0] = 2.0 + "";
- columns[1] = 2.0 + "";
- columns[2] = 10000 + "";
- columns[3] = 200 + "";
-
- insertRowPlan =
- new InsertRowPlan(
- new PartialPath("root.laptop.d1"),
- time,
- new String[] {"s0", "(s1,s2,s3)"},
- dataTypes,
- columns);
- insertRowPlan.setMeasurementMNodes(
- new MeasurementMNode[insertRowPlan.getMeasurements().length]);
-
- // call getSeriesSchemasAndReadLockDevice
- mNode = manager.getSeriesSchemasAndReadLockDevice(insertRowPlan);
- assertEquals(5, mNode.getMeasurementMNodeCount());
- assertNull(insertRowPlan.getMeasurementMNodes()[0]);
- assertNotNull(insertRowPlan.getMeasurementMNodes()[1]);
- assertEquals(1, insertRowPlan.getFailedMeasurementNumber());
-
- // construct an insertRowPlan with mismatched data type in aligned timeseries
- time = 3L;
- dataTypes =
- new TSDataType[] {TSDataType.INT32, TSDataType.FLOAT, TSDataType.INT32, TSDataType.INT32};
-
- columns[0] = 100 + "";
- columns[1] = 2.0 + "";
- columns[2] = 200 + "";
- columns[3] = 300 + "";
-
- insertRowPlan =
- new InsertRowPlan(
- new PartialPath("root.laptop.d1"),
- time,
- new String[] {"s0", "(s1,s2,s3)"},
- dataTypes,
- columns);
- insertRowPlan.setMeasurementMNodes(
- new MeasurementMNode[insertRowPlan.getMeasurements().length]);
-
- // call getSeriesSchemasAndReadLockDevice
- mNode = manager.getSeriesSchemasAndReadLockDevice(insertRowPlan);
- assertEquals(5, mNode.getMeasurementMNodeCount());
- assertNotNull(insertRowPlan.getMeasurementMNodes()[0]);
- assertNull(insertRowPlan.getMeasurementMNodes()[1]);
- assertEquals(1, insertRowPlan.getFailedMeasurementNumber());
-
- // construct an insertRowPlan with mismatched data type in both timeseries
- time = 4L;
- dataTypes =
- new TSDataType[] {TSDataType.FLOAT, TSDataType.FLOAT, TSDataType.INT32, TSDataType.INT32};
-
- columns[0] = 1.0 + "";
- columns[1] = 2.0 + "";
- columns[2] = 200 + "";
- columns[3] = 300 + "";
-
- insertRowPlan =
- new InsertRowPlan(
- new PartialPath("root.laptop.d1"),
- time,
- new String[] {"s0", "(s1,s2,s3)"},
- dataTypes,
- columns);
- insertRowPlan.setMeasurementMNodes(
- new MeasurementMNode[insertRowPlan.getMeasurements().length]);
-
- // call getSeriesSchemasAndReadLockDevice
- mNode = manager.getSeriesSchemasAndReadLockDevice(insertRowPlan);
- assertEquals(5, mNode.getMeasurementMNodeCount());
- assertNull(insertRowPlan.getMeasurementMNodes()[0]);
- assertNull(insertRowPlan.getMeasurementMNodes()[1]);
- assertEquals(2, insertRowPlan.getFailedMeasurementNumber());
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
public void testGetStorageGroupNodeByPath() {
MManager manager = IoTDB.metaManager;
PartialPath partialPath = null;
diff --git a/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertRowPlanTest.java b/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertRowPlanTest.java
index 3fe2736..56cb484 100644
--- a/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertRowPlanTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertRowPlanTest.java
@@ -119,14 +119,16 @@ public class InsertRowPlanTest {
PlanExecutor executor = new PlanExecutor();
executor.insert(vectorRowPlan);
- Assert.assertEquals("[$#$0, $#$1, s6]", Arrays.toString(vectorRowPlan.getMeasurementMNodes()));
+ Assert.assertEquals(
+ "[vector, vector, vector]", Arrays.toString(vectorRowPlan.getMeasurementMNodes()));
- QueryPlan queryPlan = (QueryPlan) processor.parseSQLToPhysicalPlan("select * from root.isp.d1");
+ QueryPlan queryPlan =
+ (QueryPlan) processor.parseSQLToPhysicalPlan("select * from root.isp.d1.vector");
QueryDataSet dataSet = executor.processQuery(queryPlan, EnvironmentUtils.TEST_QUERY_CONTEXT);
- Assert.assertEquals(3, dataSet.getPaths().size());
+ Assert.assertEquals(1, dataSet.getPaths().size());
while (dataSet.hasNext()) {
RowRecord record = dataSet.next();
- Assert.assertEquals(6, record.getFields().size());
+ Assert.assertEquals(3, record.getFields().size());
}
}
@@ -175,9 +177,19 @@ public class InsertRowPlanTest {
compressionTypes.add(CompressionType.SNAPPY);
}
+ List<String> schemaNames = new ArrayList<>();
+ schemaNames.add("vector");
+ schemaNames.add("vector2");
+ schemaNames.add("s6");
+
CreateTemplatePlan plan =
new CreateTemplatePlan(
- "template1", measurementList, dataTypesList, encodingList, compressionTypes);
+ "template1",
+ schemaNames,
+ measurementList,
+ dataTypesList,
+ encodingList,
+ compressionTypes);
IoTDB.metaManager.createDeviceTemplate(plan);
IoTDB.metaManager.setDeviceTemplate(new SetDeviceTemplatePlan("template1", "root.isp.d1"));
@@ -189,12 +201,13 @@ public class InsertRowPlanTest {
PlanExecutor executor = new PlanExecutor();
executor.insert(rowPlan);
- QueryPlan queryPlan = (QueryPlan) processor.parseSQLToPhysicalPlan("select * from root.isp.d1");
+ QueryPlan queryPlan =
+ (QueryPlan) processor.parseSQLToPhysicalPlan("select * from root.isp.d1.vector");
QueryDataSet dataSet = executor.processQuery(queryPlan, EnvironmentUtils.TEST_QUERY_CONTEXT);
- Assert.assertEquals(3, dataSet.getPaths().size());
+ Assert.assertEquals(1, dataSet.getPaths().size());
while (dataSet.hasNext()) {
RowRecord record = dataSet.next();
- Assert.assertEquals(6, record.getFields().size());
+ Assert.assertEquals(3, record.getFields().size());
}
}
@@ -219,28 +232,19 @@ public class InsertRowPlanTest {
private InsertRowPlan getInsertVectorRowPlan() throws IllegalPathException {
long time = 110L;
TSDataType[] dataTypes =
- new TSDataType[] {
- TSDataType.DOUBLE,
- TSDataType.FLOAT,
- TSDataType.INT64,
- TSDataType.INT32,
- TSDataType.BOOLEAN,
- TSDataType.TEXT
- };
+ new TSDataType[] {TSDataType.DOUBLE, TSDataType.FLOAT, TSDataType.INT64};
- String[] columns = new String[6];
+ String[] columns = new String[3];
columns[0] = 1.0 + "";
columns[1] = 2 + "";
columns[2] = 10000 + "";
- columns[3] = 100 + "";
- columns[4] = false + "";
- columns[5] = "hh" + 0;
return new InsertRowPlan(
- new PartialPath("root.isp.d1"),
+ new PartialPath("root.isp.d1.vector"),
time,
- new String[] {"(s1,s2,s3)", "(s4,s5)", "s6"},
+ new String[] {"s1", "s2", "s3"},
dataTypes,
- columns);
+ columns,
+ true);
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertTabletPlanTest.java b/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertTabletPlanTest.java
index 17e6ab1..2efaa68 100644
--- a/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertTabletPlanTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertTabletPlanTest.java
@@ -129,15 +129,15 @@ public class InsertTabletPlanTest {
PlanExecutor executor = new PlanExecutor();
executor.insertTablet(tabletPlan);
- Assert.assertEquals("[$#$0, $#$1, s6]", Arrays.toString(tabletPlan.getMeasurementMNodes()));
+ Assert.assertEquals(
+ "[vector, vector, vector]", Arrays.toString(tabletPlan.getMeasurementMNodes()));
QueryPlan queryPlan = (QueryPlan) processor.parseSQLToPhysicalPlan("select * from root.isp.d1");
QueryDataSet dataSet = executor.processQuery(queryPlan, EnvironmentUtils.TEST_QUERY_CONTEXT);
- Assert.assertEquals(3, dataSet.getPaths().size());
+ Assert.assertEquals(1, dataSet.getPaths().size());
while (dataSet.hasNext()) {
RowRecord record = dataSet.next();
- System.out.println(record);
- Assert.assertEquals(6, record.getFields().size());
+ Assert.assertEquals(3, record.getFields().size());
}
}
@@ -157,16 +157,15 @@ public class InsertTabletPlanTest {
PlanExecutor executor = new PlanExecutor();
executor.insertTablet(tabletPlan);
- Assert.assertEquals("[$#$0, $#$1, s6]", Arrays.toString(tabletPlan.getMeasurementMNodes()));
- System.out.println(Arrays.toString(tabletPlan.getMeasurementMNodes()));
+ Assert.assertEquals(
+ "[vector, vector, vector]", Arrays.toString(tabletPlan.getMeasurementMNodes()));
QueryPlan queryPlan = (QueryPlan) processor.parseSQLToPhysicalPlan("select * from root.isp.d1");
QueryDataSet dataSet = executor.processQuery(queryPlan, EnvironmentUtils.TEST_QUERY_CONTEXT);
- Assert.assertEquals(3, dataSet.getPaths().size());
+ Assert.assertEquals(1, dataSet.getPaths().size());
while (dataSet.hasNext()) {
RowRecord record = dataSet.next();
- System.out.println(record);
- Assert.assertEquals(6, record.getFields().size());
+ Assert.assertEquals(3, record.getFields().size());
}
}
@@ -241,8 +240,13 @@ public class InsertTabletPlanTest {
compressionTypes.add(CompressionType.SNAPPY);
}
+ List<String> schemaNames = new ArrayList<>();
+ schemaNames.add("vector");
+ schemaNames.add("vector2");
+ schemaNames.add("s6");
+
return new CreateTemplatePlan(
- "template1", measurementList, dataTypesList, encodingList, compressionTypes);
+ "template1", schemaNames, measurementList, dataTypesList, encodingList, compressionTypes);
}
@Test
@@ -339,35 +343,25 @@ public class InsertTabletPlanTest {
dataTypes.add(TSDataType.DOUBLE.ordinal());
dataTypes.add(TSDataType.FLOAT.ordinal());
dataTypes.add(TSDataType.INT64.ordinal());
- dataTypes.add(TSDataType.INT32.ordinal());
- dataTypes.add(TSDataType.BOOLEAN.ordinal());
- dataTypes.add(TSDataType.TEXT.ordinal());
Object[] columns = new Object[6];
columns[0] = new double[4];
columns[1] = new float[4];
columns[2] = new long[4];
- columns[3] = new int[4];
- columns[4] = new boolean[4];
- columns[5] = new Binary[4];
for (int r = 0; r < 4; r++) {
((double[]) columns[0])[r] = 1.0;
((float[]) columns[1])[r] = 2;
((long[]) columns[2])[r] = 10000;
- ((int[]) columns[3])[r] = 100;
- ((boolean[]) columns[4])[r] = false;
- ((Binary[]) columns[5])[r] = new Binary("hh" + r);
}
InsertTabletPlan tabletPlan =
new InsertTabletPlan(
- new PartialPath("root.isp.d1"),
- new String[] {"(s1,s2,s3)", "(s4,s5)", "s6"},
- dataTypes);
+ new PartialPath("root.isp.d1.vector"), new String[] {"s1", "s2", "s3"}, dataTypes);
tabletPlan.setTimes(times);
tabletPlan.setColumns(columns);
tabletPlan.setRowCount(times.length);
+ tabletPlan.setAligned(true);
return tabletPlan;
}
}
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index f9e4c9c..dbdabc3 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -24,8 +24,8 @@ import org.apache.iotdb.rpc.RedirectException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.service.rpc.thrift.EndPoint;
import org.apache.iotdb.service.rpc.thrift.TSCreateAlignedTimeseriesReq;
-import org.apache.iotdb.service.rpc.thrift.TSCreateDeviceTemplateReq;
import org.apache.iotdb.service.rpc.thrift.TSCreateMultiTimeseriesReq;
+import org.apache.iotdb.service.rpc.thrift.TSCreateSchemaTemplateReq;
import org.apache.iotdb.service.rpc.thrift.TSCreateTimeseriesReq;
import org.apache.iotdb.service.rpc.thrift.TSDeleteDataReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
@@ -36,8 +36,9 @@ import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertTabletReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertTabletsReq;
import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
-import org.apache.iotdb.service.rpc.thrift.TSSetDeviceTemplateReq;
+import org.apache.iotdb.service.rpc.thrift.TSSetSchemaTemplateReq;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -378,7 +379,7 @@ public class Session {
}
public void createAlignedTimeseries(
- String devicePath,
+ String prefixPath,
List<String> measurements,
List<TSDataType> dataTypes,
List<TSEncoding> encodings,
@@ -387,19 +388,19 @@ public class Session {
throws IoTDBConnectionException, StatementExecutionException {
TSCreateAlignedTimeseriesReq request =
getTSCreateAlignedTimeseriesReq(
- devicePath, measurements, dataTypes, encodings, compressor, measurementAliasList);
+ prefixPath, measurements, dataTypes, encodings, compressor, measurementAliasList);
defaultSessionConnection.createAlignedTimeseries(request);
}
private TSCreateAlignedTimeseriesReq getTSCreateAlignedTimeseriesReq(
- String devicePath,
+ String prefixPath,
List<String> measurements,
List<TSDataType> dataTypes,
List<TSEncoding> encodings,
CompressionType compressor,
List<String> measurementAliasList) {
TSCreateAlignedTimeseriesReq request = new TSCreateAlignedTimeseriesReq();
- request.setDevicePath(devicePath);
+ request.setPrefixPath(prefixPath);
request.setMeasurements(measurements);
request.setDataTypes(dataTypes.stream().map(TSDataType::ordinal).collect(Collectors.toList()));
request.setEncodings(encodings.stream().map(TSEncoding::ordinal).collect(Collectors.toList()));
@@ -639,13 +640,25 @@ public class Session {
long time,
List<String> measurements,
List<TSDataType> types,
+ boolean isAligned,
Object... values)
throws IoTDBConnectionException, StatementExecutionException {
TSInsertRecordReq request =
- genTSInsertRecordReq(deviceId, time, measurements, types, Arrays.asList(values));
+ genTSInsertRecordReq(deviceId, time, measurements, types, Arrays.asList(values), isAligned);
insertRecord(deviceId, request);
}
+ public void insertRecord(
+ String deviceId,
+ long time,
+ List<String> measurements,
+ List<TSDataType> types,
+ Object... values)
+ throws IoTDBConnectionException, StatementExecutionException {
+ // not vector by default
+ insertRecord(deviceId, time, measurements, types, false, values);
+ }
+
private void insertRecord(String deviceId, TSInsertRecordReq request)
throws IoTDBConnectionException, StatementExecutionException {
try {
@@ -751,9 +764,24 @@ public class Session {
long time,
List<String> measurements,
List<TSDataType> types,
+ List<Object> values,
+ boolean isAligned)
+ throws IoTDBConnectionException, StatementExecutionException {
+ TSInsertRecordReq request =
+ genTSInsertRecordReq(deviceId, time, measurements, types, values, isAligned);
+ insertRecord(deviceId, request);
+ }
+
+ public void insertRecord(
+ String deviceId,
+ long time,
+ List<String> measurements,
+ List<TSDataType> types,
List<Object> values)
throws IoTDBConnectionException, StatementExecutionException {
- TSInsertRecordReq request = genTSInsertRecordReq(deviceId, time, measurements, types, values);
+ // not vector by default
+ TSInsertRecordReq request =
+ genTSInsertRecordReq(deviceId, time, measurements, types, values, false);
insertRecord(deviceId, request);
}
@@ -762,15 +790,17 @@ public class Session {
long time,
List<String> measurements,
List<TSDataType> types,
- List<Object> values)
+ List<Object> values,
+ boolean isAligned)
throws IoTDBConnectionException {
TSInsertRecordReq request = new TSInsertRecordReq();
- request.setDeviceId(deviceId);
+ request.setPrefixPath(deviceId);
request.setTimestamp(time);
request.setMeasurements(measurements);
ByteBuffer buffer = ByteBuffer.allocate(calculateLength(types, values));
putValues(types, values, buffer);
request.setValues(buffer);
+ request.setIsAligned(isAligned);
return request;
}
@@ -1161,24 +1191,26 @@ public class Session {
}
TSInsertTabletReq request = new TSInsertTabletReq();
- request.setDeviceId(tablet.deviceId);
- for (IMeasurementSchema measurementSchema : tablet.getSchemas()) {
- if (measurementSchema instanceof MeasurementSchema) {
+
+ if (request.isAligned) {
+ if (tablet.getSchemas().size() > 1) {
+ throw new BatchExecutionException("One tablet should only contain one aligned timeseries!");
+ }
+ IMeasurementSchema measurementSchema = tablet.getSchemas().get(0);
+ request.setPrefixPath(
+ tablet.deviceId + TsFileConstant.PATH_SEPARATOR + measurementSchema.getMeasurementId());
+ int measurementsSize = measurementSchema.getValueMeasurementIdList().size();
+ for (int i = 0; i < measurementsSize; i++) {
+ request.addToMeasurements(measurementSchema.getValueMeasurementIdList().get(i));
+ request.addToTypes(measurementSchema.getValueTSDataTypeList().get(i).ordinal());
+ }
+ request.setIsAligned(true);
+ } else {
+ for (IMeasurementSchema measurementSchema : tablet.getSchemas()) {
+ request.setPrefixPath(tablet.deviceId);
request.addToMeasurements(measurementSchema.getMeasurementId());
request.addToTypes(measurementSchema.getType().ordinal());
- } else {
- int measurementsSize = measurementSchema.getValueMeasurementIdList().size();
- StringBuilder measurement = new StringBuilder("(");
- for (int i = 0; i < measurementsSize; i++) {
- measurement.append(measurementSchema.getValueMeasurementIdList().get(i));
- if (i != measurementsSize - 1) {
- measurement.append(",");
- } else {
- measurement.append(")");
- }
- request.addToTypes(measurementSchema.getValueTSDataTypeList().get(i).ordinal());
- }
- request.addToMeasurements(measurement.toString());
+ request.setIsAligned(tablet.isAligned());
}
}
request.setTimestamps(SessionUtils.getTimeBuffer(tablet));
@@ -1381,7 +1413,8 @@ public class Session {
List<TSDataType> types,
List<Object> values)
throws IoTDBConnectionException, StatementExecutionException {
- TSInsertRecordReq request = genTSInsertRecordReq(deviceId, time, measurements, types, values);
+ TSInsertRecordReq request =
+ genTSInsertRecordReq(deviceId, time, measurements, types, values, false);
defaultSessionConnection.testInsertRecord(request);
}
@@ -1663,14 +1696,16 @@ public class Session {
return sortedBitMap;
}
- public void setDeviceTemplate(String templateName, String prefixPath)
+ public void setSchemaTemplate(String templateName, String prefixPath)
throws IoTDBConnectionException, StatementExecutionException {
- TSSetDeviceTemplateReq request = getTSSetDeviceTemplateReq(templateName, prefixPath);
- defaultSessionConnection.setDeviceTemplate(request);
+ TSSetSchemaTemplateReq request = getTSSetSchemaTemplateReq(templateName, prefixPath);
+ defaultSessionConnection.setSchemaTemplate(request);
}
/**
* @param name template name
+ * @param schemaNames list of schema names, if this measurement is vector, name it. if this
+ * measurement is not vector, keep this name as same as measurement's name
* @param measurements List of measurements, if it is a single measurement, just put it's name
* into a list and add to measurements if it is a vector measurement, put all measurements of
* the vector into a list and add to measurements
@@ -1684,33 +1719,37 @@ public class Session {
* @throws IoTDBConnectionException
* @throws StatementExecutionException
*/
- public void createDeviceTemplate(
+ public void createSchemaTemplate(
String name,
+ List<String> schemaNames,
List<List<String>> measurements,
List<List<TSDataType>> dataTypes,
List<List<TSEncoding>> encodings,
List<CompressionType> compressors)
throws IoTDBConnectionException, StatementExecutionException {
- TSCreateDeviceTemplateReq request =
- getTSCreateDeviceTemplateReq(name, measurements, dataTypes, encodings, compressors);
- defaultSessionConnection.createDeviceTemplate(request);
+ TSCreateSchemaTemplateReq request =
+ getTSCreateSchemaTemplateReq(
+ name, schemaNames, measurements, dataTypes, encodings, compressors);
+ defaultSessionConnection.createSchemaTemplate(request);
}
- private TSSetDeviceTemplateReq getTSSetDeviceTemplateReq(String templateName, String prefixPath) {
- TSSetDeviceTemplateReq request = new TSSetDeviceTemplateReq();
+ private TSSetSchemaTemplateReq getTSSetSchemaTemplateReq(String templateName, String prefixPath) {
+ TSSetSchemaTemplateReq request = new TSSetSchemaTemplateReq();
request.setTemplateName(templateName);
request.setPrefixPath(prefixPath);
return request;
}
- private TSCreateDeviceTemplateReq getTSCreateDeviceTemplateReq(
+ private TSCreateSchemaTemplateReq getTSCreateSchemaTemplateReq(
String name,
+ List<String> schemaNames,
List<List<String>> measurements,
List<List<TSDataType>> dataTypes,
List<List<TSEncoding>> encodings,
List<CompressionType> compressors) {
- TSCreateDeviceTemplateReq request = new TSCreateDeviceTemplateReq();
+ TSCreateSchemaTemplateReq request = new TSCreateSchemaTemplateReq();
request.setName(name);
+ request.setSchemaNames(schemaNames);
request.setMeasurements(measurements);
List<List<Integer>> requestType = new ArrayList<>();
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index 361c87b..57f91ca 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -28,8 +28,8 @@ import org.apache.iotdb.rpc.TConfigurationConst;
import org.apache.iotdb.service.rpc.thrift.EndPoint;
import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq;
import org.apache.iotdb.service.rpc.thrift.TSCreateAlignedTimeseriesReq;
-import org.apache.iotdb.service.rpc.thrift.TSCreateDeviceTemplateReq;
import org.apache.iotdb.service.rpc.thrift.TSCreateMultiTimeseriesReq;
+import org.apache.iotdb.service.rpc.thrift.TSCreateSchemaTemplateReq;
import org.apache.iotdb.service.rpc.thrift.TSCreateTimeseriesReq;
import org.apache.iotdb.service.rpc.thrift.TSDeleteDataReq;
import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
@@ -46,7 +46,7 @@ import org.apache.iotdb.service.rpc.thrift.TSLastDataQueryReq;
import org.apache.iotdb.service.rpc.thrift.TSOpenSessionReq;
import org.apache.iotdb.service.rpc.thrift.TSOpenSessionResp;
import org.apache.iotdb.service.rpc.thrift.TSRawDataQueryReq;
-import org.apache.iotdb.service.rpc.thrift.TSSetDeviceTemplateReq;
+import org.apache.iotdb.service.rpc.thrift.TSSetSchemaTemplateReq;
import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneReq;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
@@ -737,16 +737,16 @@ public class SessionConnection {
return flag;
}
- protected void createDeviceTemplate(TSCreateDeviceTemplateReq request)
+ protected void createSchemaTemplate(TSCreateSchemaTemplateReq request)
throws IoTDBConnectionException, StatementExecutionException {
request.setSessionId(sessionId);
try {
- RpcUtils.verifySuccess(client.createDeviceTemplate(request));
+ RpcUtils.verifySuccess(client.createSchemaTemplate(request));
} catch (TException e) {
if (reconnect()) {
try {
request.setSessionId(sessionId);
- RpcUtils.verifySuccess(client.createDeviceTemplate(request));
+ RpcUtils.verifySuccess(client.createSchemaTemplate(request));
} catch (TException tException) {
throw new IoTDBConnectionException(tException);
}
@@ -756,16 +756,16 @@ public class SessionConnection {
}
}
- protected void setDeviceTemplate(TSSetDeviceTemplateReq request)
+ protected void setSchemaTemplate(TSSetSchemaTemplateReq request)
throws IoTDBConnectionException, StatementExecutionException {
request.setSessionId(sessionId);
try {
- RpcUtils.verifySuccess(client.setDeviceTemplate(request));
+ RpcUtils.verifySuccess(client.setSchemaTemplate(request));
} catch (TException e) {
if (reconnect()) {
try {
request.setSessionId(sessionId);
- RpcUtils.verifySuccess(client.setDeviceTemplate(request));
+ RpcUtils.verifySuccess(client.setSchemaTemplate(request));
} catch (TException tException) {
throw new IoTDBConnectionException(tException);
}
diff --git a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionSimpleIT.java b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionSimpleIT.java
index 91a1368..15bd740 100644
--- a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionSimpleIT.java
+++ b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionSimpleIT.java
@@ -379,32 +379,29 @@ public class IoTDBSessionSimpleIT {
session = new Session("127.0.0.1", 6667, "root", "root");
session.open();
List<IMeasurementSchema> schemaList = new ArrayList<>();
- schemaList.add(new MeasurementSchema("s0", TSDataType.INT64));
schemaList.add(
new VectorMeasurementSchema(
+ "vector",
new String[] {"s1", "s2", "s3"},
new TSDataType[] {TSDataType.INT64, TSDataType.INT32, TSDataType.TEXT}));
- schemaList.add(new MeasurementSchema("s4", TSDataType.INT32));
- Tablet tablet = new Tablet("root.sg1.d1", schemaList);
+ Tablet tablet = new Tablet("root.sg1.d1.vector", schemaList);
+ tablet.setAligned(true);
long timestamp = System.currentTimeMillis();
for (long row = 0; row < 10; row++) {
int rowIndex = tablet.rowSize++;
tablet.addTimestamp(rowIndex, timestamp);
tablet.addValue(
- schemaList.get(0).getMeasurementId(), rowIndex, new SecureRandom().nextLong());
- tablet.addValue(
- schemaList.get(1).getValueMeasurementIdList().get(0),
+ schemaList.get(0).getValueMeasurementIdList().get(0),
rowIndex,
new SecureRandom().nextLong());
tablet.addValue(
- schemaList.get(1).getValueMeasurementIdList().get(1),
+ schemaList.get(0).getValueMeasurementIdList().get(1),
rowIndex,
new SecureRandom().nextInt());
tablet.addValue(
- schemaList.get(1).getValueMeasurementIdList().get(2), rowIndex, new Binary("test"));
- tablet.addValue(schemaList.get(2).getMeasurementId(), rowIndex, new SecureRandom().nextInt());
+ schemaList.get(0).getValueMeasurementIdList().get(2), rowIndex, new Binary("test"));
timestamp++;
}
@@ -419,8 +416,6 @@ public class IoTDBSessionSimpleIT {
Assert.assertEquals(10L, rowRecord.getFields().get(0).getLongV());
Assert.assertEquals(10L, rowRecord.getFields().get(1).getLongV());
Assert.assertEquals(10L, rowRecord.getFields().get(2).getLongV());
- Assert.assertEquals(10L, rowRecord.getFields().get(3).getLongV());
- Assert.assertEquals(10L, rowRecord.getFields().get(4).getLongV());
}
session.close();
}
@@ -747,13 +742,13 @@ public class IoTDBSessionSimpleIT {
});
Assert.assertArrayEquals(
dataSet.getColumnTypes().toArray(new String[0]),
- new TSDataType[] {
- TSDataType.INT64,
- TSDataType.INT64,
- TSDataType.FLOAT,
- TSDataType.BOOLEAN,
- TSDataType.INT32,
- TSDataType.INT32
+ new String[] {
+ String.valueOf(TSDataType.INT64),
+ String.valueOf(TSDataType.INT64),
+ String.valueOf(TSDataType.FLOAT),
+ String.valueOf(TSDataType.BOOLEAN),
+ String.valueOf(TSDataType.INT32),
+ String.valueOf(TSDataType.INT32)
});
long time = 1L;
//
diff --git a/session/src/test/java/org/apache/iotdb/session/SessionCacheLeaderUT.java b/session/src/test/java/org/apache/iotdb/session/SessionCacheLeaderUT.java
index 763a27b..7e7f4e6 100644
--- a/session/src/test/java/org/apache/iotdb/session/SessionCacheLeaderUT.java
+++ b/session/src/test/java/org/apache/iotdb/session/SessionCacheLeaderUT.java
@@ -779,7 +779,7 @@ public class SessionCacheLeaderUT {
@Override
protected void insertRecord(TSInsertRecordReq request) throws RedirectException {
- throw new RedirectException(getDeviceIdBelongedEndpoint(request.deviceId));
+ throw new RedirectException(getDeviceIdBelongedEndpoint(request.prefixPath));
}
@Override
@@ -805,7 +805,7 @@ public class SessionCacheLeaderUT {
@Override
protected void insertTablet(TSInsertTabletReq request) throws RedirectException {
- throw new RedirectException(getDeviceIdBelongedEndpoint(request.deviceId));
+ throw new RedirectException(getDeviceIdBelongedEndpoint(request.prefixPath));
}
@Override
diff --git a/session/src/test/java/org/apache/iotdb/session/SessionUT.java b/session/src/test/java/org/apache/iotdb/session/SessionUT.java
index 28f251c..8ec1e2e 100644
--- a/session/src/test/java/org/apache/iotdb/session/SessionUT.java
+++ b/session/src/test/java/org/apache/iotdb/session/SessionUT.java
@@ -203,14 +203,6 @@ public class SessionUT {
}
@Test
- public void setDeviceTemplate() throws IoTDBConnectionException, StatementExecutionException {
- session = new Session("127.0.0.1", 6667, "root", "root", ZoneId.of("+05:00"));
- session.open();
-
- session.setDeviceTemplate("template1", "root.sg.1");
- }
-
- @Test
public void createDeviceTemplate() throws IoTDBConnectionException, StatementExecutionException {
session = new Session("127.0.0.1", 6667, "root", "root", ZoneId.of("+05:00"));
session.open();
@@ -244,7 +236,12 @@ public class SessionUT {
compressionTypes.add(CompressionType.SNAPPY);
}
- session.createDeviceTemplate(
- "template1", measurementList, dataTypeList, encodingList, compressionTypes);
+ List<String> schemaNames = new ArrayList<>();
+ schemaNames.add("s11");
+ schemaNames.add("test_vector");
+
+ session.createSchemaTemplate(
+ "template1", schemaNames, measurementList, dataTypeList, encodingList, compressionTypes);
+ session.setSchemaTemplate("template1", "root.sg.1");
}
}
diff --git a/testcontainer/src/test/java/org/apache/iotdb/db/sql/Cases.java b/testcontainer/src/test/java/org/apache/iotdb/db/sql/Cases.java
index 0dd74f8..960585f 100644
--- a/testcontainer/src/test/java/org/apache/iotdb/db/sql/Cases.java
+++ b/testcontainer/src/test/java/org/apache/iotdb/db/sql/Cases.java
@@ -37,7 +37,11 @@ import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
import java.util.stream.Stream;
public abstract class Cases {
@@ -189,6 +193,7 @@ public abstract class Cases {
@Test
public void vectorCountTest() throws IoTDBConnectionException, StatementExecutionException {
List<List<String>> measurementList = new ArrayList<>();
+ List<String> schemaNames = new ArrayList<>();
List<List<TSEncoding>> encodingList = new ArrayList<>();
List<List<TSDataType>> dataTypeList = new ArrayList<>();
List<CompressionType> compressionTypes = new ArrayList<>();
@@ -205,17 +210,24 @@ public abstract class Cases {
encodings.add(TSEncoding.RLE);
compressionTypes.add(CompressionType.SNAPPY);
});
+ schemaNames.add("schema");
encodingList.add(encodings);
dataTypeList.add(dataTypes);
measurementList.add(Arrays.asList(vectorMeasurements));
- session.createDeviceTemplate(
- "testcontainer", measurementList, dataTypeList, encodingList, compressionTypes);
+ session.createSchemaTemplate(
+ "testcontainer",
+ schemaNames,
+ measurementList,
+ dataTypeList,
+ encodingList,
+ compressionTypes);
session.setStorageGroup("root.template");
- session.setDeviceTemplate("testcontainer", "root.template");
+ session.setSchemaTemplate("testcontainer", "root.template");
VectorMeasurementSchema vectorMeasurementSchema =
- new VectorMeasurementSchema(vectorMeasurements, dataTypes.toArray(new TSDataType[0]));
+ new VectorMeasurementSchema(
+ "vector", vectorMeasurements, dataTypes.toArray(new TSDataType[0]));
Tablet tablet = new Tablet("root.template.device1", Arrays.asList(vectorMeasurementSchema));
for (int i = 0; i < 10; i++) {
diff --git a/thrift/src/main/thrift/rpc.thrift b/thrift/src/main/thrift/rpc.thrift
index 4010e50..13aac39 100644
--- a/thrift/src/main/thrift/rpc.thrift
+++ b/thrift/src/main/thrift/rpc.thrift
@@ -194,10 +194,11 @@ struct TSSetTimeZoneReq {
// for session
struct TSInsertRecordReq {
1: required i64 sessionId
- 2: required string deviceId
+ 2: required string prefixPath
3: required list<string> measurements
4: required binary values
5: required i64 timestamp
+ 6: optional bool isAligned
}
struct TSInsertStringRecordReq {
@@ -210,12 +211,13 @@ struct TSInsertStringRecordReq {
struct TSInsertTabletReq {
1: required i64 sessionId
- 2: required string deviceId
+ 2: required string prefixPath
3: required list<string> measurements
4: required binary values
5: required binary timestamps
6: required list<i32> types
7: required i32 size
+ 8: optional bool isAligned
}
struct TSInsertTabletsReq {
@@ -273,7 +275,7 @@ struct TSCreateTimeseriesReq {
struct TSCreateAlignedTimeseriesReq {
1: required i64 sessionId
- 2: required string devicePath
+ 2: required string prefixPath
3: required list<string> measurements
4: required list<i32> dataTypes
5: required list<i32> encodings
@@ -318,19 +320,20 @@ struct ServerProperties {
3: required string timestampPrecision;
}
-struct TSSetDeviceTemplateReq {
+struct TSSetSchemaTemplateReq {
1: required i64 sessionId
2: required string templateName
3: required string prefixPath
}
-struct TSCreateDeviceTemplateReq {
+struct TSCreateSchemaTemplateReq {
1: required i64 sessionId
2: required string name
- 3: required list<list<string>> measurements
- 4: required list<list<i32>> dataTypes
- 5: required list<list<i32>> encodings
- 6: required list<i32> compressors
+ 3: required list<string> schemaNames
+ 4: required list<list<string>> measurements
+ 5: required list<list<i32>> dataTypes
+ 6: required list<list<i32>> encodings
+ 7: required list<i32> compressors
}
service TSIService {
@@ -408,7 +411,7 @@ service TSIService {
i64 requestStatementId(1:i64 sessionId);
- TSStatus createDeviceTemplate(1:TSCreateDeviceTemplateReq req);
+ TSStatus createSchemaTemplate(1:TSCreateSchemaTemplateReq req);
- TSStatus setDeviceTemplate(1:TSSetDeviceTemplateReq req);
+ TSStatus setSchemaTemplate(1:TSSetSchemaTemplateReq req);
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/Tablet.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/Tablet.java
index 5f47c79..fffb85e 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/Tablet.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/Tablet.java
@@ -53,10 +53,7 @@ public class Tablet {
private List<IMeasurementSchema> schemas;
/** measurementId->indexOf(measurementSchema) */
- private Map<String, Integer> measurementIndexInSchema;
-
- /** measurementId->indexOf(values) */
- private Map<String, Integer> measurementIndexInValues;
+ private Map<String, Integer> measurementIndex;
/** timestamps in this tablet */
public long[] timestamps;
@@ -68,6 +65,8 @@ public class Tablet {
public int rowSize;
/** the maximum number of rows for this tablet */
private int maxRowNumber;
+ /** whether this tablet store data of aligned timeseries or not */
+ private boolean isAligned;
/**
* Return a tablet with default specified row number. This is the standard constructor (all Tablet
@@ -94,22 +93,16 @@ public class Tablet {
this.deviceId = deviceId;
this.schemas = new ArrayList<>(schemas);
this.maxRowNumber = maxRowNumber;
- measurementIndexInSchema = new HashMap<>();
- measurementIndexInValues = new HashMap<>();
+ measurementIndex = new HashMap<>();
- int indexInValues = 0;
int indexInSchema = 0;
for (IMeasurementSchema schema : schemas) {
if (schema.getType() == TSDataType.VECTOR) {
for (String measurementId : schema.getValueMeasurementIdList()) {
- measurementIndexInValues.put(measurementId, indexInValues);
- measurementIndexInSchema.put(measurementId, indexInSchema);
- indexInValues++;
+ measurementIndex.put(measurementId, indexInSchema);
}
} else {
- measurementIndexInValues.put(schema.getMeasurementId(), indexInValues);
- measurementIndexInSchema.put(schema.getMeasurementId(), indexInSchema);
- indexInValues++;
+ measurementIndex.put(schema.getMeasurementId(), indexInSchema);
}
indexInSchema++;
}
@@ -127,69 +120,66 @@ public class Tablet {
timestamps[rowIndex] = timestamp;
}
- // (s1, s2) s3
public void addValue(String measurementId, int rowIndex, Object value) {
- int indexOfValues = measurementIndexInValues.get(measurementId);
- int indexOfSchema = measurementIndexInSchema.get(measurementId);
+ int indexOfSchema = measurementIndex.get(measurementId);
IMeasurementSchema measurementSchema = schemas.get(indexOfSchema);
-
if (measurementSchema.getType().equals(TSDataType.VECTOR)) {
int indexInVector = measurementSchema.getMeasurementIdColumnIndex(measurementId);
TSDataType dataType = measurementSchema.getValueTSDataTypeList().get(indexInVector);
- addValueOfDataType(dataType, rowIndex, indexOfValues, value);
+ addValueOfDataType(dataType, rowIndex, indexInVector, value);
} else {
- addValueOfDataType(measurementSchema.getType(), rowIndex, indexOfValues, value);
+ addValueOfDataType(measurementSchema.getType(), rowIndex, indexOfSchema, value);
}
}
private void addValueOfDataType(
- TSDataType dataType, int rowIndex, int indexOfValue, Object value) {
+ TSDataType dataType, int rowIndex, int indexOfSchema, Object value) {
if (value == null) {
// init the bitMap to mark null value
if (bitMaps == null) {
bitMaps = new BitMap[values.length];
}
- if (bitMaps[indexOfValue] == null) {
- bitMaps[indexOfValue] = new BitMap(maxRowNumber);
+ if (bitMaps[indexOfSchema] == null) {
+ bitMaps[indexOfSchema] = new BitMap(maxRowNumber);
}
// mark the null value position
- bitMaps[indexOfValue].mark(rowIndex);
+ bitMaps[indexOfSchema].mark(rowIndex);
}
switch (dataType) {
case TEXT:
{
- Binary[] sensor = (Binary[]) values[indexOfValue];
+ Binary[] sensor = (Binary[]) values[indexOfSchema];
sensor[rowIndex] = value != null ? (Binary) value : Binary.EMPTY_VALUE;
break;
}
case FLOAT:
{
- float[] sensor = (float[]) values[indexOfValue];
+ float[] sensor = (float[]) values[indexOfSchema];
sensor[rowIndex] = value != null ? (float) value : Float.MIN_VALUE;
break;
}
case INT32:
{
- int[] sensor = (int[]) values[indexOfValue];
+ int[] sensor = (int[]) values[indexOfSchema];
sensor[rowIndex] = value != null ? (int) value : Integer.MIN_VALUE;
break;
}
case INT64:
{
- long[] sensor = (long[]) values[indexOfValue];
+ long[] sensor = (long[]) values[indexOfSchema];
sensor[rowIndex] = value != null ? (long) value : Long.MIN_VALUE;
break;
}
case DOUBLE:
{
- double[] sensor = (double[]) values[indexOfValue];
+ double[] sensor = (double[]) values[indexOfSchema];
sensor[rowIndex] = value != null ? (double) value : Double.MIN_VALUE;
break;
}
case BOOLEAN:
{
- boolean[] sensor = (boolean[]) values[indexOfValue];
+ boolean[] sensor = (boolean[]) values[indexOfSchema];
sensor[rowIndex] = value != null && (boolean) value;
break;
}
@@ -337,4 +327,12 @@ public class Tablet {
}
return valueOccupation;
}
+
+ public boolean isAligned() {
+ return isAligned;
+ }
+
+ public void setAligned(boolean aligned) {
+ isAligned = aligned;
+ }
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/VectorMeasurementSchema.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/VectorMeasurementSchema.java
index e6abdc6..f9eb2b6 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/VectorMeasurementSchema.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/VectorMeasurementSchema.java
@@ -35,6 +35,7 @@ import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -42,11 +43,8 @@ import java.util.Objects;
public class VectorMeasurementSchema
implements IMeasurementSchema, Comparable<VectorMeasurementSchema>, Serializable {
- public static final String VECTOR_NAME_PREFIX = "$#$";
-
- // this is equal to the time id in this vector
private String vectorMeausurementId;
- private String[] measurements;
+ private Map<String, Integer> measurementsToIndexMap;
private byte[] types;
private byte[] encodings;
private TSEncodingBuilder[] encodingConverters;
@@ -61,7 +59,10 @@ public class VectorMeasurementSchema
TSEncoding[] encodings,
CompressionType compressionType) {
this.vectorMeausurementId = measurementId;
- this.measurements = measurements;
+ this.measurementsToIndexMap = new HashMap<>();
+ for (int i = 0; i < measurements.length; i++) {
+ measurementsToIndexMap.put(measurements[i], i);
+ }
byte[] typesInByte = new byte[types.length];
for (int i = 0; i < types.length; i++) {
typesInByte[i] = types[i].serialize();
@@ -77,17 +78,12 @@ public class VectorMeasurementSchema
this.compressor = compressionType.serialize();
}
- public VectorMeasurementSchema(
- String[] measurements, byte[] types, byte[] encodings, byte compressor) {
- this.measurements = measurements;
- this.types = types;
- this.encodings = encodings;
- this.encodingConverters = new TSEncodingBuilder[measurements.length];
- this.compressor = compressor;
- }
-
- public VectorMeasurementSchema(String[] measurements, TSDataType[] types) {
- this.measurements = measurements;
+ public VectorMeasurementSchema(String measurementId, String[] measurements, TSDataType[] types) {
+ this.vectorMeausurementId = measurementId;
+ this.measurementsToIndexMap = new HashMap<>();
+ for (int i = 0; i < measurements.length; i++) {
+ measurementsToIndexMap.put(measurements[i], i);
+ }
this.types = new byte[types.length];
for (int i = 0; i < types.length; i++) {
this.types[i] = types[i].serialize();
@@ -164,6 +160,10 @@ public class VectorMeasurementSchema
@Override
public List<String> getValueMeasurementIdList() {
+ String[] measurements = new String[measurementsToIndexMap.size()];
+ for (Map.Entry<String, Integer> entry : measurementsToIndexMap.entrySet()) {
+ measurements[entry.getValue()] = entry.getKey();
+ }
return Arrays.asList(measurements);
}
@@ -203,18 +203,18 @@ public class VectorMeasurementSchema
@Override
public int getMeasurementIdColumnIndex(String measurementId) {
- return getValueMeasurementIdList().indexOf(measurementId);
+ return measurementsToIndexMap.get(measurementId);
}
@Override
public int serializeTo(ByteBuffer buffer) {
int byteLen = 0;
- byteLen +=
- ReadWriteIOUtils.write(vectorMeausurementId.substring(VECTOR_NAME_PREFIX.length()), buffer);
- byteLen += ReadWriteIOUtils.write(measurements.length, buffer);
+ byteLen += ReadWriteIOUtils.write(vectorMeausurementId, buffer);
+ byteLen += ReadWriteIOUtils.write(measurementsToIndexMap.size(), buffer);
- for (String measurementId : measurements) {
- byteLen += ReadWriteIOUtils.write(measurementId, buffer);
+ for (Map.Entry<String, Integer> entry : measurementsToIndexMap.entrySet()) {
+ byteLen += ReadWriteIOUtils.write(entry.getKey(), buffer);
+ byteLen += ReadWriteIOUtils.write(entry.getValue(), buffer);
}
for (byte type : types) {
byteLen += ReadWriteIOUtils.write(type, buffer);
@@ -230,13 +230,12 @@ public class VectorMeasurementSchema
@Override
public int serializeTo(OutputStream outputStream) throws IOException {
int byteLen = 0;
- byteLen +=
- ReadWriteIOUtils.write(
- vectorMeausurementId.substring(VECTOR_NAME_PREFIX.length()), outputStream);
- byteLen += ReadWriteIOUtils.write(measurements.length, outputStream);
+ byteLen += ReadWriteIOUtils.write(vectorMeausurementId, outputStream);
+ byteLen += ReadWriteIOUtils.write(measurementsToIndexMap.size(), outputStream);
- for (String measurementId : measurements) {
- byteLen += ReadWriteIOUtils.write(measurementId, outputStream);
+ for (Map.Entry<String, Integer> entry : measurementsToIndexMap.entrySet()) {
+ byteLen += ReadWriteIOUtils.write(entry.getKey(), outputStream);
+ byteLen += ReadWriteIOUtils.write(entry.getValue(), outputStream);
}
for (byte type : types) {
byteLen += ReadWriteIOUtils.write(type, outputStream);
@@ -268,15 +267,15 @@ public class VectorMeasurementSchema
public static VectorMeasurementSchema deserializeFrom(InputStream inputStream)
throws IOException {
VectorMeasurementSchema vectorMeasurementSchema = new VectorMeasurementSchema();
- vectorMeasurementSchema.vectorMeausurementId =
- VECTOR_NAME_PREFIX + ReadWriteIOUtils.readString(inputStream);
+ vectorMeasurementSchema.vectorMeausurementId = ReadWriteIOUtils.readString(inputStream);
int measurementSize = ReadWriteIOUtils.readInt(inputStream);
- String[] measurements = new String[measurementSize];
+ Map<String, Integer> measurementsToIndexMap = new HashMap<>();
for (int i = 0; i < measurementSize; i++) {
- measurements[i] = ReadWriteIOUtils.readString(inputStream);
+ measurementsToIndexMap.put(
+ ReadWriteIOUtils.readString(inputStream), ReadWriteIOUtils.readInt(inputStream));
}
- vectorMeasurementSchema.measurements = measurements;
+ vectorMeasurementSchema.measurementsToIndexMap = measurementsToIndexMap;
byte[] types = new byte[measurementSize];
for (int i = 0; i < measurementSize; i++) {
@@ -296,14 +295,14 @@ public class VectorMeasurementSchema
public static VectorMeasurementSchema deserializeFrom(ByteBuffer buffer) {
VectorMeasurementSchema vectorMeasurementSchema = new VectorMeasurementSchema();
- vectorMeasurementSchema.vectorMeausurementId =
- VECTOR_NAME_PREFIX + ReadWriteIOUtils.readString(buffer);
+ vectorMeasurementSchema.vectorMeausurementId = ReadWriteIOUtils.readString(buffer);
int measurementSize = ReadWriteIOUtils.readInt(buffer);
- String[] measurements = new String[measurementSize];
+ Map<String, Integer> measurementsToIndexMap = new HashMap<>();
for (int i = 0; i < measurementSize; i++) {
- measurements[i] = ReadWriteIOUtils.readString(buffer);
+ measurementsToIndexMap.put(
+ ReadWriteIOUtils.readString(buffer), ReadWriteIOUtils.readInt(buffer));
}
- vectorMeasurementSchema.measurements = measurements;
+ vectorMeasurementSchema.measurementsToIndexMap = measurementsToIndexMap;
byte[] types = new byte[measurementSize];
for (int i = 0; i < measurementSize; i++) {
@@ -332,36 +331,38 @@ public class VectorMeasurementSchema
VectorMeasurementSchema that = (VectorMeasurementSchema) o;
return Arrays.equals(types, that.types)
&& Arrays.equals(encodings, that.encodings)
- && Arrays.equals(measurements, that.measurements)
+ && Objects.equals(vectorMeausurementId, that.vectorMeausurementId)
&& Objects.equals(compressor, that.compressor);
}
@Override
public int hashCode() {
- return Objects.hash(types, encodings, measurements, compressor);
+ return Objects.hash(vectorMeausurementId, types, encodings, compressor);
}
- /** compare by first measurementID. */
+ /** compare by vector name */
@Override
public int compareTo(VectorMeasurementSchema o) {
if (equals(o)) {
return 0;
} else {
- return this.measurements[0].compareTo(o.measurements[0]);
+ return this.vectorMeausurementId.compareTo(o.vectorMeausurementId);
}
}
@Override
public String toString() {
StringContainer sc = new StringContainer("");
- for (int i = 0; i < measurements.length; i++) {
+ sc.addTail(vectorMeausurementId, ",");
+ // string is not in real order
+ for (Map.Entry<String, Integer> entry : measurementsToIndexMap.entrySet()) {
sc.addTail(
"[",
- measurements[i],
+ entry.getKey(),
",",
- TSDataType.deserialize(types[i]).toString(),
+ TSDataType.deserialize(types[entry.getValue()]).toString(),
",",
- TSEncoding.deserialize(encodings[i]).toString());
+ TSEncoding.deserialize(encodings[entry.getValue()]).toString());
sc.addTail("],");
}
sc.addTail(CompressionType.deserialize(compressor).toString());
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/file/metadata/utils/Utils.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/file/metadata/utils/Utils.java
index 3c558a1..dbd1fa2 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/file/metadata/utils/Utils.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/file/metadata/utils/Utils.java
@@ -19,69 +19,19 @@
package org.apache.iotdb.tsfile.file.metadata.utils;
import org.apache.iotdb.tsfile.file.header.PageHeader;
-import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.MetadataIndexNode;
import org.apache.iotdb.tsfile.file.metadata.TsFileMetadata;
import org.apache.iotdb.tsfile.file.metadata.statistics.BooleanStatistics;
import org.apache.iotdb.tsfile.file.metadata.statistics.IntegerStatistics;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
-import org.junit.Assert;
-
-import java.util.List;
-import java.util.Map;
-
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class Utils {
private static final double maxError = 0.0001d;
- public static void isListEqual(List<?> listA, List<?> listB, String name) {
- if ((listA == null) ^ (listB == null)) {
- System.out.println("error");
- fail(String.format("one of %s is null", name));
- }
- if ((listA != null) && (listB != null)) {
- if (listA.size() != listB.size()) {
- fail(String.format("%s size is different", name));
- }
- for (int i = 0; i < listA.size(); i++) {
- assertTrue(listA.get(i).equals(listB.get(i)));
- }
- }
- }
-
- public static void isMapStringEqual(
- Map<String, String> mapA, Map<String, String> mapB, String name) {
- if ((mapA == null) ^ (mapB == null)) {
- System.out.println("error");
- fail(String.format("one of %s is null", name));
- }
- if ((mapA != null) && (mapB != null)) {
- if (mapA.size() != mapB.size()) {
- fail(String.format("%s size is different", name));
- }
- for (String key : mapA.keySet()) {
- assertTrue(mapA.get(key).equals(mapB.get(key)));
- }
- }
- }
-
- public static void isTwoTsDigestEqual(
- Statistics statisticsA, Statistics statisticsB, String name) {
- if ((statisticsA == null) ^ (statisticsB == null)) {
- System.out.println("error");
- fail(String.format("one of %s is null", name));
- }
- if (statisticsA != null) {
- Assert.assertEquals(statisticsA, statisticsB);
- }
- }
-
/**
* when one of A and B is Null, A != B, so test case fails.
*
@@ -98,29 +48,6 @@ public class Utils {
return true;
}
- public static void isStringSame(Object str1, Object str2, String name) {
- if ((str1 == null) && (str2 == null)) {
- return;
- }
- if ((str1 == null) ^ (str2 == null)) {
- fail(String.format("one of %s string is null", name));
- }
- assertTrue(str1.toString().equals(str2.toString()));
- }
-
- public static void isTimeSeriesChunkMetadataEqual(
- ChunkMetadata metadata1, ChunkMetadata metadata2) {
- if (Utils.isTwoObjectsNotNULL(metadata1, metadata2, "ChunkMetaData")) {
- assertTrue(metadata1.getOffsetOfChunkHeader() == metadata2.getOffsetOfChunkHeader());
- assertTrue(metadata1.getNumOfPoints() == metadata2.getNumOfPoints());
- assertTrue(metadata1.getStartTime() == metadata2.getStartTime());
- assertTrue(metadata1.getEndTime() == metadata2.getEndTime());
- assertNotNull(metadata1.getStatistics());
- assertNotNull(metadata2.getStatistics());
- Utils.isTwoTsDigestEqual(metadata1.getStatistics(), metadata2.getStatistics(), "TsDigest");
- }
- }
-
public static boolean isFileMetaDataEqual(TsFileMetadata metadata1, TsFileMetadata metadata2) {
if (Utils.isTwoObjectsNotNULL(metadata1, metadata2, "File MetaData")) {
if (Utils.isTwoObjectsNotNULL(