You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2012/05/30 22:21:28 UTC

svn commit: r1344445 - in /hbase/branches/0.94/src: main/java/org/apache/hadoop/hbase/client/ main/java/org/apache/hadoop/hbase/ipc/ main/java/org/apache/hadoop/hbase/regionserver/ main/java/org/apache/hadoop/hbase/regionserver/compactions/ main/resour...

Author: stack
Date: Wed May 30 20:21:28 2012
New Revision: 1344445

URL: http://svn.apache.org/viewvc?rev=1344445&view=rev
Log:
HBASE-6124 Backport HBASE-6033 to 0.90, 0.92 and 0.94

Added:
    hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionState.java
Modified:
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java
    hbase/branches/0.94/src/main/resources/hbase-webapps/master/table.jsp

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java?rev=1344445&r1=1344444&r2=1344445&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java Wed May 30 20:21:28 2012
@@ -22,7 +22,6 @@ package org.apache.hadoop.hbase.client;
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.InterruptedIOException;
-import java.lang.reflect.UndeclaredThrowableException;
 import java.net.SocketTimeoutException;
 import java.util.Arrays;
 import java.util.LinkedList;
@@ -56,6 +55,7 @@ import org.apache.hadoop.hbase.client.Me
 import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase;
 import org.apache.hadoop.hbase.ipc.HMasterInterface;
 import org.apache.hadoop.hbase.ipc.HRegionInterface;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest.CompactionState;
 import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
 import org.apache.hadoop.hbase.util.Addressing;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -1667,4 +1667,90 @@ public class HBaseAdmin implements Abort
       return null;
     }
   }
+
+  /**
+   * Get the current compaction state of a table or region.
+   * It could be in a major compaction, a minor compaction, both, or none.
+   *
+   * @param tableNameOrRegionName name of the table or region to check; converted with Bytes.toBytes
+   * @throws IOException if a remote or network exception occurs
+   * @throws InterruptedException propagated from the byte[] overload this method delegates to
+   * @return the current compaction state
+   */
+  public CompactionState getCompactionState(final String tableNameOrRegionName)
+      throws IOException, InterruptedException {
+    return getCompactionState(Bytes.toBytes(tableNameOrRegionName));
+  }
+
+  /**
+   * Get the current compaction state of a table or region.
+   * For a table, the states reported for its regions are combined into one value.
+   *
+   * @param tableNameOrRegionName name of the table or region to check
+   * @throws IOException if a remote or network exception occurs
+   * @throws InterruptedException NOTE(review): the interruptible call is not visible in this diff - confirm
+   * @return the current compaction state; NONE when no queried region reports a compaction
+   */
+  public CompactionState getCompactionState(final byte [] tableNameOrRegionName)
+      throws IOException, InterruptedException {
+    CompactionState state = CompactionState.NONE;  // returned unchanged if nothing reports a compaction
+    CatalogTracker ct = getCatalogTracker();
+    try {
+      if (isRegionName(tableNameOrRegionName, ct)) {  // region name: ask the server .META. lists for it
+        Pair<HRegionInfo, ServerName> pair =
+          MetaReader.getRegion(ct, tableNameOrRegionName);
+        if (pair == null || pair.getSecond() == null) {  // no server known: log and fall through to NONE
+          LOG.info("No server in .META. for " +
+            Bytes.toStringBinary(tableNameOrRegionName) + "; pair=" + pair);
+        } else {
+          ServerName sn = pair.getSecond();
+          HRegionInterface rs =
+            this.connection.getHRegionConnection(sn.getHostname(), sn.getPort());
+          return CompactionState.valueOf(  // the RPC returns the enum constant's name as a String
+            rs.getCompactionState(pair.getFirst().getRegionName()));
+        }
+      } else {  // table name: combine the states of its regions
+        final String tableName = tableNameString(tableNameOrRegionName, ct);
+        List<Pair<HRegionInfo, ServerName>> pairs =
+          MetaReader.getTableRegionsAndLocations(ct, tableName);
+        for (Pair<HRegionInfo, ServerName> pair: pairs) {
+          if (pair.getFirst().isOffline()) continue;  // offline regions are not queried
+          if (pair.getSecond() == null) continue;  // nor are regions without a server
+          try {
+            ServerName sn = pair.getSecond();
+            HRegionInterface rs =
+              this.connection.getHRegionConnection(sn.getHostname(), sn.getPort());
+            switch (CompactionState.valueOf(
+              rs.getCompactionState(pair.getFirst().getRegionName()))) {
+            case MAJOR_AND_MINOR:  // cannot get any "higher": stop scanning regions
+              return CompactionState.MAJOR_AND_MINOR;
+            case MAJOR:
+              if (state == CompactionState.MINOR) {  // an earlier region already reported MINOR
+                return CompactionState.MAJOR_AND_MINOR;
+              }
+              state = CompactionState.MAJOR;
+              break;
+            case MINOR:
+              if (state == CompactionState.MAJOR) {  // an earlier region already reported MAJOR
+                return CompactionState.MAJOR_AND_MINOR;
+              }
+              state = CompactionState.MINOR;
+              break;
+            case NONE:
+              default: // nothing to record for this region; move on to the next one
+            }
+          } catch (NotServingRegionException e) {  // region is skipped; the failure is only logged at debug
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Trying to get compaction state of " +
+                pair.getFirst() + ": " +
+                StringUtils.stringifyException(e));
+            }
+          }
+        }
+      }
+    } finally {
+      cleanupCatalogTracker(ct);  // runs on every exit path, including the early returns above
+    }
+    return state;
+  }
 }

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java?rev=1344445&r1=1344444&r2=1344445&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java Wed May 30 20:21:28 2012
@@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.filter.Co
 import org.apache.hadoop.hbase.filter.WritableByteArrayComparable;
 import org.apache.hadoop.hbase.io.hfile.BlockCacheColumnFamilySummary;
 import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest.CompactionState;
 import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.security.TokenInfo;
@@ -621,6 +622,15 @@ public interface HRegionInterface extend
    */
   public byte[][] rollHLogWriter() throws IOException, FailedLogCloseException;
 
+  /**
+   * Get the current compaction state of the region.
+   *
+   * @param regionName the name of the region whose compaction state is checked.
+   * @return the compaction state name (HBaseAdmin in this commit parses it with CompactionState.valueOf).
+   * @throws IOException NOTE(review): failure conditions are implementation-defined - confirm
+   */
+  public String getCompactionState(final byte[] regionName) throws IOException;
+
   @Override
   public void stop(String why);
 }

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1344445&r1=1344444&r2=1344445&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Wed May 30 20:21:28 2012
@@ -122,6 +122,7 @@ import org.apache.hadoop.hbase.ipc.RpcSe
 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
 import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
 import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
 import org.apache.hadoop.hbase.regionserver.handler.CloseRootHandler;
@@ -3698,4 +3699,19 @@ public class HRegionServer implements HR
         mxBeanInfo);
     LOG.info("Registered RegionServer MXBean");
   }
+
+  /**
+   * Get the current compaction state of the region.
+   *
+   * @param regionName the name of the region whose compaction state is checked.
+   * @return the name() of the CompactionState that CompactionRequest reports for the region's id.
+   * @throws IOException NOTE(review): presumably from checkOpen() or getRegion() - confirm
+   */
+  public String getCompactionState(final byte[] regionName) throws IOException {
+      checkOpen();
+      requestCount.incrementAndGet();  // this call is counted like other requests
+      HRegion region = getRegion(regionName);
+      HRegionInfo info = region.getRegionInfo();
+      return CompactionRequest.getCompactionState(info.getRegionId()).name();  // state is keyed by region id
+  }
 }

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1344445&r1=1344444&r2=1344445&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Wed May 30 20:21:28 2012
@@ -1281,10 +1281,14 @@ public class Store extends SchemaConfigu
     } finally {
       this.lock.readLock().unlock();
     }
+    if (ret != null) {
+      CompactionRequest.preRequest(ret);
+    }
     return ret;
   }
 
   public void finishRequest(CompactionRequest cr) {
+    CompactionRequest.postRequest(cr);
     cr.finishRequest();
     synchronized (filesCompacting) {
       filesCompacting.removeAll(cr.getFiles());

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java?rev=1344445&r1=1344444&r2=1344445&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java Wed May 30 20:21:28 2012
@@ -21,12 +21,15 @@ package org.apache.hadoop.hbase.regionse
 
 import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.KeyValue.Type;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.Store;
@@ -55,6 +58,14 @@ public class CompactionRequest implement
     private final Long timeInNanos;
     private HRegionServer server = null;
 
+    /**
+     * Maps tracking the number of outstanding major and minor compaction requests per region (id)
+     */
+    private static final ConcurrentHashMap<Long, AtomicInteger>
+      majorCompactions = new ConcurrentHashMap<Long, AtomicInteger>();
+    private static final ConcurrentHashMap<Long, AtomicInteger>
+      minorCompactions = new ConcurrentHashMap<Long, AtomicInteger>();
+
     public CompactionRequest(HRegion r, Store s,
         CompactSelection files, boolean isMajor, int p) {
       Preconditions.checkNotNull(r);
@@ -73,6 +84,58 @@ public class CompactionRequest implement
       this.timeInNanos = System.nanoTime();
     }
 
+    /**
+     * Find out whether a given region is being compacted now.
+     *
+     * @param regionId id of the region to look up in the major and minor request counters
+     * @return MAJOR_AND_MINOR, MAJOR, MINOR or NONE, depending on which counters are above zero
+     */
+    public static CompactionState getCompactionState(
+        final long regionId) {
+      Long key = Long.valueOf(regionId);
+      AtomicInteger major = majorCompactions.get(key);
+      AtomicInteger minor = minorCompactions.get(key);
+      int state = 0;  // used as a two-bit flag: 1 = minor, 2 = major
+      if (minor != null && minor.get() > 0) {
+        state += 1;  // use 1 to indicate minor here
+      }
+      if (major != null && major.get() > 0) {
+        state += 2;  // use 2 to indicate major here
+      }
+      switch (state) {
+      case 3:  // 3 = 2 + 1, so both major and minor
+        return CompactionState.MAJOR_AND_MINOR;
+      case 2:
+        return CompactionState.MAJOR;
+      case 1:
+        return CompactionState.MINOR;
+      default:  // 0: no counter for this region, or neither counter is above zero
+        return CompactionState.NONE;
+      }
+    }
+
+    public static void preRequest(final CompactionRequest cr){  // counts cr as an outstanding compaction of its region
+      Long key = Long.valueOf(cr.getHRegion().getRegionId());
+      ConcurrentHashMap<Long, AtomicInteger> compactions =
+        cr.isMajor() ? majorCompactions : minorCompactions;  // one counter map per compaction kind
+      AtomicInteger count = compactions.get(key);
+      if (count == null) {
+        compactions.putIfAbsent(key, new AtomicInteger(0));  // keeps a counter that was added concurrently
+        count = compactions.get(key);  // re-read so the counter actually stored in the map is incremented
+      }
+      count.incrementAndGet();
+    }
+
+    public static void postRequest(final CompactionRequest cr){  // undoes the increment made by preRequest for cr
+      Long key = Long.valueOf(cr.getHRegion().getRegionId());
+      ConcurrentHashMap<Long, AtomicInteger> compactions =
+        cr.isMajor() ? majorCompactions : minorCompactions;  // same map selection as preRequest
+      AtomicInteger count = compactions.get(key);
+      if (count != null) {  // a region that was never counted is silently ignored
+        count.decrementAndGet();  // the map entry itself is kept, not removed
+      }
+    }
+
     public void finishRequest() {
       this.compactSelection.finishRequest();
     }
@@ -213,6 +276,16 @@ public class CompactionRequest implement
     }
 
     /**
+     * The compaction state of a region, as derived from its major/minor request counters
+     */
+    public static enum CompactionState {
+      NONE,  // neither counter is above zero
+      MINOR,  // only the minor counter is above zero
+      MAJOR,  // only the major counter is above zero
+      MAJOR_AND_MINOR;  // both counters are above zero
+    }
+
+    /**
      * Cleanup class to use when rejecting a compaction request from the queue.
      */
     public static class Rejection implements RejectedExecutionHandler {

Modified: hbase/branches/0.94/src/main/resources/hbase-webapps/master/table.jsp
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/resources/hbase-webapps/master/table.jsp?rev=1344445&r1=1344444&r2=1344445&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/resources/hbase-webapps/master/table.jsp (original)
+++ hbase/branches/0.94/src/main/resources/hbase-webapps/master/table.jsp Wed May 30 20:21:28 2012
@@ -154,6 +154,11 @@
       <td><%= hbadmin.isTableEnabled(table.getTableName()) %></td>
       <td>Is the table enabled</td>
   </tr>
+  <tr>
+      <td>Compaction</td>
+      <td><%= hbadmin.getCompactionState(table.getTableName()) %></td>
+      <td>Is the table compacting</td>
+  </tr>
 <%  if (showFragmentation) { %>
   <tr>
       <td>Fragmentation</td>

Added: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionState.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionState.java?rev=1344445&view=auto
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionState.java (added)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionState.java Wed May 30 20:21:28 2012
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest.CompactionState;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/** Tests that the table/region compaction state can be retrieved while a compaction runs. */
+@Category(LargeTests.class)
+public class TestCompactionState {
+  final static Log LOG = LogFactory.getLog(TestCompactionState.class);
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private final static Random random = new Random();  // source of the random row keys in loadData
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.startMiniCluster();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test(timeout=60000)
+  public void testMajorCompaction() throws IOException, InterruptedException {
+    compaction("testMajorCompaction", 8, CompactionState.MAJOR);  // 8 load-and-flush rounds
+  }
+
+  @Test(timeout=60000)
+  public void testMinorCompaction() throws IOException, InterruptedException {
+    compaction("testMinorCompaction", 15, CompactionState.MINOR);  // 15 load-and-flush rounds
+  }
+
+  /**
+   * Load data to a table, flush it to disk, trigger compaction,
+   * confirm the compaction state is right and wait till it is done.
+   *
+   * @param tableName name of the table created (and deleted again) for this run
+   * @param flushes number of load-and-flush rounds; each round loads 3000 rows
+   * @param expectedState MINOR requests a minor compaction, anything else a major one
+   * @throws IOException NOTE(review): presumably from table, admin or cluster calls - confirm
+   * @throws InterruptedException if interrupted while sleeping between state polls
+   */
+  private void compaction(final String tableName, final int flushes,
+      final CompactionState expectedState) throws IOException, InterruptedException {
+    // Create a table with regions
+    byte [] table = Bytes.toBytes(tableName);
+    byte [] family = Bytes.toBytes("family");
+    HTable ht = null;
+    try {
+      ht = TEST_UTIL.createTable(table, family);
+      loadData(ht, family, 3000, flushes);
+      HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0);
+      List<HRegion> regions = rs.getOnlineRegions(table);
+      int countBefore = countStoreFiles(regions, family);
+      assertTrue(countBefore > 0); // there should be some data files
+      HBaseAdmin admin = new HBaseAdmin(TEST_UTIL.getConfiguration());  // NOTE(review): never closed in this method
+      if (expectedState == CompactionState.MINOR) {
+        admin.compact(tableName);
+      } else {
+        admin.majorCompact(table);
+      }
+      long curt = System.currentTimeMillis();
+      long waitTime = 5000;  // poll for at most 5 seconds for a compaction to show up
+      long endt = curt + waitTime;
+      CompactionState state = admin.getCompactionState(table);
+      while (state == CompactionState.NONE && curt < endt) {
+        Thread.sleep(10);
+        state = admin.getCompactionState(table);
+        curt = System.currentTimeMillis();
+      }
+      // Now, should have the right compaction state,
+      // otherwise, the compaction should have already been done
+      if (expectedState != state) {
+        for (HRegion region: regions) {  // checked directly on the static counters, not through the admin
+          state = CompactionRequest.getCompactionState(region.getRegionId());
+          assertEquals(CompactionState.NONE, state);
+        }
+      } else {
+        curt = System.currentTimeMillis();
+        waitTime = 20000;  // poll for at most 20 seconds for the compaction to finish
+        endt = curt + waitTime;
+        state = admin.getCompactionState(table);
+        while (state != CompactionState.NONE && curt < endt) {
+          Thread.sleep(10);
+          state = admin.getCompactionState(table);
+          curt = System.currentTimeMillis();
+        }
+        // Now, compaction should be done.
+        assertEquals(CompactionState.NONE, state);
+      }
+      int countAfter = countStoreFiles(regions, family);
+      assertTrue(countAfter < countBefore);
+      if (expectedState == CompactionState.MAJOR) assertTrue(1 == countAfter);  // major: exactly one file left
+      else assertTrue(1 < countAfter);  // minor: fewer files than before, but more than one
+    } finally {
+      if (ht != null) {  // only delete the table if createTable succeeded
+        TEST_UTIL.deleteTable(table);
+      }
+    }
+  }
+
+  private static int countStoreFiles(  // sums the store file list sizes of the family over all regions
+      List<HRegion> regions, final byte[] family) {
+    int count = 0;
+    for (HRegion region: regions) {
+      count += region.getStoreFileList(new byte[][]{family}).size();
+    }
+    return count;
+  }
+
+  private static void loadData(final HTable ht, final byte[] family,  // writes `flushes` rounds of `rows` puts
+      final int rows, final int flushes) throws IOException {
+    List<Put> puts = new ArrayList<Put>(rows);
+    byte[] qualifier = Bytes.toBytes("val");
+    for (int i = 0; i < flushes; i++) {
+      for (int k = 0; k < rows; k++) {
+        byte[] row = Bytes.toBytes(random.nextLong());  // random long as row key
+        Put p = new Put(row);
+        p.add(family, qualifier, row);  // the row key bytes are also stored as the cell value
+        puts.add(p);
+      }
+      ht.put(puts);
+      ht.flushCommits();
+      TEST_UTIL.flush();  // one flush per round
+      puts.clear();  // the list is reused for the next round
+    }
+  }
+  
+  @org.junit.Rule
+  public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
+    new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
+}