You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by je...@apache.org on 2013/06/20 02:04:08 UTC

svn commit: r1494814 - in /hbase/branches/0.95: hbase-common/src/main/java/org/apache/hadoop/hbase/ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ hbase-server/src/test/java/org/apache/hadoop/hbase/master/

Author: jeffreyz
Date: Thu Jun 20 00:04:08 2013
New Revision: 1494814

URL: http://svn.apache.org/r1494814
Log:
HBASE-8617: Introducing a new config to disable writes during recovering

Modified:
    hbase/branches/0.95/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
    hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java

Modified: hbase/branches/0.95/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java?rev=1494814&r1=1494813&r2=1494814&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java (original)
+++ hbase/branches/0.95/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java Thu Jun 20 00:04:08 2013
@@ -724,6 +724,9 @@ public final class HConstants {
   /** Conf key that enables unflushed WAL edits directly being replayed to region servers */
   public static final String DISTRIBUTED_LOG_REPLAY_KEY = "hbase.master.distributed.log.replay";
   public static final boolean DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG = false;
+  public static final String DISALLOW_WRITES_IN_RECOVERING =
+      "hbase.regionserver.disallow.writes.when.recovering";
+  public static final boolean DEFAULT_DISALLOW_WRITES_IN_RECOVERING_CONFIG = false;
 
   /** Conf key that specifies timeout value to wait for a region ready */
   public static final String LOG_REPLAY_WAIT_REGION_TIMEOUT = 

Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1494814&r1=1494813&r2=1494814&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Thu Jun 20 00:04:08 2013
@@ -209,7 +209,8 @@ public class HRegion implements HeapSize
    * startRegionOperation
    */
   protected enum Operation {
-    ANY, GET, PUT, DELETE, SCAN, APPEND, INCREMENT, SPLIT_REGION, MERGE_REGION
+    ANY, GET, PUT, DELETE, SCAN, APPEND, INCREMENT, SPLIT_REGION, MERGE_REGION, BATCH_MUTATE,
+    REPLAY_BATCH_MUTATE
   }
 
   //////////////////////////////////////////////////////////////////////////////
@@ -301,6 +302,11 @@ public class HRegion implements HeapSize
   Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
 
   /**
+   * Config setting for whether writes are rejected while the region is recovering.
+   */
+  private boolean disallowWritesInRecovering = false;
+
+  /**
    * @return The smallest mvcc readPoint across all the scanners in this
    * region. Writes older than this readPoint, are included  in every
    * read operation.
@@ -517,6 +523,11 @@ public class HRegion implements HeapSize
       // Write out region name as string and its encoded name.
       LOG.debug("Instantiated " + this);
     }
+
+    // by default, writes are allowed against a region while it is recovering
+    this.disallowWritesInRecovering =
+        conf.getBoolean(HConstants.DISALLOW_WRITES_IN_RECOVERING,
+          HConstants.DEFAULT_DISALLOW_WRITES_IN_RECOVERING_CONFIG);
   }
 
   void setHTableSpecificConf() {
@@ -1965,7 +1976,11 @@ public class HRegion implements HeapSize
       checkResources();
 
       long newSize;
-      startRegionOperation();
+      if (isReplay) {
+        startRegionOperation(Operation.REPLAY_BATCH_MUTATE);
+      } else {
+        startRegionOperation(Operation.BATCH_MUTATE);
+      }
 
       try {
         if (!initialized) {
@@ -5076,7 +5091,7 @@ public class HRegion implements HeapSize
       ClassSize.ARRAY +
       39 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
       (11 * Bytes.SIZEOF_LONG) +
-      Bytes.SIZEOF_BOOLEAN);
+      2 * Bytes.SIZEOF_BOOLEAN);
 
   public static final long DEEP_OVERHEAD = FIXED_OVERHEAD +
       ClassSize.OBJECT + // closeLock
@@ -5374,14 +5389,17 @@ public class HRegion implements HeapSize
     case SCAN:
     case SPLIT_REGION:
     case MERGE_REGION:
+    case PUT:
+    case DELETE:
+    case BATCH_MUTATE:
       // when a region is in recovering state, no read, split or merge is allowed
-      if (this.isRecovering()) {
-        throw new RegionInRecoveryException(this.getRegionNameAsString()
-            + " is recovering");
+      if (this.isRecovering() && (this.disallowWritesInRecovering || 
+              (op != Operation.PUT && op != Operation.DELETE && op != Operation.BATCH_MUTATE))) {
+        throw new RegionInRecoveryException(this.getRegionNameAsString() + " is recovering");
       }
       break;
-      default:
-        break;
+    default:
+      break;
     }
     if (op == Operation.MERGE_REGION || op == Operation.SPLIT_REGION) {
       // split or merge region doesn't need to check the closing/closed state or lock the region

Modified: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java?rev=1494814&r1=1494813&r2=1494814&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java (original)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java Thu Jun 20 00:04:08 2013
@@ -67,6 +67,8 @@ import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
+import org.apache.hadoop.hbase.exceptions.RegionInRecoveryException;
 import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.regionserver.HRegion;
@@ -89,6 +91,7 @@ import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.KeeperException;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -677,14 +680,6 @@ public class TestDistributedLogSplitting
       break;
     }
 
-    LOG.info("#regions = " + regions.size());
-    Iterator<HRegionInfo> it = regions.iterator();
-    while (it.hasNext()) {
-      HRegionInfo region = it.next();
-      if (region.isMetaTable()) {
-        it.remove();
-      }
-    }
     this.prepareData(ht, Bytes.toBytes("family"), Bytes.toBytes("c1"));
     String originalCheckSum = TEST_UTIL.checksumRows(ht);
     
@@ -812,6 +807,91 @@ public class TestDistributedLogSplitting
     ht.close();
   }
 
+  @Test(timeout = 300000) // put against a recovering region with writes disallowed
+  public void testDisallowWritesInRecovering() throws Exception {
+    LOG.info("testDisallowWritesInRecovering");
+    Configuration curConf = HBaseConfiguration.create();
+    curConf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
+    curConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); // presumably makes a rejected put fail fast
+    curConf.setBoolean(HConstants.DISALLOW_WRITES_IN_RECOVERING, true); // the setting under test
+    startCluster(NUM_RS, curConf);
+    final int NUM_REGIONS_TO_CREATE = 40;
+    final int NUM_LOG_LINES = 20000;
+    // turn off load balancing to prevent regions from moving around; otherwise
+    // they will consume recovered.edits
+    master.balanceSwitch(false);
+
+    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
+    final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
+    HTable ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
+    // pick the first region server that is not hosting a meta region
+    List<HRegionInfo> regions = null;
+    HRegionServer hrs = null;
+    for (int i = 0; i < NUM_RS; i++) {
+      boolean isCarryingMeta = false;
+      hrs = rsts.get(i).getRegionServer();
+      regions = ProtobufUtil.getOnlineRegions(hrs);
+      for (HRegionInfo region : regions) {
+        if (region.isMetaRegion()) {
+          isCarryingMeta = true;
+          break;
+        }
+      }
+      if (isCarryingMeta) {
+        continue;
+      }
+      break;
+    }
+
+    LOG.info("#regions = " + regions.size());
+    Iterator<HRegionInfo> it = regions.iterator(); // filter out isMetaTable() regions
+    while (it.hasNext()) {
+      HRegionInfo region = it.next();
+      if (region.isMetaTable()) {
+        it.remove();
+      }
+    }
+    makeHLog(hrs.getWAL(), regions, "table", "family", NUM_LOG_LINES, 100);
+    
+    // abort the selected region server
+    LOG.info("Aborting region server: " + hrs.getServerName());
+    hrs.abort("testing");
+    
+    // wait for the abort to complete (live region server count drops below NUM_RS)
+    TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        return (cluster.getLiveRegionServerThreads().size() <= (NUM_RS - 1));
+      }
+    });
+    
+    // wait until at least NUM_REGIONS_TO_CREATE + 1 regions are online
+    TEST_UTIL.waitFor(180000, 100, new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        return (getAllOnlineRegions(cluster).size() >= (NUM_REGIONS_TO_CREATE + 1));
+      }
+    });
+    // Try one put; a failure must be caused by RegionInRecoveryException (asserted below).
+    try {
+      HRegionInfo region = regions.get(0);
+      byte[] key = region.getStartKey();
+      if (key == null || key.length == 0) {
+        key = new byte[] { 0, 0, 0, 0, 1 }; // substitute row key when the start key is empty
+      }
+      ht.setAutoFlush(true);
+      Put put = new Put(key);
+      put.add(Bytes.toBytes("family"), Bytes.toBytes("c1"), new byte[]{'b'});
+      ht.put(put);
+    } catch (IOException ioe) { // NOTE(review): a successful put asserts nothing
+      Assert.assertTrue(ioe instanceof RetriesExhaustedWithDetailsException);
+      RetriesExhaustedWithDetailsException re = (RetriesExhaustedWithDetailsException) ioe;
+      Assert.assertTrue(re.getCause(0) instanceof RegionInRecoveryException);
+    }
+
+    ht.close(); // NOTE(review): not reached if an assertion above throws
+  }
+
   /**
    * The original intention of this test was to force an abort of a region
    * server and to make sure that the failure path in the region servers is