You are viewing a plain-text version of this content; the canonical link was provided as a hyperlink in the original HTML message and is not reproduced in this extraction.
Posted to commits@hbase.apache.org by wc...@apache.org on 2020/01/21 09:26:01 UTC

[hbase] branch branch-1 updated: HBASE-23254 Release replication buffer quota correctly, when batch includes bulk loaded hfiles (#792)

This is an automated email from the ASF dual-hosted git repository.

wchevreuil pushed a commit to branch branch-1
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-1 by this push:
     new 8bf4985  HBASE-23254 Release replication buffer quota correctly, when batch includes bulk loaded hfiles (#792)
8bf4985 is described below

commit 8bf4985d699a5339391f42a6d481b9e0d0d86278
Author: Jeongdae Kim <kj...@gmail.com>
AuthorDate: Tue Jan 21 18:23:55 2020 +0900

    HBASE-23254 Release replication buffer quota correctly, when batch includes bulk loaded hfiles (#792)
    
    Signed-off-by: Wellington Chevreuil <wc...@apache.org>
---
 .../regionserver/ReplicationSource.java            |  2 +-
 .../hbase/replication/TestReplicationSource.java   | 66 +++++++++++++++++++++-
 2 files changed, 66 insertions(+), 2 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 5acb709..c586142 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -616,7 +616,6 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
         try {
           WALEntryBatch entryBatch = entryReader.take();
           shipEdits(entryBatch);
-          releaseBufferQuota((int) entryBatch.getHeapSize());
           if (!entryBatch.hasMoreEntries()) {
             LOG.debug("Finished recovering queue for group "
                     + walGroupId + " of peer " + peerClusterZnode);
@@ -777,6 +776,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
           if (throttler.isEnabled()) {
             throttler.addPushSize(sizeExcludeBulkLoad);
           }
+          releaseBufferQuota(sizeExcludeBulkLoad);
           totalReplicatedEdits.addAndGet(entries.size());
           totalReplicatedOperations.addAndGet(entryBatch.getNbOperations());
           // FIXME check relationship between wal group and overall
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
index b4ac71b..84d2d8b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
@@ -35,6 +35,7 @@ import static org.mockito.internal.verification.VerificationModeFactory.times;
 
 import java.io.IOException;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
@@ -54,6 +55,7 @@ import org.apache.hadoop.hbase.CoordinatedStateManager;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.Stoppable;
@@ -61,6 +63,8 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.Waiter.Predicate;
 import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
@@ -69,7 +73,9 @@ import org.apache.hadoop.hbase.replication.regionserver.Replication;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.HFileTestUtil;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALKey;
@@ -263,11 +269,12 @@ public class TestReplicationSource {
     private final MetricsSource metrics = mock(MetricsSource.class);
     private final ReplicationPeer peer = mock(ReplicationPeer.class);
     private final ReplicationEndpoint.Context context = mock(ReplicationEndpoint.Context.class);
+    private final AtomicLong totalBufferUsed = new AtomicLong();
 
     private Mocks() {
       when(peers.getStatusOfPeer(anyString())).thenReturn(true);
       when(context.getReplicationPeer()).thenReturn(peer);
-      when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
+      when(manager.getTotalBufferUsed()).thenReturn(totalBufferUsed);
     }
 
     ReplicationSource createReplicationSourceWithMocks(ReplicationEndpoint endpoint)
@@ -278,6 +285,10 @@ public class TestReplicationSource {
               "testPeerClusterZnode", UUID.randomUUID(), endpoint, metrics);
       return source;
     }
+
+    public AtomicLong getTotalBufferUsed() {
+      return totalBufferUsed;
+    }
   }
 
   @Test
@@ -328,6 +339,59 @@ public class TestReplicationSource {
   }
 
   @Test
+  public void testUpdateQuotaWhenBulkLoad() throws Exception {
+    byte[] cfBytes = Bytes.toBytes("cf");
+    TableName tableName = TableName.valueOf("test_table");
+    Path dir = TEST_UTIL.getDataTestDirOnTestFS(tableName.getNameAsString());
+    Map<String, Long> storeFilesSize = new HashMap<>(1);
+    Map<byte[], List<Path>> storeFiles = new HashMap<>(1);
+    int numRows = 10;
+
+    Path familyDir = new Path(dir, Bytes.toString(cfBytes));
+    Path hfilePath = new Path(familyDir, "test_hfile");
+    HFileTestUtil.createHFile(conf, FS, hfilePath, cfBytes, cfBytes,
+      Bytes.toBytes("a"), Bytes.toBytes("z"), numRows);
+    storeFilesSize.put(hfilePath.getName(), FS.getFileStatus(hfilePath).getLen());
+    storeFiles.put(cfBytes, Collections.singletonList(hfilePath));
+
+    HRegionInfo regionInfo = new HRegionInfo(tableName);
+    WALProtos.BulkLoadDescriptor loadDescriptor = ProtobufUtil
+      .toBulkLoadDescriptor(tableName, ByteStringer.wrap(Bytes.toBytes("test_region")),
+        storeFiles, storeFilesSize, 1, null);
+    WALEdit edit = WALEdit.createBulkLoadEvent(regionInfo, loadDescriptor);
+
+    final ReplicationEndpointForTest endpoint = new ReplicationEndpointForTest() {
+      @Override
+      public WALEntryFilter getWALEntryfilter() {
+        return null;
+      }
+    };
+    final Path log = new Path(logDir, "log.1");
+
+    WALProvider.Writer writer = WALFactory.createWALWriter(FS, log, TEST_UTIL.getConfiguration());
+    WALKey key = new WALKey(regionInfo.getEncodedNameAsBytes(), tableName, 0, 0,
+      HConstants.DEFAULT_CLUSTER_ID);
+    WAL.Entry bulkLoadEventEntry = new WAL.Entry(key, edit);
+    WAL.Entry entryWithoutCells = new WAL.Entry(key, new WALEdit());
+    writer.append(bulkLoadEventEntry);
+    writer.append(entryWithoutCells);
+    writer.close();
+
+    Mocks mocks = new Mocks();
+    final ReplicationSource source = mocks.createReplicationSourceWithMocks(endpoint);
+    source.run();
+
+    source.enqueueLog(log);
+    Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+      @Override public boolean evaluate() throws Exception {
+        return endpoint.replicateCount.get() > 0;
+      }
+    });
+
+    assertEquals(0L, mocks.getTotalBufferUsed().get());
+  }
+
+  @Test
   public void testSetLogPositionAndRemoveOldWALsEvenIfEmptyWALsRolled() throws Exception {
     Mocks mocks = new Mocks();