You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xu...@apache.org on 2021/12/08 12:08:13 UTC
[iotdb] branch xkf_id_table updated: add insert test
This is an automated email from the ASF dual-hosted git repository.
xuekaifeng pushed a commit to branch xkf_id_table
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/xkf_id_table by this push:
new 16bdce3 add insert test
16bdce3 is described below
commit 16bdce32486a1ef3980a9ef05060980606031f84
Author: 151250176 <15...@smail.nju.edu.cn>
AuthorDate: Wed Dec 8 20:05:06 2021 +0800
add insert test
---
.../org/apache/iotdb/db/engine/StorageEngine.java | 88 ++-
.../org/apache/iotdb/db/metadata/MManager.java | 48 +-
.../apache/iotdb/db/metadata/id_table/IDTable.java | 74 +--
.../metadata/id_table/entry/DeviceIDFactory.java | 13 +-
.../id_table/entry/InsertMeasurementMNode.java | 16 +-
.../apache/iotdb/db/qp/executor/PlanExecutor.java | 130 ++--
.../iotdb/db/metadata/id_table/IDTableTest.java | 36 +-
.../db/metadata/id_table/InsertWithIDTable.java | 713 +++++++++++++++++++++
.../db/metadata/id_table/entry/DeviceIDTest.java | 13 +-
9 files changed, 920 insertions(+), 211 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index d3497ee..96a3f4b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -18,6 +18,30 @@
*/
package org.apache.iotdb.db.engine;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.ConcurrentModificationException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import org.apache.commons.io.FileUtils;
import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -33,12 +57,21 @@ import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.TimePartiti
import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.engine.storagegroup.virtualSg.VirtualStorageGroupManager;
-import org.apache.iotdb.db.exception.*;
+import org.apache.iotdb.db.exception.BatchProcessException;
+import org.apache.iotdb.db.exception.LoadFileException;
+import org.apache.iotdb.db.exception.ShutdownException;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.StorageGroupProcessorException;
+import org.apache.iotdb.db.exception.TsFileProcessorException;
+import org.apache.iotdb.db.exception.WriteProcessException;
+import org.apache.iotdb.db.exception.WriteProcessRejectException;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.exception.runtime.StorageEngineFailureException;
+import org.apache.iotdb.db.metadata.id_table.entry.DeviceIDFactory;
+import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.monitor.StatMonitor;
@@ -60,35 +93,9 @@ import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.utils.FilePathUtils;
import org.apache.iotdb.tsfile.utils.Pair;
-
-import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.ConcurrentModificationException;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.stream.Collectors;
-
public class StorageEngine implements IService {
private static final Logger logger = LoggerFactory.getLogger(StorageEngine.class);
@@ -579,6 +586,12 @@ public class StorageEngine implements IService {
}
}
StorageGroupProcessor storageGroupProcessor = getProcessor(insertRowPlan.getDeviceId());
+ getSeriesSchemas(insertRowPlan, storageGroupProcessor);
+ try {
+ insertRowPlan.transferType();
+ } catch (QueryProcessException e) {
+ throw new StorageEngineException(e);
+ }
try {
storageGroupProcessor.insert(insertRowPlan);
@@ -609,6 +622,12 @@ public class StorageEngine implements IService {
StorageGroupProcessor storageGroupProcessor =
getProcessor(insertRowsOfOneDevicePlan.getDeviceId());
+ for (InsertRowPlan plan : insertRowsOfOneDevicePlan.getRowPlans()) {
+ plan.setMeasurementMNodes(new IMeasurementMNode[plan.getMeasurements().length]);
+ // check whether types are match
+ getSeriesSchemas(plan, storageGroupProcessor);
+ }
+
// TODO monitor: update statistics
try {
storageGroupProcessor.insert(insertRowsOfOneDevicePlan);
@@ -639,6 +658,7 @@ public class StorageEngine implements IService {
e);
}
+ getSeriesSchemas(insertTabletPlan, storageGroupProcessor);
storageGroupProcessor.insertTablet(insertTabletPlan);
if (config.isEnableStatMonitor()) {
@@ -1034,6 +1054,20 @@ public class StorageEngine implements IService {
list.forEach(StorageGroupProcessor::readUnlock);
}
+ protected void getSeriesSchemas(InsertPlan insertPlan, StorageGroupProcessor processor)
+ throws StorageEngineException {
+ try {
+ if (config.isEnableIDTable()) {
+ processor.getIdTable().getSeriesSchemas(insertPlan);
+ } else {
+ IoTDB.metaManager.getSeriesSchemasAndReadLockDevice(insertPlan);
+ insertPlan.setDeviceID(DeviceIDFactory.getInstance().getDeviceID(insertPlan.getDeviceId()));
+ }
+ } catch (MetadataException | IOException e) {
+ throw new StorageEngineException(e);
+ }
+ }
+
static class InstanceHolder {
private static final StorageEngine INSTANCE = new StorageEngine();
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index 80dc8ac..d851df1 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -18,6 +18,28 @@
*/
package org.apache.iotdb.db.metadata;
+import static org.apache.iotdb.db.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
+import static org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding;
+import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR;
+
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -94,35 +116,11 @@ import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema;
-
-import com.github.benmanes.caffeine.cache.Caffeine;
-import com.github.benmanes.caffeine.cache.LoadingCache;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import static org.apache.iotdb.db.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
-import static org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding;
-import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR;
-
/**
* This class takes the responsibility of serialization of all the metadata info and persistent it
* into files. This class contains all the interfaces to modify the metadata for delta system. All
@@ -872,7 +870,7 @@ public class MManager {
}
}
- protected IMNode getDeviceNodeWithAutoCreate(PartialPath path)
+ public IMNode getDeviceNodeWithAutoCreate(PartialPath path)
throws MetadataException, IOException {
return getDeviceNodeWithAutoCreate(
path, config.isAutoCreateSchemaEnabled(), true, config.getDefaultStorageGroupLevel());
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/id_table/IDTable.java b/server/src/main/java/org/apache/iotdb/db/metadata/id_table/IDTable.java
index d69ba49..19b9590 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/id_table/IDTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/id_table/IDTable.java
@@ -19,18 +19,13 @@
package org.apache.iotdb.db.metadata.id_table;
-import static org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding;
-
-import java.util.ArrayList;
-import java.util.Arrays;
+import java.io.IOException;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
-import org.apache.iotdb.db.exception.metadata.PathNotExistException;
import org.apache.iotdb.db.metadata.id_table.entry.DeviceEntry;
import org.apache.iotdb.db.metadata.id_table.entry.DeviceIDFactory;
import org.apache.iotdb.db.metadata.id_table.entry.IDeviceID;
@@ -43,10 +38,8 @@ import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
import org.apache.iotdb.db.service.IoTDB;
-import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
-import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -161,6 +154,9 @@ public class IDTable {
}
}
+ // set reusable device id
+ plan.setDeviceID(deviceEntry.getDeviceID());
+
return deviceEntry.getDeviceID();
}
@@ -233,50 +229,32 @@ public class IDTable {
// if not exist, we create it
if (schemaEntry == null) {
- if (!config.isAutoCreateSchemaEnabled()) {
- throw new PathNotExistException(seriesKey.toString());
- }
-
- // create new timeseries in mmanager
+ // we have to copy plan's mnode for using id table's last cache
+ IMeasurementMNode[] insertPlanMNodeBackup =
+ new IMeasurementMNode[plan.getMeasurementMNodes().length];
+ System.arraycopy(
+ plan.getMeasurementMNodes(), 0, insertPlanMNodeBackup, 0, insertPlanMNodeBackup.length);
try {
- if (plan.isAligned()) {
- // create aligned timeseries
- List<TSEncoding> encodings = new ArrayList<>();
- List<CompressionType> compressors = new ArrayList<>();
- for (TSDataType dataType : plan.getDataTypes()) {
- encodings.add(getDefaultEncoding(dataType));
- compressors.add(TSFileDescriptor.getInstance().getConfig().getCompressor());
- }
-
- CreateAlignedTimeSeriesPlan createAlignedTimeSeriesPlan =
- new CreateAlignedTimeSeriesPlan(
- plan.getDeviceId(),
- Arrays.asList(plan.getMeasurements()),
- Arrays.asList(plan.getDataTypes()),
- encodings,
- compressors,
- null);
+ IoTDB.metaManager.getSeriesSchemasAndReadLockDevice(plan);
+ } catch (IOException e) {
+ throw new MetadataException(e);
+ }
- IoTDB.metaManager.createAlignedTimeSeriesEntry(createAlignedTimeSeriesPlan);
- } else {
- // create normal timeseries
- CreateTimeSeriesPlan createTimeSeriesPlan =
- new CreateTimeSeriesPlan(
- seriesKey,
- plan.getDataTypes()[loc],
- getDefaultEncoding(plan.getDataTypes()[loc]),
- TSFileDescriptor.getInstance().getConfig().getCompressor(),
- null,
- null,
- null,
- null);
-
- IoTDB.metaManager.createTimeseriesEntry(createTimeSeriesPlan, -1);
+ // if the timeseries is in template, mmanager will not create timeseries. so we have to put it
+ // in id table here
+ for (IMeasurementMNode measurementMNode : plan.getMeasurementMNodes()) {
+ if (measurementMNode != null) {
+ IMeasurementSchema schema = measurementMNode.getSchema();
+ SchemaEntry curEntry =
+ new SchemaEntry(schema.getType(), schema.getEncodingType(), schema.getCompressor());
+ deviceEntry.putSchemaEntry(measurementMNode.getName(), curEntry);
}
- } catch (MetadataException e) {
- logger.error("create timeseries failed, path is:" + seriesKey);
}
+ // copy back measurement mnode list
+ System.arraycopy(
+ insertPlanMNodeBackup, 0, plan.getMeasurementMNodes(), 0, insertPlanMNodeBackup.length);
+
schemaEntry = deviceEntry.getSchemaEntry(measurementName);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/id_table/entry/DeviceIDFactory.java b/server/src/main/java/org/apache/iotdb/db/metadata/id_table/entry/DeviceIDFactory.java
index 7628ed6..b7ccfe0 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/id_table/entry/DeviceIDFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/id_table/entry/DeviceIDFactory.java
@@ -19,11 +19,10 @@
package org.apache.iotdb.db.metadata.id_table.entry;
+import java.util.function.Function;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.metadata.path.PartialPath;
-import java.util.function.Function;
-
/** factory to build device id according to configured algorithm */
public class DeviceIDFactory {
Function<PartialPath, IDeviceID> getDeviceIDFunction;
@@ -52,9 +51,9 @@ public class DeviceIDFactory {
.getConfig()
.getDeviceIDTransformationMethod()
.equals("SHA256")) {
- getDeviceIDFunction = partialPath -> new SHA256DeviceID(partialPath.getDevice());
+ getDeviceIDFunction = partialPath -> new SHA256DeviceID(partialPath.toString());
} else {
- getDeviceIDFunction = partialPath -> new PlainDeviceID(partialPath.getDevice());
+ getDeviceIDFunction = partialPath -> new PlainDeviceID(partialPath.toString());
}
}
// endregion
@@ -62,10 +61,10 @@ public class DeviceIDFactory {
/**
* get device id by full path
*
- * @param fullPath full path of the timeseries
+ * @param devicePath device path of the timeseries
* @return device id of the timeseries
*/
- public IDeviceID getDeviceID(PartialPath fullPath) {
- return getDeviceIDFunction.apply(fullPath);
+ public IDeviceID getDeviceID(PartialPath devicePath) {
+ return getDeviceIDFunction.apply(devicePath);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/id_table/entry/InsertMeasurementMNode.java b/server/src/main/java/org/apache/iotdb/db/metadata/id_table/entry/InsertMeasurementMNode.java
index 717f7ad..87a53c6 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/id_table/entry/InsertMeasurementMNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/id_table/entry/InsertMeasurementMNode.java
@@ -80,6 +80,17 @@ public class InsertMeasurementMNode implements IMeasurementMNode {
public TSDataType getDataType(String measurementId) {
return schemaEntry.getTSDataType();
}
+
+ @Override
+ public IEntityMNode getParent() {
+ return null;
+ }
+
+ @Override
+ public String toString() {
+ return schema.getMeasurementId();
+ }
+
// endregion
// region unsupported methods
@@ -94,11 +105,6 @@ public class InsertMeasurementMNode implements IMeasurementMNode {
}
@Override
- public IEntityMNode getParent() {
- throw new UnsupportedOperationException("insert measurement mnode doesn't support this method");
- }
-
- @Override
public void setParent(IMNode parent) {
throw new UnsupportedOperationException("insert measurement mnode doesn't support this method");
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index a7052fa..6a352fa 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -18,6 +18,56 @@
*/
package org.apache.iotdb.db.qp.executor;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CANCELLED;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CHILD_NODES;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CHILD_PATHS;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_COLUMN;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CONTINUOUS_QUERY_EVERY_INTERVAL;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CONTINUOUS_QUERY_FOR_INTERVAL;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CONTINUOUS_QUERY_NAME;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CONTINUOUS_QUERY_QUERY_SQL;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CONTINUOUS_QUERY_TARGET_PATH;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_COUNT;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CREATED_TIME;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_DEVICES;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_DONE;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_FUNCTION_CLASS;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_FUNCTION_NAME;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_FUNCTION_TYPE;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_ITEM;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_LOCK_INFO;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_PRIVILEGE;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_PROGRESS;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_ROLE;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_STORAGE_GROUP;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TASK_NAME;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TTL;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_USER;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_VALUE;
+import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SEPARATOR;
+import static org.apache.iotdb.db.conf.IoTDBConstant.FUNCTION_TYPE_BUILTIN_UDAF;
+import static org.apache.iotdb.db.conf.IoTDBConstant.FUNCTION_TYPE_BUILTIN_UDTF;
+import static org.apache.iotdb.db.conf.IoTDBConstant.FUNCTION_TYPE_EXTERNAL_UDAF;
+import static org.apache.iotdb.db.conf.IoTDBConstant.FUNCTION_TYPE_EXTERNAL_UDTF;
+import static org.apache.iotdb.db.conf.IoTDBConstant.FUNCTION_TYPE_NATIVE;
+import static org.apache.iotdb.db.conf.IoTDBConstant.ONE_LEVEL_PATH_WILDCARD;
+import static org.apache.iotdb.db.conf.IoTDBConstant.QUERY_ID;
+import static org.apache.iotdb.db.conf.IoTDBConstant.STATEMENT;
+import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
import org.apache.iotdb.db.auth.AuthException;
import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.auth.authorizer.BasicAuthorizer;
@@ -51,7 +101,6 @@ import org.apache.iotdb.db.exception.metadata.PathNotExistException;
import org.apache.iotdb.db.exception.metadata.StorageGroupAlreadySetException;
import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.metadata.mnode.IMNode;
import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode;
import org.apache.iotdb.db.metadata.path.MeasurementPath;
@@ -160,61 +209,9 @@ import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-
-import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CANCELLED;
-import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CHILD_NODES;
-import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CHILD_PATHS;
-import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_COLUMN;
-import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CONTINUOUS_QUERY_EVERY_INTERVAL;
-import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CONTINUOUS_QUERY_FOR_INTERVAL;
-import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CONTINUOUS_QUERY_NAME;
-import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CONTINUOUS_QUERY_QUERY_SQL;
-import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CONTINUOUS_QUERY_TARGET_PATH;
-import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_COUNT;
-import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CREATED_TIME;
-import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_DEVICES;
-import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_DONE;
-import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_FUNCTION_CLASS;
-import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_FUNCTION_NAME;
-import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_FUNCTION_TYPE;
-import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_ITEM;
-import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_LOCK_INFO;
-import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_PRIVILEGE;
-import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_PROGRESS;
-import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_ROLE;
-import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_STORAGE_GROUP;
-import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TASK_NAME;
-import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TTL;
-import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_USER;
-import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_VALUE;
-import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SEPARATOR;
-import static org.apache.iotdb.db.conf.IoTDBConstant.FUNCTION_TYPE_BUILTIN_UDAF;
-import static org.apache.iotdb.db.conf.IoTDBConstant.FUNCTION_TYPE_BUILTIN_UDTF;
-import static org.apache.iotdb.db.conf.IoTDBConstant.FUNCTION_TYPE_EXTERNAL_UDAF;
-import static org.apache.iotdb.db.conf.IoTDBConstant.FUNCTION_TYPE_EXTERNAL_UDTF;
-import static org.apache.iotdb.db.conf.IoTDBConstant.FUNCTION_TYPE_NATIVE;
-import static org.apache.iotdb.db.conf.IoTDBConstant.ONE_LEVEL_PATH_WILDCARD;
-import static org.apache.iotdb.db.conf.IoTDBConstant.QUERY_ID;
-import static org.apache.iotdb.db.conf.IoTDBConstant.STATEMENT;
-import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
-
public class PlanExecutor implements IPlanExecutor {
private static final Logger logger = LoggerFactory.getLogger(PlanExecutor.class);
@@ -1353,14 +1350,6 @@ public class PlanExecutor implements IPlanExecutor {
}
}
- protected IMNode getSeriesSchemas(InsertPlan insertPlan) throws MetadataException {
- try {
- return IoTDB.metaManager.getSeriesSchemasAndReadLockDevice(insertPlan);
- } catch (IOException e) {
- throw new MetadataException(e);
- }
- }
-
private void checkFailedMeasurments(InsertPlan plan)
throws PathNotExistException, StorageEngineException {
// check if all path not exist exceptions
@@ -1394,12 +1383,7 @@ public class PlanExecutor implements IPlanExecutor {
return;
}
try {
- for (InsertRowPlan plan : insertRowsOfOneDevicePlan.getRowPlans()) {
- plan.setMeasurementMNodes(new IMeasurementMNode[plan.getMeasurements().length]);
- // check whether types are match
- getSeriesSchemas(plan);
- }
- // ok, we can begin to write data into the engine..
+ // insert to storage engine
StorageEngine.getInstance().insert(insertRowsOfOneDevicePlan);
List<String> notExistedPaths = null;
@@ -1480,12 +1464,9 @@ public class PlanExecutor implements IPlanExecutor {
insertRowPlan.getValues()[i], insertRowPlan.isNeedInferType());
}
}
- // check whether types are match
- getSeriesSchemas(insertRowPlan);
- insertRowPlan.transferType();
- if (insertRowPlan.getFailedMeasurementNumber() < insertRowPlan.getMeasurements().length) {
- StorageEngine.getInstance().insert(insertRowPlan);
- }
+
+ StorageEngine.getInstance().insert(insertRowPlan);
+
if (insertRowPlan.getFailedMeasurements() != null) {
checkFailedMeasurments(insertRowPlan);
}
@@ -1532,8 +1513,9 @@ public class PlanExecutor implements IPlanExecutor {
try {
insertTabletPlan.setMeasurementMNodes(
new IMeasurementMNode[insertTabletPlan.getMeasurements().length]);
- getSeriesSchemas(insertTabletPlan);
+
StorageEngine.getInstance().insertTablet(insertTabletPlan);
+
if (insertTabletPlan.getFailedMeasurements() != null) {
checkFailedMeasurments(insertTabletPlan);
}
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/id_table/IDTableTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/id_table/IDTableTest.java
index 56b1df8..440fa14 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/id_table/IDTableTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/id_table/IDTableTest.java
@@ -1,21 +1,21 @@
-/// *
-// * Licensed to the Apache Software Foundation (ASF) under one
-// * or more contributor license agreements. See the NOTICE file
-// * distributed with this work for additional information
-// * regarding copyright ownership. The ASF licenses this file
-// * to you under the Apache License, Version 2.0 (the
-// * "License"); you may not use this file except in compliance
-// * with the License. You may obtain a copy of the License at
-// *
-// * http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing,
-// * software distributed under the License is distributed on an
-// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// * KIND, either express or implied. See the License for the
-// * specific language governing permissions and limitations
-// * under the License.
-// */
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.iotdb.db.metadata.id_table;
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/id_table/InsertWithIDTable.java b/server/src/test/java/org/apache/iotdb/db/metadata/id_table/InsertWithIDTable.java
new file mode 100644
index 0000000..af3588c
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/id_table/InsertWithIDTable.java
@@ -0,0 +1,713 @@
+package org.apache.iotdb.db.metadata.id_table;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.qp.Planner;
+import org.apache.iotdb.db.qp.executor.PlanExecutor;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan.PhysicalPlanType;
+import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
+import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateTemplatePlan;
+import org.apache.iotdb.db.qp.physical.sys.SetTemplatePlan;
+import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.common.Field;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.BitMap;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class InsertWithIDTable {
+ private final Planner processor = new Planner();
+
+ private boolean isEnableIDTable = false;
+
+ @Before
+ public void before() {
+ IoTDBDescriptor.getInstance().getConfig().setAutoCreateSchemaEnabled(true);
+ isEnableIDTable = IoTDBDescriptor.getInstance().getConfig().isEnableIDTable();
+ IoTDBDescriptor.getInstance().getConfig().setEnableIDTable(true);
+ EnvironmentUtils.envSetUp();
+ }
+
+ @After
+ public void clean() throws IOException, StorageEngineException {
+ IoTDBDescriptor.getInstance().getConfig().setEnableIDTable(isEnableIDTable);
+ EnvironmentUtils.cleanEnv();
+ }
+
+ @Test
+ public void testInsertRowPlan()
+ throws QueryProcessException, MetadataException, InterruptedException,
+ QueryFilterOptimizationException, StorageEngineException, IOException {
+ InsertRowPlan rowPlan = getInsertRowPlan();
+
+ PlanExecutor executor = new PlanExecutor();
+ executor.insert(rowPlan);
+
+ QueryPlan queryPlan = (QueryPlan) processor.parseSQLToPhysicalPlan("select * from root.isp.d1");
+ QueryDataSet dataSet = executor.processQuery(queryPlan, EnvironmentUtils.TEST_QUERY_CONTEXT);
+ Assert.assertEquals(6, dataSet.getPaths().size());
+ while (dataSet.hasNext()) {
+ RowRecord record = dataSet.next();
+ Assert.assertEquals(6, record.getFields().size());
+ }
+ }
+
+ @Test
+ public void testInsertRowPlanWithAlignedTimeseries()
+ throws QueryProcessException, MetadataException, InterruptedException,
+ QueryFilterOptimizationException, StorageEngineException, IOException {
+ InsertRowPlan vectorRowPlan = getInsertAlignedRowPlan();
+
+ PlanExecutor executor = new PlanExecutor();
+ executor.insert(vectorRowPlan);
+
+ Assert.assertEquals("[s1, s2, s3]", Arrays.toString(vectorRowPlan.getMeasurementMNodes()));
+
+ QueryPlan queryPlan =
+ (QueryPlan) processor.parseSQLToPhysicalPlan("select * from root.isp.d1.GPS");
+ QueryDataSet dataSet = executor.processQuery(queryPlan, EnvironmentUtils.TEST_QUERY_CONTEXT);
+ Assert.assertEquals(1, dataSet.getPaths().size());
+ while (dataSet.hasNext()) {
+ RowRecord record = dataSet.next();
+ Assert.assertEquals(3, record.getFields().size());
+ }
+ }
+
+ @Test
+ public void testInsertRowPlanWithSchemaTemplate()
+ throws QueryProcessException, MetadataException, InterruptedException,
+ QueryFilterOptimizationException, StorageEngineException, IOException {
+ List<List<String>> measurementList = new ArrayList<>();
+ for (int i = 1; i <= 6; i++) {
+ measurementList.add(Collections.singletonList("s" + i));
+ }
+
+ List<List<TSDataType>> dataTypesList = new ArrayList<>();
+ dataTypesList.add(Collections.singletonList(TSDataType.DOUBLE));
+ dataTypesList.add(Collections.singletonList(TSDataType.FLOAT));
+ dataTypesList.add(Collections.singletonList(TSDataType.INT64));
+ dataTypesList.add(Collections.singletonList(TSDataType.INT32));
+ dataTypesList.add(Collections.singletonList(TSDataType.BOOLEAN));
+ dataTypesList.add(Collections.singletonList(TSDataType.TEXT));
+
+ List<List<TSEncoding>> encodingList = new ArrayList<>();
+ for (int i = 1; i <= 6; i++) {
+ encodingList.add(Collections.singletonList(TSEncoding.PLAIN));
+ }
+
+ List<List<CompressionType>> compressionTypes = new ArrayList<>();
+ for (int i = 1; i <= 6; i++) {
+ compressionTypes.add(Collections.singletonList(CompressionType.SNAPPY));
+ }
+
+ List<String> schemaNames = new ArrayList<>();
+ for (int i = 1; i <= 6; i++) {
+ schemaNames.add("s" + i);
+ }
+
+ CreateTemplatePlan plan =
+ new CreateTemplatePlan(
+ "template1",
+ schemaNames,
+ measurementList,
+ dataTypesList,
+ encodingList,
+ compressionTypes);
+
+ IoTDB.metaManager.createSchemaTemplate(plan);
+ IoTDB.metaManager.setSchemaTemplate(new SetTemplatePlan("template1", "root.isp.d1"));
+
+ IoTDBDescriptor.getInstance().getConfig().setAutoCreateSchemaEnabled(false);
+
+ InsertRowPlan rowPlan = getInsertRowPlan();
+
+ PlanExecutor executor = new PlanExecutor();
+ executor.insert(rowPlan);
+
+ QueryPlan queryPlan = (QueryPlan) processor.parseSQLToPhysicalPlan("select * from root.isp.d1");
+ QueryDataSet dataSet = executor.processQuery(queryPlan, EnvironmentUtils.TEST_QUERY_CONTEXT);
+ Assert.assertEquals(6, dataSet.getPaths().size());
+ while (dataSet.hasNext()) {
+ RowRecord record = dataSet.next();
+ Assert.assertEquals(6, record.getFields().size());
+ }
+ }
+
+ @Test
+ public void testInsertRowSerialization() throws IllegalPathException, QueryProcessException {
+ InsertRowPlan plan1 = getInsertAlignedRowPlan();
+
+ PlanExecutor executor = new PlanExecutor();
+ executor.insert(plan1);
+
+ ByteBuffer byteBuffer = ByteBuffer.allocate(10000);
+ plan1.serialize(byteBuffer);
+ byteBuffer.flip();
+
+ Assert.assertEquals(PhysicalPlanType.INSERT.ordinal(), byteBuffer.get());
+
+ InsertRowPlan plan2 = new InsertRowPlan();
+ plan2.deserialize(byteBuffer);
+
+ executor.insert(plan2);
+ Assert.assertEquals(plan1, plan2);
+ }
+
+ @Test
+ public void testInsertRowPlanWithSchemaTemplateFormer()
+ throws QueryProcessException, MetadataException, InterruptedException,
+ QueryFilterOptimizationException, StorageEngineException, IOException {
+ List<List<String>> measurementList = new ArrayList<>();
+ List<String> v1 = new ArrayList<>();
+ v1.add("s1");
+ v1.add("s2");
+ v1.add("s3");
+ measurementList.add(v1);
+ List<String> v2 = new ArrayList<>();
+ v2.add("s4");
+ v2.add("s5");
+ measurementList.add(v2);
+ measurementList.add(Collections.singletonList("s6"));
+
+ List<List<TSDataType>> dataTypesList = new ArrayList<>();
+ List<TSDataType> d1 = new ArrayList<>();
+ d1.add(TSDataType.DOUBLE);
+ d1.add(TSDataType.FLOAT);
+ d1.add(TSDataType.INT64);
+ dataTypesList.add(d1);
+ List<TSDataType> d2 = new ArrayList<>();
+ d2.add(TSDataType.INT32);
+ d2.add(TSDataType.BOOLEAN);
+ dataTypesList.add(d2);
+ dataTypesList.add(Collections.singletonList(TSDataType.TEXT));
+
+ List<List<TSEncoding>> encodingList = new ArrayList<>();
+ List<TSEncoding> e1 = new ArrayList<>();
+ e1.add(TSEncoding.PLAIN);
+ e1.add(TSEncoding.PLAIN);
+ e1.add(TSEncoding.PLAIN);
+ encodingList.add(e1);
+ List<TSEncoding> e2 = new ArrayList<>();
+ e2.add(TSEncoding.PLAIN);
+ e2.add(TSEncoding.PLAIN);
+ encodingList.add(e2);
+ encodingList.add(Collections.singletonList(TSEncoding.PLAIN));
+
+ List<List<CompressionType>> compressionTypes = new ArrayList<>();
+ for (int i = 0; i < 3; i++) {
+ List<CompressionType> compressorList = new ArrayList<>();
+ for (int j = 0; j < 3; j++) {
+ compressorList.add(CompressionType.SNAPPY);
+ }
+ compressionTypes.add(compressorList);
+ }
+
+ CreateTemplatePlan plan =
+ new CreateTemplatePlan(
+ "template1", measurementList, dataTypesList, encodingList, compressionTypes);
+
+ IoTDB.metaManager.createSchemaTemplate(plan);
+ IoTDB.metaManager.setSchemaTemplate(new SetTemplatePlan("template1", "root.isp.d1.GPS"));
+
+ IoTDBDescriptor.getInstance().getConfig().setAutoCreateSchemaEnabled(false);
+
+ InsertRowPlan rowPlan = getInsertAlignedRowPlan();
+
+ PlanExecutor executor = new PlanExecutor();
+ executor.insert(rowPlan);
+
+ QueryPlan queryPlan =
+ (QueryPlan) processor.parseSQLToPhysicalPlan("select s1 from root.isp.d1.GPS");
+ QueryDataSet dataSet = executor.processQuery(queryPlan, EnvironmentUtils.TEST_QUERY_CONTEXT);
+ Assert.assertEquals(1, dataSet.getPaths().size());
+ int count = 0;
+ while (dataSet.hasNext()) {
+ count++;
+ RowRecord record = dataSet.next();
+ Assert.assertEquals(1, record.getFields().size());
+ }
+ Assert.assertEquals(1, count);
+ }
+
+ @Test
+ public void testInsertTabletPlan()
+ throws QueryProcessException, MetadataException, InterruptedException,
+ QueryFilterOptimizationException, StorageEngineException, IOException {
+ long[] times = new long[] {110L, 111L, 112L, 113L};
+ List<Integer> dataTypes = new ArrayList<>();
+ dataTypes.add(TSDataType.DOUBLE.ordinal());
+ dataTypes.add(TSDataType.FLOAT.ordinal());
+ dataTypes.add(TSDataType.INT64.ordinal());
+ dataTypes.add(TSDataType.INT32.ordinal());
+ dataTypes.add(TSDataType.BOOLEAN.ordinal());
+ dataTypes.add(TSDataType.TEXT.ordinal());
+
+ Object[] columns = new Object[6];
+ columns[0] = new double[4];
+ columns[1] = new float[4];
+ columns[2] = new long[4];
+ columns[3] = new int[4];
+ columns[4] = new boolean[4];
+ columns[5] = new Binary[4];
+
+ for (int r = 0; r < 4; r++) {
+ ((double[]) columns[0])[r] = 1.0;
+ ((float[]) columns[1])[r] = 2;
+ ((long[]) columns[2])[r] = 10000;
+ ((int[]) columns[3])[r] = 100;
+ ((boolean[]) columns[4])[r] = false;
+ ((Binary[]) columns[5])[r] = new Binary("hh" + r);
+ }
+
+ InsertTabletPlan tabletPlan =
+ new InsertTabletPlan(
+ new PartialPath("root.isp.d1"),
+ new String[] {"s1", "s2", "s3", "s4", "s5", "s6"},
+ dataTypes);
+ tabletPlan.setTimes(times);
+ tabletPlan.setColumns(columns);
+ tabletPlan.setRowCount(times.length);
+
+ PlanExecutor executor = new PlanExecutor();
+ executor.insertTablet(tabletPlan);
+
+ QueryPlan queryPlan = (QueryPlan) processor.parseSQLToPhysicalPlan("select * from root.isp.d1");
+ QueryDataSet dataSet = executor.processQuery(queryPlan, EnvironmentUtils.TEST_QUERY_CONTEXT);
+ Assert.assertEquals(6, dataSet.getPaths().size());
+ while (dataSet.hasNext()) {
+ RowRecord record = dataSet.next();
+ Assert.assertEquals(6, record.getFields().size());
+ }
+ }
+
+ @Test
+ public void testInsertNullableTabletPlan()
+ throws QueryProcessException, MetadataException, InterruptedException,
+ QueryFilterOptimizationException, StorageEngineException, IOException {
+ long[] times = new long[] {110L, 111L, 112L, 113L};
+ List<Integer> dataTypes = new ArrayList<>();
+ dataTypes.add(TSDataType.DOUBLE.ordinal());
+ dataTypes.add(TSDataType.FLOAT.ordinal());
+ dataTypes.add(TSDataType.INT64.ordinal());
+ dataTypes.add(TSDataType.INT32.ordinal());
+ dataTypes.add(TSDataType.BOOLEAN.ordinal());
+ dataTypes.add(TSDataType.TEXT.ordinal());
+
+ Object[] columns = new Object[6];
+ columns[0] = new double[4];
+ columns[1] = new float[4];
+ columns[2] = new long[4];
+ columns[3] = new int[4];
+ columns[4] = new boolean[4];
+ columns[5] = new Binary[4];
+
+ for (int r = 0; r < 4; r++) {
+ ((double[]) columns[0])[r] = 1.0 + r;
+ ((float[]) columns[1])[r] = 2 + r;
+ ((long[]) columns[2])[r] = 10000 + r;
+ ((int[]) columns[3])[r] = 100 + r;
+ ((boolean[]) columns[4])[r] = (r % 2 == 0);
+ ((Binary[]) columns[5])[r] = new Binary("hh" + r);
+ }
+
+ BitMap[] bitMaps = new BitMap[dataTypes.size()];
+ for (int i = 0; i < dataTypes.size(); i++) {
+ if (bitMaps[i] == null) {
+ bitMaps[i] = new BitMap(times.length);
+ }
+ bitMaps[i].mark(i % times.length);
+ }
+
+ InsertTabletPlan tabletPlan =
+ new InsertTabletPlan(
+ new PartialPath("root.isp.d1"),
+ new String[] {"s1", "s2", "s3", "s4", "s5", "s6"},
+ dataTypes);
+ tabletPlan.setTimes(times);
+ tabletPlan.setColumns(columns);
+ tabletPlan.setRowCount(times.length);
+ tabletPlan.setBitMaps(bitMaps);
+
+ PlanExecutor executor = new PlanExecutor();
+ executor.insertTablet(tabletPlan);
+
+ QueryPlan queryPlan = (QueryPlan) processor.parseSQLToPhysicalPlan("select * from root.isp.d1");
+ QueryDataSet dataSet = executor.processQuery(queryPlan, EnvironmentUtils.TEST_QUERY_CONTEXT);
+ Assert.assertEquals(6, dataSet.getPaths().size());
+ int rowNum = 0;
+ while (dataSet.hasNext()) {
+ RowRecord record = dataSet.next();
+ Assert.assertEquals(6, record.getFields().size());
+ List<Field> fields = record.getFields();
+ for (int i = 0; i < 6; ++i) {
+ if (i % times.length == rowNum) {
+ Assert.assertNull(fields.get(i));
+ } else {
+ Assert.assertNotNull(fields.get(i));
+ }
+ }
+ rowNum++;
+ }
+ }
+
+ @Test
+ public void testInsertNullableTabletPlanWithAlignedTimeseries()
+ throws QueryProcessException, MetadataException, InterruptedException,
+ QueryFilterOptimizationException, StorageEngineException, IOException {
+ InsertTabletPlan tabletPlan = getAlignedInsertTabletPlan();
+ tabletPlan.setBitMaps(new BitMap[3]);
+ BitMap[] bitMaps = tabletPlan.getBitMaps();
+ for (int i = 0; i < 3; i++) {
+ if (bitMaps[i] == null) {
+ bitMaps[i] = new BitMap(4);
+ }
+ bitMaps[i].mark(i);
+ }
+ PlanExecutor executor = new PlanExecutor();
+ executor.insertTablet(tabletPlan);
+
+ Assert.assertEquals("[s1, s2, s3]", Arrays.toString(tabletPlan.getMeasurementMNodes()));
+
+ QueryPlan queryPlan =
+ (QueryPlan) processor.parseSQLToPhysicalPlan("select ** from root.isp.d1");
+ QueryDataSet dataSet = executor.processQuery(queryPlan, EnvironmentUtils.TEST_QUERY_CONTEXT);
+ Assert.assertEquals(1, dataSet.getPaths().size());
+ while (dataSet.hasNext()) {
+ RowRecord record = dataSet.next();
+ Assert.assertEquals(3, record.getFields().size());
+ }
+ }
+
+ @Test
+ public void testInsertTabletSerialization() throws IllegalPathException, QueryProcessException {
+ InsertTabletPlan plan1 = getAlignedInsertTabletPlan();
+
+ PlanExecutor executor = new PlanExecutor();
+ executor.insertTablet(plan1);
+
+ ByteBuffer byteBuffer = ByteBuffer.allocate(10000);
+ plan1.serialize(byteBuffer);
+ byteBuffer.flip();
+
+ Assert.assertEquals(PhysicalPlanType.BATCHINSERT.ordinal(), byteBuffer.get());
+
+ InsertTabletPlan plan2 = new InsertTabletPlan();
+ plan2.deserialize(byteBuffer);
+ executor.insertTablet(plan2);
+
+ Assert.assertEquals(plan1, plan2);
+ }
+
+ @Test
+ public void testInsertTabletWithBitMapsSerialization()
+ throws IllegalPathException, QueryProcessException {
+ InsertTabletPlan plan1 = getAlignedInsertTabletPlan();
+ plan1.setBitMaps(new BitMap[3]);
+ BitMap[] bitMaps = plan1.getBitMaps();
+ for (int i = 0; i < 3; i++) {
+ if (bitMaps[i] == null) {
+ bitMaps[i] = new BitMap(4);
+ }
+ bitMaps[i].mark(i);
+ }
+ PlanExecutor executor = new PlanExecutor();
+ executor.insertTablet(plan1);
+
+ ByteBuffer byteBuffer = ByteBuffer.allocate(10000);
+ plan1.serialize(byteBuffer);
+ byteBuffer.flip();
+
+ Assert.assertEquals(PhysicalPlanType.BATCHINSERT.ordinal(), byteBuffer.get());
+
+ InsertTabletPlan plan2 = new InsertTabletPlan();
+ plan2.deserialize(byteBuffer);
+ executor.insertTablet(plan2);
+
+ Assert.assertEquals(plan1, plan2);
+ }
+
+ @Test
+ public void testInsertTabletPlanWithSchemaTemplateAndAutoCreateSchema()
+ throws QueryProcessException, MetadataException, InterruptedException,
+ QueryFilterOptimizationException, StorageEngineException, IOException {
+ CreateTemplatePlan plan = getCreateTemplatePlan();
+
+ IoTDB.metaManager.createSchemaTemplate(plan);
+ IoTDB.metaManager.setSchemaTemplate(new SetTemplatePlan("template1", "root.isp.d1"));
+ InsertTabletPlan tabletPlan = getAlignedInsertTabletPlan();
+
+ PlanExecutor executor = new PlanExecutor();
+ executor.insertTablet(tabletPlan);
+
+ QueryPlan queryPlan =
+ (QueryPlan) processor.parseSQLToPhysicalPlan("select ** from root.isp.d1");
+ QueryDataSet dataSet = executor.processQuery(queryPlan, EnvironmentUtils.TEST_QUERY_CONTEXT);
+ Assert.assertEquals(3, dataSet.getPaths().size());
+ while (dataSet.hasNext()) {
+ RowRecord record = dataSet.next();
+ Assert.assertEquals(6, record.getFields().size());
+ }
+
+ // test recover
+ EnvironmentUtils.stopDaemon();
+ IoTDB.metaManager.clear();
+ // wait for close
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ Thread.currentThread().interrupt();
+ }
+ EnvironmentUtils.activeDaemon();
+
+ queryPlan = (QueryPlan) processor.parseSQLToPhysicalPlan("select ** from root.isp.d1");
+ dataSet = executor.processQuery(queryPlan, EnvironmentUtils.TEST_QUERY_CONTEXT);
+ Assert.assertEquals(3, dataSet.getPaths().size());
+ while (dataSet.hasNext()) {
+ RowRecord record = dataSet.next();
+ Assert.assertEquals(6, record.getFields().size());
+ }
+ }
+
+ @Test
+ public void testInsertTabletPlanWithSchemaTemplate()
+ throws QueryProcessException, MetadataException, InterruptedException,
+ QueryFilterOptimizationException, StorageEngineException, IOException {
+ CreateTemplatePlan plan = getCreateTemplatePlan();
+
+ IoTDB.metaManager.createSchemaTemplate(plan);
+ IoTDB.metaManager.setSchemaTemplate(new SetTemplatePlan("template1", "root.isp.d1"));
+
+ InsertTabletPlan tabletPlan = getAlignedInsertTabletPlan();
+
+ PlanExecutor executor = new PlanExecutor();
+
+ // nothing can be found when we not insert data
+ QueryPlan queryPlan = (QueryPlan) processor.parseSQLToPhysicalPlan("select ** from root.isp");
+ QueryDataSet dataSet = executor.processQuery(queryPlan, EnvironmentUtils.TEST_QUERY_CONTEXT);
+ Assert.assertEquals(0, dataSet.getPaths().size());
+
+ executor.insertTablet(tabletPlan);
+
+ queryPlan = (QueryPlan) processor.parseSQLToPhysicalPlan("select ** from root.isp");
+ dataSet = executor.processQuery(queryPlan, EnvironmentUtils.TEST_QUERY_CONTEXT);
+ Assert.assertEquals(3, dataSet.getPaths().size());
+ while (dataSet.hasNext()) {
+ RowRecord record = dataSet.next();
+ Assert.assertEquals(6, record.getFields().size());
+ }
+ }
+
+ private CreateTemplatePlan getCreateTemplatePlan() {
+ List<List<String>> measurementList = new ArrayList<>();
+ List<String> v1 = new ArrayList<>();
+ v1.add("vector.s1");
+ v1.add("vector.s2");
+ v1.add("vector.s3");
+ measurementList.add(v1);
+ List<String> v2 = new ArrayList<>();
+ v2.add("vector2.s4");
+ v2.add("vector2.s5");
+ measurementList.add(v2);
+ measurementList.add(Collections.singletonList("vector3.s6"));
+
+ List<List<TSDataType>> dataTypesList = new ArrayList<>();
+ List<TSDataType> d1 = new ArrayList<>();
+ d1.add(TSDataType.DOUBLE);
+ d1.add(TSDataType.FLOAT);
+ d1.add(TSDataType.INT64);
+ dataTypesList.add(d1);
+ List<TSDataType> d2 = new ArrayList<>();
+ d2.add(TSDataType.INT32);
+ d2.add(TSDataType.BOOLEAN);
+ dataTypesList.add(d2);
+ dataTypesList.add(Collections.singletonList(TSDataType.TEXT));
+
+ List<List<TSEncoding>> encodingList = new ArrayList<>();
+ List<TSEncoding> e1 = new ArrayList<>();
+ e1.add(TSEncoding.PLAIN);
+ e1.add(TSEncoding.PLAIN);
+ e1.add(TSEncoding.PLAIN);
+ encodingList.add(e1);
+ List<TSEncoding> e2 = new ArrayList<>();
+ e2.add(TSEncoding.PLAIN);
+ e2.add(TSEncoding.PLAIN);
+ encodingList.add(e2);
+ encodingList.add(Collections.singletonList(TSEncoding.PLAIN));
+
+ List<List<CompressionType>> compressionTypes = new ArrayList<>();
+ List<CompressionType> c1 = new ArrayList<>();
+ for (int i = 0; i < 3; i++) {
+ c1.add(CompressionType.SNAPPY);
+ }
+ List<CompressionType> c2 = new ArrayList<>();
+ for (int i = 0; i < 2; i++) {
+ c2.add(CompressionType.SNAPPY);
+ }
+ compressionTypes.add(c1);
+ compressionTypes.add(c2);
+ compressionTypes.add(Collections.singletonList(CompressionType.SNAPPY));
+
+ List<String> schemaNames = new ArrayList<>();
+ schemaNames.add("vector");
+ schemaNames.add("vector2");
+ schemaNames.add("s6");
+
+ return new CreateTemplatePlan(
+ "template1", schemaNames, measurementList, dataTypesList, encodingList, compressionTypes);
+ }
+
+ private InsertRowPlan getInsertRowPlan() throws IllegalPathException {
+ long time = 110L;
+ TSDataType[] dataTypes =
+ new TSDataType[] {
+ TSDataType.DOUBLE,
+ TSDataType.FLOAT,
+ TSDataType.INT64,
+ TSDataType.INT32,
+ TSDataType.BOOLEAN,
+ TSDataType.TEXT
+ };
+
+ String[] columns = new String[6];
+ columns[0] = 1.0 + "";
+ columns[1] = 2 + "";
+ columns[2] = 10000 + "";
+ columns[3] = 100 + "";
+ columns[4] = false + "";
+ columns[5] = "hh" + 0;
+
+ return new InsertRowPlan(
+ new PartialPath("root.isp.d1"),
+ time,
+ new String[] {"s1", "s2", "s3", "s4", "s5", "s6"},
+ dataTypes,
+ columns);
+ }
+
+ private InsertRowPlan getInsertAlignedRowPlan() throws IllegalPathException {
+ long time = 110L;
+ TSDataType[] dataTypes =
+ new TSDataType[] {TSDataType.DOUBLE, TSDataType.FLOAT, TSDataType.INT64};
+
+ String[] columns = new String[3];
+ columns[0] = 1.0 + "";
+ columns[1] = 2 + "";
+ columns[2] = 10000 + "";
+
+ return new InsertRowPlan(
+ new PartialPath("root.isp.d1.GPS"),
+ time,
+ new String[] {"s1", "s2", "s3"},
+ dataTypes,
+ columns,
+ true);
+ }
+
+ private InsertTabletPlan getAlignedInsertTabletPlan() throws IllegalPathException {
+ long[] times = new long[] {110L, 111L, 112L, 113L};
+ List<Integer> dataTypes = new ArrayList<>();
+ dataTypes.add(TSDataType.DOUBLE.ordinal());
+ dataTypes.add(TSDataType.FLOAT.ordinal());
+ dataTypes.add(TSDataType.INT64.ordinal());
+
+ Object[] columns = new Object[3];
+ columns[0] = new double[4];
+ columns[1] = new float[4];
+ columns[2] = new long[4];
+
+ for (int r = 0; r < 4; r++) {
+ ((double[]) columns[0])[r] = 1.0;
+ ((float[]) columns[1])[r] = 2;
+ ((long[]) columns[2])[r] = 10000;
+ }
+
+ InsertTabletPlan tabletPlan =
+ new InsertTabletPlan(
+ new PartialPath("root.isp.d1.vector"), new String[] {"s1", "s2", "s3"}, dataTypes);
+ tabletPlan.setTimes(times);
+ tabletPlan.setColumns(columns);
+ tabletPlan.setRowCount(times.length);
+ tabletPlan.setAligned(true);
+ return tabletPlan;
+ }
+
+ @Test
+ public void testInsertMultiTabletPlan()
+ throws QueryProcessException, MetadataException, InterruptedException,
+ QueryFilterOptimizationException, StorageEngineException, IOException {
+ long[] times = new long[] {110L, 111L, 112L, 113L};
+ List<Integer> dataTypes = new ArrayList<>();
+ dataTypes.add(TSDataType.DOUBLE.ordinal());
+ dataTypes.add(TSDataType.FLOAT.ordinal());
+ dataTypes.add(TSDataType.INT64.ordinal());
+ dataTypes.add(TSDataType.INT32.ordinal());
+ dataTypes.add(TSDataType.BOOLEAN.ordinal());
+ dataTypes.add(TSDataType.TEXT.ordinal());
+
+ Object[] columns = new Object[6];
+ columns[0] = new double[4];
+ columns[1] = new float[4];
+ columns[2] = new long[4];
+ columns[3] = new int[4];
+ columns[4] = new boolean[4];
+ columns[5] = new Binary[4];
+
+ for (int r = 0; r < 4; r++) {
+ ((double[]) columns[0])[r] = 1.0;
+ ((float[]) columns[1])[r] = 2;
+ ((long[]) columns[2])[r] = 10000;
+ ((int[]) columns[3])[r] = 100;
+ ((boolean[]) columns[4])[r] = false;
+ ((Binary[]) columns[5])[r] = new Binary("hh" + r);
+ }
+
+ List<InsertTabletPlan> insertTabletPlanList = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ InsertTabletPlan tabletPlan =
+ new InsertTabletPlan(
+ new PartialPath("root.multi.d" + i),
+ new String[] {"s1", "s2", "s3", "s4", "s5", "s6"},
+ dataTypes);
+ tabletPlan.setTimes(times);
+ tabletPlan.setColumns(columns);
+ tabletPlan.setRowCount(times.length);
+ insertTabletPlanList.add(tabletPlan);
+ }
+ PlanExecutor executor = new PlanExecutor();
+
+ InsertMultiTabletPlan insertMultiTabletPlan = new InsertMultiTabletPlan(insertTabletPlanList);
+
+ executor.insertTablet(insertMultiTabletPlan);
+ QueryPlan queryPlan =
+ (QueryPlan) processor.parseSQLToPhysicalPlan("select * from root.multi.**");
+ QueryDataSet dataSet = executor.processQuery(queryPlan, EnvironmentUtils.TEST_QUERY_CONTEXT);
+ Assert.assertEquals(60, dataSet.getPaths().size());
+ while (dataSet.hasNext()) {
+ RowRecord record = dataSet.next();
+ Assert.assertEquals(60, record.getFields().size());
+ }
+ }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/id_table/entry/DeviceIDTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/id_table/entry/DeviceIDTest.java
index 42c1cab..2da2b7d 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/id_table/entry/DeviceIDTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/id_table/entry/DeviceIDTest.java
@@ -19,14 +19,13 @@
package org.apache.iotdb.db.metadata.id_table.entry;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.metadata.path.PartialPath;
-
import org.junit.Test;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-
public class DeviceIDTest {
@Test
public void deviceIDBuildTest() throws IllegalPathException {
@@ -34,9 +33,9 @@ public class DeviceIDTest {
PartialPath partialPath2 = new PartialPath("root.sg1.d1.s2");
PartialPath partialPath3 = new PartialPath("root.sg1.d2.s1");
- IDeviceID deviceID1 = DeviceIDFactory.getInstance().getDeviceID(partialPath1);
- IDeviceID deviceID2 = DeviceIDFactory.getInstance().getDeviceID(partialPath2);
- IDeviceID deviceID3 = DeviceIDFactory.getInstance().getDeviceID(partialPath3);
+ IDeviceID deviceID1 = DeviceIDFactory.getInstance().getDeviceID(partialPath1.getDevicePath());
+ IDeviceID deviceID2 = DeviceIDFactory.getInstance().getDeviceID(partialPath2.getDevicePath());
+ IDeviceID deviceID3 = DeviceIDFactory.getInstance().getDeviceID(partialPath3.getDevicePath());
assertEquals(deviceID1, deviceID2);
assertNotEquals(deviceID1, deviceID3);