You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/08/02 03:00:27 UTC
[iotdb] branch master updated: Influxdb service adapts to distributed MPP framework and fixes some bugs in InfluxFunction and TagInfoRecord (#6828)
This is an automated email from the ASF dual-hosted git repository.
qijun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 1140cf2da6 Influxdb service adapts to distributed MPP framework and fixes some bugs in InfluxFunction and TagInfoRecord (#6828)
1140cf2da6 is described below
commit 1140cf2da6aae64f28f9ac36b97c6048756228e6
Author: Jian Zhang <38...@users.noreply.github.com>
AuthorDate: Tue Aug 2 11:00:21 2022 +0800
Influxdb service adapts to distributed MPP framework and fixes some bugs in InfluxFunction and TagInfoRecord (#6828)
* Influxdb service adapts to distributed MPP framework
* fix InfluxFirstFunction,InfluxLastFunction,InfluxMeanFunction
* Using the '.*' form of import should be avoided
* spotless apply
* influxdb mpp test
* influxdb mpp test
* Modified to rely on an AbstractInfluxDBMetaManager
* Generate tag timestamps using AtomicLong
---
docker/src/main/Dockerfile-single-influxdb | 4 +-
.../iotdb/db/protocol/influxdb/dto/IoTDBPoint.java | 40 +-
.../function/aggregator/InfluxMeanFunction.java | 4 +-
.../function/selector/InfluxFirstFunction.java | 5 +-
.../function/selector/InfluxLastFunction.java | 5 +-
.../influxdb/handler/AbstractQueryHandler.java | 511 +++++++++++++++++++++
.../protocol/influxdb/handler/NewQueryHandler.java | 200 ++++++++
.../db/protocol/influxdb/handler/QueryHandler.java | 468 +------------------
.../influxdb/meta/AbstractInfluxDBMetaManager.java | 114 +++++
.../influxdb/meta/InfluxDBMetaManager.java | 96 +---
.../influxdb/meta/NewInfluxDBMetaManager.java | 129 ++++++
.../db/protocol/influxdb/meta/TagInfoRecords.java | 28 +-
.../influxdb/util/InfluxReqAndRespUtils.java | 55 +++
.../protocol/influxdb/util/QueryResultUtils.java | 153 ++++++
.../db/protocol/influxdb/util/StringUtils.java | 11 +
.../iotdb/db/service/InfluxDBRPCService.java | 26 +-
.../handler/InfluxDBServiceThriftHandler.java | 10 +-
.../thrift/impl/IInfluxDBServiceWithHandler.java | 25 +
.../service/thrift/impl/InfluxDBServiceImpl.java | 20 +-
.../thrift/impl/NewInfluxDBServiceImpl.java | 128 ++++++
20 files changed, 1444 insertions(+), 588 deletions(-)
diff --git a/docker/src/main/Dockerfile-single-influxdb b/docker/src/main/Dockerfile-single-influxdb
index 8319075d5b..ff15b26cfb 100644
--- a/docker/src/main/Dockerfile-single-influxdb
+++ b/docker/src/main/Dockerfile-single-influxdb
@@ -33,7 +33,7 @@ RUN apt update \
&& apt autoremove -y \
&& apt purge --auto-remove -y \
&& apt clean -y
-RUN dos2unix /iotdb/sbin/start-server.sh
+RUN dos2unix /iotdb/sbin/start-new-server.sh
RUN dos2unix /iotdb/sbin/../conf/datanode-env.sh
EXPOSE 6667
EXPOSE 31999
@@ -43,4 +43,4 @@ EXPOSE 8181
VOLUME /iotdb/data
VOLUME /iotdb/logs
ENV PATH="/iotdb/sbin/:/iotdb/tools/:${PATH}"
-ENTRYPOINT ["/iotdb/sbin/start-server.sh"]
+ENTRYPOINT ["/iotdb/sbin/start-new-server.sh"]
diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/dto/IoTDBPoint.java b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/dto/IoTDBPoint.java
index 1c7019bb54..ac7c1ce984 100644
--- a/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/dto/IoTDBPoint.java
+++ b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/dto/IoTDBPoint.java
@@ -21,11 +21,12 @@ package org.apache.iotdb.db.protocol.influxdb.dto;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.protocol.influxdb.meta.InfluxDBMetaManager;
+import org.apache.iotdb.db.protocol.influxdb.meta.AbstractInfluxDBMetaManager;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.utils.DataTypeUtils;
import org.apache.iotdb.db.utils.ParameterUtils;
import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.influxdb.dto.Point;
@@ -57,7 +58,8 @@ public class IoTDBPoint {
this.values = values;
}
- public IoTDBPoint(String database, Point point, InfluxDBMetaManager metaManager) {
+ public IoTDBPoint(
+ String database, Point point, AbstractInfluxDBMetaManager metaManager, long sessionID) {
String measurement = null;
Map<String, String> tags = new HashMap<>();
Map<String, Object> fields = new HashMap<>();
@@ -67,8 +69,8 @@ public class IoTDBPoint {
for (java.lang.reflect.Field reflectField : point.getClass().getDeclaredFields()) {
reflectField.setAccessible(true);
try {
- if (reflectField.getType().getName().equalsIgnoreCase("java.util.concurrent.TimeUnit")
- && reflectField.getName().equalsIgnoreCase("precision")) {
+ if ("java.util.concurrent.TimeUnit".equalsIgnoreCase(reflectField.getType().getName())
+ && "precision".equalsIgnoreCase(reflectField.getName())) {
precision = (TimeUnit) reflectField.get(point);
}
} catch (IllegalAccessException e) {
@@ -79,17 +81,17 @@ public class IoTDBPoint {
for (java.lang.reflect.Field reflectField : point.getClass().getDeclaredFields()) {
reflectField.setAccessible(true);
try {
- if (reflectField.getType().getName().equalsIgnoreCase("java.util.Map")
- && reflectField.getName().equalsIgnoreCase("fields")) {
+ if ("java.util.Map".equalsIgnoreCase(reflectField.getType().getName())
+ && "fields".equalsIgnoreCase(reflectField.getName())) {
fields = (Map<String, Object>) reflectField.get(point);
- } else if (reflectField.getType().getName().equalsIgnoreCase("java.util.Map")
- && reflectField.getName().equalsIgnoreCase("tags")) {
+ } else if ("java.util.Map".equalsIgnoreCase(reflectField.getType().getName())
+ && "tags".equalsIgnoreCase(reflectField.getName())) {
tags = (Map<String, String>) reflectField.get(point);
- } else if (reflectField.getType().getName().equalsIgnoreCase("java.lang.String")
- && reflectField.getName().equalsIgnoreCase("measurement")) {
+ } else if ("java.lang.String".equalsIgnoreCase(reflectField.getType().getName())
+ && "measurement".equalsIgnoreCase(reflectField.getName())) {
measurement = (String) reflectField.get(point);
- } else if (reflectField.getType().getName().equalsIgnoreCase("java.lang.Number")
- && reflectField.getName().equalsIgnoreCase("time")) {
+ } else if ("java.lang.Number".equalsIgnoreCase(reflectField.getType().getName())
+ && "time".equalsIgnoreCase(reflectField.getName())) {
time = (Long) reflectField.get(point);
time = TimeUnit.MILLISECONDS.convert(time, precision);
}
@@ -103,7 +105,7 @@ public class IoTDBPoint {
}
ParameterUtils.checkNonEmptyString(database, "database");
ParameterUtils.checkNonEmptyString(measurement, "measurement name");
- String path = metaManager.generatePath(database, measurement, tags);
+ String path = metaManager.generatePath(database, measurement, tags, sessionID);
List<String> measurements = new ArrayList<>();
List<TSDataType> types = new ArrayList<>();
List<Object> values = new ArrayList<>();
@@ -149,4 +151,16 @@ public class IoTDBPoint {
DataTypeUtils.getValueBuffer(getTypes(), getValues()),
false);
}
+
+ public TSInsertRecordReq convertToTSInsertRecordReq(long sessionID)
+ throws IoTDBConnectionException {
+ TSInsertRecordReq tsInsertRecordReq = new TSInsertRecordReq();
+ tsInsertRecordReq.setValues(DataTypeUtils.getValueBuffer(getTypes(), getValues()));
+ tsInsertRecordReq.setMeasurements(getMeasurements());
+ tsInsertRecordReq.setPrefixPath(getDeviceId());
+ tsInsertRecordReq.setIsAligned(false);
+ tsInsertRecordReq.setTimestamp(getTime());
+ tsInsertRecordReq.setSessionId(sessionID);
+ return tsInsertRecordReq;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/function/aggregator/InfluxMeanFunction.java b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/function/aggregator/InfluxMeanFunction.java
index fb1b936e2a..1337cc701b 100644
--- a/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/function/aggregator/InfluxMeanFunction.java
+++ b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/function/aggregator/InfluxMeanFunction.java
@@ -29,7 +29,7 @@ import java.util.List;
public class InfluxMeanFunction extends InfluxAggregator {
private final List<Double> numbers = new ArrayList<>();
- long sum = 0;
+ double sum = 0;
long count = 0;
public InfluxMeanFunction(List<Expression> expressionList) {
@@ -66,7 +66,7 @@ public class InfluxMeanFunction extends InfluxAggregator {
if (functionValues.length == 1) {
count += (long) functionValues[0].getValue();
} else if (functionValues.length == 2) {
- sum += (long) functionValues[1].getValue();
+ sum += (double) functionValues[1].getValue();
}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/function/selector/InfluxFirstFunction.java b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/function/selector/InfluxFirstFunction.java
index 0599b8ddc5..9ec5dbcdbb 100644
--- a/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/function/selector/InfluxFirstFunction.java
+++ b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/function/selector/InfluxFirstFunction.java
@@ -45,10 +45,7 @@ public class InfluxFirstFunction extends InfluxSelector {
@Override
public void updateValueIoTDBFunc(InfluxFunctionValue... functionValues) {
- if (value == null && getTimestamp() == null) {
- value = functionValues[0].getValue();
- setTimestamp(functionValues[0].getTimestamp());
- } else if (getTimestamp() < functionValues[0].getTimestamp()) {
+ if (functionValues[0].getTimestamp() < getTimestamp()) {
value = functionValues[0].getValue();
setTimestamp(functionValues[0].getTimestamp());
}
diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/function/selector/InfluxLastFunction.java b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/function/selector/InfluxLastFunction.java
index be9cce9119..062f3f5df6 100644
--- a/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/function/selector/InfluxLastFunction.java
+++ b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/function/selector/InfluxLastFunction.java
@@ -45,10 +45,7 @@ public class InfluxLastFunction extends InfluxSelector {
@Override
public void updateValueIoTDBFunc(InfluxFunctionValue... functionValues) {
- if (value == null && getTimestamp() == null) {
- value = functionValues[0].getValue();
- setTimestamp(functionValues[0].getTimestamp());
- } else if (getTimestamp() > functionValues[0].getTimestamp()) {
+ if (functionValues[0].getTimestamp() > getTimestamp()) {
value = functionValues[0].getValue();
setTimestamp(functionValues[0].getTimestamp());
}
diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/handler/AbstractQueryHandler.java b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/handler/AbstractQueryHandler.java
new file mode 100644
index 0000000000..78899f7e34
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/handler/AbstractQueryHandler.java
@@ -0,0 +1,511 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.protocol.influxdb.handler;
+
+import org.apache.iotdb.commons.auth.AuthException;
+import org.apache.iotdb.db.mpp.plan.expression.Expression;
+import org.apache.iotdb.db.mpp.plan.expression.ResultColumn;
+import org.apache.iotdb.db.mpp.plan.expression.leaf.TimeSeriesOperand;
+import org.apache.iotdb.db.mpp.plan.expression.multi.FunctionExpression;
+import org.apache.iotdb.db.protocol.influxdb.constant.InfluxSQLConstant;
+import org.apache.iotdb.db.protocol.influxdb.function.InfluxFunction;
+import org.apache.iotdb.db.protocol.influxdb.function.InfluxFunctionFactory;
+import org.apache.iotdb.db.protocol.influxdb.function.InfluxFunctionValue;
+import org.apache.iotdb.db.protocol.influxdb.function.aggregator.InfluxAggregator;
+import org.apache.iotdb.db.protocol.influxdb.function.selector.InfluxSelector;
+import org.apache.iotdb.db.protocol.influxdb.meta.InfluxDBMetaManager;
+import org.apache.iotdb.db.protocol.influxdb.operator.InfluxQueryOperator;
+import org.apache.iotdb.db.protocol.influxdb.operator.InfluxSelectComponent;
+import org.apache.iotdb.db.protocol.influxdb.util.FilterUtils;
+import org.apache.iotdb.db.protocol.influxdb.util.JacksonUtils;
+import org.apache.iotdb.db.protocol.influxdb.util.QueryResultUtils;
+import org.apache.iotdb.db.protocol.influxdb.util.StringUtils;
+import org.apache.iotdb.db.qp.constant.FilterConstant;
+import org.apache.iotdb.db.qp.logical.Operator;
+import org.apache.iotdb.db.qp.logical.crud.BasicFunctionOperator;
+import org.apache.iotdb.db.qp.logical.crud.FilterOperator;
+import org.apache.iotdb.db.service.basic.ServiceProvider;
+import org.apache.iotdb.protocol.influxdb.rpc.thrift.InfluxQueryResultRsp;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.read.expression.IExpression;
+import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
+
+import org.influxdb.dto.QueryResult;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public abstract class AbstractQueryHandler {
+
+ abstract Map<String, Integer> getFieldOrders(
+ String database, String measurement, ServiceProvider serviceProvider, long sessionId);
+
+ abstract InfluxFunctionValue updateByIoTDBFunc(
+ InfluxFunction function, ServiceProvider serviceProvider, String path, long sessionid);
+
+ abstract QueryResult queryByConditions(
+ String querySql,
+ String database,
+ String measurement,
+ ServiceProvider serviceProvider,
+ Map<String, Integer> fieldOrders,
+ long sessionId)
+ throws AuthException;
+
+ public final InfluxQueryResultRsp queryInfluxDB(
+ String database,
+ InfluxQueryOperator queryOperator,
+ long sessionId,
+ ServiceProvider serviceProvider) {
+ String measurement = queryOperator.getFromComponent().getPrefixPaths().get(0).getFullPath();
+ // The list of fields under the current measurement and the order of the specified rules
+ Map<String, Integer> fieldOrders =
+ getFieldOrders(database, measurement, serviceProvider, sessionId);
+ QueryResult queryResult;
+ InfluxQueryResultRsp tsQueryResultRsp = new InfluxQueryResultRsp();
+ try {
+ // contain filter condition or have common query the result of by traversal.
+ if (queryOperator.getWhereComponent() != null
+ || queryOperator.getSelectComponent().isHasCommonQuery()
+ || queryOperator.getSelectComponent().isHasOnlyTraverseFunction()) {
+ // step1 : generate query results
+ queryResult =
+ queryExpr(
+ queryOperator.getWhereComponent() != null
+ ? queryOperator.getWhereComponent().getFilterOperator()
+ : null,
+ database,
+ measurement,
+ serviceProvider,
+ fieldOrders,
+ sessionId);
+ // step2 : select filter
+ ProcessSelectComponent(queryResult, queryOperator.getSelectComponent());
+ }
+ // don't contain filter condition and only have function use iotdb function.
+ else {
+ queryResult =
+ queryFuncWithoutFilter(
+ queryOperator.getSelectComponent(),
+ database,
+ measurement,
+ serviceProvider,
+ sessionId);
+ }
+ return tsQueryResultRsp
+ .setResultJsonString(JacksonUtils.bean2Json(queryResult))
+ .setStatus(RpcUtils.getInfluxDBStatus(TSStatusCode.SUCCESS_STATUS));
+ } catch (AuthException e) {
+ return tsQueryResultRsp.setStatus(
+ RpcUtils.getInfluxDBStatus(
+ TSStatusCode.UNINITIALIZED_AUTH_ERROR.getStatusCode(), e.getMessage()));
+ }
+ }
+
+ /**
+ * conditions are generated from subtrees of unique conditions
+ *
+ * @param basicFunctionOperator subtree to generate condition
+ * @return corresponding conditions
+ */
+ public IExpression getIExpressionForBasicFunctionOperator(
+ BasicFunctionOperator basicFunctionOperator) {
+ return new SingleSeriesExpression(
+ basicFunctionOperator.getSinglePath(),
+ FilterUtils.filterTypeToFilter(
+ basicFunctionOperator.getFilterType(), basicFunctionOperator.getValue()));
+ }
+
+ /**
+ * further process the obtained query result through the query criteria of select
+ *
+ * @param queryResult query results to be processed
+ * @param selectComponent select conditions to be filtered
+ */
+ public void ProcessSelectComponent(
+ QueryResult queryResult, InfluxSelectComponent selectComponent) {
+
+ // get the row order map of the current data result first
+ List<String> columns = queryResult.getResults().get(0).getSeries().get(0).getColumns();
+ Map<String, Integer> columnOrders = new HashMap<>();
+ for (int i = 0; i < columns.size(); i++) {
+ columnOrders.put(columns.get(i), i);
+ }
+ // get current values
+ List<List<Object>> values = queryResult.getResults().get(0).getSeries().get(0).getValues();
+ // new columns
+ List<String> newColumns = new ArrayList<>();
+ newColumns.add(InfluxSQLConstant.RESERVED_TIME);
+
+ // when have function
+ if (selectComponent.isHasFunction()) {
+ List<InfluxFunction> functions = new ArrayList<>();
+ for (ResultColumn resultColumn : selectComponent.getResultColumns()) {
+ Expression expression = resultColumn.getExpression();
+ if (expression instanceof FunctionExpression) {
+ String functionName = ((FunctionExpression) expression).getFunctionName();
+ functions.add(
+ InfluxFunctionFactory.generateFunction(functionName, expression.getExpressions()));
+ newColumns.add(functionName);
+ } else if (expression instanceof TimeSeriesOperand) {
+ String columnName = ((TimeSeriesOperand) expression).getPath().getFullPath();
+ if (!columnName.equals(InfluxSQLConstant.STAR)) {
+ newColumns.add(columnName);
+ } else {
+ newColumns.addAll(columns.subList(1, columns.size()));
+ }
+ }
+ }
+ for (List<Object> value : values) {
+ for (InfluxFunction function : functions) {
+ List<Expression> expressions = function.getExpressions();
+ if (expressions == null) {
+ throw new IllegalArgumentException("not support param");
+ }
+ TimeSeriesOperand parmaExpression = (TimeSeriesOperand) expressions.get(0);
+ String parmaName = parmaExpression.getPath().getFullPath();
+ if (columnOrders.containsKey(parmaName)) {
+ Object selectedValue = value.get(columnOrders.get(parmaName));
+ Long selectedTimestamp = (Long) value.get(0);
+ if (selectedValue != null) {
+ // selector function
+ if (function instanceof InfluxSelector) {
+ ((InfluxSelector) function)
+ .updateValueAndRelateValues(
+ new InfluxFunctionValue(selectedValue, selectedTimestamp), value);
+ } else {
+ // aggregate function
+ ((InfluxAggregator) function)
+ .updateValueBruteForce(
+ new InfluxFunctionValue(selectedValue, selectedTimestamp));
+ }
+ }
+ }
+ }
+ }
+ List<Object> value = new ArrayList<>();
+ values = new ArrayList<>();
+ // after the data is constructed, the final results are generated
+ // First, judge whether there are common queries. If there are, a selector function is allowed
+ // without aggregate functions
+ if (selectComponent.isHasCommonQuery()) {
+ InfluxSelector selector = (InfluxSelector) functions.get(0);
+ List<Object> relatedValue = selector.getRelatedValues();
+ for (String column : newColumns) {
+ if (InfluxSQLConstant.getNativeSelectorFunctionNames().contains(column)) {
+ value.add(selector.calculateBruteForce().getValue());
+ } else {
+ if (relatedValue != null) {
+ value.add(relatedValue.get(columnOrders.get(column)));
+ }
+ }
+ }
+ } else {
+ // If there are no common queries, they are all function queries
+ for (InfluxFunction function : functions) {
+ if (value.size() == 0) {
+ value.add(function.calculateBruteForce().getTimestamp());
+ } else {
+ value.set(0, function.calculateBruteForce().getTimestamp());
+ }
+ value.add(function.calculateBruteForce().getValue());
+ }
+ if (selectComponent.isHasAggregationFunction() || selectComponent.isHasMoreFunction()) {
+ value.set(0, 0);
+ }
+ }
+ values.add(value);
+ }
+ // if it is not a function query, it is only a common query
+ else if (selectComponent.isHasCommonQuery()) {
+ // start traversing the scope of the select
+ for (ResultColumn resultColumn : selectComponent.getResultColumns()) {
+ Expression expression = resultColumn.getExpression();
+ if (expression instanceof TimeSeriesOperand) {
+ // not star case
+ if (!((TimeSeriesOperand) expression)
+ .getPath()
+ .getFullPath()
+ .equals(InfluxSQLConstant.STAR)) {
+ newColumns.add(((TimeSeriesOperand) expression).getPath().getFullPath());
+ } else {
+ newColumns.addAll(columns.subList(1, columns.size()));
+ }
+ }
+ }
+ List<List<Object>> newValues = new ArrayList<>();
+ for (List<Object> value : values) {
+ List<Object> tmpValue = new ArrayList<>();
+ for (String newColumn : newColumns) {
+ tmpValue.add(value.get(columnOrders.get(newColumn)));
+ }
+ newValues.add(tmpValue);
+ }
+ values = newValues;
+ }
+ QueryResultUtils.updateQueryResultColumnValue(
+ queryResult, StringUtils.removeDuplicate(newColumns), values);
+ }
+
+ /**
+ * Query the select result. By default, there are no filter conditions. The functions to be
+ * queried use the built-in iotdb functions
+ *
+ * @param selectComponent select data to query
+ * @return select query result
+ */
+ public final QueryResult queryFuncWithoutFilter(
+ InfluxSelectComponent selectComponent,
+ String database,
+ String measurement,
+ ServiceProvider serviceProvider,
+ long sessionid) {
+ // columns
+ List<String> columns = new ArrayList<>();
+ columns.add(InfluxSQLConstant.RESERVED_TIME);
+
+ List<InfluxFunction> functions = new ArrayList<>();
+ String path = "root." + database + "." + measurement;
+ for (ResultColumn resultColumn : selectComponent.getResultColumns()) {
+ Expression expression = resultColumn.getExpression();
+ if (expression instanceof FunctionExpression) {
+ String functionName = ((FunctionExpression) expression).getFunctionName();
+ functions.add(
+ InfluxFunctionFactory.generateFunction(functionName, expression.getExpressions()));
+ columns.add(functionName);
+ }
+ }
+
+ List<Object> value = new ArrayList<>();
+ List<List<Object>> values = new ArrayList<>();
+ for (InfluxFunction function : functions) {
+ InfluxFunctionValue functionValue =
+ updateByIoTDBFunc(function, serviceProvider, path, sessionid);
+ // InfluxFunctionValue functionValue = function.calculateByIoTDBFunc();
+ if (value.size() == 0) {
+ value.add(functionValue.getTimestamp());
+ } else {
+ value.set(0, functionValue.getTimestamp());
+ }
+ value.add(functionValue.getValue());
+ }
+ if (selectComponent.isHasAggregationFunction() || selectComponent.isHasMoreFunction()) {
+ value.set(0, 0);
+ }
+ values.add(value);
+
+ // generate series
+ QueryResult queryResult = new QueryResult();
+ QueryResult.Series series = new QueryResult.Series();
+ series.setColumns(columns);
+ series.setValues(values);
+ series.setName(measurement);
+ QueryResult.Result result = new QueryResult.Result();
+ result.setSeries(new ArrayList<>(Arrays.asList(series)));
+ queryResult.setResults(new ArrayList<>(Arrays.asList(result)));
+ return queryResult;
+ }
+
+ public QueryResult queryExpr(
+ FilterOperator operator,
+ String database,
+ String measurement,
+ ServiceProvider serviceProvider,
+ Map<String, Integer> fieldOrders,
+ Long sessionId)
+ throws AuthException {
+ if (operator == null) {
+ List<IExpression> expressions = new ArrayList<>();
+ return queryByConditions(
+ expressions, database, measurement, serviceProvider, fieldOrders, sessionId);
+ } else if (operator instanceof BasicFunctionOperator) {
+ List<IExpression> iExpressions = new ArrayList<>();
+ iExpressions.add(getIExpressionForBasicFunctionOperator((BasicFunctionOperator) operator));
+ return queryByConditions(
+ iExpressions, database, measurement, serviceProvider, fieldOrders, sessionId);
+ } else {
+ FilterOperator leftOperator = operator.getChildren().get(0);
+ FilterOperator rightOperator = operator.getChildren().get(1);
+ if (operator.getFilterType() == FilterConstant.FilterType.KW_OR) {
+ return QueryResultUtils.orQueryResultProcess(
+ queryExpr(leftOperator, database, measurement, serviceProvider, fieldOrders, sessionId),
+ queryExpr(
+ rightOperator, database, measurement, serviceProvider, fieldOrders, sessionId));
+ } else if (operator.getFilterType() == FilterConstant.FilterType.KW_AND) {
+ if (canMergeOperator(leftOperator) && canMergeOperator(rightOperator)) {
+ List<IExpression> iExpressions1 = getIExpressionByFilterOperatorOperator(leftOperator);
+ List<IExpression> iExpressions2 = getIExpressionByFilterOperatorOperator(rightOperator);
+ iExpressions1.addAll(iExpressions2);
+ return queryByConditions(
+ iExpressions1, database, measurement, serviceProvider, fieldOrders, sessionId);
+ } else {
+ return QueryResultUtils.andQueryResultProcess(
+ queryExpr(
+ leftOperator, database, measurement, serviceProvider, fieldOrders, sessionId),
+ queryExpr(
+ rightOperator, database, measurement, serviceProvider, fieldOrders, sessionId));
+ }
+ }
+ }
+ throw new IllegalArgumentException("unknown operator " + operator);
+ }
+
+ /**
+ * get query results in the format of influxdb through conditions
+ *
+ * @param expressions list of conditions, including tag and field condition
+ * @return returns the results of the influxdb query
+ */
+ private QueryResult queryByConditions(
+ List<IExpression> expressions,
+ String database,
+ String measurement,
+ ServiceProvider serviceProvider,
+ Map<String, Integer> fieldOrders,
+ Long sessionId)
+ throws AuthException {
+ // used to store the actual order according to the tag
+ Map<Integer, SingleSeriesExpression> realTagOrders = new HashMap<>();
+ // stores a list of conditions belonging to the field
+ List<SingleSeriesExpression> fieldExpressions = new ArrayList<>();
+ // maximum number of tags in the current query criteria
+ int currentQueryMaxTagNum = 0;
+ Map<String, Integer> tagOrders = InfluxDBMetaManager.getTagOrders(database, measurement);
+ for (IExpression expression : expressions) {
+ SingleSeriesExpression singleSeriesExpression = ((SingleSeriesExpression) expression);
+ // the current condition is in tag
+ if (tagOrders.containsKey(singleSeriesExpression.getSeriesPath().getFullPath())) {
+ int curOrder = tagOrders.get(singleSeriesExpression.getSeriesPath().getFullPath());
+ // put it into the map according to the tag
+ realTagOrders.put(curOrder, singleSeriesExpression);
+ // update the maximum tag order of the current query criteria
+ currentQueryMaxTagNum = Math.max(currentQueryMaxTagNum, curOrder);
+ } else {
+ fieldExpressions.add(singleSeriesExpression);
+ }
+ }
+ // construct the actual query path
+ StringBuilder curQueryPath = new StringBuilder("root." + database + "." + measurement);
+ // the maximum number of traversals from 1 to the current query condition
+ for (int i = 1; i <= currentQueryMaxTagNum; i++) {
+ if (realTagOrders.containsKey(i)) {
+ // since it is the value in the path, you need to remove the quotation marks at the
+ // beginning and end
+ curQueryPath
+ .append(".")
+ .append(
+ StringUtils.removeQuotation(
+ FilterUtils.getFilterStringValue(realTagOrders.get(i).getFilter())));
+ } else {
+ curQueryPath.append(".").append("*");
+ }
+ }
+ if (currentQueryMaxTagNum < tagOrders.size()) {
+ curQueryPath.append(".**");
+ }
+ // construct actual query condition
+ StringBuilder realIotDBCondition = new StringBuilder();
+ for (int i = 0; i < fieldExpressions.size(); i++) {
+ SingleSeriesExpression singleSeriesExpression = fieldExpressions.get(i);
+ if (i != 0) {
+ realIotDBCondition.append(" and ");
+ }
+ realIotDBCondition
+ .append(singleSeriesExpression.getSeriesPath().getFullPath())
+ .append(" ")
+ .append((FilterUtils.getFilerSymbol(singleSeriesExpression.getFilter())))
+ .append(" ")
+ .append(FilterUtils.getFilterStringValue(singleSeriesExpression.getFilter()));
+ }
+ // actual query SQL statement
+ String realQuerySql;
+
+ realQuerySql = "select * from " + curQueryPath;
+ if (!(realIotDBCondition.length() == 0)) {
+ realQuerySql += " where " + realIotDBCondition;
+ }
+ realQuerySql += " align by device";
+ return queryByConditions(
+ realQuerySql, database, measurement, serviceProvider, fieldOrders, sessionId);
+ }
+
+ /**
+ * generate query conditions through the syntax tree (if you enter this function, it means that it
+ * must be a syntax tree that can be merged, and there is no or)
+ *
+ * @param filterOperator the syntax tree of query criteria needs to be generated
+ * @return condition list
+ */
+ public List<IExpression> getIExpressionByFilterOperatorOperator(FilterOperator filterOperator) {
+ if (filterOperator instanceof BasicFunctionOperator) {
+ // It must be a non-or situation
+ List<IExpression> expressions = new ArrayList<>();
+ expressions.add(
+ getIExpressionForBasicFunctionOperator((BasicFunctionOperator) filterOperator));
+ return expressions;
+ } else {
+ FilterOperator leftOperator = filterOperator.getChildren().get(0);
+ FilterOperator rightOperator = filterOperator.getChildren().get(1);
+ List<IExpression> expressions1 = getIExpressionByFilterOperatorOperator(leftOperator);
+ List<IExpression> expressions2 = getIExpressionByFilterOperatorOperator(rightOperator);
+ expressions1.addAll(expressions2);
+ return expressions1;
+ }
+ }
+
+ /**
+ * judge whether the subtrees of the syntax tree have or operations. If not, the query can be
+ * merged
+ *
+ * @param operator subtree to judge
+ * @return can merge queries
+ */
+ public boolean canMergeOperator(FilterOperator operator) {
+ if (operator instanceof BasicFunctionOperator) {
+ return true;
+ } else {
+ if (operator.getFilterType() == FilterConstant.FilterType.KW_OR) {
+ return false;
+ } else {
+ FilterOperator leftOperator = operator.getChildren().get(0);
+ FilterOperator rightOperator = operator.getChildren().get(1);
+ return canMergeOperator(leftOperator) && canMergeOperator(rightOperator);
+ }
+ }
+ }
+
+ public void checkInfluxDBQueryOperator(Operator operator) {
+ if (!(operator instanceof InfluxQueryOperator)) {
+ throw new IllegalArgumentException("not query sql");
+ }
+ InfluxSelectComponent selectComponent = ((InfluxQueryOperator) operator).getSelectComponent();
+ if (selectComponent.isHasMoreSelectorFunction() && selectComponent.isHasCommonQuery()) {
+ throw new IllegalArgumentException(
+ "ERR: mixing multiple selector functions with tags or fields is not supported");
+ }
+ if (selectComponent.isHasAggregationFunction() && selectComponent.isHasCommonQuery()) {
+ throw new IllegalArgumentException(
+ "ERR: mixing aggregate and non-aggregate queries is not supported");
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/handler/NewQueryHandler.java b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/handler/NewQueryHandler.java
new file mode 100644
index 0000000000..ee8d0db5c0
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/handler/NewQueryHandler.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.protocol.influxdb.handler;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.db.protocol.influxdb.constant.InfluxConstant;
+import org.apache.iotdb.db.protocol.influxdb.constant.InfluxSQLConstant;
+import org.apache.iotdb.db.protocol.influxdb.function.InfluxFunction;
+import org.apache.iotdb.db.protocol.influxdb.function.InfluxFunctionValue;
+import org.apache.iotdb.db.protocol.influxdb.meta.NewInfluxDBMetaManager;
+import org.apache.iotdb.db.protocol.influxdb.util.QueryResultUtils;
+import org.apache.iotdb.db.protocol.influxdb.util.StringUtils;
+import org.apache.iotdb.db.service.basic.ServiceProvider;
+import org.apache.iotdb.db.service.thrift.impl.NewInfluxDBServiceImpl;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
+import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
+
+import org.influxdb.InfluxDBException;
+import org.influxdb.dto.QueryResult;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class NewQueryHandler extends AbstractQueryHandler {
+
+ public static TSExecuteStatementResp executeStatement(String sql, long sessionId) {
+ TSExecuteStatementReq tsExecuteStatementReq = new TSExecuteStatementReq();
+ tsExecuteStatementReq.setStatement(sql);
+ tsExecuteStatementReq.setSessionId(sessionId);
+ tsExecuteStatementReq.setStatementId(
+ NewInfluxDBServiceImpl.getClientRPCService().requestStatementId(sessionId));
+ tsExecuteStatementReq.setFetchSize(InfluxConstant.DEFAULT_FETCH_SIZE);
+ TSExecuteStatementResp executeStatementResp =
+ NewInfluxDBServiceImpl.getClientRPCService().executeStatement(tsExecuteStatementReq);
+ TSStatus tsStatus = executeStatementResp.getStatus();
+ if (tsStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new InfluxDBException(tsStatus.getMessage());
+ }
+ return executeStatementResp;
+ }
+
+ @Override
+ public Map<String, Integer> getFieldOrders(
+ String database, String measurement, ServiceProvider serviceProvider, long sessionID) {
+ Map<String, Integer> fieldOrders = new HashMap<>();
+ String showTimeseriesSql = "show timeseries root." + database + '.' + measurement + ".**";
+ TSExecuteStatementResp executeStatementResp = executeStatement(showTimeseriesSql, sessionID);
+ List<String> paths = QueryResultUtils.getFullPaths(executeStatementResp);
+ Map<String, Integer> tagOrders = NewInfluxDBMetaManager.getTagOrders(database, measurement);
+ int tagOrderNums = tagOrders.size();
+ int fieldNums = 0;
+ for (String path : paths) {
+ String filed = StringUtils.getFieldByPath(path);
+ if (!fieldOrders.containsKey(filed)) {
+ // The corresponding order of fields is 1 + tagNum (the first is timestamp, then all tags,
+ // and finally all fields)
+ fieldOrders.put(filed, tagOrderNums + fieldNums + 1);
+ fieldNums++;
+ }
+ }
+ return fieldOrders;
+ }
+
+ @Override
+ public InfluxFunctionValue updateByIoTDBFunc(
+ InfluxFunction function, ServiceProvider serviceProvider, String path, long sessionid) {
+ switch (function.getFunctionName()) {
+ case InfluxSQLConstant.COUNT:
+ {
+ String functionSql =
+ StringUtils.generateFunctionSql(
+ function.getFunctionName(), function.getParmaName(), path);
+ TSExecuteStatementResp tsExecuteStatementResp = executeStatement(functionSql, sessionid);
+ List<InfluxFunctionValue> list =
+ QueryResultUtils.getInfluxFunctionValues(tsExecuteStatementResp);
+ for (InfluxFunctionValue influxFunctionValue : list) {
+ function.updateValueIoTDBFunc(influxFunctionValue);
+ }
+ break;
+ }
+ case InfluxSQLConstant.MEAN:
+ {
+ String functionSqlCount =
+ StringUtils.generateFunctionSql("count", function.getParmaName(), path);
+ TSExecuteStatementResp tsExecuteStatementResp =
+ executeStatement(functionSqlCount, sessionid);
+ List<InfluxFunctionValue> list =
+ QueryResultUtils.getInfluxFunctionValues(tsExecuteStatementResp);
+ for (InfluxFunctionValue influxFunctionValue : list) {
+ function.updateValueIoTDBFunc(influxFunctionValue);
+ }
+ String functionSqlSum =
+ StringUtils.generateFunctionSql("sum", function.getParmaName(), path);
+ tsExecuteStatementResp = executeStatement(functionSqlSum, sessionid);
+ list = QueryResultUtils.getInfluxFunctionValues(tsExecuteStatementResp);
+ for (InfluxFunctionValue influxFunctionValue : list) {
+ function.updateValueIoTDBFunc(null, influxFunctionValue);
+ }
+ break;
+ }
+ case InfluxSQLConstant.SUM:
+ {
+ String functionSql =
+ StringUtils.generateFunctionSql("sum", function.getParmaName(), path);
+ TSExecuteStatementResp tsExecuteStatementResp = executeStatement(functionSql, sessionid);
+ List<InfluxFunctionValue> list =
+ QueryResultUtils.getInfluxFunctionValues(tsExecuteStatementResp);
+ for (InfluxFunctionValue influxFunctionValue : list) {
+ function.updateValueIoTDBFunc(influxFunctionValue);
+ }
+ break;
+ }
+ case InfluxSQLConstant.FIRST:
+ case InfluxSQLConstant.LAST:
+ {
+ String functionSql;
+ String functionName;
+ if (function.getFunctionName().equals(InfluxSQLConstant.FIRST)) {
+ functionSql =
+ StringUtils.generateFunctionSql("first_value", function.getParmaName(), path);
+ functionName = "first_value";
+ } else {
+ functionSql =
+ StringUtils.generateFunctionSql("last_value", function.getParmaName(), path);
+ functionName = "last_value";
+ }
+ TSExecuteStatementResp tsExecuteStatementResp = executeStatement(functionSql, sessionid);
+ Map<String, Object> map = QueryResultUtils.getColumnNameAndValue(tsExecuteStatementResp);
+ for (String colume : map.keySet()) {
+ Object o = map.get(colume);
+ String fullPath = colume.substring(functionName.length() + 1, colume.length() - 1);
+ String devicePath = StringUtils.getDeviceByPath(fullPath);
+ String specificSql =
+ String.format(
+ "select %s from %s where %s=%s",
+ function.getParmaName(), devicePath, fullPath, o);
+ TSExecuteStatementResp resp = executeStatement(specificSql, sessionid);
+ List<InfluxFunctionValue> list = QueryResultUtils.getInfluxFunctionValues(resp);
+ for (InfluxFunctionValue influxFunctionValue : list) {
+ function.updateValueIoTDBFunc(influxFunctionValue);
+ }
+ }
+ break;
+ }
+ case InfluxSQLConstant.MAX:
+ case InfluxSQLConstant.MIN:
+ {
+ String functionSql;
+ if (function.getFunctionName().equals(InfluxSQLConstant.MAX)) {
+ functionSql =
+ StringUtils.generateFunctionSql("max_value", function.getParmaName(), path);
+ } else {
+ functionSql =
+ StringUtils.generateFunctionSql("min_value", function.getParmaName(), path);
+ }
+ TSExecuteStatementResp tsExecuteStatementResp = executeStatement(functionSql, sessionid);
+ List<InfluxFunctionValue> list =
+ QueryResultUtils.getInfluxFunctionValues(tsExecuteStatementResp);
+ for (InfluxFunctionValue influxFunctionValue : list) {
+ function.updateValueIoTDBFunc(influxFunctionValue);
+ }
+ break;
+ }
+ default:
+ throw new IllegalStateException("Unexpected value: " + function.getFunctionName());
+ }
+ return function.calculateByIoTDBFunc();
+ }
+
+ @Override
+ public QueryResult queryByConditions(
+ String querySql,
+ String database,
+ String measurement,
+ ServiceProvider serviceProvider,
+ Map<String, Integer> fieldOrders,
+ long sessionId) {
+ TSExecuteStatementResp executeStatementResp = executeStatement(querySql, sessionId);
+ return QueryResultUtils.iotdbResultConvertInfluxResult(
+ executeStatementResp, database, measurement, fieldOrders);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/handler/QueryHandler.java b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/handler/QueryHandler.java
index 113da3912f..b58b65f6bf 100644
--- a/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/handler/QueryHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/handler/QueryHandler.java
@@ -23,43 +23,23 @@ import org.apache.iotdb.commons.auth.AuthException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.mpp.plan.expression.Expression;
-import org.apache.iotdb.db.mpp.plan.expression.ResultColumn;
-import org.apache.iotdb.db.mpp.plan.expression.leaf.TimeSeriesOperand;
-import org.apache.iotdb.db.mpp.plan.expression.multi.FunctionExpression;
import org.apache.iotdb.db.protocol.influxdb.constant.InfluxConstant;
import org.apache.iotdb.db.protocol.influxdb.constant.InfluxSQLConstant;
import org.apache.iotdb.db.protocol.influxdb.function.InfluxFunction;
-import org.apache.iotdb.db.protocol.influxdb.function.InfluxFunctionFactory;
import org.apache.iotdb.db.protocol.influxdb.function.InfluxFunctionValue;
-import org.apache.iotdb.db.protocol.influxdb.function.aggregator.InfluxAggregator;
-import org.apache.iotdb.db.protocol.influxdb.function.selector.InfluxSelector;
import org.apache.iotdb.db.protocol.influxdb.meta.InfluxDBMetaManager;
-import org.apache.iotdb.db.protocol.influxdb.operator.InfluxQueryOperator;
-import org.apache.iotdb.db.protocol.influxdb.operator.InfluxSelectComponent;
import org.apache.iotdb.db.protocol.influxdb.util.FieldUtils;
-import org.apache.iotdb.db.protocol.influxdb.util.FilterUtils;
-import org.apache.iotdb.db.protocol.influxdb.util.JacksonUtils;
import org.apache.iotdb.db.protocol.influxdb.util.QueryResultUtils;
import org.apache.iotdb.db.protocol.influxdb.util.StringUtils;
-import org.apache.iotdb.db.qp.constant.FilterConstant;
-import org.apache.iotdb.db.qp.logical.Operator;
-import org.apache.iotdb.db.qp.logical.crud.BasicFunctionOperator;
-import org.apache.iotdb.db.qp.logical.crud.FilterOperator;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.SessionManager;
import org.apache.iotdb.db.service.basic.ServiceProvider;
-import org.apache.iotdb.protocol.influxdb.rpc.thrift.InfluxQueryResultRsp;
-import org.apache.iotdb.rpc.RpcUtils;
-import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
import org.apache.iotdb.tsfile.read.common.Field;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.common.RowRecord;
-import org.apache.iotdb.tsfile.read.expression.IExpression;
-import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.thrift.TException;
@@ -69,205 +49,15 @@ import org.influxdb.dto.QueryResult;
import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-public class QueryHandler {
+public class QueryHandler extends AbstractQueryHandler {
- public static InfluxQueryResultRsp queryInfluxDB(
- String database,
- InfluxQueryOperator queryOperator,
- long sessionId,
- ServiceProvider serviceProvider) {
- String measurement = queryOperator.getFromComponent().getPrefixPaths().get(0).getFullPath();
- // The list of fields under the current measurement and the order of the specified rules
- Map<String, Integer> fieldOrders = getFieldOrders(database, measurement, serviceProvider);
- QueryResult queryResult;
- InfluxQueryResultRsp tsQueryResultRsp = new InfluxQueryResultRsp();
- try {
- // contain filter condition or have common query the result of by traversal.
- if (queryOperator.getWhereComponent() != null
- || queryOperator.getSelectComponent().isHasCommonQuery()
- || queryOperator.getSelectComponent().isHasOnlyTraverseFunction()) {
- // step1 : generate query results
- queryResult =
- queryExpr(
- queryOperator.getWhereComponent() != null
- ? queryOperator.getWhereComponent().getFilterOperator()
- : null,
- database,
- measurement,
- serviceProvider,
- fieldOrders,
- sessionId);
- // step2 : select filter
- ProcessSelectComponent(queryResult, queryOperator.getSelectComponent());
- }
- // don't contain filter condition and only have function use iotdb function.
- else {
- queryResult =
- queryFuncWithoutFilter(
- queryOperator.getSelectComponent(), database, measurement, serviceProvider);
- }
- return tsQueryResultRsp
- .setResultJsonString(JacksonUtils.bean2Json(queryResult))
- .setStatus(RpcUtils.getInfluxDBStatus(TSStatusCode.SUCCESS_STATUS));
- } catch (AuthException e) {
- return tsQueryResultRsp.setStatus(
- RpcUtils.getInfluxDBStatus(
- TSStatusCode.UNINITIALIZED_AUTH_ERROR.getStatusCode(), e.getMessage()));
- }
- }
-
- /**
- * conditions are generated from subtrees of unique conditions
- *
- * @param basicFunctionOperator subtree to generate condition
- * @return corresponding conditions
- */
- public static IExpression getIExpressionForBasicFunctionOperator(
- BasicFunctionOperator basicFunctionOperator) {
- return new SingleSeriesExpression(
- basicFunctionOperator.getSinglePath(),
- FilterUtils.filterTypeToFilter(
- basicFunctionOperator.getFilterType(), basicFunctionOperator.getValue()));
- }
-
- /**
- * further process the obtained query result through the query criteria of select
- *
- * @param queryResult query results to be processed
- * @param selectComponent select conditions to be filtered
- */
- public static void ProcessSelectComponent(
- QueryResult queryResult, InfluxSelectComponent selectComponent) {
-
- // get the row order map of the current data result first
- List<String> columns = queryResult.getResults().get(0).getSeries().get(0).getColumns();
- Map<String, Integer> columnOrders = new HashMap<>();
- for (int i = 0; i < columns.size(); i++) {
- columnOrders.put(columns.get(i), i);
- }
- // get current values
- List<List<Object>> values = queryResult.getResults().get(0).getSeries().get(0).getValues();
- // new columns
- List<String> newColumns = new ArrayList<>();
- newColumns.add(InfluxSQLConstant.RESERVED_TIME);
-
- // when have function
- if (selectComponent.isHasFunction()) {
- List<InfluxFunction> functions = new ArrayList<>();
- for (ResultColumn resultColumn : selectComponent.getResultColumns()) {
- Expression expression = resultColumn.getExpression();
- if (expression instanceof FunctionExpression) {
- String functionName = ((FunctionExpression) expression).getFunctionName();
- functions.add(
- InfluxFunctionFactory.generateFunction(functionName, expression.getExpressions()));
- newColumns.add(functionName);
- } else if (expression instanceof TimeSeriesOperand) {
- String columnName = ((TimeSeriesOperand) expression).getPath().getFullPath();
- if (!columnName.equals(InfluxSQLConstant.STAR)) {
- newColumns.add(columnName);
- } else {
- newColumns.addAll(columns.subList(1, columns.size()));
- }
- }
- }
- for (List<Object> value : values) {
- for (InfluxFunction function : functions) {
- List<Expression> expressions = function.getExpressions();
- if (expressions == null) {
- throw new IllegalArgumentException("not support param");
- }
- TimeSeriesOperand parmaExpression = (TimeSeriesOperand) expressions.get(0);
- String parmaName = parmaExpression.getPath().getFullPath();
- if (columnOrders.containsKey(parmaName)) {
- Object selectedValue = value.get(columnOrders.get(parmaName));
- Long selectedTimestamp = (Long) value.get(0);
- if (selectedValue != null) {
- // selector function
- if (function instanceof InfluxSelector) {
- ((InfluxSelector) function)
- .updateValueAndRelateValues(
- new InfluxFunctionValue(selectedValue, selectedTimestamp), value);
- } else {
- // aggregate function
- ((InfluxAggregator) function)
- .updateValueBruteForce(
- new InfluxFunctionValue(selectedValue, selectedTimestamp));
- }
- }
- }
- }
- }
- List<Object> value = new ArrayList<>();
- values = new ArrayList<>();
- // after the data is constructed, the final results are generated
- // First, judge whether there are common queries. If there are, a selector function is allowed
- // without aggregate functions
- if (selectComponent.isHasCommonQuery()) {
- InfluxSelector selector = (InfluxSelector) functions.get(0);
- List<Object> relatedValue = selector.getRelatedValues();
- for (String column : newColumns) {
- if (InfluxSQLConstant.getNativeSelectorFunctionNames().contains(column)) {
- value.add(selector.calculateBruteForce().getValue());
- } else {
- if (relatedValue != null) {
- value.add(relatedValue.get(columnOrders.get(column)));
- }
- }
- }
- } else {
- // If there are no common queries, they are all function queries
- for (InfluxFunction function : functions) {
- if (value.size() == 0) {
- value.add(function.calculateBruteForce().getTimestamp());
- } else {
- value.set(0, function.calculateBruteForce().getTimestamp());
- }
- value.add(function.calculateBruteForce().getValue());
- }
- if (selectComponent.isHasAggregationFunction() || selectComponent.isHasMoreFunction()) {
- value.set(0, 0);
- }
- }
- values.add(value);
- }
- // if it is not a function query, it is only a common query
- else if (selectComponent.isHasCommonQuery()) {
- // start traversing the scope of the select
- for (ResultColumn resultColumn : selectComponent.getResultColumns()) {
- Expression expression = resultColumn.getExpression();
- if (expression instanceof TimeSeriesOperand) {
- // not star case
- if (!((TimeSeriesOperand) expression)
- .getPath()
- .getFullPath()
- .equals(InfluxSQLConstant.STAR)) {
- newColumns.add(((TimeSeriesOperand) expression).getPath().getFullPath());
- } else {
- newColumns.addAll(columns.subList(1, columns.size()));
- }
- }
- }
- List<List<Object>> newValues = new ArrayList<>();
- for (List<Object> value : values) {
- List<Object> tmpValue = new ArrayList<>();
- for (String newColumn : newColumns) {
- tmpValue.add(value.get(columnOrders.get(newColumn)));
- }
- newValues.add(tmpValue);
- }
- values = newValues;
- }
- QueryResultUtils.updateQueryResultColumnValue(
- queryResult, StringUtils.removeDuplicate(newColumns), values);
- }
-
- public static Map<String, Integer> getFieldOrders(
- String database, String measurement, ServiceProvider serviceProvider) {
+ @Override
+ public Map<String, Integer> getFieldOrders(
+ String database, String measurement, ServiceProvider serviceProvider, long sessionID) {
Map<String, Integer> fieldOrders = new HashMap<>();
long queryId = ServiceProvider.SESSION_MANAGER.requestQueryId(true);
try {
@@ -312,65 +102,9 @@ public class QueryHandler {
return fieldOrders;
}
- /**
- * Query the select result. By default, there are no filter conditions. The functions to be
- * queried use the built-in iotdb functions
- *
- * @param selectComponent select data to query
- * @return select query result
- */
- public static QueryResult queryFuncWithoutFilter(
- InfluxSelectComponent selectComponent,
- String database,
- String measurement,
- ServiceProvider serviceProvider) {
- // columns
- List<String> columns = new ArrayList<>();
- columns.add(InfluxSQLConstant.RESERVED_TIME);
-
- List<InfluxFunction> functions = new ArrayList<>();
- String path = "root." + database + "." + measurement;
- for (ResultColumn resultColumn : selectComponent.getResultColumns()) {
- Expression expression = resultColumn.getExpression();
- if (expression instanceof FunctionExpression) {
- String functionName = ((FunctionExpression) expression).getFunctionName();
- functions.add(
- InfluxFunctionFactory.generateFunction(functionName, expression.getExpressions()));
- columns.add(functionName);
- }
- }
-
- List<Object> value = new ArrayList<>();
- List<List<Object>> values = new ArrayList<>();
- for (InfluxFunction function : functions) {
- InfluxFunctionValue functionValue = updateByIoTDBFunc(function, serviceProvider, path);
- // InfluxFunctionValue functionValue = function.calculateByIoTDBFunc();
- if (value.size() == 0) {
- value.add(functionValue.getTimestamp());
- } else {
- value.set(0, functionValue.getTimestamp());
- }
- value.add(functionValue.getValue());
- }
- if (selectComponent.isHasAggregationFunction() || selectComponent.isHasMoreFunction()) {
- value.set(0, 0);
- }
- values.add(value);
-
- // generate series
- QueryResult queryResult = new QueryResult();
- QueryResult.Series series = new QueryResult.Series();
- series.setColumns(columns);
- series.setValues(values);
- series.setName(measurement);
- QueryResult.Result result = new QueryResult.Result();
- result.setSeries(new ArrayList<>(Arrays.asList(series)));
- queryResult.setResults(new ArrayList<>(Arrays.asList(result)));
- return queryResult;
- }
-
- private static InfluxFunctionValue updateByIoTDBFunc(
- InfluxFunction function, ServiceProvider serviceProvider, String path) {
+ @Override
+ public InfluxFunctionValue updateByIoTDBFunc(
+ InfluxFunction function, ServiceProvider serviceProvider, String path, long sessionid) {
switch (function.getFunctionName()) {
case InfluxSQLConstant.COUNT:
{
@@ -666,7 +400,9 @@ public class QueryHandler {
RowRecord recordNew = queryDataSetNew.next();
List<Field> newFields = recordNew.getFields();
long time = recordNew.getTimestamp();
- function.updateValueIoTDBFunc(new InfluxFunctionValue(newFields.get(0), time));
+ function.updateValueIoTDBFunc(
+ new InfluxFunctionValue(
+ FieldUtils.iotdbFieldConvert(newFields.get(0)), time));
}
}
}
@@ -740,145 +476,19 @@ public class QueryHandler {
return function.calculateByIoTDBFunc();
}
- public static void checkInfluxDBQueryOperator(Operator operator) {
- if (!(operator instanceof InfluxQueryOperator)) {
- throw new IllegalArgumentException("not query sql");
- }
- InfluxSelectComponent selectComponent = ((InfluxQueryOperator) operator).getSelectComponent();
- if (selectComponent.isHasMoreSelectorFunction() && selectComponent.isHasCommonQuery()) {
- throw new IllegalArgumentException(
- "ERR: mixing multiple selector functions with tags or fields is not supported");
- }
- if (selectComponent.isHasAggregationFunction() && selectComponent.isHasCommonQuery()) {
- throw new IllegalArgumentException(
- "ERR: mixing aggregate and non-aggregate queries is not supported");
- }
- }
-
- public static QueryResult queryExpr(
- FilterOperator operator,
- String database,
- String measurement,
- ServiceProvider serviceProvider,
- Map<String, Integer> fieldOrders,
- Long sessionId)
- throws AuthException {
- if (operator == null) {
- List<IExpression> expressions = new ArrayList<>();
- return queryByConditions(
- expressions, database, measurement, serviceProvider, fieldOrders, sessionId);
- } else if (operator instanceof BasicFunctionOperator) {
- List<IExpression> iExpressions = new ArrayList<>();
- iExpressions.add(getIExpressionForBasicFunctionOperator((BasicFunctionOperator) operator));
- return queryByConditions(
- iExpressions, database, measurement, serviceProvider, fieldOrders, sessionId);
- } else {
- FilterOperator leftOperator = operator.getChildren().get(0);
- FilterOperator rightOperator = operator.getChildren().get(1);
- if (operator.getFilterType() == FilterConstant.FilterType.KW_OR) {
- return QueryResultUtils.orQueryResultProcess(
- queryExpr(leftOperator, database, measurement, serviceProvider, fieldOrders, sessionId),
- queryExpr(
- rightOperator, database, measurement, serviceProvider, fieldOrders, sessionId));
- } else if (operator.getFilterType() == FilterConstant.FilterType.KW_AND) {
- if (canMergeOperator(leftOperator) && canMergeOperator(rightOperator)) {
- List<IExpression> iExpressions1 = getIExpressionByFilterOperatorOperator(leftOperator);
- List<IExpression> iExpressions2 = getIExpressionByFilterOperatorOperator(rightOperator);
- iExpressions1.addAll(iExpressions2);
- return queryByConditions(
- iExpressions1, database, measurement, serviceProvider, fieldOrders, sessionId);
- } else {
- return QueryResultUtils.andQueryResultProcess(
- queryExpr(
- leftOperator, database, measurement, serviceProvider, fieldOrders, sessionId),
- queryExpr(
- rightOperator, database, measurement, serviceProvider, fieldOrders, sessionId));
- }
- }
- }
- throw new IllegalArgumentException("unknown operator " + operator);
- }
-
- /**
- * get query results in the format of influxdb through conditions
- *
- * @param expressions list of conditions, including tag and field condition
- * @return returns the results of the influxdb query
- */
- private static QueryResult queryByConditions(
- List<IExpression> expressions,
+ @Override
+ public QueryResult queryByConditions(
+ String querySql,
String database,
String measurement,
ServiceProvider serviceProvider,
Map<String, Integer> fieldOrders,
- Long sessionId)
+ long sessionId)
throws AuthException {
- // used to store the actual order according to the tag
- Map<Integer, SingleSeriesExpression> realTagOrders = new HashMap<>();
- // stores a list of conditions belonging to the field
- List<SingleSeriesExpression> fieldExpressions = new ArrayList<>();
- // maximum number of tags in the current query criteria
- int currentQueryMaxTagNum = 0;
- Map<String, Integer> tagOrders = InfluxDBMetaManager.getTagOrders(database, measurement);
- for (IExpression expression : expressions) {
- SingleSeriesExpression singleSeriesExpression = ((SingleSeriesExpression) expression);
- // the current condition is in tag
- if (tagOrders.containsKey(singleSeriesExpression.getSeriesPath().getFullPath())) {
- int curOrder = tagOrders.get(singleSeriesExpression.getSeriesPath().getFullPath());
- // put it into the map according to the tag
- realTagOrders.put(curOrder, singleSeriesExpression);
- // update the maximum tag order of the current query criteria
- currentQueryMaxTagNum = Math.max(currentQueryMaxTagNum, curOrder);
- } else {
- fieldExpressions.add(singleSeriesExpression);
- }
- }
- // construct the actual query path
- StringBuilder curQueryPath = new StringBuilder("root." + database + "." + measurement);
- // the maximum number of traversals from 1 to the current query condition
- for (int i = 1; i <= currentQueryMaxTagNum; i++) {
- if (realTagOrders.containsKey(i)) {
- // since it is the value in the path, you need to remove the quotation marks at the
- // beginning and end
- curQueryPath
- .append(".")
- .append(
- StringUtils.removeQuotation(
- FilterUtils.getFilterStringValue(realTagOrders.get(i).getFilter())));
- } else {
- curQueryPath.append(".").append("*");
- }
- }
- if (currentQueryMaxTagNum < tagOrders.size()) {
- curQueryPath.append(".**");
- }
- // construct actual query condition
- StringBuilder realIotDBCondition = new StringBuilder();
- for (int i = 0; i < fieldExpressions.size(); i++) {
- SingleSeriesExpression singleSeriesExpression = fieldExpressions.get(i);
- if (i != 0) {
- realIotDBCondition.append(" and ");
- }
- realIotDBCondition
- .append(singleSeriesExpression.getSeriesPath().getFullPath())
- .append(" ")
- .append((FilterUtils.getFilerSymbol(singleSeriesExpression.getFilter())))
- .append(" ")
- .append(FilterUtils.getFilterStringValue(singleSeriesExpression.getFilter()));
- }
- // actual query SQL statement
- String realQuerySql;
-
- realQuerySql = "select * from " + curQueryPath;
- if (!(realIotDBCondition.length() == 0)) {
- realQuerySql += " where " + realIotDBCondition;
- }
- realQuerySql += " align by device";
-
long queryId = ServiceProvider.SESSION_MANAGER.requestQueryId(true);
try {
QueryPlan queryPlan =
- (QueryPlan) serviceProvider.getPlanner().parseSQLToPhysicalPlan(realQuerySql);
+ (QueryPlan) serviceProvider.getPlanner().parseSQLToPhysicalPlan(querySql);
TSStatus tsStatus = SessionManager.getInstance().checkAuthority(queryPlan, sessionId);
if (tsStatus != null) {
throw new AuthException(tsStatus.getMessage());
@@ -888,7 +498,7 @@ public class QueryHandler {
queryId,
true,
System.currentTimeMillis(),
- realQuerySql,
+ querySql,
InfluxConstant.DEFAULT_CONNECTION_TIMEOUT_MS);
QueryDataSet queryDataSet =
serviceProvider.createQueryDataSet(
@@ -908,50 +518,4 @@ public class QueryHandler {
ServiceProvider.SESSION_MANAGER.releaseQueryResourceNoExceptions(queryId);
}
}
-
- /**
- * generate query conditions through the syntax tree (if you enter this function, it means that it
- * must be a syntax tree that can be merged, and there is no or)
- *
- * @param filterOperator the syntax tree of query criteria needs to be generated
- * @return condition list
- */
- public static List<IExpression> getIExpressionByFilterOperatorOperator(
- FilterOperator filterOperator) {
- if (filterOperator instanceof BasicFunctionOperator) {
- // It must be a non-or situation
- List<IExpression> expressions = new ArrayList<>();
- expressions.add(
- getIExpressionForBasicFunctionOperator((BasicFunctionOperator) filterOperator));
- return expressions;
- } else {
- FilterOperator leftOperator = filterOperator.getChildren().get(0);
- FilterOperator rightOperator = filterOperator.getChildren().get(1);
- List<IExpression> expressions1 = getIExpressionByFilterOperatorOperator(leftOperator);
- List<IExpression> expressions2 = getIExpressionByFilterOperatorOperator(rightOperator);
- expressions1.addAll(expressions2);
- return expressions1;
- }
- }
-
- /**
- * judge whether the subtrees of the syntax tree have or operations. If not, the query can be
- * merged
- *
- * @param operator subtree to judge
- * @return can merge queries
- */
- public static boolean canMergeOperator(FilterOperator operator) {
- if (operator instanceof BasicFunctionOperator) {
- return true;
- } else {
- if (operator.getFilterType() == FilterConstant.FilterType.KW_OR) {
- return false;
- } else {
- FilterOperator leftOperator = operator.getChildren().get(0);
- FilterOperator rightOperator = operator.getChildren().get(1);
- return canMergeOperator(leftOperator) && canMergeOperator(rightOperator);
- }
- }
- }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/meta/AbstractInfluxDBMetaManager.java b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/meta/AbstractInfluxDBMetaManager.java
new file mode 100644
index 0000000000..513b06e59f
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/meta/AbstractInfluxDBMetaManager.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.protocol.influxdb.meta;
+
+import org.apache.iotdb.db.protocol.influxdb.constant.InfluxConstant;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public abstract class AbstractInfluxDBMetaManager {
+
+ protected static final String SELECT_TAG_INFO_SQL =
+ "select database_name,measurement_name,tag_name,tag_order from root.TAG_INFO ";
+
+ // TODO avoid OOM
+ protected static Map<String, Map<String, Map<String, Integer>>> database2Measurement2TagOrders =
+ new HashMap<>();
+
+ public static Map<String, Integer> getTagOrders(String database, String measurement) {
+ Map<String, Integer> tagOrders = new HashMap<>();
+ Map<String, Map<String, Integer>> measurement2TagOrders =
+ database2Measurement2TagOrders.get(database);
+ if (measurement2TagOrders != null) {
+ tagOrders = measurement2TagOrders.get(measurement);
+ }
+ if (tagOrders == null) {
+ tagOrders = new HashMap<>();
+ }
+ return tagOrders;
+ }
+
+ abstract void recover();
+
+ abstract void setStorageGroup(String database, long sessionID);
+
+ abstract void updateTagInfoRecords(TagInfoRecords tagInfoRecords, long sessionID);
+
+ public final synchronized Map<String, Map<String, Integer>> createDatabase(
+ String database, long sessionID) {
+ Map<String, Map<String, Integer>> measurement2TagOrders =
+ database2Measurement2TagOrders.get(database);
+ if (measurement2TagOrders != null) {
+ return measurement2TagOrders;
+ }
+ setStorageGroup(database, sessionID);
+ measurement2TagOrders = new HashMap<>();
+ database2Measurement2TagOrders.put(database, measurement2TagOrders);
+ return measurement2TagOrders;
+ }
+
+ public final synchronized Map<String, Integer> getTagOrdersWithAutoCreatingSchema(
+ String database, String measurement, long sessionID) {
+ return createDatabase(database, sessionID).computeIfAbsent(measurement, m -> new HashMap<>());
+ }
+
+ public final synchronized String generatePath(
+ String database, String measurement, Map<String, String> tags, long sessionID) {
+ Map<String, Integer> tagKeyToLayerOrders =
+ getTagOrdersWithAutoCreatingSchema(database, measurement, sessionID);
+ // to support rollback if fails to persisting new tag info
+ Map<String, Integer> newTagKeyToLayerOrders = new HashMap<>(tagKeyToLayerOrders);
+ // record the layer orders of tag keys that the path contains
+ Map<Integer, String> layerOrderToTagKeysInPath = new HashMap<>();
+
+ int tagNumber = tagKeyToLayerOrders.size();
+
+ TagInfoRecords newTagInfoRecords = null;
+ for (Map.Entry<String, String> tag : tags.entrySet()) {
+ final String tagKey = tag.getKey();
+ if (!newTagKeyToLayerOrders.containsKey(tagKey)) {
+ if (newTagInfoRecords == null) {
+ newTagInfoRecords = new TagInfoRecords();
+ }
+ ++tagNumber;
+ newTagInfoRecords.add(database, measurement, tagKey, tagNumber);
+ newTagKeyToLayerOrders.put(tagKey, tagNumber);
+ }
+
+ layerOrderToTagKeysInPath.put(newTagKeyToLayerOrders.get(tagKey), tagKey);
+ }
+
+ if (newTagInfoRecords != null) {
+ updateTagInfoRecords(newTagInfoRecords, sessionID);
+ database2Measurement2TagOrders.get(database).put(measurement, newTagKeyToLayerOrders);
+ }
+
+ StringBuilder path =
+ new StringBuilder("root.").append(database).append(".").append(measurement);
+ for (int i = 1; i <= tagNumber; ++i) {
+ path.append(".")
+ .append(
+ layerOrderToTagKeysInPath.containsKey(i)
+ ? tags.get(layerOrderToTagKeysInPath.get(i))
+ : InfluxConstant.PLACE_HOLDER);
+ }
+ return path.toString();
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/meta/InfluxDBMetaManager.java b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/meta/InfluxDBMetaManager.java
index 0dd094f9da..f2e58de977 100644
--- a/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/meta/InfluxDBMetaManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/meta/InfluxDBMetaManager.java
@@ -25,7 +25,6 @@ import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.protocol.influxdb.constant.InfluxConstant;
import org.apache.iotdb.db.qp.Planner;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
@@ -47,29 +46,23 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-public class InfluxDBMetaManager {
+public class InfluxDBMetaManager extends AbstractInfluxDBMetaManager {
protected final Planner planner;
private final ServiceProvider serviceProvider;
- private static final String SELECT_TAG_INFO_SQL =
- "select database_name,measurement_name,tag_name,tag_order from root.TAG_INFO ";
-
- public static InfluxDBMetaManager getInstance() {
- return InfluxDBMetaManagerHolder.INSTANCE;
- }
-
- // TODO avoid OOM
- private static Map<String, Map<String, Map<String, Integer>>> database2Measurement2TagOrders =
- new HashMap<>();
-
private InfluxDBMetaManager() {
serviceProvider = IoTDB.serviceProvider;
database2Measurement2TagOrders = new HashMap<>();
planner = serviceProvider.getPlanner();
}
+ public static InfluxDBMetaManager getInstance() {
+ return InfluxDBMetaManagerHolder.INSTANCE;
+ }
+
+ @Override
public void recover() {
long queryId = ServiceProvider.SESSION_MANAGER.requestQueryId(true);
try {
@@ -121,13 +114,8 @@ public class InfluxDBMetaManager {
}
}
- public synchronized Map<String, Map<String, Integer>> createDatabase(String database) {
- Map<String, Map<String, Integer>> measurement2TagOrders =
- database2Measurement2TagOrders.get(database);
- if (measurement2TagOrders != null) {
- return measurement2TagOrders;
- }
-
+ @Override
+ public void setStorageGroup(String database, long sessionID) {
try {
SetStorageGroupPlan setStorageGroupPlan =
new SetStorageGroupPlan(new PartialPath("root." + database));
@@ -140,61 +128,10 @@ public class InfluxDBMetaManager {
} catch (IllegalPathException | StorageGroupNotSetException | StorageEngineException e) {
throw new InfluxDBException(e.getMessage());
}
-
- measurement2TagOrders = new HashMap<>();
- database2Measurement2TagOrders.put(database, measurement2TagOrders);
- return measurement2TagOrders;
- }
-
- public synchronized Map<String, Integer> getTagOrdersWithAutoCreatingSchema(
- String database, String measurement) {
- return createDatabase(database).computeIfAbsent(measurement, m -> new HashMap<>());
}
- public synchronized String generatePath(
- String database, String measurement, Map<String, String> tags) {
- Map<String, Integer> tagKeyToLayerOrders =
- getTagOrdersWithAutoCreatingSchema(database, measurement);
- // to support rollback if fails to persisting new tag info
- Map<String, Integer> newTagKeyToLayerOrders = new HashMap<>(tagKeyToLayerOrders);
- // record the layer orders of tag keys that the path contains
- Map<Integer, String> layerOrderToTagKeysInPath = new HashMap<>();
-
- int tagNumber = tagKeyToLayerOrders.size();
-
- TagInfoRecords newTagInfoRecords = null;
- for (Map.Entry<String, String> tag : tags.entrySet()) {
- final String tagKey = tag.getKey();
- if (!newTagKeyToLayerOrders.containsKey(tagKey)) {
- if (newTagInfoRecords == null) {
- newTagInfoRecords = new TagInfoRecords();
- }
- ++tagNumber;
- newTagInfoRecords.add(database, measurement, tagKey, tagNumber);
- newTagKeyToLayerOrders.put(tagKey, tagNumber);
- }
-
- layerOrderToTagKeysInPath.put(newTagKeyToLayerOrders.get(tagKey), tagKey);
- }
-
- if (newTagInfoRecords != null) {
- updateTagInfoRecords(newTagInfoRecords);
- database2Measurement2TagOrders.get(database).put(measurement, newTagKeyToLayerOrders);
- }
-
- StringBuilder path =
- new StringBuilder("root.").append(database).append(".").append(measurement);
- for (int i = 1; i <= tagNumber; ++i) {
- path.append(".")
- .append(
- layerOrderToTagKeysInPath.containsKey(i)
- ? tags.get(layerOrderToTagKeysInPath.get(i))
- : InfluxConstant.PLACE_HOLDER);
- }
- return path.toString();
- }
-
- private void updateTagInfoRecords(TagInfoRecords tagInfoRecords) {
+ @Override
+ public void updateTagInfoRecords(TagInfoRecords tagInfoRecords, long sessionID) {
List<InsertRowPlan> plans = tagInfoRecords.convertToInsertRowPlans();
for (InsertRowPlan plan : plans) {
try {
@@ -205,19 +142,6 @@ public class InfluxDBMetaManager {
}
}
- public static Map<String, Integer> getTagOrders(String database, String measurement) {
- Map<String, Integer> tagOrders = new HashMap<>();
- Map<String, Map<String, Integer>> measurement2TagOrders =
- database2Measurement2TagOrders.get(database);
- if (measurement2TagOrders != null) {
- tagOrders = measurement2TagOrders.get(measurement);
- }
- if (tagOrders == null) {
- tagOrders = new HashMap<>();
- }
- return tagOrders;
- }
-
private static class InfluxDBMetaManagerHolder {
private static final InfluxDBMetaManager INSTANCE = new InfluxDBMetaManager();
diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/meta/NewInfluxDBMetaManager.java b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/meta/NewInfluxDBMetaManager.java
new file mode 100644
index 0000000000..5269a2bf44
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/meta/NewInfluxDBMetaManager.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.protocol.influxdb.meta;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.db.protocol.influxdb.handler.NewQueryHandler;
+import org.apache.iotdb.db.protocol.influxdb.util.QueryResultUtils;
+import org.apache.iotdb.db.service.thrift.impl.ClientRPCServiceImpl;
+import org.apache.iotdb.db.service.thrift.impl.NewInfluxDBServiceImpl;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.IoTDBJDBCDataSet;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq;
+import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
+import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
+import org.apache.iotdb.service.rpc.thrift.TSOpenSessionReq;
+import org.apache.iotdb.service.rpc.thrift.TSOpenSessionResp;
+
+import org.influxdb.InfluxDBException;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class NewInfluxDBMetaManager extends AbstractInfluxDBMetaManager {
+
+ private final ClientRPCServiceImpl clientRPCService;
+
+ private NewInfluxDBMetaManager() {
+ clientRPCService = NewInfluxDBServiceImpl.getClientRPCService();
+ }
+
+ public static NewInfluxDBMetaManager getInstance() {
+ return InfluxDBMetaManagerHolder.INSTANCE;
+ }
+
+ @Override
+ public void recover() {
+ long sessionID = 0;
+ try {
+ TSOpenSessionResp tsOpenSessionResp =
+ clientRPCService.openSession(
+ new TSOpenSessionReq().setUsername("root").setPassword("root"));
+ sessionID = tsOpenSessionResp.getSessionId();
+ TSExecuteStatementResp resp =
+ NewQueryHandler.executeStatement(SELECT_TAG_INFO_SQL, sessionID);
+ IoTDBJDBCDataSet dataSet = QueryResultUtils.creatIoTJDBCDataset(resp);
+ try {
+ Map<String, Map<String, Integer>> measurement2TagOrders;
+ Map<String, Integer> tagOrders;
+ while (dataSet.hasCachedResults()) {
+ dataSet.constructOneRow();
+ String database = dataSet.getString("root.TAG_INFO.database_name");
+ String measurement = dataSet.getString("root.TAG_INFO.measurement_name");
+ String tag = dataSet.getString("root.TAG_INFO.tag_name");
+ Integer tagOrder = dataSet.getInt("root.TAG_INFO.tag_order");
+ if (database2Measurement2TagOrders.containsKey(database)) {
+ measurement2TagOrders = database2Measurement2TagOrders.get(database);
+ if (measurement2TagOrders.containsKey(measurement)) {
+ tagOrders = measurement2TagOrders.get(measurement);
+ } else {
+ tagOrders = new HashMap<>();
+ }
+ } else {
+ measurement2TagOrders = new HashMap<>();
+ tagOrders = new HashMap<>();
+ }
+ tagOrders.put(tag, tagOrder);
+ measurement2TagOrders.put(measurement, tagOrders);
+ database2Measurement2TagOrders.put(database, measurement2TagOrders);
+ }
+ } catch (StatementExecutionException e) {
+ throw new InfluxDBException(e.getMessage());
+ }
+ } catch (Exception e) {
+ throw new InfluxDBException(e.getMessage());
+ } finally {
+ clientRPCService.closeSession(new TSCloseSessionReq().setSessionId(sessionID));
+ }
+ }
+
+ @Override
+ public void setStorageGroup(String database, long sessionID) {
+ TSStatus status = clientRPCService.setStorageGroup(sessionID, "root." + database);
+ if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ || status.getCode() == TSStatusCode.STORAGE_GROUP_ALREADY_EXISTS.getStatusCode()) {
+ return;
+ }
+ throw new InfluxDBException(status.getMessage());
+ }
+
+ @Override
+ public void updateTagInfoRecords(TagInfoRecords tagInfoRecords, long sessionID) {
+ try {
+ List<TSInsertRecordReq> reqs = tagInfoRecords.convertToInsertRecordsReq(sessionID);
+ for (TSInsertRecordReq tsInsertRecordReq : reqs) {
+ TSStatus tsStatus = clientRPCService.insertRecord(tsInsertRecordReq);
+ if (tsStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new InfluxDBException(tsStatus.getMessage());
+ }
+ }
+ } catch (IoTDBConnectionException e) {
+ throw new InfluxDBException(e.getMessage());
+ }
+ }
+
+ private static class InfluxDBMetaManagerHolder {
+ private static final NewInfluxDBMetaManager INSTANCE = new NewInfluxDBMetaManager();
+
+ private InfluxDBMetaManagerHolder() {}
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/meta/TagInfoRecords.java b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/meta/TagInfoRecords.java
index e6cf7b4f41..c38df89d0f 100644
--- a/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/meta/TagInfoRecords.java
+++ b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/meta/TagInfoRecords.java
@@ -24,18 +24,21 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.utils.DataTypeUtils;
import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.influxdb.InfluxDBException;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
public class TagInfoRecords {
private static final String TAG_INFO_DEVICE_ID = "root.TAG_INFO";
private static final List<String> TAG_INFO_MEASUREMENTS = new ArrayList<>();
private static final List<TSDataType> TAG_INFO_TYPES = new ArrayList<>();
+ private static final AtomicLong TAG_TIME_STAMPS = new AtomicLong();
static {
TAG_INFO_MEASUREMENTS.add("database_name");
@@ -65,11 +68,8 @@ public class TagInfoRecords {
public void add(String database, String measurement, String tag, int order) {
deviceIds.add(TAG_INFO_DEVICE_ID);
- if (times.size() == 0) {
- times.add(System.currentTimeMillis());
- } else {
- times.add(times.get(times.size() - 1) + 1);
- }
+ // Multiple adjacent records, possibly with the same timestamp
+ times.add(TAG_TIME_STAMPS.getAndIncrement());
measurementsList.add(TAG_INFO_MEASUREMENTS);
typesList.add(TAG_INFO_TYPES);
@@ -98,4 +98,22 @@ public class TagInfoRecords {
}
return insertRowPlans;
}
+
+ public List<TSInsertRecordReq> convertToInsertRecordsReq(long sessionID)
+ throws IoTDBConnectionException {
+ ArrayList<TSInsertRecordReq> reqs = new ArrayList<>();
+ long now = 0;
+ for (int i = 0; i < deviceIds.size(); i++) {
+ TSInsertRecordReq tsInsertRecordReq = new TSInsertRecordReq();
+ tsInsertRecordReq.setSessionId(sessionID);
+ tsInsertRecordReq.setTimestamp(times.get(i));
+ tsInsertRecordReq.setIsAligned(false);
+ tsInsertRecordReq.setPrefixPath(deviceIds.get(i));
+ tsInsertRecordReq.setMeasurements(measurementsList.get(i));
+ tsInsertRecordReq.setValues(
+ DataTypeUtils.getValueBuffer(typesList.get(i), valuesList.get(i)));
+ reqs.add(tsInsertRecordReq);
+ }
+ return reqs;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/util/InfluxReqAndRespUtils.java b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/util/InfluxReqAndRespUtils.java
new file mode 100644
index 0000000000..c3c27af3d8
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/util/InfluxReqAndRespUtils.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.protocol.influxdb.util;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.db.utils.DataTypeUtils;
+import org.apache.iotdb.protocol.influxdb.rpc.thrift.InfluxCloseSessionReq;
+import org.apache.iotdb.protocol.influxdb.rpc.thrift.InfluxOpenSessionReq;
+import org.apache.iotdb.protocol.influxdb.rpc.thrift.InfluxOpenSessionResp;
+import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq;
+import org.apache.iotdb.service.rpc.thrift.TSOpenSessionReq;
+import org.apache.iotdb.service.rpc.thrift.TSOpenSessionResp;
+
+public class InfluxReqAndRespUtils {
+
+ public static TSOpenSessionReq convertOpenSessionReq(InfluxOpenSessionReq influxOpenSessionReq) {
+ TSOpenSessionReq tsOpenSessionReq = new TSOpenSessionReq();
+ tsOpenSessionReq.setZoneId(influxOpenSessionReq.getZoneId());
+ tsOpenSessionReq.setUsername(influxOpenSessionReq.getUsername());
+ tsOpenSessionReq.setPassword(influxOpenSessionReq.getPassword());
+ tsOpenSessionReq.setConfiguration(influxOpenSessionReq.getConfiguration());
+ return tsOpenSessionReq;
+ }
+
+ public static InfluxOpenSessionResp convertOpenSessionResp(TSOpenSessionResp tsOpenSessionResp) {
+ InfluxOpenSessionResp influxOpenSessionResp = new InfluxOpenSessionResp();
+ influxOpenSessionResp.setSessionId(tsOpenSessionResp.getSessionId());
+ TSStatus tsStatus = tsOpenSessionResp.getStatus();
+ influxOpenSessionResp.setStatus(DataTypeUtils.RPCStatusToInfluxDBTSStatus(tsStatus));
+ return influxOpenSessionResp;
+ }
+
+ public static TSCloseSessionReq convertCloseSessionReq(
+ InfluxCloseSessionReq influxCloseSessionReq) {
+ TSCloseSessionReq tsCloseSessionReq = new TSCloseSessionReq();
+ tsCloseSessionReq.setSessionId(influxCloseSessionReq.getSessionId());
+ return tsCloseSessionReq;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/util/QueryResultUtils.java b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/util/QueryResultUtils.java
index 686dd20b91..325971df25 100644
--- a/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/util/QueryResultUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/util/QueryResultUtils.java
@@ -19,17 +19,23 @@
package org.apache.iotdb.db.protocol.influxdb.util;
import org.apache.iotdb.db.protocol.influxdb.constant.InfluxConstant;
+import org.apache.iotdb.db.protocol.influxdb.function.InfluxFunctionValue;
import org.apache.iotdb.db.protocol.influxdb.meta.InfluxDBMetaManager;
import org.apache.iotdb.db.query.dataset.AlignByDeviceDataSet;
+import org.apache.iotdb.rpc.IoTDBJDBCDataSet;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
import org.apache.iotdb.tsfile.read.common.Field;
import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.influxdb.InfluxDBException;
import org.influxdb.dto.QueryResult;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -287,4 +293,151 @@ public class QueryResultUtils {
public static boolean checkQueryResultNull(QueryResult queryResult) {
return queryResult.getResults().get(0).getSeries() == null;
}
+
+ public static List<String> getFullPaths(TSExecuteStatementResp tsExecuteStatementResp) {
+ List<String> res = new ArrayList<>();
+ IoTDBJDBCDataSet ioTDBJDBCDataSet = creatIoTJDBCDataset(tsExecuteStatementResp);
+ try {
+ while (ioTDBJDBCDataSet.hasCachedResults()) {
+ ioTDBJDBCDataSet.constructOneRow();
+ String path = ioTDBJDBCDataSet.getValueByName("timeseries");
+ res.add(path);
+ }
+ } catch (StatementExecutionException e) {
+ throw new InfluxDBException(e.getMessage());
+ }
+ return res;
+ }
+
+ public static QueryResult iotdbResultConvertInfluxResult(
+ TSExecuteStatementResp tsExecuteStatementResp,
+ String database,
+ String measurement,
+ Map<String, Integer> fieldOrders) {
+ if (tsExecuteStatementResp == null) {
+ return getNullQueryResult();
+ }
+ // generate series
+ QueryResult.Series series = new QueryResult.Series();
+ series.setName(measurement);
+ // gets the reverse map of the tag
+ Map<String, Integer> tagOrders = InfluxDBMetaManager.getTagOrders(database, measurement);
+ Map<Integer, String> tagOrderReversed =
+ tagOrders.entrySet().stream()
+ .collect(Collectors.toMap(Map.Entry::getValue, Map.Entry::getKey));
+ Map<Integer, String> fieldOrdersReversed =
+ fieldOrders.entrySet().stream()
+ .collect(Collectors.toMap(Map.Entry::getValue, Map.Entry::getKey));
+ int tagSize = tagOrderReversed.size();
+ ArrayList<String> tagList = new ArrayList<>();
+ for (int i = 1; i <= tagSize; i++) {
+ tagList.add(tagOrderReversed.get(i));
+ }
+
+ ArrayList<String> fieldList = new ArrayList<>();
+ for (int i = 1 + tagSize; i < 1 + tagSize + fieldOrders.size(); i++) {
+ fieldList.add(fieldOrdersReversed.get(i));
+ }
+ ArrayList<String> columns = new ArrayList<>();
+ columns.add("time");
+ columns.addAll(tagList);
+ columns.addAll(fieldList);
+ // insert columns into series
+ series.setColumns(columns);
+ List<List<Object>> values = new ArrayList<>();
+ IoTDBJDBCDataSet ioTDBJDBCDataSet = creatIoTJDBCDataset(tsExecuteStatementResp);
+ try {
+ while (ioTDBJDBCDataSet.hasCachedResults()) {
+ Object[] value = new Object[columns.size()];
+ ioTDBJDBCDataSet.constructOneRow();
+ value[0] = Long.valueOf(ioTDBJDBCDataSet.getValueByName("Time"));
+ String deviceName = ioTDBJDBCDataSet.getValueByName("Device");
+ String[] deviceNameList = deviceName.split("\\.");
+ for (int i = 3; i < deviceNameList.length; i++) {
+ if (!deviceNameList[i].equals(InfluxConstant.PLACE_HOLDER)) {
+ value[i - 2] = deviceNameList[i];
+ }
+ }
+ for (int i = 3; i <= ioTDBJDBCDataSet.columnNameList.size(); i++) {
+ Object o = ioTDBJDBCDataSet.getObject(ioTDBJDBCDataSet.findColumnNameByIndex(i));
+ if (o != null) {
+ // insert the value of filed into it
+ value[fieldOrders.get(ioTDBJDBCDataSet.findColumnNameByIndex(i))] = o;
+ }
+ }
+ values.add(Arrays.asList(value));
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ series.setValues(values);
+
+ QueryResult queryResult = new QueryResult();
+ QueryResult.Result result = new QueryResult.Result();
+ result.setSeries(new ArrayList<>(Arrays.asList(series)));
+ queryResult.setResults(new ArrayList<>(Arrays.asList(result)));
+
+ return queryResult;
+ }
+
+ public static List<InfluxFunctionValue> getInfluxFunctionValues(
+ TSExecuteStatementResp tsExecuteStatementResp) {
+ IoTDBJDBCDataSet ioTDBJDBCDataSet = creatIoTJDBCDataset(tsExecuteStatementResp);
+ List<InfluxFunctionValue> result = new ArrayList<>(ioTDBJDBCDataSet.columnSize);
+ try {
+ while (ioTDBJDBCDataSet.hasCachedResults()) {
+ ioTDBJDBCDataSet.constructOneRow();
+ Long timestamp = null;
+ for (String columnName : ioTDBJDBCDataSet.columnNameList) {
+ if ("Time".equals(columnName)) {
+ timestamp = ioTDBJDBCDataSet.getTimestamp(columnName).getTime();
+ continue;
+ }
+ Object o = ioTDBJDBCDataSet.getObject(columnName);
+ result.add(new InfluxFunctionValue(o, timestamp));
+ }
+ }
+ } catch (StatementExecutionException e) {
+ throw new InfluxDBException(e.getMessage());
+ }
+ return result;
+ }
+
+ public static Map<String, Object> getColumnNameAndValue(
+ TSExecuteStatementResp tsExecuteStatementResp) {
+ IoTDBJDBCDataSet ioTDBJDBCDataSet = creatIoTJDBCDataset(tsExecuteStatementResp);
+ Map<String, Object> result = new HashMap<>();
+ try {
+ while (ioTDBJDBCDataSet.hasCachedResults()) {
+ ioTDBJDBCDataSet.constructOneRow();
+ for (String columnName : ioTDBJDBCDataSet.columnNameList) {
+ Object o = ioTDBJDBCDataSet.getObject(columnName);
+ result.put(columnName, o);
+ }
+ }
+ } catch (StatementExecutionException e) {
+ throw new InfluxDBException(e.getMessage());
+ }
+ return result;
+ }
+
+ public static IoTDBJDBCDataSet creatIoTJDBCDataset(
+ TSExecuteStatementResp tsExecuteStatementResp) {
+ return new IoTDBJDBCDataSet(
+ null,
+ tsExecuteStatementResp.getColumns(),
+ tsExecuteStatementResp.getDataTypeList(),
+ tsExecuteStatementResp.columnNameIndexMap,
+ tsExecuteStatementResp.ignoreTimeStamp,
+ tsExecuteStatementResp.queryId,
+ 0,
+ null,
+ 0,
+ tsExecuteStatementResp.queryDataSet,
+ 0,
+ 0,
+ tsExecuteStatementResp.sgColumns,
+ null);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/util/StringUtils.java b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/util/StringUtils.java
index 40d588c979..9cda89a43e 100644
--- a/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/util/StringUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/util/StringUtils.java
@@ -72,6 +72,17 @@ public class StringUtils {
return tmpList[tmpList.length - 1];
}
+ /**
+ * get the devicePath through the fullPath
+ *
+ * @param path path to process
+ * @return devicePath
+ */
+ public static String getDeviceByPath(String path) {
+ String field = getFieldByPath(path);
+ return path.substring(0, path.length() - field.length() - 1);
+ }
+
/**
* determine whether the two string lists are the same
*
diff --git a/server/src/main/java/org/apache/iotdb/db/service/InfluxDBRPCService.java b/server/src/main/java/org/apache/iotdb/db/service/InfluxDBRPCService.java
index 4f3ffe3ccb..3930e27fe2 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/InfluxDBRPCService.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/InfluxDBRPCService.java
@@ -27,11 +27,13 @@ import org.apache.iotdb.commons.service.ThriftServiceThread;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.service.thrift.handler.InfluxDBServiceThriftHandler;
-import org.apache.iotdb.db.service.thrift.impl.InfluxDBServiceImpl;
+import org.apache.iotdb.db.service.thrift.impl.ClientRPCServiceImpl;
+import org.apache.iotdb.db.service.thrift.impl.IInfluxDBServiceWithHandler;
+import org.apache.iotdb.db.service.thrift.impl.NewInfluxDBServiceImpl;
import org.apache.iotdb.protocol.influxdb.rpc.thrift.InfluxDBService.Processor;
public class InfluxDBRPCService extends ThriftService implements InfluxDBRPCServiceMBean {
- private InfluxDBServiceImpl impl;
+ private IInfluxDBServiceWithHandler impl;
public static InfluxDBRPCService getInstance() {
return InfluxDBServiceHolder.INSTANCE;
@@ -40,17 +42,25 @@ public class InfluxDBRPCService extends ThriftService implements InfluxDBRPCServ
@Override
public void initTProcessor()
throws ClassNotFoundException, IllegalAccessException, InstantiationException {
- impl =
- (InfluxDBServiceImpl)
- Class.forName(IoTDBDescriptor.getInstance().getConfig().getInfluxDBImplClassName())
- .newInstance();
+ if (IoTDBDescriptor.getInstance()
+ .getConfig()
+ .getRpcImplClassName()
+ .equals(ClientRPCServiceImpl.class.getName())) {
+ impl =
+ (IInfluxDBServiceWithHandler)
+ Class.forName(NewInfluxDBServiceImpl.class.getName()).newInstance();
+ } else {
+ impl =
+ (IInfluxDBServiceWithHandler)
+ Class.forName(IoTDBDescriptor.getInstance().getConfig().getInfluxDBImplClassName())
+ .newInstance();
+ }
initSyncedServiceImpl(null);
processor = new Processor<>(impl);
}
@Override
- public void initThriftServiceThread()
- throws IllegalAccessException, InstantiationException, ClassNotFoundException {
+ public void initThriftServiceThread() throws IllegalAccessException {
IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
try {
thriftServiceThread =
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/InfluxDBServiceThriftHandler.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/InfluxDBServiceThriftHandler.java
index 75ad2c8a30..031dcb1e49 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/InfluxDBServiceThriftHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/InfluxDBServiceThriftHandler.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.db.service.thrift.handler;
-import org.apache.iotdb.db.service.thrift.impl.InfluxDBServiceImpl;
+import org.apache.iotdb.db.service.thrift.impl.IInfluxDBServiceWithHandler;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.server.ServerContext;
@@ -27,10 +27,10 @@ import org.apache.thrift.server.TServerEventHandler;
import org.apache.thrift.transport.TTransport;
public class InfluxDBServiceThriftHandler implements TServerEventHandler {
- private final InfluxDBServiceImpl influxDBServiceImpl;
+ private final IInfluxDBServiceWithHandler impl;
- public InfluxDBServiceThriftHandler(InfluxDBServiceImpl influxDBServiceImpl) {
- this.influxDBServiceImpl = influxDBServiceImpl;
+ public InfluxDBServiceThriftHandler(IInfluxDBServiceWithHandler impl) {
+ this.impl = impl;
}
@Override
@@ -48,7 +48,7 @@ public class InfluxDBServiceThriftHandler implements TServerEventHandler {
public void deleteContext(
ServerContext serverContext, TProtocol tProtocol, TProtocol tProtocol1) {
// release resources.
- influxDBServiceImpl.handleClientExit();
+ impl.handleClientExit();
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/IInfluxDBServiceWithHandler.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/IInfluxDBServiceWithHandler.java
new file mode 100644
index 0000000000..d71e7570a0
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/IInfluxDBServiceWithHandler.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.service.thrift.impl;
+
+import org.apache.iotdb.protocol.influxdb.rpc.thrift.InfluxDBService;
+
+public interface IInfluxDBServiceWithHandler extends InfluxDBService.Iface {
+ void handleClientExit();
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InfluxDBServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InfluxDBServiceImpl.java
index dee86edde3..89e5429dd1 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InfluxDBServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InfluxDBServiceImpl.java
@@ -25,8 +25,10 @@ import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.protocol.influxdb.dto.IoTDBPoint;
+import org.apache.iotdb.db.protocol.influxdb.handler.AbstractQueryHandler;
import org.apache.iotdb.db.protocol.influxdb.handler.QueryHandler;
import org.apache.iotdb.db.protocol.influxdb.input.InfluxLineParser;
+import org.apache.iotdb.db.protocol.influxdb.meta.AbstractInfluxDBMetaManager;
import org.apache.iotdb.db.protocol.influxdb.meta.InfluxDBMetaManager;
import org.apache.iotdb.db.protocol.influxdb.operator.InfluxQueryOperator;
import org.apache.iotdb.db.protocol.influxdb.sql.InfluxDBLogicalGenerator;
@@ -41,7 +43,6 @@ import org.apache.iotdb.db.service.basic.ServiceProvider;
import org.apache.iotdb.db.utils.DataTypeUtils;
import org.apache.iotdb.protocol.influxdb.rpc.thrift.InfluxCloseSessionReq;
import org.apache.iotdb.protocol.influxdb.rpc.thrift.InfluxCreateDatabaseReq;
-import org.apache.iotdb.protocol.influxdb.rpc.thrift.InfluxDBService;
import org.apache.iotdb.protocol.influxdb.rpc.thrift.InfluxOpenSessionReq;
import org.apache.iotdb.protocol.influxdb.rpc.thrift.InfluxOpenSessionResp;
import org.apache.iotdb.protocol.influxdb.rpc.thrift.InfluxQueryReq;
@@ -60,14 +61,17 @@ import org.influxdb.dto.Point;
import java.util.ArrayList;
import java.util.List;
-public class InfluxDBServiceImpl implements InfluxDBService.Iface {
+public class InfluxDBServiceImpl implements IInfluxDBServiceWithHandler {
private final SessionManager SESSION_MANAGER = SessionManager.getInstance();
- private final InfluxDBMetaManager metaManager;
+ private final AbstractInfluxDBMetaManager metaManager;
+
+ private final AbstractQueryHandler queryHandler;
public InfluxDBServiceImpl() {
metaManager = InfluxDBMetaManager.getInstance();
+ queryHandler = new QueryHandler();
}
@Override
@@ -100,7 +104,8 @@ public class InfluxDBServiceImpl implements InfluxDBService.Iface {
int executeCode = TSStatusCode.SUCCESS_STATUS.getStatusCode();
for (Point point :
InfluxLineParser.parserRecordsToPointsWithPrecision(req.lineProtocol, req.precision)) {
- IoTDBPoint iotdbPoint = new IoTDBPoint(req.database, point, metaManager);
+ IoTDBPoint iotdbPoint = new IoTDBPoint(req.database, point, metaManager, req.sessionId);
+
try {
InsertRowPlan plan = iotdbPoint.convertToInsertRowPlan();
InfluxTSStatus tsStatus = executeNonQueryPlan(plan, req.sessionId);
@@ -121,7 +126,7 @@ public class InfluxDBServiceImpl implements InfluxDBService.Iface {
}
@Override
- public InfluxTSStatus createDatabase(InfluxCreateDatabaseReq req) throws TException {
+ public InfluxTSStatus createDatabase(InfluxCreateDatabaseReq req) {
if (!SESSION_MANAGER.checkLogin(req.sessionId)) {
return getNotLoggedInStatus();
}
@@ -145,11 +150,12 @@ public class InfluxDBServiceImpl implements InfluxDBService.Iface {
@Override
public InfluxQueryResultRsp query(InfluxQueryReq req) throws TException {
Operator operator = InfluxDBLogicalGenerator.generate(req.command);
- QueryHandler.checkInfluxDBQueryOperator(operator);
- return QueryHandler.queryInfluxDB(
+ queryHandler.checkInfluxDBQueryOperator(operator);
+ return queryHandler.queryInfluxDB(
req.database, (InfluxQueryOperator) operator, req.sessionId, IoTDB.serviceProvider);
}
+ @Override
public void handleClientExit() {
Long sessionId = ServiceProvider.SESSION_MANAGER.getCurrSessionId();
if (sessionId != null) {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/NewInfluxDBServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/NewInfluxDBServiceImpl.java
new file mode 100644
index 0000000000..422bc27fd4
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/NewInfluxDBServiceImpl.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.service.thrift.impl;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.db.protocol.influxdb.dto.IoTDBPoint;
+import org.apache.iotdb.db.protocol.influxdb.handler.AbstractQueryHandler;
+import org.apache.iotdb.db.protocol.influxdb.handler.NewQueryHandler;
+import org.apache.iotdb.db.protocol.influxdb.input.InfluxLineParser;
+import org.apache.iotdb.db.protocol.influxdb.meta.AbstractInfluxDBMetaManager;
+import org.apache.iotdb.db.protocol.influxdb.meta.NewInfluxDBMetaManager;
+import org.apache.iotdb.db.protocol.influxdb.operator.InfluxQueryOperator;
+import org.apache.iotdb.db.protocol.influxdb.sql.InfluxDBLogicalGenerator;
+import org.apache.iotdb.db.protocol.influxdb.util.InfluxReqAndRespUtils;
+import org.apache.iotdb.db.qp.logical.Operator;
+import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.db.utils.DataTypeUtils;
+import org.apache.iotdb.protocol.influxdb.rpc.thrift.InfluxCloseSessionReq;
+import org.apache.iotdb.protocol.influxdb.rpc.thrift.InfluxCreateDatabaseReq;
+import org.apache.iotdb.protocol.influxdb.rpc.thrift.InfluxOpenSessionReq;
+import org.apache.iotdb.protocol.influxdb.rpc.thrift.InfluxOpenSessionResp;
+import org.apache.iotdb.protocol.influxdb.rpc.thrift.InfluxQueryReq;
+import org.apache.iotdb.protocol.influxdb.rpc.thrift.InfluxQueryResultRsp;
+import org.apache.iotdb.protocol.influxdb.rpc.thrift.InfluxTSStatus;
+import org.apache.iotdb.protocol.influxdb.rpc.thrift.InfluxWritePointsReq;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq;
+import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
+import org.apache.iotdb.service.rpc.thrift.TSOpenSessionReq;
+import org.apache.iotdb.service.rpc.thrift.TSOpenSessionResp;
+
+import org.apache.thrift.TException;
+import org.influxdb.InfluxDBException;
+import org.influxdb.dto.Point;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class NewInfluxDBServiceImpl implements IInfluxDBServiceWithHandler {
+
+ private static final ClientRPCServiceImpl clientRPCService = new ClientRPCServiceImpl();
+
+ private final AbstractInfluxDBMetaManager metaManager;
+
+ private final AbstractQueryHandler queryHandler;
+
+ public NewInfluxDBServiceImpl() {
+ metaManager = NewInfluxDBMetaManager.getInstance();
+ queryHandler = new NewQueryHandler();
+ }
+
+ public static ClientRPCServiceImpl getClientRPCService() {
+ return clientRPCService;
+ }
+
+ @Override
+ public InfluxOpenSessionResp openSession(InfluxOpenSessionReq req) throws TException {
+ TSOpenSessionReq tsOpenSessionReq = InfluxReqAndRespUtils.convertOpenSessionReq(req);
+ TSOpenSessionResp tsOpenSessionResp = clientRPCService.openSession(tsOpenSessionReq);
+ return InfluxReqAndRespUtils.convertOpenSessionResp(tsOpenSessionResp);
+ }
+
+ @Override
+ public InfluxTSStatus closeSession(InfluxCloseSessionReq req) {
+ TSCloseSessionReq tsCloseSessionReq = InfluxReqAndRespUtils.convertCloseSessionReq(req);
+ TSStatus tsStatus = clientRPCService.closeSession(tsCloseSessionReq);
+ return DataTypeUtils.RPCStatusToInfluxDBTSStatus(tsStatus);
+ }
+
+ @Override
+ public InfluxTSStatus writePoints(InfluxWritePointsReq req) {
+ List<InfluxTSStatus> tsStatusList = new ArrayList<>();
+ int executeCode = TSStatusCode.SUCCESS_STATUS.getStatusCode();
+ for (Point point :
+ InfluxLineParser.parserRecordsToPointsWithPrecision(req.lineProtocol, req.precision)) {
+ IoTDBPoint iotdbPoint = new IoTDBPoint(req.database, point, metaManager, req.sessionId);
+ try {
+ TSInsertRecordReq insertRecordReq = iotdbPoint.convertToTSInsertRecordReq(req.sessionId);
+ TSStatus tsStatus = clientRPCService.insertRecord(insertRecordReq);
+ tsStatusList.add(DataTypeUtils.RPCStatusToInfluxDBTSStatus(tsStatus));
+ } catch (IoTDBConnectionException e) {
+ throw new InfluxDBException(e.getMessage());
+ }
+ }
+ return new InfluxTSStatus().setCode(executeCode).setSubStatus(tsStatusList);
+ }
+
+ @Override
+ public InfluxTSStatus createDatabase(InfluxCreateDatabaseReq req) {
+ TSStatus tsStatus =
+ clientRPCService.setStorageGroup(req.sessionId, "root." + req.getDatabase());
+ if (tsStatus.getCode() == TSStatusCode.STORAGE_GROUP_ALREADY_EXISTS.getStatusCode()) {
+ tsStatus.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ tsStatus.setMessage("Execute successfully");
+ }
+ return DataTypeUtils.RPCStatusToInfluxDBTSStatus(tsStatus);
+ }
+
+ @Override
+ public InfluxQueryResultRsp query(InfluxQueryReq req) throws TException {
+ Operator operator = InfluxDBLogicalGenerator.generate(req.command);
+ queryHandler.checkInfluxDBQueryOperator(operator);
+ return queryHandler.queryInfluxDB(
+ req.database, (InfluxQueryOperator) operator, req.sessionId, IoTDB.serviceProvider);
+ }
+
+ @Override
+ public void handleClientExit() {
+ clientRPCService.handleClientExit();
+ }
+}