You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/05/08 03:09:09 UTC
[incubator-iotdb] 01/01: merge master
This is an automated email from the ASF dual-hosted git repository.
lta pushed a commit to branch cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 1b9911e5e6661d76a05a0ecea988a9e16d5f4ca8
Merge: 6d0f3d2 4f554c7
Author: lta <li...@163.com>
AuthorDate: Wed May 8 11:06:39 2019 +0800
merge master
.travis.yml | 27 ++-
License | 1 +
clean.sh | 36 ----
.../qp/executor/ClusterQueryProcessExecutor.java | 4 +-
docs/Documentation/QuickStart.md | 74 +++----
iotdb-cli/pom.xml | 5 +
.../java/org/apache/iotdb/cli/tool/ExportCsv.java | 24 ++-
.../java/org/apache/iotdb/cli/tool/ImportCsv.java | 40 +++-
iotdb/iotdb/conf/iotdb-engine.properties | 1 +
iotdb/iotdb/conf/iotdb-env.sh | 2 +-
.../java/org/apache/iotdb/db/engine/Processor.java | 2 +-
.../engine/bufferwrite/BufferWriteProcessor.java | 154 +++++++--------
.../bufferwrite/RestorableTsFileIOWriter.java | 19 +-
.../iotdb/db/engine/filenode/FileNodeManager.java | 40 ++--
.../db/engine/filenode/FileNodeProcessor.java | 173 ++++++++---------
.../iotdb/db/engine/filenode/TsFileResource.java | 216 ++++++++++++---------
.../db/engine/memcontrol/BasicMemController.java | 6 +-
.../memcontrol/DisabledMemController.java} | 40 ++--
.../iotdb/db/engine/memtable/AbstractMemTable.java | 14 +-
.../apache/iotdb/db/engine/memtable/IMemTable.java | 7 +-
.../db/engine/memtable/IWritableMemChunk.java | 2 +
.../db/engine/memtable/MemTableFlushUtil.java | 10 +-
.../iotdb/db/engine/memtable/WritableMemChunk.java | 27 ++-
.../db/engine/overflow/io/OverflowProcessor.java | 113 +++++++----
.../overflow/io/OverflowedTsFileIOWriter.java} | 22 +--
.../db/engine/querycontext/ReadOnlyMemChunk.java | 2 +
.../db/exception/FileNodeProcessorException.java | 4 +
.../java/org/apache/iotdb/db/metadata/MGraph.java | 11 +-
.../org/apache/iotdb/db/metadata/MManager.java | 14 +-
.../java/org/apache/iotdb/db/metadata/MTree.java | 11 +-
.../db/qp/executor/IQueryProcessExecutor.java | 4 +-
.../iotdb/db/qp/executor/OverflowQPExecutor.java | 18 +-
.../iotdb/db/qp/executor/QueryProcessExecutor.java | 1 +
.../iotdb/db/qp/logical/crud/InsertOperator.java | 12 +-
.../iotdb/db/qp/physical/crud/InsertPlan.java | 25 +--
.../db/qp/physical/transfer/CodecInstances.java | 18 +-
.../iotdb/db/qp/strategy/LogicalGenerator.java | 8 +-
.../db/query/control/QueryResourceManager.java | 6 +-
.../org/apache/iotdb/db/service/TSServiceImpl.java | 3 +-
.../org/apache/iotdb/db/sql/parse/ParseDriver.java | 2 +-
.../iotdb/db/sync/receiver/SyncServiceImpl.java | 91 ++++-----
.../java/org/apache/iotdb/db/tools/WalChecker.java | 11 +-
.../java/org/apache/iotdb/db/utils/MemUtils.java | 24 +++
.../apache/iotdb/db/writelog/io/RAFLogReader.java | 3 +-
.../writelog/manager/MultiFileLogNodeManager.java | 1 +
.../db/writelog/manager/WriteLogNodeManager.java | 6 +-
.../db/writelog/replay/ConcreteLogReplayer.java | 12 +-
.../bufferwrite/BufferWriteProcessorNewTest.java | 1 -
.../db/engine/filenode/TsFileResourceTest.java | 12 +-
.../engine/overflow/io/OverflowProcessorTest.java | 2 +
.../engine/overflow/io/OverflowResourceTest.java | 24 +--
.../transfer/PhysicalPlanLogTransferTest.java | 2 +-
.../apache/iotdb/db/qp/utils/MemIntQpExecutor.java | 4 +-
.../org/apache/iotdb/db/tools/WalCheckerTest.java | 34 +++-
.../apache/iotdb/db/utils/EnvironmentUtils.java | 10 +-
.../apache/iotdb/db/writelog/PerformanceTest.java | 14 +-
.../org/apache/iotdb/db/writelog/RecoverTest.java | 12 +-
.../iotdb/db/writelog/WriteLogNodeManagerTest.java | 8 +-
.../apache/iotdb/db/writelog/WriteLogNodeTest.java | 25 +--
.../iotdb/db/writelog/io/LogWriterReaderTest.java | 8 +-
pom.xml | 1 +
.../apache/iotdb/tsfile/TsFileSequenceRead.java | 4 +-
.../apache/iotdb/tsfile/read/ReadOnlyTsFile.java | 2 +-
.../iotdb/tsfile/read/TsFileSequenceReader.java | 13 +-
.../iotdb/tsfile/write/TsFileReadWriteTest.java | 4 +-
65 files changed, 860 insertions(+), 666 deletions(-)
diff --cc cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/ClusterQueryProcessExecutor.java
index c5032fc,0000000..3dc8c43
mode 100644,000000..100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/ClusterQueryProcessExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/ClusterQueryProcessExecutor.java
@@@ -1,177 -1,0 +1,177 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.qp.executor;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import org.apache.iotdb.cluster.query.executor.ClusterQueryRouter;
+import org.apache.iotdb.db.exception.FileNodeManagerException;
+import org.apache.iotdb.db.exception.PathErrorException;
+import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.qp.constant.SQLConstant;
+import org.apache.iotdb.db.qp.executor.IQueryProcessExecutor;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
+import org.apache.iotdb.db.qp.physical.crud.FillQueryPlan;
+import org.apache.iotdb.db.qp.physical.crud.GroupByPlan;
+import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.fill.IFill;
+import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.expression.IExpression;
+import org.apache.iotdb.tsfile.read.expression.QueryExpression;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+public class ClusterQueryProcessExecutor extends AbstractQPExecutor implements IQueryProcessExecutor {
+
+ private ThreadLocal<Integer> fetchSize = new ThreadLocal<>();
+ private ClusterQueryRouter clusterQueryRouter = new ClusterQueryRouter();
+
+ private QueryMetadataExecutor queryMetadataExecutor = new QueryMetadataExecutor();
+
+ @Override
+ public QueryDataSet processQuery(QueryPlan queryPlan, QueryContext context)
+ throws IOException, FileNodeManagerException, PathErrorException,
+ QueryFilterOptimizationException, ProcessorException {
+
+ QueryExpression queryExpression = QueryExpression.create().setSelectSeries(queryPlan.getPaths())
+ .setExpression(queryPlan.getExpression());
+ clusterQueryRouter.setReadDataConsistencyLevel(getReadDataConsistencyLevel());
+ if (queryPlan instanceof GroupByPlan) {
+ GroupByPlan groupByPlan = (GroupByPlan) queryPlan;
+ return groupBy(groupByPlan.getPaths(), groupByPlan.getAggregations(),
+ groupByPlan.getExpression(), groupByPlan.getUnit(), groupByPlan.getOrigin(),
+ groupByPlan.getIntervals(), context);
+ }
+
+ if (queryPlan instanceof AggregationPlan) {
+ return aggregate(queryPlan.getPaths(), queryPlan.getAggregations(),
+ queryPlan.getExpression(), context);
+ }
+
+ if (queryPlan instanceof FillQueryPlan) {
+ FillQueryPlan fillQueryPlan = (FillQueryPlan) queryPlan;
+ return fill(queryPlan.getPaths(), fillQueryPlan.getQueryTime(),
+ fillQueryPlan.getFillType(), context);
+ }
+ return clusterQueryRouter.query(queryExpression, context);
+ }
+
+ @Override
+ public QueryDataSet aggregate(List<Path> paths, List<String> aggres, IExpression expression,
+ QueryContext context)
+ throws ProcessorException, IOException, PathErrorException, FileNodeManagerException, QueryFilterOptimizationException {
+ return clusterQueryRouter.aggregate(paths, aggres, expression, context);
+ }
+
+ @Override
+ public QueryDataSet groupBy(List<Path> paths, List<String> aggres, IExpression expression,
+ long unit, long origin, List<Pair<Long, Long>> intervals, QueryContext context)
+ throws ProcessorException, IOException, PathErrorException, FileNodeManagerException, QueryFilterOptimizationException {
+ return clusterQueryRouter.groupBy(paths, aggres, expression, unit, origin, intervals, context);
+ }
+
+ @Override
+ public QueryDataSet fill(List<Path> fillPaths, long queryTime, Map<TSDataType, IFill> fillTypes,
+ QueryContext context)
+ throws ProcessorException, IOException, PathErrorException, FileNodeManagerException {
+ return clusterQueryRouter.fill(fillPaths, queryTime, fillTypes, context);
+ }
+
+ @Override
+ public TSDataType getSeriesType(Path path) throws PathErrorException {
+ if (path.equals(SQLConstant.RESERVED_TIME)) {
+ return TSDataType.INT64;
+ }
+ if (path.equals(SQLConstant.RESERVED_FREQ)) {
+ return TSDataType.FLOAT;
+ }
+ try {
+ return queryMetadataExecutor.processSeriesTypeQuery(path.getFullPath());
+ } catch (InterruptedException | ProcessorException e) {
+ throw new PathErrorException(e.getMessage());
+ }
+ }
+
+ @Override
+ public List<String> getAllPaths(String originPath)
+ throws PathErrorException {
+ try {
+ return queryMetadataExecutor.processPathsQuery(originPath);
+ } catch (InterruptedException | ProcessorException e) {
+ throw new PathErrorException(e.getMessage());
+ }
+ }
+
+ @Override
+ public boolean judgePathExists(Path fullPath) {
+ try {
+ List<List<String>> results = queryMetadataExecutor.processTimeSeriesQuery(fullPath.toString());
+ return !results.isEmpty();
+ } catch (InterruptedException | PathErrorException | ProcessorException e) {
+ return false;
+ }
+ }
+
+ @Override
+ public int getFetchSize() {
+ return fetchSize.get();
+ }
+
+ @Override
+ public void setFetchSize(int fetchSize) {
+ this.fetchSize.set(fetchSize);
+ }
+
+ @Override
+ public boolean update(Path path, long startTime, long endTime, String value)
+ throws ProcessorException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean delete(List<Path> paths, long deleteTime) throws ProcessorException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean delete(Path path, long deleteTime) throws ProcessorException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int insert(Path path, long insertTime, String value) throws ProcessorException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
- public int multiInsert(String deviceId, long insertTime, List<String> measurementList,
- List<String> insertValues) throws ProcessorException {
++ public int multiInsert(String deviceId, long insertTime, String[] measurementList,
++ String[] insertValues) throws ProcessorException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean processNonQuery(PhysicalPlan plan) throws ProcessorException {
+ throw new UnsupportedOperationException();
+ }
+}
diff --cc iotdb/src/main/java/org/apache/iotdb/db/qp/executor/IQueryProcessExecutor.java
index 67e9278,0000000..15bcb5a
mode 100644,000000..100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/IQueryProcessExecutor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/IQueryProcessExecutor.java
@@@ -1,143 -1,0 +1,143 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.qp.executor;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import org.apache.iotdb.db.exception.FileNodeManagerException;
+import org.apache.iotdb.db.exception.PathErrorException;
+import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.executor.IEngineQueryRouter;
+import org.apache.iotdb.db.query.fill.IFill;
+import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.expression.IExpression;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+public interface IQueryProcessExecutor {
+
+ boolean processNonQuery(PhysicalPlan plan) throws ProcessorException;
+
+ /**
+ * process query plan of qp layer, construct queryDataSet.
+ *
+ * @param queryPlan QueryPlan
+ * @return QueryDataSet
+ */
+ QueryDataSet processQuery(QueryPlan queryPlan, QueryContext context)
+ throws IOException, FileNodeManagerException, PathErrorException,
+ QueryFilterOptimizationException, ProcessorException;
+
+ /**
+ * process aggregate plan of qp layer, construct queryDataSet.
+ *
+ */
+ QueryDataSet aggregate(List<Path> paths, List<String> aggres, IExpression expression,
+ QueryContext context)
+ throws ProcessorException, IOException, PathErrorException, FileNodeManagerException, QueryFilterOptimizationException;
+
+ /**
+ * process group by plan of qp layer, construct queryDataSet.
+ */
+ QueryDataSet groupBy(List<Path> paths, List<String> aggres, IExpression expression,
+ long unit, long origin, List<Pair<Long, Long>> intervals, QueryContext context)
+ throws ProcessorException, IOException, PathErrorException, FileNodeManagerException, QueryFilterOptimizationException;
+
+ /**
+ * process fill plan of qp layer, construct queryDataSet.
+ */
+ QueryDataSet fill(List<Path> fillPaths, long queryTime, Map<TSDataType, IFill> fillTypes,
+ QueryContext context)
+ throws ProcessorException, IOException, PathErrorException, FileNodeManagerException;
+
+ /**
+ * execute update command and return whether the operation is successful.
+ *
+ * @param path seriesPath of the series to update
+ * @param startTime start time in update command
+ * @param endTime end time in update command
+ * @param value the value, passed as a string
+ * @return - whether the operation is successful.
+ */
+ boolean update(Path path, long startTime, long endTime, String value)
+ throws ProcessorException;
+
+ /**
+ * execute delete command and return whether the operation is successful.
+ *
+ * @param paths paths of the series to delete
+ * @param deleteTime end time in delete command
+ * @return - whether the operation is successful.
+ */
+ boolean delete(List<Path> paths, long deleteTime) throws ProcessorException;
+
+ /**
+ * execute delete command and return whether the operation is successful.
+ *
+ * @param path seriesPath of the series to delete
+ * @param deleteTime end time in delete command
+ * @return - whether the operation is successful.
+ */
+ boolean delete(Path path, long deleteTime) throws ProcessorException;
+
+ /**
+ * insert a single value. Only used in test
+ *
+ * @param path seriesPath to be inserted
+ * @param insertTime - a single time point, not a range
+ * @param value value to be inserted
+ * @return - Operate Type.
+ */
+ int insert(Path path, long insertTime, String value) throws ProcessorException;
+
+ /**
+ * execute insert command for several measurements of one device.
+ *
+ * @param deviceId deviceId to be inserted
+ * @param insertTime - a single time point, not a range
+ * @param measurementList measurements to be inserted
+ * @param insertValues values to be inserted
+ * @return - Operate Type.
+ */
- int multiInsert(String deviceId, long insertTime, List<String> measurementList,
- List<String> insertValues) throws ProcessorException;
++ int multiInsert(String deviceId, long insertTime, String[] measurementList,
++ String[] insertValues) throws ProcessorException;
+
+ boolean judgePathExists(Path fullPath);
+
+ /**
+ * Get data type of series
+ */
+ TSDataType getSeriesType(Path path) throws PathErrorException;
+
+ /**
+ * Get all paths of a full path
+ */
+ List<String> getAllPaths(String originPath) throws PathErrorException;
+
+ int getFetchSize();
+
+ void setFetchSize(int fetchSize);
+
+}
diff --cc iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
index bef0328,376997e..99476f2
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
@@@ -113,4 -161,38 +113,5 @@@ public abstract class QueryProcessExecu
throw new ProcessorException(e.getMessage());
}
}
+
- /**
- * executeWithGlobalTimeFilter delete command and return whether the operator is successful.
- *
- * @param path : delete series seriesPath
- * @param deleteTime end time in delete command
- * @return - whether the operator is successful.
- */
- protected abstract boolean delete(Path path, long deleteTime) throws ProcessorException;
-
- /**
- * insert a single value. Only used in test
- *
- * @param path seriesPath to be inserted
- * @param insertTime - it's time point but not a range
- * @param value value to be inserted
- * @return - Operate Type.
- */
- public abstract int insert(Path path, long insertTime, String value) throws ProcessorException;
-
- /**
- * executeWithGlobalTimeFilter insert command and return whether the operator is successful.
- *
- * @param deviceId deviceId to be inserted
- * @param insertTime - it's time point but not a range
- * @param measurementList measurements to be inserted
- * @param insertValues values to be inserted
- * @return - Operate Type.
- */
- public abstract int multiInsert(String deviceId, long insertTime, String[] measurementList,
- String[] insertValues) throws ProcessorException;
-
- public abstract List<String> getAllPaths(String originPath) throws PathErrorException;
-
}
diff --cc iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
index e4efaa1,3e0c13a..cdb519c
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
@@@ -27,10 -28,9 +28,10 @@@ import org.apache.iotdb.tsfile.read.com
public class InsertPlan extends PhysicalPlan {
+ private static final long serialVersionUID = 6102845312368561515L;
private String deviceId;
- private List<String> measurements;
- private List<String> values;
+ private String[] measurements;
+ private String[] values;
private long time;
// insertType
diff --cc iotdb/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java
index dc07bde,d125ebc..8b1c241
--- a/iotdb/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java
@@@ -34,9 -34,10 +34,10 @@@ import org.apache.iotdb.db.qp.physical.
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.db.writelog.node.ExclusiveWriteLogNode;
import org.apache.iotdb.db.writelog.node.WriteLogNode;
-import org.apache.iotdb.db.writelog.transfer.PhysicalPlanLogTransfer;
+import org.apache.iotdb.db.qp.physical.transfer.PhysicalPlanLogTransfer;
import org.apache.iotdb.tsfile.read.common.Path;
import org.junit.After;
+ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;