You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/09/05 06:09:53 UTC
[iotdb] branch master updated: [IOTDB-4301] [IOTDB-4303] fix bug in create regions group procedure (#7228)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 518dcfbfa4 [IOTDB-4301] [IOTDB-4303] fix bug in create regions group procedure (#7228)
518dcfbfa4 is described below
commit 518dcfbfa461ffe4912fba18cb29299416301588
Author: Beyyes <cg...@foxmail.com>
AuthorDate: Mon Sep 5 14:09:47 2022 +0800
[IOTDB-4301] [IOTDB-4303] fix bug in create regions group procedure (#7228)
---
.../request/write/CreateRegionGroupsPlan.java | 10 ++
.../impl/CreateRegionGroupsProcedure.java | 85 +++++++++++++++-
.../procedure/state/RemoveConfigNodeState.java | 3 +-
.../procedure/store/ProcedureFactory.java | 10 ++
.../impl/CreateRegionGroupsProcedureTest.java | 110 +++++++++++++++++++++
5 files changed, 212 insertions(+), 6 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/CreateRegionGroupsPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/CreateRegionGroupsPlan.java
index ca7e3fde39..57bd998a25 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/CreateRegionGroupsPlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/CreateRegionGroupsPlan.java
@@ -60,6 +60,16 @@ public class CreateRegionGroupsPlan extends ConfigPhysicalPlan {
.add(regionReplicaSet);
}
+ public void serializeForProcedure(DataOutputStream stream) throws IOException {
+ this.serializeImpl(stream);
+ }
+
+ public void deserializeForProcedure(ByteBuffer buffer) throws IOException {
+ // to remove the ordinal of ConfigPhysicalPlanType
+ buffer.getInt();
+ this.deserializeImpl(buffer);
+ }
+
@Override
protected void serializeImpl(DataOutputStream stream) throws IOException {
stream.writeInt(ConfigPhysicalPlanType.CreateRegionGroups.ordinal());
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateRegionGroupsProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateRegionGroupsProcedure.java
index 32877571bb..db9a308381 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateRegionGroupsProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateRegionGroupsProcedure.java
@@ -20,26 +20,50 @@ package org.apache.iotdb.confignode.procedure.impl;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
import org.apache.iotdb.confignode.consensus.request.write.CreateRegionGroupsPlan;
import org.apache.iotdb.confignode.consensus.request.write.DeleteRegionGroupsPlan;
import org.apache.iotdb.confignode.procedure.StateMachineProcedure;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
import org.apache.iotdb.confignode.procedure.state.CreateRegionGroupsState;
+import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
import java.util.Map;
+import java.util.Objects;
public class CreateRegionGroupsProcedure
extends StateMachineProcedure<ConfigNodeProcedureEnv, CreateRegionGroupsState> {
- private final CreateRegionGroupsPlan createRegionGroupsPlan;
- // Map<TConsensusGroupId, Failed RegionReplicas>
- private Map<TConsensusGroupId, TRegionReplicaSet> failedRegions;
+ private static final Logger LOGGER = LoggerFactory.getLogger(CreateRegionGroupsProcedure.class);
+
+ private CreateRegionGroupsPlan createRegionGroupsPlan = new CreateRegionGroupsPlan();
+
+ /** key: TConsensusGroupId value: Failed RegionReplicas */
+ private Map<TConsensusGroupId, TRegionReplicaSet> failedRegions = new HashMap<>();
+
+ public CreateRegionGroupsProcedure() {
+ super();
+ }
public CreateRegionGroupsProcedure(CreateRegionGroupsPlan createRegionGroupsPlan) {
this.createRegionGroupsPlan = createRegionGroupsPlan;
}
+ public CreateRegionGroupsProcedure(
+ CreateRegionGroupsPlan createRegionGroupsPlan,
+ Map<TConsensusGroupId, TRegionReplicaSet> failedRegions) {
+ this.createRegionGroupsPlan = createRegionGroupsPlan;
+ this.failedRegions = failedRegions;
+ }
+
@Override
protected Flow executeFromState(ConfigNodeProcedureEnv env, CreateRegionGroupsState state) {
switch (state) {
@@ -130,6 +154,59 @@ public class CreateRegionGroupsProcedure
@Override
protected CreateRegionGroupsState getInitialState() {
- return CreateRegionGroupsState.CREATE_REGION_GROUPS;
+ return CreateRegionGroupsState.CREATE_REGION_GROUPS_PREPARE;
+ }
+
+ @Override
+ public void serialize(DataOutputStream stream) throws IOException {
+ // must serialize CREATE_REGION_GROUPS.ordinal() firstly
+ stream.writeInt(ProcedureFactory.ProcedureType.CREATE_REGION_GROUPS.ordinal());
+ super.serialize(stream);
+ createRegionGroupsPlan.serializeForProcedure(stream);
+ stream.writeInt(failedRegions.size());
+ failedRegions.forEach(
+ (groupId, replica) -> {
+ ThriftCommonsSerDeUtils.serializeTConsensusGroupId(groupId, stream);
+ ThriftCommonsSerDeUtils.serializeTRegionReplicaSet(replica, stream);
+ });
+ }
+
+ @Override
+ public void deserialize(ByteBuffer byteBuffer) {
+ super.deserialize(byteBuffer);
+ try {
+ createRegionGroupsPlan.deserializeForProcedure(byteBuffer);
+ failedRegions.clear();
+ int failedRegionsSize = byteBuffer.getInt();
+ while (failedRegionsSize-- > 0) {
+ TConsensusGroupId groupId =
+ ThriftCommonsSerDeUtils.deserializeTConsensusGroupId(byteBuffer);
+ TRegionReplicaSet replica =
+ ThriftCommonsSerDeUtils.deserializeTRegionReplicaSet(byteBuffer);
+ failedRegions.put(groupId, replica);
+ }
+ } catch (Exception e) {
+ LOGGER.error("Deserialize meets error in CreateRegionGroupsProcedure", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (that instanceof CreateRegionGroupsProcedure) {
+ CreateRegionGroupsProcedure thatProc = (CreateRegionGroupsProcedure) that;
+ return thatProc.getProcId() == this.getProcId()
+ && thatProc.getState() == this.getState()
+ && thatProc.createRegionGroupsPlan.equals(this.createRegionGroupsPlan)
+ && thatProc.failedRegions.equals(this.failedRegions);
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = createRegionGroupsPlan.hashCode();
+ result = 31 * result + Objects.hash(failedRegions);
+ return result;
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RemoveConfigNodeState.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RemoveConfigNodeState.java
index 9c94ac1fde..dc7acbda82 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RemoveConfigNodeState.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RemoveConfigNodeState.java
@@ -20,8 +20,7 @@
package org.apache.iotdb.confignode.procedure.state;
public enum RemoveConfigNodeState {
- REMOVE_CONSENSUS_GROUP,
REMOVE_PEER,
-
+ REMOVE_CONSENSUS_GROUP,
STOP_CONFIG_NODE
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
index 59f0eedaa9..57119408c9 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
@@ -27,15 +27,21 @@ import org.apache.iotdb.confignode.procedure.impl.RegionMigrateProcedure;
import org.apache.iotdb.confignode.procedure.impl.RemoveConfigNodeProcedure;
import org.apache.iotdb.confignode.procedure.impl.RemoveDataNodeProcedure;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.IOException;
import java.nio.ByteBuffer;
public class ProcedureFactory implements IProcedureFactory {
+ private static final Logger LOGGER = LoggerFactory.getLogger(ProcedureFactory.class);
+
@Override
public Procedure create(ByteBuffer buffer) throws IOException {
int typeNum = buffer.getInt();
if (typeNum >= ProcedureType.values().length) {
+ LOGGER.error("unrecognized log type " + typeNum);
throw new IOException("unrecognized log type " + typeNum);
}
ProcedureType type = ProcedureType.values()[typeNum];
@@ -56,7 +62,11 @@ public class ProcedureFactory implements IProcedureFactory {
case REGION_MIGRATE_PROCEDURE:
procedure = new RegionMigrateProcedure();
break;
+ case CREATE_REGION_GROUPS:
+ procedure = new CreateRegionGroupsProcedure();
+ break;
default:
+ LOGGER.error("unknown Procedure type: " + typeNum);
throw new IOException("unknown Procedure type: " + typeNum);
}
procedure.deserialize(buffer);
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/CreateRegionGroupsProcedureTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/CreateRegionGroupsProcedureTest.java
new file mode 100644
index 0000000000..e050ded2c7
--- /dev/null
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/CreateRegionGroupsProcedureTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.impl;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.confignode.consensus.request.write.CreateRegionGroupsPlan;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+
+import org.junit.Test;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.iotdb.common.rpc.thrift.TConsensusGroupType.DataRegion;
+import static org.apache.iotdb.common.rpc.thrift.TConsensusGroupType.SchemaRegion;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class CreateRegionGroupsProcedureTest {
+
+ @Test
+ public void serializeDeserializeTest() {
+ TDataNodeLocation dataNodeLocation0 = new TDataNodeLocation();
+ dataNodeLocation0.setDataNodeId(5);
+ dataNodeLocation0.setClientRpcEndPoint(new TEndPoint("0.0.0.0", 6667));
+ dataNodeLocation0.setInternalEndPoint(new TEndPoint("0.0.0.0", 9003));
+ dataNodeLocation0.setMPPDataExchangeEndPoint(new TEndPoint("0.0.0.0", 8777));
+ dataNodeLocation0.setDataRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 40010));
+ dataNodeLocation0.setSchemaRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 50010));
+
+ TDataNodeLocation dataNodeLocation1 = new TDataNodeLocation();
+ dataNodeLocation1.setDataNodeId(6);
+ dataNodeLocation1.setClientRpcEndPoint(new TEndPoint("0.0.0.1", 6667));
+ dataNodeLocation1.setInternalEndPoint(new TEndPoint("0.0.0.1", 9003));
+ dataNodeLocation1.setMPPDataExchangeEndPoint(new TEndPoint("0.0.0.1", 8777));
+ dataNodeLocation1.setDataRegionConsensusEndPoint(new TEndPoint("0.0.0.1", 40010));
+ dataNodeLocation1.setSchemaRegionConsensusEndPoint(new TEndPoint("0.0.0.1", 50010));
+
+ TConsensusGroupId schemaRegionGroupId = new TConsensusGroupId(SchemaRegion, 1);
+ TConsensusGroupId dataRegionGroupId = new TConsensusGroupId(DataRegion, 0);
+
+ TRegionReplicaSet schemaRegionSet =
+ new TRegionReplicaSet(schemaRegionGroupId, Collections.singletonList(dataNodeLocation0));
+ TRegionReplicaSet dataRegionSet =
+ new TRegionReplicaSet(dataRegionGroupId, Collections.singletonList(dataNodeLocation1));
+
+ // to test the equals method of Map<TConsensusGroupId, TRegionReplicaSet>
+ Map<TConsensusGroupId, TRegionReplicaSet> failedRegions0 =
+ new HashMap<TConsensusGroupId, TRegionReplicaSet>() {
+ {
+ put(dataRegionGroupId, dataRegionSet);
+ put(schemaRegionGroupId, schemaRegionSet);
+ }
+ };
+ Map<TConsensusGroupId, TRegionReplicaSet> failedRegions1 =
+ new HashMap<TConsensusGroupId, TRegionReplicaSet>() {
+ {
+ put(schemaRegionGroupId, schemaRegionSet);
+ put(dataRegionGroupId, dataRegionSet);
+ }
+ };
+ assertEquals(failedRegions0, failedRegions1);
+
+ CreateRegionGroupsPlan createRegionGroupsPlan = new CreateRegionGroupsPlan();
+ createRegionGroupsPlan.addRegionGroup("root.sg0", dataRegionSet);
+ createRegionGroupsPlan.addRegionGroup("root.sg1", schemaRegionSet);
+
+ CreateRegionGroupsProcedure procedure0 =
+ new CreateRegionGroupsProcedure(createRegionGroupsPlan, failedRegions0);
+ PublicBAOS byteArrayOutputStream = new PublicBAOS();
+ DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream);
+
+ try {
+ procedure0.serialize(outputStream);
+ CreateRegionGroupsProcedure procedure1 = new CreateRegionGroupsProcedure();
+ ByteBuffer buffer =
+ ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
+ buffer.getInt();
+ procedure1.deserialize(buffer);
+ assertEquals(procedure0, procedure1);
+ assertEquals(procedure0.hashCode(), procedure1.hashCode());
+ } catch (IOException e) {
+ fail();
+ }
+ }
+}