You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/04/14 08:20:13 UTC
[iotdb] 01/01: add DataNodeTSIServiceImpl
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch ty-mpp
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 01a35ea6d42ad114f13314ef57fba85174d4e820
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Thu Apr 14 16:19:52 2022 +0800
add DataNodeTSIServiceImpl
---
.../org/apache/iotdb/db/qp/sql/SqlLexer.tokens | 245 +++++++
.../iotdb/commons/service/ThriftService.java | 8 +-
.../resources/conf/iotdb-engine.properties | 3 +
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 3 +
.../iotdb/db/mpp/common/MPPQueryContext.java | 10 +-
.../iotdb/db/mpp/common/header/DatasetHeader.java | 13 +-
.../iotdb/db/mpp/execution/ConfigExecution.java | 44 +-
.../apache/iotdb/db/mpp/execution/Coordinator.java | 18 +-
.../iotdb/db/mpp/execution/IQueryExecution.java | 13 +
.../iotdb/db/mpp/execution/QueryExecution.java | 28 +-
.../apache/iotdb/db/mpp/sql/analyze/Analyzer.java | 334 +++++-----
.../org/apache/iotdb/db/service/RPCService.java | 6 +-
.../thrift/impl/DataNodeTSIServiceImpl.java | 724 +++++++++++++++++++++
.../db/service/thrift/impl/TSServiceImpl.java | 364 +----------
.../apache/iotdb/db/utils/QueryDataSetUtils.java | 4 +-
15 files changed, 1261 insertions(+), 556 deletions(-)
diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.tokens b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.tokens
new file mode 100644
index 0000000000..30a7c1b297
--- /dev/null
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.tokens
@@ -0,0 +1,245 @@
+WS=1
+ADD=2
+AFTER=3
+ALIAS=4
+ALIGN=5
+ALIGNED=6
+ALL=7
+ALTER=8
+ANY=9
+APPEND=10
+AS=11
+ASC=12
+ATTRIBUTES=13
+AUTOREGISTER=14
+BEFORE=15
+BEGIN=16
+BOUNDARY=17
+BY=18
+CACHE=19
+CHILD=20
+CLEAR=21
+COMPRESSION=22
+COMPRESSOR=23
+CONCAT=24
+CONFIGURATION=25
+CONTINUOUS=26
+COUNT=27
+CONTAIN=28
+CQ=29
+CQS=30
+CREATE=31
+DATATYPE=32
+DEBUG=33
+DELETE=34
+DESC=35
+DESCRIBE=36
+DEVICE=37
+DEVICES=38
+DISABLE=39
+DROP=40
+ENCODING=41
+END=42
+EVERY=43
+EXPLAIN=44
+FILL=45
+FLUSH=46
+FOR=47
+FROM=48
+FULL=49
+FUNCTION=50
+FUNCTIONS=51
+GLOBAL=52
+GRANT=53
+GROUP=54
+INDEX=55
+INFO=56
+INSERT=57
+INTO=58
+KILL=59
+LABEL=60
+LAST=61
+LATEST=62
+LEVEL=63
+LIKE=64
+LIMIT=65
+LINEAR=66
+LINK=67
+LIST=68
+LOAD=69
+LOCK=70
+MERGE=71
+METADATA=72
+NODES=73
+NOW=74
+OF=75
+OFF=76
+OFFSET=77
+ON=78
+ORDER=79
+PARTITION=80
+PASSWORD=81
+PATHS=82
+PREVIOUS=83
+PREVIOUSUNTILLAST=84
+PRIVILEGES=85
+PROCESSLIST=86
+PROPERTY=87
+PRUNE=88
+QUERIES=89
+QUERY=90
+READONLY=91
+REGEXP=92
+REMOVE=93
+RENAME=94
+RESAMPLE=95
+RESOURCE=96
+REVOKE=97
+ROLE=98
+ROOT=99
+SCHEMA=100
+SELECT=101
+SET=102
+SETTLE=103
+SGLEVEL=104
+SHOW=105
+SLIMIT=106
+SOFFSET=107
+STORAGE=108
+START=109
+STOP=110
+SYSTEM=111
+TAGS=112
+TASK=113
+TEMPLATE=114
+TEMPLATES=115
+TIME=116
+TIMESERIES=117
+TIMESTAMP=118
+TO=119
+TOLERANCE=120
+TOP=121
+TRACING=122
+TRIGGER=123
+TRIGGERS=124
+TTL=125
+UNLINK=126
+UNLOAD=127
+UNSET=128
+UPDATE=129
+UPSERT=130
+USER=131
+USING=132
+VALUES=133
+VERIFY=134
+VERSION=135
+WATERMARK_EMBEDDING=136
+WHERE=137
+WITH=138
+WITHOUT=139
+WRITABLE=140
+DATATYPE_VALUE=141
+BOOLEAN=142
+DOUBLE=143
+FLOAT=144
+INT32=145
+INT64=146
+TEXT=147
+ENCODING_VALUE=148
+DICTIONARY=149
+DIFF=150
+GORILLA=151
+PLAIN=152
+REGULAR=153
+RLE=154
+TS_2DIFF=155
+ZIGZAG=156
+FREQ=157
+COMPRESSOR_VALUE=158
+GZIP=159
+LZ4=160
+SNAPPY=161
+UNCOMPRESSED=162
+PRIVILEGE_VALUE=163
+SET_STORAGE_GROUP=164
+DELETE_STORAGE_GROUP=165
+CREATE_TIMESERIES=166
+INSERT_TIMESERIES=167
+READ_TIMESERIES=168
+DELETE_TIMESERIES=169
+CREATE_USER=170
+DELETE_USER=171
+MODIFY_PASSWORD=172
+LIST_USER=173
+GRANT_USER_PRIVILEGE=174
+REVOKE_USER_PRIVILEGE=175
+GRANT_USER_ROLE=176
+REVOKE_USER_ROLE=177
+CREATE_ROLE=178
+DELETE_ROLE=179
+LIST_ROLE=180
+GRANT_ROLE_PRIVILEGE=181
+REVOKE_ROLE_PRIVILEGE=182
+CREATE_FUNCTION=183
+DROP_FUNCTION=184
+CREATE_TRIGGER=185
+DROP_TRIGGER=186
+START_TRIGGER=187
+STOP_TRIGGER=188
+CREATE_CONTINUOUS_QUERY=189
+DROP_CONTINUOUS_QUERY=190
+MINUS=191
+PLUS=192
+DIV=193
+MOD=194
+OPERATOR_DEQ=195
+OPERATOR_SEQ=196
+OPERATOR_GT=197
+OPERATOR_GTE=198
+OPERATOR_LT=199
+OPERATOR_LTE=200
+OPERATOR_NEQ=201
+OPERATOR_IN=202
+OPERATOR_AND=203
+OPERATOR_OR=204
+OPERATOR_NOT=205
+OPERATOR_CONTAINS=206
+DOT=207
+COMMA=208
+SEMI=209
+STAR=210
+DOUBLE_STAR=211
+LR_BRACKET=212
+RR_BRACKET=213
+LS_BRACKET=214
+RS_BRACKET=215
+STRING_LITERAL=216
+DURATION_LITERAL=217
+DATETIME_LITERAL=218
+INTEGER_LITERAL=219
+EXPONENT_NUM_PART=220
+BOOLEAN_LITERAL=221
+NULL_LITERAL=222
+NAN_LITERAL=223
+ID=224
+QUTOED_ID_IN_NODE_NAME=225
+QUTOED_ID=226
+'-'=191
+'+'=192
+'/'=193
+'%'=194
+'=='=195
+'='=196
+'>'=197
+'>='=198
+'<'=199
+'<='=200
+'.'=207
+','=208
+';'=209
+'*'=210
+'**'=211
+'('=212
+')'=213
+'['=214
+']'=215
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftService.java b/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftService.java
index e792d88c81..30d75a94eb 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftService.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftService.java
@@ -26,6 +26,7 @@ import org.apache.thrift.TProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.lang.reflect.InvocationTargetException;
import java.util.concurrent.CountDownLatch;
public abstract class ThriftService implements IService {
@@ -88,7 +89,8 @@ public abstract class ThriftService implements IService {
}
public abstract void initTProcessor()
- throws ClassNotFoundException, IllegalAccessException, InstantiationException;
+ throws ClassNotFoundException, IllegalAccessException, InstantiationException,
+ NoSuchMethodException, InvocationTargetException;
public abstract void initThriftServiceThread()
throws IllegalAccessException, InstantiationException, ClassNotFoundException;
@@ -125,7 +127,9 @@ public abstract class ThriftService implements IService {
} catch (InterruptedException
| ClassNotFoundException
| IllegalAccessException
- | InstantiationException e) {
+ | InstantiationException
+ | NoSuchMethodException
+ | InvocationTargetException e) {
Thread.currentThread().interrupt();
throw new StartupException(this.getID().getName(), e.getMessage());
}
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index 8a6673dd85..c3b4f4fe2e 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -27,6 +27,9 @@ rpc_address=0.0.0.0
# Datatype: int
rpc_port=6667
+# Client RPC Service Impl Class Name: TSServiceImpl or DataNodeTSIServiceImpl
+# rpc_class_name=TSServiceImpl
+
# Datatype: int
mpp_port=7777
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 5ed958b579..1eeab2bfd4 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -167,6 +167,9 @@ public class IoTDBDescriptor {
Integer.parseInt(
properties.getProperty("rpc_port", Integer.toString(conf.getRpcPort()))));
+ conf.setRpcImplClassName(
+ properties.getProperty("rpc_class_name", conf.getRpcImplClassName()).trim());
+
conf.setMppPort(
Integer.parseInt(
properties.getProperty("mpp_port", Integer.toString(conf.getRpcPort()))));
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
index def89dba39..b449bdf9e7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
@@ -29,7 +29,7 @@ public class MPPQueryContext {
private String sql;
private QueryId queryId;
private SessionInfo session;
- private QueryType queryType;
+ private QueryType queryType = QueryType.READ;
private Endpoint hostEndpoint;
private ResultNodeContext resultNodeContext;
@@ -38,12 +38,10 @@ public class MPPQueryContext {
this.queryId = queryId;
}
- public MPPQueryContext(
- String sql, QueryId queryId, SessionInfo session, QueryType type, Endpoint hostEndpoint) {
+ public MPPQueryContext(String sql, QueryId queryId, SessionInfo session, Endpoint hostEndpoint) {
this.sql = sql;
this.queryId = queryId;
this.session = session;
- this.queryType = type;
this.hostEndpoint = hostEndpoint;
this.resultNodeContext = new ResultNodeContext(queryId);
}
@@ -56,6 +54,10 @@ public class MPPQueryContext {
return queryType;
}
+ public void setQueryType(QueryType queryType) {
+ this.queryType = queryType;
+ }
+
public Endpoint getHostEndpoint() {
return hostEndpoint;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/DatasetHeader.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/DatasetHeader.java
index 32a236feae..ee3e52d218 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/DatasetHeader.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/DatasetHeader.java
@@ -19,8 +19,6 @@
package org.apache.iotdb.db.mpp.common.header;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-
import com.google.common.primitives.Bytes;
import java.util.*;
@@ -51,10 +49,6 @@ public class DatasetHeader {
return isIgnoreTimestamp;
}
- public Map<String, Integer> getColumnToTsBlockIndexMap() {
- return columnToTsBlockIndexMap;
- }
-
public void setColumnToTsBlockIndexMap(List<String> outputColumnNames) {
this.columnToTsBlockIndexMap = new HashMap<>();
for (int i = 0; i < outputColumnNames.size(); i++) {
@@ -66,8 +60,11 @@ public class DatasetHeader {
return columnHeaders.stream().map(ColumnHeader::getColumnName).collect(Collectors.toList());
}
- public List<TSDataType> getRespDataTypeList() {
- return columnHeaders.stream().map(ColumnHeader::getColumnType).collect(Collectors.toList());
+ public List<String> getRespDataTypeList() {
+ return columnHeaders.stream()
+ .map(ColumnHeader::getColumnType)
+ .map(Objects::toString)
+ .collect(Collectors.toList());
}
public List<Byte> getRespAliasColumns() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/ConfigExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/ConfigExecution.java
index 3d8b5dcdd5..c9cb515e71 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/ConfigExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/ConfigExecution.java
@@ -20,15 +20,19 @@
package org.apache.iotdb.db.mpp.execution;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
+import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
+import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
import org.apache.iotdb.db.mpp.sql.statement.Statement;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import jersey.repackaged.com.google.common.util.concurrent.SettableFuture;
+import org.jetbrains.annotations.NotNull;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -37,12 +41,12 @@ import static com.google.common.base.Throwables.throwIfInstanceOf;
public class ConfigExecution implements IQueryExecution {
- private MPPQueryContext context;
- private Statement statement;
- private ExecutorService executor;
+ private final MPPQueryContext context;
+ private final Statement statement;
+ private final ExecutorService executor;
- private QueryStateMachine stateMachine;
- private SettableFuture<Boolean> result;
+ private final QueryStateMachine stateMachine;
+ private final SettableFuture<Boolean> result;
public ConfigExecution(MPPQueryContext context, Statement statement, ExecutorService executor) {
this.context = context;
@@ -67,7 +71,7 @@ public class ConfigExecution implements IQueryExecution {
}
@Override
- public void onFailure(Throwable throwable) {
+ public void onFailure(@NotNull Throwable throwable) {
fail(throwable);
}
},
@@ -101,6 +105,34 @@ public class ConfigExecution implements IQueryExecution {
}
}
+ @Override
+ public TsBlock getBatchResult() {
+ // TODO
+ return null;
+ }
+
+ @Override
+ public boolean hasNextResult() {
+ return false;
+ }
+
+ @Override
+ public int getOutputValueColumnCount() {
+ // TODO
+ return 0;
+ }
+
+ @Override
+ public DatasetHeader getDatasetHeader() {
+ // TODO
+ return null;
+ }
+
+ @Override
+ public boolean isQuery() {
+ return context.getQueryType() == QueryType.READ;
+ }
+
// TODO: consider a more suitable implementation for it
// Generate the corresponding IConfigTask by statement.
// Each type of statement has its own ConfigTask
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
index 4749394259..784b2b5999 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
@@ -24,9 +24,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.db.mpp.common.SessionInfo;
-import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
import org.apache.iotdb.db.mpp.sql.statement.Statement;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.commons.lang3.Validate;
@@ -50,12 +48,12 @@ public class Coordinator {
IoTDBDescriptor.getInstance().getConfig().getRpcAddress(),
IoTDBDescriptor.getInstance().getConfig().getMppPort());
- private ExecutorService executor;
- private ScheduledExecutorService scheduledExecutor;
+ private final ExecutorService executor;
+ private final ScheduledExecutorService scheduledExecutor;
private static final Coordinator INSTANCE = new Coordinator();
- private ConcurrentHashMap<QueryId, QueryExecution> queryExecutionMap;
+ private final ConcurrentHashMap<QueryId, IQueryExecution> queryExecutionMap;
private Coordinator() {
this.queryExecutionMap = new ConcurrentHashMap<>();
@@ -68,11 +66,11 @@ public class Coordinator {
}
public ExecutionResult execute(
- Statement statement, QueryId queryId, QueryType queryType, SessionInfo session, String sql) {
+ Statement statement, QueryId queryId, SessionInfo session, String sql) {
QueryExecution execution =
createQueryExecution(
- statement, new MPPQueryContext(sql, queryId, session, queryType, getHostEndpoint()));
+ statement, new MPPQueryContext(sql, queryId, session, getHostEndpoint()));
queryExecutionMap.put(queryId, execution);
execution.start();
@@ -80,10 +78,10 @@ public class Coordinator {
return execution.getStatus();
}
- public TsBlock getResultSet(QueryId queryId) {
- QueryExecution execution = queryExecutionMap.get(queryId);
+ public IQueryExecution getQueryExecution(QueryId queryId) {
+ IQueryExecution execution = queryExecutionMap.get(queryId);
Validate.notNull(execution, "invalid queryId %s", queryId.getId());
- return execution.getBatchResult();
+ return execution;
}
// TODO: (xingtanzjr) need to redo once we have a concrete policy for the threadPool management
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/IQueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/IQueryExecution.java
index 3f9fa05ea8..2e0bd9c76d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/IQueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/IQueryExecution.java
@@ -19,6 +19,9 @@
package org.apache.iotdb.db.mpp.execution;
+import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+
public interface IQueryExecution {
void start();
@@ -26,4 +29,14 @@ public interface IQueryExecution {
void stop();
ExecutionResult getStatus();
+
+ TsBlock getBatchResult();
+
+ boolean hasNextResult();
+
+ int getOutputValueColumnCount();
+
+ DatasetHeader getDatasetHeader();
+
+ boolean isQuery();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
index a5dbeaf21f..885e822ea3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
@@ -21,10 +21,12 @@ package org.apache.iotdb.db.mpp.execution;
import org.apache.iotdb.db.mpp.buffer.DataBlockService;
import org.apache.iotdb.db.mpp.buffer.ISourceHandle;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
+import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
import org.apache.iotdb.db.mpp.execution.scheduler.ClusterScheduler;
import org.apache.iotdb.db.mpp.execution.scheduler.IScheduler;
import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
import org.apache.iotdb.db.mpp.sql.analyze.Analyzer;
+import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
import org.apache.iotdb.db.mpp.sql.optimization.PlanOptimizer;
import org.apache.iotdb.db.mpp.sql.planner.DistributionPlanner;
import org.apache.iotdb.db.mpp.sql.planner.LogicalPlanner;
@@ -55,18 +57,18 @@ import static com.google.common.base.Throwables.throwIfUnchecked;
* corresponding physical nodes. 3. Collect and monitor the progress/states of this query.
*/
public class QueryExecution implements IQueryExecution {
- private MPPQueryContext context;
+ private final MPPQueryContext context;
private IScheduler scheduler;
- private QueryStateMachine stateMachine;
+ private final QueryStateMachine stateMachine;
- private List<PlanOptimizer> planOptimizers;
+ private final List<PlanOptimizer> planOptimizers;
private final Analysis analysis;
private LogicalQueryPlan logicalPlan;
private DistributedQueryPlan distributedPlan;
- private ExecutorService executor;
- private ScheduledExecutorService scheduledExecutor;
+ private final ExecutorService executor;
+ private final ScheduledExecutorService scheduledExecutor;
// The result of QueryExecution will be written to the DataBlockManager in current Node.
// We use this SourceHandle to fetch the TsBlock from it.
@@ -153,6 +155,7 @@ public class QueryExecution implements IQueryExecution {
* DataStreamManager use the virtual ResultOperator's ID (This part will be designed and
* implemented with DataStreamManager)
*/
+ @Override
public TsBlock getBatchResult() {
try {
initResultHandle();
@@ -171,6 +174,7 @@ public class QueryExecution implements IQueryExecution {
}
/** @return true if there are more TsBlocks, otherwise false */
+ @Override
public boolean hasNextResult() {
try {
initResultHandle();
@@ -182,9 +186,14 @@ public class QueryExecution implements IQueryExecution {
}
/** return the result column count without the time column */
+ @Override
public int getOutputValueColumnCount() {
- // TODO need return the actual size while there exists output columns in Analysis
- return 1;
+ return analysis.getRespDatasetHeader().getColumnHeaders().size();
+ }
+
+ @Override
+ public DatasetHeader getDatasetHeader() {
+ return analysis.getRespDatasetHeader();
}
/**
@@ -242,4 +251,9 @@ public class QueryExecution implements IQueryExecution {
public LogicalQueryPlan getLogicalPlan() {
return logicalPlan;
}
+
+ @Override
+ public boolean isQuery() {
+ return context.getQueryType() == QueryType.READ;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java
index 5eca107403..29090cf0bc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java
@@ -173,6 +173,7 @@ public class Analyzer {
@Override
public Analysis visitInsert(InsertStatement insertStatement, MPPQueryContext context) {
// TODO: do analyze for insert statement
+ context.setQueryType(QueryType.WRITE);
Analysis analysis = new Analysis();
analysis.setStatement(insertStatement);
return analysis;
@@ -234,6 +235,7 @@ public class Analyzer {
@Override
public Analysis visitInsertTablet(
InsertTabletStatement insertTabletStatement, MPPQueryContext context) {
+ context.setQueryType(QueryType.WRITE);
DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam();
dataPartitionQueryParam.setTimePartitionSlotList(
insertTabletStatement.getTimePartitionSlots());
@@ -264,172 +266,9 @@ public class Analyzer {
return analysis;
}
- @Override
- public Analysis visitShowTimeSeries(
- ShowTimeSeriesStatement showTimeSeriesStatement, MPPQueryContext context) {
- SchemaPartition schemaPartitionInfo =
- partitionFetcher.fetchSchemaPartitionInfo(
- showTimeSeriesStatement.getPathPattern().getDevice());
- Analysis analysis = new Analysis();
- analysis.setStatement(showTimeSeriesStatement);
- analysis.setSchemaPartitionInfo(schemaPartitionInfo);
- return analysis;
- }
-
- @Override
- public Analysis visitShowDevices(
- ShowDevicesStatement showDevicesStatement, MPPQueryContext context) {
- SchemaPartition schemaPartitionInfo =
- partitionFetcher.fetchSchemaPartitionInfo(
- showDevicesStatement.getPathPattern().getFullPath());
- Analysis analysis = new Analysis();
- analysis.setStatement(showDevicesStatement);
- analysis.setSchemaPartitionInfo(schemaPartitionInfo);
- return analysis;
- }
-
- @Override
- public Analysis visitCreateUser(AuthorStatement authorStatement, MPPQueryContext context) {
- Analysis analysis = new Analysis();
- analysis.setStatement(authorStatement);
- return analysis;
- }
-
- @Override
- public Analysis visitCreateRole(AuthorStatement authorStatement, MPPQueryContext context) {
- Analysis analysis = new Analysis();
- analysis.setStatement(authorStatement);
- return analysis;
- }
-
- @Override
- public Analysis visitAlterUser(AuthorStatement authorStatement, MPPQueryContext context) {
- Analysis analysis = new Analysis();
- analysis.setStatement(authorStatement);
- return analysis;
- }
-
- @Override
- public Analysis visitGrantUser(AuthorStatement authorStatement, MPPQueryContext context) {
- Analysis analysis = new Analysis();
- analysis.setStatement(authorStatement);
- return analysis;
- }
-
- @Override
- public Analysis visitGrantRole(AuthorStatement authorStatement, MPPQueryContext context) {
- Analysis analysis = new Analysis();
- analysis.setStatement(authorStatement);
- return analysis;
- }
-
- @Override
- public Analysis visitGrantRoleToUser(AuthorStatement authorStatement, MPPQueryContext context) {
- Analysis analysis = new Analysis();
- analysis.setStatement(authorStatement);
- return analysis;
- }
-
- @Override
- public Analysis visitRevokeUser(AuthorStatement authorStatement, MPPQueryContext context) {
- Analysis analysis = new Analysis();
- analysis.setStatement(authorStatement);
- return analysis;
- }
-
- @Override
- public Analysis visitRevokeRole(AuthorStatement authorStatement, MPPQueryContext context) {
- Analysis analysis = new Analysis();
- analysis.setStatement(authorStatement);
- return analysis;
- }
-
- @Override
- public Analysis visitRevokeRoleFromUser(
- AuthorStatement authorStatement, MPPQueryContext context) {
- Analysis analysis = new Analysis();
- analysis.setStatement(authorStatement);
- return analysis;
- }
-
- @Override
- public Analysis visitDropUser(AuthorStatement authorStatement, MPPQueryContext context) {
- Analysis analysis = new Analysis();
- analysis.setStatement(authorStatement);
- return analysis;
- }
-
- @Override
- public Analysis visitDropRole(AuthorStatement authorStatement, MPPQueryContext context) {
- Analysis analysis = new Analysis();
- analysis.setStatement(authorStatement);
- return analysis;
- }
-
- @Override
- public Analysis visitListUser(AuthorStatement authorStatement, MPPQueryContext context) {
- Analysis analysis = new Analysis();
- analysis.setStatement(authorStatement);
- return analysis;
- }
-
- @Override
- public Analysis visitListRole(AuthorStatement authorStatement, MPPQueryContext context) {
- Analysis analysis = new Analysis();
- analysis.setStatement(authorStatement);
- return analysis;
- }
-
- @Override
- public Analysis visitListPrivilegesUser(
- AuthorStatement authorStatement, MPPQueryContext context) {
- Analysis analysis = new Analysis();
- analysis.setStatement(authorStatement);
- return analysis;
- }
-
- @Override
- public Analysis visitListPrivilegesRole(
- AuthorStatement authorStatement, MPPQueryContext context) {
- Analysis analysis = new Analysis();
- analysis.setStatement(authorStatement);
- return analysis;
- }
-
- @Override
- public Analysis visitListUserPrivileges(
- AuthorStatement authorStatement, MPPQueryContext context) {
- Analysis analysis = new Analysis();
- analysis.setStatement(authorStatement);
- return analysis;
- }
-
- @Override
- public Analysis visitListRolePrivileges(
- AuthorStatement authorStatement, MPPQueryContext context) {
- Analysis analysis = new Analysis();
- analysis.setStatement(authorStatement);
- return analysis;
- }
-
- @Override
- public Analysis visitListAllRoleOfUser(
- AuthorStatement authorStatement, MPPQueryContext context) {
- Analysis analysis = new Analysis();
- analysis.setStatement(authorStatement);
- return analysis;
- }
-
- @Override
- public Analysis visitListAllUserOfRole(
- AuthorStatement authorStatement, MPPQueryContext context) {
- Analysis analysis = new Analysis();
- analysis.setStatement(authorStatement);
- return analysis;
- }
-
@Override
public Analysis visitInsertRow(InsertRowStatement insertRowStatement, MPPQueryContext context) {
+ context.setQueryType(QueryType.WRITE);
DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam();
dataPartitionQueryParam.setDevicePath(insertRowStatement.getDevicePath().getFullPath());
dataPartitionQueryParam.setTimePartitionSlotList(insertRowStatement.getTimePartitionSlots());
@@ -470,6 +309,7 @@ public class Analyzer {
public Analysis visitInsertRows(
InsertRowsStatement insertRowsStatement, MPPQueryContext context) {
// TODO remove duplicate
+ context.setQueryType(QueryType.WRITE);
List<DataPartitionQueryParam> dataPartitionQueryParams = new ArrayList<>();
for (InsertRowStatement insertRowStatement :
insertRowsStatement.getInsertRowStatementList()) {
@@ -523,6 +363,7 @@ public class Analyzer {
@Override
public Analysis visitInsertMultiTablets(
InsertMultiTabletsStatement insertMultiTabletsStatement, MPPQueryContext context) {
+ context.setQueryType(QueryType.WRITE);
// TODO remove duplicate
List<DataPartitionQueryParam> dataPartitionQueryParams = new ArrayList<>();
for (InsertTabletStatement insertTabletStatement :
@@ -570,6 +411,7 @@ public class Analyzer {
@Override
public Analysis visitInsertRowsOfOneDevice(
InsertRowsOfOneDeviceStatement insertRowsOfOneDeviceStatement, MPPQueryContext context) {
+ context.setQueryType(QueryType.WRITE);
DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam();
dataPartitionQueryParam.setDevicePath(
insertRowsOfOneDeviceStatement.getDevicePath().getFullPath());
@@ -615,5 +457,169 @@ public class Analyzer {
return analysis;
}
+
+ @Override
+ public Analysis visitShowTimeSeries(
+ ShowTimeSeriesStatement showTimeSeriesStatement, MPPQueryContext context) {
+ SchemaPartition schemaPartitionInfo =
+ partitionFetcher.fetchSchemaPartitionInfo(
+ showTimeSeriesStatement.getPathPattern().getDevice());
+ Analysis analysis = new Analysis();
+ analysis.setStatement(showTimeSeriesStatement);
+ analysis.setSchemaPartitionInfo(schemaPartitionInfo);
+ return analysis;
+ }
+
+ @Override
+ public Analysis visitShowDevices(
+ ShowDevicesStatement showDevicesStatement, MPPQueryContext context) {
+ SchemaPartition schemaPartitionInfo =
+ partitionFetcher.fetchSchemaPartitionInfo(
+ showDevicesStatement.getPathPattern().getFullPath());
+ Analysis analysis = new Analysis();
+ analysis.setStatement(showDevicesStatement);
+ analysis.setSchemaPartitionInfo(schemaPartitionInfo);
+ return analysis;
+ }
+
+ @Override
+ public Analysis visitCreateUser(AuthorStatement authorStatement, MPPQueryContext context) {
+ Analysis analysis = new Analysis();
+ analysis.setStatement(authorStatement);
+ return analysis;
+ }
+
+ @Override
+ public Analysis visitCreateRole(AuthorStatement authorStatement, MPPQueryContext context) {
+ Analysis analysis = new Analysis();
+ analysis.setStatement(authorStatement);
+ return analysis;
+ }
+
+ @Override
+ public Analysis visitAlterUser(AuthorStatement authorStatement, MPPQueryContext context) {
+ Analysis analysis = new Analysis();
+ analysis.setStatement(authorStatement);
+ return analysis;
+ }
+
+ @Override
+ public Analysis visitGrantUser(AuthorStatement authorStatement, MPPQueryContext context) {
+ Analysis analysis = new Analysis();
+ analysis.setStatement(authorStatement);
+ return analysis;
+ }
+
+ @Override
+ public Analysis visitGrantRole(AuthorStatement authorStatement, MPPQueryContext context) {
+ Analysis analysis = new Analysis();
+ analysis.setStatement(authorStatement);
+ return analysis;
+ }
+
+ @Override
+ public Analysis visitGrantRoleToUser(AuthorStatement authorStatement, MPPQueryContext context) {
+ Analysis analysis = new Analysis();
+ analysis.setStatement(authorStatement);
+ return analysis;
+ }
+
+ @Override
+ public Analysis visitRevokeUser(AuthorStatement authorStatement, MPPQueryContext context) {
+ Analysis analysis = new Analysis();
+ analysis.setStatement(authorStatement);
+ return analysis;
+ }
+
+ @Override
+ public Analysis visitRevokeRole(AuthorStatement authorStatement, MPPQueryContext context) {
+ Analysis analysis = new Analysis();
+ analysis.setStatement(authorStatement);
+ return analysis;
+ }
+
+ @Override
+ public Analysis visitRevokeRoleFromUser(
+ AuthorStatement authorStatement, MPPQueryContext context) {
+ Analysis analysis = new Analysis();
+ analysis.setStatement(authorStatement);
+ return analysis;
+ }
+
+ @Override
+ public Analysis visitDropUser(AuthorStatement authorStatement, MPPQueryContext context) {
+ Analysis analysis = new Analysis();
+ analysis.setStatement(authorStatement);
+ return analysis;
+ }
+
+ @Override
+ public Analysis visitDropRole(AuthorStatement authorStatement, MPPQueryContext context) {
+ Analysis analysis = new Analysis();
+ analysis.setStatement(authorStatement);
+ return analysis;
+ }
+
+ @Override
+ public Analysis visitListUser(AuthorStatement authorStatement, MPPQueryContext context) {
+ Analysis analysis = new Analysis();
+ analysis.setStatement(authorStatement);
+ return analysis;
+ }
+
+ @Override
+ public Analysis visitListRole(AuthorStatement authorStatement, MPPQueryContext context) {
+ Analysis analysis = new Analysis();
+ analysis.setStatement(authorStatement);
+ return analysis;
+ }
+
+ @Override
+ public Analysis visitListPrivilegesUser(
+ AuthorStatement authorStatement, MPPQueryContext context) {
+ Analysis analysis = new Analysis();
+ analysis.setStatement(authorStatement);
+ return analysis;
+ }
+
+ @Override
+ public Analysis visitListPrivilegesRole(
+ AuthorStatement authorStatement, MPPQueryContext context) {
+ Analysis analysis = new Analysis();
+ analysis.setStatement(authorStatement);
+ return analysis;
+ }
+
+ @Override
+ public Analysis visitListUserPrivileges(
+ AuthorStatement authorStatement, MPPQueryContext context) {
+ Analysis analysis = new Analysis();
+ analysis.setStatement(authorStatement);
+ return analysis;
+ }
+
+ @Override
+ public Analysis visitListRolePrivileges(
+ AuthorStatement authorStatement, MPPQueryContext context) {
+ Analysis analysis = new Analysis();
+ analysis.setStatement(authorStatement);
+ return analysis;
+ }
+
+ @Override
+ public Analysis visitListAllRoleOfUser(
+ AuthorStatement authorStatement, MPPQueryContext context) {
+ Analysis analysis = new Analysis();
+ analysis.setStatement(authorStatement);
+ return analysis;
+ }
+
+ @Override
+ public Analysis visitListAllUserOfRole(
+ AuthorStatement authorStatement, MPPQueryContext context) {
+ Analysis analysis = new Analysis();
+ analysis.setStatement(authorStatement);
+ return analysis;
+ }
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/RPCService.java b/server/src/main/java/org/apache/iotdb/db/service/RPCService.java
index dc0ee3987c..a376eaece0 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/RPCService.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/RPCService.java
@@ -31,6 +31,8 @@ import org.apache.iotdb.db.service.thrift.impl.TSServiceImpl;
import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
import org.apache.iotdb.service.rpc.thrift.TSIService.Processor;
+import java.lang.reflect.InvocationTargetException;
+
/** A service to handle jdbc request from client. */
public class RPCService extends ThriftService implements RPCServiceMBean {
@@ -47,10 +49,12 @@ public class RPCService extends ThriftService implements RPCServiceMBean {
@Override
public void initTProcessor()
- throws ClassNotFoundException, IllegalAccessException, InstantiationException {
+ throws ClassNotFoundException, IllegalAccessException, InstantiationException,
+ NoSuchMethodException, InvocationTargetException {
impl =
(TSServiceImpl)
Class.forName(IoTDBDescriptor.getInstance().getConfig().getRpcImplClassName())
+ .getDeclaredConstructor()
.newInstance();
initSyncedServiceImpl(null);
if (MetricConfigDescriptor.getInstance().getMetricConfig().getEnableMetric()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
new file mode 100644
index 0000000000..af372e4319
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
@@ -0,0 +1,724 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.service.thrift.impl;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.conf.OperationType;
+import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
+import org.apache.iotdb.db.mpp.execution.Coordinator;
+import org.apache.iotdb.db.mpp.execution.ExecutionResult;
+import org.apache.iotdb.db.mpp.execution.IQueryExecution;
+import org.apache.iotdb.db.mpp.sql.parser.StatementGenerator;
+import org.apache.iotdb.db.mpp.sql.statement.Statement;
+import org.apache.iotdb.db.mpp.sql.statement.crud.InsertMultiTabletsStatement;
+import org.apache.iotdb.db.mpp.sql.statement.crud.InsertRowStatement;
+import org.apache.iotdb.db.mpp.sql.statement.crud.InsertRowsOfOneDeviceStatement;
+import org.apache.iotdb.db.mpp.sql.statement.crud.InsertRowsStatement;
+import org.apache.iotdb.db.mpp.sql.statement.crud.InsertTabletStatement;
+import org.apache.iotdb.db.query.control.SessionManager;
+import org.apache.iotdb.db.service.basic.BasicOpenSessionResp;
+import org.apache.iotdb.db.service.metrics.MetricsService;
+import org.apache.iotdb.db.service.metrics.Operation;
+import org.apache.iotdb.db.utils.QueryDataSetUtils;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.*;
+
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.iotdb.db.service.basic.ServiceProvider.AUDIT_LOGGER;
+import static org.apache.iotdb.db.service.basic.ServiceProvider.CONFIG;
+import static org.apache.iotdb.db.service.basic.ServiceProvider.CURRENT_RPC_VERSION;
+import static org.apache.iotdb.db.service.basic.ServiceProvider.QUERY_FREQUENCY_RECORDER;
+import static org.apache.iotdb.db.service.basic.ServiceProvider.QUERY_TIME_MANAGER;
+import static org.apache.iotdb.db.service.basic.ServiceProvider.SLOW_SQL_LOGGER;
+import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNPEOrUnexpectedException;
+import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException;
+
+public class DataNodeTSIServiceImpl implements TSIService.Iface {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(DataNodeTSIServiceImpl.class);
+
+ private static final Coordinator COORDINATOR = Coordinator.getInstance();
+
+ private static final SessionManager SESSION_MANAGER = SessionManager.getInstance();
+
+ /**
+  * Opens a session through the session manager and reports the outcome to the client.
+  *
+  * @param req open-session request; its username, password, zone id, client protocol and
+  *     optional {@code configuration} map are read
+  * @return a response carrying the status built from the session manager's code and message,
+  *     {@code CURRENT_RPC_VERSION}, and the session id reported by the session manager
+  */
+ @Override
+ public TSOpenSessionResp openSession(TSOpenSessionReq req) throws TException {
+   BasicOpenSessionResp sessionResult =
+       SESSION_MANAGER.openSession(
+           req.username, req.password, req.zoneId, req.client_protocol, parseClientVersion(req));
+   TSStatus openStatus = RpcUtils.getStatus(sessionResult.getCode(), sessionResult.getMessage());
+   return new TSOpenSessionResp(openStatus, CURRENT_RPC_VERSION)
+       .setSessionId(sessionResult.getSessionId());
+ }
+
+ /**
+  * Reads the client version from the {@code "version"} entry of the request configuration.
+  *
+  * @param req open-session request whose {@code configuration} map may be {@code null}
+  * @return the version named by the {@code "version"} entry, or {@code V_0_12} when the map is
+  *     {@code null} or has no such key
+  */
+ private IoTDBConstant.ClientVersion parseClientVersion(TSOpenSessionReq req) {
+   Map<String, String> clientConfig = req.configuration;
+   if (clientConfig == null || !clientConfig.containsKey("version")) {
+     return IoTDBConstant.ClientVersion.V_0_12;
+   }
+   return IoTDBConstant.ClientVersion.valueOf(clientConfig.get("version"));
+ }
+
+ /**
+  * Closes the session named in the request.
+  *
+  * @return a {@code SUCCESS_STATUS} status when the session manager reports the session as
+  *     closed, otherwise a {@code NOT_LOGIN_ERROR} status
+  */
+ @Override
+ public TSStatus closeSession(TSCloseSessionReq req) {
+   boolean closed = SESSION_MANAGER.closeSession(req.sessionId);
+   TSStatusCode outcome = closed ? TSStatusCode.SUCCESS_STATUS : TSStatusCode.NOT_LOGIN_ERROR;
+   return new TSStatus(RpcUtils.getStatus(outcome));
+ }
+
+ /**
+  * Cancellation is not implemented yet; every request is answered with a
+  * {@code QUERY_NOT_ALLOWED} status.
+  */
+ @Override
+ public TSStatus cancelOperation(TSCancelOperationReq req) {
+   // TODO implement
+   TSStatus notImplemented =
+       RpcUtils.getStatus(TSStatusCode.QUERY_NOT_ALLOWED, "Cancellation is not implemented");
+   return notImplemented;
+ }
+
+ /**
+  * Delegates closing of a statement and/or query to the session manager, forwarding which of
+  * the two ids were actually set on the request.
+  */
+ @Override
+ public TSStatus closeOperation(TSCloseOperationReq req) {
+   boolean hasStatementId = req.isSetStatementId();
+   boolean hasQueryId = req.isSetQueryId();
+   return SESSION_MANAGER.closeOperation(
+       req.sessionId, req.queryId, req.statementId, hasStatementId, hasQueryId);
+ }
+
+ /**
+  * Looks up the time zone registered for a session.
+  *
+  * @param sessionId id of the session to look up
+  * @return a success response with the zone's string form, or {@code "Unknown time zone"} when
+  *     the session manager has no zone for the session; on any exception, an error response that
+  *     also carries {@code "Unknown time zone"}
+  */
+ @Override
+ public TSGetTimeZoneResp getTimeZone(long sessionId) {
+   try {
+     ZoneId sessionZone = SESSION_MANAGER.getZoneId(sessionId);
+     String zoneName = "Unknown time zone";
+     if (sessionZone != null) {
+       zoneName = sessionZone.toString();
+     }
+     return new TSGetTimeZoneResp(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS), zoneName);
+   } catch (Exception e) {
+     TSStatus failure =
+         onNPEOrUnexpectedException(
+             e, OperationType.GET_TIME_ZONE, TSStatusCode.GENERATE_TIME_ZONE_ERROR);
+     return new TSGetTimeZoneResp(failure, "Unknown time zone");
+   }
+ }
+
+ /**
+  * Stores the requested time zone for the session.
+  *
+  * @return a success status, or the status produced by the error handler when the session
+  *     manager throws
+  */
+ @Override
+ public TSStatus setTimeZone(TSSetTimeZoneReq req) {
+   TSStatus outcome;
+   try {
+     SESSION_MANAGER.setTimezone(req.sessionId, req.timeZone);
+     outcome = RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+   } catch (Exception e) {
+     outcome =
+         onNPEOrUnexpectedException(
+             e, OperationType.SET_TIME_ZONE, TSStatusCode.SET_TIME_ZONE_ERROR);
+   }
+   return outcome;
+ }
+
+ /**
+  * Assembles the server properties reported to clients: the server version, the supported time
+  * aggregation operations (MAX_TIME and MIN_TIME), and a set of values copied from the IoTDB
+  * configuration. The server version is also logged at INFO level.
+  */
+ @Override
+ public ServerProperties getProperties() {
+   ServerProperties serverProperties = new ServerProperties();
+   serverProperties.setVersion(IoTDBConstant.VERSION);
+   LOGGER.info("IoTDB server version: {}", IoTDBConstant.VERSION);
+
+   List<String> timeAggregations = new ArrayList<>();
+   timeAggregations.add(IoTDBConstant.MAX_TIME);
+   timeAggregations.add(IoTDBConstant.MIN_TIME);
+   serverProperties.setSupportedTimeAggregationOperations(timeAggregations);
+
+   serverProperties.setTimestampPrecision(
+       IoTDBDescriptor.getInstance().getConfig().getTimestampPrecision());
+   serverProperties.setMaxConcurrentClientNum(
+       IoTDBDescriptor.getInstance().getConfig().getRpcMaxConcurrentClientNum());
+   serverProperties.setWatermarkSecretKey(
+       IoTDBDescriptor.getInstance().getConfig().getWatermarkSecretKey());
+   serverProperties.setWatermarkBitString(
+       IoTDBDescriptor.getInstance().getConfig().getWatermarkBitString());
+   serverProperties.setWatermarkParamMarkRate(
+       IoTDBDescriptor.getInstance().getConfig().getWatermarkParamMarkRate());
+   serverProperties.setWatermarkParamMaxRightBit(
+       IoTDBDescriptor.getInstance().getConfig().getWatermarkParamMaxRightBit());
+   serverProperties.setIsReadOnly(IoTDBDescriptor.getInstance().getConfig().isReadOnly());
+   serverProperties.setThriftMaxFrameSize(
+       IoTDBDescriptor.getInstance().getConfig().getThriftMaxFrameSize());
+   return serverProperties;
+ }
+
+ /**
+  * Not implemented by this service yet.
+  *
+  * @throws UnsupportedOperationException always
+  */
+ @Override
+ public TSStatus setStorageGroup(long sessionId, String storageGroup) {
+   // Name the RPC in the message so a server-side stack trace identifies the missing feature.
+   throw new UnsupportedOperationException(
+       "setStorageGroup is not supported by DataNodeTSIServiceImpl yet");
+ }
+
+ /**
+  * Not implemented by this service yet.
+  *
+  * @throws UnsupportedOperationException always
+  */
+ @Override
+ public TSStatus createTimeseries(TSCreateTimeseriesReq req) {
+   // Name the RPC in the message so a server-side stack trace identifies the missing feature.
+   throw new UnsupportedOperationException(
+       "createTimeseries is not supported by DataNodeTSIServiceImpl yet");
+ }
+
+ /**
+  * Not implemented by this service yet.
+  *
+  * @throws UnsupportedOperationException always
+  */
+ @Override
+ public TSStatus createAlignedTimeseries(TSCreateAlignedTimeseriesReq req) {
+   // Name the RPC in the message so a server-side stack trace identifies the missing feature.
+   throw new UnsupportedOperationException(
+       "createAlignedTimeseries is not supported by DataNodeTSIServiceImpl yet");
+ }
+
+ /**
+  * Not implemented by this service yet.
+  *
+  * @throws UnsupportedOperationException always
+  */
+ @Override
+ public TSStatus createMultiTimeseries(TSCreateMultiTimeseriesReq req) {
+   // Name the RPC in the message so a server-side stack trace identifies the missing feature.
+   throw new UnsupportedOperationException(
+       "createMultiTimeseries is not supported by DataNodeTSIServiceImpl yet");
+ }
+
+ /**
+  * Not implemented by this service yet.
+  *
+  * @throws UnsupportedOperationException always
+  */
+ @Override
+ public TSStatus deleteTimeseries(long sessionId, List<String> path) {
+   // Name the RPC in the message so a server-side stack trace identifies the missing feature.
+   throw new UnsupportedOperationException(
+       "deleteTimeseries is not supported by DataNodeTSIServiceImpl yet");
+ }
+
+ /**
+  * Not implemented by this service yet.
+  *
+  * @throws UnsupportedOperationException always
+  */
+ @Override
+ public TSStatus deleteStorageGroups(long sessionId, List<String> storageGroup) {
+   // Name the RPC in the message so a server-side stack trace identifies the missing feature.
+   throw new UnsupportedOperationException(
+       "deleteStorageGroups is not supported by DataNodeTSIServiceImpl yet");
+ }
+
+ /**
+  * Not implemented by this service yet.
+  *
+  * @throws UnsupportedOperationException always
+  */
+ @Override
+ public TSFetchMetadataResp fetchMetadata(TSFetchMetadataReq req) {
+   // Name the RPC in the message so a server-side stack trace identifies the missing feature.
+   throw new UnsupportedOperationException(
+       "fetchMetadata is not supported by DataNodeTSIServiceImpl yet");
+ }
+
+ /**
+  * Parses and executes one SQL statement through the coordinator.
+  *
+  * <p>For a query, the response carries the dataset header, the query id and the first batch of
+  * results (at most {@code req.fetchSize}, as converted by {@code QueryDataSetUtils}). For any
+  * other statement it carries only the execution status.
+  *
+  * @param req request holding the session id, statement id, SQL text and fetch size
+  * @return the execution response; a not-logged-in status when the session is not logged in, the
+  *     coordinator's own status when execution did not succeed, or the status built by the query
+  *     error handler when parsing or execution throws
+  */
+ @Override
+ public TSExecuteStatementResp executeStatement(TSExecuteStatementReq req) {
+   String statement = req.getStatement();
+   if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+     return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
+   }
+
+   long startTime = System.currentTimeMillis();
+   try {
+     // Parse inside the try block: a malformed statement must come back to the client as an
+     // error status instead of escaping the RPC handler as an uncaught exception.
+     Statement s =
+         StatementGenerator.createStatement(
+             statement, SESSION_MANAGER.getZoneId(req.getSessionId()));
+
+     QUERY_FREQUENCY_RECORDER.incrementAndGet();
+     AUDIT_LOGGER.debug("Session {} execute Query: {}", req.sessionId, statement);
+
+     long queryId = SESSION_MANAGER.requestQueryId(req.statementId, true);
+     QueryId id = new QueryId(String.valueOf(queryId));
+     // create and cache dataset
+     ExecutionResult result =
+         COORDINATOR.execute(s, id, SESSION_MANAGER.getSessionInfo(req.sessionId), statement);
+
+     if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+       // Hand the coordinator's status (code and message) to the client unchanged; wrapping it
+       // in an empty RuntimeException would discard the actual failure reason.
+       return RpcUtils.getTSExecuteStatementResp(result.status);
+     }
+
+     IQueryExecution queryExecution = COORDINATOR.getQueryExecution(id);
+
+     TSExecuteStatementResp resp;
+     if (queryExecution.isQuery()) {
+       resp = createResponse(queryExecution.getDatasetHeader(), queryId);
+       resp.setStatus(result.status);
+       resp.setQueryDataSet(
+           QueryDataSetUtils.convertTsBlockByFetchSize(queryExecution, req.fetchSize));
+     } else {
+       resp = RpcUtils.getTSExecuteStatementResp(result.status);
+     }
+
+     return resp;
+   } catch (Exception e) {
+     // TODO call the coordinator to release query resource
+     return RpcUtils.getTSExecuteStatementResp(
+         onQueryException(e, "\"" + statement + "\". " + OperationType.EXECUTE_STATEMENT));
+   } finally {
+     addOperationLatency(Operation.EXECUTE_QUERY, startTime);
+     long costTime = System.currentTimeMillis() - startTime;
+     if (costTime >= CONFIG.getSlowQueryThreshold()) {
+       SLOW_SQL_LOGGER.info("Cost: {} ms, sql is {}", costTime, statement);
+     }
+   }
+ }
+
+ /**
+  * Not implemented by this service yet.
+  *
+  * @throws UnsupportedOperationException always
+  */
+ @Override
+ public TSStatus executeBatchStatement(TSExecuteBatchStatementReq req) {
+   // Name the RPC in the message so a server-side stack trace identifies the missing feature.
+   throw new UnsupportedOperationException(
+       "executeBatchStatement is not supported by DataNodeTSIServiceImpl yet");
+ }
+
+ /** Handles the request exactly like {@link #executeStatement(TSExecuteStatementReq)}. */
+ @Override
+ public TSExecuteStatementResp executeQueryStatement(TSExecuteStatementReq req) throws TException {
+   TSExecuteStatementResp queryResponse = executeStatement(req);
+   return queryResponse;
+ }
+
+ /** Handles the request exactly like {@link #executeStatement(TSExecuteStatementReq)}. */
+ @Override
+ public TSExecuteStatementResp executeUpdateStatement(TSExecuteStatementReq req)
+     throws TException {
+   TSExecuteStatementResp updateResponse = executeStatement(req);
+   return updateResponse;
+ }
+
+ /**
+  * Fetches the next batch of results for a query that is already known to the coordinator.
+  *
+  * @param req request holding the session id, query id and fetch size
+  * @return a response with the converted batch, {@code hasResultSet} set when the batch's time
+  *     buffer is non-empty, and {@code isAlign} set to {@code true}; a not-logged-in status when
+  *     the session is not logged in; or the status built by the query error handler on any
+  *     exception
+  */
+ @Override
+ public TSFetchResultsResp fetchResults(TSFetchResultsReq req) {
+   try {
+     boolean loggedIn = SESSION_MANAGER.checkLogin(req.getSessionId());
+     if (!loggedIn) {
+       return RpcUtils.getTSFetchResultsResp(getNotLoggedInStatus());
+     }
+
+     TSFetchResultsResp response = RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS);
+     QueryId fetchedQuery = new QueryId(String.valueOf(req.queryId));
+     TSQueryDataSet nextBatch =
+         QueryDataSetUtils.convertTsBlockByFetchSize(
+             COORDINATOR.getQueryExecution(fetchedQuery), req.fetchSize);
+
+     // An empty time buffer means the converted batch holds no rows.
+     response.setHasResultSet(nextBatch.bufferForTime().limit() != 0);
+     response.setQueryDataSet(nextBatch);
+     response.setIsAlign(true);
+
+     QUERY_TIME_MANAGER.unRegisterQuery(req.queryId, false);
+     return response;
+   } catch (Exception e) {
+     return RpcUtils.getTSFetchResultsResp(onQueryException(e, OperationType.FETCH_RESULTS));
+   }
+ }
+
+ /**
+  * Converts an insert-records request into an {@link InsertRowsStatement} and executes it
+  * through the coordinator.
+  *
+  * @param req request holding the session id and the records to insert
+  * @return the coordinator's execution status; a not-logged-in status when the session is not
+  *     logged in; or the status built by the error handler when anything throws
+  */
+ @Override
+ public TSStatus insertRecords(TSInsertRecordsReq req) {
+   long t1 = System.currentTimeMillis();
+   try {
+     if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+       return getNotLoggedInStatus();
+     }
+
+     if (AUDIT_LOGGER.isDebugEnabled()) {
+       AUDIT_LOGGER.debug(
+           "Session {} insertRecords, first device {}, first time {}",
+           SESSION_MANAGER.getCurrSessionId(),
+           req.prefixPaths.get(0),
+           req.getTimestamps().get(0));
+     }
+
+     // Step 1: transfer from TSInsertRecordsReq to Statement
+     InsertRowsStatement statement = (InsertRowsStatement) StatementGenerator.createStatement(req);
+
+     // Step 2: call the coordinator
+     long queryId = SESSION_MANAGER.requestQueryId(false);
+     ExecutionResult result =
+         COORDINATOR.execute(
+             statement,
+             new QueryId(String.valueOf(queryId)),
+             SESSION_MANAGER.getSessionInfo(req.sessionId),
+             "");
+
+     // TODO(INSERT) do the authority check in analyze
+     return result.status;
+   } catch (Exception e) {
+     // Report the failure under this RPC's own operation type, not the copy-pasted INSERT_TABLET.
+     return onNPEOrUnexpectedException(
+         e, OperationType.INSERT_RECORDS, TSStatusCode.EXECUTE_STATEMENT_ERROR);
+   } finally {
+     addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
+   }
+ }
+
+ /**
+  * Converts an insert-records-of-one-device request into an
+  * {@link InsertRowsOfOneDeviceStatement} and executes it through the coordinator.
+  *
+  * @param req request holding the session id, the device path and the records to insert
+  * @return the coordinator's execution status; a not-logged-in status when the session is not
+  *     logged in; or the status built by the error handler when anything throws
+  */
+ @Override
+ public TSStatus insertRecordsOfOneDevice(TSInsertRecordsOfOneDeviceReq req) {
+   long t1 = System.currentTimeMillis();
+   try {
+     if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+       return getNotLoggedInStatus();
+     }
+
+     if (AUDIT_LOGGER.isDebugEnabled()) {
+       AUDIT_LOGGER.debug(
+           "Session {} insertRecords, device {}, first time {}",
+           SESSION_MANAGER.getCurrSessionId(),
+           req.prefixPath,
+           req.getTimestamps().get(0));
+     }
+
+     // Step 1: transfer from TSInsertRecordsOfOneDeviceReq to Statement
+     InsertRowsOfOneDeviceStatement statement =
+         (InsertRowsOfOneDeviceStatement) StatementGenerator.createStatement(req);
+
+     // Step 2: call the coordinator
+     long queryId = SESSION_MANAGER.requestQueryId(false);
+     ExecutionResult result =
+         COORDINATOR.execute(
+             statement,
+             new QueryId(String.valueOf(queryId)),
+             SESSION_MANAGER.getSessionInfo(req.sessionId),
+             "");
+
+     // TODO(INSERT) do the authority check in analyze
+     return result.status;
+   } catch (Exception e) {
+     // Report the failure under this RPC's own operation type, not the copy-pasted INSERT_TABLET.
+     return onNPEOrUnexpectedException(
+         e, OperationType.INSERT_RECORDS_OF_ONE_DEVICE, TSStatusCode.EXECUTE_STATEMENT_ERROR);
+   } finally {
+     addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
+   }
+ }
+
+ /**
+  * Converts an insert-string-records-of-one-device request into an
+  * {@link InsertRowsOfOneDeviceStatement} and executes it through the coordinator.
+  *
+  * @param req request holding the session id, the device path and the records to insert
+  * @return the coordinator's execution status; a not-logged-in status when the session is not
+  *     logged in; or the status built by the error handler when anything throws
+  */
+ @Override
+ public TSStatus insertStringRecordsOfOneDevice(TSInsertStringRecordsOfOneDeviceReq req) {
+   long t1 = System.currentTimeMillis();
+   try {
+     if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+       return getNotLoggedInStatus();
+     }
+
+     if (AUDIT_LOGGER.isDebugEnabled()) {
+       AUDIT_LOGGER.debug(
+           "Session {} insertRecords, device {}, first time {}",
+           SESSION_MANAGER.getCurrSessionId(),
+           req.prefixPath,
+           req.getTimestamps().get(0));
+     }
+
+     // Step 1: transfer from TSInsertStringRecordsOfOneDeviceReq to Statement
+     InsertRowsOfOneDeviceStatement statement =
+         (InsertRowsOfOneDeviceStatement) StatementGenerator.createStatement(req);
+
+     // Step 2: call the coordinator
+     long queryId = SESSION_MANAGER.requestQueryId(false);
+     ExecutionResult result =
+         COORDINATOR.execute(
+             statement,
+             new QueryId(String.valueOf(queryId)),
+             SESSION_MANAGER.getSessionInfo(req.sessionId),
+             "");
+
+     // TODO(INSERT) do the authority check in analyze
+     return result.status;
+   } catch (Exception e) {
+     // Report the failure under this RPC's own operation type, not the copy-pasted INSERT_TABLET.
+     return onNPEOrUnexpectedException(
+         e,
+         OperationType.INSERT_STRING_RECORDS_OF_ONE_DEVICE,
+         TSStatusCode.EXECUTE_STATEMENT_ERROR);
+   } finally {
+     addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
+   }
+ }
+
+ /**
+  * Converts an insert-record request into an {@link InsertRowStatement} and executes it through
+  * the coordinator.
+  *
+  * @param req request holding the session id, the device path, the timestamp and the values
+  * @return the coordinator's execution status; a not-logged-in status when the session is not
+  *     logged in; or the status built by the error handler when anything throws
+  */
+ @Override
+ public TSStatus insertRecord(TSInsertRecordReq req) {
+   long t1 = System.currentTimeMillis();
+   try {
+     if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+       return getNotLoggedInStatus();
+     }
+
+     AUDIT_LOGGER.debug(
+         "Session {} insertRecord, device {}, time {}",
+         SESSION_MANAGER.getCurrSessionId(),
+         req.getPrefixPath(),
+         req.getTimestamp());
+
+     // Step 1: transfer from TSInsertRecordReq to Statement
+     InsertRowStatement statement = (InsertRowStatement) StatementGenerator.createStatement(req);
+
+     // Step 2: call the coordinator
+     long queryId = SESSION_MANAGER.requestQueryId(false);
+     ExecutionResult result =
+         COORDINATOR.execute(
+             statement,
+             new QueryId(String.valueOf(queryId)),
+             SESSION_MANAGER.getSessionInfo(req.sessionId),
+             "");
+
+     // TODO(INSERT) do the authority check in analyze
+     return result.status;
+   } catch (Exception e) {
+     // Report the failure under this RPC's own operation type, not the copy-pasted INSERT_TABLET.
+     return onNPEOrUnexpectedException(
+         e, OperationType.INSERT_RECORD, TSStatusCode.EXECUTE_STATEMENT_ERROR);
+   } finally {
+     addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
+   }
+ }
+
+ /**
+  * Converts an insert-tablets request into an {@link InsertMultiTabletsStatement} and executes
+  * it through the coordinator.
+  *
+  * @param req request holding the session id and the tablets to insert
+  * @return the coordinator's execution status; a not-logged-in status when the session is not
+  *     logged in; or the status built by the error handler when anything throws
+  */
+ @Override
+ public TSStatus insertTablets(TSInsertTabletsReq req) {
+   long t1 = System.currentTimeMillis();
+   try {
+     if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+       return getNotLoggedInStatus();
+     }
+
+     // Step 1: transfer from TSInsertTabletsReq to Statement
+     InsertMultiTabletsStatement statement =
+         (InsertMultiTabletsStatement) StatementGenerator.createStatement(req);
+
+     // Step 2: call the coordinator
+     long queryId = SESSION_MANAGER.requestQueryId(false);
+     ExecutionResult result =
+         COORDINATOR.execute(
+             statement,
+             new QueryId(String.valueOf(queryId)),
+             SESSION_MANAGER.getSessionInfo(req.sessionId),
+             "");
+
+     // TODO(INSERT) do the authority check in analyze
+     return result.status;
+   } catch (Exception e) {
+     // Report the failure as INSERT_TABLETS (plural); INSERT_TABLET belongs to insertTablet.
+     return onNPEOrUnexpectedException(
+         e, OperationType.INSERT_TABLETS, TSStatusCode.EXECUTE_STATEMENT_ERROR);
+   } finally {
+     addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
+   }
+ }
+
+ /**
+  * Converts an insert-tablet request into an {@link InsertTabletStatement} and executes it
+  * through the coordinator.
+  *
+  * @param req request holding the session id and the tablet to insert
+  * @return the coordinator's execution status; a not-logged-in status when the session is not
+  *     logged in; or the status built by the error handler when anything throws
+  */
+ @Override
+ public TSStatus insertTablet(TSInsertTabletReq req) {
+   long startTime = System.currentTimeMillis();
+   try {
+     boolean loggedIn = SESSION_MANAGER.checkLogin(req.getSessionId());
+     if (!loggedIn) {
+       return getNotLoggedInStatus();
+     }
+
+     // Step 1: TODO(INSERT) transfer from TSInsertTabletReq to Statement
+     InsertTabletStatement tabletStatement =
+         (InsertTabletStatement) StatementGenerator.createStatement(req);
+
+     // Step 2: call the coordinator
+     QueryId insertId = new QueryId(String.valueOf(SESSION_MANAGER.requestQueryId(false)));
+     ExecutionResult executed =
+         COORDINATOR.execute(
+             tabletStatement, insertId, SESSION_MANAGER.getSessionInfo(req.sessionId), "");
+
+     // TODO(INSERT) do this check in analyze
+     // TSStatus status = SESSION_MANAGER.checkAuthority(insertTabletPlan,
+     // req.getSessionId());
+     return executed.status;
+   } catch (Exception e) {
+     return onNPEOrUnexpectedException(
+         e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
+   } finally {
+     addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, startTime);
+   }
+ }
+
+ /**
+  * Converts an insert-string-records request into an {@link InsertRowsStatement} and executes
+  * it through the coordinator.
+  *
+  * @param req request holding the session id and the records to insert
+  * @return the coordinator's execution status; a not-logged-in status when the session is not
+  *     logged in; or the status built by the error handler when anything throws
+  */
+ @Override
+ public TSStatus insertStringRecords(TSInsertStringRecordsReq req) {
+   long t1 = System.currentTimeMillis();
+   try {
+     if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+       return getNotLoggedInStatus();
+     }
+
+     if (AUDIT_LOGGER.isDebugEnabled()) {
+       AUDIT_LOGGER.debug(
+           "Session {} insertRecords, first device {}, first time {}",
+           SESSION_MANAGER.getCurrSessionId(),
+           req.prefixPaths.get(0),
+           req.getTimestamps().get(0));
+     }
+
+     // Step 1: transfer from TSInsertStringRecordsReq to Statement
+     InsertRowsStatement statement = (InsertRowsStatement) StatementGenerator.createStatement(req);
+
+     // Step 2: call the coordinator
+     long queryId = SESSION_MANAGER.requestQueryId(false);
+     ExecutionResult result =
+         COORDINATOR.execute(
+             statement,
+             new QueryId(String.valueOf(queryId)),
+             SESSION_MANAGER.getSessionInfo(req.sessionId),
+             "");
+
+     // TODO(INSERT) do the authority check in analyze
+     return result.status;
+   } catch (Exception e) {
+     // Report the failure under this RPC's own operation type, not the copy-pasted INSERT_TABLET.
+     return onNPEOrUnexpectedException(
+         e, OperationType.INSERT_STRING_RECORDS, TSStatusCode.EXECUTE_STATEMENT_ERROR);
+   } finally {
+     addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
+   }
+ }
+
+ /** Test endpoint: logs receipt at debug level and reports success without inserting. */
+ @Override
+ public TSStatus testInsertTablet(TSInsertTabletReq req) {
+   LOGGER.debug("Test insert batch request receive.");
+   TSStatus success = RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+   return success;
+ }
+
+ /** Test endpoint: logs receipt at debug level and reports success without inserting. */
+ @Override
+ public TSStatus testInsertTablets(TSInsertTabletsReq req) {
+   LOGGER.debug("Test insert batch request receive.");
+   TSStatus success = RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+   return success;
+ }
+
+ /** Test endpoint: logs receipt at debug level and reports success without inserting. */
+ @Override
+ public TSStatus testInsertRecord(TSInsertRecordReq req) {
+   LOGGER.debug("Test insert row request receive.");
+   TSStatus success = RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+   return success;
+ }
+
+ /** Test endpoint: logs receipt at debug level and reports success without inserting. */
+ @Override
+ public TSStatus testInsertStringRecord(TSInsertStringRecordReq req) {
+   LOGGER.debug("Test insert string record request receive.");
+   TSStatus success = RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+   return success;
+ }
+
+ /** Test endpoint: logs receipt at debug level and reports success without inserting. */
+ @Override
+ public TSStatus testInsertRecords(TSInsertRecordsReq req) {
+   LOGGER.debug("Test insert row in batch request receive.");
+   TSStatus success = RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+   return success;
+ }
+
+ /** Test endpoint: logs receipt at debug level and reports success without inserting. */
+ @Override
+ public TSStatus testInsertRecordsOfOneDevice(TSInsertRecordsOfOneDeviceReq req) {
+   LOGGER.debug("Test insert rows in batch request receive.");
+   TSStatus success = RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+   return success;
+ }
+
+ /** Test endpoint: logs receipt at debug level and reports success without inserting. */
+ @Override
+ public TSStatus testInsertStringRecords(TSInsertStringRecordsReq req) {
+   LOGGER.debug("Test insert string records request receive.");
+   TSStatus success = RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+   return success;
+ }
+
+ /**
+  * Not implemented by this service yet.
+  *
+  * @throws UnsupportedOperationException always
+  */
+ @Override
+ public TSStatus deleteData(TSDeleteDataReq req) {
+   // Name the RPC in the message so a server-side stack trace identifies the missing feature.
+   throw new UnsupportedOperationException(
+       "deleteData is not supported by DataNodeTSIServiceImpl yet");
+ }
+
+ /**
+  * Not implemented by this service yet.
+  *
+  * @throws UnsupportedOperationException always
+  */
+ @Override
+ public TSExecuteStatementResp executeRawDataQuery(TSRawDataQueryReq req) {
+   // Name the RPC in the message so a server-side stack trace identifies the missing feature.
+   throw new UnsupportedOperationException(
+       "executeRawDataQuery is not supported by DataNodeTSIServiceImpl yet");
+ }
+
+ /**
+  * Not implemented by this service yet.
+  *
+  * @throws UnsupportedOperationException always
+  */
+ @Override
+ public TSExecuteStatementResp executeLastDataQuery(TSLastDataQueryReq req) {
+   // Name the RPC in the message so a server-side stack trace identifies the missing feature.
+   throw new UnsupportedOperationException(
+       "executeLastDataQuery is not supported by DataNodeTSIServiceImpl yet");
+ }
+
+ /**
+  * Asks the session manager for a statement id for the given session.
+  *
+  * @param sessionId id of the session requesting the statement id
+  * @return the statement id handed out by the session manager
+  */
+ @Override
+ public long requestStatementId(long sessionId) {
+   long statementId = SESSION_MANAGER.requestStatementId(sessionId);
+   return statementId;
+ }
+
+ /**
+  * Not implemented by this service yet.
+  *
+  * @throws UnsupportedOperationException always
+  */
+ @Override
+ public TSStatus createSchemaTemplate(TSCreateSchemaTemplateReq req) {
+   // Name the RPC in the message so a server-side stack trace identifies the missing feature.
+   throw new UnsupportedOperationException(
+       "createSchemaTemplate is not supported by DataNodeTSIServiceImpl yet");
+ }
+
+ /**
+  * Not implemented by this service yet.
+  *
+  * @throws UnsupportedOperationException always
+  */
+ @Override
+ public TSStatus appendSchemaTemplate(TSAppendSchemaTemplateReq req) {
+   // Name the RPC in the message so a server-side stack trace identifies the missing feature.
+   throw new UnsupportedOperationException(
+       "appendSchemaTemplate is not supported by DataNodeTSIServiceImpl yet");
+ }
+
+ /**
+  * Not implemented by this service yet.
+  *
+  * @throws UnsupportedOperationException always
+  */
+ @Override
+ public TSStatus pruneSchemaTemplate(TSPruneSchemaTemplateReq req) {
+   // Name the RPC in the message so a server-side stack trace identifies the missing feature.
+   throw new UnsupportedOperationException(
+       "pruneSchemaTemplate is not supported by DataNodeTSIServiceImpl yet");
+ }
+
+ /**
+  * Not implemented by this service yet.
+  *
+  * @throws UnsupportedOperationException always
+  */
+ @Override
+ public TSQueryTemplateResp querySchemaTemplate(TSQueryTemplateReq req) {
+   // Name the RPC in the message so a server-side stack trace identifies the missing feature.
+   throw new UnsupportedOperationException(
+       "querySchemaTemplate is not supported by DataNodeTSIServiceImpl yet");
+ }
+
+ /**
+  * Not implemented by this service yet.
+  *
+  * @throws UnsupportedOperationException always
+  */
+ @Override
+ public TSStatus setSchemaTemplate(TSSetSchemaTemplateReq req) throws TException {
+   // Name the RPC in the message so a server-side stack trace identifies the missing feature.
+   throw new UnsupportedOperationException(
+       "setSchemaTemplate is not supported by DataNodeTSIServiceImpl yet");
+ }
+
+ /**
+  * Not implemented by this service yet.
+  *
+  * @throws UnsupportedOperationException always
+  */
+ @Override
+ public TSStatus unsetSchemaTemplate(TSUnsetSchemaTemplateReq req) throws TException {
+   // Name the RPC in the message so a server-side stack trace identifies the missing feature.
+   throw new UnsupportedOperationException(
+       "unsetSchemaTemplate is not supported by DataNodeTSIServiceImpl yet");
+ }
+
+ /**
+  * Not implemented by this service yet.
+  *
+  * @throws UnsupportedOperationException always
+  */
+ @Override
+ public TSStatus dropSchemaTemplate(TSDropSchemaTemplateReq req) throws TException {
+   // Name the RPC in the message so a server-side stack trace identifies the missing feature.
+   throw new UnsupportedOperationException(
+       "dropSchemaTemplate is not supported by DataNodeTSIServiceImpl yet");
+ }
+
+ /**
+  * Converts an insert-string-record request into an {@link InsertRowStatement} and executes it
+  * through the coordinator.
+  *
+  * @param req request holding the session id, the device path, the timestamp and the values
+  * @return the coordinator's execution status; a not-logged-in status when the session is not
+  *     logged in; or the status built by the error handler when anything throws
+  */
+ @Override
+ public TSStatus insertStringRecord(TSInsertStringRecordReq req) {
+   long t1 = System.currentTimeMillis();
+   try {
+     if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+       return getNotLoggedInStatus();
+     }
+
+     AUDIT_LOGGER.debug(
+         "Session {} insertRecord, device {}, time {}",
+         SESSION_MANAGER.getCurrSessionId(),
+         req.getPrefixPath(),
+         req.getTimestamp());
+
+     // Step 1: transfer from TSInsertStringRecordReq to Statement
+     InsertRowStatement statement = (InsertRowStatement) StatementGenerator.createStatement(req);
+
+     // Step 2: call the coordinator
+     long queryId = SESSION_MANAGER.requestQueryId(false);
+     ExecutionResult result =
+         COORDINATOR.execute(
+             statement,
+             new QueryId(String.valueOf(queryId)),
+             SESSION_MANAGER.getSessionInfo(req.sessionId),
+             "");
+
+     // TODO(INSERT) do the authority check in analyze
+     return result.status;
+   } catch (Exception e) {
+     // Report the failure under this RPC's own operation type, not the copy-pasted INSERT_TABLET.
+     return onNPEOrUnexpectedException(
+         e, OperationType.INSERT_STRING_RECORD, TSStatusCode.EXECUTE_STATEMENT_ERROR);
+   } finally {
+     addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
+   }
+ }
+
+ /**
+  * Builds a success response for a query and fills in its column metadata from the dataset
+  * header.
+  *
+  * @param header dataset header supplying column names, data types, aliases, the column index
+  *     map and the ignore-timestamp flag
+  * @param queryId id the client uses for later fetches of this query
+  * @return the populated response; its storage-group column list is left empty for now
+  */
+ private TSExecuteStatementResp createResponse(DatasetHeader header, long queryId) {
+   TSExecuteStatementResp response =
+       RpcUtils.getTSExecuteStatementResp(TSStatusCode.SUCCESS_STATUS);
+   response.setQueryId(queryId);
+   response.setIgnoreTimeStamp(header.isIgnoreTimestamp());
+   response.setColumns(header.getRespColumns());
+   response.setDataTypeList(header.getRespDataTypeList());
+   response.setAliasColumns(header.getRespAliasColumns());
+   response.setColumnNameIndexMap(header.getColumnNameIndexMap());
+   // TODO deal with the sg name here
+   response.setSgColumns(new ArrayList<>());
+   return response;
+ }
+
+ /** Builds the {@code NOT_LOGIN_ERROR} status returned when a session fails the login check. */
+ private TSStatus getNotLoggedInStatus() {
+   String reason = "Log in failed. Either you are not authorized or the session has timed out.";
+   return RpcUtils.getStatus(TSStatusCode.NOT_LOGIN_ERROR, reason);
+ }
+
+ /**
+  * Records one invocation of {@code operation} and its latency in the metric manager. Does
+  * nothing unless performance statistics are enabled in the configuration.
+  *
+  * @param operation operation whose name tags the histogram and the counter
+  * @param startTime start of the operation, in milliseconds as returned by
+  *     {@link System#currentTimeMillis()}
+  */
+ private void addOperationLatency(Operation operation, long startTime) {
+   if (!CONFIG.isEnablePerformanceStat()) {
+     return;
+   }
+   MetricsService.getInstance()
+       .getMetricManager()
+       .histogram(
+           System.currentTimeMillis() - startTime,
+           "operation_histogram",
+           MetricLevel.IMPORTANT,
+           "name",
+           operation.getName());
+   MetricsService.getInstance()
+       .getMetricManager()
+       .count(1, "operation_count", MetricLevel.IMPORTANT, "name", operation.getName());
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
index 6291b14875..8866bf361d 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
@@ -37,16 +37,17 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.metadata.template.TemplateQueryType;
-import org.apache.iotdb.db.mpp.common.QueryId;
-import org.apache.iotdb.db.mpp.execution.Coordinator;
-import org.apache.iotdb.db.mpp.execution.ExecutionResult;
-import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
-import org.apache.iotdb.db.mpp.sql.parser.StatementGenerator;
-import org.apache.iotdb.db.mpp.sql.statement.crud.*;
import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.crud.*;
+import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletsPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
+import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.qp.physical.crud.SelectIntoPlan;
+import org.apache.iotdb.db.qp.physical.crud.UDFPlan;
import org.apache.iotdb.db.qp.physical.sys.AppendTemplatePlan;
import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan;
@@ -79,47 +80,7 @@ import org.apache.iotdb.metrics.utils.MetricLevel;
import org.apache.iotdb.rpc.RedirectException;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.service.rpc.thrift.ServerProperties;
-import org.apache.iotdb.service.rpc.thrift.TSAppendSchemaTemplateReq;
-import org.apache.iotdb.service.rpc.thrift.TSCancelOperationReq;
-import org.apache.iotdb.service.rpc.thrift.TSCloseOperationReq;
-import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq;
-import org.apache.iotdb.service.rpc.thrift.TSCreateAlignedTimeseriesReq;
-import org.apache.iotdb.service.rpc.thrift.TSCreateMultiTimeseriesReq;
-import org.apache.iotdb.service.rpc.thrift.TSCreateSchemaTemplateReq;
-import org.apache.iotdb.service.rpc.thrift.TSCreateTimeseriesReq;
-import org.apache.iotdb.service.rpc.thrift.TSDeleteDataReq;
-import org.apache.iotdb.service.rpc.thrift.TSDropSchemaTemplateReq;
-import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementReq;
-import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
-import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
-import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataReq;
-import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataResp;
-import org.apache.iotdb.service.rpc.thrift.TSFetchResultsReq;
-import org.apache.iotdb.service.rpc.thrift.TSFetchResultsResp;
-import org.apache.iotdb.service.rpc.thrift.TSGetTimeZoneResp;
-import org.apache.iotdb.service.rpc.thrift.TSIService;
-import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
-import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq;
-import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq;
-import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordReq;
-import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsOfOneDeviceReq;
-import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsReq;
-import org.apache.iotdb.service.rpc.thrift.TSInsertTabletReq;
-import org.apache.iotdb.service.rpc.thrift.TSInsertTabletsReq;
-import org.apache.iotdb.service.rpc.thrift.TSLastDataQueryReq;
-import org.apache.iotdb.service.rpc.thrift.TSOpenSessionReq;
-import org.apache.iotdb.service.rpc.thrift.TSOpenSessionResp;
-import org.apache.iotdb.service.rpc.thrift.TSPruneSchemaTemplateReq;
-import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
-import org.apache.iotdb.service.rpc.thrift.TSQueryNonAlignDataSet;
-import org.apache.iotdb.service.rpc.thrift.TSQueryTemplateReq;
-import org.apache.iotdb.service.rpc.thrift.TSQueryTemplateResp;
-import org.apache.iotdb.service.rpc.thrift.TSRawDataQueryReq;
-import org.apache.iotdb.service.rpc.thrift.TSSetSchemaTemplateReq;
-import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneReq;
-import org.apache.iotdb.service.rpc.thrift.TSTracingInfo;
-import org.apache.iotdb.service.rpc.thrift.TSUnsetSchemaTemplateReq;
+import org.apache.iotdb.service.rpc.thrift.*;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
@@ -161,11 +122,9 @@ import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException;
/** Thrift RPC implementation at server side. */
public class TSServiceImpl implements TSIService.Iface {
- private static final Coordinator coordinator = Coordinator.getInstance();
+ private static final SessionManager SESSION_MANAGER = SessionManager.getInstance();
- public static SessionManager SESSION_MANAGER = SessionManager.getInstance();
-
- protected class QueryTask implements Callable<TSExecuteStatementResp> {
+ private class QueryTask implements Callable<TSExecuteStatementResp> {
private PhysicalPlan plan;
private final long queryStartTime;
@@ -241,7 +200,7 @@ public class TSServiceImpl implements TSIService.Iface {
}
}
- protected class FetchResultsTask implements Callable<TSFetchResultsResp> {
+ private class FetchResultsTask implements Callable<TSFetchResultsResp> {
private final long sessionId;
private final long queryId;
@@ -1228,46 +1187,6 @@ public class TSServiceImpl implements TSIService.Iface {
allCheckSuccess, tsStatus, insertRowsPlan.getResults(), req.prefixPaths.size());
}
- public TSStatus insertRecordsV2(TSInsertRecordsReq req) {
- long t1 = System.currentTimeMillis();
- try {
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
- return getNotLoggedInStatus();
- }
-
- if (AUDIT_LOGGER.isDebugEnabled()) {
- AUDIT_LOGGER.debug(
- "Session {} insertRecords, first device {}, first time {}",
- SESSION_MANAGER.getCurrSessionId(),
- req.prefixPaths.get(0),
- req.getTimestamps().get(0));
- }
-
- // Step 1: TODO(INSERT) transfer from TSInsertTabletsReq to Statement
- InsertRowsStatement statement = (InsertRowsStatement) StatementGenerator.createStatement(req);
-
- // Step 2: call the coordinator
- long queryId = SESSION_MANAGER.requestQueryId(false);
- ExecutionResult result =
- coordinator.execute(
- statement,
- new QueryId(String.valueOf(queryId)),
- QueryType.WRITE,
- SESSION_MANAGER.getSessionInfo(req.sessionId),
- "");
-
- // TODO(INSERT) do this check in analyze
- // TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
- // req.getSessionId());
- return result.status;
- } catch (Exception e) {
- return onNPEOrUnexpectedException(
- e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
- } finally {
- addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
- }
- }
-
private TSStatus judgeFinalTsStatus(
boolean allCheckSuccess,
TSStatus executeTsStatus,
@@ -1335,47 +1254,6 @@ public class TSServiceImpl implements TSIService.Iface {
return resp;
}
- public TSStatus insertRecordsOfOneDeviceV2(TSInsertRecordsOfOneDeviceReq req) {
- long t1 = System.currentTimeMillis();
- try {
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
- return getNotLoggedInStatus();
- }
-
- if (AUDIT_LOGGER.isDebugEnabled()) {
- AUDIT_LOGGER.debug(
- "Session {} insertRecords, device {}, first time {}",
- SESSION_MANAGER.getCurrSessionId(),
- req.prefixPath,
- req.getTimestamps().get(0));
- }
-
- // Step 1: TODO(INSERT) transfer from TSInsertTabletsReq to Statement
- InsertRowsOfOneDeviceStatement statement =
- (InsertRowsOfOneDeviceStatement) StatementGenerator.createStatement(req);
-
- // Step 2: call the coordinator
- long queryId = SESSION_MANAGER.requestQueryId(false);
- ExecutionResult result =
- coordinator.execute(
- statement,
- new QueryId(String.valueOf(queryId)),
- QueryType.WRITE,
- SESSION_MANAGER.getSessionInfo(req.sessionId),
- "");
-
- // TODO(INSERT) do this check in analyze
- // TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
- // req.getSessionId());
- return result.status;
- } catch (Exception e) {
- return onNPEOrUnexpectedException(
- e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
- } finally {
- addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
- }
- }
-
@Override
public TSStatus insertStringRecordsOfOneDevice(TSInsertStringRecordsOfOneDeviceReq req) {
if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
@@ -1434,47 +1312,6 @@ public class TSServiceImpl implements TSIService.Iface {
allCheckSuccess, tsStatus, insertRowsPlan.getResults(), req.timestamps.size());
}
- public TSStatus insertStringRecordsOfOneDeviceV2(TSInsertStringRecordsOfOneDeviceReq req) {
- long t1 = System.currentTimeMillis();
- try {
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
- return getNotLoggedInStatus();
- }
-
- if (AUDIT_LOGGER.isDebugEnabled()) {
- AUDIT_LOGGER.debug(
- "Session {} insertRecords, device {}, first time {}",
- SESSION_MANAGER.getCurrSessionId(),
- req.prefixPath,
- req.getTimestamps().get(0));
- }
-
- // Step 1: TODO(INSERT) transfer from TSInsertTabletsReq to Statement
- InsertRowsOfOneDeviceStatement statement =
- (InsertRowsOfOneDeviceStatement) StatementGenerator.createStatement(req);
-
- // Step 2: call the coordinator
- long queryId = SESSION_MANAGER.requestQueryId(false);
- ExecutionResult result =
- coordinator.execute(
- statement,
- new QueryId(String.valueOf(queryId)),
- QueryType.WRITE,
- SESSION_MANAGER.getSessionInfo(req.sessionId),
- "");
-
- // TODO(INSERT) do this check in analyze
- // TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
- // req.getSessionId());
- return result.status;
- } catch (Exception e) {
- return onNPEOrUnexpectedException(
- e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
- } finally {
- addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
- }
- }
-
@Override
public TSStatus insertStringRecords(TSInsertStringRecordsReq req) {
if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
@@ -1528,44 +1365,6 @@ public class TSServiceImpl implements TSIService.Iface {
allCheckSuccess, tsStatus, insertRowsPlan.getResults(), req.prefixPaths.size());
}
- public TSStatus insertStringRecordsV2(TSInsertStringRecordsReq req) {
- long t1 = System.currentTimeMillis();
- try {
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
- return getNotLoggedInStatus();
- }
-
- if (AUDIT_LOGGER.isDebugEnabled()) {
- AUDIT_LOGGER.debug(
- "Session {} insertRecords, first device {}, first time {}",
- SESSION_MANAGER.getCurrSessionId(),
- req.prefixPaths.get(0),
- req.getTimestamps().get(0));
- }
-
- InsertRowsStatement statement = (InsertRowsStatement) StatementGenerator.createStatement(req);
-
- long queryId = SESSION_MANAGER.requestQueryId(false);
- ExecutionResult result =
- coordinator.execute(
- statement,
- new QueryId(String.valueOf(queryId)),
- QueryType.WRITE,
- SESSION_MANAGER.getSessionInfo(req.sessionId),
- "");
-
- // TODO(INSERT) do this check in analyze
- // TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
- // req.getSessionId());
- return result.status;
- } catch (Exception e) {
- return onNPEOrUnexpectedException(
- e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
- } finally {
- addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
- }
- }
-
private void addMeasurementAndValue(
InsertRowPlan insertRowPlan, List<String> measurements, List<String> values) {
List<String> newMeasurements = new ArrayList<>(measurements.size());
@@ -1656,43 +1455,6 @@ public class TSServiceImpl implements TSIService.Iface {
}
}
- public TSStatus insertRecordV2(TSInsertRecordReq req) {
- long t1 = System.currentTimeMillis();
- try {
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
- return getNotLoggedInStatus();
- }
-
- AUDIT_LOGGER.debug(
- "Session {} insertRecord, device {}, time {}",
- SESSION_MANAGER.getCurrSessionId(),
- req.getPrefixPath(),
- req.getTimestamp());
-
- InsertRowStatement statement = (InsertRowStatement) StatementGenerator.createStatement(req);
-
- // Step 2: call the coordinator
- long queryId = SESSION_MANAGER.requestQueryId(false);
- ExecutionResult result =
- coordinator.execute(
- statement,
- new QueryId(String.valueOf(queryId)),
- QueryType.WRITE,
- SESSION_MANAGER.getSessionInfo(req.sessionId),
- "");
-
- // TODO(INSERT) do this check in analyze
- // TSStatus status = SESSION_MANAGER.checkAuthority(insertTabletPlan,
- // req.getSessionId());
- return result.status;
- } catch (Exception e) {
- return onNPEOrUnexpectedException(
- e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
- } finally {
- addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
- }
- }
-
@Override
public TSStatus insertStringRecord(TSInsertStringRecordReq req) {
try {
@@ -1724,43 +1486,6 @@ public class TSServiceImpl implements TSIService.Iface {
}
}
- public TSStatus insertStringRecordV2(TSInsertStringRecordReq req) {
- long t1 = System.currentTimeMillis();
- try {
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
- return getNotLoggedInStatus();
- }
-
- AUDIT_LOGGER.debug(
- "Session {} insertRecord, device {}, time {}",
- SESSION_MANAGER.getCurrSessionId(),
- req.getPrefixPath(),
- req.getTimestamp());
-
- InsertRowStatement statement = (InsertRowStatement) StatementGenerator.createStatement(req);
-
- // Step 2: call the coordinator
- long queryId = SESSION_MANAGER.requestQueryId(false);
- ExecutionResult result =
- coordinator.execute(
- statement,
- new QueryId(String.valueOf(queryId)),
- QueryType.WRITE,
- SESSION_MANAGER.getSessionInfo(req.sessionId),
- "");
-
- // TODO(INSERT) do this check in analyze
- // TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
- // req.getSessionId());
- return result.status;
- } catch (Exception e) {
- return onNPEOrUnexpectedException(
- e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
- } finally {
- addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
- }
- }
-
@Override
public TSStatus deleteData(TSDeleteDataReq req) {
try {
@@ -1819,39 +1544,6 @@ public class TSServiceImpl implements TSIService.Iface {
}
}
- public TSStatus insertTabletV2(TSInsertTabletReq req) {
- long t1 = System.currentTimeMillis();
- try {
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
- return getNotLoggedInStatus();
- }
-
- // Step 1: TODO(INSERT) transfer from TSInsertTabletReq to Statement
- InsertTabletStatement statement =
- (InsertTabletStatement) StatementGenerator.createStatement(req);
-
- // Step 2: call the coordinator
- long queryId = SESSION_MANAGER.requestQueryId(false);
- ExecutionResult result =
- coordinator.execute(
- statement,
- new QueryId(String.valueOf(queryId)),
- QueryType.WRITE,
- SESSION_MANAGER.getSessionInfo(req.sessionId),
- "");
-
- // TODO(INSERT) do this check in analyze
- // TSStatus status = SESSION_MANAGER.checkAuthority(insertTabletPlan,
- // req.getSessionId());
- return result.status;
- } catch (Exception e) {
- return onNPEOrUnexpectedException(
- e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
- } finally {
- addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
- }
- }
-
@Override
public TSStatus insertTablets(TSInsertTabletsReq req) {
long t1 = System.currentTimeMillis();
@@ -1874,38 +1566,6 @@ public class TSServiceImpl implements TSIService.Iface {
}
}
- public TSStatus insertTabletsV2(TSInsertTabletsReq req) {
- long t1 = System.currentTimeMillis();
- try {
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
- return getNotLoggedInStatus();
- }
-
- // Step 1: TODO(INSERT) transfer from TSInsertTabletsReq to Statement
- InsertMultiTabletsStatement statement =
- (InsertMultiTabletsStatement) StatementGenerator.createStatement(req);
- // Step 2: call the coordinator
- long queryId = SESSION_MANAGER.requestQueryId(false);
- ExecutionResult result =
- coordinator.execute(
- statement,
- new QueryId(String.valueOf(queryId)),
- QueryType.WRITE,
- SESSION_MANAGER.getSessionInfo(req.sessionId),
- "");
-
- // TODO(INSERT) do this check in analyze
- // TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
- // req.getSessionId());
- return result.status;
- } catch (Exception e) {
- return onNPEOrUnexpectedException(
- e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
- } finally {
- addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
- }
- }
-
private InsertTabletPlan constructInsertTabletPlan(TSInsertTabletsReq req, int i)
throws IllegalPathException {
InsertTabletPlan insertTabletPlan =
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
index 29efa1a4c7..455c274f41 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
@@ -18,7 +18,7 @@
*/
package org.apache.iotdb.db.utils;
-import org.apache.iotdb.db.mpp.execution.QueryExecution;
+import org.apache.iotdb.db.mpp.execution.IQueryExecution;
import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
@@ -177,7 +177,7 @@ public class QueryDataSetUtils {
}
public static TSQueryDataSet convertTsBlockByFetchSize(
- QueryExecution queryExecution, int fetchSize) throws IOException {
+ IQueryExecution queryExecution, int fetchSize) throws IOException {
int columnNum = queryExecution.getOutputValueColumnCount();
TSQueryDataSet tsQueryDataSet = new TSQueryDataSet();
// one time column and each value column has an actual value buffer and a bitmap value to