Posted to issues@hbase.apache.org by GitBox <gi...@apache.org> on 2020/07/15 23:23:53 UTC

[GitHub] [hbase] bharathv commented on a change in pull request #2064: HBASE-24735: Refactor ReplicationSourceManager: move logPositionAndCl…

bharathv commented on a change in pull request #2064:
URL: https://github.com/apache/hbase/pull/2064#discussion_r455417117



##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
##########
@@ -192,4 +188,8 @@ default boolean isSyncReplication() {
   default boolean isRecovered() {
     return false;
   }
+
+  void logPosition(WALEntryBatch entryBatch);

Review comment:
       nit: please add javadoc for this method.

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
##########
@@ -192,4 +188,8 @@ default boolean isSyncReplication() {
   default boolean isRecovered() {
     return false;
   }
+
+  void logPosition(WALEntryBatch entryBatch);
+
+  void cleanOldLogs(String walName, boolean inclusive);

Review comment:
       nit: please add javadoc for this method.

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
##########
@@ -729,4 +728,143 @@ void removeWorker(ReplicationSourceShipper worker) {
   private String logPeerId(){
     return "[Source for peer " + this.getPeer().getId() + "]:";
   }
+
+  @VisibleForTesting
+  public void logPosition(WALEntryBatch entryBatch) {
+    String fileName = entryBatch.getLastWalPath().getName();
+    interruptOrAbortWhenFail(() -> this.queueStorage
+      .setWALPosition(server.getServerName(), getQueueId(), fileName,
+        entryBatch.getLastWalPosition(), entryBatch.getLastSeqIds()));
+  }
+
+  /**

Review comment:
       Move this javadoc to the interface?

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
##########
@@ -139,17 +144,16 @@
    * Instantiation method used by region servers
    * @param conf configuration to use
    * @param fs file system to use
-   * @param manager replication manager to ping to
    * @param server the server for this region server
    * @param queueId the id of our replication queue
    * @param clusterId unique UUID for the cluster
    * @param metrics metrics for replication source
    */
   @Override
-  public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
+  public void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSourceManager manager,

Review comment:
       You don't need the manager in init() anymore? Its @param is removed above, but it is still in the signature.

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
##########
@@ -926,7 +799,8 @@ public void join() {
    * @return list of all normal sources
    */
   public List<ReplicationSourceInterface> getSources() {
-    return new ArrayList<>(this.sources.values());
+    return this.sources.values().stream().filter(source -> source.isSourceActive())

Review comment:
       Curious: why this change?
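
For context on the question, here is a minimal sketch of how the two versions of getSources() differ for callers. `Source` is a hypothetical stand-in for ReplicationSourceInterface, and the hunk above is cut off before the stream's terminal operation, so the `collect(Collectors.toList())` call is an assumption rather than what the patch necessarily does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

public class GetSourcesSketch {
  // Hypothetical stand-in for ReplicationSourceInterface; only the method used in the hunk.
  interface Source {
    boolean isSourceActive();
  }

  // Stand-in for the manager's peerId -> source map.
  static final Map<String, Source> sources = new ConcurrentHashMap<>();

  // Old behaviour: a snapshot of every registered source, active or not.
  static List<Source> getSourcesOld() {
    return new ArrayList<>(sources.values());
  }

  // New behaviour: only sources that report themselves as active.
  // The collector is assumed; the quoted diff ends before it.
  static List<Source> getSourcesNew() {
    return sources.values().stream()
      .filter(Source::isSourceActive)
      .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    sources.put("peer1", () -> true);
    sources.put("peer2", () -> false);
    System.out.println("old=" + getSourcesOld().size() + " new=" + getSourcesNew().size());
  }
}
```

With the filter in place, a caller that previously saw a registered but inactive source no longer sees it, so any code relying on getSources() to enumerate all sources would behave differently.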

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
##########
@@ -30,8 +30,7 @@
 /**
  * Holds a batch of WAL entries to replicate, along with some statistics
  */
-@InterfaceAudience.Private
-class WALEntryBatch {
+@InterfaceAudience.Private public class WALEntryBatch {

Review comment:
       I think this change is not needed.



