You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/04/12 11:18:54 UTC
[iotdb] branch master updated: [IOTDB-2803] Implement create timeseries metadata operation through consensus layer in MPP framework (#5403)
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 53089cee1f [IOTDB-2803] Implement create timeseries metadata operation through consensus layer in MPP framework (#5403)
53089cee1f is described below
commit 53089cee1fcd98558c3709d39a68ca87ca96248e
Author: Yifu Zhou <ef...@outlook.com>
AuthorDate: Tue Apr 12 19:18:48 2022 +0800
[IOTDB-2803] Implement create timeseries metadata operation through consensus layer in MPP framework (#5403)
As we discussed earlier, every PlanNode that is used to execute a WRITE operation needs to implement a splitByPartition() method. We can merge this PR first and do that work in the next PR.
---
confignode/pom.xml | 5 +
.../consensus/response/DataPartitionDataSet.java | 3 +-
.../consensus/response/SchemaPartitionDataSet.java | 2 +-
.../iotdb/confignode/manager/RegionManager.java | 2 +-
.../persistence/RegionInfoPersistence.java | 13 +-
.../physical/SerializeDeserializeUT.java | 6 +-
.../iotdb/consensus/ratis/RequestMessage.java | 1 +
.../iotdb/commons/cluster/DataNodeLocation.java | 6 +-
.../org/apache/iotdb/commons/cluster/Endpoint.java | 6 +-
.../apache/iotdb/commons/conf/IoTDBConstant.java | 1 +
.../iotdb/commons/partition/RegionReplicaSet.java | 41 +++---
.../apache/iotdb/commons/utils/CommonUtils.java | 4 +-
server/pom.xml | 5 +
.../iotdb/db/consensus/ConsensusManager.java | 72 +++++++++++
.../consensus/statemachine/BaseStateMachine.java | 12 +-
.../statemachine/SchemaRegionStateMachine.java | 13 +-
.../iotdb/db/metadata/Executor/SchemaVisitor.java | 55 ++++++++
.../apache/iotdb/db/mpp/common/PlanFragmentId.java | 10 +-
.../org/apache/iotdb/db/mpp/common/QueryId.java | 8 +-
.../scheduler/SimpleFragInstanceDispatcher.java | 4 +-
.../db/mpp/sql/planner/plan/FragmentInstance.java | 53 ++++----
.../db/mpp/sql/planner/plan/PlanFragment.java | 5 +-
.../plan/SimpleFragmentParallelPlanner.java | 2 +-
.../db/mpp/sql/planner/plan/node/PlanNode.java | 6 +
.../db/mpp/sql/planner/plan/node/PlanNodeType.java | 3 +-
.../db/mpp/sql/planner/plan/node/PlanNodeUtil.java | 3 +
.../db/mpp/sql/planner/plan/node/PlanVisitor.java | 5 +
.../node/metedata/write/AlterTimeSeriesNode.java | 37 ++++++
.../write/CreateAlignedTimeSeriesNode.java | 22 ++++
.../node/metedata/write/CreateTimeSeriesNode.java | 137 +++++++++++++++++++-
.../planner/plan/node/process/DeviceMergeNode.java | 3 +-
.../planner/plan/node/process/ExchangeNode.java | 3 +-
.../iotdb/db/service/InternalServiceImpl.java | 72 ++++++++---
.../db/mpp/sql/plan/FragmentInstanceSerdeTest.java | 9 +-
.../sql/plan/node/PlanNodeDeserializeHelper.java | 3 +-
.../metadata/read/ShowDevicesNodeSerdeTest.java | 3 +-
.../plan/node/sink/FragmentSinkNodeSerdeTest.java | 3 +-
.../iotdb/db/service/InternalServiceImplTest.java | 138 +++++++++++++++++++++
38 files changed, 659 insertions(+), 117 deletions(-)
diff --git a/confignode/pom.xml b/confignode/pom.xml
index 24fecb0a8e..ff00b99f39 100644
--- a/confignode/pom.xml
+++ b/confignode/pom.xml
@@ -75,6 +75,11 @@
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.codahale.metrics</groupId>
+ <artifactId>metrics-core</artifactId>
+ <version>3.0.2</version>
+ </dependency>
</dependencies>
<build>
<plugins>
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataPartitionDataSet.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataPartitionDataSet.java
index 4de9443c5e..91397f7ade 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataPartitionDataSet.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataPartitionDataSet.java
@@ -99,7 +99,8 @@ public class DataPartitionDataSet implements DataSet {
TRegionReplicaSet tRegionReplicaSet = new TRegionReplicaSet();
// Set TRegionReplicaSet's RegionId
- tRegionReplicaSet.setRegionId(regionReplicaSet.getId().getId());
+ tRegionReplicaSet.setRegionId(
+ regionReplicaSet.getConsensusGroupId().getId());
// Set TRegionReplicaSet's GroupType
tRegionReplicaSet.setGroupType("DataRegion");
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/SchemaPartitionDataSet.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/SchemaPartitionDataSet.java
index 5c3e95ea3a..bef6f975b1 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/SchemaPartitionDataSet.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/SchemaPartitionDataSet.java
@@ -72,7 +72,7 @@ public class SchemaPartitionDataSet implements DataSet {
seriesPartitionSlotRegionReplicaSetMap.forEach(
((seriesPartitionSlot, regionReplicaSet) -> {
TRegionReplicaSet regionMessage = new TRegionReplicaSet();
- regionMessage.setRegionId(regionReplicaSet.getId().getId());
+ regionMessage.setRegionId(regionReplicaSet.getConsensusGroupId().getId());
List<EndPoint> endPointList = new ArrayList<>();
regionReplicaSet
.getDataNodeList()
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/RegionManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/RegionManager.java
index f45591cffb..86e6f31774 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/RegionManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/RegionManager.java
@@ -120,7 +120,7 @@ public class RegionManager {
case DataRegion:
consensusGroupId = new DataRegionId(regionInfoPersistence.generateNextRegionGroupId());
}
- regionReplicaSet.setId(consensusGroupId);
+ regionReplicaSet.setConsensusGroupId(consensusGroupId);
regionReplicaSet.setDataNodeList(onlineDataNodes.subList(0, regionReplicaCount));
plan.addRegion(regionReplicaSet);
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/RegionInfoPersistence.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/RegionInfoPersistence.java
index e7f429b566..620814a7e3 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/RegionInfoPersistence.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/RegionInfoPersistence.java
@@ -106,12 +106,13 @@ public class RegionInfoPersistence {
StorageGroupSchema schema = storageGroupsMap.get(plan.getStorageGroup());
for (RegionReplicaSet regionReplicaSet : plan.getRegionReplicaSets()) {
- nextRegionGroupId = Math.max(nextRegionGroupId, regionReplicaSet.getId().getId());
- regionMap.put(regionReplicaSet.getId(), regionReplicaSet);
- if (regionReplicaSet.getId() instanceof DataRegionId) {
- schema.addDataRegionGroup(regionReplicaSet.getId());
- } else if (regionReplicaSet.getId() instanceof SchemaRegionId) {
- schema.addSchemaRegionGroup(regionReplicaSet.getId());
+ nextRegionGroupId =
+ Math.max(nextRegionGroupId, regionReplicaSet.getConsensusGroupId().getId());
+ regionMap.put(regionReplicaSet.getConsensusGroupId(), regionReplicaSet);
+ if (regionReplicaSet.getConsensusGroupId() instanceof DataRegionId) {
+ schema.addDataRegionGroup(regionReplicaSet.getConsensusGroupId());
+ } else if (regionReplicaSet.getConsensusGroupId() instanceof SchemaRegionId) {
+ schema.addSchemaRegionGroup(regionReplicaSet.getConsensusGroupId());
}
}
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/physical/SerializeDeserializeUT.java b/confignode/src/test/java/org/apache/iotdb/confignode/physical/SerializeDeserializeUT.java
index 37262c7f14..0e33e3eaf0 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/physical/SerializeDeserializeUT.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/physical/SerializeDeserializeUT.java
@@ -94,12 +94,12 @@ public class SerializeDeserializeUT {
CreateRegionsPlan plan0 = new CreateRegionsPlan();
plan0.setStorageGroup("sg");
RegionReplicaSet dataRegionSet = new RegionReplicaSet();
- dataRegionSet.setId(new DataRegionId(0));
+ dataRegionSet.setConsensusGroupId(new DataRegionId(0));
dataRegionSet.setDataNodeList(
Collections.singletonList(new DataNodeLocation(0, new Endpoint("0.0.0.0", 6667))));
plan0.addRegion(dataRegionSet);
RegionReplicaSet schemaRegionSet = new RegionReplicaSet();
- schemaRegionSet.setId(new SchemaRegionId(1));
+ schemaRegionSet.setConsensusGroupId(new SchemaRegionId(1));
schemaRegionSet.setDataNodeList(
Collections.singletonList(new DataNodeLocation(0, new Endpoint("0.0.0.0", 6667))));
plan0.addRegion(schemaRegionSet);
@@ -125,7 +125,7 @@ public class SerializeDeserializeUT {
SeriesPartitionSlot seriesPartitionSlot = new SeriesPartitionSlot(10);
TimePartitionSlot timePartitionSlot = new TimePartitionSlot(100);
RegionReplicaSet regionReplicaSet = new RegionReplicaSet();
- regionReplicaSet.setId(new DataRegionId(0));
+ regionReplicaSet.setConsensusGroupId(new DataRegionId(0));
regionReplicaSet.setDataNodeList(
Collections.singletonList(new DataNodeLocation(0, new Endpoint("0.0.0.0", 6667))));
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RequestMessage.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RequestMessage.java
index feb42b5f25..050358fdb2 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RequestMessage.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RequestMessage.java
@@ -59,6 +59,7 @@ public class RequestMessage implements Message {
// TODO Pooling
ByteBuffer byteBuffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
actualRequest.serializeRequest(byteBuffer);
+ byteBuffer.flip();
serializedContent = ByteString.copyFrom(byteBuffer);
byteBuffer.flip();
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/cluster/DataNodeLocation.java b/node-commons/src/main/java/org/apache/iotdb/commons/cluster/DataNodeLocation.java
index e81af759b3..6c3d762a2c 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/cluster/DataNodeLocation.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/cluster/DataNodeLocation.java
@@ -50,10 +50,8 @@ public class DataNodeLocation {
endPoint.serializeImpl(buffer);
}
- public void deserializeImpl(ByteBuffer buffer) {
- dataNodeId = buffer.getInt();
- endPoint = new Endpoint();
- endPoint.deserializeImpl(buffer);
+ public static DataNodeLocation deserializeImpl(ByteBuffer buffer) {
+ return new DataNodeLocation(buffer.getInt(), Endpoint.deserializeImpl(buffer));
}
@Override
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/cluster/Endpoint.java b/node-commons/src/main/java/org/apache/iotdb/commons/cluster/Endpoint.java
index 25d7b84d0f..b5c15cc85f 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/cluster/Endpoint.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/cluster/Endpoint.java
@@ -61,13 +61,11 @@ public class Endpoint {
buffer.putInt(port);
}
- public void deserializeImpl(ByteBuffer buffer) {
+ public static Endpoint deserializeImpl(ByteBuffer buffer) {
int length = buffer.getInt();
byte[] bytes = new byte[length];
buffer.get(bytes, 0, length);
- ip = new String(bytes, 0, length);
-
- port = buffer.getInt();
+ return new Endpoint(new String(bytes, 0, length), buffer.getInt());
}
@Override
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java b/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
index de04e6e0de..b7f1fe4e58 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
@@ -143,6 +143,7 @@ public class IoTDBConstant {
public static final String UNSEQUENCE_FLODER_NAME = "unsequence";
public static final String FILE_NAME_SEPARATOR = "-";
public static final String UPGRADE_FOLDER_NAME = "upgrade";
+ public static final String CONSENSUS_FOLDER_NAME = "consensus";
// system folder name
public static final String SYSTEM_FOLDER_NAME = "system";
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/RegionReplicaSet.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/RegionReplicaSet.java
index 115c4406c4..dac39d2d7c 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/RegionReplicaSet.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/RegionReplicaSet.java
@@ -28,13 +28,13 @@ import java.util.List;
import java.util.Objects;
public class RegionReplicaSet {
- private ConsensusGroupId id;
+ private ConsensusGroupId consensusGroupId;
private List<DataNodeLocation> dataNodeList;
public RegionReplicaSet() {}
- public RegionReplicaSet(ConsensusGroupId id, List<DataNodeLocation> dataNodeList) {
- this.id = id;
+ public RegionReplicaSet(ConsensusGroupId consensusGroupId, List<DataNodeLocation> dataNodeList) {
+ this.consensusGroupId = consensusGroupId;
this.dataNodeList = dataNodeList;
}
@@ -46,20 +46,22 @@ public class RegionReplicaSet {
this.dataNodeList = dataNodeList;
}
- public ConsensusGroupId getId() {
- return id;
+ public ConsensusGroupId getConsensusGroupId() {
+ return consensusGroupId;
}
- public void setId(ConsensusGroupId id) {
- this.id = id;
+ public void setConsensusGroupId(ConsensusGroupId consensusGroupId) {
+ this.consensusGroupId = consensusGroupId;
}
public String toString() {
- return String.format("RegionReplicaSet[%s-%s]: %s", id.getType(), id, dataNodeList);
+ return String.format(
+ "RegionReplicaSet[%s-%d]: %s",
+ consensusGroupId.getType(), consensusGroupId.getId(), dataNodeList);
}
public void serializeImpl(ByteBuffer buffer) {
- id.serializeImpl(buffer);
+ consensusGroupId.serializeImpl(buffer);
buffer.putInt(dataNodeList.size());
dataNodeList.forEach(
dataNode -> {
@@ -67,18 +69,16 @@ public class RegionReplicaSet {
});
}
- public void deserializeImpl(ByteBuffer buffer) throws IOException {
- id = ConsensusGroupId.Factory.create(buffer);
+ public static RegionReplicaSet deserializeImpl(ByteBuffer buffer) throws IOException {
+ ConsensusGroupId consensusGroupId = ConsensusGroupId.Factory.create(buffer);
int size = buffer.getInt();
// We should always make dataNodeList as a new Object when deserialization
- dataNodeList = new ArrayList<>();
-
+ List<DataNodeLocation> dataNodeList = new ArrayList<>();
for (int i = 0; i < size; i++) {
- DataNodeLocation dataNode = new DataNodeLocation();
- dataNode.deserializeImpl(buffer);
- dataNodeList.add(dataNode);
+ dataNodeList.add(DataNodeLocation.deserializeImpl(buffer));
}
+ return new RegionReplicaSet(consensusGroupId, dataNodeList);
}
@Override
@@ -90,11 +90,16 @@ public class RegionReplicaSet {
return false;
}
RegionReplicaSet that = (RegionReplicaSet) o;
- return Objects.equals(id, that.id) && Objects.equals(dataNodeList, that.dataNodeList);
+ return Objects.equals(consensusGroupId, that.consensusGroupId)
+ && Objects.equals(dataNodeList, that.dataNodeList);
}
@Override
public int hashCode() {
- return Objects.hash(id, dataNodeList);
+ return Objects.hash(consensusGroupId, dataNodeList);
+ }
+
+ public boolean isEmpty() {
+ return dataNodeList == null || dataNodeList.isEmpty();
}
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/utils/CommonUtils.java b/node-commons/src/main/java/org/apache/iotdb/commons/utils/CommonUtils.java
index e9e170232c..b16d7692c8 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/utils/CommonUtils.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/utils/CommonUtils.java
@@ -32,16 +32,16 @@ public class CommonUtils {
private static final Logger logger = LoggerFactory.getLogger(CommonUtils.class);
public static Endpoint parseNodeUrl(String nodeUrl) throws BadNodeUrlException {
- Endpoint result = new Endpoint();
String[] split = nodeUrl.split(":");
if (split.length != 2) {
logger.warn("Bad node url: {}", nodeUrl);
throw new BadNodeUrlException(String.format("Bad node url: %s", nodeUrl));
}
String ip = split[0];
+ Endpoint result;
try {
int port = Integer.parseInt(split[1]);
- result.setIp(ip).setPort(port);
+ result = new Endpoint(ip, port);
} catch (NumberFormatException e) {
logger.warn("Bad node url: {}", nodeUrl);
throw new BadNodeUrlException(String.format("Bad node url: %s", nodeUrl));
diff --git a/server/pom.xml b/server/pom.xml
index 55ffa494c9..6d3dfa22d3 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -262,6 +262,11 @@
<version>2.6</version>
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>com.codahale.metrics</groupId>
+ <artifactId>metrics-core</artifactId>
+ <version>3.0.2</version>
+ </dependency>
<dependency>
<groupId>org.rocksdb</groupId>
<artifactId>rocksdbjni</artifactId>
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/ConsensusManager.java b/server/src/main/java/org/apache/iotdb/db/consensus/ConsensusManager.java
new file mode 100644
index 0000000000..1a036908ff
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/ConsensusManager.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.consensus;
+
+import org.apache.iotdb.commons.cluster.DataNodeLocation;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.partition.RegionReplicaSet;
+import org.apache.iotdb.consensus.IConsensus;
+import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
+import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/** DataNode Consensus layer manager */
+public class ConsensusManager {
+ private static final Logger LOGGER = LoggerFactory.getLogger(ConsensusManager.class);
+
+ private IConsensus consensusImpl;
+
+ public ConsensusManager() throws IOException {
+ consensusImpl = ConsensusImpl.getInstance();
+ consensusImpl.start();
+ }
+
+ public void addConsensusGroup(RegionReplicaSet regionReplicaSet) {
+ ConsensusGroupId consensusGroupId = regionReplicaSet.getConsensusGroupId();
+ List<Peer> peerList = new ArrayList<>();
+ for (DataNodeLocation dataNodeLocation : regionReplicaSet.getDataNodeList()) {
+ peerList.add(new Peer(consensusGroupId, dataNodeLocation.getEndPoint()));
+ }
+ consensusImpl.addConsensusGroup(consensusGroupId, peerList);
+ }
+
+ /** Transmit FragmentInstance to datanode.consensus.statemachine */
+ public ConsensusWriteResponse write(ConsensusGroupId consensusGroupId, IConsensusRequest plan) {
+ return consensusImpl.write(consensusGroupId, plan);
+ }
+
+ /** Transmit FragmentInstance to datanode.consensus.statemachine */
+ public ConsensusReadResponse read(ConsensusGroupId consensusGroupId, FragmentInstance plan) {
+ return consensusImpl.read(consensusGroupId, plan);
+ }
+
+ public void close() throws IOException {
+ consensusImpl.stop();
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
index 07f04dc248..a7ef03b32e 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
@@ -24,12 +24,15 @@ import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.statemachine.IStateMachine;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+
public abstract class BaseStateMachine implements IStateMachine {
private static final Logger logger = LoggerFactory.getLogger(BaseStateMachine.class);
@@ -38,7 +41,8 @@ public abstract class BaseStateMachine implements IStateMachine {
public TSStatus write(IConsensusRequest request) {
try {
return write(getFragmentInstance(request));
- } catch (IllegalArgumentException e) {
+ } catch (IllegalArgumentException | IllegalPathException | IOException e) {
+ logger.error(e.getMessage());
return new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
}
}
@@ -49,14 +53,16 @@ public abstract class BaseStateMachine implements IStateMachine {
public DataSet read(IConsensusRequest request) {
try {
return read(getFragmentInstance(request));
- } catch (IllegalArgumentException e) {
+ } catch (IllegalArgumentException | IllegalPathException | IOException e) {
+ logger.error(e.getMessage());
return null;
}
}
protected abstract DataSet read(FragmentInstance fragmentInstance);
- private FragmentInstance getFragmentInstance(IConsensusRequest request) {
+ private FragmentInstance getFragmentInstance(IConsensusRequest request)
+ throws IllegalPathException, IOException {
FragmentInstance instance;
if (request instanceof ByteBufferConsensusRequest) {
instance =
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
index fcb73ee4ea..b67ffe8e92 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
@@ -21,9 +21,10 @@ package org.apache.iotdb.db.consensus.statemachine;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.db.metadata.Executor.SchemaVisitor;
import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
-import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,10 +33,10 @@ public class SchemaRegionStateMachine extends BaseStateMachine {
private static final Logger logger = LoggerFactory.getLogger(SchemaRegionStateMachine.class);
- private final ISchemaRegion region;
+ private final ISchemaRegion schemaRegion;
- public SchemaRegionStateMachine(ISchemaRegion region) {
- this.region = region;
+ public SchemaRegionStateMachine(ISchemaRegion schemaRegion) {
+ this.schemaRegion = schemaRegion;
}
@Override
@@ -47,7 +48,9 @@ public class SchemaRegionStateMachine extends BaseStateMachine {
@Override
protected TSStatus write(FragmentInstance fragmentInstance) {
logger.info("Execute write plan in SchemaRegionStateMachine");
- return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ PlanNode planNode = fragmentInstance.getFragment().getRoot();
+ TSStatus status = planNode.accept(new SchemaVisitor(), schemaRegion);
+ return status;
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/Executor/SchemaVisitor.java b/server/src/main/java/org/apache/iotdb/db/metadata/Executor/SchemaVisitor.java
new file mode 100644
index 0000000000..08252364ce
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/Executor/SchemaVisitor.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.Executor;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.CreateTimeSeriesNode;
+import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Schema write PlanNode visitor */
+public class SchemaVisitor extends PlanVisitor<TSStatus, ISchemaRegion> {
+ private static final Logger logger = LoggerFactory.getLogger(SchemaVisitor.class);
+
+ @Override
+ public TSStatus visitCreateTimeSeries(CreateTimeSeriesNode node, ISchemaRegion schemaRegion) {
+ try {
+ schemaRegion.createTimeseries((CreateTimeSeriesPlan) node.transferToPhysicalPlan(), -1);
+ } catch (MetadataException e) {
+ logger.error("{}: MetaData error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
+ return RpcUtils.getStatus(TSStatusCode.METADATA_ERROR, e.getMessage());
+ }
+ return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute successfully");
+ }
+
+ @Override
+ public TSStatus visitPlan(PlanNode node, ISchemaRegion context) {
+ return null;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/PlanFragmentId.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/PlanFragmentId.java
index e4bdd7d306..1d3cc2296d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/PlanFragmentId.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/PlanFragmentId.java
@@ -70,16 +70,16 @@ public class PlanFragmentId {
return String.format("%s.%d", queryId, id);
}
+ public void serialize(ByteBuffer byteBuffer) {
+ queryId.serialize(byteBuffer);
+ byteBuffer.putInt(id);
+ }
+
public static PlanFragmentId deserialize(ByteBuffer byteBuffer) {
return new PlanFragmentId(
QueryId.deserialize(byteBuffer), ReadWriteIOUtils.readInt(byteBuffer));
}
- public void serialize(ByteBuffer byteBuffer) {
- queryId.serialize(byteBuffer);
- ReadWriteIOUtils.write(id, byteBuffer);
- }
-
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/QueryId.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/QueryId.java
index 5bfeb25e26..4d24e5cfc0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/QueryId.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/QueryId.java
@@ -128,11 +128,11 @@ public class QueryId {
return id;
}
- public static QueryId deserialize(ByteBuffer byteBuffer) {
- return new QueryId(ReadWriteIOUtils.readString(byteBuffer));
+ public void serialize(ByteBuffer byteBuffer) {
+ ReadWriteIOUtils.write(this.id, byteBuffer);
}
- public void serialize(ByteBuffer byteBuffer) {
- ReadWriteIOUtils.write(id, byteBuffer);
+ public static QueryId deserialize(ByteBuffer byteBuffer) {
+ return new QueryId(ReadWriteIOUtils.readString(byteBuffer));
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
index 0dde122c74..3602bd43fa 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
@@ -55,8 +55,8 @@ public class SimpleFragInstanceDispatcher implements IFragInstanceDispatcher {
buffer.flip();
TConsensusGroupId groupId =
new TConsensusGroupId(
- instance.getDataRegionId().getId().getId(),
- instance.getDataRegionId().getId().getType().toString());
+ instance.getRegionReplicaSet().getConsensusGroupId().getId(),
+ instance.getRegionReplicaSet().getConsensusGroupId().getType().toString());
TSendFragmentInstanceReq req =
new TSendFragmentInstanceReq(
new TFragmentInstance(buffer), groupId, instance.getType().toString());
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
index 03fd9f3e0e..a9b259c693 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.sql.planner.plan;
import org.apache.iotdb.commons.cluster.Endpoint;
import org.apache.iotdb.commons.partition.RegionReplicaSet;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
@@ -40,8 +41,10 @@ public class FragmentInstance implements IConsensusRequest {
private final QueryType type;
// The reference of PlanFragment which this instance is generated from
private final PlanFragment fragment;
- // The DataRegion where the FragmentInstance should run
- private RegionReplicaSet dataRegion;
+
+ // The Region where the FragmentInstance should run
+ private RegionReplicaSet regionReplicaSet;
+
private Endpoint hostEndpoint;
private Filter timeFilter;
@@ -60,12 +63,12 @@ public class FragmentInstance implements IConsensusRequest {
return new FragmentInstanceId(id, String.valueOf(index));
}
- public RegionReplicaSet getDataRegionId() {
- return dataRegion;
+ public RegionReplicaSet getRegionReplicaSet() {
+ return regionReplicaSet;
}
- public void setDataRegionId(RegionReplicaSet dataRegion) {
- this.dataRegion = dataRegion;
+ public void setRegionReplicaSet(RegionReplicaSet regionReplicaSet) {
+ this.regionReplicaSet = regionReplicaSet;
}
public Endpoint getHostEndpoint() {
@@ -111,16 +114,24 @@ public class FragmentInstance implements IConsensusRequest {
public String toString() {
StringBuilder ret = new StringBuilder();
- ret.append(
- String.format(
- "FragmentInstance-%s:[Host: %s/%s]\n",
- getId(), getHostEndpoint().getIp(), getDataRegionId().getId()));
+ ret.append(String.format("FragmentInstance-%s:", getId()));
+ if (getHostEndpoint() == null) {
+ ret.append(String.format("host endpoint has not set."));
+ } else {
+ ret.append(String.format("host endpoint: %s.", getHostEndpoint().toString()));
+ }
+ if (getRegionReplicaSet() == null) {
+ ret.append(String.format("Region Replica set has not set.\n"));
+ } else {
+ ret.append(String.format("Region Replica set: %s.\n", getRegionReplicaSet().toString()));
+ }
ret.append("---- Plan Node Tree ----\n");
ret.append(PlanNodeUtil.nodeToString(getFragment().getRoot()));
return ret.toString();
}
- public static FragmentInstance deserializeFrom(ByteBuffer buffer) {
+ public static FragmentInstance deserializeFrom(ByteBuffer buffer)
+ throws IllegalPathException, IOException {
FragmentInstanceId id = FragmentInstanceId.deserialize(buffer);
PlanFragment planFragment = PlanFragment.deserialize(buffer);
boolean hasTimeFilter = ReadWriteIOUtils.readBool(buffer);
@@ -129,22 +140,15 @@ public class FragmentInstance implements IConsensusRequest {
FragmentInstance fragmentInstance =
new FragmentInstance(
planFragment, Integer.parseInt(id.getInstanceId()), timeFilter, queryType);
- RegionReplicaSet regionReplicaSet = new RegionReplicaSet();
- try {
- regionReplicaSet.deserializeImpl(buffer);
- } catch (IOException e) {
- e.printStackTrace();
- }
- Endpoint endpoint = new Endpoint();
- endpoint.deserializeImpl(buffer);
- fragmentInstance.dataRegion = regionReplicaSet;
- fragmentInstance.hostEndpoint = endpoint;
+ fragmentInstance.regionReplicaSet = RegionReplicaSet.deserializeImpl(buffer);
+ fragmentInstance.hostEndpoint = Endpoint.deserializeImpl(buffer);
return fragmentInstance;
}
@Override
public void serializeRequest(ByteBuffer buffer) {
+ buffer.mark();
id.serialize(buffer);
fragment.serialize(buffer);
ReadWriteIOUtils.write(timeFilter != null, buffer);
@@ -152,7 +156,8 @@ public class FragmentInstance implements IConsensusRequest {
timeFilter.serialize(buffer);
}
ReadWriteIOUtils.write(type.ordinal(), buffer);
- dataRegion.serializeImpl(buffer);
+ regionReplicaSet.serializeImpl(buffer);
+
hostEndpoint.serializeImpl(buffer);
}
@@ -164,13 +169,13 @@ public class FragmentInstance implements IConsensusRequest {
return Objects.equals(id, instance.id)
&& type == instance.type
&& Objects.equals(fragment, instance.fragment)
- && Objects.equals(dataRegion, instance.dataRegion)
+ && Objects.equals(regionReplicaSet, instance.regionReplicaSet)
&& Objects.equals(hostEndpoint, instance.hostEndpoint)
&& Objects.equals(timeFilter, instance.timeFilter);
}
@Override
public int hashCode() {
- return Objects.hash(id, type, fragment, dataRegion, hostEndpoint, timeFilter);
+ return Objects.hash(id, type, fragment, regionReplicaSet, hostEndpoint, timeFilter);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragment.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragment.java
index 80f16b7465..550ad1e0f8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragment.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragment.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.mpp.sql.planner.plan;
import org.apache.iotdb.commons.partition.RegionReplicaSet;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
@@ -100,12 +101,12 @@ public class PlanFragment {
root.serialize(byteBuffer);
}
- public static PlanFragment deserialize(ByteBuffer byteBuffer) {
+ public static PlanFragment deserialize(ByteBuffer byteBuffer) throws IllegalPathException {
return new PlanFragment(PlanFragmentId.deserialize(byteBuffer), deserializeHelper(byteBuffer));
}
// deserialize the plan node recursively
- public static PlanNode deserializeHelper(ByteBuffer byteBuffer) {
+ public static PlanNode deserializeHelper(ByteBuffer byteBuffer) throws IllegalPathException {
PlanNode root = PlanNodeType.deserialize(byteBuffer);
int childrenCount = byteBuffer.getInt();
for (int i = 0; i < childrenCount; i++) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
index c2014c55d2..2818661995 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
@@ -100,7 +100,7 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
// We need to store all the replica host in case of the scenario that the instance need to be
// redirected
// to another host when scheduling
- fragmentInstance.setDataRegionId(dataRegion);
+ fragmentInstance.setRegionReplicaSet(dataRegion);
// TODO: (xingtanzjr) We select the first Endpoint as the default target host for current
// instance
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNode.java
index 97075588c9..3a17a08a75 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNode.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.mpp.sql.planner.plan.node;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.commons.lang.Validate;
@@ -113,4 +114,9 @@ public abstract class PlanNode {
public int hashCode() {
return Objects.hash(id);
}
+
+ // TODO (yifuzhou) will remove later
+ public PhysicalPlan transferToPhysicalPlan() {
+ return null;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeType.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeType.java
index 1d6bed6251..d9de5f02c9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeType.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeType.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.mpp.sql.planner.plan.node;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.ShowDevicesNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.AlterTimeSeriesNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.AuthorNode;
@@ -81,7 +82,7 @@ public enum PlanNodeType {
buffer.putShort(nodeType);
}
- public static PlanNode deserialize(ByteBuffer buffer) {
+ public static PlanNode deserialize(ByteBuffer buffer) throws IllegalPathException {
short nodeType = buffer.getShort();
switch (nodeType) {
case 0:
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeUtil.java
index 58c11cd563..2ff9c1f1e7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeUtil.java
@@ -72,6 +72,9 @@ public class PlanNodeUtil {
}
result.append(root.toString());
result.append(System.lineSeparator());
+ if (root.getChildren() == null) {
+ return;
+ }
for (int i = 0; i < root.getChildren().size(); i++) {
PlanNode child = root.getChildren().get(i);
PrintContext childCtx = ctx.clone();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanVisitor.java
index c035a0129e..76eecb8215 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanVisitor.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.mpp.sql.planner.plan.node;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.CreateTimeSeriesNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.AggregateNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.DeviceMergeNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.ExchangeNode;
@@ -96,4 +97,8 @@ public abstract class PlanVisitor<R, C> {
public R visitFragmentSink(FragmentSinkNode node, C context) {
return visitPlan(node, context);
}
+
+ public R visitCreateTimeSeries(CreateTimeSeriesNode node, C context) {
+ return null;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/AlterTimeSeriesNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/AlterTimeSeriesNode.java
index 935a4bb116..b1fda2fb3b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/AlterTimeSeriesNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/AlterTimeSeriesNode.java
@@ -238,6 +238,43 @@ public class AlterTimeSeriesNode extends PlanNode {
new PlanNodeId(id), path, alterType, alterMap, alias, tagsMap, attributesMap);
}
+ // @Override
+ // public void executeOn(SchemaRegion schemaRegion) throws MetadataException,
+ // QueryProcessException {
+ // try {
+ // switch (getAlterType()) {
+ // case RENAME:
+ // String beforeName = alterMap.keySet().iterator().next();
+ // String currentName = alterMap.get(beforeName);
+ // schemaRegion.renameTagOrAttributeKey(beforeName, currentName, path);
+ // break;
+ // case SET:
+ // schemaRegion.setTagsOrAttributesValue(alterMap, path);
+ // break;
+ // case DROP:
+ // schemaRegion.dropTagsOrAttributes(alterMap.keySet(), path);
+ // break;
+ // case ADD_TAGS:
+ // schemaRegion.addTags(alterMap, path);
+ // break;
+ // case ADD_ATTRIBUTES:
+ // schemaRegion.addAttributes(alterMap, path);
+ // break;
+ // case UPSERT:
+ // schemaRegion.upsertTagsAndAttributes(getAlias(), getTagsMap(), getAttributesMap(),
+ // path);
+ // break;
+ // }
+ // } catch (MetadataException e) {
+ // throw new QueryProcessException(e);
+ // } catch (IOException e) {
+ // throw new QueryProcessException(
+ // String.format(
+ // "Something went wrong while read/write the [%s]'s tag/attribute info.",
+ // path.getFullPath()));
+ // }
+ // }
+
@Override
protected void serializeAttributes(ByteBuffer byteBuffer) {
throw new NotImplementedException(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateAlignedTimeSeriesNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateAlignedTimeSeriesNode.java
index f9cd7829e9..89605a8704 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateAlignedTimeSeriesNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateAlignedTimeSeriesNode.java
@@ -24,6 +24,8 @@ import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -351,4 +353,24 @@ public class CreateAlignedTimeSeriesNode extends PlanNode {
tagsList,
attributesList);
}
+
+ @Override
+ public PhysicalPlan transferToPhysicalPlan() {
+ return new CreateAlignedTimeSeriesPlan(
+ getDevicePath(),
+ getMeasurements(),
+ getDataTypes(),
+ getEncodings(),
+ getCompressors(),
+ getAliasList(),
+ getTagsList(),
+ getAttributesList());
+ }
+
+ // @Override
+ // public void executeOn(SchemaRegion schemaRegion) throws MetadataException {
+ // schemaRegion.createAlignedTimeSeries((CreateAlignedTimeSeriesPlan)
+ // transferToPhysicalPlan());
+ // }
+
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateTimeSeriesNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateTimeSeriesNode.java
index 6aed73d62d..45b7856878 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateTimeSeriesNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateTimeSeriesNode.java
@@ -18,15 +18,22 @@
*/
package org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.nio.ByteBuffer;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
@@ -157,16 +164,136 @@ public class CreateTimeSeriesNode extends PlanNode {
}
@Override
- protected void serializeAttributes(ByteBuffer byteBuffer) {
- throw new NotImplementedException(
- "serializeAttributes of CreateTimeSeriesNode is not implemented");
+ public PhysicalPlan transferToPhysicalPlan() {
+ return new CreateTimeSeriesPlan(
+ getPath(),
+ getDataType(),
+ getEncoding(),
+ getCompressor(),
+ getProps(),
+ getTags(),
+ getAttributes(),
+ getAlias());
}
- public static CreateTimeSeriesNode deserialize(ByteBuffer byteBuffer) {
- return null;
+ public static CreateTimeSeriesNode deserialize(ByteBuffer byteBuffer)
+ throws IllegalPathException {
+ String id;
+ PartialPath path = null;
+ TSDataType dataType;
+ TSEncoding encoding;
+ CompressionType compressor;
+ long tagOffset;
+ String alias = null;
+ Map<String, String> props = null;
+ Map<String, String> tags = null;
+ Map<String, String> attributes = null;
+
+ id = ReadWriteIOUtils.readString(byteBuffer);
+ int length = byteBuffer.getInt();
+ byte[] bytes = new byte[length];
+ byteBuffer.get(bytes);
+ path = new PartialPath(new String(bytes));
+ dataType = TSDataType.values()[byteBuffer.get()];
+ encoding = TSEncoding.values()[byteBuffer.get()];
+ compressor = CompressionType.values()[byteBuffer.get()];
+ tagOffset = byteBuffer.getLong();
+
+ // alias
+ if (byteBuffer.get() == 1) {
+ alias = ReadWriteIOUtils.readString(byteBuffer);
+ }
+
+ byte label = byteBuffer.get();
+ // props
+ if (label == 0) {
+ props = new HashMap<>();
+ } else if (label == 1) {
+ props = ReadWriteIOUtils.readMap(byteBuffer);
+ }
+
+ // tags
+ label = byteBuffer.get();
+ if (label == 0) {
+ tags = new HashMap<>();
+ } else if (label == 1) {
+ tags = ReadWriteIOUtils.readMap(byteBuffer);
+ }
+
+ // attributes
+ label = byteBuffer.get();
+ if (label == 0) {
+ attributes = new HashMap<>();
+ } else if (label == 1) {
+ attributes = ReadWriteIOUtils.readMap(byteBuffer);
+ }
+
+ return new CreateTimeSeriesNode(
+ new PlanNodeId(id), path, dataType, encoding, compressor, props, tags, attributes, alias);
}
@Override
+ public void serialize(ByteBuffer byteBuffer) {
+ byteBuffer.putShort((short) PlanNodeType.CREATE_TIME_SERIES.ordinal());
+ ReadWriteIOUtils.write(this.getPlanNodeId().getId(), byteBuffer);
+ byte[] bytes = path.getFullPath().getBytes();
+ byteBuffer.putInt(bytes.length);
+ byteBuffer.put(bytes);
+ byteBuffer.put((byte) dataType.ordinal());
+ byteBuffer.put((byte) encoding.ordinal());
+ byteBuffer.put((byte) compressor.ordinal());
+ byteBuffer.putLong(tagOffset);
+
+ // alias
+ if (alias != null) {
+ byteBuffer.put((byte) 1);
+ ReadWriteIOUtils.write(alias, byteBuffer);
+ } else {
+ byteBuffer.put((byte) 0);
+ }
+
+ // props
+ if (props == null) {
+ byteBuffer.put((byte) -1);
+ } else if (props.isEmpty()) {
+ byteBuffer.put((byte) 0);
+ } else {
+ byteBuffer.put((byte) 1);
+ ReadWriteIOUtils.write(props, byteBuffer);
+ }
+
+ // tags
+ if (tags == null) {
+ byteBuffer.put((byte) -1);
+ } else if (tags.isEmpty()) {
+ byteBuffer.put((byte) 0);
+ } else {
+ byteBuffer.put((byte) 1);
+ ReadWriteIOUtils.write(tags, byteBuffer);
+ }
+
+ // attributes
+ if (attributes == null) {
+ byteBuffer.put((byte) -1);
+ } else if (attributes.isEmpty()) {
+ byteBuffer.put((byte) 0);
+ } else {
+ byteBuffer.put((byte) 1);
+ ReadWriteIOUtils.write(attributes, byteBuffer);
+ }
+
+ // this node has no children, so write a children count of 0
+ byteBuffer.putInt(0);
+ }
+
+ @Override
+ protected void serializeAttributes(ByteBuffer byteBuffer) {}
+
+ @Override
+ public <R, C> R accept(PlanVisitor<R, C> visitor, C schemaRegion) {
+ return visitor.visitCreateTimeSeries(this, schemaRegion);
+ }
+
public boolean equals(Object o) {
if (this == o) {
return true;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/DeviceMergeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/DeviceMergeNode.java
index d427b85809..ec40d5df52 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/DeviceMergeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/DeviceMergeNode.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.mpp.sql.planner.plan.IOutputPlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.PlanFragment;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.ColumnHeader;
@@ -155,7 +156,7 @@ public class DeviceMergeNode extends ProcessNode implements IOutputPlanNode {
}
}
- public static DeviceMergeNode deserialize(ByteBuffer byteBuffer) {
+ public static DeviceMergeNode deserialize(ByteBuffer byteBuffer) throws IllegalPathException {
int orderByIndex = ReadWriteIOUtils.readInt(byteBuffer);
OrderBy orderBy = OrderBy.values()[orderByIndex];
FilterNullComponent filterNullComponent = FilterNullComponent.deserialize(byteBuffer);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
index fc408b5623..41965c2273 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
import org.apache.iotdb.commons.cluster.Endpoint;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.sql.planner.plan.PlanFragment;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
@@ -86,7 +87,7 @@ public class ExchangeNode extends PlanNode {
this.upstreamPlanNodeId = nodeId;
}
- public static ExchangeNode deserialize(ByteBuffer byteBuffer) {
+ public static ExchangeNode deserialize(ByteBuffer byteBuffer) throws IllegalPathException {
FragmentSinkNode fragmentSinkNode =
(FragmentSinkNode) PlanFragment.deserializeHelper(byteBuffer);
Endpoint endPoint =
diff --git a/server/src/main/java/org/apache/iotdb/db/service/InternalServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/InternalServiceImpl.java
index a0e1d01aab..959e2879a3 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/InternalServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/InternalServiceImpl.java
@@ -19,16 +19,18 @@
package org.apache.iotdb.db.service;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
-import org.apache.iotdb.commons.consensus.GroupType;
import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
-import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
import org.apache.iotdb.db.consensus.ConsensusImpl;
+import org.apache.iotdb.db.consensus.ConsensusManager;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.execution.FragmentInstanceInfo;
import org.apache.iotdb.db.mpp.execution.FragmentInstanceManager;
import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
+import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
import org.apache.iotdb.mpp.rpc.thrift.InternalService;
import org.apache.iotdb.mpp.rpc.thrift.SchemaFetchRequest;
import org.apache.iotdb.mpp.rpc.thrift.SchemaFetchResponse;
@@ -43,39 +45,73 @@ import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceResp;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
public class InternalServiceImpl implements InternalService.Iface {
+ private static final Logger LOGGER = LoggerFactory.getLogger(InternalServiceImpl.class);
+
+ private final ConsensusManager consensusManager;
- public InternalServiceImpl() {
+ public InternalServiceImpl() throws IOException {
super();
+ consensusManager = new ConsensusManager();
}
@Override
- public TSendFragmentInstanceResp sendFragmentInstance(TSendFragmentInstanceReq req)
- throws TException {
+ public TSendFragmentInstanceResp sendFragmentInstance(TSendFragmentInstanceReq req) {
+ TSendFragmentInstanceResp response = new TSendFragmentInstanceResp();
+ FragmentInstance fragmentInstance = null;
+ try {
+ fragmentInstance = FragmentInstance.deserializeFrom(req.fragmentInstance.body);
+ } catch (IOException | IllegalPathException e) {
+ LOGGER.error(e.getMessage());
+ response.setAccepted(false);
+ response.setMessage(e.getMessage());
+ return response;
+ }
+
ByteBufferConsensusRequest request = new ByteBufferConsensusRequest(req.fragmentInstance.body);
- QueryType type = QueryType.valueOf(req.queryType);
- ConsensusGroupId groupId =
- ConsensusGroupId.Factory.create(
- req.consensusGroupId.id, GroupType.valueOf(req.consensusGroupId.type));
+ QueryType type = fragmentInstance.getType();
+ ConsensusGroupId groupId = fragmentInstance.getRegionReplicaSet().getConsensusGroupId();
+
+ if (fragmentInstance.getRegionReplicaSet() == null
+ || fragmentInstance.getRegionReplicaSet().isEmpty()) {
+ String msg = "Unknown regions to write, since getRegionReplicaSet is empty.";
+ LOGGER.error(msg);
+ response.setAccepted(false);
+ response.setMessage(msg);
+ return response;
+ }
+ consensusManager.addConsensusGroup(fragmentInstance.getRegionReplicaSet());
+
switch (type) {
case READ:
ConsensusReadResponse readResp = ConsensusImpl.getInstance().read(groupId, request);
FragmentInstanceInfo info = (FragmentInstanceInfo) readResp.getDataset();
return new TSendFragmentInstanceResp(info.getState().isFailed());
case WRITE:
- ConsensusWriteResponse writeResp = ConsensusImpl.getInstance().write(groupId, request);
- // TODO: (xingtanzjr) need to distinguish more conditions for response status.
- boolean accepted =
- writeResp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
- return new TSendFragmentInstanceResp(accepted);
+ TSStatus status =
+ consensusManager
+ .write(
+ fragmentInstance.getRegionReplicaSet().getConsensusGroupId(), fragmentInstance)
+ .getStatus();
+ // TODO need to consider more status codes
+ if (TSStatusCode.SUCCESS_STATUS.getStatusCode() == status.getCode()) {
+ response.setAccepted(true);
+ } else {
+ response.setAccepted(false);
+ }
+ response.setMessage(status.message);
+ return response;
}
return null;
}
@Override
- public TFragmentInstanceStateResp fetchFragmentInstanceState(TFetchFragmentInstanceStateReq req)
- throws TException {
+ public TFragmentInstanceStateResp fetchFragmentInstanceState(TFetchFragmentInstanceStateReq req) {
FragmentInstanceInfo info =
FragmentInstanceManager.getInstance()
.getInstanceInfo(FragmentInstanceId.fromThrift(req.fragmentInstanceId));
@@ -101,4 +137,8 @@ public class InternalServiceImpl implements InternalService.Iface {
public SchemaFetchResponse fetchSchema(SchemaFetchRequest req) throws TException {
throw new UnsupportedOperationException();
}
+
+ public void close() throws IOException {
+ consensusManager.close();
+ }
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceSerdeTest.java
index dea1e9b031..ce29fee307 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceSerdeTest.java
@@ -45,6 +45,7 @@ import org.apache.iotdb.tsfile.read.filter.operator.Gt;
import org.junit.Test;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -53,7 +54,7 @@ import static org.junit.Assert.assertEquals;
public class FragmentInstanceSerdeTest {
@Test
- public void TestSerializeAndDeserializeForTree1() throws IllegalPathException {
+ public void TestSerializeAndDeserializeForTree1() throws IllegalPathException, IOException {
FragmentInstance fragmentInstance =
new FragmentInstance(
new PlanFragment(new PlanFragmentId("test", -1), constructPlanNodeTree()),
@@ -62,7 +63,7 @@ public class FragmentInstanceSerdeTest {
QueryType.READ);
RegionReplicaSet regionReplicaSet =
new RegionReplicaSet(new DataRegionId(1), new ArrayList<>());
- fragmentInstance.setDataRegionId(regionReplicaSet);
+ fragmentInstance.setRegionReplicaSet(regionReplicaSet);
fragmentInstance.setHostEndpoint(new Endpoint("127.0.0.1", 6666));
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
@@ -73,7 +74,7 @@ public class FragmentInstanceSerdeTest {
}
@Test
- public void TestSerializeAndDeserializeWithNullFilter() throws IllegalPathException {
+ public void TestSerializeAndDeserializeWithNullFilter() throws IllegalPathException, IOException {
FragmentInstance fragmentInstance =
new FragmentInstance(
new PlanFragment(new PlanFragmentId("test2", 1), constructPlanNodeTree()),
@@ -82,7 +83,7 @@ public class FragmentInstanceSerdeTest {
QueryType.READ);
RegionReplicaSet regionReplicaSet =
new RegionReplicaSet(new DataRegionId(1), new ArrayList<>());
- fragmentInstance.setDataRegionId(regionReplicaSet);
+ fragmentInstance.setRegionReplicaSet(regionReplicaSet);
fragmentInstance.setHostEndpoint(new Endpoint("127.0.0.2", 6667));
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/PlanNodeDeserializeHelper.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/PlanNodeDeserializeHelper.java
index 55894de485..05b62153fb 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/PlanNodeDeserializeHelper.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/PlanNodeDeserializeHelper.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.mpp.sql.plan.node;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
@@ -25,7 +26,7 @@ import java.nio.ByteBuffer;
public class PlanNodeDeserializeHelper {
- public static PlanNode deserialize(ByteBuffer byteBuffer) {
+ public static PlanNode deserialize(ByteBuffer byteBuffer) throws IllegalPathException {
PlanNode root = PlanNodeType.deserialize(byteBuffer);
int childrenCount = byteBuffer.getInt();
for (int i = 0; i < childrenCount; i++) {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/metadata/read/ShowDevicesNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/metadata/read/ShowDevicesNodeSerdeTest.java
index 6139bb33df..80ca97c47f 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/metadata/read/ShowDevicesNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/metadata/read/ShowDevicesNodeSerdeTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.mpp.sql.plan.node.metadata.read;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.mpp.sql.plan.node.PlanNodeDeserializeHelper;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.ShowDevicesNode;
@@ -31,7 +32,7 @@ import static org.junit.Assert.assertEquals;
public class ShowDevicesNodeSerdeTest {
@Test
- public void TestSerializeAndDeserialize() {
+ public void TestSerializeAndDeserialize() throws IllegalPathException {
ShowDevicesNode showDevicesNode = new ShowDevicesNode(new PlanNodeId("TestShowDevicesNode"));
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
showDevicesNode.serialize(byteBuffer);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/sink/FragmentSinkNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/sink/FragmentSinkNodeSerdeTest.java
index 0e5618373b..2aa2610bde 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/sink/FragmentSinkNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/sink/FragmentSinkNodeSerdeTest.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.mpp.sql.plan.node.sink;
import org.apache.iotdb.commons.cluster.Endpoint;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.sql.plan.node.PlanNodeDeserializeHelper;
@@ -35,7 +36,7 @@ import static org.junit.Assert.assertEquals;
public class FragmentSinkNodeSerdeTest {
@Test
- public void TestSerializeAndDeserialize() {
+ public void TestSerializeAndDeserialize() throws IllegalPathException {
FragmentSinkNode fragmentSinkNode =
new FragmentSinkNode(new PlanNodeId("TestFragmentSinkNode"));
fragmentSinkNode.addChild(new ShowDevicesNode(new PlanNodeId("TestShowDevicesNode")));
diff --git a/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java b/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
new file mode 100644
index 0000000000..c08553c1d9
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.service;
+
+import org.apache.iotdb.commons.cluster.DataNodeLocation;
+import org.apache.iotdb.commons.cluster.Endpoint;
+import org.apache.iotdb.commons.consensus.SchemaRegionId;
+import org.apache.iotdb.commons.partition.RegionReplicaSet;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.LocalConfigManager;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
+import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
+import org.apache.iotdb.db.mpp.sql.planner.plan.PlanFragment;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.CreateTimeSeriesNode;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstance;
+import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceReq;
+import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceResp;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.filter.GroupByFilter;
+
+import org.apache.ratis.util.FileUtils;
+import org.apache.thrift.TException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+public class InternalServiceImplTest { // Sends a serialized create-timeseries FragmentInstance to InternalServiceImpl and checks it is accepted.
+ private static final IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig(); // source of the internal IP and consensus port used below
+ InternalServiceImpl internalServiceImpl; // service under test; created fresh in setUp
+ LocalConfigManager configManager; // singleton instance fetched in setUp
+
+ @Before
+ public void setUp() throws Exception { // runs before each test: init the config manager, then build the service
+ IoTDB.configManager.init();
+ internalServiceImpl = new InternalServiceImpl();
+ configManager = LocalConfigManager.getInstance();
+ }
+
+ @After
+ public void tearDown() throws Exception { // runs after each test: clear state, close the service, delete on-disk data
+ IoTDB.configManager.clear();
+ EnvironmentUtils.cleanEnv();
+ internalServiceImpl.close();
+ FileUtils.deleteFully(new File("data" + File.separator + "consensus")); // presumably the consensus layer's storage directory - TODO confirm
+ }
+
+ @Test
+ public void createTimeseriesTest() throws MetadataException, TException { // builds a CreateTimeSeriesNode, wraps and serializes it, sends it, asserts acceptance
+ configManager.getBelongedSchemaRegionWithAutoCreate(new PartialPath("root.ln")); // judging by the name, makes sure a schema region for root.ln exists first - confirm
+ CreateTimeSeriesNode createTimeSeriesNode =
+ new CreateTimeSeriesNode(
+ new PlanNodeId("0"),
+ new PartialPath("root.ln.wf01.wt01.status"),
+ TSDataType.BOOLEAN,
+ TSEncoding.PLAIN,
+ CompressionType.SNAPPY,
+ new HashMap<String, String>() { // first map argument; presumably the series properties - confirm against the constructor
+ {
+ put("MAX_POINT_NUMBER", "3");
+ }
+ },
+ new HashMap<String, String>() { // second map argument; presumably the tags, going by the key names - confirm
+ {
+ put("tag1", "v1");
+ put("tag2", "v2");
+ }
+ },
+ new HashMap<String, String>() { // third map argument; presumably the attributes, going by the key names - confirm
+ {
+ put("attr1", "a1");
+ put("attr2", "a2");
+ }
+ },
+ "meter1"); // trailing String argument; presumably the series alias - confirm
+
+ List<DataNodeLocation> dataNodeList = new ArrayList<>(); // single-element replica list built from the local configuration
+ dataNodeList.add(
+ new DataNodeLocation(
+ conf.getConsensusPort(), new Endpoint(conf.getInternalIp(), conf.getConsensusPort()))); // NOTE(review): the consensus port is also passed as the first constructor argument - confirm what that argument means
+
+ // Construct the FragmentInstance that carries the create-timeseries plan node.
+ SchemaRegionId schemaRegionId = new SchemaRegionId(0);
+ RegionReplicaSet regionReplicaSet = new RegionReplicaSet(schemaRegionId, dataNodeList);
+ PlanFragment planFragment = new PlanFragment(new PlanFragmentId("2", 3), createTimeSeriesNode);
+ FragmentInstance fragmentInstance =
+ new FragmentInstance(planFragment, 4, new GroupByFilter(1, 2, 3, 4), QueryType.WRITE); // the index and filter values look like arbitrary placeholders - confirm
+ fragmentInstance.setRegionReplicaSet(regionReplicaSet);
+ fragmentInstance.setHostEndpoint(new Endpoint("127.0.0.1", 6666));
+
+ // Serialize the FragmentInstance into a fixed 1024-byte buffer.
+ ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
+ fragmentInstance.serializeRequest(byteBuffer);
+ byteBuffer.flip(); // switch the buffer from writing to reading so the written bytes become its readable content
+
+ // Wrap the serialized FragmentInstance in a TSendFragmentInstanceReq.
+ TSendFragmentInstanceReq request = new TSendFragmentInstanceReq();
+ TFragmentInstance tFragmentInstance = new TFragmentInstance();
+ tFragmentInstance.setBody(byteBuffer);
+ request.setFragmentInstance(tFragmentInstance);
+
+ // Use consensus layer to execute request
+ TSendFragmentInstanceResp response = internalServiceImpl.sendFragmentInstance(request);
+
+ Assert.assertTrue(response.accepted); // the only assertion: the response's accepted flag must be true
+ }
+}