You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by wc...@apache.org on 2020/01/21 09:26:01 UTC
[hbase] branch branch-1 updated: HBASE-23254 Release replication buffer quota correctly, when batch includes bulk loaded hfiles (#792)
This is an automated email from the ASF dual-hosted git repository.
wchevreuil pushed a commit to branch branch-1
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-1 by this push:
new 8bf4985 HBASE-23254 Release replication buffer quota correctly, when batch includes bulk loaded hfiles (#792)
8bf4985 is described below
commit 8bf4985d699a5339391f42a6d481b9e0d0d86278
Author: Jeongdae Kim <kj...@gmail.com>
AuthorDate: Tue Jan 21 18:23:55 2020 +0900
HBASE-23254 Release replication buffer quota correctly, when batch includes bulk loaded hfiles (#792)
Signed-off-by: Wellington Chevreuil <wc...@apache.org>
---
.../regionserver/ReplicationSource.java | 2 +-
.../hbase/replication/TestReplicationSource.java | 66 +++++++++++++++++++++-
2 files changed, 66 insertions(+), 2 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 5acb709..c586142 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -616,7 +616,6 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
try {
WALEntryBatch entryBatch = entryReader.take();
shipEdits(entryBatch);
- releaseBufferQuota((int) entryBatch.getHeapSize());
if (!entryBatch.hasMoreEntries()) {
LOG.debug("Finished recovering queue for group "
+ walGroupId + " of peer " + peerClusterZnode);
@@ -777,6 +776,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
if (throttler.isEnabled()) {
throttler.addPushSize(sizeExcludeBulkLoad);
}
+ releaseBufferQuota(sizeExcludeBulkLoad);
totalReplicatedEdits.addAndGet(entries.size());
totalReplicatedOperations.addAndGet(entryBatch.getNbOperations());
// FIXME check relationship between wal group and overall
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
index b4ac71b..84d2d8b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
@@ -35,6 +35,7 @@ import static org.mockito.internal.verification.VerificationModeFactory.times;
import java.io.IOException;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
@@ -54,6 +55,7 @@ import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.Stoppable;
@@ -61,6 +63,8 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.Waiter.Predicate;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
@@ -69,7 +73,9 @@ import org.apache.hadoop.hbase.replication.regionserver.Replication;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.HFileTestUtil;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
@@ -263,11 +269,12 @@ public class TestReplicationSource {
private final MetricsSource metrics = mock(MetricsSource.class);
private final ReplicationPeer peer = mock(ReplicationPeer.class);
private final ReplicationEndpoint.Context context = mock(ReplicationEndpoint.Context.class);
+ private final AtomicLong totalBufferUsed = new AtomicLong();
private Mocks() {
when(peers.getStatusOfPeer(anyString())).thenReturn(true);
when(context.getReplicationPeer()).thenReturn(peer);
- when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
+ when(manager.getTotalBufferUsed()).thenReturn(totalBufferUsed);
}
ReplicationSource createReplicationSourceWithMocks(ReplicationEndpoint endpoint)
@@ -278,6 +285,10 @@ public class TestReplicationSource {
"testPeerClusterZnode", UUID.randomUUID(), endpoint, metrics);
return source;
}
+
+ public AtomicLong getTotalBufferUsed() {
+ return totalBufferUsed;
+ }
}
@Test
@@ -328,6 +339,59 @@ public class TestReplicationSource {
}
@Test
+ public void testUpdateQuotaWhenBulkLoad() throws Exception {
+ byte[] cfBytes = Bytes.toBytes("cf");
+ TableName tableName = TableName.valueOf("test_table");
+ Path dir = TEST_UTIL.getDataTestDirOnTestFS(tableName.getNameAsString());
+ Map<String, Long> storeFilesSize = new HashMap<>(1);
+ Map<byte[], List<Path>> storeFiles = new HashMap<>(1);
+ int numRows = 10;
+
+ Path familyDir = new Path(dir, Bytes.toString(cfBytes));
+ Path hfilePath = new Path(familyDir, "test_hfile");
+ HFileTestUtil.createHFile(conf, FS, hfilePath, cfBytes, cfBytes,
+ Bytes.toBytes("a"), Bytes.toBytes("z"), numRows);
+ storeFilesSize.put(hfilePath.getName(), FS.getFileStatus(hfilePath).getLen());
+ storeFiles.put(cfBytes, Collections.singletonList(hfilePath));
+
+ HRegionInfo regionInfo = new HRegionInfo(tableName);
+ WALProtos.BulkLoadDescriptor loadDescriptor = ProtobufUtil
+ .toBulkLoadDescriptor(tableName, ByteStringer.wrap(Bytes.toBytes("test_region")),
+ storeFiles, storeFilesSize, 1, null);
+ WALEdit edit = WALEdit.createBulkLoadEvent(regionInfo, loadDescriptor);
+
+ final ReplicationEndpointForTest endpoint = new ReplicationEndpointForTest() {
+ @Override
+ public WALEntryFilter getWALEntryfilter() {
+ return null;
+ }
+ };
+ final Path log = new Path(logDir, "log.1");
+
+ WALProvider.Writer writer = WALFactory.createWALWriter(FS, log, TEST_UTIL.getConfiguration());
+ WALKey key = new WALKey(regionInfo.getEncodedNameAsBytes(), tableName, 0, 0,
+ HConstants.DEFAULT_CLUSTER_ID);
+ WAL.Entry bulkLoadEventEntry = new WAL.Entry(key, edit);
+ WAL.Entry entryWithoutCells = new WAL.Entry(key, new WALEdit());
+ writer.append(bulkLoadEventEntry);
+ writer.append(entryWithoutCells);
+ writer.close();
+
+ Mocks mocks = new Mocks();
+ final ReplicationSource source = mocks.createReplicationSourceWithMocks(endpoint);
+ source.run();
+
+ source.enqueueLog(log);
+ Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+ @Override public boolean evaluate() throws Exception {
+ return endpoint.replicateCount.get() > 0;
+ }
+ });
+
+ assertEquals(0L, mocks.getTotalBufferUsed().get());
+ }
+
+ @Test
public void testSetLogPositionAndRemoveOldWALsEvenIfEmptyWALsRolled() throws Exception {
Mocks mocks = new Mocks();