You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2017/11/30 02:04:58 UTC
[3/3] hbase git commit: HBASE-19381 TestGlobalThrottler doesn't make progress
HBASE-19381 TestGlobalThrottler doesn't make progress
Revert "HBASE-17314 Limit total buffered size for all replication sources"
This reverts commit 29e390c80895af54206d6a14eac50ca2859cf2b7.
Conflicts:
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestGlobalThrottler.java
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ea8123e8
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ea8123e8
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ea8123e8
Branch: refs/heads/branch-1.4
Commit: ea8123e81cb4b0e2d89fb672b5bfe67557852ec0
Parents: 39da0d4
Author: Andrew Purtell <ap...@apache.org>
Authored: Wed Nov 29 17:21:42 2017 -0800
Committer: Andrew Purtell <ap...@apache.org>
Committed: Wed Nov 29 17:26:39 2017 -0800
----------------------------------------------------------------------
.../org/apache/hadoop/hbase/HConstants.java | 9 -
.../hbase/regionserver/HRegionServer.java | 1 -
.../regionserver/ReplicationSource.java | 13 +-
.../regionserver/ReplicationSourceManager.java | 8 -
.../ReplicationSourceWALReaderThread.java | 34 +---
.../replication/TestReplicationSource.java | 12 +-
.../regionserver/TestGlobalThrottler.java | 187 -------------------
.../regionserver/TestWALEntryStream.java | 3 -
8 files changed, 5 insertions(+), 262 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/ea8123e8/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 8242a17..2de16f7 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -895,15 +895,6 @@ public final class HConstants {
/** Replication cluster id of source cluster which uniquely identifies itself with peer cluster */
public static final String REPLICATION_CLUSTER_ID = "hbase.replication.cluster.id";
/**
- * Max total size of buffered entries in all replication peers. It will prevent server getting
- * OOM if there are many peers. Default value is 256MB which is four times to default
- * replication.source.size.capacity.
- */
- public static final String REPLICATION_SOURCE_TOTAL_BUFFER_KEY = "replication.total.buffer.quota";
- public static final int REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT = 256 * 1024 * 1024;
-
-
- /**
* Directory where the source cluster file system client configuration are placed which is used by
* sink cluster to copy HFiles from source cluster file system
*/
http://git-wip-us.apache.org/repos/asf/hbase/blob/ea8123e8/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 4853b2b..b156256 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -2291,7 +2291,6 @@ public class HRegionServer extends HasThread implements
* @return Return the object that implements the replication
* source service.
*/
- @VisibleForTesting
public ReplicationSourceService getReplicationSourceService() {
return replicationSourceHandler;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ea8123e8/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index add1043..776814f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -72,10 +72,6 @@ import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.DefaultWALProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.Service;
-
/**
* Class that handles the source of a replication stream.
* Currently does not handle more than 1 slave
@@ -148,8 +144,6 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
FINISHED // The worker is done processing a recovered queue
}
- private AtomicLong totalBufferUsed;
-
/**
* Instantiation method used by region servers
*
@@ -195,7 +189,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
defaultBandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
currentBandwidth = getCurrentBandwidth();
this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0);
- this.totalBufferUsed = manager.getTotalBufferUsed();
+
LOG.info("peerClusterZnode=" + peerClusterZnode + ", ReplicationSource : " + peerId
+ ", currentBandwidth=" + this.currentBandwidth);
}
@@ -555,7 +549,6 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
try {
WALEntryBatch entryBatch = entryReader.take();
shipEdits(entryBatch);
- releaseBufferQuota((int) entryBatch.getHeapSize());
if (replicationQueueInfo.isQueueRecovered() && entryBatch.getWalEntries().isEmpty()
&& entryBatch.getLastSeqIds().isEmpty()) {
LOG.debug("Finished recovering queue for group " + walGroupId + " of peer "
@@ -916,9 +909,5 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
public WorkerState getWorkerState() {
return state;
}
-
- private void releaseBufferQuota(int size) {
- totalBufferUsed.addAndGet(-size);
- }
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ea8123e8/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 469e634..74dded7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -41,7 +41,6 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -120,8 +119,6 @@ public class ReplicationSourceManager implements ReplicationListener {
private final boolean replicationForBulkLoadDataEnabled;
- private AtomicLong totalBufferUsed = new AtomicLong();
-
/**
* Creates a replication manager and sets the watch on all the other registered region servers
* @param replicationQueues the interface for manipulating replication queues
@@ -453,11 +450,6 @@ public class ReplicationSourceManager implements ReplicationListener {
}
}
- @VisibleForTesting
- public AtomicLong getTotalBufferUsed() {
- return totalBufferUsed;
- }
-
/**
* Factory method to create a replication source
* @param conf the configuration to use
http://git-wip-us.apache.org/repos/asf/hbase/blob/ea8123e8/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
index 306ba8f..872f91d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
@@ -27,7 +27,6 @@ import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -36,17 +35,14 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.replication.regionserver.WALEntryStream.WALEntryStreamRuntimeException;
-import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
@@ -78,9 +74,6 @@ public class ReplicationSourceWALReaderThread extends Thread {
private int maxRetriesMultiplier;
private MetricsSource metrics;
- private AtomicLong totalBufferUsed;
- private long totalBufferQuota;
-
/**
* Creates a reader worker for a given WAL queue. Reads WAL entries off a given queue, batches the
* entries, and puts them on a batch queue.
@@ -109,9 +102,6 @@ public class ReplicationSourceWALReaderThread extends Thread {
// memory used will be batchSizeCapacity * (nb.batches + 1)
// the +1 is for the current thread reading before placing onto the queue
int batchCount = conf.getInt("replication.source.nb.batches", 1);
- this.totalBufferUsed = manager.getTotalBufferUsed();
- this.totalBufferQuota = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY,
- HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
this.sleepForRetries =
this.conf.getLong("replication.source.sleepforretries", 1000); // 1 second
this.maxRetriesMultiplier =
@@ -132,9 +122,6 @@ public class ReplicationSourceWALReaderThread extends Thread {
try (WALEntryStream entryStream =
new WALEntryStream(logQueue, fs, conf, currentPosition, metrics)) {
while (isReaderRunning()) { // loop here to keep reusing stream while we can
- if (!checkQuota()) {
- continue;
- }
WALEntryBatch batch = null;
while (entryStream.hasNext()) {
if (batch == null) {
@@ -148,9 +135,8 @@ public class ReplicationSourceWALReaderThread extends Thread {
long entrySize = getEntrySize(entry);
batch.addEntry(entry);
updateBatchStats(batch, entry, entryStream.getPosition(), entrySize);
- boolean totalBufferTooLarge = acquireBufferQuota(entrySize);
// Stop if too many entries or too big
- if (totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity
+ if (batch.getHeapSize() >= replicationBatchSizeCapacity
|| batch.getNbEntries() >= replicationBatchCountCapacity) {
break;
}
@@ -223,16 +209,6 @@ public class ReplicationSourceWALReaderThread extends Thread {
return logQueue.peek();
}
- //returns false if we've already exceeded the global quota
- private boolean checkQuota() {
- // try not to go over total quota
- if (totalBufferUsed.get() > totalBufferQuota) {
- Threads.sleep(sleepForRetries);
- return false;
- }
- return true;
- }
-
private Entry filterEntry(Entry entry) {
Entry filtered = filter.filter(entry);
if (entry != null && filtered == null) {
@@ -337,14 +313,6 @@ public class ReplicationSourceWALReaderThread extends Thread {
}
/**
- * @param size delta size for grown buffer
- * @return true if we should clear buffer and push all
- */
- private boolean acquireBufferQuota(long size) {
- return totalBufferUsed.addAndGet(size) >= totalBufferQuota;
- }
-
- /**
* @return whether the reader thread is running
*/
public boolean isReaderRunning() {
http://git-wip-us.apache.org/repos/asf/hbase/blob/ea8123e8/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
index 6e6fe9a..990c5fd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
@@ -26,7 +26,6 @@ import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -60,7 +59,6 @@ import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-import org.mockito.Mockito;
import static org.mockito.Mockito.mock;
@@ -160,15 +158,11 @@ public class TestReplicationSource {
}
};
replicationEndpoint.start();
- ReplicationPeers mockPeers = Mockito.mock(ReplicationPeers.class);
- ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class);
- Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
+ ReplicationPeers mockPeers = mock(ReplicationPeers.class);
Configuration testConf = HBaseConfiguration.create();
testConf.setInt("replication.source.maxretriesmultiplier", 1);
- ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
- Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
- source.init(testConf, null, manager, null, mockPeers, null, "testPeer",
- null, replicationEndpoint, null);
+ source.init(testConf, null, null, null, mockPeers, null, "testPeer", null, replicationEndpoint,
+ null);
ExecutorService executor = Executors.newSingleThreadExecutor();
final Future<?> future = executor.submit(new Runnable() {
http://git-wip-us.apache.org/repos/asf/hbase/blob/ea8123e8/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestGlobalThrottler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestGlobalThrottler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestGlobalThrottler.java
deleted file mode 100644
index 6e19fc2..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestGlobalThrottler.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.replication.regionserver;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.HTestConst;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
-import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
-import org.apache.hadoop.hbase.testclassification.LargeTests;
-import org.apache.hadoop.hbase.testclassification.ReplicationTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-@Category({ ReplicationTests.class, LargeTests.class })
-public class TestGlobalThrottler {
- private static final Log LOG = LogFactory.getLog(TestGlobalThrottler.class);
- private static Configuration conf1;
- private static Configuration conf2;
-
- private static HBaseTestingUtility utility1;
- private static HBaseTestingUtility utility2;
-
- private static final byte[] famName = Bytes.toBytes("f");
- private static final byte[] VALUE = Bytes.toBytes("v");
- private static final byte[] ROW = Bytes.toBytes("r");
- private static final byte[][] ROWS = HTestConst.makeNAscii(ROW, 100);
-
- @BeforeClass
- public static void setUpBeforeClass() throws Exception {
- conf1 = HBaseConfiguration.create();
- conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
- conf1.setLong("replication.source.sleepforretries", 100);
- // Each WAL is about 120 bytes
- conf1.setInt(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY, 200);
- conf1.setLong("replication.source.per.peer.node.bandwidth", 100L);
-
- utility1 = new HBaseTestingUtility(conf1);
- utility1.startMiniZKCluster();
- MiniZooKeeperCluster miniZK = utility1.getZkCluster();
- new ZooKeeperWatcher(conf1, "cluster1", null, true);
-
- conf2 = new Configuration(conf1);
- conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
-
- utility2 = new HBaseTestingUtility(conf2);
- utility2.setZkCluster(miniZK);
- new ZooKeeperWatcher(conf2, "cluster2", null, true);
-
- ReplicationAdmin admin1 = new ReplicationAdmin(conf1);
- ReplicationPeerConfig rpc = new ReplicationPeerConfig();
- rpc.setClusterKey(utility2.getClusterKey());
- admin1.addPeer("peer1", rpc, null);
- admin1.addPeer("peer2", rpc, null);
- admin1.addPeer("peer3", rpc, null);
-
- utility1.startMiniCluster(1, 1);
- utility2.startMiniCluster(1, 1);
- }
-
- @AfterClass
- public static void tearDownAfterClass() throws Exception {
- utility2.shutdownMiniCluster();
- utility1.shutdownMiniCluster();
- }
-
-
- volatile private boolean testQuotaPass = false;
- volatile private boolean testQuotaNonZero = false;
- @Test
- public void testQuota() throws IOException {
- TableName tableName = TableName.valueOf("testQuota");
- HTableDescriptor table = new HTableDescriptor(tableName);
- HColumnDescriptor fam = new HColumnDescriptor(famName);
- fam.setScope(HConstants.REPLICATION_SCOPE_LOCAL);
- table.addFamily(fam);
- utility1.getHBaseAdmin().createTable(table);
- utility2.getHBaseAdmin().createTable(table);
-
- Thread watcher = new Thread(new Runnable() {
- @Override
- public void run() {
- Replication replication = (Replication) utility1.getMiniHBaseCluster()
- .getRegionServer(0).getReplicationSourceService();
- AtomicLong bufferUsed = replication.getReplicationManager().getTotalBufferUsed();
- testQuotaPass = true;
- while (!Thread.interrupted()) {
- long size = bufferUsed.get();
- if (size > 0) {
- testQuotaNonZero = true;
- }
- if (size > 600) {
- // We read logs first then check throttler, so if the buffer quota limiter doesn't
- // take effect, it will push many logs and exceed the quota.
- testQuotaPass = false;
- }
- Threads.sleep(50);
- }
- }
- });
-
- watcher.start();
-
- try(Table t1 = utility1.getConnection().getTable(tableName);
- Table t2 = utility2.getConnection().getTable(tableName)) {
- for (int i = 0; i < 50; i++) {
- Put put = new Put(ROWS[i]);
- put.addColumn(famName, VALUE, VALUE);
- t1.put(put);
- }
- long start = EnvironmentEdgeManager.currentTime();
- while (EnvironmentEdgeManager.currentTime() - start < 180000) {
- Scan scan = new Scan();
- scan.setCaching(50);
- int count = 0;
- try (ResultScanner results = t2.getScanner(scan)) {
- for (Result result : results) {
- count++;
- }
- }
- if (count < 50) {
- LOG.info("Waiting all logs pushed to slave. Expected 50 , actual " + count);
- Threads.sleep(200);
- continue;
- }
- break;
- }
- }
-
- watcher.interrupt();
- Assert.assertTrue(testQuotaPass);
- Assert.assertTrue(testQuotaNonZero);
- }
-
- private List<Integer> getRowNumbers(List<Cell> cells) {
- List<Integer> listOfRowNumbers = new ArrayList<>();
- for (Cell c : cells) {
- listOfRowNumbers.add(Integer.parseInt(Bytes
- .toString(c.getRowArray(), c.getRowOffset() + ROW.length,
- c.getRowLength() - ROW.length)));
- }
- return listOfRowNumbers;
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ea8123e8/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
index 005e2a1..04c3b81 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
@@ -24,7 +24,6 @@ import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.ArrayList;
@@ -33,7 +32,6 @@ import java.util.NavigableMap;
import java.util.NoSuchElementException;
import java.util.TreeMap;
import java.util.concurrent.PriorityBlockingQueue;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -347,7 +345,6 @@ public class TestWALEntryStream {
// start up a batcher
ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
- when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
ReplicationSourceWALReaderThread batcher = new ReplicationSourceWALReaderThread(mockSourceManager, getQueueInfo(),walQueue, 0,
fs, conf, getDummyFilter(), new MetricsSource("1"));
Path walPath = walQueue.peek();