You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2019/08/14 00:39:19 UTC
[hbase] branch branch-1.4 updated: HBASE-22784 OldWALs not cleared in a replication slave cluster (cyclic replication bw 2 clusters)
This is an automated email from the ASF dual-hosted git repository.
apurtell pushed a commit to branch branch-1.4
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-1.4 by this push:
new ce0a91c HBASE-22784 OldWALs not cleared in a replication slave cluster (cyclic replication bw 2 clusters)
ce0a91c is described below
commit ce0a91cb0a00dad9e317fa9ba9926733be26d8d8
Author: Wellington Chevreuil <wc...@apache.org>
AuthorDate: Thu Aug 8 12:19:09 2019 +0100
HBASE-22784 OldWALs not cleared in a replication slave cluster (cyclic replication bw 2 clusters)
Signed-off-by: Andrew Purtell <ap...@apache.org>
Conflicts:
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
---
.../regionserver/ReplicationSource.java | 1 +
.../regionserver/ReplicationSourceManager.java | 28 +++++++++++-----
.../ReplicationSourceWALReaderThread.java | 8 +++++
.../regionserver/TestWALEntryStream.java | 39 ++++++++++++++++++++++
4 files changed, 67 insertions(+), 9 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index d3f2620..6ae1d1b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -723,6 +723,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
}
private void updateLogPosition(long lastReadPosition) {
+ manager.setPendingShipment(false);
manager.logPositionAndCleanOldLogs(currentPath, peerClusterZnode, lastReadPosition,
this.replicationQueueInfo.isQueueRecovered(), false);
lastLoggedPosition = lastReadPosition;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index d5b5c63..071d2c5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -120,6 +120,7 @@ public class ReplicationSourceManager implements ReplicationListener {
private final Random rand;
private final boolean replicationForBulkLoadDataEnabled;
+ private boolean pendingShipment;
/**
* Creates a replication manager and sets the watch on all the other registered region servers
@@ -186,14 +187,20 @@ public class ReplicationSourceManager implements ReplicationListener {
* @param queueRecovered indicates if this queue comes from another region server
* @param holdLogInZK if true then the log is retained in ZK
*/
- public void logPositionAndCleanOldLogs(Path log, String id, long position,
+ public synchronized void logPositionAndCleanOldLogs(Path log, String id, long position,
boolean queueRecovered, boolean holdLogInZK) {
- String fileName = log.getName();
- this.replicationQueues.setLogPosition(id, fileName, position);
- if (holdLogInZK) {
- return;
+ if (!this.pendingShipment) {
+ String fileName = log.getName();
+ this.replicationQueues.setLogPosition(id, fileName, position);
+ if (holdLogInZK) {
+ return;
+ }
+ cleanOldLogs(fileName, id, queueRecovered);
}
- cleanOldLogs(fileName, id, queueRecovered);
+ }
+
+ public synchronized void setPendingShipment(boolean pendingShipment) {
+ this.pendingShipment = pendingShipment;
}
/**
@@ -206,9 +213,12 @@ public class ReplicationSourceManager implements ReplicationListener {
public void cleanOldLogs(String key, String id, boolean queueRecovered) {
String logPrefix = DefaultWALProvider.getWALPrefixFromWALName(key);
if (queueRecovered) {
- SortedSet<String> wals = walsByIdRecoveredQueues.get(id).get(logPrefix);
- if (wals != null && !wals.first().equals(key)) {
- cleanOldLogs(wals, key, id);
+ Map<String, SortedSet<String>> walsForPeer = walsByIdRecoveredQueues.get(id);
+ if(walsForPeer != null) {
+ SortedSet<String> wals = walsForPeer.get(logPrefix);
+ if (wals != null && !wals.first().equals(key)) {
+ cleanOldLogs(wals, key, id);
+ }
}
} else {
synchronized (this.walsById) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
index 872f91d..ddbf1c9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
@@ -73,6 +73,7 @@ public class ReplicationSourceWALReaderThread extends Thread {
private ReplicationQueueInfo replicationQueueInfo;
private int maxRetriesMultiplier;
private MetricsSource metrics;
+ private ReplicationSourceManager replicationSourceManager;
/**
* Creates a reader worker for a given WAL queue. Reads WAL entries off a given queue, batches the
@@ -102,6 +103,7 @@ public class ReplicationSourceWALReaderThread extends Thread {
// memory used will be batchSizeCapacity * (nb.batches + 1)
// the +1 is for the current thread reading before placing onto the queue
int batchCount = conf.getInt("replication.source.nb.batches", 1);
+ this.replicationSourceManager = manager;
this.sleepForRetries =
this.conf.getLong("replication.source.sleepforretries", 1000); // 1 second
this.maxRetriesMultiplier =
@@ -134,6 +136,7 @@ public class ReplicationSourceWALReaderThread extends Thread {
if (edit != null && !edit.isEmpty()) {
long entrySize = getEntrySize(entry);
batch.addEntry(entry);
+ replicationSourceManager.setPendingShipment(true);
updateBatchStats(batch, entry, entryStream.getPosition(), entrySize);
// Stop if too many entries or too big
if (batch.getHeapSize() >= replicationBatchSizeCapacity
@@ -141,6 +144,11 @@ public class ReplicationSourceWALReaderThread extends Thread {
break;
}
}
+ } else {
+ replicationSourceManager.logPositionAndCleanOldLogs(entryStream.getCurrentPath(),
+ this.replicationQueueInfo.getPeerClusterZnode(),
+ entryStream.getPosition(),
+ this.replicationQueueInfo.isQueueRecovered(), false);
}
}
if (batch != null && (!batch.getLastSeqIds().isEmpty() || batch.getNbEntries() > 0)) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
index 04c3b81..794fed2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
@@ -24,6 +24,11 @@ import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import java.io.IOException;
import java.util.ArrayList;
@@ -66,6 +71,7 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.mockito.runners.MockitoJUnitRunner;
@@ -364,6 +370,39 @@ public class TestWALEntryStream {
assertEquals(getRow(entryBatch.getWalEntries().get(0)), "foo");
}
+ @Test
+ public void testReplicationSourceUpdatesLogPositionOnFilteredEntries() throws Exception {
+ appendEntriesToLog(3);
+ // get ending position
+ long position;
+ try (WALEntryStream entryStream =
+ new WALEntryStream(walQueue, fs, conf, new MetricsSource("1"))) {
+ entryStream.next();
+ entryStream.next();
+ entryStream.next();
+ position = entryStream.getPosition();
+ }
+ // start up a readerThread with a WALEntryFilter that always filter the entries
+ ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
+ ReplicationSourceWALReaderThread readerThread = new ReplicationSourceWALReaderThread(
+ mockSourceManager, getQueueInfo(), walQueue, 0, fs, conf, new WALEntryFilter() {
+ @Override
+ public Entry filter(Entry entry) {
+ return null;
+ }
+ }, new MetricsSource("1"));
+ readerThread.start();
+ Thread.sleep(100);
+ ArgumentCaptor<Long> positionCaptor = ArgumentCaptor.forClass(Long.class);
+ verify(mockSourceManager, times(3))
+ .logPositionAndCleanOldLogs(any(Path.class),
+ anyString(),
+ positionCaptor.capture(),
+ anyBoolean(),
+ anyBoolean());
+ assertEquals(position, positionCaptor.getValue().longValue());
+ }
+
private String getRow(WAL.Entry entry) {
Cell cell = entry.getEdit().getCells().get(0);
return Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());