You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/04/15 08:47:14 UTC
[iotdb] branch master updated: Add DataNodeTSIServiceImpl as RPCServiceImpl in mpp mode (#5540)
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 93ee11b9f3 Add DataNodeTSIServiceImpl as RPCServiceImpl in mpp mode (#5540)
93ee11b9f3 is described below
commit 93ee11b9f3fa10af4b032d9204b13ee9f6fc8248
Author: Jackie Tien <ja...@gmail.com>
AuthorDate: Fri Apr 15 16:47:10 2022 +0800
Add DataNodeTSIServiceImpl as RPCServiceImpl in mpp mode (#5540)
---
.../iotdb/confignode/conf/ConfigNodeConf.java | 2 +-
.../persistence/PartitionInfoPersistence.java | 8 +-
.../iotdb/commons/concurrent/ThreadName.java | 1 +
.../iotdb/commons/partition/DataPartition.java | 28 +-
.../commons/partition/DataPartitionQueryParam.java | 3 +-
.../iotdb/commons/partition/SchemaPartition.java | 46 +-
.../apache/iotdb/commons/service/ServiceType.java | 3 +-
.../iotdb/commons/service/ThriftService.java | 8 +-
.../resources/conf/iotdb-engine.properties | 35 +-
.../apache/iotdb/db/client/ConfigNodeClient.java | 2 +-
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 13 +-
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 4 +
.../iotdb/db/mpp/buffer/DataBlockManager.java | 2 +-
.../iotdb/db/mpp/buffer/DataBlockService.java | 29 +-
.../DataBlockServiceMBean.java} | 15 +-
.../org/apache/iotdb/db/mpp/buffer/SinkHandle.java | 14 +-
.../apache/iotdb/db/mpp/buffer/SourceHandle.java | 2 +
.../iotdb/db/mpp/common/MPPQueryContext.java | 10 +-
.../iotdb/db/mpp/common/header/DatasetHeader.java | 13 +-
.../apache/iotdb/db/mpp/execution/Coordinator.java | 27 +-
.../apache/iotdb/db/mpp/execution/DataDriver.java | 1 +
.../iotdb/db/mpp/execution/IQueryExecution.java | 13 +
.../iotdb/db/mpp/execution/QueryExecution.java | 68 +-
.../iotdb/db/mpp/execution/SchemaDriver.java | 3 +-
.../db/mpp/execution/config/ConfigExecution.java | 48 +-
.../scheduler/InternalServiceClientFactory.java | 16 +-
.../scheduler/SimpleFragInstanceDispatcher.java | 17 +-
.../db/mpp/schedule/FragmentInstanceScheduler.java | 2 +
.../apache/iotdb/db/mpp/sql/analyze/Analyzer.java | 71 +-
.../mpp/sql/analyze/ClusterPartitionFetcher.java | 7 +-
.../db/mpp/sql/analyze/ClusterSchemaFetcher.java | 12 +-
.../db/mpp/sql/planner/DistributionPlanner.java | 18 +-
.../db/mpp/sql/planner/LocalExecutionPlanner.java | 5 +-
.../db/mpp/sql/planner/plan/FragmentInstance.java | 1 +
.../db/mpp/sql/planner/plan/node/PlanNodeType.java | 6 +-
.../plan/node/metedata/read/SchemaFetchNode.java | 4 +-
.../plan/node/metedata/read/SchemaMergeNode.java | 11 +-
.../node/metedata/write/CreateTimeSeriesNode.java | 29 +-
.../planner/plan/node/process/ExchangeNode.java | 6 +
.../planner/plan/node/process/TimeJoinNode.java | 9 +-
.../planner/plan/node/sink/FragmentSinkNode.java | 6 +
.../sql/planner/plan/node/write/InsertRowNode.java | 32 +-
.../apache/iotdb/db/service/InternalService.java | 55 +-
.../iotdb/db/service/InternalServiceImpl.java | 2 +-
.../InternalServiceMBean.java} | 11 +-
...vice.java => InternalServiceThriftHandler.java} | 34 +-
.../java/org/apache/iotdb/db/service/IoTDB.java | 29 +-
.../org/apache/iotdb/db/service/RPCService.java | 12 +-
.../thrift/handler/RPCServiceThriftHandler.java | 10 +-
.../thrift/impl/DataNodeTSIServiceImpl.java | 733 +++++++++++++++++++++
.../thrift/impl/TSIEventHandler.java} | 12 +-
.../db/service/thrift/impl/TSServiceImpl.java | 367 +----------
.../apache/iotdb/db/utils/QueryDataSetUtils.java | 19 +-
.../db/mpp/sql/plan/DistributionPlannerTest.java | 19 +-
.../iotdb/db/mpp/sql/plan/QueryPlannerTest.java | 4 +-
55 files changed, 1343 insertions(+), 614 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java
index a92faf8485..27a60ea35b 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java
@@ -88,7 +88,7 @@ public class ConfigNodeConf {
ConfigNodeConstant.DATA_DIR + File.separator + ConfigNodeConstant.CONSENSUS_FOLDER;
/** Default TTL for storage groups that are not set TTL by statements, in ms. */
- private long defaultTTL = 36000000;
+ private long defaultTTL = Long.MAX_VALUE;
/** The number of replicas of each region */
private int regionReplicaCount = 3;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/PartitionInfoPersistence.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/PartitionInfoPersistence.java
index c7154f0e13..888c6ff9c2 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/PartitionInfoPersistence.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/PartitionInfoPersistence.java
@@ -59,7 +59,10 @@ public class PartitionInfoPersistence {
public PartitionInfoPersistence() {
this.schemaPartitionReadWriteLock = new ReentrantReadWriteLock();
this.dataPartitionReadWriteLock = new ReentrantReadWriteLock();
- this.schemaPartition = new SchemaPartition();
+ this.schemaPartition =
+ new SchemaPartition(
+ ConfigNodeDescriptor.getInstance().getConf().getSeriesPartitionExecutorClass(),
+ ConfigNodeDescriptor.getInstance().getConf().getSeriesPartitionSlotNum());
this.schemaPartition.setSchemaPartitionMap(new HashMap<>());
this.dataPartition =
new DataPartition(
@@ -79,8 +82,7 @@ public class PartitionInfoPersistence {
schemaPartitionReadWriteLock.readLock().lock();
try {
- schemaPartitionDataSet.setSchemaPartition(
- schemaPartition.getSchemaPartition(physicalPlan.getPartitionSlotsMap()));
+ schemaPartitionDataSet.setSchemaPartition(schemaPartition.getSchemaPartition(physicalPlan.getPartitionSlotsMap()));
} finally {
schemaPartitionReadWriteLock.readLock().unlock();
schemaPartitionDataSet.setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index a4e0dc35ad..e587d38730 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -79,6 +79,7 @@ public enum ThreadName {
Cluster_Monitor("ClusterMonitor"),
DATA_BLOCK_MANAGER_SERVICE("DataBlockManagerService"),
DATA_BLOCK_MANAGER_CLIENT("DataBlockManagerService-Client"),
+ INTERNAL_SERVICE_CLIENT("InternalService-Client"),
;
private final String name;
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
index dcec685e63..bb62f1cea4 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
@@ -149,16 +149,24 @@ public class DataPartition {
if (seriesTimePartitionSlotMap.containsKey(seriesPartitionSlot)) {
Map<TimePartitionSlot, List<RegionReplicaSet>> timePartitionSlotMap =
seriesTimePartitionSlotMap.get(seriesPartitionSlot);
- for (TimePartitionSlot timePartitionSlot :
- partitionSlotsMap.get(storageGroupName).get(seriesPartitionSlot)) {
- // Compare TimePartitionSlot
- if (timePartitionSlotMap.containsKey(timePartitionSlot)) {
- result
- .computeIfAbsent(storageGroupName, key -> new HashMap<>())
- .computeIfAbsent(seriesPartitionSlot, key -> new HashMap<>())
- .put(
- timePartitionSlot,
- new ArrayList<>(timePartitionSlotMap.get(timePartitionSlot)));
+ // TODO: (xingtanzjr) optimize if timeSlotPartition is empty
+ if (partitionSlotsMap.get(storageGroupName).get(seriesPartitionSlot).size() == 0) {
+ result
+ .computeIfAbsent(storageGroupName, key -> new HashMap<>())
+ .computeIfAbsent(seriesPartitionSlot, key -> new HashMap<>())
+ .putAll(new HashMap<>(timePartitionSlotMap));
+ } else {
+ for (TimePartitionSlot timePartitionSlot :
+ partitionSlotsMap.get(storageGroupName).get(seriesPartitionSlot)) {
+ // Compare TimePartitionSlot
+ if (timePartitionSlotMap.containsKey(timePartitionSlot)) {
+ result
+ .computeIfAbsent(storageGroupName, key -> new HashMap<>())
+ .computeIfAbsent(seriesPartitionSlot, key -> new HashMap<>())
+ .put(
+ timePartitionSlot,
+ new ArrayList<>(timePartitionSlotMap.get(timePartitionSlot)));
+ }
}
}
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionQueryParam.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionQueryParam.java
index d8bc05e294..23b61902a9 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionQueryParam.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionQueryParam.java
@@ -18,12 +18,13 @@
*/
package org.apache.iotdb.commons.partition;
+import java.util.ArrayList;
import java.util.List;
public class DataPartitionQueryParam {
private String devicePath;
- private List<TimePartitionSlot> timePartitionSlotList;
+ private List<TimePartitionSlot> timePartitionSlotList = new ArrayList<>();
public String getDevicePath() {
return devicePath;
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartition.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartition.java
index d4af85106b..f0ca88f774 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartition.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartition.java
@@ -18,6 +18,8 @@
*/
package org.apache.iotdb.commons.partition;
+import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor;
+
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -25,15 +27,22 @@ import java.util.Map;
public class SchemaPartition {
+ private String seriesSlotExecutorName;
+ private int seriesPartitionSlotNum;
+
// Map<StorageGroup, Map<SeriesPartitionSlot, SchemaRegionPlaceInfo>>
private Map<String, Map<SeriesPartitionSlot, RegionReplicaSet>> schemaPartitionMap;
- public SchemaPartition() {
- // Empty constructor
+ public SchemaPartition(String seriesSlotExecutorName, int seriesPartitionSlotNum) {
+ this.seriesSlotExecutorName = seriesSlotExecutorName;
+ this.seriesPartitionSlotNum = seriesPartitionSlotNum;
}
public SchemaPartition(
- Map<String, Map<SeriesPartitionSlot, RegionReplicaSet>> schemaPartitionMap) {
+ Map<String, Map<SeriesPartitionSlot, RegionReplicaSet>> schemaPartitionMap,
+ String seriesSlotExecutorName,
+ int seriesPartitionSlotNum) {
+ this(seriesSlotExecutorName, seriesPartitionSlotNum);
this.schemaPartitionMap = schemaPartitionMap;
}
@@ -46,6 +55,32 @@ public class SchemaPartition {
this.schemaPartitionMap = schemaPartitionMap;
}
+ public RegionReplicaSet getSchemaRegionReplicaSet(String deviceName) {
+ // A list of data region replica sets will store data in a same time partition.
+ // We will insert data to the last set in the list.
+ // TODO return the latest dataRegionReplicaSet for each time partition
+ String storageGroup = getStorageGroupByDevice(deviceName);
+ SeriesPartitionSlot seriesPartitionSlot = calculateDeviceGroupId(deviceName);
+ return schemaPartitionMap.get(storageGroup).get(seriesPartitionSlot);
+ }
+
+ private SeriesPartitionSlot calculateDeviceGroupId(String deviceName) {
+ SeriesPartitionExecutor executor =
+ SeriesPartitionExecutor.getSeriesPartitionExecutor(
+ seriesSlotExecutorName, seriesPartitionSlotNum);
+ return executor.getSeriesPartitionSlot(deviceName);
+ }
+
+ private String getStorageGroupByDevice(String deviceName) {
+ for (String storageGroup : schemaPartitionMap.keySet()) {
+ if (deviceName.startsWith(storageGroup)) {
+ return storageGroup;
+ }
+ }
+ // TODO: (xingtanzjr) how to handle this exception in IoTDB
+ return null;
+ }
+
/* Interfaces for ConfigNode */
/**
@@ -59,7 +94,8 @@ public class SchemaPartition {
Map<String, List<SeriesPartitionSlot>> partitionSlotsMap) {
if (partitionSlotsMap.isEmpty()) {
// Return all SchemaPartitions when the partitionSlotsMap is empty
- return new SchemaPartition(new HashMap<>(schemaPartitionMap));
+ return new SchemaPartition(
+ new HashMap<>(schemaPartitionMap), seriesSlotExecutorName, seriesPartitionSlotNum);
} else {
Map<String, Map<SeriesPartitionSlot, RegionReplicaSet>> result = new HashMap<>();
@@ -86,7 +122,7 @@ public class SchemaPartition {
}
});
- return new SchemaPartition(result);
+ return new SchemaPartition(result, seriesSlotExecutorName, seriesPartitionSlotNum);
}
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java b/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
index 0626a0261a..1e913818cc 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
@@ -70,7 +70,8 @@ public enum ServiceType {
CONFIG_NODE_SERVICE("Config Node service", "ConfigNodeRPCServer"),
DATA_NODE_MANAGEMENT_SERVICE("Data Node management service", "DataNodeManagementServer"),
FRAGMENT_INSTANCE_MANAGER_SERVICE("Fragment instance manager", "FragmentInstanceManager"),
- DATA_BLOCK_MANAGER_SERVICE("Data block manager", "DataBlockManager");
+ DATA_BLOCK_MANAGER_SERVICE("Data block manager", "DataBlockManager"),
+ INTERNAL_SERVICE("Internal Service", "InternalService");
private final String name;
private final String jmxName;
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftService.java b/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftService.java
index e792d88c81..30d75a94eb 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftService.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftService.java
@@ -26,6 +26,7 @@ import org.apache.thrift.TProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.lang.reflect.InvocationTargetException;
import java.util.concurrent.CountDownLatch;
public abstract class ThriftService implements IService {
@@ -88,7 +89,8 @@ public abstract class ThriftService implements IService {
}
public abstract void initTProcessor()
- throws ClassNotFoundException, IllegalAccessException, InstantiationException;
+ throws ClassNotFoundException, IllegalAccessException, InstantiationException,
+ NoSuchMethodException, InvocationTargetException;
public abstract void initThriftServiceThread()
throws IllegalAccessException, InstantiationException, ClassNotFoundException;
@@ -125,7 +127,9 @@ public abstract class ThriftService implements IService {
} catch (InterruptedException
| ClassNotFoundException
| IllegalAccessException
- | InstantiationException e) {
+ | InstantiationException
+ | NoSuchMethodException
+ | InvocationTargetException e) {
Thread.currentThread().interrupt();
throw new StartupException(this.getID().getName(), e.getMessage());
}
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index 761ca79c87..f66262d72c 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -27,8 +27,26 @@ rpc_address=0.0.0.0
# Datatype: int
rpc_port=6667
+# Whether to start the data node in mpp mode
+# mpp_mode=false
+
+####################
+### Shuffle Configuration
+####################
+# Datatype: int
+# data_block_manager_port=8777
+
+# Datatype: int
+# data_block_manager_core_pool_size=1
+
+# Datatype: int
+# data_block_manager_max_pool_size=5
+
+# Datatype: int
+# data_block_manager_keep_alive_time_in_ms=1000
+
# Datatype: int
-mpp_port=7777
+# mpp_port=7777
# Datatype: String
# used for communication between cluster nodes.
@@ -963,21 +981,6 @@ timestamp_precision=ms
# Datatype: float
# group_by_fill_cache_size_in_mb=1.0
-####################
-### Shuffle Configuration
-####################
-# Datatype: int
-# data_block_manager_port=7777
-
-# Datatype: int
-# data_block_manager_core_pool_size=1
-
-# Datatype: int
-# data_block_manager_max_pool_size=5
-
-# Datatype: int
-# data_block_manager_keep_alive_time_in_ms=1000
-
####################
### Schema Engine Configuration
####################
diff --git a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
index 0816618fb3..4c2d0d7ed3 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
@@ -54,7 +54,7 @@ import java.util.Random;
public class ConfigNodeClient {
private static final Logger logger = LoggerFactory.getLogger(ConfigNodeClient.class);
- private static final int TIMEOUT_MS = 2000;
+ private static final int TIMEOUT_MS = 10000;
private static final int RETRY_NUM = 5;
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 83e30ba005..856381c0d9 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -507,6 +507,9 @@ public class IoTDBConfig {
/** Replace implementation class of JDBC service */
private String rpcImplClassName = TSServiceImpl.class.getName();
+ /** indicate whether current mode is mpp */
+ private boolean mppMode = false;
+
/** Replace implementation class of influxdb protocol service */
private String influxdbImplClassName = InfluxDBServiceImpl.class.getName();
@@ -875,7 +878,7 @@ public class IoTDBConfig {
private int seriesPartitionSlotNum = 10000;
/** Port that data block manager thrift service listen to. */
- private int dataBlockManagerPort = 7777;
+ private int dataBlockManagerPort = 8777;
/** Core pool size of data block manager. */
private int dataBlockManagerCorePoolSize = 1;
@@ -2795,4 +2798,12 @@ public class IoTDBConfig {
public void setDataBlockManagerKeepAliveTimeInMs(int dataBlockManagerKeepAliveTimeInMs) {
this.dataBlockManagerKeepAliveTimeInMs = dataBlockManagerKeepAliveTimeInMs;
}
+
+ public boolean isMppMode() {
+ return mppMode;
+ }
+
+ public void setMppMode(boolean mppMode) {
+ this.mppMode = mppMode;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 36faf46a38..189d65a85d 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -167,6 +167,10 @@ public class IoTDBDescriptor {
Integer.parseInt(
properties.getProperty("rpc_port", Integer.toString(conf.getRpcPort()))));
+ conf.setMppMode(
+ Boolean.parseBoolean(
+ properties.getProperty("mpp_mode", Boolean.toString(conf.isMppMode()))));
+
conf.setMppPort(
Integer.parseInt(
properties.getProperty("mpp_port", Integer.toString(conf.getRpcPort()))));
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
index 5f06947a8f..f90fce1f3e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
@@ -236,7 +236,7 @@ public class DataBlockManager implements IDataBlockManager {
}
public DataBlockServiceImpl getOrCreateDataBlockServiceImpl() {
- if (dataBlockService != null) {
+ if (dataBlockService == null) {
dataBlockService = new DataBlockServiceImpl();
}
return dataBlockService;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockService.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockService.java
index 1557987d17..fa9b30cd8a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockService.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockService.java
@@ -31,19 +31,14 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.memory.LocalMemoryManager;
import org.apache.iotdb.mpp.rpc.thrift.DataBlockService.Processor;
-import org.apache.commons.lang3.Validate;
-
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
-public class DataBlockService extends ThriftService {
+public class DataBlockService extends ThriftService implements DataBlockServiceMBean {
- private LocalMemoryManager localMemoryManager;
- private TsBlockSerdeFactory tsBlockSerdeFactory;
private DataBlockManager dataBlockManager;
private ExecutorService executorService;
- private DataBlockServiceClientFactory clientFactory;
private DataBlockService() {}
@@ -55,6 +50,7 @@ public class DataBlockService extends ThriftService {
@Override
public void initTProcessor()
throws ClassNotFoundException, IllegalAccessException, InstantiationException {
+ initSyncedServiceImpl(null);
IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
executorService =
IoTDBThreadPoolFactory.newThreadPool(
@@ -66,21 +62,15 @@ public class DataBlockService extends ThriftService {
new LinkedBlockingQueue<>(),
new IoTThreadFactory("data-block-manager-task-executors"),
"data-block-manager-task-executors");
- clientFactory = new DataBlockServiceClientFactory();
this.dataBlockManager =
new DataBlockManager(
- localMemoryManager, tsBlockSerdeFactory, executorService, clientFactory);
+ new LocalMemoryManager(),
+ new TsBlockSerdeFactory(),
+ executorService,
+ new DataBlockServiceClientFactory());
processor = new Processor<>(dataBlockManager.getOrCreateDataBlockServiceImpl());
}
- public void setLocalMemoryManager(LocalMemoryManager localMemoryManager) {
- this.localMemoryManager = Validate.notNull(localMemoryManager);
- }
-
- public void setTsBlockSerdeFactory(TsBlockSerdeFactory tsBlockSerdeFactory) {
- this.tsBlockSerdeFactory = Validate.notNull(tsBlockSerdeFactory);
- }
-
public DataBlockManager getDataBlockManager() {
return dataBlockManager;
}
@@ -101,7 +91,7 @@ public class DataBlockService extends ThriftService {
config.getThriftServerAwaitTimeForStopService(),
new DataBlockServiceThriftHandler(),
// TODO: hard coded compress strategy
- true);
+ false);
} catch (RPCServiceException e) {
throw new IllegalAccessException(e.getMessage());
}
@@ -133,6 +123,11 @@ public class DataBlockService extends ThriftService {
return DataBlockManagerServiceHolder.INSTANCE;
}
+ @Override
+ public int getRPCPort() {
+ return getBindPort();
+ }
+
private static class DataBlockManagerServiceHolder {
private static final DataBlockService INSTANCE = new DataBlockService();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/IQueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockServiceMBean.java
similarity index 72%
copy from server/src/main/java/org/apache/iotdb/db/mpp/execution/IQueryExecution.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockServiceMBean.java
index 3f9fa05ea8..52aae71ebb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/IQueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockServiceMBean.java
@@ -16,14 +16,19 @@
* specific language governing permissions and limitations
* under the License.
*/
+package org.apache.iotdb.db.mpp.buffer;
-package org.apache.iotdb.db.mpp.execution;
+import org.apache.iotdb.commons.exception.StartupException;
-public interface IQueryExecution {
+public interface DataBlockServiceMBean {
- void start();
+ String getRPCServiceStatus();
- void stop();
+ int getRPCPort();
- ExecutionResult getStatus();
+ void startService() throws StartupException;
+
+ void restartService() throws StartupException;
+
+ void stopService();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
index a5fa545bcc..96802c3bd5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
@@ -105,7 +105,11 @@ public class SinkHandle implements ISinkHandle {
}
private void submitSendNewDataBlockEventTask(int startSequenceId, List<Long> blockSizes) {
- executorService.submit(new SendNewDataBlockEventTask(startSequenceId, blockSizes));
+ // TODO: (xingtanzjr)
+ // We temporarily make it sync instead of async to avoid EOS Event(SinkHandle close() method is
+ // called) is sent before NewDataBlockEvent arrived
+ new SendNewDataBlockEventTask(startSequenceId, blockSizes).run();
+ // executorService.submit(new SendNewDataBlockEventTask(startSequenceId, blockSizes));
}
@Override
@@ -157,9 +161,10 @@ public class SinkHandle implements ISinkHandle {
private void sendEndOfDataBlockEvent() throws TException {
logger.debug(
- "Send end of data block event to plan node {} of {}.",
+ "Send end of data block event to plan node {} of {}. {}",
remotePlanNodeId,
- remoteFragmentInstanceId);
+ remoteFragmentInstanceId,
+ Thread.currentThread().getName());
int attempt = 0;
EndOfDataBlockEvent endOfDataBlockEvent =
new EndOfDataBlockEvent(
@@ -178,7 +183,8 @@ public class SinkHandle implements ISinkHandle {
remotePlanNodeId,
remoteFragmentInstanceId,
e.getMessage(),
- attempt);
+ attempt,
+ e);
if (attempt == MAX_ATTEMPT_TIMES) {
throw e;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
index b577174702..af6e715a16 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
@@ -212,6 +212,8 @@ public class SourceHandle implements ISourceHandle {
synchronized void setNoMoreTsBlocks(int lastSequenceId) {
this.lastSequenceId = lastSequenceId;
noMoreTsBlocks = true;
+ // someone may be waiting for this blocked, so here we need to notify it
+ blocked.set(null);
}
synchronized void updatePendingDataBlockInfo(int startSequenceId, List<Long> dataBlockSizes) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
index def89dba39..b449bdf9e7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
@@ -29,7 +29,7 @@ public class MPPQueryContext {
private String sql;
private QueryId queryId;
private SessionInfo session;
- private QueryType queryType;
+ private QueryType queryType = QueryType.READ;
private Endpoint hostEndpoint;
private ResultNodeContext resultNodeContext;
@@ -38,12 +38,10 @@ public class MPPQueryContext {
this.queryId = queryId;
}
- public MPPQueryContext(
- String sql, QueryId queryId, SessionInfo session, QueryType type, Endpoint hostEndpoint) {
+ public MPPQueryContext(String sql, QueryId queryId, SessionInfo session, Endpoint hostEndpoint) {
this.sql = sql;
this.queryId = queryId;
this.session = session;
- this.queryType = type;
this.hostEndpoint = hostEndpoint;
this.resultNodeContext = new ResultNodeContext(queryId);
}
@@ -56,6 +54,10 @@ public class MPPQueryContext {
return queryType;
}
+ public void setQueryType(QueryType queryType) {
+ this.queryType = queryType;
+ }
+
public Endpoint getHostEndpoint() {
return hostEndpoint;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/DatasetHeader.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/DatasetHeader.java
index 32a236feae..ee3e52d218 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/DatasetHeader.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/DatasetHeader.java
@@ -19,8 +19,6 @@
package org.apache.iotdb.db.mpp.common.header;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-
import com.google.common.primitives.Bytes;
import java.util.*;
@@ -51,10 +49,6 @@ public class DatasetHeader {
return isIgnoreTimestamp;
}
- public Map<String, Integer> getColumnToTsBlockIndexMap() {
- return columnToTsBlockIndexMap;
- }
-
public void setColumnToTsBlockIndexMap(List<String> outputColumnNames) {
this.columnToTsBlockIndexMap = new HashMap<>();
for (int i = 0; i < outputColumnNames.size(); i++) {
@@ -66,8 +60,11 @@ public class DatasetHeader {
return columnHeaders.stream().map(ColumnHeader::getColumnName).collect(Collectors.toList());
}
- public List<TSDataType> getRespDataTypeList() {
- return columnHeaders.stream().map(ColumnHeader::getColumnType).collect(Collectors.toList());
+ public List<String> getRespDataTypeList() {
+ return columnHeaders.stream()
+ .map(ColumnHeader::getColumnType)
+ .map(Objects::toString)
+ .collect(Collectors.toList());
}
public List<Byte> getRespAliasColumns() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
index 4749394259..4bb6bfbf58 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
@@ -24,9 +24,10 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.db.mpp.common.SessionInfo;
+import org.apache.iotdb.db.mpp.execution.config.ConfigExecution;
import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
import org.apache.iotdb.db.mpp.sql.statement.Statement;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.db.mpp.sql.statement.metadata.SetStorageGroupStatement;
import org.apache.commons.lang3.Validate;
@@ -50,12 +51,12 @@ public class Coordinator {
IoTDBDescriptor.getInstance().getConfig().getRpcAddress(),
IoTDBDescriptor.getInstance().getConfig().getMppPort());
- private ExecutorService executor;
- private ScheduledExecutorService scheduledExecutor;
+ private final ExecutorService executor;
+ private final ScheduledExecutorService scheduledExecutor;
private static final Coordinator INSTANCE = new Coordinator();
- private ConcurrentHashMap<QueryId, QueryExecution> queryExecutionMap;
+ private final ConcurrentHashMap<QueryId, IQueryExecution> queryExecutionMap;
private Coordinator() {
this.queryExecutionMap = new ConcurrentHashMap<>();
@@ -63,16 +64,20 @@ public class Coordinator {
this.scheduledExecutor = getScheduledExecutor();
}
- private QueryExecution createQueryExecution(Statement statement, MPPQueryContext queryContext) {
+ private IQueryExecution createQueryExecution(Statement statement, MPPQueryContext queryContext) {
+ if (statement instanceof SetStorageGroupStatement) {
+ queryContext.setQueryType(QueryType.WRITE);
+ return new ConfigExecution(queryContext, statement, executor);
+ }
return new QueryExecution(statement, queryContext, executor, scheduledExecutor);
}
public ExecutionResult execute(
- Statement statement, QueryId queryId, QueryType queryType, SessionInfo session, String sql) {
+ Statement statement, QueryId queryId, SessionInfo session, String sql) {
- QueryExecution execution =
+ IQueryExecution execution =
createQueryExecution(
- statement, new MPPQueryContext(sql, queryId, session, queryType, getHostEndpoint()));
+ statement, new MPPQueryContext(sql, queryId, session, getHostEndpoint()));
queryExecutionMap.put(queryId, execution);
execution.start();
@@ -80,10 +85,10 @@ public class Coordinator {
return execution.getStatus();
}
- public TsBlock getResultSet(QueryId queryId) {
- QueryExecution execution = queryExecutionMap.get(queryId);
+ public IQueryExecution getQueryExecution(QueryId queryId) {
+ IQueryExecution execution = queryExecutionMap.get(queryId);
Validate.notNull(execution, "invalid queryId %s", queryId.getId());
- return execution.getBatchResult();
+ return execution;
}
// TODO: (xingtanzjr) need to redo once we have a concrete policy for the threadPool management
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java
index 55a6d566f1..ce6a513080 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java
@@ -90,6 +90,7 @@ public class DataDriver implements Driver {
boolean isFinished =
closed || (driverBlockedFuture.get().isDone() && root != null && root.isFinished());
if (isFinished) {
+ close();
driverContext.finish();
}
return isFinished;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/IQueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/IQueryExecution.java
index 3f9fa05ea8..2e0bd9c76d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/IQueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/IQueryExecution.java
@@ -19,6 +19,9 @@
package org.apache.iotdb.db.mpp.execution;
+import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+
public interface IQueryExecution {
void start();
@@ -26,4 +29,14 @@ public interface IQueryExecution {
void stop();
ExecutionResult getStatus();
+
+ TsBlock getBatchResult();
+
+ boolean hasNextResult();
+
+ int getOutputValueColumnCount();
+
+ DatasetHeader getDatasetHeader();
+
+ boolean isQuery();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
index a5dbeaf21f..99241305d9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
@@ -18,13 +18,16 @@
*/
package org.apache.iotdb.db.mpp.execution;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.buffer.DataBlockService;
import org.apache.iotdb.db.mpp.buffer.ISourceHandle;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
+import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
import org.apache.iotdb.db.mpp.execution.scheduler.ClusterScheduler;
import org.apache.iotdb.db.mpp.execution.scheduler.IScheduler;
import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
import org.apache.iotdb.db.mpp.sql.analyze.Analyzer;
+import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
import org.apache.iotdb.db.mpp.sql.optimization.PlanOptimizer;
import org.apache.iotdb.db.mpp.sql.planner.DistributionPlanner;
import org.apache.iotdb.db.mpp.sql.planner.LogicalPlanner;
@@ -55,18 +58,18 @@ import static com.google.common.base.Throwables.throwIfUnchecked;
* corresponding physical nodes. 3. Collect and monitor the progress/states of this query.
*/
public class QueryExecution implements IQueryExecution {
- private MPPQueryContext context;
+ private final MPPQueryContext context;
private IScheduler scheduler;
- private QueryStateMachine stateMachine;
+ private final QueryStateMachine stateMachine;
- private List<PlanOptimizer> planOptimizers;
+ private final List<PlanOptimizer> planOptimizers;
private final Analysis analysis;
private LogicalQueryPlan logicalPlan;
private DistributedQueryPlan distributedPlan;
- private ExecutorService executor;
- private ScheduledExecutorService scheduledExecutor;
+ private final ExecutorService executor;
+ private final ScheduledExecutorService scheduledExecutor;
// The result of QueryExecution will be written to the DataBlockManager in current Node.
// We use this SourceHandle to fetch the TsBlock from it.
@@ -100,6 +103,9 @@ public class QueryExecution implements IQueryExecution {
public void start() {
doLogicalPlan();
doDistributedPlan();
+ if (context.getQueryType() == QueryType.READ) {
+ initResultHandle();
+ }
schedule();
}
@@ -153,11 +159,14 @@ public class QueryExecution implements IQueryExecution {
* DataStreamManager use the virtual ResultOperator's ID (This part will be designed and
* implemented with DataStreamManager)
*/
+ @Override
public TsBlock getBatchResult() {
try {
- initResultHandle();
ListenableFuture<Void> blocked = resultHandle.isBlocked();
blocked.get();
+ if (resultHandle.isFinished()) {
+ return null;
+ }
return resultHandle.receive();
} catch (ExecutionException | IOException e) {
@@ -171,20 +180,20 @@ public class QueryExecution implements IQueryExecution {
}
/** @return true if there are more TsBlocks to fetch, otherwise false */
+ @Override
public boolean hasNextResult() {
- try {
- initResultHandle();
- return resultHandle.isFinished();
- } catch (IOException e) {
- throwIfUnchecked(e.getCause());
- throw new RuntimeException(e.getCause());
- }
+ return !resultHandle.isFinished();
}
/** return the result column count without the time column */
+ @Override
public int getOutputValueColumnCount() {
- // TODO need return the actual size while there exists output columns in Analysis
- return 1;
+ return analysis.getRespDatasetHeader().getColumnHeaders().size();
+ }
+
+ @Override
+ public DatasetHeader getDatasetHeader() {
+ return analysis.getRespDatasetHeader();
}
/**
@@ -221,17 +230,21 @@ public class QueryExecution implements IQueryExecution {
}
}
- private void initResultHandle() throws IOException {
+ private void initResultHandle() {
if (this.resultHandle == null) {
- this.resultHandle =
- DataBlockService.getInstance()
- .getDataBlockManager()
- .createSourceHandle(
- context.getResultNodeContext().getVirtualFragmentInstanceId().toThrift(),
- context.getResultNodeContext().getVirtualResultNodeId().getId(),
- context.getResultNodeContext().getUpStreamEndpoint().getIp(),
- context.getResultNodeContext().getUpStreamEndpoint().getPort(),
- context.getResultNodeContext().getVirtualFragmentInstanceId().toThrift());
+ try {
+ this.resultHandle =
+ DataBlockService.getInstance()
+ .getDataBlockManager()
+ .createSourceHandle(
+ context.getResultNodeContext().getVirtualFragmentInstanceId().toThrift(),
+ context.getResultNodeContext().getVirtualResultNodeId().getId(),
+ context.getResultNodeContext().getUpStreamEndpoint().getIp(),
+ IoTDBDescriptor.getInstance().getConfig().getDataBlockManagerPort(),
+ context.getResultNodeContext().getVirtualFragmentInstanceId().toThrift());
+ } catch (IOException e) {
+ stateMachine.transitionToFailed();
+ }
}
}
@@ -242,4 +255,9 @@ public class QueryExecution implements IQueryExecution {
public LogicalQueryPlan getLogicalPlan() {
return logicalPlan;
}
+
+ @Override
+ public boolean isQuery() {
+ return context.getQueryType() == QueryType.READ;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriver.java
index 4a1eef21fa..ea49d012e3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriver.java
@@ -42,7 +42,7 @@ import static org.apache.iotdb.db.mpp.operator.Operator.NOT_BLOCKED;
@NotThreadSafe
public class SchemaDriver implements Driver {
- private static final Logger logger = LoggerFactory.getLogger(DataDriver.class);
+ private static final Logger logger = LoggerFactory.getLogger(SchemaDriver.class);
private final Operator root;
private final ISinkHandle sinkHandle;
@@ -65,6 +65,7 @@ public class SchemaDriver implements Driver {
try {
boolean isFinished = driverBlockedFuture.get().isDone() && root != null && root.isFinished();
if (isFinished) {
+ close();
driverContext.finish();
}
return isFinished;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigExecution.java
index 2d158c4856..21afde5d29 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigExecution.java
@@ -20,19 +20,23 @@
package org.apache.iotdb.db.mpp.execution.config;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
+import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
import org.apache.iotdb.db.mpp.execution.ExecutionResult;
import org.apache.iotdb.db.mpp.execution.IQueryExecution;
import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
+import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
import org.apache.iotdb.db.mpp.sql.statement.Statement;
import org.apache.iotdb.db.mpp.sql.statement.metadata.SetStorageGroupStatement;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import jersey.repackaged.com.google.common.util.concurrent.SettableFuture;
+import org.jetbrains.annotations.NotNull;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -41,12 +45,12 @@ import static com.google.common.base.Throwables.throwIfInstanceOf;
public class ConfigExecution implements IQueryExecution {
- private MPPQueryContext context;
- private Statement statement;
- private ExecutorService executor;
+ private final MPPQueryContext context;
+ private final Statement statement;
+ private final ExecutorService executor;
- private QueryStateMachine stateMachine;
- private SettableFuture<Boolean> result;
+ private final QueryStateMachine stateMachine;
+ private final SettableFuture<Boolean> result;
public ConfigExecution(MPPQueryContext context, Statement statement, ExecutorService executor) {
this.context = context;
@@ -71,7 +75,7 @@ public class ConfigExecution implements IQueryExecution {
}
@Override
- public void onFailure(Throwable throwable) {
+ public void onFailure(@NotNull Throwable throwable) {
fail(throwable);
}
},
@@ -93,6 +97,10 @@ public class ConfigExecution implements IQueryExecution {
@Override
public ExecutionResult getStatus() {
try {
+ if (result.isCancelled()) {
+ return new ExecutionResult(
+ context.getQueryId(), RpcUtils.getStatus(TSStatusCode.QUERY_PROCESS_ERROR));
+ }
Boolean success = result.get();
TSStatusCode statusCode =
success ? TSStatusCode.SUCCESS_STATUS : TSStatusCode.QUERY_PROCESS_ERROR;
@@ -105,6 +113,34 @@ public class ConfigExecution implements IQueryExecution {
}
}
+ @Override
+ public TsBlock getBatchResult() {
+ // TODO
+ return null;
+ }
+
+ @Override
+ public boolean hasNextResult() {
+ return false;
+ }
+
+ @Override
+ public int getOutputValueColumnCount() {
+ // TODO
+ return 0;
+ }
+
+ @Override
+ public DatasetHeader getDatasetHeader() {
+ // TODO
+ return null;
+ }
+
+ @Override
+ public boolean isQuery() {
+ return context.getQueryType() == QueryType.READ;
+ }
+
// TODO: consider a more suitable implementation for it
// Generate the corresponding IConfigTask by statement.
// Each type of statement will have a ConfigTask
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/InternalServiceClientFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/InternalServiceClientFactory.java
index fefd526b54..4a5a94b4bf 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/InternalServiceClientFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/InternalServiceClientFactory.java
@@ -20,21 +20,25 @@
package org.apache.iotdb.db.mpp.execution.scheduler;
import org.apache.iotdb.mpp.rpc.thrift.InternalService;
+import org.apache.iotdb.rpc.RpcTransportFactory;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
public class InternalServiceClientFactory {
+ private static final int TIMEOUT_MS = 10000;
+
// TODO: (xingtanzjr) consider the best practice to maintain the clients
public static InternalService.Client getInternalServiceClient(String endpoint, int port)
throws TTransportException {
- try (TTransport transport = new TSocket(endpoint, port)) {
- transport.open();
- TProtocol protocol = new TBinaryProtocol(transport);
- return new InternalService.Client(protocol);
- }
+ TTransport transport =
+ RpcTransportFactory.INSTANCE.getTransport(
+ // as there is a try-catch already, we do not need to use TSocket.wrap
+ endpoint, port, TIMEOUT_MS);
+ transport.open();
+ TProtocol protocol = new TBinaryProtocol(transport);
+ return new InternalService.Client(protocol);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
index 3602bd43fa..fa81d65764 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
@@ -19,13 +19,13 @@
package org.apache.iotdb.db.mpp.execution.scheduler;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
import org.apache.iotdb.mpp.rpc.thrift.InternalService;
import org.apache.iotdb.mpp.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstance;
import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceReq;
-
-import org.apache.thrift.TException;
+import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceResp;
import java.nio.ByteBuffer;
import java.util.List;
@@ -44,11 +44,13 @@ public class SimpleFragInstanceDispatcher implements IFragInstanceDispatcher {
public Future<FragInstanceDispatchResult> dispatch(List<FragmentInstance> instances) {
return executor.submit(
() -> {
+ TSendFragmentInstanceResp resp = new TSendFragmentInstanceResp(false);
try {
for (FragmentInstance instance : instances) {
InternalService.Client client =
InternalServiceClientFactory.getInternalServiceClient(
- instance.getHostEndpoint().getIp(), instance.getHostEndpoint().getPort());
+ instance.getHostEndpoint().getIp(),
+ IoTDBDescriptor.getInstance().getConfig().getMppPort());
// TODO: (xingtanzjr) consider how to handle the buffer here
ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
instance.serializeRequest(buffer);
@@ -60,13 +62,16 @@ public class SimpleFragInstanceDispatcher implements IFragInstanceDispatcher {
TSendFragmentInstanceReq req =
new TSendFragmentInstanceReq(
new TFragmentInstance(buffer), groupId, instance.getType().toString());
- client.sendFragmentInstance(req);
+ resp = client.sendFragmentInstance(req);
+ if (!resp.accepted) {
+ break;
+ }
}
- } catch (TException e) {
+ } catch (Exception e) {
// TODO: (xingtanzjr) add more details
return new FragInstanceDispatchResult(false);
}
- return new FragInstanceDispatchResult(true);
+ return new FragInstanceDispatchResult(resp.accepted);
});
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java
index efaa56b89e..b2200b7690 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.commons.exception.StartupException;
import org.apache.iotdb.commons.service.IService;
import org.apache.iotdb.commons.service.ServiceType;
import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.mpp.buffer.DataBlockService;
import org.apache.iotdb.db.mpp.buffer.IDataBlockManager;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.common.QueryId;
@@ -89,6 +90,7 @@ public class FragmentInstanceScheduler implements IFragmentInstanceScheduler, IS
this.scheduler = new Scheduler();
this.workerGroups = new ThreadGroup("ScheduleThreads");
this.threads = new ArrayList<>();
+ this.blockManager = DataBlockService.getInstance().getDataBlockManager();
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java
index bb7114c01f..d8fcd29210 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java
@@ -71,9 +71,9 @@ public class Analyzer {
private final MPPQueryContext context;
// TODO need to use factory to decide standalone or cluster
- private final IPartitionFetcher partitionFetcher = new FakePartitionFetcherImpl();
+ private final IPartitionFetcher partitionFetcher = ClusterPartitionFetcher.getInstance();
// TODO need to use factory to decide standalone or cluster
- private final ISchemaFetcher schemaFetcher = new FakeSchemaFetcherImpl();
+ private final ISchemaFetcher schemaFetcher = ClusterSchemaFetcher.getInstance();
public Analyzer(MPPQueryContext context) {
this.context = context;
@@ -177,14 +177,52 @@ public class Analyzer {
@Override
public Analysis visitInsert(InsertStatement insertStatement, MPPQueryContext context) {
// TODO: do analyze for insert statement
- Analysis analysis = new Analysis();
- analysis.setStatement(insertStatement);
- return analysis;
+ context.setQueryType(QueryType.WRITE);
+
+ long[] timeArray = insertStatement.getTimes();
+ PartialPath devicePath = insertStatement.getDevice();
+ String[] measurements = insertStatement.getMeasurementList();
+ if (timeArray.length == 1) {
+ // construct insert row statement
+ InsertRowStatement insertRowStatement = new InsertRowStatement();
+ insertRowStatement.setDevicePath(devicePath);
+ insertRowStatement.setTime(timeArray[0]);
+ insertRowStatement.setMeasurements(measurements);
+ insertRowStatement.setDataTypes(
+ new TSDataType[insertStatement.getMeasurementList().length]);
+ Object[] values = new Object[insertStatement.getMeasurementList().length];
+ System.arraycopy(insertStatement.getValuesList().get(0), 0, values, 0, values.length);
+ insertRowStatement.setValues(values);
+ insertRowStatement.setNeedInferType(true);
+ insertRowStatement.setAligned(insertStatement.isAligned());
+ return insertRowStatement.accept(this, context);
+ } else {
+ // construct insert rows statement
+ // construct insert statement
+ InsertRowsStatement insertRowsStatement = new InsertRowsStatement();
+ List<InsertRowStatement> insertRowStatementList = new ArrayList<>();
+ for (int i = 0; i < timeArray.length; i++) {
+ InsertRowStatement statement = new InsertRowStatement();
+ statement.setDevicePath(devicePath);
+ statement.setMeasurements(measurements);
+ statement.setTime(timeArray[i]);
+ statement.setDataTypes(new TSDataType[insertStatement.getMeasurementList().length]);
+ Object[] values = new Object[insertStatement.getMeasurementList().length];
+ System.arraycopy(insertStatement.getValuesList().get(i), 0, values, 0, values.length);
+ statement.setValues(values);
+ statement.setAligned(insertStatement.isAligned());
+ statement.setNeedInferType(true);
+ insertRowStatementList.add(statement);
+ }
+ insertRowsStatement.setInsertRowStatementList(insertRowStatementList);
+ return insertRowsStatement.accept(this, context);
+ }
}
@Override
public Analysis visitCreateTimeseries(
CreateTimeSeriesStatement createTimeSeriesStatement, MPPQueryContext context) {
+ context.setQueryType(QueryType.WRITE);
if (createTimeSeriesStatement.getTags() != null
&& !createTimeSeriesStatement.getTags().isEmpty()
&& createTimeSeriesStatement.getAttributes() != null
@@ -201,7 +239,7 @@ public class Analyzer {
analysis.setStatement(createTimeSeriesStatement);
SchemaPartition schemaPartitionInfo =
- partitionFetcher.getSchemaPartition(
+ partitionFetcher.getOrCreateSchemaPartition(
new PathPatternTree(createTimeSeriesStatement.getPath()));
analysis.setSchemaPartitionInfo(schemaPartitionInfo);
return analysis;
@@ -211,6 +249,7 @@ public class Analyzer {
public Analysis visitCreateAlignedTimeseries(
CreateAlignedTimeSeriesStatement createAlignedTimeSeriesStatement,
MPPQueryContext context) {
+ context.setQueryType(QueryType.WRITE);
List<String> measurements = createAlignedTimeSeriesStatement.getMeasurements();
Set<String> measurementsSet = new HashSet<>(measurements);
if (measurementsSet.size() < measurements.size()) {
@@ -234,6 +273,7 @@ public class Analyzer {
@Override
public Analysis visitAlterTimeseries(
AlterTimeSeriesStatement alterTimeSeriesStatement, MPPQueryContext context) {
+ context.setQueryType(QueryType.WRITE);
Analysis analysis = new Analysis();
analysis.setStatement(alterTimeSeriesStatement);
@@ -248,6 +288,7 @@ public class Analyzer {
@Override
public Analysis visitInsertTablet(
InsertTabletStatement insertTabletStatement, MPPQueryContext context) {
+ context.setQueryType(QueryType.WRITE);
SchemaTree schemaTree =
schemaFetcher.fetchSchemaWithAutoCreate(
insertTabletStatement.getDevicePath(),
@@ -320,6 +361,7 @@ public class Analyzer {
@Override
public Analysis visitCreateUser(AuthorStatement authorStatement, MPPQueryContext context) {
+ context.setQueryType(QueryType.WRITE);
Analysis analysis = new Analysis();
analysis.setStatement(authorStatement);
return analysis;
@@ -327,6 +369,7 @@ public class Analyzer {
@Override
public Analysis visitCreateRole(AuthorStatement authorStatement, MPPQueryContext context) {
+ context.setQueryType(QueryType.WRITE);
Analysis analysis = new Analysis();
analysis.setStatement(authorStatement);
return analysis;
@@ -334,6 +377,7 @@ public class Analyzer {
@Override
public Analysis visitAlterUser(AuthorStatement authorStatement, MPPQueryContext context) {
+ context.setQueryType(QueryType.WRITE);
Analysis analysis = new Analysis();
analysis.setStatement(authorStatement);
return analysis;
@@ -341,6 +385,7 @@ public class Analyzer {
@Override
public Analysis visitGrantUser(AuthorStatement authorStatement, MPPQueryContext context) {
+ context.setQueryType(QueryType.WRITE);
Analysis analysis = new Analysis();
analysis.setStatement(authorStatement);
return analysis;
@@ -348,6 +393,7 @@ public class Analyzer {
@Override
public Analysis visitGrantRole(AuthorStatement authorStatement, MPPQueryContext context) {
+ context.setQueryType(QueryType.WRITE);
Analysis analysis = new Analysis();
analysis.setStatement(authorStatement);
return analysis;
@@ -355,6 +401,7 @@ public class Analyzer {
@Override
public Analysis visitGrantRoleToUser(AuthorStatement authorStatement, MPPQueryContext context) {
+ context.setQueryType(QueryType.WRITE);
Analysis analysis = new Analysis();
analysis.setStatement(authorStatement);
return analysis;
@@ -362,6 +409,7 @@ public class Analyzer {
@Override
public Analysis visitRevokeUser(AuthorStatement authorStatement, MPPQueryContext context) {
+ context.setQueryType(QueryType.WRITE);
Analysis analysis = new Analysis();
analysis.setStatement(authorStatement);
return analysis;
@@ -369,6 +417,7 @@ public class Analyzer {
@Override
public Analysis visitRevokeRole(AuthorStatement authorStatement, MPPQueryContext context) {
+ context.setQueryType(QueryType.WRITE);
Analysis analysis = new Analysis();
analysis.setStatement(authorStatement);
return analysis;
@@ -377,6 +426,7 @@ public class Analyzer {
@Override
public Analysis visitRevokeRoleFromUser(
AuthorStatement authorStatement, MPPQueryContext context) {
+ context.setQueryType(QueryType.WRITE);
Analysis analysis = new Analysis();
analysis.setStatement(authorStatement);
return analysis;
@@ -384,6 +434,7 @@ public class Analyzer {
@Override
public Analysis visitDropUser(AuthorStatement authorStatement, MPPQueryContext context) {
+ context.setQueryType(QueryType.WRITE);
Analysis analysis = new Analysis();
analysis.setStatement(authorStatement);
return analysis;
@@ -391,6 +442,7 @@ public class Analyzer {
@Override
public Analysis visitDropRole(AuthorStatement authorStatement, MPPQueryContext context) {
+ context.setQueryType(QueryType.WRITE);
Analysis analysis = new Analysis();
analysis.setStatement(authorStatement);
return analysis;
@@ -460,6 +512,7 @@ public class Analyzer {
@Override
public Analysis visitInsertRow(InsertRowStatement insertRowStatement, MPPQueryContext context) {
+ context.setQueryType(QueryType.WRITE);
// TODO remove duplicate
SchemaTree schemaTree =
schemaFetcher.fetchSchemaWithAutoCreate(
@@ -485,7 +538,8 @@ public class Analyzer {
sgNameToQueryParamsMap.put(
schemaTree.getBelongedStorageGroup(insertRowStatement.getDevicePath()),
Collections.singletonList(dataPartitionQueryParam));
- DataPartition dataPartition = partitionFetcher.getDataPartition(sgNameToQueryParamsMap);
+ DataPartition dataPartition =
+ partitionFetcher.getOrCreateDataPartition(sgNameToQueryParamsMap);
Analysis analysis = new Analysis();
analysis.setSchemaTree(schemaTree);
@@ -498,6 +552,7 @@ public class Analyzer {
@Override
public Analysis visitInsertRows(
InsertRowsStatement insertRowsStatement, MPPQueryContext context) {
+ context.setQueryType(QueryType.WRITE);
// TODO remove duplicate
SchemaTree schemaTree =
schemaFetcher.fetchSchemaListWithAutoCreate(
@@ -542,6 +597,7 @@ public class Analyzer {
@Override
public Analysis visitInsertMultiTablets(
InsertMultiTabletsStatement insertMultiTabletsStatement, MPPQueryContext context) {
+ context.setQueryType(QueryType.WRITE);
// TODO remove duplicate
SchemaTree schemaTree =
schemaFetcher.fetchSchemaListWithAutoCreate(
@@ -580,6 +636,7 @@ public class Analyzer {
@Override
public Analysis visitInsertRowsOfOneDevice(
InsertRowsOfOneDeviceStatement insertRowsOfOneDeviceStatement, MPPQueryContext context) {
+ context.setQueryType(QueryType.WRITE);
// TODO remove duplicate
SchemaTree schemaTree =
schemaFetcher.fetchSchemaWithAutoCreate(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterPartitionFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterPartitionFetcher.java
index 7c0f6c9470..f8a9d348be 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterPartitionFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterPartitionFetcher.java
@@ -164,7 +164,7 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
try {
patternTree.serialize(baos);
ByteBuffer serializedPatternTree = ByteBuffer.allocate(baos.size());
- serializedPatternTree.put(baos.getBuf());
+ serializedPatternTree.put(baos.getBuf(), 0, baos.size());
serializedPatternTree.flip();
return new TSchemaPartitionReq(serializedPatternTree);
} catch (IOException e) {
@@ -209,7 +209,10 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
}
schemaPartitionMap.put(storageGroupName, deviceToSchemaRegionMap);
}
- return new SchemaPartition(schemaPartitionMap);
+ return new SchemaPartition(
+ schemaPartitionMap,
+ IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass(),
+ IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum());
}
private DataPartition parseDataPartitionResp(TDataPartitionResp dataPartitionResp) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterSchemaFetcher.java
index 9e47c755eb..ca7f1f06cb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterSchemaFetcher.java
@@ -26,8 +26,10 @@ import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
import org.apache.iotdb.db.mpp.execution.Coordinator;
+import org.apache.iotdb.db.mpp.execution.ExecutionResult;
import org.apache.iotdb.db.mpp.sql.statement.metadata.SchemaFetchStatement;
import org.apache.iotdb.db.query.control.SessionManager;
+import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
@@ -69,11 +71,15 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
QueryId queryId =
new QueryId(String.valueOf(SessionManager.getInstance().requestQueryId(false)));
- coordinator.execute(schemaFetchStatement, queryId, QueryType.READ, null, "");
- TsBlock tsBlock = coordinator.getResultSet(queryId);
+ ExecutionResult executionResult = coordinator.execute(schemaFetchStatement, queryId, null, "");
+ // TODO: (xingtanzjr) throw exception
+ if (executionResult.status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new RuntimeException("cannot fetch schema");
+ }
+ TsBlock tsBlock = coordinator.getQueryExecution(queryId).getBatchResult();
+ // TODO: (xingtanzjr) need to release this query's resource here
SchemaTree result = new SchemaTree();
result.setStorageGroups(storageGroups);
-
Binary binary;
SchemaTree fetchedSchemaTree;
Column column = tsBlock.getColumn(0);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
index 17961e5420..909ab98080 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
@@ -215,17 +215,23 @@ public class DistributionPlanner {
// and make the
// new TimeJoinNode as the child of current TimeJoinNode
// TODO: (xingtanzjr) optimize the procedure here to remove duplicated TimeJoinNode
+ final boolean[] addParent = {false};
sourceGroup.forEach(
(dataRegion, seriesScanNodes) -> {
if (seriesScanNodes.size() == 1) {
root.addChild(seriesScanNodes.get(0));
} else {
- // We clone a TimeJoinNode from root to make the params to be consistent.
- // But we need to assign a new ID to it
- TimeJoinNode parentOfGroup = (TimeJoinNode) root.clone();
- root.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
- seriesScanNodes.forEach(parentOfGroup::addChild);
- root.addChild(parentOfGroup);
+ if (!addParent[0]) {
+ seriesScanNodes.forEach(root::addChild);
+ addParent[0] = true;
+ } else {
+ // We clone a TimeJoinNode from root to keep the params consistent.
+ // But we need to assign a new ID to it
+ TimeJoinNode parentOfGroup = (TimeJoinNode) root.clone();
+ root.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
+ seriesScanNodes.forEach(parentOfGroup::addChild);
+ root.addChild(parentOfGroup);
+ }
}
});
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
index 77508888a2..166a3077a7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.mpp.sql.planner;
import org.apache.iotdb.commons.cluster.Endpoint;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.storagegroup.DataRegion;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
@@ -306,7 +307,7 @@ public class LocalExecutionPlanner {
localInstanceId.toThrift(),
node.getPlanNodeId().getId(),
source.getIp(),
- source.getPort(),
+ IoTDBDescriptor.getInstance().getConfig().getDataBlockManagerPort(),
remoteInstanceId.toThrift());
return new ExchangeOperator(operatorContext, sourceHandle, node.getUpstreamPlanNodeId());
} catch (IOException e) {
@@ -325,7 +326,7 @@ public class LocalExecutionPlanner {
DATA_BLOCK_MANAGER.createSinkHandle(
localInstanceId.toThrift(),
target.getIp(),
- target.getPort(),
+ IoTDBDescriptor.getInstance().getConfig().getDataBlockManagerPort(),
targetInstanceId.toThrift(),
node.getDownStreamPlanNodeId().getId());
context.setSinkHandle(sinkHandle);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
index a3065c8c03..2063206c45 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.sql.planner.plan;
import org.apache.iotdb.commons.cluster.Endpoint;
import org.apache.iotdb.commons.partition.RegionReplicaSet;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeType.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeType.java
index 99b5d4fe35..a97ccddc16 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeType.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeType.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.DevicesSchemaScanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.SchemaFetchNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.SchemaMergeNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.ShowDevicesNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.TimeSeriesSchemaScanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.AlterTimeSeriesNode;
@@ -79,7 +80,8 @@ public enum PlanNodeType {
TIME_SERIES_SCHEMA_SCAN((short) 24),
// TODO @xinzhongtianxia remove this
SHOW_DEVICES((short) 25),
- SCHEMA_FETCH((short) 26);
+ SCHEMA_FETCH((short) 26),
+ SCHEMA_MERGE((short) 27);
private final short nodeType;
@@ -161,6 +163,8 @@ public enum PlanNodeType {
return ShowDevicesNode.deserialize(buffer);
case 26:
return SchemaFetchNode.deserialize(buffer);
+ case 27:
+ return SchemaMergeNode.deserialize(buffer);
default:
throw new IllegalArgumentException("Invalid node type: " + nodeType);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/SchemaFetchNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/SchemaFetchNode.java
index 87888c9e1f..5100ba3f21 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/SchemaFetchNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/SchemaFetchNode.java
@@ -25,6 +25,8 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
+import com.google.common.collect.ImmutableList;
+
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
@@ -44,7 +46,7 @@ public class SchemaFetchNode extends SchemaScanNode {
@Override
public List<PlanNode> getChildren() {
- return null;
+ return ImmutableList.of();
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/SchemaMergeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/SchemaMergeNode.java
index 7ae31a523e..7e9357cb68 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/SchemaMergeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/SchemaMergeNode.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.ProcessNode;
@@ -64,10 +65,14 @@ public class SchemaMergeNode extends ProcessNode {
}
@Override
- public void serialize(ByteBuffer byteBuffer) {}
+ protected void serializeAttributes(ByteBuffer byteBuffer) {
+ PlanNodeType.SCHEMA_MERGE.serialize(byteBuffer);
+ }
- @Override
- protected void serializeAttributes(ByteBuffer byteBuffer) {}
+ public static SchemaMergeNode deserialize(ByteBuffer byteBuffer) {
+ PlanNodeId id = PlanNodeId.deserialize(byteBuffer);
+ return new SchemaMergeNode(id);
+ }
@Override
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateTimeSeriesNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateTimeSeriesNode.java
index 36d9f35db6..c22136ad04 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateTimeSeriesNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateTimeSeriesNode.java
@@ -18,25 +18,31 @@
*/
package org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write;
+import org.apache.iotdb.commons.partition.RegionReplicaSet;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.WritePlanNode;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import com.google.common.collect.ImmutableList;
+
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
-public class CreateTimeSeriesNode extends PlanNode {
+public class CreateTimeSeriesNode extends WritePlanNode {
private PartialPath path;
private TSDataType dataType;
private TSEncoding encoding;
@@ -47,6 +53,8 @@ public class CreateTimeSeriesNode extends PlanNode {
private Map<String, String> attributes = null;
private long tagOffset = -1;
+ private RegionReplicaSet regionReplicaSet;
+
public CreateTimeSeriesNode(
PlanNodeId id,
PartialPath path,
@@ -145,7 +153,7 @@ public class CreateTimeSeriesNode extends PlanNode {
@Override
public List<PlanNode> getChildren() {
- return null;
+ return new ArrayList<>();
}
@Override
@@ -303,4 +311,21 @@ public class CreateTimeSeriesNode extends PlanNode {
&& ((tags == null && that.tags == null) || tags.equals(that.tags))
&& ((attributes == null && that.attributes == null) || attributes.equals(that.attributes));
}
+
+ @Override
+ public RegionReplicaSet getRegionReplicaSet() {
+ return regionReplicaSet;
+ }
+
+ @Override
+ public List<WritePlanNode> splitByPartition(Analysis analysis) {
+ RegionReplicaSet regionReplicaSet =
+ analysis.getSchemaPartitionInfo().getSchemaRegionReplicaSet(path.getDevice());
+ setRegionReplicaSet(regionReplicaSet);
+ return ImmutableList.of(this);
+ }
+
+ public void setRegionReplicaSet(RegionReplicaSet regionReplicaSet) {
+ this.regionReplicaSet = regionReplicaSet;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
index 5947fc3d21..d3f82fd328 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.PlanFragment;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.sink.FragmentSinkNode;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -59,6 +60,11 @@ public class ExchangeNode extends PlanNode {
return ImmutableList.of(child);
}
+ @Override
+ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+ return visitor.visitExchange(this, context);
+ }
+
@Override
public void addChild(PlanNode child) {
this.child = child;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java
index 90ebc81ee3..fdda60a311 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java
@@ -52,11 +52,11 @@ public class TimeJoinNode extends ProcessNode implements IOutputPlanNode {
// The without policy is able to be push down to the TimeJoinOperator because we can know whether
// a row contains
// null or not.
- private FilterNullPolicy filterNullPolicy;
+ private FilterNullPolicy filterNullPolicy = FilterNullPolicy.NO_FILTER;
private List<PlanNode> children;
- private final List<ColumnHeader> columnHeaders = new ArrayList<>();
+ private List<ColumnHeader> columnHeaders = new ArrayList<>();
public TimeJoinNode(PlanNodeId id, OrderBy mergeOrder) {
super(id);
@@ -77,7 +77,10 @@ public class TimeJoinNode extends ProcessNode implements IOutputPlanNode {
@Override
public PlanNode clone() {
- return new TimeJoinNode(getPlanNodeId(), this.mergeOrder);
+ // TODO: (xingtanzjr) columnHeaders is shared with the original node here, not copied; revisit
+ TimeJoinNode cloneNode = new TimeJoinNode(getPlanNodeId(), this.mergeOrder);
+ cloneNode.columnHeaders = this.columnHeaders;
+ return cloneNode;
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
index 0fe26258e5..a738c8a502 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import com.google.common.collect.ImmutableList;
@@ -67,6 +68,11 @@ public class FragmentSinkNode extends SinkNode {
return sinkNode;
}
+ @Override
+ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+ return visitor.visitFragmentSink(this, context);
+ }
+
@Override
public void addChild(PlanNode child) {
this.child = child;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java
index 2b3481fd74..e999046ff8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java
@@ -169,12 +169,25 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
}
@Override
- public void serialize(ByteBuffer byteBuffer) {
- byteBuffer.putShort((short) PlanNodeType.INSERT_ROW.ordinal());
- getPlanNodeId().serialize(byteBuffer);
+ protected void serializeAttributes(ByteBuffer byteBuffer) {
+ PlanNodeType.INSERT_ROW.serialize(byteBuffer);
subSerialize(byteBuffer);
}
+ public static InsertRowNode deserialize(ByteBuffer byteBuffer) {
+ // TODO: (xingtanzjr) remove placeholder
+ InsertRowNode insertNode = new InsertRowNode(new PlanNodeId("1"));
+ insertNode.setTime(byteBuffer.getLong());
+ try {
+ insertNode.setDevicePath(new PartialPath(ReadWriteIOUtils.readString(byteBuffer)));
+ } catch (IllegalPathException e) {
+ throw new IllegalArgumentException("Cannot deserialize InsertRowNode", e);
+ }
+ insertNode.deserializeMeasurementsAndValues(byteBuffer);
+ insertNode.setPlanNodeId(PlanNodeId.deserialize(byteBuffer));
+ return insertNode;
+ }
+
void subSerialize(ByteBuffer buffer) {
buffer.putLong(time);
ReadWriteIOUtils.write(devicePath.getFullPath(), buffer);
@@ -314,19 +327,6 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
this.time = time;
}
- public static InsertRowNode deserialize(ByteBuffer byteBuffer) {
- InsertRowNode insertNode = new InsertRowNode(PlanNodeId.deserialize(byteBuffer));
- insertNode.setTime(byteBuffer.getLong());
- try {
- insertNode.setDevicePath(new PartialPath(ReadWriteIOUtils.readString(byteBuffer)));
- } catch (IllegalPathException e) {
- throw new IllegalArgumentException("Cannot deserialize InsertRowNode", e);
- }
- insertNode.deserializeMeasurementsAndValues(byteBuffer);
-
- return insertNode;
- }
-
void deserializeMeasurementsAndValues(ByteBuffer buffer) {
int measurementSize = buffer.getInt();
diff --git a/server/src/main/java/org/apache/iotdb/db/service/InternalService.java b/server/src/main/java/org/apache/iotdb/db/service/InternalService.java
index 9c751e7766..4414fb9110 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/InternalService.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/InternalService.java
@@ -19,38 +19,79 @@
package org.apache.iotdb.db.service;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.exception.runtime.RPCServiceException;
import org.apache.iotdb.commons.service.ServiceType;
import org.apache.iotdb.commons.service.ThriftService;
+import org.apache.iotdb.commons.service.ThriftServiceThread;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.mpp.rpc.thrift.InternalService.Processor;
-public class InternalService extends ThriftService {
+public class InternalService extends ThriftService implements InternalServiceMBean {
private InternalServiceImpl impl;
+ private InternalService() {}
+
@Override
public ServiceType getID() {
- return null;
+ return ServiceType.INTERNAL_SERVICE;
}
@Override
public ThriftService getImplementation() {
- return null;
+ return InternalServiceHolder.INSTANCE;
}
@Override
public void initTProcessor()
- throws ClassNotFoundException, IllegalAccessException, InstantiationException {}
+ throws ClassNotFoundException, IllegalAccessException, InstantiationException {
+ impl = new InternalServiceImpl();
+ initSyncedServiceImpl(null);
+ processor = new Processor<>(impl);
+ }
@Override
public void initThriftServiceThread()
- throws IllegalAccessException, InstantiationException, ClassNotFoundException {}
+ throws IllegalAccessException, InstantiationException, ClassNotFoundException {
+ try {
+ IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+ thriftServiceThread =
+ new ThriftServiceThread(
+ processor,
+ getID().getName(),
+ ThreadName.INTERNAL_SERVICE_CLIENT.getName(),
+ getBindIP(),
+ getBindPort(),
+ config.getRpcMaxConcurrentClientNum(),
+ config.getThriftServerAwaitTimeForStopService(),
+ new InternalServiceThriftHandler(),
+ // TODO: hard coded compress strategy
+ false);
+ } catch (RPCServiceException e) {
+ throw new IllegalAccessException(e.getMessage());
+ }
+ thriftServiceThread.setName(ThreadName.INTERNAL_SERVICE_CLIENT.getName());
+ }
@Override
public String getBindIP() {
- return null;
+ return IoTDBDescriptor.getInstance().getConfig().getRpcAddress();
}
@Override
public int getBindPort() {
- return 0;
+ return IoTDBDescriptor.getInstance().getConfig().getMppPort();
+ }
+
+ private static class InternalServiceHolder {
+ private static final InternalService INSTANCE = new InternalService();
+
+ private InternalServiceHolder() {}
+ }
+
+ public static InternalService getInstance() {
+ return InternalService.InternalServiceHolder.INSTANCE;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/InternalServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/InternalServiceImpl.java
index 190655719f..34634d74cc 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/InternalServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/InternalServiceImpl.java
@@ -63,7 +63,7 @@ public class InternalServiceImpl implements InternalService.Iface {
ConsensusImpl.getInstance()
.read(groupId, new ByteBufferConsensusRequest(req.fragmentInstance.body));
FragmentInstanceInfo info = (FragmentInstanceInfo) readResp.getDataset();
- return new TSendFragmentInstanceResp(info.getState().isFailed());
+ return new TSendFragmentInstanceResp(!info.getState().isFailed());
case WRITE:
TSendFragmentInstanceResp response = new TSendFragmentInstanceResp();
ConsensusWriteResponse resp =
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/IQueryExecution.java b/server/src/main/java/org/apache/iotdb/db/service/InternalServiceMBean.java
similarity index 84%
copy from server/src/main/java/org/apache/iotdb/db/mpp/execution/IQueryExecution.java
copy to server/src/main/java/org/apache/iotdb/db/service/InternalServiceMBean.java
index 3f9fa05ea8..6abb847e81 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/IQueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/InternalServiceMBean.java
@@ -17,13 +17,6 @@
* under the License.
*/
-package org.apache.iotdb.db.mpp.execution;
+package org.apache.iotdb.db.service;
-public interface IQueryExecution {
-
- void start();
-
- void stop();
-
- ExecutionResult getStatus();
-}
+public interface InternalServiceMBean {}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/InternalService.java b/server/src/main/java/org/apache/iotdb/db/service/InternalServiceThriftHandler.java
similarity index 58%
copy from server/src/main/java/org/apache/iotdb/db/service/InternalService.java
copy to server/src/main/java/org/apache/iotdb/db/service/InternalServiceThriftHandler.java
index 9c751e7766..bfaf11c583 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/InternalService.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/InternalServiceThriftHandler.java
@@ -19,38 +19,26 @@
package org.apache.iotdb.db.service;
-import org.apache.iotdb.commons.service.ServiceType;
-import org.apache.iotdb.commons.service.ThriftService;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.server.ServerContext;
+import org.apache.thrift.server.TServerEventHandler;
+import org.apache.thrift.transport.TTransport;
-public class InternalService extends ThriftService {
-
- private InternalServiceImpl impl;
+public class InternalServiceThriftHandler implements TServerEventHandler {
@Override
- public ServiceType getID() {
- return null;
- }
+ public void preServe() {}
@Override
- public ThriftService getImplementation() {
+ public ServerContext createContext(TProtocol tProtocol, TProtocol tProtocol1) {
return null;
}
@Override
- public void initTProcessor()
- throws ClassNotFoundException, IllegalAccessException, InstantiationException {}
-
- @Override
- public void initThriftServiceThread()
- throws IllegalAccessException, InstantiationException, ClassNotFoundException {}
+ public void deleteContext(
+ ServerContext serverContext, TProtocol tProtocol, TProtocol tProtocol1) {}
@Override
- public String getBindIP() {
- return null;
- }
-
- @Override
- public int getBindPort() {
- return 0;
- }
+ public void processContext(
+ ServerContext serverContext, TTransport tTransport, TTransport tTransport1) {}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
index f8c8839444..d67b6048ab 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.rest.IoTDBRestServiceCheck;
import org.apache.iotdb.db.conf.rest.IoTDBRestServiceDescriptor;
import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.StorageEngineV2;
import org.apache.iotdb.db.engine.cache.CacheHitRatioMonitor;
import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
import org.apache.iotdb.db.engine.cq.ContinuousQueryService;
@@ -38,6 +39,8 @@ import org.apache.iotdb.db.engine.trigger.service.TriggerRegistrationService;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.LocalConfigNode;
import org.apache.iotdb.db.metadata.LocalSchemaProcessor;
+import org.apache.iotdb.db.mpp.buffer.DataBlockService;
+import org.apache.iotdb.db.mpp.schedule.FragmentInstanceScheduler;
import org.apache.iotdb.db.protocol.influxdb.meta.InfluxDBMetaManager;
import org.apache.iotdb.db.protocol.rest.RestService;
import org.apache.iotdb.db.query.udf.service.TemporaryQueryDataFileService;
@@ -48,6 +51,7 @@ import org.apache.iotdb.db.rescon.SystemInfo;
import org.apache.iotdb.db.service.basic.ServiceProvider;
import org.apache.iotdb.db.service.basic.StandaloneServiceProvider;
import org.apache.iotdb.db.service.metrics.MetricsService;
+import org.apache.iotdb.db.service.thrift.impl.DataNodeTSIServiceImpl;
import org.apache.iotdb.db.sync.receiver.ReceiverService;
import org.apache.iotdb.db.sync.sender.service.SenderService;
import org.apache.iotdb.db.wal.WALManager;
@@ -135,8 +139,21 @@ public class IoTDB implements IoTDBMBean {
registerManager.register(CacheHitRatioMonitor.getInstance());
registerManager.register(CompactionTaskManager.getInstance());
JMXService.registerMBean(getInstance(), mbeanName);
+
+ // in mpp mode we need to start some other services
+ if (IoTDBDescriptor.getInstance().getConfig().isMppMode()) {
+ registerManager.register(StorageEngineV2.getInstance());
+ registerManager.register(DataBlockService.getInstance());
+ registerManager.register(InternalService.getInstance());
+ registerManager.register(FragmentInstanceScheduler.getInstance());
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .setRpcImplClassName(DataNodeTSIServiceImpl.class.getName());
+ } else {
+ registerManager.register(StorageEngine.getInstance());
+ }
+
registerManager.register(WALManager.getInstance());
- registerManager.register(StorageEngine.getInstance());
registerManager.register(TemporaryQueryDataFileService.getInstance());
registerManager.register(UDFClassLoaderManager.getInstance());
registerManager.register(UDFRegistrationService.getInstance());
@@ -158,7 +175,9 @@ public class IoTDB implements IoTDBMBean {
logger.info("IoTDB is set up, now may some sgs are not ready, please wait several seconds...");
- while (!StorageEngine.getInstance().isAllSgReady()) {
+ while (IoTDBDescriptor.getInstance().getConfig().isMppMode()
+ ? !StorageEngineV2.getInstance().isAllSgReady()
+ : !StorageEngine.getInstance().isAllSgReady()) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
@@ -170,7 +189,11 @@ public class IoTDB implements IoTDBMBean {
registerManager.register(SenderService.getInstance());
registerManager.register(UpgradeSevice.getINSTANCE());
- registerManager.register(SettleService.getINSTANCE());
+ // In mpp mode we temporarily skip the settle service: it uses StorageEngine directly,
+ // whereas mpp mode currently has to run on StorageEngineV2 instead of StorageEngine.
+ if (!IoTDBDescriptor.getInstance().getConfig().isMppMode()) {
+ registerManager.register(SettleService.getINSTANCE());
+ }
registerManager.register(TriggerRegistrationService.getInstance());
registerManager.register(ContinuousQueryService.getInstance());
diff --git a/server/src/main/java/org/apache/iotdb/db/service/RPCService.java b/server/src/main/java/org/apache/iotdb/db/service/RPCService.java
index dc0ee3987c..0080857171 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/RPCService.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/RPCService.java
@@ -27,14 +27,16 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.service.thrift.ProcessorWithMetrics;
import org.apache.iotdb.db.service.thrift.handler.RPCServiceThriftHandler;
-import org.apache.iotdb.db.service.thrift.impl.TSServiceImpl;
+import org.apache.iotdb.db.service.thrift.impl.TSIEventHandler;
import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
import org.apache.iotdb.service.rpc.thrift.TSIService.Processor;
+import java.lang.reflect.InvocationTargetException;
+
/** A service to handle jdbc request from client. */
public class RPCService extends ThriftService implements RPCServiceMBean {
- private TSServiceImpl impl;
+ private TSIEventHandler impl;
public static RPCService getInstance() {
return RPCServiceHolder.INSTANCE;
@@ -47,10 +49,12 @@ public class RPCService extends ThriftService implements RPCServiceMBean {
@Override
public void initTProcessor()
- throws ClassNotFoundException, IllegalAccessException, InstantiationException {
+ throws ClassNotFoundException, IllegalAccessException, InstantiationException,
+ NoSuchMethodException, InvocationTargetException {
impl =
- (TSServiceImpl)
+ (TSIEventHandler)
Class.forName(IoTDBDescriptor.getInstance().getConfig().getRpcImplClassName())
+ .getDeclaredConstructor()
.newInstance();
initSyncedServiceImpl(null);
if (MetricConfigDescriptor.getInstance().getMetricConfig().getEnableMetric()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/RPCServiceThriftHandler.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/RPCServiceThriftHandler.java
index 7bdf65fd7b..1501402b08 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/RPCServiceThriftHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/RPCServiceThriftHandler.java
@@ -16,7 +16,7 @@
*/
package org.apache.iotdb.db.service.thrift.handler;
-import org.apache.iotdb.db.service.thrift.impl.TSServiceImpl;
+import org.apache.iotdb.db.service.thrift.impl.TSIEventHandler;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.server.ServerContext;
@@ -24,10 +24,10 @@ import org.apache.thrift.server.TServerEventHandler;
import org.apache.thrift.transport.TTransport;
public class RPCServiceThriftHandler implements TServerEventHandler {
- private TSServiceImpl serviceImpl;
+ private TSIEventHandler eventHandler;
- public RPCServiceThriftHandler(TSServiceImpl serviceImpl) {
- this.serviceImpl = serviceImpl;
+ public RPCServiceThriftHandler(TSIEventHandler eventHandler) {
+ this.eventHandler = eventHandler;
}
@Override
@@ -39,7 +39,7 @@ public class RPCServiceThriftHandler implements TServerEventHandler {
@Override
public void deleteContext(ServerContext arg0, TProtocol arg1, TProtocol arg2) {
// release query resources.
- serviceImpl.handleClientExit();
+ eventHandler.handleClientExit();
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
new file mode 100644
index 0000000000..f3cfdf5bf2
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
@@ -0,0 +1,733 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.service.thrift.impl;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.conf.OperationType;
+import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
+import org.apache.iotdb.db.mpp.execution.Coordinator;
+import org.apache.iotdb.db.mpp.execution.ExecutionResult;
+import org.apache.iotdb.db.mpp.execution.IQueryExecution;
+import org.apache.iotdb.db.mpp.sql.parser.StatementGenerator;
+import org.apache.iotdb.db.mpp.sql.statement.Statement;
+import org.apache.iotdb.db.mpp.sql.statement.crud.InsertMultiTabletsStatement;
+import org.apache.iotdb.db.mpp.sql.statement.crud.InsertRowStatement;
+import org.apache.iotdb.db.mpp.sql.statement.crud.InsertRowsOfOneDeviceStatement;
+import org.apache.iotdb.db.mpp.sql.statement.crud.InsertRowsStatement;
+import org.apache.iotdb.db.mpp.sql.statement.crud.InsertTabletStatement;
+import org.apache.iotdb.db.query.control.SessionManager;
+import org.apache.iotdb.db.service.basic.BasicOpenSessionResp;
+import org.apache.iotdb.db.service.metrics.MetricsService;
+import org.apache.iotdb.db.service.metrics.Operation;
+import org.apache.iotdb.db.utils.QueryDataSetUtils;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.*;
+
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.iotdb.db.service.basic.ServiceProvider.AUDIT_LOGGER;
+import static org.apache.iotdb.db.service.basic.ServiceProvider.CONFIG;
+import static org.apache.iotdb.db.service.basic.ServiceProvider.CURRENT_RPC_VERSION;
+import static org.apache.iotdb.db.service.basic.ServiceProvider.QUERY_FREQUENCY_RECORDER;
+import static org.apache.iotdb.db.service.basic.ServiceProvider.QUERY_TIME_MANAGER;
+import static org.apache.iotdb.db.service.basic.ServiceProvider.SLOW_SQL_LOGGER;
+import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNPEOrUnexpectedException;
+import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException;
+
+public class DataNodeTSIServiceImpl implements TSIEventHandler {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(DataNodeTSIServiceImpl.class);
+
+ private static final Coordinator COORDINATOR = Coordinator.getInstance();
+
+ private static final SessionManager SESSION_MANAGER = SessionManager.getInstance();
+
+ @Override
+ public TSOpenSessionResp openSession(TSOpenSessionReq req) throws TException {
+ IoTDBConstant.ClientVersion clientVersion = parseClientVersion(req);
+ BasicOpenSessionResp openSessionResp =
+ SESSION_MANAGER.openSession(
+ req.username, req.password, req.zoneId, req.client_protocol, clientVersion);
+ TSStatus tsStatus = RpcUtils.getStatus(openSessionResp.getCode(), openSessionResp.getMessage());
+ TSOpenSessionResp resp = new TSOpenSessionResp(tsStatus, CURRENT_RPC_VERSION);
+ return resp.setSessionId(openSessionResp.getSessionId());
+ }
+
+ private IoTDBConstant.ClientVersion parseClientVersion(TSOpenSessionReq req) {
+ Map<String, String> configuration = req.configuration;
+ if (configuration != null && configuration.containsKey("version")) {
+ return IoTDBConstant.ClientVersion.valueOf(configuration.get("version"));
+ }
+ return IoTDBConstant.ClientVersion.V_0_12;
+ }
+
+ @Override
+ public TSStatus closeSession(TSCloseSessionReq req) {
+ return new TSStatus(
+ !SESSION_MANAGER.closeSession(req.sessionId)
+ ? RpcUtils.getStatus(TSStatusCode.NOT_LOGIN_ERROR)
+ : RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
+ }
+
+ @Override
+ public TSStatus cancelOperation(TSCancelOperationReq req) {
+ // TODO implement
+ return RpcUtils.getStatus(TSStatusCode.QUERY_NOT_ALLOWED, "Cancellation is not implemented");
+ }
+
+ @Override
+ public TSStatus closeOperation(TSCloseOperationReq req) {
+ return SESSION_MANAGER.closeOperation(
+ req.sessionId, req.queryId, req.statementId, req.isSetStatementId(), req.isSetQueryId());
+ }
+
+ @Override
+ public TSGetTimeZoneResp getTimeZone(long sessionId) {
+ try {
+ ZoneId zoneId = SESSION_MANAGER.getZoneId(sessionId);
+ return new TSGetTimeZoneResp(
+ RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS),
+ zoneId != null ? zoneId.toString() : "Unknown time zone");
+ } catch (Exception e) {
+ return new TSGetTimeZoneResp(
+ onNPEOrUnexpectedException(
+ e, OperationType.GET_TIME_ZONE, TSStatusCode.GENERATE_TIME_ZONE_ERROR),
+ "Unknown time zone");
+ }
+ }
+
+ @Override
+ public TSStatus setTimeZone(TSSetTimeZoneReq req) {
+ try {
+ SESSION_MANAGER.setTimezone(req.sessionId, req.timeZone);
+ return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+ } catch (Exception e) {
+ return onNPEOrUnexpectedException(
+ e, OperationType.SET_TIME_ZONE, TSStatusCode.SET_TIME_ZONE_ERROR);
+ }
+ }
+
+ @Override
+ public ServerProperties getProperties() {
+ ServerProperties properties = new ServerProperties();
+ properties.setVersion(IoTDBConstant.VERSION);
+ LOGGER.info("IoTDB server version: {}", IoTDBConstant.VERSION);
+ properties.setSupportedTimeAggregationOperations(new ArrayList<>());
+ properties.getSupportedTimeAggregationOperations().add(IoTDBConstant.MAX_TIME);
+ properties.getSupportedTimeAggregationOperations().add(IoTDBConstant.MIN_TIME);
+ properties.setTimestampPrecision(
+ IoTDBDescriptor.getInstance().getConfig().getTimestampPrecision());
+ properties.setMaxConcurrentClientNum(
+ IoTDBDescriptor.getInstance().getConfig().getRpcMaxConcurrentClientNum());
+ properties.setWatermarkSecretKey(
+ IoTDBDescriptor.getInstance().getConfig().getWatermarkSecretKey());
+ properties.setWatermarkBitString(
+ IoTDBDescriptor.getInstance().getConfig().getWatermarkBitString());
+ properties.setWatermarkParamMarkRate(
+ IoTDBDescriptor.getInstance().getConfig().getWatermarkParamMarkRate());
+ properties.setWatermarkParamMaxRightBit(
+ IoTDBDescriptor.getInstance().getConfig().getWatermarkParamMaxRightBit());
+ properties.setIsReadOnly(IoTDBDescriptor.getInstance().getConfig().isReadOnly());
+ properties.setThriftMaxFrameSize(
+ IoTDBDescriptor.getInstance().getConfig().getThriftMaxFrameSize());
+ return properties;
+ }
+
+ @Override
+ public TSStatus setStorageGroup(long sessionId, String storageGroup) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public TSStatus createTimeseries(TSCreateTimeseriesReq req) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public TSStatus createAlignedTimeseries(TSCreateAlignedTimeseriesReq req) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public TSStatus createMultiTimeseries(TSCreateMultiTimeseriesReq req) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public TSStatus deleteTimeseries(long sessionId, List<String> path) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public TSStatus deleteStorageGroups(long sessionId, List<String> storageGroup) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public TSFetchMetadataResp fetchMetadata(TSFetchMetadataReq req) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public TSExecuteStatementResp executeStatement(TSExecuteStatementReq req) {
+ String statement = req.getStatement();
+ if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
+ }
+
+ long startTime = System.currentTimeMillis();
+ Statement s =
+ StatementGenerator.createStatement(
+ statement, SESSION_MANAGER.getZoneId(req.getSessionId()));
+
+ QUERY_FREQUENCY_RECORDER.incrementAndGet();
+ AUDIT_LOGGER.debug("Session {} execute Query: {}", req.sessionId, statement);
+
+ try {
+ long queryId = SESSION_MANAGER.requestQueryId(req.statementId, true);
+ QueryId id = new QueryId(String.valueOf(queryId));
+ // create and cache dataset
+ ExecutionResult result =
+ COORDINATOR.execute(s, id, SESSION_MANAGER.getSessionInfo(req.sessionId), statement);
+
+ if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new RuntimeException("");
+ }
+
+ IQueryExecution queryExecution = COORDINATOR.getQueryExecution(id);
+
+ TSExecuteStatementResp resp;
+ if (queryExecution.isQuery()) {
+ resp = createResponse(queryExecution.getDatasetHeader(), queryId);
+ resp.setStatus(result.status);
+ resp.setQueryDataSet(
+ QueryDataSetUtils.convertTsBlockByFetchSize(queryExecution, req.fetchSize));
+ } else {
+ resp = RpcUtils.getTSExecuteStatementResp(result.status);
+ }
+
+ return resp;
+ } catch (Exception e) {
+ // TODO call the coordinator to release query resource
+ return RpcUtils.getTSExecuteStatementResp(
+ onQueryException(e, "\"" + statement + "\". " + OperationType.EXECUTE_STATEMENT));
+ } finally {
+ addOperationLatency(Operation.EXECUTE_QUERY, startTime);
+ long costTime = System.currentTimeMillis() - startTime;
+ if (costTime >= CONFIG.getSlowQueryThreshold()) {
+ SLOW_SQL_LOGGER.info("Cost: {} ms, sql is {}", costTime, statement);
+ }
+ }
+ }
+
+ @Override
+ public TSStatus executeBatchStatement(TSExecuteBatchStatementReq req) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public TSExecuteStatementResp executeQueryStatement(TSExecuteStatementReq req) throws TException {
+ return executeStatement(req);
+ }
+
+ @Override
+ public TSExecuteStatementResp executeUpdateStatement(TSExecuteStatementReq req)
+ throws TException {
+ return executeStatement(req);
+ }
+
+ @Override
+ public TSFetchResultsResp fetchResults(TSFetchResultsReq req) {
+ try {
+ if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ return RpcUtils.getTSFetchResultsResp(getNotLoggedInStatus());
+ }
+
+ TSFetchResultsResp resp = RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS);
+ TSQueryDataSet result =
+ QueryDataSetUtils.convertTsBlockByFetchSize(
+ COORDINATOR.getQueryExecution(new QueryId(String.valueOf(req.queryId))),
+ req.fetchSize);
+ boolean hasResultSet = result.bufferForTime().limit() != 0;
+
+ resp.setHasResultSet(hasResultSet);
+ resp.setQueryDataSet(result);
+ resp.setIsAlign(true);
+
+ QUERY_TIME_MANAGER.unRegisterQuery(req.queryId, false);
+ return resp;
+
+ } catch (Exception e) {
+ return RpcUtils.getTSFetchResultsResp(onQueryException(e, OperationType.FETCH_RESULTS));
+ }
+ }
+
+ @Override
+ public TSStatus insertRecords(TSInsertRecordsReq req) {
+ long t1 = System.currentTimeMillis();
+ try {
+ if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ return getNotLoggedInStatus();
+ }
+
+ if (AUDIT_LOGGER.isDebugEnabled()) {
+ AUDIT_LOGGER.debug(
+ "Session {} insertRecords, first device {}, first time {}",
+ SESSION_MANAGER.getCurrSessionId(),
+ req.prefixPaths.get(0),
+ req.getTimestamps().get(0));
+ }
+
+ // Step 1: TODO(INSERT) transfer from TSInsertRecordsReq to Statement
+ InsertRowsStatement statement = (InsertRowsStatement) StatementGenerator.createStatement(req);
+
+ // Step 2: call the coordinator
+ long queryId = SESSION_MANAGER.requestQueryId(false);
+ ExecutionResult result =
+ COORDINATOR.execute(
+ statement,
+ new QueryId(String.valueOf(queryId)),
+ SESSION_MANAGER.getSessionInfo(req.sessionId),
+ "");
+
+ // TODO(INSERT) do this check in analyze
+ // TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
+ // req.getSessionId());
+ return result.status;
+ } catch (Exception e) {
+ return onNPEOrUnexpectedException(
+ e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
+ } finally {
+ addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
+ }
+ }
+
+ @Override
+ public TSStatus insertRecordsOfOneDevice(TSInsertRecordsOfOneDeviceReq req) {
+ long t1 = System.currentTimeMillis();
+ try {
+ if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ return getNotLoggedInStatus();
+ }
+
+ if (AUDIT_LOGGER.isDebugEnabled()) {
+ AUDIT_LOGGER.debug(
+ "Session {} insertRecords, device {}, first time {}",
+ SESSION_MANAGER.getCurrSessionId(),
+ req.prefixPath,
+ req.getTimestamps().get(0));
+ }
+
+ // Step 1: TODO(INSERT) transfer from TSInsertRecordsOfOneDeviceReq to Statement
+ InsertRowsOfOneDeviceStatement statement =
+ (InsertRowsOfOneDeviceStatement) StatementGenerator.createStatement(req);
+
+ // Step 2: call the coordinator
+ long queryId = SESSION_MANAGER.requestQueryId(false);
+ ExecutionResult result =
+ COORDINATOR.execute(
+ statement,
+ new QueryId(String.valueOf(queryId)),
+ SESSION_MANAGER.getSessionInfo(req.sessionId),
+ "");
+
+ // TODO(INSERT) do this check in analyze
+ // TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
+ // req.getSessionId());
+ return result.status;
+ } catch (Exception e) {
+ return onNPEOrUnexpectedException(
+ e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
+ } finally {
+ addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
+ }
+ }
+
+ @Override
+ public TSStatus insertStringRecordsOfOneDevice(TSInsertStringRecordsOfOneDeviceReq req) {
+ long t1 = System.currentTimeMillis();
+ try {
+ if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ return getNotLoggedInStatus();
+ }
+
+ if (AUDIT_LOGGER.isDebugEnabled()) {
+ AUDIT_LOGGER.debug(
+ "Session {} insertRecords, device {}, first time {}",
+ SESSION_MANAGER.getCurrSessionId(),
+ req.prefixPath,
+ req.getTimestamps().get(0));
+ }
+
+ // Step 1: TODO(INSERT) transfer from TSInsertStringRecordsOfOneDeviceReq to Statement
+ InsertRowsOfOneDeviceStatement statement =
+ (InsertRowsOfOneDeviceStatement) StatementGenerator.createStatement(req);
+
+ // Step 2: call the coordinator
+ long queryId = SESSION_MANAGER.requestQueryId(false);
+ ExecutionResult result =
+ COORDINATOR.execute(
+ statement,
+ new QueryId(String.valueOf(queryId)),
+ SESSION_MANAGER.getSessionInfo(req.sessionId),
+ "");
+
+ // TODO(INSERT) do this check in analyze
+ // TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
+ // req.getSessionId());
+ return result.status;
+ } catch (Exception e) {
+ return onNPEOrUnexpectedException(
+ e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
+ } finally {
+ addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
+ }
+ }
+
+ @Override
+ public TSStatus insertRecord(TSInsertRecordReq req) {
+ long t1 = System.currentTimeMillis();
+ try {
+ if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ return getNotLoggedInStatus();
+ }
+
+ AUDIT_LOGGER.debug(
+ "Session {} insertRecord, device {}, time {}",
+ SESSION_MANAGER.getCurrSessionId(),
+ req.getPrefixPath(),
+ req.getTimestamp());
+
+ InsertRowStatement statement = (InsertRowStatement) StatementGenerator.createStatement(req);
+
+ // Step 2: call the coordinator
+ long queryId = SESSION_MANAGER.requestQueryId(false);
+ ExecutionResult result =
+ COORDINATOR.execute(
+ statement,
+ new QueryId(String.valueOf(queryId)),
+ SESSION_MANAGER.getSessionInfo(req.sessionId),
+ "");
+
+ // TODO(INSERT) do this check in analyze
+ // TSStatus status = SESSION_MANAGER.checkAuthority(insertTabletPlan,
+ // req.getSessionId());
+ return result.status;
+ } catch (Exception e) {
+ return onNPEOrUnexpectedException(
+ e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
+ } finally {
+ addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
+ }
+ }
+
+ @Override
+ public TSStatus insertTablets(TSInsertTabletsReq req) {
+ long t1 = System.currentTimeMillis();
+ try {
+ if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ return getNotLoggedInStatus();
+ }
+
+ // Step 1: TODO(INSERT) transfer from TSInsertTabletsReq to Statement
+ InsertMultiTabletsStatement statement =
+ (InsertMultiTabletsStatement) StatementGenerator.createStatement(req);
+ // Step 2: call the coordinator
+ long queryId = SESSION_MANAGER.requestQueryId(false);
+ ExecutionResult result =
+ COORDINATOR.execute(
+ statement,
+ new QueryId(String.valueOf(queryId)),
+ SESSION_MANAGER.getSessionInfo(req.sessionId),
+ "");
+
+ // TODO(INSERT) do this check in analyze
+ // TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
+ // req.getSessionId());
+ return result.status;
+ } catch (Exception e) {
+ return onNPEOrUnexpectedException(
+ e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
+ } finally {
+ addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
+ }
+ }
+
+ @Override
+ public TSStatus insertTablet(TSInsertTabletReq req) {
+ long t1 = System.currentTimeMillis();
+ try {
+ if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ return getNotLoggedInStatus();
+ }
+
+ // Step 1: TODO(INSERT) transfer from TSInsertTabletReq to Statement
+ InsertTabletStatement statement =
+ (InsertTabletStatement) StatementGenerator.createStatement(req);
+
+ // Step 2: call the coordinator
+ long queryId = SESSION_MANAGER.requestQueryId(false);
+ ExecutionResult result =
+ COORDINATOR.execute(
+ statement,
+ new QueryId(String.valueOf(queryId)),
+ SESSION_MANAGER.getSessionInfo(req.sessionId),
+ "");
+
+ // TODO(INSERT) do this check in analyze
+ // TSStatus status = SESSION_MANAGER.checkAuthority(insertTabletPlan,
+ // req.getSessionId());
+ return result.status;
+ } catch (Exception e) {
+ return onNPEOrUnexpectedException(
+ e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
+ } finally {
+ addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
+ }
+ }
+
+ @Override
+ public TSStatus insertStringRecords(TSInsertStringRecordsReq req) {
+ long t1 = System.currentTimeMillis();
+ try {
+ if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ return getNotLoggedInStatus();
+ }
+
+ if (AUDIT_LOGGER.isDebugEnabled()) {
+ AUDIT_LOGGER.debug(
+ "Session {} insertRecords, first device {}, first time {}",
+ SESSION_MANAGER.getCurrSessionId(),
+ req.prefixPaths.get(0),
+ req.getTimestamps().get(0));
+ }
+
+ InsertRowsStatement statement = (InsertRowsStatement) StatementGenerator.createStatement(req);
+
+ long queryId = SESSION_MANAGER.requestQueryId(false);
+ ExecutionResult result =
+ COORDINATOR.execute(
+ statement,
+ new QueryId(String.valueOf(queryId)),
+ SESSION_MANAGER.getSessionInfo(req.sessionId),
+ "");
+
+ // TODO(INSERT) do this check in analyze
+ // TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
+ // req.getSessionId());
+ return result.status;
+ } catch (Exception e) {
+ return onNPEOrUnexpectedException(
+ e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
+ } finally {
+ addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
+ }
+ }
+
+ @Override
+ public TSStatus testInsertTablet(TSInsertTabletReq req) {
+ LOGGER.debug("Test insert batch request receive.");
+ return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+ }
+
+ @Override
+ public TSStatus testInsertTablets(TSInsertTabletsReq req) {
+ LOGGER.debug("Test insert batch request receive.");
+ return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+ }
+
+ @Override
+ public TSStatus testInsertRecord(TSInsertRecordReq req) {
+ LOGGER.debug("Test insert row request receive.");
+ return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+ }
+
+ @Override
+ public TSStatus testInsertStringRecord(TSInsertStringRecordReq req) {
+ LOGGER.debug("Test insert string record request receive.");
+ return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+ }
+
+ @Override
+ public TSStatus testInsertRecords(TSInsertRecordsReq req) {
+ LOGGER.debug("Test insert row in batch request receive.");
+ return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+ }
+
+ @Override
+ public TSStatus testInsertRecordsOfOneDevice(TSInsertRecordsOfOneDeviceReq req) {
+ LOGGER.debug("Test insert rows in batch request receive.");
+ return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+ }
+
+ @Override
+ public TSStatus testInsertStringRecords(TSInsertStringRecordsReq req) {
+ LOGGER.debug("Test insert string records request receive.");
+ return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+ }
+
+ @Override
+ public TSStatus deleteData(TSDeleteDataReq req) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public TSExecuteStatementResp executeRawDataQuery(TSRawDataQueryReq req) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public TSExecuteStatementResp executeLastDataQuery(TSLastDataQueryReq req) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long requestStatementId(long sessionId) {
+ return SESSION_MANAGER.requestStatementId(sessionId);
+ }
+
+ @Override
+ public TSStatus createSchemaTemplate(TSCreateSchemaTemplateReq req) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public TSStatus appendSchemaTemplate(TSAppendSchemaTemplateReq req) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public TSStatus pruneSchemaTemplate(TSPruneSchemaTemplateReq req) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public TSQueryTemplateResp querySchemaTemplate(TSQueryTemplateReq req) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public TSStatus setSchemaTemplate(TSSetSchemaTemplateReq req) throws TException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public TSStatus unsetSchemaTemplate(TSUnsetSchemaTemplateReq req) throws TException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public TSStatus dropSchemaTemplate(TSDropSchemaTemplateReq req) throws TException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public TSStatus insertStringRecord(TSInsertStringRecordReq req) {
+ long t1 = System.currentTimeMillis();
+ try {
+ if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ return getNotLoggedInStatus();
+ }
+
+ AUDIT_LOGGER.debug(
+ "Session {} insertRecord, device {}, time {}",
+ SESSION_MANAGER.getCurrSessionId(),
+ req.getPrefixPath(),
+ req.getTimestamp());
+
+ InsertRowStatement statement = (InsertRowStatement) StatementGenerator.createStatement(req);
+
+ // Step 2: call the coordinator
+ long queryId = SESSION_MANAGER.requestQueryId(false);
+ ExecutionResult result =
+ COORDINATOR.execute(
+ statement,
+ new QueryId(String.valueOf(queryId)),
+ SESSION_MANAGER.getSessionInfo(req.sessionId),
+ "");
+
+ // TODO(INSERT) do this check in analyze
+ // TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
+ // req.getSessionId());
+ return result.status;
+ } catch (Exception e) {
+ return onNPEOrUnexpectedException(
+ e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
+ } finally {
+ addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
+ }
+ }
+
+ private TSExecuteStatementResp createResponse(DatasetHeader header, long queryId) {
+ TSExecuteStatementResp resp = RpcUtils.getTSExecuteStatementResp(TSStatusCode.SUCCESS_STATUS);
+ resp.setColumnNameIndexMap(header.getColumnNameIndexMap());
+ // TODO deal with the sg name here
+ resp.setSgColumns(new ArrayList<>());
+ resp.setColumns(header.getRespColumns());
+ resp.setDataTypeList(header.getRespDataTypeList());
+ resp.setAliasColumns(header.getRespAliasColumns());
+ resp.setIgnoreTimeStamp(header.isIgnoreTimestamp());
+ resp.setQueryId(queryId);
+ return resp;
+ }
+
+ private TSStatus getNotLoggedInStatus() {
+ return RpcUtils.getStatus(
+ TSStatusCode.NOT_LOGIN_ERROR,
+ "Log in failed. Either you are not authorized or the session has timed out.");
+ }
+
+ /** Record the operation's latency and one invocation in metrics if performance stat is enabled. */
+ private void addOperationLatency(Operation operation, long startTime) {
+ if (CONFIG.isEnablePerformanceStat()) {
+ MetricsService.getInstance()
+ .getMetricManager()
+ .histogram(
+ System.currentTimeMillis() - startTime,
+ "operation_histogram",
+ MetricLevel.IMPORTANT,
+ "name",
+ operation.getName());
+ MetricsService.getInstance()
+ .getMetricManager()
+ .count(1, "operation_count", MetricLevel.IMPORTANT, "name", operation.getName());
+ }
+ }
+
+ @Override
+ public void handleClientExit() {
+ Long sessionId = SESSION_MANAGER.getCurrSessionId();
+ if (sessionId != null) {
+ TSCloseSessionReq req = new TSCloseSessionReq(sessionId);
+ closeSession(req);
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/IQueryExecution.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSIEventHandler.java
similarity index 80%
copy from server/src/main/java/org/apache/iotdb/db/mpp/execution/IQueryExecution.java
copy to server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSIEventHandler.java
index 3f9fa05ea8..0a4887ffac 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/IQueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSIEventHandler.java
@@ -16,14 +16,10 @@
* specific language governing permissions and limitations
* under the License.
*/
+package org.apache.iotdb.db.service.thrift.impl;
-package org.apache.iotdb.db.mpp.execution;
+import org.apache.iotdb.service.rpc.thrift.TSIService;
-public interface IQueryExecution {
-
- void start();
-
- void stop();
-
- ExecutionResult getStatus();
+public interface TSIEventHandler extends TSIService.Iface {
+ void handleClientExit();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
index 6291b14875..5fab670693 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
@@ -37,16 +37,17 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.metadata.template.TemplateQueryType;
-import org.apache.iotdb.db.mpp.common.QueryId;
-import org.apache.iotdb.db.mpp.execution.Coordinator;
-import org.apache.iotdb.db.mpp.execution.ExecutionResult;
-import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
-import org.apache.iotdb.db.mpp.sql.parser.StatementGenerator;
-import org.apache.iotdb.db.mpp.sql.statement.crud.*;
import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.crud.*;
+import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletsPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
+import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.qp.physical.crud.SelectIntoPlan;
+import org.apache.iotdb.db.qp.physical.crud.UDFPlan;
import org.apache.iotdb.db.qp.physical.sys.AppendTemplatePlan;
import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan;
@@ -79,47 +80,7 @@ import org.apache.iotdb.metrics.utils.MetricLevel;
import org.apache.iotdb.rpc.RedirectException;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.service.rpc.thrift.ServerProperties;
-import org.apache.iotdb.service.rpc.thrift.TSAppendSchemaTemplateReq;
-import org.apache.iotdb.service.rpc.thrift.TSCancelOperationReq;
-import org.apache.iotdb.service.rpc.thrift.TSCloseOperationReq;
-import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq;
-import org.apache.iotdb.service.rpc.thrift.TSCreateAlignedTimeseriesReq;
-import org.apache.iotdb.service.rpc.thrift.TSCreateMultiTimeseriesReq;
-import org.apache.iotdb.service.rpc.thrift.TSCreateSchemaTemplateReq;
-import org.apache.iotdb.service.rpc.thrift.TSCreateTimeseriesReq;
-import org.apache.iotdb.service.rpc.thrift.TSDeleteDataReq;
-import org.apache.iotdb.service.rpc.thrift.TSDropSchemaTemplateReq;
-import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementReq;
-import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
-import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
-import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataReq;
-import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataResp;
-import org.apache.iotdb.service.rpc.thrift.TSFetchResultsReq;
-import org.apache.iotdb.service.rpc.thrift.TSFetchResultsResp;
-import org.apache.iotdb.service.rpc.thrift.TSGetTimeZoneResp;
-import org.apache.iotdb.service.rpc.thrift.TSIService;
-import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
-import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq;
-import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq;
-import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordReq;
-import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsOfOneDeviceReq;
-import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsReq;
-import org.apache.iotdb.service.rpc.thrift.TSInsertTabletReq;
-import org.apache.iotdb.service.rpc.thrift.TSInsertTabletsReq;
-import org.apache.iotdb.service.rpc.thrift.TSLastDataQueryReq;
-import org.apache.iotdb.service.rpc.thrift.TSOpenSessionReq;
-import org.apache.iotdb.service.rpc.thrift.TSOpenSessionResp;
-import org.apache.iotdb.service.rpc.thrift.TSPruneSchemaTemplateReq;
-import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
-import org.apache.iotdb.service.rpc.thrift.TSQueryNonAlignDataSet;
-import org.apache.iotdb.service.rpc.thrift.TSQueryTemplateReq;
-import org.apache.iotdb.service.rpc.thrift.TSQueryTemplateResp;
-import org.apache.iotdb.service.rpc.thrift.TSRawDataQueryReq;
-import org.apache.iotdb.service.rpc.thrift.TSSetSchemaTemplateReq;
-import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneReq;
-import org.apache.iotdb.service.rpc.thrift.TSTracingInfo;
-import org.apache.iotdb.service.rpc.thrift.TSUnsetSchemaTemplateReq;
+import org.apache.iotdb.service.rpc.thrift.*;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
@@ -159,13 +120,11 @@ import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNonQueryException;
import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException;
/** Thrift RPC implementation at server side. */
-public class TSServiceImpl implements TSIService.Iface {
+public class TSServiceImpl implements TSIEventHandler {
- private static final Coordinator coordinator = Coordinator.getInstance();
+ private static final SessionManager SESSION_MANAGER = SessionManager.getInstance();
- public static SessionManager SESSION_MANAGER = SessionManager.getInstance();
-
- protected class QueryTask implements Callable<TSExecuteStatementResp> {
+ private class QueryTask implements Callable<TSExecuteStatementResp> {
private PhysicalPlan plan;
private final long queryStartTime;
@@ -241,7 +200,7 @@ public class TSServiceImpl implements TSIService.Iface {
}
}
- protected class FetchResultsTask implements Callable<TSFetchResultsResp> {
+ private class FetchResultsTask implements Callable<TSFetchResultsResp> {
private final long sessionId;
private final long queryId;
@@ -1117,6 +1076,7 @@ public class TSServiceImpl implements TSIService.Iface {
.setQueryId(SESSION_MANAGER.requestQueryId(false));
}
+ @Override
public void handleClientExit() {
Long sessionId = SESSION_MANAGER.getCurrSessionId();
if (sessionId != null) {
@@ -1228,46 +1188,6 @@ public class TSServiceImpl implements TSIService.Iface {
allCheckSuccess, tsStatus, insertRowsPlan.getResults(), req.prefixPaths.size());
}
- public TSStatus insertRecordsV2(TSInsertRecordsReq req) {
- long t1 = System.currentTimeMillis();
- try {
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
- return getNotLoggedInStatus();
- }
-
- if (AUDIT_LOGGER.isDebugEnabled()) {
- AUDIT_LOGGER.debug(
- "Session {} insertRecords, first device {}, first time {}",
- SESSION_MANAGER.getCurrSessionId(),
- req.prefixPaths.get(0),
- req.getTimestamps().get(0));
- }
-
- // Step 1: TODO(INSERT) transfer from TSInsertTabletsReq to Statement
- InsertRowsStatement statement = (InsertRowsStatement) StatementGenerator.createStatement(req);
-
- // Step 2: call the coordinator
- long queryId = SESSION_MANAGER.requestQueryId(false);
- ExecutionResult result =
- coordinator.execute(
- statement,
- new QueryId(String.valueOf(queryId)),
- QueryType.WRITE,
- SESSION_MANAGER.getSessionInfo(req.sessionId),
- "");
-
- // TODO(INSERT) do this check in analyze
- // TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
- // req.getSessionId());
- return result.status;
- } catch (Exception e) {
- return onNPEOrUnexpectedException(
- e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
- } finally {
- addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
- }
- }
-
private TSStatus judgeFinalTsStatus(
boolean allCheckSuccess,
TSStatus executeTsStatus,
@@ -1335,47 +1255,6 @@ public class TSServiceImpl implements TSIService.Iface {
return resp;
}
- public TSStatus insertRecordsOfOneDeviceV2(TSInsertRecordsOfOneDeviceReq req) {
- long t1 = System.currentTimeMillis();
- try {
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
- return getNotLoggedInStatus();
- }
-
- if (AUDIT_LOGGER.isDebugEnabled()) {
- AUDIT_LOGGER.debug(
- "Session {} insertRecords, device {}, first time {}",
- SESSION_MANAGER.getCurrSessionId(),
- req.prefixPath,
- req.getTimestamps().get(0));
- }
-
- // Step 1: TODO(INSERT) transfer from TSInsertTabletsReq to Statement
- InsertRowsOfOneDeviceStatement statement =
- (InsertRowsOfOneDeviceStatement) StatementGenerator.createStatement(req);
-
- // Step 2: call the coordinator
- long queryId = SESSION_MANAGER.requestQueryId(false);
- ExecutionResult result =
- coordinator.execute(
- statement,
- new QueryId(String.valueOf(queryId)),
- QueryType.WRITE,
- SESSION_MANAGER.getSessionInfo(req.sessionId),
- "");
-
- // TODO(INSERT) do this check in analyze
- // TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
- // req.getSessionId());
- return result.status;
- } catch (Exception e) {
- return onNPEOrUnexpectedException(
- e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
- } finally {
- addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
- }
- }
-
@Override
public TSStatus insertStringRecordsOfOneDevice(TSInsertStringRecordsOfOneDeviceReq req) {
if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
@@ -1434,47 +1313,6 @@ public class TSServiceImpl implements TSIService.Iface {
allCheckSuccess, tsStatus, insertRowsPlan.getResults(), req.timestamps.size());
}
- public TSStatus insertStringRecordsOfOneDeviceV2(TSInsertStringRecordsOfOneDeviceReq req) {
- long t1 = System.currentTimeMillis();
- try {
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
- return getNotLoggedInStatus();
- }
-
- if (AUDIT_LOGGER.isDebugEnabled()) {
- AUDIT_LOGGER.debug(
- "Session {} insertRecords, device {}, first time {}",
- SESSION_MANAGER.getCurrSessionId(),
- req.prefixPath,
- req.getTimestamps().get(0));
- }
-
- // Step 1: TODO(INSERT) transfer from TSInsertTabletsReq to Statement
- InsertRowsOfOneDeviceStatement statement =
- (InsertRowsOfOneDeviceStatement) StatementGenerator.createStatement(req);
-
- // Step 2: call the coordinator
- long queryId = SESSION_MANAGER.requestQueryId(false);
- ExecutionResult result =
- coordinator.execute(
- statement,
- new QueryId(String.valueOf(queryId)),
- QueryType.WRITE,
- SESSION_MANAGER.getSessionInfo(req.sessionId),
- "");
-
- // TODO(INSERT) do this check in analyze
- // TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
- // req.getSessionId());
- return result.status;
- } catch (Exception e) {
- return onNPEOrUnexpectedException(
- e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
- } finally {
- addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
- }
- }
-
@Override
public TSStatus insertStringRecords(TSInsertStringRecordsReq req) {
if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
@@ -1528,44 +1366,6 @@ public class TSServiceImpl implements TSIService.Iface {
allCheckSuccess, tsStatus, insertRowsPlan.getResults(), req.prefixPaths.size());
}
- public TSStatus insertStringRecordsV2(TSInsertStringRecordsReq req) {
- long t1 = System.currentTimeMillis();
- try {
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
- return getNotLoggedInStatus();
- }
-
- if (AUDIT_LOGGER.isDebugEnabled()) {
- AUDIT_LOGGER.debug(
- "Session {} insertRecords, first device {}, first time {}",
- SESSION_MANAGER.getCurrSessionId(),
- req.prefixPaths.get(0),
- req.getTimestamps().get(0));
- }
-
- InsertRowsStatement statement = (InsertRowsStatement) StatementGenerator.createStatement(req);
-
- long queryId = SESSION_MANAGER.requestQueryId(false);
- ExecutionResult result =
- coordinator.execute(
- statement,
- new QueryId(String.valueOf(queryId)),
- QueryType.WRITE,
- SESSION_MANAGER.getSessionInfo(req.sessionId),
- "");
-
- // TODO(INSERT) do this check in analyze
- // TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
- // req.getSessionId());
- return result.status;
- } catch (Exception e) {
- return onNPEOrUnexpectedException(
- e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
- } finally {
- addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
- }
- }
-
private void addMeasurementAndValue(
InsertRowPlan insertRowPlan, List<String> measurements, List<String> values) {
List<String> newMeasurements = new ArrayList<>(measurements.size());
@@ -1656,43 +1456,6 @@ public class TSServiceImpl implements TSIService.Iface {
}
}
- public TSStatus insertRecordV2(TSInsertRecordReq req) {
- long t1 = System.currentTimeMillis();
- try {
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
- return getNotLoggedInStatus();
- }
-
- AUDIT_LOGGER.debug(
- "Session {} insertRecord, device {}, time {}",
- SESSION_MANAGER.getCurrSessionId(),
- req.getPrefixPath(),
- req.getTimestamp());
-
- InsertRowStatement statement = (InsertRowStatement) StatementGenerator.createStatement(req);
-
- // Step 2: call the coordinator
- long queryId = SESSION_MANAGER.requestQueryId(false);
- ExecutionResult result =
- coordinator.execute(
- statement,
- new QueryId(String.valueOf(queryId)),
- QueryType.WRITE,
- SESSION_MANAGER.getSessionInfo(req.sessionId),
- "");
-
- // TODO(INSERT) do this check in analyze
- // TSStatus status = SESSION_MANAGER.checkAuthority(insertTabletPlan,
- // req.getSessionId());
- return result.status;
- } catch (Exception e) {
- return onNPEOrUnexpectedException(
- e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
- } finally {
- addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
- }
- }
-
@Override
public TSStatus insertStringRecord(TSInsertStringRecordReq req) {
try {
@@ -1724,43 +1487,6 @@ public class TSServiceImpl implements TSIService.Iface {
}
}
- public TSStatus insertStringRecordV2(TSInsertStringRecordReq req) {
- long t1 = System.currentTimeMillis();
- try {
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
- return getNotLoggedInStatus();
- }
-
- AUDIT_LOGGER.debug(
- "Session {} insertRecord, device {}, time {}",
- SESSION_MANAGER.getCurrSessionId(),
- req.getPrefixPath(),
- req.getTimestamp());
-
- InsertRowStatement statement = (InsertRowStatement) StatementGenerator.createStatement(req);
-
- // Step 2: call the coordinator
- long queryId = SESSION_MANAGER.requestQueryId(false);
- ExecutionResult result =
- coordinator.execute(
- statement,
- new QueryId(String.valueOf(queryId)),
- QueryType.WRITE,
- SESSION_MANAGER.getSessionInfo(req.sessionId),
- "");
-
- // TODO(INSERT) do this check in analyze
- // TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
- // req.getSessionId());
- return result.status;
- } catch (Exception e) {
- return onNPEOrUnexpectedException(
- e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
- } finally {
- addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
- }
- }
-
@Override
public TSStatus deleteData(TSDeleteDataReq req) {
try {
@@ -1819,39 +1545,6 @@ public class TSServiceImpl implements TSIService.Iface {
}
}
- public TSStatus insertTabletV2(TSInsertTabletReq req) {
- long t1 = System.currentTimeMillis();
- try {
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
- return getNotLoggedInStatus();
- }
-
- // Step 1: TODO(INSERT) transfer from TSInsertTabletReq to Statement
- InsertTabletStatement statement =
- (InsertTabletStatement) StatementGenerator.createStatement(req);
-
- // Step 2: call the coordinator
- long queryId = SESSION_MANAGER.requestQueryId(false);
- ExecutionResult result =
- coordinator.execute(
- statement,
- new QueryId(String.valueOf(queryId)),
- QueryType.WRITE,
- SESSION_MANAGER.getSessionInfo(req.sessionId),
- "");
-
- // TODO(INSERT) do this check in analyze
- // TSStatus status = SESSION_MANAGER.checkAuthority(insertTabletPlan,
- // req.getSessionId());
- return result.status;
- } catch (Exception e) {
- return onNPEOrUnexpectedException(
- e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
- } finally {
- addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
- }
- }
-
@Override
public TSStatus insertTablets(TSInsertTabletsReq req) {
long t1 = System.currentTimeMillis();
@@ -1874,38 +1567,6 @@ public class TSServiceImpl implements TSIService.Iface {
}
}
- public TSStatus insertTabletsV2(TSInsertTabletsReq req) {
- long t1 = System.currentTimeMillis();
- try {
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
- return getNotLoggedInStatus();
- }
-
- // Step 1: TODO(INSERT) transfer from TSInsertTabletsReq to Statement
- InsertMultiTabletsStatement statement =
- (InsertMultiTabletsStatement) StatementGenerator.createStatement(req);
- // Step 2: call the coordinator
- long queryId = SESSION_MANAGER.requestQueryId(false);
- ExecutionResult result =
- coordinator.execute(
- statement,
- new QueryId(String.valueOf(queryId)),
- QueryType.WRITE,
- SESSION_MANAGER.getSessionInfo(req.sessionId),
- "");
-
- // TODO(INSERT) do this check in analyze
- // TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
- // req.getSessionId());
- return result.status;
- } catch (Exception e) {
- return onNPEOrUnexpectedException(
- e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
- } finally {
- addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
- }
- }
-
private InsertTabletPlan constructInsertTabletPlan(TSInsertTabletsReq req, int i)
throws IllegalPathException {
InsertTabletPlan insertTabletPlan =
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
index 29efa1a4c7..506f180049 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
@@ -18,7 +18,7 @@
*/
package org.apache.iotdb.db.utils;
-import org.apache.iotdb.db.mpp.execution.QueryExecution;
+import org.apache.iotdb.db.mpp.execution.IQueryExecution;
import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
@@ -177,7 +177,7 @@ public class QueryDataSetUtils {
}
public static TSQueryDataSet convertTsBlockByFetchSize(
- QueryExecution queryExecution, int fetchSize) throws IOException {
+ IQueryExecution queryExecution, int fetchSize) throws IOException {
int columnNum = queryExecution.getOutputValueColumnCount();
TSQueryDataSet tsQueryDataSet = new TSQueryDataSet();
// one time column and each value column has an actual value buffer and a bitmap value to
@@ -194,6 +194,9 @@ public class QueryDataSetUtils {
int[] valueOccupation = new int[columnNum];
while (rowCount < fetchSize && queryExecution.hasNextResult()) {
TsBlock tsBlock = queryExecution.getBatchResult();
+ if (tsBlock == null) {
+ break;
+ }
int currentCount = tsBlock.getPositionCount();
// serialize time column
for (int i = 0; i < currentCount; i++) {
@@ -220,7 +223,7 @@ public class QueryDataSetUtils {
dataOutputStream.writeInt(column.getInt(i));
valueOccupation[k] += 4;
}
- if (i % 8 == 0) {
+ if (i != 0 && i % 8 == 0) {
dataBitmapOutputStream.writeByte(bitmap);
// we should clear the bitmap every 8 points
bitmap = 0;
@@ -236,7 +239,7 @@ public class QueryDataSetUtils {
dataOutputStream.writeLong(column.getLong(i));
valueOccupation[k] += 8;
}
- if (i % 8 == 0) {
+ if (i != 0 && i % 8 == 0) {
dataBitmapOutputStream.writeByte(bitmap);
// we should clear the bitmap every 8 points
bitmap = 0;
@@ -252,7 +255,7 @@ public class QueryDataSetUtils {
dataOutputStream.writeFloat(column.getFloat(i));
valueOccupation[k] += 4;
}
- if (i % 8 == 0) {
+ if (i != 0 && i % 8 == 0) {
dataBitmapOutputStream.writeByte(bitmap);
// we should clear the bitmap every 8 points
bitmap = 0;
@@ -268,7 +271,7 @@ public class QueryDataSetUtils {
dataOutputStream.writeDouble(column.getDouble(i));
valueOccupation[k] += 8;
}
- if (i % 8 == 0) {
+ if (i != 0 && i % 8 == 0) {
dataBitmapOutputStream.writeByte(bitmap);
// we should clear the bitmap every 8 points
bitmap = 0;
@@ -284,7 +287,7 @@ public class QueryDataSetUtils {
dataOutputStream.writeBoolean(column.getBoolean(i));
valueOccupation[k] += 1;
}
- if (i % 8 == 0) {
+ if (i != 0 && i % 8 == 0) {
dataBitmapOutputStream.writeByte(bitmap);
// we should clear the bitmap every 8 points
bitmap = 0;
@@ -302,7 +305,7 @@ public class QueryDataSetUtils {
dataOutputStream.write(binary.getValues());
valueOccupation[k] = valueOccupation[k] + 4 + binary.getLength();
}
- if (i % 8 == 0) {
+ if (i != 0 && i % 8 == 0) {
dataBitmapOutputStream.writeByte(bitmap);
// we should clear the bitmap every 8 points
bitmap = 0;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
index e428970fb6..cbd56cc790 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
@@ -212,8 +212,7 @@ public class DistributionPlannerTest {
Analysis analysis = constructAnalysis();
- MPPQueryContext context =
- new MPPQueryContext("", queryId, null, QueryType.READ, new Endpoint());
+ MPPQueryContext context = new MPPQueryContext("", queryId, null, new Endpoint());
DistributionPlanner planner =
new DistributionPlanner(analysis, new LogicalQueryPlan(context, root));
PlanNode rootAfterRewrite = planner.rewriteSource();
@@ -250,8 +249,7 @@ public class DistributionPlannerTest {
Analysis analysis = constructAnalysis();
- MPPQueryContext context =
- new MPPQueryContext("", queryId, null, QueryType.READ, new Endpoint());
+ MPPQueryContext context = new MPPQueryContext("", queryId, null, new Endpoint());
DistributionPlanner planner =
new DistributionPlanner(analysis, new LogicalQueryPlan(context, root));
DistributedQueryPlan plan = planner.planFragments();
@@ -276,8 +274,8 @@ public class DistributionPlannerTest {
Analysis analysis = constructAnalysis();
- MPPQueryContext context =
- new MPPQueryContext("", queryId, null, QueryType.WRITE, new Endpoint());
+ MPPQueryContext context = new MPPQueryContext("", queryId, null, new Endpoint());
+ context.setQueryType(QueryType.WRITE);
DistributionPlanner planner =
new DistributionPlanner(analysis, new LogicalQueryPlan(context, insertRowNode));
DistributedQueryPlan plan = planner.planFragments();
@@ -318,8 +316,8 @@ public class DistributionPlannerTest {
Analysis analysis = constructAnalysis();
- MPPQueryContext context =
- new MPPQueryContext("", queryId, null, QueryType.WRITE, new Endpoint());
+ MPPQueryContext context = new MPPQueryContext("", queryId, null, new Endpoint());
+ context.setQueryType(QueryType.WRITE);
DistributionPlanner planner =
new DistributionPlanner(analysis, new LogicalQueryPlan(context, node));
DistributedQueryPlan plan = planner.planFragments();
@@ -405,7 +403,10 @@ public class DistributionPlannerTest {
analysis.setDataPartitionInfo(dataPartition);
// construct schema partition
- SchemaPartition schemaPartition = new SchemaPartition();
+ SchemaPartition schemaPartition =
+ new SchemaPartition(
+ IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass(),
+ IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum());
Map<String, Map<SeriesPartitionSlot, RegionReplicaSet>> schemaPartitionMap = new HashMap<>();
RegionReplicaSet schemaRegion1 =
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java
index 0931e9faf4..ca4d50f542 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java
@@ -25,7 +25,6 @@ import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.db.mpp.common.SessionInfo;
import org.apache.iotdb.db.mpp.execution.QueryExecution;
-import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
import org.apache.iotdb.db.mpp.sql.parser.StatementGenerator;
import org.apache.iotdb.db.mpp.sql.planner.plan.DistributedQueryPlan;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeUtil;
@@ -49,8 +48,7 @@ public class QueryPlannerTest {
QueryExecution queryExecution =
new QueryExecution(
stmt,
- new MPPQueryContext(
- querySql, new QueryId("query1"), new SessionInfo(), QueryType.READ, new Endpoint()),
+ new MPPQueryContext(querySql, new QueryId("query1"), new SessionInfo(), new Endpoint()),
IoTDBThreadPoolFactory.newSingleThreadExecutor("test_query"),
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("test_query_scheduled"));
queryExecution.doLogicalPlan();