Posted to issues@hbase.apache.org by GitBox <gi...@apache.org> on 2021/06/29 03:34:54 UTC

[GitHub] [hbase] ddupg commented on a change in pull request #3430: HBASE-26029 It is not reliable to use nodeDeleted event to track regi…

ddupg commented on a change in pull request #3430:
URL: https://github.com/apache/hbase/pull/3430#discussion_r660247427



##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
##########
@@ -838,185 +795,116 @@ public void postLogRoll(Path newLog) throws IOException {
     }
   }
 
-  @Override
-  public void regionServerRemoved(ServerName regionserver) {
-    transferQueues(regionserver);
-  }
-
-  /**
-   * Transfer all the queues of the specified to this region server. First it tries to grab a lock
-   * and if it works it will move the old queues and finally will delete the old queues.
-   * <p>
-   * It creates one old source for any type of source of the old rs.
-   */
-  private void transferQueues(ServerName deadRS) {
-    if (server.getServerName().equals(deadRS)) {
-      // it's just us, give up
+  void claimQueue(ServerName deadRS, String queue) {
+    // Wait a bit before transferring the queues, we may be shutting down.
+    // This sleep may not be enough in some cases.
+    try {
+      Thread.sleep(sleepBeforeFailover +
+        (long) (ThreadLocalRandom.current().nextFloat() * sleepBeforeFailover));
+    } catch (InterruptedException e) {
+      LOG.warn("Interrupted while waiting before transferring a queue.");
+      Thread.currentThread().interrupt();
+    }
+    // We try to lock that rs' queue directory
+    if (server.isStopped()) {
+      LOG.info("Not transferring queue since we are shutting down");
+      return;
+    }
+    // After claim the queues from dead region server, wewill skip to start the

Review comment:
      NIT: `wewill` is missing a space.
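For context on the wait at the start of `claimQueue`: it sleeps for `sleepBeforeFailover` plus a random fraction of that same value, so region servers racing to claim queues do not all wake at once. The following is a minimal sketch of that delay computation only. The helper name `jitteredDelay` and the 2000 ms base are hypothetical and chosen for illustration; they are not taken from the PR.

```java
import java.util.concurrent.ThreadLocalRandom;

/**
 * Hypothetical helper mirroring the delay expression in claimQueue():
 * sleepBeforeFailover + nextFloat() * sleepBeforeFailover.
 */
class FailoverDelay {

  // nextFloat() is in [0, 1), so the result is at least base and at most
  // roughly 2 * base.
  static long jitteredDelay(long base) {
    return base + (long) (ThreadLocalRandom.current().nextFloat() * base);
  }

  public static void main(String[] args) {
    long base = 2000L; // assumed value, for illustration only
    for (int i = 0; i < 3; i++) {
      System.out.println("would sleep " + jitteredDelay(base) + " ms");
    }
  }
}
```

Because each server draws its own random extra, two servers that notice the same dead server at the same moment will usually start claiming at different times.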

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ClaimReplicationQueuesProcedure.java
##########
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.replication;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
+import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
+import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
+import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.util.RetryCounter;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ClaimReplicationQueuesStateData;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
+
+/**
+ * Used to assign the replication queues of a dead server to other region servers.
+ */
+@InterfaceAudience.Private
+public class ClaimReplicationQueuesProcedure extends Procedure<MasterProcedureEnv>
+  implements ServerProcedureInterface {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ClaimReplicationQueuesProcedure.class);
+
+  private ServerName crashedServer;
+
+  private RetryCounter retryCounter;
+
+  public ClaimReplicationQueuesProcedure() {
+  }
+
+  public ClaimReplicationQueuesProcedure(ServerName crashedServer) {
+    this.crashedServer = crashedServer;
+  }
+
+  @Override
+  public ServerName getServerName() {
+    return crashedServer;
+  }
+
+  @Override
+  public boolean hasMetaTableRegion() {
+    return false;
+  }
+
+  @Override
+  public ServerOperationType getServerOperationType() {
+    return ServerOperationType.CLAIM_REPLICATION_QUEUES;
+  }
+
+  @Override
+  protected Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env)
+    throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
+    ReplicationQueueStorage storage = env.getReplicationPeerManager().getQueueStorage();
+    try {
+      List<String> queues = storage.getAllQueues(crashedServer);
+      if (queues.isEmpty()) {
+        LOG.debug("Finish claiming replication queues for {}", crashedServer);
+        storage.removeReplicatorIfQueueIsEmpty(crashedServer);
+        // we are done
+        return null;
+      }
+      LOG.debug("There are {} replication queues need to be claimed for {}", queues.size(),
+        crashedServer);
+      List<ServerName> targetServers =
+        env.getMasterServices().getServerManager().getOnlineServersList();
+      if (targetServers.isEmpty()) {
+        throw new ReplicationException("no region server available");
+      }
+      Collections.shuffle(targetServers);
+      ClaimReplicationQueueRemoteProcedure[] procs =
+        new ClaimReplicationQueueRemoteProcedure[Math.min(queues.size(), targetServers.size())];
+      for (int i = 0; i < procs.length; i++) {
+        procs[i] = new ClaimReplicationQueueRemoteProcedure(crashedServer, queues.get(i),

Review comment:
      If `queues.size()` is greater than `targetServers.size()`, won't the extra queues be left unclaimed?
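To make the concern concrete: the loop creates only `Math.min(queues.size(), targetServers.size())` remote procedures, so a single pass hands out at most one queue per online server. This excerpt does not show whether the remaining queues are picked up when `execute()` runs again after the child procedures finish. One alternative, sketched below under stated assumptions, assigns every queue in one pass by cycling round-robin over the shuffled servers. Plain `String`s stand in for `ServerName` and the queue ids, and `assignRoundRobin` is a hypothetical name, not part of the PR.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch: give every queue a target server, even when there are
 * more queues than servers, by wrapping around the server list.
 */
class QueueAssignment {

  static Map<String, List<String>> assignRoundRobin(List<String> queues, List<String> servers) {
    if (servers.isEmpty()) {
      throw new IllegalStateException("no region server available");
    }
    // Copy before shuffling so the caller's list is left untouched.
    List<String> shuffled = new ArrayList<>(servers);
    Collections.shuffle(shuffled);
    Map<String, List<String>> assignment = new LinkedHashMap<>();
    for (int i = 0; i < queues.size(); i++) {
      // i % size wraps around, so queue i always gets a server.
      String target = shuffled.get(i % shuffled.size());
      assignment.computeIfAbsent(target, k -> new ArrayList<>()).add(queues.get(i));
    }
    return assignment;
  }

  public static void main(String[] args) {
    List<String> queues = List.of("q1", "q2", "q3", "q4", "q5");
    List<String> servers = List.of("rs1", "rs2");
    // Five queues over two servers: one server gets three queues, the other two.
    System.out.println(assignRoundRobin(queues, servers));
  }
}
```

The trade-off is that a single server may receive several queues at once, whereas the min-based loop limits each server to one queue per round.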




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@hbase.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org