You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ni...@apache.org on 2021/07/14 15:21:18 UTC

[hbase] branch HBASE-25714 updated: HBASE-25830 HBaseCluster support CompactionServer for UTs (addendum) (#3464)

This is an automated email from the ASF dual-hosted git repository.

niuyulin pushed a commit to branch HBASE-25714
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/HBASE-25714 by this push:
     new 27ec80d  HBASE-25830 HBaseCluster support CompactionServer for UTs (addendum) (#3464)
27ec80d is described below

commit 27ec80d6e321ff04dcc8424b8e6e5ca0ba7374d0
Author: niuyulin <yu...@gmail.com>
AuthorDate: Wed Jul 14 23:20:42 2021 +0800

    HBASE-25830 HBaseCluster support CompactionServer for UTs (addendum) (#3464)
    
    Signed-off-by: Duo Zhang <zh...@apache.org>
---
 .../org/apache/hadoop/hbase/LocalHBaseCluster.java |  57 ++++++++++-
 .../apache/hadoop/hbase/util/JVMClusterUtil.java   | 113 ++++++++++++---------
 .../org/apache/hadoop/hbase/MiniHBaseCluster.java  |  41 ++++++++
 .../compactionserver/TestCompactionServer.java     |  33 ++++--
 4 files changed, 186 insertions(+), 58 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java
index ec55182..b856c7a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java
@@ -168,6 +168,19 @@ public class LocalHBaseCluster {
         LOG.debug("Setting RS InfoServer Port to random.");
         conf.set(HConstants.REGIONSERVER_INFO_PORT, "0");
       }
+      if (conf.getInt(HConstants.COMPACTION_SERVER_PORT, HConstants.DEFAULT_COMPACTION_SERVER_PORT)
+        == HConstants.DEFAULT_COMPACTION_SERVER_PORT) {
+        LOG.debug("Setting CompactionServer Port to random.");
+        conf.set(HConstants.COMPACTION_SERVER_PORT, "0");
+      }
+      // treat info ports special; expressly don't change '-1' (keep off)
+      // in case we make that the default behavior.
+      if (conf.getInt(HConstants.COMPACTION_SERVER_INFO_PORT, 0) != -1 && conf.getInt(
+        HConstants.COMPACTION_SERVER_INFO_PORT, HConstants.DEFAULT_COMPACTION_SERVER_INFOPORT)
+        == HConstants.DEFAULT_COMPACTION_SERVER_INFOPORT) {
+        LOG.debug("Setting CS InfoServer Port to random.");
+        conf.set(HConstants.COMPACTION_SERVER_INFO_PORT, "0");
+      }
       if (conf.getInt(HConstants.MASTER_INFO_PORT, 0) != -1 &&
           conf.getInt(HConstants.MASTER_INFO_PORT, HConstants.DEFAULT_MASTER_INFOPORT)
           == HConstants.DEFAULT_MASTER_INFOPORT) {
@@ -309,6 +322,17 @@ public class LocalHBaseCluster {
   }
 
   /**
+   * Wait for the specified compaction server to stop. Removes this thread from list of running
+   * threads.
+   * @return Name of compaction server that just went down.
+   */
+  public String waitOnCompactionServer(int serverNumber) {
+    JVMClusterUtil.CompactionServerThread regionServerThread =
+        this.compactionServerThreads.get(serverNumber);
+    return waitOnCompactionServer(regionServerThread);
+  }
+
+  /**
    * Wait for the specified region server to stop. Removes this thread from list of running threads.
    * @return Name of region server that just went down.
    */
@@ -327,6 +351,25 @@ public class LocalHBaseCluster {
   }
 
   /**
+   * Wait for the specified compaction server to stop. Removes this thread from list of running
+   * threads.
+   * @return Name of compaction server that just went down.
+   */
+  public String waitOnCompactionServer(JVMClusterUtil.CompactionServerThread cst) {
+    while (cst.isAlive()) {
+      try {
+        LOG.info("Waiting on " + cst.getCompactionServer().toString());
+        cst.join();
+      } catch (InterruptedException e) {
+        LOG.error("Interrupted while waiting for {} to finish. Retrying join", cst.getName(), e);
+        Thread.currentThread().interrupt();
+      }
+    }
+    compactionServerThreads.remove(cst);
+    return cst.getName();
+  }
+
+  /**
    * @return the HMaster thread
    */
   public HMaster getMaster(int serverNumber) {
@@ -427,6 +470,18 @@ public class LocalHBaseCluster {
         }
       }
     }
+    if (this.compactionServerThreads != null) {
+      for(Thread t: this.compactionServerThreads) {
+        if (t.isAlive()) {
+          try {
+            Threads.threadDumpingIsAlive(t);
+          } catch (InterruptedException e) {
+            LOG.debug("Interrupted", e);
+          }
+        }
+      }
+    }
+
   }
 
   @SuppressWarnings("unchecked")
@@ -491,7 +546,7 @@ public class LocalHBaseCluster {
    * Shut down the mini HBase cluster
    */
   public void shutdown() {
-    JVMClusterUtil.shutdown(this.masterThreads, this.regionThreads);
+    JVMClusterUtil.shutdown(this.masterThreads, this.regionThreads, this.compactionServerThreads);
   }
 
   /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java
index 57a74a0..6791900 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java
@@ -294,12 +294,64 @@ public class JVMClusterUtil {
 
   }
 
+  private static <T extends Thread> boolean shutdown(final List<T> servers,
+      boolean wasInterrupted) {
+    final long maxTime = System.currentTimeMillis() + 30 * 1000;
+    // first try nicely.
+    for (Thread t : servers) {
+      long now = System.currentTimeMillis();
+      if (t.isAlive() && !wasInterrupted && now < maxTime) {
+        try {
+          t.join(maxTime - now);
+        } catch (InterruptedException e) {
+          LOG.info(
+            "Got InterruptedException on shutdown - " + "not waiting anymore on region server ends",
+            e);
+          wasInterrupted = true; // someone wants us to speed up.
+        }
+      }
+    }
+
+    // Let's try to interrupt the remaining threads if any.
+    for (int i = 0; i < 100; ++i) {
+      boolean atLeastOneLiveServer = false;
+      for (Thread t : servers) {
+        if (t.isAlive()) {
+          atLeastOneLiveServer = true;
+          try {
+            LOG.warn("{} remaining, give one more chance before interrupting",
+              t.getClass().getSimpleName());
+            t.join(1000);
+          } catch (InterruptedException e) {
+            wasInterrupted = true;
+          }
+        }
+      }
+      if (!atLeastOneLiveServer) {
+        break;
+      }
+      for (Thread t : servers) {
+        if (t.isAlive()) {
+          LOG.warn(
+            "{} taking too long to stop, interrupting; thread dump " + "if > 3 attempts: i=" + i,
+            t.getClass().getSimpleName());
+          if (i > 3) {
+            Threads.printThreadInfo(System.out, "Thread dump " + t.getName());
+          }
+          t.interrupt();
+        }
+      }
+    }
+    return wasInterrupted;
+  }
+
   /**
    * @param masters
    * @param regionservers
    */
   public static void shutdown(final List<MasterThread> masters,
-      final List<RegionServerThread> regionservers) {
+      final List<RegionServerThread> regionservers,
+      final List<CompactionServerThread> compactionServers) {
     LOG.debug("Shutting down HBase Cluster");
     if (masters != null) {
       // Do backups first.
@@ -335,51 +387,18 @@ public class JVMClusterUtil {
       }
     }
     boolean wasInterrupted = false;
-    final long maxTime = System.currentTimeMillis() + 30 * 1000;
     if (regionservers != null) {
-      // first try nicely.
       for (RegionServerThread t : regionservers) {
         t.getRegionServer().stop("Shutdown requested");
       }
-      for (RegionServerThread t : regionservers) {
-        long now = System.currentTimeMillis();
-        if (t.isAlive() && !wasInterrupted && now < maxTime) {
-          try {
-            t.join(maxTime - now);
-          } catch (InterruptedException e) {
-            LOG.info("Got InterruptedException on shutdown - " +
-                "not waiting anymore on region server ends", e);
-            wasInterrupted = true; // someone wants us to speed up.
-          }
-        }
-      }
-
-      // Let's try to interrupt the remaining threads if any.
-      for (int i = 0; i < 100; ++i) {
-        boolean atLeastOneLiveServer = false;
-        for (RegionServerThread t : regionservers) {
-          if (t.isAlive()) {
-            atLeastOneLiveServer = true;
-            try {
-              LOG.warn("RegionServerThreads remaining, give one more chance before interrupting");
-              t.join(1000);
-            } catch (InterruptedException e) {
-              wasInterrupted = true;
-            }
-          }
-        }
-        if (!atLeastOneLiveServer) break;
-        for (RegionServerThread t : regionservers) {
-          if (t.isAlive()) {
-            LOG.warn("RegionServerThreads taking too long to stop, interrupting; thread dump "  +
-              "if > 3 attempts: i=" + i);
-            if (i > 3) {
-              Threads.printThreadInfo(System.out, "Thread dump " + t.getName());
-            }
-            t.interrupt();
-          }
-        }
+      wasInterrupted = shutdown(regionservers, wasInterrupted);
+    }
+    if (compactionServers != null) {
+      // first try nicely.
+      for (CompactionServerThread t : compactionServers) {
+        t.getCompactionServer().stop("Shutdown requested");
       }
+      wasInterrupted = shutdown(compactionServers, wasInterrupted);
     }
 
     if (masters != null) {
@@ -398,12 +417,14 @@ public class JVMClusterUtil {
         }
       }
     }
-    LOG.info("Shutdown of " +
-      ((masters != null) ? masters.size() : "0") + " master(s) and " +
-      ((regionservers != null) ? regionservers.size() : "0") +
-      " regionserver(s) " + (wasInterrupted ? "interrupted" : "complete"));
+    LOG.info("Shutdown of " + ((masters != null) ? masters.size() : "0") + " master(s) and "
+        + ((regionservers != null) ? regionservers.size() : "0") + " regionserver(s) "
+        + ((compactionServers != null) ? compactionServers.size() : "0") + " compactionServer(s) "
+        + (wasInterrupted ? "interrupted" : "complete")
+
+    );
 
-    if (wasInterrupted){
+    if (wasInterrupted) {
       Thread.currentThread().interrupt();
     }
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java
index fc36ac8..05fad48 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java
@@ -436,6 +436,25 @@ public class MiniHBaseCluster extends HBaseCluster {
     return startRegionServer(newConf);
   }
 
+  /**
+   * Starts a compaction server thread running
+   * @return New CompactionServerThread
+   */
+  public JVMClusterUtil.CompactionServerThread startCompactionServer() throws IOException {
+    final Configuration configuration = HBaseConfiguration.create(conf);
+    User rsUser = HBaseTestingUtility.getDifferentUser(configuration, ".hfs." + index++);
+    JVMClusterUtil.CompactionServerThread t = null;
+    try {
+      t = hbaseCluster.addCompactionServer(configuration,
+        hbaseCluster.getCompactionServers().size(), rsUser);
+      t.start();
+      t.waitForServerOnline();
+    } catch (InterruptedException ie) {
+      throw new IOException("Interrupted adding compactionserver to cluster", ie);
+    }
+    return t;
+  }
+
   private JVMClusterUtil.RegionServerThread startRegionServer(Configuration configuration)
       throws IOException {
     User rsUser =
@@ -503,6 +522,20 @@ public class MiniHBaseCluster extends HBaseCluster {
   }
 
   /**
+   * Shut down the specified compaction server cleanly
+   *
+   * @param serverNumber  Used as index into a list.
+   * @return the compaction server that was stopped
+   */
+  public JVMClusterUtil.CompactionServerThread stopCompactionServer(int serverNumber) {
+    JVMClusterUtil.CompactionServerThread server =
+      hbaseCluster.getCompactionServers().get(serverNumber);
+    LOG.info("Stopping " + server.toString());
+    server.getCompactionServer().stop("Stopping rs " + serverNumber);
+    return server;
+  }
+
+  /**
    * Shut down the specified region server cleanly
    *
    * @param serverNumber  Used as index into a list.
@@ -557,6 +590,14 @@ public class MiniHBaseCluster extends HBaseCluster {
     return this.hbaseCluster.waitOnRegionServer(serverNumber);
   }
 
+  /**
+   * Wait for the specified compaction server to stop. Removes this thread from list
+   * of running threads.
+   * @return Name of compaction server that just went down.
+   */
+  public String waitOnCompactionServer(final int serverNumber) {
+    return this.hbaseCluster.waitOnCompactionServer(serverNumber);
+  }
 
   /**
    * Starts a master thread running
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestCompactionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestCompactionServer.java
index dfc17e4..6d0cccc 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestCompactionServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestCompactionServer.java
@@ -95,8 +95,12 @@ public class TestCompactionServer {
 
   @Before
   public void before() throws Exception {
-    TEST_UTIL.createTable(TABLENAME, FAMILY);
+    TableDescriptor tableDescriptor =
+        TableDescriptorBuilder.newBuilder(TABLENAME).setCompactionOffloadEnabled(true).build();
+    TEST_UTIL.createTable(tableDescriptor, Bytes.toByteArrays(FAMILY),
+      TEST_UTIL.getConfiguration());
     TEST_UTIL.waitTableAvailable(TABLENAME);
+    COMPACTION_SERVER.requestCount.reset();
   }
 
   @After
@@ -169,9 +173,9 @@ public class TestCompactionServer {
     TEST_UTIL.getAdmin().compactionSwitch(false, new ArrayList<>());
     ColumnFamilyDescriptor cfd =
         ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(FAMILY)).setMaxVersions(3).build();
-    TableDescriptor modifiedtableDescriptor =
-        TableDescriptorBuilder.newBuilder(TABLENAME).setColumnFamily(cfd).build();
-    TEST_UTIL.getAdmin().modifyTable(modifiedtableDescriptor);
+    TableDescriptor modifiedTableDescriptor = TableDescriptorBuilder.newBuilder(TABLENAME)
+        .setColumnFamily(cfd).setCompactionOffloadEnabled(true).build();
+    TEST_UTIL.getAdmin().modifyTable(modifiedTableDescriptor);
     TEST_UTIL.waitTableAvailable(TABLENAME);
     doFillRecord(1, 500, RandomUtils.nextBytes(20));
     doFillRecord(1, 500, RandomUtils.nextBytes(20));
@@ -198,6 +202,8 @@ public class TestCompactionServer {
       return hFileCount == 1;
     });
 
+    // To ensure do compaction on compaction server
+    TEST_UTIL.waitFor(60000, () -> COMPACTION_SERVER.requestCount.sum() > 0);
     kVCount = 0;
     for (HRegion region : TEST_UTIL.getHBaseCluster().getRegions(TABLENAME)) {
       for (HStoreFile hStoreFile : region.getStore(Bytes.toBytes(FAMILY)).getStorefiles()) {
@@ -208,11 +214,11 @@ public class TestCompactionServer {
     verifyRecord(1, 500, true);
   }
 
-
   @Test
   public void testCompactionServerDown() throws Exception {
     TEST_UTIL.getAdmin().compactionSwitch(false, new ArrayList<>());
-    COMPACTION_SERVER.stop("test");
+    TEST_UTIL.getHBaseCluster().stopCompactionServer(0);
+    TEST_UTIL.getHBaseCluster().waitOnCompactionServer(0);
     TEST_UTIL.waitFor(60000,
       () -> MASTER.getCompactionOffloadManager().getOnlineServersList().size() == 0);
     doPutRecord(1, 1000, true);
@@ -232,6 +238,12 @@ public class TestCompactionServer {
       return hFile == 1;
     });
     verifyRecord(1, 1000, true);
+    TEST_UTIL.getHBaseCluster().startCompactionServer();
+    COMPACTION_SERVER = TEST_UTIL.getMiniHBaseCluster().getCompactionServerThreads().get(0)
+      .getCompactionServer();
+    COMPACTION_SERVER_NAME = COMPACTION_SERVER.getServerName();
+    TEST_UTIL.waitFor(60000,
+      () -> MASTER.getCompactionOffloadManager().getOnlineServersList().size() == 1);
   }
 
   @Test
@@ -277,20 +289,19 @@ public class TestCompactionServer {
 
     TableDescriptor htd =
         TableDescriptorBuilder.newBuilder(TEST_UTIL.getAdmin().getDescriptor(TABLENAME))
-            .setCompactionOffloadEnabled(true).build();
+            .setCompactionOffloadEnabled(false).build();
     TEST_UTIL.getAdmin().modifyTable(htd);
     TEST_UTIL.waitUntilAllRegionsAssigned(TABLENAME);
     // invoke compact
     TEST_UTIL.compact(TABLENAME, false);
-    TEST_UTIL.waitFor(6000, () -> COMPACTION_SERVER.requestCount.sum() > 0);
-    long requestCount = COMPACTION_SERVER.requestCount.sum();
+    TEST_UTIL.waitFor(6000, () -> COMPACTION_SERVER.requestCount.sum() == 0);
 
     htd = TableDescriptorBuilder.newBuilder(TEST_UTIL.getAdmin().getDescriptor(TABLENAME))
-        .setCompactionOffloadEnabled(false).build();
+        .setCompactionOffloadEnabled(true).build();
     TEST_UTIL.getAdmin().modifyTable(htd);
     TEST_UTIL.waitUntilAllRegionsAssigned(TABLENAME);
     // invoke compact
     TEST_UTIL.compact(TABLENAME, false);
-    TEST_UTIL.waitFor(6000, () -> COMPACTION_SERVER.requestCount.sum() == requestCount);
+    TEST_UTIL.waitFor(6000, () -> COMPACTION_SERVER.requestCount.sum() > 0);
   }
 }