Posted to commits@hbase.apache.org by vj...@apache.org on 2020/07/27 14:46:08 UTC
[hbase] branch branch-2.2 updated: HBASE-24757 : ReplicationSink should limit row count in batch mutation based on hbase.rpc.rows.warning.threshold (#2139)
This is an automated email from the ASF dual-hosted git repository.
vjasani pushed a commit to branch branch-2.2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.2 by this push:
new 603d2b6 HBASE-24757 : ReplicationSink should limit row count in batch mutation based on hbase.rpc.rows.warning.threshold (#2139)
603d2b6 is described below
commit 603d2b629aeb749b2ca009dbf54149d7240bb269
Author: Viraj Jasani <vj...@apache.org>
AuthorDate: Mon Jul 27 20:15:51 2020 +0530
HBASE-24757 : ReplicationSink should limit row count in batch mutation based on hbase.rpc.rows.warning.threshold (#2139)
Closes #2127
Signed-off-by: stack <st...@apache.org>
---
.../java/org/apache/hadoop/hbase/HConstants.java | 10 ++++
.../hadoop/hbase/regionserver/RSRpcServices.java | 12 +----
.../replication/regionserver/Replication.java | 2 +-
.../replication/regionserver/ReplicationSink.java | 60 ++++++++++++----------
.../hbase/regionserver/TestMultiLogThreshold.java | 5 +-
.../regionserver/TestReplicationSink.java | 49 +++++++++++++++---
.../regionserver/TestWALEntrySinkFilter.java | 2 +-
7 files changed, 91 insertions(+), 49 deletions(-)
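
The gist of the change: ReplicationSink previously handed each replicated row list to Table.batch() in a single call, however large. It now reuses hbase.rpc.rows.warning.threshold (default 5000, previously only a warning knob private to RSRpcServices) to cap the per-call row count. A minimal sketch of the splitting step, using the shaded Guava Lists.partition that the patch imports; the class and method names here are hypothetical, not part of the patch:

  import java.util.Collections;
  import java.util.List;
  import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

  public final class BatchSplitSketch {
    // Mirrors the new ReplicationSink#batch logic: lists at or under the
    // threshold pass through unchanged; larger ones are split into chunks
    // of at most `threshold` rows, each submitted as its own batch call.
    static <R> List<List<R>> split(List<R> rows, int threshold) {
      return rows.size() > threshold
          ? Lists.partition(rows, threshold)
          : Collections.singletonList(rows);
    }
  }
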
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 05366a3..d1537fa 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -1493,6 +1493,16 @@ public final class HConstants {
"hbase.master.executor.logreplayops.threads";
public static final int MASTER_LOG_REPLAY_OPS_THREADS_DEFAULT = 10;
+ /**
+ * Number of rows in a batch operation above which a warning will be logged.
+ */
+ public static final String BATCH_ROWS_THRESHOLD_NAME = "hbase.rpc.rows.warning.threshold";
+
+ /**
+ * Default value of {@link #BATCH_ROWS_THRESHOLD_NAME}
+ */
+ public static final int BATCH_ROWS_THRESHOLD_DEFAULT = 5000;
+
private HConstants() {
// Can't be instantiated with this ctor.
}
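
With the key and its default promoted from RSRpcServices into HConstants, both the RPC layer and the replication sink can share them. A hedged example of tuning the threshold through the Configuration API (the property can equally be set in hbase-site.xml); the class name is illustrative only:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.HConstants;

  public final class ThresholdConfigExample {
    public static void main(String[] args) {
      Configuration conf = HBaseConfiguration.create();
      // Lower the batch-row warning/split threshold from its 5000 default.
      conf.setInt(HConstants.BATCH_ROWS_THRESHOLD_NAME, 2000);
      int threshold = conf.getInt(HConstants.BATCH_ROWS_THRESHOLD_NAME,
          HConstants.BATCH_ROWS_THRESHOLD_DEFAULT);
      System.out.println("rows threshold = " + threshold); // prints 2000
    }
  }
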
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 9a02831..0819423 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -276,15 +276,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
*/
private static final long DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 10;
- /**
- * Number of rows in a batch operation above which a warning will be logged.
- */
- static final String BATCH_ROWS_THRESHOLD_NAME = "hbase.rpc.rows.warning.threshold";
- /**
- * Default value of {@link RSRpcServices#BATCH_ROWS_THRESHOLD_NAME}
- */
- static final int BATCH_ROWS_THRESHOLD_DEFAULT = 5000;
-
protected static final String RESERVOIR_ENABLED_KEY = "hbase.ipc.server.reservoir.enabled";
// Request counter. (Includes requests that are not serviced by regions.)
@@ -1229,7 +1220,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
RSRpcServices(HRegionServer rs, LogDelegate ld) throws IOException {
this.ld = ld;
regionServer = rs;
- rowSizeWarnThreshold = rs.conf.getInt(BATCH_ROWS_THRESHOLD_NAME, BATCH_ROWS_THRESHOLD_DEFAULT);
+ rowSizeWarnThreshold = rs.conf.getInt(HConstants.BATCH_ROWS_THRESHOLD_NAME,
+ HConstants.BATCH_ROWS_THRESHOLD_DEFAULT);
RpcSchedulerFactory rpcSchedulerFactory;
try {
rpcSchedulerFactory = getRpcSchedulerFactoryClass().asSubclass(RpcSchedulerFactory.class)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index 6c46a85..752cfb8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -187,7 +187,7 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
@Override
public void startReplicationService() throws IOException {
this.replicationManager.init();
- this.replicationSink = new ReplicationSink(this.conf, this.server);
+ this.replicationSink = new ReplicationSink(this.conf);
this.scheduleThreadPool.scheduleAtFixedRate(
new ReplicationStatisticsTask(this.replicationSink, this.replicationManager),
statsThreadPeriod, statsThreadPeriod, TimeUnit.SECONDS);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
index ae0a732..76e22f8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -38,7 +39,6 @@ import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.yetus.audience.InterfaceAudience;
@@ -52,13 +52,14 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
-import org.apache.hadoop.hbase.wal.WALEdit;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
/**
* <p>
@@ -92,15 +93,20 @@ public class ReplicationSink {
private WALEntrySinkFilter walEntrySinkFilter;
/**
+ * Row size threshold for multi requests above which a warning is logged
+ */
+ private final int rowSizeWarnThreshold;
+
+ /**
* Create a sink for replication
- *
- * @param conf conf object
- * @param stopper boolean to tell this thread to stop
+ * @param conf conf object
* @throws IOException thrown when HDFS goes bad or bad file name
*/
- public ReplicationSink(Configuration conf, Stoppable stopper)
+ public ReplicationSink(Configuration conf)
throws IOException {
this.conf = HBaseConfiguration.create(conf);
+ rowSizeWarnThreshold = conf.getInt(
+ HConstants.BATCH_ROWS_THRESHOLD_NAME, HConstants.BATCH_ROWS_THRESHOLD_DEFAULT);
decorateConf();
this.metrics = new MetricsSink();
this.walEntrySinkFilter = setupWALEntrySinkFilter();
@@ -210,11 +216,7 @@ public class ReplicationSink {
// Map of table name Vs list of pair of family and list of
// hfile paths from its namespace
Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap =
- bulkLoadsPerClusters.get(bld.getClusterIdsList());
- if (bulkLoadHFileMap == null) {
- bulkLoadHFileMap = new HashMap<>();
- bulkLoadsPerClusters.put(bld.getClusterIdsList(), bulkLoadHFileMap);
- }
+ bulkLoadsPerClusters.computeIfAbsent(bld.getClusterIdsList(), k -> new HashMap<>());
buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld);
}
} else {
@@ -247,7 +249,7 @@ public class ReplicationSink {
if (!rowMap.isEmpty()) {
LOG.debug("Started replicating mutations.");
for (Entry<TableName, Map<List<UUID>, List<Row>>> entry : rowMap.entrySet()) {
- batch(entry.getKey(), entry.getValue().values());
+ batch(entry.getKey(), entry.getValue().values(), rowSizeWarnThreshold);
}
LOG.debug("Finished replicating mutations.");
}
@@ -372,17 +374,10 @@ public class ReplicationSink {
* @param value
* @return the list of values corresponding to key1 and key2
*/
- private <K1, K2, V> List<V> addToHashMultiMap(Map<K1, Map<K2,List<V>>> map, K1 key1, K2 key2, V value) {
- Map<K2,List<V>> innerMap = map.get(key1);
- if (innerMap == null) {
- innerMap = new HashMap<>();
- map.put(key1, innerMap);
- }
- List<V> values = innerMap.get(key2);
- if (values == null) {
- values = new ArrayList<>();
- innerMap.put(key2, values);
- }
+ private <K1, K2, V> List<V> addToHashMultiMap(Map<K1, Map<K2, List<V>>> map, K1 key1, K2 key2,
+ V value) {
+ Map<K2, List<V>> innerMap = map.computeIfAbsent(key1, k -> new HashMap<>());
+ List<V> values = innerMap.computeIfAbsent(key2, k -> new ArrayList<>());
values.add(value);
return values;
}
@@ -410,9 +405,10 @@ public class ReplicationSink {
* Do the changes and handle the pool
* @param tableName table to insert into
* @param allRows list of actions
- * @throws IOException
+ * @param batchRowSizeThreshold rowSize threshold for batch mutation
*/
- protected void batch(TableName tableName, Collection<List<Row>> allRows) throws IOException {
+ private void batch(TableName tableName, Collection<List<Row>> allRows, int batchRowSizeThreshold)
+ throws IOException {
if (allRows.isEmpty()) {
return;
}
@@ -421,7 +417,15 @@ public class ReplicationSink {
Connection connection = getConnection();
table = connection.getTable(tableName);
for (List<Row> rows : allRows) {
- table.batch(rows, null);
+ List<List<Row>> batchRows;
+ if (rows.size() > batchRowSizeThreshold) {
+ batchRows = Lists.partition(rows, batchRowSizeThreshold);
+ } else {
+ batchRows = Collections.singletonList(rows);
+ }
+ for (List<Row> rowList : batchRows) {
+ table.batch(rowList, null);
+ }
}
} catch (RetriesExhaustedWithDetailsException rewde) {
for (Throwable ex : rewde.getCauses()) {
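
Alongside the batching change, the patch swaps two get-then-null-check-then-put sequences for Map.computeIfAbsent, which creates and inserts the missing container in one step. A standalone sketch of the idiom in the same nested-map shape as addToHashMultiMap; the class and method names are hypothetical:

  import java.util.ArrayList;
  import java.util.HashMap;
  import java.util.List;
  import java.util.Map;

  public final class MultiMapSketch {
    // computeIfAbsent builds the inner map and the value list lazily,
    // replacing the explicit null checks the patch removes.
    static <K1, K2, V> List<V> add(Map<K1, Map<K2, List<V>>> map,
        K1 key1, K2 key2, V value) {
      List<V> values = map.computeIfAbsent(key1, k -> new HashMap<>())
          .computeIfAbsent(key2, k -> new ArrayList<>());
      values.add(value);
      return values;
    }

    public static void main(String[] args) {
      Map<String, Map<String, List<Integer>>> m = new HashMap<>();
      add(m, "table1", "cf", 1);
      add(m, "table1", "cf", 2);
      System.out.println(m); // {table1={cf=[1, 2]}}
    }
  }
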
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiLogThreshold.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiLogThreshold.java
index 8e11ed5..f41335f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiLogThreshold.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiLogThreshold.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
@@ -67,8 +68,8 @@ public class TestMultiLogThreshold {
final TableName tableName = TableName.valueOf("tableName");
TEST_UTIL = HBaseTestingUtility.createLocalHTU();
CONF = TEST_UTIL.getConfiguration();
- THRESHOLD = CONF.getInt(RSRpcServices.BATCH_ROWS_THRESHOLD_NAME,
- RSRpcServices.BATCH_ROWS_THRESHOLD_DEFAULT);
+ THRESHOLD = CONF.getInt(HConstants.BATCH_ROWS_THRESHOLD_NAME,
+ HConstants.BATCH_ROWS_THRESHOLD_DEFAULT);
TEST_UTIL.startMiniCluster();
TEST_UTIL.createTable(tableName, TEST_FAM);
RS = TEST_UTIL.getRSForFirstRegionInTable(tableName);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java
index aa6c39c..adb077d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.hbase.replication.regionserver;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
import java.security.SecureRandom;
import java.util.ArrayList;
@@ -55,7 +54,7 @@ import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
@@ -78,7 +77,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.UUID;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALKey;
-@Category({ReplicationTests.class, MediumTests.class})
+@Category({ReplicationTests.class, LargeTests.class})
public class TestReplicationSink {
@ClassRule
@@ -127,10 +126,8 @@ public class TestReplicationSink {
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.getConfiguration().set("hbase.replication.source.fs.conf.provider",
TestSourceFSConfigurationProvider.class.getCanonicalName());
-
TEST_UTIL.startMiniCluster(3);
- SINK =
- new ReplicationSink(new Configuration(TEST_UTIL.getConfiguration()), STOPPABLE);
+ SINK = new ReplicationSink(new Configuration(TEST_UTIL.getConfiguration()));
table1 = TEST_UTIL.createTable(TABLE_NAME1, FAM_NAME1);
table2 = TEST_UTIL.createTable(TABLE_NAME2, FAM_NAME2);
Path rootDir = FSUtils.getRootDir(TEST_UTIL.getConfiguration());
@@ -203,6 +200,40 @@ public class TestReplicationSink {
assertEquals(BATCH_SIZE/2, scanRes.next(BATCH_SIZE).length);
}
+ @Test
+ public void testLargeEditsPutDelete() throws Exception {
+ List<WALEntry> entries = new ArrayList<>();
+ List<Cell> cells = new ArrayList<>();
+ for (int i = 0; i < 5510; i++) {
+ entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
+ }
+ SINK.replicateEntries(entries, CellUtil.createCellScanner(cells), replicationClusterId,
+ baseNamespaceDir, hfileArchiveDir);
+
+ ResultScanner resultScanner = table1.getScanner(new Scan());
+ int totalRows = 0;
+ while (resultScanner.next() != null) {
+ totalRows++;
+ }
+ assertEquals(5510, totalRows);
+
+ entries = new ArrayList<>();
+ cells = new ArrayList<>();
+ for (int i = 0; i < 11000; i++) {
+ entries.add(
+ createEntry(TABLE_NAME1, i, i % 2 != 0 ? KeyValue.Type.Put : KeyValue.Type.DeleteColumn,
+ cells));
+ }
+ SINK.replicateEntries(entries, CellUtil.createCellScanner(cells), replicationClusterId,
+ baseNamespaceDir, hfileArchiveDir);
+ resultScanner = table1.getScanner(new Scan());
+ totalRows = 0;
+ while (resultScanner.next() != null) {
+ totalRows++;
+ }
+ assertEquals(5500, totalRows);
+ }
+
/**
* Insert to 2 different tables
* @throws Exception
@@ -221,7 +252,11 @@ public class TestReplicationSink {
Scan scan = new Scan();
ResultScanner scanRes = table2.getScanner(scan);
for(Result res : scanRes) {
- assertTrue(Bytes.toInt(res.getRow()) % 2 == 0);
+ assertEquals(0, Bytes.toInt(res.getRow()) % 2);
+ }
+ scanRes = table1.getScanner(scan);
+ for(Result res : scanRes) {
+ assertEquals(1, Bytes.toInt(res.getRow()) % 2);
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntrySinkFilter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntrySinkFilter.java
index 31e94d6..15ff54c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntrySinkFilter.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntrySinkFilter.java
@@ -128,7 +128,7 @@ public class TestWALEntrySinkFilter {
IfTimeIsGreaterThanBOUNDARYWALEntrySinkFilterImpl.class, WALEntrySinkFilter.class);
conf.setClass("hbase.client.connection.impl", DevNullConnection.class,
Connection.class);
- ReplicationSink sink = new ReplicationSink(conf, STOPPABLE);
+ ReplicationSink sink = new ReplicationSink(conf);
// Create some dumb walentries.
List< org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry > entries =
new ArrayList<>();