You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@hbase.apache.org by GitBox <gi...@apache.org> on 2020/08/10 16:02:14 UTC

[GitHub] [hbase] wchevreuil commented on a change in pull request #2111: HBASE-24683 Add a basic ReplicationServer which only implement Replic…

wchevreuil commented on a change in pull request #2111:
URL: https://github.com/apache/hbase/pull/2111#discussion_r467981553



##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java
##########
@@ -0,0 +1,683 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.ChoreService;
+import org.apache.hadoop.hbase.CoordinatedStateManager;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableDescriptors;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.locking.EntityLock;
+import org.apache.hadoop.hbase.executor.ExecutorService;
+import org.apache.hadoop.hbase.io.hfile.BlockCache;
+import org.apache.hadoop.hbase.ipc.RpcServerInterface;
+import org.apache.hadoop.hbase.log.HBaseMarkers;
+import org.apache.hadoop.hbase.mob.MobFileCache;
+import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
+import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
+import org.apache.hadoop.hbase.quotas.RegionSizeStore;
+import org.apache.hadoop.hbase.regionserver.FlushRequester;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HeapMemoryManager;
+import org.apache.hadoop.hbase.regionserver.LeaseManager;
+import org.apache.hadoop.hbase.regionserver.MetricsRegionServer;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionServerAccounting;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.regionserver.ReplicationService;
+import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
+import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
+import org.apache.hadoop.hbase.regionserver.SecureBulkLoadManager;
+import org.apache.hadoop.hbase.regionserver.ServerNonceManager;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
+import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.security.access.AccessChecker;
+import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher;
+import org.apache.hadoop.hbase.trace.TraceUtil;
+import org.apache.hadoop.hbase.util.Sleeper;
+import org.apache.hadoop.hbase.util.VersionInfo;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
+
+import org.apache.hbase.thirdparty.com.google.protobuf.Service;
+
+/**
+ * HReplicationServer which is responsible to all replication stuff. It checks in with
+ * the HMaster. There are many HReplicationServers in a single HBase deployment.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
+@SuppressWarnings({ "deprecation"})
+public class HReplicationServer extends Thread implements Server, RegionServerServices {
+
+  private static final Logger LOG = LoggerFactory.getLogger(HReplicationServer.class);
+
+  /** Parameter name for what region server implementation to use. */
+  public static final String REPLICATION_SERVER_IMPL = "hbase.replicationserver.impl";
+
+  /** replication server process name */
+  public static final String REPLICATIONSERVER = "replicationserver";
+
+  /**
+   * This servers startcode.
+   */
+  protected final long startcode;
+
+  private volatile boolean stopped = false;
+
+  // A state before we go into stopped state.  At this stage we're closing user
+  // space regions.
+  private boolean stopping = false;
+  private volatile boolean killed = false;
+  private volatile boolean shutDown = false;
+
+  // Go down hard. Used if file system becomes unavailable and also in
+  // debugging and unit tests.
+  private AtomicBoolean abortRequested;
+
+  // flag set after we're done setting up server threads
+  final AtomicBoolean online = new AtomicBoolean(false);
+
+  /**
+   * The server name the Master sees us as.  Its made from the hostname the
+   * master passes us, port, and server startcode. Gets set after registration
+   * against Master.
+   */
+  private ServerName serverName;
+
+  protected final Configuration conf;
+
+  private ReplicationSinkService replicationSinkHandler;

Review comment:
       At the level of HReplicationServer, we only need to call methods from ReplicationService. I found it a bit confusing that we refer directly to a _sink_ only here, until I realised _replicationSinkHandler_ is an instance of Replication. Can we just refer to the _ReplicationService_ interface here?

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationServerRpcServices.java
##########
@@ -0,0 +1,544 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.net.BindException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.LongAdder;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ConnectionUtils;
+import org.apache.hadoop.hbase.io.ByteBuffAllocator;
+import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.ipc.PriorityFunction;
+import org.apache.hadoop.hbase.ipc.QosPriority;
+import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
+import org.apache.hadoop.hbase.ipc.RpcServerFactory;
+import org.apache.hadoop.hbase.ipc.RpcServerInterface;
+import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
+import org.apache.hadoop.hbase.log.HBaseMarkers;
+import org.apache.hadoop.hbase.net.Address;
+import org.apache.hadoop.hbase.regionserver.RSRpcServices;
+import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException;
+import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
+import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
+import org.apache.hadoop.hbase.regionserver.RpcSchedulerFactory;
+import org.apache.hadoop.hbase.regionserver.SimpleRpcSchedulerFactory;
+import org.apache.hadoop.hbase.replication.regionserver.RejectReplicationRequestStateChecker;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.access.AccessChecker;
+import org.apache.hadoop.hbase.security.access.NoopAccessChecker;
+import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher;
+import org.apache.hadoop.hbase.util.DNS;
+import org.apache.hadoop.hbase.util.DNS.ServerType;
+import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearSlowLogResponseRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearSlowLogResponses;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SlowLogResponseRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SlowLogResponses;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
+
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.protobuf.Message;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
+import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
+
+/**
+ * Implements the regionserver RPC services for {@link HReplicationServer}.
+ */
+@InterfaceAudience.Private
+@SuppressWarnings("deprecation")
+public class ReplicationServerRpcServices implements HBaseRPCErrorHandler,
+    AdminService.BlockingInterface, PriorityFunction {
+
+  protected static final Logger LOG = LoggerFactory.getLogger(ReplicationServerRpcServices.class);
+
+  /** Parameter name for port replication server listens on. */
+  public static final String REPLICATION_SERVER_PORT = "hbase.replicationserver.port";
+
+  /** Default port replication server listens on. */
+  public static final int DEFAULT_REPLICATION_SERVER_PORT = 16040;
+
+  /** default port for replication server web api */
+  public static final int DEFAULT_REPLICATION_SERVER_INFOPORT = 16050;
+
+  // Request counter.
+  final LongAdder requestCount = new LongAdder();
+
+  // Server to handle client requests.
+  final RpcServerInterface rpcServer;
+  final InetSocketAddress isa;
+
+  @VisibleForTesting

Review comment:
       There's been a discussion lately about removing the `VisibleForTesting` annotation, since it's not mentioned in our compatibility promises. The outcome is that we should rely on IA.Private (`@InterfaceAudience.Private`) only, and avoid _VisibleForTesting_.
   
   See: https://lists.apache.org/thread.html/r9a2df6a3b58e00c0c482d8660434d8ce6075863c18700978e6ea8b21%40%3Cdev.hbase.apache.org%3E
   

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java
##########
@@ -0,0 +1,683 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.ChoreService;
+import org.apache.hadoop.hbase.CoordinatedStateManager;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableDescriptors;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.locking.EntityLock;
+import org.apache.hadoop.hbase.executor.ExecutorService;
+import org.apache.hadoop.hbase.io.hfile.BlockCache;
+import org.apache.hadoop.hbase.ipc.RpcServerInterface;
+import org.apache.hadoop.hbase.log.HBaseMarkers;
+import org.apache.hadoop.hbase.mob.MobFileCache;
+import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
+import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
+import org.apache.hadoop.hbase.quotas.RegionSizeStore;
+import org.apache.hadoop.hbase.regionserver.FlushRequester;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HeapMemoryManager;
+import org.apache.hadoop.hbase.regionserver.LeaseManager;
+import org.apache.hadoop.hbase.regionserver.MetricsRegionServer;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionServerAccounting;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.regionserver.ReplicationService;
+import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
+import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
+import org.apache.hadoop.hbase.regionserver.SecureBulkLoadManager;
+import org.apache.hadoop.hbase.regionserver.ServerNonceManager;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
+import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.security.access.AccessChecker;
+import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher;
+import org.apache.hadoop.hbase.trace.TraceUtil;
+import org.apache.hadoop.hbase.util.Sleeper;
+import org.apache.hadoop.hbase.util.VersionInfo;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
+
+import org.apache.hbase.thirdparty.com.google.protobuf.Service;
+
+/**
+ * HReplicationServer which is responsible to all replication stuff. It checks in with
+ * the HMaster. There are many HReplicationServers in a single HBase deployment.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
+@SuppressWarnings({ "deprecation"})
+public class HReplicationServer extends Thread implements Server, RegionServerServices {
+
+  private static final Logger LOG = LoggerFactory.getLogger(HReplicationServer.class);
+
+  /** Parameter name for what region server implementation to use. */
+  public static final String REPLICATION_SERVER_IMPL = "hbase.replicationserver.impl";
+
+  /** replication server process name */
+  public static final String REPLICATIONSERVER = "replicationserver";
+
+  /**
+   * This servers startcode.
+   */
+  protected final long startcode;

Review comment:
       Nit: camel case? (i.e. `startCode` rather than `startcode`)

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java
##########
@@ -0,0 +1,683 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.ChoreService;
+import org.apache.hadoop.hbase.CoordinatedStateManager;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableDescriptors;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.locking.EntityLock;
+import org.apache.hadoop.hbase.executor.ExecutorService;
+import org.apache.hadoop.hbase.io.hfile.BlockCache;
+import org.apache.hadoop.hbase.ipc.RpcServerInterface;
+import org.apache.hadoop.hbase.log.HBaseMarkers;
+import org.apache.hadoop.hbase.mob.MobFileCache;
+import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
+import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
+import org.apache.hadoop.hbase.quotas.RegionSizeStore;
+import org.apache.hadoop.hbase.regionserver.FlushRequester;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HeapMemoryManager;
+import org.apache.hadoop.hbase.regionserver.LeaseManager;
+import org.apache.hadoop.hbase.regionserver.MetricsRegionServer;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionServerAccounting;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.regionserver.ReplicationService;
+import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
+import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
+import org.apache.hadoop.hbase.regionserver.SecureBulkLoadManager;
+import org.apache.hadoop.hbase.regionserver.ServerNonceManager;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
+import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.security.access.AccessChecker;
+import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher;
+import org.apache.hadoop.hbase.trace.TraceUtil;
+import org.apache.hadoop.hbase.util.Sleeper;
+import org.apache.hadoop.hbase.util.VersionInfo;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
+
+import org.apache.hbase.thirdparty.com.google.protobuf.Service;
+
+/**
+ * HReplicationServer which is responsible to all replication stuff. It checks in with
+ * the HMaster. There are many HReplicationServers in a single HBase deployment.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
+@SuppressWarnings({ "deprecation"})
+public class HReplicationServer extends Thread implements Server, RegionServerServices {
+
+  private static final Logger LOG = LoggerFactory.getLogger(HReplicationServer.class);
+
+  /** Parameter name for what region server implementation to use. */
+  public static final String REPLICATION_SERVER_IMPL = "hbase.replicationserver.impl";
+
+  /** replication server process name */
+  public static final String REPLICATIONSERVER = "replicationserver";
+
+  /**
+   * This servers startcode.
+   */
+  protected final long startcode;
+
+  private volatile boolean stopped = false;
+
+  // A state before we go into stopped state.  At this stage we're closing user
+  // space regions.
+  private boolean stopping = false;
+  private volatile boolean killed = false;
+  private volatile boolean shutDown = false;
+
+  // Go down hard. Used if file system becomes unavailable and also in
+  // debugging and unit tests.
+  private AtomicBoolean abortRequested;
+
+  // flag set after we're done setting up server threads
+  final AtomicBoolean online = new AtomicBoolean(false);
+
+  /**
+   * The server name the Master sees us as.  Its made from the hostname the
+   * master passes us, port, and server startcode. Gets set after registration
+   * against Master.
+   */
+  private ServerName serverName;
+
+  protected final Configuration conf;
+
+  private ReplicationSinkService replicationSinkHandler;
+
+  final int msgInterval;
+  // A sleeper that sleeps for msgInterval.
+  protected final Sleeper sleeper;
+
+  // zookeeper connection and watcher
+  protected final ZKWatcher zooKeeper;
+
+  /**
+   * The asynchronous cluster connection to be shared by services.
+   */
+  protected AsyncClusterConnection asyncClusterConnection;
+
+  private UserProvider userProvider;
+
+  protected final ReplicationServerRpcServices rpcServices;
+
+  public HReplicationServer(final Configuration conf) throws IOException {
+    TraceUtil.initTracer(conf);
+    try {
+      this.startcode = System.currentTimeMillis();
+      this.conf = conf;
+
+      this.abortRequested = new AtomicBoolean(false);
+
+      this.rpcServices = createRpcServices();
+
+      String hostName = this.rpcServices.isa.getHostName();
+      serverName = ServerName.valueOf(hostName, this.rpcServices.isa.getPort(), this.startcode);
+
+      this.userProvider = UserProvider.instantiate(conf);
+
+      this.msgInterval = conf.getInt("hbase.replicationserver.msginterval", 3 * 1000);
+      this.sleeper = new Sleeper(this.msgInterval, this);
+
+      // Some unit tests don't need a cluster, so no zookeeper at all
+      if (!conf.getBoolean("hbase.testing.nocluster", false)) {
+        // Open connection to zookeeper and set primary watcher
+        zooKeeper = new ZKWatcher(conf, getProcessName() + ":" +
+            rpcServices.isa.getPort(), this, false);
+      } else {
+        zooKeeper = null;
+      }
+
+      this.rpcServices.start(zooKeeper);
+    } catch (Throwable t) {
+      // Make sure we log the exception. HReplicationServer is often started via reflection and the
+      // cause of failed startup is lost.
+      LOG.error("Failed construction ReplicationServer", t);
+      throw t;
+    }
+  }
+
+  /**
+   * Utility for constructing an instance of the passed HRegionServer class.
+   */
+  static HReplicationServer constructReplicationServer(
+      final Class<? extends HReplicationServer> replicationServerClass,
+      final Configuration conf) {
+    try {
+      Constructor<? extends HReplicationServer> c =
+          replicationServerClass.getConstructor(Configuration.class);
+      return c.newInstance(conf);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed construction of " + "ReplicationServer: "
+          + replicationServerClass.toString(), e);
+    }
+  }
+
+  public String getProcessName() {
+    return REPLICATIONSERVER;
+  }
+
+  @Override
+  public void run() {
+    if (isStopped()) {
+      LOG.info("Skipping run; stopped");
+      return;
+    }
+    try {
+      // Do pre-registration initializations; zookeeper, lease threads, etc.
+      preRegistrationInitialization();
+    } catch (Throwable e) {
+      abort("Fatal exception during initialization", e);
+    }
+    try {
+      setupReplication();
+      startReplicationService();
+
+      // Wake up anyone waiting for this server to online
+      synchronized (online) {
+        online.set(true);
+        online.notifyAll();
+      }
+
+      long lastMsg = System.currentTimeMillis();
+      // The main run loop.
+      while (!isStopped()) {
+        if (!isClusterUp()) {
+          if (!this.stopping) {
+            this.stopping = true;
+          }
+        }
+        long now = System.currentTimeMillis();
+        if ((now - lastMsg) >= msgInterval) {
+          lastMsg = System.currentTimeMillis();
+        }
+        if (!isStopped() && !isAborted()) {
+          this.sleeper.sleep();
+        }
+      }
+
+      if (!killed) {
+        stopServiceThreads();
+      }
+      if (this.rpcServices != null) {
+        this.rpcServices.stop();
+      }
+    } catch (Throwable t) {
+      abort(t.getMessage(), t);
+    }
+
+    if (this.zooKeeper != null) {
+      this.zooKeeper.close();
+    }
+    this.shutDown = true;
+    LOG.info("Exiting; stopping=" + this.serverName + "; zookeeper connection closed.");
+  }
+
+  /**
+   * Produces the configuration used for connections this server opens itself.
+   * <p>
+   * The connection registry is pinned to the ZooKeeper-based implementation directly on the
+   * server's own configuration. When a client-specific ZooKeeper quorum is configured, a copy
+   * without that property is returned instead of the shared instance.
+   */
+  private Configuration cleanupConfiguration() {
+    // We use ZKConnectionRegistry for all the internal communication, primarily for these reasons:
+    // - Decouples RS and master life cycles. RegionServers can continue be up independent of
+    //   masters' availability.
+    // - Configuration management for region servers (cluster internal) is much simpler when adding
+    //   new masters or removing existing masters, since only clients' config needs to be updated.
+    // - We need to retain ZKConnectionRegistry for replication use anyway, so we just extend it for
+    //   other internal connections too.
+    this.conf.set(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
+        HConstants.ZK_CONNECTION_REGISTRY_CLASS);
+    if (this.conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM) == null) {
+      // No client-specific quorum configured; the shared configuration can be used as is.
+      return this.conf;
+    }
+    // Server-issued connections must use the server ZK cluster, so hand back a copy with the
+    // client ZK property removed rather than touching the shared instance.
+    Configuration serverSideConf = new Configuration(this.conf);
+    serverSideConf.unset(HConstants.CLIENT_ZOOKEEPER_QUORUM);
+    return serverSideConf;
+  }
+
+  /**
+   * Initialization performed at the start of {@link #run()}, before replication is set up.<br>
+   * It only establishes the shared cluster connection via {@link #setupClusterConnection()}.<br>
+   * On any failure the RPC services are stopped and this server is aborted.
+   */
+  private void preRegistrationInitialization() {
+    try {
+      setupClusterConnection();
+    } catch (Throwable t) {
+      // Call stop if error or process will stick around for ever since server
+      // puts up non-daemon threads.
+      this.rpcServices.stop();
+      abort("Initialization of RS failed.  Hence aborting RS.", t);
+    }
+  }
+
+  /**
+   * Lazily creates the shared cluster connection; does nothing once it exists.
+   *
+   * @throws IOException propagated from the user lookup or the connection factory
+   */
+  protected final synchronized void setupClusterConnection() throws IOException {
+    if (asyncClusterConnection != null) {
+      return;
+    }
+    Configuration connConf = cleanupConfiguration();
+    // Same local address as the RPC server, with port 0 rather than the RPC port.
+    InetSocketAddress bindAddress = new InetSocketAddress(this.rpcServices.isa.getAddress(), 0);
+    User currentUser = userProvider.getCurrent();
+    asyncClusterConnection =
+        ClusterConnectionFactory.createAsyncClusterConnection(connConf, bindAddress, currentUser);
+  }
+
+  /**
+   * Stops the replication sink service if one has been created; otherwise does nothing.
+   */
+  protected void stopServiceThreads() {
+    if (this.replicationSinkHandler == null) {
+      // Replication was never set up, so there is nothing to stop.
+      return;
+    }
+    this.replicationSinkHandler.stopReplicationService();
+  }
+
+  public static void main(String[] args) {
+    LOG.info("STARTING executorService " + HReplicationServer.class.getSimpleName());
+    VersionInfo.logVersion();
+    Configuration conf = HBaseConfiguration.create();
+    @SuppressWarnings("unchecked")
+    Class<? extends HReplicationServer> replicationServerClass =
+        (Class<? extends HReplicationServer>)
+            conf.getClass(REPLICATION_SERVER_IMPL, HReplicationServer.class);
+
+    new HReplicationServerCommandLine(replicationServerClass).doMain(args);
+  }
+
+  @Override
+  public WAL getWAL(RegionInfo regionInfo) throws IOException {
+    throw new DoNotRetryIOException(new UnsupportedOperationException("This's ReplicationServer."));
+  }
+
+  @Override
+  public List<WAL> getWALs() throws IOException {
+    throw new DoNotRetryIOException(new UnsupportedOperationException("This's ReplicationServer."));
+  }
+
+  @Override
+  public FlushRequester getFlushRequester() {
+    return null;
+  }
+
+  @Override
+  public CompactionRequester getCompactionRequestor() {
+    return null;
+  }
+
+  @Override
+  public RegionServerAccounting getRegionServerAccounting() {
+    return null;
+  }
+
+  @Override
+  public RegionServerRpcQuotaManager getRegionServerRpcQuotaManager() {
+    return null;
+  }
+
+  @Override
+  public SecureBulkLoadManager getSecureBulkLoadManager() {
+    return null;
+  }
+
+  @Override
+  public RegionServerSpaceQuotaManager getRegionServerSpaceQuotaManager() {
+    return null;
+  }
+
+  @Override
+  public void postOpenDeployTasks(PostOpenDeployContext context) throws IOException {
+    throw new DoNotRetryIOException(new UnsupportedOperationException("This's ReplicationServer."));
+  }
+
+  @Override
+  public boolean reportRegionStateTransition(RegionStateTransitionContext context) {
+    return false;
+  }
+
+  @Override
+  public RpcServerInterface getRpcServer() {
+    return null;
+  }
+
+  @Override
+  public ConcurrentMap<byte[], Boolean> getRegionsInTransitionInRS() {
+    return null;
+  }
+
+  @Override
+  public LeaseManager getLeaseManager() {
+    return null;
+  }
+
+  @Override
+  public ExecutorService getExecutorService() {
+    return null;
+  }
+
+  @Override
+  public ServerNonceManager getNonceManager() {
+    return null;
+  }
+
+  @Override
+  public boolean registerService(Service service) {
+    return false;
+  }
+
+  @Override
+  public HeapMemoryManager getHeapMemoryManager() {
+    return null;
+  }
+
+  @Override
+  public double getCompactionPressure() {
+    return 0;
+  }
+
+  @Override
+  public ThroughputController getFlushThroughputController() {
+    return null;
+  }
+
+  @Override
+  public double getFlushPressure() {
+    return 0;
+  }
+
+  @Override
+  public MetricsRegionServer getMetrics() {
+    return null;
+  }
+
+  @Override
+  public EntityLock regionLock(List<RegionInfo> regionInfos, String description, Abortable abort)
+      throws IOException {
+    throw new DoNotRetryIOException(new UnsupportedOperationException("This's ReplicationServer."));
+  }
+
+  @Override
+  public void unassign(byte[] regionName) throws IOException {
+    throw new DoNotRetryIOException(new UnsupportedOperationException("This's ReplicationServer."));
+  }
+
+  @Override
+  public boolean reportRegionSizesForQuotas(RegionSizeStore sizeStore) {
+    return false;
+  }
+
+  @Override
+  public boolean reportFileArchivalForQuotas(TableName tableName,
+      Collection<Entry<String, Long>> archivedFiles) {
+    return false;
+  }
+
+  @Override
+  public boolean isClusterUp() {
+    return false;
+  }
+
+  @Override
+  public ReplicationSourceService getReplicationSourceService() {
+    return null;
+  }
+
+  @Override
+  public TableDescriptors getTableDescriptors() {
+    return null;
+  }
+
+  @Override
+  public Optional<BlockCache> getBlockCache() {
+    return Optional.empty();
+  }
+
+  @Override
+  public Optional<MobFileCache> getMobFileCache() {
+    return Optional.empty();
+  }
+
+  @Override
+  public AccessChecker getAccessChecker() {
+    return null;
+  }
+
+  @Override
+  public ZKPermissionWatcher getZKPermissionWatcher() {
+    return null;
+  }
+
+  @Override
+  public Configuration getConfiguration() {
+    return conf;
+  }
+
+  @Override
+  public ZKWatcher getZooKeeper() {
+    return zooKeeper;
+  }
+
+  @Override
+  public Connection getConnection() {
+    return getAsyncConnection().toConnection();
+  }
+
+  @Override
+  public Connection createConnection(Configuration conf) throws IOException {
+    throw new DoNotRetryIOException(new UnsupportedOperationException("This's ReplicationServer."));
+  }
+
+  /**
+   * Returns the shared cluster connection created by {@link #setupClusterConnection()}.
+   *
+   * @return the connection, or {@code null} if it has not been set up yet
+   */
+  @Override
+  public AsyncClusterConnection getAsyncClusterConnection() {
+    // Expose the connection built during startup instead of a hard-coded null, so callers
+    // of this accessor can actually reach the cluster.
+    return this.asyncClusterConnection;
+  }
+
+  @Override
+  public ServerName getServerName() {
+    return serverName;
+  }
+
+  @Override
+  public CoordinatedStateManager getCoordinatedStateManager() {
+    return null;
+  }
+
+  @Override
+  public ChoreService getChoreService() {
+    return null;
+  }
+
+  @Override
+  public void abort(String why, Throwable cause) {
+    if (!setAbortRequested()) {
+      // Abort already in progress, ignore the new request.
+      LOG.debug(
+          "Abort already in progress. Ignoring the current request with reason: {}", why);
+      return;
+    }
+    String msg = "***** ABORTING replication server " + this + ": " + why + " *****";
+    if (cause != null) {
+      LOG.error(HBaseMarkers.FATAL, msg, cause);
+    } else {
+      LOG.error(HBaseMarkers.FATAL, msg);
+    }
+    stop(why);
+  }
+
+  @Override
+  public boolean isAborted() {
+    return abortRequested.get();
+  }
+
+  /**
+   * Requests this server to stop; the run loop exits once it observes the flag.
+   *
+   * @param why human-readable reason, logged the first time a stop is requested
+   */
+  @Override
+  public void stop(String why) {
+    if (!stopped) {
+      // Record the reason once; without this a non-abort stop leaves no trace in the log.
+      LOG.info("Stopping replication server {}: {}", this.serverName, why);
+    }
+    stopped = true;
+  }
+
+  @Override
+  public boolean isStopped() {
+    return this.stopped;
+  }
+
+  @Override
+  public void updateRegionFavoredNodesMapping(String encodedRegionName,
+      List<HBaseProtos.ServerName> favoredNodes) {
+  }
+
+  @Override
+  public InetSocketAddress[] getFavoredNodesForRegion(String encodedRegionName) {
+    return new InetSocketAddress[0];
+  }
+
+  @Override
+  public void addRegion(HRegion r) {
+
+  }
+
+  @Override
+  public boolean removeRegion(HRegion r, ServerName destination) {
+    return false;
+  }
+
+  @Override
+  public Region getRegion(String encodedRegionName) {
+    return null;
+  }
+
+  @Override
+  public List<? extends Region> getRegions(TableName tableName) throws IOException {
+    throw new DoNotRetryIOException(new UnsupportedOperationException("This's ReplicationServer."));
+  }
+
+  @Override
+  public List<? extends Region> getRegions() {
+    return null;
+  }
+
+  /**
+   * Sets up replication for this server by delegating to
+   * {@link #createNewReplicationInstance(Configuration, HReplicationServer)}; nothing else is done.
+   */
+  private void setupReplication() throws IOException {
+    // Instantiate the replication service. Pass it the configuration and this server.
+    createNewReplicationInstance(conf, this);
+  }
+
+  /**
+   * Instantiate the configured replication sink service and store it on the given server.
+   */
+  private static void createNewReplicationInstance(Configuration conf, HReplicationServer server)
+      throws IOException {
+    // read in the name of the sink replication class from the config file.
+    String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME,
+        HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
+
+    server.replicationSinkHandler = newReplicationInstance(sinkClassname,
+        ReplicationSinkService.class, conf, server);
+  }

Review comment:
       Why have these two methods? _setupReplication_ apparently does nothing beyond calling _createNewReplicationInstance_.

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java
##########
@@ -0,0 +1,683 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.ChoreService;
+import org.apache.hadoop.hbase.CoordinatedStateManager;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableDescriptors;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.locking.EntityLock;
+import org.apache.hadoop.hbase.executor.ExecutorService;
+import org.apache.hadoop.hbase.io.hfile.BlockCache;
+import org.apache.hadoop.hbase.ipc.RpcServerInterface;
+import org.apache.hadoop.hbase.log.HBaseMarkers;
+import org.apache.hadoop.hbase.mob.MobFileCache;
+import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
+import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
+import org.apache.hadoop.hbase.quotas.RegionSizeStore;
+import org.apache.hadoop.hbase.regionserver.FlushRequester;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HeapMemoryManager;
+import org.apache.hadoop.hbase.regionserver.LeaseManager;
+import org.apache.hadoop.hbase.regionserver.MetricsRegionServer;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionServerAccounting;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.regionserver.ReplicationService;
+import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
+import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
+import org.apache.hadoop.hbase.regionserver.SecureBulkLoadManager;
+import org.apache.hadoop.hbase.regionserver.ServerNonceManager;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
+import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.security.access.AccessChecker;
+import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher;
+import org.apache.hadoop.hbase.trace.TraceUtil;
+import org.apache.hadoop.hbase.util.Sleeper;
+import org.apache.hadoop.hbase.util.VersionInfo;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
+
+import org.apache.hbase.thirdparty.com.google.protobuf.Service;
+
+/**
+ * HReplicationServer which is responsible to all replication stuff. It checks in with
+ * the HMaster. There are many HReplicationServers in a single HBase deployment.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
+@SuppressWarnings({ "deprecation"})
+public class HReplicationServer extends Thread implements Server, RegionServerServices {
+
+  private static final Logger LOG = LoggerFactory.getLogger(HReplicationServer.class);
+
+  /** Parameter name for what region server implementation to use. */
+  public static final String REPLICATION_SERVER_IMPL = "hbase.replicationserver.impl";
+
+  /** replication server process name */
+  public static final String REPLICATIONSERVER = "replicationserver";
+
+  /**
+   * This servers startcode.
+   */
+  protected final long startcode;
+
+  private volatile boolean stopped = false;
+
+  // A state before we go into stopped state.  At this stage we're closing user
+  // space regions.
+  private boolean stopping = false;
+  private volatile boolean killed = false;
+  private volatile boolean shutDown = false;
+
+  // Go down hard. Used if file system becomes unavailable and also in
+  // debugging and unit tests.
+  private AtomicBoolean abortRequested;
+
+  // flag set after we're done setting up server threads
+  final AtomicBoolean online = new AtomicBoolean(false);
+
+  /**
+   * The server name the Master sees us as.  Its made from the hostname the
+   * master passes us, port, and server startcode. Gets set after registration
+   * against Master.
+   */
+  private ServerName serverName;
+
+  protected final Configuration conf;
+
+  private ReplicationSinkService replicationSinkHandler;
+
+  final int msgInterval;
+  // A sleeper that sleeps for msgInterval.
+  protected final Sleeper sleeper;
+
+  // zookeeper connection and watcher
+  protected final ZKWatcher zooKeeper;
+
+  /**
+   * The asynchronous cluster connection to be shared by services.
+   */
+  protected AsyncClusterConnection asyncClusterConnection;
+
+  private UserProvider userProvider;
+
+  protected final ReplicationServerRpcServices rpcServices;
+
+  public HReplicationServer(final Configuration conf) throws IOException {
+    TraceUtil.initTracer(conf);
+    try {
+      this.startcode = System.currentTimeMillis();
+      this.conf = conf;
+
+      this.abortRequested = new AtomicBoolean(false);
+
+      this.rpcServices = createRpcServices();
+
+      String hostName = this.rpcServices.isa.getHostName();
+      serverName = ServerName.valueOf(hostName, this.rpcServices.isa.getPort(), this.startcode);
+
+      this.userProvider = UserProvider.instantiate(conf);
+
+      this.msgInterval = conf.getInt("hbase.replicationserver.msginterval", 3 * 1000);
+      this.sleeper = new Sleeper(this.msgInterval, this);
+
+      // Some unit tests don't need a cluster, so no zookeeper at all
+      if (!conf.getBoolean("hbase.testing.nocluster", false)) {
+        // Open connection to zookeeper and set primary watcher
+        zooKeeper = new ZKWatcher(conf, getProcessName() + ":" +
+            rpcServices.isa.getPort(), this, false);
+      } else {
+        zooKeeper = null;
+      }
+
+      this.rpcServices.start(zooKeeper);
+    } catch (Throwable t) {
+      // Make sure we log the exception. HReplicationServer is often started via reflection and the
+      // cause of failed startup is lost.
+      LOG.error("Failed construction ReplicationServer", t);
+      throw t;
+    }
+  }
+
+  /**
+   * Utility for constructing an instance of the passed HRegionServer class.
+   */
+  static HReplicationServer constructReplicationServer(
+      final Class<? extends HReplicationServer> replicationServerClass,
+      final Configuration conf) {
+    try {
+      Constructor<? extends HReplicationServer> c =
+          replicationServerClass.getConstructor(Configuration.class);
+      return c.newInstance(conf);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed construction of " + "ReplicationServer: "
+          + replicationServerClass.toString(), e);
+    }
+  }
+
+  public String getProcessName() {
+    return REPLICATIONSERVER;
+  }
+
+  @Override
+  public void run() {
+    if (isStopped()) {
+      LOG.info("Skipping run; stopped");
+      return;
+    }
+    try {
+      // Do pre-registration initializations; zookeeper, lease threads, etc.
+      preRegistrationInitialization();
+    } catch (Throwable e) {
+      abort("Fatal exception during initialization", e);
+    }
+    try {
+      setupReplication();
+      startReplicationService();
+
+      // Wake up anyone waiting for this server to online
+      synchronized (online) {
+        online.set(true);
+        online.notifyAll();
+      }
+
+      long lastMsg = System.currentTimeMillis();
+      // The main run loop.
+      while (!isStopped()) {
+        if (!isClusterUp()) {
+          if (!this.stopping) {
+            this.stopping = true;
+          }
+        }
+        long now = System.currentTimeMillis();
+        if ((now - lastMsg) >= msgInterval) {
+          lastMsg = System.currentTimeMillis();
+        }
+        if (!isStopped() && !isAborted()) {
+          this.sleeper.sleep();
+        }
+      }
+
+      if (!killed) {
+        stopServiceThreads();
+      }
+      if (this.rpcServices != null) {
+        this.rpcServices.stop();
+      }
+    } catch (Throwable t) {
+      abort(t.getMessage(), t);
+    }
+
+    if (this.zooKeeper != null) {
+      this.zooKeeper.close();
+    }
+    this.shutDown = true;
+    LOG.info("Exiting; stopping=" + this.serverName + "; zookeeper connection closed.");
+  }
+
+  private Configuration cleanupConfiguration() {
+    Configuration conf = this.conf;
+    // We use ZKConnectionRegistry for all the internal communication, primarily for these reasons:
+    // - Decouples RS and master life cycles. RegionServers can continue be up independent of
+    //   masters' availability.
+    // - Configuration management for region servers (cluster internal) is much simpler when adding
+    //   new masters or removing existing masters, since only clients' config needs to be updated.
+    // - We need to retain ZKConnectionRegistry for replication use anyway, so we just extend it for
+    //   other internal connections too.
+    conf.set(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
+        HConstants.ZK_CONNECTION_REGISTRY_CLASS);
+    if (conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM) != null) {
+      // Use server ZK cluster for server-issued connections, so we clone
+      // the conf and unset the client ZK related properties
+      conf = new Configuration(this.conf);
+      conf.unset(HConstants.CLIENT_ZOOKEEPER_QUORUM);
+    }
+    return conf;
+  }
+
+  /**
+   * All initialization needed before we go register with Master.<br>
+   * Do bare minimum. Do bulk of initializations AFTER we've connected to the Master.<br>
+   * In here we just put up the RpcServer, setup Connection, and ZooKeeper.
+   */
+  private void preRegistrationInitialization() {
+    try {
+      setupClusterConnection();
+    } catch (Throwable t) {
+      // Call stop if error or process will stick around for ever since server
+      // puts up non-daemon threads.
+      this.rpcServices.stop();
+      abort("Initialization of RS failed.  Hence aborting RS.", t);
+    }
+  }
+
+  /**
+   * Setup our cluster connection if not already initialized.
+   */
+  protected final synchronized void setupClusterConnection() throws IOException {
+    if (asyncClusterConnection == null) {
+      Configuration conf = cleanupConfiguration();
+      InetSocketAddress localAddress = new InetSocketAddress(this.rpcServices.isa.getAddress(), 0);
+      User user = userProvider.getCurrent();
+      asyncClusterConnection =
+          ClusterConnectionFactory.createAsyncClusterConnection(conf, localAddress, user);
+    }
+  }
+
+  /**
+   * Wait on all threads to finish. Presumption is that all closes and stops
+   * have already been called.
+   */
+  protected void stopServiceThreads() {
+    if (this.replicationSinkHandler != null) {
+      this.replicationSinkHandler.stopReplicationService();
+    }
+  }
+
+  public static void main(String[] args) {
+    LOG.info("STARTING executorService " + HReplicationServer.class.getSimpleName());
+    VersionInfo.logVersion();
+    Configuration conf = HBaseConfiguration.create();
+    @SuppressWarnings("unchecked")
+    Class<? extends HReplicationServer> replicationServerClass =
+        (Class<? extends HReplicationServer>)
+            conf.getClass(REPLICATION_SERVER_IMPL, HReplicationServer.class);
+
+    new HReplicationServerCommandLine(replicationServerClass).doMain(args);
+  }
+
+  @Override
+  public WAL getWAL(RegionInfo regionInfo) throws IOException {
+    throw new DoNotRetryIOException(new UnsupportedOperationException("This's ReplicationServer."));
+  }
+
+  @Override
+  public List<WAL> getWALs() throws IOException {
+    throw new DoNotRetryIOException(new UnsupportedOperationException("This's ReplicationServer."));
+  }
+
+  @Override
+  public FlushRequester getFlushRequester() {

Review comment:
       Should this throw UnsupportedOperationException? (The same applies to all unimplemented methods that currently return null.)

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java
##########
@@ -0,0 +1,683 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.ChoreService;
+import org.apache.hadoop.hbase.CoordinatedStateManager;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableDescriptors;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.locking.EntityLock;
+import org.apache.hadoop.hbase.executor.ExecutorService;
+import org.apache.hadoop.hbase.io.hfile.BlockCache;
+import org.apache.hadoop.hbase.ipc.RpcServerInterface;
+import org.apache.hadoop.hbase.log.HBaseMarkers;
+import org.apache.hadoop.hbase.mob.MobFileCache;
+import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
+import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
+import org.apache.hadoop.hbase.quotas.RegionSizeStore;
+import org.apache.hadoop.hbase.regionserver.FlushRequester;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HeapMemoryManager;
+import org.apache.hadoop.hbase.regionserver.LeaseManager;
+import org.apache.hadoop.hbase.regionserver.MetricsRegionServer;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionServerAccounting;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.regionserver.ReplicationService;
+import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
+import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
+import org.apache.hadoop.hbase.regionserver.SecureBulkLoadManager;
+import org.apache.hadoop.hbase.regionserver.ServerNonceManager;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
+import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.security.access.AccessChecker;
+import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher;
+import org.apache.hadoop.hbase.trace.TraceUtil;
+import org.apache.hadoop.hbase.util.Sleeper;
+import org.apache.hadoop.hbase.util.VersionInfo;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
+
+import org.apache.hbase.thirdparty.com.google.protobuf.Service;
+
+/**
+ * HReplicationServer which is responsible to all replication stuff. It checks in with
+ * the HMaster. There are many HReplicationServers in a single HBase deployment.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
+@SuppressWarnings({ "deprecation"})
+public class HReplicationServer extends Thread implements Server, RegionServerServices {
+
+  private static final Logger LOG = LoggerFactory.getLogger(HReplicationServer.class);
+
+  /** Parameter name for what region server implementation to use. */
+  public static final String REPLICATION_SERVER_IMPL = "hbase.replicationserver.impl";
+
+  /** replication server process name */
+  public static final String REPLICATIONSERVER = "replicationserver";
+
+  /**
+   * This servers startcode.
+   */
+  protected final long startcode;
+
+  private volatile boolean stopped = false;
+
+  // A state before we go into stopped state.  At this stage we're closing user
+  // space regions.
+  private boolean stopping = false;
+  private volatile boolean killed = false;
+  private volatile boolean shutDown = false;
+
+  // Go down hard. Used if file system becomes unavailable and also in
+  // debugging and unit tests.
+  private AtomicBoolean abortRequested;
+
+  // flag set after we're done setting up server threads
+  final AtomicBoolean online = new AtomicBoolean(false);
+
+  /**
+   * The server name the Master sees us as.  Its made from the hostname the
+   * master passes us, port, and server startcode. Gets set after registration
+   * against Master.
+   */
+  private ServerName serverName;
+
+  protected final Configuration conf;
+
+  private ReplicationSinkService replicationSinkHandler;
+
+  final int msgInterval;
+  // A sleeper that sleeps for msgInterval.
+  protected final Sleeper sleeper;
+
+  // zookeeper connection and watcher
+  protected final ZKWatcher zooKeeper;
+
+  /**
+   * The asynchronous cluster connection to be shared by services.
+   */
+  protected AsyncClusterConnection asyncClusterConnection;
+
+  private UserProvider userProvider;
+
+  protected final ReplicationServerRpcServices rpcServices;
+
+  public HReplicationServer(final Configuration conf) throws IOException {
+    TraceUtil.initTracer(conf);
+    try {
+      this.startcode = System.currentTimeMillis();
+      this.conf = conf;
+
+      this.abortRequested = new AtomicBoolean(false);
+
+      this.rpcServices = createRpcServices();
+
+      String hostName = this.rpcServices.isa.getHostName();
+      serverName = ServerName.valueOf(hostName, this.rpcServices.isa.getPort(), this.startcode);
+
+      this.userProvider = UserProvider.instantiate(conf);
+
+      this.msgInterval = conf.getInt("hbase.replicationserver.msginterval", 3 * 1000);
+      this.sleeper = new Sleeper(this.msgInterval, this);
+
+      // Some unit tests don't need a cluster, so no zookeeper at all
+      if (!conf.getBoolean("hbase.testing.nocluster", false)) {
+        // Open connection to zookeeper and set primary watcher
+        zooKeeper = new ZKWatcher(conf, getProcessName() + ":" +
+            rpcServices.isa.getPort(), this, false);
+      } else {
+        zooKeeper = null;
+      }
+
+      this.rpcServices.start(zooKeeper);
+    } catch (Throwable t) {
+      // Make sure we log the exception. HReplicationServer is often started via reflection and the
+      // cause of failed startup is lost.
+      LOG.error("Failed construction ReplicationServer", t);
+      throw t;
+    }
+  }
+
+  /**
+   * Utility for constructing an instance of the passed HRegionServer class.
+   */
+  static HReplicationServer constructReplicationServer(
+      final Class<? extends HReplicationServer> replicationServerClass,
+      final Configuration conf) {
+    try {
+      Constructor<? extends HReplicationServer> c =
+          replicationServerClass.getConstructor(Configuration.class);
+      return c.newInstance(conf);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed construction of " + "ReplicationServer: "
+          + replicationServerClass.toString(), e);
+    }
+  }
+
+  /**
+   * Returns the process name of this server, the {@code REPLICATIONSERVER} constant. The
+   * constructor uses it as part of the label passed to the ZooKeeper watcher.
+   */
+  public String getProcessName() {
+    return REPLICATIONSERVER;
+  }
+
+  /**
+   * Thread body of the replication server. Initializes the cluster connection, sets up and starts
+   * the replication service, marks the server online, then sleeps in a loop until the server is
+   * stopped. On exit it stops the service threads and RPC services and closes ZooKeeper.
+   */
+  @Override
+  public void run() {
+    // Nothing to do if stop() was called before the thread started.
+    if (isStopped()) {
+      LOG.info("Skipping run; stopped");
+      return;
+    }
+    try {
+      // Do pre-registration initializations; zookeeper, lease threads, etc.
+      preRegistrationInitialization();
+    } catch (Throwable e) {
+      abort("Fatal exception during initialization", e);
+    }
+    // NOTE(review): execution continues here even when initialization aborted above; the
+    // replication setup below still runs and only the main loop is skipped — confirm intended.
+    try {
+      setupReplication();
+      startReplicationService();
+
+      // Wake up anyone waiting for this server to online
+      synchronized (online) {
+        online.set(true);
+        online.notifyAll();
+      }
+
+      long lastMsg = System.currentTimeMillis();
+      // The main run loop.
+      while (!isStopped()) {
+        // Record that the cluster is going down; the loop itself only exits on isStopped().
+        if (!isClusterUp()) {
+          if (!this.stopping) {
+            this.stopping = true;
+          }
+        }
+        // NOTE(review): lastMsg is refreshed every msgInterval but nothing is sent or done with
+        // it in this method.
+        long now = System.currentTimeMillis();
+        if ((now - lastMsg) >= msgInterval) {
+          lastMsg = System.currentTimeMillis();
+        }
+        if (!isStopped() && !isAborted()) {
+          this.sleeper.sleep();
+        }
+      }
+
+      // Orderly shutdown: skip stopping the service threads only if the server was killed.
+      if (!killed) {
+        stopServiceThreads();
+      }
+      if (this.rpcServices != null) {
+        this.rpcServices.stop();
+      }
+    } catch (Throwable t) {
+      abort(t.getMessage(), t);
+    }
+
+    // zooKeeper is null when the server was constructed with hbase.testing.nocluster set.
+    if (this.zooKeeper != null) {
+      this.zooKeeper.close();
+    }
+    this.shutDown = true;
+    LOG.info("Exiting; stopping=" + this.serverName + "; zookeeper connection closed.");
+  }
+
+  private Configuration cleanupConfiguration() {
+    Configuration conf = this.conf;
+    // We use ZKConnectionRegistry for all the internal communication, primarily for these reasons:
+    // - Decouples RS and master life cycles. RegionServers can continue be up independent of
+    //   masters' availability.
+    // - Configuration management for region servers (cluster internal) is much simpler when adding
+    //   new masters or removing existing masters, since only clients' config needs to be updated.
+    // - We need to retain ZKConnectionRegistry for replication use anyway, so we just extend it for
+    //   other internal connections too.
+    conf.set(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
+        HConstants.ZK_CONNECTION_REGISTRY_CLASS);
+    if (conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM) != null) {
+      // Use server ZK cluster for server-issued connections, so we clone
+      // the conf and unset the client ZK related properties
+      conf = new Configuration(this.conf);
+      conf.unset(HConstants.CLIENT_ZOOKEEPER_QUORUM);
+    }
+    return conf;
+  }
+
+  /**
+   * Initialization performed at the start of {@link #run()}: sets up the cluster connection.<br>
+   * On any failure the RPC services are stopped and the server is aborted; nothing is rethrown
+   * to the caller.
+   */
+  private void preRegistrationInitialization() {
+    try {
+      setupClusterConnection();
+    } catch (Throwable t) {
+      // Call stop if error or process will stick around for ever since server
+      // puts up non-daemon threads.
+      this.rpcServices.stop();
+      abort("Initialization of RS failed.  Hence aborting RS.", t);
+    }
+  }
+
+  /**
+   * Creates the async cluster connection on first use; later calls are no-ops. The connection is
+   * created with the cleaned-up configuration, a local address made of the RPC services' address
+   * with port 0, and the current user.
+   *
+   * @throws IOException if the current user or the connection cannot be obtained
+   */
+  protected final synchronized void setupClusterConnection() throws IOException {
+    if (asyncClusterConnection != null) {
+      // Already initialized.
+      return;
+    }
+    final Configuration connectionConf = cleanupConfiguration();
+    final InetSocketAddress bindAddress =
+        new InetSocketAddress(this.rpcServices.isa.getAddress(), 0);
+    final User currentUser = userProvider.getCurrent();
+    asyncClusterConnection = ClusterConnectionFactory.createAsyncClusterConnection(
+        connectionConf, bindAddress, currentUser);
+  }
+
+  /**
+   * Stops the services owned by this server. Currently that is only the replication sink
+   * handler, which is stopped if it has been created.
+   */
+  protected void stopServiceThreads() {
+    if (this.replicationSinkHandler == null) {
+      // Replication was never set up; nothing to stop.
+      return;
+    }
+    this.replicationSinkHandler.stopReplicationService();
+  }
+
+  /**
+   * Command-line entry point. Logs the version, resolves the replication server implementation
+   * class from the configuration (defaulting to this class) and hands control to
+   * {@code HReplicationServerCommandLine}.
+   *
+   * @param args command-line arguments, passed through unchanged
+   */
+  public static void main(String[] args) {
+    LOG.info("STARTING executorService " + HReplicationServer.class.getSimpleName());
+    VersionInfo.logVersion();
+    final Configuration configuration = HBaseConfiguration.create();
+    // The cast is unchecked because Configuration#getClass returns a raw-ish Class<?>.
+    @SuppressWarnings("unchecked")
+    final Class<? extends HReplicationServer> implClass =
+        (Class<? extends HReplicationServer>) configuration.getClass(
+            REPLICATION_SERVER_IMPL, HReplicationServer.class);
+
+    HReplicationServerCommandLine commandLine =
+        new HReplicationServerCommandLine(implClass);
+    commandLine.doMain(args);
+  }
+
+  /**
+   * Unsupported on the replication server.
+   *
+   * @throws IOException always; a DoNotRetryIOException wrapping an UnsupportedOperationException
+   */
+  @Override
+  public WAL getWAL(RegionInfo regionInfo) throws IOException {
+    throw new DoNotRetryIOException(new UnsupportedOperationException("This's ReplicationServer."));
+  }
+
+  /**
+   * Unsupported on the replication server.
+   *
+   * @throws IOException always; a DoNotRetryIOException wrapping an UnsupportedOperationException
+   */
+  @Override
+  public List<WAL> getWALs() throws IOException {
+    throw new DoNotRetryIOException(new UnsupportedOperationException("This's ReplicationServer."));
+  }
+
+  /** Not implemented by the replication server: always returns {@code null}. */
+  @Override
+  public FlushRequester getFlushRequester() {
+    return null;
+  }
+
+  /** Not implemented by the replication server: always returns {@code null}. */
+  @Override
+  public CompactionRequester getCompactionRequestor() {
+    return null;
+  }
+
+  /** Not implemented by the replication server: always returns {@code null}. */
+  @Override
+  public RegionServerAccounting getRegionServerAccounting() {
+    return null;
+  }
+
+  /** Not implemented by the replication server: always returns {@code null}. */
+  @Override
+  public RegionServerRpcQuotaManager getRegionServerRpcQuotaManager() {
+    return null;
+  }
+
+  /** Not implemented by the replication server: always returns {@code null}. */
+  @Override
+  public SecureBulkLoadManager getSecureBulkLoadManager() {
+    return null;
+  }
+
+  /** Not implemented by the replication server: always returns {@code null}. */
+  @Override
+  public RegionServerSpaceQuotaManager getRegionServerSpaceQuotaManager() {
+    return null;
+  }
+
+  /**
+   * Unsupported on the replication server.
+   *
+   * @throws IOException always; a DoNotRetryIOException wrapping an UnsupportedOperationException
+   */
+  @Override
+  public void postOpenDeployTasks(PostOpenDeployContext context) throws IOException {
+    throw new DoNotRetryIOException(new UnsupportedOperationException("This's ReplicationServer."));
+  }
+
+  /** Not implemented by the replication server: nothing is reported, always returns false. */
+  @Override
+  public boolean reportRegionStateTransition(RegionStateTransitionContext context) {
+    return false;
+  }
+
+  /**
+   * Always returns {@code null}.
+   * NOTE(review): this server does hold RPC services ({@code rpcServices}); confirm whether
+   * callers expect the underlying RPC server to be exposed here.
+   */
+  @Override
+  public RpcServerInterface getRpcServer() {
+    return null;
+  }
+
+  /** Not implemented by the replication server: always returns {@code null}. */
+  @Override
+  public ConcurrentMap<byte[], Boolean> getRegionsInTransitionInRS() {
+    return null;
+  }
+
+  /** Not implemented by the replication server: always returns {@code null}. */
+  @Override
+  public LeaseManager getLeaseManager() {
+    return null;
+  }
+
+  /** Not implemented by the replication server: always returns {@code null}. */
+  @Override
+  public ExecutorService getExecutorService() {
+    return null;
+  }
+
+  /** Not implemented by the replication server: always returns {@code null}. */
+  @Override
+  public ServerNonceManager getNonceManager() {
+    return null;
+  }
+
+  /** Not implemented by the replication server: the service is ignored, always returns false. */
+  @Override
+  public boolean registerService(Service service) {
+    return false;
+  }
+
+  /** Not implemented by the replication server: always returns {@code null}. */
+  @Override
+  public HeapMemoryManager getHeapMemoryManager() {
+    return null;
+  }
+
+  /** Not implemented by the replication server: always returns 0. */
+  @Override
+  public double getCompactionPressure() {
+    return 0;
+  }
+
+  /** Not implemented by the replication server: always returns {@code null}. */
+  @Override
+  public ThroughputController getFlushThroughputController() {
+    return null;
+  }
+
+  /** Not implemented by the replication server: always returns 0. */
+  @Override
+  public double getFlushPressure() {
+    return 0;
+  }
+
+  /** Not implemented by the replication server: always returns {@code null}. */
+  @Override
+  public MetricsRegionServer getMetrics() {
+    return null;
+  }
+
+  /**
+   * Unsupported on the replication server.
+   *
+   * @throws IOException always; a DoNotRetryIOException wrapping an UnsupportedOperationException
+   */
+  @Override
+  public EntityLock regionLock(List<RegionInfo> regionInfos, String description, Abortable abort)
+      throws IOException {
+    throw new DoNotRetryIOException(new UnsupportedOperationException("This's ReplicationServer."));
+  }
+
+  /**
+   * Unsupported on the replication server.
+   *
+   * @throws IOException always; a DoNotRetryIOException wrapping an UnsupportedOperationException
+   */
+  @Override
+  public void unassign(byte[] regionName) throws IOException {
+    throw new DoNotRetryIOException(new UnsupportedOperationException("This's ReplicationServer."));
+  }
+
+  /** Not implemented by the replication server: nothing is reported, always returns false. */
+  @Override
+  public boolean reportRegionSizesForQuotas(RegionSizeStore sizeStore) {
+    return false;
+  }
+
+  /** Not implemented by the replication server: nothing is reported, always returns false. */
+  @Override
+  public boolean reportFileArchivalForQuotas(TableName tableName,
+      Collection<Entry<String, Long>> archivedFiles) {
+    return false;
+  }
+
+  /**
+   * Always returns false.
+   * NOTE(review): {@link #run()} consults this on every loop iteration and sets its
+   * {@code stopping} flag when it is false — confirm a constant false is intended.
+   */
+  @Override
+  public boolean isClusterUp() {
+    return false;
+  }
+
+  /** Not implemented by the replication server: always returns {@code null}. */
+  @Override
+  public ReplicationSourceService getReplicationSourceService() {
+    return null;
+  }
+
+  /** Not implemented by the replication server: always returns {@code null}. */
+  @Override
+  public TableDescriptors getTableDescriptors() {
+    return null;
+  }
+
+  /** The replication server exposes no block cache: always returns an empty Optional. */
+  @Override
+  public Optional<BlockCache> getBlockCache() {
+    return Optional.empty();
+  }
+
+  /** The replication server exposes no MOB file cache: always returns an empty Optional. */
+  @Override
+  public Optional<MobFileCache> getMobFileCache() {
+    return Optional.empty();
+  }
+
+  /** Not implemented by the replication server: always returns {@code null}. */
+  @Override
+  public AccessChecker getAccessChecker() {
+    return null;
+  }
+
+  /** Not implemented by the replication server: always returns {@code null}. */
+  @Override
+  public ZKPermissionWatcher getZKPermissionWatcher() {
+    return null;
+  }
+
+  /** Returns the configuration this server was constructed with. */
+  @Override
+  public Configuration getConfiguration() {
+    return conf;
+  }
+
+  /**
+   * Returns the ZooKeeper watcher opened by the constructor, or {@code null} when the server was
+   * created with {@code hbase.testing.nocluster} set to true.
+   */
+  @Override
+  public ZKWatcher getZooKeeper() {
+    return zooKeeper;
+  }
+
+  /**
+   * Returns a {@link Connection} view of the async connection obtained from
+   * {@code getAsyncConnection()}.
+   * NOTE(review): the result of {@code getAsyncConnection()} is dereferenced directly, so this
+   * fails with a NullPointerException if that returns null — confirm it is backed by the
+   * connection created in {@link #setupClusterConnection()}.
+   */
+  @Override
+  public Connection getConnection() {
+    return getAsyncConnection().toConnection();
+  }
+
+  /**
+   * Unsupported on the replication server.
+   *
+   * @throws IOException always; a DoNotRetryIOException wrapping an UnsupportedOperationException
+   */
+  @Override
+  public Connection createConnection(Configuration conf) throws IOException {
+    throw new DoNotRetryIOException(new UnsupportedOperationException("This's ReplicationServer."));
+  }
+
+  /**
+   * Returns the async cluster connection created by {@link #setupClusterConnection()}, or
+   * {@code null} if that setup has not run yet.
+   * <p>
+   * Returning the stored connection (rather than a constant {@code null}) makes the connection
+   * that this server builds during initialization actually reachable by its users.
+   */
+  @Override
+  public AsyncClusterConnection getAsyncClusterConnection() {
+    return asyncClusterConnection;
+  }
+
+  /**
+   * Returns this server's name, built in the constructor from the RPC services' host name and
+   * port plus the start code.
+   */
+  @Override
+  public ServerName getServerName() {
+    return serverName;
+  }
+
+  /** Not implemented by the replication server: always returns {@code null}. */
+  @Override
+  public CoordinatedStateManager getCoordinatedStateManager() {
+    return null;
+  }
+
+  /** Not implemented by the replication server: always returns {@code null}. */
+  @Override
+  public ChoreService getChoreService() {
+    return null;
+  }
+
+  @Override
+  public void abort(String why, Throwable cause) {
+    if (!setAbortRequested()) {
+      // Abort already in progress, ignore the new request.
+      LOG.debug(
+          "Abort already in progress. Ignoring the current request with reason: {}", why);
+      return;
+    }
+    String msg = "***** ABORTING replication server " + this + ": " + why + " *****";
+    if (cause != null) {
+      LOG.error(HBaseMarkers.FATAL, msg, cause);
+    } else {
+      LOG.error(HBaseMarkers.FATAL, msg);
+    }
+    stop(why);
+  }
+
+  /** Returns whether an abort has been requested, i.e. the current value of abortRequested. */
+  @Override
+  public boolean isAborted() {
+    return abortRequested.get();
+  }
+
+  /**
+   * Marks this server as stopped; {@link #run()} leaves its main loop once it observes the flag.
+   *
+   * @param why reason for stopping (not used by this implementation)
+   */
+  @Override
+  public void stop(String why) {
+    stopped = true;
+  }
+
+  /** Returns whether {@link #stop(String)} has been called on this server. */
+  @Override
+  public boolean isStopped() {
+    return this.stopped;
+  }
+
+  /** Not implemented by the replication server: does nothing. */
+  @Override
+  public void updateRegionFavoredNodesMapping(String encodedRegionName,
+      List<HBaseProtos.ServerName> favoredNodes) {
+  }
+
+  /** Not implemented by the replication server: always returns an empty array. */
+  @Override
+  public InetSocketAddress[] getFavoredNodesForRegion(String encodedRegionName) {
+    return new InetSocketAddress[0];
+  }
+
+  /** Not implemented by the replication server: the region is ignored. */
+  @Override
+  public void addRegion(HRegion r) {
+
+  }
+
+  /** Not implemented by the replication server: nothing is removed, always returns false. */
+  @Override
+  public boolean removeRegion(HRegion r, ServerName destination) {
+    return false;
+  }
+
+  /** Not implemented by the replication server: always returns {@code null}. */
+  @Override
+  public Region getRegion(String encodedRegionName) {
+    return null;
+  }
+
+  /**
+   * Unsupported on the replication server.
+   *
+   * @throws IOException always; a DoNotRetryIOException wrapping an UnsupportedOperationException
+   */
+  @Override
+  public List<? extends Region> getRegions(TableName tableName) throws IOException {
+    throw new DoNotRetryIOException(new UnsupportedOperationException("This's ReplicationServer."));
+  }
+
+  /**
+   * Returns the regions hosted by this server. The replication server keeps no regions (adding
+   * one is a no-op), so the result is always an empty, immutable list rather than {@code null},
+   * which lets callers iterate over it without a null check.
+   */
+  @Override
+  public List<? extends Region> getRegions() {
+    // Fully qualified to avoid touching the file's import block.
+    return java.util.Collections.emptyList();
+  }
+
+  /**
+   * Sets up replication for this server by creating the replication sink service from the
+   * configuration.
+   *
+   * @throws IOException if the replication sink service cannot be created
+   */
+  private void setupReplication() throws IOException {
+    // Instantiate the replication sink service and attach it to this server.
+    createNewReplicationInstance(conf, this);
+  }
+
+  /**
+   * Load the replication executorService objects, if any
+   */
+  private static void createNewReplicationInstance(Configuration conf, HReplicationServer server)
+      throws IOException {
+    // read in the name of the sink replication class from the config file.
+    String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME,
+        HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
+
+    server.replicationSinkHandler = newReplicationInstance(sinkClassname,
+        ReplicationSinkService.class, conf, server);
+  }
+
+  private static <T extends ReplicationService> T newReplicationInstance(String classname,
+      Class<T> xface, Configuration conf, HReplicationServer server) throws IOException {
+    final Class<? extends T> clazz;
+    try {
+      ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+      clazz = Class.forName(classname, true, classLoader).asSubclass(xface);
+    } catch (java.lang.ClassNotFoundException nfe) {
+      throw new IOException("Could not find class for " + classname);
+    }
+    T service = ReflectionUtils.newInstance(clazz, conf);
+    service.initialize(server, null, null, null, null);

Review comment:
       Why all those null params? Can we remove those from the interface?

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java
##########
@@ -0,0 +1,683 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.ChoreService;
+import org.apache.hadoop.hbase.CoordinatedStateManager;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableDescriptors;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.locking.EntityLock;
+import org.apache.hadoop.hbase.executor.ExecutorService;
+import org.apache.hadoop.hbase.io.hfile.BlockCache;
+import org.apache.hadoop.hbase.ipc.RpcServerInterface;
+import org.apache.hadoop.hbase.log.HBaseMarkers;
+import org.apache.hadoop.hbase.mob.MobFileCache;
+import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
+import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
+import org.apache.hadoop.hbase.quotas.RegionSizeStore;
+import org.apache.hadoop.hbase.regionserver.FlushRequester;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HeapMemoryManager;
+import org.apache.hadoop.hbase.regionserver.LeaseManager;
+import org.apache.hadoop.hbase.regionserver.MetricsRegionServer;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionServerAccounting;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.regionserver.ReplicationService;
+import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
+import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
+import org.apache.hadoop.hbase.regionserver.SecureBulkLoadManager;
+import org.apache.hadoop.hbase.regionserver.ServerNonceManager;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
+import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.security.access.AccessChecker;
+import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher;
+import org.apache.hadoop.hbase.trace.TraceUtil;
+import org.apache.hadoop.hbase.util.Sleeper;
+import org.apache.hadoop.hbase.util.VersionInfo;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
+
+import org.apache.hbase.thirdparty.com.google.protobuf.Service;
+
+/**
+ * HReplicationServer which is responsible for all replication stuff. It checks in with
+ * the HMaster. There are many HReplicationServers in a single HBase deployment.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
+@SuppressWarnings({ "deprecation"})
+public class HReplicationServer extends Thread implements Server, RegionServerServices {

Review comment:
       I have a similar feeling, given the high number of methods for which an implementation is not applicable in this context.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org