You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by sy...@apache.org on 2017/01/04 07:39:08 UTC
[06/50] [abbrv] hbase git commit: Revert "HBASE-17314 Limit total buffered size for all replication sources"
Revert "HBASE-17314 Limit total buffered size for all replication sources"
This reverts commit 3826e639672eea11d73da333e6c15f6b7c23a46c.
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/a1d2ff46
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/a1d2ff46
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/a1d2ff46
Branch: refs/heads/hbase-12439
Commit: a1d2ff4646743a9136bb1182c0512bce28e358b7
Parents: acd0218
Author: Michael Stack <st...@apache.org>
Authored: Wed Dec 21 11:17:28 2016 -0800
Committer: Michael Stack <st...@apache.org>
Committed: Wed Dec 21 11:17:28 2016 -0800
----------------------------------------------------------------------
.../org/apache/hadoop/hbase/HConstants.java | 4 -
.../hbase/regionserver/HRegionServer.java | 3 +-
.../regionserver/ReplicationSource.java | 38 +---
.../regionserver/ReplicationSourceManager.java | 8 -
.../replication/TestReplicationEndpoint.java | 3 +-
.../regionserver/TestGlobalThrottler.java | 184 -------------------
6 files changed, 10 insertions(+), 230 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/a1d2ff46/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index dc96c2a..48d9778 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -932,10 +932,6 @@ public final class HConstants {
public static final long
REPLICATION_SERIALLY_WAITING_DEFAULT = 10000;
- public static final String REPLICATION_SOURCE_TOTAL_BUFFER_KEY = "replication.total.buffer.quota";
- public static final int REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT = 256 * 1024 * 1024;
-
-
/**
* Directory where the source cluster file system client configuration are placed which is used by
* sink cluster to copy HFiles from source cluster file system
http://git-wip-us.apache.org/repos/asf/hbase/blob/a1d2ff46/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 853d699..5bc0a66 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -2340,8 +2340,7 @@ public class HRegionServer extends HasThread implements
* @return Return the object that implements the replication
* source service.
*/
- @VisibleForTesting
- public ReplicationSourceService getReplicationSourceService() {
+ ReplicationSourceService getReplicationSourceService() {
return replicationSourceHandler;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/a1d2ff46/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 3fb5f94..f777282 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -38,7 +38,6 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang.StringUtils;
@@ -151,9 +150,6 @@ public class ReplicationSource extends Thread
private ConcurrentHashMap<String, ReplicationSourceWorkerThread> workerThreads =
new ConcurrentHashMap<String, ReplicationSourceWorkerThread>();
- private AtomicInteger totalBufferUsed;
- private int totalBufferQuota;
-
/**
* Instantiation method used by region servers
*
@@ -205,9 +201,7 @@ public class ReplicationSource extends Thread
defaultBandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
currentBandwidth = getCurrentBandwidth();
this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0);
- this.totalBufferUsed = manager.getTotalBufferUsed();
- this.totalBufferQuota = conf.getInt(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY,
- HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
+
LOG.info("peerClusterZnode=" + peerClusterZnode + ", ReplicationSource : " + peerId
+ " inited, replicationQueueSizeCapacity=" + replicationQueueSizeCapacity
+ ", replicationQueueNbCapacity=" + replicationQueueNbCapacity + ", curerntBandwidth="
@@ -542,7 +536,7 @@ public class ReplicationSource extends Thread
private boolean workerRunning = true;
// Current number of hfiles that we need to replicate
private long currentNbHFiles = 0;
- List<WAL.Entry> entries;
+
// Use guava cache to set ttl for each key
private LoadingCache<String, Boolean> canSkipWaitingSet = CacheBuilder.newBuilder()
.expireAfterAccess(1, TimeUnit.DAYS).build(
@@ -562,7 +556,6 @@ public class ReplicationSource extends Thread
this.replicationQueueInfo = replicationQueueInfo;
this.repLogReader = new ReplicationWALReaderManager(fs, conf);
this.source = source;
- this.entries = new ArrayList<>();
}
@Override
@@ -635,7 +628,8 @@ public class ReplicationSource extends Thread
boolean gotIOE = false;
currentNbOperations = 0;
currentNbHFiles = 0;
- entries.clear();
+ List<WAL.Entry> entries = new ArrayList<WAL.Entry>(1);
+
Map<String, Long> lastPositionsForSerialScope = new HashMap<>();
currentSize = 0;
try {
@@ -727,7 +721,6 @@ public class ReplicationSource extends Thread
continue;
}
shipEdits(currentWALisBeingWrittenTo, entries, lastPositionsForSerialScope);
- releaseBufferQuota();
}
if (replicationQueueInfo.isQueueRecovered()) {
// use synchronize to make sure one last thread will clean the queue
@@ -817,7 +810,7 @@ public class ReplicationSource extends Thread
}
}
}
- boolean totalBufferTooLarge = false;
+
// don't replicate if the log entries have already been consumed by the cluster
if (replicationEndpoint.canReplicateToSameCluster()
|| !entry.getKey().getClusterIds().contains(peerClusterId)) {
@@ -835,16 +828,15 @@ public class ReplicationSource extends Thread
logKey.addClusterId(clusterId);
currentNbOperations += countDistinctRowKeys(edit);
entries.add(entry);
- int delta = (int)entry.getEdit().heapSize() + calculateTotalSizeOfStoreFiles(edit);
- currentSize += delta;
- totalBufferTooLarge = acquireBufferQuota(delta);
+ currentSize += entry.getEdit().heapSize();
+ currentSize += calculateTotalSizeOfStoreFiles(edit);
} else {
metrics.incrLogEditsFiltered();
}
}
// Stop if too many entries or too big
// FIXME check the relationship between single wal group and overall
- if (totalBufferTooLarge || currentSize >= replicationQueueSizeCapacity
+ if (currentSize >= replicationQueueSizeCapacity
|| entries.size() >= replicationQueueNbCapacity) {
break;
}
@@ -1325,19 +1317,5 @@ public class ReplicationSource extends Thread
public void setWorkerRunning(boolean workerRunning) {
this.workerRunning = workerRunning;
}
-
- /**
- * @param size delta size for grown buffer
- * @return true if we should clear buffer and push all
- */
- private boolean acquireBufferQuota(int size) {
- return totalBufferUsed.addAndGet(size) >= totalBufferQuota;
- }
-
- private void releaseBufferQuota() {
- totalBufferUsed.addAndGet(-currentSize);
- currentSize = 0;
- entries.clear();
- }
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/a1d2ff46/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 2634a52..2c9fdcc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -42,7 +42,6 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -127,8 +126,6 @@ public class ReplicationSourceManager implements ReplicationListener {
private Connection connection;
private long replicationWaitTime;
- private AtomicInteger totalBufferUsed = new AtomicInteger();
-
/**
* Creates a replication manager and sets the watch on all the other registered region servers
* @param replicationQueues the interface for manipulating replication queues
@@ -438,11 +435,6 @@ public class ReplicationSourceManager implements ReplicationListener {
}
}
- @VisibleForTesting
- AtomicInteger getTotalBufferUsed() {
- return totalBufferUsed;
- }
-
/**
* Factory method to create a replication source
* @param conf the configuration to use
http://git-wip-us.apache.org/repos/asf/hbase/blob/a1d2ff46/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
index f9c467e..002b8c9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.replication;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -362,7 +361,7 @@ public class TestReplicationEndpoint extends TestReplicationBase {
@Override
public boolean replicate(ReplicateContext replicateContext) {
replicateCount.incrementAndGet();
- lastEntries = new ArrayList<>(replicateContext.entries);
+ lastEntries = replicateContext.entries;
return true;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/a1d2ff46/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestGlobalThrottler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestGlobalThrottler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestGlobalThrottler.java
deleted file mode 100644
index a40d7ed..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestGlobalThrottler.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.replication.regionserver;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.HTestConst;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
-import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
-import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
-import org.apache.hadoop.hbase.testclassification.LargeTests;
-import org.apache.hadoop.hbase.testclassification.ReplicationTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-@Category({ ReplicationTests.class, LargeTests.class })
-public class TestGlobalThrottler {
- private static final Log LOG = LogFactory.getLog(TestGlobalThrottler.class);
- private static Configuration conf1;
- private static Configuration conf2;
-
- private static HBaseTestingUtility utility1;
- private static HBaseTestingUtility utility2;
-
- private static final byte[] famName = Bytes.toBytes("f");
- private static final byte[] VALUE = Bytes.toBytes("v");
- private static final byte[] ROW = Bytes.toBytes("r");
- private static final byte[][] ROWS = HTestConst.makeNAscii(ROW, 100);
-
- @BeforeClass
- public static void setUpBeforeClass() throws Exception {
- conf1 = HBaseConfiguration.create();
- conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
- conf1.setLong("replication.source.sleepforretries", 100);
- // Each WAL is about 120 bytes
- conf1.setInt(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY, 200);
- conf1.setLong("replication.source.per.peer.node.bandwidth", 100L);
-
- utility1 = new HBaseTestingUtility(conf1);
- utility1.startMiniZKCluster();
- MiniZooKeeperCluster miniZK = utility1.getZkCluster();
- new ZooKeeperWatcher(conf1, "cluster1", null, true);
-
- conf2 = new Configuration(conf1);
- conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
-
- utility2 = new HBaseTestingUtility(conf2);
- utility2.setZkCluster(miniZK);
- new ZooKeeperWatcher(conf2, "cluster2", null, true);
-
- ReplicationAdmin admin1 = new ReplicationAdmin(conf1);
- ReplicationPeerConfig rpc = new ReplicationPeerConfig();
- rpc.setClusterKey(utility2.getClusterKey());
- admin1.addPeer("peer1", rpc, null);
- admin1.addPeer("peer2", rpc, null);
- admin1.addPeer("peer3", rpc, null);
-
- utility1.startMiniCluster(1, 1);
- utility2.startMiniCluster(1, 1);
- }
-
- @AfterClass
- public static void setDownAfterClass() throws Exception {
- utility2.shutdownMiniCluster();
- utility1.shutdownMiniCluster();
- }
-
-
- volatile private boolean testQuotaPass = false;
- volatile private boolean testQuotaNonZero = false;
- @Test
- public void testQuota() throws IOException {
- TableName tableName = TableName.valueOf("testQuota");
- HTableDescriptor table = new HTableDescriptor(tableName);
- HColumnDescriptor fam = new HColumnDescriptor(famName);
- fam.setScope(HConstants.REPLICATION_SCOPE_SERIAL);
- table.addFamily(fam);
- utility1.getHBaseAdmin().createTable(table);
- utility2.getHBaseAdmin().createTable(table);
-
- Thread watcher = new Thread(()->{
- Replication replication = (Replication)utility1.getMiniHBaseCluster()
- .getRegionServer(0).getReplicationSourceService();
- AtomicInteger bufferUsed = replication.getReplicationManager().getTotalBufferUsed();
- testQuotaPass = true;
- while (!Thread.interrupted()) {
- int size = bufferUsed.get();
- if (size > 0) {
- testQuotaNonZero = true;
- }
- if (size > 600) {
- // We read logs first then check throttler, so if the buffer quota limiter doesn't
- // take effect, it will push many logs and exceed the quota.
- testQuotaPass = false;
- }
- Threads.sleep(50);
- }
- });
- watcher.start();
-
- try(Table t1 = utility1.getConnection().getTable(tableName);
- Table t2 = utility2.getConnection().getTable(tableName)) {
- for (int i = 0; i < 50; i++) {
- Put put = new Put(ROWS[i]);
- put.addColumn(famName, VALUE, VALUE);
- t1.put(put);
- }
- long start = EnvironmentEdgeManager.currentTime();
- while (EnvironmentEdgeManager.currentTime() - start < 180000) {
- Scan scan = new Scan();
- scan.setCaching(50);
- int count = 0;
- try (ResultScanner results = t2.getScanner(scan)) {
- for (Result result : results) {
- count++;
- }
- }
- if (count < 50) {
- LOG.info("Waiting for all logs pushed to slave. Expected 50 , actual " + count);
- Threads.sleep(200);
- continue;
- }
- break;
- }
- }
-
- watcher.interrupt();
- Assert.assertTrue(testQuotaPass);
- Assert.assertTrue(testQuotaNonZero);
- }
-
- private List<Integer> getRowNumbers(List<Cell> cells) {
- List<Integer> listOfRowNumbers = new ArrayList<>();
- for (Cell c : cells) {
- listOfRowNumbers.add(Integer.parseInt(Bytes
- .toString(c.getRowArray(), c.getRowOffset() + ROW.length,
- c.getRowLength() - ROW.length)));
- }
- return listOfRowNumbers;
- }
-}