You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by nd...@apache.org on 2021/04/28 23:55:10 UTC
[hbase] branch branch-2 updated: HBASE-25779
HRegionServer#compactSplitThread should be private
This is an automated email from the ASF dual-hosted git repository.
ndimiduk pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new 3a01941 HBASE-25779 HRegionServer#compactSplitThread should be private
3a01941 is described below
commit 3a01941963c9a2541534fd9a39b8893b07f11618
Author: Nick Dimiduk <nd...@apache.org>
AuthorDate: Thu Apr 15 16:41:38 2021 -0700
HBASE-25779 HRegionServer#compactSplitThread should be private
Minor refactor. Make the `compactSplitThread` member field of `HRegionServer` private, and gate
all access through the getter method.
Signed-off-by: Yulin Niu <ni...@apache.org>
Signed-off-by: Pankaj Kumar <pa...@apache.org>
---
.../hadoop/hbase/regionserver/HRegionServer.java | 2 +-
.../hadoop/hbase/regionserver/MemStoreFlusher.java | 10 ++++----
.../MetricsRegionServerWrapperImpl.java | 27 +++++++---------------
.../hadoop/hbase/regionserver/RSDumpServlet.java | 19 +++++++--------
.../hadoop/hbase/regionserver/RSRpcServices.java | 12 ++++++----
.../hbase/io/encoding/TestChangingEncoding.java | 4 ++--
.../io/encoding/TestLoadAndSwitchEncodeOnDisk.java | 2 +-
.../hbase/regionserver/TestCompactSplitThread.java | 22 +++++++++---------
.../TestRegionServerOnlineConfigChange.java | 19 ++++++++-------
.../TestCompactionWithThroughputController.java | 6 ++---
10 files changed, 58 insertions(+), 65 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index d6981e2..8c23150 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -304,7 +304,7 @@ public class HRegionServer extends Thread implements
private ReplicationSinkService replicationSinkHandler;
// Compactions
- public CompactSplit compactSplitThread;
+ private CompactSplit compactSplitThread;
/**
* Map of regions currently being served by this region server. Key is the
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
index a35a0f1..3f1ea99 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
@@ -575,9 +575,10 @@ class MemStoreFlusher implements FlushRequester {
LOG.warn("{} has too many store files({}); delaying flush up to {} ms",
region.getRegionInfo().getEncodedName(), getStoreFileCount(region),
this.blockingWaitTime);
- if (!this.server.compactSplitThread.requestSplit(region)) {
+ final CompactSplit compactSplitThread = server.getCompactSplitThread();
+ if (!compactSplitThread.requestSplit(region)) {
try {
- this.server.compactSplitThread.requestSystemCompaction(region,
+ compactSplitThread.requestSystemCompaction(region,
Thread.currentThread().getName());
} catch (IOException e) {
e = e instanceof RemoteException ?
@@ -624,6 +625,7 @@ class MemStoreFlusher implements FlushRequester {
tracker.beforeExecution();
lock.readLock().lock();
+ final CompactSplit compactSplitThread = server.getCompactSplitThread();
try {
notifyFlushRequest(region, emergencyFlush);
FlushResult flushResult = region.flushcache(families, false, tracker);
@@ -631,9 +633,9 @@ class MemStoreFlusher implements FlushRequester {
// We just want to check the size
boolean shouldSplit = region.checkSplit().isPresent();
if (shouldSplit) {
- this.server.compactSplitThread.requestSplit(region);
+ compactSplitThread.requestSplit(region);
} else if (shouldCompact) {
- server.compactSplitThread.requestSystemCompaction(region, Thread.currentThread().getName());
+ compactSplitThread.requestSystemCompaction(region, Thread.currentThread().getName());
}
} catch (DroppedSnapshotException ex) {
// Cache flush can fail in a few places. If it fails in a critical
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
index ae4a0a5..b5466e7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
@@ -234,37 +234,26 @@ class MetricsRegionServerWrapperImpl
@Override
public int getSplitQueueSize() {
- if (this.regionServer.compactSplitThread == null) {
- return 0;
- }
- return this.regionServer.compactSplitThread.getSplitQueueSize();
+ final CompactSplit compactSplit = regionServer.getCompactSplitThread();
+ return compactSplit == null ? 0 : compactSplit.getSplitQueueSize();
}
@Override
public int getCompactionQueueSize() {
- //The thread could be zero. if so assume there is no queue.
- if (this.regionServer.compactSplitThread == null) {
- return 0;
- }
- return this.regionServer.compactSplitThread.getCompactionQueueSize();
+ final CompactSplit compactSplit = regionServer.getCompactSplitThread();
+ return compactSplit == null ? 0 : compactSplit.getCompactionQueueSize();
}
@Override
public int getSmallCompactionQueueSize() {
- //The thread could be zero. if so assume there is no queue.
- if (this.regionServer.compactSplitThread == null) {
- return 0;
- }
- return this.regionServer.compactSplitThread.getSmallCompactionQueueSize();
+ final CompactSplit compactSplit = regionServer.getCompactSplitThread();
+ return compactSplit == null ? 0 : compactSplit.getSmallCompactionQueueSize();
}
@Override
public int getLargeCompactionQueueSize() {
- //The thread could be zero. if so assume there is no queue.
- if (this.regionServer.compactSplitThread == null) {
- return 0;
- }
- return this.regionServer.compactSplitThread.getLargeCompactionQueueSize();
+ final CompactSplit compactSplit = regionServer.getCompactSplitThread();
+ return compactSplit == null ? 0 : compactSplit.getLargeCompactionQueueSize();
}
@Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSDumpServlet.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSDumpServlet.java
index 1153467..56b72e5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSDumpServlet.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSDumpServlet.java
@@ -124,19 +124,20 @@ public class RSDumpServlet extends StateDumpServlet {
}
}
- public static void dumpQueue(HRegionServer hrs, PrintWriter out)
- throws IOException {
- if (hrs.compactSplitThread != null) {
+ public static void dumpQueue(HRegionServer hrs, PrintWriter out) {
+ final CompactSplit compactSplit = hrs.getCompactSplitThread();
+ if (compactSplit != null) {
// 1. Print out Compaction/Split Queue
- out.println("Compaction/Split Queue summary: "
- + hrs.compactSplitThread.toString() );
- out.println(hrs.compactSplitThread.dumpQueue());
+ out.println("Compaction/Split Queue summary: " + compactSplit);
+ out.println(compactSplit.dumpQueue());
}
- if (hrs.getMemStoreFlusher() != null) {
+ final MemStoreFlusher memStoreFlusher = hrs.getMemStoreFlusher();
+ if (memStoreFlusher != null) {
// 2. Print out flush Queue
- out.println("\nFlush Queue summary: " + hrs.getMemStoreFlusher().toString());
- out.println(hrs.getMemStoreFlusher().dumpQueue());
+ out.println();
+ out.println("Flush Queue summary: " + memStoreFlusher);
+ out.println(memStoreFlusher.dumpQueue());
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index a5cdccb..0ef8317 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -1707,17 +1707,18 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
@Override
public CompactionSwitchResponse compactionSwitch(RpcController controller,
CompactionSwitchRequest request) throws ServiceException {
+ final CompactSplit compactSplitThread = regionServer.getCompactSplitThread();
try {
checkOpen();
requestCount.increment();
- boolean prevState = regionServer.compactSplitThread.isCompactionsEnabled();
+ boolean prevState = compactSplitThread.isCompactionsEnabled();
CompactionSwitchResponse response =
CompactionSwitchResponse.newBuilder().setPrevState(prevState).build();
if (prevState == request.getEnabled()) {
// passed in requested state is same as current state. No action required
return response;
}
- regionServer.compactSplitThread.switchCompaction(request.getEnabled());
+ compactSplitThread.switchCompaction(request.getEnabled());
return response;
} catch (IOException ie) {
throw new ServiceException(ie);
@@ -1760,7 +1761,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
boolean compactionNeeded = flushResult.isCompactionNeeded();
if (compactionNeeded) {
- regionServer.compactSplitThread.requestSystemCompaction(region,
+ regionServer.getCompactSplitThread().requestSystemCompaction(region,
"Compaction through user triggered flush");
}
builder.setFlushed(flushResult.isFlushSucceeded());
@@ -1876,6 +1877,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
ClearCompactionQueuesResponse.Builder respBuilder = ClearCompactionQueuesResponse.newBuilder();
requestCount.increment();
if (clearCompactionQueues.compareAndSet(false,true)) {
+ final CompactSplit compactSplitThread = regionServer.getCompactSplitThread();
try {
checkOpen();
regionServer.getRegionServerCoprocessorHost().preClearCompactionQueues();
@@ -1883,10 +1885,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
LOG.debug("clear " + queueName + " compaction queue");
switch (queueName) {
case "long":
- regionServer.compactSplitThread.clearLongCompactionsQueue();
+ compactSplitThread.clearLongCompactionsQueue();
break;
case "short":
- regionServer.compactSplitThread.clearShortCompactionsQueue();
+ compactSplitThread.clearShortCompactionsQueue();
break;
default:
LOG.warn("Unknown queue name " + queueName);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestChangingEncoding.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestChangingEncoding.java
index 38313c4..514f966 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestChangingEncoding.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestChangingEncoding.java
@@ -238,11 +238,11 @@ public class TestChangingEncoding {
final long maxWaitime = System.currentTimeMillis() + 500;
boolean cont;
do {
- cont = rs.compactSplitThread.getCompactionQueueSize() == 0;
+ cont = rs.getCompactSplitThread().getCompactionQueueSize() == 0;
Threads.sleep(1);
} while (cont && System.currentTimeMillis() < maxWaitime);
- while (rs.compactSplitThread.getCompactionQueueSize() > 0) {
+ while (rs.getCompactSplitThread().getCompactionQueueSize() > 0) {
Threads.sleep(1);
}
LOG.debug("Compaction queue size reached 0, continuing");
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestLoadAndSwitchEncodeOnDisk.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestLoadAndSwitchEncodeOnDisk.java
index 43fa0e3..db5c254 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestLoadAndSwitchEncodeOnDisk.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestLoadAndSwitchEncodeOnDisk.java
@@ -102,7 +102,7 @@ public class TestLoadAndSwitchEncodeOnDisk extends TestMiniClusterLoadSequential
// Wait until compaction completes
Threads.sleepWithoutInterrupt(5000);
HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0);
- while (rs.compactSplitThread.getCompactionQueueSize() > 0) {
+ while (rs.getCompactSplitThread().getCompactionQueueSize() > 0) {
Threads.sleep(50);
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSplitThread.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSplitThread.java
index bb52e19..3467da1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSplitThread.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSplitThread.java
@@ -119,39 +119,39 @@ public class TestCompactSplitThread {
HRegionServer regionServer = TEST_UTIL.getRSForFirstRegionInTable(tableName);
// check initial configuration of thread pool sizes
- assertEquals(3, regionServer.compactSplitThread.getLargeCompactionThreadNum());
- assertEquals(4, regionServer.compactSplitThread.getSmallCompactionThreadNum());
- assertEquals(5, regionServer.compactSplitThread.getSplitThreadNum());
+ assertEquals(3, regionServer.getCompactSplitThread().getLargeCompactionThreadNum());
+ assertEquals(4, regionServer.getCompactSplitThread().getSmallCompactionThreadNum());
+ assertEquals(5, regionServer.getCompactSplitThread().getSplitThreadNum());
// change bigger configurations and do online update
conf.setInt(CompactSplit.LARGE_COMPACTION_THREADS, 4);
conf.setInt(CompactSplit.SMALL_COMPACTION_THREADS, 5);
conf.setInt(CompactSplit.SPLIT_THREADS, 6);
try {
- regionServer.compactSplitThread.onConfigurationChange(conf);
+ regionServer.getCompactSplitThread().onConfigurationChange(conf);
} catch (IllegalArgumentException iae) {
Assert.fail("Update bigger configuration failed!");
}
// check again after online update
- assertEquals(4, regionServer.compactSplitThread.getLargeCompactionThreadNum());
- assertEquals(5, regionServer.compactSplitThread.getSmallCompactionThreadNum());
- assertEquals(6, regionServer.compactSplitThread.getSplitThreadNum());
+ assertEquals(4, regionServer.getCompactSplitThread().getLargeCompactionThreadNum());
+ assertEquals(5, regionServer.getCompactSplitThread().getSmallCompactionThreadNum());
+ assertEquals(6, regionServer.getCompactSplitThread().getSplitThreadNum());
// change smaller configurations and do online update
conf.setInt(CompactSplit.LARGE_COMPACTION_THREADS, 2);
conf.setInt(CompactSplit.SMALL_COMPACTION_THREADS, 3);
conf.setInt(CompactSplit.SPLIT_THREADS, 4);
try {
- regionServer.compactSplitThread.onConfigurationChange(conf);
+ regionServer.getCompactSplitThread().onConfigurationChange(conf);
} catch (IllegalArgumentException iae) {
Assert.fail("Update smaller configuration failed!");
}
// check again after online update
- assertEquals(2, regionServer.compactSplitThread.getLargeCompactionThreadNum());
- assertEquals(3, regionServer.compactSplitThread.getSmallCompactionThreadNum());
- assertEquals(4, regionServer.compactSplitThread.getSplitThreadNum());
+ assertEquals(2, regionServer.getCompactSplitThread().getLargeCompactionThreadNum());
+ assertEquals(3, regionServer.getCompactSplitThread().getSmallCompactionThreadNum());
+ assertEquals(4, regionServer.getCompactSplitThread().getSplitThreadNum());
} finally {
conn.close();
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerOnlineConfigChange.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerOnlineConfigChange.java
index 7eedafc..88e1bc7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerOnlineConfigChange.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerOnlineConfigChange.java
@@ -19,7 +19,7 @@ package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNotNull;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
@@ -91,26 +91,25 @@ public class TestRegionServerOnlineConfigChange {
/**
* Check if the number of compaction threads changes online
- * @throws IOException
*/
@Test
- public void testNumCompactionThreadsOnlineChange() throws IOException {
- assertTrue(rs1.compactSplitThread != null);
+ public void testNumCompactionThreadsOnlineChange() {
+ assertNotNull(rs1.getCompactSplitThread());
int newNumSmallThreads =
- rs1.compactSplitThread.getSmallCompactionThreadNum() + 1;
+ rs1.getCompactSplitThread().getSmallCompactionThreadNum() + 1;
int newNumLargeThreads =
- rs1.compactSplitThread.getLargeCompactionThreadNum() + 1;
+ rs1.getCompactSplitThread().getLargeCompactionThreadNum() + 1;
conf.setInt("hbase.regionserver.thread.compaction.small",
- newNumSmallThreads);
+ newNumSmallThreads);
conf.setInt("hbase.regionserver.thread.compaction.large",
- newNumLargeThreads);
+ newNumLargeThreads);
rs1.getConfigurationManager().notifyAllObservers(conf);
assertEquals(newNumSmallThreads,
- rs1.compactSplitThread.getSmallCompactionThreadNum());
+ rs1.getCompactSplitThread().getSmallCompactionThreadNum());
assertEquals(newNumLargeThreads,
- rs1.compactSplitThread.getLargeCompactionThreadNum());
+ rs1.getCompactSplitThread().getLargeCompactionThreadNum());
}
/**
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestCompactionWithThroughputController.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestCompactionWithThroughputController.java
index 2345dc9..9198bd5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestCompactionWithThroughputController.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestCompactionWithThroughputController.java
@@ -208,7 +208,7 @@ public class TestCompactionWithThroughputController {
TEST_UTIL.waitTableAvailable(tableName);
HRegionServer regionServer = TEST_UTIL.getRSForFirstRegionInTable(tableName);
PressureAwareCompactionThroughputController throughputController =
- (PressureAwareCompactionThroughputController) regionServer.compactSplitThread
+ (PressureAwareCompactionThroughputController) regionServer.getCompactSplitThread()
.getCompactionThroughputController();
assertEquals(10L * 1024 * 1024, throughputController.getMaxThroughput(), EPSILON);
Table table = conn.getTable(tableName);
@@ -234,9 +234,9 @@ public class TestCompactionWithThroughputController {
conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
NoLimitThroughputController.class.getName());
- regionServer.compactSplitThread.onConfigurationChange(conf);
+ regionServer.getCompactSplitThread().onConfigurationChange(conf);
assertTrue(throughputController.isStopped());
- assertTrue(regionServer.compactSplitThread.getCompactionThroughputController()
+ assertTrue(regionServer.getCompactSplitThread().getCompactionThroughputController()
instanceof NoLimitThroughputController);
} finally {
conn.close();