You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by vj...@apache.org on 2020/07/24 15:02:57 UTC
[hbase] branch branch-2 updated: HBASE-24757 : ReplicationSink should limit row count in batch mutation based on hbase.rpc.rows.warning.threshold (#2136)
This is an automated email from the ASF dual-hosted git repository.
vjasani pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new 6cb51cc HBASE-24757 : ReplicationSink should limit row count in batch mutation based on hbase.rpc.rows.warning.threshold (#2136)
6cb51cc is described below
commit 6cb51cc0f01f6282360b1e6bc614a0283e42447c
Author: Viraj Jasani <vj...@apache.org>
AuthorDate: Fri Jul 24 20:32:40 2020 +0530
HBASE-24757 : ReplicationSink should limit row count in batch mutation based on hbase.rpc.rows.warning.threshold (#2136)
Closes #2127
Signed-off-by: stack <st...@apache.org>
---
.../java/org/apache/hadoop/hbase/HConstants.java | 10 ++++
.../hadoop/hbase/regionserver/RSRpcServices.java | 12 +----
.../replication/regionserver/Replication.java | 2 +-
.../replication/regionserver/ReplicationSink.java | 60 ++++++++++++----------
.../hbase/regionserver/TestMultiLogThreshold.java | 5 +-
.../regionserver/TestReplicationSink.java | 49 +++++++++++++++---
.../regionserver/TestWALEntrySinkFilter.java | 2 +-
7 files changed, 91 insertions(+), 49 deletions(-)
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 475989b..1eea08b 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -1610,6 +1610,16 @@ public final class HConstants {
"hbase.regionserver.slowlog.systable.enabled";
public static final boolean DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY = false;
+ /**
+ * Number of rows in a batch operation above which a warning will be logged.
+ */
+ public static final String BATCH_ROWS_THRESHOLD_NAME = "hbase.rpc.rows.warning.threshold";
+
+ /**
+ * Default value of {@link #BATCH_ROWS_THRESHOLD_NAME}
+ */
+ public static final int BATCH_ROWS_THRESHOLD_DEFAULT = 5000;
+
private HConstants() {
// Can't be instantiated with this ctor.
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index cbca1a5..d1ef55c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -290,15 +290,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
*/
private static final long DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 10;
- /**
- * Number of rows in a batch operation above which a warning will be logged.
- */
- static final String BATCH_ROWS_THRESHOLD_NAME = "hbase.rpc.rows.warning.threshold";
- /**
- * Default value of {@link RSRpcServices#BATCH_ROWS_THRESHOLD_NAME}
- */
- static final int BATCH_ROWS_THRESHOLD_DEFAULT = 5000;
-
/*
* Whether to reject rows with size > threshold defined by
* {@link RSRpcServices#BATCH_ROWS_THRESHOLD_NAME}
@@ -1261,7 +1252,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
final Configuration conf = rs.getConfiguration();
this.ld = ld;
regionServer = rs;
- rowSizeWarnThreshold = conf.getInt(BATCH_ROWS_THRESHOLD_NAME, BATCH_ROWS_THRESHOLD_DEFAULT);
+ rowSizeWarnThreshold = conf.getInt(
+ HConstants.BATCH_ROWS_THRESHOLD_NAME, HConstants.BATCH_ROWS_THRESHOLD_DEFAULT);
rejectRowsWithSizeOverThreshold =
conf.getBoolean(REJECT_BATCH_ROWS_OVER_THRESHOLD, DEFAULT_REJECT_BATCH_ROWS_OVER_THRESHOLD);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index 186e67d..cb8e299 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -187,7 +187,7 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
@Override
public void startReplicationService() throws IOException {
this.replicationManager.init();
- this.replicationSink = new ReplicationSink(this.conf, this.server);
+ this.replicationSink = new ReplicationSink(this.conf);
this.scheduleThreadPool.scheduleAtFixedRate(
new ReplicationStatisticsTask(this.replicationSink, this.replicationManager),
statsThreadPeriod, statsThreadPeriod, TimeUnit.SECONDS);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
index e910e53..b947c80 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -37,7 +38,6 @@ import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.yetus.audience.InterfaceAudience;
@@ -51,13 +51,14 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
-import org.apache.hadoop.hbase.wal.WALEdit;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
/**
* <p>
@@ -91,15 +92,20 @@ public class ReplicationSink {
private WALEntrySinkFilter walEntrySinkFilter;
/**
+ * Row size threshold for multi requests above which a warning is logged
+ */
+ private final int rowSizeWarnThreshold;
+
+ /**
* Create a sink for replication
- *
- * @param conf conf object
- * @param stopper boolean to tell this thread to stop
+ * @param conf conf object
* @throws IOException thrown when HDFS goes bad or bad file name
*/
- public ReplicationSink(Configuration conf, Stoppable stopper)
+ public ReplicationSink(Configuration conf)
throws IOException {
this.conf = HBaseConfiguration.create(conf);
+ rowSizeWarnThreshold = conf.getInt(
+ HConstants.BATCH_ROWS_THRESHOLD_NAME, HConstants.BATCH_ROWS_THRESHOLD_DEFAULT);
decorateConf();
this.metrics = new MetricsSink();
this.walEntrySinkFilter = setupWALEntrySinkFilter();
@@ -209,11 +215,7 @@ public class ReplicationSink {
// Map of table name Vs list of pair of family and list of
// hfile paths from its namespace
Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap =
- bulkLoadsPerClusters.get(bld.getClusterIdsList());
- if (bulkLoadHFileMap == null) {
- bulkLoadHFileMap = new HashMap<>();
- bulkLoadsPerClusters.put(bld.getClusterIdsList(), bulkLoadHFileMap);
- }
+ bulkLoadsPerClusters.computeIfAbsent(bld.getClusterIdsList(), k -> new HashMap<>());
buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld);
}
} else {
@@ -246,7 +248,7 @@ public class ReplicationSink {
if (!rowMap.isEmpty()) {
LOG.debug("Started replicating mutations.");
for (Entry<TableName, Map<List<UUID>, List<Row>>> entry : rowMap.entrySet()) {
- batch(entry.getKey(), entry.getValue().values());
+ batch(entry.getKey(), entry.getValue().values(), rowSizeWarnThreshold);
}
LOG.debug("Finished replicating mutations.");
}
@@ -366,17 +368,10 @@ public class ReplicationSink {
* @param value
* @return the list of values corresponding to key1 and key2
*/
- private <K1, K2, V> List<V> addToHashMultiMap(Map<K1, Map<K2,List<V>>> map, K1 key1, K2 key2, V value) {
- Map<K2,List<V>> innerMap = map.get(key1);
- if (innerMap == null) {
- innerMap = new HashMap<>();
- map.put(key1, innerMap);
- }
- List<V> values = innerMap.get(key2);
- if (values == null) {
- values = new ArrayList<>();
- innerMap.put(key2, values);
- }
+ private <K1, K2, V> List<V> addToHashMultiMap(Map<K1, Map<K2, List<V>>> map, K1 key1, K2 key2,
+ V value) {
+ Map<K2, List<V>> innerMap = map.computeIfAbsent(key1, k -> new HashMap<>());
+ List<V> values = innerMap.computeIfAbsent(key2, k -> new ArrayList<>());
values.add(value);
return values;
}
@@ -404,9 +399,10 @@ public class ReplicationSink {
* Do the changes and handle the pool
* @param tableName table to insert into
* @param allRows list of actions
- * @throws IOException
+ * @param batchRowSizeThreshold rowSize threshold for batch mutation
*/
- protected void batch(TableName tableName, Collection<List<Row>> allRows) throws IOException {
+ private void batch(TableName tableName, Collection<List<Row>> allRows, int batchRowSizeThreshold)
+ throws IOException {
if (allRows.isEmpty()) {
return;
}
@@ -415,7 +411,15 @@ public class ReplicationSink {
Connection connection = getConnection();
table = connection.getTable(tableName);
for (List<Row> rows : allRows) {
- table.batch(rows, null);
+ List<List<Row>> batchRows;
+ if (rows.size() > batchRowSizeThreshold) {
+ batchRows = Lists.partition(rows, batchRowSizeThreshold);
+ } else {
+ batchRows = Collections.singletonList(rows);
+ }
+ for(List<Row> rowList:batchRows){
+ table.batch(rowList, null);
+ }
}
} catch (RetriesExhaustedWithDetailsException rewde) {
for (Throwable ex : rewde.getCauses()) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiLogThreshold.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiLogThreshold.java
index abc9564..c58ee17 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiLogThreshold.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiLogThreshold.java
@@ -23,6 +23,7 @@ import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.testclassification.LargeTests;
@@ -80,8 +81,8 @@ public class TestMultiLogThreshold {
final TableName tableName = TableName.valueOf("tableName");
TEST_UTIL = HBaseTestingUtility.createLocalHTU();
CONF = TEST_UTIL.getConfiguration();
- THRESHOLD = CONF.getInt(RSRpcServices.BATCH_ROWS_THRESHOLD_NAME,
- RSRpcServices.BATCH_ROWS_THRESHOLD_DEFAULT);
+ THRESHOLD = CONF.getInt(HConstants.BATCH_ROWS_THRESHOLD_NAME,
+ HConstants.BATCH_ROWS_THRESHOLD_DEFAULT);
CONF.setBoolean("hbase.rpc.rows.size.threshold.reject", rejectLargeBatchOp);
TEST_UTIL.startMiniCluster();
TEST_UTIL.createTable(tableName, TEST_FAM);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java
index 9145841..57900cd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.hbase.replication.regionserver;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
import java.security.SecureRandom;
import java.util.ArrayList;
@@ -55,7 +54,7 @@ import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
@@ -78,7 +77,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.UUID;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALKey;
-@Category({ReplicationTests.class, MediumTests.class})
+@Category({ReplicationTests.class, LargeTests.class})
public class TestReplicationSink {
@ClassRule
@@ -127,10 +126,8 @@ public class TestReplicationSink {
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.getConfiguration().set("hbase.replication.source.fs.conf.provider",
TestSourceFSConfigurationProvider.class.getCanonicalName());
-
TEST_UTIL.startMiniCluster(3);
- SINK =
- new ReplicationSink(new Configuration(TEST_UTIL.getConfiguration()), STOPPABLE);
+ SINK = new ReplicationSink(new Configuration(TEST_UTIL.getConfiguration()));
table1 = TEST_UTIL.createTable(TABLE_NAME1, FAM_NAME1);
table2 = TEST_UTIL.createTable(TABLE_NAME2, FAM_NAME2);
Path rootDir = CommonFSUtils.getRootDir(TEST_UTIL.getConfiguration());
@@ -203,6 +200,40 @@ public class TestReplicationSink {
assertEquals(BATCH_SIZE/2, scanRes.next(BATCH_SIZE).length);
}
+ @Test
+ public void testLargeEditsPutDelete() throws Exception {
+ List<WALEntry> entries = new ArrayList<>();
+ List<Cell> cells = new ArrayList<>();
+ for (int i = 0; i < 5510; i++) {
+ entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
+ }
+ SINK.replicateEntries(entries, CellUtil.createCellScanner(cells), replicationClusterId,
+ baseNamespaceDir, hfileArchiveDir);
+
+ ResultScanner resultScanner = table1.getScanner(new Scan());
+ int totalRows = 0;
+ while (resultScanner.next() != null) {
+ totalRows++;
+ }
+ assertEquals(5510, totalRows);
+
+ entries = new ArrayList<>();
+ cells = new ArrayList<>();
+ for (int i = 0; i < 11000; i++) {
+ entries.add(
+ createEntry(TABLE_NAME1, i, i % 2 != 0 ? KeyValue.Type.Put : KeyValue.Type.DeleteColumn,
+ cells));
+ }
+ SINK.replicateEntries(entries, CellUtil.createCellScanner(cells), replicationClusterId,
+ baseNamespaceDir, hfileArchiveDir);
+ resultScanner = table1.getScanner(new Scan());
+ totalRows = 0;
+ while (resultScanner.next() != null) {
+ totalRows++;
+ }
+ assertEquals(5500, totalRows);
+ }
+
/**
* Insert to 2 different tables
* @throws Exception
@@ -221,7 +252,11 @@ public class TestReplicationSink {
Scan scan = new Scan();
ResultScanner scanRes = table2.getScanner(scan);
for(Result res : scanRes) {
- assertTrue(Bytes.toInt(res.getRow()) % 2 == 0);
+ assertEquals(0, Bytes.toInt(res.getRow()) % 2);
+ }
+ scanRes = table1.getScanner(scan);
+ for(Result res : scanRes) {
+ assertEquals(1, Bytes.toInt(res.getRow()) % 2);
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntrySinkFilter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntrySinkFilter.java
index 39a2789..a3a84fe 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntrySinkFilter.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntrySinkFilter.java
@@ -125,7 +125,7 @@ public class TestWALEntrySinkFilter {
IfTimeIsGreaterThanBOUNDARYWALEntrySinkFilterImpl.class, WALEntrySinkFilter.class);
conf.setClass("hbase.client.connection.impl", DevNullConnection.class,
Connection.class);
- ReplicationSink sink = new ReplicationSink(conf, STOPPABLE);
+ ReplicationSink sink = new ReplicationSink(conf);
// Create some dumb walentries.
List< org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry > entries =
new ArrayList<>();