You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zg...@apache.org on 2020/10/22 06:56:18 UTC
[hbase] 04/07: HBASE-24683 Add a basic ReplicationServer which only
implement ReplicationSink Service (#2111)
This is an automated email from the ASF dual-hosted git repository.
zghao pushed a commit to branch HBASE-24666
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit d43d2c9afa55139cb419229ba960941f823f4168
Author: XinSun <dd...@gmail.com>
AuthorDate: Fri Sep 4 18:53:46 2020 +0800
HBASE-24683 Add a basic ReplicationServer which only implement ReplicationSink Service (#2111)
Signed-off-by: Guanghao Zhang <zg...@apache.org>
---
.../java/org/apache/hadoop/hbase/util/DNS.java | 3 +-
.../hbase/replication/HReplicationServer.java | 391 ++++++++++++++++
.../replication/ReplicationServerRpcServices.java | 516 +++++++++++++++++++++
.../hbase/replication/TestReplicationServer.java | 151 ++++++
4 files changed, 1060 insertions(+), 1 deletion(-)
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/DNS.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/DNS.java
index 2b4e1cb..ddff6db 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/DNS.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/DNS.java
@@ -54,7 +54,8 @@ public final class DNS {
public enum ServerType {
MASTER("master"),
- REGIONSERVER("regionserver");
+ REGIONSERVER("regionserver"),
+ REPLICATIONSERVER("replicationserver");
private String name;
ServerType(String name) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java
new file mode 100644
index 0000000..31dec0c
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java
@@ -0,0 +1,391 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ChoreService;
+import org.apache.hadoop.hbase.CoordinatedStateManager;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.log.HBaseMarkers;
+import org.apache.hadoop.hbase.regionserver.ReplicationService;
+import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.trace.TraceUtil;
+import org.apache.hadoop.hbase.util.Sleeper;
+import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * HReplicationServer which is responsible for all replication stuff. In this basic version it
+ * only hosts a {@link ReplicationSinkService} behind a {@link ReplicationServerRpcServices}
+ * endpoint; it does not yet register with or report to the HMaster. There can be many
+ * HReplicationServers in a single HBase deployment.
+ */
+@InterfaceAudience.Private
+@SuppressWarnings({ "deprecation"})
+public class HReplicationServer extends Thread implements Server {
+
+  private static final Logger LOG = LoggerFactory.getLogger(HReplicationServer.class);
+
+  /** replication server process name */
+  public static final String REPLICATION_SERVER = "replicationserver";
+
+  /**
+   * This server's start code, taken from the wall clock when the server is constructed.
+   */
+  protected final long startCode;
+
+  private volatile boolean stopped = false;
+
+  // Set (at most once) when an abort is requested; see setAbortRequested().
+  private final AtomicBoolean abortRequested = new AtomicBoolean(false);
+
+  // flag set after we're done setting up server threads
+  final AtomicBoolean online = new AtomicBoolean(false);
+
+  /**
+   * The name this server is known by. It is made from the RPC server's hostname and port plus
+   * the server start code, and is fixed at construction time.
+   */
+  private final ServerName serverName;
+
+  protected final Configuration conf;
+
+  private ReplicationSinkService replicationSinkService;
+
+  final int msgInterval;
+  // A sleeper that sleeps for msgInterval.
+  protected final Sleeper sleeper;
+
+  // zookeeper connection and watcher; null when hbase.testing.nocluster is set
+  protected final ZKWatcher zooKeeper;
+
+  /**
+   * The asynchronous cluster connection to be shared by services. Created lazily by
+   * {@link #setupClusterConnection()}.
+   */
+  protected AsyncClusterConnection asyncClusterConnection;
+
+  private final UserProvider userProvider;
+
+  protected final ReplicationServerRpcServices rpcServices;
+
+  /**
+   * Creates the server: builds and starts the RPC services and, unless
+   * {@code hbase.testing.nocluster} is set, opens a ZooKeeper watcher. The replication sink
+   * service is only set up later, in {@link #run()}.
+   * @param conf configuration; retained by this server and later modified by
+   *          {@link #cleanupConfiguration()}
+   * @throws IOException if the RPC services or the ZooKeeper watcher cannot be created
+   */
+  public HReplicationServer(final Configuration conf) throws IOException {
+    TraceUtil.initTracer(conf);
+    try {
+      this.startCode = System.currentTimeMillis();
+      this.conf = conf;
+
+      // NOTE(review): createRpcServices() is overridable and runs before this constructor has
+      // finished; overrides must not depend on state assigned below.
+      this.rpcServices = createRpcServices();
+
+      String hostName = this.rpcServices.isa.getHostName();
+      this.serverName =
+        ServerName.valueOf(hostName, this.rpcServices.isa.getPort(), this.startCode);
+
+      this.userProvider = UserProvider.instantiate(conf);
+
+      this.msgInterval = conf.getInt("hbase.replicationserver.msginterval", 3 * 1000);
+      this.sleeper = new Sleeper(this.msgInterval, this);
+
+      // Some unit tests don't need a cluster, so no zookeeper at all
+      if (!conf.getBoolean("hbase.testing.nocluster", false)) {
+        // Open connection to zookeeper and set primary watcher
+        zooKeeper = new ZKWatcher(conf, getProcessName() + ":" +
+          rpcServices.isa.getPort(), this, false);
+      } else {
+        zooKeeper = null;
+      }
+
+      this.rpcServices.start(zooKeeper);
+    } catch (Throwable t) {
+      // Make sure we log the exception. HReplicationServer is often started via reflection and the
+      // cause of failed startup is lost.
+      LOG.error("Failed construction ReplicationServer", t);
+      throw t;
+    }
+  }
+
+  /** @return the process name, used as a prefix for this server's RPC and ZooKeeper names */
+  public String getProcessName() {
+    return REPLICATION_SERVER;
+  }
+
+  /**
+   * Main loop. Sets up the cluster connection, starts the replication sink service and marks the
+   * server online, then sleeps until stopped. On exit it always stops the replication service and
+   * the RPC server, then closes the ZooKeeper watcher.
+   */
+  @Override
+  public void run() {
+    if (isStopped()) {
+      LOG.info("Skipping run; stopped");
+      return;
+    }
+    try {
+      // Do pre-registration initializations: set up the cluster connection.
+      preRegistrationInitialization();
+    } catch (Throwable e) {
+      abort("Fatal exception during initialization", e);
+    }
+    try {
+      // A failed initialization aborts, and therefore stops, this server. In that case do not
+      // start the replication service or report ourselves online.
+      if (!isStopped()) {
+        setupReplication();
+        startReplicationService();
+        online.set(true);
+      }
+
+      // The main run loop. There is no periodic work yet, so just sleep until stopped.
+      while (!isStopped()) {
+        if (!isStopped() && !isAborted()) {
+          this.sleeper.sleep();
+        }
+      }
+    } catch (Throwable t) {
+      abort(t.getMessage(), t);
+    }
+
+    // Always release the replication service and the RPC server, including after a failed
+    // startup; otherwise the RPC server's non-daemon threads could keep the process alive.
+    try {
+      stopServiceThreads();
+      this.rpcServices.stop();
+    } catch (Throwable t) {
+      abort(t.getMessage(), t);
+    }
+
+    if (this.zooKeeper != null) {
+      this.zooKeeper.close();
+    }
+    LOG.info("Exiting; stopping=" + this.serverName + "; zookeeper connection closed.");
+  }
+
+  /**
+   * Prepares the configuration for server-issued connections. Forces the ZooKeeper-based
+   * connection registry (this is set on {@link #conf} itself) and, if a client-side ZooKeeper
+   * quorum is configured, returns a copy with that quorum unset so the server's own ZooKeeper
+   * ensemble is used.
+   * @return {@link #conf}, or a modified copy of it when a client ZooKeeper quorum is set
+   */
+  private Configuration cleanupConfiguration() {
+    Configuration conf = this.conf;
+    conf.set(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
+      HConstants.ZK_CONNECTION_REGISTRY_CLASS);
+    if (conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM) != null) {
+      // Use server ZK cluster for server-issued connections, so we clone
+      // the conf and unset the client ZK related properties
+      conf = new Configuration(this.conf);
+      conf.unset(HConstants.CLIENT_ZOOKEEPER_QUORUM);
+    }
+    return conf;
+  }
+
+  /**
+   * All initialization needed before the main loop starts. Currently this only sets up the
+   * cluster connection; the RPC server and ZooKeeper watcher are created by the constructor.
+   * On failure the server is aborted, and {@link #run()} then stops the RPC server.
+   */
+  private void preRegistrationInitialization() {
+    try {
+      setupClusterConnection();
+    } catch (Throwable t) {
+      abort("Initialization of ReplicationServer failed. Hence aborting ReplicationServer.", t);
+    }
+  }
+
+  /**
+   * Setup our cluster connection if not already initialized. The connection uses the
+   * configuration returned by {@link #cleanupConfiguration()}, the current user, and a local
+   * address on this server's RPC host with port 0 (any free port).
+   * @throws IOException if the connection cannot be created
+   */
+  protected final synchronized void setupClusterConnection() throws IOException {
+    if (asyncClusterConnection == null) {
+      Configuration conf = cleanupConfiguration();
+      InetSocketAddress localAddress = new InetSocketAddress(this.rpcServices.isa.getAddress(), 0);
+      User user = userProvider.getCurrent();
+      asyncClusterConnection =
+        ClusterConnectionFactory.createAsyncClusterConnection(conf, localAddress, user);
+    }
+  }
+
+  /**
+   * Stops the replication sink service, if one was created. The RPC server is stopped separately
+   * by {@link #run()}.
+   */
+  protected void stopServiceThreads() {
+    if (this.replicationSinkService != null) {
+      this.replicationSinkService.stopReplicationService();
+    }
+  }
+
+  @Override
+  public Configuration getConfiguration() {
+    return conf;
+  }
+
+  /** @return the ZooKeeper watcher, or null when running with hbase.testing.nocluster */
+  @Override
+  public ZKWatcher getZooKeeper() {
+    return zooKeeper;
+  }
+
+  /**
+   * NOTE(review): this relies on the cluster connection having been created by
+   * {@link #setupClusterConnection()}; calling it earlier presumably fails — confirm.
+   */
+  @Override
+  public Connection getConnection() {
+    return getAsyncConnection().toConnection();
+  }
+
+  /** Not supported on a replication server; always throws {@link DoNotRetryIOException}. */
+  @Override
+  public Connection createConnection(Configuration conf) throws IOException {
+    throw new DoNotRetryIOException(new UnsupportedOperationException("This's ReplicationServer."));
+  }
+
+  /** @return the shared cluster connection, or null before {@link #run()} has set it up */
+  @Override
+  public AsyncClusterConnection getAsyncClusterConnection() {
+    return this.asyncClusterConnection;
+  }
+
+  @Override
+  public ServerName getServerName() {
+    return serverName;
+  }
+
+  /** Not used by the replication server; always returns null. */
+  @Override
+  public CoordinatedStateManager getCoordinatedStateManager() {
+    return null;
+  }
+
+  /** The replication server runs no chores; always returns null. */
+  @Override
+  public ChoreService getChoreService() {
+    return null;
+  }
+
+  /**
+   * Logs the reason at FATAL and stops the server. Only the first request takes effect; later
+   * requests are logged at debug level and ignored.
+   */
+  @Override
+  public void abort(String why, Throwable cause) {
+    if (!setAbortRequested()) {
+      // Abort already in progress, ignore the new request.
+      LOG.debug(
+        "Abort already in progress. Ignoring the current request with reason: {}", why);
+      return;
+    }
+    String msg = "***** ABORTING replication server " + this + ": " + why + " *****";
+    if (cause != null) {
+      LOG.error(HBaseMarkers.FATAL, msg, cause);
+    } else {
+      LOG.error(HBaseMarkers.FATAL, msg);
+    }
+    stop(why);
+  }
+
+  @Override
+  public boolean isAborted() {
+    return abortRequested.get();
+  }
+
+  /**
+   * Marks the server stopped (only the first call has an effect) and wakes the main loop so that
+   * {@link #run()} can shut down.
+   */
+  @Override
+  public void stop(final String msg) {
+    if (!this.stopped) {
+      LOG.info("***** STOPPING replication server '" + this + "' *****");
+      this.stopped = true;
+      LOG.info("STOPPED: " + msg);
+      // Wakes run() if it is sleeping
+      sleeper.skipSleepCycle();
+    }
+  }
+
+  @Override
+  public boolean isStopped() {
+    return this.stopped;
+  }
+
+  /**
+   * Creates the replication sink service. Unlike a region server, no WAL is involved here.
+   */
+  private void setupReplication() throws IOException {
+    createNewReplicationInstance(conf, this);
+  }
+
+  /**
+   * Instantiates the replication sink service named by
+   * {@code HConstants.REPLICATION_SINK_SERVICE_CLASSNAME} (falling back to
+   * {@code HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT}) and stores it on {@code server}.
+   */
+  private static void createNewReplicationInstance(Configuration conf, HReplicationServer server)
+    throws IOException {
+    // read in the name of the sink replication class from the config file.
+    String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME,
+      HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
+
+    server.replicationSinkService = newReplicationInstance(sinkClassname,
+      ReplicationSinkService.class, conf, server);
+  }
+
+  /**
+   * Loads {@code classname} with the thread context class loader, instantiates it as an
+   * {@code xface} and initializes it with {@code server}. The remaining initialize arguments are
+   * passed as null because this server has no filesystem/WAL setup of its own.
+   * @throws IOException if the class cannot be found (the ClassNotFoundException is the cause)
+   */
+  private static <T extends ReplicationService> T newReplicationInstance(String classname,
+    Class<T> xface, Configuration conf, HReplicationServer server) throws IOException {
+    final Class<? extends T> clazz;
+    try {
+      ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+      clazz = Class.forName(classname, true, classLoader).asSubclass(xface);
+    } catch (java.lang.ClassNotFoundException nfe) {
+      throw new IOException("Could not find class for " + classname, nfe);
+    }
+    T service = ReflectionUtils.newInstance(clazz, conf);
+    service.initialize(server, null, null, null, null);
+    return service;
+  }
+
+  /**
+   * Starts the replication sink service, if one was created.
+   */
+  private void startReplicationService() throws IOException {
+    if (this.replicationSinkService != null) {
+      this.replicationSinkService.startReplicationService();
+    }
+  }
+
+  /**
+   * @return the replication sink service, or null before {@link #run()} has created it
+   */
+  public ReplicationSinkService getReplicationSinkService() {
+    return replicationSinkService;
+  }
+
+  /**
+   * Report the status of the server. A server is online once {@link #run()} has set up the
+   * cluster connection and started the replication sink service. This method is designed mostly
+   * to be useful in tests.
+   *
+   * @return true if online, false if not.
+   */
+  public boolean isOnline() {
+    return online.get();
+  }
+
+  /**
+   * Factory for the RPC services; called from the constructor, so overrides must not depend on
+   * subclass state.
+   */
+  protected ReplicationServerRpcServices createRpcServices() throws IOException {
+    return new ReplicationServerRpcServices(this);
+  }
+
+  /**
+   * Sets the abort state if not already set.
+   * @return True if abortRequested set to True successfully, false if an abort is already in
+   *         progress.
+   */
+  protected boolean setAbortRequested() {
+    return abortRequested.compareAndSet(false, true);
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationServerRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationServerRpcServices.java
new file mode 100644
index 0000000..1b9b699
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationServerRpcServices.java
@@ -0,0 +1,516 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.net.BindException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.LongAdder;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.client.ConnectionUtils;
+import org.apache.hadoop.hbase.io.ByteBuffAllocator;
+import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.ipc.PriorityFunction;
+import org.apache.hadoop.hbase.ipc.QosPriority;
+import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
+import org.apache.hadoop.hbase.ipc.RpcServerFactory;
+import org.apache.hadoop.hbase.ipc.RpcServerInterface;
+import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
+import org.apache.hadoop.hbase.log.HBaseMarkers;
+import org.apache.hadoop.hbase.net.Address;
+import org.apache.hadoop.hbase.regionserver.RSRpcServices;
+import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException;
+import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
+import org.apache.hadoop.hbase.regionserver.RpcSchedulerFactory;
+import org.apache.hadoop.hbase.regionserver.SimpleRpcSchedulerFactory;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.access.AccessChecker;
+import org.apache.hadoop.hbase.security.access.NoopAccessChecker;
+import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher;
+import org.apache.hadoop.hbase.util.DNS;
+import org.apache.hadoop.hbase.util.DNS.ServerType;
+import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearSlowLogResponseRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearSlowLogResponses;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SlowLogResponseRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SlowLogResponses;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
+import org.apache.hbase.thirdparty.com.google.protobuf.Message;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
+import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
+
+/**
+ * Implements the regionserver RPC services for {@link HReplicationServer}.
+ */
+@InterfaceAudience.Private
+@SuppressWarnings("deprecation")
+public class ReplicationServerRpcServices implements HBaseRPCErrorHandler,
+ AdminService.BlockingInterface, PriorityFunction {
+
+ protected static final Logger LOG = LoggerFactory.getLogger(ReplicationServerRpcServices.class);
+
+ /** Parameter name for port replication server listens on. */
+ public static final String REPLICATION_SERVER_PORT = "hbase.replicationserver.port";
+
+ /** Default port replication server listens on. */
+ public static final int DEFAULT_REPLICATION_SERVER_PORT = 16040;
+
+ /**
+ * Default port for the replication server web UI.
+ * NOTE(review): not referenced in the visible code; no info server is started here yet.
+ */
+ public static final int DEFAULT_REPLICATION_SERVER_INFOPORT = 16050;
+
+ // Request counter. In the visible part of this class only stopServer() increments it.
+ final LongAdder requestCount = new LongAdder();
+
+ // Server to handle client requests.
+ final RpcServerInterface rpcServer;
+ // Advertised address: the resolved hostname plus the port the RPC server actually bound to.
+ final InetSocketAddress isa;
+
+ // Owning server; supplies the configuration and the aborted/stopped/online state.
+ protected final HReplicationServer replicationServer;
+
+ // The reference to the priority extraction function; getPriority/getDeadline delegate to it.
+ private final PriorityFunction priority;
+
+ // Created in start(): a NoopAccessChecker when authorization is not supported.
+ private AccessChecker accessChecker;
+ // Created in start() only when a ZooKeeper watcher is available; otherwise stays null.
+ private ZKPermissionWatcher zkPermissionWatcher;
+
+ /**
+ * Creates the RPC services for {@code rs}. It instantiates the configured RPC scheduler factory,
+ * resolves the host/port to advertise and to bind to, and creates (but does not start) the RPC
+ * server. The RPC server is started later by {@code start(ZKWatcher)}.
+ * <p>
+ * NOTE(review): statement order matters here. {@code replicationServer} must be assigned before
+ * {@link #getRpcSchedulerFactoryClass()} reads its configuration. {@code priority} is assigned
+ * before {@code createRpcServer} passes {@code this} (whose getPriority/getDeadline delegate to
+ * {@code priority}) to the scheduler factory.
+ * @param rs the owning replication server; its thread name is set to the RPC server name
+ * @throws IOException if the RPC server cannot be created or its listener address is null
+ * @throws IllegalArgumentException if the scheduler factory cannot be instantiated or the
+ * hostname cannot be resolved
+ */
+ public ReplicationServerRpcServices(final HReplicationServer rs) throws IOException {
+ final Configuration conf = rs.getConfiguration();
+ replicationServer = rs;
+
+ final RpcSchedulerFactory rpcSchedulerFactory;
+ try {
+ rpcSchedulerFactory = getRpcSchedulerFactoryClass().asSubclass(RpcSchedulerFactory.class)
+ .getDeclaredConstructor().newInstance();
+ } catch (NoSuchMethodException | InvocationTargetException |
+ InstantiationException | IllegalAccessException e) {
+ throw new IllegalArgumentException(e);
+ }
+ // Server to handle client requests.
+ final InetSocketAddress initialIsa;
+ final InetSocketAddress bindAddress;
+
+ String hostname = DNS.getHostname(conf, ServerType.REPLICATIONSERVER);
+ int port = conf.getInt(REPLICATION_SERVER_PORT, DEFAULT_REPLICATION_SERVER_PORT);
+ // Creation of a HSA will force a resolve.
+ initialIsa = new InetSocketAddress(hostname, port);
+ // The bind host may be overridden independently of the advertised hostname.
+ bindAddress = new InetSocketAddress(
+ conf.get("hbase.replicationserver.ipc.address", hostname), port);
+
+ if (initialIsa.getAddress() == null) {
+ throw new IllegalArgumentException("Failed resolve of " + initialIsa);
+ }
+ priority = createPriority();
+ // Using Address means we don't get the IP too. Shorten it more even to just the host name
+ // w/o the domain.
+ final String name = rs.getProcessName() + "/" +
+ Address.fromParts(initialIsa.getHostName(), initialIsa.getPort()).toStringWithoutDomain();
+ // Set how many times to retry talking to another server over Connection.
+ ConnectionUtils.setServerSideHConnectionRetriesConfig(conf, name, LOG);
+ rpcServer = createRpcServer(rs, rpcSchedulerFactory, bindAddress, name);
+
+ final InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
+ // Set our address, however we need the final port that was given to rpcServer
+ isa = new InetSocketAddress(initialIsa.getHostName(), address.getPort());
+ rpcServer.setErrorHandler(this);
+ // HReplicationServer is a Thread; name it after the RPC endpoint.
+ rs.setName(name);
+ }
+
+  /**
+   * Builds the RPC server exposing {@link #getServices()} on {@code bindAddress}.
+   * <p>
+   * A {@link BindException} is rethrown as an {@link IOException} whose message names the
+   * {@value #REPLICATION_SERVER_PORT} property, so operators know how to pick another port.
+   * @param server the owning server, passed through to the RPC server and scheduler factory
+   * @param rpcSchedulerFactory creates the scheduler; {@code this} is its priority function
+   * @param bindAddress the address to listen on
+   * @param name the RPC server name
+   * @return the created, not yet started, RPC server
+   * @throws IOException if the server cannot be created or bound
+   */
+  protected RpcServerInterface createRpcServer(final Server server,
+      final RpcSchedulerFactory rpcSchedulerFactory, final InetSocketAddress bindAddress,
+      final String name) throws IOException {
+    final Configuration serverConf = server.getConfiguration();
+    final boolean useReservoir =
+      serverConf.getBoolean(ByteBuffAllocator.ALLOCATOR_POOL_ENABLED_KEY, true);
+    try {
+      // Always bind to the caller-supplied address.
+      return RpcServerFactory.createRpcServer(server, name, getServices(), bindAddress,
+        serverConf, rpcSchedulerFactory.create(serverConf, this, server), useReservoir);
+    } catch (BindException bindFailure) {
+      final Throwable cause =
+        bindFailure.getCause() == null ? bindFailure : bindFailure.getCause();
+      throw new IOException(bindFailure.getMessage() + ". To switch ports use the '"
+        + REPLICATION_SERVER_PORT + "' configuration property.", cause);
+    }
+  }
+
+  /**
+   * Returns the {@link RpcSchedulerFactory} implementation to instantiate. The replication server
+   * reuses the region server's scheduler-factory configuration key and falls back to
+   * {@link SimpleRpcSchedulerFactory}.
+   */
+  protected Class<?> getRpcSchedulerFactoryClass() {
+    return replicationServer.getConfiguration()
+      .getClass(RSRpcServices.REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
+        SimpleRpcSchedulerFactory.class);
+  }
+
+  /** @return the priority function this service delegates request prioritization to */
+  public PriorityFunction getPriority() {
+    return this.priority;
+  }
+
+  /** @return the configuration of the owning replication server */
+  public Configuration getConfiguration() {
+    final HReplicationServer owner = this.replicationServer;
+    return owner.getConfiguration();
+  }
+
+ void start(ZKWatcher zkWatcher) {
+ if (AccessChecker.isAuthorizationSupported(getConfiguration())) {
+ accessChecker = new AccessChecker(getConfiguration());
+ } else {
+ accessChecker = new NoopAccessChecker(getConfiguration());
+ }
+ if (!getConfiguration().getBoolean("hbase.testing.nocluster", false) && zkWatcher != null) {
+ zkPermissionWatcher =
+ new ZKPermissionWatcher(zkWatcher, accessChecker.getAuthManager(), getConfiguration());
+ try {
+ zkPermissionWatcher.start();
+ } catch (KeeperException e) {
+ LOG.error("ZooKeeper permission watcher initialization failed", e);
+ }
+ }
+ rpcServer.start();
+ }
+
+ void stop() {
+ if (zkPermissionWatcher != null) {
+ zkPermissionWatcher.close();
+ }
+ rpcServer.stop();
+ }
+
+ /**
+ * By default, put up an Admin Service.
+ * @return immutable list of blocking services and the security info classes that this server
+ * supports
+ */
+ protected List<BlockingServiceAndInterface> getServices() {
+ List<BlockingServiceAndInterface> bssi = new ArrayList<>();
+ bssi.add(new BlockingServiceAndInterface(
+ AdminService.newReflectiveBlockingService(this),
+ AdminService.BlockingInterface.class));
+ return new ImmutableList.Builder<BlockingServiceAndInterface>().addAll(bssi).build();
+ }
+
+  /**
+   * @return the address this server advertises: the resolved hostname combined with the port the
+   *         RPC server actually bound to
+   */
+  public InetSocketAddress getSocketAddress() {
+    return this.isa;
+  }
+
+  /** Delegates request prioritization to the configured {@link PriorityFunction}. */
+  @Override
+  public int getPriority(RequestHeader header, Message param, User user) {
+    final PriorityFunction delegate = this.priority;
+    return delegate.getPriority(header, param, user);
+  }
+
+  /** Delegates deadline computation to the configured {@link PriorityFunction}. */
+  @Override
+  public long getDeadline(RequestHeader header, Message param) {
+    final PriorityFunction delegate = this.priority;
+    return delegate.getDeadline(header, param);
+  }
+
+  /**
+   * Checks whether {@code e} signals an out-of-memory condition and, if so, halts the JVM
+   * immediately to avoid creating more objects. Delegates to {@link #exitIfOOME(Throwable)}.
+   * @param e the throwable raised while handling an RPC
+   * @return {@code false} if {@code e} is unrelated to OOME; otherwise the JVM is halted
+   */
+  @Override
+  public boolean checkOOME(final Throwable e) {
+    return ReplicationServerRpcServices.exitIfOOME(e);
+  }
+
+  /**
+   * Halts the JVM with status 1 if {@code e} is an {@link OutOfMemoryError}, is directly caused
+   * by one, or carries a message mentioning one.
+   * @param e the throwable to inspect
+   * @return {@code false} when {@code e} is not OOME-related; when it is, the JVM is halted
+   */
+  public static boolean exitIfOOME(final Throwable e) {
+    boolean outOfMemory = false;
+    try {
+      outOfMemory = e instanceof OutOfMemoryError
+        || e.getCause() instanceof OutOfMemoryError
+        || (e.getMessage() != null && e.getMessage().contains("java.lang.OutOfMemoryError"));
+      if (outOfMemory) {
+        LOG.error(HBaseMarkers.FATAL, "Run out of memory; "
+          + ReplicationServerRpcServices.class.getSimpleName() + " will abort itself immediately",
+          e);
+      }
+    } finally {
+      // Halt even if logging itself fails.
+      if (outOfMemory) {
+        Runtime.getRuntime().halt(1);
+      }
+    }
+    return outOfMemory;
+  }
+
+  /**
+   * Verifies that this server is up and running.
+   * @throws RegionServerAbortedException if the server is aborting
+   * @throws RegionServerStoppedException if the server is stopping
+   * @throws ServerNotRunningYetException if the server has not finished starting up
+   */
+  protected void checkOpen() throws IOException {
+    if (replicationServer.isAborted()) {
+      throw new RegionServerAbortedException(serverStateMessage("aborting"));
+    }
+    if (replicationServer.isStopped()) {
+      throw new RegionServerStoppedException(serverStateMessage("stopping"));
+    }
+    if (!replicationServer.isOnline()) {
+      throw new ServerNotRunningYetException(serverStateMessage("is not running yet"));
+    }
+  }
+
+  /** Builds the "Server &lt;name&gt; &lt;state&gt;" message used by {@link #checkOpen()}. */
+  private String serverStateMessage(final String state) {
+    return "Server " + replicationServer.getServerName() + " " + state;
+  }
+
+ @Override
+ public GetRegionInfoResponse getRegionInfo(RpcController controller, GetRegionInfoRequest request)
+ throws ServiceException {
+ throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
+ }
+
+ @Override
+ public GetStoreFileResponse getStoreFile(RpcController controller, GetStoreFileRequest request)
+ throws ServiceException {
+ throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
+ }
+
+ @Override
+ public GetOnlineRegionResponse getOnlineRegion(RpcController controller,
+ GetOnlineRegionRequest request) throws ServiceException {
+ throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
+ }
+
+ @Override
+ public OpenRegionResponse openRegion(RpcController controller, OpenRegionRequest request)
+ throws ServiceException {
+ throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
+ }
+
+ @Override
+ public WarmupRegionResponse warmupRegion(RpcController controller, WarmupRegionRequest request)
+ throws ServiceException {
+ throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
+ }
+
+ @Override
+ public CloseRegionResponse closeRegion(RpcController controller, CloseRegionRequest request)
+ throws ServiceException {
+ throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
+ }
+
+ @Override
+ public FlushRegionResponse flushRegion(RpcController controller, FlushRegionRequest request)
+ throws ServiceException {
+ throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
+ }
+
+ @Override
+ public CompactionSwitchResponse compactionSwitch(RpcController controller,
+ CompactionSwitchRequest request) throws ServiceException {
+ throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
+ }
+
+ @Override
+ public CompactRegionResponse compactRegion(RpcController controller,
+ CompactRegionRequest request) throws ServiceException {
+ throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
+ }
+
+ @Override
+ public ReplicateWALEntryResponse replay(RpcController controller,
+ ReplicateWALEntryRequest request) throws ServiceException {
+ throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
+ }
+
+ @Override
+ public RollWALWriterResponse rollWALWriter(RpcController controller, RollWALWriterRequest request)
+ throws ServiceException {
+ throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
+ }
+
+ @Override
+ public GetServerInfoResponse getServerInfo(RpcController controller, GetServerInfoRequest request)
+ throws ServiceException {
+ throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
+ }
+
+  /**
+   * Handles an admin request to shut this replication server down.
+   * <p>
+   * The request counter is incremented first. The server is then asked to stop, using the
+   * reason carried in the request.
+   *
+   * @param controller the RPC controller (not read here)
+   * @param request carries the reason passed to the server's {@code stop} call
+   * @return an empty {@link StopServerResponse}
+   */
+  @Override
+  @QosPriority(priority = HConstants.ADMIN_QOS)
+  public StopServerResponse stopServer(final RpcController controller,
+      final StopServerRequest request) {
+    requestCount.increment();
+    replicationServer.stop(request.getReason());
+    return StopServerResponse.newBuilder().build();
+  }
+
+  /**
+   * Rejected: favored-node updates are not offered by this server, which only provides the
+   * replication sink.
+   *
+   * @throws ServiceException always, wrapping an {@link UnsupportedOperationException}
+   */
+  @Override
+  public UpdateFavoredNodesResponse updateFavoredNodes(final RpcController controller,
+      final UpdateFavoredNodesRequest request) throws ServiceException {
+    UnsupportedOperationException cause =
+        new UnsupportedOperationException("This's Replication Server");
+    throw new ServiceException(cause);
+  }
+
+  /**
+   * Rejected: online configuration reload is not offered by this server.
+   *
+   * @throws ServiceException always, wrapping an {@link UnsupportedOperationException}
+   */
+  @Override
+  public UpdateConfigurationResponse updateConfiguration(final RpcController controller,
+      final UpdateConfigurationRequest request) throws ServiceException {
+    ServiceException rejection =
+        new ServiceException(new UnsupportedOperationException("This's Replication Server"));
+    throw rejection;
+  }
+
+  /**
+   * Rejected: region load reporting is not offered by this server, which only provides the
+   * replication sink.
+   *
+   * @throws ServiceException always, wrapping an {@link UnsupportedOperationException}
+   */
+  @Override
+  public GetRegionLoadResponse getRegionLoad(final RpcController controller,
+      final GetRegionLoadRequest request) throws ServiceException {
+    UnsupportedOperationException cause =
+        new UnsupportedOperationException("This's Replication Server");
+    throw new ServiceException(cause);
+  }
+
+  /**
+   * Rejected: clearing compaction queues is not offered by this server, which only provides
+   * the replication sink.
+   *
+   * @throws ServiceException always, wrapping an {@link UnsupportedOperationException}
+   */
+  @Override
+  public ClearCompactionQueuesResponse clearCompactionQueues(final RpcController controller,
+      final ClearCompactionQueuesRequest request) throws ServiceException {
+    ServiceException rejection =
+        new ServiceException(new UnsupportedOperationException("This's Replication Server"));
+    throw rejection;
+  }
+
+  /**
+   * Rejected: clearing a region block cache is not offered by this server, which only
+   * provides the replication sink.
+   *
+   * @throws ServiceException always, wrapping an {@link UnsupportedOperationException}
+   */
+  @Override
+  public ClearRegionBlockCacheResponse clearRegionBlockCache(final RpcController controller,
+      final ClearRegionBlockCacheRequest request) throws ServiceException {
+    UnsupportedOperationException cause =
+        new UnsupportedOperationException("This's Replication Server");
+    throw new ServiceException(cause);
+  }
+
+  /**
+   * Rejected: space quota snapshots are not offered by this server, which only provides the
+   * replication sink.
+   *
+   * @throws ServiceException always, wrapping an {@link UnsupportedOperationException}
+   */
+  @Override
+  public GetSpaceQuotaSnapshotsResponse getSpaceQuotaSnapshots(final RpcController controller,
+      final GetSpaceQuotaSnapshotsRequest request) throws ServiceException {
+    ServiceException rejection =
+        new ServiceException(new UnsupportedOperationException("This's Replication Server"));
+    throw rejection;
+  }
+
+  /**
+   * Rejected: remote procedure execution is not offered by this server, which only provides
+   * the replication sink.
+   *
+   * @throws ServiceException always, wrapping an {@link UnsupportedOperationException}
+   */
+  @Override
+  public ExecuteProceduresResponse executeProcedures(final RpcController controller,
+      final ExecuteProceduresRequest request) throws ServiceException {
+    UnsupportedOperationException cause =
+        new UnsupportedOperationException("This's Replication Server");
+    throw new ServiceException(cause);
+  }
+
+  /**
+   * Rejected: slow-log retrieval is not offered by this server.
+   *
+   * @throws ServiceException always, wrapping an {@link UnsupportedOperationException}
+   */
+  @Override
+  public SlowLogResponses getSlowLogResponses(final RpcController controller,
+      final SlowLogResponseRequest request) throws ServiceException {
+    ServiceException rejection =
+        new ServiceException(new UnsupportedOperationException("This's Replication Server"));
+    throw rejection;
+  }
+
+  /**
+   * Rejected: large-log retrieval is not offered by this server.
+   *
+   * @throws ServiceException always, wrapping an {@link UnsupportedOperationException}
+   */
+  @Override
+  public SlowLogResponses getLargeLogResponses(final RpcController controller,
+      final SlowLogResponseRequest request) throws ServiceException {
+    UnsupportedOperationException cause =
+        new UnsupportedOperationException("This's Replication Server");
+    throw new ServiceException(cause);
+  }
+
+  /**
+   * Rejected: clearing slow logs is not offered by this server.
+   *
+   * @throws ServiceException always, wrapping an {@link UnsupportedOperationException}
+   */
+  @Override
+  public ClearSlowLogResponses clearSlowLogsResponses(final RpcController controller,
+      final ClearSlowLogResponseRequest request) throws ServiceException {
+    ServiceException rejection =
+        new ServiceException(new UnsupportedOperationException("This's Replication Server"));
+    throw rejection;
+  }
+
+  /**
+   * Returns the {@link AccessChecker} stored on this RPC services object.
+   *
+   * @return the {@code accessChecker} field as currently set
+   */
+  protected AccessChecker getAccessChecker() {
+    return this.accessChecker;
+  }
+
+  /**
+   * Creates a {@link PriorityFunction} that treats every request identically.
+   * <p>
+   * Every request gets priority {@code 0} and deadline {@code 0}. The request header,
+   * parameter and user are ignored. This function never reads {@code @QosPriority}
+   * annotations.
+   * NOTE(review): the {@code @QosPriority(ADMIN_QOS)} on {@code stopServer} therefore has
+   * no effect through this function; confirm whether that is intended.
+   *
+   * @return a priority function returning constant values
+   */
+  protected PriorityFunction createPriority() {
+    final int fixedPriority = 0;
+    final long fixedDeadline = 0L;
+    return new PriorityFunction() {
+      @Override
+      public int getPriority(RequestHeader header, Message param, User user) {
+        return fixedPriority;
+      }
+
+      @Override
+      public long getDeadline(RequestHeader header, Message param) {
+        return fixedDeadline;
+      }
+    };
+  }
+
+  /**
+   * Applies a batch of replicated WAL entries through the server's replication sink service.
+   * <p>
+   * The entry metadata comes from {@code request}. The associated cells are read from the
+   * controller's cell scanner.
+   *
+   * @param controller the RPC controller, cast to {@link HBaseRpcController} to obtain the cells
+   * @param request the WAL entries plus the source cluster id and source directory paths
+   * @return an empty {@link ReplicateWALEntryResponse} after the sink has accepted the entries
+   * @throws ServiceException if the sink service is not set yet, or wrapping any
+   *           {@link IOException} raised by {@code checkOpen()} or by the sink
+   */
+  @Override
+  public ReplicateWALEntryResponse replicateWALEntry(RpcController controller,
+      ReplicateWALEntryRequest request) throws ServiceException {
+    try {
+      checkOpen();
+      // Guard clause: without a sink there is nothing that can apply the entries.
+      if (replicationServer.getReplicationSinkService() == null) {
+        throw new ServiceException("Replication services are not initialized yet");
+      }
+      requestCount.increment();
+      List<WALEntry> walEntries = request.getEntryList();
+      CellScanner cells = ((HBaseRpcController) controller).cellScanner();
+      // TODO: CP pre
+      replicationServer.getReplicationSinkService().replicateLogEntries(walEntries, cells,
+          request.getReplicationClusterId(), request.getSourceBaseNamespaceDirPath(),
+          request.getSourceHFileArchiveDirPath());
+      // TODO: CP post
+      return ReplicateWALEntryResponse.newBuilder().build();
+    } catch (IOException ioe) {
+      throw new ServiceException(ioe);
+    }
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServer.java
new file mode 100644
index 0000000..6a0ef3d
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServer.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALKeyImpl;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category({ReplicationTests.class, MediumTests.class})
+public class TestReplicationServer {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestReplicationServer.class);
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationServer.class);
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  private static final Configuration CONF = TEST_UTIL.getConfiguration();
+
+  private static HMaster MASTER;
+
+  private static HReplicationServer replicationServer;
+
+  private static Path baseNamespaceDir;
+  private static Path hfileArchiveDir;
+  private static String replicationClusterId;
+
+  /** Number of WAL entries shipped in the single replicateWALEntry call. */
+  private static final int BATCH_SIZE = 10;
+
+  private static final TableName TABLENAME = TableName.valueOf("t");
+  private static final String FAMILY = "C";
+
+  /**
+   * Starts a mini cluster plus a standalone {@link HReplicationServer} built from the same
+   * configuration. Waits until the master is active and the replication server reports
+   * online, then derives the source directory paths sent with each replication request.
+   */
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    TEST_UTIL.startMiniCluster();
+    MASTER = TEST_UTIL.getMiniHBaseCluster().getMaster();
+
+    replicationServer = new HReplicationServer(CONF);
+    replicationServer.start();
+
+    TEST_UTIL.getMiniHBaseCluster().waitForActiveAndReadyMaster();
+    TEST_UTIL.waitFor(60000, () -> replicationServer.isOnline());
+
+    Path rootDir = CommonFSUtils.getRootDir(CONF);
+    baseNamespaceDir = new Path(rootDir, new Path(HConstants.BASE_NAMESPACE_DIR));
+    hfileArchiveDir = new Path(rootDir, new Path(HConstants.HFILE_ARCHIVE_DIRECTORY));
+    replicationClusterId = "12345";
+  }
+
+  /**
+   * Stops the standalone replication server, then shuts down the mini cluster. The server was
+   * created outside the mini cluster, so the cluster shutdown does not stop it. The cluster is
+   * shut down even if stopping the replication server fails.
+   */
+  @AfterClass
+  public static void afterClass() throws IOException {
+    try {
+      if (replicationServer != null) {
+        replicationServer.stop("Test finished");
+      }
+    } finally {
+      TEST_UTIL.shutdownMiniCluster();
+    }
+  }
+
+  @Before
+  public void before() throws Exception {
+    TEST_UTIL.createTable(TABLENAME, FAMILY);
+    TEST_UTIL.waitTableAvailable(TABLENAME);
+  }
+
+  @After
+  public void after() throws IOException {
+    TEST_UTIL.deleteTableIfAny(TABLENAME);
+  }
+
+  /**
+   * Ships {@link #BATCH_SIZE} WAL entries to the replication server over the region server
+   * admin RPC. Then verifies that every row landed in the table with the expected value.
+   */
+  @Test
+  public void testReplicateWAL() throws Exception {
+    AsyncClusterConnection conn = TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().get(0)
+        .getRegionServer().getAsyncClusterConnection();
+    AsyncRegionServerAdmin rsAdmin = conn.getRegionServerAdmin(replicationServer.getServerName());
+
+    Entry[] entries = new Entry[BATCH_SIZE];
+    for (int i = 0; i < BATCH_SIZE; i++) {
+      entries[i] = generateEdit(i, TABLENAME, Bytes.toBytes(i));
+    }
+
+    ReplicationProtbufUtil.replicateWALEntry(rsAdmin, entries, replicationClusterId,
+        baseNamespaceDir, hfileArchiveDir, 1000);
+
+    // Open the table once for all reads and make sure it is closed afterwards.
+    try (Table table = TEST_UTIL.getConnection().getTable(TABLENAME)) {
+      for (int i = 0; i < BATCH_SIZE; i++) {
+        Result result = table.get(new Get(Bytes.toBytes(i)));
+        Cell cell = result.getColumnLatestCell(Bytes.toBytes(FAMILY), Bytes.toBytes(FAMILY));
+        assertNotNull("No replicated cell found for row " + i, cell);
+        assertTrue("Unexpected replicated value for row " + i,
+            Bytes.equals(CellUtil.cloneValue(cell), Bytes.toBytes(i)));
+      }
+    }
+  }
+
+  /**
+   * Builds a WAL entry holding one cell for {@code row}. {@link #FAMILY} serves as both the
+   * column family and the qualifier, and the row bytes are reused as the value.
+   *
+   * @param i index of the edit, passed into the WAL key (presumably as its sequence number)
+   * @param tableName table the edit targets
+   * @param row row key, also used as the cell value
+   * @return the WAL entry
+   */
+  private static WAL.Entry generateEdit(int i, TableName tableName, byte[] row) {
+    // Presumably sleeps so consecutive edits get distinct millisecond timestamps.
+    Threads.sleep(1);
+    long timestamp = System.currentTimeMillis();
+    // NOTE(review): new byte[32] looks like a placeholder encoded region name; confirm.
+    WALKeyImpl key = new WALKeyImpl(new byte[32], tableName, i, timestamp,
+        HConstants.DEFAULT_CLUSTER_ID, null);
+    WALEdit edit = new WALEdit();
+    edit.add(new KeyValue(row, Bytes.toBytes(FAMILY), Bytes.toBytes(FAMILY), timestamp, row));
+    return new WAL.Entry(key, edit);
+  }
+}