You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink itself is not preserved in this plain-text version).
Posted to commits@hbase.apache.org by zg...@apache.org on 2020/03/07 07:50:20 UTC

[hbase] branch master updated: HBASE-23895 STUCK Region-In-Transition when failed to insert procedure to procedure store (#1221)

This is an automated email from the ASF dual-hosted git repository.

zghao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new 9b0d214  HBASE-23895 STUCK Region-In-Transition when failed to insert procedure to procedure store (#1221)
9b0d214 is described below

commit 9b0d214b7b7626d3706f5cddfe44e89962a9fcf0
Author: Guanghao Zhang <zg...@apache.org>
AuthorDate: Sat Mar 7 15:50:09 2020 +0800

    HBASE-23895 STUCK Region-In-Transition when failed to insert procedure to procedure store (#1221)
    
    Signed-off-by: Duo Zhang <zh...@apache.org>
    Signed-off-by: stack <st...@apache.org>
---
 .../org/apache/hadoop/hbase/ipc/RpcServer.java     |  21 +++
 .../store/region/RegionProcedureStore.java         |  76 ++++++---
 .../store/region/TestRegionProcedureStore.java     | 175 +++++++++++++++++++++
 3 files changed, 246 insertions(+), 26 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index d36e89b..44909b2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -680,6 +680,27 @@ public abstract class RpcServer implements RpcServerInterface,
   }
 
   /**
+   * Used by {@link org.apache.hadoop.hbase.procedure2.store.region.RegionProcedureStore}. A
+   * master rpc call may create a new procedure and mutate the region that stores procedures.
+   * Mutating a region performs some rpc checks, such as the rpc timeout check, so unset the
+   * current rpc call to skip them.
+   * @return the rpc call that was current before unsetting, or empty if there was none
+   */
+  public static Optional<RpcCall> unsetCurrentCall() {
+    Optional<RpcCall> rpcCall = getCurrentCall();
+    CurCall.set(null);
+    return rpcCall;
+  }
+
+  /**
+   * Used by {@link org.apache.hadoop.hbase.procedure2.store.region.RegionProcedureStore}. Set the
+   * current rpc call back after mutating the region; passing {@code null} clears it.
+   */
+  public static void setCurrentCall(RpcCall rpcCall) {
+    CurCall.set(rpcCall);
+  }
+
+  /**
    * Returns the user credentials associated with the current RPC request or not present if no
    * credentials were provided.
    * @return A User
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStore.java
index d153508..296a08b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStore.java
@@ -29,6 +29,8 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
+
 import org.apache.commons.lang3.mutable.MutableLong;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -47,6 +49,8 @@ import org.apache.hadoop.hbase.client.RegionInfoBuilder;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.ipc.RpcCall;
+import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.log.HBaseMarkers;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.master.assignment.AssignProcedure;
@@ -538,6 +542,20 @@ public class RegionProcedureStore extends ProcedureStoreBase {
     rowsToLock.add(row);
   }
 
+  /**
+   * Procedure inserts/updates may run inside a master rpc call, and mutating the region performs
+   * some checks on that rpc call. Unset the current rpc call while running {@code runnable} and
+   * restore it (if there was one) in the finally block. See HBASE-23895 for more details.
+   */
+  private void runWithoutRpcCall(Runnable runnable) {
+    Optional<RpcCall> rpcCall = RpcServer.unsetCurrentCall();
+    try {
+      runnable.run();
+    } finally {
+      rpcCall.ifPresent(RpcServer::setCurrentCall);
+    }
+  }
+
   @Override
   public void insert(Procedure<?> proc, Procedure<?>[] subProcs) {
     if (subProcs == null || subProcs.length == 0) {
@@ -547,17 +565,19 @@ public class RegionProcedureStore extends ProcedureStoreBase {
     }
     List<Mutation> mutations = new ArrayList<>(subProcs.length + 1);
     List<byte[]> rowsToLock = new ArrayList<>(subProcs.length + 1);
-    try {
-      serializePut(proc, mutations, rowsToLock);
-      for (Procedure<?> subProc : subProcs) {
-        serializePut(subProc, mutations, rowsToLock);
+    runWithoutRpcCall(() -> { // HBASE-23895: mutate the region without the current rpc call
+      try {
+        serializePut(proc, mutations, rowsToLock);
+        for (Procedure<?> subProc : subProcs) {
+          serializePut(subProc, mutations, rowsToLock);
+        }
+        region.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE);
+      } catch (IOException e) {
+        LOG.error(HBaseMarkers.FATAL, "Failed to insert proc {}, sub procs {}", proc,
+          Arrays.toString(subProcs), e);
+        throw new UncheckedIOException(e);
       }
-      region.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE);
-    } catch (IOException e) {
-      LOG.error(HBaseMarkers.FATAL, "Failed to insert proc {}, sub procs {}", proc,
-        Arrays.toString(subProcs), e);
-      throw new UncheckedIOException(e);
-    }
+    });
     flusherAndCompactor.onUpdate();
   }
 
@@ -565,28 +585,32 @@ public class RegionProcedureStore extends ProcedureStoreBase {
   public void insert(Procedure<?>[] procs) {
     List<Mutation> mutations = new ArrayList<>(procs.length);
     List<byte[]> rowsToLock = new ArrayList<>(procs.length);
-    try {
-      for (Procedure<?> proc : procs) {
-        serializePut(proc, mutations, rowsToLock);
+    runWithoutRpcCall(() -> { // HBASE-23895: mutate the region without the current rpc call
+      try {
+        for (Procedure<?> proc : procs) {
+          serializePut(proc, mutations, rowsToLock);
+        }
+        region.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE);
+      } catch (IOException e) {
+        LOG.error(HBaseMarkers.FATAL, "Failed to insert procs {}", Arrays.toString(procs), e);
+        throw new UncheckedIOException(e);
       }
-      region.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE);
-    } catch (IOException e) {
-      LOG.error(HBaseMarkers.FATAL, "Failed to insert procs {}", Arrays.toString(procs), e);
-      throw new UncheckedIOException(e);
-    }
+    });
     flusherAndCompactor.onUpdate();
   }
 
   @Override
   public void update(Procedure<?> proc) {
-    try {
-      ProcedureProtos.Procedure proto = ProcedureUtil.convertToProtoProcedure(proc);
-      region.put(new Put(Bytes.toBytes(proc.getProcId())).addColumn(FAMILY, PROC_QUALIFIER,
-        proto.toByteArray()));
-    } catch (IOException e) {
-      LOG.error(HBaseMarkers.FATAL, "Failed to update proc {}", proc, e);
-      throw new UncheckedIOException(e);
-    }
+    runWithoutRpcCall(() -> { // HBASE-23895: mutate the region without the current rpc call
+      try {
+        ProcedureProtos.Procedure proto = ProcedureUtil.convertToProtoProcedure(proc);
+        region.put(new Put(Bytes.toBytes(proc.getProcId())).addColumn(FAMILY, PROC_QUALIFIER,
+          proto.toByteArray()));
+      } catch (IOException e) {
+        LOG.error(HBaseMarkers.FATAL, "Failed to update proc {}", proc, e);
+        throw new UncheckedIOException(e);
+      }
+    });
     flusherAndCompactor.onUpdate();
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStore.java
index 8b6535f..178e78d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStore.java
@@ -21,13 +21,22 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import java.io.IOException;
+import java.net.InetAddress;
 import java.util.HashSet;
+import java.util.Optional;
 import java.util.Set;
+
+import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.ipc.RpcCall;
+import org.apache.hadoop.hbase.ipc.RpcCallback;
+import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.procedure2.Procedure;
 import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
 import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.LoadCounter;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -37,6 +46,13 @@ import org.junit.experimental.categories.Category;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
+import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
+import org.apache.hbase.thirdparty.com.google.protobuf.Message;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
+
 @Category({ MasterTests.class, SmallTests.class })
 public class TestRegionProcedureStore extends RegionProcedureStoreTestBase {
 
@@ -130,4 +146,163 @@ public class TestRegionProcedureStore extends RegionProcedureStoreTestBase {
     assertFalse(store.region
       .get(new Get(Bytes.toBytes(proc3.getProcId())).setCheckExistenceOnly(true)).getExists());
   }
+
+  /** Test for HBASE-23895: insert while the current rpc call's deadline has been reached. */
+  @Test
+  public void testInsertWithRpcCall() throws Exception {
+    RpcServer.setCurrentCall(newRpcCallWithDeadline());
+    try {
+      store.insert(new RegionProcedureStoreTestProcedure(), null);
+    } finally { // always clear the stub call, even if insert throws
+      RpcServer.setCurrentCall(null);
+    }
+  }
+
+  private RpcCall newRpcCallWithDeadline() { // stub: only getDeadline() returns a real value
+    return new RpcCall() {
+      @Override
+      public long getDeadline() {
+        return System.currentTimeMillis(); // deadline is the current time, i.e. already reached
+      }
+
+      @Override
+      public BlockingService getService() {
+        return null;
+      }
+
+      @Override
+      public Descriptors.MethodDescriptor getMethod() {
+        return null;
+      }
+
+      @Override
+      public Message getParam() {
+        return null;
+      }
+
+      @Override
+      public CellScanner getCellScanner() {
+        return null;
+      }
+
+      @Override
+      public long getReceiveTime() {
+        return 0;
+      }
+
+      @Override
+      public long getStartTime() {
+        return 0;
+      }
+
+      @Override
+      public void setStartTime(long startTime) {
+        // no-op
+      }
+
+      @Override
+      public int getTimeout() {
+        return 0;
+      }
+
+      @Override
+      public int getPriority() {
+        return 0;
+      }
+
+      @Override
+      public long getSize() {
+        return 0;
+      }
+
+      @Override
+      public RPCProtos.RequestHeader getHeader() {
+        return null;
+      }
+
+      @Override
+      public int getRemotePort() {
+        return 0;
+      }
+
+      @Override
+      public void setResponse(Message param, CellScanner cells, Throwable errorThrowable,
+          String error) {
+      }
+
+      @Override
+      public void sendResponseIfReady() throws IOException {
+      }
+
+      @Override
+      public void cleanup() {
+      }
+
+      @Override
+      public String toShortString() {
+        return null;
+      }
+
+      @Override
+      public long disconnectSince() {
+        return 0;
+      }
+
+      @Override
+      public boolean isClientCellBlockSupported() {
+        return false;
+      }
+
+      @Override
+      public Optional<User> getRequestUser() {
+        return Optional.empty();
+      }
+
+      @Override
+      public InetAddress getRemoteAddress() {
+        return null;
+      }
+
+      @Override
+      public HBaseProtos.VersionInfo getClientVersionInfo() {
+        return null;
+      }
+
+      @Override
+      public void setCallBack(RpcCallback callback) {
+      }
+
+      @Override
+      public boolean isRetryImmediatelySupported() {
+        return false;
+      }
+
+      @Override
+      public long getResponseCellSize() {
+        return 0;
+      }
+
+      @Override
+      public void incrementResponseCellSize(long cellSize) {
+      }
+
+      @Override
+      public long getResponseBlockSize() {
+        return 0;
+      }
+
+      @Override
+      public void incrementResponseBlockSize(long blockSize) {
+      }
+
+      @Override
+      public long getResponseExceptionSize() {
+        return 0;
+      }
+
+      @Override
+      public void incrementResponseExceptionSize(long exceptionSize) {
+      }
+    };
+  }
 }