You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2021/02/23 21:20:32 UTC
[hbase] branch branch-1 updated: HBASE-25596: Fix NPE and avoid
permanent unreplicated data due to EOF (#2975)
This is an automated email from the ASF dual-hosted git repository.
apurtell pushed a commit to branch branch-1
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-1 by this push:
new 19fe18e HBASE-25596: Fix NPE and avoid permanent unreplicated data due to EOF (#2975)
19fe18e is described below
commit 19fe18e466a2c4eb72c5b3c9dfbbbbb09e11df46
Author: Sandeep Pal <50...@users.noreply.github.com>
AuthorDate: Tue Feb 23 13:20:00 2021 -0800
HBASE-25596: Fix NPE and avoid permanent unreplicated data due to EOF (#2975)
Signed-off-by: Andrew Purtell <ap...@apache.org>
---
.../regionserver/ReplicationSource.java | 5 +
.../regionserver/ReplicationSourceManager.java | 2 +-
.../ReplicationSourceWALReaderThread.java | 208 +++++++++++-----
.../replication/regionserver/WALEntryStream.java | 22 +-
.../hbase/replication/TestReplicationSource.java | 265 +++++++++++++++++----
.../regionserver/TestReplicationSourceBase.java | 1 -
.../regionserver/TestWALEntryStream.java | 56 ++++-
7 files changed, 443 insertions(+), 116 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index a58289e..969e8ca 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -221,6 +221,11 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
}
}
+ @InterfaceAudience.Private
+ public Map<String, PriorityBlockingQueue<Path>> getQueues() {
+ return logQueue.getQueues();
+ }
+
@Override
public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
throws ReplicationException {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index d9435a3..a8e8e76 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -183,7 +183,7 @@ public class ReplicationSourceManager implements ReplicationListener {
* position. It will also clean old logs from the queue.
* @param log Path to the log currently being replicated from
* replication status in zookeeper. It will also delete older entries.
- * @param id id of the peer cluster
+ * @param id id of the replication queue
* @param position current location in the log
* @param queueRecovered indicates if this queue comes from another region server
* @param holdLogInZK if true then the log is retained in ZK
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
index bd155d5..a1d64ca 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
@@ -50,7 +50,8 @@ import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALKey;
/**
- * Reads and filters WAL entries, groups the filtered entries into batches, and puts the batches onto a queue
+ * Reads and filters WAL entries, groups the filtered entries into batches,
+ * and puts the batches onto a queue
*
*/
@InterfaceAudience.Private
@@ -88,7 +89,7 @@ public class ReplicationSourceWALReaderThread extends Thread {
* Creates a reader worker for a given WAL queue. Reads WAL entries off a given queue, batches the
* entries, and puts them on a batch queue.
* @param manager replication manager
- * @param replicationQueueInfo
+ * @param replicationQueueInfo replication queue info
* @param logQueue The WAL queue to read off of
* @param startPosition position in the first WAL to start reading from
* @param fs the files system to use
@@ -135,71 +136,128 @@ public class ReplicationSourceWALReaderThread extends Thread {
@Override
public void run() {
int sleepMultiplier = 1;
- while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream
- try (WALEntryStream entryStream =
- new WALEntryStream(logQueue, fs, conf, lastReadPosition, metrics, walGroupId)) {
- while (isReaderRunning()) { // loop here to keep reusing stream while we can
- if (!source.isPeerEnabled()) {
- Threads.sleep(sleepForRetries);
- continue;
- }
- if (!checkQuota()) {
- continue;
- }
- WALEntryBatch batch = new WALEntryBatch(replicationBatchCountCapacity);
- boolean hasNext;
- while ((hasNext = entryStream.hasNext()) == true) {
- Entry entry = entryStream.next();
- entry = filterEntry(entry);
- if (entry != null) {
- WALEdit edit = entry.getEdit();
- if (edit != null && !edit.isEmpty()) {
- long entrySize = getEntrySizeIncludeBulkLoad(entry);
- long entrySizeExcludeBulkLoad = getEntrySizeExcludeBulkLoad(entry);
- batch.addEntry(entry, entrySize);
- updateBatchStats(batch, entry, entryStream.getPosition(), entrySize);
- boolean totalBufferTooLarge = acquireBufferQuota(entrySizeExcludeBulkLoad);
- // Stop if too many entries or too big
- if (totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity
+ WALEntryBatch batch = null;
+ WALEntryStream entryStream =
+ new WALEntryStream(logQueue, fs, conf, lastReadPosition, metrics, walGroupId);
+ try {
+ while (isReaderRunning()) { // we only loop back here if something fatal happens to stream
+ try {
+ entryStream = new WALEntryStream(logQueue, fs, conf,
+ lastReadPosition, metrics, walGroupId);
+ while (isReaderRunning()) { // loop here to keep reusing stream while we can
+ if (!source.isPeerEnabled()) {
+ Threads.sleep(sleepForRetries);
+ continue;
+ }
+ if (!checkQuota()) {
+ continue;
+ }
+ batch = new WALEntryBatch(replicationBatchCountCapacity);
+ boolean hasNext = entryStream.hasNext();
+ while (hasNext) {
+ Entry entry = entryStream.next();
+ entry = filterEntry(entry);
+ if (entry != null) {
+ WALEdit edit = entry.getEdit();
+ if (edit != null && !edit.isEmpty()) {
+ long entrySize = getEntrySizeIncludeBulkLoad(entry);
+ long entrySizeExcludeBulkLoad = getEntrySizeExcludeBulkLoad(entry);
+ batch.addEntry(entry, entrySize);
+ updateBatchStats(batch, entry, entryStream.getPosition(), entrySize);
+ boolean totalBufferTooLarge = acquireBufferQuota(entrySizeExcludeBulkLoad);
+ // Stop if too many entries or too big
+ if (totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity
|| batch.getNbEntries() >= replicationBatchCountCapacity) {
- break;
+ break;
+ }
}
}
+ hasNext = entryStream.hasNext();
}
- }
- updateBatch(entryStream, batch, hasNext);
- if (isShippable(batch)) {
- sleepMultiplier = 1;
- entryBatchQueue.put(batch);
- if (!batch.hasMoreEntries()) {
- // we're done with queue recovery, shut ourselves down
- setReaderRunning(false);
+ // If the batch has data to max capacity or stream doesn't have anything
+ // try to ship it
+ if (updateBatchAndShippingQueue(entryStream, batch, hasNext, false)) {
+ sleepMultiplier = 1;
}
+ }
+ } catch (IOException | WALEntryStreamRuntimeException e) { // stream related
+ if (handleEofException(e, entryStream, batch)) {
+ sleepMultiplier = 1;
} else {
- Thread.sleep(sleepForRetries);
+ if (sleepMultiplier < maxRetriesMultiplier) {
+ LOG.debug("Failed to read stream of replication entries: " + e);
+ sleepMultiplier++;
+ } else {
+ LOG.error("Failed to read stream of replication entries", e);
+ }
+ Threads.sleep(sleepForRetries * sleepMultiplier);
}
- resetStream(entryStream);
- }
- } catch (IOException | WALEntryStreamRuntimeException e) { // stream related
- if (sleepMultiplier < maxRetriesMultiplier) {
- LOG.debug("Failed to read stream of replication entries: " + e);
- sleepMultiplier++;
- } else {
- LOG.error("Failed to read stream of replication entries", e);
- handleEofException(e);
+ } catch (InterruptedException e) {
+ LOG.trace("Interrupted while sleeping between WAL reads");
+ Thread.currentThread().interrupt();
+ } finally {
+ entryStream.close();
}
- Threads.sleep(sleepForRetries * sleepMultiplier);
- } catch (InterruptedException e) {
- LOG.trace("Interrupted while sleeping between WAL reads");
- Thread.currentThread().interrupt();
}
+ } catch (IOException e) {
+ if (sleepMultiplier < maxRetriesMultiplier) {
+ LOG.debug("Failed to read stream of replication entries: " + e);
+ sleepMultiplier++;
+ } else {
+ LOG.error("Failed to read stream of replication entries", e);
+ }
+ Threads.sleep(sleepForRetries * sleepMultiplier);
+ } catch (InterruptedException e) {
+ LOG.trace("Interrupted while sleeping between WAL reads");
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ /**
+ * Updates the batch, tries to ship it, and returns true if it was shipped
+ * @param entryStream stream of the WALs
+ * @param batch Batch of entries to ship
+ * @param hasMoreData if the stream has yet more data to read
+ * @param isEOFException if we have hit the EOF exception before this. For EOF exception,
+ * we do not want to reset the stream since entry stream doesn't
+ * have correct information.
+ * @return if batch is shipped successfully
+ * @throws InterruptedException throws interrupted exception
+ * @throws IOException throws io exception from stream
+ */
+ private boolean updateBatchAndShippingQueue(WALEntryStream entryStream, WALEntryBatch batch,
+ boolean hasMoreData, boolean isEOFException) throws InterruptedException, IOException {
+ updateBatch(entryStream, batch, hasMoreData, isEOFException);
+ boolean isDataQueued = false;
+ if (isShippable(batch)) {
+ isDataQueued = true;
+ entryBatchQueue.put(batch);
+ if (!batch.hasMoreEntries()) {
+ // we're done with queue recovery, shut ourselves down
+ LOG.debug("Stopping the reader after recovering the queue");
+ setReaderRunning(false);
+ }
+ } else {
+ Thread.sleep(sleepForRetries);
+ }
+
+ if (!isEOFException) {
+ resetStream(entryStream);
}
+ return isDataQueued;
}
- private void updateBatch(WALEntryStream entryStream, WALEntryBatch batch, boolean moreData) {
+ private void updateBatch(WALEntryStream entryStream, WALEntryBatch batch, boolean moreData,
+ boolean isEOFException) {
logMessage(batch);
- batch.updatePosition(entryStream);
+ // In case of EOF exception we can utilize the last read path and position
+ // since we do not have the current information.
+ if (isEOFException) {
+ batch.updatePosition(lastReadPath, lastReadPosition);
+ } else {
+ batch.updatePosition(entryStream.getCurrentPath(), entryStream.getPosition());
+ }
batch.setMoreEntries(!replicationQueueInfo.isQueueRecovered() || moreData);
}
@@ -229,10 +287,18 @@ public class ReplicationSourceWALReaderThread extends Thread {
stream.reset(); // reuse stream
}
- // if we get an EOF due to a zero-length log, and there are other logs in queue
- // (highly likely we've closed the current log), we've hit the max retries, and autorecovery is
- // enabled, then dump the log
- private void handleEofException(Exception e) {
+ /**
+ * This is to handle the EOFException from the WAL entry stream. EOFException should
+ * be handled carefully because there are chances of data loss because of never replicating
+ * the data.
+ * If EOFException happens on the last log in recovered queue, we can safely stop
+ * the reader.
+ * If EOFException doesn't happen on the last log in recovered queue, we should
+ * not stop the reader.
+ * @return true only if the IOE can be handled
+ */
+ private boolean handleEofException(Exception e, WALEntryStream entryStream,
+ WALEntryBatch batch) throws InterruptedException {
boolean isRecoveredSource = manager.getOldSources().contains(source);
PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId);
// Dump the log even if logQueue size is 1 if the source is from recovered Source since we don't
@@ -245,11 +311,22 @@ public class ReplicationSourceWALReaderThread extends Thread {
lastReadPath = queue.peek();
logQueue.remove(walGroupId);
lastReadPosition = 0;
+
+ // If it was on last log in the recovered queue,
+ // the stream doesn't have more data, we should stop the reader
+ boolean hasMoreData = !queue.isEmpty();
+ // After we removed the WAL from the queue, we should
+ // try shipping the existing batch of entries, we do not want to reset
+ // stream since entry stream doesn't have the correct data at this point
+ updateBatchAndShippingQueue(entryStream, batch, hasMoreData, true);
+ return true;
}
} catch (IOException ioe) {
LOG.warn("Couldn't get file length information about log " + queue.peek());
}
}
+
+ return false;
}
public Path getCurrentPath() {
@@ -299,7 +376,8 @@ public class ReplicationSourceWALReaderThread extends Thread {
return edit.heapSize() + key.estimatedSerializedSizeOf();
}
- private void updateBatchStats(WALEntryBatch batch, Entry entry, long entryPosition, long entrySize) {
+ private void updateBatchStats(WALEntryBatch batch, Entry entry,
+ long entryPosition, long entrySize) {
WALEdit edit = entry.getEdit();
if (edit != null && !edit.isEmpty()) {
batch.incrementHeapSize(entrySize);
@@ -409,7 +487,7 @@ public class ReplicationSourceWALReaderThread extends Thread {
* Holds a batch of WAL entries to replicate, along with some statistics
*
*/
- static class WALEntryBatch {
+ final static class WALEntryBatch {
private List<Pair<Entry, Long>> walEntriesWithSize;
// last WAL that was read
private Path lastWalPath;
@@ -515,9 +593,15 @@ public class ReplicationSourceWALReaderThread extends Thread {
return walEntriesWithSize.isEmpty();
}
- public void updatePosition(WALEntryStream entryStream) {
- lastWalPath = entryStream.getCurrentPath();
- lastWalPosition = entryStream.getPosition();
+ /**
+ * Update the wal entry batch with latest wal and position which will be used by
+ * shipper to update the log position in ZK node
+ * @param currentPath the path of WAL
+ * @param currentPosition the position of the WAL
+ */
+ public void updatePosition(Path currentPath, long currentPosition) {
+ lastWalPath = currentPath;
+ lastWalPosition = currentPosition;
}
public boolean hasMoreEntries() {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
index a0b09dd..c667881 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
@@ -72,25 +72,23 @@ public class WALEntryStream implements Iterator<Entry>, Closeable, Iterable<Entr
* @param conf {@link Configuration} to use to create {@link Reader} for this stream
* @param metrics replication metrics
* @param walGroupId wal prefix
- * @throws IOException
*/
public WALEntryStream(ReplicationSourceLogQueue logQueue, FileSystem fs, Configuration conf,
- MetricsSource metrics, String walGroupId)
- throws IOException {
+ MetricsSource metrics, String walGroupId) {
this(logQueue, fs, conf, 0, metrics, walGroupId);
}
/**
* Create an entry stream over the given queue at the given start position
* @param logQueue the queue of WAL paths
+ * @param fs {@link FileSystem} to use to create {@link Reader} for this stream
* @param conf the {@link Configuration} to use to create {@link Reader} for this stream
* @param startPosition the position in the first WAL to start reading at
* @param metrics the replication metrics
* @param walGroupId wal prefix
- * @throws IOException
*/
public WALEntryStream(ReplicationSourceLogQueue logQueue, FileSystem fs, Configuration conf,
- long startPosition, MetricsSource metrics, String walGroupId) throws IOException {
+ long startPosition, MetricsSource metrics, String walGroupId) {
this.logQueue = logQueue;
this.fs = fs;
this.conf = conf;
@@ -122,7 +120,9 @@ public class WALEntryStream implements Iterator<Entry>, Closeable, Iterable<Entr
*/
@Override
public Entry next() {
- if (!hasNext()) throw new NoSuchElementException();
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
Entry save = currentEntry;
currentEntry = null; // gets reloaded by hasNext()
return save;
@@ -180,7 +180,7 @@ public class WALEntryStream implements Iterator<Entry>, Closeable, Iterable<Entr
/**
* Should be called if the stream is to be reused (i.e. used again after hasNext() has returned
* false)
- * @throws IOException
+ * @throws IOException io exception while resetting the reader
*/
public void reset() throws IOException {
if (reader != null && currentPath != null) {
@@ -306,7 +306,9 @@ public class WALEntryStream implements Iterator<Entry>, Closeable, Iterable<Entr
Path nextPath = queue.peek();
if (nextPath != null) {
openReader(nextPath);
- if (reader != null) return true;
+ if (reader != null) {
+ return true;
+ }
}
return false;
}
@@ -349,7 +351,9 @@ public class WALEntryStream implements Iterator<Entry>, Closeable, Iterable<Entr
handleFileNotFound(path, fnfe);
} catch (RemoteException re) {
IOException ioe = re.unwrapRemoteException(FileNotFoundException.class);
- if (!(ioe instanceof FileNotFoundException)) throw ioe;
+ if (!(ioe instanceof FileNotFoundException)) {
+ throw ioe;
+ }
handleFileNotFound(path, (FileNotFoundException)ioe);
} catch (LeaseNotRecoveredException lnre) {
// HBASE-15019 the WAL was not closed due to some hiccup.
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
index ce185f4..b0a2a8c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
@@ -34,6 +34,8 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.internal.verification.VerificationModeFactory.times;
+import com.google.common.collect.Lists;
+
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
@@ -46,10 +48,10 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
@@ -75,6 +77,7 @@ import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSource
import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.ByteStringer;
@@ -86,6 +89,7 @@ import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALProvider;
+
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@@ -125,6 +129,7 @@ public class TestReplicationSource {
if (FS.exists(logDir)) {
FS.delete(logDir, true);
}
+ conf.setBoolean("replication.source.eof.autorecovery", true);
}
@Before
@@ -244,7 +249,6 @@ public class TestReplicationSource {
}
});
-
}
private void appendEntries(WALProvider.Writer writer, int numEntries) throws IOException {
@@ -295,12 +299,16 @@ public class TestReplicationSource {
Mockito.anyLong(), Mockito.anyBoolean(), Mockito.anyBoolean());
}
- ReplicationSource createReplicationSourceWithMocks(ReplicationEndpoint endpoint)
- throws IOException {
+ ReplicationSource createReplicationSourceWithMocks(ReplicationEndpoint endpoint,
+ boolean isRecovered) throws IOException {
final ReplicationSource source = new ReplicationSource();
endpoint.init(context);
source.init(conf, FS, manager, queues, peers, mock(Stoppable.class),
- "testPeerClusterZnode", UUID.randomUUID(), endpoint, metrics);
+ "testPeerClusterZnode", UUID.randomUUID(), endpoint, metrics);
+ if (isRecovered) {
+ when(manager.getOldSources())
+ .thenReturn(Lists.<ReplicationSourceInterface>newArrayList(source));
+ }
return source;
}
@@ -321,48 +329,54 @@ public class TestReplicationSource {
@Test
public void testSetLogPositionForWALCurrentlyReadingWhenLogsRolled() throws Exception {
final int numWALEntries = 5;
- conf.setInt("replication.source.nb.capacity", numWALEntries);
+ int nbCapacity = conf.getInt("replication.source.nb.capacity", 25000);
+ try {
+ conf.setInt("replication.source.nb.capacity", numWALEntries);
- Mocks mocks = new Mocks();
- final ReplicationEndpointForTest endpoint = new ReplicationEndpointForTest() {
- @Override
- public WALEntryFilter getWALEntryfilter() {
- return null;
- }
- };
- WALFactory wals = new WALFactory(TEST_UTIL.getConfiguration(), null, "test");
- final Path log1 = new Path(logDir, "log.1");
- final Path log2 = new Path(logDir, "log.2");
+ Mocks mocks = new Mocks();
+ final ReplicationEndpointForTest endpoint = new ReplicationEndpointForTest() {
+ @Override public WALEntryFilter getWALEntryfilter() {
+ return null;
+ }
+ };
+ WALFactory wals = new WALFactory(TEST_UTIL.getConfiguration(), null, "test");
+ final Path log1 = new Path(logDir, "log.1");
+ final Path log2 = new Path(logDir, "log.2");
- WALProvider.Writer writer1 = WALFactory.createWALWriter(FS, log1, TEST_UTIL.getConfiguration());
- WALProvider.Writer writer2 = WALFactory.createWALWriter(FS, log2, TEST_UTIL.getConfiguration());
+ WALProvider.Writer writer1
+ = WALFactory.createWALWriter(FS, log1, TEST_UTIL.getConfiguration());
+ WALProvider.Writer writer2
+ = WALFactory.createWALWriter(FS, log2, TEST_UTIL.getConfiguration());
- appendEntries(writer1, 3);
- appendEntries(writer2, 2);
+ appendEntries(writer1, 3);
+ appendEntries(writer2, 2);
- long pos = getPosition(wals, log2, 2);
+ long pos = getPosition(wals, log2, 2);
- final ReplicationSource source = mocks.createReplicationSourceWithMocks(endpoint);
- source.run();
+ final ReplicationSource source = mocks.createReplicationSourceWithMocks(endpoint, false);
+ source.run();
- source.enqueueLog(log1);
- // log rolled
- source.enqueueLog(log2);
+ source.enqueueLog(log1);
+ // log rolled
+ source.enqueueLog(log2);
- Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
- @Override public boolean evaluate() throws Exception {
- return endpoint.replicateCount.get() > 0;
- }
- });
+ Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+ @Override public boolean evaluate() {
+ return endpoint.replicateCount.get() > 0;
+ }
+ });
- ArgumentCaptor<Path> pathCaptor = ArgumentCaptor.forClass(Path.class);
- ArgumentCaptor<Long> positionCaptor = ArgumentCaptor.forClass(Long.class);
- verify(mocks.manager, times(1))
+ ArgumentCaptor<Path> pathCaptor = ArgumentCaptor.forClass(Path.class);
+ ArgumentCaptor<Long> positionCaptor = ArgumentCaptor.forClass(Long.class);
+ verify(mocks.manager, times(1))
.logPositionAndCleanOldLogs(pathCaptor.capture(), anyString(), positionCaptor.capture(),
- anyBoolean(), anyBoolean());
- assertTrue(endpoint.lastEntries.size() == 5);
- assertThat(pathCaptor.getValue(), is(log2));
- assertThat(positionCaptor.getValue(), is(pos));
+ anyBoolean(), anyBoolean());
+ assertTrue(endpoint.lastEntries.size() == 5);
+ assertThat(pathCaptor.getValue(), is(log2));
+ assertThat(positionCaptor.getValue(), is(pos));
+ } finally {
+ conf.setInt("replication.source.nb.capacity", nbCapacity);
+ }
}
@Test
@@ -405,7 +419,7 @@ public class TestReplicationSource {
writer.close();
Mocks mocks = new Mocks();
- final ReplicationSource source = mocks.createReplicationSourceWithMocks(endpoint);
+ final ReplicationSource source = mocks.createReplicationSourceWithMocks(endpoint, false);
source.run();
source.enqueueLog(log);
@@ -423,7 +437,7 @@ public class TestReplicationSource {
Mocks mocks = new Mocks();
final ReplicationEndpointForTest endpoint = new ReplicationEndpointForTest();
- final ReplicationSource source = mocks.createReplicationSourceWithMocks(endpoint);
+ final ReplicationSource source = mocks.createReplicationSourceWithMocks(endpoint, false);
WALFactory wals = new WALFactory(TEST_UTIL.getConfiguration(), null, "test");
final Path log1 = new Path(logDir, "log.1");
@@ -475,7 +489,7 @@ public class TestReplicationSource {
final long pos = getPosition(wals, log2, 2);
final ReplicationEndpointForTest endpoint = new ReplicationEndpointForTest();
- final ReplicationSource source = mocks.createReplicationSourceWithMocks(endpoint);
+ final ReplicationSource source = mocks.createReplicationSourceWithMocks(endpoint, false);
source.enqueueLog(log1);
source.enqueueLog(log2);
source.run();
@@ -529,7 +543,7 @@ public class TestReplicationSource {
}
};
- final ReplicationSource source = mocks.createReplicationSourceWithMocks(endpoint);
+ final ReplicationSource source = mocks.createReplicationSourceWithMocks(endpoint, false);
source.run();
source.enqueueLog(log1);
@@ -556,6 +570,173 @@ public class TestReplicationSource {
});
}
+ @Test
+ public void testReplicationOnEmptyLogAtTheEndOfQueueWithMultipleLogs() throws Exception {
+ final String logPrefix = "logPrefix";
+ Mocks mocks = new Mocks();
+ // set table cfs to filter all cells out
+ final TableName replicatedTable = TableName.valueOf("replicated_table");
+ final Map<TableName, List<String>> cfs =
+ Collections.singletonMap(replicatedTable, Collections.<String>emptyList());
+ when(mocks.peer.getTableCFs()).thenReturn(cfs);
+
+ // Append 3 entries in a log
+ final Path log1 = new Path(logDir, logPrefix + ".1");
+ WALProvider.Writer writer1 = WALFactory.createWALWriter(FS, log1, TEST_UTIL.getConfiguration());
+ appendEntries(writer1, 3);
+
+ // Create a 0 length log.
+ Path emptyLog = new Path(logDir, logPrefix + ".2");
+ FSDataOutputStream fsdos = FS.create(emptyLog);
+ fsdos.close();
+ assertEquals(0, FS.getFileStatus(emptyLog).getLen());
+
+ // Replication end point with no filter
+ final ReplicationEndpointForTest endpoint = new ReplicationEndpointForTest() {
+ @Override
+ public WALEntryFilter getWALEntryfilter() {
+ return null;
+ }
+ };
+
+ final ReplicationSource source = mocks.createReplicationSourceWithMocks(endpoint, true);
+ source.run();
+ source.enqueueLog(log1);
+ source.enqueueLog(emptyLog);
+
+ // Wait for source to replicate
+ Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+ @Override public boolean evaluate() {
+ return endpoint.replicateCount.get() == 1;
+ }
+ });
+
+ // Wait and verify if all the entries get replicated for non empty logs
+ Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+ @Override public boolean evaluate() {
+ return endpoint.lastEntries.size() == 3;
+ }
+ });
+
+ // Wait and verify if log queue has been drained fully
+ Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+ @Override public boolean evaluate() {
+ return source.getQueues().get(logPrefix).isEmpty();
+ }
+ });
+ }
+
+ @Test
+ public void testReplicationOnEmptyLogAtTheEndOfQueueWithSingleLog() throws Exception {
+ final String logPrefix = "logPrefix";
+ Mocks mocks = new Mocks();
+ // set table cfs to filter all cells out
+ final TableName replicatedTable = TableName.valueOf("replicated_table");
+ final Map<TableName, List<String>> cfs =
+ Collections.singletonMap(replicatedTable, Collections.<String>emptyList());
+ when(mocks.peer.getTableCFs()).thenReturn(cfs);
+
+ // Create a 0 length log.
+ Path emptyLog = new Path(logDir, logPrefix + ".1");
+ FSDataOutputStream fsdos = FS.create(emptyLog);
+ fsdos.close();
+ assertEquals(0, FS.getFileStatus(emptyLog).getLen());
+
+ // Replication end point with no filter
+ final ReplicationEndpointForTest endpoint = new ReplicationEndpointForTest() {
+ @Override
+ public WALEntryFilter getWALEntryfilter() {
+ return null;
+ }
+ };
+
+ final ReplicationSource source = mocks.createReplicationSourceWithMocks(endpoint, true);
+ source.run();
+ source.enqueueLog(emptyLog);
+
+ // Wait and verify if no entry got replicated
+ Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+ @Override public boolean evaluate() {
+ return endpoint.lastEntries == null;
+ }
+ });
+
+ // Wait and verify the queue is empty
+ Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+ @Override public boolean evaluate() {
+ return source.getQueues().get(logPrefix).isEmpty();
+ }
+ });
+ }
+
+ @Test
+ public void testReplicationOnEmptyLogBetweenTheNonEmptyLogsInLogQueue() throws Exception {
+ final String logPrefix = "logPrefix";
+ Mocks mocks = new Mocks();
+ // set table cfs to filter all cells out
+ final TableName replicatedTable = TableName.valueOf("replicated_table");
+ final Map<TableName, List<String>> cfs =
+ Collections.singletonMap(replicatedTable, Collections.<String>emptyList());
+ when(mocks.peer.getTableCFs()).thenReturn(cfs);
+
+ // Append 3 entries in a log
+ final Path log1 = new Path(logDir, logPrefix + ".11");
+ WALProvider.Writer writer1 = WALFactory.createWALWriter(FS, log1, TEST_UTIL.getConfiguration());
+ appendEntries(writer1, 3);
+
+ // Create a 0 length log.
+ Path emptyLog = new Path(logDir, logPrefix + ".12");
+ FSDataOutputStream fsdos = FS.create(emptyLog);
+ fsdos.close();
+ assertEquals(0, FS.getFileStatus(emptyLog).getLen());
+
+ // Append 5 entries in a log
+ final Path log3 = new Path(logDir, logPrefix + ".13");
+ WALProvider.Writer writer3 = WALFactory.createWALWriter(FS, log3, TEST_UTIL.getConfiguration());
+ appendEntries(writer3, 5);
+
+ // Append 10 entries in a log
+ final Path log4 = new Path(logDir, logPrefix + ".14");
+ WALProvider.Writer writer4 = WALFactory.createWALWriter(FS, log4, TEST_UTIL.getConfiguration());
+ appendEntries(writer4, 10);
+
+ // Replication end point with no filter
+ final ReplicationEndpointForTest endpoint = new ReplicationEndpointForTest() {
+ @Override
+ public WALEntryFilter getWALEntryfilter() {
+ return null;
+ }
+ };
+
+ final ReplicationSource source = mocks.createReplicationSourceWithMocks(endpoint, true);
+ source.run();
+ source.enqueueLog(log1);
+ source.enqueueLog(emptyLog);
+ source.enqueueLog(log3);
+ source.enqueueLog(log4);
+
+ // Wait for source to replicate
+ Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+ @Override public boolean evaluate() {
+ return endpoint.replicateCount.get() == 2;
+ }
+ });
+
+ // Wait and verify the last replicated entries
+ Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+ @Override public boolean evaluate() {
+ return endpoint.lastEntries.size() == 15;
+ }
+ });
+
+ // Wait and verify only one log is there in queue
+ Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+ @Override public boolean evaluate() {
+ return source.getQueues().get(logPrefix).size() == 1;
+ }
+ });
+ }
+
/**
* Tests that recovered queues are preserved on a regionserver shutdown.
* See HBASE-18192
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceBase.java
index e94985e..ab4d19d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceBase.java
@@ -76,7 +76,6 @@ public abstract class TestReplicationSourceBase {
protected static DummyServer server;
@BeforeClass public static void setUpBeforeClass() throws Exception {
-
conf = HBaseConfiguration.create();
conf.set("replication.replicationsource.implementation",
ReplicationSourceDummyWithNoTermination.class.getCanonicalName());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
index adf427b..0b01c5f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
@@ -81,6 +81,7 @@ import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.After;
import org.junit.AfterClass;
@@ -741,9 +742,45 @@ public class TestWALEntryStream {
new ReplicationSourceWALReaderThread(mockSourceManager, getRecoveredQueueInfo(),
localLogQueue, 0, fs, conf, getDummyFilter(), getMockMetrics(), source, fakeWalGroupId);
reader.run();
+ assertEquals(0, localLogQueue.getQueueSize(fakeWalGroupId));
+ }
+
+ @Test // A queue holding one 0-length log and one 3-entry log must be empty after the reader runs.
+ public void testEOFExceptionForRecoveredQueueWithMultipleLogs() throws Exception {
+ ReplicationSourceLogQueue localLogQueue = new ReplicationSourceLogQueue(conf, getMockMetrics());
+ // Create a 0 length log and enqueue it.
+ Path emptyLog = new Path("log.2");
+ FSDataOutputStream fsdos = fs.create(emptyLog);
+ fsdos.close();
+ assertEquals(0, fs.getFileStatus(emptyLog).getLen());
+ localLogQueue.enqueueLog(emptyLog, fakeWalGroupId);
+
+ final Path log1 = new Path("log.1"); // second log in the same WAL group; gets 3 entries below
+ WALProvider.Writer writer1 = WALFactory.createWALWriter(fs, log1, TEST_UTIL.getConfiguration());
+ appendEntries(writer1, 3); // appendEntries also closes writer1
+ localLogQueue.enqueueLog(log1, fakeWalGroupId);
+
+ ReplicationSource source = Mockito.mock(ReplicationSource.class);
+ ReplicationSourceManager mockSourceManager = mock(ReplicationSourceManager.class);
+ // Make the source look like a recovered source by listing it among the manager's old sources.
+ when(mockSourceManager.getOldSources())
+ .thenReturn(new ArrayList<>(Arrays.asList((ReplicationSourceInterface)source)));
+ when(source.isPeerEnabled()).thenReturn(true);
+ when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
+ // Override the max retries multiplier to fail fast. NOTE(review): conf is not reset in this method.
+ conf.setInt("replication.source.maxretriesmultiplier", 1);
+ conf.setBoolean("replication.source.eof.autorecovery", true);
+ // Create a reader thread over the two-log queue.
+ ReplicationSourceWALReaderThread reader =
+ new ReplicationSourceWALReaderThread(mockSourceManager, getRecoveredQueueInfo(),
+ localLogQueue, 0, fs, conf, getDummyFilter(), getMockMetrics(), source, fakeWalGroupId);
+ assertEquals("Initial log queue size is not correct",
+ 2, localLogQueue.getQueueSize(fakeWalGroupId));
+ reader.run(); // invoked directly, so it executes synchronously on the test thread
+
// ReplicationSourceWALReaderThread#handleEofException method will
// remove empty log from logQueue.
- assertEquals(0, localLogQueue.getQueueSize(fakeWalGroupId));
+ assertEquals("Log queue should be empty", 0, localLogQueue.getQueueSize(fakeWalGroupId));
}
private PriorityBlockingQueue<Path> getQueue() {
@@ -757,4 +794,21 @@ public class TestWALEntryStream {
doNothing().when(source).setOldestWalAge(Mockito.anyInt());
return source;
}
+
+ private void appendEntries(WALProvider.Writer writer, int numEntries) throws IOException { // Appends numEntries entries to writer, then closes it.
+ for (int i = 0; i < numEntries; i++) { // one single-cell entry per iteration
+ byte[] b = Bytes.toBytes(Integer.toString(i)); // decimal string of the loop index, reused for every field below
+ KeyValue kv = new KeyValue(b,b,b); // the same bytes are passed for all three constructor arguments
+ WALEdit edit = new WALEdit();
+ edit.add(kv);
+ WALKey key = new WALKey(b, TableName.valueOf(b), 0, 0,
+ HConstants.DEFAULT_CLUSTER_ID);
+ NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+ scopes.put(b, HConstants.REPLICATION_SCOPE_GLOBAL); // scope map keyed by the same bytes
+ key.setScopes(scopes);
+ writer.append(new WAL.Entry(key, edit));
+ writer.sync(false); // sync after every appended entry
+ }
+ writer.close(); // NOTE(review): not reached if append/sync throws, so the writer stays open on failure
+ }
}