You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2020/06/29 02:16:48 UTC
[incubator-iotdb] branch master updated: move getSeriesSchemas to
MManager
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 03c4f03 move getSeriesSchemas to MManager
new 46e78a0 Merge pull request #1408 from mychaow/master
03c4f03 is described below
commit 03c4f03aff1845f9342706ab5a15a208711f07ad
Author: mychaow <94...@qq.com>
AuthorDate: Tue Jun 23 11:35:20 2020 +0800
move getSeriesSchemas to MManager
---
.../org/apache/iotdb/db/metadata/MManager.java | 259 +++++++++++++++--
.../org/apache/iotdb/db/mqtt/PublishHandler.java | 2 +-
.../apache/iotdb/db/qp/executor/PlanExecutor.java | 317 ++-------------------
.../iotdb/db/qp/physical/crud/InsertPlan.java | 33 +--
.../org/apache/iotdb/db/service/TSServiceImpl.java | 64 ++---
.../iotdb/db/integration/IoTDBSimpleQueryIT.java | 13 +-
.../java/org/apache/iotdb/session/SessionUT.java | 26 +-
7 files changed, 310 insertions(+), 404 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index d247342..b573f40 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -18,31 +18,6 @@
*/
package org.apache.iotdb.db.metadata;
-import static java.util.stream.Collectors.toList;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Deque;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -51,33 +26,49 @@ import org.apache.iotdb.db.conf.adapter.IoTDBConfigDynamicAdapter;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
import org.apache.iotdb.db.exception.ConfigAdjusterException;
-import org.apache.iotdb.db.exception.metadata.DeleteFailedException;
-import org.apache.iotdb.db.exception.metadata.IllegalPathException;
-import org.apache.iotdb.db.exception.metadata.MetadataException;
-import org.apache.iotdb.db.exception.metadata.PathNotExistException;
-import org.apache.iotdb.db.exception.metadata.StorageGroupAlreadySetException;
-import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
+import org.apache.iotdb.db.exception.metadata.*;
import org.apache.iotdb.db.metadata.mnode.MNode;
import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
import org.apache.iotdb.db.metadata.mnode.StorageGroupMNode;
import org.apache.iotdb.db.monitor.MonitorConstants;
import org.apache.iotdb.db.qp.constant.SQLConstant;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan;
import org.apache.iotdb.db.query.dataset.ShowTimeSeriesResult;
import org.apache.iotdb.db.utils.RandomDeleteCache;
import org.apache.iotdb.db.utils.TestOnly;
+import org.apache.iotdb.db.utils.TypeInferenceUtils;
import org.apache.iotdb.tsfile.common.cache.LRUCache;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.exception.cache.CacheException;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static java.util.stream.Collectors.toList;
+
/**
* This class takes the responsibility of serialization of all the metadata info and persistent it
* into files. This class contains all the interfaces to modify the metadata for delta system. All
@@ -1846,4 +1837,210 @@ public class MManager {
lock.readLock().unlock();
}
}
+
+ /**
+ * Get the schemas of the given measurements of a device.
+ * Note: only InsertPlan and InsertTabletPlan are supported.
+ * @param deviceId
+ * @param measurementList
+ * @param plan
+ * @return
+ * @throws MetadataException
+ */
+ public MeasurementSchema[] getSeriesSchemas(String deviceId, String[] measurementList, PhysicalPlan plan) throws MetadataException {
+ MeasurementSchema[] schemas = new MeasurementSchema[measurementList.length];
+
+ MNode deviceNode = null;
+ // 1. get device node
+ deviceNode = getDeviceNode(deviceId);
+
+ // 2. get schema of each measurement
+ for (int i = 0; i < measurementList.length; i++) {
+ try {
+ // if the device does not have this measurement
+ if (!deviceNode.hasChild(measurementList[i])) {
+ // auto-creation of schemas is disabled, so it cannot be created
+ if (!config.isAutoCreateSchemaEnabled()) {
+ throw new MetadataException(String.format(
+ "Current deviceId[%s] does not contain measurement:%s", deviceId, measurementList[i]));
+ }
+
+ // create it
+ Path path = new Path(deviceId, measurementList[i]);
+ TSDataType dataType = getTypeInLoc(plan, i);
+
+ createTimeseries(
+ path.getFullPath(),
+ dataType,
+ getDefaultEncoding(dataType),
+ TSFileDescriptor.getInstance().getConfig().getCompressor(),
+ Collections.emptyMap());
+ }
+
+ MeasurementMNode measurementNode = (MeasurementMNode) getChild(deviceNode, measurementList[i]);
+
+ // check whether the data types match
+ TSDataType insertDataType = null;
+ if (plan instanceof InsertPlan) {
+ if (!((InsertPlan)plan).isNeedInferType()) {
+ // only when InsertPlan's values are Object[] should we check the type
+ insertDataType = getTypeInLoc(plan, i);
+ } else {
+ insertDataType = measurementNode.getSchema().getType();
+ }
+ } else if (plan instanceof InsertTabletPlan) {
+ insertDataType = getTypeInLoc(plan, i);
+ }
+
+ if (measurementNode.getSchema().getType() != insertDataType) {
+ logger.warn("DataType mismatch, Insert measurement {} type {}, metadata tree type {}",
+ measurementList[i], insertDataType, measurementNode.getSchema().getType());
+ if (!config.isEnablePartialInsert()) {
+ throw new MetadataException(String.format(
+ "DataType mismatch, Insert measurement %s type %s, metadata tree type %s",
+ measurementList[i], insertDataType, measurementNode.getSchema().getType()));
+ } else {
+ // mark failed measurement
+ if (plan instanceof InsertTabletPlan) {
+ ((InsertTabletPlan) plan).markMeasurementInsertionFailed(i);
+ } else if (plan instanceof InsertPlan) {
+ ((InsertPlan) plan).markMeasurementInsertionFailed(i);
+ }
+ continue;
+ }
+ }
+
+ // the value may need to be converted to its real type
+ if ((plan instanceof InsertPlan) && ((InsertPlan) plan).isNeedInferType()) {
+ changeStringValueToRealType((InsertPlan) plan, i, measurementNode.getSchema().getType());
+ }
+
+ schemas[i] = measurementNode.getSchema();
+ if (schemas[i] != null) {
+ measurementList[i] = schemas[i].getMeasurementId();
+ }
+ } catch (MetadataException e) {
+ logger.warn("meet error when check {}.{}, message: {}", deviceId, measurementList[i],
+ e.getMessage());
+ if (config.isEnablePartialInsert()) {
+ // mark failed measurement
+ if (plan instanceof InsertPlan) {
+ ((InsertPlan) plan).markMeasurementInsertionFailed(i);
+ } else if (plan instanceof InsertTabletPlan) {
+ ((InsertTabletPlan) plan).markMeasurementInsertionFailed(i);
+ }
+ } else {
+ throw e;
+ }
+ }
+ }
+ return schemas;
+ }
+
+ private void changeStringValueToRealType(InsertPlan plan, int loc, TSDataType type) throws MetadataException {
+ plan.getTypes()[loc] = type;
+ try {
+ switch (type) {
+ case INT32:
+ plan.getValues()[loc] =
+ Integer.parseInt(String.valueOf(plan.getValues()[loc]));
+ break;
+ case INT64:
+ plan.getValues()[loc] =
+ Long.parseLong(String.valueOf(plan.getValues()[loc]));
+ break;
+ case DOUBLE:
+ plan.getValues()[loc] =
+ Double.parseDouble(String.valueOf(plan.getValues()[loc]));
+ break;
+ case FLOAT:
+ plan.getValues()[loc] =
+ Float.parseFloat(String.valueOf(plan.getValues()[loc]));
+ break;
+ case BOOLEAN:
+ plan.getValues()[loc] =
+ Boolean.parseBoolean(String.valueOf(plan.getValues()[loc]));
+ break;
+ case TEXT:
+ plan.getValues()[loc] =
+ Binary.valueOf(String.valueOf(plan.getValues()[loc]));
+ break;
+ }
+ } catch (ClassCastException e) {
+ logger.error("inconsistent type between client and server for " + e.getMessage() + " " + type);
+ throw new MetadataException(e.getMessage());
+ } catch (NumberFormatException e) {
+ logger.error("inconsistent type between type {} and value {}", type, plan.getValues()[loc]);
+ throw new MetadataException(e.getMessage());
+ }
+ }
+
+ /**
+ * Get default encoding by dataType
+ */
+ private TSEncoding getDefaultEncoding(TSDataType dataType) {
+ IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig();
+ switch (dataType) {
+ case BOOLEAN:
+ return conf.getDefaultBooleanEncoding();
+ case INT32:
+ return conf.getDefaultInt32Encoding();
+ case INT64:
+ return conf.getDefaultInt64Encoding();
+ case FLOAT:
+ return conf.getDefaultFloatEncoding();
+ case DOUBLE:
+ return conf.getDefaultDoubleEncoding();
+ case TEXT:
+ return conf.getDefaultTextEncoding();
+ default:
+ throw new UnSupportedDataTypeException(
+ String.format("Data type %s is not supported.", dataType.toString()));
+ }
+ }
+
+ /**
+ * Get the data type of the measurement at index loc in the plan.
+ * Only InsertPlan and InsertTabletPlan are supported.
+ * @param plan
+ * @param loc
+ * @return
+ * @throws MetadataException
+ */
+ private TSDataType getTypeInLoc(PhysicalPlan plan, int loc) throws MetadataException {
+ TSDataType dataType;
+ if (plan instanceof InsertPlan) {
+ InsertPlan tPlan = (InsertPlan) plan;
+ dataType = TypeInferenceUtils.getPredictedDataType(tPlan.getValues()[loc], tPlan.isNeedInferType());
+ } else if (plan instanceof InsertTabletPlan) {
+ dataType = ((InsertTabletPlan) plan).getDataTypes()[loc];
+ } else {
+ throw new MetadataException(String.format(
+ "Only support insert and insertTablet, plan is [%s]", plan.getOperatorType()));
+ }
+ return dataType;
+ }
+
+ /**
+ * When inserting, the device node is locked so that deleted time series are not created.
+ * Before an insert, this function should be called to lock the device node.
+ * @param deviceId
+ */
+ public void lockInsert(String deviceId) throws MetadataException {
+ getDeviceNodeWithAutoCreateAndReadLock(deviceId);
+ }
+
+ /**
+ * When inserting, the device node is locked so that deleted time series are not created.
+ * After an insert, this function should be called to unlock the device node.
+ * @param deviceId
+ */
+ public void unlockInsert(String deviceId) {
+ try {
+ MNode mNode =getDeviceNode(deviceId);
+ mNode.readUnlock();
+ } catch (MetadataException e) {
+ // ignore the exception
+ }
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mqtt/PublishHandler.java b/server/src/main/java/org/apache/iotdb/db/mqtt/PublishHandler.java
index 106333f..bd5e036 100644
--- a/server/src/main/java/org/apache/iotdb/db/mqtt/PublishHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mqtt/PublishHandler.java
@@ -92,7 +92,7 @@ public class PublishHandler extends AbstractInterceptHandler {
plan.setMeasurements(event.getMeasurements().toArray(new String[event.getMeasurements().size()]));
plan.setValues(event.getValues().toArray(new Object[event.getValues().size()]));
plan.setTypes(new TSDataType[event.getValues().size()]);
- plan.setInferType(true);
+ plan.setNeedInferType(true);
boolean status = false;
try {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index dd34f20..d2dfb28 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -18,51 +18,12 @@
*/
package org.apache.iotdb.db.qp.executor;
-import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CANCELLED;
-import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CHILD_PATHS;
-import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_COLUMN;
-import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_COUNT;
-import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CREATED_TIME;
-import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_DEVICES;
-import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_DONE;
-import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_ITEM;
-import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_PARAMETER;
-import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_PRIVILEGE;
-import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_PROGRESS;
-import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_ROLE;
-import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_STORAGE_GROUP;
-import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TASK_NAME;
-import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TIMESERIES;
-import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TIMESERIES_ALIAS;
-import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TIMESERIES_COMPRESSION;
-import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TIMESERIES_DATATYPE;
-import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TIMESERIES_ENCODING;
-import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TTL;
-import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_USER;
-import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_VALUE;
-import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR;
-import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TreeSet;
import org.apache.iotdb.db.auth.AuthException;
import org.apache.iotdb.db.auth.authorizer.BasicAuthorizer;
import org.apache.iotdb.db.auth.authorizer.IAuthorizer;
import org.apache.iotdb.db.auth.entity.PathPrivilege;
import org.apache.iotdb.db.auth.entity.Role;
import org.apache.iotdb.db.auth.entity.User;
-import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.adapter.CompressionRatio;
@@ -78,8 +39,6 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.DeleteFailedException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
-import org.apache.iotdb.db.exception.metadata.PathAlreadyExistException;
-import org.apache.iotdb.db.exception.metadata.PathNotExistException;
import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.MManager;
@@ -90,37 +49,8 @@ import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
import org.apache.iotdb.db.qp.logical.sys.AuthorOperator;
import org.apache.iotdb.db.qp.logical.sys.AuthorOperator.AuthorType;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
-import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan;
-import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
-import org.apache.iotdb.db.qp.physical.crud.FillQueryPlan;
-import org.apache.iotdb.db.qp.physical.crud.GroupByTimeFillPlan;
-import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
-import org.apache.iotdb.db.qp.physical.crud.LastQueryPlan;
-import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
-import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
-import org.apache.iotdb.db.qp.physical.crud.UpdatePlan;
-import org.apache.iotdb.db.qp.physical.sys.AlterTimeSeriesPlan;
-import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
-import org.apache.iotdb.db.qp.physical.sys.ClearCachePlan;
-import org.apache.iotdb.db.qp.physical.sys.CountPlan;
-import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
-import org.apache.iotdb.db.qp.physical.sys.DataAuthPlan;
-import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan;
-import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
-import org.apache.iotdb.db.qp.physical.sys.FlushPlan;
-import org.apache.iotdb.db.qp.physical.sys.LoadConfigurationPlan;
-import org.apache.iotdb.db.qp.physical.sys.MergePlan;
-import org.apache.iotdb.db.qp.physical.sys.OperateFilePlan;
-import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
-import org.apache.iotdb.db.qp.physical.sys.SetTTLPlan;
-import org.apache.iotdb.db.qp.physical.sys.ShowChildPathsPlan;
-import org.apache.iotdb.db.qp.physical.sys.ShowDevicesPlan;
-import org.apache.iotdb.db.qp.physical.sys.ShowPlan;
-import org.apache.iotdb.db.qp.physical.sys.ShowTTLPlan;
-import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.crud.*;
+import org.apache.iotdb.db.qp.physical.sys.*;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.dataset.AlignByDeviceDataSet;
import org.apache.iotdb.db.query.dataset.ListDataSet;
@@ -130,15 +60,11 @@ import org.apache.iotdb.db.query.executor.IQueryRouter;
import org.apache.iotdb.db.query.executor.QueryRouter;
import org.apache.iotdb.db.utils.AuthUtils;
import org.apache.iotdb.db.utils.FileLoaderUtils;
-import org.apache.iotdb.db.utils.TypeInferenceUtils;
import org.apache.iotdb.db.utils.UpgradeUtils;
-import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
-import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.Field;
import org.apache.iotdb.tsfile.read.common.Path;
@@ -152,6 +78,14 @@ import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+import java.util.Map.Entry;
+
+import static org.apache.iotdb.db.conf.IoTDBConstant.*;
+import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
+
public class PlanExecutor implements IPlanExecutor {
private static final Logger logger = LoggerFactory.getLogger(PlanExecutor.class);
@@ -914,235 +848,40 @@ public class PlanExecutor implements IPlanExecutor {
}
}
+ protected MeasurementSchema[] getSeriesSchemas(InsertPlan insertPlan)
+ throws MetadataException {
+ return mManager.getSeriesSchemas(insertPlan.getDeviceId(), insertPlan.getMeasurements(), insertPlan);
+ }
+
+ protected MeasurementSchema[] getSeriesSchemas(InsertTabletPlan insertTabletPlan)
+ throws MetadataException {
+ return mManager.getSeriesSchemas(insertTabletPlan.getDeviceId(),
+ insertTabletPlan.getMeasurements(), insertTabletPlan);
+ }
+
@Override
public void insert(InsertPlan insertPlan) throws QueryProcessException {
try {
+ mManager.lockInsert(insertPlan.getDeviceId());
MeasurementSchema[] schemas = getSeriesSchemas(insertPlan);
insertPlan.setSchemasAndTransferType(schemas);
StorageEngine.getInstance().insert(insertPlan);
if (insertPlan.getFailedMeasurements() != null) {
throw new StorageEngineException(
- "failed to insert points " + insertPlan.getFailedMeasurements());
+ "failed to insert measurements " + insertPlan.getFailedMeasurements());
}
} catch (StorageEngineException | MetadataException e) {
throw new QueryProcessException(e);
- }
- }
-
- protected MeasurementSchema[] getSeriesSchemas(InsertPlan insertPlan) throws MetadataException {
- String[] measurementList = insertPlan.getMeasurements();
- String deviceId = insertPlan.getDeviceId();
- MeasurementSchema[] schemas = new MeasurementSchema[measurementList.length];
-
- MNode node = null;
- try {
- node = mManager.getDeviceNodeWithAutoCreateAndReadLock(deviceId);
- // To reduce the String number in memory, set the deviceId from MManager to insertPlan
- insertPlan.setDeviceId(node.getFullPath());
- } catch (PathNotExistException e) {
- // ignore
- }
- try {
- for (int i = 0; i < measurementList.length; i++) {
- try {
- schemas[i] = getSeriesSchema(node, insertPlan, i);
- if (schemas[i] != null) {
- measurementList[i] = schemas[i].getMeasurementId();
- }
- } catch (MetadataException e) {
- logger.warn("meet error when check {}.{}, message: {}", deviceId, measurementList[i],
- e.getMessage());
- if (enablePartialInsert) {
- insertPlan.markMeasurementInsertionFailed(i);
- } else {
- throw e;
- }
- }
- }
} finally {
- if (node != null) {
- node.readUnlock();
- }
- }
- return schemas;
- }
-
- /**
- * @param loc index of measurement in insertPlan
- */
- private MeasurementSchema getSeriesSchema(MNode deviceNode, InsertPlan insertPlan, int loc)
- throws MetadataException {
- String measurement = insertPlan.getMeasurements()[loc];
- String deviceId = insertPlan.getDeviceId();
- Object value = insertPlan.getValues()[loc];
- boolean isInferType = insertPlan.isInferType();
-
- MeasurementSchema measurementSchema;
- if (deviceNode != null && !deviceNode.hasChild(measurement)) {
- // devices exists in MTree
- if (!IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled()) {
- // but measurement not in MTree and cannot auto-create, try the cache
- measurementSchema = MManager.getInstance().getSeriesSchema(deviceId, measurement);
- if (measurementSchema == null) {
- throw new PathNotExistException(deviceId + PATH_SEPARATOR + measurement);
- }
- } else {
- // auto-create
- TSDataType dataType = TypeInferenceUtils.getPredictedDataType(value, isInferType);
- Path path = new Path(deviceId, measurement);
- internalCreateTimeseries(path.toString(), dataType);
-
- MeasurementMNode measurementNode = (MeasurementMNode) mManager
- .getChild(deviceNode, measurement);
- measurementSchema = measurementNode.getSchema();
- if (!isInferType) {
- checkType(insertPlan, loc, measurementNode.getSchema().getType());
- }
- }
- } else if (deviceNode != null) {
- // device and measurement exists in MTree
- MeasurementMNode measurementNode = (MeasurementMNode) MManager.getInstance()
- .getChild(deviceNode, measurement);
- measurementSchema = measurementNode.getSchema();
- } else {
- // device in not in MTree, try the cache
- measurementSchema = mManager.getSeriesSchema(deviceId, measurement);
- }
- return measurementSchema;
- }
-
- private void checkType(InsertPlan plan, int loc, TSDataType type) {
- plan.getTypes()[loc] = type;
- try {
- switch (type) {
- case INT32:
- if (!(plan.getValues()[loc] instanceof Integer)) {
- plan.getValues()[loc] =
- Integer.parseInt(((Binary) plan.getValues()[loc]).getStringValue());
- }
- break;
- case INT64:
- if (!(plan.getValues()[loc] instanceof Long)) {
- plan.getValues()[loc] =
- Long.parseLong(((Binary) plan.getValues()[loc]).getStringValue());
- }
- break;
- case DOUBLE:
- if (!(plan.getValues()[loc] instanceof Double)) {
- plan.getValues()[loc] =
- Double.parseDouble(((Binary) plan.getValues()[loc]).getStringValue());
- }
- break;
- case FLOAT:
- if (!(plan.getValues()[loc] instanceof Float)) {
- plan.getValues()[loc] =
- Float.parseFloat(((Binary) plan.getValues()[loc]).getStringValue());
- }
- break;
- case BOOLEAN:
- if (!(plan.getValues()[loc] instanceof Boolean)) {
- plan.getValues()[loc] =
- Boolean.parseBoolean(((Binary) plan.getValues()[loc]).getStringValue());
- }
- break;
- case TEXT:
- // need to do nothing
- break;
- }
- } catch (ClassCastException e) {
- logger.error("inconsistent type between client and server");
- }
- }
-
- /**
- * create timeseries with ignore PathAlreadyExistException
- */
- private void internalCreateTimeseries(String path, TSDataType dataType) throws MetadataException {
- try {
- mManager.createTimeseries(
- path,
- dataType,
- getDefaultEncoding(dataType),
- TSFileDescriptor.getInstance().getConfig().getCompressor(),
- Collections.emptyMap());
- } catch (PathAlreadyExistException e) {
- if (logger.isDebugEnabled()) {
- logger.debug("Ignore PathAlreadyExistException when Concurrent inserting"
- + " a non-exist time series {}", path);
- }
- }
- }
-
- /**
- * Get default encoding by dataType
- */
- private TSEncoding getDefaultEncoding(TSDataType dataType) {
- IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig();
- switch (dataType) {
- case BOOLEAN:
- return conf.getDefaultBooleanEncoding();
- case INT32:
- return conf.getDefaultInt32Encoding();
- case INT64:
- return conf.getDefaultInt64Encoding();
- case FLOAT:
- return conf.getDefaultFloatEncoding();
- case DOUBLE:
- return conf.getDefaultDoubleEncoding();
- case TEXT:
- return conf.getDefaultTextEncoding();
- default:
- throw new UnSupportedDataTypeException(
- String.format("Data type %s is not supported.", dataType.toString()));
+ mManager.unlockInsert(insertPlan.getDeviceId());
}
}
@Override
public void insertTablet(InsertTabletPlan insertTabletPlan) throws QueryProcessException {
- MNode node = null;
try {
- String[] measurementList = insertTabletPlan.getMeasurements();
- String deviceId = insertTabletPlan.getDeviceId();
- node = mManager.getDeviceNodeWithAutoCreateAndReadLock(deviceId);
- // To reduce the String number in memory, use the deviceId from MManager
- deviceId = node.getFullPath();
- insertTabletPlan.setDeviceId(deviceId);
- TSDataType[] dataTypes = insertTabletPlan.getDataTypes();
- IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig();
- MeasurementSchema[] schemas = new MeasurementSchema[measurementList.length];
-
- String measurement;
- for (int i = 0; i < measurementList.length; i++) {
- measurement = measurementList[i];
- // check if timeseries exists
- if (!node.hasChild(measurement)) {
- if (!conf.isAutoCreateSchemaEnabled()) {
- throw new QueryProcessException(String.format(
- "Current deviceId[%s] does not contain measurement:%s", deviceId, measurement));
- }
- Path path = new Path(deviceId, measurement);
- TSDataType dataType = dataTypes[i];
- internalCreateTimeseries(path.getFullPath(), dataType);
-
- }
- MeasurementMNode measurementNode = (MeasurementMNode) mManager.getChild(node, measurement);
-
- // check data type
- if (measurementNode.getSchema().getType() != insertTabletPlan.getDataTypes()[i]) {
- if (!enablePartialInsert) {
- throw new QueryProcessException(String.format(
- "Datatype mismatch, Insert measurement %s type %s, metadata tree type %s",
- measurement, insertTabletPlan.getDataTypes()[i],
- measurementNode.getSchema().getType()));
- } else {
- insertTabletPlan.markMeasurementInsertionFailed(i);
- continue;
- }
- }
- schemas[i] = measurementNode.getSchema();
- // reset measurement to common name instead of alias
- measurementList[i] = measurementNode.getName();
- }
+ mManager.lockInsert(insertTabletPlan.getDeviceId());
+ MeasurementSchema[] schemas = getSeriesSchemas(insertTabletPlan);
insertTabletPlan.setSchemas(schemas);
StorageEngine.getInstance().insertTablet(insertTabletPlan);
if (insertTabletPlan.getFailedMeasurements() != null) {
@@ -1152,9 +891,7 @@ public class PlanExecutor implements IPlanExecutor {
} catch (StorageEngineException | MetadataException e) {
throw new QueryProcessException(e);
} finally {
- if (node != null) {
- node.readUnlock();
- }
+ mManager.unlockInsert(insertTabletPlan.getDeviceId());
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
index 5641f4b..0190699 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
@@ -18,13 +18,6 @@
*/
package org.apache.iotdb.db.qp.physical.crud;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Objects;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.metadata.PathNotExistException;
@@ -46,6 +39,14 @@ import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
public class InsertPlan extends PhysicalPlan {
private static final Logger logger = LoggerFactory.getLogger(InsertPlan.class);
@@ -57,9 +58,9 @@ public class InsertPlan extends PhysicalPlan {
private TSDataType[] types;
private MeasurementSchema[] schemas;
- // if inferType is false, use the type of values directly
- // if inferType is true, values is String[], and infer types from them
- private boolean inferType = false;
+ // if isNeedInferType is true, the values must be String[], so that we can infer types from them
+ // if the values are Object[], we can use their raw types, and this should be set to false
+ private boolean isNeedInferType = false;
// record the failed measurements
private List<String> failedMeasurements;
@@ -144,7 +145,7 @@ public class InsertPlan extends PhysicalPlan {
this.types = new TSDataType[measurements.length];
this.values = new Object[measurements.length];
System.arraycopy(insertValues, 0, values, 0, measurements.length);
- inferType = true;
+ isNeedInferType = true;
canBeSplit = false;
}
@@ -157,12 +158,12 @@ public class InsertPlan extends PhysicalPlan {
this.time = time;
}
- public boolean isInferType() {
- return inferType;
+ public boolean isNeedInferType() {
+ return isNeedInferType;
}
- public void setInferType(boolean inferType) {
- this.inferType = inferType;
+ public void setNeedInferType(boolean inferType) {
+ this.isNeedInferType = inferType;
}
public MeasurementSchema[] getSchemas() {
@@ -175,7 +176,7 @@ public class InsertPlan extends PhysicalPlan {
*/
public void setSchemasAndTransferType(MeasurementSchema[] schemas) throws QueryProcessException {
this.schemas = schemas;
- if (inferType) {
+ if (isNeedInferType) {
for (int i = 0; i < schemas.length; i++) {
if (schemas[i] == null) {
if (IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index c9b42ca..8f62241 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -18,22 +18,11 @@
*/
package org.apache.iotdb.db.service;
-import static org.apache.iotdb.db.conf.IoTDBConfig.PATH_PATTERN;
-import static org.apache.iotdb.db.qp.physical.sys.ShowPlan.ShowContentType.TIMESERIES;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.sql.SQLException;
-import java.time.ZoneId;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.Collectors;
import org.antlr.v4.runtime.misc.ParseCancellationException;
import org.apache.iotdb.db.auth.AuthException;
import org.apache.iotdb.db.auth.AuthorityChecker;
-import org.apache.iotdb.db.auth.authorizer.IAuthorizer;
import org.apache.iotdb.db.auth.authorizer.BasicAuthorizer;
+import org.apache.iotdb.db.auth.authorizer.IAuthorizer;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -56,12 +45,7 @@ import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.*;
import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan.MeasurementType;
-import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
-import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
-import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan;
-import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
-import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
-import org.apache.iotdb.db.qp.physical.sys.ShowPlan;
+import org.apache.iotdb.db.qp.physical.sys.*;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.query.dataset.NonAlignEngineDataSet;
@@ -73,33 +57,7 @@ import org.apache.iotdb.db.utils.QueryDataSetUtils;
import org.apache.iotdb.db.utils.SchemaUtils;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.service.rpc.thrift.ServerProperties;
-import org.apache.iotdb.service.rpc.thrift.TSCancelOperationReq;
-import org.apache.iotdb.service.rpc.thrift.TSCloseOperationReq;
-import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq;
-import org.apache.iotdb.service.rpc.thrift.TSCreateMultiTimeseriesReq;
-import org.apache.iotdb.service.rpc.thrift.TSCreateTimeseriesReq;
-import org.apache.iotdb.service.rpc.thrift.TSDeleteDataReq;
-import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementReq;
-import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
-import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
-import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataReq;
-import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataResp;
-import org.apache.iotdb.service.rpc.thrift.TSFetchResultsReq;
-import org.apache.iotdb.service.rpc.thrift.TSFetchResultsResp;
-import org.apache.iotdb.service.rpc.thrift.TSGetTimeZoneResp;
-import org.apache.iotdb.service.rpc.thrift.TSIService;
-import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
-import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq;
-import org.apache.iotdb.service.rpc.thrift.TSInsertTabletReq;
-import org.apache.iotdb.service.rpc.thrift.TSInsertTabletsReq;
-import org.apache.iotdb.service.rpc.thrift.TSOpenSessionReq;
-import org.apache.iotdb.service.rpc.thrift.TSOpenSessionResp;
-import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
-import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
-import org.apache.iotdb.service.rpc.thrift.TSQueryNonAlignDataSet;
-import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneReq;
-import org.apache.iotdb.service.rpc.thrift.TSStatus;
+import org.apache.iotdb.service.rpc.thrift.*;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
@@ -112,6 +70,18 @@ import org.apache.thrift.server.ServerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.sql.SQLException;
+import java.time.ZoneId;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+import static org.apache.iotdb.db.conf.IoTDBConfig.PATH_PATTERN;
+import static org.apache.iotdb.db.qp.physical.sys.ShowPlan.ShowContentType.TIMESERIES;
+
/**
* Thrift RPC implementation at server side.
@@ -1081,7 +1051,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
plan.setTypes(new TSDataType[plan.getMeasurements().length]);
plan.setValues(new Object[plan.getMeasurements().length]);
plan.setValues(req.valuesList.get(i));
- plan.setInferType(req.isInferType());
+ plan.setNeedInferType(req.isInferType());
TSStatus status = checkAuthority(plan, req.getSessionId());
if (status != null) {
statusList.add(status);
@@ -1139,7 +1109,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
plan.setTypes(new TSDataType[plan.getMeasurements().length]);
plan.setValues(new Object[plan.getMeasurements().length]);
plan.setValues(req.values);
- plan.setInferType(req.isInferType());
+ plan.setNeedInferType(req.isInferType());
TSStatus status = checkAuthority(plan, req.getSessionId());
if (status != null) {
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSimpleQueryIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSimpleQueryIT.java
index 05bfd78..d5fd7f5 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSimpleQueryIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSimpleQueryIT.java
@@ -18,15 +18,6 @@
*/
package org.apache.iotdb.db.integration;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.jdbc.Config;
@@ -36,6 +27,10 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import java.sql.*;
+
+import static org.junit.Assert.*;
+
public class IoTDBSimpleQueryIT {
@Before
diff --git a/session/src/test/java/org/apache/iotdb/session/SessionUT.java b/session/src/test/java/org/apache/iotdb/session/SessionUT.java
index 2f60380..9ae71ff 100644
--- a/session/src/test/java/org/apache/iotdb/session/SessionUT.java
+++ b/session/src/test/java/org/apache/iotdb/session/SessionUT.java
@@ -18,11 +18,7 @@
*/
package org.apache.iotdb.session;
-import java.util.ArrayList;
-import java.util.List;
-
import org.apache.iotdb.db.conf.IoTDBConstant;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
@@ -37,6 +33,9 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import java.util.ArrayList;
+import java.util.List;
+
import static org.junit.Assert.*;
public class SessionUT {
@@ -122,14 +121,16 @@ public class SessionUT {
String deviceId = "root.sg1.d1";
- session.createTimeseries(deviceId + "s1", TSDataType.INT64, TSEncoding.RLE, CompressionType.UNCOMPRESSED);
- session.createTimeseries(deviceId + "s2", TSDataType.INT64, TSEncoding.RLE, CompressionType.UNCOMPRESSED);
- session.createTimeseries(deviceId + "s3", TSDataType.INT64, TSEncoding.RLE, CompressionType.UNCOMPRESSED);
+ session.createTimeseries(deviceId + ".s1", TSDataType.INT64, TSEncoding.RLE, CompressionType.UNCOMPRESSED);
+ session.createTimeseries(deviceId + ".s2", TSDataType.INT64, TSEncoding.RLE, CompressionType.UNCOMPRESSED);
+ session.createTimeseries(deviceId + ".s3", TSDataType.INT64, TSEncoding.RLE, CompressionType.UNCOMPRESSED);
+ session.createTimeseries(deviceId + ".s4", TSDataType.DOUBLE, TSEncoding.RLE, CompressionType.UNCOMPRESSED);
List<MeasurementSchema> schemaList = new ArrayList<>();
schemaList.add(new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.RLE));
schemaList.add(new MeasurementSchema("s2", TSDataType.DOUBLE, TSEncoding.RLE));
schemaList.add(new MeasurementSchema("s3", TSDataType.TEXT, TSEncoding.PLAIN));
+ schemaList.add(new MeasurementSchema("s4", TSDataType.INT64, TSEncoding.PLAIN));
Tablet tablet = new Tablet("root.sg1.d1", schemaList, 10);
@@ -145,11 +146,15 @@ public class SessionUT {
sensor2[row] = 0.1 + time;
Binary[] sensor3 = (Binary[]) values[2];
sensor3[row] = Binary.valueOf("ha" + time);
+ long[] sensor4 = (long[]) values[3];
+ sensor4[row] = time;
}
- if (tablet.rowSize != 0) {
+ try {
session.insertTablet(tablet);
- tablet.reset();
+ fail();
+ } catch (StatementExecutionException e) {
+ // expected: insertTablet must throw here, otherwise the fail() above is reached
}
SessionDataSet dataSet = session.executeQueryStatement("select * from root.sg1.d1");
@@ -157,7 +162,8 @@ public class SessionUT {
while (dataSet.hasNext()) {
RowRecord record = dataSet.next();
System.out.println(record.toString());
- assertEquals(i, record.getFields().get(1).getLongV());
+ assertEquals(i, record.getFields().get(0).getLongV());
+ assertTrue(record.getFields().get(1).isNull());
assertTrue(record.getFields().get(2).isNull());
assertTrue(record.getFields().get(3).isNull());
i++;