You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2019/08/14 00:39:19 UTC

[hbase] branch branch-1.4 updated: HBASE-22784 OldWALs not cleared in a replication slave cluster (cyclic replication bw 2 clusters)

This is an automated email from the ASF dual-hosted git repository.

apurtell pushed a commit to branch branch-1.4
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-1.4 by this push:
     new ce0a91c  HBASE-22784 OldWALs not cleared in a replication slave cluster (cyclic replication bw 2 clusters)
ce0a91c is described below

commit ce0a91cb0a00dad9e317fa9ba9926733be26d8d8
Author: Wellington Chevreuil <wc...@apache.org>
AuthorDate: Thu Aug 8 12:19:09 2019 +0100

    HBASE-22784 OldWALs not cleared in a replication slave cluster (cyclic replication bw 2 clusters)
    
    Signed-off-by: Andrew Purtell <ap...@apache.org>
    
    Conflicts:
    	hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
    	hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
    	hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
---
 .../regionserver/ReplicationSource.java            |  1 +
 .../regionserver/ReplicationSourceManager.java     | 28 +++++++++++-----
 .../ReplicationSourceWALReaderThread.java          |  8 +++++
 .../regionserver/TestWALEntryStream.java           | 39 ++++++++++++++++++++++
 4 files changed, 67 insertions(+), 9 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index d3f2620..6ae1d1b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -723,6 +723,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
     }
 
     private void updateLogPosition(long lastReadPosition) {
+      manager.setPendingShipment(false);
       manager.logPositionAndCleanOldLogs(currentPath, peerClusterZnode, lastReadPosition,
         this.replicationQueueInfo.isQueueRecovered(), false);
       lastLoggedPosition = lastReadPosition;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index d5b5c63..071d2c5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -120,6 +120,7 @@ public class ReplicationSourceManager implements ReplicationListener {
   private final Random rand;
   private final boolean replicationForBulkLoadDataEnabled;
 
+  private boolean pendingShipment;
 
   /**
    * Creates a replication manager and sets the watch on all the other registered region servers
@@ -186,14 +187,20 @@ public class ReplicationSourceManager implements ReplicationListener {
    * @param queueRecovered indicates if this queue comes from another region server
    * @param holdLogInZK if true then the log is retained in ZK
    */
-  public void logPositionAndCleanOldLogs(Path log, String id, long position,
+  public synchronized void logPositionAndCleanOldLogs(Path log, String id, long position,
       boolean queueRecovered, boolean holdLogInZK) {
-    String fileName = log.getName();
-    this.replicationQueues.setLogPosition(id, fileName, position);
-    if (holdLogInZK) {
-     return;
+    if (!this.pendingShipment) {
+      String fileName = log.getName();
+      this.replicationQueues.setLogPosition(id, fileName, position);
+      if (holdLogInZK) {
+        return;
+      }
+      cleanOldLogs(fileName, id, queueRecovered);
     }
-    cleanOldLogs(fileName, id, queueRecovered);
+  }
+
+  public synchronized void setPendingShipment(boolean pendingShipment) {
+    this.pendingShipment = pendingShipment;
   }
 
   /**
@@ -206,9 +213,12 @@ public class ReplicationSourceManager implements ReplicationListener {
   public void cleanOldLogs(String key, String id, boolean queueRecovered) {
     String logPrefix = DefaultWALProvider.getWALPrefixFromWALName(key);
     if (queueRecovered) {
-      SortedSet<String> wals = walsByIdRecoveredQueues.get(id).get(logPrefix);
-      if (wals != null && !wals.first().equals(key)) {
-        cleanOldLogs(wals, key, id);
+      Map<String, SortedSet<String>> walsForPeer = walsByIdRecoveredQueues.get(id);
+      if(walsForPeer != null) {
+        SortedSet<String> wals = walsForPeer.get(logPrefix);
+        if (wals != null && !wals.first().equals(key)) {
+          cleanOldLogs(wals, key, id);
+        }
       }
     } else {
       synchronized (this.walsById) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
index 872f91d..ddbf1c9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
@@ -73,6 +73,7 @@ public class ReplicationSourceWALReaderThread extends Thread {
   private ReplicationQueueInfo replicationQueueInfo;
   private int maxRetriesMultiplier;
   private MetricsSource metrics;
+  private ReplicationSourceManager replicationSourceManager;
 
   /**
    * Creates a reader worker for a given WAL queue. Reads WAL entries off a given queue, batches the
@@ -102,6 +103,7 @@ public class ReplicationSourceWALReaderThread extends Thread {
     // memory used will be batchSizeCapacity * (nb.batches + 1)
     // the +1 is for the current thread reading before placing onto the queue
     int batchCount = conf.getInt("replication.source.nb.batches", 1);
+    this.replicationSourceManager = manager;
     this.sleepForRetries =
         this.conf.getLong("replication.source.sleepforretries", 1000);    // 1 second
     this.maxRetriesMultiplier =
@@ -134,6 +136,7 @@ public class ReplicationSourceWALReaderThread extends Thread {
               if (edit != null && !edit.isEmpty()) {
                 long entrySize = getEntrySize(entry);
                 batch.addEntry(entry);
+                replicationSourceManager.setPendingShipment(true);
                 updateBatchStats(batch, entry, entryStream.getPosition(), entrySize);
                 // Stop if too many entries or too big
                 if (batch.getHeapSize() >= replicationBatchSizeCapacity
@@ -141,6 +144,11 @@ public class ReplicationSourceWALReaderThread extends Thread {
                   break;
                 }
               }
+            } else {
+              replicationSourceManager.logPositionAndCleanOldLogs(entryStream.getCurrentPath(),
+                this.replicationQueueInfo.getPeerClusterZnode(),
+                entryStream.getPosition(),
+                this.replicationQueueInfo.isQueueRecovered(), false);
             }
           }
           if (batch != null && (!batch.getLastSeqIds().isEmpty() || batch.getNbEntries() > 0)) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
index 04c3b81..794fed2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
@@ -24,6 +24,11 @@ import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -66,6 +71,7 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
 import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 import org.mockito.runners.MockitoJUnitRunner;
 
@@ -364,6 +370,39 @@ public class TestWALEntryStream {
     assertEquals(getRow(entryBatch.getWalEntries().get(0)), "foo");
   }
 
+  @Test
+  public void testReplicationSourceUpdatesLogPositionOnFilteredEntries() throws Exception {
+    appendEntriesToLog(3);
+    // get ending position
+    long position;
+    try (WALEntryStream entryStream =
+      new WALEntryStream(walQueue, fs, conf, new MetricsSource("1"))) {
+      entryStream.next();
+      entryStream.next();
+      entryStream.next();
+      position = entryStream.getPosition();
+    }
+    // start up a readerThread with a WALEntryFilter that always filters the entries
+    ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
+    ReplicationSourceWALReaderThread readerThread = new ReplicationSourceWALReaderThread(
+      mockSourceManager, getQueueInfo(), walQueue, 0, fs, conf, new WALEntryFilter() {
+        @Override
+        public Entry filter(Entry entry) {
+          return null;
+        }
+      }, new MetricsSource("1"));
+    readerThread.start();
+    Thread.sleep(100);
+    ArgumentCaptor<Long> positionCaptor = ArgumentCaptor.forClass(Long.class);
+    verify(mockSourceManager, times(3))
+      .logPositionAndCleanOldLogs(any(Path.class),
+        anyString(),
+        positionCaptor.capture(),
+        anyBoolean(),
+        anyBoolean());
+    assertEquals(position, positionCaptor.getValue().longValue());
+  }
+
   private String getRow(WAL.Entry entry) {
     Cell cell = entry.getEdit().getCells().get(0);
     return Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());