You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/09/05 06:09:53 UTC

[iotdb] branch master updated: [IOTDB-4301] [IOTDB-4303] fix bug in create regions group procedure (#7228)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 518dcfbfa4 [IOTDB-4301] [IOTDB-4303] fix bug in create regions group procedure (#7228)
518dcfbfa4 is described below

commit 518dcfbfa461ffe4912fba18cb29299416301588
Author: Beyyes <cg...@foxmail.com>
AuthorDate: Mon Sep 5 14:09:47 2022 +0800

    [IOTDB-4301] [IOTDB-4303] fix bug in create regions group procedure (#7228)
---
 .../request/write/CreateRegionGroupsPlan.java      |  10 ++
 .../impl/CreateRegionGroupsProcedure.java          |  85 +++++++++++++++-
 .../procedure/state/RemoveConfigNodeState.java     |   3 +-
 .../procedure/store/ProcedureFactory.java          |  10 ++
 .../impl/CreateRegionGroupsProcedureTest.java      | 110 +++++++++++++++++++++
 5 files changed, 212 insertions(+), 6 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/CreateRegionGroupsPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/CreateRegionGroupsPlan.java
index ca7e3fde39..57bd998a25 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/CreateRegionGroupsPlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/CreateRegionGroupsPlan.java
@@ -60,6 +60,16 @@ public class CreateRegionGroupsPlan extends ConfigPhysicalPlan {
         .add(regionReplicaSet);
   }
 
+  public void serializeForProcedure(DataOutputStream stream) throws IOException {
+    this.serializeImpl(stream);
+  }
+
+  public void deserializeForProcedure(ByteBuffer buffer) throws IOException {
+    // to remove the ordinal of ConfigPhysicalPlanType
+    buffer.getInt();
+    this.deserializeImpl(buffer);
+  }
+
   @Override
   protected void serializeImpl(DataOutputStream stream) throws IOException {
     stream.writeInt(ConfigPhysicalPlanType.CreateRegionGroups.ordinal());
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateRegionGroupsProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateRegionGroupsProcedure.java
index 32877571bb..db9a308381 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateRegionGroupsProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateRegionGroupsProcedure.java
@@ -20,26 +20,50 @@ package org.apache.iotdb.confignode.procedure.impl;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
 import org.apache.iotdb.confignode.consensus.request.write.CreateRegionGroupsPlan;
 import org.apache.iotdb.confignode.consensus.request.write.DeleteRegionGroupsPlan;
 import org.apache.iotdb.confignode.procedure.StateMachineProcedure;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
 import org.apache.iotdb.confignode.procedure.state.CreateRegionGroupsState;
+import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
 
 public class CreateRegionGroupsProcedure
     extends StateMachineProcedure<ConfigNodeProcedureEnv, CreateRegionGroupsState> {
 
-  private final CreateRegionGroupsPlan createRegionGroupsPlan;
-  // Map<TConsensusGroupId, Failed RegionReplicas>
-  private Map<TConsensusGroupId, TRegionReplicaSet> failedRegions;
+  private static final Logger LOGGER = LoggerFactory.getLogger(CreateRegionGroupsProcedure.class);
+
+  private CreateRegionGroupsPlan createRegionGroupsPlan = new CreateRegionGroupsPlan();
+
+  /** key: TConsensusGroupId value: Failed RegionReplicas */
+  private Map<TConsensusGroupId, TRegionReplicaSet> failedRegions = new HashMap<>();
+
+  public CreateRegionGroupsProcedure() {
+    super();
+  }
 
   public CreateRegionGroupsProcedure(CreateRegionGroupsPlan createRegionGroupsPlan) {
     this.createRegionGroupsPlan = createRegionGroupsPlan;
   }
 
+  public CreateRegionGroupsProcedure(
+      CreateRegionGroupsPlan createRegionGroupsPlan,
+      Map<TConsensusGroupId, TRegionReplicaSet> failedRegions) {
+    this.createRegionGroupsPlan = createRegionGroupsPlan;
+    this.failedRegions = failedRegions;
+  }
+
   @Override
   protected Flow executeFromState(ConfigNodeProcedureEnv env, CreateRegionGroupsState state) {
     switch (state) {
@@ -130,6 +154,59 @@ public class CreateRegionGroupsProcedure
 
   @Override
   protected CreateRegionGroupsState getInitialState() {
-    return CreateRegionGroupsState.CREATE_REGION_GROUPS;
+    return CreateRegionGroupsState.CREATE_REGION_GROUPS_PREPARE;
+  }
+
+  @Override
+  public void serialize(DataOutputStream stream) throws IOException {
+    // must serialize CREATE_REGION_GROUPS.ordinal() firstly
+    stream.writeInt(ProcedureFactory.ProcedureType.CREATE_REGION_GROUPS.ordinal());
+    super.serialize(stream);
+    createRegionGroupsPlan.serializeForProcedure(stream);
+    stream.writeInt(failedRegions.size());
+    failedRegions.forEach(
+        (groupId, replica) -> {
+          ThriftCommonsSerDeUtils.serializeTConsensusGroupId(groupId, stream);
+          ThriftCommonsSerDeUtils.serializeTRegionReplicaSet(replica, stream);
+        });
+  }
+
+  @Override
+  public void deserialize(ByteBuffer byteBuffer) {
+    super.deserialize(byteBuffer);
+    try {
+      createRegionGroupsPlan.deserializeForProcedure(byteBuffer);
+      failedRegions.clear();
+      int failedRegionsSize = byteBuffer.getInt();
+      while (failedRegionsSize-- > 0) {
+        TConsensusGroupId groupId =
+            ThriftCommonsSerDeUtils.deserializeTConsensusGroupId(byteBuffer);
+        TRegionReplicaSet replica =
+            ThriftCommonsSerDeUtils.deserializeTRegionReplicaSet(byteBuffer);
+        failedRegions.put(groupId, replica);
+      }
+    } catch (Exception e) {
+      LOGGER.error("Deserialize meets error in CreateRegionGroupsProcedure", e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public boolean equals(Object that) {
+    if (that instanceof CreateRegionGroupsProcedure) {
+      CreateRegionGroupsProcedure thatProc = (CreateRegionGroupsProcedure) that;
+      return thatProc.getProcId() == this.getProcId()
+          && thatProc.getState() == this.getState()
+          && thatProc.createRegionGroupsPlan.equals(this.createRegionGroupsPlan)
+          && thatProc.failedRegions.equals(this.failedRegions);
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    int result = createRegionGroupsPlan.hashCode();
+    result = 31 * result + Objects.hash(failedRegions);
+    return result;
   }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RemoveConfigNodeState.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RemoveConfigNodeState.java
index 9c94ac1fde..dc7acbda82 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RemoveConfigNodeState.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RemoveConfigNodeState.java
@@ -20,8 +20,7 @@
 package org.apache.iotdb.confignode.procedure.state;
 
 public enum RemoveConfigNodeState {
-  REMOVE_CONSENSUS_GROUP,
   REMOVE_PEER,
-
+  REMOVE_CONSENSUS_GROUP,
   STOP_CONFIG_NODE
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
index 59f0eedaa9..57119408c9 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
@@ -27,15 +27,21 @@ import org.apache.iotdb.confignode.procedure.impl.RegionMigrateProcedure;
 import org.apache.iotdb.confignode.procedure.impl.RemoveConfigNodeProcedure;
 import org.apache.iotdb.confignode.procedure.impl.RemoveDataNodeProcedure;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
 public class ProcedureFactory implements IProcedureFactory {
 
+  private static final Logger LOGGER = LoggerFactory.getLogger(ProcedureFactory.class);
+
   @Override
   public Procedure create(ByteBuffer buffer) throws IOException {
     int typeNum = buffer.getInt();
     if (typeNum >= ProcedureType.values().length) {
+      LOGGER.error("unrecognized log type " + typeNum);
       throw new IOException("unrecognized log type " + typeNum);
     }
     ProcedureType type = ProcedureType.values()[typeNum];
@@ -56,7 +62,11 @@ public class ProcedureFactory implements IProcedureFactory {
       case REGION_MIGRATE_PROCEDURE:
         procedure = new RegionMigrateProcedure();
         break;
+      case CREATE_REGION_GROUPS:
+        procedure = new CreateRegionGroupsProcedure();
+        break;
       default:
+        LOGGER.error("unknown Procedure type: " + typeNum);
         throw new IOException("unknown Procedure type: " + typeNum);
     }
     procedure.deserialize(buffer);
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/CreateRegionGroupsProcedureTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/CreateRegionGroupsProcedureTest.java
new file mode 100644
index 0000000000..e050ded2c7
--- /dev/null
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/CreateRegionGroupsProcedureTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.impl;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.confignode.consensus.request.write.CreateRegionGroupsPlan;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+
+import org.junit.Test;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.iotdb.common.rpc.thrift.TConsensusGroupType.DataRegion;
+import static org.apache.iotdb.common.rpc.thrift.TConsensusGroupType.SchemaRegion;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class CreateRegionGroupsProcedureTest {
+
+  @Test
+  public void serializeDeserializeTest() {
+    TDataNodeLocation dataNodeLocation0 = new TDataNodeLocation();
+    dataNodeLocation0.setDataNodeId(5);
+    dataNodeLocation0.setClientRpcEndPoint(new TEndPoint("0.0.0.0", 6667));
+    dataNodeLocation0.setInternalEndPoint(new TEndPoint("0.0.0.0", 9003));
+    dataNodeLocation0.setMPPDataExchangeEndPoint(new TEndPoint("0.0.0.0", 8777));
+    dataNodeLocation0.setDataRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 40010));
+    dataNodeLocation0.setSchemaRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 50010));
+
+    TDataNodeLocation dataNodeLocation1 = new TDataNodeLocation();
+    dataNodeLocation1.setDataNodeId(6);
+    dataNodeLocation1.setClientRpcEndPoint(new TEndPoint("0.0.0.1", 6667));
+    dataNodeLocation1.setInternalEndPoint(new TEndPoint("0.0.0.1", 9003));
+    dataNodeLocation1.setMPPDataExchangeEndPoint(new TEndPoint("0.0.0.1", 8777));
+    dataNodeLocation1.setDataRegionConsensusEndPoint(new TEndPoint("0.0.0.1", 40010));
+    dataNodeLocation1.setSchemaRegionConsensusEndPoint(new TEndPoint("0.0.0.1", 50010));
+
+    TConsensusGroupId schemaRegionGroupId = new TConsensusGroupId(SchemaRegion, 1);
+    TConsensusGroupId dataRegionGroupId = new TConsensusGroupId(DataRegion, 0);
+
+    TRegionReplicaSet schemaRegionSet =
+        new TRegionReplicaSet(schemaRegionGroupId, Collections.singletonList(dataNodeLocation0));
+    TRegionReplicaSet dataRegionSet =
+        new TRegionReplicaSet(dataRegionGroupId, Collections.singletonList(dataNodeLocation1));
+
+    // to test the equals method of Map<TConsensusGroupId, TRegionReplicaSet>
+    Map<TConsensusGroupId, TRegionReplicaSet> failedRegions0 =
+        new HashMap<TConsensusGroupId, TRegionReplicaSet>() {
+          {
+            put(dataRegionGroupId, dataRegionSet);
+            put(schemaRegionGroupId, schemaRegionSet);
+          }
+        };
+    Map<TConsensusGroupId, TRegionReplicaSet> failedRegions1 =
+        new HashMap<TConsensusGroupId, TRegionReplicaSet>() {
+          {
+            put(schemaRegionGroupId, schemaRegionSet);
+            put(dataRegionGroupId, dataRegionSet);
+          }
+        };
+    assertEquals(failedRegions0, failedRegions1);
+
+    CreateRegionGroupsPlan createRegionGroupsPlan = new CreateRegionGroupsPlan();
+    createRegionGroupsPlan.addRegionGroup("root.sg0", dataRegionSet);
+    createRegionGroupsPlan.addRegionGroup("root.sg1", schemaRegionSet);
+
+    CreateRegionGroupsProcedure procedure0 =
+        new CreateRegionGroupsProcedure(createRegionGroupsPlan, failedRegions0);
+    PublicBAOS byteArrayOutputStream = new PublicBAOS();
+    DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream);
+
+    try {
+      procedure0.serialize(outputStream);
+      CreateRegionGroupsProcedure procedure1 = new CreateRegionGroupsProcedure();
+      ByteBuffer buffer =
+          ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
+      buffer.getInt();
+      procedure1.deserialize(buffer);
+      assertEquals(procedure0, procedure1);
+      assertEquals(procedure0.hashCode(), procedure1.hashCode());
+    } catch (IOException e) {
+      fail();
+    }
+  }
+}