You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by en...@apache.org on 2015/02/13 20:08:36 UTC

[2/2] hbase git commit: HBASE-11569 Flush / Compaction handling from secondary region replicas

HBASE-11569 Flush / Compaction handling from secondary region replicas


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/3e10e6e1
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/3e10e6e1
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/3e10e6e1

Branch: refs/heads/master
Commit: 3e10e6e1a61aa2a481a1450fecb94bce23809dfe
Parents: cfc131e
Author: Enis Soztutar <en...@apache.org>
Authored: Fri Feb 13 11:08:24 2015 -0800
Committer: Enis Soztutar <en...@apache.org>
Committed: Fri Feb 13 11:08:24 2015 -0800

----------------------------------------------------------------------
 .../hadoop/hbase/protobuf/ProtobufUtil.java     |    2 +
 .../java/org/apache/hadoop/hbase/KeyValue.java  |   39 +-
 .../hbase/protobuf/generated/WALProtos.java     |  299 ++++-
 hbase-protocol/src/main/protobuf/WAL.proto      |    2 +
 .../hbase/regionserver/DefaultMemStore.java     |   10 +-
 .../hadoop/hbase/regionserver/HRegion.java      |  728 ++++++++++-
 .../hbase/regionserver/HRegionFileSystem.java   |   10 +-
 .../hadoop/hbase/regionserver/HStore.java       |  107 +-
 .../hadoop/hbase/regionserver/MemStore.java     |    6 +
 .../hbase/regionserver/RSRpcServices.java       |   19 +-
 .../apache/hadoop/hbase/regionserver/Store.java |   28 +-
 .../hbase/regionserver/StoreFlushContext.java   |   16 +
 .../hbase/regionserver/wal/ReplayHLogKey.java   |   53 +
 .../RegionReplicaReplicationEndpoint.java       |    2 +-
 .../hbase/util/ServerRegionReplicaUtil.java     |   21 +-
 .../apache/hadoop/hbase/wal/WALSplitter.java    |   12 +-
 .../hadoop/hbase/regionserver/TestHRegion.java  |   28 +-
 .../regionserver/TestHRegionReplayEvents.java   | 1162 ++++++++++++++++++
 .../regionserver/TestPerColumnFamilyFlush.java  |   12 +-
 19 files changed, 2439 insertions(+), 117 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/3e10e6e1/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
index 43e91d2..038b148 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
@@ -2584,6 +2584,7 @@ public final class ProtobufUtil {
     FlushDescriptor.Builder desc = FlushDescriptor.newBuilder()
         .setAction(action)
         .setEncodedRegionName(ByteStringer.wrap(hri.getEncodedNameAsBytes()))
+        .setRegionName(ByteStringer.wrap(hri.getRegionName()))
         .setFlushSequenceNumber(flushSeqId)
         .setTableName(ByteStringer.wrap(hri.getTable().getName()));
 
@@ -2609,6 +2610,7 @@ public final class ProtobufUtil {
         .setEventType(eventType)
         .setTableName(ByteStringer.wrap(hri.getTable().getName()))
         .setEncodedRegionName(ByteStringer.wrap(hri.getEncodedNameAsBytes()))
+        .setRegionName(ByteStringer.wrap(hri.getRegionName()))
         .setLogSequenceNumber(seqId)
         .setServer(toServerName(server));
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/3e10e6e1/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
index 04551de..3ae324a 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
@@ -47,7 +47,7 @@ import org.apache.hadoop.io.RawComparator;
 import com.google.common.annotations.VisibleForTesting;
 
 /**
- * An HBase Key/Value. This is the fundamental HBase Type.  
+ * An HBase Key/Value. This is the fundamental HBase Type.
  * <p>
  * HBase applications and users should use the Cell interface and avoid directly using KeyValue
  * and member functions not defined in Cell.
@@ -297,6 +297,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
     return seqId;
   }
 
+  @Override
   public void setSequenceId(long seqId) {
     this.seqId = seqId;
   }
@@ -577,7 +578,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
     this(row, roffset, rlength, family, foffset, flength, qualifier, qoffset,
       qlength, timestamp, type, value, voffset, vlength, null);
   }
-  
+
   /**
    * Constructs KeyValue structure filled with specified values. Uses the provided buffer as the
    * data buffer.
@@ -742,9 +743,9 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
 
   public KeyValue(Cell c) {
     this(c.getRowArray(), c.getRowOffset(), (int)c.getRowLength(),
-        c.getFamilyArray(), c.getFamilyOffset(), (int)c.getFamilyLength(), 
-        c.getQualifierArray(), c.getQualifierOffset(), (int) c.getQualifierLength(), 
-        c.getTimestamp(), Type.codeToType(c.getTypeByte()), c.getValueArray(), c.getValueOffset(), 
+        c.getFamilyArray(), c.getFamilyOffset(), (int)c.getFamilyLength(),
+        c.getQualifierArray(), c.getQualifierOffset(), (int) c.getQualifierLength(),
+        c.getTimestamp(), Type.codeToType(c.getTypeByte()), c.getValueArray(), c.getValueOffset(),
         c.getValueLength(), c.getTagsArray(), c.getTagsOffset(), c.getTagsLength());
     this.seqId = c.getSequenceId();
   }
@@ -955,7 +956,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
       final int rlength, final byte [] family, final int foffset, int flength,
       final byte [] qualifier, final int qoffset, int qlength,
       final long timestamp, final Type type,
-      final byte [] value, final int voffset, 
+      final byte [] value, final int voffset,
       int vlength, byte[] tags, int tagsOffset, int tagsLength) {
 
     checkParameters(row, rlength, family, flength, qlength, vlength);
@@ -1115,6 +1116,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
   //
   //---------------------------------------------------------------------------
 
+  @Override
   public String toString() {
     if (this.bytes == null || this.bytes.length == 0) {
       return "empty";
@@ -1125,10 +1127,10 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
 
   /**
    * @param k Key portion of a KeyValue.
-   * @return Key as a String, empty string if k is null. 
+   * @return Key as a String, empty string if k is null.
    */
   public static String keyToString(final byte [] k) {
-    if (k == null) { 
+    if (k == null) {
       return "";
     }
     return keyToString(k, 0, k.length);
@@ -1464,6 +1466,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
    * save on allocations.
    * @return Value in a new byte array.
    */
+  @Override
   @Deprecated // use CellUtil.getValueArray()
   public byte [] getValue() {
     return CellUtil.cloneValue(this);
@@ -1477,6 +1480,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
    * lengths instead.
    * @return Row in a new byte array.
    */
+  @Override
   @Deprecated // use CellUtil.getRowArray()
   public byte [] getRow() {
     return CellUtil.cloneRow(this);
@@ -1534,6 +1538,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
    * lengths instead.
    * @return Returns family. Makes a copy.
    */
+  @Override
   @Deprecated // use CellUtil.getFamilyArray
   public byte [] getFamily() {
     return CellUtil.cloneFamily(this);
@@ -1548,6 +1553,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
    * Use {@link #getBuffer()} with appropriate offsets and lengths instead.
    * @return Returns qualifier. Makes a copy.
    */
+  @Override
   @Deprecated // use CellUtil.getQualifierArray
   public byte [] getQualifier() {
     return CellUtil.cloneQualifier(this);
@@ -1846,7 +1852,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
       return compareFlatKey(l,loff,llen, r,roff,rlen);
     }
 
-    
+
     /**
      * Compares the only the user specified portion of a Key.  This is overridden by MetaComparator.
      * @param left
@@ -2355,7 +2361,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
     in.readFully(bytes);
     return new KeyValue(bytes, 0, length);
   }
-  
+
   /**
    * Create a new KeyValue by copying existing cell and adding new tags
    * @param c
@@ -2371,9 +2377,9 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
       existingTags = newTags;
     }
     return new KeyValue(c.getRowArray(), c.getRowOffset(), (int)c.getRowLength(),
-      c.getFamilyArray(), c.getFamilyOffset(), (int)c.getFamilyLength(), 
-      c.getQualifierArray(), c.getQualifierOffset(), (int) c.getQualifierLength(), 
-      c.getTimestamp(), Type.codeToType(c.getTypeByte()), c.getValueArray(), c.getValueOffset(), 
+      c.getFamilyArray(), c.getFamilyOffset(), (int)c.getFamilyLength(),
+      c.getQualifierArray(), c.getQualifierOffset(), (int) c.getQualifierLength(),
+      c.getTimestamp(), Type.codeToType(c.getTypeByte()), c.getValueArray(), c.getValueOffset(),
       c.getValueLength(), existingTags);
   }
 
@@ -2478,6 +2484,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
       this.comparator = c;
     }
 
+    @Override
     public int compare(KeyValue left, KeyValue right) {
       return comparator.compareRows(left, right);
     }
@@ -2486,7 +2493,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
 
   /**
    * Avoids redundant comparisons for better performance.
-   * 
+   *
    * TODO get rid of this wart
    */
   public interface SamePrefixComparator<T> {
@@ -2509,6 +2516,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
      * TODO: With V3 consider removing this.
      * @return legacy class name for FileFileTrailer#comparatorClassName
      */
+    @Override
     public String getLegacyKeyComparatorName() {
       return "org.apache.hadoop.hbase.util.Bytes$ByteArrayComparator";
     }
@@ -2516,6 +2524,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
     /**
      * @deprecated Since 0.99.2.
      */
+    @Override
     @Deprecated
     public int compareFlatKey(byte[] left, int loffset, int llength, byte[] right,
         int roffset, int rlength) {
@@ -2527,6 +2536,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
       return compareOnlyKeyPortion(left, right);
     }
 
+    @Override
     @VisibleForTesting
     public int compareOnlyKeyPortion(Cell left, Cell right) {
       int c = Bytes.BYTES_RAWCOMPARATOR.compare(left.getRowArray(), left.getRowOffset(),
@@ -2553,6 +2563,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
       return (0xff & left.getTypeByte()) - (0xff & right.getTypeByte());
     }
 
+    @Override
     public byte[] calcIndexKey(byte[] lastKeyOfPreviousBlock, byte[] firstKeyInBlock) {
       return firstKeyInBlock;
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/3e10e6e1/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
index c9fa854..35192cc 100644
--- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
+++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
@@ -5522,6 +5522,24 @@ public final class WALProtos {
      */
     org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptorOrBuilder getStoreFlushesOrBuilder(
         int index);
+
+    // optional bytes region_name = 6;
+    /**
+     * <code>optional bytes region_name = 6;</code>
+     *
+     * <pre>
+     * full region name
+     * </pre>
+     */
+    boolean hasRegionName();
+    /**
+     * <code>optional bytes region_name = 6;</code>
+     *
+     * <pre>
+     * full region name
+     * </pre>
+     */
+    com.google.protobuf.ByteString getRegionName();
   }
   /**
    * Protobuf type {@code FlushDescriptor}
@@ -5613,6 +5631,11 @@ public final class WALProtos {
               storeFlushes_.add(input.readMessage(org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptor.PARSER, extensionRegistry));
               break;
             }
+            case 50: {
+              bitField0_ |= 0x00000010;
+              regionName_ = input.readBytes();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -6772,12 +6795,37 @@ public final class WALProtos {
       return storeFlushes_.get(index);
     }
 
+    // optional bytes region_name = 6;
+    public static final int REGION_NAME_FIELD_NUMBER = 6;
+    private com.google.protobuf.ByteString regionName_;
+    /**
+     * <code>optional bytes region_name = 6;</code>
+     *
+     * <pre>
+     * full region name
+     * </pre>
+     */
+    public boolean hasRegionName() {
+      return ((bitField0_ & 0x00000010) == 0x00000010);
+    }
+    /**
+     * <code>optional bytes region_name = 6;</code>
+     *
+     * <pre>
+     * full region name
+     * </pre>
+     */
+    public com.google.protobuf.ByteString getRegionName() {
+      return regionName_;
+    }
+
     private void initFields() {
       action_ = org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.FlushAction.START_FLUSH;
       tableName_ = com.google.protobuf.ByteString.EMPTY;
       encodedRegionName_ = com.google.protobuf.ByteString.EMPTY;
       flushSequenceNumber_ = 0L;
       storeFlushes_ = java.util.Collections.emptyList();
+      regionName_ = com.google.protobuf.ByteString.EMPTY;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -6824,6 +6872,9 @@ public final class WALProtos {
       for (int i = 0; i < storeFlushes_.size(); i++) {
         output.writeMessage(5, storeFlushes_.get(i));
       }
+      if (((bitField0_ & 0x00000010) == 0x00000010)) {
+        output.writeBytes(6, regionName_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -6853,6 +6904,10 @@ public final class WALProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeMessageSize(5, storeFlushes_.get(i));
       }
+      if (((bitField0_ & 0x00000010) == 0x00000010)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(6, regionName_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -6898,6 +6953,11 @@ public final class WALProtos {
       }
       result = result && getStoreFlushesList()
           .equals(other.getStoreFlushesList());
+      result = result && (hasRegionName() == other.hasRegionName());
+      if (hasRegionName()) {
+        result = result && getRegionName()
+            .equals(other.getRegionName());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -6931,6 +6991,10 @@ public final class WALProtos {
         hash = (37 * hash) + STORE_FLUSHES_FIELD_NUMBER;
         hash = (53 * hash) + getStoreFlushesList().hashCode();
       }
+      if (hasRegionName()) {
+        hash = (37 * hash) + REGION_NAME_FIELD_NUMBER;
+        hash = (53 * hash) + getRegionName().hashCode();
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -7060,6 +7124,8 @@ public final class WALProtos {
         } else {
           storeFlushesBuilder_.clear();
         }
+        regionName_ = com.google.protobuf.ByteString.EMPTY;
+        bitField0_ = (bitField0_ & ~0x00000020);
         return this;
       }
 
@@ -7113,6 +7179,10 @@ public final class WALProtos {
         } else {
           result.storeFlushes_ = storeFlushesBuilder_.build();
         }
+        if (((from_bitField0_ & 0x00000020) == 0x00000020)) {
+          to_bitField0_ |= 0x00000010;
+        }
+        result.regionName_ = regionName_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -7167,6 +7237,9 @@ public final class WALProtos {
             }
           }
         }
+        if (other.hasRegionName()) {
+          setRegionName(other.getRegionName());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -7593,6 +7666,58 @@ public final class WALProtos {
         return storeFlushesBuilder_;
       }
 
+      // optional bytes region_name = 6;
+      private com.google.protobuf.ByteString regionName_ = com.google.protobuf.ByteString.EMPTY;
+      /**
+       * <code>optional bytes region_name = 6;</code>
+       *
+       * <pre>
+       * full region name
+       * </pre>
+       */
+      public boolean hasRegionName() {
+        return ((bitField0_ & 0x00000020) == 0x00000020);
+      }
+      /**
+       * <code>optional bytes region_name = 6;</code>
+       *
+       * <pre>
+       * full region name
+       * </pre>
+       */
+      public com.google.protobuf.ByteString getRegionName() {
+        return regionName_;
+      }
+      /**
+       * <code>optional bytes region_name = 6;</code>
+       *
+       * <pre>
+       * full region name
+       * </pre>
+       */
+      public Builder setRegionName(com.google.protobuf.ByteString value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000020;
+        regionName_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional bytes region_name = 6;</code>
+       *
+       * <pre>
+       * full region name
+       * </pre>
+       */
+      public Builder clearRegionName() {
+        bitField0_ = (bitField0_ & ~0x00000020);
+        regionName_ = getDefaultInstance().getRegionName();
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:FlushDescriptor)
     }
 
@@ -9772,6 +9897,24 @@ public final class WALProtos {
      * </pre>
      */
     org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder getServerOrBuilder();
+
+    // optional bytes region_name = 7;
+    /**
+     * <code>optional bytes region_name = 7;</code>
+     *
+     * <pre>
+     * full region name
+     * </pre>
+     */
+    boolean hasRegionName();
+    /**
+     * <code>optional bytes region_name = 7;</code>
+     *
+     * <pre>
+     * full region name
+     * </pre>
+     */
+    com.google.protobuf.ByteString getRegionName();
   }
   /**
    * Protobuf type {@code RegionEventDescriptor}
@@ -9876,6 +10019,11 @@ public final class WALProtos {
               bitField0_ |= 0x00000010;
               break;
             }
+            case 58: {
+              bitField0_ |= 0x00000020;
+              regionName_ = input.readBytes();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -10135,6 +10283,30 @@ public final class WALProtos {
       return server_;
     }
 
+    // optional bytes region_name = 7;
+    public static final int REGION_NAME_FIELD_NUMBER = 7;
+    private com.google.protobuf.ByteString regionName_;
+    /**
+     * <code>optional bytes region_name = 7;</code>
+     *
+     * <pre>
+     * full region name
+     * </pre>
+     */
+    public boolean hasRegionName() {
+      return ((bitField0_ & 0x00000020) == 0x00000020);
+    }
+    /**
+     * <code>optional bytes region_name = 7;</code>
+     *
+     * <pre>
+     * full region name
+     * </pre>
+     */
+    public com.google.protobuf.ByteString getRegionName() {
+      return regionName_;
+    }
+
     private void initFields() {
       eventType_ = org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor.EventType.REGION_OPEN;
       tableName_ = com.google.protobuf.ByteString.EMPTY;
@@ -10142,6 +10314,7 @@ public final class WALProtos {
       logSequenceNumber_ = 0L;
       stores_ = java.util.Collections.emptyList();
       server_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.getDefaultInstance();
+      regionName_ = com.google.protobuf.ByteString.EMPTY;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -10197,6 +10370,9 @@ public final class WALProtos {
       if (((bitField0_ & 0x00000010) == 0x00000010)) {
         output.writeMessage(6, server_);
       }
+      if (((bitField0_ & 0x00000020) == 0x00000020)) {
+        output.writeBytes(7, regionName_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -10230,6 +10406,10 @@ public final class WALProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeMessageSize(6, server_);
       }
+      if (((bitField0_ & 0x00000020) == 0x00000020)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(7, regionName_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -10280,6 +10460,11 @@ public final class WALProtos {
         result = result && getServer()
             .equals(other.getServer());
       }
+      result = result && (hasRegionName() == other.hasRegionName());
+      if (hasRegionName()) {
+        result = result && getRegionName()
+            .equals(other.getRegionName());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -10317,6 +10502,10 @@ public final class WALProtos {
         hash = (37 * hash) + SERVER_FIELD_NUMBER;
         hash = (53 * hash) + getServer().hashCode();
       }
+      if (hasRegionName()) {
+        hash = (37 * hash) + REGION_NAME_FIELD_NUMBER;
+        hash = (53 * hash) + getRegionName().hashCode();
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -10453,6 +10642,8 @@ public final class WALProtos {
           serverBuilder_.clear();
         }
         bitField0_ = (bitField0_ & ~0x00000020);
+        regionName_ = com.google.protobuf.ByteString.EMPTY;
+        bitField0_ = (bitField0_ & ~0x00000040);
         return this;
       }
 
@@ -10514,6 +10705,10 @@ public final class WALProtos {
         } else {
           result.server_ = serverBuilder_.build();
         }
+        if (((from_bitField0_ & 0x00000040) == 0x00000040)) {
+          to_bitField0_ |= 0x00000020;
+        }
+        result.regionName_ = regionName_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -10571,6 +10766,9 @@ public final class WALProtos {
         if (other.hasServer()) {
           mergeServer(other.getServer());
         }
+        if (other.hasRegionName()) {
+          setRegionName(other.getRegionName());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -11156,6 +11354,58 @@ public final class WALProtos {
         return serverBuilder_;
       }
 
+      // optional bytes region_name = 7;
+      private com.google.protobuf.ByteString regionName_ = com.google.protobuf.ByteString.EMPTY;
+      /**
+       * <code>optional bytes region_name = 7;</code>
+       *
+       * <pre>
+       * full region name
+       * </pre>
+       */
+      public boolean hasRegionName() {
+        return ((bitField0_ & 0x00000040) == 0x00000040);
+      }
+      /**
+       * <code>optional bytes region_name = 7;</code>
+       *
+       * <pre>
+       * full region name
+       * </pre>
+       */
+      public com.google.protobuf.ByteString getRegionName() {
+        return regionName_;
+      }
+      /**
+       * <code>optional bytes region_name = 7;</code>
+       *
+       * <pre>
+       * full region name
+       * </pre>
+       */
+      public Builder setRegionName(com.google.protobuf.ByteString value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000040;
+        regionName_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional bytes region_name = 7;</code>
+       *
+       * <pre>
+       * full region name
+       * </pre>
+       */
+      public Builder clearRegionName() {
+        bitField0_ = (bitField0_ & ~0x00000040);
+        regionName_ = getDefaultInstance().getRegionName();
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:RegionEventDescriptor)
     }
 
@@ -11598,32 +11848,33 @@ public final class WALProtos {
       "n_name\030\002 \002(\014\022\023\n\013family_name\030\003 \002(\014\022\030\n\020com" +
       "paction_input\030\004 \003(\t\022\031\n\021compaction_output" +
       "\030\005 \003(\t\022\026\n\016store_home_dir\030\006 \002(\t\022\023\n\013region" +
-      "_name\030\007 \001(\014\"\353\002\n\017FlushDescriptor\022,\n\006actio" +
+      "_name\030\007 \001(\014\"\200\003\n\017FlushDescriptor\022,\n\006actio" +
       "n\030\001 \002(\0162\034.FlushDescriptor.FlushAction\022\022\n",
       "\ntable_name\030\002 \002(\014\022\033\n\023encoded_region_name" +
       "\030\003 \002(\014\022\035\n\025flush_sequence_number\030\004 \001(\004\022<\n" +
       "\rstore_flushes\030\005 \003(\0132%.FlushDescriptor.S" +
-      "toreFlushDescriptor\032Y\n\024StoreFlushDescrip" +
-      "tor\022\023\n\013family_name\030\001 \002(\014\022\026\n\016store_home_d" +
-      "ir\030\002 \002(\t\022\024\n\014flush_output\030\003 \003(\t\"A\n\013FlushA" +
-      "ction\022\017\n\013START_FLUSH\020\000\022\020\n\014COMMIT_FLUSH\020\001" +
-      "\022\017\n\013ABORT_FLUSH\020\002\"R\n\017StoreDescriptor\022\023\n\013" +
-      "family_name\030\001 \002(\014\022\026\n\016store_home_dir\030\002 \002(" +
-      "\t\022\022\n\nstore_file\030\003 \003(\t\"\215\001\n\022BulkLoadDescri",
-      "ptor\022\036\n\ntable_name\030\001 \002(\0132\n.TableName\022\033\n\023" +
-      "encoded_region_name\030\002 \002(\014\022 \n\006stores\030\003 \003(" +
-      "\0132\020.StoreDescriptor\022\030\n\020bulkload_seq_num\030" +
-      "\004 \002(\003\"\212\002\n\025RegionEventDescriptor\0224\n\nevent" +
-      "_type\030\001 \002(\0162 .RegionEventDescriptor.Even" +
-      "tType\022\022\n\ntable_name\030\002 \002(\014\022\033\n\023encoded_reg" +
-      "ion_name\030\003 \002(\014\022\033\n\023log_sequence_number\030\004 " +
-      "\001(\004\022 \n\006stores\030\005 \003(\0132\020.StoreDescriptor\022\033\n" +
-      "\006server\030\006 \001(\0132\013.ServerName\".\n\tEventType\022" +
-      "\017\n\013REGION_OPEN\020\000\022\020\n\014REGION_CLOSE\020\001\"\014\n\nWA",
-      "LTrailer*F\n\tScopeType\022\033\n\027REPLICATION_SCO" +
-      "PE_LOCAL\020\000\022\034\n\030REPLICATION_SCOPE_GLOBAL\020\001" +
-      "B?\n*org.apache.hadoop.hbase.protobuf.gen" +
-      "eratedB\tWALProtosH\001\210\001\000\240\001\001"
+      "toreFlushDescriptor\022\023\n\013region_name\030\006 \001(\014" +
+      "\032Y\n\024StoreFlushDescriptor\022\023\n\013family_name\030" +
+      "\001 \002(\014\022\026\n\016store_home_dir\030\002 \002(\t\022\024\n\014flush_o" +
+      "utput\030\003 \003(\t\"A\n\013FlushAction\022\017\n\013START_FLUS" +
+      "H\020\000\022\020\n\014COMMIT_FLUSH\020\001\022\017\n\013ABORT_FLUSH\020\002\"R" +
+      "\n\017StoreDescriptor\022\023\n\013family_name\030\001 \002(\014\022\026" +
+      "\n\016store_home_dir\030\002 \002(\t\022\022\n\nstore_file\030\003 \003",
+      "(\t\"\215\001\n\022BulkLoadDescriptor\022\036\n\ntable_name\030" +
+      "\001 \002(\0132\n.TableName\022\033\n\023encoded_region_name" +
+      "\030\002 \002(\014\022 \n\006stores\030\003 \003(\0132\020.StoreDescriptor" +
+      "\022\030\n\020bulkload_seq_num\030\004 \002(\003\"\237\002\n\025RegionEve" +
+      "ntDescriptor\0224\n\nevent_type\030\001 \002(\0162 .Regio" +
+      "nEventDescriptor.EventType\022\022\n\ntable_name" +
+      "\030\002 \002(\014\022\033\n\023encoded_region_name\030\003 \002(\014\022\033\n\023l" +
+      "og_sequence_number\030\004 \001(\004\022 \n\006stores\030\005 \003(\013" +
+      "2\020.StoreDescriptor\022\033\n\006server\030\006 \001(\0132\013.Ser" +
+      "verName\022\023\n\013region_name\030\007 \001(\014\".\n\tEventTyp",
+      "e\022\017\n\013REGION_OPEN\020\000\022\020\n\014REGION_CLOSE\020\001\"\014\n\n" +
+      "WALTrailer*F\n\tScopeType\022\033\n\027REPLICATION_S" +
+      "COPE_LOCAL\020\000\022\034\n\030REPLICATION_SCOPE_GLOBAL" +
+      "\020\001B?\n*org.apache.hadoop.hbase.protobuf.g" +
+      "eneratedB\tWALProtosH\001\210\001\000\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -11659,7 +11910,7 @@ public final class WALProtos {
           internal_static_FlushDescriptor_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_FlushDescriptor_descriptor,
-              new java.lang.String[] { "Action", "TableName", "EncodedRegionName", "FlushSequenceNumber", "StoreFlushes", });
+              new java.lang.String[] { "Action", "TableName", "EncodedRegionName", "FlushSequenceNumber", "StoreFlushes", "RegionName", });
           internal_static_FlushDescriptor_StoreFlushDescriptor_descriptor =
             internal_static_FlushDescriptor_descriptor.getNestedTypes().get(0);
           internal_static_FlushDescriptor_StoreFlushDescriptor_fieldAccessorTable = new
@@ -11683,7 +11934,7 @@ public final class WALProtos {
           internal_static_RegionEventDescriptor_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_RegionEventDescriptor_descriptor,
-              new java.lang.String[] { "EventType", "TableName", "EncodedRegionName", "LogSequenceNumber", "Stores", "Server", });
+              new java.lang.String[] { "EventType", "TableName", "EncodedRegionName", "LogSequenceNumber", "Stores", "Server", "RegionName", });
           internal_static_WALTrailer_descriptor =
             getDescriptor().getMessageTypes().get(8);
           internal_static_WALTrailer_fieldAccessorTable = new

http://git-wip-us.apache.org/repos/asf/hbase/blob/3e10e6e1/hbase-protocol/src/main/protobuf/WAL.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/protobuf/WAL.proto b/hbase-protocol/src/main/protobuf/WAL.proto
index 169a9b2..3fd6025 100644
--- a/hbase-protocol/src/main/protobuf/WAL.proto
+++ b/hbase-protocol/src/main/protobuf/WAL.proto
@@ -122,6 +122,7 @@ message FlushDescriptor {
   required bytes encoded_region_name = 3;
   optional uint64 flush_sequence_number = 4;
   repeated StoreFlushDescriptor store_flushes = 5;
+  optional bytes  region_name = 6; // full region name
 }
 
 message StoreDescriptor {
@@ -155,6 +156,7 @@ message RegionEventDescriptor {
   optional uint64 log_sequence_number = 4;
   repeated StoreDescriptor stores = 5;
   optional ServerName server = 6;  // Server who opened the region
+  optional bytes  region_name = 7; // full region name
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/3e10e6e1/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
index 48b78c2..081d7a5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
@@ -208,6 +208,11 @@ public class DefaultMemStore implements MemStore {
     return this.snapshotSize > 0 ? this.snapshotSize : keySize();
   }
 
+  @Override
+  public long getSnapshotSize() {
+    return this.snapshotSize;
+  }
+
   /**
    * Write an update
    * @param cell
@@ -462,6 +467,7 @@ public class DefaultMemStore implements MemStore {
    * @param now
    * @return  Timestamp
    */
+  @Override
   public long updateColumnValue(byte[] row,
                                 byte[] family,
                                 byte[] qualifier,
@@ -524,7 +530,7 @@ public class DefaultMemStore implements MemStore {
    * atomically.  Scans will only see each KeyValue update as atomic.
    *
    * @param cells
-   * @param readpoint readpoint below which we can safely remove duplicate KVs 
+   * @param readpoint readpoint below which we can safely remove duplicate KVs
    * @return change in memstore size
    */
   @Override
@@ -1031,7 +1037,7 @@ public class DefaultMemStore implements MemStore {
   public long size() {
     return heapSize();
   }
- 
+
   /**
    * Code to help figure if our approximation of object heap sizes is close
    * enough.  See hbase-900.  Fills memstores then waits so user can heap

http://git-wip-us.apache.org/repos/asf/hbase/blob/3e10e6e1/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 53e732a..aa65ddd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -34,6 +34,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.NavigableMap;
 import java.util.NavigableSet;
 import java.util.RandomAccess;
@@ -61,7 +62,6 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-import com.google.protobuf.ByteString;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -72,7 +72,6 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.CompoundConfiguration;
-import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.DroppedSnapshotException;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -100,6 +99,7 @@ import org.apache.hadoop.hbase.client.Increment;
 import org.apache.hadoop.hbase.client.IsolationLevel;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.RowMutations;
 import org.apache.hadoop.hbase.client.Scan;
@@ -133,12 +133,16 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptor;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor.EventType;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl.WriteEntry;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.ReplayHLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
@@ -176,6 +180,7 @@ import com.google.protobuf.Message;
 import com.google.protobuf.RpcCallback;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.Service;
+import com.google.protobuf.TextFormat;
 
 /**
  * HRegion stores data for a certain region of a table.  It stores all columns
@@ -255,6 +260,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
   private final AtomicLong sequenceId = new AtomicLong(-1L);
 
   /**
+   * The sequence id of the last replayed open region event from the primary region. This is used
+   * to skip entries before this due to the possibility of replay edits coming out of order from
+   * replication.
+   */
+  protected volatile long lastReplayedOpenRegionSeqId = -1L;
+
+  /**
    * Operation enum is used in {@link HRegion#startRegionOperation} to provide operation context for
    * startRegionOperation to possibly invoke different checks before any region operations. Not all
    * operations have to be defined here. It's only needed when a special check is need in
@@ -262,7 +274,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
    */
   public enum Operation {
     ANY, GET, PUT, DELETE, SCAN, APPEND, INCREMENT, SPLIT_REGION, MERGE_REGION, BATCH_MUTATE,
-    REPLAY_BATCH_MUTATE, COMPACT_REGION
+    REPLAY_BATCH_MUTATE, COMPACT_REGION, REPLAY_EVENT
   }
 
   //////////////////////////////////////////////////////////////////////////////
@@ -367,6 +379,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
   // The following map is populated when opening the region
   Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
 
+  /** Saved state from replaying prepare flush cache */
+  private PrepareFlushResult prepareFlushResult = null;
+
   /**
    * Config setting for whether to allow writes when a region is in recovering or not.
    */
@@ -516,6 +531,54 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
     public boolean isCompactionNeeded() {
       return result == Result.FLUSHED_COMPACTION_NEEDED;
     }
+
+    @Override
+    public String toString() {
+      return new StringBuilder()
+        .append("flush result:").append(result).append(", ")
+        .append("failureReason:").append(failureReason).append(",")
+        .append("flush seq id").append(flushSequenceId).toString();
+    }
+  }
+
+  /** A result object from prepare flush cache stage */
+  @VisibleForTesting
+  static class PrepareFlushResult {
+    final FlushResult result; // indicating a failure result from prepare
+    final TreeMap<byte[], StoreFlushContext> storeFlushCtxs;
+    final TreeMap<byte[], List<Path>> committedFiles;
+    final long startTime;
+    final long flushOpSeqId;
+    final long flushedSeqId;
+    final long totalFlushableSize;
+
+    /** Constructs an early exit case */
+    PrepareFlushResult(FlushResult result, long flushSeqId) {
+      this(result, null, null, Math.max(0, flushSeqId), 0, 0, 0);
+    }
+
+    /** Constructs a successful prepare flush result */
+    PrepareFlushResult(
+      TreeMap<byte[], StoreFlushContext> storeFlushCtxs,
+      TreeMap<byte[], List<Path>> committedFiles, long startTime, long flushSeqId,
+      long flushedSeqId, long totalFlushableSize) {
+      this(null, storeFlushCtxs, committedFiles, startTime,
+        flushSeqId, flushedSeqId, totalFlushableSize);
+    }
+
+    private PrepareFlushResult(
+      FlushResult result,
+      TreeMap<byte[], StoreFlushContext> storeFlushCtxs,
+      TreeMap<byte[], List<Path>> committedFiles, long startTime, long flushSeqId,
+      long flushedSeqId, long totalFlushableSize) {
+      this.result = result;
+      this.storeFlushCtxs = storeFlushCtxs;
+      this.committedFiles = committedFiles;
+      this.startTime = startTime;
+      this.flushOpSeqId = flushSeqId;
+      this.flushedSeqId = flushedSeqId;
+      this.totalFlushableSize = totalFlushableSize;
+    }
   }
 
   final WriteState writestate = new WriteState();
@@ -771,6 +834,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
     // Initialize all the HStores
     status.setStatus("Initializing all the Stores");
     long maxSeqId = initializeRegionStores(reporter, status);
+    this.lastReplayedOpenRegionSeqId = maxSeqId;
 
     this.writestate.setReadOnly(ServerRegionReplicaUtil.isReadOnly(this));
     this.writestate.flushRequested = false;
@@ -1229,9 +1293,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
     }
 
     status.setStatus("Disabling compacts and flushes for region");
+    boolean canFlush = true;
     synchronized (writestate) {
       // Disable compacting and flushing by background threads for this
       // region.
+      canFlush = !writestate.readOnly;
       writestate.writesEnabled = false;
       LOG.debug("Closing " + this + ": disabling compactions & flushes");
       waitForFlushesAndCompactions();
@@ -1239,7 +1305,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
     // If we were not just flushing, is it worth doing a preflush...one
     // that will clear out of the bulk of the memstore before we put up
     // the close flag?
-    if (!abort && worthPreFlushing()) {
+    if (!abort && worthPreFlushing() && canFlush) {
       status.setStatus("Pre-flushing region before close");
       LOG.info("Running close preflush of " + this.getRegionNameAsString());
       try {
@@ -1262,7 +1328,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
       }
       LOG.debug("Updates disabled for region " + this);
       // Don't flush the cache if we are aborting
-      if (!abort) {
+      if (!abort && canFlush) {
         int flushCount = 0;
         while (this.getMemstoreSize().get() > 0) {
           try {
@@ -1300,7 +1366,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
 
         // close each store in parallel
         for (final Store store : stores.values()) {
-          assert abort || store.getFlushableSize() == 0;
+          assert abort || store.getFlushableSize() == 0 || writestate.readOnly;
           completionService
               .submit(new Callable<Pair<byte[], Collection<StoreFile>>>() {
                 @Override
@@ -1336,7 +1402,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
       }
 
       this.closed.set(true);
-      if (memstoreSize.get() != 0) LOG.error("Memstore size is " + memstoreSize.get());
+      if (!canFlush) {
+        addAndGetGlobalMemstoreSize(-memstoreSize.get());
+      } else if (memstoreSize.get() != 0) {
+        LOG.error("Memstore size is " + memstoreSize.get());
+      }
       if (coprocessorHost != null) {
         status.setStatus("Running coprocessor post-close hooks");
         this.coprocessorHost.postClose(abort);
@@ -1362,6 +1432,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
    */
   public void waitForFlushesAndCompactions() {
     synchronized (writestate) {
+      if (this.writestate.readOnly) {
+        // we should not wait for replayed flushes if we are read only (for example in case the
+        // region is a secondary replica).
+        return;
+      }
       boolean interrupted = false;
       try {
         while (writestate.compacting > 0 || writestate.flushing) {
@@ -1592,6 +1667,22 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
   }
 
   /**
+   * This is a helper function that compacts the given store.
+   * It is used by utilities and testing.
+   *
+   * @throws IOException e
+   */
+  @VisibleForTesting
+  void compactStore(byte[] family, CompactionThroughputController throughputController)
+      throws IOException {
+    Store s = getStore(family);
+    CompactionContext compaction = s.requestCompaction();
+    if (compaction != null) {
+      compact(compaction, s, throughputController);
+    }
+  }
+
+  /*
    * Called by compaction thread and after region is opened to compact the
    * HStores if necessary.
    *
@@ -1738,6 +1829,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
         status.setStatus("Running coprocessor pre-flush hooks");
         coprocessorHost.preFlush();
       }
+      // TODO: this should be managed within memstore with the snapshot, updated only after the
+      // flush is successful
       if (numMutationsWithoutWAL.get() > 0) {
         numMutationsWithoutWAL.set(0);
         dataInMemoryWithoutWAL.set(0);
@@ -1903,6 +1996,20 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
    */
   protected FlushResult internalFlushcache(final WAL wal, final long myseqid,
       final Collection<Store> storesToFlush, MonitoredTask status) throws IOException {
+    PrepareFlushResult result
+      = internalPrepareFlushCache(wal, myseqid, storesToFlush, status, false);
+    if (result.result == null) {
+      return internalFlushCacheAndCommit(wal, status, result, storesToFlush);
+    } else {
+      return result.result; // early exit due to failure from prepare stage
+    }
+  }
+
+  protected PrepareFlushResult internalPrepareFlushCache(
+      final WAL wal, final long myseqid, final Collection<Store> storesToFlush,
+      MonitoredTask status, boolean isReplay)
+          throws IOException {
+
     if (this.rsServices != null && this.rsServices.isAborted()) {
       // Don't flush when server aborting, it's unsafe
       throw new IOException("Aborting flush because server is aborted...");
@@ -1930,10 +2037,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
             w.setWriteNumber(flushSeqId);
             mvcc.waitForPreviousTransactionsComplete(w);
             w = null;
-            return flushResult;
+            return new PrepareFlushResult(flushResult, myseqid);
           } else {
-            return new FlushResult(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY,
-              "Nothing to flush");
+            return new PrepareFlushResult(
+              new FlushResult(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, "Nothing to flush"),
+              myseqid);
           }
         }
       } finally {
@@ -1977,7 +2085,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
       flushedFamilyNames.add(store.getFamily().getName());
     }
 
-    List<StoreFlushContext> storeFlushCtxs = new ArrayList<StoreFlushContext>(stores.size());
+    TreeMap<byte[], StoreFlushContext> storeFlushCtxs
+      = new TreeMap<byte[], StoreFlushContext>(Bytes.BYTES_COMPARATOR);
     TreeMap<byte[], List<Path>> committedFiles = new TreeMap<byte[], List<Path>>(
         Bytes.BYTES_COMPARATOR);
     // The sequence id of this flush operation which is used to log FlushMarker and pass to
@@ -1998,7 +2107,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
             String msg = "Flush will not be started for ["
                 + this.getRegionInfo().getEncodedName() + "] - because the WAL is closing.";
             status.setStatus(msg);
-            return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
+            return new PrepareFlushResult(new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg),
+              myseqid);
           }
           flushOpSeqId = getNextSequenceId(wal);
           long oldestUnflushedSeqId = wal.getEarliestMemstoreSeqNum(encodedRegionName);
@@ -2013,12 +2123,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
 
         for (Store s : storesToFlush) {
           totalFlushableSizeOfFlushableStores += s.getFlushableSize();
-          storeFlushCtxs.add(s.createFlushContext(flushOpSeqId));
+          storeFlushCtxs.put(s.getFamily().getName(), s.createFlushContext(flushOpSeqId));
           committedFiles.put(s.getFamily().getName(), null); // for writing stores to WAL
         }
 
         // write the snapshot start to WAL
-        if (wal != null) {
+        if (wal != null && !writestate.readOnly) {
           FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.START_FLUSH,
             getRegionInfo(), flushOpSeqId, committedFiles);
           // no sync. Sync is below where we do not hold the updates lock
@@ -2027,7 +2137,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
         }
 
         // Prepare flush (take a snapshot)
-        for (StoreFlushContext flush : storeFlushCtxs) {
+        for (StoreFlushContext flush : storeFlushCtxs.values()) {
           flush.prepare();
         }
       } catch (IOException ex) {
@@ -2075,15 +2185,32 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
       mvcc.waitForPreviousTransactionsComplete(w);
       // set w to null to prevent mvcc.advanceMemstore from being called again inside finally block
       w = null;
-      s = "Flushing stores of " + this;
-      status.setStatus(s);
-      if (LOG.isTraceEnabled()) LOG.trace(s);
     } finally {
       if (w != null) {
         // in case of failure just mark current w as complete
         mvcc.advanceMemstore(w);
       }
     }
+    return new PrepareFlushResult(storeFlushCtxs, committedFiles, startTime, flushOpSeqId,
+      flushedSeqId, totalFlushableSizeOfFlushableStores);
+  }
+
+  protected FlushResult internalFlushCacheAndCommit(
+        final WAL wal, MonitoredTask status, final PrepareFlushResult prepareResult,
+        final Collection<Store> storesToFlush)
+    throws IOException {
+
+    // prepare flush context is carried via PrepareFlushResult
+    TreeMap<byte[], StoreFlushContext> storeFlushCtxs = prepareResult.storeFlushCtxs;
+    TreeMap<byte[], List<Path>> committedFiles = prepareResult.committedFiles;
+    long startTime = prepareResult.startTime;
+    long flushOpSeqId = prepareResult.flushOpSeqId;
+    long flushedSeqId = prepareResult.flushedSeqId;
+    long totalFlushableSizeOfFlushableStores = prepareResult.totalFlushableSize;
+
+    String s = "Flushing stores of " + this;
+    status.setStatus(s);
+    if (LOG.isTraceEnabled()) LOG.trace(s);
 
     // Any failure from here on out will be catastrophic requiring server
     // restart so wal content can be replayed and put back into the memstore.
@@ -2096,7 +2223,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
       // just-made new flush store file. The new flushed file is still in the
       // tmp directory.
 
-      for (StoreFlushContext flush : storeFlushCtxs) {
+      for (StoreFlushContext flush : storeFlushCtxs.values()) {
         flush.flushCache(status);
       }
 
@@ -2104,7 +2231,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
       // all the store scanners to reset/reseek).
       Iterator<Store> it = storesToFlush.iterator();
       // stores.values() and storeFlushCtxs have same order
-      for (StoreFlushContext flush : storeFlushCtxs) {
+      for (StoreFlushContext flush : storeFlushCtxs.values()) {
         boolean needsCompaction = flush.commit(status);
         if (needsCompaction) {
           compactionRequested = true;
@@ -2593,6 +2720,25 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
    */
   public OperationStatus[] batchReplay(MutationReplay[] mutations, long replaySeqId)
       throws IOException {
+    if (!RegionReplicaUtil.isDefaultReplica(getRegionInfo())
+        && replaySeqId < lastReplayedOpenRegionSeqId) {
+      // if it is a secondary replica we should ignore these entries silently
+      // since they are coming out of order
+      if (LOG.isTraceEnabled()) {
+        LOG.trace(getRegionInfo().getEncodedName() + " : "
+          + "Skipping " + mutations.length + " mutations with replaySeqId=" + replaySeqId
+          + " which is < than lastReplayedOpenRegionSeqId=" + lastReplayedOpenRegionSeqId);
+        for (MutationReplay mut : mutations) {
+          LOG.trace(getRegionInfo().getEncodedName() + " : Skipping : " + mut.mutation);
+        }
+      }
+
+      OperationStatus[] statuses = new OperationStatus[mutations.length];
+      for (int i = 0; i < statuses.length; i++) {
+        statuses[i] = OperationStatus.SUCCESS;
+      }
+      return statuses;
+    }
     return batchMutate(new ReplayBatch(mutations, replaySeqId));
   }
 
@@ -2897,7 +3043,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
             }
             // txid should always increase, so having the one from the last call is ok.
             // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
-            walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
+            walKey = new ReplayHLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
               this.htableDescriptor.getTableName(), now, m.getClusterIds(),
               currentNonceGroup, currentNonce);
             txid = this.wal.append(this.htableDescriptor,  this.getRegionInfo(),  walKey,
@@ -2923,14 +3069,29 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
       // STEP 5. Append the final edit to WAL. Do not sync wal.
       // -------------------------
       Mutation mutation = batchOp.getMutation(firstIndex);
+      if (isInReplay) {
+        // use wal key from the original
+        walKey = new ReplayHLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
+          this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
+          mutation.getClusterIds(), currentNonceGroup, currentNonce);
+        long replaySeqId = batchOp.getReplaySequenceId();
+        walKey.setOrigLogSeqNum(replaySeqId);
+
+        // ensure that the sequence id of the region is at least as big as orig log seq id
+        while (true) {
+          long seqId = getSequenceId().get();
+          if (seqId >= replaySeqId) break;
+          if (getSequenceId().compareAndSet(seqId, replaySeqId)) break;
+        }
+      }
       if (walEdit.size() > 0) {
+        if (!isInReplay) {
         // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
         walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
             this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
             mutation.getClusterIds(), currentNonceGroup, currentNonce);
-        if(isInReplay) {
-          walKey.setOrigLogSeqNum(batchOp.getReplaySequenceId());
         }
+
         txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit,
           getSequenceId(), true, memstoreCells);
       }
@@ -3803,7 +3964,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
               CompactionDescriptor compaction = WALEdit.getCompaction(cell);
               if (compaction != null) {
                 //replay the compaction
-                completeCompactionMarker(compaction);
+                replayWALCompactionMarker(compaction, false, true, Long.MAX_VALUE);
               }
               skippedEdits++;
               continue;
@@ -3886,15 +4047,506 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
    * that was not finished.  We could find one recovering a WAL after a regionserver crash.
    * See HBASE-2331.
    */
-  void completeCompactionMarker(CompactionDescriptor compaction)
+  void replayWALCompactionMarker(CompactionDescriptor compaction, boolean pickCompactionFiles,
+      boolean removeFiles, long replaySeqId)
+      throws IOException {
+    checkTargetRegion(compaction.getEncodedRegionName().toByteArray(),
+      "Compaction marker from WAL ", compaction);
+
+    if (replaySeqId < lastReplayedOpenRegionSeqId) {
+      LOG.warn("Skipping replaying compaction event :" + TextFormat.shortDebugString(compaction)
+        + " because its sequence id is smaller than this regions lastReplayedOpenRegionSeqId "
+        + " of " + lastReplayedOpenRegionSeqId);
+      return;
+    }
+
+    startRegionOperation(Operation.REPLAY_EVENT);
+    try {
+      Store store = this.getStore(compaction.getFamilyName().toByteArray());
+      if (store == null) {
+        LOG.warn("Found Compaction WAL edit for deleted family:" +
+            Bytes.toString(compaction.getFamilyName().toByteArray()));
+        return;
+      }
+      store.replayCompactionMarker(compaction, pickCompactionFiles, removeFiles);
+    } finally {
+      closeRegionOperation(Operation.REPLAY_EVENT);
+    }
+  }
+
+  void replayWALFlushMarker(FlushDescriptor flush) throws IOException {
+    checkTargetRegion(flush.getEncodedRegionName().toByteArray(),
+      "Flush marker from WAL ", flush);
+
+    if (ServerRegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) {
+      return; // if primary nothing to do
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Replaying flush marker " + TextFormat.shortDebugString(flush));
+    }
+
+    startRegionOperation(Operation.REPLAY_EVENT); // use region close lock to guard against close
+    try {
+      FlushAction action = flush.getAction();
+      switch (action) {
+      case START_FLUSH:
+        replayWALFlushStartMarker(flush);
+        break;
+      case COMMIT_FLUSH:
+        replayWALFlushCommitMarker(flush);
+        break;
+      case ABORT_FLUSH:
+        replayWALFlushAbortMarker(flush);
+        break;
+      default:
+        LOG.warn("Received a flush event with unknown action, ignoring. "
+            + TextFormat.shortDebugString(flush));
+        break;
+      }
+    } finally {
+      closeRegionOperation(Operation.REPLAY_EVENT);
+    }
+  }
+
+  /** Replay the flush marker from primary region by creating a corresponding snapshot of
+   * the store memstores, only if the memstores do not have a higher seqId from an earlier wal
+   * edit (because the events may be coming out of order).
+   */
+  @VisibleForTesting
+  PrepareFlushResult replayWALFlushStartMarker(FlushDescriptor flush) throws IOException {
+    long flushSeqId = flush.getFlushSequenceNumber();
+
+    HashSet<Store> storesToFlush = new HashSet<Store>();
+    for (StoreFlushDescriptor storeFlush : flush.getStoreFlushesList()) {
+      byte[] family = storeFlush.getFamilyName().toByteArray();
+      Store store = getStore(family);
+      if (store == null) {
+        LOG.info("Received a flush start marker from primary, but the family is not found. Ignoring"
+          + " StoreFlushDescriptor:" + TextFormat.shortDebugString(storeFlush));
+        continue;
+      }
+      storesToFlush.add(store);
+    }
+
+    MonitoredTask status = TaskMonitor.get().createStatus("Preparing flush " + this);
+
+    // we will use writestate as a coarse-grain lock for all the replay events
+    // (flush, compaction, region open etc)
+    synchronized (writestate) {
+      try {
+        if (flush.getFlushSequenceNumber() < lastReplayedOpenRegionSeqId) {
+          LOG.warn("Skipping replaying flush event :" + TextFormat.shortDebugString(flush)
+            + " because its sequence id is smaller than this regions lastReplayedOpenRegionSeqId "
+            + " of " + lastReplayedOpenRegionSeqId);
+          return null;
+        }
+        if (numMutationsWithoutWAL.get() > 0) {
+          numMutationsWithoutWAL.set(0);
+          dataInMemoryWithoutWAL.set(0);
+        }
+
+        if (!writestate.flushing) {
+          // we do not have an active snapshot and corresponding this.prepareFlushResult. This
+          // means we can just snapshot our memstores and continue as normal.
+
+          // invoke prepareFlushCache. Send null as wal since we do not want the flush events in wal
+          PrepareFlushResult prepareResult = internalPrepareFlushCache(null,
+            flushSeqId, storesToFlush, status, true);
+          if (prepareResult.result == null) {
+            // save the PrepareFlushResult so that we can use it later from commit flush
+            this.writestate.flushing = true;
+            this.prepareFlushResult = prepareResult;
+            status.markComplete("Flush prepare successful");
+            if (LOG.isDebugEnabled()) {
+              LOG.debug(getRegionInfo().getEncodedName() + " : "
+                  + " Prepared flush with seqId:" + flush.getFlushSequenceNumber());
+            }
+          } else {
+            status.abort("Flush prepare failed with " + prepareResult.result);
+            // nothing much to do. prepare flush failed because of some reason.
+          }
+          return prepareResult;
+        } else {
+          // we already have an active snapshot.
+          if (flush.getFlushSequenceNumber() == this.prepareFlushResult.flushOpSeqId) {
+            // They define the same flush. Log and continue.
+            LOG.warn("Received a flush prepare marker with the same seqId: " +
+                + flush.getFlushSequenceNumber() + " before clearing the previous one with seqId: "
+                + prepareFlushResult.flushOpSeqId + ". Ignoring");
+            // ignore
+          } else if (flush.getFlushSequenceNumber() < this.prepareFlushResult.flushOpSeqId) {
+            // We received a flush with a smaller seqNum than what we have prepared. We can only
+            // ignore this prepare flush request.
+            LOG.warn("Received a flush prepare marker with a smaller seqId: " +
+                + flush.getFlushSequenceNumber() + " before clearing the previous one with seqId: "
+                + prepareFlushResult.flushOpSeqId + ". Ignoring");
+            // ignore
+          } else {
+            // We received a flush with a larger seqNum than what we have prepared
+            LOG.warn("Received a flush prepare marker with a larger seqId: " +
+                + flush.getFlushSequenceNumber() + " before clearing the previous one with seqId: "
+                + prepareFlushResult.flushOpSeqId + ". Ignoring");
+            // We do not have multiple active snapshots in the memstore or a way to merge current
+            // memstore snapshot with the contents and resnapshot for now. We cannot take
+            // another snapshot and drop the previous one because that will cause temporary
+            // data loss in the secondary. So we ignore this for now, deferring the resolution
+            // to happen when we see the corresponding flush commit marker. If we have a memstore
+            // snapshot with x, and later received another prepare snapshot with y (where x < y),
+            // when we see flush commit for y, we will drop snapshot for x, and can also drop all
+            // the memstore edits if everything in memstore is < y. This is the usual case for
+            // RS crash + recovery where we might see consecutive prepare flush wal markers.
+            // Otherwise, this will cause more memory to be used in secondary replica until a
+            // further prepare + commit flush is seen and replayed.
+          }
+        }
+      } finally {
+        status.cleanup();
+        writestate.notifyAll();
+      }
+    }
+    return null;
+  }
+
+  @VisibleForTesting
+  void replayWALFlushCommitMarker(FlushDescriptor flush) throws IOException {
+    MonitoredTask status = TaskMonitor.get().createStatus("Committing flush " + this);
+
+    // Check whether we have the memstore snapshot with the corresponding seqId. Replay to
+    // secondary region replicas is in order, except when the region moves or the region
+    // server crashes. In those cases, we may receive replay requests out of order with
+    // respect to the original seqIds.
+    synchronized (writestate) {
+      try {
+        if (flush.getFlushSequenceNumber() < lastReplayedOpenRegionSeqId) {
+          LOG.warn("Skipping replaying flush event :" + TextFormat.shortDebugString(flush)
+            + " because its sequence id is smaller than this regions lastReplayedOpenRegionSeqId "
+            + " of " + lastReplayedOpenRegionSeqId);
+          return;
+        }
+
+        if (writestate.flushing) {
+          PrepareFlushResult prepareFlushResult = this.prepareFlushResult;
+          if (flush.getFlushSequenceNumber() == prepareFlushResult.flushOpSeqId) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug(getRegionInfo().getEncodedName() + " : "
+                  + "Received a flush commit marker with seqId:" + flush.getFlushSequenceNumber()
+                  + " and a previous prepared snapshot was found");
+            }
+            // This is the regular case where we received commit flush after prepare flush
+            // corresponding to the same seqId.
+            replayFlushInStores(flush, prepareFlushResult, true);
+
+            // Set down the memstore size by amount of flush.
+            this.addAndGetGlobalMemstoreSize(-prepareFlushResult.totalFlushableSize);
+
+            this.prepareFlushResult = null;
+            writestate.flushing = false;
+          } else if (flush.getFlushSequenceNumber() < prepareFlushResult.flushOpSeqId) {
+            // This should not happen normally. However, let's be safe and guard against this case:
+            // we received a flush commit with a smaller seqId than what we have prepared.
+            // We will pick the flush file up from this commit (if we have not seen it), but we
+            // will not drop the prepared memstore snapshot.
+            LOG.warn("Received a flush commit marker with smaller seqId: "
+                + flush.getFlushSequenceNumber() + " than what we have prepared with seqId: "
+                + prepareFlushResult.flushOpSeqId + ". Picking up new file, but not dropping"
+                +"  prepared memstore snapshot");
+            replayFlushInStores(flush, prepareFlushResult, false);
+
+            // snapshot is not dropped, so memstore sizes should not be decremented
+            // we still have the prepared snapshot, flushing should still be true
+          } else {
+            // This should not happen normally. However, let's be safe and guard against this case:
+            // we received a flush commit with a larger seqId than what we have prepared.
+            // We will pick the flush file for this. We will also obtain the updates lock and
+            // look for contents of the memstore to see whether we have edits after this seqId.
+            // If not, we will drop all the memstore edits and the snapshot as well.
+            LOG.warn("Received a flush commit marker with larger seqId: "
+                + flush.getFlushSequenceNumber() + " than what we have prepared with seqId: " +
+                prepareFlushResult.flushOpSeqId + ". Picking up new file and dropping prepared"
+                +" memstore snapshot");
+
+            replayFlushInStores(flush, prepareFlushResult, true);
+
+            // Set down the memstore size by amount of flush.
+            this.addAndGetGlobalMemstoreSize(-prepareFlushResult.totalFlushableSize);
+
+            // Inspect the memstore contents to see whether the memstore contains only edits
+            // with seqId smaller than the flush seqId. If so, we can discard those edits.
+            dropMemstoreContentsForSeqId(flush.getFlushSequenceNumber(), null);
+
+            this.prepareFlushResult = null;
+            writestate.flushing = false;
+          }
+        } else {
+          LOG.warn(getRegionInfo().getEncodedName() + " : "
+              + "Received a flush commit marker with seqId:" + flush.getFlushSequenceNumber()
+              + ", but no previous prepared snapshot was found");
+          // There is no corresponding prepare snapshot from before.
+          // We will pick up the new flushed file
+          replayFlushInStores(flush, null, false);
+
+          // Inspect the memstore contents to see whether the memstore contains only edits
+          // with seqId smaller than the flush seqId. If so, we can discard those edits.
+          dropMemstoreContentsForSeqId(flush.getFlushSequenceNumber(), null);
+        }
+
+        status.markComplete("Flush commit successful");
+
+        // Update the last flushed sequence id for region.
+        this.maxFlushedSeqId = flush.getFlushSequenceNumber();
+
+        // advance the mvcc read point so that the new flushed file is visible.
+        // there may be some in-flight transactions, but they won't be made visible since they are
+        // either greater than flush seq number or they were already dropped via flush.
+        // TODO: If we are using FlushAllStoresPolicy, then this can make edits visible from other
+        // stores while they are still in flight because the flush commit marker will not contain
+        // flushes from ALL stores.
+        getMVCC().advanceMemstoreReadPointIfNeeded(flush.getFlushSequenceNumber());
+
+        // C. Finally notify anyone waiting on memstore to clear:
+        // e.g. checkResources().
+        synchronized (this) {
+          notifyAll(); // FindBugs NN_NAKED_NOTIFY
+        }
+      } finally {
+        status.cleanup();
+        writestate.notifyAll();
+      }
+    }
+  }
+
+  /**
+   * Replays the given flush descriptor by opening the flush files in stores and dropping the
+   * memstore snapshots if requested.
+   * @param flush
+   * @param prepareFlushResult
+   * @param dropMemstoreSnapshot
+   * @throws IOException
+   */
+  private void replayFlushInStores(FlushDescriptor flush, PrepareFlushResult prepareFlushResult,
+      boolean dropMemstoreSnapshot)
       throws IOException {
-    Store store = this.getStore(compaction.getFamilyName().toByteArray());
-    if (store == null) {
-      LOG.warn("Found Compaction WAL edit for deleted family:" +
-          Bytes.toString(compaction.getFamilyName().toByteArray()));
+    for (StoreFlushDescriptor storeFlush : flush.getStoreFlushesList()) {
+      byte[] family = storeFlush.getFamilyName().toByteArray();
+      Store store = getStore(family);
+      if (store == null) {
+        LOG.warn("Received a flush commit marker from primary, but the family is not found." +
+            "Ignoring StoreFlushDescriptor:" + storeFlush);
+        continue;
+      }
+      List<String> flushFiles = storeFlush.getFlushOutputList();
+      StoreFlushContext ctx = null;
+      long startTime = EnvironmentEdgeManager.currentTime();
+      if (prepareFlushResult == null) {
+        ctx = store.createFlushContext(flush.getFlushSequenceNumber());
+      } else {
+        ctx = prepareFlushResult.storeFlushCtxs.get(family);
+        startTime = prepareFlushResult.startTime;
+      }
+
+      if (ctx == null) {
+        LOG.warn("Unexpected: flush commit marker received from store "
+            + Bytes.toString(family) + " but no associated flush context. Ignoring");
+        continue;
+      }
+      ctx.replayFlush(flushFiles, dropMemstoreSnapshot); // replay the flush
+
+      // Record latest flush time
+      this.lastStoreFlushTimeMap.put(store, startTime);
+    }
+  }
+
+  /**
+   * Drops memstore contents after a flush or region open event replay if seqId >= current seqId
+   * @param seqId the sequence id of the replayed flush commit or region open event
+   * @param store the store whose memstore to drop, or null to drop from all stores
+   * @throws IOException
+   */
+  private void dropMemstoreContentsForSeqId(long seqId, Store store) throws IOException {
+    this.updatesLock.writeLock().lock();
+    try {
+      mvcc.waitForPreviousTransactionsComplete();
+      long currentSeqId = getSequenceId().get();
+      if (seqId >= currentSeqId) {
+        // then we can drop the memstore contents since everything is below this seqId
+        LOG.info("Dropping memstore contents as well since replayed flush seqId: "
+            + seqId + " is greater than current seqId:" + currentSeqId);
+
+        // Prepare flush (take a snapshot) and then abort (drop the snapshot)
+        if (store == null ) {
+          for (Store s : stores.values()) {
+            dropStoreMemstoreContentsForSeqId(s, currentSeqId);
+          }
+        } else {
+          dropStoreMemstoreContentsForSeqId(store, currentSeqId);
+        }
+      } else {
+        LOG.info("Not dropping memstore contents since replayed flush seqId: "
+            + seqId + " is smaller than current seqId:" + currentSeqId);
+      }
+    } finally {
+      this.updatesLock.writeLock().unlock();
+    }
+  }
+
+  private void dropStoreMemstoreContentsForSeqId(Store s, long currentSeqId) throws IOException {
+    this.addAndGetGlobalMemstoreSize(-s.getFlushableSize());
+    StoreFlushContext ctx = s.createFlushContext(currentSeqId);
+    ctx.prepare();
+    ctx.abort();
+  }
+
+  private void replayWALFlushAbortMarker(FlushDescriptor flush) {
+    // nothing to do for now. A flush abort will cause a RS abort which means that the region
+    // will be opened somewhere else later. We will see the region open event soon, and replaying
+    // that will drop the snapshot
+  }
+
+  @VisibleForTesting
+  PrepareFlushResult getPrepareFlushResult() {
+    return prepareFlushResult;
+  }
+
+  void replayWALRegionEventMarker(RegionEventDescriptor regionEvent) throws IOException {
+    checkTargetRegion(regionEvent.getEncodedRegionName().toByteArray(),
+      "RegionEvent marker from WAL ", regionEvent);
+
+    startRegionOperation(Operation.REPLAY_EVENT);
+    try {
+      if (ServerRegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) {
+        return; // if primary nothing to do
+      }
+
+      if (regionEvent.getEventType() == EventType.REGION_CLOSE) {
+        // nothing to do on REGION_CLOSE for now.
+        return;
+      }
+      if (regionEvent.getEventType() != EventType.REGION_OPEN) {
+        LOG.warn("Unknown region event received, ignoring :"
+            + TextFormat.shortDebugString(regionEvent));
+        return;
+      }
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Replaying region open event marker " + TextFormat.shortDebugString(regionEvent));
+      }
+
+      // we will use writestate as a coarse-grain lock for all the replay events
+      synchronized (writestate) {
+        // Replication can deliver events out of order when primary region moves or the region
+        // server crashes, since there is no coordination between replication of different wal files
+        // belonging to different region servers. We have to safeguard against this case by using
+        // region open event's seqId. Since this is the first event that the region puts (after
+        // possibly flushing recovered.edits), after seeing this event, we can ignore every edit
+        // with a seqId smaller than this seqId.
+        if (this.lastReplayedOpenRegionSeqId < regionEvent.getLogSequenceNumber()) {
+          this.lastReplayedOpenRegionSeqId = regionEvent.getLogSequenceNumber();
+        } else {
+          LOG.warn("Skipping replaying region event :" + TextFormat.shortDebugString(regionEvent)
+            + " because its sequence id is smaller than this regions lastReplayedOpenRegionSeqId "
+            + " of " + lastReplayedOpenRegionSeqId);
+          return;
+        }
+
+        // region open lists all the files that the region has at the time of the opening. Just pick
+        // all the files and drop prepared flushes and empty memstores
+        for (StoreDescriptor storeDescriptor : regionEvent.getStoresList()) {
+          // stores of primary may be different now
+          byte[] family = storeDescriptor.getFamilyName().toByteArray();
+          Store store = getStore(family);
+          if (store == null) {
+            LOG.warn("Received a region open marker from primary, but the family is not found. "
+                + "Ignoring. StoreDescriptor:" + storeDescriptor);
+            continue;
+          }
+
+          long storeSeqId = store.getMaxSequenceId();
+          List<String> storeFiles = storeDescriptor.getStoreFileList();
+          store.refreshStoreFiles(storeFiles); // replace the files with the new ones
+          if (store.getMaxSequenceId() != storeSeqId) {
+            // Record latest flush time if we picked up new files
+            lastStoreFlushTimeMap.put(store, EnvironmentEdgeManager.currentTime());
+          }
+
+          if (writestate.flushing) {
+            // only drop the prepared snapshot if its seqId is <= the region open event's seqId
+            if (this.prepareFlushResult.flushOpSeqId <= regionEvent.getLogSequenceNumber()) {
+              StoreFlushContext ctx = this.prepareFlushResult.storeFlushCtxs.get(family);
+              if (ctx != null) {
+                long snapshotSize = store.getFlushableSize();
+                ctx.abort();
+                this.addAndGetGlobalMemstoreSize(-snapshotSize);
+                this.prepareFlushResult.storeFlushCtxs.remove(family);
+              }
+            }
+          }
+
+          // Drop the memstore contents if they are now smaller than the latest seen flushed file
+          dropMemstoreContentsForSeqId(regionEvent.getLogSequenceNumber(), store);
+          if (storeSeqId > this.maxFlushedSeqId) {
+            this.maxFlushedSeqId = storeSeqId;
+          }
+        }
+
+        // if all stores ended up dropping their snapshots, we can safely drop the
+        // prepareFlushResult
+        if (writestate.flushing) {
+          boolean canDrop = true;
+          for (Entry<byte[], StoreFlushContext> entry
+              : prepareFlushResult.storeFlushCtxs.entrySet()) {
+            Store store = getStore(entry.getKey());
+            if (store == null) {
+              continue;
+            }
+            if (store.getSnapshotSize() > 0) {
+              canDrop = false;
+            }
+          }
+
+          // this means all the stores in the region have finished flushing, but the WAL marker
+          // may not have been written or we did not receive it yet.
+          if (canDrop) {
+            writestate.flushing = false;
+            this.prepareFlushResult = null;
+          }
+        }
+
+
+        // advance the mvcc read point so that the new flushed file is visible.
+        // there may be some in-flight transactions, but they won't be made visible since they are
+        // either greater than flush seq number or they were already dropped via flush.
+        getMVCC().advanceMemstoreReadPointIfNeeded(this.maxFlushedSeqId);
+
+        // C. Finally notify anyone waiting on memstore to clear:
+        // e.g. checkResources().
+        synchronized (this) {
+          notifyAll(); // FindBugs NN_NAKED_NOTIFY
+        }
+      }
+    } finally {
+      closeRegionOperation(Operation.REPLAY_EVENT);
+    }
+  }
+
+  /** Checks that the given encoded region name is either equal to our region's, or, if we are a
+   * secondary replica, equal to that of the corresponding primary region; throws otherwise.
+   */
+  private void checkTargetRegion(byte[] encodedRegionName, String exceptionMsg, Object payload)
+      throws WrongRegionException {
+    if (Bytes.equals(this.getRegionInfo().getEncodedNameAsBytes(), encodedRegionName)) {
       return;
     }
-    store.completeCompactionMarker(compaction);
+
+    if (!RegionReplicaUtil.isDefaultReplica(this.getRegionInfo()) &&
+        Bytes.equals(encodedRegionName,
+          this.fs.getRegionInfoForFS().getEncodedNameAsBytes())) {
+      return;
+    }
+
+    throw new WrongRegionException(exceptionMsg + payload
+      + " targetted for region " + Bytes.toStringBinary(encodedRegionName)
+      + " does not match this region: " + this.getRegionInfo());
   }
 
   /**
@@ -4127,8 +4779,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
    * @param familyPaths      List of Pair<byte[] column family, String hfilePath>
    * @param bulkLoadListener Internal hooks enabling massaging/preparation of a
    *                         file about to be bulk loaded
-   * @param assignSeqId      Force a flush, get it's sequenceId to preserve the guarantee that 
-   *                         all the edits lower than the highest sequential ID from all the 
+   * @param assignSeqId      Force a flush, get it's sequenceId to preserve the guarantee that
+   *                         all the edits lower than the highest sequential ID from all the
    *                         HFiles are flushed on disk.
    * @return true if successful, false if failed recoverably
    * @throws IOException if failed unrecoverably.
@@ -4217,7 +4869,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
             finalPath = bulkLoadListener.prepareBulkLoad(familyName, path);
           }
           store.bulkLoadHFile(finalPath, seqId);
-          
+
           if(storeFiles.containsKey(familyName)) {
             storeFiles.get(familyName).add(new Path(finalPath));
           } else {
@@ -4265,7 +4917,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
           }
         }
       }
-    
+
       closeBulkRegionOperation();
     }
   }
@@ -4989,7 +5641,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
     checkClassLoading();
     this.openSeqNum = initialize(reporter);
     this.setSequenceId(openSeqNum);
-    if (wal != null && getRegionServerServices() != null) {
+    if (wal != null && getRegionServerServices() != null && !writestate.readOnly) {
       writeRegionOpenMarker(wal, openSeqNum);
     }
     return this;
@@ -5660,7 +6312,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
                   }
                 }
                 if (cell.getTagsLength() > 0) {
-                  Iterator<Tag> i  = CellUtil.tagsIterator(cell.getTagsArray(), 
+                  Iterator<Tag> i  = CellUtil.tagsIterator(cell.getTagsArray(),
                     cell.getTagsOffset(), cell.getTagsLength());
                   while (i.hasNext()) {
                     newTags.add(i.next());
@@ -6080,8 +6732,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
   public static final long FIXED_OVERHEAD = ClassSize.align(
       ClassSize.OBJECT +
       ClassSize.ARRAY +
-      44 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
-      (11 * Bytes.SIZEOF_LONG) +
+      45 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
+      (12 * Bytes.SIZEOF_LONG) +
       4 * Bytes.SIZEOF_BOOLEAN);
 
   // woefully out of date - currently missing:

http://git-wip-us.apache.org/repos/asf/hbase/blob/3e10e6e1/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
index 0751634..014ec2c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
@@ -119,6 +119,10 @@ public class HRegionFileSystem {
     return this.regionInfo;
   }
 
+  public HRegionInfo getRegionInfoForFS() {
+    return this.regionInfoForFs;
+  }
+
   /** @return {@link Path} to the region's root directory. */
   public Path getTableDir() {
     return this.tableDir;
@@ -205,7 +209,7 @@ public class HRegionFileSystem {
         continue;
       }
       StoreFileInfo info = ServerRegionReplicaUtil.getStoreFileInfo(conf, fs, regionInfo,
-        regionInfoForFs, familyName, status);
+        regionInfoForFs, familyName, status.getPath());
       storeFiles.add(info);
 
     }
@@ -234,8 +238,8 @@ public class HRegionFileSystem {
   StoreFileInfo getStoreFileInfo(final String familyName, final String fileName)
       throws IOException {
     Path familyDir = getStoreDir(familyName);
-    FileStatus status = fs.getFileStatus(new Path(familyDir, fileName));
-    return new StoreFileInfo(this.conf, this.fs, status);
+    return ServerRegionReplicaUtil.getStoreFileInfo(conf, fs, regionInfo,
+      regionInfoForFs, familyName, new Path(familyDir, fileName));
   }
 
   /**