You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/04/13 14:45:35 UTC
[iotdb] branch master updated: Make some modifications according to talking for recent PRs of SchemaRegion (#5498)
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 8ade542133 Make some modifications according to talking for recent PRs of SchemaRegion (#5498)
8ade542133 is described below
commit 8ade54213392786ce8032bb731efd709daeb4fcf
Author: Zhang.Jinrui <xi...@gmail.com>
AuthorDate: Wed Apr 13 22:45:30 2022 +0800
Make some modifications according to talking for recent PRs of SchemaRegion (#5498)
---
.../server/ConfigNodeRPCServerProcessorTest.java | 63 ++++++-----------
.../iotdb/commons/consensus/ConsensusGroupId.java | 5 +-
.../iotdb/commons/partition/RegionReplicaSet.java | 3 +-
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 16 +----
.../iotdb/db/consensus/ConsensusManager.java | 72 --------------------
.../consensus/statemachine/BaseStateMachine.java | 10 +--
.../iotdb/db/metadata/Executor/SchemaVisitor.java | 43 +++++++++++-
.../db/mpp/sql/planner/plan/FragmentInstance.java | 25 +++----
.../db/mpp/sql/planner/plan/PlanFragment.java | 5 +-
.../db/mpp/sql/planner/plan/node/PlanNode.java | 6 --
.../db/mpp/sql/planner/plan/node/PlanNodeType.java | 3 +-
.../db/mpp/sql/planner/plan/node/PlanVisitor.java | 11 ++-
.../node/metedata/read/DevicesSchemaScanNode.java | 10 ++-
.../metedata/read/TimeSeriesSchemaScanNode.java | 10 ++-
.../write/CreateAlignedTimeSeriesNode.java | 21 ++----
.../node/metedata/write/CreateTimeSeriesNode.java | 24 ++-----
.../planner/plan/node/process/DeviceMergeNode.java | 3 +-
.../planner/plan/node/process/ExchangeNode.java | 3 +-
.../plan/node/source/SeriesAggregateScanNode.java | 6 +-
.../planner/plan/node/source/SeriesScanNode.java | 7 +-
.../iotdb/db/service/InternalServiceImpl.java | 78 ++++++----------------
.../thrift/impl/DataNodeManagementServiceImpl.java | 5 --
.../db/mpp/sql/plan/FragmentInstanceSerdeTest.java | 5 +-
.../iotdb/db/service/InternalServiceImplTest.java | 50 ++++++++++----
server/src/test/resources/iotdb-engine.properties | 3 +-
25 files changed, 185 insertions(+), 302 deletions(-)
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessorTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessorTest.java
index 7ed1fe2a2c..0aa164ccd0 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessorTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessorTest.java
@@ -268,13 +268,10 @@ public class ConfigNodeRPCServerProcessorTest {
(tSeriesPartitionSlot, tRegionReplicaSet) -> {
Assert.assertEquals(3, tRegionReplicaSet.getEndpointSize());
ConsensusGroupId regionId = null;
- try {
- regionId =
- ConsensusGroupId.Factory.create(
- ByteBuffer.wrap(tRegionReplicaSet.getRegionId()));
- } catch (IOException ignore) {
- // Ignore
- }
+ regionId =
+ ConsensusGroupId.Factory.create(
+ ByteBuffer.wrap(tRegionReplicaSet.getRegionId()));
+
Assert.assertTrue(regionId instanceof SchemaRegionId);
});
}
@@ -298,13 +295,11 @@ public class ConfigNodeRPCServerProcessorTest {
(tSeriesPartitionSlot, tRegionReplicaSet) -> {
Assert.assertEquals(3, tRegionReplicaSet.getEndpointSize());
ConsensusGroupId regionId = null;
- try {
- regionId =
- ConsensusGroupId.Factory.create(
- ByteBuffer.wrap(tRegionReplicaSet.getRegionId()));
- } catch (IOException ignore) {
- // Ignore
- }
+
+ regionId =
+ ConsensusGroupId.Factory.create(
+ ByteBuffer.wrap(tRegionReplicaSet.getRegionId()));
+
Assert.assertTrue(regionId instanceof SchemaRegionId);
});
}
@@ -327,13 +322,8 @@ public class ConfigNodeRPCServerProcessorTest {
(tSeriesPartitionSlot, tRegionReplicaSet) -> {
Assert.assertEquals(3, tRegionReplicaSet.getEndpointSize());
ConsensusGroupId regionId = null;
- try {
- regionId =
- ConsensusGroupId.Factory.create(
- ByteBuffer.wrap(tRegionReplicaSet.getRegionId()));
- } catch (IOException ignore) {
- // Ignore
- }
+ regionId =
+ ConsensusGroupId.Factory.create(ByteBuffer.wrap(tRegionReplicaSet.getRegionId()));
Assert.assertTrue(regionId instanceof SchemaRegionId);
});
// Check "root.sg1"
@@ -345,13 +335,8 @@ public class ConfigNodeRPCServerProcessorTest {
(tSeriesPartitionSlot, tRegionReplicaSet) -> {
Assert.assertEquals(3, tRegionReplicaSet.getEndpointSize());
ConsensusGroupId regionId = null;
- try {
- regionId =
- ConsensusGroupId.Factory.create(
- ByteBuffer.wrap(tRegionReplicaSet.getRegionId()));
- } catch (IOException ignore) {
- // Ignore
- }
+ regionId =
+ ConsensusGroupId.Factory.create(ByteBuffer.wrap(tRegionReplicaSet.getRegionId()));
Assert.assertTrue(regionId instanceof SchemaRegionId);
});
}
@@ -413,19 +398,15 @@ public class ConfigNodeRPCServerProcessorTest {
.size());
// Is DataRegion
ConsensusGroupId regionId = null;
- try {
- regionId =
- ConsensusGroupId.Factory.create(
- ByteBuffer.wrap(
- dataPartitionMap
- .get(storageGroup)
- .get(seriesPartitionSlot)
- .get(timePartitionSlot)
- .get(0)
- .getRegionId()));
- } catch (IOException ignore) {
- // Ignore
- }
+ regionId =
+ ConsensusGroupId.Factory.create(
+ ByteBuffer.wrap(
+ dataPartitionMap
+ .get(storageGroup)
+ .get(seriesPartitionSlot)
+ .get(timePartitionSlot)
+ .get(0)
+ .getRegionId()));
Assert.assertTrue(regionId instanceof DataRegionId);
// Including three RegionReplica
Assert.assertEquals(
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/ConsensusGroupId.java b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/ConsensusGroupId.java
index 13e38df6c0..ba38960173 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/ConsensusGroupId.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/ConsensusGroupId.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.commons.consensus;
-import java.io.IOException;
import java.nio.ByteBuffer;
public interface ConsensusGroupId {
@@ -39,10 +38,10 @@ public interface ConsensusGroupId {
GroupType getType();
class Factory {
- public static ConsensusGroupId create(ByteBuffer buffer) throws IOException {
+ public static ConsensusGroupId create(ByteBuffer buffer) {
int index = buffer.get();
if (index >= GroupType.values().length) {
- throw new IOException("unrecognized id type " + index);
+ throw new IllegalArgumentException("invalid ConsensusGroup type. Ordinal is: " + index);
}
GroupType type = GroupType.values()[index];
ConsensusGroupId groupId = createEmpty(type);
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/RegionReplicaSet.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/RegionReplicaSet.java
index 2217f8832c..d98695824d 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/RegionReplicaSet.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/RegionReplicaSet.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.commons.partition;
import org.apache.iotdb.commons.cluster.DataNodeLocation;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
-import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
@@ -70,7 +69,7 @@ public class RegionReplicaSet {
});
}
- public static RegionReplicaSet deserializeImpl(ByteBuffer buffer) throws IOException {
+ public static RegionReplicaSet deserializeImpl(ByteBuffer buffer) {
ConsensusGroupId consensusGroupId = ConsensusGroupId.Factory.create(buffer);
int size = buffer.getInt();
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index a7603576a1..5a87a42625 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -45,8 +45,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
@@ -844,10 +842,10 @@ public class IoTDBConfig {
/**
* Ip and port of config nodes. each one is a {internalIp | domain name}:{meta port} string tuple.
*/
- private List<String> configNodeUrls;
+ private List<String> configNodeUrls = new ArrayList<>();
/** Internal ip for data node */
- private String internalIp;
+ private String internalIp = "127.0.0.1";
/** Internal port for coordinator */
private int internalPort = 9003;
@@ -888,16 +886,6 @@ public class IoTDBConfig {
/** Thread keep alive time in ms of data block manager. */
private int dataBlockManagerKeepAliveTimeInMs = 1000;
- public IoTDBConfig() {
- try {
- internalIp = InetAddress.getLocalHost().getHostAddress();
- } catch (UnknownHostException e) {
- logger.error(e.getMessage());
- internalIp = "127.0.0.1";
- }
- configNodeUrls = new ArrayList<>();
- }
-
public float getUdfMemoryBudgetInMB() {
return udfMemoryBudgetInMB;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/ConsensusManager.java b/server/src/main/java/org/apache/iotdb/db/consensus/ConsensusManager.java
deleted file mode 100644
index 1a036908ff..0000000000
--- a/server/src/main/java/org/apache/iotdb/db/consensus/ConsensusManager.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.consensus;
-
-import org.apache.iotdb.commons.cluster.DataNodeLocation;
-import org.apache.iotdb.commons.consensus.ConsensusGroupId;
-import org.apache.iotdb.commons.partition.RegionReplicaSet;
-import org.apache.iotdb.consensus.IConsensus;
-import org.apache.iotdb.consensus.common.Peer;
-import org.apache.iotdb.consensus.common.request.IConsensusRequest;
-import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
-import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
-import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-/** DataNode Consensus layer manager */
-public class ConsensusManager {
- private static final Logger LOGGER = LoggerFactory.getLogger(ConsensusManager.class);
-
- private IConsensus consensusImpl;
-
- public ConsensusManager() throws IOException {
- consensusImpl = ConsensusImpl.getInstance();
- consensusImpl.start();
- }
-
- public void addConsensusGroup(RegionReplicaSet regionReplicaSet) {
- ConsensusGroupId consensusGroupId = regionReplicaSet.getConsensusGroupId();
- List<Peer> peerList = new ArrayList<>();
- for (DataNodeLocation dataNodeLocation : regionReplicaSet.getDataNodeList()) {
- peerList.add(new Peer(consensusGroupId, dataNodeLocation.getEndPoint()));
- }
- consensusImpl.addConsensusGroup(consensusGroupId, peerList);
- }
-
- /** Transmit FragmentInstance to datanode.consensus.statemachine */
- public ConsensusWriteResponse write(ConsensusGroupId consensusGroupId, IConsensusRequest plan) {
- return consensusImpl.write(consensusGroupId, plan);
- }
-
- /** Transmit FragmentInstance to datanode.consensus.statemachine */
- public ConsensusReadResponse read(ConsensusGroupId consensusGroupId, FragmentInstance plan) {
- return consensusImpl.read(consensusGroupId, plan);
- }
-
- public void close() throws IOException {
- consensusImpl.stop();
- }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
index a7ef03b32e..873d652013 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
@@ -24,15 +24,12 @@ import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.statemachine.IStateMachine;
-import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-
public abstract class BaseStateMachine implements IStateMachine {
private static final Logger logger = LoggerFactory.getLogger(BaseStateMachine.class);
@@ -41,7 +38,7 @@ public abstract class BaseStateMachine implements IStateMachine {
public TSStatus write(IConsensusRequest request) {
try {
return write(getFragmentInstance(request));
- } catch (IllegalArgumentException | IllegalPathException | IOException e) {
+ } catch (IllegalArgumentException e) {
logger.error(e.getMessage());
return new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
}
@@ -53,7 +50,7 @@ public abstract class BaseStateMachine implements IStateMachine {
public DataSet read(IConsensusRequest request) {
try {
return read(getFragmentInstance(request));
- } catch (IllegalArgumentException | IllegalPathException | IOException e) {
+ } catch (IllegalArgumentException e) {
logger.error(e.getMessage());
return null;
}
@@ -61,8 +58,7 @@ public abstract class BaseStateMachine implements IStateMachine {
protected abstract DataSet read(FragmentInstance fragmentInstance);
- private FragmentInstance getFragmentInstance(IConsensusRequest request)
- throws IllegalPathException, IOException {
+ private FragmentInstance getFragmentInstance(IConsensusRequest request) {
FragmentInstance instance;
if (request instanceof ByteBufferConsensusRequest) {
instance =
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/Executor/SchemaVisitor.java b/server/src/main/java/org/apache/iotdb/db/metadata/Executor/SchemaVisitor.java
index 08252364ce..5ed7cbf395 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/Executor/SchemaVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/Executor/SchemaVisitor.java
@@ -25,10 +25,14 @@ import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.CreateAlignedTimeSeriesNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.CreateTimeSeriesNode;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.exception.NotImplementedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,7 +44,8 @@ public class SchemaVisitor extends PlanVisitor<TSStatus, ISchemaRegion> {
@Override
public TSStatus visitCreateTimeSeries(CreateTimeSeriesNode node, ISchemaRegion schemaRegion) {
try {
- schemaRegion.createTimeseries((CreateTimeSeriesPlan) node.transferToPhysicalPlan(), -1);
+ PhysicalPlan plan = node.accept(new PhysicalPlanTransformer(), new TransformerContext());
+ schemaRegion.createTimeseries((CreateTimeSeriesPlan) plan, -1);
} catch (MetadataException e) {
logger.error("{}: MetaData error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
return RpcUtils.getStatus(TSStatusCode.METADATA_ERROR, e.getMessage());
@@ -52,4 +57,40 @@ public class SchemaVisitor extends PlanVisitor<TSStatus, ISchemaRegion> {
public TSStatus visitPlan(PlanNode node, ISchemaRegion context) {
return null;
}
+
+ private static class PhysicalPlanTransformer
+ extends PlanVisitor<PhysicalPlan, TransformerContext> {
+ @Override
+ public PhysicalPlan visitPlan(PlanNode node, TransformerContext context) {
+ throw new NotImplementedException();
+ }
+
+ public PhysicalPlan visitCreateTimeSeries(
+ CreateTimeSeriesNode node, TransformerContext context) {
+ return new CreateTimeSeriesPlan(
+ node.getPath(),
+ node.getDataType(),
+ node.getEncoding(),
+ node.getCompressor(),
+ node.getProps(),
+ node.getTags(),
+ node.getAttributes(),
+ node.getAlias());
+ }
+
+ public PhysicalPlan visitCreateAlignedTimeSeries(
+ CreateAlignedTimeSeriesNode node, TransformerContext context) {
+ return new CreateAlignedTimeSeriesPlan(
+ node.getDevicePath(),
+ node.getMeasurements(),
+ node.getDataTypes(),
+ node.getEncodings(),
+ node.getCompressors(),
+ node.getAliasList(),
+ node.getTagsList(),
+ node.getAttributesList());
+ }
+ }
+
+ private static class TransformerContext {}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
index a9b259c693..3e55a5d32c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.mpp.sql.planner.plan;
import org.apache.iotdb.commons.cluster.Endpoint;
import org.apache.iotdb.commons.partition.RegionReplicaSet;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
-import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
@@ -32,7 +31,6 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
-import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Objects;
@@ -115,23 +113,20 @@ public class FragmentInstance implements IConsensusRequest {
public String toString() {
StringBuilder ret = new StringBuilder();
ret.append(String.format("FragmentInstance-%s:", getId()));
- if (getHostEndpoint() == null) {
- ret.append(String.format("host endpoint has not set."));
- } else {
- ret.append(String.format("host endpoint: %s.", getHostEndpoint().toString()));
- }
- if (getRegionReplicaSet() == null) {
- ret.append(String.format("Region Replica set has not set.\n"));
- } else {
- ret.append(String.format("Region Replica set: %s.\n", getRegionReplicaSet().toString()));
- }
+ ret.append(
+ String.format("Host: %s", getHostEndpoint() == null ? "Not set" : getHostEndpoint()));
+ ret.append(
+ String.format(
+ "Region: %s",
+ getRegionReplicaSet() == null
+ ? "Not set"
+ : getRegionReplicaSet().getConsensusGroupId()));
ret.append("---- Plan Node Tree ----\n");
ret.append(PlanNodeUtil.nodeToString(getFragment().getRoot()));
return ret.toString();
}
- public static FragmentInstance deserializeFrom(ByteBuffer buffer)
- throws IllegalPathException, IOException {
+ public static FragmentInstance deserializeFrom(ByteBuffer buffer) {
FragmentInstanceId id = FragmentInstanceId.deserialize(buffer);
PlanFragment planFragment = PlanFragment.deserialize(buffer);
boolean hasTimeFilter = ReadWriteIOUtils.readBool(buffer);
@@ -148,7 +143,6 @@ public class FragmentInstance implements IConsensusRequest {
@Override
public void serializeRequest(ByteBuffer buffer) {
- buffer.mark();
id.serialize(buffer);
fragment.serialize(buffer);
ReadWriteIOUtils.write(timeFilter != null, buffer);
@@ -157,7 +151,6 @@ public class FragmentInstance implements IConsensusRequest {
}
ReadWriteIOUtils.write(type.ordinal(), buffer);
regionReplicaSet.serializeImpl(buffer);
-
hostEndpoint.serializeImpl(buffer);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragment.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragment.java
index 0f00703c86..38365ec4af 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragment.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragment.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.mpp.sql.planner.plan;
import org.apache.iotdb.commons.partition.RegionReplicaSet;
-import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
@@ -101,12 +100,12 @@ public class PlanFragment {
root.serialize(byteBuffer);
}
- public static PlanFragment deserialize(ByteBuffer byteBuffer) throws IllegalPathException {
+ public static PlanFragment deserialize(ByteBuffer byteBuffer) {
return new PlanFragment(PlanFragmentId.deserialize(byteBuffer), deserializeHelper(byteBuffer));
}
// deserialize the plan node recursively
- public static PlanNode deserializeHelper(ByteBuffer byteBuffer) throws IllegalPathException {
+ public static PlanNode deserializeHelper(ByteBuffer byteBuffer) {
PlanNode root = PlanNodeType.deserialize(byteBuffer);
int childrenCount = byteBuffer.getInt();
for (int i = 0; i < childrenCount; i++) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNode.java
index 5df9022b84..d99c3361bf 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNode.java
@@ -18,7 +18,6 @@
*/
package org.apache.iotdb.db.mpp.sql.planner.plan.node;
-import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.commons.lang.Validate;
@@ -115,9 +114,4 @@ public abstract class PlanNode {
public int hashCode() {
return Objects.hash(id);
}
-
- // TODO (yifuzhou) will remote later
- public PhysicalPlan transferToPhysicalPlan() {
- return null;
- }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeType.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeType.java
index 85d9f78d2f..e5cd7ff8be 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeType.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeType.java
@@ -18,7 +18,6 @@
*/
package org.apache.iotdb.db.mpp.sql.planner.plan.node;
-import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.DevicesSchemaScanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.ShowDevicesNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.TimeSeriesSchemaScanNode;
@@ -87,7 +86,7 @@ public enum PlanNodeType {
buffer.putShort(nodeType);
}
- public static PlanNode deserialize(ByteBuffer buffer) throws IllegalPathException {
+ public static PlanNode deserialize(ByteBuffer buffer) {
short nodeType = buffer.getShort();
switch (nodeType) {
case 0:
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanVisitor.java
index 953db9f514..e5a5dfe8c3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanVisitor.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.DevicesSchema
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.SchemaMergeNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.SchemaScanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.TimeSeriesSchemaScanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.CreateAlignedTimeSeriesNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.CreateTimeSeriesNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.AggregateNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.DeviceMergeNode;
@@ -106,12 +107,16 @@ public abstract class PlanVisitor<R, C> {
return visitPlan(node, context);
}
+ public R visitCreateAlignedTimeSeries(CreateAlignedTimeSeriesNode node, C context) {
+ return visitPlan(node, context);
+ }
+
public R visitTimeSeriesMetaScan(TimeSeriesSchemaScanNode node, C context) {
- return visitMetaScan(node, context);
+ return visitPlan(node, context);
}
public R visitDevicesMetaScan(DevicesSchemaScanNode node, C context) {
- return visitMetaScan(node, context);
+ return visitPlan(node, context);
}
public R visitFragmentSink(FragmentSinkNode node, C context) {
@@ -119,6 +124,6 @@ public abstract class PlanVisitor<R, C> {
}
public R visitCreateTimeSeries(CreateTimeSeriesNode node, C context) {
- return null;
+ return visitPlan(node, context);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/DevicesSchemaScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/DevicesSchemaScanNode.java
index da381349ff..43be8ea836 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/DevicesSchemaScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/DevicesSchemaScanNode.java
@@ -89,12 +89,16 @@ public class DevicesSchemaScanNode extends SchemaScanNode {
ReadWriteIOUtils.write(hasSgCol, byteBuffer);
}
- public static DevicesSchemaScanNode deserialize(ByteBuffer byteBuffer)
- throws IllegalPathException {
+ public static DevicesSchemaScanNode deserialize(ByteBuffer byteBuffer) {
String id = ReadWriteIOUtils.readString(byteBuffer);
PlanNodeId planNodeId = new PlanNodeId(id);
String fullPath = ReadWriteIOUtils.readString(byteBuffer);
- PartialPath path = new PartialPath(fullPath);
+ PartialPath path = null;
+ try {
+ path = new PartialPath(fullPath);
+ } catch (IllegalPathException e) {
+ throw new IllegalArgumentException("Cannot deserialize DevicesSchemaScanNode", e);
+ }
int limit = ReadWriteIOUtils.readInt(byteBuffer);
int offset = ReadWriteIOUtils.readInt(byteBuffer);
boolean isPrefixPath = ReadWriteIOUtils.readBool(byteBuffer);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/TimeSeriesSchemaScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/TimeSeriesSchemaScanNode.java
index c7feee1a82..1e2cc49c25 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/TimeSeriesSchemaScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/TimeSeriesSchemaScanNode.java
@@ -79,12 +79,16 @@ public class TimeSeriesSchemaScanNode extends SchemaScanNode {
ReadWriteIOUtils.write(isPrefixPath, byteBuffer);
}
- public static TimeSeriesSchemaScanNode deserialize(ByteBuffer byteBuffer)
- throws IllegalPathException {
+ public static TimeSeriesSchemaScanNode deserialize(ByteBuffer byteBuffer) {
String id = ReadWriteIOUtils.readString(byteBuffer);
PlanNodeId planNodeId = new PlanNodeId(id);
String fullPath = ReadWriteIOUtils.readString(byteBuffer);
- PartialPath path = new PartialPath(fullPath);
+ PartialPath path = null;
+ try {
+ path = new PartialPath(fullPath);
+ } catch (IllegalPathException e) {
+ throw new IllegalArgumentException("Cannot deserialize TimeSeriesSchemaScanNode", e);
+ }
String key = ReadWriteIOUtils.readString(byteBuffer);
String value = ReadWriteIOUtils.readString(byteBuffer);
int limit = ReadWriteIOUtils.readInt(byteBuffer);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateAlignedTimeSeriesNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateAlignedTimeSeriesNode.java
index 89605a8704..b6d0a56143 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateAlignedTimeSeriesNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateAlignedTimeSeriesNode.java
@@ -24,8 +24,7 @@ import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
-import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -160,6 +159,11 @@ public class CreateAlignedTimeSeriesNode extends PlanNode {
return NO_CHILD_ALLOWED;
}
+ @Override
+ public <R, C> R accept(PlanVisitor<R, C> visitor, C schemaRegion) {
+ return visitor.visitCreateAlignedTimeSeries(this, schemaRegion);
+ }
+
@Override
public void serialize(ByteBuffer byteBuffer) {
byteBuffer.putShort((short) PlanNodeType.CREATE_ALIGNED_TIME_SERIES.ordinal());
@@ -354,19 +358,6 @@ public class CreateAlignedTimeSeriesNode extends PlanNode {
attributesList);
}
- @Override
- public PhysicalPlan transferToPhysicalPlan() {
- return new CreateAlignedTimeSeriesPlan(
- getDevicePath(),
- getMeasurements(),
- getDataTypes(),
- getEncodings(),
- getCompressors(),
- getAliasList(),
- getTagsList(),
- getAttributesList());
- }
-
// @Override
// public void executeOn(SchemaRegion schemaRegion) throws MetadataException {
// schemaRegion.createAlignedTimeSeries((CreateAlignedTimeSeriesPlan)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateTimeSeriesNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateTimeSeriesNode.java
index 45b7856878..36d9f35db6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateTimeSeriesNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateTimeSeriesNode.java
@@ -24,8 +24,6 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
-import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -163,21 +161,7 @@ public class CreateTimeSeriesNode extends PlanNode {
return NO_CHILD_ALLOWED;
}
- @Override
- public PhysicalPlan transferToPhysicalPlan() {
- return new CreateTimeSeriesPlan(
- getPath(),
- getDataType(),
- getEncoding(),
- getCompressor(),
- getProps(),
- getTags(),
- getAttributes(),
- getAlias());
- }
-
- public static CreateTimeSeriesNode deserialize(ByteBuffer byteBuffer)
- throws IllegalPathException {
+ public static CreateTimeSeriesNode deserialize(ByteBuffer byteBuffer) {
String id;
PartialPath path = null;
TSDataType dataType;
@@ -193,7 +177,11 @@ public class CreateTimeSeriesNode extends PlanNode {
int length = byteBuffer.getInt();
byte[] bytes = new byte[length];
byteBuffer.get(bytes);
- path = new PartialPath(new String(bytes));
+ try {
+ path = new PartialPath(new String(bytes));
+ } catch (IllegalPathException e) {
+ throw new IllegalArgumentException("Cannot deserialize CreateTimeSeriesNode", e);
+ }
dataType = TSDataType.values()[byteBuffer.get()];
encoding = TSEncoding.values()[byteBuffer.get()];
compressor = CompressionType.values()[byteBuffer.get()];
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/DeviceMergeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/DeviceMergeNode.java
index ec40d5df52..d427b85809 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/DeviceMergeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/DeviceMergeNode.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
import org.apache.iotdb.commons.utils.TestOnly;
-import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.mpp.sql.planner.plan.IOutputPlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.PlanFragment;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.ColumnHeader;
@@ -156,7 +155,7 @@ public class DeviceMergeNode extends ProcessNode implements IOutputPlanNode {
}
}
- public static DeviceMergeNode deserialize(ByteBuffer byteBuffer) throws IllegalPathException {
+ public static DeviceMergeNode deserialize(ByteBuffer byteBuffer) {
int orderByIndex = ReadWriteIOUtils.readInt(byteBuffer);
OrderBy orderBy = OrderBy.values()[orderByIndex];
FilterNullComponent filterNullComponent = FilterNullComponent.deserialize(byteBuffer);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
index ec0cb655d6..5947fc3d21 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
@@ -20,7 +20,6 @@
package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
import org.apache.iotdb.commons.cluster.Endpoint;
-import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.sql.planner.plan.PlanFragment;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
@@ -87,7 +86,7 @@ public class ExchangeNode extends PlanNode {
this.upstreamPlanNodeId = nodeId;
}
- public static ExchangeNode deserialize(ByteBuffer byteBuffer) throws IllegalPathException {
+ public static ExchangeNode deserialize(ByteBuffer byteBuffer) {
FragmentSinkNode fragmentSinkNode =
(FragmentSinkNode) PlanFragment.deserializeHelper(byteBuffer);
Endpoint endPoint =
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java
index a3002ebeb9..87c2ff1d05 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java
@@ -196,11 +196,7 @@ public class SeriesAggregateScanNode extends SourceNode implements IOutputPlanNo
// TODO serialize groupByTimeParameter
RegionReplicaSet regionReplicaSet = new RegionReplicaSet();
- try {
- regionReplicaSet.deserializeImpl(byteBuffer);
- } catch (IOException e) {
- e.printStackTrace();
- }
+ RegionReplicaSet.deserializeImpl(byteBuffer);
PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
SeriesAggregateScanNode seriesAggregateScanNode =
new SeriesAggregateScanNode(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java
index 3511a4c9c3..9700720466 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java
@@ -37,7 +37,6 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import com.google.common.collect.ImmutableList;
-import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashSet;
@@ -211,11 +210,7 @@ public class SeriesScanNode extends SourceNode implements IOutputPlanNode {
int limit = ReadWriteIOUtils.readInt(byteBuffer);
int offset = ReadWriteIOUtils.readInt(byteBuffer);
RegionReplicaSet dataRegionReplicaSet = new RegionReplicaSet();
- try {
- dataRegionReplicaSet.deserializeImpl(byteBuffer);
- } catch (IOException e) {
- e.printStackTrace();
- }
+ RegionReplicaSet.deserializeImpl(byteBuffer);
PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
SeriesScanNode seriesScanNode = new SeriesScanNode(planNodeId, partialPath);
seriesScanNode.allSensors = allSensors;
diff --git a/server/src/main/java/org/apache/iotdb/db/service/InternalServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/InternalServiceImpl.java
index 959e2879a3..190655719f 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/InternalServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/InternalServiceImpl.java
@@ -19,18 +19,16 @@
package org.apache.iotdb.db.service;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.consensus.GroupType;
import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
+import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
import org.apache.iotdb.db.consensus.ConsensusImpl;
-import org.apache.iotdb.db.consensus.ConsensusManager;
-import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.execution.FragmentInstanceInfo;
import org.apache.iotdb.db.mpp.execution.FragmentInstanceManager;
import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
-import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
import org.apache.iotdb.mpp.rpc.thrift.InternalService;
import org.apache.iotdb.mpp.rpc.thrift.SchemaFetchRequest;
import org.apache.iotdb.mpp.rpc.thrift.SchemaFetchResponse;
@@ -43,68 +41,38 @@ import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceStateResp;
import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceResp;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.exception.NotImplementedException;
import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
public class InternalServiceImpl implements InternalService.Iface {
- private static final Logger LOGGER = LoggerFactory.getLogger(InternalServiceImpl.class);
-
- private final ConsensusManager consensusManager;
- public InternalServiceImpl() throws IOException {
+ public InternalServiceImpl() {
super();
- consensusManager = new ConsensusManager();
}
@Override
public TSendFragmentInstanceResp sendFragmentInstance(TSendFragmentInstanceReq req) {
- TSendFragmentInstanceResp response = new TSendFragmentInstanceResp();
- FragmentInstance fragmentInstance = null;
- try {
- fragmentInstance = FragmentInstance.deserializeFrom(req.fragmentInstance.body);
- } catch (IOException | IllegalPathException e) {
- LOGGER.error(e.getMessage());
- response.setAccepted(false);
- response.setMessage(e.getMessage());
- return response;
- }
-
- ByteBufferConsensusRequest request = new ByteBufferConsensusRequest(req.fragmentInstance.body);
- QueryType type = fragmentInstance.getType();
- ConsensusGroupId groupId = fragmentInstance.getRegionReplicaSet().getConsensusGroupId();
-
- if (fragmentInstance.getRegionReplicaSet() == null
- || fragmentInstance.getRegionReplicaSet().isEmpty()) {
- String msg = "Unknown regions to write, since getRegionReplicaSet is empty.";
- LOGGER.error(msg);
- response.setAccepted(false);
- response.setMessage(msg);
- return response;
- }
- consensusManager.addConsensusGroup(fragmentInstance.getRegionReplicaSet());
-
+ QueryType type = QueryType.valueOf(req.queryType);
+ ConsensusGroupId groupId =
+ ConsensusGroupId.Factory.create(
+ req.consensusGroupId.id, GroupType.valueOf(req.consensusGroupId.type));
switch (type) {
case READ:
- ConsensusReadResponse readResp = ConsensusImpl.getInstance().read(groupId, request);
+ ConsensusReadResponse readResp =
+ ConsensusImpl.getInstance()
+ .read(groupId, new ByteBufferConsensusRequest(req.fragmentInstance.body));
FragmentInstanceInfo info = (FragmentInstanceInfo) readResp.getDataset();
return new TSendFragmentInstanceResp(info.getState().isFailed());
case WRITE:
- TSStatus status =
- consensusManager
- .write(
- fragmentInstance.getRegionReplicaSet().getConsensusGroupId(), fragmentInstance)
- .getStatus();
+ TSendFragmentInstanceResp response = new TSendFragmentInstanceResp();
+ ConsensusWriteResponse resp =
+ ConsensusImpl.getInstance()
+ .write(groupId, new ByteBufferConsensusRequest(req.fragmentInstance.body));
// TODO need consider more status
- if (TSStatusCode.SUCCESS_STATUS.getStatusCode() == status.getCode()) {
- response.setAccepted(true);
- } else {
- response.setAccepted(false);
- }
- response.setMessage(status.message);
+ response.setAccepted(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode() == resp.getStatus().getCode());
+ response.setMessage(resp.getStatus().message);
return response;
}
return null;
@@ -120,25 +88,21 @@ public class InternalServiceImpl implements InternalService.Iface {
@Override
public TCancelResp cancelQuery(TCancelQueryReq req) throws TException {
- return null;
+ throw new NotImplementedException();
}
@Override
public TCancelResp cancelPlanFragment(TCancelPlanFragmentReq req) throws TException {
- return null;
+ throw new NotImplementedException();
}
@Override
public TCancelResp cancelFragmentInstance(TCancelFragmentInstanceReq req) throws TException {
- return null;
+ throw new NotImplementedException();
}
@Override
public SchemaFetchResponse fetchSchema(SchemaFetchRequest req) throws TException {
throw new UnsupportedOperationException();
}
-
- public void close() throws IOException {
- consensusManager.close();
- }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeManagementServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeManagementServiceImpl.java
index fdd0a863ad..4976369443 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeManagementServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeManagementServiceImpl.java
@@ -47,7 +47,6 @@ import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
@@ -94,10 +93,6 @@ public class DataNodeManagementServiceImpl implements ManagementIService.Iface {
tsStatus = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
tsStatus.setMessage(
String.format("Create Schema Region failed because of %s", e2.getMessage()));
- } catch (IOException e3) {
- LOGGER.error("Can't deserialize regionId", e3);
- tsStatus = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
- tsStatus.setMessage(String.format("Can't deserialize regionId %s", e3));
}
return tsStatus;
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceSerdeTest.java
index ad1f71af7f..6871ff6560 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceSerdeTest.java
@@ -45,7 +45,6 @@ import org.apache.iotdb.tsfile.read.filter.operator.Gt;
import org.junit.Test;
-import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -54,7 +53,7 @@ import static org.junit.Assert.assertEquals;
public class FragmentInstanceSerdeTest {
@Test
- public void TestSerializeAndDeserializeForTree1() throws IllegalPathException, IOException {
+ public void TestSerializeAndDeserializeForTree1() throws IllegalPathException {
FragmentInstance fragmentInstance =
new FragmentInstance(
new PlanFragment(new PlanFragmentId("test", -1), constructPlanNodeTree()),
@@ -74,7 +73,7 @@ public class FragmentInstanceSerdeTest {
}
@Test
- public void TestSerializeAndDeserializeWithNullFilter() throws IllegalPathException, IOException {
+ public void TestSerializeAndDeserializeWithNullFilter() throws IllegalPathException {
FragmentInstance fragmentInstance =
new FragmentInstance(
new PlanFragment(new PlanFragmentId("test2", 1), constructPlanNodeTree()),
diff --git a/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java b/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
index 4475eea8fa..7ed08d65b6 100644
--- a/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
@@ -23,8 +23,10 @@ import org.apache.iotdb.commons.cluster.DataNodeLocation;
import org.apache.iotdb.commons.cluster.Endpoint;
import org.apache.iotdb.commons.consensus.SchemaRegionId;
import org.apache.iotdb.commons.partition.RegionReplicaSet;
+import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.consensus.ConsensusImpl;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.metadata.LocalConfigNode;
import org.apache.iotdb.db.metadata.path.PartialPath;
@@ -35,6 +37,7 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.PlanFragment;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.CreateTimeSeriesNode;
import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.mpp.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstance;
import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceResp;
@@ -44,7 +47,6 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.filter.GroupByFilter;
import org.apache.ratis.util.FileUtils;
-import org.apache.thrift.TException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -64,20 +66,27 @@ public class InternalServiceImplTest {
@Before
public void setUp() throws Exception {
IoTDB.configManager.init();
- internalServiceImpl = new InternalServiceImpl();
configNode = LocalConfigNode.getInstance();
+ configNode.getBelongedSchemaRegionIdWithAutoCreate(new PartialPath("root.ln"));
+ ConsensusImpl.getInstance().start();
+ RegionReplicaSet regionReplicaSet = genRegionReplicaSet();
+ ConsensusImpl.getInstance()
+ .addConsensusGroup(regionReplicaSet.getConsensusGroupId(), genPeerList(regionReplicaSet));
+ internalServiceImpl = new InternalServiceImpl();
}
@After
public void tearDown() throws Exception {
IoTDB.configManager.clear();
+ RegionReplicaSet regionReplicaSet = genRegionReplicaSet();
+ ConsensusImpl.getInstance().removeConsensusGroup(regionReplicaSet.getConsensusGroupId());
+ ConsensusImpl.getInstance().stop();
EnvironmentUtils.cleanEnv();
- internalServiceImpl.close();
FileUtils.deleteFully(new File("data" + File.separator + "consensus"));
}
@Test
- public void createTimeseriesTest() throws MetadataException, TException {
+ public void createTimeseriesTest() throws MetadataException {
configNode.getBelongedSchemaRegionIdWithAutoCreate(new PartialPath("root.ln"));
CreateTimeSeriesNode createTimeSeriesNode =
new CreateTimeSeriesNode(
@@ -105,14 +114,7 @@ public class InternalServiceImplTest {
},
"meter1");
- List<DataNodeLocation> dataNodeList = new ArrayList<>();
- dataNodeList.add(
- new DataNodeLocation(
- conf.getConsensusPort(), new Endpoint(conf.getInternalIp(), conf.getConsensusPort())));
-
- // construct fragmentInstance
- SchemaRegionId schemaRegionId = new SchemaRegionId(0);
- RegionReplicaSet regionReplicaSet = new RegionReplicaSet(schemaRegionId, dataNodeList);
+ RegionReplicaSet regionReplicaSet = genRegionReplicaSet();
PlanFragment planFragment = new PlanFragment(new PlanFragmentId("2", 3), createTimeSeriesNode);
FragmentInstance fragmentInstance =
new FragmentInstance(planFragment, 4, new GroupByFilter(1, 2, 3, 4), QueryType.WRITE);
@@ -129,10 +131,34 @@ public class InternalServiceImplTest {
TFragmentInstance tFragmentInstance = new TFragmentInstance();
tFragmentInstance.setBody(byteBuffer);
request.setFragmentInstance(tFragmentInstance);
+ request.setConsensusGroupId(
+ new TConsensusGroupId(
+ regionReplicaSet.getConsensusGroupId().getId(),
+ regionReplicaSet.getConsensusGroupId().getType().toString()));
+ request.setQueryType(QueryType.WRITE.toString());
// Use consensus layer to execute request
TSendFragmentInstanceResp response = internalServiceImpl.sendFragmentInstance(request);
Assert.assertTrue(response.accepted);
}
+
+ private RegionReplicaSet genRegionReplicaSet() {
+ List<DataNodeLocation> dataNodeList = new ArrayList<>();
+ dataNodeList.add(
+ new DataNodeLocation(
+ conf.getConsensusPort(), new Endpoint(conf.getInternalIp(), conf.getConsensusPort())));
+
+ // construct fragmentInstance
+ SchemaRegionId schemaRegionId = new SchemaRegionId(0);
+ return new RegionReplicaSet(schemaRegionId, dataNodeList);
+ }
+
+ private List<Peer> genPeerList(RegionReplicaSet regionReplicaSet) {
+ List<Peer> peerList = new ArrayList<>();
+ for (DataNodeLocation node : regionReplicaSet.getDataNodeList()) {
+ peerList.add(new Peer(regionReplicaSet.getConsensusGroupId(), node.getEndPoint()));
+ }
+ return peerList;
+ }
}
diff --git a/server/src/test/resources/iotdb-engine.properties b/server/src/test/resources/iotdb-engine.properties
index 559698f908..4074a55472 100644
--- a/server/src/test/resources/iotdb-engine.properties
+++ b/server/src/test/resources/iotdb-engine.properties
@@ -26,4 +26,5 @@ trigger_root_dir=target/ext/trigger
tracing_dir=target/data/tracing
minimum_schema_file_segment_in_bytes=0
page_cache_in_schema_file=10
-sync_dir=target/sync
\ No newline at end of file
+internal_ip=0.0.0.0
+sync_dir=target/sync