You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text version).
Posted to commits@hbase.apache.org by ap...@apache.org on 2021/03/02 20:29:43 UTC
[hbase] branch branch-2 updated: Revert "HBASE-25596: Fix NPE and avoid permanent unreplicated data due to EOF (#2990)"
This is an automated email from the ASF dual-hosted git repository.
apurtell pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new 0849fae Revert "HBASE-25596: Fix NPE and avoid permanent unreplicated data due to EOF (#2990)"
0849fae is described below
commit 0849fae7304696f8e00e26e2b6057e3816c52f87
Author: Andrew Purtell <ap...@apache.org>
AuthorDate: Tue Mar 2 12:26:23 2021 -0800
Revert "HBASE-25596: Fix NPE and avoid permanent unreplicated data due to EOF (#2990)"
This reverts commit d724d0576fcafb507b32d3f6636228d559a924fa.
---
.../regionserver/ReplicationSource.java | 8 +-
.../regionserver/ReplicationSourceWALReader.java | 152 ++++-------
.../SerialReplicationSourceWALReader.java | 4 +-
.../replication/regionserver/WALEntryBatch.java | 4 -
.../replication/regionserver/WALEntryStream.java | 6 +-
.../hbase/replication/TestReplicationBase.java | 66 ++---
.../TestReplicationEmptyWALRecovery.java | 298 ++-------------------
.../regionserver/TestWALEntryStream.java | 62 +----
8 files changed, 99 insertions(+), 501 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 8c7f0a6..e654a5b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -29,12 +29,12 @@ import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
+
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -65,6 +65,7 @@ import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
/**
@@ -263,11 +264,6 @@ public class ReplicationSource implements ReplicationSourceInterface {
}
}
- @InterfaceAudience.Private
- public Map<String, PriorityBlockingQueue<Path>> getQueues() {
- return logQueue.getQueues();
- }
-
@Override
public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
throws ReplicationException {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index 57c0a16..f52a83a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -41,6 +41,7 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
@@ -122,64 +123,44 @@ class ReplicationSourceWALReader extends Thread {
@Override
public void run() {
int sleepMultiplier = 1;
- WALEntryBatch batch = null;
- WALEntryStream entryStream = null;
- try {
- // we only loop back here if something fatal happened to our stream
- while (isReaderRunning()) {
- try {
- entryStream =
- new WALEntryStream(logQueue, conf, currentPosition, source.getWALFileLengthProvider(),
- source.getServerWALsBelongTo(), source.getSourceMetrics(), walGroupId);
- while (isReaderRunning()) { // loop here to keep reusing stream while we can
- if (!source.isPeerEnabled()) {
- Threads.sleep(sleepForRetries);
- continue;
- }
- if (!checkQuota()) {
- continue;
- }
-
- batch = createBatch(entryStream);
- batch = readWALEntries(entryStream, batch);
- currentPosition = entryStream.getPosition();
- if (batch == null) {
- // either the queue have no WAL to read
- // or got no new entries (didn't advance position in WAL)
- handleEmptyWALEntryBatch();
- entryStream.reset(); // reuse stream
- } else {
- addBatchToShippingQueue(batch);
- }
+ while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream
+ try (WALEntryStream entryStream =
+ new WALEntryStream(logQueue, conf, currentPosition,
+ source.getWALFileLengthProvider(), source.getServerWALsBelongTo(),
+ source.getSourceMetrics(), walGroupId)) {
+ while (isReaderRunning()) { // loop here to keep reusing stream while we can
+ if (!source.isPeerEnabled()) {
+ Threads.sleep(sleepForRetries);
+ continue;
+ }
+ if (!checkQuota()) {
+ continue;
}
- } catch (IOException e) { // stream related
- if (handleEofException(e, batch)) {
+ WALEntryBatch batch = readWALEntries(entryStream);
+ currentPosition = entryStream.getPosition();
+ if (batch != null) {
+ // need to propagate the batch even it has no entries since it may carry the last
+ // sequence id information for serial replication.
+ LOG.debug("Read {} WAL entries eligible for replication", batch.getNbEntries());
+ entryBatchQueue.put(batch);
sleepMultiplier = 1;
- } else {
- LOG.warn("Failed to read stream of replication entries", e);
- if (sleepMultiplier < maxRetriesMultiplier) {
- sleepMultiplier++;
- }
- Threads.sleep(sleepForRetries * sleepMultiplier);
+ } else { // got no entries and didn't advance position in WAL
+ handleEmptyWALEntryBatch(entryStream.getCurrentPath());
+ entryStream.reset(); // reuse stream
}
- } catch (InterruptedException e) {
- LOG.trace("Interrupted while sleeping between WAL reads");
- Thread.currentThread().interrupt();
- } finally {
- entryStream.close();
}
+ } catch (IOException e) { // stream related
+ if (!handleEofException(e)) {
+ LOG.warn("Failed to read stream of replication entries", e);
+ if (sleepMultiplier < maxRetriesMultiplier) {
+ sleepMultiplier ++;
+ }
+ Threads.sleep(sleepForRetries * sleepMultiplier);
+ }
+ } catch (InterruptedException e) {
+ LOG.trace("Interrupted while sleeping between WAL reads");
+ Thread.currentThread().interrupt();
}
- } catch (IOException e) {
- if (sleepMultiplier < maxRetriesMultiplier) {
- LOG.debug("Failed to read stream of replication entries: " + e);
- sleepMultiplier++;
- } else {
- LOG.error("Failed to read stream of replication entries", e);
- }
- Threads.sleep(sleepForRetries * sleepMultiplier);
- } catch (InterruptedException e) {
- LOG.trace("Interrupted while sleeping between WAL reads");
- Thread.currentThread().interrupt();
}
}
@@ -208,19 +189,14 @@ class ReplicationSourceWALReader extends Thread {
return newPath == null || !path.getName().equals(newPath.getName());
}
- // We need to get the WALEntryBatch from the caller so we can add entries in there
- // This is required in case there is any exception in while reading entries
- // we do want to loss the existing entries in the batch
- protected WALEntryBatch readWALEntries(WALEntryStream entryStream,
- WALEntryBatch batch) throws IOException, InterruptedException {
+ protected WALEntryBatch readWALEntries(WALEntryStream entryStream)
+ throws IOException, InterruptedException {
Path currentPath = entryStream.getCurrentPath();
if (!entryStream.hasNext()) {
// check whether we have switched a file
if (currentPath != null && switched(entryStream, currentPath)) {
return WALEntryBatch.endOfFile(currentPath);
} else {
- // This would mean either no more files in the queue
- // or there is no new data yet on the current wal
return null;
}
}
@@ -232,7 +208,7 @@ class ReplicationSourceWALReader extends Thread {
// when reading from the entry stream first time we will enter here
currentPath = entryStream.getCurrentPath();
}
- batch.setLastWalPath(currentPath);
+ WALEntryBatch batch = createBatch(entryStream);
for (;;) {
Entry entry = entryStream.next();
batch.setLastWalPosition(entryStream.getPosition());
@@ -255,12 +231,10 @@ class ReplicationSourceWALReader extends Thread {
return batch;
}
- private void handleEmptyWALEntryBatch() throws InterruptedException {
+ private void handleEmptyWALEntryBatch(Path currentPath) throws InterruptedException {
LOG.trace("Didn't read any new entries from WAL");
- if (logQueue.getQueue(walGroupId).isEmpty()) {
- // we're done with current queue, either this is a recovered queue, or it is the special group
- // for a sync replication peer and the peer has been transited to DA or S state.
- LOG.debug("Stopping the replication source wal reader");
+ if (source.isRecovered()) {
+ // we're done with queue recovery, shut ourself down
setReaderRunning(false);
// shuts down shipper thread immediately
entryBatchQueue.put(WALEntryBatch.NO_MORE_DATA);
@@ -270,38 +244,22 @@ class ReplicationSourceWALReader extends Thread {
}
/**
- * This is to handle the EOFException from the WAL entry stream. EOFException should
- * be handled carefully because there are chances of data loss because of never replicating
- * the data. Thus we should always try to ship existing batch of entries here.
- * If there was only one log in the queue before EOF, we ship the empty batch here
- * and since reader is still active, in the next iteration of reader we will
- * stop the reader.
- * If there was more than one log in the queue before EOF, we ship the existing batch
- * and reset the wal patch and position to the log with EOF, so shipper can remove
- * logs from replication queue
+ * if we get an EOF due to a zero-length log, and there are other logs in queue
+ * (highly likely we've closed the current log), and autorecovery is
+ * enabled, then dump the log
* @return true only the IOE can be handled
*/
- private boolean handleEofException(IOException e, WALEntryBatch batch)
- throws InterruptedException {
+ private boolean handleEofException(IOException e) {
PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId);
// Dump the log even if logQueue size is 1 if the source is from recovered Source
// since we don't add current log to recovered source queue so it is safe to remove.
- if ((e instanceof EOFException || e.getCause() instanceof EOFException)
- && (source.isRecovered() || queue.size() > 1)
- && this.eofAutoRecovery) {
- Path head = queue.peek();
+ if ((e instanceof EOFException || e.getCause() instanceof EOFException) &&
+ (source.isRecovered() || queue.size() > 1) && this.eofAutoRecovery) {
try {
- if (fs.getFileStatus(head).getLen() == 0) {
- // head of the queue is an empty log file
- LOG.warn("Forcing removal of 0 length log in queue: {}", head);
+ if (fs.getFileStatus(queue.peek()).getLen() == 0) {
+ LOG.warn("Forcing removal of 0 length log in queue: {}", queue.peek());
logQueue.remove(walGroupId);
currentPosition = 0;
- // After we removed the WAL from the queue, we should
- // try shipping the existing batch of entries and set the wal position
- // and path to the wal just dequeued to correctly remove logs from the zk
- batch.setLastWalPath(head);
- batch.setLastWalPosition(currentPosition);
- addBatchToShippingQueue(batch);
return true;
}
} catch (IOException ioe) {
@@ -311,20 +269,6 @@ class ReplicationSourceWALReader extends Thread {
return false;
}
- /**
- * Update the batch try to ship and return true if shipped
- * @param batch Batch of entries to ship
- * @throws InterruptedException throws interrupted exception
- * @throws IOException throws io exception from stream
- */
- private void addBatchToShippingQueue(WALEntryBatch batch)
- throws InterruptedException, IOException {
- // need to propagate the batch even it has no entries since it may carry the last
- // sequence id information for serial replication.
- LOG.debug("Read {} WAL entries eligible for replication", batch.getNbEntries());
- entryBatchQueue.put(batch);
- }
-
public Path getCurrentPath() {
// if we've read some WAL entries, get the Path we read from
WALEntryBatch batchQueueHead = entryBatchQueue.peek();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java
index 254dc4a..d0e76fb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java
@@ -50,7 +50,7 @@ public class SerialReplicationSourceWALReader extends ReplicationSourceWALReader
}
@Override
- protected WALEntryBatch readWALEntries(WALEntryStream entryStream, WALEntryBatch batch)
+ protected WALEntryBatch readWALEntries(WALEntryStream entryStream)
throws IOException, InterruptedException {
Path currentPath = entryStream.getCurrentPath();
if (!entryStream.hasNext()) {
@@ -70,7 +70,7 @@ public class SerialReplicationSourceWALReader extends ReplicationSourceWALReader
currentPath = entryStream.getCurrentPath();
}
long positionBefore = entryStream.getPosition();
- batch = createBatch(entryStream);
+ WALEntryBatch batch = createBatch(entryStream);
for (;;) {
Entry entry = entryStream.peek();
boolean doFiltering = true;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
index 8301dff..4f96c96 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
@@ -94,10 +94,6 @@ class WALEntryBatch {
return lastWalPath;
}
- public void setLastWalPath(Path lastWalPath) {
- this.lastWalPath = lastWalPath;
- }
-
/**
* @return the position in the last WAL that was read.
*/
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
index 721a122..5b8f057 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
@@ -80,7 +80,7 @@ class WALEntryStream implements Closeable {
* @param walFileLengthProvider provides the length of the WAL file
* @param serverName the server name which all WALs belong to
* @param metrics the replication metrics
- * @throws IOException throw IO exception from stream
+ * @throws IOException
*/
public WALEntryStream(ReplicationSourceLogQueue logQueue, Configuration conf,
long startPosition, WALFileLengthProvider walFileLengthProvider, ServerName serverName,
@@ -368,9 +368,7 @@ class WALEntryStream implements Closeable {
handleFileNotFound(path, fnfe);
} catch (RemoteException re) {
IOException ioe = re.unwrapRemoteException(FileNotFoundException.class);
- if (!(ioe instanceof FileNotFoundException)) {
- throw ioe;
- }
+ if (!(ioe instanceof FileNotFoundException)) throw ioe;
handleFileNotFound(path, (FileNotFoundException)ioe);
} catch (LeaseNotRecoveredException lnre) {
// HBASE-15019 the WAL was not closed due to some hiccup.
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
index 5fb51e5..96e86ff 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
@@ -20,10 +20,10 @@ package org.apache.hadoop.hbase.replication;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -43,10 +43,8 @@ import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
-import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
-import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.junit.After;
import org.junit.AfterClass;
@@ -54,7 +52,7 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
/**
@@ -65,8 +63,7 @@ import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
*/
public class TestReplicationBase {
private static final Logger LOG = LoggerFactory.getLogger(TestReplicationBase.class);
- private static Connection connection1;
- private static Connection connection2;
+
protected static Configuration CONF_WITH_LOCALFS;
protected static ReplicationAdmin admin;
@@ -87,8 +84,6 @@ public class TestReplicationBase {
NB_ROWS_IN_BATCH * 10;
protected static final long SLEEP_TIME = 500;
protected static final int NB_RETRIES = 50;
- protected static AtomicInteger replicateCount = new AtomicInteger();
- protected static volatile List<WAL.Entry> replicatedEntries = Lists.newArrayList();
protected static final TableName tableName = TableName.valueOf("test");
protected static final byte[] famName = Bytes.toBytes("f");
@@ -243,26 +238,26 @@ public class TestReplicationBase {
// as a component in deciding maximum number of parallel batches to send to the peer cluster.
UTIL2.startMiniCluster(NUM_SLAVES2);
- connection1 = ConnectionFactory.createConnection(CONF1);
- connection2 = ConnectionFactory.createConnection(CONF2);
- hbaseAdmin = connection1.getAdmin();
+ admin = new ReplicationAdmin(CONF1);
+ hbaseAdmin = ConnectionFactory.createConnection(CONF1).getAdmin();
TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName)
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMaxVersions(100)
.setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
- try (
- Admin admin1 = connection1.getAdmin();
- Admin admin2 = connection2.getAdmin()) {
+ Connection connection1 = ConnectionFactory.createConnection(CONF1);
+ Connection connection2 = ConnectionFactory.createConnection(CONF2);
+ try (Admin admin1 = connection1.getAdmin()) {
admin1.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
+ }
+ try (Admin admin2 = connection2.getAdmin()) {
admin2.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
- UTIL1.waitUntilAllRegionsAssigned(tableName);
- htable1 = connection1.getTable(tableName);
- UTIL2.waitUntilAllRegionsAssigned(tableName);
- htable2 = connection2.getTable(tableName);
}
-
+ UTIL1.waitUntilAllRegionsAssigned(tableName);
+ UTIL2.waitUntilAllRegionsAssigned(tableName);
+ htable1 = connection1.getTable(tableName);
+ htable2 = connection2.getTable(tableName);
}
@BeforeClass
@@ -278,10 +273,9 @@ public class TestReplicationBase {
@Before
public void setUpBase() throws Exception {
if (!peerExist(PEER_ID2)) {
- ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder()
- .setClusterKey(UTIL2.getClusterKey()).setSerial(isSerialPeer()).setReplicationEndpointImpl(
- ReplicationEndpointTest.class.getName());
- hbaseAdmin.addReplicationPeer(PEER_ID2, builder.build());
+ ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder()
+ .setClusterKey(UTIL2.getClusterKey()).setSerial(isSerialPeer()).build();
+ hbaseAdmin.addReplicationPeer(PEER_ID2, rpc);
}
}
@@ -357,33 +351,7 @@ public class TestReplicationBase {
if (admin != null) {
admin.close();
}
- if (hbaseAdmin != null) {
- hbaseAdmin.close();
- }
-
- if (connection2 != null) {
- connection2.close();
- }
- if (connection1 != null) {
- connection1.close();
- }
UTIL2.shutdownMiniCluster();
UTIL1.shutdownMiniCluster();
}
-
- /**
- * Custom replication endpoint to keep track of replication status for tests.
- */
- public static class ReplicationEndpointTest extends HBaseInterClusterReplicationEndpoint {
- public ReplicationEndpointTest() {
- replicateCount.set(0);
- }
-
- @Override public boolean replicate(ReplicateContext replicateContext) {
- replicateCount.incrementAndGet();
- replicatedEntries.addAll(replicateContext.getEntries());
-
- return super.replicate(replicateContext);
- }
- }
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java
index 2d72618..c0f22a9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java
@@ -6,7 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,99 +20,56 @@ package org.apache.hadoop.hbase.replication;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import java.util.NavigableMap;
-import java.util.TreeMap;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
-import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
-import org.apache.hadoop.hbase.wal.WALEdit;
-import org.apache.hadoop.hbase.wal.WALKeyImpl;
-import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-@Category
- ({ ReplicationTests.class, LargeTests.class }) public class TestReplicationEmptyWALRecovery
- extends TestReplicationBase {
- MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
- static final RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build();
- NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+@Category({ ReplicationTests.class, LargeTests.class })
+public class TestReplicationEmptyWALRecovery extends TestReplicationBase {
- @ClassRule public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestReplicationEmptyWALRecovery.class);
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestReplicationEmptyWALRecovery.class);
@Before
public void setUp() throws IOException, InterruptedException {
cleanUp();
- scopes.put(famName, HConstants.REPLICATION_SCOPE_GLOBAL);
- replicateCount.set(0);
- replicatedEntries.clear();
}
/**
* Waits until there is only one log(the current writing one) in the replication queue
- *
- * @param numRs number of region servers
+ * @param numRs number of regionservers
*/
- private void waitForLogAdvance(int numRs) {
- Waiter.waitFor(CONF1, 100000, new Waiter.Predicate<Exception>() {
+ private void waitForLogAdvance(int numRs) throws Exception {
+ Waiter.waitFor(CONF1, 10000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
for (int i = 0; i < numRs; i++) {
HRegionServer hrs = UTIL1.getHBaseCluster().getRegionServer(i);
RegionInfo regionInfo =
- UTIL1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
+ UTIL1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
WAL wal = hrs.getWAL(regionInfo);
Path currentFile = ((AbstractFSWAL<?>) wal).getCurrentFileName();
- Replication replicationService =
- (Replication) UTIL1.getHBaseCluster().getRegionServer(i).getReplicationSourceService();
- for (ReplicationSourceInterface rsi : replicationService.getReplicationManager()
- .getSources()) {
- ReplicationSource source = (ReplicationSource) rsi;
- // We are making sure that there is only one log queue and that is for the
- // current WAL of region server
- String logPrefix = source.getQueues().keySet().stream().findFirst().get();
- if (!currentFile.equals(source.getCurrentPath())
- || source.getQueues().keySet().size() != 1
- || source.getQueues().get(logPrefix).size() != 1) {
- return false;
- }
- }
- }
- return true;
- }
- });
- }
-
- private void verifyNumberOfLogsInQueue(int numQueues, int numRs) {
- Waiter.waitFor(CONF1, 10000, new Waiter.Predicate<Exception>() {
- @Override
- public boolean evaluate() {
- for (int i = 0; i < numRs; i++) {
- Replication replicationService =
- (Replication) UTIL1.getHBaseCluster().getRegionServer(i).getReplicationSourceService();
+ Replication replicationService = (Replication) UTIL1.getHBaseCluster()
+ .getRegionServer(i).getReplicationSourceService();
for (ReplicationSourceInterface rsi : replicationService.getReplicationManager()
- .getSources()) {
+ .getSources()) {
ReplicationSource source = (ReplicationSource) rsi;
- String logPrefix = source.getQueues().keySet().stream().findFirst().get();
- if (source.getQueues().get(logPrefix).size() != numQueues) {
+ if (!currentFile.equals(source.getCurrentPath())) {
return false;
}
}
@@ -123,211 +82,25 @@ import org.junit.experimental.categories.Category;
@Test
public void testEmptyWALRecovery() throws Exception {
final int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size();
- // for each RS, create an empty wal with same walGroupId
- final List<Path> emptyWalPaths = new ArrayList<>();
- long ts = System.currentTimeMillis();
- for (int i = 0; i < numRs; i++) {
- RegionInfo regionInfo =
- UTIL1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
- WAL wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
- Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal);
- String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName());
- Path emptyWalPath = new Path(UTIL1.getDataTestDir(), walGroupId + "." + ts);
- UTIL1.getTestFileSystem().create(emptyWalPath).close();
- emptyWalPaths.add(emptyWalPath);
- }
- injectEmptyWAL(numRs, emptyWalPaths);
-
- // ReplicationSource should advance past the empty wal, or else the test will fail
- waitForLogAdvance(numRs);
- verifyNumberOfLogsInQueue(1, numRs);
- // we're now writing to the new wal
- // if everything works, the source should've stopped reading from the empty wal, and start
- // replicating from the new wal
- runSimplePutDeleteTest();
- rollWalsAndWaitForDeque(numRs);
- }
-
- /**
- * Test empty WAL along with non empty WALs in the same batch. This test is to make sure
- * when we see the empty and handle the EOF exception, we are able to existing the previous
- * batch of entries without loosing it. This test also tests the number of batches shipped
- *
- * @throws Exception throws any exception
- */
- @Test
- public void testReplicationOfEmptyWALFollowingNonEmptyWAL() throws Exception {
- // Disable the replication peer to accumulate the non empty WAL followed by empty WAL
- hbaseAdmin.disableReplicationPeer(PEER_ID2);
- int numOfEntriesToReplicate = 20;
-
- final int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size();
// for each RS, create an empty wal with same walGroupId
final List<Path> emptyWalPaths = new ArrayList<>();
long ts = System.currentTimeMillis();
for (int i = 0; i < numRs; i++) {
RegionInfo regionInfo =
- UTIL1.getHBaseCluster().getRegions(tableName.getName()).get(0).getRegionInfo();
+ UTIL1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
WAL wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal);
-
- appendEntriesToWal(numOfEntriesToReplicate, wal);
- wal.rollWriter();
- String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName());
- Path emptyWalPath = new Path(UTIL1.getDefaultRootDirPath(), walGroupId + "." + ts);
- UTIL1.getTestFileSystem().create(emptyWalPath).close();
- emptyWalPaths.add(emptyWalPath);
- }
-
- injectEmptyWAL(numRs, emptyWalPaths);
- // There should be three WALs in queue
- // 1. empty WAL
- // 2. non empty WAL
- // 3. live WAL
- //verifyNumberOfLogsInQueue(3, numRs);
- hbaseAdmin.enableReplicationPeer(PEER_ID2);
- // ReplicationSource should advance past the empty wal, or else the test will fail
- waitForLogAdvance(numRs);
-
- // Now we should expect numOfEntriesToReplicate entries
- // replicated from each region server. This makes sure we didn't loose data
- // from any previous batch when we encounter EOF exception for empty file.
- Assert.assertEquals("Replicated entries are not correct", numOfEntriesToReplicate * numRs,
- replicatedEntries.size());
-
- // We expect just one batch of replication which will
- // be from when we handle the EOF exception.
- Assert.assertEquals("Replicated batches are not correct", 1, replicateCount.intValue());
- verifyNumberOfLogsInQueue(1, numRs);
- // we're now writing to the new wal
- // if everything works, the source should've stopped reading from the empty wal, and start
- // replicating from the new wal
- runSimplePutDeleteTest();
- rollWalsAndWaitForDeque(numRs);
- }
-
- /**
- * Test empty WAL along with non empty WALs in the same batch. This test is to make sure
- * when we see the empty WAL and handle the EOF exception, we are able to proceed
- * with next batch and replicate it properly without missing data.
- *
- * @throws Exception throws any exception
- */
- @Test
- public void testReplicationOfEmptyWALFollowedByNonEmptyWAL() throws Exception {
- // Disable the replication peer to accumulate the non empty WAL followed by empty WAL
- hbaseAdmin.disableReplicationPeer(PEER_ID2);
- int numOfEntriesToReplicate = 20;
-
- final int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size();
- // for each RS, create an empty wal with same walGroupId
- final List<Path> emptyWalPaths = new ArrayList<>();
-
- long ts = System.currentTimeMillis();
- WAL wal = null;
- for (int i = 0; i < numRs; i++) {
- RegionInfo regionInfo =
- UTIL1.getHBaseCluster().getRegions(tableName.getName()).get(0).getRegionInfo();
- wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
- Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal);
- appendEntriesToWal(numOfEntriesToReplicate, wal);
String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName());
Path emptyWalPath = new Path(UTIL1.getDataTestDir(), walGroupId + "." + ts);
UTIL1.getTestFileSystem().create(emptyWalPath).close();
emptyWalPaths.add(emptyWalPath);
-
}
- injectEmptyWAL(numRs, emptyWalPaths);
- // roll the WAL now
- for (int i = 0; i < numRs; i++) {
- wal.rollWriter();
- }
- hbaseAdmin.enableReplicationPeer(PEER_ID2);
- // ReplicationSource should advance past the empty wal, or else the test will fail
- waitForLogAdvance(numRs);
-
- // Now we should expect numOfEntriesToReplicate entries
- // replicated from each region server. This makes sure we didn't loose data
- // from any previous batch when we encounter EOF exception for empty file.
- Assert.assertEquals("Replicated entries are not correct", numOfEntriesToReplicate * numRs,
- replicatedEntries.size());
-
- // We expect just one batch of replication to be shipped which will
- // for non empty WAL
- Assert.assertEquals("Replicated batches are not correct", 1, replicateCount.get());
- verifyNumberOfLogsInQueue(1, numRs);
- // we're now writing to the new wal
- // if everything works, the source should've stopped reading from the empty wal, and start
- // replicating from the new wal
- runSimplePutDeleteTest();
- rollWalsAndWaitForDeque(numRs);
- }
-
- /**
- * This test make sure we replicate all the enties from the non empty WALs which
- * are surrounding the empty WALs
- *
- * @throws Exception throws exception
- */
- @Test
- public void testReplicationOfEmptyWALSurroundedNonEmptyWAL() throws Exception {
- // Disable the replication peer to accumulate the non empty WAL followed by empty WAL
- hbaseAdmin.disableReplicationPeer(PEER_ID2);
- int numOfEntriesToReplicate = 20;
-
- final int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size();
- // for each RS, create an empty wal with same walGroupId
- final List<Path> emptyWalPaths = new ArrayList<>();
- long ts = System.currentTimeMillis();
- WAL wal = null;
- for (int i = 0; i < numRs; i++) {
- RegionInfo regionInfo =
- UTIL1.getHBaseCluster().getRegions(tableName.getName()).get(0).getRegionInfo();
- wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
- Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal);
- appendEntriesToWal(numOfEntriesToReplicate, wal);
- wal.rollWriter();
- String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName());
- Path emptyWalPath = new Path(UTIL1.getDataTestDir(), walGroupId + "." + ts);
- UTIL1.getTestFileSystem().create(emptyWalPath).close();
- emptyWalPaths.add(emptyWalPath);
- }
- injectEmptyWAL(numRs, emptyWalPaths);
-
- // roll the WAL again with some entries
- for (int i = 0; i < numRs; i++) {
- appendEntriesToWal(numOfEntriesToReplicate, wal);
- wal.rollWriter();
- }
-
- hbaseAdmin.enableReplicationPeer(PEER_ID2);
- // ReplicationSource should advance past the empty wal, or else the test will fail
- waitForLogAdvance(numRs);
-
- // Now we should expect numOfEntriesToReplicate entries
- // replicated from each region server. This makes sure we didn't loose data
- // from any previous batch when we encounter EOF exception for empty file.
- Assert.assertEquals("Replicated entries are not correct", numOfEntriesToReplicate * numRs * 2,
- replicatedEntries.size());
-
- // We expect two batch of replication to be shipped which will
- // for non empty WAL
- Assert.assertEquals("Replicated batches are not correct", 2, replicateCount.get());
- verifyNumberOfLogsInQueue(1, numRs);
- // we're now writing to the new wal
- // if everything works, the source should've stopped reading from the empty wal, and start
- // replicating from the new wal
- runSimplePutDeleteTest();
- rollWalsAndWaitForDeque(numRs);
- }
-
- // inject our empty wal into the replication queue, and then roll the original wal, which
- // enqueues a new wal behind our empty wal. We must roll the wal here as now we use the WAL to
- // determine if the file being replicated currently is still opened for write, so just inject a
- // new wal to the replication queue does not mean the previous file is closed.
- private void injectEmptyWAL(int numRs, List<Path> emptyWalPaths) throws IOException {
+ // inject our empty wal into the replication queue, and then roll the original wal, which
+ // enqueues a new wal behind our empty wal. We must roll the wal here as now we use the WAL to
+ // determine if the file being replicated currently is still opened for write, so just inject a
+ // new wal to the replication queue does not mean the previous file is closed.
for (int i = 0; i < numRs; i++) {
HRegionServer hrs = UTIL1.getHBaseCluster().getRegionServer(i);
Replication replicationService = (Replication) hrs.getReplicationSourceService();
@@ -338,32 +111,13 @@ import org.junit.experimental.categories.Category;
WAL wal = hrs.getWAL(regionInfo);
wal.rollWriter(true);
}
- }
-
- protected WALKeyImpl getWalKeyImpl() {
- return new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, 0, mvcc, scopes);
- }
- // Roll the WAL and wait for it to get deque from the log queue
- private void rollWalsAndWaitForDeque(int numRs) throws IOException {
- RegionInfo regionInfo =
- UTIL1.getHBaseCluster().getRegions(tableName.getName()).get(0).getRegionInfo();
- for (int i = 0; i < numRs; i++) {
- WAL wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
- wal.rollWriter();
- }
+ // ReplicationSource should advance past the empty wal, or else the test will fail
waitForLogAdvance(numRs);
- }
- private void appendEntriesToWal(int numEntries, WAL wal) throws IOException {
- long txId = -1;
- for (int i = 0; i < numEntries; i++) {
- byte[] b = Bytes.toBytes(Integer.toString(i));
- KeyValue kv = new KeyValue(b, famName, b);
- WALEdit edit = new WALEdit();
- edit.add(kv);
- txId = wal.appendData(info, getWalKeyImpl(), edit);
- }
- wal.sync(txId);
+ // we're now writing to the new wal
+ // if everything works, the source should've stopped reading from the empty wal, and start
+ // replicating from the new wal
+ runSimplePutDeleteTest();
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
index d31b864..9c6fafc 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
@@ -27,9 +27,9 @@ import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
@@ -69,7 +69,6 @@ import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
-import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.After;
import org.junit.AfterClass;
@@ -84,6 +83,7 @@ import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
+
@Category({ ReplicationTests.class, LargeTests.class })
public class TestWALEntryStream {
@@ -687,7 +687,6 @@ public class TestWALEntryStream {
// Override the max retries multiplier to fail fast.
conf.setInt("replication.source.maxretriesmultiplier", 1);
conf.setBoolean("replication.source.eof.autorecovery", true);
- conf.setInt("replication.source.nb.batches", 10);
// Create a reader thread with source as recovered source.
ReplicationSource source = mockReplicationSource(true, conf);
when(source.isPeerEnabled()).thenReturn(true);
@@ -706,64 +705,7 @@ public class TestWALEntryStream {
assertEquals(0, localLogQueue.getQueueSize(fakeWalGroupId));
}
- @Test
- public void testEOFExceptionForRecoveredQueueWithMultipleLogs() throws Exception {
- Configuration conf = new Configuration(CONF);
- MetricsSource metrics = mock(MetricsSource.class);
- ReplicationSource source = mockReplicationSource(true, conf);
- ReplicationSourceLogQueue localLogQueue = new ReplicationSourceLogQueue(conf, metrics, source);
- // Create a 0 length log.
- Path emptyLog = new Path(fs.getHomeDirectory(),"log.2");
- FSDataOutputStream fsdos = fs.create(emptyLog);
- fsdos.close();
- assertEquals(0, fs.getFileStatus(emptyLog).getLen());
- localLogQueue.enqueueLog(emptyLog, fakeWalGroupId);
-
- final Path log1 = new Path(fs.getHomeDirectory(), "log.1");
- WALProvider.Writer writer1 = WALFactory.createWALWriter(fs, log1, TEST_UTIL.getConfiguration());
- appendEntries(writer1, 3);
- localLogQueue.enqueueLog(log1, fakeWalGroupId);
-
- ReplicationSourceManager mockSourceManager = mock(ReplicationSourceManager.class);
- // Make it look like the source is from recovered source.
- when(mockSourceManager.getOldSources())
- .thenReturn(new ArrayList<>(Arrays.asList((ReplicationSourceInterface)source)));
- when(source.isPeerEnabled()).thenReturn(true);
- when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
- // Override the max retries multiplier to fail fast.
- conf.setInt("replication.source.maxretriesmultiplier", 1);
- conf.setBoolean("replication.source.eof.autorecovery", true);
- conf.setInt("replication.source.nb.batches", 10);
- // Create a reader thread.
- ReplicationSourceWALReader reader =
- new ReplicationSourceWALReader(fs, conf, localLogQueue, 0,
- getDummyFilter(), source, fakeWalGroupId);
- assertEquals("Initial log queue size is not correct",
- 2, localLogQueue.getQueueSize(fakeWalGroupId));
- reader.run();
-
- // remove empty log from logQueue.
- assertEquals(0, localLogQueue.getQueueSize(fakeWalGroupId));
- assertEquals("Log queue should be empty", 0, localLogQueue.getQueueSize(fakeWalGroupId));
- }
-
private PriorityBlockingQueue<Path> getQueue() {
return logQueue.getQueue(fakeWalGroupId);
}
-
- private void appendEntries(WALProvider.Writer writer, int numEntries) throws IOException {
- for (int i = 0; i < numEntries; i++) {
- byte[] b = Bytes.toBytes(Integer.toString(i));
- KeyValue kv = new KeyValue(b,b,b);
- WALEdit edit = new WALEdit();
- edit.add(kv);
- WALKeyImpl key = new WALKeyImpl(b, TableName.valueOf(b), 0, 0,
- HConstants.DEFAULT_CLUSTER_ID);
- NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
- scopes.put(b, HConstants.REPLICATION_SCOPE_GLOBAL);
- writer.append(new WAL.Entry(key, edit));
- writer.sync(false);
- }
- writer.close();
- }
}