You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@hbase.apache.org by GitBox <gi...@apache.org> on 2020/09/08 18:46:35 UTC

[GitHub] [hbase] saintstack commented on a change in pull request #2364: HBASE-24998 Introduce a ReplicationSourceOverallController interface …

saintstack commented on a change in pull request #2364:
URL: https://github.com/apache/hbase/pull/2364#discussion_r485115878



##########
File path: hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
##########
@@ -979,6 +979,8 @@
   /*
    * cluster replication constants.
    */
+  public static final String REPLICATION_OFFLOAD_ENABLE_KEY = "hbase.replication.offload.enabled";
+  public static final boolean REPLICATION_OFFLOAD_ENABLE_DEFAULT = false;

Review comment:
       These are only used in ReplicationSourceManager, so could they be defined there instead?

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSourceOverallController.java
##########
@@ -0,0 +1,31 @@
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSource;
+import org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSource;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Used to control all replication sources inside one RegionServer or ReplicationServer.
+ * Used by {@link ReplicationSource} or {@link RecoveredReplicationSource}.
+ */
+@InterfaceAudience.Private
+public interface ReplicationSourceOverallController {

Review comment:
       Can it just be ReplicationSourceController, i.e. drop the 'Overall'?

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
##########
@@ -244,7 +286,7 @@ private void adoptAbandonedQueues() {
    * </ol>
    * @param peerId the id of replication peer
    */
-  public void addPeer(String peerId) throws IOException {
+  void addPeer(String peerId) throws IOException {

Review comment:
       Funny, I want it to stay public for my current work (meta region replicas). I can make it public again in my patch.

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
##########
@@ -268,7 +310,7 @@ public void addPeer(String peerId) throws IOException {
    * </ol>
    * @param peerId the id of the replication peer
    */
-  public void removePeer(String peerId) {
+  void removePeer(String peerId) {

Review comment:
       Ditto

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
##########
@@ -186,12 +188,11 @@ public ReplicationSourceManager(ReplicationQueueStorage queueStorage,
     this.latestPaths = new HashMap<>();
     this.replicationForBulkLoadDataEnabled = conf.getBoolean(
       HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
-    this.sleepForRetries = this.conf.getLong("replication.source.sync.sleepforretries", 1000);
-    this.maxRetriesMultiplier =
-      this.conf.getInt("replication.source.sync.maxretriesmultiplier", 60);
     this.totalBufferLimit = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY,
         HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
     this.globalMetrics = globalMetrics;
+    this.replicationOffload = conf.getBoolean(HConstants.REPLICATION_OFFLOAD_ENABLE_KEY,

Review comment:
       I should go back to the design, but can't we just have the offload be a ReplicationSource implementation? Or is that not possible because you need to span all ReplicationSources?

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
##########
@@ -92,7 +93,8 @@
  * </ul>
  */
 @InterfaceAudience.Private
-public class ReplicationSourceManager implements ReplicationListener {
+public class ReplicationSourceManager implements ReplicationListener,
+  ReplicationSourceOverallController {

Review comment:
       Yeah, if this implements the Controller, do you still have to pass it in to ReplicationSource#init as a distinct argument?

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSourceOverallController.java
##########
@@ -0,0 +1,31 @@
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSource;
+import org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSource;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Used to control all replication sources inside one RegionServer or ReplicationServer.
+ * Used by {@link ReplicationSource} or {@link RecoveredReplicationSource}.
+ */
+@InterfaceAudience.Private
+public interface ReplicationSourceOverallController {
+
+  /**
+   * Returns the maximum size in bytes of edits held in memory which are pending replication
+   * across all sources inside this RegionServer or ReplicationServer.
+   */
+  long getTotalBufferLimit();
+
+  AtomicLong getTotalBufferUsed();
+
+  MetricsReplicationGlobalSourceSource getGlobalMetrics();

Review comment:
       What sort of 'global' metrics? This is metrics for all replication sources?

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSourceOverallController.java
##########
@@ -0,0 +1,31 @@
+package org.apache.hadoop.hbase.replication;

Review comment:
       License: this new file is missing the Apache license header.

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSourceOverallController.java
##########
@@ -0,0 +1,31 @@
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSource;
+import org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSource;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Used to control all replication sources inside one RegionServer or ReplicationServer.
+ * Used by {@link ReplicationSource} or {@link RecoveredReplicationSource}.
+ */
+@InterfaceAudience.Private
+public interface ReplicationSourceOverallController {
+
+  /**
+   * Returns the maximum size in bytes of edits held in memory which are pending replication
+   * across all sources inside this RegionServer or ReplicationServer.
+   */
+  long getTotalBufferLimit();

Review comment:
       Is this a 'size' rather than a 'limit'? It is count of how many bytes are currently accumulated in replication source memory?

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
##########
@@ -45,18 +46,15 @@
 
   private static final Logger LOG = LoggerFactory.getLogger(RecoveredReplicationSource.class);
 
-  private Path walDir;
-
   private String actualPeerId;
 
   @Override
-  public void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSourceManager manager,
-    ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
-    String peerClusterZnode, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
-    MetricsSource metrics) throws IOException {
-    super.init(conf, fs, walDir, manager, queueStorage, replicationPeer, server, peerClusterZnode,
-      clusterId, walFileLengthProvider, metrics);
-    this.walDir = walDir;
+  public void init(Configuration conf, FileSystem fs, Path walDir,
+    ReplicationSourceOverallController overallController, ReplicationQueueStorage queueStorage,

Review comment:
       There is only one ReplicationSourceManager instance in a RS? If so, can it carry the Controller rather than as here where it is another param on this init method?

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
##########
@@ -267,10 +267,11 @@ public Path getCurrentPath() {
   //returns false if we've already exceeded the global quota
   private boolean checkQuota() {
     // try not to go over total quota
-    if (source.manager.getTotalBufferUsed().get() > source.manager.getTotalBufferLimit()) {
+    if (source.overallController.getTotalBufferUsed().get() > source.overallController

Review comment:
       Yeah, if the Controller is in the ReplicationSourceManager, then the 'overall' will be redundant?

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSourceOverallController.java
##########
@@ -0,0 +1,31 @@
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSource;
+import org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSource;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Used to control all replication sources inside one RegionServer or ReplicationServer.
+ * Used by {@link ReplicationSource} or {@link RecoveredReplicationSource}.
+ */
+@InterfaceAudience.Private
+public interface ReplicationSourceOverallController {
+
+  /**
+   * Returns the maximum size in bytes of edits held in memory which are pending replication
+   * across all sources inside this RegionServer or ReplicationServer.
+   */
+  long getTotalBufferLimit();
+
+  AtomicLong getTotalBufferUsed();
+
+  MetricsReplicationGlobalSourceSource getGlobalMetrics();
+
+  /**
+   * Called this when the recovered replication source replicated all WALs.
+   * @param src

Review comment:
       Remove the '@param src' tag, and change 'Called this' to 'Call this'.

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
##########
@@ -849,19 +874,6 @@ int getSizeOfLatestPath() {
     }
   }
 
-  @VisibleForTesting
-  public AtomicLong getTotalBufferUsed() {

Review comment:
       I see, you are just moving existing methods.

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
##########
@@ -244,7 +286,7 @@ private void adoptAbandonedQueues() {
    * </ol>
    * @param peerId the id of replication peer
    */
-  public void addPeer(String peerId) throws IOException {
+  void addPeer(String peerId) throws IOException {

Review comment:
       I want to call it on Region open. Let me see if I can move where I make the call.

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSourceOverallController.java
##########
@@ -0,0 +1,31 @@
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSource;
+import org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSource;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Used to control all replication sources inside one RegionServer or ReplicationServer.
+ * Used by {@link ReplicationSource} or {@link RecoveredReplicationSource}.
+ */
+@InterfaceAudience.Private
+public interface ReplicationSourceOverallController {
+
+  /**
+   * Returns the maximum size in bytes of edits held in memory which are pending replication
+   * across all sources inside this RegionServer or ReplicationServer.
+   */
+  long getTotalBufferLimit();

Review comment:
       Maybe the comment is just in the wrong place; should it be on the next member (getTotalBufferUsed) instead?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org