You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ya...@apache.org on 2016/12/23 04:05:19 UTC
hbase git commit: HBASE-17314 Limit total buffered size for all
replication sources
Repository: hbase
Updated Branches:
refs/heads/master b3f2bec09 -> 8fb9a91d4
HBASE-17314 Limit total buffered size for all replication sources
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/8fb9a91d
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/8fb9a91d
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/8fb9a91d
Branch: refs/heads/master
Commit: 8fb9a91d441fc7ea8d316ca3fb670ddc6dd6561a
Parents: b3f2bec
Author: Phil Yang <ya...@apache.org>
Authored: Tue Dec 20 16:05:18 2016 +0800
Committer: Phil Yang <ya...@apache.org>
Committed: Fri Dec 23 11:48:06 2016 +0800
----------------------------------------------------------------------
.../org/apache/hadoop/hbase/HConstants.java | 10 +
.../hbase/regionserver/HRegionServer.java | 3 +-
.../regionserver/ReplicationSource.java | 37 +++-
.../regionserver/ReplicationSourceManager.java | 8 +
.../replication/TestReplicationEndpoint.java | 3 +-
.../replication/TestReplicationSource.java | 13 +-
.../regionserver/TestGlobalThrottler.java | 184 +++++++++++++++++++
7 files changed, 245 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/8fb9a91d/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 48d9778..1eec691 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -933,6 +933,16 @@ public final class HConstants {
REPLICATION_SERIALLY_WAITING_DEFAULT = 10000;
/**
+ * Max total size of buffered entries in all replication peers. It will prevent server getting
+ * OOM if there are many peers. Default value is 256MB which is four times to default
+ * replication.source.size.capacity.
+ */
+ public static final String REPLICATION_SOURCE_TOTAL_BUFFER_KEY = "replication.total.buffer.quota";
+
+ public static final int REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT = 256 * 1024 * 1024;
+
+
+ /**
* Directory where the source cluster file system client configuration are placed which is used by
* sink cluster to copy HFiles from source cluster file system
*/
http://git-wip-us.apache.org/repos/asf/hbase/blob/8fb9a91d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 5bc0a66..853d699 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -2340,7 +2340,8 @@ public class HRegionServer extends HasThread implements
* @return Return the object that implements the replication
* source service.
*/
- ReplicationSourceService getReplicationSourceService() {
+ @VisibleForTesting
+ public ReplicationSourceService getReplicationSourceService() {
return replicationSourceHandler;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8fb9a91d/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index f777282..3eeb4b8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -150,6 +150,9 @@ public class ReplicationSource extends Thread
private ConcurrentHashMap<String, ReplicationSourceWorkerThread> workerThreads =
new ConcurrentHashMap<String, ReplicationSourceWorkerThread>();
+ private AtomicLong totalBufferUsed;
+ private long totalBufferQuota;
+
/**
* Instantiation method used by region servers
*
@@ -201,7 +204,9 @@ public class ReplicationSource extends Thread
defaultBandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
currentBandwidth = getCurrentBandwidth();
this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0);
-
+ this.totalBufferUsed = manager.getTotalBufferUsed();
+ this.totalBufferQuota = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY,
+ HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
LOG.info("peerClusterZnode=" + peerClusterZnode + ", ReplicationSource : " + peerId
+ " inited, replicationQueueSizeCapacity=" + replicationQueueSizeCapacity
+ ", replicationQueueNbCapacity=" + replicationQueueNbCapacity + ", curerntBandwidth="
@@ -536,7 +541,7 @@ public class ReplicationSource extends Thread
private boolean workerRunning = true;
// Current number of hfiles that we need to replicate
private long currentNbHFiles = 0;
-
+ List<WAL.Entry> entries;
// Use guava cache to set ttl for each key
private LoadingCache<String, Boolean> canSkipWaitingSet = CacheBuilder.newBuilder()
.expireAfterAccess(1, TimeUnit.DAYS).build(
@@ -556,6 +561,7 @@ public class ReplicationSource extends Thread
this.replicationQueueInfo = replicationQueueInfo;
this.repLogReader = new ReplicationWALReaderManager(fs, conf);
this.source = source;
+ this.entries = new ArrayList<>();
}
@Override
@@ -628,8 +634,7 @@ public class ReplicationSource extends Thread
boolean gotIOE = false;
currentNbOperations = 0;
currentNbHFiles = 0;
- List<WAL.Entry> entries = new ArrayList<WAL.Entry>(1);
-
+ entries.clear();
Map<String, Long> lastPositionsForSerialScope = new HashMap<>();
currentSize = 0;
try {
@@ -721,6 +726,7 @@ public class ReplicationSource extends Thread
continue;
}
shipEdits(currentWALisBeingWrittenTo, entries, lastPositionsForSerialScope);
+ releaseBufferQuota();
}
if (replicationQueueInfo.isQueueRecovered()) {
// use synchronize to make sure one last thread will clean the queue
@@ -810,7 +816,7 @@ public class ReplicationSource extends Thread
}
}
}
-
+ boolean totalBufferTooLarge = false;
// don't replicate if the log entries have already been consumed by the cluster
if (replicationEndpoint.canReplicateToSameCluster()
|| !entry.getKey().getClusterIds().contains(peerClusterId)) {
@@ -828,15 +834,16 @@ public class ReplicationSource extends Thread
logKey.addClusterId(clusterId);
currentNbOperations += countDistinctRowKeys(edit);
entries.add(entry);
- currentSize += entry.getEdit().heapSize();
- currentSize += calculateTotalSizeOfStoreFiles(edit);
+ int delta = (int)entry.getEdit().heapSize() + calculateTotalSizeOfStoreFiles(edit);
+ currentSize += delta;
+ totalBufferTooLarge = acquireBufferQuota(delta);
} else {
metrics.incrLogEditsFiltered();
}
}
// Stop if too many entries or too big
// FIXME check the relationship between single wal group and overall
- if (currentSize >= replicationQueueSizeCapacity
+ if (totalBufferTooLarge || currentSize >= replicationQueueSizeCapacity
|| entries.size() >= replicationQueueNbCapacity) {
break;
}
@@ -1317,5 +1324,19 @@ public class ReplicationSource extends Thread
public void setWorkerRunning(boolean workerRunning) {
this.workerRunning = workerRunning;
}
+
+ /**
+ * @param size delta size for grown buffer
+ * @return true if we should clear buffer and push all
+ */
+ private boolean acquireBufferQuota(long size) {
+ return totalBufferUsed.addAndGet(size) >= totalBufferQuota;
+ }
+
+ private void releaseBufferQuota() {
+ totalBufferUsed.addAndGet(-currentSize);
+ currentSize = 0;
+ entries.clear();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8fb9a91d/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 2c9fdcc..ef4093e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -42,6 +42,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -126,6 +127,8 @@ public class ReplicationSourceManager implements ReplicationListener {
private Connection connection;
private long replicationWaitTime;
+ private AtomicLong totalBufferUsed = new AtomicLong();
+
/**
* Creates a replication manager and sets the watch on all the other registered region servers
* @param replicationQueues the interface for manipulating replication queues
@@ -435,6 +438,11 @@ public class ReplicationSourceManager implements ReplicationListener {
}
}
+ @VisibleForTesting
+ public AtomicLong getTotalBufferUsed() {
+ return totalBufferUsed;
+ }
+
/**
* Factory method to create a replication source
* @param conf the configuration to use
http://git-wip-us.apache.org/repos/asf/hbase/blob/8fb9a91d/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
index 002b8c9..f9c467e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.replication;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -361,7 +362,7 @@ public class TestReplicationEndpoint extends TestReplicationBase {
@Override
public boolean replicate(ReplicateContext replicateContext) {
replicateCount.incrementAndGet();
- lastEntries = replicateContext.entries;
+ lastEntries = new ArrayList<>(replicateContext.entries);
return true;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8fb9a91d/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
index abdd68a..7461edb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertNull;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -37,6 +38,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.hadoop.hbase.wal.WALFactory;
@@ -50,6 +52,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
import static org.mockito.Mockito.mock;
@@ -140,11 +143,15 @@ public class TestReplicationSource {
}
};
replicationEndpoint.start();
- ReplicationPeers mockPeers = mock(ReplicationPeers.class);
+ ReplicationPeers mockPeers = Mockito.mock(ReplicationPeers.class);
+ ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class);
+ Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
Configuration testConf = HBaseConfiguration.create();
testConf.setInt("replication.source.maxretriesmultiplier", 1);
- source.init(testConf, null, null, null, mockPeers, null, "testPeer", null,
- replicationEndpoint, null);
+ ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
+ Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
+ source.init(testConf, null, manager, null, mockPeers, null, "testPeer",
+ null, replicationEndpoint, null);
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<?> future = executor.submit(new Runnable() {
http://git-wip-us.apache.org/repos/asf/hbase/blob/8fb9a91d/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestGlobalThrottler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestGlobalThrottler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestGlobalThrottler.java
new file mode 100644
index 0000000..7e4ae45
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestGlobalThrottler.java
@@ -0,0 +1,184 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.HTestConst;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ ReplicationTests.class, LargeTests.class })
+public class TestGlobalThrottler {
+ private static final Log LOG = LogFactory.getLog(TestGlobalThrottler.class);
+ private static Configuration conf1;
+ private static Configuration conf2;
+
+ private static HBaseTestingUtility utility1;
+ private static HBaseTestingUtility utility2;
+
+ private static final byte[] famName = Bytes.toBytes("f");
+ private static final byte[] VALUE = Bytes.toBytes("v");
+ private static final byte[] ROW = Bytes.toBytes("r");
+ private static final byte[][] ROWS = HTestConst.makeNAscii(ROW, 100);
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ conf1 = HBaseConfiguration.create();
+ conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
+ conf1.setLong("replication.source.sleepforretries", 100);
+ // Each WAL is about 120 bytes
+ conf1.setInt(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY, 200);
+ conf1.setLong("replication.source.per.peer.node.bandwidth", 100L);
+
+ utility1 = new HBaseTestingUtility(conf1);
+ utility1.startMiniZKCluster();
+ MiniZooKeeperCluster miniZK = utility1.getZkCluster();
+ new ZooKeeperWatcher(conf1, "cluster1", null, true);
+
+ conf2 = new Configuration(conf1);
+ conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
+
+ utility2 = new HBaseTestingUtility(conf2);
+ utility2.setZkCluster(miniZK);
+ new ZooKeeperWatcher(conf2, "cluster2", null, true);
+
+ ReplicationAdmin admin1 = new ReplicationAdmin(conf1);
+ ReplicationPeerConfig rpc = new ReplicationPeerConfig();
+ rpc.setClusterKey(utility2.getClusterKey());
+
+ utility1.startMiniCluster(1, 1);
+ utility2.startMiniCluster(1, 1);
+
+ admin1.addPeer("peer1", rpc, null);
+ admin1.addPeer("peer2", rpc, null);
+ admin1.addPeer("peer3", rpc, null);
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ utility2.shutdownMiniCluster();
+ utility1.shutdownMiniCluster();
+ }
+
+
+ volatile private boolean testQuotaPass = false;
+ volatile private boolean testQuotaNonZero = false;
+ @Test
+ public void testQuota() throws IOException {
+ TableName tableName = TableName.valueOf("testQuota");
+ HTableDescriptor table = new HTableDescriptor(tableName);
+ HColumnDescriptor fam = new HColumnDescriptor(famName);
+ fam.setScope(HConstants.REPLICATION_SCOPE_SERIAL);
+ table.addFamily(fam);
+ utility1.getHBaseAdmin().createTable(table);
+ utility2.getHBaseAdmin().createTable(table);
+
+ Thread watcher = new Thread(()->{
+ Replication replication = (Replication)utility1.getMiniHBaseCluster()
+ .getRegionServer(0).getReplicationSourceService();
+ AtomicLong bufferUsed = replication.getReplicationManager().getTotalBufferUsed();
+ testQuotaPass = true;
+ while (!Thread.interrupted()) {
+ long size = bufferUsed.get();
+ if (size > 0) {
+ testQuotaNonZero = true;
+ }
+ if (size > 600) {
+ // We read logs first then check throttler, so if the buffer quota limiter doesn't
+ // take effect, it will push many logs and exceed the quota.
+ testQuotaPass = false;
+ }
+ Threads.sleep(50);
+ }
+ });
+ watcher.start();
+
+ try(Table t1 = utility1.getConnection().getTable(tableName);
+ Table t2 = utility2.getConnection().getTable(tableName)) {
+ for (int i = 0; i < 50; i++) {
+ Put put = new Put(ROWS[i]);
+ put.addColumn(famName, VALUE, VALUE);
+ t1.put(put);
+ }
+ long start = EnvironmentEdgeManager.currentTime();
+ while (EnvironmentEdgeManager.currentTime() - start < 180000) {
+ Scan scan = new Scan();
+ scan.setCaching(50);
+ int count = 0;
+ try (ResultScanner results = t2.getScanner(scan)) {
+ for (Result result : results) {
+ count++;
+ }
+ }
+ if (count < 50) {
+ LOG.info("Waiting all logs pushed to slave. Expected 50 , actual " + count);
+ Threads.sleep(200);
+ continue;
+ }
+ break;
+ }
+ }
+
+ watcher.interrupt();
+ Assert.assertTrue(testQuotaPass);
+ Assert.assertTrue(testQuotaNonZero);
+ }
+
+ private List<Integer> getRowNumbers(List<Cell> cells) {
+ List<Integer> listOfRowNumbers = new ArrayList<>();
+ for (Cell c : cells) {
+ listOfRowNumbers.add(Integer.parseInt(Bytes
+ .toString(c.getRowArray(), c.getRowOffset() + ROW.length,
+ c.getRowLength() - ROW.length)));
+ }
+ return listOfRowNumbers;
+ }
+}