Posted to commits@hbase.apache.org by st...@apache.org on 2013/05/15 06:24:03 UTC

svn commit: r1482675 [3/5] - in /hbase/trunk: hbase-client/src/main/java/org/apache/hadoop/hbase/ hbase-client/src/main/java/org/apache/hadoop/hbase/client/ hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ hbase-client/src/main/java/org/a...

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1482675&r1=1482674&r2=1482675&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Wed May 15 04:24:02 2013
@@ -95,6 +95,7 @@ import org.apache.hadoop.hbase.exception
 import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
 import org.apache.hadoop.hbase.exceptions.NoSuchColumnFamilyException;
 import org.apache.hadoop.hbase.exceptions.NotServingRegionException;
+import org.apache.hadoop.hbase.exceptions.RegionInRecoveryException;
 import org.apache.hadoop.hbase.exceptions.RegionTooBusyException;
 import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
 import org.apache.hadoop.hbase.exceptions.UnknownScannerException;
@@ -200,6 +201,16 @@ public class HRegion implements HeapSize
 
   protected long completeSequenceId = -1L;
 
+  /**
+   * Operation enum is used in {@link HRegion#startRegionOperation} to provide operation context,
+   * letting startRegionOperation apply operation-specific checks before any region operation. Not
+   * all operations have to be defined here; an entry is only needed when a special check is
+   * required in startRegionOperation.
+   */
+  protected enum Operation {
+    ANY, GET, PUT, DELETE, SCAN, APPEND, INCREMENT, SPLIT_REGION, MERGE_REGION
+  }
+
   //////////////////////////////////////////////////////////////////////////////
   // Members
   //////////////////////////////////////////////////////////////////////////////
@@ -281,6 +292,11 @@ public class HRegion implements HeapSize
   private final AtomicInteger minorInProgress = new AtomicInteger(0);
 
   /**
+   * Min sequence id stored in store files of a region when opening the region
+   */
+  private long minSeqIdForLogReplay = -1;
+
+  /**
    * @return The smallest mvcc readPoint across all the scanners in this
    * region. Writes older than this readPoint, are included  in every
    * read operation.
@@ -641,6 +657,9 @@ public class HRegion implements HeapSize
           long storeSeqIdForReplay = store.getMaxSequenceId(false);
           maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(),
               storeSeqIdForReplay);
+          if (this.minSeqIdForLogReplay == -1 || storeSeqIdForReplay < this.minSeqIdForLogReplay) {
+            this.minSeqIdForLogReplay = storeSeqIdForReplay;
+          }
           // Include bulk loaded files when determining seqIdForAssignment
           long storeSeqIdForAssignment = store.getMaxSequenceId(true);
           if (maxSeqId == -1 || storeSeqIdForAssignment > maxSeqId) {
@@ -778,6 +797,21 @@ public class HRegion implements HeapSize
     return this.closing.get();
   }
 
+  /**
+   * Reset the recovering state of the current region
+   * @param newState the new recovering state
+   */
+  public void setRecovering(boolean newState) {
+    this.getRegionInfo().setRecovering(newState);
+  }
+  
+  /**
+   * @return true if the current region is in recovering state
+   */
+  public boolean isRecovering() {
+    return this.getRegionInfo().isRecovering();
+  }
+
   /** @return true if region is available (not closed and not closing) */
   public boolean isAvailable() {
     return !isClosed() && !isClosing();
@@ -1608,7 +1642,7 @@ public class HRegion implements HeapSize
     // look across all the HStores for this region and determine what the
     // closest key is across all column families, since the data may be sparse
     checkRow(row, "getClosestRowBefore");
-    startRegionOperation();
+    startRegionOperation(Operation.GET);
     this.readRequestsCount.increment();
     try {
       Store store = getStore(family);
@@ -1654,7 +1688,7 @@ public class HRegion implements HeapSize
 
   protected RegionScanner getScanner(Scan scan,
       List<KeyValueScanner> additionalScanners) throws IOException {
-    startRegionOperation();
+    startRegionOperation(Operation.SCAN);
     try {
       // Verify families are all valid
       prepareScanner(scan);
@@ -1705,7 +1739,7 @@ public class HRegion implements HeapSize
   throws IOException {
     checkReadOnly();
     checkResources();
-    startRegionOperation();
+    startRegionOperation(Operation.DELETE);
     this.writeRequestsCount.increment();
     try {
       delete.getRow();
@@ -1804,7 +1838,7 @@ public class HRegion implements HeapSize
     // read lock, resources may run out.  For now, the thought is that this
     // will be extremely rare; we'll deal with it when it happens.
     checkResources();
-    startRegionOperation();
+    startRegionOperation(Operation.PUT);
     this.writeRequestsCount.increment();
     try {
       // All edits for the given row (across all column families) must happen atomically.
@@ -1862,13 +1896,29 @@ public class HRegion implements HeapSize
    */
   public OperationStatus[] batchMutate(
       Pair<Mutation, Integer>[] mutationsAndLocks) throws IOException {
+    return batchMutate(mutationsAndLocks, false);
+  }
+
+  /**
+   * Perform a batch of mutations. It supports only Put and Delete mutations and
+   * will ignore other types passed.
+   * @param mutationsAndLocks the list of mutations paired with their requested lock IDs
+   * @param isReplay true when this batch replays WAL edits from a failed region server
+   * @return an array of OperationStatus which internally contains the
+   *         OperationStatusCode and the exceptionMessage if any.
+   * @throws IOException
+   */
+  OperationStatus[] batchMutate(Pair<Mutation, Integer>[] mutationsAndLocks, boolean isReplay)
+      throws IOException {
     BatchOperationInProgress<Pair<Mutation, Integer>> batchOp =
       new BatchOperationInProgress<Pair<Mutation,Integer>>(mutationsAndLocks);
 
     boolean initialized = false;
 
     while (!batchOp.isDone()) {
-      checkReadOnly();
+      if (!isReplay) {
+        checkReadOnly();
+      }
       checkResources();
 
       long newSize;
@@ -1876,11 +1926,13 @@ public class HRegion implements HeapSize
 
       try {
         if (!initialized) {
-          this.writeRequestsCount.increment();
-          doPreMutationHook(batchOp);
+          if (!isReplay) {
+            this.writeRequestsCount.increment();
+            doPreMutationHook(batchOp);
+          }
           initialized = true;
         }
-        long addedSize = doMiniBatchMutation(batchOp);
+        long addedSize = doMiniBatchMutation(batchOp, isReplay);
         newSize = this.addAndGetGlobalMemstoreSize(addedSize);
       } finally {
         closeRegionOperation();
@@ -1891,6 +1943,7 @@ public class HRegion implements HeapSize
     }
     return batchOp.retCodeDetails;
   }
+  
 
   private void doPreMutationHook(BatchOperationInProgress<Pair<Mutation, Integer>> batchOp)
       throws IOException {
@@ -1927,10 +1980,9 @@ public class HRegion implements HeapSize
     }
   }
 
-
   @SuppressWarnings("unchecked")
-  private long doMiniBatchMutation(
-    BatchOperationInProgress<Pair<Mutation, Integer>> batchOp) throws IOException {
+  private long doMiniBatchMutation(BatchOperationInProgress<Pair<Mutation, Integer>> batchOp,
+      boolean isInReplay) throws IOException {
 
     // variable to note if all Put items are for the same CF -- metrics related
     boolean putsCfSetConsistent = true;
@@ -1941,7 +1993,7 @@ public class HRegion implements HeapSize
     //The set of columnFamilies first seen for Delete.
     Set<byte[]> deletesCfSet = null;
 
-    WALEdit walEdit = new WALEdit();
+    WALEdit walEdit = new WALEdit(isInReplay);
     MultiVersionConsistencyControl.WriteEntry w = null;
     long txid = 0;
     boolean walSyncSuccessful = false;
@@ -1983,7 +2035,11 @@ public class HRegion implements HeapSize
         try {
           if (isPutMutation) {
             // Check the families in the put. If bad, skip this one.
-            checkFamilies(familyMap.keySet());
+            if (isInReplay) {
+              removeNonExistentColumnFamilyForReplay(familyMap);
+            } else {
+              checkFamilies(familyMap.keySet());
+            }
             checkTimestamps(mutation.getFamilyMap(), now);
           } else {
             prepareDelete((Delete) mutation);
@@ -2042,7 +2098,7 @@ public class HRegion implements HeapSize
           }
         }
       }
-
+      
       // we should record the timestamp only after we have acquired the rowLock,
       // otherwise, newer puts/deletes are not guaranteed to have a newer timestamp
       now = EnvironmentEdgeManager.currentTimeMillis();
@@ -2081,9 +2137,9 @@ public class HRegion implements HeapSize
       w = mvcc.beginMemstoreInsert();
 
       // calling the pre CP hook for batch mutation
-      if (coprocessorHost != null) {
-        MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp =
-          new MiniBatchOperationInProgress<Pair<Mutation, Integer>>(batchOp.operations,
+      if (!isInReplay && coprocessorHost != null) {
+        MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp = 
+          new MiniBatchOperationInProgress<Pair<Mutation, Integer>>(batchOp.operations, 
           batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
         if (coprocessorHost.preBatchMutate(miniBatchOp)) return 0L;
       }
@@ -2168,9 +2224,9 @@ public class HRegion implements HeapSize
       }
       walSyncSuccessful = true;
       // calling the post CP hook for batch mutation
-      if (coprocessorHost != null) {
-        MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp =
-          new MiniBatchOperationInProgress<Pair<Mutation, Integer>>(batchOp.operations,
+      if (!isInReplay && coprocessorHost != null) {
+        MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp = 
+          new MiniBatchOperationInProgress<Pair<Mutation, Integer>>(batchOp.operations, 
           batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
         coprocessorHost.postBatchMutate(miniBatchOp);
       }
@@ -2187,7 +2243,7 @@ public class HRegion implements HeapSize
       // STEP 9. Run coprocessor post hooks. This should be done after the wal is
       // synced so that the coprocessor contract is adhered to.
       // ------------------------------------
-      if (coprocessorHost != null) {
+      if (!isInReplay && coprocessorHost != null) {
         for (int i = firstIndex; i < lastIndexExclusive; i++) {
           // only for successful puts
           if (batchOp.retCodeDetails[i].getOperationStatusCode()
@@ -2626,6 +2682,30 @@ public class HRegion implements HeapSize
     }
   }
 
+  /**
+   * During replay, column families may have been removed between the region server failure and
+   * the replay; drop edits for any family that no longer exists in the table descriptor.
+   */
+  private void removeNonExistentColumnFamilyForReplay(
+      final Map<byte[], List<? extends Cell>> familyMap) {
+    List<byte[]> nonExistentList = null;
+    for (byte[] family : familyMap.keySet()) {
+      if (!this.htableDescriptor.hasFamily(family)) {
+        if (nonExistentList == null) {
+          nonExistentList = new ArrayList<byte[]>();
+        }
+        nonExistentList.add(family);
+      }
+    }
+    if (nonExistentList != null) {
+      for (byte[] family : nonExistentList) {
+        // Perhaps schema was changed between crash and replay
+        LOG.info("No family for " + Bytes.toString(family) + " omit from reply.");
+        familyMap.remove(family);
+      }
+    }
+  }
+
   void checkTimestamps(final Map<byte[], List<? extends Cell>> familyMap,
       long now) throws FailedSanityCheckException {
     if (timestampSlop == HConstants.LATEST_TIMESTAMP) {
@@ -3480,7 +3560,7 @@ public class HRegion implements HeapSize
             "after we renewed it. Could be caused by a very slow scanner " +
             "or a lengthy garbage collection");
       }
-      startRegionOperation();
+      startRegionOperation(Operation.SCAN);
       readRequestsCount.increment();
       try {
 
@@ -4651,7 +4731,7 @@ public class HRegion implements HeapSize
 
     checkReadOnly();
     // Lock row
-    startRegionOperation();
+    startRegionOperation(Operation.APPEND);
     this.writeRequestsCount.increment();
     WriteEntry w = null;
     try {
@@ -4819,7 +4899,7 @@ public class HRegion implements HeapSize
 
     checkReadOnly();
     // Lock row
-    startRegionOperation();
+    startRegionOperation(Operation.INCREMENT);
     this.writeRequestsCount.increment();
     WriteEntry w = null;
     try {
@@ -4956,7 +5036,7 @@ public class HRegion implements HeapSize
       ClassSize.OBJECT +
       ClassSize.ARRAY +
       38 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
-      (11 * Bytes.SIZEOF_LONG) +
+      (12 * Bytes.SIZEOF_LONG) +
       Bytes.SIZEOF_BOOLEAN);
 
   public static final long DEEP_OVERHEAD = FIXED_OVERHEAD +
@@ -5236,6 +5316,37 @@ public class HRegion implements HeapSize
    */
   public void startRegionOperation()
       throws NotServingRegionException, RegionTooBusyException, InterruptedIOException {
+    startRegionOperation(Operation.ANY);
+  }
+
+  /**
+   * @param op The operation is about to be taken on the region
+   * @throws NotServingRegionException
+   * @throws RegionTooBusyException
+   * @throws InterruptedIOException
+   */
+  protected void startRegionOperation(Operation op) throws NotServingRegionException,
+      RegionTooBusyException, InterruptedIOException {
+    switch (op) {
+    case INCREMENT:
+    case APPEND:
+    case GET:
+    case SCAN:
+    case SPLIT_REGION:
+    case MERGE_REGION:
+      // when a region is in recovering state, no read, append, increment, split or merge is allowed
+      if (this.isRecovering()) {
+        throw new RegionInRecoveryException(this.getRegionNameAsString()
+            + " is recovering");
+      }
+      break;
+      default:
+        break;
+    }
+    if (op == Operation.MERGE_REGION || op == Operation.SPLIT_REGION) {
+      // split or merge region doesn't need to check the closing/closed state or lock the region
+      return;
+    }
     if (this.closing.get()) {
       throw new NotServingRegionException(getRegionNameAsString() + " is closing");
     }
@@ -5450,6 +5561,14 @@ public class HRegion implements HeapSize
   }
 
   /**
+   * Gets the min sequence number that was read from storage when this region was opened. WAL edits
+   * with a smaller sequence number will be skipped during replay.
+   */
+  public long getMinSeqIdForLogReplay() {
+    return this.minSeqIdForLogReplay;
+  }
+
+  /**
    * @return if a given region is in compaction now.
    */
   public CompactionState getCompactionState() {
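
For readers skimming the patch: the recovering-state gate added to startRegionOperation above
reduces to the following simplified, standalone sketch. The Operation values and the gating rule
mirror the patch; the class, field and plain IOException used here are illustrative stand-ins
(the real code throws RegionInRecoveryException).

    import java.io.IOException;

    // Standalone sketch of the new gate in HRegion.startRegionOperation(Operation).
    class RecoveryGateSketch {
      enum Operation { ANY, GET, PUT, DELETE, SCAN, APPEND, INCREMENT, SPLIT_REGION, MERGE_REGION }

      private volatile boolean recovering = true; // stand-in for HRegionInfo.isRecovering()

      void startRegionOperation(Operation op) throws IOException {
        switch (op) {
          case INCREMENT:   // increments and appends read the current value first,
          case APPEND:      // so they are gated together with plain reads
          case GET:
          case SCAN:
          case SPLIT_REGION:
          case MERGE_REGION:
            if (recovering) {
              // the real code throws RegionInRecoveryException here
              throw new IOException("region is recovering");
            }
            break;
          default:
            break; // PUT/DELETE (e.g. replayed edits) pass through during recovery
        }
        // ... the real method then checks the closing/closed state and takes the region
        // lock, except for SPLIT_REGION/MERGE_REGION which return before that point
      }
    }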

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1482675&r1=1482674&r2=1482675&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Wed May 15 04:24:02 2013
@@ -92,6 +92,7 @@ import org.apache.hadoop.hbase.exception
 import org.apache.hadoop.hbase.exceptions.NotServingRegionException;
 import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
 import org.apache.hadoop.hbase.exceptions.RegionAlreadyInTransitionException;
+import org.apache.hadoop.hbase.exceptions.RegionInRecoveryException;
 import org.apache.hadoop.hbase.exceptions.RegionMovedException;
 import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
 import org.apache.hadoop.hbase.exceptions.RegionServerRunningException;
@@ -112,6 +113,7 @@ import org.apache.hadoop.hbase.ipc.Paylo
 import org.apache.hadoop.hbase.ipc.RpcServerInterface;
 import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
 import org.apache.hadoop.hbase.ipc.ServerRpcController;
+import org.apache.hadoop.hbase.master.SplitLogManager;
 import org.apache.hadoop.hbase.master.TableLockManager;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
@@ -178,6 +180,7 @@ import org.apache.hadoop.hbase.protobuf.
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
+import org.apache.hadoop.hbase.regionserver.HRegion.Operation;
 import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
 import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
@@ -204,6 +207,7 @@ import org.apache.hadoop.hbase.util.Vers
 import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
 import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
 import org.apache.hadoop.hbase.zookeeper.MetaRegionTracker;
+import org.apache.hadoop.hbase.zookeeper.RecoveringRegionWatcher;
 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
@@ -214,6 +218,7 @@ import org.apache.hadoop.net.DNS;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
 import org.cliffc.high_scale_lib.Counter;
 
 import com.google.protobuf.BlockingRpcChannel;
@@ -258,6 +263,10 @@ public class HRegionServer implements Cl
   // catalog tracker
   protected CatalogTracker catalogTracker;
 
+  // Watches ZooKeeper for regions transitioning out of recovering state
+  @SuppressWarnings("unused")
+  private RecoveringRegionWatcher recoveringRegionWatcher;
+
   /**
    * Go here to get table descriptors.
    */
@@ -291,6 +300,13 @@ public class HRegionServer implements Cl
    */
   protected final Map<String, InetSocketAddress[]> regionFavoredNodesMap =
       new ConcurrentHashMap<String, InetSocketAddress[]>();
+
+  /**
+   * Regions currently in recovering state, keyed by encoded region name. They accept writes (edits
+   * from a previously failed region server) but not reads. A recovering region is also online.
+   */
+  protected final Map<String, HRegion> recoveringRegions = Collections
+      .synchronizedMap(new HashMap<String, HRegion>());
 
   // Leases
   protected Leases leases;
@@ -456,6 +472,9 @@ public class HRegionServer implements Cl
 
   /** Handle all the snapshot requests to this server */
   RegionServerSnapshotManager snapshotManager;
+  
+  // configuration setting for whether to replay WAL edits directly to another RS
+  private final boolean distributedLogReplay;
 
   // Table level lock manager for locking for region operations
   private TableLockManager tableLockManager;
@@ -547,6 +566,9 @@ public class HRegionServer implements Cl
       }
     };
     this.rsHost = new RegionServerCoprocessorHost(this, this.conf);
+
+    this.distributedLogReplay = this.conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, 
+      HConstants.DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG);
   }
 
   /**
@@ -671,6 +693,11 @@ public class HRegionServer implements Cl
     }
     this.tableLockManager = TableLockManager.createTableLockManager(conf, zooKeeper,
         new ServerName(isa.getHostName(), isa.getPort(), startcode));
+
+    // register watcher for recovering regions
+    if(this.distributedLogReplay) {
+      this.recoveringRegionWatcher = new RecoveringRegionWatcher(this.zooKeeper, this);
+    }
   }
 
   /**
@@ -1515,8 +1542,7 @@ public class HRegionServer implements Cl
     this.rpcServer.start();
 
     // Create the log splitting worker and start it
-    this.splitLogWorker = new SplitLogWorker(this.zooKeeper,
-        this.getConfiguration(), this.getServerName(), this);
+    this.splitLogWorker = new SplitLogWorker(this.zooKeeper, this.getConfiguration(), this, this);
     splitLogWorker.start();
   }
 
@@ -1641,6 +1667,10 @@ public class HRegionServer implements Cl
       LOG.error("No sequence number found when opening " + r.getRegionNameAsString());
       openSeqNum = 0;
     }
+
+    // Update flushed sequence id of a recovering region in ZK
+    updateRecoveringRegionLastFlushedSequenceId(r);
+
     // Update ZK, or META
     if (r.getRegionInfo().isMetaRegion()) {
       MetaRegionTracker.setMetaLocation(getZooKeeper(),
@@ -1884,14 +1914,13 @@ public class HRegionServer implements Cl
   public long getLastSequenceId(byte[] region) {
     Long lastFlushedSequenceId = -1l;
     try {
-      GetLastFlushedSequenceIdRequest req =
-        RequestConverter.buildGetLastFlushedSequenceIdRequest(region);
+      GetLastFlushedSequenceIdRequest req = RequestConverter
+          .buildGetLastFlushedSequenceIdRequest(region);
       lastFlushedSequenceId = rssStub.getLastFlushedSequenceId(null, req)
-      .getLastFlushedSequenceId();
+          .getLastFlushedSequenceId();
     } catch (ServiceException e) {
       lastFlushedSequenceId = -1l;
-      LOG.warn("Unable to connect to the master to check " +
-          "the last flushed sequence id", e);
+      LOG.warn("Unable to connect to the master to check " + "the last flushed sequence id", e);
     }
     return lastFlushedSequenceId;
   }
@@ -1965,6 +1994,10 @@ public class HRegionServer implements Cl
     return this.stopping;
   }
 
+  public Map<String, HRegion> getRecoveringRegions() {
+    return this.recoveringRegions;
+  }
+
   /**
    *
    * @return the configuration
@@ -2651,10 +2684,12 @@ public class HRegionServer implements Cl
     try {
       requestCount.increment();
       HRegion region = getRegion(request.getRegion());
+
       GetResponse.Builder builder = GetResponse.newBuilder();
       ClientProtos.Get get = request.getGet();
       Boolean existence = null;
       Result r = null;
+
       if (request.getClosestRowBefore()) {
         if (get.getColumnCount() != 1) {
           throw new DoNotRetryIOException(
@@ -3006,7 +3041,7 @@ public class HRegionServer implements Cl
               }
               List<KeyValue> values = new ArrayList<KeyValue>();
               MultiVersionConsistencyControl.setThreadReadPoint(scanner.getMvccReadPoint());
-              region.startRegionOperation();
+              region.startRegionOperation(Operation.SCAN);
               try {
                 int i = 0;
                 synchronized(scanner) {
@@ -3450,6 +3485,10 @@ public class HRegionServer implements Cl
         removeFromMovedRegions(region.getEncodedName());
 
         if (previous == null) {
+          // check if the region to be opened is marked in recovering state in ZK
+          if (isRegionMarkedRecoveringInZK(region.getEncodedName())) {
+            this.recoveringRegions.put(region.getEncodedName(), null);
+          }
           // If there is no action in progress, we can submit a specific handler.
           // Need to pass the expected version in the constructor.
           if (region.isMetaRegion()) {
@@ -3465,6 +3504,9 @@ public class HRegionServer implements Cl
 
         builder.addOpeningState(RegionOpeningState.OPENED);
 
+      } catch (KeeperException zooKeeperEx) {
+        LOG.error("Can't retrieve recovering state from zookeeper", zooKeeperEx);
+        throw new ServiceException(zooKeeperEx);
       } catch (IOException ie) {
         LOG.warn("Failed opening region " + region.getRegionNameAsString(), ie);
         if (isBulkAssign) {
@@ -3589,6 +3631,7 @@ public class HRegionServer implements Cl
       checkOpen();
       requestCount.increment();
       HRegion region = getRegion(request.getRegion());
+      region.startRegionOperation(Operation.SPLIT_REGION);
       LOG.info("Splitting " + region.getRegionNameAsString());
       region.flushcache();
       byte[] splitPoint = null;
@@ -3621,6 +3664,8 @@ public class HRegionServer implements Cl
       HRegion regionA = getRegion(request.getRegionA());
       HRegion regionB = getRegion(request.getRegionB());
       boolean forcible = request.getForcible();
+      regionA.startRegionOperation(Operation.MERGE_REGION);
+      regionB.startRegionOperation(Operation.MERGE_REGION);
       LOG.info("Receiving merging request for  " + regionA + ", " + regionB
           + ",forcible=" + forcible);
       regionA.flushcache();
@@ -3714,8 +3759,57 @@ public class HRegionServer implements Cl
   }
 
   /**
+   * Replay the given changes when distributedLogReplay is enabled, applying WAL edits from a
+   * failed RS. The guarantee is that the given mutations will be durable on the receiving RS if
+   * this method returns without any exception.
+   * @param rpcc the RPC controller
+   * @param request the request
+   * @throws ServiceException
+   */
+  @Override
+  @QosPriority(priority = HConstants.REPLAY_QOS)
+  public MultiResponse replay(final RpcController rpcc, final MultiRequest request)
+      throws ServiceException {
+    long before = EnvironmentEdgeManager.currentTimeMillis();  
+    PayloadCarryingRpcController controller = (PayloadCarryingRpcController) rpcc;
+    CellScanner cellScanner = controller != null ? controller.cellScanner() : null;
+    // Clear scanner so we are not holding on to reference across call.
+    controller.setCellScanner(null);
+    try {
+      checkOpen();
+      HRegion region = getRegion(request.getRegion());
+      MultiResponse.Builder builder = MultiResponse.newBuilder();
+      List<MutationProto> mutates = new ArrayList<MutationProto>();
+      for (ClientProtos.MultiAction actionUnion : request.getActionList()) {
+        if (actionUnion.hasMutation()) {
+          MutationProto mutate = actionUnion.getMutation();
+          MutationType type = mutate.getMutateType();
+          switch (type) {
+          case PUT:
+          case DELETE:
+            mutates.add(mutate);
+            break;
+          default:
+            throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
+          }
+        } else {
+          LOG.warn("Error: invalid action: " + actionUnion + ". " + "it must be a Mutation.");
+          throw new DoNotRetryIOException("Invalid action, " + "it must be a Mutation.");
+        }
+      }
+      if (!mutates.isEmpty()) {
+        doBatchOp(builder, region, mutates, cellScanner, true);
+      }
+      return builder.build();
+    } catch (IOException ie) {
+      throw new ServiceException(ie);
+    } finally {
+      metricsRegionServer.updateReplay(EnvironmentEdgeManager.currentTimeMillis() - before); 
+    }
+  }
+
+  /**
    * Roll the WAL writer of the region server.
-   *
    * @param controller the RPC controller
    * @param request the request
    * @throws ServiceException
@@ -3843,13 +3937,21 @@ public class HRegionServer implements Cl
 
   /**
    * Execute a list of Put/Delete mutations.
+   */
+  protected void doBatchOp(final MultiResponse.Builder builder,
+      final HRegion region, final List<MutationProto> mutates, final CellScanner cells) {
+    doBatchOp(builder, region, mutates, cells, false);
+  }
+  
+  /**
+   * Execute a list of Put/Delete mutations.
    *
    * @param builder
    * @param region
    * @param mutations
    */
   protected void doBatchOp(final MultiResponse.Builder builder, final HRegion region,
-      final List<MutationProto> mutations, final CellScanner cells) {
+      final List<MutationProto> mutations, final CellScanner cells, boolean isReplay) {
     @SuppressWarnings("unchecked")
     Pair<Mutation, Integer>[] mutationsWithLocks = new Pair[mutations.size()];
     long before = EnvironmentEdgeManager.currentTimeMillis();
@@ -3877,7 +3979,7 @@ public class HRegionServer implements Cl
         cacheFlusher.reclaimMemStoreMemory();
       }
 
-      OperationStatus codes[] = region.batchMutate(mutationsWithLocks);
+      OperationStatus codes[] = region.batchMutate(mutationsWithLocks, isReplay);
       for (i = 0; i < codes.length; i++) {
         switch (codes[i].getOperationStatusCode()) {
           case BAD_FAMILY:
@@ -4097,4 +4199,91 @@ public class HRegionServer implements Cl
   public CompactSplitThread getCompactSplitThread() {
     return this.compactSplitThread;
   }
+
+  /**
+   * Check if /hbase/recovering-regions/<current region encoded name> exists. Returns true if it
+   * exists and also sets a watch on the node.
+   * @param regionEncodedName encoded region name
+   * @return true when /hbase/recovering-regions/<current region encoded name> exists
+   * @throws KeeperException
+   */
+  private boolean isRegionMarkedRecoveringInZK(String regionEncodedName) throws KeeperException {
+    boolean result = false;
+    String nodePath = ZKUtil.joinZNode(this.zooKeeper.recoveringRegionsZNode, regionEncodedName);
+
+    byte[] node = ZKUtil.getDataAndWatch(this.zooKeeper, nodePath);
+    if (node != null) {
+      result = true;
+    }
+
+    return result;
+  }
+
+  /**
+   * A helper function to store the last flushed sequence id with the previously failed RS for a
+   * recovering region. The id is used to skip WAL edits which have already been flushed. Since a
+   * flushed sequence id is only valid per RS, we associate the id with the corresponding failed RS.
+   * @throws KeeperException
+   * @throws IOException
+   */
+  private void updateRecoveringRegionLastFlushedSequenceId(HRegion r) throws KeeperException,
+      IOException {
+    if (!r.isRecovering()) {
+      // return immediately for non-recovering regions
+      return;
+    }
+
+    HRegionInfo region = r.getRegionInfo();
+    ZooKeeperWatcher zkw = getZooKeeper();
+    String previousRSName = this.getLastFailedRSFromZK(region.getEncodedName());
+    long minSeqIdForLogReplay = r.getMinSeqIdForLogReplay();
+    long lastRecordedFlushedSequenceId = -1;
+    String nodePath = ZKUtil.joinZNode(this.zooKeeper.recoveringRegionsZNode,
+      region.getEncodedName());
+    // recovering-region level
+    byte[] data = ZKUtil.getData(zkw, nodePath);
+    if (data != null) {
+      lastRecordedFlushedSequenceId = SplitLogManager.parseLastFlushedSequenceIdFrom(data);
+    }
+    if (data == null || lastRecordedFlushedSequenceId < minSeqIdForLogReplay) {
+      ZKUtil.setData(zkw, nodePath, ZKUtil.positionToByteArray(minSeqIdForLogReplay));
+    }
+    if (previousRSName != null) {
+      // one level deeper for failed RS
+      nodePath = ZKUtil.joinZNode(nodePath, previousRSName);
+      ZKUtil.setData(zkw, nodePath, ZKUtil.positionToByteArray(minSeqIdForLogReplay));
+      LOG.debug("Update last flushed sequence id of region " + region.getEncodedName() + " for " 
+          + previousRSName);
+    } else {
+      LOG.warn("Can't find failed region server for recovering region " + region.getEncodedName());
+    }
+  }
+
+  /**
+   * Return the last failed RS name under /hbase/recovering-regions/encodedRegionName
+   * @param encodedRegionName encoded region name
+   * @return the name of the last failed RS (the child znode with the highest creation zxid), or
+   *         null when no failed RS is recorded for the region
+   * @throws KeeperException
+   */
+  private String getLastFailedRSFromZK(String encodedRegionName) throws KeeperException {
+    String result = null;
+    long maxZxid = 0;
+    ZooKeeperWatcher zkw = this.getZooKeeper();
+    String nodePath = ZKUtil.joinZNode(zkw.recoveringRegionsZNode, encodedRegionName);
+    List<String> failedServers = ZKUtil.listChildrenNoWatch(zkw, nodePath);
+    if (failedServers == null || failedServers.isEmpty()) {
+      return result;
+    }
+    for (String failedServer : failedServers) {
+      String rsPath = ZKUtil.joinZNode(nodePath, failedServer);
+      Stat stat = new Stat();
+      ZKUtil.getDataNoWatch(zkw, rsPath, stat);
+      if (maxZxid < stat.getCzxid()) {
+        maxZxid = stat.getCzxid();
+        result = failedServer;
+      }
+    }
+    return result;
+  }
 }
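
A note on getLastFailedRSFromZK above: each failed server is recorded as a child of the region's
znode under /hbase/recovering-regions, and the method picks the child with the highest creation
transaction id (czxid), i.e. the most recently failed RS. A simplified sketch of that selection
against the raw ZooKeeper client instead of ZKUtil (class and method names are illustrative):

    import java.util.List;
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.ZooKeeper;
    import org.apache.zookeeper.data.Stat;

    // Sketch: choose the most recently created child znode, i.e. the last failed RS.
    final class LastFailedRsSketch {
      static String lastFailedRs(ZooKeeper zk, String regionNodePath)
          throws KeeperException, InterruptedException {
        List<String> failedServers = zk.getChildren(regionNodePath, false); // no watch
        String result = null;
        long maxZxid = 0;
        for (String failedServer : failedServers) {
          Stat stat = zk.exists(regionNodePath + "/" + failedServer, false);
          if (stat != null && stat.getCzxid() > maxZxid) {
            maxZxid = stat.getCzxid();
            result = failedServer;
          }
        }
        return result; // null when no failed RS has been recorded for the region
      }
    }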

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LastSequenceId.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LastSequenceId.java?rev=1482675&r1=1482674&r2=1482675&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LastSequenceId.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LastSequenceId.java Wed May 15 04:24:02 2013
@@ -26,8 +26,8 @@ import org.apache.hadoop.classification.
 @InterfaceAudience.Private
 public interface LastSequenceId {
   /**
-   * @param regionname
-   * @return Last flushed sequence Id for regionname
+   * @param regionName Encoded region name
+   * @return Last flushed sequence Id for regionName or -1 if it can't be determined
    */
-  public long getLastSequenceId(byte[] regionname);
+  public long getLastSequenceId(byte[] regionName);
 }
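
The clarified contract (-1 when the id cannot be determined) is exactly what
HRegionServer.getLastSequenceId above falls back to when the master is unreachable. A hypothetical
in-memory implementation honoring the same contract, for illustration only (the real implementation
in this patch asks the master over RPC):

    import java.nio.charset.Charset;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Hypothetical in-memory stand-in for LastSequenceId; names here are illustrative.
    class InMemoryLastSequenceId {
      private final Map<String, Long> lastFlushed = new ConcurrentHashMap<String, Long>();

      public void recordFlush(String encodedRegionName, long seqId) {
        lastFlushed.put(encodedRegionName, seqId);
      }

      public long getLastSequenceId(byte[] regionName) {
        Long id = lastFlushed.get(new String(regionName, Charset.forName("UTF-8")));
        return id == null ? -1L : id.longValue(); // -1 when it can't be determined
      }
    }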

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java?rev=1482675&r1=1482674&r2=1482675&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java Wed May 15 04:24:02 2013
@@ -87,4 +87,8 @@ public class MetricsRegionServer {
     }
     serverSource.updateAppend(t);
   }
+
+  public void updateReplay(long t){
+    serverSource.updateReplay(t);
+  }
 }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java?rev=1482675&r1=1482674&r2=1482675&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java Wed May 15 04:24:02 2013
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
+import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
 
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -108,4 +109,9 @@ public interface RegionServerServices ex
    * @return The RegionServer's CatalogTracker
    */
   public CatalogTracker getCatalogTracker();
+
+  /**
+   * @return recovering regions on the hosting region server, keyed by encoded region name
+   */
+  public Map<String, HRegion> getRecoveringRegions();
 }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java?rev=1482675&r1=1482674&r2=1482675&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java Wed May 15 04:24:02 2013
@@ -20,7 +20,11 @@ package org.apache.hadoop.hbase.regionse
 
 import java.io.IOException;
 import java.io.InterruptedIOException;
+import java.net.ConnectException;
+import java.net.SocketTimeoutException;
+import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
@@ -29,10 +33,12 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.SplitLogCounters;
 import org.apache.hadoop.hbase.SplitLogTask;
+import org.apache.hadoop.hbase.client.RetriesExhaustedException;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.exceptions.NotServingRegionException;
 import org.apache.hadoop.hbase.master.SplitLogManager;
 import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
@@ -70,6 +76,7 @@ import org.apache.zookeeper.data.Stat;
 @InterfaceAudience.Private
 public class SplitLogWorker extends ZooKeeperListener implements Runnable {
   private static final Log LOG = LogFactory.getLog(SplitLogWorker.class);
+  private static final int checkInterval = 5000; // 5 seconds
 
   Thread worker;
   private final ServerName serverName;
@@ -83,20 +90,30 @@ public class SplitLogWorker extends ZooK
   private final Object grabTaskLock = new Object();
   private boolean workerInGrabTask = false;
   private final int report_period;
+  private RegionServerServices server = null;
 
   public SplitLogWorker(ZooKeeperWatcher watcher, Configuration conf,
-      ServerName serverName, TaskExecutor splitTaskExecutor) {
+      RegionServerServices server, TaskExecutor splitTaskExecutor) {
+    super(watcher);
+    this.server = server;
+    this.serverName = server.getServerName();
+    this.splitTaskExecutor = splitTaskExecutor;
+    report_period = conf.getInt("hbase.splitlog.report.period",
+      conf.getInt("hbase.splitlog.manager.timeout", SplitLogManager.DEFAULT_TIMEOUT) / 3);
+  }
+
+  public SplitLogWorker(ZooKeeperWatcher watcher, Configuration conf, ServerName serverName,
+      TaskExecutor splitTaskExecutor) {
     super(watcher);
     this.serverName = serverName;
     this.splitTaskExecutor = splitTaskExecutor;
     report_period = conf.getInt("hbase.splitlog.report.period",
-      conf.getInt("hbase.splitlog.manager.timeout",
-        SplitLogManager.DEFAULT_TIMEOUT) / 2);
+      conf.getInt("hbase.splitlog.manager.timeout", SplitLogManager.DEFAULT_TIMEOUT) / 3);
   }
 
-  public SplitLogWorker(ZooKeeperWatcher watcher, final Configuration conf,
-      final ServerName serverName, final LastSequenceId sequenceIdChecker) {
-    this(watcher, conf, serverName, new TaskExecutor () {
+  public SplitLogWorker(final ZooKeeperWatcher watcher, final Configuration conf,
+      RegionServerServices server, final LastSequenceId sequenceIdChecker) {
+    this(watcher, conf, server, new TaskExecutor() {
       @Override
       public Status exec(String filename, CancelableProgressable p) {
         Path rootdir;
@@ -113,7 +130,7 @@ public class SplitLogWorker extends ZooK
         // encountered a bad non-retry-able persistent error.
         try {
           if (!HLogSplitter.splitLogFile(rootdir, fs.getFileStatus(new Path(rootdir, filename)),
-            fs, conf, p, sequenceIdChecker)) {
+            fs, conf, p, sequenceIdChecker, watcher)) {
             return Status.PREEMPTED;
           }
         } catch (InterruptedIOException iioe) {
@@ -121,9 +138,18 @@ public class SplitLogWorker extends ZooK
           return Status.RESIGNED;
         } catch (IOException e) {
           Throwable cause = e.getCause();
-          if (cause instanceof InterruptedException) {
+          if (e instanceof RetriesExhaustedException && (cause instanceof NotServingRegionException
+              || cause instanceof ConnectException
+              || cause instanceof SocketTimeoutException)) {
+            LOG.warn("log replaying of " + filename + " can't connect to the target regionserver, "
+                + "resigning", e);
+            return Status.RESIGNED;
+          } else if (cause instanceof InterruptedException) {
             LOG.warn("log splitting of " + filename + " interrupted, resigning", e);
             return Status.RESIGNED;
+          } else if(cause instanceof KeeperException) {
+            LOG.warn("log splitting of " + filename + " hit ZooKeeper issue, resigning", e);
+            return Status.RESIGNED;
           }
           LOG.warn("log splitting of " + filename + " failed, returning error", e);
           return Status.ERR;
@@ -204,7 +230,39 @@ public class SplitLogWorker extends ZooK
       synchronized (taskReadyLock) {
         while (seq_start == taskReadySeq) {
           try {
-            taskReadyLock.wait();
+            taskReadyLock.wait(checkInterval);
+            if (this.server != null) {
+              // check to see if we have stale recovering regions in our internal memory state
+              Map<String, HRegion> recoveringRegions = this.server.getRecoveringRegions();
+              if (!recoveringRegions.isEmpty()) {
+                // Make a local copy to prevent ConcurrentModificationException when other threads
+                // modify recoveringRegions
+                List<String> tmpCopy = new ArrayList<String>(recoveringRegions.keySet());
+                for (String region : tmpCopy) {
+                  String nodePath = ZKUtil.joinZNode(this.watcher.recoveringRegionsZNode, region);
+                  try {
+                    if (ZKUtil.checkExists(this.watcher, nodePath) == -1) {
+                      HRegion r = recoveringRegions.remove(region);
+                      if (r != null) {
+                        r.setRecovering(false);
+                      }
+                      LOG.debug("Mark recovering region:" + region + " up.");
+                    } else {
+                      // This check is a defensive (and redundant) mechanism to prevent us from
+                      // keeping stale recovering regions in our internal RS memory state while
+                      // zookeeper (the source of truth) says otherwise. We stop at the first node
+                      // that still exists because in the normal case there should be no stale
+                      // entries at all, so checking the first one is good enough.
+                      break;
+                    }
+                  } catch (KeeperException e) {
+                    // ignore zookeeper error
+                    LOG.debug("Got a zookeeper when trying to open a recovering region", e);
+                    break;
+                  }
+                }
+              }
+            }
           } catch (InterruptedException e) {
             LOG.info("SplitLogWorker interrupted while waiting for task," +
                 " exiting: " + e.toString() + (exitWorker ? "" :
@@ -214,6 +272,7 @@ public class SplitLogWorker extends ZooK
           }
         }
       }
+
     }
   }
 
@@ -463,9 +522,6 @@ public class SplitLogWorker extends ZooK
     }
   }
 
-
-
-
   @Override
   public void nodeDataChanged(String path) {
     // there will be a self generated dataChanged event every time attemptToOwnTask()
@@ -510,7 +566,6 @@ public class SplitLogWorker extends ZooK
     return childrenPaths;
   }
 
-
   @Override
   public void nodeChildrenChanged(String path) {
     if(path.equals(watcher.splitLogZNode)) {
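
The polling added to the task-wait loop above acts as a watchdog: every checkInterval (5 seconds)
the worker reconciles its in-memory recovering set against ZooKeeper, the source of truth. Reduced
to a standalone sketch, with small interfaces standing in for ZKUtil.checkExists and the region
handle (both are assumptions for illustration, not part of the patch):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    // Sketch of the reconciliation pass from the task-wait loop above.
    class RecoveringReconcileSketch {
      interface Region { void setRecovering(boolean state); }
      interface Zk { boolean znodeExists(String path); } // stand-in for ZKUtil.checkExists(...) != -1

      static void reconcile(Map<String, Region> recoveringRegions, Zk zk) {
        // Copy the key set so writers to the synchronized map can't trigger a
        // ConcurrentModificationException while we iterate.
        List<String> snapshot = new ArrayList<String>(recoveringRegions.keySet());
        for (String region : snapshot) {
          if (!zk.znodeExists("/hbase/recovering-regions/" + region)) {
            Region r = recoveringRegions.remove(region); // recovery finished
            if (r != null) {
              r.setRecovering(false);                    // region is readable again
            }
          } else {
            // First znode that still exists: treat the rest as fresh too, since in
            // the normal case there are no stale entries at all.
            break;
          }
        }
      }
    }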

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java?rev=1482675&r1=1482674&r2=1482675&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java Wed May 15 04:24:02 2013
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hbase.regionserver.handler;
 
 import java.io.IOException;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
@@ -137,6 +138,16 @@ public class OpenRegionHandler extends E
       if (region == null) {
         return;
       }
+
+      // check if we need to set the current region into recovering state
+      region.setRecovering(false);
+      Map<String, HRegion> recoveringRegions = this.rsServices.getRecoveringRegions();
+      if (recoveringRegions != null && !recoveringRegions.isEmpty()
+          && recoveringRegions.containsKey(region.getRegionInfo().getEncodedName())) {
+        region.setRecovering(true);
+        recoveringRegions.put(region.getRegionInfo().getEncodedName(), region);
+      }
+
       boolean failed = true;
       if (tickleOpening("post_region_open")) {
         if (updateMeta(region)) {
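
For context, the block added above completes a two-step handshake: openRegion in HRegionServer put
a null placeholder into recoveringRegions when the region's ZK marker existed, and the open handler
now flags the freshly opened region and swaps the placeholder for the real handle. As a standalone
sketch (the nested interface and method are illustrative stand-ins):

    import java.util.Map;

    // Sketch of the open-time recovering handshake added in OpenRegionHandler.
    class OpenRecoveringSketch {
      interface Region { String encodedName(); void setRecovering(boolean state); }

      static void markIfRecovering(Region region, Map<String, Region> recoveringRegions) {
        region.setRecovering(false); // default: a freshly opened region is not recovering
        if (recoveringRegions != null
            && recoveringRegions.containsKey(region.encodedName())) {
          region.setRecovering(true);
          // replace the null placeholder set in openRegion with the live handle
          recoveringRegions.put(region.encodedName(), region);
        }
      }
    }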

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java?rev=1482675&r1=1482674&r2=1482675&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java Wed May 15 04:24:02 2013
@@ -1200,6 +1200,10 @@ class FSHLog implements HLog, Syncable {
       long now = EnvironmentEdgeManager.currentTimeMillis();
       // coprocessor hook:
       if (!coprocessorHost.preWALWrite(info, logKey, logEdit)) {
+        if (logEdit.isReplay()) {
+          // set the replication scope to null so that this edit won't be replicated
+          logKey.setScopes(null);
+        }
         // write to our buffer for the Hlog file.
         logSyncer.append(new FSHLog.Entry(logKey, logEdit));
       }
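
Finally, on the scope-nulling above: replayed edits are tagged at creation time (new
WALEdit(isInReplay) in doMiniBatchMutation), and clearing the key's replication scopes keeps the
replication machinery from shipping a failed server's edits to peer clusters a second time. A
minimal sketch of that decision, with Key and Edit as stand-ins for HLogKey and WALEdit:

    // Sketch of the replay check in FSHLog's append path.
    class ReplayScopeSketch {
      static class Key {
        private Object scopes = new Object();
        void setScopes(Object scopes) { this.scopes = scopes; }
        Object getScopes() { return scopes; }
      }

      static class Edit {
        private final boolean replay;
        Edit(boolean replay) { this.replay = replay; } // mirrors new WALEdit(isInReplay)
        boolean isReplay() { return replay; }
      }

      static void beforeAppend(Key logKey, Edit logEdit) {
        if (logEdit.isReplay()) {
          // a null scope means "replicate to no peer", so replayed edits are not re-shipped
          logKey.setScopes(null);
        }
      }
    }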