You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2015/04/01 03:03:54 UTC

[5/6] hbase git commit: HBASE-12975 Supportable SplitTransaction and RegionMergeTransaction interfaces

http://git-wip-us.apache.org/repos/asf/hbase/blob/e156ed61/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java
index bbee93c..a21c19d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java
@@ -1,5 +1,4 @@
-/**
- *
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -19,93 +18,62 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.util.ArrayList;
 import java.util.List;
-import java.util.ListIterator;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.Server;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.CancelableProgressable;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.util.HasThread;
-import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.util.PairOfSameType;
-import org.apache.zookeeper.KeeperException;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
  * Executes region split as a "transaction".  Call {@link #prepare()} to setup
  * the transaction, {@link #execute(Server, RegionServerServices)} to run the
  * transaction and {@link #rollback(Server, RegionServerServices)} to cleanup if execute fails.
  *
- * <p>Here is an example of how you would use this class:
+ * <p>Here is an example of how you would use this interface:
  * <pre>
- *  SplitTransaction st = new SplitTransaction(this.conf, parent, midKey)
+ *  SplitTransactionFactory factory = new SplitTransactionFactory(conf);
+ *  SplitTransaction st = factory.create(parent, midKey)
+ *    .registerTransactionListener(new TransactionListener() {
+ *       public void transition(SplitTransaction transaction, SplitTransactionPhase from,
+ *           SplitTransactionPhase to) throws IOException {
+ *         // ...
+ *       }
+ *       public void rollback(SplitTransaction transaction, SplitTransactionPhase from,
+ *           SplitTransactionPhase to) {
+ *         // ...
+ *       }
+ *    });
  *  if (!st.prepare()) return;
  *  try {
  *    st.execute(server, services);
- *  } catch (IOException ioe) {
+ *  } catch (IOException ioe) {
  *    try {
  *      st.rollback(server, services);
  *      return;
  *    } catch (RuntimeException e) {
- *      myAbortable.abort("Failed split, abort");
+ *      // abort the server
  *    }
  *  }
  * </Pre>
- * <p>This class is not thread safe.  Caller needs ensure split is run by
+ * <p>A split transaction is not thread safe.  Callers must ensure a split is run by
  * one thread only.
  */
-@InterfaceAudience.Private
-public class SplitTransaction {
-  private static final Log LOG = LogFactory.getLog(SplitTransaction.class);
-
-  /*
-   * Region to split
-   */
-  private final HRegion parent;
-  private HRegionInfo hri_a;
-  private HRegionInfo hri_b;
-  private long fileSplitTimeout = 30000;
-
-  /*
-   * Row to split around
-   */
-  private final byte [] splitrow;
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
+@InterfaceStability.Evolving
+public interface SplitTransaction {
 
   /**
-   * Types to add to the transaction journal.
-   * Each enum is a step in the split transaction. Used to figure how much
-   * we need to rollback.
+   * Each enum is a step in the split transaction.
    */
-  static enum JournalEntryType {
+  public enum SplitTransactionPhase {
     /**
      * Started
      */
     STARTED,
     /**
-     * Prepared (after table lock)
+     * Prepared
      */
     PREPARED,
     /**
@@ -149,6 +117,12 @@ public class SplitTransaction {
      */
     OPENED_REGION_B,
     /**
+     * Point of no return.
+     * If we got here, then transaction is not recoverable other than by
+     * crashing out the regionserver.
+     */
+    PONR,
+    /**
      * Before postSplit coprocessor hook
      */
     BEFORE_POST_SPLIT_HOOK,
@@ -157,327 +131,60 @@ public class SplitTransaction {
      */
     AFTER_POST_SPLIT_HOOK,
     /**
-     * Point of no return.
-     * If we got here, then transaction is not recoverable other than by
-     * crashing out the regionserver.
+     * Completed
      */
-    PONR
-  }
-
-  static class JournalEntry {
-    private JournalEntryType type;
-    private long timestamp;
-
-    public JournalEntry(JournalEntryType type) {
-      this(type, EnvironmentEdgeManager.currentTime());
-    }
-
-    public JournalEntry(JournalEntryType type, long timestamp) {
-      this.type = type;
-      this.timestamp = timestamp;
-    }
-
-    @Override
-    public String toString() {
-      StringBuilder sb = new StringBuilder();
-      sb.append(type);
-      sb.append(" at ");
-      sb.append(timestamp);
-      return sb.toString();
-    }
+    COMPLETED
   }
 
-  /*
-   * Journal of how far the split transaction has progressed.
-   */
-  private final List<JournalEntry> journal = new ArrayList<JournalEntry>();
-
   /**
-   * Constructor
-   * @param r Region to split
-   * @param splitrow Row to split around
+   * Split transaction journal entry
    */
-  public SplitTransaction(final Region r, final byte [] splitrow) {
-    this.parent = (HRegion)r;
-    this.splitrow = splitrow;
-    this.journal.add(new JournalEntry(JournalEntryType.STARTED));
-  }
+  public interface JournalEntry {
 
-  /**
-   * Does checks on split inputs.
-   * @return <code>true</code> if the region is splittable else
-   * <code>false</code> if it is not (e.g. its already closed, etc.).
-   */
-  public boolean prepare() {
-    if (!this.parent.isSplittable()) return false;
-    // Split key can be null if this region is unsplittable; i.e. has refs.
-    if (this.splitrow == null) return false;
-    HRegionInfo hri = this.parent.getRegionInfo();
-    parent.prepareToSplit();
-    // Check splitrow.
-    byte [] startKey = hri.getStartKey();
-    byte [] endKey = hri.getEndKey();
-    if (Bytes.equals(startKey, splitrow) ||
-        !this.parent.getRegionInfo().containsRow(splitrow)) {
-      LOG.info("Split row is not inside region key range or is equal to " +
-          "startkey: " + Bytes.toStringBinary(this.splitrow));
-      return false;
-    }
-    long rid = getDaughterRegionIdTimestamp(hri);
-    this.hri_a = new HRegionInfo(hri.getTable(), startKey, this.splitrow, false, rid);
-    this.hri_b = new HRegionInfo(hri.getTable(), this.splitrow, endKey, false, rid);
-    this.journal.add(new JournalEntry(JournalEntryType.PREPARED));
-    return true;
-  }
+    /** @return the completed phase marked by this journal entry */
+    SplitTransactionPhase getPhase();
 
-  /**
-   * Calculate daughter regionid to use.
-   * @param hri Parent {@link HRegionInfo}
-   * @return Daughter region id (timestamp) to use.
-   */
-  private static long getDaughterRegionIdTimestamp(final HRegionInfo hri) {
-    long rid = EnvironmentEdgeManager.currentTime();
-    // Regionid is timestamp.  Can't be less than that of parent else will insert
-    // at wrong location in hbase:meta (See HBASE-710).
-    if (rid < hri.getRegionId()) {
-      LOG.warn("Clock skew; parent regions id is " + hri.getRegionId() +
-        " but current time here is " + rid);
-      rid = hri.getRegionId() + 1;
-    }
-    return rid;
+    /** @return the time of phase completion */
+    long getTimeStamp();
   }
 
-  private static IOException closedByOtherException = new IOException(
-      "Failed to close region: already closed by another thread");
-
   /**
-   * Prepare the regions and region files.
-   * @param server Hosting server instance.  Can be null when testing (won't try
-   * and update in zk if a null server)
-   * @param services Used to online/offline regions.
-   * @throws IOException If thrown, transaction failed.
-   *    Call {@link #rollback(Server, RegionServerServices)}
-   * @return Regions created
+   * Split transaction listener
    */
-  /* package */PairOfSameType<Region> createDaughters(final Server server,
-      final RegionServerServices services) throws IOException {
-    LOG.info("Starting split of region " + this.parent);
-    if ((server != null && server.isStopped()) ||
-        (services != null && services.isStopping())) {
-      throw new IOException("Server is stopped or stopping");
-    }
-    assert !this.parent.lock.writeLock().isHeldByCurrentThread():
-      "Unsafe to hold write lock while performing RPCs";
-
-    journal.add(new JournalEntry(JournalEntryType.BEFORE_PRE_SPLIT_HOOK));
-
-    // Coprocessor callback
-    if (this.parent.getCoprocessorHost() != null) {
-      // TODO: Remove one of these
-      this.parent.getCoprocessorHost().preSplit();
-      this.parent.getCoprocessorHost().preSplit(this.splitrow);
-    }
-
-    journal.add(new JournalEntry(JournalEntryType.AFTER_PRE_SPLIT_HOOK));
-
-    // If true, no cluster to write meta edits to or to update znodes in.
-    boolean testing = server == null? true:
-        server.getConfiguration().getBoolean("hbase.testing.nocluster", false);
-    this.fileSplitTimeout = testing ? this.fileSplitTimeout :
-        server.getConfiguration().getLong("hbase.regionserver.fileSplitTimeout",
-          this.fileSplitTimeout);
-
-    PairOfSameType<Region> daughterRegions = stepsBeforePONR(server, services, testing);
-
-    List<Mutation> metaEntries = new ArrayList<Mutation>();
-    if (this.parent.getCoprocessorHost() != null) {
-      if (this.parent.getCoprocessorHost().
-          preSplitBeforePONR(this.splitrow, metaEntries)) {
-        throw new IOException("Coprocessor bypassing region "
-            + this.parent.getRegionInfo().getRegionNameAsString() + " split.");
-      }
-      try {
-        for (Mutation p : metaEntries) {
-          HRegionInfo.parseRegionName(p.getRow());
-        }
-      } catch (IOException e) {
-        LOG.error("Row key of mutation from coprossor is not parsable as region name."
-            + "Mutations from coprocessor should only for hbase:meta table.");
-        throw e;
-      }
-    }
-
-    // This is the point of no return.  Adding subsequent edits to .META. as we
-    // do below when we do the daughter opens adding each to .META. can fail in
-    // various interesting ways the most interesting of which is a timeout
-    // BUT the edits all go through (See HBASE-3872).  IF we reach the PONR
-    // then subsequent failures need to crash out this regionserver; the
-    // server shutdown processing should be able to fix-up the incomplete split.
-    // The offlined parent will have the daughters as extra columns.  If
-    // we leave the daughter regions in place and do not remove them when we
-    // crash out, then they will have their references to the parent in place
-    // still and the server shutdown fixup of .META. will point to these
-    // regions.
-    // We should add PONR JournalEntry before offlineParentInMeta,so even if
-    // OfflineParentInMeta timeout,this will cause regionserver exit,and then
-    // master ServerShutdownHandler will fix daughter & avoid data loss. (See
-    // HBase-4562).
-    this.journal.add(new JournalEntry(JournalEntryType.PONR));
-
-    // Edit parent in meta.  Offlines parent region and adds splita and splitb
-    // as an atomic update. See HBASE-7721. This update to META makes the region
-    // will determine whether the region is split or not in case of failures.
-    // If it is successful, master will roll-forward, if not, master will rollback
-    // and assign the parent region.
-    if (services != null && !services.reportRegionStateTransition(TransitionCode.SPLIT_PONR,
-        parent.getRegionInfo(), hri_a, hri_b)) {
-      // Passed PONR, let SSH clean it up
-      throw new IOException("Failed to notify master that split passed PONR: "
-        + parent.getRegionInfo().getRegionNameAsString());
-    }
-    return daughterRegions;
-  }
+  public interface TransactionListener {
 
-  public PairOfSameType<Region> stepsBeforePONR(final Server server,
-      final RegionServerServices services, boolean testing) throws IOException {
-    if (services != null && !services.reportRegionStateTransition(TransitionCode.READY_TO_SPLIT,
-        parent.getRegionInfo(), hri_a, hri_b)) {
-      throw new IOException("Failed to get ok from master to split "
-        + parent.getRegionInfo().getRegionNameAsString());
-    }
-    this.journal.add(new JournalEntry(JournalEntryType.SET_SPLITTING));
-
-    this.parent.getRegionFileSystem().createSplitsDir();
-    this.journal.add(new JournalEntry(JournalEntryType.CREATE_SPLIT_DIR));
-
-    Map<byte[], List<StoreFile>> hstoreFilesToSplit = null;
-    Exception exceptionToThrow = null;
-    try{
-      hstoreFilesToSplit = this.parent.close(false);
-    } catch (Exception e) {
-      exceptionToThrow = e;
-    }
-    if (exceptionToThrow == null && hstoreFilesToSplit == null) {
-      // The region was closed by a concurrent thread.  We can't continue
-      // with the split, instead we must just abandon the split.  If we
-      // reopen or split this could cause problems because the region has
-      // probably already been moved to a different server, or is in the
-      // process of moving to a different server.
-      exceptionToThrow = closedByOtherException;
-    }
-    if (exceptionToThrow != closedByOtherException) {
-      this.journal.add(new JournalEntry(JournalEntryType.CLOSED_PARENT_REGION));
-    }
-    if (exceptionToThrow != null) {
-      if (exceptionToThrow instanceof IOException) throw (IOException)exceptionToThrow;
-      throw new IOException(exceptionToThrow);
-    }
-    if (!testing) {
-      services.removeFromOnlineRegions(this.parent, null);
-    }
-    this.journal.add(new JournalEntry(JournalEntryType.OFFLINED_PARENT));
-
-    // TODO: If splitStoreFiles were multithreaded would we complete steps in
-    // less elapsed time?  St.Ack 20100920
-    //
-    // splitStoreFiles creates daughter region dirs under the parent splits dir
-    // Nothing to unroll here if failure -- clean up of CREATE_SPLIT_DIR will
-    // clean this up.
-    Pair<Integer, Integer> expectedReferences = splitStoreFiles(hstoreFilesToSplit);
-
-    // Log to the journal that we are creating region A, the first daughter
-    // region.  We could fail halfway through.  If we do, we could have left
-    // stuff in fs that needs cleanup -- a storefile or two.  Thats why we
-    // add entry to journal BEFORE rather than AFTER the change.
-    this.journal.add(new JournalEntry(JournalEntryType.STARTED_REGION_A_CREATION));
-    assertReferenceFileCount(expectedReferences.getFirst(),
-        this.parent.getRegionFileSystem().getSplitsDir(this.hri_a));
-    Region a = this.parent.createDaughterRegionFromSplits(this.hri_a);
-    assertReferenceFileCount(expectedReferences.getFirst(),
-        new Path(this.parent.getRegionFileSystem().getTableDir(), this.hri_a.getEncodedName()));
-
-    // Ditto
-    this.journal.add(new JournalEntry(JournalEntryType.STARTED_REGION_B_CREATION));
-    assertReferenceFileCount(expectedReferences.getSecond(),
-        this.parent.getRegionFileSystem().getSplitsDir(this.hri_b));
-    Region b = this.parent.createDaughterRegionFromSplits(this.hri_b);
-    assertReferenceFileCount(expectedReferences.getSecond(),
-        new Path(this.parent.getRegionFileSystem().getTableDir(), this.hri_b.getEncodedName()));
-
-    return new PairOfSameType<Region>(a, b);
-  }
+    /**
+     * Invoked when transitioning forward from one transaction phase to another
+     * @param transaction the transaction
+     * @param from the current phase
+     * @param to the next phase
+     * @throws IOException listener can throw this to abort
+     */
+    void transition(SplitTransaction transaction, SplitTransactionPhase from,
+        SplitTransactionPhase to) throws IOException;
 
-  void assertReferenceFileCount(int expectedReferenceFileCount, Path dir)
-      throws IOException {
-    if (expectedReferenceFileCount != 0 &&
-        expectedReferenceFileCount != FSUtils.getRegionReferenceFileCount(this.parent.getFilesystem(), dir)) {
-      throw new IOException("Failing split. Expected reference file count isn't equal.");
-    }
+    /**
+     * Invoked when rolling back a transaction from one transaction phase to the
+     * previous
+     * @param transaction the transaction
+     * @param from the current phase
+     * @param to the previous phase
+     */
+    void rollback(SplitTransaction transaction, SplitTransactionPhase from,
+        SplitTransactionPhase to);
   }
 
   /**
-   * Perform time consuming opening of the daughter regions.
-   * @param server Hosting server instance.  Can be null when testing
-   * @param services Used to online/offline regions.
-   * @param a first daughter region
-   * @param a second daughter region
-   * @throws IOException If thrown, transaction failed.
-   *          Call {@link #rollback(Server, RegionServerServices)}
+   * Check split inputs and prepare the transaction.
+   * @return <code>true</code> if the region is splittable else
+   * <code>false</code> if it is not (e.g. its already closed, etc.).
+   * @throws IOException if preparing the transaction fails
    */
-  /* package */void openDaughters(final Server server,
-      final RegionServerServices services, Region a, Region b)
-      throws IOException {
-    boolean stopped = server != null && server.isStopped();
-    boolean stopping = services != null && services.isStopping();
-    // TODO: Is this check needed here?
-    if (stopped || stopping) {
-      LOG.info("Not opening daughters " +
-          b.getRegionInfo().getRegionNameAsString() +
-          " and " +
-          a.getRegionInfo().getRegionNameAsString() +
-          " because stopping=" + stopping + ", stopped=" + stopped);
-    } else {
-      // Open daughters in parallel.
-      DaughterOpener aOpener = new DaughterOpener(server, (HRegion)a);
-      DaughterOpener bOpener = new DaughterOpener(server, (HRegion)b);
-      aOpener.start();
-      bOpener.start();
-      try {
-        aOpener.join();
-        if (aOpener.getException() == null) {
-          journal.add(new JournalEntry(JournalEntryType.OPENED_REGION_A));
-        }
-        bOpener.join();
-        if (bOpener.getException() == null) {
-          journal.add(new JournalEntry(JournalEntryType.OPENED_REGION_B));
-        }
-      } catch (InterruptedException e) {
-        throw (InterruptedIOException)new InterruptedIOException().initCause(e);
-      }
-      if (aOpener.getException() != null) {
-        throw new IOException("Failed " +
-          aOpener.getName(), aOpener.getException());
-      }
-      if (bOpener.getException() != null) {
-        throw new IOException("Failed " +
-          bOpener.getName(), bOpener.getException());
-      }
-      if (services != null) {
-        if (!services.reportRegionStateTransition(TransitionCode.SPLIT,
-            parent.getRegionInfo(), hri_a, hri_b)) {
-          throw new IOException("Failed to report split region to master: "
-            + parent.getRegionInfo().getShortNameToLog());
-        }
-        // Should add it to OnlineRegions
-        services.addToOnlineRegions(b);
-        services.addToOnlineRegions(a);
-      }
-    }
-  }
+  boolean prepare() throws IOException;
 
   /**
    * Run the transaction.
-   * @param server Hosting server instance.  Can be null when testing
+   * @param server Hosting server instance.  Can be null when testing.
    * @param services Used to online/offline regions.
    * @throws IOException If thrown, transaction failed.
    *          Call {@link #rollback(Server, RegionServerServices)}
@@ -485,325 +192,44 @@ public class SplitTransaction {
    * @throws IOException
    * @see #rollback(Server, RegionServerServices)
    */
-  public PairOfSameType<Region> execute(final Server server,
-      final RegionServerServices services)
-  throws IOException {
-    PairOfSameType<Region> regions = createDaughters(server, services);
-    if (this.parent.getCoprocessorHost() != null) {
-      this.parent.getCoprocessorHost().preSplitAfterPONR();
-    }
-    return stepsAfterPONR(server, services, regions);
-  }
-
-  public PairOfSameType<Region> stepsAfterPONR(final Server server,
-      final RegionServerServices services, PairOfSameType<Region> regions)
-      throws IOException {
-    openDaughters(server, services, regions.getFirst(), regions.getSecond());
-    journal.add(new JournalEntry(JournalEntryType.BEFORE_POST_SPLIT_HOOK));
-    // Coprocessor callback
-    if (parent.getCoprocessorHost() != null) {
-      parent.getCoprocessorHost().postSplit(regions.getFirst(), regions.getSecond());
-    }
-    journal.add(new JournalEntry(JournalEntryType.AFTER_POST_SPLIT_HOOK));
-    return regions;
-  }
-
-  public Put addLocation(final Put p, final ServerName sn, long openSeqNum) {
-    p.addImmutable(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER,
-      Bytes.toBytes(sn.getHostAndPort()));
-    p.addImmutable(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER,
-      Bytes.toBytes(sn.getStartcode()));
-    p.addImmutable(HConstants.CATALOG_FAMILY, HConstants.SEQNUM_QUALIFIER,
-        Bytes.toBytes(openSeqNum));
-    return p;
-  }
+  PairOfSameType<Region> execute(Server server, RegionServerServices services) throws IOException;
 
-  /*
-   * Open daughter region in its own thread.
-   * If we fail, abort this hosting server.
+  /**
+   * Roll back a failed transaction
+   * @param server Hosting server instance (May be null when testing).
+   * @param services Used to online/offline regions.
+   * @throws IOException If thrown, rollback failed.  Take drastic action.
+   * @return True if we successfully rolled back, false if we got to the point
+   * of no return and so now need to abort the server to minimize damage.
    */
-  class DaughterOpener extends HasThread {
-    private final Server server;
-    private final HRegion r;
-    private Throwable t = null;
-
-    DaughterOpener(final Server s, final HRegion r) {
-      super((s == null? "null-services": s.getServerName()) +
-        "-daughterOpener=" + r.getRegionInfo().getEncodedName());
-      setDaemon(true);
-      this.server = s;
-      this.r = r;
-    }
-
-    /**
-     * @return Null if open succeeded else exception that causes us fail open.
-     * Call it after this thread exits else you may get wrong view on result.
-     */
-    Throwable getException() {
-      return this.t;
-    }
-
-    @Override
-    public void run() {
-      try {
-        openDaughterRegion(this.server, r);
-      } catch (Throwable t) {
-        this.t = t;
-      }
-    }
-  }
+  boolean rollback(Server server, RegionServerServices services) throws IOException;
 
   /**
-   * Open daughter regions, add them to online list and update meta.
-   * @param server
-   * @param daughter
-   * @throws IOException
-   * @throws KeeperException
+   * Register a listener for transaction preparation, execution, and possibly
+   * rollback phases.
+   * <p>A listener can abort a transaction by throwing an IOException from transition().
+   * @param listener the listener
+   * @return 'this' for chaining
    */
-  void openDaughterRegion(final Server server, final HRegion daughter)
-  throws IOException, KeeperException {
-    HRegionInfo hri = daughter.getRegionInfo();
-    LoggingProgressable reporter = server == null ? null
-        : new LoggingProgressable(hri, server.getConfiguration().getLong(
-            "hbase.regionserver.split.daughter.open.log.interval", 10000));
-    daughter.openHRegion(reporter);
-  }
-
-  static class LoggingProgressable implements CancelableProgressable {
-    private final HRegionInfo hri;
-    private long lastLog = -1;
-    private final long interval;
-
-    LoggingProgressable(final HRegionInfo hri, final long interval) {
-      this.hri = hri;
-      this.interval = interval;
-    }
-
-    @Override
-    public boolean progress() {
-      long now = EnvironmentEdgeManager.currentTime();
-      if (now - lastLog > this.interval) {
-        LOG.info("Opening " + this.hri.getRegionNameAsString());
-        this.lastLog = now;
-      }
-      return true;
-    }
-  }
+  SplitTransaction registerTransactionListener(TransactionListener listener);
 
   /**
-   * Creates reference files for top and bottom half of the
-   * @param hstoreFilesToSplit map of store files to create half file references for.
-   * @return the number of reference files that were created.
-   * @throws IOException
+   * Get the journal for the transaction.
+   * <p>Journal entries are an opaque type represented as JournalEntry. They can
+   * also provide useful debugging information via their toString method.
+   * @return the transaction journal
    */
-  private Pair<Integer, Integer> splitStoreFiles(
-      final Map<byte[], List<StoreFile>> hstoreFilesToSplit)
-      throws IOException {
-    if (hstoreFilesToSplit == null) {
-      // Could be null because close didn't succeed -- for now consider it fatal
-      throw new IOException("Close returned empty list of StoreFiles");
-    }
-    // The following code sets up a thread pool executor with as many slots as
-    // there's files to split. It then fires up everything, waits for
-    // completion and finally checks for any exception
-    int nbFiles = hstoreFilesToSplit.size();
-    if (nbFiles == 0) {
-      // no file needs to be splitted.
-      return new Pair<Integer, Integer>(0,0);
-    }
-    LOG.info("Preparing to split " + nbFiles + " storefiles for region " + this.parent);
-    ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
-    builder.setNameFormat("StoreFileSplitter-%1$d");
-    ThreadFactory factory = builder.build();
-    ThreadPoolExecutor threadPool =
-      (ThreadPoolExecutor) Executors.newFixedThreadPool(nbFiles, factory);
-    List<Future<Pair<Path,Path>>> futures = new ArrayList<Future<Pair<Path,Path>>> (nbFiles);
-
-    // Split each store file.
-    for (Map.Entry<byte[], List<StoreFile>> entry: hstoreFilesToSplit.entrySet()) {
-      for (StoreFile sf: entry.getValue()) {
-        StoreFileSplitter sfs = new StoreFileSplitter(entry.getKey(), sf);
-        futures.add(threadPool.submit(sfs));
-      }
-    }
-    // Shutdown the pool
-    threadPool.shutdown();
-
-    // Wait for all the tasks to finish
-    try {
-      boolean stillRunning = !threadPool.awaitTermination(
-          this.fileSplitTimeout, TimeUnit.MILLISECONDS);
-      if (stillRunning) {
-        threadPool.shutdownNow();
-        // wait for the thread to shutdown completely.
-        while (!threadPool.isTerminated()) {
-          Thread.sleep(50);
-        }
-        throw new IOException("Took too long to split the" +
-            " files and create the references, aborting split");
-      }
-    } catch (InterruptedException e) {
-      throw (InterruptedIOException)new InterruptedIOException().initCause(e);
-    }
-
-    int created_a = 0;
-    int created_b = 0;
-    // Look for any exception
-    for (Future<Pair<Path, Path>> future : futures) {
-      try {
-        Pair<Path, Path> p = future.get();
-        created_a += p.getFirst() != null ? 1 : 0;
-        created_b += p.getSecond() != null ? 1 : 0;
-      } catch (InterruptedException e) {
-        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
-      } catch (ExecutionException e) {
-        throw new IOException(e);
-      }
-    }
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Split storefiles for region " + this.parent + " Daugther A: " + created_a
-          + " storefiles, Daugther B: " + created_b + " storefiles.");
-    }
-    return new Pair<Integer, Integer>(created_a, created_b);
-  }
-
-  private Pair<Path, Path> splitStoreFile(final byte[] family, final StoreFile sf) throws IOException {
-    HRegionFileSystem fs = this.parent.getRegionFileSystem();
-    String familyName = Bytes.toString(family);
-    Path path_a =
-        fs.splitStoreFile(this.hri_a, familyName, sf, this.splitrow, false,
-          this.parent.getSplitPolicy());
-    Path path_b =
-        fs.splitStoreFile(this.hri_b, familyName, sf, this.splitrow, true,
-          this.parent.getSplitPolicy());
-    return new Pair<Path,Path>(path_a, path_b);
-  }
+  List<JournalEntry> getJournal();
 
   /**
-   * Utility class used to do the file splitting / reference writing
-   * in parallel instead of sequentially.
+   * Get the Server running the transaction or rollback
+   * @return server instance
    */
-  class StoreFileSplitter implements Callable<Pair<Path,Path>> {
-    private final byte[] family;
-    private final StoreFile sf;
-
-    /**
-     * Constructor that takes what it needs to split
-     * @param family Family that contains the store file
-     * @param sf which file
-     */
-    public StoreFileSplitter(final byte[] family, final StoreFile sf) {
-      this.sf = sf;
-      this.family = family;
-    }
-
-    public Pair<Path,Path> call() throws IOException {
-      return splitStoreFile(family, sf);
-    }
-  }
+  Server getServer();
 
   /**
-   * @param server Hosting server instance (May be null when testing).
-   * @param services
-   * @throws IOException If thrown, rollback failed.  Take drastic action.
-   * @return True if we successfully rolled back, false if we got to the point
-   * of no return and so now need to abort the server to minimize damage.
+   * Get the RegionServerServices of the server running the transaction or rollback
+   * @return region server services
    */
-  @SuppressWarnings("deprecation")
-  public boolean rollback(final Server server, final RegionServerServices services)
-  throws IOException {
-    // Coprocessor callback
-    if (this.parent.getCoprocessorHost() != null) {
-      this.parent.getCoprocessorHost().preRollBackSplit();
-    }
-
-    boolean result = true;
-    ListIterator<JournalEntry> iterator =
-      this.journal.listIterator(this.journal.size());
-    // Iterate in reverse.
-    while (iterator.hasPrevious()) {
-      JournalEntry je = iterator.previous();
-      switch(je.type) {
-
-      case SET_SPLITTING:
-        if (services != null
-            && !services.reportRegionStateTransition(TransitionCode.SPLIT_REVERTED,
-                parent.getRegionInfo(), hri_a, hri_b)) {
-          return false;
-        }
-        break;
-
-      case CREATE_SPLIT_DIR:
-        this.parent.writestate.writesEnabled = true;
-        this.parent.getRegionFileSystem().cleanupSplitsDir();
-        break;
-
-      case CLOSED_PARENT_REGION:
-        try {
-          // So, this returns a seqid but if we just closed and then reopened, we
-          // should be ok. On close, we flushed using sequenceid obtained from
-          // hosting regionserver so no need to propagate the sequenceid returned
-          // out of initialize below up into regionserver as we normally do.
-          // TODO: Verify.
-          this.parent.initialize();
-        } catch (IOException e) {
-          LOG.error("Failed rollbacking CLOSED_PARENT_REGION of region " +
-            this.parent.getRegionInfo().getRegionNameAsString(), e);
-          throw new RuntimeException(e);
-        }
-        break;
-
-      case STARTED_REGION_A_CREATION:
-        this.parent.getRegionFileSystem().cleanupDaughterRegion(this.hri_a);
-        break;
-
-      case STARTED_REGION_B_CREATION:
-        this.parent.getRegionFileSystem().cleanupDaughterRegion(this.hri_b);
-        break;
-
-      case OFFLINED_PARENT:
-        if (services != null) services.addToOnlineRegions(this.parent);
-        break;
-
-      case PONR:
-        // We got to the point-of-no-return so we need to just abort. Return
-        // immediately.  Do not clean up created daughter regions.  They need
-        // to be in place so we don't delete the parent region mistakenly.
-        // See HBASE-3872.
-        return false;
-
-      // Informational only cases
-      case STARTED:
-      case PREPARED:
-      case BEFORE_PRE_SPLIT_HOOK:
-      case AFTER_PRE_SPLIT_HOOK:
-      case BEFORE_POST_SPLIT_HOOK:
-      case AFTER_POST_SPLIT_HOOK:
-      case OPENED_REGION_A:
-      case OPENED_REGION_B:
-        break;
-
-      default:
-        throw new RuntimeException("Unhandled journal entry: " + je);
-      }
-    }
-    // Coprocessor callback
-    if (this.parent.getCoprocessorHost() != null) {
-      this.parent.getCoprocessorHost().postRollBackSplit();
-    }
-    return result;
-  }
-
-  HRegionInfo getFirstDaughter() {
-    return hri_a;
-  }
-
-  HRegionInfo getSecondDaughter() {
-    return hri_b;
-  }
-
-  List<JournalEntry> getJournal() {
-    return journal;
-  }
+  RegionServerServices getRegionServerServices();
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e156ed61/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransactionFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransactionFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransactionFactory.java
new file mode 100644
index 0000000..7df8233
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransactionFactory.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
+
+/**
+ * A factory for creating SplitTransactions, which execute region split as a "transaction".
+ * See {@link SplitTransaction}
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
+@InterfaceStability.Evolving
+public class SplitTransactionFactory implements Configurable {
+
+  public static final String SPLIT_TRANSACTION_IMPL_KEY =
+      "hbase.regionserver.split.transaction.impl";
+
+  private Configuration conf;
+
+  public SplitTransactionFactory(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * Create a split transaction
+   * @param r the region to split
+   * @param splitrow the split point in the keyspace
+   * @return transaction instance
+   */
+  public SplitTransaction create(final Region r, final byte [] splitrow) {
+    return ReflectionUtils.instantiateWithCustomCtor(
+      // The implementation class must extend SplitTransactionImpl, not only
+      // implement the SplitTransaction interface like you might expect,
+      // because various places such as AssignmentManager use static methods
+      // from SplitTransactionImpl. Whatever we use for implementation must
+      // be compatible, so it's safest to require ? extends SplitTransactionImpl.
+      // If not compatible we will throw a runtime exception from here.
+      conf.getClass(SPLIT_TRANSACTION_IMPL_KEY, SplitTransactionImpl.class,
+        SplitTransactionImpl.class).getName(),
+      new Class[] { Region.class, byte[].class },
+      new Object[] { r, splitrow });
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/e156ed61/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransactionImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransactionImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransactionImpl.java
new file mode 100644
index 0000000..8695c77
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransactionImpl.java
@@ -0,0 +1,789 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CancelableProgressable;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.HasThread;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.PairOfSameType;
+import org.apache.zookeeper.KeeperException;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+@InterfaceAudience.Private
+public class SplitTransactionImpl implements SplitTransaction {
+  private static final Log LOG = LogFactory.getLog(SplitTransactionImpl.class);
+
+  /*
+   * Region to split
+   */
+  private final HRegion parent;
+  private HRegionInfo hri_a;
+  private HRegionInfo hri_b;
+  private long fileSplitTimeout = 30000;
+
+  /*
+   * Row to split around
+   */
+  private final byte [] splitrow;
+
+  /*
+   * Transaction state for listener, only valid during execute and
+   * rollback
+   */
+  private SplitTransactionPhase currentPhase = SplitTransactionPhase.STARTED;
+  private Server server;
+  private RegionServerServices rsServices;
+
+  public static class JournalEntryImpl implements JournalEntry {
+    private SplitTransactionPhase type;
+    private long timestamp;
+
+    public JournalEntryImpl(SplitTransactionPhase type) {
+      this(type, EnvironmentEdgeManager.currentTime());
+    }
+
+    public JournalEntryImpl(SplitTransactionPhase type, long timestamp) {
+      this.type = type;
+      this.timestamp = timestamp;
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder();
+      sb.append(type);
+      sb.append(" at ");
+      sb.append(timestamp);
+      return sb.toString();
+    }
+
+    @Override
+    public SplitTransactionPhase getPhase() {
+      return type;
+    }
+
+    @Override
+    public long getTimeStamp() {
+      return timestamp;
+    }
+  }
+
+  /*
+   * Journal of how far the split transaction has progressed.
+   */
+  private final ArrayList<JournalEntry> journal = new ArrayList<JournalEntry>();
+
+  /**
+   * Listeners
+   */
+  private final ArrayList<TransactionListener> listeners = new ArrayList<TransactionListener>();
+
+  /**
+   * Constructor
+   * @param r Region to split
+   * @param splitrow Row to split around
+   */
+  public SplitTransactionImpl(final Region r, final byte [] splitrow) {
+    this.parent = (HRegion)r;
+    this.splitrow = splitrow;
+    this.journal.add(new JournalEntryImpl(SplitTransactionPhase.STARTED));
+  }
+
+  private void transition(SplitTransactionPhase nextPhase) throws IOException {
+    transition(nextPhase, false);
+  }
+
+  private void transition(SplitTransactionPhase nextPhase, boolean isRollback)
+      throws IOException {
+    if (!isRollback) {
+      // Add to the journal first, because if the listener throws an exception
+      // we need to roll back starting at 'nextPhase'
+      this.journal.add(new JournalEntryImpl(nextPhase));
+    }
+    for (int i = 0; i < listeners.size(); i++) {
+      TransactionListener listener = listeners.get(i);
+      if (!isRollback) {
+        listener.transition(this, currentPhase, nextPhase);
+      } else {
+        listener.rollback(this, currentPhase, nextPhase);
+      }
+    }
+    currentPhase = nextPhase;
+  }
+
+  @Override
+  public boolean prepare() throws IOException {
+    if (!this.parent.isSplittable()) return false;
+    // Split key can be null if this region is unsplittable; i.e. has refs.
+    if (this.splitrow == null) return false;
+    HRegionInfo hri = this.parent.getRegionInfo();
+    parent.prepareToSplit();
+    // Check splitrow.
+    byte [] startKey = hri.getStartKey();
+    byte [] endKey = hri.getEndKey();
+    if (Bytes.equals(startKey, splitrow) ||
+        !this.parent.getRegionInfo().containsRow(splitrow)) {
+      LOG.info("Split row is not inside region key range or is equal to " +
+          "startkey: " + Bytes.toStringBinary(this.splitrow));
+      return false;
+    }
+    long rid = getDaughterRegionIdTimestamp(hri);
+    this.hri_a = new HRegionInfo(hri.getTable(), startKey, this.splitrow, false, rid);
+    this.hri_b = new HRegionInfo(hri.getTable(), this.splitrow, endKey, false, rid);
+
+    transition(SplitTransactionPhase.PREPARED);
+
+    return true;
+  }
+
+  /**
+   * Calculate daughter regionid to use.
+   * @param hri Parent {@link HRegionInfo}
+   * @return Daughter region id (timestamp) to use.
+   */
+  private static long getDaughterRegionIdTimestamp(final HRegionInfo hri) {
+    long rid = EnvironmentEdgeManager.currentTime();
+    // Regionid is timestamp.  Can't be less than that of parent else will insert
+    // at wrong location in hbase:meta (See HBASE-710).
+    if (rid < hri.getRegionId()) {
+      LOG.warn("Clock skew; parent regions id is " + hri.getRegionId() +
+        " but current time here is " + rid);
+      rid = hri.getRegionId() + 1;
+    }
+    return rid;
+  }
+
+  private static IOException closedByOtherException = new IOException(
+      "Failed to close region: already closed by another thread");
+
+  /**
+   * Prepare the regions and region files.
+   * @param server Hosting server instance.  Can be null when testing (won't try
+   * and update in zk if a null server)
+   * @param services Used to online/offline regions.
+   * @throws IOException If thrown, transaction failed.
+   *    Call {@link #rollback(Server, RegionServerServices)}
+   * @return Regions created
+   */
+  @VisibleForTesting
+  PairOfSameType<Region> createDaughters(final Server server,
+      final RegionServerServices services) throws IOException {
+    LOG.info("Starting split of region " + this.parent);
+    if ((server != null && server.isStopped()) ||
+        (services != null && services.isStopping())) {
+      throw new IOException("Server is stopped or stopping");
+    }
+    assert !this.parent.lock.writeLock().isHeldByCurrentThread():
+      "Unsafe to hold write lock while performing RPCs";
+
+    transition(SplitTransactionPhase.BEFORE_PRE_SPLIT_HOOK);
+
+    // Coprocessor callback
+    if (this.parent.getCoprocessorHost() != null) {
+      // TODO: Remove one of these
+      this.parent.getCoprocessorHost().preSplit();
+      this.parent.getCoprocessorHost().preSplit(this.splitrow);
+    }
+
+    transition(SplitTransactionPhase.AFTER_PRE_SPLIT_HOOK);
+
+    // If true, no cluster to write meta edits to or to update znodes in.
+    boolean testing = server == null? true:
+        server.getConfiguration().getBoolean("hbase.testing.nocluster", false);
+    this.fileSplitTimeout = testing ? this.fileSplitTimeout :
+        server.getConfiguration().getLong("hbase.regionserver.fileSplitTimeout",
+          this.fileSplitTimeout);
+
+    PairOfSameType<Region> daughterRegions = stepsBeforePONR(server, services, testing);
+
+    List<Mutation> metaEntries = new ArrayList<Mutation>();
+    if (this.parent.getCoprocessorHost() != null) {
+      if (this.parent.getCoprocessorHost().
+          preSplitBeforePONR(this.splitrow, metaEntries)) {
+        throw new IOException("Coprocessor bypassing region "
+            + parent.getRegionInfo().getRegionNameAsString() + " split.");
+      }
+      try {
+        for (Mutation p : metaEntries) {
+          HRegionInfo.parseRegionName(p.getRow());
+        }
+      } catch (IOException e) {
+        LOG.error("Row key of mutation from coprossor is not parsable as region name."
+            + "Mutations from coprocessor should only for hbase:meta table.");
+        throw e;
+      }
+    }
+
+    // This is the point of no return.  Adding subsequent edits to .META. as we
+    // do below when we do the daughter opens adding each to .META. can fail in
+    // various interesting ways the most interesting of which is a timeout
+    // BUT the edits all go through (See HBASE-3872).  IF we reach the PONR
+    // then subsequent failures need to crash out this regionserver; the
+    // server shutdown processing should be able to fix-up the incomplete split.
+    // The offlined parent will have the daughters as extra columns.  If
+    // we leave the daughter regions in place and do not remove them when we
+    // crash out, then they will have their references to the parent in place
+    // still and the server shutdown fixup of .META. will point to these
+    // regions.
+    // We must add the PONR JournalEntry before offlining the parent in meta, so
+    // that even if that update times out the regionserver will exit and the
+    // master ServerShutdownHandler will fix up the daughters and avoid data loss
+    // (See HBASE-4562).
+
+    transition(SplitTransactionPhase.PONR);
+
+    // Edit parent in meta.  Offlines parent region and adds splita and splitb
+    // as an atomic update. See HBASE-7721. This update to META is what
+    // determines whether the region is split or not in case of failures.
+    // If it is successful, master will roll forward; if not, master will roll
+    // back and assign the parent region.
+    if (services != null && !services.reportRegionStateTransition(TransitionCode.SPLIT_PONR,
+        parent.getRegionInfo(), hri_a, hri_b)) {
+      // Passed PONR, let SSH clean it up
+      throw new IOException("Failed to notify master that split passed PONR: "
+        + parent.getRegionInfo().getRegionNameAsString());
+    }
+    return daughterRegions;
+  }
+
+  @VisibleForTesting
+  Put addLocation(final Put p, final ServerName sn, long openSeqNum) {
+    p.add(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER, Bytes
+      .toBytes(sn.getHostAndPort()));
+    p.add(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER, Bytes.toBytes(sn
+      .getStartcode()));
+    p.add(HConstants.CATALOG_FAMILY, HConstants.SEQNUM_QUALIFIER, Bytes.toBytes(openSeqNum));
+    return p;
+  }
+
+  @VisibleForTesting
+  public PairOfSameType<Region> stepsBeforePONR(final Server server,
+      final RegionServerServices services, boolean testing) throws IOException {
+    if (services != null && !services.reportRegionStateTransition(TransitionCode.READY_TO_SPLIT,
+        parent.getRegionInfo(), hri_a, hri_b)) {
+      throw new IOException("Failed to get ok from master to split "
+        + parent.getRegionInfo().getRegionNameAsString());
+    }
+
+    transition(SplitTransactionPhase.SET_SPLITTING);
+
+    this.parent.getRegionFileSystem().createSplitsDir();
+
+    transition(SplitTransactionPhase.CREATE_SPLIT_DIR);
+
+    Map<byte[], List<StoreFile>> hstoreFilesToSplit = null;
+    Exception exceptionToThrow = null;
+    try{
+      hstoreFilesToSplit = this.parent.close(false);
+    } catch (Exception e) {
+      exceptionToThrow = e;
+    }
+    if (exceptionToThrow == null && hstoreFilesToSplit == null) {
+      // The region was closed by a concurrent thread.  We can't continue
+      // with the split, instead we must just abandon the split.  If we
+      // reopen or split this could cause problems because the region has
+      // probably already been moved to a different server, or is in the
+      // process of moving to a different server.
+      exceptionToThrow = closedByOtherException;
+    }
+    if (exceptionToThrow != closedByOtherException) {
+      transition(SplitTransactionPhase.CLOSED_PARENT_REGION);
+    }
+    if (exceptionToThrow != null) {
+      if (exceptionToThrow instanceof IOException) throw (IOException)exceptionToThrow;
+      throw new IOException(exceptionToThrow);
+    }
+    if (!testing) {
+      services.removeFromOnlineRegions(this.parent, null);
+    }
+
+    transition(SplitTransactionPhase.OFFLINED_PARENT);
+
+    // TODO: If splitStoreFiles were multithreaded would we complete steps in
+    // less elapsed time?  St.Ack 20100920
+    //
+    // splitStoreFiles creates daughter region dirs under the parent splits dir
+    // Nothing to unroll here if failure -- clean up of CREATE_SPLIT_DIR will
+    // clean this up.
+    Pair<Integer, Integer> expectedReferences = splitStoreFiles(hstoreFilesToSplit);
+
+    // Log to the journal that we are creating region A, the first daughter
+    // region.  We could fail halfway through.  If we do, we could have left
+    // stuff in fs that needs cleanup -- a storefile or two.  That's why we
+    // add entry to journal BEFORE rather than AFTER the change.
+
+    transition(SplitTransactionPhase.STARTED_REGION_A_CREATION);
+
+    assertReferenceFileCount(expectedReferences.getFirst(),
+        this.parent.getRegionFileSystem().getSplitsDir(this.hri_a));
+    HRegion a = this.parent.createDaughterRegionFromSplits(this.hri_a);
+    assertReferenceFileCount(expectedReferences.getFirst(),
+        new Path(this.parent.getRegionFileSystem().getTableDir(), this.hri_a.getEncodedName()));
+
+    // Ditto
+
+    transition(SplitTransactionPhase.STARTED_REGION_B_CREATION);
+
+    assertReferenceFileCount(expectedReferences.getSecond(),
+        this.parent.getRegionFileSystem().getSplitsDir(this.hri_b));
+    HRegion b = this.parent.createDaughterRegionFromSplits(this.hri_b);
+    assertReferenceFileCount(expectedReferences.getSecond(),
+        new Path(this.parent.getRegionFileSystem().getTableDir(), this.hri_b.getEncodedName()));
+
+    return new PairOfSameType<Region>(a, b);
+  }
+
+  @VisibleForTesting
+  void assertReferenceFileCount(int expectedReferenceFileCount, Path dir)
+      throws IOException {
+    if (expectedReferenceFileCount != 0 &&
+        expectedReferenceFileCount != FSUtils.getRegionReferenceFileCount(parent.getFilesystem(),
+          dir)) {
+      throw new IOException("Failing split. Expected reference file count isn't equal.");
+    }
+  }
+
+  /**
+   * Perform time consuming opening of the daughter regions.
+   * @param server Hosting server instance.  Can be null when testing
+   * @param services Used to online/offline regions.
+   * @param a first daughter region
+   * @param b second daughter region
+   * @throws IOException If thrown, transaction failed.
+   *          Call {@link #rollback(Server, RegionServerServices)}
+   */
+  @VisibleForTesting
+  void openDaughters(final Server server, final RegionServerServices services, Region a,
+      Region b) throws IOException {
+    boolean stopped = server != null && server.isStopped();
+    boolean stopping = services != null && services.isStopping();
+    // TODO: Is this check needed here?
+    if (stopped || stopping) {
+      LOG.info("Not opening daughters " +
+          b.getRegionInfo().getRegionNameAsString() +
+          " and " +
+          a.getRegionInfo().getRegionNameAsString() +
+          " because stopping=" + stopping + ", stopped=" + stopped);
+    } else {
+      // Open daughters in parallel.
+      DaughterOpener aOpener = new DaughterOpener(server, a);
+      DaughterOpener bOpener = new DaughterOpener(server, b);
+      aOpener.start();
+      bOpener.start();
+      try {
+        aOpener.join();
+        if (aOpener.getException() == null) {
+          transition(SplitTransactionPhase.OPENED_REGION_A);
+        }
+        bOpener.join();
+        if (bOpener.getException() == null) {
+          transition(SplitTransactionPhase.OPENED_REGION_B);
+        }
+      } catch (InterruptedException e) {
+        throw (InterruptedIOException)new InterruptedIOException().initCause(e);
+      }
+      if (aOpener.getException() != null) {
+        throw new IOException("Failed " +
+          aOpener.getName(), aOpener.getException());
+      }
+      if (bOpener.getException() != null) {
+        throw new IOException("Failed " +
+          bOpener.getName(), bOpener.getException());
+      }
+      if (services != null) {
+        if (!services.reportRegionStateTransition(TransitionCode.SPLIT,
+            parent.getRegionInfo(), hri_a, hri_b)) {
+          throw new IOException("Failed to report split region to master: "
+            + parent.getRegionInfo().getShortNameToLog());
+        }
+        // Should add it to OnlineRegions
+        services.addToOnlineRegions(b);
+        services.addToOnlineRegions(a);
+      }
+    }
+  }
+
+  @Override
+  public PairOfSameType<Region> execute(final Server server, final RegionServerServices services)
+      throws IOException {
+    this.server = server;
+    this.rsServices = services;
+    PairOfSameType<Region> regions = createDaughters(server, services);
+    stepsAfterPONR(server, services, regions);
+    transition(SplitTransactionPhase.COMPLETED);
+    return regions;
+  }
+
+  @VisibleForTesting
+  void stepsAfterPONR(final Server server,
+      final RegionServerServices services, PairOfSameType<Region> regions)
+      throws IOException {
+    if (this.parent.getCoprocessorHost() != null) {
+      this.parent.getCoprocessorHost().preSplitAfterPONR();
+    }
+
+    openDaughters(server, services, regions.getFirst(), regions.getSecond());
+
+    transition(SplitTransactionPhase.BEFORE_POST_SPLIT_HOOK);
+
+    // Coprocessor callback
+    if (parent.getCoprocessorHost() != null) {
+      parent.getCoprocessorHost().postSplit(regions.getFirst(), regions.getSecond());
+    }
+
+    transition(SplitTransactionPhase.AFTER_POST_SPLIT_HOOK);
+  }
+
+  /*
+   * Open daughter region in its own thread.
+   * If the open fails, the Throwable is kept and exposed via getException().
+   */
+  private class DaughterOpener extends HasThread {
+    private final Server server;
+    private final Region r;
+    private Throwable t = null;
+
+    DaughterOpener(final Server s, final Region r) {
+      super((s == null? "null-services": s.getServerName()) +
+        "-daughterOpener=" + r.getRegionInfo().getEncodedName());
+      setDaemon(true);
+      this.server = s;
+      this.r = r;
+    }
+
+    /**
+     * @return Null if open succeeded, else the exception that caused the open to fail.
+     * Call it after this thread exits else you may get wrong view on result.
+     */
+    Throwable getException() {
+      return this.t;
+    }
+
+    @Override
+    public void run() {
+      try {
+        openDaughterRegion(this.server, r);
+      } catch (Throwable t) {
+        this.t = t;
+      }
+    }
+  }
+
+  /**
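+   * Open a single daughter region, with a progress logger if a server is given.
+   * @param server hosting server; when null the region is opened without a reporter
+   * @param daughter the daughter region to open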
+   * @throws IOException
+   * @throws KeeperException
+   */
+  @VisibleForTesting
+  void openDaughterRegion(final Server server, final Region daughter)
+      throws IOException, KeeperException {
+    HRegionInfo hri = daughter.getRegionInfo();
+    LoggingProgressable reporter = server == null ? null
+        : new LoggingProgressable(hri, server.getConfiguration().getLong(
+            "hbase.regionserver.split.daughter.open.log.interval", 10000));
+    ((HRegion)daughter).openHRegion(reporter);
+  }
+
+  static class LoggingProgressable implements CancelableProgressable {
+    private final HRegionInfo hri;
+    private long lastLog = -1;
+    private final long interval;
+
+    LoggingProgressable(final HRegionInfo hri, final long interval) {
+      this.hri = hri;
+      this.interval = interval;
+    }
+
+    @Override
+    public boolean progress() {
+      long now = EnvironmentEdgeManager.currentTime();
+      if (now - lastLog > this.interval) {
+        LOG.info("Opening " + this.hri.getRegionNameAsString());
+        this.lastLog = now;
+      }
+      return true;
+    }
+  }
+
+  /**
+   * Creates the daughters' reference files for every store file of the parent region.
+   * @param hstoreFilesToSplit map of family name to the store files to create references for.
+   * @return the number of reference files created for daughter A (first) and B (second).
+   * @throws IOException if the map is null, on timeout or interruption, or if a split task fails
+   */
+  private Pair<Integer, Integer> splitStoreFiles(
+      final Map<byte[], List<StoreFile>> hstoreFilesToSplit)
+      throws IOException {
+    if (hstoreFilesToSplit == null) {
+      // Could be null because close didn't succeed -- for now consider it fatal
+      throw new IOException("Close returned empty list of StoreFiles");
+    }
+    // Size the pool by the total number of store files; the map is keyed by family,
+    // so its size() alone undercounts. Then submit, wait and check for exceptions.
+    int nbFiles = 0;
+    for (List<StoreFile> familyFiles : hstoreFilesToSplit.values()) nbFiles += familyFiles.size();
+    if (nbFiles == 0) {
+      // No file needs to be split.
+      return new Pair<Integer, Integer>(0,0);
+    }
+    LOG.info("Preparing to split " + nbFiles + " storefiles for region " + this.parent);
+    ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
+    builder.setNameFormat("StoreFileSplitter-%1$d");
+    ThreadFactory factory = builder.build();
+    ThreadPoolExecutor threadPool =
+      (ThreadPoolExecutor) Executors.newFixedThreadPool(nbFiles, factory);
+    List<Future<Pair<Path,Path>>> futures = new ArrayList<Future<Pair<Path,Path>>> (nbFiles);
+
+    // Split each store file.
+    for (Map.Entry<byte[], List<StoreFile>> entry: hstoreFilesToSplit.entrySet()) {
+      for (StoreFile sf: entry.getValue()) {
+        StoreFileSplitter sfs = new StoreFileSplitter(entry.getKey(), sf);
+        futures.add(threadPool.submit(sfs));
+      }
+    }
+    // Shutdown the pool
+    threadPool.shutdown();
+
+    // Wait for all the tasks to finish
+    try {
+      boolean stillRunning = !threadPool.awaitTermination(
+          this.fileSplitTimeout, TimeUnit.MILLISECONDS);
+      if (stillRunning) {
+        threadPool.shutdownNow();
+        // wait for the thread to shutdown completely.
+        while (!threadPool.isTerminated()) {
+          Thread.sleep(50);
+        }
+        throw new IOException("Took too long to split the" +
+            " files and create the references, aborting split");
+      }
+    } catch (InterruptedException e) {
+      throw (InterruptedIOException)new InterruptedIOException().initCause(e);
+    }
+
+    int created_a = 0;
+    int created_b = 0;
+    // Look for any exception
+    for (Future<Pair<Path, Path>> future : futures) {
+      try {
+        Pair<Path, Path> p = future.get();
+        created_a += p.getFirst() != null ? 1 : 0;
+        created_b += p.getSecond() != null ? 1 : 0;
+      } catch (InterruptedException e) {
+        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
+      } catch (ExecutionException e) {
+        throw new IOException(e);
+      }
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Split storefiles for region " + this.parent + " Daugther A: " + created_a
+          + " storefiles, Daugther B: " + created_b + " storefiles.");
+    }
+    return new Pair<Integer, Integer>(created_a, created_b);
+  }
+
+  private Pair<Path, Path> splitStoreFile(final byte[] family, final StoreFile sf)
+      throws IOException {
+    HRegionFileSystem fs = this.parent.getRegionFileSystem();
+    String familyName = Bytes.toString(family);
+    Path path_a =
+        fs.splitStoreFile(this.hri_a, familyName, sf, this.splitrow, false,
+          this.parent.getSplitPolicy());
+    Path path_b =
+        fs.splitStoreFile(this.hri_b, familyName, sf, this.splitrow, true,
+          this.parent.getSplitPolicy());
+    return new Pair<Path,Path>(path_a, path_b);
+  }
+
+  /**
+   * Utility class used to do the file splitting / reference writing
+   * in parallel instead of sequentially.
+   */
+  private class StoreFileSplitter implements Callable<Pair<Path,Path>> {
+    private final byte[] family;
+    private final StoreFile sf;
+
+    /**
+     * Constructor that takes what it needs to split
+     * @param family Family that contains the store file
+     * @param sf which file
+     */
+    public StoreFileSplitter(final byte[] family, final StoreFile sf) {
+      this.sf = sf;
+      this.family = family;
+    }
+
+    public Pair<Path,Path> call() throws IOException {
+      return splitStoreFile(family, sf);
+    }
+  }
+
+  @Override
+  public boolean rollback(final Server server, final RegionServerServices services)
+      throws IOException {
+    this.server = server;
+    this.rsServices = services;
+    // Coprocessor callback
+    if (this.parent.getCoprocessorHost() != null) {
+      this.parent.getCoprocessorHost().preRollBackSplit();
+    }
+
+    boolean result = true;
+    ListIterator<JournalEntry> iterator =
+      this.journal.listIterator(this.journal.size());
+    // Iterate in reverse.
+    while (iterator.hasPrevious()) {
+      JournalEntry je = iterator.previous();
+
+      transition(je.getPhase(), true);
+
+      switch (je.getPhase()) {
+
+      case SET_SPLITTING:
+        if (services != null
+            && !services.reportRegionStateTransition(TransitionCode.SPLIT_REVERTED,
+                parent.getRegionInfo(), hri_a, hri_b)) {
+          return false;
+        }
+        break;
+
+      case CREATE_SPLIT_DIR:
+        this.parent.writestate.writesEnabled = true;
+        this.parent.getRegionFileSystem().cleanupSplitsDir();
+        break;
+
+      case CLOSED_PARENT_REGION:
+        try {
+          // So, this returns a seqid but if we just closed and then reopened, we
+          // should be ok. On close, we flushed using sequenceid obtained from
+          // hosting regionserver so no need to propagate the sequenceid returned
+          // out of initialize below up into regionserver as we normally do.
+          // TODO: Verify.
+          this.parent.initialize();
+        } catch (IOException e) {
+          LOG.error("Failed rollbacking CLOSED_PARENT_REGION of region " +
+            parent.getRegionInfo().getRegionNameAsString(), e);
+          throw new RuntimeException(e);
+        }
+        break;
+
+      case STARTED_REGION_A_CREATION:
+        this.parent.getRegionFileSystem().cleanupDaughterRegion(this.hri_a);
+        break;
+
+      case STARTED_REGION_B_CREATION:
+        this.parent.getRegionFileSystem().cleanupDaughterRegion(this.hri_b);
+        break;
+
+      case OFFLINED_PARENT:
+        if (services != null) services.addToOnlineRegions(this.parent);
+        break;
+
+      case PONR:
+        // We got to the point-of-no-return so we need to just abort. Return
+        // immediately.  Do not clean up created daughter regions.  They need
+        // to be in place so we don't delete the parent region mistakenly.
+        // See HBASE-3872.
+        return false;
+
+      // Informational only cases
+      case STARTED:
+      case PREPARED:
+      case BEFORE_PRE_SPLIT_HOOK:
+      case AFTER_PRE_SPLIT_HOOK:
+      case BEFORE_POST_SPLIT_HOOK:
+      case AFTER_POST_SPLIT_HOOK:
+      case OPENED_REGION_A:
+      case OPENED_REGION_B:
+      case COMPLETED:
+        break;
+
+      default:
+        throw new RuntimeException("Unhandled journal entry: " + je);
+      }
+    }
+    // Coprocessor callback
+    if (this.parent.getCoprocessorHost() != null) {
+      this.parent.getCoprocessorHost().postRollBackSplit();
+    }
+    return result;
+  }
+
+  /* package */ HRegionInfo getFirstDaughter() {
+    return hri_a;
+  }
+
+  /* package */ HRegionInfo getSecondDaughter() {
+    return hri_b;
+  }
+
+  @Override
+  public List<JournalEntry> getJournal() {
+    return journal;
+  }
+
+  @Override
+  public SplitTransaction registerTransactionListener(TransactionListener listener) {
+    listeners.add(listener);
+    return this;
+  }
+
+  @Override
+  public Server getServer() {
+    return server;
+  }
+
+  @Override
+  public RegionServerServices getRegionServerServices() {
+    return rsServices;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/e156ed61/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java
index 6deade8..10ecae3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java
@@ -59,6 +59,7 @@ import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.SplitTransaction;
+import org.apache.hadoop.hbase.regionserver.SplitTransactionFactory;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
@@ -500,7 +501,8 @@ public class TestCoprocessorInterface {
   private Region [] split(final Region r, final byte [] splitRow) throws IOException {
     Region[] regions = new Region[2];
 
-    SplitTransaction st = new SplitTransaction(r, splitRow);
+    SplitTransaction st = new SplitTransactionFactory(TEST_UTIL.getConfiguration())
+      .create(r, splitRow);
     int i = 0;
 
     if (!st.prepare()) {
@@ -509,8 +511,7 @@ public class TestCoprocessorInterface {
     }
     try {
       Server mockServer = Mockito.mock(Server.class);
-      when(mockServer.getConfiguration()).thenReturn(
-          TEST_UTIL.getConfiguration());
+      when(mockServer.getConfiguration()).thenReturn(TEST_UTIL.getConfiguration());
       PairOfSameType<Region> daughters = st.execute(mockServer, null);
       for (Region each_daughter: daughters) {
         regions[i] = each_daughter;

http://git-wip-us.apache.org/repos/asf/hbase/blob/e156ed61/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerObserver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerObserver.java
index de43feb..e013cbb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerObserver.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerObserver.java
@@ -40,7 +40,8 @@ import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.Region;
-import org.apache.hadoop.hbase.regionserver.RegionMergeTransaction;
+import org.apache.hadoop.hbase.regionserver.RegionMergeTransactionFactory;
+import org.apache.hadoop.hbase.regionserver.RegionMergeTransactionImpl;
 import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
 import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -110,7 +111,7 @@ public class TestRegionServerObserver {
   }
 
   public static class CPRegionServerObserver extends BaseRegionServerObserver {
-    private RegionMergeTransaction rmt = null;
+    private RegionMergeTransactionImpl rmt = null;
     private HRegion mergedRegion = null;
 
     private boolean preMergeCalled;
@@ -143,7 +144,8 @@ public class TestRegionServerObserver {
       HRegionServer rs = (HRegionServer) environment.getRegionServerServices();
       List<Region> onlineRegions =
           rs.getOnlineRegions(TableName.valueOf("testRegionServerObserver_2"));
-      rmt = new RegionMergeTransaction(onlineRegions.get(0), onlineRegions.get(1), true);
+      rmt = (RegionMergeTransactionImpl) new RegionMergeTransactionFactory(rs.getConfiguration())
+        .create(onlineRegions.get(0), onlineRegions.get(1), true);
       if (!rmt.prepare(rs)) {
         LOG.error("Prepare for the region merge of table "
             + onlineRegions.get(0).getTableDesc().getNameAsString()

http://git-wip-us.apache.org/repos/asf/hbase/blob/e156ed61/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java
index ca96f50..be43950 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java
@@ -106,7 +106,7 @@ public class TestEndToEndSplitTransaction {
       byte[] regionName = conn.getRegionLocator(tableName).getRegionLocation(splitRow)
           .getRegionInfo().getRegionName();
       Region region = server.getRegion(regionName);
-      SplitTransaction split = new SplitTransaction(region, splitRow);
+      SplitTransactionImpl split = new SplitTransactionImpl((HRegion) region, splitRow);
       split.prepare();
 
       // 1. phase I

http://git-wip-us.apache.org/repos/asf/hbase/blob/e156ed61/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index cb8d0be..6a5e844 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -2490,7 +2490,7 @@ public class TestHRegion {
    */
   HRegion[] splitRegion(final HRegion parent, final byte[] midkey) throws IOException {
     PairOfSameType<Region> result = null;
-    SplitTransaction st = new SplitTransaction(parent, midkey);
+    SplitTransactionImpl st = new SplitTransactionImpl(parent, midkey);
     // If prepare does not return true, for some reason -- logged inside in
     // the prepare call -- we are not ready to split just now. Just return.
     if (!st.prepare()) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/e156ed61/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransaction.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransaction.java
index 4a2f9ba..313a6ba 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransaction.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransaction.java
@@ -60,7 +60,7 @@ import org.mockito.Mockito;
 import com.google.common.collect.ImmutableList;
 
 /**
- * Test the {@link RegionMergeTransaction} class against two HRegions (as
+ * Test the {@link RegionMergeTransactionImpl} class against two HRegions (as
  * opposed to running cluster).
  */
 @Category({RegionServerTests.class, SmallTests.class})
@@ -120,10 +120,10 @@ public class TestRegionMergeTransaction {
     prepareOnGoodRegions();
   }
 
-  private RegionMergeTransaction prepareOnGoodRegions() throws IOException {
-    RegionMergeTransaction mt = new RegionMergeTransaction(region_a, region_b,
+  private RegionMergeTransactionImpl prepareOnGoodRegions() throws IOException {
+    RegionMergeTransactionImpl mt = new RegionMergeTransactionImpl(region_a, region_b,
         false);
-    RegionMergeTransaction spyMT = Mockito.spy(mt);
+    RegionMergeTransactionImpl spyMT = Mockito.spy(mt);
     doReturn(false).when(spyMT).hasMergeQualifierInMeta(null,
         region_a.getRegionInfo().getRegionName());
     doReturn(false).when(spyMT).hasMergeQualifierInMeta(null,
@@ -137,7 +137,7 @@ public class TestRegionMergeTransaction {
    */
   @Test
   public void testPrepareWithSameRegion() throws IOException {
-    RegionMergeTransaction mt = new RegionMergeTransaction(this.region_a,
+    RegionMergeTransactionImpl mt = new RegionMergeTransactionImpl(this.region_a,
         this.region_a, true);
     assertFalse("should not merge the same region even if it is forcible ",
         mt.prepare(null));
@@ -148,7 +148,7 @@ public class TestRegionMergeTransaction {
    */
   @Test
   public void testPrepareWithRegionsNotAdjacent() throws IOException {
-    RegionMergeTransaction mt = new RegionMergeTransaction(this.region_a,
+    RegionMergeTransactionImpl mt = new RegionMergeTransactionImpl(this.region_a,
         this.region_c, false);
     assertFalse("should not merge two regions if they are adjacent except it is forcible",
         mt.prepare(null));
@@ -160,9 +160,9 @@ public class TestRegionMergeTransaction {
   @Test
   public void testPrepareWithRegionsNotAdjacentUnderCompulsory()
       throws IOException {
-    RegionMergeTransaction mt = new RegionMergeTransaction(region_a, region_c,
+    RegionMergeTransactionImpl mt = new RegionMergeTransactionImpl(region_a, region_c,
         true);
-    RegionMergeTransaction spyMT = Mockito.spy(mt);
+    RegionMergeTransactionImpl spyMT = Mockito.spy(mt);
     doReturn(false).when(spyMT).hasMergeQualifierInMeta(null,
         region_a.getRegionInfo().getRegionName());
     doReturn(false).when(spyMT).hasMergeQualifierInMeta(null,
@@ -181,7 +181,7 @@ public class TestRegionMergeTransaction {
     when(storeMock.getFamily()).thenReturn(new HColumnDescriptor("cf"));
     when(storeMock.close()).thenReturn(ImmutableList.<StoreFile>of());
     this.region_a.stores.put(Bytes.toBytes(""), storeMock);
-    RegionMergeTransaction mt = new RegionMergeTransaction(this.region_a,
+    RegionMergeTransactionImpl mt = new RegionMergeTransactionImpl(this.region_a,
         this.region_b, false);
     assertFalse(
         "a region should not be mergeable if it has instances of store file references",
@@ -191,7 +191,7 @@ public class TestRegionMergeTransaction {
   @Test
   public void testPrepareWithClosedRegion() throws IOException {
     this.region_a.close();
-    RegionMergeTransaction mt = new RegionMergeTransaction(this.region_a,
+    RegionMergeTransactionImpl mt = new RegionMergeTransactionImpl(this.region_a,
         this.region_b, false);
     assertFalse(mt.prepare(null));
   }
@@ -202,9 +202,9 @@ public class TestRegionMergeTransaction {
    */
   @Test
   public void testPrepareWithRegionsWithMergeReference() throws IOException {
-    RegionMergeTransaction mt = new RegionMergeTransaction(region_a, region_b,
+    RegionMergeTransactionImpl mt = new RegionMergeTransactionImpl(region_a, region_b,
         false);
-    RegionMergeTransaction spyMT = Mockito.spy(mt);
+    RegionMergeTransactionImpl spyMT = Mockito.spy(mt);
     doReturn(true).when(spyMT).hasMergeQualifierInMeta(null,
         region_a.getRegionInfo().getRegionName());
     doReturn(true).when(spyMT).hasMergeQualifierInMeta(null,
@@ -221,14 +221,14 @@ public class TestRegionMergeTransaction {
     assertEquals(rowCountOfRegionB, countRows(this.region_b));
 
     // Start transaction.
-    RegionMergeTransaction mt = prepareOnGoodRegions();
+    RegionMergeTransactionImpl mt = prepareOnGoodRegions();
 
     // Run the execute. Look at what it returns.
     TEST_UTIL.getConfiguration().setInt(HConstants.REGIONSERVER_PORT, 0);
     CoordinatedStateManager cp = CoordinatedStateManagerFactory.getCoordinatedStateManager(
       TEST_UTIL.getConfiguration());
     Server mockServer = new HRegionServer(TEST_UTIL.getConfiguration(), cp);
-    HRegion mergedRegion = mt.execute(mockServer, null);
+    HRegion mergedRegion = (HRegion)mt.execute(mockServer, null);
     // Do some assertions about execution.
     assertTrue(this.fs.exists(mt.getMergesDir()));
     // Assert region_a and region_b is closed.
@@ -265,7 +265,7 @@ public class TestRegionMergeTransaction {
     assertEquals(rowCountOfRegionB, countRows(this.region_b));
 
     // Start transaction.
-    RegionMergeTransaction mt = prepareOnGoodRegions();
+    RegionMergeTransactionImpl mt = prepareOnGoodRegions();
 
     when(mt.createMergedRegionFromMerges(region_a, region_b,
         mt.getMergedRegionInfo())).thenThrow(
@@ -301,7 +301,7 @@ public class TestRegionMergeTransaction {
 
     // Now retry the merge but do not throw an exception this time.
     assertTrue(mt.prepare(null));
-    HRegion mergedRegion = mt.execute(mockServer, null);
+    HRegion mergedRegion = (HRegion)mt.execute(mockServer, null);
     // Count rows. daughters are already open
     // Count rows. merged region are already open
     try {
@@ -325,7 +325,7 @@ public class TestRegionMergeTransaction {
     assertEquals(rowCountOfRegionB, countRows(this.region_b));
 
     // Start transaction.
-    RegionMergeTransaction mt = prepareOnGoodRegions();
+    RegionMergeTransactionImpl mt = prepareOnGoodRegions();
     Mockito.doThrow(new MockedFailedMergedRegionOpen())
         .when(mt)
         .openMergedRegion((Server) Mockito.anyObject(),
@@ -365,31 +365,31 @@ public class TestRegionMergeTransaction {
     byte[] z = Bytes.toBytes("z");
     HRegionInfo r1 = new HRegionInfo(tableName);
     HRegionInfo r2 = new HRegionInfo(tableName, a, z);
-    HRegionInfo m = RegionMergeTransaction.getMergedRegionInfo(r1, r2);
+    HRegionInfo m = RegionMergeTransactionImpl.getMergedRegionInfo(r1, r2);
     assertTrue(Bytes.equals(m.getStartKey(), r1.getStartKey())
         && Bytes.equals(m.getEndKey(), r1.getEndKey()));
 
     r1 = new HRegionInfo(tableName, null, a);
     r2 = new HRegionInfo(tableName, a, z);
-    m = RegionMergeTransaction.getMergedRegionInfo(r1, r2);
+    m = RegionMergeTransactionImpl.getMergedRegionInfo(r1, r2);
     assertTrue(Bytes.equals(m.getStartKey(), r1.getStartKey())
         && Bytes.equals(m.getEndKey(), r2.getEndKey()));
 
     r1 = new HRegionInfo(tableName, null, a);
     r2 = new HRegionInfo(tableName, z, null);
-    m = RegionMergeTransaction.getMergedRegionInfo(r1, r2);
+    m = RegionMergeTransactionImpl.getMergedRegionInfo(r1, r2);
     assertTrue(Bytes.equals(m.getStartKey(), r1.getStartKey())
         && Bytes.equals(m.getEndKey(), r2.getEndKey()));
 
     r1 = new HRegionInfo(tableName, a, z);
     r2 = new HRegionInfo(tableName, z, null);
-    m = RegionMergeTransaction.getMergedRegionInfo(r1, r2);
+    m = RegionMergeTransactionImpl.getMergedRegionInfo(r1, r2);
     assertTrue(Bytes.equals(m.getStartKey(), r1.getStartKey())
       && Bytes.equals(m.getEndKey(), r2.getEndKey()));
 
     r1 = new HRegionInfo(tableName, a, b);
     r2 = new HRegionInfo(tableName, b, z);
-    m = RegionMergeTransaction.getMergedRegionInfo(r1, r2);
+    m = RegionMergeTransactionImpl.getMergedRegionInfo(r1, r2);
     assertTrue(Bytes.equals(m.getStartKey(), r1.getStartKey())
       && Bytes.equals(m.getEndKey(), r2.getEndKey()));
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e156ed61/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java
index f4b6f02..2a949a1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java
@@ -81,7 +81,7 @@ import com.google.protobuf.ServiceException;
 
 /**
  * Like {@link TestRegionMergeTransaction} in that we're testing
- * {@link RegionMergeTransaction} only the below tests are against a running
+ * {@link RegionMergeTransactionImpl} only the below tests are against a running
  * cluster where {@link TestRegionMergeTransaction} is tests against bare
  * {@link HRegion}.
  */

http://git-wip-us.apache.org/repos/asf/hbase/blob/e156ed61/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransaction.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransaction.java
index 0ed7645..da4f811 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransaction.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransaction.java
@@ -67,7 +67,7 @@ import org.mockito.Mockito;
 import com.google.common.collect.ImmutableList;
 
 /**
- * Test the {@link SplitTransaction} class against an HRegion (as opposed to
+ * Test the {@link SplitTransactionImpl} class against an HRegion (as opposed to
  * running cluster).
  */
 @Category({RegionServerTests.class, SmallTests.class})
@@ -120,8 +120,8 @@ public class TestSplitTransaction {
     assertEquals(rowcount, parentRowCount);
 
     // Start transaction.
-    SplitTransaction st = prepareGOOD_SPLIT_ROW();
-    SplitTransaction spiedUponSt = spy(st);
+    SplitTransactionImpl st = prepareGOOD_SPLIT_ROW();
+    SplitTransactionImpl spiedUponSt = spy(st);
     Mockito
         .doThrow(new MockedFailedDaughterOpen())
         .when(spiedUponSt)
@@ -161,12 +161,13 @@ public class TestSplitTransaction {
     prepareGOOD_SPLIT_ROW();
   }
 
-  private SplitTransaction prepareGOOD_SPLIT_ROW() {
+  private SplitTransactionImpl prepareGOOD_SPLIT_ROW() throws IOException {
     return prepareGOOD_SPLIT_ROW(this.parent);
   }
 
-  private SplitTransaction prepareGOOD_SPLIT_ROW(final HRegion parentRegion) {
-    SplitTransaction st = new SplitTransaction(parentRegion, GOOD_SPLIT_ROW);
+  private SplitTransactionImpl prepareGOOD_SPLIT_ROW(final HRegion parentRegion)
+      throws IOException {
+    SplitTransactionImpl st = new SplitTransactionImpl(parentRegion, GOOD_SPLIT_ROW);
     assertTrue(st.prepare());
     return st;
   }
@@ -181,7 +182,7 @@ public class TestSplitTransaction {
     when(storeMock.close()).thenReturn(ImmutableList.<StoreFile>of());
     this.parent.stores.put(Bytes.toBytes(""), storeMock);
 
-    SplitTransaction st = new SplitTransaction(this.parent, GOOD_SPLIT_ROW);
+    SplitTransactionImpl st = new SplitTransactionImpl(this.parent, GOOD_SPLIT_ROW);
 
     assertFalse("a region should not be splittable if it has instances of store file references",
                 st.prepare());
@@ -192,19 +193,19 @@ public class TestSplitTransaction {
    */
   @Test public void testPrepareWithBadSplitRow() throws IOException {
     // Pass start row as split key.
-    SplitTransaction st = new SplitTransaction(this.parent, STARTROW);
+    SplitTransactionImpl st = new SplitTransactionImpl(this.parent, STARTROW);
     assertFalse(st.prepare());
-    st = new SplitTransaction(this.parent, HConstants.EMPTY_BYTE_ARRAY);
+    st = new SplitTransactionImpl(this.parent, HConstants.EMPTY_BYTE_ARRAY);
     assertFalse(st.prepare());
-    st = new SplitTransaction(this.parent, new byte [] {'A', 'A', 'A'});
+    st = new SplitTransactionImpl(this.parent, new byte [] {'A', 'A', 'A'});
     assertFalse(st.prepare());
-    st = new SplitTransaction(this.parent, ENDROW);
+    st = new SplitTransactionImpl(this.parent, ENDROW);
     assertFalse(st.prepare());
   }
 
   @Test public void testPrepareWithClosedRegion() throws IOException {
     this.parent.close();
-    SplitTransaction st = new SplitTransaction(this.parent, GOOD_SPLIT_ROW);
+    SplitTransactionImpl st = new SplitTransactionImpl(this.parent, GOOD_SPLIT_ROW);
     assertFalse(st.prepare());
   }
 
@@ -220,7 +221,7 @@ public class TestSplitTransaction {
     ((LruBlockCache) cacheConf.getBlockCache()).clearCache();
 
     // Start transaction.
-    SplitTransaction st = prepareGOOD_SPLIT_ROW();
+    SplitTransactionImpl st = prepareGOOD_SPLIT_ROW();
 
     // Run the execute.  Look at what it returns.
     Server mockServer = Mockito.mock(Server.class);
@@ -266,8 +267,8 @@ public class TestSplitTransaction {
 
     // Start transaction.
     HRegion spiedRegion = spy(this.parent);
-    SplitTransaction st = prepareGOOD_SPLIT_ROW(spiedRegion);
-    SplitTransaction spiedUponSt = spy(st);
+    SplitTransactionImpl st = prepareGOOD_SPLIT_ROW(spiedRegion);
+    SplitTransactionImpl spiedUponSt = spy(st);
     doThrow(new IOException("Failing split. Expected reference file count isn't equal."))
         .when(spiedUponSt).assertReferenceFileCount(anyInt(),
         eq(new Path(this.parent.getRegionFileSystem().getTableDir(),
@@ -294,8 +295,8 @@ public class TestSplitTransaction {
 
     // Start transaction.
     HRegion spiedRegion = spy(this.parent);
-    SplitTransaction st = prepareGOOD_SPLIT_ROW(spiedRegion);
-    SplitTransaction spiedUponSt = spy(st);
+    SplitTransactionImpl st = prepareGOOD_SPLIT_ROW(spiedRegion);
+    SplitTransactionImpl spiedUponSt = spy(st);
     doNothing().when(spiedUponSt).assertReferenceFileCount(anyInt(),
         eq(parent.getRegionFileSystem().getSplitsDir(st.getFirstDaughter())));
     when(spiedRegion.createDaughterRegionFromSplits(spiedUponSt.getSecondDaughter())).