You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2015/04/01 03:03:55 UTC

[6/6] hbase git commit: HBASE-12975 Supportable SplitTransaction and RegionMergeTransaction interfaces

HBASE-12975 Supportable SplitTransaction and RegionMergeTransaction interfaces


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e156ed61
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e156ed61
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e156ed61

Branch: refs/heads/master
Commit: e156ed619c1e79055f9832456e6523ec6a2ae085
Parents: 452ce33
Author: Andrew Purtell <ap...@apache.org>
Authored: Tue Mar 31 16:02:38 2015 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Tue Mar 31 16:02:38 2015 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/regionserver/HRegion.java      |   4 +-
 .../hadoop/hbase/regionserver/Region.java       |   2 -
 .../hbase/regionserver/RegionMergeRequest.java  |   2 +-
 .../regionserver/RegionMergeTransaction.java    | 645 +++------------
 .../RegionMergeTransactionFactory.java          |  76 ++
 .../RegionMergeTransactionImpl.java             | 702 +++++++++++++++++
 .../RegionMergeTransactionImpl.java.rej         |  10 +
 .../hadoop/hbase/regionserver/SplitRequest.java |   2 +-
 .../hbase/regionserver/SplitTransaction.java    | 758 +++---------------
 .../regionserver/SplitTransactionFactory.java   |  74 ++
 .../regionserver/SplitTransactionImpl.java      | 789 +++++++++++++++++++
 .../coprocessor/TestCoprocessorInterface.java   |   7 +-
 .../coprocessor/TestRegionServerObserver.java   |   8 +-
 .../TestEndToEndSplitTransaction.java           |   2 +-
 .../hadoop/hbase/regionserver/TestHRegion.java  |   2 +-
 .../TestRegionMergeTransaction.java             |  44 +-
 .../TestRegionMergeTransactionOnCluster.java    |   2 +-
 .../regionserver/TestSplitTransaction.java      |  35 +-
 .../TestSplitTransactionOnCluster.java          |  23 +-
 .../apache/hadoop/hbase/util/TestHBaseFsck.java |   7 +-
 20 files changed, 1912 insertions(+), 1282 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/e156ed61/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 83127d2..2ca1eb0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -6239,7 +6239,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       b.getRegionFileSystem().logFileSystemState(LOG);
     }
 
-    RegionMergeTransaction rmt = new RegionMergeTransaction(a, b, true);
+    RegionMergeTransactionImpl rmt = new RegionMergeTransactionImpl(a, b, true);
     if (!rmt.prepare(null)) {
       throw new IOException("Unable to merge regions " + a + " and " + b);
     }
@@ -6252,7 +6252,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         + Bytes.toStringBinary(mergedRegionInfo.getEndKey()) + ">");
     HRegion dstRegion;
     try {
-      dstRegion = rmt.execute(null, null);
+      dstRegion = (HRegion)rmt.execute(null, null);
     } catch (IOException ioe) {
       rmt.rollback(null, null);
       throw new IOException("Failed merging region " + a + " and " + b

http://git-wip-us.apache.org/repos/asf/hbase/blob/e156ed61/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
index 441a93b..ed9550e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
@@ -23,9 +23,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.DroppedSnapshotException;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HDFSBlocksDistribution;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;

http://git-wip-us.apache.org/repos/asf/hbase/blob/e156ed61/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java
index ffa98cd..534d01d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java
@@ -65,7 +65,7 @@ class RegionMergeRequest implements Runnable {
     }
     try {
       final long startTime = EnvironmentEdgeManager.currentTime();
-      RegionMergeTransaction mt = new RegionMergeTransaction(region_a,
+      RegionMergeTransactionImpl mt = new RegionMergeTransactionImpl(region_a,
           region_b, forcible);
 
       //acquire a shared read lock on the table, so that table schema modifications

http://git-wip-us.apache.org/repos/asf/hbase/blob/e156ed61/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransaction.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransaction.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransaction.java
index 17ab887..72f0e89 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransaction.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransaction.java
@@ -13,35 +13,20 @@
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitationsME
+ * License for the specific language governing permissions and limitations
  * under the License.
  */
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
-import java.util.ListIterator;
-import java.util.Map;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.regionserver.SplitTransaction.TransactionListener;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.MetaMutationAnnotation;
-import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.Server;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
-import org.apache.hadoop.hbase.regionserver.SplitTransaction.LoggingProgressable;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.Pair;
 
 /**
  * Executes region merge as a "transaction". It is similar with
@@ -50,12 +35,21 @@ import org.apache.hadoop.hbase.util.Pair;
  * transaction and {@link #rollback(Server, RegionServerServices)} to cleanup if
  * execute fails.
  * 
- * <p>
- * Here is an example of how you would use this class:
- * 
+ * <p>Here is an example of how you would use this interface:
  * <pre>
- *  RegionMergeTransaction mt = new RegionMergeTransaction(this.conf, parent, midKey)
- *  if (!mt.prepare(services)) return;
+ *  RegionMergeTransactionFactory factory = new RegionMergeTransactionFactory(conf);
+ *  RegionMergeTransaction mt = factory.create(parent, midKey)
+ *    .registerTransactionListener(new TransactionListener() {
+ *       public void transition(RegionMergeTransaction transaction,
+ *         RegionMergeTransactionPhase from, RegionMergeTransactionPhase to) throws IOException {
+ *         // ...
+ *       }
+ *       public void rollback(RegionMergeTransaction transaction,
+ *         RegionMergeTransactionPhase from, RegionMergeTransactionPhase to) {
+ *         // ...
+ *       }
+ *    });
+ *  if (!mt.prepare()) return;
  *  try {
  *    mt.execute(server, services);
  *  } catch (IOException ioe) {
@@ -63,33 +57,25 @@ import org.apache.hadoop.hbase.util.Pair;
  *      mt.rollback(server, services);
  *      return;
  *    } catch (RuntimeException e) {
- *      myAbortable.abort("Failed merge, abort");
+ *      // abort the server
  *    }
  *  }
  * </Pre>
- * <p>
- * This class is not thread safe. Caller needs ensure merge is run by one thread
- * only.
+ * <p>A merge transaction is not thread safe.  Callers must ensure a split is run by
+ * one thread only.
  */
-@InterfaceAudience.Private
-public class RegionMergeTransaction {
-  private static final Log LOG = LogFactory.getLog(RegionMergeTransaction.class);
-
-  // Merged region info
-  private HRegionInfo mergedRegionInfo;
-  // region_a sorts before region_b
-  private final HRegion region_a;
-  private final HRegion region_b;
-  // merges dir is under region_a
-  private final Path mergesdir;
-  // We only merge adjacent regions if forcible is false
-  private final boolean forcible;
-
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
+@InterfaceStability.Evolving
+public interface RegionMergeTransaction {
   /**
-   * Types to add to the transaction journal. Each enum is a step in the merge
-   * transaction. Used to figure how much we need to rollback.
+   * Each enum is a step in the merge transaction.
    */
-  enum JournalEntry {
+  enum RegionMergeTransactionPhase {
+    STARTED,
+    /**
+     * Prepared
+     */
+    PREPARED,
     /**
      * Set region as in transition, set it into MERGING state.
      */
@@ -122,95 +108,59 @@ public class RegionMergeTransaction {
      * Point of no return. If we got here, then transaction is not recoverable
      * other than by crashing out the regionserver.
      */
-    PONR
+    PONR,
+    /**
+     * Completed
+     */
+    COMPLETED
   }
 
-  /*
-   * Journal of how far the merge transaction has progressed.
+  /**
+   * Split transaction journal entry
    */
-  private final List<JournalEntry> journal = new ArrayList<JournalEntry>();
+  public interface JournalEntry {
 
-  private static IOException closedByOtherException = new IOException(
-      "Failed to close region: already closed by another thread");
+    /** @return the completed phase marked by this journal entry */
+    RegionMergeTransactionPhase getPhase();
 
-  private RegionServerCoprocessorHost rsCoprocessorHost = null;
+    /** @return the time of phase completion */
+    long getTimeStamp();
+  }
 
   /**
-   * Constructor
-   * @param a region a to merge
-   * @param b region b to merge
-   * @param forcible if false, we will only merge adjacent regions
+   * Split transaction listener
    */
-  public RegionMergeTransaction(final Region a, final Region b,
-      final boolean forcible) {
-    if (a.getRegionInfo().compareTo(b.getRegionInfo()) <= 0) {
-      this.region_a = (HRegion)a;
-      this.region_b = (HRegion)b;
-    } else {
-      this.region_a = (HRegion)b;
-      this.region_b = (HRegion)a;
-    }
-    this.forcible = forcible;
-    this.mergesdir = region_a.getRegionFileSystem().getMergesDir();
+  public interface TransactionListener {
+
+    /**
+     * Invoked when transitioning forward from one transaction phase to another
+     * @param transaction the transaction
+     * @param from the current phase
+     * @param to the next phase
+     * @throws IOException listener can throw this to abort
+     */
+    void transition(RegionMergeTransaction transaction, RegionMergeTransactionPhase from,
+        RegionMergeTransactionPhase to) throws IOException;
+
+    /**
+     * Invoked when rolling back a transaction from one transaction phase to the
+     * previous
+     * @param transaction the transaction
+     * @param from the current phase
+     * @param to the previous phase
+     */
+    void rollback(RegionMergeTransaction transaction, RegionMergeTransactionPhase from,
+        RegionMergeTransactionPhase to);
   }
 
   /**
-   * Does checks on merge inputs.
+   * Check merge inputs and prepare the transaction.
    * @param services
    * @return <code>true</code> if the regions are mergeable else
    *         <code>false</code> if they are not (e.g. its already closed, etc.).
+   * @throws IOException 
    */
-  public boolean prepare(final RegionServerServices services) {
-    if (!region_a.getTableDesc().getTableName()
-        .equals(region_b.getTableDesc().getTableName())) {
-      LOG.info("Can't merge regions " + region_a + "," + region_b
-          + " because they do not belong to the same table");
-      return false;
-    }
-    if (region_a.getRegionInfo().equals(region_b.getRegionInfo())) {
-      LOG.info("Can't merge the same region " + region_a);
-      return false;
-    }
-    if (!forcible && !HRegionInfo.areAdjacent(region_a.getRegionInfo(),
-            region_b.getRegionInfo())) {
-      String msg = "Skip merging " + this.region_a.getRegionInfo().getRegionNameAsString()
-          + " and " + this.region_b.getRegionInfo().getRegionNameAsString()
-          + ", because they are not adjacent.";
-      LOG.info(msg);
-      return false;
-    }
-    if (!this.region_a.isMergeable() || !this.region_b.isMergeable()) {
-      return false;
-    }
-    try {
-      boolean regionAHasMergeQualifier = hasMergeQualifierInMeta(services,
-          region_a.getRegionInfo().getRegionName());
-      if (regionAHasMergeQualifier ||
-          hasMergeQualifierInMeta(services, region_b.getRegionInfo().getRegionName())) {
-        LOG.debug("Region " + (regionAHasMergeQualifier ?
-              region_a.getRegionInfo().getRegionNameAsString() :
-                region_b.getRegionInfo().getRegionNameAsString())
-            + " is not mergeable because it has merge qualifier in META");
-        return false;
-      }
-    } catch (IOException e) {
-      LOG.warn("Failed judging whether merge transaction is available for "
-              + region_a.getRegionInfo().getRegionNameAsString() + " and "
-              + region_b.getRegionInfo().getRegionNameAsString(), e);
-      return false;
-    }
-
-    // WARN: make sure there is no parent region of the two merging regions in
-    // hbase:meta If exists, fixing up daughters would cause daughter regions(we
-    // have merged one) online again when we restart master, so we should clear
-    // the parent region to prevent the above case
-    // Since HBASE-7721, we don't need fix up daughters any more. so here do
-    // nothing
-
-    this.mergedRegionInfo = getMergedRegionInfo(region_a.getRegionInfo(),
-        region_b.getRegionInfo());
-    return true;
-  }
+  boolean prepare(RegionServerServices services) throws IOException;
 
   /**
    * Run the transaction.
@@ -222,325 +172,10 @@ public class RegionMergeTransaction {
    * @throws IOException
    * @see #rollback(Server, RegionServerServices)
    */
-  public HRegion execute(final Server server,
- final RegionServerServices services) throws IOException {
-    if (rsCoprocessorHost == null) {
-      rsCoprocessorHost = server != null ?
-        ((HRegionServer) server).getRegionServerCoprocessorHost() : null;
-    }
-    HRegion mergedRegion = createMergedRegion(server, services);
-    if (rsCoprocessorHost != null) {
-      rsCoprocessorHost.postMergeCommit(this.region_a, this.region_b, mergedRegion);
-    }
-    return stepsAfterPONR(server, services, mergedRegion);
-  }
-
-  public HRegion stepsAfterPONR(final Server server, final RegionServerServices services,
-      HRegion mergedRegion) throws IOException {
-    openMergedRegion(server, services, mergedRegion);
-    if (rsCoprocessorHost != null) {
-      rsCoprocessorHost.postMerge(this.region_a, this.region_b, mergedRegion);
-    }
-    return mergedRegion;
-  }
-
-  /**
-   * Prepare the merged region and region files.
-   * @param server Hosting server instance. Can be null when testing
-   * @param services Used to online/offline regions.
-   * @return merged region
-   * @throws IOException If thrown, transaction failed. Call
-   *           {@link #rollback(Server, RegionServerServices)}
-   */
-  HRegion createMergedRegion(final Server server,
-      final RegionServerServices services) throws IOException {
-    LOG.info("Starting merge of " + region_a + " and "
-        + region_b.getRegionInfo().getRegionNameAsString() + ", forcible=" + forcible);
-    if ((server != null && server.isStopped())
-        || (services != null && services.isStopping())) {
-      throw new IOException("Server is stopped or stopping");
-    }
-
-    if (rsCoprocessorHost != null) {
-      if (rsCoprocessorHost.preMerge(this.region_a, this.region_b)) {
-        throw new IOException("Coprocessor bypassing regions " + this.region_a + " "
-            + this.region_b + " merge.");
-      }
-    }
-
-    // If true, no cluster to write meta edits to or to use coordination.
-    boolean testing = server == null ? true : server.getConfiguration()
-        .getBoolean("hbase.testing.nocluster", false);
-
-    HRegion mergedRegion = stepsBeforePONR(server, services, testing);
-
-    @MetaMutationAnnotation
-    List<Mutation> metaEntries = new ArrayList<Mutation>();
-    if (rsCoprocessorHost != null) {
-      if (rsCoprocessorHost.preMergeCommit(this.region_a, this.region_b, metaEntries)) {
-        throw new IOException("Coprocessor bypassing regions " + this.region_a + " "
-            + this.region_b + " merge.");
-      }
-      try {
-        for (Mutation p : metaEntries) {
-          HRegionInfo.parseRegionName(p.getRow());
-        }
-      } catch (IOException e) {
-        LOG.error("Row key of mutation from coprocessor is not parsable as region name."
-            + "Mutations from coprocessor should only be for hbase:meta table.", e);
-        throw e;
-      }
-    }
-
-    // This is the point of no return. Similar with SplitTransaction.
-    // IF we reach the PONR then subsequent failures need to crash out this
-    // regionserver
-    this.journal.add(JournalEntry.PONR);
-
-    // Add merged region and delete region_a and region_b
-    // as an atomic update. See HBASE-7721. This update to hbase:meta makes the region
-    // will determine whether the region is merged or not in case of failures.
-    // If it is successful, master will roll-forward, if not, master will
-    // rollback
-    if (services != null && !services.reportRegionStateTransition(TransitionCode.MERGE_PONR,
-        mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo())) {
-      // Passed PONR, let SSH clean it up
-      throw new IOException("Failed to notify master that merge passed PONR: "
-        + region_a.getRegionInfo().getRegionNameAsString() + " and "
-        + region_b.getRegionInfo().getRegionNameAsString());
-    }
-    return mergedRegion;
-  }
-
-  public void prepareMutationsForMerge(HRegionInfo mergedRegion, HRegionInfo regionA,
-      HRegionInfo regionB, ServerName serverName, List<Mutation> mutations) throws IOException {
-    HRegionInfo copyOfMerged = new HRegionInfo(mergedRegion);
-
-    // Put for parent
-    Put putOfMerged = MetaTableAccessor.makePutFromRegionInfo(copyOfMerged);
-    putOfMerged.add(HConstants.CATALOG_FAMILY, HConstants.MERGEA_QUALIFIER, regionA.toByteArray());
-    putOfMerged.add(HConstants.CATALOG_FAMILY, HConstants.MERGEB_QUALIFIER, regionB.toByteArray());
-    mutations.add(putOfMerged);
-    // Deletes for merging regions
-    Delete deleteA = MetaTableAccessor.makeDeleteFromRegionInfo(regionA);
-    Delete deleteB = MetaTableAccessor.makeDeleteFromRegionInfo(regionB);
-    mutations.add(deleteA);
-    mutations.add(deleteB);
-    // The merged is a new region, openSeqNum = 1 is fine.
-    addLocation(putOfMerged, serverName, 1);
-  }
-
-  public Put addLocation(final Put p, final ServerName sn, long openSeqNum) {
-    p.add(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER, Bytes
-        .toBytes(sn.getHostAndPort()));
-    p.add(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER, Bytes.toBytes(sn
-        .getStartcode()));
-    p.add(HConstants.CATALOG_FAMILY, HConstants.SEQNUM_QUALIFIER, Bytes.toBytes(openSeqNum));
-    return p;
-  }
-
-  public HRegion stepsBeforePONR(final Server server, final RegionServerServices services,
-      boolean testing) throws IOException {
-    if (services != null && !services.reportRegionStateTransition(TransitionCode.READY_TO_MERGE,
-        mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo())) {
-      throw new IOException("Failed to get ok from master to merge "
-        + region_a.getRegionInfo().getRegionNameAsString() + " and "
-        + region_b.getRegionInfo().getRegionNameAsString());
-    }
-    this.journal.add(JournalEntry.SET_MERGING);
-
-    this.region_a.getRegionFileSystem().createMergesDir();
-    this.journal.add(JournalEntry.CREATED_MERGE_DIR);
-
-    Map<byte[], List<StoreFile>> hstoreFilesOfRegionA = closeAndOfflineRegion(
-        services, this.region_a, true, testing);
-    Map<byte[], List<StoreFile>> hstoreFilesOfRegionB = closeAndOfflineRegion(
-        services, this.region_b, false, testing);
-
-    assert hstoreFilesOfRegionA != null && hstoreFilesOfRegionB != null;
-
-
-    //
-    // mergeStoreFiles creates merged region dirs under the region_a merges dir
-    // Nothing to unroll here if failure -- clean up of CREATE_MERGE_DIR will
-    // clean this up.
-    mergeStoreFiles(hstoreFilesOfRegionA, hstoreFilesOfRegionB);
-
-    // Log to the journal that we are creating merged region. We could fail
-    // halfway through. If we do, we could have left
-    // stuff in fs that needs cleanup -- a storefile or two. Thats why we
-    // add entry to journal BEFORE rather than AFTER the change.
-    this.journal.add(JournalEntry.STARTED_MERGED_REGION_CREATION);
-    HRegion mergedRegion = createMergedRegionFromMerges(this.region_a,
-        this.region_b, this.mergedRegionInfo);
-    return mergedRegion;
-  }
-
-  /**
-   * Create a merged region from the merges directory under region a. In order
-   * to mock it for tests, place it with a new method.
-   * @param a hri of region a
-   * @param b hri of region b
-   * @param mergedRegion hri of merged region
-   * @return merged HRegion.
-   * @throws IOException
-   */
-  HRegion createMergedRegionFromMerges(final HRegion a, final HRegion b,
-      final HRegionInfo mergedRegion) throws IOException {
-    return a.createMergedRegionFromMerges(mergedRegion, b);
-  }
-
-  /**
-   * Close the merging region and offline it in regionserver
-   * @param services
-   * @param region
-   * @param isRegionA true if it is merging region a, false if it is region b
-   * @param testing true if it is testing
-   * @return a map of family name to list of store files
-   * @throws IOException
-   */
-  private Map<byte[], List<StoreFile>> closeAndOfflineRegion(
-      final RegionServerServices services, final HRegion region,
-      final boolean isRegionA, final boolean testing) throws IOException {
-    Map<byte[], List<StoreFile>> hstoreFilesToMerge = null;
-    Exception exceptionToThrow = null;
-    try {
-      hstoreFilesToMerge = region.close(false);
-    } catch (Exception e) {
-      exceptionToThrow = e;
-    }
-    if (exceptionToThrow == null && hstoreFilesToMerge == null) {
-      // The region was closed by a concurrent thread. We can't continue
-      // with the merge, instead we must just abandon the merge. If we
-      // reopen or merge this could cause problems because the region has
-      // probably already been moved to a different server, or is in the
-      // process of moving to a different server.
-      exceptionToThrow = closedByOtherException;
-    }
-    if (exceptionToThrow != closedByOtherException) {
-      this.journal.add(isRegionA ? JournalEntry.CLOSED_REGION_A
-          : JournalEntry.CLOSED_REGION_B);
-    }
-    if (exceptionToThrow != null) {
-      if (exceptionToThrow instanceof IOException)
-        throw (IOException) exceptionToThrow;
-      throw new IOException(exceptionToThrow);
-    }
-
-    if (!testing) {
-      services.removeFromOnlineRegions(region, null);
-    }
-    this.journal.add(isRegionA ? JournalEntry.OFFLINED_REGION_A
-        : JournalEntry.OFFLINED_REGION_B);
-    return hstoreFilesToMerge;
-  }
-
-  /**
-   * Get merged region info through the specified two regions
-   * @param a merging region A
-   * @param b merging region B
-   * @return the merged region info
-   */
-  public static HRegionInfo getMergedRegionInfo(final HRegionInfo a,
-      final HRegionInfo b) {
-    long rid = EnvironmentEdgeManager.currentTime();
-    // Regionid is timestamp. Merged region's id can't be less than that of
-    // merging regions else will insert at wrong location in hbase:meta
-    if (rid < a.getRegionId() || rid < b.getRegionId()) {
-      LOG.warn("Clock skew; merging regions id are " + a.getRegionId()
-          + " and " + b.getRegionId() + ", but current time here is " + rid);
-      rid = Math.max(a.getRegionId(), b.getRegionId()) + 1;
-    }
-
-    byte[] startKey = null;
-    byte[] endKey = null;
-    // Choose the smaller as start key
-    if (a.compareTo(b) <= 0) {
-      startKey = a.getStartKey();
-    } else {
-      startKey = b.getStartKey();
-    }
-    // Choose the bigger as end key
-    if (Bytes.equals(a.getEndKey(), HConstants.EMPTY_BYTE_ARRAY)
-        || (!Bytes.equals(b.getEndKey(), HConstants.EMPTY_BYTE_ARRAY)
-            && Bytes.compareTo(a.getEndKey(), b.getEndKey()) > 0)) {
-      endKey = a.getEndKey();
-    } else {
-      endKey = b.getEndKey();
-    }
-
-    // Merged region is sorted between two merging regions in META
-    HRegionInfo mergedRegionInfo = new HRegionInfo(a.getTable(), startKey,
-        endKey, false, rid);
-    return mergedRegionInfo;
-  }
-
-  /**
-   * Perform time consuming opening of the merged region.
-   * @param server Hosting server instance. Can be null when testing
-   * @param services Used to online/offline regions.
-   * @param merged the merged region
-   * @throws IOException If thrown, transaction failed. Call
-   *           {@link #rollback(Server, RegionServerServices)}
-   */
-  void openMergedRegion(final Server server,
-      final RegionServerServices services, HRegion merged) throws IOException {
-    boolean stopped = server != null && server.isStopped();
-    boolean stopping = services != null && services.isStopping();
-    if (stopped || stopping) {
-      LOG.info("Not opening merged region  " + merged.getRegionInfo().getRegionNameAsString()
-          + " because stopping=" + stopping + ", stopped=" + stopped);
-      return;
-    }
-    HRegionInfo hri = merged.getRegionInfo();
-    LoggingProgressable reporter = server == null ? null
-        : new LoggingProgressable(hri, server.getConfiguration().getLong(
-            "hbase.regionserver.regionmerge.open.log.interval", 10000));
-    merged.openHRegion(reporter);
-
-    if (services != null) {
-      if (!services.reportRegionStateTransition(TransitionCode.MERGED,
-          mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo())) {
-        throw new IOException("Failed to report merged region to master: "
-          + mergedRegionInfo.getShortNameToLog());
-      }
-      services.addToOnlineRegions(merged);
-    }
-  }
-
-  /**
-   * Create reference file(s) of merging regions under the region_a merges dir
-   * @param hstoreFilesOfRegionA
-   * @param hstoreFilesOfRegionB
-   * @throws IOException
-   */
-  private void mergeStoreFiles(
-      Map<byte[], List<StoreFile>> hstoreFilesOfRegionA,
-      Map<byte[], List<StoreFile>> hstoreFilesOfRegionB)
-      throws IOException {
-    // Create reference file(s) of region A in mergdir
-    HRegionFileSystem fs_a = this.region_a.getRegionFileSystem();
-    for (Map.Entry<byte[], List<StoreFile>> entry : hstoreFilesOfRegionA
-        .entrySet()) {
-      String familyName = Bytes.toString(entry.getKey());
-      for (StoreFile storeFile : entry.getValue()) {
-        fs_a.mergeStoreFile(this.mergedRegionInfo, familyName, storeFile,
-            this.mergesdir);
-      }
-    }
-    // Create reference file(s) of region B in mergedir
-    HRegionFileSystem fs_b = this.region_b.getRegionFileSystem();
-    for (Map.Entry<byte[], List<StoreFile>> entry : hstoreFilesOfRegionB
-        .entrySet()) {
-      String familyName = Bytes.toString(entry.getKey());
-      for (StoreFile storeFile : entry.getValue()) {
-        fs_b.mergeStoreFile(this.mergedRegionInfo, familyName, storeFile,
-            this.mergesdir);
-      }
-    }
-  }
+  Region execute(Server server, RegionServerServices services) throws IOException;
 
   /**
+   * Roll back a failed transaction
    * @param server Hosting server instance (May be null when testing).
    * @param services Services of regionserver, used to online regions.
    * @throws IOException If thrown, rollback failed. Take drastic action.
@@ -548,123 +183,37 @@ public class RegionMergeTransaction {
    *         of no return and so now need to abort the server to minimize
    *         damage.
    */
-  @SuppressWarnings("deprecation")
-  public boolean rollback(final Server server,
-      final RegionServerServices services) throws IOException {
-    assert this.mergedRegionInfo != null;
-    // Coprocessor callback
-    if (rsCoprocessorHost != null) {
-      rsCoprocessorHost.preRollBackMerge(this.region_a, this.region_b);
-    }
-
-    boolean result = true;
-    ListIterator<JournalEntry> iterator = this.journal
-        .listIterator(this.journal.size());
-    // Iterate in reverse.
-    while (iterator.hasPrevious()) {
-      JournalEntry je = iterator.previous();
-      switch (je) {
-
-        case SET_MERGING:
-          if (services != null
-              && !services.reportRegionStateTransition(TransitionCode.MERGE_REVERTED,
-                  mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo())) {
-            return false;
-        }
-          break;
-
-        case CREATED_MERGE_DIR:
-          this.region_a.writestate.writesEnabled = true;
-          this.region_b.writestate.writesEnabled = true;
-          this.region_a.getRegionFileSystem().cleanupMergesDir();
-          break;
-
-        case CLOSED_REGION_A:
-          try {
-            // So, this returns a seqid but if we just closed and then reopened,
-            // we should be ok. On close, we flushed using sequenceid obtained
-            // from hosting regionserver so no need to propagate the sequenceid
-            // returned out of initialize below up into regionserver as we
-            // normally do.
-            this.region_a.initialize();
-          } catch (IOException e) {
-            LOG.error("Failed rollbacking CLOSED_REGION_A of region "
-                + this.region_a.getRegionInfo().getRegionNameAsString(), e);
-            throw new RuntimeException(e);
-          }
-          break;
-
-        case OFFLINED_REGION_A:
-          if (services != null)
-            services.addToOnlineRegions(this.region_a);
-          break;
-
-        case CLOSED_REGION_B:
-          try {
-            this.region_b.initialize();
-          } catch (IOException e) {
-            LOG.error("Failed rollbacking CLOSED_REGION_A of region "
-                + this.region_b.getRegionInfo().getRegionNameAsString(), e);
-            throw new RuntimeException(e);
-          }
-          break;
-
-        case OFFLINED_REGION_B:
-          if (services != null)
-            services.addToOnlineRegions(this.region_b);
-          break;
-
-        case STARTED_MERGED_REGION_CREATION:
-          this.region_a.getRegionFileSystem().cleanupMergedRegion(
-              this.mergedRegionInfo);
-          break;
-
-        case PONR:
-          // We got to the point-of-no-return so we need to just abort. Return
-          // immediately. Do not clean up created merged regions.
-          return false;
+  boolean rollback(Server server, RegionServerServices services) throws IOException;
 
-        default:
-          throw new RuntimeException("Unhandled journal entry: " + je);
-      }
-    }
-    // Coprocessor callback
-    if (rsCoprocessorHost != null) {
-      rsCoprocessorHost.postRollBackMerge(this.region_a, this.region_b);
-    }
+  /**
+   * Register a listener for transaction preparation, execution, and possibly
+   * rollback phases.
+   * <p>A listener can abort a transaction by throwing an exception. 
+   * @param listener the listener
+   * @return 'this' for chaining
+   */
+  RegionMergeTransaction registerTransactionListener(TransactionListener listener);
 
-    return result;
-  }
+  /** @return merged region info */
+  HRegionInfo getMergedRegionInfo();
 
-  HRegionInfo getMergedRegionInfo() {
-    return this.mergedRegionInfo;
-  }
+  /**
+   * Get the journal for the transaction.
+   * <p>Journal entries are an opaque type represented as JournalEntry. They can
+   * also provide useful debugging information via their toString method.
+   * @return the transaction journal
+   */
+  List<JournalEntry> getJournal();
 
-  // For unit testing.
-  Path getMergesDir() {
-    return this.mergesdir;
-  }
+  /**
+   * Get the Server running the transaction or rollback
+   * @return server instance
+   */
+  Server getServer();
 
   /**
-   * Checks if the given region has merge qualifier in hbase:meta
-   * @param services
-   * @param regionName name of specified region
-   * @return true if the given region has merge qualifier in META.(It will be
-   *         cleaned by CatalogJanitor)
-   * @throws IOException
+   * Get the RegonServerServices of the server running the transaction or rollback
+   * @return region server services
    */
-  boolean hasMergeQualifierInMeta(final RegionServerServices services,
-      final byte[] regionName) throws IOException {
-    if (services == null) return false;
-    // Get merge regions if it is a merged region and already has merge
-    // qualifier
-    Pair<HRegionInfo, HRegionInfo> mergeRegions = MetaTableAccessor
-        .getRegionsFromMergeQualifier(services.getConnection(), regionName);
-    if (mergeRegions != null &&
-        (mergeRegions.getFirst() != null || mergeRegions.getSecond() != null)) {
-      // It has merge qualifier
-      return true;
-    }
-    return false;
-  }
+  RegionServerServices getRegionServerServices();
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e156ed61/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransactionFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransactionFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransactionFactory.java
new file mode 100644
index 0000000..c844d54
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransactionFactory.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
+
+/**
+ * A factory for creating RegionMergeTransactions, which execute region split as a "transaction".
+ * See {@link RegionMergeTransactionImpl}
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
+@InterfaceStability.Evolving
+public class RegionMergeTransactionFactory implements Configurable {
+
+  public static final String MERGE_TRANSACTION_IMPL_KEY =
+      "hbase.regionserver.merge.transaction.impl";
+
+  private Configuration conf;
+
+  public RegionMergeTransactionFactory(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * Create a merge transaction
+   * @param a region a to merge
+   * @param b region b to merge
+   * @param forcible if false, we will only merge adjacent regions
+   * @return transaction instance
+   */
+  public RegionMergeTransactionImpl create(final Region a, final Region b,
+      final boolean forcible) {
+    // The implementation class must extend RegionMergeTransactionImpl, not only
+    // implement the RegionMergeTransaction interface like you might expect,
+    // because various places such as AssignmentManager use static methods
+    // from RegionMergeTransactionImpl. Whatever we use for implementation must
+    // be compatible, so it's safest to require ? extends RegionMergeTransactionImpl.
+    // If not compatible we will throw a runtime exception from here.
+    return ReflectionUtils.instantiateWithCustomCtor(
+      conf.getClass(MERGE_TRANSACTION_IMPL_KEY, RegionMergeTransactionImpl.class,
+        RegionMergeTransactionImpl.class).getName(),
+      new Class[] { Region.class, Region.class, boolean.class },
+      new Object[] { a, b, forcible });
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/e156ed61/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransactionImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransactionImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransactionImpl.java
new file mode 100644
index 0000000..bf69534
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransactionImpl.java
@@ -0,0 +1,702 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitationsME
+ * under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.MetaMutationAnnotation;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
+import org.apache.hadoop.hbase.regionserver.SplitTransactionImpl.LoggingProgressable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Pair;
+
+import com.google.common.annotations.VisibleForTesting;
+
+@InterfaceAudience.Private
+public class RegionMergeTransactionImpl implements RegionMergeTransaction {
+  private static final Log LOG = LogFactory.getLog(RegionMergeTransactionImpl.class);
+
+  // Merged region info
+  private HRegionInfo mergedRegionInfo;
+  // region_a sorts before region_b
+  private final HRegion region_a;
+  private final HRegion region_b;
+  // merges dir is under region_a
+  private final Path mergesdir;
+  // We only merge adjacent regions if forcible is false
+  private final boolean forcible;
+
+  /*
+   * Transaction state for listener, only valid during execute and
+   * rollback
+   */
+  private RegionMergeTransactionPhase currentPhase = RegionMergeTransactionPhase.STARTED;
+  private Server server;
+  private RegionServerServices rsServices;
+
+  public static class JournalEntryImpl implements JournalEntry {
+    private RegionMergeTransactionPhase type;
+    private long timestamp;
+
+    public JournalEntryImpl(RegionMergeTransactionPhase type) {
+      this(type, EnvironmentEdgeManager.currentTime());
+    }
+
+    public JournalEntryImpl(RegionMergeTransactionPhase type, long timestamp) {
+      this.type = type;
+      this.timestamp = timestamp;
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder();
+      sb.append(type);
+      sb.append(" at ");
+      sb.append(timestamp);
+      return sb.toString();
+    }
+
+    @Override
+    public RegionMergeTransactionPhase getPhase() {
+      return type;
+    }
+
+    @Override
+    public long getTimeStamp() {
+      return timestamp;
+    }
+  }
+
+  /*
+   * Journal of how far the merge transaction has progressed.
+   */
+  private final List<JournalEntry> journal = new ArrayList<JournalEntry>();
+
+  /**
+   * Listeners
+   */
+  private final ArrayList<TransactionListener> listeners = new ArrayList<TransactionListener>();
+
+  private static IOException closedByOtherException = new IOException(
+      "Failed to close region: already closed by another thread");
+
+  private RegionServerCoprocessorHost rsCoprocessorHost = null;
+
+  /**
+   * Constructor
+   * @param a region a to merge
+   * @param b region b to merge
+   * @param forcible if false, we will only merge adjacent regions
+   */
+  public RegionMergeTransactionImpl(final HRegion a, final HRegion b,
+      final boolean forcible) {
+    if (a.getRegionInfo().compareTo(b.getRegionInfo()) <= 0) {
+      this.region_a = a;
+      this.region_b = b;
+    } else {
+      this.region_a = b;
+      this.region_b = a;
+    }
+    this.forcible = forcible;
+    this.mergesdir = region_a.getRegionFileSystem().getMergesDir();
+  }
+
+  private void transition(RegionMergeTransactionPhase nextPhase) throws IOException {
+    transition(nextPhase, false);
+  }
+
+  private void transition(RegionMergeTransactionPhase nextPhase, boolean isRollback)
+      throws IOException {
+    if (!isRollback) {
+      // Add to the journal first, because if the listener throws an exception
+      // we need to roll back starting at 'nextPhase'
+      this.journal.add(new JournalEntryImpl(nextPhase));
+    }
+    for (int i = 0; i < listeners.size(); i++) {
+      TransactionListener listener = listeners.get(i);
+      if (!isRollback) {
+        listener.transition(this, currentPhase, nextPhase);
+      } else {
+        listener.rollback(this, currentPhase, nextPhase);
+      }
+    }
+    currentPhase = nextPhase;
+  }
+
+  @Override
+  public boolean prepare(final RegionServerServices services) throws IOException {
+    if (!region_a.getTableDesc().getTableName()
+        .equals(region_b.getTableDesc().getTableName())) {
+      LOG.info("Can't merge regions " + region_a + "," + region_b
+          + " because they do not belong to the same table");
+      return false;
+    }
+    if (region_a.getRegionInfo().equals(region_b.getRegionInfo())) {
+      LOG.info("Can't merge the same region " + region_a);
+      return false;
+    }
+    if (!forcible && !HRegionInfo.areAdjacent(region_a.getRegionInfo(),
+            region_b.getRegionInfo())) {
+      String msg = "Skip merging " + region_a.getRegionInfo().getRegionNameAsString()
+          + " and " + region_b.getRegionInfo().getRegionNameAsString()
+          + ", because they are not adjacent.";
+      LOG.info(msg);
+      return false;
+    }
+    if (!this.region_a.isMergeable() || !this.region_b.isMergeable()) {
+      return false;
+    }
+    try {
+      boolean regionAHasMergeQualifier = hasMergeQualifierInMeta(services,
+          region_a.getRegionInfo().getRegionName());
+      if (regionAHasMergeQualifier ||
+          hasMergeQualifierInMeta(services, region_b.getRegionInfo().getRegionName())) {
+        LOG.debug("Region " + (regionAHasMergeQualifier ? 
+            region_a.getRegionInfo().getRegionNameAsString()
+                : region_b.getRegionInfo().getRegionNameAsString())
+            + " is not mergeable because it has merge qualifier in META");
+        return false;
+      }
+    } catch (IOException e) {
+      LOG.warn("Failed judging whether merge transaction is available for "
+              + region_a.getRegionInfo().getRegionNameAsString() + " and "
+              + region_b.getRegionInfo().getRegionNameAsString(), e);
+      return false;
+    }
+
+    // WARN: make sure there is no parent region of the two merging regions in
+    // hbase:meta If exists, fixing up daughters would cause daughter regions(we
+    // have merged one) online again when we restart master, so we should clear
+    // the parent region to prevent the above case
+    // Since HBASE-7721, we don't need fix up daughters any more. so here do
+    // nothing
+
+    this.mergedRegionInfo = getMergedRegionInfo(region_a.getRegionInfo(),
+        region_b.getRegionInfo());
+
+    transition(RegionMergeTransactionPhase.PREPARED);
+    return true;
+  }
+
+  @Override
+  public Region execute(final Server server, final RegionServerServices services)
+      throws IOException {
+    this.server = server;
+    this.rsServices = services;
+    if (rsCoprocessorHost == null) {
+      rsCoprocessorHost = server != null ?
+        ((HRegionServer) server).getRegionServerCoprocessorHost() : null;
+    }
+    HRegion mergedRegion = createMergedRegion(server, services);
+    if (rsCoprocessorHost != null) {
+      rsCoprocessorHost.postMergeCommit(this.region_a, this.region_b, mergedRegion);
+    }
+    stepsAfterPONR(server, services, mergedRegion);
+
+    transition(RegionMergeTransactionPhase.COMPLETED);
+
+    return mergedRegion;
+  }
+
+  @VisibleForTesting
+  public void stepsAfterPONR(final Server server, final RegionServerServices services,
+      HRegion mergedRegion) throws IOException {
+    openMergedRegion(server, services, mergedRegion);
+    if (rsCoprocessorHost != null) {
+      rsCoprocessorHost.postMerge(this.region_a, this.region_b, mergedRegion);
+    }
+  }
+
+  /**
+   * Prepare the merged region and region files.
+   * @param server Hosting server instance. Can be null when testing
+   * @param services Used to online/offline regions.
+   * @return merged region
+   * @throws IOException If thrown, transaction failed. Call
+   *           {@link #rollback(Server, RegionServerServices)}
+   */
+  private HRegion createMergedRegion(final Server server, final RegionServerServices services)
+      throws IOException {
+    LOG.info("Starting merge of " + region_a + " and "
+        + region_b.getRegionInfo().getRegionNameAsString() + ", forcible=" + forcible);
+    if ((server != null && server.isStopped())
+        || (services != null && services.isStopping())) {
+      throw new IOException("Server is stopped or stopping");
+    }
+
+    if (rsCoprocessorHost != null) {
+      if (rsCoprocessorHost.preMerge(this.region_a, this.region_b)) {
+        throw new IOException("Coprocessor bypassing regions " + this.region_a + " "
+            + this.region_b + " merge.");
+      }
+    }
+
+    // If true, no cluster to write meta edits to or to use coordination.
+    boolean testing = server == null ? true : server.getConfiguration()
+        .getBoolean("hbase.testing.nocluster", false);
+
+    HRegion mergedRegion = stepsBeforePONR(server, services, testing);
+
+    @MetaMutationAnnotation
+    List<Mutation> metaEntries = new ArrayList<Mutation>();
+    if (rsCoprocessorHost != null) {
+      if (rsCoprocessorHost.preMergeCommit(this.region_a, this.region_b, metaEntries)) {
+        throw new IOException("Coprocessor bypassing regions " + this.region_a + " "
+            + this.region_b + " merge.");
+      }
+      try {
+        for (Mutation p : metaEntries) {
+          HRegionInfo.parseRegionName(p.getRow());
+        }
+      } catch (IOException e) {
+        LOG.error("Row key of mutation from coprocessor is not parsable as region name."
+            + "Mutations from coprocessor should only be for hbase:meta table.", e);
+        throw e;
+      }
+    }
+
+    // This is the point of no return. Similar with SplitTransaction.
+    // IF we reach the PONR then subsequent failures need to crash out this
+    // regionserver
+    transition(RegionMergeTransactionPhase.PONR);
+
+    // Add merged region and delete region_a and region_b
+    // as an atomic update. See HBASE-7721. This update to hbase:meta makes the region
+    // will determine whether the region is merged or not in case of failures.
+    // If it is successful, master will roll-forward, if not, master will
+    // rollback
+    if (services != null && !services.reportRegionStateTransition(TransitionCode.MERGE_PONR,
+        mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo())) {
+      // Passed PONR, let SSH clean it up
+      throw new IOException("Failed to notify master that merge passed PONR: "
+        + region_a.getRegionInfo().getRegionNameAsString() + " and "
+        + region_b.getRegionInfo().getRegionNameAsString());
+    }
+    return mergedRegion;
+  }
+
+  @VisibleForTesting
+  public void prepareMutationsForMerge(HRegionInfo mergedRegion, HRegionInfo regionA,
+      HRegionInfo regionB, ServerName serverName, List<Mutation> mutations) throws IOException {
+    HRegionInfo copyOfMerged = new HRegionInfo(mergedRegion);
+
+    // Put for parent
+    Put putOfMerged = MetaTableAccessor.makePutFromRegionInfo(copyOfMerged);
+    putOfMerged.add(HConstants.CATALOG_FAMILY, HConstants.MERGEA_QUALIFIER,
+      regionA.toByteArray());
+    putOfMerged.add(HConstants.CATALOG_FAMILY, HConstants.MERGEB_QUALIFIER,
+      regionB.toByteArray());
+    mutations.add(putOfMerged);
+    // Deletes for merging regions
+    Delete deleteA = MetaTableAccessor.makeDeleteFromRegionInfo(regionA);
+    Delete deleteB = MetaTableAccessor.makeDeleteFromRegionInfo(regionB);
+    mutations.add(deleteA);
+    mutations.add(deleteB);
+    // The merged is a new region, openSeqNum = 1 is fine.
+    addLocation(putOfMerged, serverName, 1);
+  }
+
+  @VisibleForTesting
+  Put addLocation(final Put p, final ServerName sn, long openSeqNum) {
+    p.add(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER, Bytes
+        .toBytes(sn.getHostAndPort()));
+    p.add(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER, Bytes.toBytes(sn
+        .getStartcode()));
+    p.add(HConstants.CATALOG_FAMILY, HConstants.SEQNUM_QUALIFIER, Bytes.toBytes(openSeqNum));
+    return p;
+  }
+
+  @VisibleForTesting
+  public HRegion stepsBeforePONR(final Server server, final RegionServerServices services,
+      boolean testing) throws IOException {
+    if (services != null && !services.reportRegionStateTransition(TransitionCode.READY_TO_MERGE,
+        mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo())) {
+      throw new IOException("Failed to get ok from master to merge "
+        + region_a.getRegionInfo().getRegionNameAsString() + " and "
+        + region_b.getRegionInfo().getRegionNameAsString());
+    }
+
+    transition(RegionMergeTransactionPhase.SET_MERGING);
+
+    this.region_a.getRegionFileSystem().createMergesDir();
+
+    transition(RegionMergeTransactionPhase.CREATED_MERGE_DIR);
+
+    Map<byte[], List<StoreFile>> hstoreFilesOfRegionA = closeAndOfflineRegion(
+        services, this.region_a, true, testing);
+    Map<byte[], List<StoreFile>> hstoreFilesOfRegionB = closeAndOfflineRegion(
+        services, this.region_b, false, testing);
+
+    assert hstoreFilesOfRegionA != null && hstoreFilesOfRegionB != null;
+
+    // mergeStoreFiles creates merged region dirs under the region_a merges dir
+    // Nothing to unroll here if failure -- clean up of CREATE_MERGE_DIR will
+    // clean this up.
+    mergeStoreFiles(hstoreFilesOfRegionA, hstoreFilesOfRegionB);
+
+    // Log to the journal that we are creating merged region. We could fail
+    // halfway through. If we do, we could have left
+    // stuff in fs that needs cleanup -- a storefile or two. Thats why we
+    // add entry to journal BEFORE rather than AFTER the change.
+
+    transition(RegionMergeTransactionPhase.STARTED_MERGED_REGION_CREATION);
+
+    HRegion mergedRegion = createMergedRegionFromMerges(this.region_a,
+        this.region_b, this.mergedRegionInfo);
+    return mergedRegion;
+  }
+
+  /**
+   * Create a merged region from the merges directory under region a. In order
+   * to mock it for tests, place it with a new method.
+   * @param a hri of region a
+   * @param b hri of region b
+   * @param mergedRegion hri of merged region
+   * @return merged HRegion.
+   * @throws IOException
+   */
+  @VisibleForTesting
+  HRegion createMergedRegionFromMerges(final HRegion a, final HRegion b,
+      final HRegionInfo mergedRegion) throws IOException {
+    return a.createMergedRegionFromMerges(mergedRegion, b);
+  }
+
+  /**
+   * Close the merging region and offline it in regionserver
+   * @param services
+   * @param region
+   * @param isRegionA true if it is merging region a, false if it is region b
+   * @param testing true if it is testing
+   * @return a map of family name to list of store files
+   * @throws IOException
+   */
+  private Map<byte[], List<StoreFile>> closeAndOfflineRegion(
+      final RegionServerServices services, final HRegion region,
+      final boolean isRegionA, final boolean testing) throws IOException {
+    Map<byte[], List<StoreFile>> hstoreFilesToMerge = null;
+    Exception exceptionToThrow = null;
+    try {
+      hstoreFilesToMerge = region.close(false);
+    } catch (Exception e) {
+      exceptionToThrow = e;
+    }
+    if (exceptionToThrow == null && hstoreFilesToMerge == null) {
+      // The region was closed by a concurrent thread. We can't continue
+      // with the merge, instead we must just abandon the merge. If we
+      // reopen or merge this could cause problems because the region has
+      // probably already been moved to a different server, or is in the
+      // process of moving to a different server.
+      exceptionToThrow = closedByOtherException;
+    }
+    if (exceptionToThrow != closedByOtherException) {
+      transition(isRegionA ? RegionMergeTransactionPhase.CLOSED_REGION_A
+          : RegionMergeTransactionPhase.CLOSED_REGION_B);
+    }
+    if (exceptionToThrow != null) {
+      if (exceptionToThrow instanceof IOException)
+        throw (IOException) exceptionToThrow;
+      throw new IOException(exceptionToThrow);
+    }
+    if (!testing) {
+      services.removeFromOnlineRegions(region, null);
+    }
+
+    transition(isRegionA ? RegionMergeTransactionPhase.OFFLINED_REGION_A
+        : RegionMergeTransactionPhase.OFFLINED_REGION_B);
+
+    return hstoreFilesToMerge;
+  }
+
+  /**
+   * Get merged region info through the specified two regions
+   * @param a merging region A
+   * @param b merging region B
+   * @return the merged region info
+   */
+  @VisibleForTesting
+  static HRegionInfo getMergedRegionInfo(final HRegionInfo a, final HRegionInfo b) {
+    long rid = EnvironmentEdgeManager.currentTime();
+    // Regionid is timestamp. Merged region's id can't be less than that of
+    // merging regions else will insert at wrong location in hbase:meta
+    if (rid < a.getRegionId() || rid < b.getRegionId()) {
+      LOG.warn("Clock skew; merging regions id are " + a.getRegionId()
+          + " and " + b.getRegionId() + ", but current time here is " + rid);
+      rid = Math.max(a.getRegionId(), b.getRegionId()) + 1;
+    }
+
+    byte[] startKey = null;
+    byte[] endKey = null;
+    // Choose the smaller as start key
+    if (a.compareTo(b) <= 0) {
+      startKey = a.getStartKey();
+    } else {
+      startKey = b.getStartKey();
+    }
+    // Choose the bigger as end key
+    if (Bytes.equals(a.getEndKey(), HConstants.EMPTY_BYTE_ARRAY)
+        || (!Bytes.equals(b.getEndKey(), HConstants.EMPTY_BYTE_ARRAY)
+            && Bytes.compareTo(a.getEndKey(), b.getEndKey()) > 0)) {
+      endKey = a.getEndKey();
+    } else {
+      endKey = b.getEndKey();
+    }
+
+    // Merged region is sorted between two merging regions in META
+    HRegionInfo mergedRegionInfo = new HRegionInfo(a.getTable(), startKey,
+        endKey, false, rid);
+    return mergedRegionInfo;
+  }
+
+  /**
+   * Perform time consuming opening of the merged region.
+   * @param server Hosting server instance. Can be null when testing
+   * @param services Used to online/offline regions.
+   * @param merged the merged region
+   * @throws IOException If thrown, transaction failed. Call
+   *           {@link #rollback(Server, RegionServerServices)}
+   */
+  @VisibleForTesting
+  void openMergedRegion(final Server server,  final RegionServerServices services,
+      HRegion merged) throws IOException {
+    boolean stopped = server != null && server.isStopped();
+    boolean stopping = services != null && services.isStopping();
+    if (stopped || stopping) {
+      LOG.info("Not opening merged region  " + merged.getRegionInfo().getRegionNameAsString()
+          + " because stopping=" + stopping + ", stopped=" + stopped);
+      return;
+    }
+    HRegionInfo hri = merged.getRegionInfo();
+    LoggingProgressable reporter = server == null ? null
+        : new LoggingProgressable(hri, server.getConfiguration().getLong(
+            "hbase.regionserver.regionmerge.open.log.interval", 10000));
+    merged.openHRegion(reporter);
+
+    if (services != null) {
+      if (!services.reportRegionStateTransition(TransitionCode.MERGED,
+          mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo())) {
+        throw new IOException("Failed to report merged region to master: "
+          + mergedRegionInfo.getShortNameToLog());
+      }
+      services.addToOnlineRegions(merged);
+    }
+  }
+
+  /**
+   * Create reference file(s) of merging regions under the region_a merges dir
+   * @param hstoreFilesOfRegionA
+   * @param hstoreFilesOfRegionB
+   * @throws IOException
+   */
+  private void mergeStoreFiles(
+      Map<byte[], List<StoreFile>> hstoreFilesOfRegionA,
+      Map<byte[], List<StoreFile>> hstoreFilesOfRegionB)
+      throws IOException {
+    // Create reference file(s) of region A in mergdir
+    HRegionFileSystem fs_a = this.region_a.getRegionFileSystem();
+    for (Map.Entry<byte[], List<StoreFile>> entry : hstoreFilesOfRegionA
+        .entrySet()) {
+      String familyName = Bytes.toString(entry.getKey());
+      for (StoreFile storeFile : entry.getValue()) {
+        fs_a.mergeStoreFile(this.mergedRegionInfo, familyName, storeFile,
+            this.mergesdir);
+      }
+    }
+    // Create reference file(s) of region B in mergedir
+    HRegionFileSystem fs_b = this.region_b.getRegionFileSystem();
+    for (Map.Entry<byte[], List<StoreFile>> entry : hstoreFilesOfRegionB
+        .entrySet()) {
+      String familyName = Bytes.toString(entry.getKey());
+      for (StoreFile storeFile : entry.getValue()) {
+        fs_b.mergeStoreFile(this.mergedRegionInfo, familyName, storeFile,
+            this.mergesdir);
+      }
+    }
+  }
+
+  @Override
+  public boolean rollback(final Server server,
+      final RegionServerServices services) throws IOException {
+    assert this.mergedRegionInfo != null;
+    this.server = server;
+    this.rsServices = services;
+    // Coprocessor callback
+    if (rsCoprocessorHost != null) {
+      rsCoprocessorHost.preRollBackMerge(this.region_a, this.region_b);
+    }
+
+    boolean result = true;
+    ListIterator<JournalEntry> iterator = this.journal
+        .listIterator(this.journal.size());
+    // Iterate in reverse.
+    while (iterator.hasPrevious()) {
+      JournalEntry je = iterator.previous();
+
+      transition(je.getPhase(), true);
+
+      switch (je.getPhase()) {
+
+        case SET_MERGING:
+          if (services != null
+              && !services.reportRegionStateTransition(TransitionCode.MERGE_REVERTED,
+                  mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo())) {
+            return false;
+        }
+          break;
+
+        case CREATED_MERGE_DIR:
+          this.region_a.writestate.writesEnabled = true;
+          this.region_b.writestate.writesEnabled = true;
+          this.region_a.getRegionFileSystem().cleanupMergesDir();
+          break;
+
+        case CLOSED_REGION_A:
+          try {
+            // So, this returns a seqid but if we just closed and then reopened,
+            // we should be ok. On close, we flushed using sequenceid obtained
+            // from hosting regionserver so no need to propagate the sequenceid
+            // returned out of initialize below up into regionserver as we
+            // normally do.
+            this.region_a.initialize();
+          } catch (IOException e) {
+            LOG.error("Failed rollbacking CLOSED_REGION_A of region "
+                + region_a.getRegionInfo().getRegionNameAsString(), e);
+            throw new RuntimeException(e);
+          }
+          break;
+
+        case OFFLINED_REGION_A:
+          if (services != null)
+            services.addToOnlineRegions(this.region_a);
+          break;
+
+        case CLOSED_REGION_B:
+          try {
+            this.region_b.initialize();
+          } catch (IOException e) {
+            LOG.error("Failed rollbacking CLOSED_REGION_A of region "
+                + region_b.getRegionInfo().getRegionNameAsString(), e);
+            throw new RuntimeException(e);
+          }
+          break;
+
+        case OFFLINED_REGION_B:
+          if (services != null)
+            services.addToOnlineRegions(this.region_b);
+          break;
+
+        case STARTED_MERGED_REGION_CREATION:
+          this.region_a.getRegionFileSystem().cleanupMergedRegion(
+              this.mergedRegionInfo);
+          break;
+
+        case PONR:
+          // We got to the point-of-no-return so we need to just abort. Return
+          // immediately. Do not clean up created merged regions.
+          return false;
+
+         // Informational states only
+        case STARTED:
+        case PREPARED:
+        case COMPLETED:
+          break;
+
+        default:
+          throw new RuntimeException("Unhandled journal entry: " + je);
+      }
+    }
+    // Coprocessor callback
+    if (rsCoprocessorHost != null) {
+      rsCoprocessorHost.postRollBackMerge(this.region_a, this.region_b);
+    }
+
+    return result;
+  }
+
+  @Override
+  public HRegionInfo getMergedRegionInfo() {
+    return this.mergedRegionInfo;
+  }
+
+  @VisibleForTesting
+  Path getMergesDir() {
+    return this.mergesdir;
+  }
+
+  /**
+   * Checks if the given region has merge qualifier in hbase:meta
+   * @param services
+   * @param regionName name of specified region
+   * @return true if the given region has merge qualifier in META.(It will be
+   *         cleaned by CatalogJanitor)
+   * @throws IOException
+   */
+  @VisibleForTesting
+  boolean hasMergeQualifierInMeta(final RegionServerServices services, final byte[] regionName)
+      throws IOException {
+    if (services == null) return false;
+    // Get merge regions if it is a merged region and already has merge
+    // qualifier
+    Pair<HRegionInfo, HRegionInfo> mergeRegions = MetaTableAccessor
+        .getRegionsFromMergeQualifier(services.getConnection(), regionName);
+    if (mergeRegions != null &&
+        (mergeRegions.getFirst() != null || mergeRegions.getSecond() != null)) {
+      // It has merge qualifier
+      return true;
+    }
+    return false;
+  }
+
+  @Override
+  public List<JournalEntry> getJournal() {
+    return journal;
+  }
+
+  @Override
+  public RegionMergeTransaction registerTransactionListener(TransactionListener listener) {
+    listeners.add(listener);
+    return this;
+  }
+
+  @Override
+  public Server getServer() {
+    return server;
+  }
+
+  @Override
+  public RegionServerServices getRegionServerServices() {
+    return rsServices;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/e156ed61/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransactionImpl.java.rej
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransactionImpl.java.rej b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransactionImpl.java.rej
new file mode 100644
index 0000000..45e9534
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransactionImpl.java.rej
@@ -0,0 +1,10 @@
+--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransactionImpl.java
++++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransactionImpl.java
+@@ -41,6 +41,7 @@
+ import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
+ import org.apache.hadoop.hbase.coordination.RegionMergeCoordination.RegionMergeDetails;
+ import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
++import org.apache.hadoop.hbase.regionserver.SplitTransactionImpl.JournalEntryImpl;
+ import org.apache.hadoop.hbase.regionserver.SplitTransactionImpl.LoggingProgressable;
+ import org.apache.hadoop.hbase.util.Bytes;
+ import org.apache.hadoop.hbase.util.ConfigUtil;

http://git-wip-us.apache.org/repos/asf/hbase/blob/e156ed61/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java
index 9034a72..b1600c0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java
@@ -64,7 +64,7 @@ class SplitRequest implements Runnable {
     boolean success = false;
     server.metricsRegionServer.incrSplitRequest();
     long startTime = EnvironmentEdgeManager.currentTime();
-    SplitTransaction st = new SplitTransaction(parent, midKey);
+    SplitTransactionImpl st = new SplitTransactionImpl(parent, midKey);
     try {
       //acquire a shared read lock on the table, so that table schema modifications
       //do not happen concurrently