You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2013/05/15 06:24:03 UTC

svn commit: r1482675 [2/5] - in /hbase/trunk: hbase-client/src/main/java/org/apache/hadoop/hbase/ hbase-client/src/main/java/org/apache/hadoop/hbase/client/ hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ hbase-client/src/main/java/org/a...

Modified: hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/HBaseProtos.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/HBaseProtos.java?rev=1482675&r1=1482674&r2=1482675&view=diff
==============================================================================
--- hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/HBaseProtos.java (original)
+++ hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/HBaseProtos.java Wed May 15 04:24:02 2013
@@ -3132,6 +3132,10 @@ public final class HBaseProtos {
     // optional bool split = 6;
     boolean hasSplit();
     boolean getSplit();
+    
+    // optional bool recovering = 7;
+    boolean hasRecovering();
+    boolean getRecovering();
   }
   public static final class RegionInfo extends
       com.google.protobuf.GeneratedMessage
@@ -3222,6 +3226,16 @@ public final class HBaseProtos {
       return split_;
     }
     
+    // optional bool recovering = 7;
+    public static final int RECOVERING_FIELD_NUMBER = 7;
+    private boolean recovering_;
+    public boolean hasRecovering() {
+      return ((bitField0_ & 0x00000040) == 0x00000040);
+    }
+    public boolean getRecovering() {
+      return recovering_;
+    }
+    
     private void initFields() {
       regionId_ = 0L;
       tableName_ = com.google.protobuf.ByteString.EMPTY;
@@ -3229,6 +3243,7 @@ public final class HBaseProtos {
       endKey_ = com.google.protobuf.ByteString.EMPTY;
       offline_ = false;
       split_ = false;
+      recovering_ = false;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -3268,6 +3283,9 @@ public final class HBaseProtos {
       if (((bitField0_ & 0x00000020) == 0x00000020)) {
         output.writeBool(6, split_);
       }
+      if (((bitField0_ & 0x00000040) == 0x00000040)) {
+        output.writeBool(7, recovering_);
+      }
       getUnknownFields().writeTo(output);
     }
     
@@ -3301,6 +3319,10 @@ public final class HBaseProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeBoolSize(6, split_);
       }
+      if (((bitField0_ & 0x00000040) == 0x00000040)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBoolSize(7, recovering_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -3354,6 +3376,11 @@ public final class HBaseProtos {
         result = result && (getSplit()
             == other.getSplit());
       }
+      result = result && (hasRecovering() == other.hasRecovering());
+      if (hasRecovering()) {
+        result = result && (getRecovering()
+            == other.getRecovering());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -3387,6 +3414,10 @@ public final class HBaseProtos {
         hash = (37 * hash) + SPLIT_FIELD_NUMBER;
         hash = (53 * hash) + hashBoolean(getSplit());
       }
+      if (hasRecovering()) {
+        hash = (37 * hash) + RECOVERING_FIELD_NUMBER;
+        hash = (53 * hash) + hashBoolean(getRecovering());
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       return hash;
     }
@@ -3515,6 +3546,8 @@ public final class HBaseProtos {
         bitField0_ = (bitField0_ & ~0x00000010);
         split_ = false;
         bitField0_ = (bitField0_ & ~0x00000020);
+        recovering_ = false;
+        bitField0_ = (bitField0_ & ~0x00000040);
         return this;
       }
       
@@ -3577,6 +3610,10 @@ public final class HBaseProtos {
           to_bitField0_ |= 0x00000020;
         }
         result.split_ = split_;
+        if (((from_bitField0_ & 0x00000040) == 0x00000040)) {
+          to_bitField0_ |= 0x00000040;
+        }
+        result.recovering_ = recovering_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -3611,6 +3648,9 @@ public final class HBaseProtos {
         if (other.hasSplit()) {
           setSplit(other.getSplit());
         }
+        if (other.hasRecovering()) {
+          setRecovering(other.getRecovering());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -3680,6 +3720,11 @@ public final class HBaseProtos {
               split_ = input.readBool();
               break;
             }
+            case 56: {
+              bitField0_ |= 0x00000040;
+              recovering_ = input.readBool();
+              break;
+            }
           }
         }
       }
@@ -3821,6 +3866,27 @@ public final class HBaseProtos {
         return this;
       }
       
+      // optional bool recovering = 7;
+      private boolean recovering_ ;
+      public boolean hasRecovering() {
+        return ((bitField0_ & 0x00000040) == 0x00000040);
+      }
+      public boolean getRecovering() {
+        return recovering_;
+      }
+      public Builder setRecovering(boolean value) {
+        bitField0_ |= 0x00000040;
+        recovering_ = value;
+        onChanged();
+        return this;
+      }
+      public Builder clearRecovering() {
+        bitField0_ = (bitField0_ & ~0x00000040);
+        recovering_ = false;
+        onChanged();
+        return this;
+      }
+      
       // @@protoc_insertion_point(builder_scope:RegionInfo)
     }
     
@@ -14719,60 +14785,60 @@ public final class HBaseProtos {
       "hema\022&\n\rconfiguration\030\004 \003(\0132\017.NameString" +
       "Pair\"o\n\022ColumnFamilySchema\022\014\n\004name\030\001 \002(\014" +
       "\022#\n\nattributes\030\002 \003(\0132\017.BytesBytesPair\022&\n" +
-      "\rconfiguration\030\003 \003(\0132\017.NameStringPair\"s\n",
-      "\nRegionInfo\022\020\n\010regionId\030\001 \002(\004\022\021\n\ttableNa" +
-      "me\030\002 \002(\014\022\020\n\010startKey\030\003 \001(\014\022\016\n\006endKey\030\004 \001" +
-      "(\014\022\017\n\007offline\030\005 \001(\010\022\r\n\005split\030\006 \001(\010\"0\n\014Fa" +
-      "voredNodes\022 \n\013favoredNode\030\001 \003(\0132\013.Server" +
-      "Name\"\225\001\n\017RegionSpecifier\0222\n\004type\030\001 \002(\0162$" +
-      ".RegionSpecifier.RegionSpecifierType\022\r\n\005" +
-      "value\030\002 \002(\014\"?\n\023RegionSpecifierType\022\017\n\013RE" +
-      "GION_NAME\020\001\022\027\n\023ENCODED_REGION_NAME\020\002\"\260\003\n" +
-      "\nRegionLoad\022)\n\017regionSpecifier\030\001 \002(\0132\020.R" +
-      "egionSpecifier\022\016\n\006stores\030\002 \001(\r\022\022\n\nstoref",
-      "iles\030\003 \001(\r\022\037\n\027storeUncompressedSizeMB\030\004 " +
-      "\001(\r\022\027\n\017storefileSizeMB\030\005 \001(\r\022\026\n\016memstore" +
-      "SizeMB\030\006 \001(\r\022\034\n\024storefileIndexSizeMB\030\007 \001" +
-      "(\r\022\031\n\021readRequestsCount\030\010 \001(\004\022\032\n\022writeRe" +
-      "questsCount\030\t \001(\004\022\032\n\022totalCompactingKVs\030" +
-      "\n \001(\004\022\033\n\023currentCompactedKVs\030\013 \001(\004\022\027\n\017ro" +
-      "otIndexSizeKB\030\014 \001(\r\022\036\n\026totalStaticIndexS" +
-      "izeKB\030\r \001(\r\022\036\n\026totalStaticBloomSizeKB\030\016 " +
-      "\001(\r\022\032\n\022completeSequenceId\030\017 \001(\004\"\372\001\n\nServ" +
-      "erLoad\022\030\n\020numberOfRequests\030\001 \001(\r\022\035\n\025tota",
-      "lNumberOfRequests\030\002 \001(\r\022\022\n\nusedHeapMB\030\003 " +
-      "\001(\r\022\021\n\tmaxHeapMB\030\004 \001(\r\022 \n\013regionLoads\030\005 " +
-      "\003(\0132\013.RegionLoad\022\"\n\014coprocessors\030\006 \003(\0132\014" +
-      ".Coprocessor\022\027\n\017reportStartTime\030\007 \001(\004\022\025\n" +
-      "\rreportEndTime\030\010 \001(\004\022\026\n\016infoServerPort\030\t" +
-      " \001(\r\"%\n\tTimeRange\022\014\n\004from\030\001 \001(\004\022\n\n\002to\030\002 " +
-      "\001(\004\"0\n\006Filter\022\014\n\004name\030\001 \002(\t\022\030\n\020serialize" +
-      "dFilter\030\002 \001(\014\"x\n\010KeyValue\022\013\n\003row\030\001 \002(\014\022\016" +
-      "\n\006family\030\002 \002(\014\022\021\n\tqualifier\030\003 \002(\014\022\021\n\ttim" +
-      "estamp\030\004 \001(\004\022\032\n\007keyType\030\005 \001(\0162\t.CellType",
-      "\022\r\n\005value\030\006 \001(\014\"?\n\nServerName\022\020\n\010hostNam" +
-      "e\030\001 \002(\t\022\014\n\004port\030\002 \001(\r\022\021\n\tstartCode\030\003 \001(\004" +
-      "\"\033\n\013Coprocessor\022\014\n\004name\030\001 \002(\t\"-\n\016NameStr" +
-      "ingPair\022\014\n\004name\030\001 \002(\t\022\r\n\005value\030\002 \002(\t\",\n\r" +
-      "NameBytesPair\022\014\n\004name\030\001 \002(\t\022\r\n\005value\030\002 \001" +
-      "(\014\"/\n\016BytesBytesPair\022\r\n\005first\030\001 \002(\014\022\016\n\006s" +
-      "econd\030\002 \002(\014\",\n\rNameInt64Pair\022\014\n\004name\030\001 \001" +
-      "(\t\022\r\n\005value\030\002 \001(\003\"\255\001\n\023SnapshotDescriptio" +
-      "n\022\014\n\004name\030\001 \002(\t\022\r\n\005table\030\002 \001(\t\022\027\n\014creati" +
-      "onTime\030\003 \001(\003:\0010\022.\n\004type\030\004 \001(\0162\031.Snapshot",
-      "Description.Type:\005FLUSH\022\017\n\007version\030\005 \001(\005" +
-      "\"\037\n\004Type\022\014\n\010DISABLED\020\000\022\t\n\005FLUSH\020\001\"\n\n\010Emp" +
-      "tyMsg\"\032\n\007LongMsg\022\017\n\007longMsg\030\001 \002(\003\"&\n\rBig" +
-      "DecimalMsg\022\025\n\rbigdecimalMsg\030\001 \002(\014\"1\n\004UUI" +
-      "D\022\024\n\014leastSigBits\030\001 \002(\004\022\023\n\013mostSigBits\030\002" +
-      " \002(\004*`\n\010CellType\022\013\n\007MINIMUM\020\000\022\007\n\003PUT\020\004\022\n" +
-      "\n\006DELETE\020\010\022\021\n\rDELETE_COLUMN\020\014\022\021\n\rDELETE_" +
-      "FAMILY\020\016\022\014\n\007MAXIMUM\020\377\001*r\n\013CompareType\022\010\n" +
-      "\004LESS\020\000\022\021\n\rLESS_OR_EQUAL\020\001\022\t\n\005EQUAL\020\002\022\r\n" +
-      "\tNOT_EQUAL\020\003\022\024\n\020GREATER_OR_EQUAL\020\004\022\013\n\007GR",
-      "EATER\020\005\022\t\n\005NO_OP\020\006B>\n*org.apache.hadoop." +
-      "hbase.protobuf.generatedB\013HBaseProtosH\001\240" +
-      "\001\001"
+      "\rconfiguration\030\003 \003(\0132\017.NameStringPair\"\207\001",
+      "\n\nRegionInfo\022\020\n\010regionId\030\001 \002(\004\022\021\n\ttableN" +
+      "ame\030\002 \002(\014\022\020\n\010startKey\030\003 \001(\014\022\016\n\006endKey\030\004 " +
+      "\001(\014\022\017\n\007offline\030\005 \001(\010\022\r\n\005split\030\006 \001(\010\022\022\n\nr" +
+      "ecovering\030\007 \001(\010\"0\n\014FavoredNodes\022 \n\013favor" +
+      "edNode\030\001 \003(\0132\013.ServerName\"\225\001\n\017RegionSpec" +
+      "ifier\0222\n\004type\030\001 \002(\0162$.RegionSpecifier.Re" +
+      "gionSpecifierType\022\r\n\005value\030\002 \002(\014\"?\n\023Regi" +
+      "onSpecifierType\022\017\n\013REGION_NAME\020\001\022\027\n\023ENCO" +
+      "DED_REGION_NAME\020\002\"\260\003\n\nRegionLoad\022)\n\017regi" +
+      "onSpecifier\030\001 \002(\0132\020.RegionSpecifier\022\016\n\006s",
+      "tores\030\002 \001(\r\022\022\n\nstorefiles\030\003 \001(\r\022\037\n\027store" +
+      "UncompressedSizeMB\030\004 \001(\r\022\027\n\017storefileSiz" +
+      "eMB\030\005 \001(\r\022\026\n\016memstoreSizeMB\030\006 \001(\r\022\034\n\024sto" +
+      "refileIndexSizeMB\030\007 \001(\r\022\031\n\021readRequestsC" +
+      "ount\030\010 \001(\004\022\032\n\022writeRequestsCount\030\t \001(\004\022\032" +
+      "\n\022totalCompactingKVs\030\n \001(\004\022\033\n\023currentCom" +
+      "pactedKVs\030\013 \001(\004\022\027\n\017rootIndexSizeKB\030\014 \001(\r" +
+      "\022\036\n\026totalStaticIndexSizeKB\030\r \001(\r\022\036\n\026tota" +
+      "lStaticBloomSizeKB\030\016 \001(\r\022\032\n\022completeSequ" +
+      "enceId\030\017 \001(\004\"\372\001\n\nServerLoad\022\030\n\020numberOfR",
+      "equests\030\001 \001(\r\022\035\n\025totalNumberOfRequests\030\002" +
+      " \001(\r\022\022\n\nusedHeapMB\030\003 \001(\r\022\021\n\tmaxHeapMB\030\004 " +
+      "\001(\r\022 \n\013regionLoads\030\005 \003(\0132\013.RegionLoad\022\"\n" +
+      "\014coprocessors\030\006 \003(\0132\014.Coprocessor\022\027\n\017rep" +
+      "ortStartTime\030\007 \001(\004\022\025\n\rreportEndTime\030\010 \001(" +
+      "\004\022\026\n\016infoServerPort\030\t \001(\r\"%\n\tTimeRange\022\014" +
+      "\n\004from\030\001 \001(\004\022\n\n\002to\030\002 \001(\004\"0\n\006Filter\022\014\n\004na" +
+      "me\030\001 \002(\t\022\030\n\020serializedFilter\030\002 \001(\014\"x\n\010Ke" +
+      "yValue\022\013\n\003row\030\001 \002(\014\022\016\n\006family\030\002 \002(\014\022\021\n\tq" +
+      "ualifier\030\003 \002(\014\022\021\n\ttimestamp\030\004 \001(\004\022\032\n\007key",
+      "Type\030\005 \001(\0162\t.CellType\022\r\n\005value\030\006 \001(\014\"?\n\n" +
+      "ServerName\022\020\n\010hostName\030\001 \002(\t\022\014\n\004port\030\002 \001" +
+      "(\r\022\021\n\tstartCode\030\003 \001(\004\"\033\n\013Coprocessor\022\014\n\004" +
+      "name\030\001 \002(\t\"-\n\016NameStringPair\022\014\n\004name\030\001 \002" +
+      "(\t\022\r\n\005value\030\002 \002(\t\",\n\rNameBytesPair\022\014\n\004na" +
+      "me\030\001 \002(\t\022\r\n\005value\030\002 \001(\014\"/\n\016BytesBytesPai" +
+      "r\022\r\n\005first\030\001 \002(\014\022\016\n\006second\030\002 \002(\014\",\n\rName" +
+      "Int64Pair\022\014\n\004name\030\001 \001(\t\022\r\n\005value\030\002 \001(\003\"\255" +
+      "\001\n\023SnapshotDescription\022\014\n\004name\030\001 \002(\t\022\r\n\005" +
+      "table\030\002 \001(\t\022\027\n\014creationTime\030\003 \001(\003:\0010\022.\n\004",
+      "type\030\004 \001(\0162\031.SnapshotDescription.Type:\005F" +
+      "LUSH\022\017\n\007version\030\005 \001(\005\"\037\n\004Type\022\014\n\010DISABLE" +
+      "D\020\000\022\t\n\005FLUSH\020\001\"\n\n\010EmptyMsg\"\032\n\007LongMsg\022\017\n" +
+      "\007longMsg\030\001 \002(\003\"&\n\rBigDecimalMsg\022\025\n\rbigde" +
+      "cimalMsg\030\001 \002(\014\"1\n\004UUID\022\024\n\014leastSigBits\030\001" +
+      " \002(\004\022\023\n\013mostSigBits\030\002 \002(\004*`\n\010CellType\022\013\n" +
+      "\007MINIMUM\020\000\022\007\n\003PUT\020\004\022\n\n\006DELETE\020\010\022\021\n\rDELET" +
+      "E_COLUMN\020\014\022\021\n\rDELETE_FAMILY\020\016\022\014\n\007MAXIMUM" +
+      "\020\377\001*r\n\013CompareType\022\010\n\004LESS\020\000\022\021\n\rLESS_OR_" +
+      "EQUAL\020\001\022\t\n\005EQUAL\020\002\022\r\n\tNOT_EQUAL\020\003\022\024\n\020GRE",
+      "ATER_OR_EQUAL\020\004\022\013\n\007GREATER\020\005\022\t\n\005NO_OP\020\006B" +
+      ">\n*org.apache.hadoop.hbase.protobuf.gene" +
+      "ratedB\013HBaseProtosH\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -14808,7 +14874,7 @@ public final class HBaseProtos {
           internal_static_RegionInfo_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_RegionInfo_descriptor,
-              new java.lang.String[] { "RegionId", "TableName", "StartKey", "EndKey", "Offline", "Split", },
+              new java.lang.String[] { "RegionId", "TableName", "StartKey", "EndKey", "Offline", "Split", "Recovering", },
               org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionInfo.class,
               org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionInfo.Builder.class);
           internal_static_FavoredNodes_descriptor =

Modified: hbase/trunk/hbase-protocol/src/main/protobuf/Admin.proto
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-protocol/src/main/protobuf/Admin.proto?rev=1482675&r1=1482674&r2=1482675&view=diff
==============================================================================
--- hbase/trunk/hbase-protocol/src/main/protobuf/Admin.proto (original)
+++ hbase/trunk/hbase-protocol/src/main/protobuf/Admin.proto Wed May 15 04:24:02 2013
@@ -24,6 +24,7 @@ option java_generic_services = true;
 option java_generate_equals_and_hash = true;
 option optimize_for = SPEED;
 
+import "Client.proto";
 import "hbase.proto";
 import "WAL.proto";
 
@@ -233,6 +234,9 @@ service AdminService {
 
   rpc replicateWALEntry(ReplicateWALEntryRequest)
     returns(ReplicateWALEntryResponse);
+    
+  rpc replay(MultiRequest)
+    returns(MultiResponse);    
 
   rpc rollWALWriter(RollWALWriterRequest)
     returns(RollWALWriterResponse);

Modified: hbase/trunk/hbase-protocol/src/main/protobuf/hbase.proto
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-protocol/src/main/protobuf/hbase.proto?rev=1482675&r1=1482674&r2=1482675&view=diff
==============================================================================
--- hbase/trunk/hbase-protocol/src/main/protobuf/hbase.proto (original)
+++ hbase/trunk/hbase-protocol/src/main/protobuf/hbase.proto Wed May 15 04:24:02 2013
@@ -81,6 +81,7 @@ message RegionInfo {
   optional bytes endKey = 4;
   optional bool offline = 5;
   optional bool split = 6;
+  optional bool recovering = 7;
 }
 
 /**

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java?rev=1482675&r1=1482674&r2=1482675&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java Wed May 15 04:24:02 2013
@@ -422,7 +422,8 @@ public class AssignmentManager extends Z
       return;
     }
 
-    boolean failover = !serverManager.getDeadServers().isEmpty();
+    boolean failover = (!serverManager.getDeadServers().isEmpty() || !serverManager
+        .getRequeuedDeadServers().isEmpty());
 
     if (!failover) {
       // Run through all regions.  If they are not assigned and not in RIT, then
@@ -2728,18 +2729,38 @@ public class AssignmentManager extends Z
    */
   public void waitOnRegionToClearRegionsInTransition(final HRegionInfo hri)
       throws IOException, InterruptedException {
-    if (!regionStates.isRegionInTransition(hri)) return;
+    waitOnRegionToClearRegionsInTransition(hri, -1L);
+  }
+
+  /**
+   * Wait on region to clear regions-in-transition or time out
+   * @param hri
+   * @param timeOut Milliseconds to wait for current region to be out of transition state.
+   * @return True when a region clears regions-in-transition before timeout otherwise false
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public boolean waitOnRegionToClearRegionsInTransition(final HRegionInfo hri, long timeOut)
+      throws IOException, InterruptedException {
+    if (!regionStates.isRegionInTransition(hri)) return true;
     RegionState rs = null;
+    long end = (timeOut <= 0) ? Long.MAX_VALUE : EnvironmentEdgeManager.currentTimeMillis()
+        + timeOut;
     // There is already a timeout monitor on regions in transition so I
     // should not have to have one here too?
-    while(!this.server.isStopped() && regionStates.isRegionInTransition(hri)) {
-      LOG.info("Waiting on " + rs + " to clear regions-in-transition");
+    LOG.info("Waiting on " + rs + " to clear regions-in-transition");
+    while (!this.server.isStopped() && regionStates.isRegionInTransition(hri)) {
       regionStates.waitForUpdate(100);
+      if (EnvironmentEdgeManager.currentTimeMillis() > end) {
+        LOG.info("Timed out on waiting for region:" + hri.getEncodedName() + " to be assigned.");
+        return false;
+      }
     }
     if (this.server.isStopped()) {
-      LOG.info("Giving up wait on regions in " +
-        "transition because stoppable.isStopped is set");
+      LOG.info("Giving up wait on regions in transition because stoppable.isStopped is set");
+      return false;
     }
+    return true;
   }
 
   /**

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java?rev=1482675&r1=1482674&r2=1482675&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java Wed May 15 04:24:02 2013
@@ -27,6 +27,7 @@ import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -347,7 +348,15 @@ MasterServices, Server {
 
   /** The health check chore. */
   private HealthCheckChore healthCheckChore;
+  
+  /**
+   * is in distributedLogReplay mode. When true, SplitLogWorker directly replays WAL edits to newly
+   * assigned region servers instead of creating recovered.edits files.
+   */
+  private final boolean distributedLogReplay;
 
+  /** flag used in test cases in order to simulate RS failures during master initialization */
+  private volatile boolean initializationBeforeMetaAssignment = false;
 
   /**
    * Initializes the HMaster. The steps are as follows:
@@ -451,6 +460,9 @@ MasterServices, Server {
       clusterStatusPublisherChore = new ClusterStatusPublisher(this, conf, publisherClass);
       Threads.setDaemonThreadRunning(clusterStatusPublisherChore.getThread());
     }
+
+    distributedLogReplay = this.conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, 
+      HConstants.DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG);
   }
 
   /**
@@ -769,18 +781,47 @@ MasterServices, Server {
       this.assignmentManager.startTimeOutMonitor();
     }
 
-    // TODO: Should do this in background rather than block master startup
-    status.setStatus("Splitting logs after master startup");
-    splitLogAfterStartup(this.fileSystemManager);
+    // get a list for previously failed RS which need log splitting work
+    // we recover .META. region servers inside master initialization and
+    // handle other failed servers in SSH in order to start up master node ASAP
+    Set<ServerName> previouslyFailedServers = this.fileSystemManager
+        .getFailedServersFromLogFolders();
+
+    // remove stale recovering regions from previous run
+    this.fileSystemManager.removeStaleRecoveringRegionsFromZK(previouslyFailedServers);
+
+    // log splitting for .META. server
+    ServerName oldMetaServerLocation = this.catalogTracker.getMetaLocation();
+    if (oldMetaServerLocation != null && previouslyFailedServers.contains(oldMetaServerLocation)) {
+      splitMetaLogBeforeAssignment(oldMetaServerLocation);
+      // Note: we can't remove oldMetaServerLocation from previousFailedServers list because it
+      // may also host user regions
+    }
 
+    this.initializationBeforeMetaAssignment = true;
     // Make sure meta assigned before proceeding.
-    if (!assignMeta(status)) return;
+    status.setStatus("Assigning Meta Region");
+    assignMeta(status);
+
+    if (this.distributedLogReplay && oldMetaServerLocation != null
+        && previouslyFailedServers.contains(oldMetaServerLocation)) {
+      // replay WAL edits mode need new .META. RS is assigned firstly
+      status.setStatus("replaying log for Meta Region");
+      this.fileSystemManager.splitMetaLog(oldMetaServerLocation);
+    }
+
     enableServerShutdownHandler();
 
+    status.setStatus("Submitting log splitting work for previously failed region servers");
+    // Master has recovered META region server and we put
+    // other failed region servers in a queue to be handled later by SSH
+    for (ServerName tmpServer : previouslyFailedServers) {
+      this.serverManager.processDeadServer(tmpServer, true);
+    }
+
     // Update meta with new PB serialization if required. i.e migrate all HRI to PB serialization
     // in meta. This must happen before we assign all user regions or else the assignment will
     // fail.
-    // TODO: Remove this after 0.96, when we do 0.98.
     org.apache.hadoop.hbase.catalog.MetaMigrationConvertingToPB
       .updateMetaIfNecessary(this);
 
@@ -830,14 +871,6 @@ MasterServices, Server {
   }
 
   /**
-   * Override to change master's splitLogAfterStartup. Used testing
-   * @param mfs
-   */
-  protected void splitLogAfterStartup(final MasterFileSystem mfs) {
-    mfs.splitLogAfterStartup();
-  }
-
-  /**
    * Create a {@link ServerManager} instance.
    * @param master
    * @param services
@@ -865,52 +898,66 @@ MasterServices, Server {
   }
 
   /**
-   * Check <code>.META.</code> are assigned.  If not,
-   * assign them.
+   * Check <code>.META.</code> is assigned. If not, assign it.
+   * @param status MonitoredTask
    * @throws InterruptedException
    * @throws IOException
    * @throws KeeperException
-   * @return True if meta is healthy, assigned
    */
-  boolean assignMeta(MonitoredTask status)
-  throws InterruptedException, IOException, KeeperException {
+  void assignMeta(MonitoredTask status)
+      throws InterruptedException, IOException, KeeperException {
+    // Work on meta region
     int assigned = 0;
     long timeout = this.conf.getLong("hbase.catalog.verification.timeout", 1000);
+    boolean beingExpired = false;
 
-    // Work on .META. region.  Is it in zk in transition?
     status.setStatus("Assigning META region");
-    assignmentManager.getRegionStates().createRegionState(
-        HRegionInfo.FIRST_META_REGIONINFO);
-    boolean rit = this.assignmentManager.
-      processRegionInTransitionAndBlockUntilAssigned(HRegionInfo.FIRST_META_REGIONINFO);
-    ServerName currentMetaServer = null;
-    boolean metaRegionLocation = catalogTracker.verifyMetaRegionLocation(timeout);
+    
+    assignmentManager.getRegionStates().createRegionState(HRegionInfo.FIRST_META_REGIONINFO);
+    boolean rit = this.assignmentManager
+        .processRegionInTransitionAndBlockUntilAssigned(HRegionInfo.FIRST_META_REGIONINFO);
+    boolean metaRegionLocation = this.catalogTracker.verifyMetaRegionLocation(timeout);
     if (!rit && !metaRegionLocation) {
-      currentMetaServer = this.catalogTracker.getMetaLocation();
-      splitLogAndExpireIfOnline(currentMetaServer);
-      this.assignmentManager.assignMeta();
-      enableSSHandWaitForMeta();
+      ServerName currentMetaServer = this.catalogTracker.getMetaLocation();
+      if (currentMetaServer != null) {
+        beingExpired = expireIfOnline(currentMetaServer);
+      }
+      if (beingExpired) {
+        splitMetaLogBeforeAssignment(currentMetaServer);
+      }
+      assignmentManager.assignMeta();
       // Make sure a .META. location is set.
-      if (!isMetaLocation()) return false;
-      // This guarantees that the transition assigning .META. has completed
-      this.assignmentManager.waitForAssignment(HRegionInfo.FIRST_META_REGIONINFO);
+      enableSSHandWaitForMeta();
       assigned++;
+      if (beingExpired && this.distributedLogReplay) {
+        // In Replay WAL Mode, we need the new .META. server online
+        this.fileSystemManager.splitMetaLog(currentMetaServer);
+      }
     } else if (rit && !metaRegionLocation) {
       // Make sure a .META. location is set.
-      if (!isMetaLocation()) return false;
-      // This guarantees that the transition assigning .META. has completed
-      this.assignmentManager.waitForAssignment(HRegionInfo.FIRST_META_REGIONINFO);
+      enableSSHandWaitForMeta();
       assigned++;
-    } else if (metaRegionLocation) {
-      // Region already assigned.  We didn't assign it.  Add to in-memory state.
+    } else {
+      // Region already assigned. We didn't assign it. Add to in-memory state.
       this.assignmentManager.regionOnline(HRegionInfo.FIRST_META_REGIONINFO,
         this.catalogTracker.getMetaLocation());
     }
     enableCatalogTables(Bytes.toString(HConstants.META_TABLE_NAME));
-    LOG.info(".META. assigned=" + assigned + ", rit=" + rit +
-      ", location=" + catalogTracker.getMetaLocation());
+    LOG.info(".META. assigned=" + assigned + ", rit=" + rit + ", location="
+        + catalogTracker.getMetaLocation());
     status.setStatus("META assigned.");
-    return true;
+  }
+
+  private void splitMetaLogBeforeAssignment(ServerName currentMetaServer) throws IOException {
+    if (this.distributedLogReplay) {
+      // In log replay mode, we mark META region as recovering in ZK
+      Set<HRegionInfo> regions = new HashSet<HRegionInfo>();
+      regions.add(HRegionInfo.FIRST_META_REGIONINFO);
+      this.fileSystemManager.prepareMetaLogReplay(currentMetaServer, regions);
+    } else {
+      // In recovered.edits mode: create recovered edits file for .META. server
+      this.fileSystemManager.splitMetaLog(currentMetaServer);
+    }
   }
 
   private void enableSSHandWaitForMeta() throws IOException, InterruptedException {
@@ -921,24 +968,6 @@ MasterServices, Server {
     this.assignmentManager.waitForAssignment(HRegionInfo.FIRST_META_REGIONINFO);
   }
 
-  /**
-   * @return True if there a meta available
-   * @throws InterruptedException
-   */
-  private boolean isMetaLocation() throws InterruptedException {
-    // Cycle up here in master rather than down in catalogtracker so we can
-    // check the master stopped flag every so often.
-    while (!this.stopped) {
-      try {
-        if (this.catalogTracker.waitForMeta(100) != null) break;
-      } catch (NotAllMetaRegionsOnlineException e) {
-        // Ignore.  I know .META. is not online yet.
-      }
-    }
-    // We got here because we came of above loop.
-    return !this.stopped;
-  }
-
   private void enableCatalogTables(String catalogTableName) {
     if (!this.assignmentManager.getZKTable().isEnabledTable(catalogTableName)) {
       this.assignmentManager.setEnabledTable(catalogTableName);
@@ -946,20 +975,19 @@ MasterServices, Server {
   }
 
   /**
-   * Split a server's log and expire it if we find it is one of the online
-   * servers.
+   * Expire a server if we find it is one of the online servers.
    * @param sn ServerName to check.
+   * @return true when server <code>sn<code> is being expired by the function.
    * @throws IOException
    */
-  private void splitLogAndExpireIfOnline(final ServerName sn)
+  private boolean expireIfOnline(final ServerName sn)
       throws IOException {
     if (sn == null || !serverManager.isServerOnline(sn)) {
-      return;
+      return false;
     }
-    LOG.info("Forcing splitLog and expire of " + sn);
-    fileSystemManager.splitMetaLog(sn);
-    fileSystemManager.splitLog(sn);
+    LOG.info("Forcing expire of " + sn);
     serverManager.expireServer(sn);
+    return true;
   }
 
   @Override
@@ -2235,6 +2263,14 @@ MasterServices, Server {
     return this.serverShutdownHandlerEnabled;
   }
 
+  /**
+   * Report whether this master has started initialization and is about to do meta region assignment
+   * @return true if master is in initialization & about to assign META regions
+   */
+  public boolean isInitializationStartsMetaRegionAssignment() {
+    return this.initializationBeforeMetaAssignment;
+  }
+
   @Override
   public AssignRegionResponse assignRegion(RpcController controller, AssignRegionRequest req)
   throws ServiceException {
@@ -2678,4 +2714,5 @@ MasterServices, Server {
     String healthScriptLocation = this.conf.get(HConstants.HEALTH_SCRIPT_LOC);
     return org.apache.commons.lang.StringUtils.isNotBlank(healthScriptLocation);
   }
+
 }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java?rev=1482675&r1=1482674&r2=1482675&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java Wed May 15 04:24:02 2013
@@ -20,7 +20,9 @@ package org.apache.hadoop.hbase.master;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.NavigableMap;
 import java.util.Set;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -34,26 +36,29 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hbase.ClusterId;
-import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.exceptions.InvalidFamilyOperationException;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.backup.HFileArchiver;
+import org.apache.hadoop.hbase.catalog.MetaReader;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.exceptions.InvalidFamilyOperationException;
+import org.apache.hadoop.hbase.exceptions.OrphanHLogAfterSplitException;
 import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
 import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
-import org.apache.hadoop.hbase.exceptions.OrphanHLogAfterSplitException;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSTableDescriptors;
 import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.zookeeper.KeeperException;
 
 /**
  * This class abstracts a bunch of operations the HMaster needs to interact with
@@ -83,6 +88,7 @@ public class MasterFileSystem {
   private final Path tempdir;
   // create the split log lock
   final Lock splitLogLock = new ReentrantLock();
+  final boolean distributedLogReplay;
   final boolean distributedLogSplitting;
   final SplitLogManager splitLogManager;
   private final MasterServices services;
@@ -118,15 +124,14 @@ public class MasterFileSystem {
     FSUtils.setFsDefault(conf, new Path(this.fs.getUri()));
     // make sure the fs has the same conf
     fs.setConf(conf);
-    this.distributedLogSplitting =
-      conf.getBoolean(HConstants.DISTRIBUTED_LOG_SPLITTING_KEY, true);
+    this.splitLogManager = new SplitLogManager(master.getZooKeeper(), master.getConfiguration(),
+        master, services, master.getServerName());
+    this.distributedLogSplitting = conf.getBoolean(HConstants.DISTRIBUTED_LOG_SPLITTING_KEY, true);
     if (this.distributedLogSplitting) {
-      this.splitLogManager = new SplitLogManager(master.getZooKeeper(),
-          master.getConfiguration(), master, services, master.getServerName());
       this.splitLogManager.finishInitialization(masterRecovery);
-    } else {
-      this.splitLogManager = null;
     }
+    this.distributedLogReplay = this.conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, 
+      HConstants.DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG);
     // setup the filesystem variable
     // set up the archived logs path
     this.oldLogDir = createInitialFileSystemLayout();
@@ -212,21 +217,23 @@ public class MasterFileSystem {
   }
 
   /**
-   * Inspect the log directory to recover any log file without
-   * an active region server.
+   * Inspect the log directory to find dead servers which need recovery work
+   * @return A set of ServerNames which aren't running but still have WAL files left in file system
    */
-  void splitLogAfterStartup() {
+  Set<ServerName> getFailedServersFromLogFolders() {
     boolean retrySplitting = !conf.getBoolean("hbase.hlog.split.skip.errors",
-        HLog.SPLIT_SKIP_ERRORS_DEFAULT);
+      HLog.SPLIT_SKIP_ERRORS_DEFAULT);
+
+    Set<ServerName> serverNames = new HashSet<ServerName>();
     Path logsDirPath = new Path(this.rootdir, HConstants.HREGION_LOGDIR_NAME);
+
     do {
       if (master.isStopped()) {
-        LOG.warn("Master stopped while splitting logs");
+        LOG.warn("Master stopped while trying to get failed servers.");
         break;
       }
-      List<ServerName> serverNames = new ArrayList<ServerName>();
       try {
-        if (!this.fs.exists(logsDirPath)) return;
+        if (!this.fs.exists(logsDirPath)) return serverNames;
         FileStatus[] logFolders = FSUtils.listStatus(this.fs, logsDirPath, null);
         // Get online servers after getting log folders to avoid log folder deletion of newly
         // checked in region servers . see HBASE-5916
@@ -235,7 +242,7 @@ public class MasterFileSystem {
 
         if (logFolders == null || logFolders.length == 0) {
           LOG.debug("No log files to split, proceeding...");
-          return;
+          return serverNames;
         }
         for (FileStatus status : logFolders) {
           String sn = status.getPath().getName();
@@ -249,23 +256,19 @@ public class MasterFileSystem {
                 + "to a known region server, splitting");
             serverNames.add(serverName);
           } else {
-            LOG.info("Log folder " + status.getPath()
-                + " belongs to an existing region server");
+            LOG.info("Log folder " + status.getPath() + " belongs to an existing region server");
           }
         }
-        splitLog(serverNames, META_FILTER);
-        splitLog(serverNames, NON_META_FILTER);
         retrySplitting = false;
       } catch (IOException ioe) {
-        LOG.warn("Failed splitting of " + serverNames, ioe);
+        LOG.warn("Failed getting failed servers to be recovered.", ioe);
         if (!checkFileSystem()) {
           LOG.warn("Bad Filesystem, exiting");
           Runtime.getRuntime().halt(1);
         }
         try {
           if (retrySplitting) {
-            Thread.sleep(conf.getInt(
-              "hbase.hlog.split.failure.retry.interval", 30 * 1000));
+            Thread.sleep(conf.getInt("hbase.hlog.split.failure.retry.interval", 30 * 1000));
           }
         } catch (InterruptedException e) {
           LOG.warn("Interrupted, aborting since cannot return w/o splitting");
@@ -275,10 +278,12 @@ public class MasterFileSystem {
         }
       }
     } while (retrySplitting);
+
+    return serverNames;
   }
 
   public void splitLog(final ServerName serverName) throws IOException {
-    List<ServerName> serverNames = new ArrayList<ServerName>();
+    Set<ServerName> serverNames = new HashSet<ServerName>();
     serverNames.add(serverName);
     splitLog(serverNames);
   }
@@ -290,23 +295,20 @@ public class MasterFileSystem {
    */
   public void splitMetaLog(final ServerName serverName) throws IOException {
     long splitTime = 0, splitLogSize = 0;
-    List<ServerName> serverNames = new ArrayList<ServerName>();
+    Set<ServerName> serverNames = new HashSet<ServerName>();
     serverNames.add(serverName);
     List<Path> logDirs = getLogDirs(serverNames);
-    if (logDirs.isEmpty()) {
-      LOG.info("No meta logs to split");
-      return;
-    }
+
     splitLogManager.handleDeadWorkers(serverNames);
     splitTime = EnvironmentEdgeManager.currentTimeMillis();
-    splitLogSize = splitLogManager.splitLogDistributed(logDirs, META_FILTER);
+    splitLogSize = splitLogManager.splitLogDistributed(serverNames, logDirs, META_FILTER);
     splitTime = EnvironmentEdgeManager.currentTimeMillis() - splitTime;
     if (this.metricsMaster != null) {
-      this.metricsMaster.addSplit(splitTime, splitLogSize);
+      this.metricsMaster.addMetaWALSplit(splitTime, splitLogSize);
     }
   }
 
-  private List<Path> getLogDirs(final List<ServerName> serverNames) throws IOException {
+  private List<Path> getLogDirs(final Set<ServerName> serverNames) throws IOException {
     List<Path> logDirs = new ArrayList<Path>();
     for (ServerName serverName: serverNames) {
       Path logDir = new Path(this.rootdir, HLogUtil.getHLogDirectoryName(serverName.toString()));
@@ -327,30 +329,79 @@ public class MasterFileSystem {
     return logDirs;
   }
 
-  public void splitLog(final List<ServerName> serverNames) throws IOException {
+  /**
+   * Mark regions in recovering state when distributedLogReplay are set true
+   * @param serverNames Set of ServerNames to be replayed wals in order to recover changes contained
+   *          in them
+   * @throws IOException
+   */
+  public void prepareLogReplay(Set<ServerName> serverNames) throws IOException {
+    if (!this.distributedLogReplay) {
+      return;
+    }
+    // mark regions in recovering state
+    for (ServerName serverName : serverNames) {
+      NavigableMap<HRegionInfo, Result> regions = this.getServerUserRegions(serverName);
+      if (regions == null) {
+        continue;
+      }
+      try {
+        this.splitLogManager.markRegionsRecoveringInZK(serverName, regions.keySet());
+      } catch (KeeperException e) {
+        throw new IOException(e);
+      }
+    }
+  }
+
+  /**
+   * Mark meta regions in recovering state when distributedLogReplay are set true. The function is used
+   * when {@link #getServerUserRegions(ServerName)} can't be used in case meta RS is down.
+   * @param serverName
+   * @param regions
+   * @throws IOException
+   */
+  public void prepareMetaLogReplay(ServerName serverName, Set<HRegionInfo> regions)
+      throws IOException {
+    if (!this.distributedLogReplay || (regions == null)) {
+      return;
+    }
+    // mark regions in recovering state
+    try {
+      this.splitLogManager.markRegionsRecoveringInZK(serverName, regions);
+    } catch (KeeperException e) {
+      throw new IOException(e);
+    }
+  }
+
+  public void splitLog(final Set<ServerName> serverNames) throws IOException {
     splitLog(serverNames, NON_META_FILTER);
   }
 
   /**
-   * This method is the base split method that splits HLog files matching a filter.
-   * Callers should pass the appropriate filter for meta and non-meta HLogs.
+   * Wrapper function on {@link SplitLogManager#removeStaleRecoveringRegionsFromZK(Set)}
+   * @param failedServers
+   * @throws KeeperException
+   */
+  void removeStaleRecoveringRegionsFromZK(final Set<ServerName> failedServers)
+      throws KeeperException {
+    this.splitLogManager.removeStaleRecoveringRegionsFromZK(failedServers);
+  }
+
+  /**
+   * This method is the base split method that splits HLog files matching a filter. Callers should
+   * pass the appropriate filter for meta and non-meta HLogs.
    * @param serverNames
    * @param filter
    * @throws IOException
    */
-  public void splitLog(final List<ServerName> serverNames, PathFilter filter) throws IOException {
+  public void splitLog(final Set<ServerName> serverNames, PathFilter filter) throws IOException {
     long splitTime = 0, splitLogSize = 0;
     List<Path> logDirs = getLogDirs(serverNames);
 
-    if (logDirs.isEmpty()) {
-      LOG.info("No logs to split");
-      return;
-    }
-
     if (distributedLogSplitting) {
       splitLogManager.handleDeadWorkers(serverNames);
       splitTime = EnvironmentEdgeManager.currentTimeMillis();
-      splitLogSize = splitLogManager.splitLogDistributed(logDirs,filter);
+      splitLogSize = splitLogManager.splitLogDistributed(serverNames, logDirs, filter);
       splitTime = EnvironmentEdgeManager.currentTimeMillis() - splitTime;
     } else {
       for(Path logDir: logDirs){
@@ -358,8 +409,8 @@ public class MasterFileSystem {
         // one at a time
         this.splitLogLock.lock();
         try {
-          HLogSplitter splitter = HLogSplitter.createLogSplitter(
-            conf, rootdir, logDir, oldLogDir, this.fs);
+          HLogSplitter splitter = HLogSplitter.createLogSplitter(conf, rootdir, logDir, oldLogDir,
+            this.fs);
           try {
             // If FS is in safe mode, just wait till out of it.
             FSUtils.waitOnSafeMode(conf, conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 1000));
@@ -380,7 +431,11 @@ public class MasterFileSystem {
     }
 
     if (this.metricsMaster != null) {
-      this.metricsMaster.addSplit(splitTime, splitLogSize);
+      if (filter == this.META_FILTER) {
+        this.metricsMaster.addMetaWALSplit(splitTime, splitLogSize);
+      } else {
+        this.metricsMaster.addSplit(splitTime, splitLogSize);
+      }
     }
   }
 
@@ -648,4 +703,18 @@ public class MasterFileSystem {
     this.services.getTableDescriptors().add(htd);
     return htd;
   }
+
+  private NavigableMap<HRegionInfo, Result> getServerUserRegions(ServerName serverName)
+      throws IOException {
+    if (!this.master.isStopped()) {
+      try {
+        this.master.getCatalogTracker().waitForMeta();
+        return MetaReader.getServerUserRegions(this.master.getCatalogTracker(), serverName);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new IOException("Interrupted", e);
+      }
+    }
+    return null;
+  }
 }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java?rev=1482675&r1=1482674&r2=1482675&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java Wed May 15 04:24:02 2013
@@ -181,4 +181,9 @@ public interface MasterServices extends 
   public void dispatchMergingRegions(final HRegionInfo region_a,
       final HRegionInfo region_b, final boolean forcible) throws IOException;
 
+  /**
+   * @return true if master is initialized
+   */
+  public boolean isInitialized();
+
 }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetricsMaster.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetricsMaster.java?rev=1482675&r1=1482674&r2=1482675&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetricsMaster.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetricsMaster.java Wed May 15 04:24:02 2013
@@ -59,6 +59,16 @@ public class MetricsMaster {
   }
 
   /**
+   * Record a single instance of a split
+   * @param time time that the split took
+   * @param size length of original HLogs that were split
+   */
+  public synchronized void addMetaWALSplit(long time, long size) {
+    masterSource.updateMetaWALSplitTime(time);
+    masterSource.updateMetaWALSplitSize(size);
+  }
+
+  /**
    * @param inc How much to add to requests.
    */
   public void incrementRequests(final int inc) {

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java?rev=1482675&r1=1482674&r2=1482675&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java Wed May 15 04:24:02 2013
@@ -154,21 +154,21 @@ public class ServerManager {
   private Set<ServerName> queuedDeadServers = new HashSet<ServerName>();
 
   /**
-   * Set of region servers which are dead and submitted to ServerShutdownHandler to
-   * process but not fully processed immediately.
+   * Set of region servers which are dead and submitted to ServerShutdownHandler to process but not
+   * fully processed immediately.
    * <p>
-   * If one server died before assignment manager finished the failover cleanup, the server
-   * will be added to this set and will be processed through calling
+   * If one server died before assignment manager finished the failover cleanup, the server will be
+   * added to this set and will be processed through calling
    * {@link ServerManager#processQueuedDeadServers()} by assignment manager.
    * <p>
-   * For all the region servers in this set, HLog split is already completed.
+   * The Boolean value indicates whether log split is needed inside ServerShutdownHandler
    * <p>
-   * ServerShutdownHandler processes a dead server submitted to the handler after
-   * the handler is enabled. It may not be able to complete the processing because meta
-   * is not yet online or master is currently in startup mode.  In this case, the dead
-   * server will be parked in this set temporarily.
+   * ServerShutdownHandler processes a dead server submitted to the handler after the handler is
+   * enabled. It may not be able to complete the processing because meta is not yet online or master
+   * is currently in startup mode. In this case, the dead server will be parked in this set
+   * temporarily.
    */
-  private Set<ServerName> requeuedDeadServers = new HashSet<ServerName>();
+  private Map<ServerName, Boolean> requeuedDeadServers = new HashMap<ServerName, Boolean>();
 
   /**
    * Constructor.
@@ -513,6 +513,10 @@ public class ServerManager {
   }
 
   public synchronized void processDeadServer(final ServerName serverName) {
+    this.processDeadServer(serverName, false);
+  }
+
+  public synchronized void processDeadServer(final ServerName serverName, boolean shouldSplitHlog) {
     // When assignment manager is cleaning up the zookeeper nodes and rebuilding the
     // in-memory region states, region servers could be down. Meta table can and
     // should be re-assigned, log splitting can be done too. However, it is better to
@@ -522,13 +526,14 @@ public class ServerManager {
     // the handler threads and meta table could not be re-assigned in case
     // the corresponding server is down. So we queue them up here instead.
     if (!services.getAssignmentManager().isFailoverCleanupDone()) {
-      requeuedDeadServers.add(serverName);
+      requeuedDeadServers.put(serverName, shouldSplitHlog);
       return;
     }
 
     this.deadservers.add(serverName);
-    this.services.getExecutorService().submit(new ServerShutdownHandler(
-      this.master, this.services, this.deadservers, serverName, false));
+    this.services.getExecutorService().submit(
+      new ServerShutdownHandler(this.master, this.services, this.deadservers, serverName,
+          shouldSplitHlog));
   }
 
   /**
@@ -541,18 +546,20 @@ public class ServerManager {
     }
     Iterator<ServerName> serverIterator = queuedDeadServers.iterator();
     while (serverIterator.hasNext()) {
-      expireServer(serverIterator.next());
+      ServerName tmpServerName = serverIterator.next();
+      expireServer(tmpServerName);
       serverIterator.remove();
+      requeuedDeadServers.remove(tmpServerName);
     }
 
     if (!services.getAssignmentManager().isFailoverCleanupDone()) {
       LOG.info("AssignmentManager hasn't finished failover cleanup");
     }
-    serverIterator = requeuedDeadServers.iterator();
-    while (serverIterator.hasNext()) {
-      processDeadServer(serverIterator.next());
-      serverIterator.remove();
+
+    for(ServerName tmpServerName : requeuedDeadServers.keySet()){
+      processDeadServer(tmpServerName, requeuedDeadServers.get(tmpServerName));
     }
+    requeuedDeadServers.clear();
   }
 
   /*
@@ -838,6 +845,14 @@ public class ServerManager {
     return new HashSet<ServerName>(this.queuedDeadServers);
   }
 
+  /**
+   * @return A copy of the internal map of requeuedDeadServers servers and their corresponding
+   *         splitlog need flag.
+   */
+  Map<ServerName, Boolean> getRequeuedDeadServers() {
+    return Collections.unmodifiableMap(this.requeuedDeadServers);
+  }
+  
   public boolean isServerOnline(ServerName serverName) {
     return serverName != null && onlineServers.containsKey(serverName);
   }
@@ -851,7 +866,7 @@ public class ServerManager {
   public synchronized boolean isServerDead(ServerName serverName) {
     return serverName == null || deadservers.isDeadServer(serverName)
       || queuedDeadServers.contains(serverName)
-      || requeuedDeadServers.contains(serverName);
+      || requeuedDeadServers.containsKey(serverName);
   }
 
   public void shutdownCluster() {

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java?rev=1482675&r1=1482674&r2=1482675&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java Wed May 15 04:24:02 2013
@@ -33,6 +33,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -43,16 +44,19 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hbase.Chore;
-import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.SplitLogCounters;
 import org.apache.hadoop.hbase.SplitLogTask;
 import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.master.SplitLogManager.TaskFinisher.Status;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
 import org.apache.hadoop.hbase.regionserver.SplitLogWorker;
 import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
+import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Threads;
@@ -118,6 +122,20 @@ public class SplitLogManager extends Zoo
   private long unassignedTimeout;
   private long lastNodeCreateTime = Long.MAX_VALUE;
   public boolean ignoreZKDeleteForTesting = false;
+  private volatile long lastRecoveringNodeCreationTime = Long.MAX_VALUE;
+  // When lastRecoveringNodeCreationTime is older than the following threshold, we'll check
+  // whether to GC stale recovering znodes
+  private long checkRecoveringTimeThreshold = 15000; // 15 seconds
+  private final Set<ServerName> failedRecoveringRegionDeletions = Collections
+      .synchronizedSet(new HashSet<ServerName>());
+
+  /**
+   * In distributedLogReplay mode, we need touch both splitlog and recovering-regions znodes in one
+   * operation. So the lock is used to guard such cases.
+   */
+  protected final ReentrantLock recoveringRegionLock = new ReentrantLock();
+
+  final boolean distributedLogReplay;
 
   private final ConcurrentMap<String, Task> tasks = new ConcurrentHashMap<String, Task>();
   private TimeoutMonitor timeoutMonitor;
@@ -181,10 +199,13 @@ public class SplitLogManager extends Zoo
     LOG.info("timeout=" + timeout + ", unassigned timeout=" + unassignedTimeout);
 
     this.serverName = serverName;
-    this.timeoutMonitor =
-      new TimeoutMonitor(conf.getInt("hbase.splitlog.manager.timeoutmonitor.period", 1000), stopper);
+    this.timeoutMonitor = new TimeoutMonitor(
+      conf.getInt("hbase.splitlog.manager.timeoutmonitor.period", 1000), stopper);
 
     this.failedDeletions = Collections.synchronizedSet(new HashSet<String>());
+    this.distributedLogReplay = this.conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, 
+      HConstants.DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG);
+    LOG.info("distributedLogReplay = " + this.distributedLogReplay);
   }
 
   public void finishInitialization(boolean masterRecovery) {
@@ -244,7 +265,22 @@ public class SplitLogManager extends Zoo
    * @return cumulative size of the logfiles split
    */
   public long splitLogDistributed(final List<Path> logDirs) throws IOException {
-    return splitLogDistributed(logDirs, null);
+    if (logDirs.isEmpty()) {
+      return 0;
+    }
+    Set<ServerName> serverNames = new HashSet<ServerName>();
+    for (Path logDir : logDirs) {
+      try {
+        ServerName serverName = HLogUtil.getServerNameFromHLogDirectoryName(logDir);
+        if (serverName != null) {
+          serverNames.add(serverName);
+        }
+      } catch (IllegalArgumentException e) {
+        // ignore invalid format error.
+        LOG.warn("Cannot parse server name from " + logDir);
+      }
+    }
+    return splitLogDistributed(serverNames, logDirs, null);
   }
 
   /**
@@ -258,15 +294,15 @@ public class SplitLogManager extends Zoo
    * @throws IOException If there was an error while splitting any log file
    * @return cumulative size of the logfiles split
    */
-  public long splitLogDistributed(final List<Path> logDirs, PathFilter filter) 
-      throws IOException {
+  public long splitLogDistributed(final Set<ServerName> serverNames, final List<Path> logDirs,
+      PathFilter filter) throws IOException {
     MonitoredTask status = TaskMonitor.get().createStatus(
           "Doing distributed log split in " + logDirs);
     FileStatus[] logfiles = getFileList(logDirs, filter);
     status.setStatus("Checking directory contents...");
     LOG.debug("Scheduling batch of logs to split");
     SplitLogCounters.tot_mgr_log_split_batch_start.incrementAndGet();
-    LOG.info("started splitting logs in " + logDirs);
+    LOG.info("started splitting " + logfiles.length + " logs in " + logDirs);
     long t = EnvironmentEdgeManager.currentTimeMillis();
     long totalSize = 0;
     TaskBatch batch = new TaskBatch();
@@ -283,6 +319,9 @@ public class SplitLogManager extends Zoo
       }
     }
     waitForSplittingCompletion(batch, status);
+    // remove recovering regions from ZK
+    this.removeRecoveringRegionsFromZK(serverNames);
+
     if (batch.done != batch.installed) {
       batch.isDead = true;
       SplitLogCounters.tot_mgr_log_split_batch_err.incrementAndGet();
@@ -409,6 +448,171 @@ public class SplitLogManager extends Zoo
     return count;
   }
 
+  /**
+   * It removes recovering regions under /hbase/recovering-regions/[encoded region name] so that the
+   * region server hosting the region can allow reads to the recovered region
+   * @param serverNames servers which are just recovered
+   */
+  private void removeRecoveringRegionsFromZK(final Set<ServerName> serverNames) {
+
+    if (!this.distributedLogReplay) {
+      // the function is only used in WALEdit direct replay mode
+      return;
+    }
+
+    int count = 0;
+    Set<String> recoveredServerNameSet = new HashSet<String>();
+    if (serverNames != null) {
+      for (ServerName tmpServerName : serverNames) {
+        recoveredServerNameSet.add(tmpServerName.getServerName());
+      }
+    }
+
+    try {
+      this.recoveringRegionLock.lock();
+
+      List<String> tasks = ZKUtil.listChildrenNoWatch(watcher, watcher.splitLogZNode);
+      if (tasks != null) {
+        for (String t : tasks) {
+          if (!ZKSplitLog.isRescanNode(watcher, t)) {
+            count++;
+          }
+        }
+      }
+      if (count == 0 && this.master.isInitialized()
+          && !this.master.getServerManager().areDeadServersInProgress()) {
+        // no splitting work items left
+        deleteRecoveringRegionZNodes(null);
+        // reset lastRecoveringNodeCreationTime because we cleared all recovering znodes at
+        // this point.
+        lastRecoveringNodeCreationTime = Long.MAX_VALUE;
+      } else if (!recoveredServerNameSet.isEmpty()) {
+        // remove recovering regions which doesn't have any RS associated with it
+        List<String> regions = ZKUtil.listChildrenNoWatch(watcher, watcher.recoveringRegionsZNode);
+        if (regions != null) {
+          for (String region : regions) {
+            String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, region);
+            List<String> failedServers = ZKUtil.listChildrenNoWatch(watcher, nodePath);
+            if (failedServers == null || failedServers.isEmpty()) {
+              ZKUtil.deleteNode(watcher, nodePath);
+              continue;
+            } 
+            if (recoveredServerNameSet.containsAll(failedServers)) {
+              ZKUtil.deleteNodeRecursively(watcher, nodePath);
+            } else {
+              for (String failedServer : failedServers) {
+                if (recoveredServerNameSet.contains(failedServer)) {
+                  String tmpPath = ZKUtil.joinZNode(nodePath, failedServer);
+                  ZKUtil.deleteNode(watcher, tmpPath);
+                }
+              }
+            }
+          }
+        }
+      }
+    } catch (KeeperException ke) {
+      LOG.warn("removeRecoveringRegionsFromZK got zookeeper exception. Will retry", ke);
+      if (serverNames != null && !serverNames.isEmpty()) {
+        this.failedRecoveringRegionDeletions.addAll(serverNames);
+      }
+    } finally {
+      this.recoveringRegionLock.unlock();
+    }
+  }
+
+  /**
+   * It removes stale recovering regions under /hbase/recovering-regions/[encoded region name]
+   * during master initialization phase.
+   * @param failedServers A set of known failed servers
+   * @throws KeeperException
+   */
+  void removeStaleRecoveringRegionsFromZK(final Set<ServerName> failedServers)
+      throws KeeperException {
+
+    if (!this.distributedLogReplay) {
+      // the function is only used in distributedLogReplay mode when master is in initialization
+      return;
+    }
+
+    Set<String> knownFailedServers = new HashSet<String>();
+    if (failedServers != null) {
+      for (ServerName tmpServerName : failedServers) {
+        knownFailedServers.add(tmpServerName.getServerName());
+      }
+    }
+
+    this.recoveringRegionLock.lock();
+    try {
+      List<String> tasks = ZKUtil.listChildrenNoWatch(watcher, watcher.splitLogZNode);
+      if (tasks != null) {
+        for (String t : tasks) {
+          byte[] data = ZKUtil.getData(this.watcher, ZKUtil.joinZNode(watcher.splitLogZNode, t));
+          if (data != null) {
+            SplitLogTask slt = null;
+            try {
+              slt = SplitLogTask.parseFrom(data);
+            } catch (DeserializationException e) {
+              LOG.warn("Failed parse data for znode " + t, e);
+            }
+            if (slt != null && slt.isDone()) {
+              continue;
+            }
+          }
+          // decode the file name
+          t = ZKSplitLog.getFileName(t);
+          ServerName serverName = HLogUtil.getServerNameFromHLogDirectoryName(new Path(t));
+          if (serverName != null) {
+            knownFailedServers.add(serverName.getServerName());
+          } else {
+            LOG.warn("Found invalid WAL log file name:" + t);
+          }
+        }
+      }
+
+      // remove recovering regions which doesn't have any RS associated with it
+      List<String> regions = ZKUtil.listChildrenNoWatch(watcher, watcher.recoveringRegionsZNode);
+      if (regions != null) {
+        for (String region : regions) {
+          String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, region);
+          List<String> regionFailedServers = ZKUtil.listChildrenNoWatch(watcher, nodePath);
+          if (regionFailedServers == null || regionFailedServers.isEmpty()) {
+            ZKUtil.deleteNode(watcher, nodePath);
+            continue;
+          }
+          boolean needMoreRecovery = false;
+          for (String tmpFailedServer : regionFailedServers) {
+            if (knownFailedServers.contains(tmpFailedServer)) {
+              needMoreRecovery = true;
+              break;
+            }
+          }
+          if (!needMoreRecovery) {
+            ZKUtil.deleteNode(watcher, nodePath);
+          }
+        }
+      }
+    } finally {
+      this.recoveringRegionLock.unlock();
+    }
+  }
+
+  private void deleteRecoveringRegionZNodes(List<String> regions) {
+    try {
+      if (regions == null) {
+        // remove all children under /home/recovering-regions
+        LOG.info("Garbage collecting all recovering regions.");
+        ZKUtil.deleteChildrenRecursively(watcher, watcher.recoveringRegionsZNode);
+      } else {
+        for (String curRegion : regions) {
+          String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, curRegion);
+          ZKUtil.deleteNodeRecursively(watcher, nodePath);
+        }
+      }
+    } catch (KeeperException e) {
+      LOG.warn("Cannot remove recovering regions from ZooKeeper", e);
+    }
+  }
+
   private void setDone(String path, TerminationStatus status) {
     Task task = tasks.get(path);
     if (task == null) {
@@ -859,9 +1063,131 @@ public class SplitLogManager extends Zoo
   }
 
   /**
-   * Keeps track of the batch of tasks submitted together by a caller in
-   * splitLogDistributed(). Clients threads use this object to wait for all
-   * their tasks to be done.
+   * Create znodes /hbase/recovering-regions/[region_ids...]/[failed region server names ...] for
+   * all regions of the passed in region servers
+   * @param serverName the name of a region server
+   * @param userRegions user regiones assigned on the region server
+   */
+  void markRegionsRecoveringInZK(final ServerName serverName, Set<HRegionInfo> userRegions)
+      throws KeeperException {
+    if (userRegions == null || !this.distributedLogReplay) {
+      return;
+    }
+
+    try {
+      this.recoveringRegionLock.lock();
+      // mark that we're creating recovering znodes
+      this.lastRecoveringNodeCreationTime = EnvironmentEdgeManager.currentTimeMillis();
+
+      for (HRegionInfo region : userRegions) {
+        String regionEncodeName = region.getEncodedName();
+        long retries = this.zkretries;
+
+        do {
+          String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, regionEncodeName);
+          long lastRecordedFlushedSequenceId = -1;
+          try {
+            long lastSequenceId = this.master.getServerManager().getLastFlushedSequenceId(
+              regionEncodeName.getBytes());
+
+            /*
+             * znode layout: .../region_id[last known flushed sequence id]/failed server[last known
+             * flushed sequence id for the server]
+             */
+            byte[] data = ZKUtil.getData(this.watcher, nodePath);
+            if (data == null) {
+              ZKUtil.createSetData(this.watcher, nodePath,
+                ZKUtil.positionToByteArray(lastSequenceId));
+            } else {
+              lastRecordedFlushedSequenceId = SplitLogManager.parseLastFlushedSequenceIdFrom(data);
+              if (lastRecordedFlushedSequenceId < lastSequenceId) {
+                // update last flushed sequence id in the region level
+                ZKUtil.setData(this.watcher, nodePath,
+                  ZKUtil.positionToByteArray(lastSequenceId));
+              }
+            }
+            // go one level deeper with server name
+            nodePath = ZKUtil.joinZNode(nodePath, serverName.getServerName());
+            if (lastSequenceId <= lastRecordedFlushedSequenceId) {
+              // the newly assigned RS failed even before any flush to the region
+              lastSequenceId = lastRecordedFlushedSequenceId;
+            }
+            ZKUtil.createSetData(this.watcher, nodePath,
+              ZKUtil.positionToByteArray(lastSequenceId));
+            LOG.debug("Mark region " + regionEncodeName + " recovering from failed region server "
+                + serverName);
+
+            // break retry loop
+            break;
+          } catch (KeeperException e) {
+            // ignore ZooKeeper exceptions inside retry loop
+            if (retries <= 1) {
+              throw e;
+            }
+            // wait a little bit for retry
+            try {
+              Thread.sleep(20);
+            } catch (Exception ignoreE) {
+              // ignore
+            }
+          }
+        } while ((--retries) > 0 && (!this.stopper.isStopped()));
+      }
+    } finally {
+      this.recoveringRegionLock.unlock();
+    }
+  }
+
+  /**
+   * @param bytes - Content of a failed region server or recovering region znode.
+   * @return long - The last flushed sequence Id for the region server
+   */
+  public static long parseLastFlushedSequenceIdFrom(final byte[] bytes) {
+    long lastRecordedFlushedSequenceId = -1l;
+    try {
+      lastRecordedFlushedSequenceId = ZKUtil.parseHLogPositionFrom(bytes);
+    } catch (DeserializationException e) {
+      lastRecordedFlushedSequenceId = -1l;
+      LOG.warn("Can't parse last flushed sequence Id", e);
+    }
+    return lastRecordedFlushedSequenceId;
+  }
+
+  /**
+   * This function is used in distributedLogReplay to fetch last flushed sequence id from ZK
+   * @param zkw
+   * @param serverName
+   * @param encodedRegionName
+   * @return the last flushed sequence id recorded in ZK of the region for <code>serverName<code>
+   * @throws IOException
+   */
+  public static long getLastFlushedSequenceId(ZooKeeperWatcher zkw, String serverName,
+      String encodedRegionName) throws IOException {
+    // when SplitLogWorker recovers a region by directly replaying unflushed WAL edits,
+    // last flushed sequence Id changes when newly assigned RS flushes writes to the region.
+    // If the newly assigned RS fails again(a chained RS failures scenario), the last flushed
+    // sequence Id name space (sequence Id only valid for a particular RS instance), changes 
+    // when different newly assigned RS flushes the region.
+    // Therefore, in this mode we need to fetch last sequence Ids from ZK where we keep history of
+    // last flushed sequence Id for each failed RS instance.
+    long lastFlushedSequenceId = -1;
+    String nodePath = ZKUtil.joinZNode(zkw.recoveringRegionsZNode, encodedRegionName);
+    nodePath = ZKUtil.joinZNode(nodePath, serverName);
+    try {
+      byte[] data = ZKUtil.getData(zkw, nodePath);
+      if (data != null) {
+        lastFlushedSequenceId = SplitLogManager.parseLastFlushedSequenceIdFrom(data);
+      }
+    } catch (KeeperException e) {
+      throw new IOException("Cannot get lastFlushedSequenceId from ZooKeeper for server="
+          + serverName + "; region=" + encodedRegionName, e);
+    }
+    return lastFlushedSequenceId;
+  }
+
+  /**
+   * Keeps track of the batch of tasks submitted together by a caller in splitLogDistributed().
+   * Clients threads use this object to wait for all their tasks to be done.
    * <p>
    * All access is synchronized.
    */
@@ -944,18 +1270,14 @@ public class SplitLogManager extends Zoo
     LOG.info("dead splitlog worker " + workerName);
   }
 
-  void handleDeadWorkers(List<ServerName> serverNames) {
-    List<ServerName> workerNames = new ArrayList<ServerName>(serverNames.size());
-    for (ServerName serverName : serverNames) {
-      workerNames.add(serverName);
-    }
+  void handleDeadWorkers(Set<ServerName> serverNames) {
     synchronized (deadWorkersLock) {
       if (deadWorkers == null) {
         deadWorkers = new HashSet<ServerName>(100);
       }
-      deadWorkers.addAll(workerNames);
+      deadWorkers.addAll(serverNames);
     }
-    LOG.info("dead splitlog workers " + workerNames);
+    LOG.info("dead splitlog workers " + serverNames);
   }
 
   /**
@@ -1052,6 +1374,20 @@ public class SplitLogManager extends Zoo
         }
         failedDeletions.removeAll(tmpPaths);
       }
+
+      // Garbage collect left-over /hbase/recovering-regions/... znode
+      long timeInterval = EnvironmentEdgeManager.currentTimeMillis()
+          - lastRecoveringNodeCreationTime;
+      if (!failedRecoveringRegionDeletions.isEmpty()
+          || (tot == 0 && tasks.size() == 0 && (timeInterval > checkRecoveringTimeThreshold))) {
+        // inside the function there have more checks before GC anything
+        Set<ServerName> previouslyFailedDeletoins = null;
+        if (!failedRecoveringRegionDeletions.isEmpty()) {
+          previouslyFailedDeletoins = new HashSet<ServerName>(failedRecoveringRegionDeletions);
+          failedRecoveringRegionDeletions.removeAll(previouslyFailedDeletoins);
+        }
+        removeRecoveringRegionsFromZK(previouslyFailedDeletoins);
+      }
     }
   }
 

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/MetaServerShutdownHandler.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/MetaServerShutdownHandler.java?rev=1482675&r1=1482674&r2=1482675&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/MetaServerShutdownHandler.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/MetaServerShutdownHandler.java Wed May 15 04:24:02 2013
@@ -19,6 +19,8 @@
 package org.apache.hadoop.hbase.master.handler;
 
 import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -27,6 +29,7 @@ import org.apache.hadoop.hbase.HRegionIn
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.executor.EventType;
+import org.apache.hadoop.hbase.master.AssignmentManager;
 import org.apache.hadoop.hbase.master.DeadServer;
 import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.zookeeper.KeeperException;
@@ -47,30 +50,55 @@ public class MetaServerShutdownHandler e
   @Override
   public void process() throws IOException {
     boolean gotException = true; 
-    try{
+    try {
+      AssignmentManager am = this.services.getAssignmentManager();
       try {
-        LOG.info("Splitting META logs for " + serverName);
         if (this.shouldSplitHlog) {
-          this.services.getMasterFileSystem().splitMetaLog(serverName);
-        }
+          LOG.info("Splitting META logs for " + serverName);
+          if(this.distributedLogReplay) {
+            Set<HRegionInfo> regions = new HashSet<HRegionInfo>();
+            regions.add(HRegionInfo.FIRST_META_REGIONINFO);
+            this.services.getMasterFileSystem().prepareMetaLogReplay(serverName, regions);
+          } else {
+            this.services.getMasterFileSystem().splitMetaLog(serverName);
+          }
+        } 
       } catch (IOException ioe) {
         this.services.getExecutorService().submit(this);
         this.deadServers.add(serverName);
-        throw new IOException("failed log splitting for " +
-            serverName + ", will retry", ioe);
+        throw new IOException("failed log splitting for " + serverName + ", will retry", ioe);
       }
   
       // Assign meta if we were carrying it.
       // Check again: region may be assigned to other where because of RIT
       // timeout
-      if (this.services.getAssignmentManager().isCarryingMeta(serverName)) {
+      if (am.isCarryingMeta(serverName)) {
         LOG.info("Server " + serverName + " was carrying META. Trying to assign.");
-        this.services.getAssignmentManager().regionOffline(HRegionInfo.FIRST_META_REGIONINFO);
+        am.regionOffline(HRegionInfo.FIRST_META_REGIONINFO);
         verifyAndAssignMetaWithRetries();
       } else {
         LOG.info("META has been assigned to otherwhere, skip assigning.");
       }
-      
+
+      try {
+        if (this.shouldSplitHlog && this.distributedLogReplay) {
+          if (!am.waitOnRegionToClearRegionsInTransition(HRegionInfo.FIRST_META_REGIONINFO,
+            regionAssignmentWaitTimeout)) {
+            throw new IOException("Region " + HRegionInfo.FIRST_META_REGIONINFO.getEncodedName()
+                + " didn't complete assignment in time");
+          }
+          this.services.getMasterFileSystem().splitMetaLog(serverName);
+        }
+      } catch (Exception ex) {
+        if (ex instanceof IOException) {
+          this.services.getExecutorService().submit(this);
+          this.deadServers.add(serverName);
+          throw new IOException("failed log splitting for " + serverName + ", will retry", ex);
+        } else {
+          throw new IOException(ex);
+        }
+      }
+
       gotException = false;
     } finally {
       if (gotException){
@@ -78,9 +106,15 @@ public class MetaServerShutdownHandler e
         this.deadServers.finish(serverName);
       }     
     }
+    
     super.process();
   }
 
+  @Override
+  boolean isCarryingMeta() {
+    return true;
+  }
+
   /**
    * Before assign the META region, ensure it haven't
    *  been assigned by other place

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java?rev=1482675&r1=1482674&r2=1482675&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java Wed May 15 04:24:02 2013
@@ -20,13 +20,16 @@ package org.apache.hadoop.hbase.master.h
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
@@ -56,6 +59,8 @@ public class ServerShutdownHandler exten
   protected final MasterServices services;
   protected final DeadServer deadServers;
   protected final boolean shouldSplitHlog; // whether to split HLog or not
+  protected final boolean distributedLogReplay;
+  protected final int regionAssignmentWaitTimeout;
 
   public ServerShutdownHandler(final Server server, final MasterServices services,
       final DeadServer deadServers, final ServerName serverName,
@@ -76,6 +81,11 @@ public class ServerShutdownHandler exten
       LOG.warn(this.serverName + " is NOT in deadservers; it should be!");
     }
     this.shouldSplitHlog = shouldSplitHlog;
+    this.distributedLogReplay = server.getConfiguration().getBoolean(
+          HConstants.DISTRIBUTED_LOG_REPLAY_KEY, 
+          HConstants.DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG);
+    this.regionAssignmentWaitTimeout = server.getConfiguration().getInt(
+      HConstants.LOG_REPLAY_WAIT_REGION_TIMEOUT, 15000);
   }
 
   @Override
@@ -107,21 +117,7 @@ public class ServerShutdownHandler exten
   public void process() throws IOException {
     final ServerName serverName = this.serverName;
     try {
-      try {
-        if (this.shouldSplitHlog) {
-          LOG.info("Splitting logs for " + serverName);
-          this.services.getMasterFileSystem().splitLog(serverName);
-        } else {
-          LOG.info("Skipping log splitting for " + serverName);
-        }
-      } catch (IOException ioe) {
-        //typecast to SSH so that we make sure that it is the SSH instance that
-        //gets submitted as opposed to MSSH or some other derived instance of SSH
-        this.services.getExecutorService().submit((ServerShutdownHandler)this);
-        this.deadServers.add(serverName);
-        throw new IOException("failed log splitting for " +
-          serverName + ", will retry", ioe);
-      }
+
       // We don't want worker thread in the MetaServerShutdownHandler
       // executor pool to block by waiting availability of .META.
       // Otherwise, it could run into the following issue:
@@ -145,7 +141,7 @@ public class ServerShutdownHandler exten
       // the dead server for further processing too.
       if (isCarryingMeta() // .META.
           || !services.getAssignmentManager().isFailoverCleanupDone()) {
-        this.services.getServerManager().processDeadServer(serverName);
+        this.services.getServerManager().processDeadServer(serverName, this.shouldSplitHlog);
         return;
       }
 
@@ -183,6 +179,23 @@ public class ServerShutdownHandler exten
         throw new IOException("Server is stopped");
       }
 
+      try {
+        if (this.shouldSplitHlog) {
+          LOG.info("Splitting logs for " + serverName + " before assignment.");
+          if(this.distributedLogReplay){
+            Set<ServerName> serverNames = new HashSet<ServerName>();
+            serverNames.add(serverName);
+            this.services.getMasterFileSystem().prepareLogReplay(serverNames);
+          } else {
+            this.services.getMasterFileSystem().splitLog(serverName);
+          }
+        } else {
+          LOG.info("Skipping log splitting for " + serverName);
+        }
+      } catch (IOException ioe) {
+        resubmit(serverName, ioe);
+      }
+
       // Clean out anything in regions in transition.  Being conservative and
       // doing after log splitting.  Could do some states before -- OPENING?
       // OFFLINE? -- and then others after like CLOSING that depend on log
@@ -258,18 +271,47 @@ public class ServerShutdownHandler exten
           }
         }
       }
+
       try {
         am.assign(toAssignRegions);
       } catch (InterruptedException ie) {
         LOG.error("Caught " + ie + " during round-robin assignment");
         throw new IOException(ie);
       }
+
+      try {
+        if (this.shouldSplitHlog && this.distributedLogReplay) {
+          // wait for region assignment completes
+          for (HRegionInfo hri : toAssignRegions) {
+            if (!am.waitOnRegionToClearRegionsInTransition(hri, regionAssignmentWaitTimeout)) {
+              throw new IOException("Region " + hri.getEncodedName()
+                  + " didn't complete assignment in time");
+            }
+          }
+          this.services.getMasterFileSystem().splitLog(serverName);
+        }
+      } catch (Exception ex) {
+        if (ex instanceof IOException) {
+          resubmit(serverName, (IOException)ex);
+        } else {
+          throw new IOException(ex);
+        }
+      }
     } finally {
       this.deadServers.finish(serverName);
     }
+
     LOG.info("Finished processing of shutdown of " + serverName);
   }
 
+  private void resubmit(final ServerName serverName, IOException ex) throws IOException {
+    // typecast to SSH so that we make sure that it is the SSH instance that
+    // gets submitted as opposed to MSSH or some other derived instance of SSH
+    this.services.getExecutorService().submit((ServerShutdownHandler) this);
+    this.deadServers.add(serverName);
+    throw new IOException("failed log splitting for " + serverName + ", will retry", ex);
+  }
+
   /**
    * Process a dead region from a dead RS. Checks if the region is disabled or
    * disabling or if the region has a partially completed split.