You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ti...@apache.org on 2019/03/01 03:00:27 UTC
[hbase] branch master updated: HBASE-21934
RemoteProcedureDispatcher should track the ongoing dispatched calls
This is an automated email from the ASF dual-hosted git repository.
tianjy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push:
new c19bc59 HBASE-21934 RemoteProcedureDispatcher should track the ongoing dispatched calls
c19bc59 is described below
commit c19bc5911b31552eb5f84628e78675cf1907959d
Author: Jingyun Tian <ti...@gmail.com>
AuthorDate: Thu Feb 28 19:53:24 2019 +0800
HBASE-21934 RemoteProcedureDispatcher should track the ongoing dispatched calls
---
.../procedure2/RemoteProcedureDispatcher.java | 29 +++
.../assignment/RegionRemoteProcedureBase.java | 5 +
.../assignment/RegionTransitionProcedure.java | 1 +
.../master/procedure/ServerRemoteProcedure.java | 131 ++++++++++
.../master/procedure/SplitWALRemoteProcedure.java | 84 +-----
.../SwitchRpcThrottleRemoteProcedure.java | 61 +----
.../master/replication/RefreshPeerProcedure.java | 71 +-----
.../SyncReplicationReplayWALRemoteProcedure.java | 70 +----
.../procedure/TestServerRemoteProcedure.java | 282 +++++++++++++++++++++
9 files changed, 474 insertions(+), 260 deletions(-)
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java
index 958b071..da9d08c 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java
@@ -172,6 +172,16 @@ public abstract class RemoteProcedureDispatcher<TEnv, TRemote extends Comparable
}
}
+ public void removeCompletedOperation(final TRemote key, RemoteProcedure rp) {
+ BufferNode node = nodeMap.get(key);
+ if (node == null) {
+ LOG.warn("since no node for this key {}, we can't removed the finished remote procedure",
+ key);
+ return;
+ }
+ node.operationCompleted(rp);
+ }
+
/**
* Remove a remote node
* @param key the node identifier
@@ -237,6 +247,16 @@ public abstract class RemoteProcedureDispatcher<TEnv, TRemote extends Comparable
* method.
*/
void remoteOperationFailed(TEnv env, RemoteProcedureException error);
+
+ /**
+ * Whether store this remote procedure in dispatched queue
+ * only OpenRegionProcedure and CloseRegionProcedure return false since they are
+ * not fully controlled by dispatcher
+ */
+ default boolean storeInDispatchedQueue() {
+ return true;
+ }
+
}
/**
@@ -330,6 +350,7 @@ public abstract class RemoteProcedureDispatcher<TEnv, TRemote extends Comparable
protected final class BufferNode extends DelayedContainerWithTimestamp<TRemote>
implements RemoteNode<TEnv, TRemote> {
private Set<RemoteProcedure> operations;
+ private final Set<RemoteProcedure> dispatchedOperations = new HashSet<>();
protected BufferNode(final TRemote key) {
super(key, 0);
@@ -358,6 +379,8 @@ public abstract class RemoteProcedureDispatcher<TEnv, TRemote extends Comparable
public synchronized void dispatch() {
if (operations != null) {
remoteDispatch(getKey(), operations);
+ operations.stream().filter(operation -> operation.storeInDispatchedQueue())
+ .forEach(operation -> dispatchedOperations.add(operation));
this.operations = null;
}
}
@@ -367,6 +390,12 @@ public abstract class RemoteProcedureDispatcher<TEnv, TRemote extends Comparable
abortPendingOperations(getKey(), operations);
this.operations = null;
}
+ abortPendingOperations(getKey(), dispatchedOperations);
+ this.dispatchedOperations.clear();
+ }
+
+ public synchronized void operationCompleted(final RemoteProcedure remoteProcedure){
+ this.dispatchedOperations.remove(remoteProcedure);
}
@Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionRemoteProcedureBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionRemoteProcedureBase.java
index 0a47671..f6d3a2e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionRemoteProcedureBase.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionRemoteProcedureBase.java
@@ -180,6 +180,11 @@ public abstract class RegionRemoteProcedureBase extends Procedure<MasterProcedur
}
@Override
+ public boolean storeInDispatchedQueue() {
+ return false;
+ }
+
+ @Override
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
serializer.serialize(
RegionRemoteProcedureBaseStateData.newBuilder().setRegion(ProtobufUtil.toRegionInfo(region))
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java
index 2f94765..d2f8e3c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java
@@ -174,4 +174,5 @@ public abstract class RegionTransitionProcedure extends Procedure<MasterProcedur
// should not be called for region operation until we modified the open/close region procedure
throw new UnsupportedOperationException();
}
+
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerRemoteProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerRemoteProcedure.java
new file mode 100644
index 0000000..568b95d
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerRemoteProcedure.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.procedure;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.procedure2.FailedRemoteDispatchException;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
+import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
+import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
+import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher;
+import org.apache.hadoop.hbase.procedure2.RemoteProcedureException;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@InterfaceAudience.Private
+/**
+ * This extract the common used methods of procedures which are send to remote servers. Developers
+ * who extends this class only need to override remoteCallBuild() and complete(). This procedure
+ * will help add the operation to {@link RSProcedureDispatcher}
+ *
+ * If adding the operation to dispatcher failed, addOperationToNode will throw
+ * FailedRemoteDispatchException, and this procedure will return null which procedure Executor will
+ * mark this procedure as complete. Thus the upper layer of this procedure must have a way to
+ * check if this procedure really succeed and how to deal with it.
+ *
+ * If sending the operation to remote RS failed, dispatcher will call remoteCallFailed() to
+ * handle this, which actually call remoteOperationDone with the exception.
+ * If the targetServer crashed but this procedure has no response, than dispatcher will call
+ * remoteOperationFailed() to handle this, which also calls remoteOperationDone with the exception.
+ * If the operation is successful, then remoteOperationCompleted will be called and actually calls
+ * the remoteOperationDone without exception.
+ *
+ * In remoteOperationDone, we'll check if the procedure is already get wake up by others. Then
+ * developer could implement complete() based on their own purpose.
+ *
+ * But basic logic is that if operation succeed, set succ to true and do the clean work.
+ *
+ * If operation failed and require to resend it to the same server, leave the succ as false.
+ *
+ * If operation failed and require to resend it to another server, set succ to true and upper layer
+ * should be able to find out this operation not work and send a operation to another server.
+ */
+public abstract class ServerRemoteProcedure extends Procedure<MasterProcedureEnv>
+ implements RemoteProcedureDispatcher.RemoteProcedure<MasterProcedureEnv, ServerName> {
+ protected static final Logger LOG = LoggerFactory.getLogger(ServerRemoteProcedure.class);
+ protected ProcedureEvent<?> event;
+ protected ServerName targetServer;
+ protected boolean dispatched;
+ protected boolean succ;
+
+ protected abstract void complete(MasterProcedureEnv env, Throwable error);
+
+ @Override
+ protected synchronized Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env)
+ throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
+ if (dispatched) {
+ if (succ) {
+ return null;
+ }
+ dispatched = false;
+ }
+ try {
+ env.getRemoteDispatcher().addOperationToNode(targetServer, this);
+ } catch (FailedRemoteDispatchException frde) {
+ LOG.warn("Can not send remote operation {} to {}, this operation will "
+ + "be retried to send to another server",
+ this.getProcId(), targetServer);
+ return null;
+ }
+ dispatched = true;
+ event = new ProcedureEvent<>(this);
+ event.suspendIfNotReady(this);
+ throw new ProcedureSuspendedException();
+ }
+
+ @Override
+ protected synchronized void completionCleanup(MasterProcedureEnv env) {
+ env.getRemoteDispatcher().removeCompletedOperation(targetServer, this);
+ }
+
+ @Override
+ public synchronized void remoteCallFailed(MasterProcedureEnv env, ServerName serverName,
+ IOException exception) {
+ remoteOperationDone(env, exception);
+ }
+
+ @Override
+ public synchronized void remoteOperationCompleted(MasterProcedureEnv env) {
+ remoteOperationDone(env, null);
+ }
+
+ @Override
+ public synchronized void remoteOperationFailed(MasterProcedureEnv env,
+ RemoteProcedureException error) {
+ remoteOperationDone(env, error);
+ }
+
+ synchronized void remoteOperationDone(MasterProcedureEnv env, Throwable error) {
+ if (this.isFinished()) {
+ LOG.info("This procedure {} is already finished, skip the rest processes", this.getProcId());
+ return;
+ }
+ if (event == null) {
+ LOG.warn("procedure event for {} is null, maybe the procedure is created when recovery",
+ getProcId());
+ return;
+ }
+ complete(env, error);
+ event.wake(env.getProcedureScheduler());
+ event = null;
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALRemoteProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALRemoteProcedure.java
index fb2dbd7..d227022 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALRemoteProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALRemoteProcedure.java
@@ -23,16 +23,8 @@ import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.procedure2.NoNodeDispatchException;
-import org.apache.hadoop.hbase.procedure2.NoServerDispatchException;
-import org.apache.hadoop.hbase.procedure2.NullTargetServerDispatchException;
-import org.apache.hadoop.hbase.procedure2.Procedure;
-import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
-import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
-import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher;
-import org.apache.hadoop.hbase.procedure2.RemoteProcedureException;
import org.apache.hadoop.hbase.regionserver.SplitWALCallable;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.yetus.audience.InterfaceAudience;
@@ -48,51 +40,22 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
* DoNotRetryIOException. Otherwise it will retry until succeed.
*/
@InterfaceAudience.Private
-public class SplitWALRemoteProcedure extends Procedure<MasterProcedureEnv>
- implements RemoteProcedureDispatcher.RemoteProcedure<MasterProcedureEnv, ServerName>,
- ServerProcedureInterface {
+public class SplitWALRemoteProcedure extends ServerRemoteProcedure
+ implements ServerProcedureInterface {
private static final Logger LOG = LoggerFactory.getLogger(SplitWALRemoteProcedure.class);
private String walPath;
- private ServerName worker;
private ServerName crashedServer;
- private boolean dispatched;
- private ProcedureEvent<?> event;
- private boolean success = false;
public SplitWALRemoteProcedure() {
}
public SplitWALRemoteProcedure(ServerName worker, ServerName crashedServer, String wal) {
- this.worker = worker;
+ this.targetServer = worker;
this.crashedServer = crashedServer;
this.walPath = wal;
}
@Override
- protected Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env)
- throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
- if (dispatched) {
- if (success) {
- return null;
- }
- dispatched = false;
- }
- try {
- env.getRemoteDispatcher().addOperationToNode(worker, this);
- } catch (NoNodeDispatchException | NullTargetServerDispatchException
- | NoServerDispatchException e) {
- // When send to a wrong target server, it need construct a new SplitWALRemoteProcedure.
- // Thus return null for this procedure and let SplitWALProcedure to handle this.
- LOG.warn("dispatch WAL {} to {} failed, will retry on another server", walPath, worker, e);
- return null;
- }
- dispatched = true;
- event = new ProcedureEvent<>(this);
- event.suspendIfNotReady(this);
- throw new ProcedureSuspendedException();
- }
-
- @Override
protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException {
throw new UnsupportedOperationException();
}
@@ -106,7 +69,7 @@ public class SplitWALRemoteProcedure extends Procedure<MasterProcedureEnv>
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
MasterProcedureProtos.SplitWALRemoteData.Builder builder =
MasterProcedureProtos.SplitWALRemoteData.newBuilder();
- builder.setWalPath(walPath).setWorker(ProtobufUtil.toServerName(worker))
+ builder.setWalPath(walPath).setWorker(ProtobufUtil.toServerName(targetServer))
.setCrashedServer(ProtobufUtil.toServerName(crashedServer));
serializer.serialize(builder.build());
}
@@ -116,7 +79,7 @@ public class SplitWALRemoteProcedure extends Procedure<MasterProcedureEnv>
MasterProcedureProtos.SplitWALRemoteData data =
serializer.deserialize(MasterProcedureProtos.SplitWALRemoteData.class);
walPath = data.getWalPath();
- worker = ProtobufUtil.toServerName(data.getWorker());
+ targetServer = ProtobufUtil.toServerName(data.getWorker());
crashedServer = ProtobufUtil.toServerName(data.getCrashedServer());
}
@@ -129,48 +92,25 @@ public class SplitWALRemoteProcedure extends Procedure<MasterProcedureEnv>
}
@Override
- public void remoteCallFailed(MasterProcedureEnv env, ServerName serverName,
- IOException exception) {
- complete(env, exception);
- }
-
- @Override
- public void remoteOperationCompleted(MasterProcedureEnv env) {
- complete(env, null);
- }
-
- private void complete(MasterProcedureEnv env, Throwable error) {
- if (event == null) {
- LOG.warn("procedure event for {} is null, maybe the procedure is created when recovery",
- getProcId());
- return;
- }
+ protected void complete(MasterProcedureEnv env, Throwable error) {
if (error == null) {
- LOG.info("split WAL {} on {} succeeded", walPath, worker);
+ LOG.info("split WAL {} on {} succeeded", walPath, targetServer);
try {
env.getMasterServices().getSplitWALManager().deleteSplitWAL(walPath);
- } catch (IOException e){
+ } catch (IOException e) {
LOG.warn("remove WAL {} failed, ignore...", walPath, e);
}
- success = true;
+ succ = true;
} else {
if (error instanceof DoNotRetryIOException) {
LOG.warn("WAL split task of {} send to a wrong server {}, will retry on another server",
- walPath, worker, error);
- success = true;
+ walPath, targetServer, error);
+ succ = true;
} else {
LOG.warn("split WAL {} failed, retry...", walPath, error);
- success = false;
+ succ = false;
}
-
}
- event.wake(env.getProcedureScheduler());
- event = null;
- }
-
- @Override
- public void remoteOperationFailed(MasterProcedureEnv env, RemoteProcedureException error) {
- complete(env, error);
}
public String getWAL() {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SwitchRpcThrottleRemoteProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SwitchRpcThrottleRemoteProcedure.java
index 9a56ddc..c69faf6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SwitchRpcThrottleRemoteProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SwitchRpcThrottleRemoteProcedure.java
@@ -18,17 +18,12 @@
package org.apache.hadoop.hbase.master.procedure;
import java.io.IOException;
+
import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.procedure2.FailedRemoteDispatchException;
-import org.apache.hadoop.hbase.procedure2.Procedure;
-import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
-import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
-import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher;
-import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure;
-import org.apache.hadoop.hbase.procedure2.RemoteProcedureException;
import org.apache.hadoop.hbase.replication.regionserver.SwitchRpcThrottleRemoteCallable;
+
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,11 +35,10 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.S
* The procedure to switch rpc throttle on region server
*/
@InterfaceAudience.Private
-public class SwitchRpcThrottleRemoteProcedure extends Procedure<MasterProcedureEnv>
- implements RemoteProcedure<MasterProcedureEnv, ServerName>, ServerProcedureInterface {
+public class SwitchRpcThrottleRemoteProcedure extends ServerRemoteProcedure
+ implements ServerProcedureInterface {
private static final Logger LOG = LoggerFactory.getLogger(SwitchRpcThrottleRemoteProcedure.class);
- private ServerName targetServer;
private boolean rpcThrottleEnabled;
public SwitchRpcThrottleRemoteProcedure() {
@@ -55,32 +49,6 @@ public class SwitchRpcThrottleRemoteProcedure extends Procedure<MasterProcedureE
this.rpcThrottleEnabled = rpcThrottleEnabled;
}
- private boolean dispatched;
- private ProcedureEvent<?> event;
- private boolean succ;
-
- @Override
- protected Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env)
- throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
- if (dispatched) {
- if (succ) {
- return null;
- }
- dispatched = false;
- }
- try {
- env.getRemoteDispatcher().addOperationToNode(targetServer, this);
- } catch (FailedRemoteDispatchException frde) {
- LOG.warn("Can not add remote operation for switching rpc throttle to {} on {}",
- rpcThrottleEnabled, targetServer);
- return null;
- }
- dispatched = true;
- event = new ProcedureEvent<>(this);
- event.suspendIfNotReady(this);
- throw new ProcedureSuspendedException();
- }
-
@Override
protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException {
}
@@ -118,22 +86,6 @@ public class SwitchRpcThrottleRemoteProcedure extends Procedure<MasterProcedureE
}
@Override
- public void remoteCallFailed(MasterProcedureEnv env, ServerName serverName,
- IOException exception) {
- complete(env, exception);
- }
-
- @Override
- public void remoteOperationCompleted(MasterProcedureEnv env) {
- complete(env, null);
- }
-
- @Override
- public void remoteOperationFailed(MasterProcedureEnv env, RemoteProcedureException error) {
- complete(env, error);
- }
-
- @Override
public ServerName getServerName() {
return targetServer;
}
@@ -148,7 +100,8 @@ public class SwitchRpcThrottleRemoteProcedure extends Procedure<MasterProcedureE
return ServerOperationType.SWITCH_RPC_THROTTLE;
}
- private void complete(MasterProcedureEnv env, Throwable error) {
+ @Override
+ protected void complete(MasterProcedureEnv env, Throwable error) {
if (error != null) {
LOG.warn("Failed to switch rpc throttle to {} on server {}", rpcThrottleEnabled, targetServer,
error);
@@ -156,8 +109,6 @@ public class SwitchRpcThrottleRemoteProcedure extends Procedure<MasterProcedureE
} else {
this.succ = true;
}
- event.wake(env.getProcedureScheduler());
- event = null;
}
@Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java
index 2f43ae9..d39235f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java
@@ -22,15 +22,10 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface;
import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.ServerOperation;
-import org.apache.hadoop.hbase.procedure2.FailedRemoteDispatchException;
-import org.apache.hadoop.hbase.procedure2.Procedure;
-import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
+import org.apache.hadoop.hbase.master.procedure.ServerRemoteProcedure;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
-import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
-import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure;
-import org.apache.hadoop.hbase.procedure2.RemoteProcedureException;
import org.apache.hadoop.hbase.replication.regionserver.RefreshPeerCallable;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@@ -42,7 +37,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.R
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RefreshPeerStateData;
@InterfaceAudience.Private
-public class RefreshPeerProcedure extends Procedure<MasterProcedureEnv>
+public class RefreshPeerProcedure extends ServerRemoteProcedure
implements PeerProcedureInterface, RemoteProcedure<MasterProcedureEnv, ServerName> {
private static final Logger LOG = LoggerFactory.getLogger(RefreshPeerProcedure.class);
@@ -51,18 +46,8 @@ public class RefreshPeerProcedure extends Procedure<MasterProcedureEnv>
private PeerOperationType type;
- @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
- justification = "Will never change after construction")
- private ServerName targetServer;
-
private int stage;
- private boolean dispatched;
-
- private ProcedureEvent<?> event;
-
- private boolean succ;
-
public RefreshPeerProcedure() {
}
@@ -135,12 +120,8 @@ public class RefreshPeerProcedure extends Procedure<MasterProcedureEnv>
.toByteArray());
}
- private void complete(MasterProcedureEnv env, Throwable error) {
- if (event == null) {
- LOG.warn("procedure event for {} is null, maybe the procedure is created when recovery",
- getProcId());
- return;
- }
+ @Override
+ protected void complete(MasterProcedureEnv env, Throwable error) {
if (error != null) {
LOG.warn("Refresh peer {} for {} on {} failed", peerId, type, targetServer, error);
this.succ = false;
@@ -148,50 +129,6 @@ public class RefreshPeerProcedure extends Procedure<MasterProcedureEnv>
LOG.info("Refresh peer {} for {} on {} suceeded", peerId, type, targetServer);
this.succ = true;
}
-
- event.wake(env.getProcedureScheduler());
- event = null;
- }
-
- @Override
- public synchronized void remoteCallFailed(MasterProcedureEnv env, ServerName remote,
- IOException exception) {
- complete(env, exception);
- }
-
- @Override
- public synchronized void remoteOperationCompleted(MasterProcedureEnv env) {
- complete(env, null);
- }
-
- @Override
- public synchronized void remoteOperationFailed(MasterProcedureEnv env,
- RemoteProcedureException error) {
- complete(env, error);
- }
-
- @Override
- protected synchronized Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env)
- throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
- if (dispatched) {
- if (succ) {
- return null;
- }
- // retry
- dispatched = false;
- }
- try {
- env.getRemoteDispatcher().addOperationToNode(targetServer, this);
- } catch (FailedRemoteDispatchException frde) {
- LOG.info("Can not add remote operation for refreshing peer {} for {} to {}, " +
- "this is usually because the server is already dead, " +
- "give up and mark the procedure as complete", peerId, type, targetServer, frde);
- return null;
- }
- dispatched = true;
- event = new ProcedureEvent<>(this);
- event.suspendIfNotReady(this);
- throw new ProcedureSuspendedException();
}
@Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALRemoteProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALRemoteProcedure.java
index 8e6d411..7452378 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALRemoteProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALRemoteProcedure.java
@@ -25,15 +25,9 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface;
import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.ServerOperation;
-import org.apache.hadoop.hbase.procedure2.FailedRemoteDispatchException;
-import org.apache.hadoop.hbase.procedure2.Procedure;
-import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
+import org.apache.hadoop.hbase.master.procedure.ServerRemoteProcedure;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
-import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
-import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation;
-import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure;
-import org.apache.hadoop.hbase.procedure2.RemoteProcedureException;
import org.apache.hadoop.hbase.replication.regionserver.ReplaySyncReplicationWALCallable;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@@ -47,24 +41,16 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.S
* A remote procedure which is used to send replaying remote wal work to region server.
*/
@InterfaceAudience.Private
-public class SyncReplicationReplayWALRemoteProcedure extends Procedure<MasterProcedureEnv>
- implements RemoteProcedure<MasterProcedureEnv, ServerName>, PeerProcedureInterface {
+public class SyncReplicationReplayWALRemoteProcedure extends ServerRemoteProcedure
+ implements PeerProcedureInterface {
private static final Logger LOG =
LoggerFactory.getLogger(SyncReplicationReplayWALRemoteProcedure.class);
private String peerId;
- private ServerName targetServer;
-
private List<String> wals;
- private boolean dispatched;
-
- private ProcedureEvent<?> event;
-
- private boolean succ;
-
public SyncReplicationReplayWALRemoteProcedure() {
}
@@ -85,27 +71,7 @@ public class SyncReplicationReplayWALRemoteProcedure extends Procedure<MasterPro
builder.build().toByteArray());
}
- @Override
- public void remoteCallFailed(MasterProcedureEnv env, ServerName remote, IOException exception) {
- complete(env, exception);
- }
-
- @Override
- public void remoteOperationCompleted(MasterProcedureEnv env) {
- complete(env, null);
- }
-
- @Override
- public void remoteOperationFailed(MasterProcedureEnv env, RemoteProcedureException error) {
- complete(env, error);
- }
-
- private void complete(MasterProcedureEnv env, Throwable error) {
- if (event == null) {
- LOG.warn("procedure event for {} is null, maybe the procedure is created when recovery",
- getProcId());
- return;
- }
+ protected void complete(MasterProcedureEnv env, Throwable error) {
if (error != null) {
LOG.warn("Replay wals {} on {} failed for peer id={}", wals, targetServer, peerId, error);
this.succ = false;
@@ -114,8 +80,6 @@ public class SyncReplicationReplayWALRemoteProcedure extends Procedure<MasterPro
LOG.info("Replay wals {} on {} succeed for peer id={}", wals, targetServer, peerId);
this.succ = true;
}
- event.wake(env.getProcedureScheduler());
- event = null;
}
/**
@@ -148,32 +112,6 @@ public class SyncReplicationReplayWALRemoteProcedure extends Procedure<MasterPro
}
@Override
- protected Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env)
- throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
- if (dispatched) {
- if (succ) {
- return null;
- }
- // retry
- dispatched = false;
- }
-
- // Dispatch task to target server
- try {
- env.getRemoteDispatcher().addOperationToNode(targetServer, this);
- } catch (FailedRemoteDispatchException e) {
- LOG.warn("Can not add remote operation for replay wals {} on {} for peer id={}, " +
- "this usually because the server is already dead", wals, targetServer, peerId);
- // Return directly and the parent procedure will assign a new worker to replay wals
- return null;
- }
- dispatched = true;
- event = new ProcedureEvent<>(this);
- event.suspendIfNotReady(this);
- throw new ProcedureSuspendedException();
- }
-
- @Override
protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException {
throw new UnsupportedOperationException();
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerRemoteProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerRemoteProcedure.java
new file mode 100644
index 0000000..d4745b9
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerRemoteProcedure.java
@@ -0,0 +1,282 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.procedure;
+
+import static org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface.ServerOperationType.SWITCH_RPC_THROTTLE;
+
+import java.io.IOException;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
+import org.apache.hadoop.hbase.master.assignment.MockMasterServices;
+import org.apache.hadoop.hbase.master.assignment.OpenRegionProcedure;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
+import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher;
+import org.apache.hadoop.hbase.procedure2.RemoteProcedureException;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
+
+@Category({ MasterTests.class, MediumTests.class })
+public class TestServerRemoteProcedure {
+ private static final Logger LOG = LoggerFactory.getLogger(TestServerRemoteProcedure.class);
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestServerRemoteProcedure.class);
+ @Rule
+ public TestName name = new TestName();
+ @Rule
+ public final ExpectedException exception = ExpectedException.none();
+ protected HBaseTestingUtility util;
+ protected MockRSProcedureDispatcher rsDispatcher;
+ protected MockMasterServices master;
+ protected AssignmentManager am;
+ protected NavigableMap<ServerName, SortedSet<byte[]>> regionsToRegionServers =
+ new ConcurrentSkipListMap<>();
+ // Simple executor to run some simple tasks.
+ protected ScheduledExecutorService executor;
+
+ @Before
+ public void setUp() throws Exception {
+ util = new HBaseTestingUtility();
+ this.executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder()
+ .setUncaughtExceptionHandler((t, e) -> LOG.warn("Uncaught: ", e)).build());
+ master = new MockMasterServices(util.getConfiguration(), this.regionsToRegionServers);
+ rsDispatcher = new MockRSProcedureDispatcher(master);
+ rsDispatcher.setMockRsExecutor(new NoopRSExecutor());
+ master.start(2, rsDispatcher);
+ am = master.getAssignmentManager();
+ master.getServerManager().getOnlineServersList().stream()
+ .forEach(serverName -> am.getRegionStates().getOrCreateServer(serverName));
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ master.stop("tearDown");
+ this.executor.shutdownNow();
+ }
+
+ @Test
+ public void testSplitWALAndCrashBeforeResponse() throws Exception {
+ ServerName worker = master.getServerManager().getOnlineServersList().get(0);
+ ServerName crashedWorker = master.getServerManager().getOnlineServersList().get(1);
+ ServerRemoteProcedure splitWALRemoteProcedure =
+ new SplitWALRemoteProcedure(worker, crashedWorker, "test");
+ Future<byte[]> future = submitProcedure(splitWALRemoteProcedure);
+ Thread.sleep(2000);
+ master.getServerManager().expireServer(worker);
+ // if remoteCallFailed is called for this procedure, this procedure should be finished.
+ future.get(5000, TimeUnit.MILLISECONDS);
+ Assert.assertTrue(splitWALRemoteProcedure.isSuccess());
+ }
+
+ @Test
+ public void testRemoteCompleteAndFailedAtTheSameTime() throws Exception {
+ ServerName worker = master.getServerManager().getOnlineServersList().get(0);
+ ServerRemoteProcedure noopServerRemoteProcedure = new NoopServerRemoteProcedure(worker);
+ Future<byte[]> future = submitProcedure(noopServerRemoteProcedure);
+ Thread.sleep(2000);
+ // complete the process and fail the process at the same time
+ ExecutorService threadPool = Executors.newFixedThreadPool(2);
+ threadPool.execute(() -> noopServerRemoteProcedure
+ .remoteOperationDone(master.getMasterProcedureExecutor().getEnvironment(), null));
+ threadPool.execute(() -> noopServerRemoteProcedure.remoteCallFailed(
+ master.getMasterProcedureExecutor().getEnvironment(), worker, new IOException()));
+ future.get(2000, TimeUnit.MILLISECONDS);
+ Assert.assertTrue(noopServerRemoteProcedure.isSuccess());
+ }
+
+ @Test
+ public void testRegionOpenProcedureIsNotHandledByDisPatcher() throws Exception {
+ TableName tableName = TableName.valueOf("testRegionOpenProcedureIsNotHandledByDisPatcher");
+ RegionInfo hri = RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes(1))
+ .setEndKey(Bytes.toBytes(2)).setSplit(false).setRegionId(0).build();
+ master.getMasterProcedureExecutor().getEnvironment().getAssignmentManager().getRegionStates()
+ .getOrCreateRegionStateNode(hri);
+ ServerName worker = master.getServerManager().getOnlineServersList().get(0);
+ OpenRegionProcedure openRegionProcedure = new OpenRegionProcedure(hri, worker);
+ Future<byte[]> future = submitProcedure(openRegionProcedure);
+ Thread.sleep(2000);
+ rsDispatcher.removeNode(worker);
+ try {
+ future.get(2000, TimeUnit.MILLISECONDS);
+ } catch (TimeoutException e) {
+ LOG.info("timeout is expected");
+ }
+ Assert.assertFalse(openRegionProcedure.isFinished());
+ }
+
+ private Future<byte[]> submitProcedure(final Procedure<MasterProcedureEnv> proc) {
+ return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), proc);
+ }
+
+ private static class NoopServerRemoteProcedure extends ServerRemoteProcedure
+ implements ServerProcedureInterface {
+
+ public NoopServerRemoteProcedure(ServerName targetServer) {
+ this.targetServer = targetServer;
+ }
+
+ @Override
+ protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException {
+ return;
+ }
+
+ @Override
+ protected boolean abort(MasterProcedureEnv env) {
+ return false;
+ }
+
+ @Override
+ protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
+ return;
+ }
+
+ @Override
+ protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
+ return;
+ }
+
+ @Override
+ public RemoteProcedureDispatcher.RemoteOperation remoteCallBuild(MasterProcedureEnv env,
+ ServerName serverName) {
+ return new RSProcedureDispatcher.ServerOperation(null, 0L, this.getClass(), new byte[0]);
+ }
+
+ @Override
+ public synchronized void remoteOperationCompleted(MasterProcedureEnv env) {
+ complete(env, null);
+ }
+
+ @Override
+ public synchronized void remoteOperationFailed(MasterProcedureEnv env,
+ RemoteProcedureException error) {
+ complete(env, error);
+ }
+
+ @Override
+ public void complete(MasterProcedureEnv env, Throwable error) {
+ this.succ = true;
+ return;
+ }
+
+ @Override
+ public ServerName getServerName() {
+ return targetServer;
+ }
+
+ @Override
+ public boolean hasMetaTableRegion() {
+ return false;
+ }
+
+ @Override
+ public ServerOperationType getServerOperationType() {
+ return SWITCH_RPC_THROTTLE;
+ }
+
+ }
+
+ protected interface MockRSExecutor {
+ AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server,
+ AdminProtos.ExecuteProceduresRequest req) throws IOException;
+ }
+
+ protected static class NoopRSExecutor implements MockRSExecutor {
+ @Override
+ public AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server,
+ AdminProtos.ExecuteProceduresRequest req) throws IOException {
+ if (req.getOpenRegionCount() > 0) {
+ for (AdminProtos.OpenRegionRequest request : req.getOpenRegionList()) {
+ for (AdminProtos.OpenRegionRequest.RegionOpenInfo openReq : request.getOpenInfoList()) {
+ execOpenRegion(server, openReq);
+ }
+ }
+ }
+ return AdminProtos.ExecuteProceduresResponse.getDefaultInstance();
+ }
+
+ protected AdminProtos.OpenRegionResponse.RegionOpeningState execOpenRegion(ServerName server,
+ AdminProtos.OpenRegionRequest.RegionOpenInfo regionInfo) throws IOException {
+ return null;
+ }
+ }
+
+ protected static class MockRSProcedureDispatcher extends RSProcedureDispatcher {
+ private MockRSExecutor mockRsExec;
+
+ public MockRSProcedureDispatcher(final MasterServices master) {
+ super(master);
+ }
+
+ public void setMockRsExecutor(final MockRSExecutor mockRsExec) {
+ this.mockRsExec = mockRsExec;
+ }
+
+ @Override
+ protected void remoteDispatch(ServerName serverName,
+ @SuppressWarnings("rawtypes") Set<RemoteProcedure> remoteProcedures) {
+ submitTask(new MockRSProcedureDispatcher.MockRemoteCall(serverName, remoteProcedures));
+ }
+
+ private class MockRemoteCall extends ExecuteProceduresRemoteCall {
+ public MockRemoteCall(final ServerName serverName,
+ @SuppressWarnings("rawtypes") final Set<RemoteProcedure> operations) {
+ super(serverName, operations);
+ }
+
+ @Override
+ protected AdminProtos.ExecuteProceduresResponse sendRequest(final ServerName serverName,
+ final AdminProtos.ExecuteProceduresRequest request) throws IOException {
+ return mockRsExec.sendRequest(serverName, request);
+ }
+ }
+ }
+}