You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/07/07 03:10:59 UTC
[incubator-iotdb] branch master updated: premerge for the distributed version (#1326)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 1e2c3e9 premerge for the distributed version (#1326)
1e2c3e9 is described below
commit 1e2c3e9674a91edfb27f10f91cebb1c6d1173e93
Author: Jiang Tian <jt...@163.com>
AuthorDate: Mon Jul 6 22:07:55 2020 -0500
premerge for the distributed version (#1326)
* a premerge for the distributed version
* add sync thread interruption
* fix dead wait in SyncServerManager and add retry
---
cli/src/assembly/resources/sbin/start-cli.sh | 2 +-
.../java/org/apache/iotdb/cli/AbstractCli.java | 10 ++
.../iotdb/db/auth/authorizer/BasicAuthorizer.java | 2 +-
.../org/apache/iotdb/db/engine/StorageEngine.java | 11 +-
.../engine/storagegroup/StorageGroupProcessor.java | 22 +++-
.../db/engine/storagegroup/TsFileResource.java | 10 +-
.../version/SimpleFileVersionController.java | 10 +-
.../org/apache/iotdb/db/metadata/MManager.java | 91 +++++++++++----
.../java/org/apache/iotdb/db/metadata/MTree.java | 22 +++-
.../apache/iotdb/db/qp/constant/SQLConstant.java | 4 +-
.../iotdb/db/qp/physical/crud/InsertRowPlan.java | 39 +++++--
.../db/qp/physical/crud/InsertTabletPlan.java | 123 +++++++++------------
.../db/qp/physical/sys/ShowTimeSeriesPlan.java | 8 ++
.../query/dataset/groupby/GroupByFillDataSet.java | 2 +-
.../iotdb/db/query/executor/LastQueryExecutor.java | 26 +++--
.../iotdb/db/query/executor/QueryRouter.java | 7 +-
.../db/query/executor/RawDataQueryExecutor.java | 12 +-
.../org/apache/iotdb/db/service/TSServiceImpl.java | 30 ++---
.../org/apache/iotdb/db/utils/CommonUtils.java | 32 +++---
.../org/apache/iotdb/db/utils/SchemaUtils.java | 7 +-
.../org/apache/iotdb/db/utils/SerializeUtils.java | 24 +++-
.../apache/iotdb/db/utils/TypeInferenceUtils.java | 4 +-
.../apache/iotdb/db/qp/plan/SerializationTest.java | 88 +++++++++++++++
.../org/apache/iotdb/db/tools/WalCheckerTest.java | 2 +-
thrift/src/main/thrift/cluster.thrift | 34 +++++-
.../iotdb/tsfile/utils/ReadWriteIOUtils.java | 12 +-
.../tsfile/write/schema/MeasurementSchema.java | 3 +
...easurementSchema.java => TimeseriesSchema.java} | 108 +++++++-----------
28 files changed, 489 insertions(+), 256 deletions(-)
diff --git a/cli/src/assembly/resources/sbin/start-cli.sh b/cli/src/assembly/resources/sbin/start-cli.sh
index 3d02904..45bc03c 100644
--- a/cli/src/assembly/resources/sbin/start-cli.sh
+++ b/cli/src/assembly/resources/sbin/start-cli.sh
@@ -29,7 +29,7 @@ fi
MAIN_CLASS=org.apache.iotdb.cli.Cli
-CLASSPATH=""
+CLASSPATH="."
for f in ${IOTDB_CLI_HOME}/lib/*.jar; do
CLASSPATH=${CLASSPATH}":"$f
done
diff --git a/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java b/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java
index 36df8f8..a99dac3 100644
--- a/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java
+++ b/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java
@@ -345,7 +345,17 @@ public abstract class AbstractCli {
for (int j = index + 1; j < args.length; j++) {
executeCommand.append(args[j]).append(" ");
}
+ // remove last space
executeCommand.deleteCharAt(executeCommand.length() - 1);
+ // some bashes may not remove quotes of parameters automatically, remove them in that case
+ if (executeCommand.charAt(0) == '\'' || executeCommand.charAt(0) == '\"') {
+ executeCommand.deleteCharAt(0);
+ if (executeCommand.charAt(executeCommand.length() - 1) == '\''
+ || executeCommand.charAt(executeCommand.length() - 1) == '\"') {
+ executeCommand.deleteCharAt(executeCommand.length() - 1);
+ }
+ }
+
execute = executeCommand.toString();
hasExecuteSQL = true;
args = Arrays.copyOfRange(args, 0, index);
diff --git a/server/src/main/java/org/apache/iotdb/db/auth/authorizer/BasicAuthorizer.java b/server/src/main/java/org/apache/iotdb/db/auth/authorizer/BasicAuthorizer.java
index 245e3f8..57f6009 100644
--- a/server/src/main/java/org/apache/iotdb/db/auth/authorizer/BasicAuthorizer.java
+++ b/server/src/main/java/org/apache/iotdb/db/auth/authorizer/BasicAuthorizer.java
@@ -18,12 +18,12 @@
*/
package org.apache.iotdb.db.auth.authorizer;
-
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+
import org.apache.iotdb.db.auth.AuthException;
import org.apache.iotdb.db.auth.entity.PrivilegeType;
import org.apache.iotdb.db.auth.entity.Role;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index aac4265..68332bb 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -590,6 +590,11 @@ public class StorageEngine implements IService {
return timePartitionInterval;
}
+ @TestOnly
+ public static void setTimePartitionInterval(long timePartitionInterval) {
+ StorageEngine.timePartitionInterval = timePartitionInterval;
+ }
+
public static long getTimePartition(long time) {
return enablePartition ? time / timePartitionInterval : 0;
}
@@ -605,17 +610,13 @@ public class StorageEngine implements IService {
getProcessor(storageGroup).setPartitionFileVersionToMax(partitionId, newMaxVersion);
}
+
public void removePartitions(String storageGroupName, TimePartitionFilter filter)
throws StorageEngineException {
getProcessor(storageGroupName).removePartitions(filter);
}
@TestOnly
- public static void setTimePartitionInterval(long timePartitionInterval) {
- StorageEngine.timePartitionInterval = timePartitionInterval;
- }
-
- @TestOnly
public static void setEnablePartition(boolean enablePartition) {
StorageEngine.enablePartition = enablePartition;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index bca9cb1..747cabb 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -249,6 +249,14 @@ public class StorageGroupProcessor {
}
+ private Map<Long, List<TsFileResource>> splitResourcesByPartition(List<TsFileResource> resources) {
+ Map<Long, List<TsFileResource>> ret = new HashMap<>();
+ for (TsFileResource resource : resources) {
+ ret.computeIfAbsent(resource.getTimePartition(), l -> new ArrayList<>()).add(resource);
+ }
+ return ret;
+ }
+
private void recover() throws StorageGroupProcessorException {
logger.info("recover Storage Group {}", storageGroupName);
@@ -265,8 +273,16 @@ public class StorageGroupProcessor {
List<TsFileResource> oldUnseqTsFiles = unseqTsFilesPair.right;
upgradeUnseqFileList.addAll(oldUnseqTsFiles);
- recoverSeqFiles(tmpSeqTsFiles);
- recoverUnseqFiles(tmpUnseqTsFiles);
+ // split by partition so that we can find the last file of each partition and decide to
+ // close it or not
+ Map<Long, List<TsFileResource>> partitionTmpSeqTsFiles = splitResourcesByPartition(tmpSeqTsFiles);
+ Map<Long, List<TsFileResource>> partitionTmpUnseqTsFiles = splitResourcesByPartition(tmpUnseqTsFiles);
+ for (List<TsFileResource> value : partitionTmpSeqTsFiles.values()) {
+ recoverSeqFiles(value);
+ }
+ for (List<TsFileResource> value : partitionTmpUnseqTsFiles.values()) {
+ recoverUnseqFiles(value);
+ }
for (TsFileResource resource : sequenceFileTreeSet) {
long partitionNum = resource.getTimePartition();
@@ -830,7 +846,7 @@ public class StorageGroupProcessor {
}
}
} catch (MetadataException e) {
- throw new WriteProcessException(e);
+ // skip last cache update if the local MTree does not contain the schema
} finally {
if (node != null) {
node.readUnlock();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index fbb5164..ab90391 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -60,23 +60,23 @@ public class TsFileResource {
public static final String RESOURCE_SUFFIX = ".resource";
static final String TEMP_SUFFIX = ".temp";
private static final String CLOSING_SUFFIX = ".closing";
- private static final int INIT_ARRAY_SIZE = 64;
+ protected static final int INIT_ARRAY_SIZE = 64;
/**
* start times array.
*/
- private long[] startTimes;
+ protected long[] startTimes;
/**
* end times array.
* The values in this array are Long.MIN_VALUE if it's an unsealed sequence tsfile
*/
- private long[] endTimes;
+ protected long[] endTimes;
/**
* device -> index of start times array and end times array
*/
- private Map<String, Integer> deviceToIndex;
+ protected Map<String, Integer> deviceToIndex;
public TsFileProcessor getProcessor() {
return processor;
@@ -234,7 +234,7 @@ public class TsFileResource {
}
}
- private void initTimes(long[] times, long defaultTime) {
+ protected void initTimes(long[] times, long defaultTime) {
Arrays.fill(times, defaultTime);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/version/SimpleFileVersionController.java b/server/src/main/java/org/apache/iotdb/db/engine/version/SimpleFileVersionController.java
index 3bc438f..665b720 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/version/SimpleFileVersionController.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/version/SimpleFileVersionController.java
@@ -19,15 +19,13 @@
package org.apache.iotdb.db.engine.version;
+import java.io.File;
+import java.io.IOException;
import org.apache.commons.io.FileUtils;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-
/**
* SimpleFileVersionController uses a local file and its file name to store the version.
*/
@@ -144,7 +142,9 @@ public class SimpleFileVersionController implements VersionController {
} else {
versionFile = SystemFileFactory.INSTANCE.getFile(directory, FILE_PREFIX + "0");
prevVersion = 0;
- new FileOutputStream(versionFile).close();
+ if (!versionFile.createNewFile()) {
+ logger.warn("Cannot create new version file {}", versionFile);
+ }
}
// prevent overlapping in case of failure
currVersion = prevVersion + saveInterval;
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index 6bc01ca..024bf35 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -18,6 +18,31 @@
*/
package org.apache.iotdb.db.metadata;
+import static java.util.stream.Collectors.toList;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -26,7 +51,12 @@ import org.apache.iotdb.db.conf.adapter.IoTDBConfigDynamicAdapter;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
import org.apache.iotdb.db.exception.ConfigAdjusterException;
-import org.apache.iotdb.db.exception.metadata.*;
+import org.apache.iotdb.db.exception.metadata.DeleteFailedException;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.metadata.PathNotExistException;
+import org.apache.iotdb.db.exception.metadata.StorageGroupAlreadySetException;
+import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
import org.apache.iotdb.db.metadata.mnode.MNode;
import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
import org.apache.iotdb.db.metadata.mnode.StorageGroupMNode;
@@ -52,23 +82,10 @@ import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import static java.util.stream.Collectors.toList;
-
/**
* This class takes the responsibility of serialization of all the metadata info and persistent it
* into files. This class contains all the interfaces to modify the metadata for delta system. All
@@ -730,9 +747,13 @@ public class MManager {
* @return A List instance which stores all node at given level
*/
public List<String> getNodesList(String prefixPath, int nodeLevel) throws MetadataException {
+ return getNodesList(prefixPath, nodeLevel, null);
+ }
+
+ public List<String> getNodesList(String prefixPath, int nodeLevel, StorageGroupFilter filter) throws MetadataException {
lock.readLock().lock();
try {
- return mtree.getNodesList(prefixPath, nodeLevel);
+ return mtree.getNodesList(prefixPath, nodeLevel, filter);
} finally {
lock.readLock().unlock();
}
@@ -1699,7 +1720,23 @@ public class MManager {
}
}
- public void collectSeries(MNode startingNode, Collection<MeasurementSchema> timeseriesSchemas) {
+ public void collectTimeseriesSchema(MNode startingNode, Collection<TimeseriesSchema> timeseriesSchemas) {
+ Deque<MNode> nodeDeque = new ArrayDeque<>();
+ nodeDeque.addLast(startingNode);
+ while (!nodeDeque.isEmpty()) {
+ MNode node = nodeDeque.removeFirst();
+ if (node instanceof MeasurementMNode) {
+ MeasurementSchema nodeSchema = ((MeasurementMNode) node).getSchema();
+ timeseriesSchemas.add(new TimeseriesSchema(node.getFullPath(), nodeSchema.getType(),
+ nodeSchema.getEncodingType(), nodeSchema.getCompressor()));
+ } else if (!node.getChildren().isEmpty()) {
+ nodeDeque.addAll(node.getChildren().values());
+ }
+ }
+ }
+
+ public void collectMeasurementSchema(MNode startingNode,
+ Collection<MeasurementSchema> timeseriesSchemas) {
Deque<MNode> nodeDeque = new ArrayDeque<>();
nodeDeque.addLast(startingNode);
while (!nodeDeque.isEmpty()) {
@@ -1715,17 +1752,19 @@ public class MManager {
}
/**
- * Collect the timeseries schemas under "startingPath". Notice the measurements in the collected
- * MeasurementSchemas are the full path here.
+ * Collect the timeseries schemas under "startingPath".
+ *
+ * @param startingPath
+ * @param measurementSchemas
*/
- public void collectSeries(String startingPath, List<MeasurementSchema> timeseriesSchemas) {
+ public void collectSeries(String startingPath, List<MeasurementSchema> measurementSchemas) {
MNode mNode;
try {
mNode = getNodeByPath(startingPath);
} catch (MetadataException e) {
return;
}
- collectSeries(mNode, timeseriesSchemas);
+ collectMeasurementSchema(mNode, measurementSchemas);
}
/**
@@ -1779,6 +1818,16 @@ public class MManager {
}
}
+ /**
+ * StorageGroupFilter filters unsatisfied storage groups in metadata queries to speed up and
+ * deduplicate.
+ */
+ @FunctionalInterface
+ public interface StorageGroupFilter {
+
+ boolean satisfy(String storageGroup);
+ }
+
private void checkMTreeModified() {
if (logWriter == null || logFile == null) {
// the logWriter is not initialized now, we skip the check once.
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java b/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java
index 5136124..0e9472f 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.db.metadata;
import static java.util.stream.Collectors.toList;
import static org.apache.iotdb.db.conf.IoTDBConstant.PATH_SEPARATOR;
import static org.apache.iotdb.db.conf.IoTDBConstant.PATH_WILDCARD;
-import static org.apache.iotdb.db.query.executor.LastQueryExecutor.calculateLastPairForOneSeries;
+import static org.apache.iotdb.db.query.executor.LastQueryExecutor.calculateLastPairForOneSeriesLocally;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
@@ -60,6 +60,7 @@ import org.apache.iotdb.db.exception.metadata.PathAlreadyExistException;
import org.apache.iotdb.db.exception.metadata.PathNotExistException;
import org.apache.iotdb.db.exception.metadata.StorageGroupAlreadySetException;
import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
+import org.apache.iotdb.db.metadata.MManager.StorageGroupFilter;
import org.apache.iotdb.db.metadata.mnode.MNode;
import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
import org.apache.iotdb.db.metadata.mnode.StorageGroupMNode;
@@ -758,7 +759,7 @@ public class MTree implements Serializable {
return node.getCachedLast().getTimestamp();
} else {
try {
- last = calculateLastPairForOneSeries(new Path(node.getFullPath()),
+ last = calculateLastPairForOneSeriesLocally(new Path(node.getFullPath()),
node.getSchema().getType(), new QueryContext(-1), Collections.emptySet());
return last.getTimestamp();
} catch (Exception e) {
@@ -879,6 +880,11 @@ public class MTree implements Serializable {
* Get all paths from root to the given level.
*/
List<String> getNodesList(String path, int nodeLevel) throws MetadataException {
+ return getNodesList(path, nodeLevel, null);
+ }
+
+ /** Get all paths from root to the given level */
+ List<String> getNodesList(String path, int nodeLevel, StorageGroupFilter filter) throws MetadataException {
String[] nodes = MetaUtils.getNodeNames(path);
if (!nodes[0].equals(root.getName())) {
throw new IllegalPathException(path);
@@ -888,11 +894,14 @@ public class MTree implements Serializable {
for (int i = 1; i < nodes.length; i++) {
if (node.getChild(nodes[i]) != null) {
node = node.getChild(nodes[i]);
+ if (node instanceof StorageGroupMNode && filter != null && !filter.satisfy(node.getFullPath())) {
+ return res;
+ }
} else {
throw new MetadataException(nodes[i - 1] + " does not have the child node " + nodes[i]);
}
}
- findNodes(node, path, res, nodeLevel - (nodes.length - 1));
+ findNodes(node, path, res, nodeLevel - (nodes.length - 1), filter);
return res;
}
@@ -901,8 +910,9 @@ public class MTree implements Serializable {
*
* @param targetLevel Record the distance to the target level, 0 means the target level.
*/
- private void findNodes(MNode node, String path, List<String> res, int targetLevel) {
- if (node == null) {
+ private void findNodes(MNode node, String path, List<String> res, int targetLevel,
+ StorageGroupFilter filter) {
+ if (node == null || node instanceof StorageGroupMNode && filter != null && !filter.satisfy(node.getFullPath())) {
return;
}
if (targetLevel == 0) {
@@ -910,7 +920,7 @@ public class MTree implements Serializable {
return;
}
for (MNode child : node.getChildren().values()) {
- findNodes(child, path + PATH_SEPARATOR + child.toString(), res, targetLevel - 1);
+ findNodes(child, path + PATH_SEPARATOR + child.toString(), res, targetLevel - 1, filter);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java b/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java
index c2aea65..5fdb148 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java
@@ -45,8 +45,8 @@ public class SQLConstant {
public static final String METADATA_PARAM_EQUAL = "=";
public static final String QUOTE = "'";
public static final String DQUOTE = "\"";
- public static final String BOOLEN_TRUE = "true";
- public static final String BOOLEN_FALSE = "false";
+ public static final String BOOLEAN_TRUE = "true";
+ public static final String BOOLEAN_FALSE = "false";
public static final String BOOLEAN_TRUE_NUM = "1";
public static final String BOOLEAN_FALSE_NUM = "0";
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
index 0bef4f8..5c0f4d5 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
@@ -48,6 +48,7 @@ import org.slf4j.LoggerFactory;
public class InsertRowPlan extends InsertPlan {
private static final Logger logger = LoggerFactory.getLogger(InsertRowPlan.class);
+ private static final short TYPE_RAW_STRING = -1;
private long time;
private Object[] values;
@@ -245,24 +246,26 @@ public class InsertRowPlan extends InsertPlan {
}
}
- for (MeasurementSchema schema : schemas) {
- if (schema != null) {
- schema.serializeTo(stream);
- }
- }
-
try {
putValues(stream);
} catch (QueryProcessException e) {
throw new IOException(e);
}
+
+ // the types are not inferred before the plan is serialized
+ stream.write((byte) (isNeedInferType ? 1 : 0));
}
private void putValues(DataOutputStream outputStream) throws QueryProcessException, IOException {
for (int i = 0; i < values.length; i++) {
- if (dataTypes[i] == null) {
+ // types are not determined, the situation mainly occurs when the plan uses string values
+ // and is forwarded to other nodes
+ if (dataTypes == null || dataTypes[i] == null) {
+ ReadWriteIOUtils.write(TYPE_RAW_STRING, outputStream);
+ ReadWriteIOUtils.write((String) values[i], outputStream);
continue;
}
+
ReadWriteIOUtils.write(dataTypes[i], outputStream);
switch (dataTypes[i]) {
case BOOLEAN:
@@ -291,9 +294,14 @@ public class InsertRowPlan extends InsertPlan {
private void putValues(ByteBuffer buffer) throws QueryProcessException {
for (int i = 0; i < values.length; i++) {
- if (dataTypes[i] == null) {
+ // types are not determined, the situation mainly occurs when the plan uses string values
+ // and is forwarded to other nodes
+ if (dataTypes == null || dataTypes[i] == null) {
+ ReadWriteIOUtils.write(TYPE_RAW_STRING, buffer);
+ ReadWriteIOUtils.write((String) values[i], buffer);
continue;
}
+
ReadWriteIOUtils.write(dataTypes[i], buffer);
switch (dataTypes[i]) {
case BOOLEAN:
@@ -325,7 +333,15 @@ public class InsertRowPlan extends InsertPlan {
*/
public void fillValues(ByteBuffer buffer) throws QueryProcessException {
for (int i = 0; i < measurements.length; i++) {
- dataTypes[i] = ReadWriteIOUtils.readDataType(buffer);
+ // types are not determined, the situation mainly occurs when the plan uses string values
+ // and is forwarded to other nodes
+ short typeNum = ReadWriteIOUtils.readShort(buffer);
+ if (typeNum == TYPE_RAW_STRING) {
+ values[i] = ReadWriteIOUtils.readString(buffer);
+ continue;
+ }
+
+ dataTypes[i] = TSDataType.values()[typeNum];
switch (dataTypes[i]) {
case BOOLEAN:
values[i] = ReadWriteIOUtils.readBool(buffer);
@@ -373,6 +389,9 @@ public class InsertRowPlan extends InsertPlan {
} catch (QueryProcessException e) {
e.printStackTrace();
}
+
+ // the types are not inferred before the plan is serialized
+ buffer.put((byte) (isNeedInferType ? 1 : 0));
}
@Override
@@ -394,6 +413,8 @@ public class InsertRowPlan extends InsertPlan {
} catch (QueryProcessException e) {
e.printStackTrace();
}
+
+ isNeedInferType = buffer.get() == 1;
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
index 7fff034..922ed25 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
@@ -49,7 +49,6 @@ public class InsertTabletPlan extends InsertPlan {
private Object[] columns;
private ByteBuffer valueBuffer;
- private Set<Integer> index;
private int rowCount = 0;
// cached values
private Long maxTime = null;
@@ -98,14 +97,6 @@ public class InsertTabletPlan extends InsertPlan {
this.end = end;
}
- public Set<Integer> getIndex() {
- return index;
- }
-
- public void setIndex(Set<Integer> index) {
- this.index = index;
- }
-
@Override
public List<Path> getPaths() {
if (paths != null) {
@@ -142,11 +133,11 @@ public class InsertTabletPlan extends InsertPlan {
stream.writeShort(dataType.serialize());
}
- stream.writeInt(index.size());
+ stream.writeInt(end - start);
if (timeBuffer == null) {
- for (int loc : index) {
- stream.writeLong(times[loc]);
+ for (int i = start; i < end; i++) {
+ stream.writeLong(times[i]);
}
} else {
stream.write(timeBuffer.array());
@@ -161,62 +152,6 @@ public class InsertTabletPlan extends InsertPlan {
}
}
- private void serializeValues(DataOutputStream stream) throws IOException {
- for (int i = 0; i < measurements.length; i++) {
- if (measurements[i] == null) {
- continue;
- }
- serializeColumn(dataTypes[i], columns[i], stream, index);
- }
- }
-
- private void serializeColumn(TSDataType dataType, Object column, DataOutputStream stream,
- Set<Integer> index)
- throws IOException {
- switch (dataType) {
- case INT32:
- int[] intValues = (int[]) column;
- for (int loc : index) {
- stream.writeInt(intValues[loc]);
- }
- break;
- case INT64:
- long[] longValues = (long[]) column;
- for (int loc : index) {
- stream.writeLong(longValues[loc]);
- }
- break;
- case FLOAT:
- float[] floatValues = (float[]) column;
- for (int loc : index) {
- stream.writeFloat(floatValues[loc]);
- }
- break;
- case DOUBLE:
- double[] doubleValues = (double[]) column;
- for (int loc : index) {
- stream.writeDouble(doubleValues[loc]);
- }
- break;
- case BOOLEAN:
- boolean[] boolValues = (boolean[]) column;
- for (int loc : index) {
- stream.write(BytesUtils.boolToByte(boolValues[loc]));
- }
- break;
- case TEXT:
- Binary[] binaryValues = (Binary[]) column;
- for (int loc : index) {
- stream.writeInt(binaryValues[loc].getLength());
- stream.write(binaryValues[loc].getValues());
- }
- break;
- default:
- throw new UnSupportedDataTypeException(
- String.format(DATATYPE_UNSUPPORTED, dataType));
- }
- }
-
@Override
public void serialize(ByteBuffer buffer) {
int type = PhysicalPlanType.BATCHINSERT.ordinal();
@@ -257,6 +192,12 @@ public class InsertTabletPlan extends InsertPlan {
}
}
+ private void serializeValues(DataOutputStream outputStream) throws IOException {
+ for (int i = 0; i < measurements.length; i++) {
+ serializeColumn(dataTypes[i], columns[i], outputStream, start, end);
+ }
+ }
+
private void serializeValues(ByteBuffer buffer) {
for (int i = 0; i < measurements.length; i++) {
if (measurements[i] == null) {
@@ -312,6 +253,52 @@ public class InsertTabletPlan extends InsertPlan {
}
}
+ private void serializeColumn(TSDataType dataType, Object column, DataOutputStream outputStream,
+ int start, int end) throws IOException {
+ switch (dataType) {
+ case INT32:
+ int[] intValues = (int[]) column;
+ for (int j = start; j < end; j++) {
+ outputStream.writeInt(intValues[j]);
+ }
+ break;
+ case INT64:
+ long[] longValues = (long[]) column;
+ for (int j = start; j < end; j++) {
+ outputStream.writeLong(longValues[j]);
+ }
+ break;
+ case FLOAT:
+ float[] floatValues = (float[]) column;
+ for (int j = start; j < end; j++) {
+ outputStream.writeFloat(floatValues[j]);
+ }
+ break;
+ case DOUBLE:
+ double[] doubleValues = (double[]) column;
+ for (int j = start; j < end; j++) {
+ outputStream.writeDouble(doubleValues[j]);
+ }
+ break;
+ case BOOLEAN:
+ boolean[] boolValues = (boolean[]) column;
+ for (int j = start; j < end; j++) {
+ outputStream.writeByte(BytesUtils.boolToByte(boolValues[j]));
+ }
+ break;
+ case TEXT:
+ Binary[] binaryValues = (Binary[]) column;
+ for (int j = start; j < end; j++) {
+ outputStream.writeInt(binaryValues[j].getLength());
+ outputStream.write(binaryValues[j].getValues());
+ }
+ break;
+ default:
+ throw new UnSupportedDataTypeException(
+ String.format(DATATYPE_UNSUPPORTED, dataType));
+ }
+ }
+
public void setTimeBuffer(ByteBuffer timeBuffer) {
this.timeBuffer = timeBuffer;
this.timeBuffer.position(0);
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowTimeSeriesPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowTimeSeriesPlan.java
index e01ad3c..6ffd3a1 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowTimeSeriesPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowTimeSeriesPlan.java
@@ -114,4 +114,12 @@ public class ShowTimeSeriesPlan extends ShowPlan {
limit = buffer.getInt();
orderByHeat = buffer.get() == 1;
}
+
+ public void setLimit(int limit) {
+ this.limit = limit;
+ }
+
+ public void setOffset(int offset) {
+ this.offset = offset;
+ }
}
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSet.java
index 697cd3b..f83237a 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSet.java
@@ -90,7 +90,7 @@ public class GroupByFillDataSet extends QueryDataSet {
lastTimeArray = new long[paths.size()];
Arrays.fill(lastTimeArray, Long.MAX_VALUE);
for (int i = 0; i < paths.size(); i++) {
- TimeValuePair lastTimeValuePair = LastQueryExecutor.calculateLastPairForOneSeries(
+ TimeValuePair lastTimeValuePair = LastQueryExecutor.calculateLastPairForOneSeriesLocally(
paths.get(i), dataTypes.get(i), context,
groupByFillPlan.getAllMeasurementsInDevice(paths.get(i).getDevice()));
if (lastTimeValuePair.getValue() != null) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
index 395109b..93f5e61 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.metadata.PathNotExistException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
import org.apache.iotdb.db.qp.physical.crud.LastQueryPlan;
@@ -103,24 +104,31 @@ public class LastQueryExecutor {
return dataSet;
}
+ protected TimeValuePair calculateLastPairForOneSeries(
+ Path seriesPath, TSDataType tsDataType, QueryContext context, Set<String> deviceMeasurements)
+ throws IOException, QueryProcessException, StorageEngineException {
+ return calculateLastPairForOneSeriesLocally(seriesPath, tsDataType, context,
+ deviceMeasurements);
+ }
+
/**
* get last result for one series
*
* @param context query context
* @return TimeValuePair
*/
- public static TimeValuePair calculateLastPairForOneSeries(
- Path seriesPath, TSDataType tsDataType, QueryContext context, Set<String> sensors)
+ public static TimeValuePair calculateLastPairForOneSeriesLocally(
+ Path seriesPath, TSDataType tsDataType, QueryContext context, Set<String> deviceMeasurements)
throws IOException, QueryProcessException, StorageEngineException {
// Retrieve last value from MNode
- MeasurementMNode node;
+ MeasurementMNode node = null;
try {
node = (MeasurementMNode) IoTDB.metaManager.getNodeByPath(seriesPath.toString());
} catch (MetadataException e) {
- throw new QueryProcessException(e);
+ // node stays null when the local lookup fails; TODO use last cache for remote series
}
- if (node.getCachedLast() != null) {
+ if (node != null && node.getCachedLast() != null) {
return node.getCachedLast();
}
@@ -135,7 +143,7 @@ public class LastQueryExecutor {
if (!seqFileResources.isEmpty()) {
for (int i = seqFileResources.size() - 1; i >= 0; i--) {
TimeseriesMetadata timeseriesMetadata = FileLoaderUtils.loadTimeSeriesMetadata(
- seqFileResources.get(i), seriesPath, context, null, sensors);
+ seqFileResources.get(i), seriesPath, context, null, deviceMeasurements);
if (timeseriesMetadata != null) {
if (!timeseriesMetadata.isModified()) {
Statistics timeseriesMetadataStats = timeseriesMetadata.getStatistics();
@@ -165,7 +173,7 @@ public class LastQueryExecutor {
continue;
}
TimeseriesMetadata timeseriesMetadata =
- FileLoaderUtils.loadTimeSeriesMetadata(resource, seriesPath, context, null, sensors);
+ FileLoaderUtils.loadTimeSeriesMetadata(resource, seriesPath, context, null, deviceMeasurements);
if (timeseriesMetadata != null) {
for (ChunkMetadata chunkMetaData : timeseriesMetadata.loadChunkMetadataList()) {
if (chunkMetaData.getEndTime() > resultPair.getTimestamp()
@@ -182,7 +190,9 @@ public class LastQueryExecutor {
}
// Update cached last value with low priority
- node.updateCachedLast(resultPair, false, Long.MIN_VALUE);
+ if (node != null) {
+ node.updateCachedLast(resultPair, false, Long.MIN_VALUE);
+ }
return resultPair;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java b/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java
index 08add9f..c2591d2 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java
@@ -217,7 +217,12 @@ public class QueryRouter implements IQueryRouter {
@Override
public QueryDataSet lastQuery(LastQueryPlan lastQueryPlan, QueryContext context)
throws StorageEngineException, QueryProcessException, IOException {
- LastQueryExecutor lastQueryExecutor = new LastQueryExecutor(lastQueryPlan);
+ LastQueryExecutor lastQueryExecutor = getLastQueryExecutor(lastQueryPlan);
return lastQueryExecutor.execute(context, lastQueryPlan);
}
+
+ protected LastQueryExecutor getLastQueryExecutor(LastQueryPlan lastQueryPlan) {
+ return new LastQueryExecutor(lastQueryPlan);
+ }
+
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java
index f541666..5899643 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java
@@ -18,6 +18,12 @@
*/
package org.apache.iotdb.db.query.executor;
+import static org.apache.iotdb.tsfile.read.query.executor.ExecutorWithTimeGenerator.markFilterdPaths;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
@@ -39,12 +45,6 @@ import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
-
-import static org.apache.iotdb.tsfile.read.query.executor.ExecutorWithTimeGenerator.markFilterdPaths;
/**
* IoTDB query executor.
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 3f95840..6d888ec 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -513,7 +513,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
}
/**
- * @param plan must be a plan for Query: FillQueryPlan, AggregationPlan, GroupByPlan, some
+ * @param plan must be a plan for Query: FillQueryPlan, AggregationPlan, GroupByTimePlan, some
* AuthorPlan
*/
private TSExecuteStatementResp internalExecuteQueryStatement(String statement,
@@ -961,7 +961,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
return new TSExecuteStatementResp(status);
}
- status = executePlan(plan);
+ status = executeNonQueryPlan(plan);
TSExecuteStatementResp resp = RpcUtils.getTSExecuteStatementResp(status);
long queryId = generateQueryId(false);
resp.setQueryId(queryId);
@@ -1093,7 +1093,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
if (status != null) {
statusList.add(status);
} else {
- statusList.add(executePlan(plan));
+ statusList.add(executeNonQueryPlan(plan));
}
} catch (Exception e) {
logger.error("meet error when insert in batch", e);
@@ -1131,7 +1131,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
if (status != null) {
statusList.add(status);
} else {
- statusList.add(executePlan(plan));
+ statusList.add(executeNonQueryPlan(plan));
}
} catch (Exception e) {
logger.error("meet error when insert in batch", e);
@@ -1202,7 +1202,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
if (status != null) {
return status;
}
- return executePlan(plan);
+ return executeNonQueryPlan(plan);
} catch (Exception e) {
logger.error("meet error when insert", e);
}
@@ -1232,7 +1232,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
if (status != null) {
return status;
}
- return executePlan(plan);
+ return executeNonQueryPlan(plan);
} catch (Exception e) {
logger.error("meet error when insert", e);
}
@@ -1258,7 +1258,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
if (status != null) {
return new TSStatus(status);
}
- return new TSStatus(executePlan(plan));
+ return new TSStatus(executeNonQueryPlan(plan));
}
@Override
@@ -1283,7 +1283,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
return status;
}
- return executePlan(insertTabletPlan);
+ return executeNonQueryPlan(insertTabletPlan);
} catch (Exception e) {
logger.error("{}: error occurs when executing statements", IoTDBConstant.GLOBAL_DB_NAME, e);
return RpcUtils
@@ -1321,7 +1321,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
continue;
}
- statusList.add(executePlan(insertTabletPlan));
+ statusList.add(executeNonQueryPlan(insertTabletPlan));
}
return RpcUtils.getStatus(statusList);
} catch (Exception e) {
@@ -1350,7 +1350,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
if (status != null) {
return new TSStatus(status);
}
- return new TSStatus(executePlan(plan));
+ return new TSStatus(executeNonQueryPlan(plan));
}
@Override
@@ -1368,7 +1368,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
if (status != null) {
return new TSStatus(status);
}
- return new TSStatus(executePlan(plan));
+ return new TSStatus(executeNonQueryPlan(plan));
}
@Override
@@ -1392,7 +1392,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
if (status != null) {
return status;
}
- return executePlan(plan);
+ return executeNonQueryPlan(plan);
}
@Override
@@ -1427,7 +1427,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
continue;
}
- statusList.add(executePlan(plan));
+ statusList.add(executeNonQueryPlan(plan));
}
boolean isAllSuccessful = true;
@@ -1464,7 +1464,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
if (status != null) {
return status;
}
- return executePlan(plan);
+ return executeNonQueryPlan(plan);
}
@Override
@@ -1492,7 +1492,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
return null;
}
- protected TSStatus executePlan(PhysicalPlan plan) {
+ protected TSStatus executeNonQueryPlan(PhysicalPlan plan) {
boolean execRet;
try {
execRet = executeNonQuery(plan);
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java
index c6575de..7639a76 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java
@@ -72,15 +72,7 @@ public class CommonUtils {
try {
switch (dataType) {
case BOOLEAN:
- value = value.toLowerCase();
- if (SQLConstant.BOOLEAN_FALSE_NUM.equals(value) || SQLConstant.BOOLEN_FALSE
- .equals(value)) {
- return false;
- }
- if (SQLConstant.BOOLEAN_TRUE_NUM.equals(value) || SQLConstant.BOOLEN_TRUE.equals(value)) {
- return true;
- }
- throw new QueryProcessException("The BOOLEAN should be true/TRUE, false/FALSE or 0/1");
+ return parseBoolean(value);
case INT32:
return Integer.parseInt(value);
case INT64:
@@ -114,15 +106,7 @@ public class CommonUtils {
try {
switch (dataType) {
case BOOLEAN:
- value = value.toLowerCase();
- if (SQLConstant.BOOLEAN_FALSE_NUM.equals(value) || SQLConstant.BOOLEN_FALSE
- .equals(value)) {
- return false;
- }
- if (SQLConstant.BOOLEAN_TRUE_NUM.equals(value) || SQLConstant.BOOLEN_TRUE.equals(value)) {
- return true;
- }
- throw new QueryProcessException("The BOOLEAN should be true/TRUE, false/FALSE or 0/1");
+ return parseBoolean(value);
case INT32:
return Integer.parseInt(value);
case INT64:
@@ -140,4 +124,16 @@ public class CommonUtils {
throw new QueryProcessException(e.getMessage());
}
}
+
+ private static boolean parseBoolean(String value) throws QueryProcessException {
+ value = value.toLowerCase();
+ if (SQLConstant.BOOLEAN_FALSE_NUM.equals(value) || SQLConstant.BOOLEAN_FALSE
+ .equals(value)) {
+ return false;
+ }
+ if (SQLConstant.BOOLEAN_TRUE_NUM.equals(value) || SQLConstant.BOOLEAN_TRUE.equals(value)) {
+ return true;
+ }
+ throw new QueryProcessException("The BOOLEAN should be true/TRUE, false/FALSE or 0/1");
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java
index 84c767c..abf158a 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,10 +44,10 @@ public class SchemaUtils {
private static final Logger logger = LoggerFactory.getLogger(SchemaUtils.class);
- public static void registerTimeseries(MeasurementSchema schema) {
+ public static void registerTimeseries(TimeseriesSchema schema) {
try {
logger.debug("Registering timeseries {}", schema);
- String path = schema.getMeasurementId();
+ String path = schema.getFullPath();
TSDataType dataType = schema.getType();
TSEncoding encoding = schema.getEncodingType();
CompressionType compressionType = schema.getCompressor();
@@ -55,7 +56,7 @@ public class SchemaUtils {
} catch (PathAlreadyExistException ignored) {
// ignore added timeseries
} catch (MetadataException e) {
- logger.error("Cannot create timeseries {} in snapshot, ignored", schema.getMeasurementId(),
+ logger.error("Cannot create timeseries {} in snapshot, ignored", schema.getFullPath(),
e);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/SerializeUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/SerializeUtils.java
index ce1e755..105547b 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/SerializeUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/SerializeUtils.java
@@ -208,7 +208,7 @@ public class SerializeUtils {
}
public static BatchData deserializeBatchData(ByteBuffer buffer) {
- if (buffer == null || buffer.limit() == 0) {
+ if (buffer == null || (buffer.limit() - buffer.position()) == 0) {
return null;
}
@@ -494,4 +494,26 @@ public class SerializeUtils {
}
return ret;
}
+
+ /**
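+ * Convert a string representation of a Node back to a Node object.
+ * @param str A string that is generated by Node.toString(), containing "ip:", "metaPort:", "nodeIdentifier:" and "dataPort:" fields in that order (the first three ended by ',', the last by ')')
+ * @return a Node object built from the parsed ip, metaPort, nodeIdentifier and dataPort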
+ */
+ public static Node stringToNode(String str) {
+ int ipFirstPos = str.indexOf("ip:", 0) + "ip:".length();
+ int ipLastPos = str.indexOf(',', ipFirstPos);
+ int metaPortFirstPos = str.indexOf("metaPort:", ipLastPos) + "metaPort:".length();
+ int metaPortLastPos = str.indexOf(',', metaPortFirstPos);
+ int idFirstPos = str.indexOf("nodeIdentifier:", metaPortLastPos) + "nodeIdentifier:".length();
+ int idLastPos = str.indexOf(',', idFirstPos);
+ int dataPortFirstPos = str.indexOf("dataPort:", idLastPos) + "dataPort:".length();
+ int dataPortLastPos = str.indexOf(')', dataPortFirstPos);
+
+ String ip = str.substring(ipFirstPos, ipLastPos);
+ int metaPort = Integer.parseInt(str.substring(metaPortFirstPos, metaPortLastPos));
+ int id = Integer.parseInt(str.substring(idFirstPos, idLastPos));
+ int dataPort = Integer.parseInt(str.substring(dataPortFirstPos, dataPortLastPos));
+ return new Node(ip, metaPort, id, dataPort);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/TypeInferenceUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/TypeInferenceUtils.java
index e328874..5310eac 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/TypeInferenceUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/TypeInferenceUtils.java
@@ -47,8 +47,8 @@ public class TypeInferenceUtils {
}
private static boolean isBoolean(String s) {
- return s.equalsIgnoreCase(SQLConstant.BOOLEN_TRUE) || s
- .equalsIgnoreCase(SQLConstant.BOOLEN_FALSE);
+ return s.equalsIgnoreCase(SQLConstant.BOOLEAN_TRUE) || s
+ .equalsIgnoreCase(SQLConstant.BOOLEAN_FALSE);
}
/**
diff --git a/server/src/test/java/org/apache/iotdb/db/qp/plan/SerializationTest.java b/server/src/test/java/org/apache/iotdb/db/qp/plan/SerializationTest.java
new file mode 100644
index 0000000..9269a52
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/qp/plan/SerializationTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.qp.plan;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.MManager;
+import org.apache.iotdb.db.qp.Planner;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class SerializationTest {
+
+ private Planner processor = new Planner();
+
+ @Before
+ public void before() throws MetadataException {
+ MManager.getInstance().init();
+ MManager.getInstance().setStorageGroup("root.vehicle");
+ MManager.getInstance()
+ .createTimeseries("root.vehicle.d1.s1", TSDataType.FLOAT, TSEncoding.PLAIN,
+ CompressionType.UNCOMPRESSED, null);
+ MManager.getInstance()
+ .createTimeseries("root.vehicle.d2.s1", TSDataType.FLOAT, TSEncoding.PLAIN,
+ CompressionType.UNCOMPRESSED, null);
+ MManager.getInstance()
+ .createTimeseries("root.vehicle.d3.s1", TSDataType.FLOAT, TSEncoding.PLAIN,
+ CompressionType.UNCOMPRESSED, null);
+ MManager.getInstance()
+ .createTimeseries("root.vehicle.d4.s1", TSDataType.FLOAT, TSEncoding.PLAIN,
+ CompressionType.UNCOMPRESSED, null);
+ }
+
+ @After
+ public void clean() throws IOException {
+ MManager.getInstance().clear();
+ EnvironmentUtils.cleanAllDir();
+ }
+
+ @Test
+ public void testInsert() throws QueryProcessException, IOException {
+ String sqlStr = "INSERT INTO root.vehicle.d1(timestamp, s1) VALUES (1, 5.0)";
+ PhysicalPlan plan = processor.parseSQLToPhysicalPlan(sqlStr);
+ ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+ try (DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream)) {
+ plan.serialize(dataOutputStream);
+ ByteBuffer buffer = ByteBuffer.wrap(byteArrayOutputStream.toByteArray());
+ PhysicalPlan planB = PhysicalPlan.Factory.create(buffer);
+ assertEquals(plan, planB);
+ }
+
+ ByteBuffer buffer = ByteBuffer.allocate(4096);
+ plan.serialize(buffer);
+ buffer.flip();
+ PhysicalPlan planB = PhysicalPlan.Factory.create(buffer);
+ assertEquals(plan, planB);
+ }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/tools/WalCheckerTest.java b/server/src/test/java/org/apache/iotdb/db/tools/WalCheckerTest.java
index a327a8b..2e368ae 100644
--- a/server/src/test/java/org/apache/iotdb/db/tools/WalCheckerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/tools/WalCheckerTest.java
@@ -80,7 +80,7 @@ public class WalCheckerTest {
TSDataType[] types = new TSDataType[]{TSDataType.INT64, TSDataType.INT64, TSDataType.INT64};
String[] values = new String[]{"5", "6", "7"};
for (int j = 0; j < 10; j++) {
- new InsertRowPlan(deviceId, j, measurements, values).serialize(binaryPlans);
+ new InsertRowPlan(deviceId, j, measurements, types, values).serialize(binaryPlans);
}
binaryPlans.flip();
logWriter.write(binaryPlans);
diff --git a/thrift/src/main/thrift/cluster.thrift b/thrift/src/main/thrift/cluster.thrift
index b59bee8..94a6cdc 100644
--- a/thrift/src/main/thrift/cluster.thrift
+++ b/thrift/src/main/thrift/cluster.thrift
@@ -120,6 +120,7 @@ struct StartUpStatus {
1: required long partitionInterval
2: required int hashSalt
3: required int replicationNumber
+ 4: required list<Node> seedNodeList
}
// follower -> leader
@@ -127,6 +128,7 @@ struct CheckStatusResponse {
1: required bool partitionalIntervalEquals
2: required bool hashSaltEquals
3: required bool replicationNumEquals
+ 4: required bool seedNodeEquals
}
struct SendSnapshotRequest {
@@ -212,6 +214,16 @@ struct GroupByRequest {
8: required set<string> deviceMeasurements
}
+struct LastQueryRequest {
+ 1: required string path
+ 2: required int dataTypeOrdinal
+ 3: required long queryId
+ 4: required set<string> deviceMeasurements
+ 5: required Node header
+ 6: required Node requestor
+}
+
+
service RaftService {
/**
* Leader will call this method to all followers to ensure its authority.
@@ -269,7 +281,18 @@ service RaftService {
**/
long requestCommitIndex(1:Node header)
+
+ /**
+ * Read a chunk of a file from the client. If the remainder of the file has fewer bytes than
+ * requested, only the remainder will be returned.
+ * Note that when the last chunk of the file is read, the file will be deleted immediately.
+ **/
binary readFile(1:string filePath, 2:i64 offset, 3:i32 length)
+
+ /**
+ * Test if a log of "index" and "term" exists.
+ **/
+ bool matchTerm(1:long index, 2:long term, 3:Node header)
}
@@ -325,7 +348,6 @@ service TSDataService extends RaftService {
binary getAllMeasurementSchema(1: Node header, 2: binary planBinary)
-
list<binary> getAggrResult(1:GetAggrResultRequest request)
PullSnapshotResp pullSnapshot(1:PullSnapshotRequest request)
@@ -351,8 +373,15 @@ service TSDataService extends RaftService {
/**
* Perform a previous fill and return the timevalue pair in binary.
+ * @return a binary TimeValuePair
**/
binary previousFill(1: PreviousFillRequest request)
+
+ /**
+ * Query the last point of a series.
+ * @return a binary TimeValuePair
+ **/
+ binary last(1: LastQueryRequest request)
}
service TSMetaService extends RaftService {
@@ -366,6 +395,9 @@ service TSMetaService extends RaftService {
**/
AddNodeResponse addNode(1: Node node, 2: StartUpStatus startUpStatus)
+
+ CheckStatusResponse checkStatus(1: StartUpStatus startUpStatus)
+
/**
* Remove a node from the cluster. If the node is not in the cluster or the cluster size will
* less than replication number, the request will be rejected.
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
index c88481c..4d55f16 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
@@ -133,16 +133,16 @@ public class ReadWriteIOUtils {
public static int write(Map<String, String> map, DataOutputStream stream) throws IOException {
int length = 0;
byte[] bytes;
- stream.write(map.size());
+ stream.writeInt(map.size());
length += 4;
for (Entry<String, String> entry : map.entrySet()) {
bytes = entry.getKey().getBytes();
- stream.write(bytes.length);
+ stream.writeInt(bytes.length);
length += 4;
stream.write(bytes);
length += bytes.length;
bytes = entry.getValue().getBytes();
- stream.write(bytes.length);
+ stream.writeInt(bytes.length);
length += 4;
stream.write(bytes);
length += bytes.length;
@@ -344,6 +344,9 @@ public class ReadWriteIOUtils {
* @return the length of string represented by byte[].
*/
public static int write(String s, ByteBuffer buffer) {
+ if (s == null) {
+ return write(-1, buffer);
+ }
int len = 0;
byte[] bytes = s.getBytes();
len += write(bytes.length, buffer);
@@ -563,6 +566,9 @@ public class ReadWriteIOUtils {
*/
public static String readString(ByteBuffer buffer) {
int strLength = readInt(buffer);
+ if (strLength < 0) {
+ return null;
+ }
byte[] bytes = new byte[strLength];
buffer.get(bytes, 0, strLength);
return new String(bytes, 0, strLength);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/MeasurementSchema.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/MeasurementSchema.java
index 848e0b3..0ef47ea 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/MeasurementSchema.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/MeasurementSchema.java
@@ -294,4 +294,7 @@ public class MeasurementSchema implements Comparable<MeasurementSchema>, Seriali
return sc.toString();
}
+ public void setType(TSDataType type) {
+ this.type = type;
+ }
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/MeasurementSchema.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/TimeseriesSchema.java
similarity index 65%
copy from tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/MeasurementSchema.java
copy to tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/TimeseriesSchema.java
index 848e0b3..b7f3737 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/MeasurementSchema.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/TimeseriesSchema.java
@@ -16,10 +16,10 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.tsfile.write.schema;
import java.io.IOException;
-import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;
@@ -37,25 +37,22 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.iotdb.tsfile.utils.StringContainer;
/**
- * This class describes a measurement's information registered in {@linkplain Schema FileSchema},
- * including measurement id, data type, encoding and compressor type. For each TSEncoding,
- * MeasurementSchema maintains respective TSEncodingBuilder; For TSDataType, only ENUM has
- * TSDataTypeConverter up to now.
+ * TimeseriesSchema is like MeasurementSchema, but instead of measurementId, it stores the full
+ * path.
*/
-public class MeasurementSchema implements Comparable<MeasurementSchema>, Serializable {
-
- private String measurementId;
+public class TimeseriesSchema implements Comparable<TimeseriesSchema>, Serializable {
+ private String fullPath;
private TSDataType type;
private TSEncoding encoding;
- private TSEncodingBuilder encodingConverter;
+ private transient TSEncodingBuilder encodingConverter;
private CompressionType compressor;
private Map<String, String> props = new HashMap<>();
- public MeasurementSchema() {
+ public TimeseriesSchema() {
}
- public MeasurementSchema(String measurementId, TSDataType tsDataType) {
- this(measurementId, tsDataType,
+ public TimeseriesSchema(String fullPath, TSDataType tsDataType) {
+ this(fullPath, tsDataType,
TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getValueEncoder()),
TSFileDescriptor.getInstance().getConfig().getCompressor(),
Collections.emptyMap());
@@ -64,96 +61,67 @@ public class MeasurementSchema implements Comparable<MeasurementSchema>, Seriali
/**
* set properties as an empty Map.
*/
- public MeasurementSchema(String measurementId, TSDataType type, TSEncoding encoding) {
- this(measurementId, type, encoding,
+ public TimeseriesSchema(String fullPath, TSDataType type, TSEncoding encoding) {
+ this(fullPath, type, encoding,
TSFileDescriptor.getInstance().getConfig().getCompressor(),
Collections.emptyMap());
}
- public MeasurementSchema(String measurementId, TSDataType type, TSEncoding encoding,
+ public TimeseriesSchema(String fullPath, TSDataType type, TSEncoding encoding,
CompressionType compressionType) {
- this(measurementId, type, encoding, compressionType, Collections.emptyMap());
+ this(fullPath, type, encoding, compressionType, Collections.emptyMap());
}
/**
- * Constructor of MeasurementSchema.
+ * Constructor of TimeseriesSchema.
*
* <p>props - information in encoding method. For RLE, Encoder.MAX_POINT_NUMBER For PLAIN,
* Encoder.maxStringLength
*/
- public MeasurementSchema(String measurementId, TSDataType type, TSEncoding encoding,
+ public TimeseriesSchema(String fullPath, TSDataType type, TSEncoding encoding,
CompressionType compressionType, Map<String, String> props) {
this.type = type;
- this.measurementId = measurementId;
+ this.fullPath = fullPath;
this.encoding = encoding;
this.props = props == null ? Collections.emptyMap() : props;
this.compressor = compressionType;
}
/**
- * function for deserializing data from input stream.
- */
- public static MeasurementSchema deserializeFrom(InputStream inputStream) throws IOException {
- MeasurementSchema measurementSchema = new MeasurementSchema();
-
- measurementSchema.measurementId = ReadWriteIOUtils.readString(inputStream);
-
- measurementSchema.type = ReadWriteIOUtils.readDataType(inputStream);
-
- measurementSchema.encoding = ReadWriteIOUtils.readEncoding(inputStream);
-
- measurementSchema.compressor = ReadWriteIOUtils.readCompressionType(inputStream);
-
- int size = ReadWriteIOUtils.readInt(inputStream);
- if (size > 0) {
- measurementSchema.props = new HashMap<>();
- String key;
- String value;
- for (int i = 0; i < size; i++) {
- key = ReadWriteIOUtils.readString(inputStream);
- value = ReadWriteIOUtils.readString(inputStream);
- measurementSchema.props.put(key, value);
- }
- }
-
- return measurementSchema;
- }
-
- /**
* function for deserializing data from byte buffer.
*/
- public static MeasurementSchema deserializeFrom(ByteBuffer buffer) {
- MeasurementSchema measurementSchema = new MeasurementSchema();
+ public static TimeseriesSchema deserializeFrom(ByteBuffer buffer) {
+ TimeseriesSchema timeseriesSchema = new TimeseriesSchema();
- measurementSchema.measurementId = ReadWriteIOUtils.readString(buffer);
+ timeseriesSchema.fullPath = ReadWriteIOUtils.readString(buffer);
- measurementSchema.type = ReadWriteIOUtils.readDataType(buffer);
+ timeseriesSchema.type = ReadWriteIOUtils.readDataType(buffer);
- measurementSchema.encoding = ReadWriteIOUtils.readEncoding(buffer);
+ timeseriesSchema.encoding = ReadWriteIOUtils.readEncoding(buffer);
- measurementSchema.compressor = ReadWriteIOUtils.readCompressionType(buffer);
+ timeseriesSchema.compressor = ReadWriteIOUtils.readCompressionType(buffer);
int size = ReadWriteIOUtils.readInt(buffer);
if (size > 0) {
- measurementSchema.props = new HashMap<>();
+ timeseriesSchema.props = new HashMap<>();
String key;
String value;
for (int i = 0; i < size; i++) {
key = ReadWriteIOUtils.readString(buffer);
value = ReadWriteIOUtils.readString(buffer);
- measurementSchema.props.put(key, value);
+ timeseriesSchema.props.put(key, value);
}
}
- return measurementSchema;
+ return timeseriesSchema;
}
- public String getMeasurementId() {
- return measurementId;
+ public String getFullPath() {
+ return fullPath;
}
- public void setMeasurementId(String measurementId) {
- this.measurementId = measurementId;
+ public void setFullPath(String fullPath) {
+ this.fullPath = fullPath;
}
public Map<String, String> getProps() {
@@ -205,7 +173,7 @@ public class MeasurementSchema implements Comparable<MeasurementSchema>, Seriali
public int serializeTo(OutputStream outputStream) throws IOException {
int byteLen = 0;
- byteLen += ReadWriteIOUtils.write(measurementId, outputStream);
+ byteLen += ReadWriteIOUtils.write(fullPath, outputStream);
byteLen += ReadWriteIOUtils.write(type, outputStream);
@@ -232,7 +200,7 @@ public class MeasurementSchema implements Comparable<MeasurementSchema>, Seriali
public int serializeTo(ByteBuffer buffer) {
int byteLen = 0;
- byteLen += ReadWriteIOUtils.write(measurementId, buffer);
+ byteLen += ReadWriteIOUtils.write(fullPath, buffer);
byteLen += ReadWriteIOUtils.write(type, buffer);
@@ -261,33 +229,33 @@ public class MeasurementSchema implements Comparable<MeasurementSchema>, Seriali
if (o == null || getClass() != o.getClass()) {
return false;
}
- MeasurementSchema that = (MeasurementSchema) o;
+ TimeseriesSchema that = (TimeseriesSchema) o;
return type == that.type && encoding == that.encoding && Objects
- .equals(measurementId, that.measurementId)
+ .equals(fullPath, that.fullPath)
&& Objects.equals(compressor, that.compressor);
}
@Override
public int hashCode() {
- return Objects.hash(type, encoding, measurementId, compressor);
+ return Objects.hash(type, encoding, fullPath, compressor);
}
/**
- * compare by measurementID.
+ * compare by full path.
*/
@Override
- public int compareTo(MeasurementSchema o) {
+ public int compareTo(TimeseriesSchema o) {
if (equals(o)) {
return 0;
} else {
- return this.measurementId.compareTo(o.measurementId);
+ return this.fullPath.compareTo(o.fullPath);
}
}
@Override
public String toString() {
StringContainer sc = new StringContainer("");
- sc.addTail("[", measurementId, ",", type.toString(), ",", encoding.toString(), ",",
+ sc.addTail("[", fullPath, ",", type.toString(), ",", encoding.toString(), ",",
props.toString(), ",",
compressor.toString());
sc.addTail("]");