You are viewing a plain-text version of this content. The hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to commits@hbase.apache.org by zg...@apache.org on 2020/03/07 07:50:20 UTC
[hbase] branch master updated: HBASE-23895 STUCK Region-In-Transition when failed to insert procedure to procedure store (#1221)
This is an automated email from the ASF dual-hosted git repository.
zghao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push:
new 9b0d214 HBASE-23895 STUCK Region-In-Transition when failed to insert procedure to procedure store (#1221)
9b0d214 is described below
commit 9b0d214b7b7626d3706f5cddfe44e89962a9fcf0
Author: Guanghao Zhang <zg...@apache.org>
AuthorDate: Sat Mar 7 15:50:09 2020 +0800
HBASE-23895 STUCK Region-In-Transition when failed to insert procedure to procedure store (#1221)
Signed-off-by: Duo Zhang <zh...@apache.org>
Signed-off-by: stack <st...@apache.org>
---
.../org/apache/hadoop/hbase/ipc/RpcServer.java | 21 +++
.../store/region/RegionProcedureStore.java | 76 ++++++---
.../store/region/TestRegionProcedureStore.java | 175 +++++++++++++++++++++
3 files changed, 246 insertions(+), 26 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index d36e89b..44909b2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -680,6 +680,27 @@ public abstract class RpcServer implements RpcServerInterface,
}
/**
+ * Used by {@link org.apache.hadoop.hbase.procedure2.store.region.RegionProcedureStore}. A
+ * master rpc call may create new procedures and thus mutate the region that stores procedures.
+ * Mutating that region runs some checks against the current rpc call, such as the rpc timeout
+ * check, so the rpc call is unset beforehand to skip those checks.
+ * @return the rpc call that was current before this call unset it, or empty if there was none
+ */
+ public static Optional<RpcCall> unsetCurrentCall() {
+ Optional<RpcCall> rpcCall = getCurrentCall();
+ CurCall.set(null);
+ return rpcCall;
+ }
+
+ /**
+ * Used by {@link org.apache.hadoop.hbase.procedure2.store.region.RegionProcedureStore}. Sets
+ * {@code rpcCall} back as the current rpc call after the region has been mutated.
+ */
+ public static void setCurrentCall(RpcCall rpcCall) {
+ CurCall.set(rpcCall);
+ }
+
+ /**
* Returns the user credentials associated with the current RPC request or not present if no
* credentials were provided.
* @return A User
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStore.java
index d153508..296a08b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStore.java
@@ -29,6 +29,8 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
+
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@@ -47,6 +49,8 @@ import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.ipc.RpcCall;
+import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.assignment.AssignProcedure;
@@ -538,6 +542,20 @@ public class RegionProcedureStore extends ProcedureStoreBase {
rowsToLock.add(row);
}
+ /**
+ * Runs {@code runnable} with the current rpc call unset. Procedure store writes may happen inside
+ * a master rpc call, and mutating the region runs checks against that call (e.g. its timeout).
+ * The previous rpc call, if any, is set back in the finally block. See HBASE-23895 for details.
+ */
+ private void runWithoutRpcCall(Runnable runnable) {
+ Optional<RpcCall> rpcCall = RpcServer.unsetCurrentCall();
+ try {
+ runnable.run();
+ } finally {
+ rpcCall.ifPresent(RpcServer::setCurrentCall);
+ }
+ }
+
@Override
public void insert(Procedure<?> proc, Procedure<?>[] subProcs) {
if (subProcs == null || subProcs.length == 0) {
@@ -547,17 +565,19 @@ public class RegionProcedureStore extends ProcedureStoreBase {
}
List<Mutation> mutations = new ArrayList<>(subProcs.length + 1);
List<byte[]> rowsToLock = new ArrayList<>(subProcs.length + 1);
- try {
- serializePut(proc, mutations, rowsToLock);
- for (Procedure<?> subProc : subProcs) {
- serializePut(subProc, mutations, rowsToLock);
+ runWithoutRpcCall(() -> { // HBASE-23895: region checks on the rpc call must not fail this write
+ try {
+ serializePut(proc, mutations, rowsToLock);
+ for (Procedure<?> subProc : subProcs) {
+ serializePut(subProc, mutations, rowsToLock);
+ }
+ region.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE);
+ } catch (IOException e) {
+ LOG.error(HBaseMarkers.FATAL, "Failed to insert proc {}, sub procs {}", proc,
+ Arrays.toString(subProcs), e);
+ throw new UncheckedIOException(e);
}
- region.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE);
- } catch (IOException e) {
- LOG.error(HBaseMarkers.FATAL, "Failed to insert proc {}, sub procs {}", proc,
- Arrays.toString(subProcs), e);
- throw new UncheckedIOException(e);
- }
+ });
flusherAndCompactor.onUpdate();
}
@@ -565,28 +585,32 @@ public class RegionProcedureStore extends ProcedureStoreBase {
public void insert(Procedure<?>[] procs) {
List<Mutation> mutations = new ArrayList<>(procs.length);
List<byte[]> rowsToLock = new ArrayList<>(procs.length);
- try {
- for (Procedure<?> proc : procs) {
- serializePut(proc, mutations, rowsToLock);
+ runWithoutRpcCall(() -> { // HBASE-23895: region checks on the rpc call must not fail this write
+ try {
+ for (Procedure<?> proc : procs) {
+ serializePut(proc, mutations, rowsToLock);
+ }
+ region.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE);
+ } catch (IOException e) {
+ LOG.error(HBaseMarkers.FATAL, "Failed to insert procs {}", Arrays.toString(procs), e);
+ throw new UncheckedIOException(e);
}
- region.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE);
- } catch (IOException e) {
- LOG.error(HBaseMarkers.FATAL, "Failed to insert procs {}", Arrays.toString(procs), e);
- throw new UncheckedIOException(e);
- }
+ });
flusherAndCompactor.onUpdate();
}
@Override
public void update(Procedure<?> proc) {
- try {
- ProcedureProtos.Procedure proto = ProcedureUtil.convertToProtoProcedure(proc);
- region.put(new Put(Bytes.toBytes(proc.getProcId())).addColumn(FAMILY, PROC_QUALIFIER,
- proto.toByteArray()));
- } catch (IOException e) {
- LOG.error(HBaseMarkers.FATAL, "Failed to update proc {}", proc, e);
- throw new UncheckedIOException(e);
- }
+ runWithoutRpcCall(() -> { // HBASE-23895: region checks on the rpc call must not fail this write
+ try {
+ ProcedureProtos.Procedure proto = ProcedureUtil.convertToProtoProcedure(proc);
+ region.put(new Put(Bytes.toBytes(proc.getProcId())).addColumn(FAMILY, PROC_QUALIFIER,
+ proto.toByteArray()));
+ } catch (IOException e) {
+ LOG.error(HBaseMarkers.FATAL, "Failed to update proc {}", proc, e);
+ throw new UncheckedIOException(e);
+ }
+ });
flusherAndCompactor.onUpdate();
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStore.java
index 8b6535f..178e78d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStore.java
@@ -21,13 +21,22 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import java.io.IOException;
+import java.net.InetAddress;
import java.util.HashSet;
+import java.util.Optional;
import java.util.Set;
+
+import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.ipc.RpcCall;
+import org.apache.hadoop.hbase.ipc.RpcCallback;
+import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.LoadCounter;
+import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
@@ -37,6 +46,13 @@ import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
+import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
+import org.apache.hbase.thirdparty.com.google.protobuf.Message;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
+
@Category({ MasterTests.class, SmallTests.class })
public class TestRegionProcedureStore extends RegionProcedureStoreTestBase {
@@ -130,4 +146,163 @@ public class TestRegionProcedureStore extends RegionProcedureStoreTestBase {
assertFalse(store.region
.get(new Get(Bytes.toBytes(proc3.getProcId())).setCheckExistenceOnly(true)).getExists());
}
+
+ /**
+ * Test for HBASE-23895: procedure store writes must succeed even when the current rpc call has
+ * already reached its deadline, and the caller's rpc call must be set back afterwards.
+ */
+ @Test
+ public void testInsertWithRpcCall() throws Exception {
+ RpcCall rpcCall = newRpcCallWithDeadline();
+ RpcServer.setCurrentCall(rpcCall);
+ try {
+ // single procedure, no sub procedures
+ RegionProcedureStoreTestProcedure proc1 = new RegionProcedureStoreTestProcedure();
+ store.insert(proc1, null);
+ assertEquals(Optional.of(rpcCall), RpcServer.getCurrentCall());
+ // batch insert, which goes through mutateRowsWithLocks
+ RegionProcedureStoreTestProcedure proc2 = new RegionProcedureStoreTestProcedure();
+ store.insert(new Procedure<?>[] { proc2 });
+ assertEquals(Optional.of(rpcCall), RpcServer.getCurrentCall());
+ } finally {
+ // always clear the stub call, even on failure, so it cannot leak into later tests
+ RpcServer.setCurrentCall(null);
+ }
+ }
+
+ /**
+ * Creates a stub {@link RpcCall} whose {@link RpcCall#getDeadline()} is the current time, so by
+ * the time anything checks it the deadline has already been reached. Every other method returns
+ * a default value ({@code null}, {@code 0}, {@code false} or empty) or does nothing.
+ */
+ private RpcCall newRpcCallWithDeadline() {
+ return new RpcCall() {
+ @Override
+ public long getDeadline() {
+ return System.currentTimeMillis(); // deadline is "now", i.e. already reached when checked
+ }
+
+ @Override
+ public BlockingService getService() {
+ return null;
+ }
+
+ @Override
+ public Descriptors.MethodDescriptor getMethod() {
+ return null;
+ }
+
+ @Override
+ public Message getParam() {
+ return null;
+ }
+
+ @Override
+ public CellScanner getCellScanner() {
+ return null;
+ }
+
+ @Override
+ public long getReceiveTime() {
+ return 0;
+ }
+
+ @Override
+ public long getStartTime() {
+ return 0;
+ }
+
+ @Override
+ public void setStartTime(long startTime) {
+
+ }
+
+ @Override
+ public int getTimeout() {
+ return 0;
+ }
+
+ @Override
+ public int getPriority() {
+ return 0;
+ }
+
+ @Override
+ public long getSize() {
+ return 0;
+ }
+
+ @Override
+ public RPCProtos.RequestHeader getHeader() {
+ return null;
+ }
+
+ @Override
+ public int getRemotePort() {
+ return 0;
+ }
+
+ @Override
+ public void setResponse(Message param, CellScanner cells, Throwable errorThrowable,
+ String error) {
+ }
+
+ @Override
+ public void sendResponseIfReady() throws IOException {
+ }
+
+ @Override
+ public void cleanup() {
+ }
+
+ @Override
+ public String toShortString() {
+ return null;
+ }
+
+ @Override
+ public long disconnectSince() {
+ return 0;
+ }
+
+ @Override
+ public boolean isClientCellBlockSupported() {
+ return false;
+ }
+
+ @Override
+ public Optional<User> getRequestUser() {
+ return Optional.empty();
+ }
+
+ @Override
+ public InetAddress getRemoteAddress() {
+ return null;
+ }
+
+ @Override
+ public HBaseProtos.VersionInfo getClientVersionInfo() {
+ return null;
+ }
+
+ @Override
+ public void setCallBack(RpcCallback callback) {
+ }
+
+ @Override
+ public boolean isRetryImmediatelySupported() {
+ return false;
+ }
+
+ @Override
+ public long getResponseCellSize() {
+ return 0;
+ }
+
+ @Override
+ public void incrementResponseCellSize(long cellSize) {
+ }
+
+ @Override
+ public long getResponseBlockSize() {
+ return 0;
+ }
+
+ @Override
+ public void incrementResponseBlockSize(long blockSize) {
+ }
+
+ @Override
+ public long getResponseExceptionSize() {
+ return 0;
+ }
+
+ @Override
+ public void incrementResponseExceptionSize(long exceptionSize) {
+ }
+ };
+ }
}