You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by nd...@apache.org on 2021/04/28 23:47:03 UTC

[hbase] branch master updated: HBASE-25779 HRegionServer#compactSplitThread should be private

This is an automated email from the ASF dual-hosted git repository.

ndimiduk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new b061b0c  HBASE-25779 HRegionServer#compactSplitThread should be private
b061b0c is described below

commit b061b0c4ed43cef331d9e0a1d1c93779f2ae3ad7
Author: Nick Dimiduk <nd...@apache.org>
AuthorDate: Thu Apr 15 16:41:38 2021 -0700

    HBASE-25779 HRegionServer#compactSplitThread should be private
    
    Minor refactor. Make the `compactSplitThread` member field of `HRegionServer` private, and gate
    all access through the getter method.
    
    Signed-off-by: Yulin Niu <ni...@apache.org>
    Signed-off-by: Pankaj Kumar <pa...@apache.org>
---
 .../hadoop/hbase/regionserver/HRegionServer.java   |  2 +-
 .../hadoop/hbase/regionserver/MemStoreFlusher.java | 10 ++++----
 .../MetricsRegionServerWrapperImpl.java            | 27 +++++++---------------
 .../hadoop/hbase/regionserver/RSDumpServlet.java   | 19 +++++++--------
 .../hadoop/hbase/regionserver/RSRpcServices.java   | 12 ++++++----
 .../hbase/io/encoding/TestChangingEncoding.java    |  4 ++--
 .../io/encoding/TestLoadAndSwitchEncodeOnDisk.java |  2 +-
 .../hbase/regionserver/TestCompactSplitThread.java | 22 +++++++++---------
 .../TestRegionServerOnlineConfigChange.java        | 18 +++++++--------
 .../TestCompactionWithThroughputController.java    |  6 ++---
 10 files changed, 58 insertions(+), 64 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 3386e07..36fdc77 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -300,7 +300,7 @@ public class HRegionServer extends Thread implements
   private boolean sameReplicationSourceAndSink;
 
   // Compactions
-  public CompactSplit compactSplitThread;
+  private CompactSplit compactSplitThread;
 
   /**
    * Map of regions currently being served by this region server. Key is the
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
index 5e62f7a..17927ad 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
@@ -581,9 +581,10 @@ class MemStoreFlusher implements FlushRequester {
           LOG.warn("{} has too many store files({}); delaying flush up to {} ms",
               region.getRegionInfo().getEncodedName(), getStoreFileCount(region),
               this.blockingWaitTime);
-          if (!this.server.compactSplitThread.requestSplit(region)) {
+          final CompactSplit compactSplitThread = server.getCompactSplitThread();
+          if (!compactSplitThread.requestSplit(region)) {
             try {
-              this.server.compactSplitThread.requestSystemCompaction(region,
+              compactSplitThread.requestSystemCompaction(region,
                 Thread.currentThread().getName());
             } catch (IOException e) {
               e = e instanceof RemoteException ?
@@ -630,6 +631,7 @@ class MemStoreFlusher implements FlushRequester {
 
     tracker.beforeExecution();
     lock.readLock().lock();
+    final CompactSplit compactSplitThread = server.getCompactSplitThread();
     try {
       notifyFlushRequest(region, emergencyFlush);
       FlushResult flushResult = region.flushcache(families, false, tracker);
@@ -637,9 +639,9 @@ class MemStoreFlusher implements FlushRequester {
       // We just want to check the size
       boolean shouldSplit = region.checkSplit().isPresent();
       if (shouldSplit) {
-        this.server.compactSplitThread.requestSplit(region);
+        compactSplitThread.requestSplit(region);
       } else if (shouldCompact) {
-        server.compactSplitThread.requestSystemCompaction(region, Thread.currentThread().getName());
+        compactSplitThread.requestSystemCompaction(region, Thread.currentThread().getName());
       }
     } catch (DroppedSnapshotException ex) {
       // Cache flush can fail in a few places. If it fails in a critical
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
index 5a358bc..3da069d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
@@ -235,37 +235,26 @@ class MetricsRegionServerWrapperImpl
 
   @Override
   public int getSplitQueueSize() {
-    if (this.regionServer.compactSplitThread == null) {
-      return 0;
-    }
-    return this.regionServer.compactSplitThread.getSplitQueueSize();
+    final CompactSplit compactSplit = regionServer.getCompactSplitThread();
+    return compactSplit == null ? 0 : compactSplit.getSplitQueueSize();
   }
 
   @Override
   public int getCompactionQueueSize() {
-    //The thread could be zero.  if so assume there is no queue.
-    if (this.regionServer.compactSplitThread == null) {
-      return 0;
-    }
-    return this.regionServer.compactSplitThread.getCompactionQueueSize();
+    final CompactSplit compactSplit = regionServer.getCompactSplitThread();
+    return compactSplit == null ? 0 : compactSplit.getCompactionQueueSize();
   }
 
   @Override
   public int getSmallCompactionQueueSize() {
-    //The thread could be zero.  if so assume there is no queue.
-    if (this.regionServer.compactSplitThread == null) {
-      return 0;
-    }
-    return this.regionServer.compactSplitThread.getSmallCompactionQueueSize();
+    final CompactSplit compactSplit = regionServer.getCompactSplitThread();
+    return compactSplit == null ? 0 : compactSplit.getSmallCompactionQueueSize();
   }
 
   @Override
   public int getLargeCompactionQueueSize() {
-    //The thread could be zero.  if so assume there is no queue.
-    if (this.regionServer.compactSplitThread == null) {
-      return 0;
-    }
-    return this.regionServer.compactSplitThread.getLargeCompactionQueueSize();
+    final CompactSplit compactSplit = regionServer.getCompactSplitThread();
+    return compactSplit == null ? 0 : compactSplit.getLargeCompactionQueueSize();
   }
 
   @Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSDumpServlet.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSDumpServlet.java
index 1153467..56b72e5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSDumpServlet.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSDumpServlet.java
@@ -124,19 +124,20 @@ public class RSDumpServlet extends StateDumpServlet {
     }
   }
 
-  public static void dumpQueue(HRegionServer hrs, PrintWriter out)
-      throws IOException {
-    if (hrs.compactSplitThread != null) {
+  public static void dumpQueue(HRegionServer hrs, PrintWriter out) {
+    final CompactSplit compactSplit = hrs.getCompactSplitThread();
+    if (compactSplit != null) {
       // 1. Print out Compaction/Split Queue
-      out.println("Compaction/Split Queue summary: "
-          + hrs.compactSplitThread.toString() );
-      out.println(hrs.compactSplitThread.dumpQueue());
+      out.println("Compaction/Split Queue summary: " + compactSplit);
+      out.println(compactSplit.dumpQueue());
     }
 
-    if (hrs.getMemStoreFlusher() != null) {
+    final MemStoreFlusher memStoreFlusher = hrs.getMemStoreFlusher();
+    if (memStoreFlusher != null) {
       // 2. Print out flush Queue
-      out.println("\nFlush Queue summary: " + hrs.getMemStoreFlusher().toString());
-      out.println(hrs.getMemStoreFlusher().dumpQueue());
+      out.println();
+      out.println("Flush Queue summary: " + memStoreFlusher);
+      out.println(memStoreFlusher.dumpQueue());
     }
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 9d9d5c7..79d77c7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -1711,17 +1711,18 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
   @Override
   public CompactionSwitchResponse compactionSwitch(RpcController controller,
       CompactionSwitchRequest request) throws ServiceException {
+    final CompactSplit compactSplitThread = regionServer.getCompactSplitThread();
     try {
       checkOpen();
       requestCount.increment();
-      boolean prevState = regionServer.compactSplitThread.isCompactionsEnabled();
+      boolean prevState = compactSplitThread.isCompactionsEnabled();
       CompactionSwitchResponse response =
           CompactionSwitchResponse.newBuilder().setPrevState(prevState).build();
       if (prevState == request.getEnabled()) {
         // passed in requested state is same as current state. No action required
         return response;
       }
-      regionServer.compactSplitThread.switchCompaction(request.getEnabled());
+      compactSplitThread.switchCompaction(request.getEnabled());
       return response;
     } catch (IOException ie) {
       throw new ServiceException(ie);
@@ -1764,7 +1765,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
         }
         boolean compactionNeeded = flushResult.isCompactionNeeded();
         if (compactionNeeded) {
-          regionServer.compactSplitThread.requestSystemCompaction(region,
+          regionServer.getCompactSplitThread().requestSystemCompaction(region,
             "Compaction through user triggered flush");
         }
         builder.setFlushed(flushResult.isFlushSucceeded());
@@ -1880,6 +1881,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     ClearCompactionQueuesResponse.Builder respBuilder = ClearCompactionQueuesResponse.newBuilder();
     requestCount.increment();
     if (clearCompactionQueues.compareAndSet(false,true)) {
+      final CompactSplit compactSplitThread = regionServer.getCompactSplitThread();
       try {
         checkOpen();
         regionServer.getRegionServerCoprocessorHost().preClearCompactionQueues();
@@ -1887,10 +1889,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
           LOG.debug("clear " + queueName + " compaction queue");
           switch (queueName) {
             case "long":
-              regionServer.compactSplitThread.clearLongCompactionsQueue();
+              compactSplitThread.clearLongCompactionsQueue();
               break;
             case "short":
-              regionServer.compactSplitThread.clearShortCompactionsQueue();
+              compactSplitThread.clearShortCompactionsQueue();
               break;
             default:
               LOG.warn("Unknown queue name " + queueName);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestChangingEncoding.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestChangingEncoding.java
index c3f4e1c..c6b8ddc 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestChangingEncoding.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestChangingEncoding.java
@@ -240,11 +240,11 @@ public class TestChangingEncoding {
     final long maxWaitime = System.currentTimeMillis() + 500;
     boolean cont;
     do {
-      cont = rs.compactSplitThread.getCompactionQueueSize() == 0;
+      cont = rs.getCompactSplitThread().getCompactionQueueSize() == 0;
       Threads.sleep(1);
     } while (cont && System.currentTimeMillis() < maxWaitime);
 
-    while (rs.compactSplitThread.getCompactionQueueSize() > 0) {
+    while (rs.getCompactSplitThread().getCompactionQueueSize() > 0) {
       Threads.sleep(1);
     }
     LOG.debug("Compaction queue size reached 0, continuing");
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestLoadAndSwitchEncodeOnDisk.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestLoadAndSwitchEncodeOnDisk.java
index 2d04fb3..7ef8052 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestLoadAndSwitchEncodeOnDisk.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestLoadAndSwitchEncodeOnDisk.java
@@ -102,7 +102,7 @@ public class TestLoadAndSwitchEncodeOnDisk extends TestMiniClusterLoadSequential
     // Wait until compaction completes
     Threads.sleepWithoutInterrupt(5000);
     HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0);
-    while (rs.compactSplitThread.getCompactionQueueSize() > 0) {
+    while (rs.getCompactSplitThread().getCompactionQueueSize() > 0) {
       Threads.sleep(50);
     }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSplitThread.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSplitThread.java
index f191e48..06fbf89 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSplitThread.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSplitThread.java
@@ -120,39 +120,39 @@ public class TestCompactSplitThread {
       HRegionServer regionServer = TEST_UTIL.getRSForFirstRegionInTable(tableName);
 
       // check initial configuration of thread pool sizes
-      assertEquals(3, regionServer.compactSplitThread.getLargeCompactionThreadNum());
-      assertEquals(4, regionServer.compactSplitThread.getSmallCompactionThreadNum());
-      assertEquals(5, regionServer.compactSplitThread.getSplitThreadNum());
+      assertEquals(3, regionServer.getCompactSplitThread().getLargeCompactionThreadNum());
+      assertEquals(4, regionServer.getCompactSplitThread().getSmallCompactionThreadNum());
+      assertEquals(5, regionServer.getCompactSplitThread().getSplitThreadNum());
 
       // change bigger configurations and do online update
       conf.setInt(CompactSplit.LARGE_COMPACTION_THREADS, 4);
       conf.setInt(CompactSplit.SMALL_COMPACTION_THREADS, 5);
       conf.setInt(CompactSplit.SPLIT_THREADS, 6);
       try {
-        regionServer.compactSplitThread.onConfigurationChange(conf);
+        regionServer.getCompactSplitThread().onConfigurationChange(conf);
       } catch (IllegalArgumentException iae) {
         Assert.fail("Update bigger configuration failed!");
       }
 
       // check again after online update
-      assertEquals(4, regionServer.compactSplitThread.getLargeCompactionThreadNum());
-      assertEquals(5, regionServer.compactSplitThread.getSmallCompactionThreadNum());
-      assertEquals(6, regionServer.compactSplitThread.getSplitThreadNum());
+      assertEquals(4, regionServer.getCompactSplitThread().getLargeCompactionThreadNum());
+      assertEquals(5, regionServer.getCompactSplitThread().getSmallCompactionThreadNum());
+      assertEquals(6, regionServer.getCompactSplitThread().getSplitThreadNum());
 
       // change smaller configurations and do online update
       conf.setInt(CompactSplit.LARGE_COMPACTION_THREADS, 2);
       conf.setInt(CompactSplit.SMALL_COMPACTION_THREADS, 3);
       conf.setInt(CompactSplit.SPLIT_THREADS, 4);
       try {
-        regionServer.compactSplitThread.onConfigurationChange(conf);
+        regionServer.getCompactSplitThread().onConfigurationChange(conf);
       } catch (IllegalArgumentException iae) {
         Assert.fail("Update smaller configuration failed!");
       }
 
       // check again after online update
-      assertEquals(2, regionServer.compactSplitThread.getLargeCompactionThreadNum());
-      assertEquals(3, regionServer.compactSplitThread.getSmallCompactionThreadNum());
-      assertEquals(4, regionServer.compactSplitThread.getSplitThreadNum());
+      assertEquals(2, regionServer.getCompactSplitThread().getLargeCompactionThreadNum());
+      assertEquals(3, regionServer.getCompactSplitThread().getSmallCompactionThreadNum());
+      assertEquals(4, regionServer.getCompactSplitThread().getSplitThreadNum());
     } finally {
       conn.close();
     }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerOnlineConfigChange.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerOnlineConfigChange.java
index 79c043e..b5c55be 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerOnlineConfigChange.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerOnlineConfigChange.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
@@ -102,26 +103,25 @@ public class TestRegionServerOnlineConfigChange {
 
   /**
    * Check if the number of compaction threads changes online
-   * @throws IOException
    */
   @Test
-  public void testNumCompactionThreadsOnlineChange() throws IOException {
-    assertTrue(rs1.compactSplitThread != null);
+  public void testNumCompactionThreadsOnlineChange() {
+    assertNotNull(rs1.getCompactSplitThread());
     int newNumSmallThreads =
-            rs1.compactSplitThread.getSmallCompactionThreadNum() + 1;
+      rs1.getCompactSplitThread().getSmallCompactionThreadNum() + 1;
     int newNumLargeThreads =
-            rs1.compactSplitThread.getLargeCompactionThreadNum() + 1;
+      rs1.getCompactSplitThread().getLargeCompactionThreadNum() + 1;
 
     conf.setInt("hbase.regionserver.thread.compaction.small",
-            newNumSmallThreads);
+      newNumSmallThreads);
     conf.setInt("hbase.regionserver.thread.compaction.large",
-            newNumLargeThreads);
+      newNumLargeThreads);
     rs1.getConfigurationManager().notifyAllObservers(conf);
 
     assertEquals(newNumSmallThreads,
-                  rs1.compactSplitThread.getSmallCompactionThreadNum());
+      rs1.getCompactSplitThread().getSmallCompactionThreadNum());
     assertEquals(newNumLargeThreads,
-                  rs1.compactSplitThread.getLargeCompactionThreadNum());
+      rs1.getCompactSplitThread().getLargeCompactionThreadNum());
   }
 
   /**
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestCompactionWithThroughputController.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestCompactionWithThroughputController.java
index 2345dc9..9198bd5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestCompactionWithThroughputController.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestCompactionWithThroughputController.java
@@ -208,7 +208,7 @@ public class TestCompactionWithThroughputController {
       TEST_UTIL.waitTableAvailable(tableName);
       HRegionServer regionServer = TEST_UTIL.getRSForFirstRegionInTable(tableName);
       PressureAwareCompactionThroughputController throughputController =
-          (PressureAwareCompactionThroughputController) regionServer.compactSplitThread
+          (PressureAwareCompactionThroughputController) regionServer.getCompactSplitThread()
               .getCompactionThroughputController();
       assertEquals(10L * 1024 * 1024, throughputController.getMaxThroughput(), EPSILON);
       Table table = conn.getTable(tableName);
@@ -234,9 +234,9 @@ public class TestCompactionWithThroughputController {
 
       conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
         NoLimitThroughputController.class.getName());
-      regionServer.compactSplitThread.onConfigurationChange(conf);
+      regionServer.getCompactSplitThread().onConfigurationChange(conf);
       assertTrue(throughputController.isStopped());
-      assertTrue(regionServer.compactSplitThread.getCompactionThroughputController()
+      assertTrue(regionServer.getCompactSplitThread().getCompactionThroughputController()
         instanceof NoLimitThroughputController);
     } finally {
       conn.close();