You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2015/04/01 03:03:51 UTC

[2/6] hbase git commit: HBASE-12975 Supportable SplitTransaction and RegionMergeTransaction interfaces

http://git-wip-us.apache.org/repos/asf/hbase/blob/bf76082f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java
index c3bcfec..a21c19d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java
@@ -1,5 +1,4 @@
-/**
- *
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -19,100 +18,62 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.util.ArrayList;
 import java.util.List;
-import java.util.ListIterator;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.Server;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.MetaTableAccessor;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
-import org.apache.hadoop.hbase.coordination.SplitTransactionCoordination;
-import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.CancelableProgressable;
-import org.apache.hadoop.hbase.util.ConfigUtil;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.util.HasThread;
-import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.util.PairOfSameType;
-import org.apache.zookeeper.KeeperException;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
  * Executes region split as a "transaction".  Call {@link #prepare()} to setup
  * the transaction, {@link #execute(Server, RegionServerServices)} to run the
  * transaction and {@link #rollback(Server, RegionServerServices)} to cleanup if execute fails.
  *
- * <p>Here is an example of how you would use this class:
+ * <p>Here is an example of how you would use this interface:
  * <pre>
- *  SplitTransaction st = new SplitTransaction(this.conf, parent, midKey)
+ *  SplitTransactionFactory factory = new SplitTransactionFactory(conf);
+ *  SplitTransaction st = factory.create(parent, midKey)
+ *    .registerTransactionListener(new TransactionListener() {
+ *       public void transition(SplitTransaction transaction, SplitTransactionPhase from,
+ *           SplitTransactionPhase to) throws IOException {
+ *         // ...
+ *       }
+ *       public void rollback(SplitTransaction transaction, SplitTransactionPhase from,
+ *           SplitTransactionPhase to) {
+ *         // ...
+ *       }
+ *    });
  *  if (!st.prepare()) return;
  *  try {
  *    st.execute(server, services);
- *  } catch (IOException ioe) {
+ *  } catch (IOException ioe) {
  *    try {
  *      st.rollback(server, services);
  *      return;
  *    } catch (RuntimeException e) {
- *      myAbortable.abort("Failed split, abort");
+ *      // abort the server
  *    }
  *  }
  * </Pre>
- * <p>This class is not thread safe.  Caller needs ensure split is run by
+ * <p>A split transaction is not thread safe.  Callers must ensure a split is run by
  * one thread only.
  */
-@InterfaceAudience.Private
-public class SplitTransaction {
-  private static final Log LOG = LogFactory.getLog(SplitTransaction.class);
-
-  /*
-   * Region to split
-   */
-  private final HRegion parent;
-  private HRegionInfo hri_a;
-  private HRegionInfo hri_b;
-  private long fileSplitTimeout = 30000;
-  public SplitTransactionCoordination.SplitTransactionDetails std;
-  boolean useZKForAssignment;
-
-  /*
-   * Row to split around
-   */
-  private final byte [] splitrow;
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
+@InterfaceStability.Evolving
+public interface SplitTransaction {
 
   /**
-   * Types to add to the transaction journal.
-   * Each enum is a step in the split transaction. Used to figure how much
-   * we need to rollback.
+   * Each enum is a step in the split transaction.
    */
-  static enum JournalEntryType {
+  public enum SplitTransactionPhase {
     /**
      * Started
      */
     STARTED,
     /**
-     * Prepared (after table lock)
+     * Prepared
      */
     PREPARED,
     /**
@@ -156,6 +117,12 @@ public class SplitTransaction {
      */
     OPENED_REGION_B,
     /**
+     * Point of no return.
+     * If we got here, then transaction is not recoverable other than by
+     * crashing out the regionserver.
+     */
+    PONR,
+    /**
      * Before postSplit coprocessor hook
      */
     BEFORE_POST_SPLIT_HOOK,
@@ -164,369 +131,60 @@ public class SplitTransaction {
      */
     AFTER_POST_SPLIT_HOOK,
     /**
-     * Point of no return.
-     * If we got here, then transaction is not recoverable other than by
-     * crashing out the regionserver.
+     * Completed
      */
-    PONR
+    COMPLETED
   }
 
-  static class JournalEntry {
-    private JournalEntryType type;
-    private long timestamp;
-
-    public JournalEntry(JournalEntryType type) {
-      this(type, EnvironmentEdgeManager.currentTime());
-    }
-
-    public JournalEntry(JournalEntryType type, long timestamp) {
-      this.type = type;
-      this.timestamp = timestamp;
-    }
-
-    @Override
-    public String toString() {
-      StringBuilder sb = new StringBuilder();
-      sb.append(type);
-      sb.append(" at ");
-      sb.append(timestamp);
-      return sb.toString();
-    }
-  }
-
-  /*
-   * Journal of how far the split transaction has progressed.
-   */
-  private final List<JournalEntry> journal = new ArrayList<JournalEntry>();
-
   /**
-   * Constructor
-   * @param r Region to split
-   * @param splitrow Row to split around
+   * Split transaction journal entry
    */
-  public SplitTransaction(final Region r, final byte [] splitrow) {
-    this.parent = (HRegion)r;
-    this.splitrow = splitrow;
-    this.journal.add(new JournalEntry(JournalEntryType.STARTED));
-    useZKForAssignment = ConfigUtil.useZKForAssignment(parent.getBaseConf());
-  }
+  public interface JournalEntry {
 
-  /**
-   * Does checks on split inputs.
-   * @return <code>true</code> if the region is splittable else
-   * <code>false</code> if it is not (e.g. its already closed, etc.).
-   */
-  public boolean prepare() {
-    if (!this.parent.isSplittable()) return false;
-    // Split key can be null if this region is unsplittable; i.e. has refs.
-    if (this.splitrow == null) return false;
-    HRegionInfo hri = this.parent.getRegionInfo();
-    parent.prepareToSplit();
-    // Check splitrow.
-    byte [] startKey = hri.getStartKey();
-    byte [] endKey = hri.getEndKey();
-    if (Bytes.equals(startKey, splitrow) ||
-        !this.parent.getRegionInfo().containsRow(splitrow)) {
-      LOG.info("Split row is not inside region key range or is equal to " +
-          "startkey: " + Bytes.toStringBinary(this.splitrow));
-      return false;
-    }
-    long rid = getDaughterRegionIdTimestamp(hri);
-    this.hri_a = new HRegionInfo(hri.getTable(), startKey, this.splitrow, false, rid);
-    this.hri_b = new HRegionInfo(hri.getTable(), this.splitrow, endKey, false, rid);
-    this.journal.add(new JournalEntry(JournalEntryType.PREPARED));
-    return true;
-  }
+    /** @return the completed phase marked by this journal entry */
+    SplitTransactionPhase getPhase();
 
-  /**
-   * Calculate daughter regionid to use.
-   * @param hri Parent {@link HRegionInfo}
-   * @return Daughter region id (timestamp) to use.
-   */
-  private static long getDaughterRegionIdTimestamp(final HRegionInfo hri) {
-    long rid = EnvironmentEdgeManager.currentTime();
-    // Regionid is timestamp.  Can't be less than that of parent else will insert
-    // at wrong location in hbase:meta (See HBASE-710).
-    if (rid < hri.getRegionId()) {
-      LOG.warn("Clock skew; parent regions id is " + hri.getRegionId() +
-        " but current time here is " + rid);
-      rid = hri.getRegionId() + 1;
-    }
-    return rid;
+    /** @return the time of phase completion */
+    long getTimeStamp();
   }
 
-  private static IOException closedByOtherException = new IOException(
-      "Failed to close region: already closed by another thread");
-
   /**
-   * Prepare the regions and region files.
-   * @param server Hosting server instance.  Can be null when testing (won't try
-   * and update in zk if a null server)
-   * @param services Used to online/offline regions.
-   * @throws IOException If thrown, transaction failed.
-   *    Call {@link #rollback(Server, RegionServerServices)}
-   * @return Regions created
+   * Split transaction listener
    */
-  /* package */PairOfSameType<Region> createDaughters(final Server server,
-      final RegionServerServices services) throws IOException {
-    LOG.info("Starting split of region " + this.parent);
-    if ((server != null && server.isStopped()) ||
-        (services != null && services.isStopping())) {
-      throw new IOException("Server is stopped or stopping");
-    }
-    assert !this.parent.lock.writeLock().isHeldByCurrentThread():
-      "Unsafe to hold write lock while performing RPCs";
-
-    journal.add(new JournalEntry(JournalEntryType.BEFORE_PRE_SPLIT_HOOK));
-
-    // Coprocessor callback
-    if (this.parent.getCoprocessorHost() != null) {
-      // TODO: Remove one of these
-      this.parent.getCoprocessorHost().preSplit();
-      this.parent.getCoprocessorHost().preSplit(this.splitrow);
-    }
-
-    journal.add(new JournalEntry(JournalEntryType.AFTER_PRE_SPLIT_HOOK));
-
-    // If true, no cluster to write meta edits to or to update znodes in.
-    boolean testing = server == null? true:
-        server.getConfiguration().getBoolean("hbase.testing.nocluster", false);
-    this.fileSplitTimeout = testing ? this.fileSplitTimeout :
-        server.getConfiguration().getLong("hbase.regionserver.fileSplitTimeout",
-          this.fileSplitTimeout);
-
-    PairOfSameType<Region> daughterRegions = stepsBeforePONR(server, services, testing);
-
-    List<Mutation> metaEntries = new ArrayList<Mutation>();
-    if (this.parent.getCoprocessorHost() != null) {
-      if (this.parent.getCoprocessorHost().
-          preSplitBeforePONR(this.splitrow, metaEntries)) {
-        throw new IOException("Coprocessor bypassing region "
-            + this.parent.getRegionInfo().getRegionNameAsString() + " split.");
-      }
-      try {
-        for (Mutation p : metaEntries) {
-          HRegionInfo.parseRegionName(p.getRow());
-        }
-      } catch (IOException e) {
-        LOG.error("Row key of mutation from coprossor is not parsable as region name."
-            + "Mutations from coprocessor should only for hbase:meta table.");
-        throw e;
-      }
-    }
-
-    // This is the point of no return.  Adding subsequent edits to .META. as we
-    // do below when we do the daughter opens adding each to .META. can fail in
-    // various interesting ways the most interesting of which is a timeout
-    // BUT the edits all go through (See HBASE-3872).  IF we reach the PONR
-    // then subsequent failures need to crash out this regionserver; the
-    // server shutdown processing should be able to fix-up the incomplete split.
-    // The offlined parent will have the daughters as extra columns.  If
-    // we leave the daughter regions in place and do not remove them when we
-    // crash out, then they will have their references to the parent in place
-    // still and the server shutdown fixup of .META. will point to these
-    // regions.
-    // We should add PONR JournalEntry before offlineParentInMeta,so even if
-    // OfflineParentInMeta timeout,this will cause regionserver exit,and then
-    // master ServerShutdownHandler will fix daughter & avoid data loss. (See
-    // HBase-4562).
-    this.journal.add(new JournalEntry(JournalEntryType.PONR));
-
-    // Edit parent in meta.  Offlines parent region and adds splita and splitb
-    // as an atomic update. See HBASE-7721. This update to META makes the region
-    // will determine whether the region is split or not in case of failures.
-    // If it is successful, master will roll-forward, if not, master will rollback
-    // and assign the parent region.
-    if (!testing && useZKForAssignment) {
-      if (metaEntries == null || metaEntries.isEmpty()) {
-        MetaTableAccessor.splitRegion(server.getConnection(),
-          parent.getRegionInfo(), daughterRegions.getFirst().getRegionInfo(),
-          daughterRegions.getSecond().getRegionInfo(), server.getServerName(),
-          parent.getTableDesc().getRegionReplication());
-      } else {
-        offlineParentInMetaAndputMetaEntries(server.getConnection(),
-          parent.getRegionInfo(), daughterRegions.getFirst().getRegionInfo(), daughterRegions
-              .getSecond().getRegionInfo(), server.getServerName(), metaEntries,
-              parent.getTableDesc().getRegionReplication());
-      }
-    } else if (services != null && !useZKForAssignment) {
-      if (!services.reportRegionStateTransition(TransitionCode.SPLIT_PONR,
-          parent.getRegionInfo(), hri_a, hri_b)) {
-        // Passed PONR, let SSH clean it up
-        throw new IOException("Failed to notify master that split passed PONR: "
-          + parent.getRegionInfo().getRegionNameAsString());
-      }
-    }
-    return daughterRegions;
-  }
-
-  public PairOfSameType<Region> stepsBeforePONR(final Server server,
-      final RegionServerServices services, boolean testing) throws IOException {
-
-    if (useCoordinatedStateManager(server)) {
-      if (std == null) {
-        std =
-            ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
-                .getSplitTransactionCoordination().getDefaultDetails();
-      }
-      ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
-          .getSplitTransactionCoordination().startSplitTransaction(parent, server.getServerName(),
-            hri_a, hri_b);
-    } else if (services != null && !useZKForAssignment) {
-      if (!services.reportRegionStateTransition(TransitionCode.READY_TO_SPLIT,
-          parent.getRegionInfo(), hri_a, hri_b)) {
-        throw new IOException("Failed to get ok from master to split "
-          + parent.getRegionInfo().getRegionNameAsString());
-      }
-    }
-    this.journal.add(new JournalEntry(JournalEntryType.SET_SPLITTING));
-    if (useCoordinatedStateManager(server)) {
-      ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
-          .getSplitTransactionCoordination().waitForSplitTransaction(services, parent, hri_a,
-            hri_b, std);
-    }
-
-    this.parent.getRegionFileSystem().createSplitsDir();
-    this.journal.add(new JournalEntry(JournalEntryType.CREATE_SPLIT_DIR));
-
-    Map<byte[], List<StoreFile>> hstoreFilesToSplit = null;
-    Exception exceptionToThrow = null;
-    try{
-      hstoreFilesToSplit = this.parent.close(false);
-    } catch (Exception e) {
-      exceptionToThrow = e;
-    }
-    if (exceptionToThrow == null && hstoreFilesToSplit == null) {
-      // The region was closed by a concurrent thread.  We can't continue
-      // with the split, instead we must just abandon the split.  If we
-      // reopen or split this could cause problems because the region has
-      // probably already been moved to a different server, or is in the
-      // process of moving to a different server.
-      exceptionToThrow = closedByOtherException;
-    }
-    if (exceptionToThrow != closedByOtherException) {
-      this.journal.add(new JournalEntry(JournalEntryType.CLOSED_PARENT_REGION));
-    }
-    if (exceptionToThrow != null) {
-      if (exceptionToThrow instanceof IOException) throw (IOException)exceptionToThrow;
-      throw new IOException(exceptionToThrow);
-    }
-    if (!testing) {
-      services.removeFromOnlineRegions(this.parent, null);
-    }
-    this.journal.add(new JournalEntry(JournalEntryType.OFFLINED_PARENT));
-
-    // TODO: If splitStoreFiles were multithreaded would we complete steps in
-    // less elapsed time?  St.Ack 20100920
-    //
-    // splitStoreFiles creates daughter region dirs under the parent splits dir
-    // Nothing to unroll here if failure -- clean up of CREATE_SPLIT_DIR will
-    // clean this up.
-    Pair<Integer, Integer> expectedReferences = splitStoreFiles(hstoreFilesToSplit);
+  public interface TransactionListener {
 
-    // Log to the journal that we are creating region A, the first daughter
-    // region.  We could fail halfway through.  If we do, we could have left
-    // stuff in fs that needs cleanup -- a storefile or two.  Thats why we
-    // add entry to journal BEFORE rather than AFTER the change.
-    this.journal.add(new JournalEntry(JournalEntryType.STARTED_REGION_A_CREATION));
-    assertReferenceFileCount(expectedReferences.getFirst(),
-        this.parent.getRegionFileSystem().getSplitsDir(this.hri_a));
-    Region a = this.parent.createDaughterRegionFromSplits(this.hri_a);
-    assertReferenceFileCount(expectedReferences.getFirst(),
-        new Path(this.parent.getRegionFileSystem().getTableDir(), this.hri_a.getEncodedName()));
-
-    // Ditto
-    this.journal.add(new JournalEntry(JournalEntryType.STARTED_REGION_B_CREATION));
-    assertReferenceFileCount(expectedReferences.getSecond(),
-        this.parent.getRegionFileSystem().getSplitsDir(this.hri_b));
-    Region b = this.parent.createDaughterRegionFromSplits(this.hri_b);
-    assertReferenceFileCount(expectedReferences.getSecond(),
-        new Path(this.parent.getRegionFileSystem().getTableDir(), this.hri_b.getEncodedName()));
-
-    return new PairOfSameType<Region>(a, b);
-  }
+    /**
+     * Invoked when transitioning forward from one transaction phase to another
+     * @param transaction the transaction
+     * @param from the current phase
+     * @param to the next phase
+     * @throws IOException listener can throw this to abort
+     */
+    void transition(SplitTransaction transaction, SplitTransactionPhase from,
+        SplitTransactionPhase to) throws IOException;
 
-  void assertReferenceFileCount(int expectedReferenceFileCount, Path dir)
-      throws IOException {
-    if (expectedReferenceFileCount != 0 &&
-        expectedReferenceFileCount != FSUtils.getRegionReferenceFileCount(this.parent.getFilesystem(), dir)) {
-      throw new IOException("Failing split. Expected reference file count isn't equal.");
-    }
+    /**
+     * Invoked when rolling back a transaction from one transaction phase to the
+     * previous
+     * @param transaction the transaction
+     * @param from the current phase
+     * @param to the previous phase
+     */
+    void rollback(SplitTransaction transaction, SplitTransactionPhase from,
+        SplitTransactionPhase to);
   }
 
   /**
-   * Perform time consuming opening of the daughter regions.
-   * @param server Hosting server instance.  Can be null when testing
-   * @param services Used to online/offline regions.
-   * @param a first daughter region
-   * @param a second daughter region
-   * @throws IOException If thrown, transaction failed.
-   *          Call {@link #rollback(Server, RegionServerServices)}
+   * Check split inputs and prepare the transaction.
+   * @return <code>true</code> if the region is splittable else
+   * <code>false</code> if it is not (e.g. its already closed, etc.).
+   * @throws IOException if preparing the transaction fails
    */
-  /* package */void openDaughters(final Server server,
-      final RegionServerServices services, Region a, Region b)
-      throws IOException {
-    boolean stopped = server != null && server.isStopped();
-    boolean stopping = services != null && services.isStopping();
-    // TODO: Is this check needed here?
-    if (stopped || stopping) {
-      LOG.info("Not opening daughters " +
-          b.getRegionInfo().getRegionNameAsString() +
-          " and " +
-          a.getRegionInfo().getRegionNameAsString() +
-          " because stopping=" + stopping + ", stopped=" + stopped);
-    } else {
-      // Open daughters in parallel.
-      DaughterOpener aOpener = new DaughterOpener(server, (HRegion)a);
-      DaughterOpener bOpener = new DaughterOpener(server, (HRegion)b);
-      aOpener.start();
-      bOpener.start();
-      try {
-        aOpener.join();
-        if (aOpener.getException() == null) {
-          journal.add(new JournalEntry(JournalEntryType.OPENED_REGION_A));
-        }
-        bOpener.join();
-        if (bOpener.getException() == null) {
-          journal.add(new JournalEntry(JournalEntryType.OPENED_REGION_B));
-        }
-      } catch (InterruptedException e) {
-        throw (InterruptedIOException)new InterruptedIOException().initCause(e);
-      }
-      if (aOpener.getException() != null) {
-        throw new IOException("Failed " +
-          aOpener.getName(), aOpener.getException());
-      }
-      if (bOpener.getException() != null) {
-        throw new IOException("Failed " +
-          bOpener.getName(), bOpener.getException());
-      }
-      if (services != null) {
-        try {
-          if (useZKForAssignment) {
-            // add 2nd daughter first (see HBASE-4335)
-            services.postOpenDeployTasks(b);
-          } else if (!services.reportRegionStateTransition(TransitionCode.SPLIT,
-              parent.getRegionInfo(), hri_a, hri_b)) {
-            throw new IOException("Failed to report split region to master: "
-              + parent.getRegionInfo().getShortNameToLog());
-          }
-          // Should add it to OnlineRegions
-          services.addToOnlineRegions(b);
-          if (useZKForAssignment) {
-            services.postOpenDeployTasks(a);
-          }
-          services.addToOnlineRegions(a);
-        } catch (KeeperException ke) {
-          throw new IOException(ke);
-        }
-      }
-    }
-  }
+  boolean prepare() throws IOException;
 
   /**
    * Run the transaction.
-   * @param server Hosting server instance.  Can be null when testing
+   * @param server Hosting server instance.  Can be null when testing.
    * @param services Used to online/offline regions.
    * @throws IOException If thrown, transaction failed.
    *          Call {@link #rollback(Server, RegionServerServices)}
@@ -534,384 +192,44 @@ public class SplitTransaction {
    * @throws IOException
    * @see #rollback(Server, RegionServerServices)
    */
-  public PairOfSameType<Region> execute(final Server server,
-      final RegionServerServices services)
-  throws IOException {
-    useZKForAssignment = server == null ? true :
-      ConfigUtil.useZKForAssignment(server.getConfiguration());
-    if (useCoordinatedStateManager(server)) {
-      std =
-          ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
-              .getSplitTransactionCoordination().getDefaultDetails();
-    }
-    PairOfSameType<Region> regions = createDaughters(server, services);
-    if (this.parent.getCoprocessorHost() != null) {
-      this.parent.getCoprocessorHost().preSplitAfterPONR();
-    }
-    return stepsAfterPONR(server, services, regions);
-  }
-
-  public PairOfSameType<Region> stepsAfterPONR(final Server server,
-      final RegionServerServices services, PairOfSameType<Region> regions)
-      throws IOException {
-    openDaughters(server, services, regions.getFirst(), regions.getSecond());
-    if (useCoordinatedStateManager(server)) {
-      ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
-          .getSplitTransactionCoordination().completeSplitTransaction(services, regions.getFirst(),
-            regions.getSecond(), std, parent);
-    }
-    journal.add(new JournalEntry(JournalEntryType.BEFORE_POST_SPLIT_HOOK));
-    // Coprocessor callback
-    if (parent.getCoprocessorHost() != null) {
-      parent.getCoprocessorHost().postSplit(regions.getFirst(), regions.getSecond());
-    }
-    journal.add(new JournalEntry(JournalEntryType.AFTER_POST_SPLIT_HOOK));
-    return regions;
-  }
+  PairOfSameType<Region> execute(Server server, RegionServerServices services) throws IOException;
 
-  private void offlineParentInMetaAndputMetaEntries(HConnection hConnection,
-      HRegionInfo parent, HRegionInfo splitA, HRegionInfo splitB,
-      ServerName serverName, List<Mutation> metaEntries, int regionReplication) throws IOException {
-    List<Mutation> mutations = metaEntries;
-    HRegionInfo copyOfParent = new HRegionInfo(parent);
-    copyOfParent.setOffline(true);
-    copyOfParent.setSplit(true);
-
-    //Put for parent
-    Put putParent = MetaTableAccessor.makePutFromRegionInfo(copyOfParent);
-    MetaTableAccessor.addDaughtersToPut(putParent, splitA, splitB);
-    mutations.add(putParent);
-    
-    //Puts for daughters
-    Put putA = MetaTableAccessor.makePutFromRegionInfo(splitA);
-    Put putB = MetaTableAccessor.makePutFromRegionInfo(splitB);
-
-    addLocation(putA, serverName, 1); //these are new regions, openSeqNum = 1 is fine.
-    addLocation(putB, serverName, 1);
-    mutations.add(putA);
-    mutations.add(putB);
-
-    // Add empty locations for region replicas of daughters so that number of replicas can be
-    // cached whenever the primary region is looked up from meta
-    for (int i = 1; i < regionReplication; i++) {
-      addEmptyLocation(putA, i);
-      addEmptyLocation(putB, i);
-    }
-
-    MetaTableAccessor.mutateMetaTable(hConnection, mutations);
-  }
-
-  private static Put addEmptyLocation(final Put p, int replicaId){
-    p.addImmutable(HConstants.CATALOG_FAMILY, MetaTableAccessor.getServerColumn(replicaId), null);
-    p.addImmutable(HConstants.CATALOG_FAMILY, MetaTableAccessor.getStartCodeColumn(replicaId), null);
-    p.addImmutable(HConstants.CATALOG_FAMILY, MetaTableAccessor.getSeqNumColumn(replicaId), null);
-    return p;
-  }
-
-  public Put addLocation(final Put p, final ServerName sn, long openSeqNum) {
-    p.addImmutable(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER,
-      Bytes.toBytes(sn.getHostAndPort()));
-    p.addImmutable(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER,
-      Bytes.toBytes(sn.getStartcode()));
-    p.addImmutable(HConstants.CATALOG_FAMILY, HConstants.SEQNUM_QUALIFIER,
-        Bytes.toBytes(openSeqNum));
-    return p;
-  }
-
-  /*
-   * Open daughter region in its own thread.
-   * If we fail, abort this hosting server.
+  /**
+   * Roll back a failed transaction
+   * @param server Hosting server instance (May be null when testing).
+   * @param services Used to online/offline regions.
+   * @throws IOException If thrown, rollback failed.  Take drastic action.
+   * @return True if we successfully rolled back, false if we got to the point
+   * of no return and so now need to abort the server to minimize damage.
    */
-  class DaughterOpener extends HasThread {
-    private final Server server;
-    private final HRegion r;
-    private Throwable t = null;
-
-    DaughterOpener(final Server s, final HRegion r) {
-      super((s == null? "null-services": s.getServerName()) +
-        "-daughterOpener=" + r.getRegionInfo().getEncodedName());
-      setDaemon(true);
-      this.server = s;
-      this.r = r;
-    }
-
-    /**
-     * @return Null if open succeeded else exception that causes us fail open.
-     * Call it after this thread exits else you may get wrong view on result.
-     */
-    Throwable getException() {
-      return this.t;
-    }
-
-    @Override
-    public void run() {
-      try {
-        openDaughterRegion(this.server, r);
-      } catch (Throwable t) {
-        this.t = t;
-      }
-    }
-  }
+  boolean rollback(Server server, RegionServerServices services) throws IOException;
 
   /**
-   * Open daughter regions, add them to online list and update meta.
-   * @param server
-   * @param daughter
-   * @throws IOException
-   * @throws KeeperException
+   * Register a listener for transaction preparation, execution, and possibly
+   * rollback phases.
+   * <p>A listener can abort a transaction by throwing an exception. 
+   * @param listener the listener
+   * @return 'this' for chaining
    */
-  void openDaughterRegion(final Server server, final HRegion daughter)
-  throws IOException, KeeperException {
-    HRegionInfo hri = daughter.getRegionInfo();
-    LoggingProgressable reporter = server == null ? null
-        : new LoggingProgressable(hri, server.getConfiguration().getLong(
-            "hbase.regionserver.split.daughter.open.log.interval", 10000));
-    daughter.openHRegion(reporter);
-  }
-
-  static class LoggingProgressable implements CancelableProgressable {
-    private final HRegionInfo hri;
-    private long lastLog = -1;
-    private final long interval;
-
-    LoggingProgressable(final HRegionInfo hri, final long interval) {
-      this.hri = hri;
-      this.interval = interval;
-    }
-
-    @Override
-    public boolean progress() {
-      long now = EnvironmentEdgeManager.currentTime();
-      if (now - lastLog > this.interval) {
-        LOG.info("Opening " + this.hri.getRegionNameAsString());
-        this.lastLog = now;
-      }
-      return true;
-    }
-  }
-
-  private boolean useCoordinatedStateManager(final Server server) {
-    return server != null && useZKForAssignment && server.getCoordinatedStateManager() != null;
-  }
+  SplitTransaction registerTransactionListener(TransactionListener listener);
 
   /**
-   * Creates reference files for top and bottom half of the
-   * @param hstoreFilesToSplit map of store files to create half file references for.
-   * @return the number of reference files that were created.
-   * @throws IOException
+   * Get the journal for the transaction.
+   * <p>Journal entries are an opaque type represented as JournalEntry. They can
+   * also provide useful debugging information via their toString method.
+   * @return the transaction journal
    */
-  private Pair<Integer, Integer> splitStoreFiles(
-      final Map<byte[], List<StoreFile>> hstoreFilesToSplit)
-      throws IOException {
-    if (hstoreFilesToSplit == null) {
-      // Could be null because close didn't succeed -- for now consider it fatal
-      throw new IOException("Close returned empty list of StoreFiles");
-    }
-    // The following code sets up a thread pool executor with as many slots as
-    // there's files to split. It then fires up everything, waits for
-    // completion and finally checks for any exception
-    int nbFiles = hstoreFilesToSplit.size();
-    if (nbFiles == 0) {
-      // no file needs to be splitted.
-      return new Pair<Integer, Integer>(0,0);
-    }
-    LOG.info("Preparing to split " + nbFiles + " storefiles for region " + this.parent);
-    ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
-    builder.setNameFormat("StoreFileSplitter-%1$d");
-    ThreadFactory factory = builder.build();
-    ThreadPoolExecutor threadPool =
-      (ThreadPoolExecutor) Executors.newFixedThreadPool(nbFiles, factory);
-    List<Future<Pair<Path,Path>>> futures = new ArrayList<Future<Pair<Path,Path>>> (nbFiles);
-
-    // Split each store file.
-    for (Map.Entry<byte[], List<StoreFile>> entry: hstoreFilesToSplit.entrySet()) {
-      for (StoreFile sf: entry.getValue()) {
-        StoreFileSplitter sfs = new StoreFileSplitter(entry.getKey(), sf);
-        futures.add(threadPool.submit(sfs));
-      }
-    }
-    // Shutdown the pool
-    threadPool.shutdown();
-
-    // Wait for all the tasks to finish
-    try {
-      boolean stillRunning = !threadPool.awaitTermination(
-          this.fileSplitTimeout, TimeUnit.MILLISECONDS);
-      if (stillRunning) {
-        threadPool.shutdownNow();
-        // wait for the thread to shutdown completely.
-        while (!threadPool.isTerminated()) {
-          Thread.sleep(50);
-        }
-        throw new IOException("Took too long to split the" +
-            " files and create the references, aborting split");
-      }
-    } catch (InterruptedException e) {
-      throw (InterruptedIOException)new InterruptedIOException().initCause(e);
-    }
-
-    int created_a = 0;
-    int created_b = 0;
-    // Look for any exception
-    for (Future<Pair<Path, Path>> future : futures) {
-      try {
-        Pair<Path, Path> p = future.get();
-        created_a += p.getFirst() != null ? 1 : 0;
-        created_b += p.getSecond() != null ? 1 : 0;
-      } catch (InterruptedException e) {
-        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
-      } catch (ExecutionException e) {
-        throw new IOException(e);
-      }
-    }
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Split storefiles for region " + this.parent + " Daugther A: " + created_a
-          + " storefiles, Daugther B: " + created_b + " storefiles.");
-    }
-    return new Pair<Integer, Integer>(created_a, created_b);
-  }
-
-  private Pair<Path, Path> splitStoreFile(final byte[] family, final StoreFile sf) throws IOException {
-    HRegionFileSystem fs = this.parent.getRegionFileSystem();
-    String familyName = Bytes.toString(family);
-
-    Path path_a =
-        fs.splitStoreFile(this.hri_a, familyName, sf, this.splitrow, false,
-          this.parent.getSplitPolicy());
-    Path path_b =
-        fs.splitStoreFile(this.hri_b, familyName, sf, this.splitrow, true,
-          this.parent.getSplitPolicy());
-    return new Pair<Path,Path>(path_a, path_b);
-  }
+  List<JournalEntry> getJournal();
 
   /**
-   * Utility class used to do the file splitting / reference writing
-   * in parallel instead of sequentially.
+   * Get the Server running the transaction or rollback
+   * @return server instance
    */
-  class StoreFileSplitter implements Callable<Pair<Path,Path>> {
-    private final byte[] family;
-    private final StoreFile sf;
-
-    /**
-     * Constructor that takes what it needs to split
-     * @param family Family that contains the store file
-     * @param sf which file
-     */
-    public StoreFileSplitter(final byte[] family, final StoreFile sf) {
-      this.sf = sf;
-      this.family = family;
-    }
-
-    public Pair<Path,Path> call() throws IOException {
-      return splitStoreFile(family, sf);
-    }
-  }
+  Server getServer();
 
   /**
-   * @param server Hosting server instance (May be null when testing).
-   * @param services
-   * @throws IOException If thrown, rollback failed.  Take drastic action.
-   * @return True if we successfully rolled back, false if we got to the point
-   * of no return and so now need to abort the server to minimize damage.
+   * Get the RegionServerServices of the server running the transaction or rollback
+   * @return region server services
    */
-  @SuppressWarnings("deprecation")
-  public boolean rollback(final Server server, final RegionServerServices services)
-  throws IOException {
-    // Coprocessor callback
-    if (this.parent.getCoprocessorHost() != null) {
-      this.parent.getCoprocessorHost().preRollBackSplit();
-    }
-
-    boolean result = true;
-    ListIterator<JournalEntry> iterator =
-      this.journal.listIterator(this.journal.size());
-    // Iterate in reverse.
-    while (iterator.hasPrevious()) {
-      JournalEntry je = iterator.previous();
-      switch(je.type) {
-
-      case SET_SPLITTING:
-        if (useCoordinatedStateManager(server) && server instanceof HRegionServer) {
-          ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
-              .getSplitTransactionCoordination().clean(this.parent.getRegionInfo());
-        } else if (services != null && !useZKForAssignment
-            && !services.reportRegionStateTransition(TransitionCode.SPLIT_REVERTED,
-                parent.getRegionInfo(), hri_a, hri_b)) {
-          return false;
-        }
-        break;
-
-      case CREATE_SPLIT_DIR:
-        this.parent.writestate.writesEnabled = true;
-        this.parent.getRegionFileSystem().cleanupSplitsDir();
-        break;
-
-      case CLOSED_PARENT_REGION:
-        try {
-          // So, this returns a seqid but if we just closed and then reopened, we
-          // should be ok. On close, we flushed using sequenceid obtained from
-          // hosting regionserver so no need to propagate the sequenceid returned
-          // out of initialize below up into regionserver as we normally do.
-          // TODO: Verify.
-          this.parent.initialize();
-        } catch (IOException e) {
-          LOG.error("Failed rollbacking CLOSED_PARENT_REGION of region " +
-            this.parent.getRegionInfo().getRegionNameAsString(), e);
-          throw new RuntimeException(e);
-        }
-        break;
-
-      case STARTED_REGION_A_CREATION:
-        this.parent.getRegionFileSystem().cleanupDaughterRegion(this.hri_a);
-        break;
-
-      case STARTED_REGION_B_CREATION:
-        this.parent.getRegionFileSystem().cleanupDaughterRegion(this.hri_b);
-        break;
-
-      case OFFLINED_PARENT:
-        if (services != null) services.addToOnlineRegions(this.parent);
-        break;
-
-      case PONR:
-        // We got to the point-of-no-return so we need to just abort. Return
-        // immediately.  Do not clean up created daughter regions.  They need
-        // to be in place so we don't delete the parent region mistakenly.
-        // See HBASE-3872.
-        return false;
-
-      // Informational only cases
-      case STARTED:
-      case PREPARED:
-      case BEFORE_PRE_SPLIT_HOOK:
-      case AFTER_PRE_SPLIT_HOOK:
-      case BEFORE_POST_SPLIT_HOOK:
-      case AFTER_POST_SPLIT_HOOK:
-      case OPENED_REGION_A:
-      case OPENED_REGION_B:
-        break;
-
-      default:
-        throw new RuntimeException("Unhandled journal entry: " + je);
-      }
-    }
-    // Coprocessor callback
-    if (this.parent.getCoprocessorHost() != null) {
-      this.parent.getCoprocessorHost().postRollBackSplit();
-    }
-    return result;
-  }
-
-  HRegionInfo getFirstDaughter() {
-    return hri_a;
-  }
-
-  HRegionInfo getSecondDaughter() {
-    return hri_b;
-  }
-
-  List<JournalEntry> getJournal() {
-    return journal;
-  }
+  RegionServerServices getRegionServerServices();
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/bf76082f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransactionFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransactionFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransactionFactory.java
new file mode 100644
index 0000000..7df8233
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransactionFactory.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
+
+/**
+ * A factory for creating SplitTransactions, which execute region split as a "transaction".
+ * See {@link SplitTransaction}
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
+@InterfaceStability.Evolving
+public class SplitTransactionFactory implements Configurable {
+
+  public static final String SPLIT_TRANSACTION_IMPL_KEY =
+      "hbase.regionserver.split.transaction.impl";
+
+  private Configuration conf;
+
+  public SplitTransactionFactory(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * Create a split transaction
+   * @param r the region to split
+   * @param splitrow the split point in the keyspace
+   * @return transaction instance
+   */
+  public SplitTransaction create(final Region r, final byte [] splitrow) {
+    return ReflectionUtils.instantiateWithCustomCtor(
+      // The implementation class must extend SplitTransactionImpl, not only
+      // implement the SplitTransaction interface like you might expect,
+      // because various places such as AssignmentManager use static methods
+      // from SplitTransactionImpl. Whatever we use for implementation must
+      // be compatible, so it's safest to require ? extends SplitTransactionImpl.
+      // If not compatible we will throw a runtime exception from here.
+      conf.getClass(SPLIT_TRANSACTION_IMPL_KEY, SplitTransactionImpl.class,
+        SplitTransactionImpl.class).getName(),
+      new Class[] { Region.class, byte[].class },
+      new Object[] { r, splitrow });
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/bf76082f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransactionImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransactionImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransactionImpl.java
new file mode 100644
index 0000000..08212a3
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransactionImpl.java
@@ -0,0 +1,911 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
+import org.apache.hadoop.hbase.coordination.SplitTransactionCoordination;
+import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CancelableProgressable;
+import org.apache.hadoop.hbase.util.ConfigUtil;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.HasThread;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.PairOfSameType;
+import org.apache.zookeeper.KeeperException;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+@InterfaceAudience.Private
+public class SplitTransactionImpl implements SplitTransaction {
+  private static final Log LOG = LogFactory.getLog(SplitTransaction.class);
+
+  /*
+   * Region to split
+   */
+  private final HRegion parent;
+  private HRegionInfo hri_a;
+  private HRegionInfo hri_b;
+  private long fileSplitTimeout = 30000;
+  public SplitTransactionCoordination.SplitTransactionDetails std;
+  boolean useZKForAssignment;
+
+  /*
+   * Row to split around
+   */
+  private final byte [] splitrow;
+
+  /*
+   * Transaction state for listener, only valid during execute and
+   * rollback
+   */
+  private SplitTransactionPhase currentPhase = SplitTransactionPhase.STARTED;
+  private Server server;
+  private RegionServerServices rsServices;
+
+  public static class JournalEntryImpl implements JournalEntry {
+    private SplitTransactionPhase type;
+    private long timestamp;
+
+    public JournalEntryImpl(SplitTransactionPhase type) {
+      this(type, EnvironmentEdgeManager.currentTime());
+    }
+
+    public JournalEntryImpl(SplitTransactionPhase type, long timestamp) {
+      this.type = type;
+      this.timestamp = timestamp;
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder();
+      sb.append(type);
+      sb.append(" at ");
+      sb.append(timestamp);
+      return sb.toString();
+    }
+
+    @Override
+    public SplitTransactionPhase getPhase() {
+      return type;
+    }
+
+    @Override
+    public long getTimeStamp() {
+      return timestamp;
+    }
+  }
+
+  /*
+   * Journal of how far the split transaction has progressed.
+   */
+  private final List<JournalEntry> journal = new ArrayList<JournalEntry>();
+
+  /**
+   * Listeners
+   */
+  private final ArrayList<TransactionListener> listeners = new ArrayList<TransactionListener>();
+
+  /**
+   * Constructor
+   * @param r Region to split
+   * @param splitrow Row to split around
+   */
+  public SplitTransactionImpl(final Region r, final byte [] splitrow) {
+    this.parent = (HRegion)r;
+    this.splitrow = splitrow;
+    this.journal.add(new JournalEntryImpl(SplitTransactionPhase.STARTED));
+    useZKForAssignment = ConfigUtil.useZKForAssignment(parent.getBaseConf());
+  }
+
+  private void transition(SplitTransactionPhase nextPhase) throws IOException {
+    transition(nextPhase, false);
+  }
+
+  private void transition(SplitTransactionPhase nextPhase, boolean isRollback)
+      throws IOException {
+    if (!isRollback) {
+      // Add to the journal first, because if the listener throws an exception
+      // we need to roll back starting at 'nextPhase'
+      this.journal.add(new JournalEntryImpl(nextPhase));
+    }
+    for (int i = 0; i < listeners.size(); i++) {
+      TransactionListener listener = listeners.get(i);
+      if (!isRollback) {
+        listener.transition(this, currentPhase, nextPhase);
+      } else {
+        listener.rollback(this, currentPhase, nextPhase);
+      }
+    }
+    currentPhase = nextPhase;
+  }
+
+  /**
+   * Does checks on split inputs.
+   * @return <code>true</code> if the region is splittable else
+   * <code>false</code> if it is not (e.g. it's already closed, etc.).
+   */
+  public boolean prepare() throws IOException {
+    if (!this.parent.isSplittable()) return false;
+    // Split key can be null if this region is unsplittable; i.e. has refs.
+    if (this.splitrow == null) return false;
+    HRegionInfo hri = this.parent.getRegionInfo();
+    parent.prepareToSplit();
+    // Check splitrow.
+    byte [] startKey = hri.getStartKey();
+    byte [] endKey = hri.getEndKey();
+    if (Bytes.equals(startKey, splitrow) ||
+        !this.parent.getRegionInfo().containsRow(splitrow)) {
+      LOG.info("Split row is not inside region key range or is equal to " +
+          "startkey: " + Bytes.toStringBinary(this.splitrow));
+      return false;
+    }
+    long rid = getDaughterRegionIdTimestamp(hri);
+    this.hri_a = new HRegionInfo(hri.getTable(), startKey, this.splitrow, false, rid);
+    this.hri_b = new HRegionInfo(hri.getTable(), this.splitrow, endKey, false, rid);
+
+    transition(SplitTransactionPhase.PREPARED);
+
+    return true;
+  }
+
+  /**
+   * Calculate daughter regionid to use.
+   * @param hri Parent {@link HRegionInfo}
+   * @return Daughter region id (timestamp) to use.
+   */
+  private static long getDaughterRegionIdTimestamp(final HRegionInfo hri) {
+    long rid = EnvironmentEdgeManager.currentTime();
+    // Regionid is timestamp.  Can't be less than that of parent else will insert
+    // at wrong location in hbase:meta (See HBASE-710).
+    if (rid < hri.getRegionId()) {
+      LOG.warn("Clock skew; parent regions id is " + hri.getRegionId() +
+        " but current time here is " + rid);
+      rid = hri.getRegionId() + 1;
+    }
+    return rid;
+  }
+
+  private static IOException closedByOtherException = new IOException(
+      "Failed to close region: already closed by another thread");
+
+  /**
+   * Prepare the regions and region files.
+   * @param server Hosting server instance.  Can be null when testing (won't try
+   * and update in zk if a null server)
+   * @param services Used to online/offline regions.
+   * @throws IOException If thrown, transaction failed.
+   *    Call {@link #rollback(Server, RegionServerServices)}
+   * @return Regions created
+   */
+  /* package */PairOfSameType<Region> createDaughters(final Server server,
+      final RegionServerServices services) throws IOException {
+    LOG.info("Starting split of region " + this.parent);
+    if ((server != null && server.isStopped()) ||
+        (services != null && services.isStopping())) {
+      throw new IOException("Server is stopped or stopping");
+    }
+    assert !this.parent.lock.writeLock().isHeldByCurrentThread():
+      "Unsafe to hold write lock while performing RPCs";
+
+    transition(SplitTransactionPhase.BEFORE_PRE_SPLIT_HOOK);
+
+    // Coprocessor callback
+    if (this.parent.getCoprocessorHost() != null) {
+      // TODO: Remove one of these
+      this.parent.getCoprocessorHost().preSplit();
+      this.parent.getCoprocessorHost().preSplit(this.splitrow);
+    }
+
+    transition(SplitTransactionPhase.AFTER_PRE_SPLIT_HOOK);
+
+    // If true, no cluster to write meta edits to or to update znodes in.
+    boolean testing = server == null? true:
+        server.getConfiguration().getBoolean("hbase.testing.nocluster", false);
+    this.fileSplitTimeout = testing ? this.fileSplitTimeout :
+        server.getConfiguration().getLong("hbase.regionserver.fileSplitTimeout",
+          this.fileSplitTimeout);
+
+    PairOfSameType<Region> daughterRegions = stepsBeforePONR(server, services, testing);
+
+    List<Mutation> metaEntries = new ArrayList<Mutation>();
+    if (this.parent.getCoprocessorHost() != null) {
+      if (this.parent.getCoprocessorHost().
+          preSplitBeforePONR(this.splitrow, metaEntries)) {
+        throw new IOException("Coprocessor bypassing region "
+            + this.parent.getRegionInfo().getRegionNameAsString() + " split.");
+      }
+      try {
+        for (Mutation p : metaEntries) {
+          HRegionInfo.parseRegionName(p.getRow());
+        }
+      } catch (IOException e) {
+        LOG.error("Row key of mutation from coprossor is not parsable as region name."
+            + "Mutations from coprocessor should only for hbase:meta table.");
+        throw e;
+      }
+    }
+
+    // This is the point of no return.  Adding subsequent edits to .META. as we
+    // do below when we do the daughter opens adding each to .META. can fail in
+    // various interesting ways the most interesting of which is a timeout
+    // BUT the edits all go through (See HBASE-3872).  IF we reach the PONR
+    // then subsequent failures need to crash out this regionserver; the
+    // server shutdown processing should be able to fix-up the incomplete split.
+    // The offlined parent will have the daughters as extra columns.  If
+    // we leave the daughter regions in place and do not remove them when we
+    // crash out, then they will have their references to the parent in place
+    // still and the server shutdown fixup of .META. will point to these
+    // regions.
+    // We add the PONR JournalEntry before offlining the parent in meta, so
+    // even if that meta update times out, the regionserver will exit and the
+    // master's ServerShutdownHandler will fix up the daughters and avoid data
+    // loss (see HBASE-4562).
+    transition(SplitTransactionPhase.PONR);
+
+    // Edit parent in meta.  Offlines parent region and adds splita and splitb
+    // as an atomic update. See HBASE-7721. This update to META is what
+    // determines whether the region is split or not in case of failures.
+    // If it is successful, master will roll forward; if not, master will roll
+    // back and assign the parent region.
+    if (!testing && useZKForAssignment) {
+      if (metaEntries == null || metaEntries.isEmpty()) {
+        MetaTableAccessor.splitRegion(server.getConnection(),
+          parent.getRegionInfo(), daughterRegions.getFirst().getRegionInfo(),
+          daughterRegions.getSecond().getRegionInfo(), server.getServerName(),
+          parent.getTableDesc().getRegionReplication());
+      } else {
+        offlineParentInMetaAndputMetaEntries(server.getConnection(),
+          parent.getRegionInfo(), daughterRegions.getFirst().getRegionInfo(), daughterRegions
+              .getSecond().getRegionInfo(), server.getServerName(), metaEntries,
+              parent.getTableDesc().getRegionReplication());
+      }
+    } else if (services != null && !useZKForAssignment) {
+      if (!services.reportRegionStateTransition(TransitionCode.SPLIT_PONR,
+          parent.getRegionInfo(), hri_a, hri_b)) {
+        // Passed PONR, let SSH clean it up
+        throw new IOException("Failed to notify master that split passed PONR: "
+          + parent.getRegionInfo().getRegionNameAsString());
+      }
+    }
+    return daughterRegions;
+  }
+
+  public PairOfSameType<Region> stepsBeforePONR(final Server server,
+      final RegionServerServices services, boolean testing) throws IOException {
+
+    if (useCoordinatedStateManager(server)) {
+      if (std == null) {
+        std =
+            ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
+                .getSplitTransactionCoordination().getDefaultDetails();
+      }
+      ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
+          .getSplitTransactionCoordination().startSplitTransaction(parent, server.getServerName(),
+            hri_a, hri_b);
+    } else if (services != null && !useZKForAssignment) {
+      if (!services.reportRegionStateTransition(TransitionCode.READY_TO_SPLIT,
+          parent.getRegionInfo(), hri_a, hri_b)) {
+        throw new IOException("Failed to get ok from master to split "
+          + parent.getRegionInfo().getRegionNameAsString());
+      }
+    }
+
+    transition(SplitTransactionPhase.SET_SPLITTING);
+
+    if (useCoordinatedStateManager(server)) {
+      ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
+          .getSplitTransactionCoordination().waitForSplitTransaction(services, parent, hri_a,
+            hri_b, std);
+    }
+
+    this.parent.getRegionFileSystem().createSplitsDir();
+
+    transition(SplitTransactionPhase.CREATE_SPLIT_DIR);
+
+    Map<byte[], List<StoreFile>> hstoreFilesToSplit = null;
+    Exception exceptionToThrow = null;
+    try{
+      hstoreFilesToSplit = this.parent.close(false);
+    } catch (Exception e) {
+      exceptionToThrow = e;
+    }
+    if (exceptionToThrow == null && hstoreFilesToSplit == null) {
+      // The region was closed by a concurrent thread.  We can't continue
+      // with the split, instead we must just abandon the split.  If we
+      // reopen or split this could cause problems because the region has
+      // probably already been moved to a different server, or is in the
+      // process of moving to a different server.
+      exceptionToThrow = closedByOtherException;
+    }
+    if (exceptionToThrow != closedByOtherException) {
+      transition(SplitTransactionPhase.CLOSED_PARENT_REGION);
+    }
+    if (exceptionToThrow != null) {
+      if (exceptionToThrow instanceof IOException) throw (IOException)exceptionToThrow;
+      throw new IOException(exceptionToThrow);
+    }
+    if (!testing) {
+      services.removeFromOnlineRegions(this.parent, null);
+    }
+
+    transition(SplitTransactionPhase.OFFLINED_PARENT);
+
+    // TODO: If splitStoreFiles were multithreaded would we complete steps in
+    // less elapsed time?  St.Ack 20100920
+    //
+    // splitStoreFiles creates daughter region dirs under the parent splits dir
+    // Nothing to unroll here if failure -- clean up of CREATE_SPLIT_DIR will
+    // clean this up.
+    Pair<Integer, Integer> expectedReferences = splitStoreFiles(hstoreFilesToSplit);
+
+    // Log to the journal that we are creating region A, the first daughter
+    // region.  We could fail halfway through.  If we do, we could have left
+    // stuff in fs that needs cleanup -- a storefile or two.  That's why we
+    // add entry to journal BEFORE rather than AFTER the change.
+    transition(SplitTransactionPhase.STARTED_REGION_A_CREATION);
+
+    assertReferenceFileCount(expectedReferences.getFirst(),
+        this.parent.getRegionFileSystem().getSplitsDir(this.hri_a));
+    Region a = this.parent.createDaughterRegionFromSplits(this.hri_a);
+    assertReferenceFileCount(expectedReferences.getFirst(),
+        new Path(this.parent.getRegionFileSystem().getTableDir(), this.hri_a.getEncodedName()));
+
+    // Ditto
+    transition(SplitTransactionPhase.STARTED_REGION_B_CREATION);
+
+    assertReferenceFileCount(expectedReferences.getSecond(),
+        this.parent.getRegionFileSystem().getSplitsDir(this.hri_b));
+    Region b = this.parent.createDaughterRegionFromSplits(this.hri_b);
+    assertReferenceFileCount(expectedReferences.getSecond(),
+        new Path(this.parent.getRegionFileSystem().getTableDir(), this.hri_b.getEncodedName()));
+
+    return new PairOfSameType<Region>(a, b);
+  }
+
+  void assertReferenceFileCount(int expectedReferenceFileCount, Path dir)
+      throws IOException {
+    if (expectedReferenceFileCount != 0 &&
+        expectedReferenceFileCount != FSUtils.getRegionReferenceFileCount(parent.getFilesystem(),
+          dir)) {
+      throw new IOException("Failing split. Expected reference file count isn't equal.");
+    }
+  }
+
+  /**
+   * Perform time consuming opening of the daughter regions.
+   * @param server Hosting server instance.  Can be null when testing
+   * @param services Used to online/offline regions.
+   * @param a first daughter region
+   * @param b second daughter region
+   * @throws IOException If thrown, transaction failed.
+   *          Call {@link #rollback(Server, RegionServerServices)}
+   */
+  /* package */void openDaughters(final Server server,
+      final RegionServerServices services, Region a, Region b)
+      throws IOException {
+    boolean stopped = server != null && server.isStopped();
+    boolean stopping = services != null && services.isStopping();
+    // TODO: Is this check needed here?
+    if (stopped || stopping) {
+      LOG.info("Not opening daughters " +
+          b.getRegionInfo().getRegionNameAsString() +
+          " and " +
+          a.getRegionInfo().getRegionNameAsString() +
+          " because stopping=" + stopping + ", stopped=" + stopped);
+    } else {
+      // Open daughters in parallel.
+      DaughterOpener aOpener = new DaughterOpener(server, (HRegion)a);
+      DaughterOpener bOpener = new DaughterOpener(server, (HRegion)b);
+      aOpener.start();
+      bOpener.start();
+      try {
+        aOpener.join();
+        if (aOpener.getException() == null) {
+          transition(SplitTransactionPhase.OPENED_REGION_A);
+        }
+        bOpener.join();
+        if (bOpener.getException() == null) {
+          transition(SplitTransactionPhase.OPENED_REGION_B);
+        }
+      } catch (InterruptedException e) {
+        throw (InterruptedIOException)new InterruptedIOException().initCause(e);
+      }
+      if (aOpener.getException() != null) {
+        throw new IOException("Failed " +
+          aOpener.getName(), aOpener.getException());
+      }
+      if (bOpener.getException() != null) {
+        throw new IOException("Failed " +
+          bOpener.getName(), bOpener.getException());
+      }
+      if (services != null) {
+        try {
+          if (useZKForAssignment) {
+            // add 2nd daughter first (see HBASE-4335)
+            services.postOpenDeployTasks(b);
+          } else if (!services.reportRegionStateTransition(TransitionCode.SPLIT,
+              parent.getRegionInfo(), hri_a, hri_b)) {
+            throw new IOException("Failed to report split region to master: "
+              + parent.getRegionInfo().getShortNameToLog());
+          }
+          // Should add it to OnlineRegions
+          services.addToOnlineRegions(b);
+          if (useZKForAssignment) {
+            services.postOpenDeployTasks(a);
+          }
+          services.addToOnlineRegions(a);
+        } catch (KeeperException ke) {
+          throw new IOException(ke);
+        }
+      }
+    }
+  }
+
+  /**
+   * Run the transaction.
+   * @param server Hosting server instance.  Can be null when testing
+   * @param services Used to online/offline regions.
+   * @return Regions created
+   * @throws IOException If thrown, transaction failed.
+   *          Call {@link #rollback(Server, RegionServerServices)}
+   *          to unwind the journaled steps.
+   * @see #rollback(Server, RegionServerServices)
+   */
+  public PairOfSameType<Region> execute(final Server server,
+      final RegionServerServices services) throws IOException {
+    this.server = server;
+    this.rsServices = services;
+    useZKForAssignment = server == null ? true :
+      ConfigUtil.useZKForAssignment(server.getConfiguration());
+    if (useCoordinatedStateManager(server)) {
+      std =
+          ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
+              .getSplitTransactionCoordination().getDefaultDetails();
+    }
+    PairOfSameType<Region> regions = createDaughters(server, services);
+    if (this.parent.getCoprocessorHost() != null) {
+      this.parent.getCoprocessorHost().preSplitAfterPONR();
+    }
+    regions = stepsAfterPONR(server, services, regions);
+
+    transition(SplitTransactionPhase.COMPLETED);
+
+    return regions;
+  }
+
+  public PairOfSameType<Region> stepsAfterPONR(final Server server,
+      final RegionServerServices services, PairOfSameType<Region> regions)
+      throws IOException {
+    openDaughters(server, services, regions.getFirst(), regions.getSecond());
+    if (useCoordinatedStateManager(server)) {
+      ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
+          .getSplitTransactionCoordination().completeSplitTransaction(services, regions.getFirst(),
+            regions.getSecond(), std, parent);
+    }
+
+    transition(SplitTransactionPhase.BEFORE_POST_SPLIT_HOOK);
+
+    // Coprocessor callback
+    if (parent.getCoprocessorHost() != null) {
+      parent.getCoprocessorHost().postSplit(regions.getFirst(), regions.getSecond());
+    }
+
+    transition(SplitTransactionPhase.AFTER_POST_SPLIT_HOOK);
+
+    return regions;
+  }
+
+  private void offlineParentInMetaAndputMetaEntries(HConnection hConnection,
+      HRegionInfo parent, HRegionInfo splitA, HRegionInfo splitB,
+      ServerName serverName, List<Mutation> metaEntries, int regionReplication)
+          throws IOException {
+    List<Mutation> mutations = metaEntries;
+    HRegionInfo copyOfParent = new HRegionInfo(parent);
+    copyOfParent.setOffline(true);
+    copyOfParent.setSplit(true);
+
+    //Put for parent
+    Put putParent = MetaTableAccessor.makePutFromRegionInfo(copyOfParent);
+    MetaTableAccessor.addDaughtersToPut(putParent, splitA, splitB);
+    mutations.add(putParent);
+    
+    //Puts for daughters
+    Put putA = MetaTableAccessor.makePutFromRegionInfo(splitA);
+    Put putB = MetaTableAccessor.makePutFromRegionInfo(splitB);
+
+    addLocation(putA, serverName, 1); //these are new regions, openSeqNum = 1 is fine.
+    addLocation(putB, serverName, 1);
+    mutations.add(putA);
+    mutations.add(putB);
+
+    // Add empty locations for region replicas of daughters so that number of replicas can be
+    // cached whenever the primary region is looked up from meta
+    for (int i = 1; i < regionReplication; i++) {
+      addEmptyLocation(putA, i);
+      addEmptyLocation(putB, i);
+    }
+
+    MetaTableAccessor.mutateMetaTable(hConnection, mutations);
+  }
+
+  private static Put addEmptyLocation(final Put p, int replicaId){
+    p.addImmutable(HConstants.CATALOG_FAMILY, MetaTableAccessor.getServerColumn(replicaId), null);
+    p.addImmutable(HConstants.CATALOG_FAMILY, MetaTableAccessor.getStartCodeColumn(replicaId),
+      null);
+    p.addImmutable(HConstants.CATALOG_FAMILY, MetaTableAccessor.getSeqNumColumn(replicaId), null);
+    return p;
+  }
+
+  public Put addLocation(final Put p, final ServerName sn, long openSeqNum) {
+    p.addImmutable(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER,
+      Bytes.toBytes(sn.getHostAndPort()));
+    p.addImmutable(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER,
+      Bytes.toBytes(sn.getStartcode()));
+    p.addImmutable(HConstants.CATALOG_FAMILY, HConstants.SEQNUM_QUALIFIER,
+        Bytes.toBytes(openSeqNum));
+    return p;
+  }
+
+  /*
+   * Open daughter region in its own thread.
+   * If the open fails, the cause is kept and exposed through getException().
+   */
+  class DaughterOpener extends HasThread {
+    private final Server server;
+    private final HRegion r;
+    private Throwable t = null;
+
+    DaughterOpener(final Server s, final HRegion r) {
+      super((s == null? "null-services": s.getServerName()) +
+        "-daughterOpener=" + r.getRegionInfo().getEncodedName());
+      setDaemon(true);
+      this.server = s;
+      this.r = r;
+    }
+
+    /**
+     * @return Null if open succeeded, else the exception that caused the open to fail.
+     * Call it only after this thread has exited, else you may get a wrong view of the result.
+     */
+    Throwable getException() {
+      return this.t;
+    }
+
+    @Override
+    public void run() {
+      try {
+        openDaughterRegion(this.server, r);
+      } catch (Throwable t) {
+        this.t = t;
+      }
+    }
+  }
+
+  /**
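+   * Open the given daughter region, passing a logging progress reporter when server is set.
+   * @param server Hosting server; when null no progress reporter is passed to the open
+   * @param daughter Region to open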
+   * @throws IOException
+   * @throws KeeperException
+   */
+  void openDaughterRegion(final Server server, final HRegion daughter)
+  throws IOException, KeeperException {
+    HRegionInfo hri = daughter.getRegionInfo();
+    LoggingProgressable reporter = server == null ? null
+        : new LoggingProgressable(hri, server.getConfiguration().getLong(
+            "hbase.regionserver.split.daughter.open.log.interval", 10000));
+    daughter.openHRegion(reporter);
+  }
+
+  static class LoggingProgressable implements CancelableProgressable {
+    private final HRegionInfo hri;
+    private long lastLog = -1;
+    private final long interval;
+
+    LoggingProgressable(final HRegionInfo hri, final long interval) {
+      this.hri = hri;
+      this.interval = interval;
+    }
+
+    @Override
+    public boolean progress() {
+      long now = EnvironmentEdgeManager.currentTime();
+      if (now - lastLog > this.interval) {
+        LOG.info("Opening " + this.hri.getRegionNameAsString());
+        this.lastLog = now;
+      }
+      return true;
+    }
+  }
+
+  private boolean useCoordinatedStateManager(final Server server) {
+    return server != null && useZKForAssignment && server.getCoordinatedStateManager() != null;
+  }
+
+  /**
+   * Creates reference files for the top and bottom half of each store file of the parent.
+   * @param hstoreFilesToSplit map of family name to the store files to create references for.
+   * @return pair of the number of reference files created for daughter A and for daughter B.
+   * @throws IOException if the map is null, the split times out, or a split task fails
+   */
+  private Pair<Integer, Integer> splitStoreFiles(
+      final Map<byte[], List<StoreFile>> hstoreFilesToSplit) throws IOException {
+    if (hstoreFilesToSplit == null) {
+      // Could be null because close didn't succeed -- for now consider it fatal
+      throw new IOException("Close returned empty list of StoreFiles");
+    }
+    // Size the pool by the number of store files, not by the number of families (map entries);
+    // then fire up everything, wait for completion and finally check for any exception.
+    int nbFiles = 0;
+    for (List<StoreFile> familyFiles : hstoreFilesToSplit.values()) {
+      nbFiles += familyFiles.size();
+    }
+    if (nbFiles == 0) {
+      // no file needs to be split.
+      return new Pair<Integer, Integer>(0,0);
+    }
+    LOG.info("Preparing to split " + nbFiles + " storefiles for region " + this.parent);
+    ThreadFactory factory =
+      new ThreadFactoryBuilder().setNameFormat("StoreFileSplitter-%1$d").build();
+    ThreadPoolExecutor threadPool =
+      (ThreadPoolExecutor) Executors.newFixedThreadPool(nbFiles, factory);
+    List<Future<Pair<Path,Path>>> futures = new ArrayList<Future<Pair<Path,Path>>> (nbFiles);
+
+    // Split each store file.
+    for (Map.Entry<byte[], List<StoreFile>> entry: hstoreFilesToSplit.entrySet()) {
+      for (StoreFile sf: entry.getValue()) {
+        StoreFileSplitter sfs = new StoreFileSplitter(entry.getKey(), sf);
+        futures.add(threadPool.submit(sfs));
+      }
+    }
+    // Shutdown the pool
+    threadPool.shutdown();
+
+    // Wait for all the tasks to finish
+    try {
+      boolean stillRunning = !threadPool.awaitTermination(
+          this.fileSplitTimeout, TimeUnit.MILLISECONDS);
+      if (stillRunning) {
+        threadPool.shutdownNow();
+        // wait for the thread to shutdown completely.
+        while (!threadPool.isTerminated()) {
+          Thread.sleep(50);
+        }
+        throw new IOException("Took too long to split the" +
+            " files and create the references, aborting split");
+      }
+    } catch (InterruptedException e) {
+      throw (InterruptedIOException)new InterruptedIOException().initCause(e);
+    }
+
+    int created_a = 0;
+    int created_b = 0;
+    // Look for any exception
+    for (Future<Pair<Path, Path>> future : futures) {
+      try {
+        Pair<Path, Path> p = future.get();
+        created_a += p.getFirst() != null ? 1 : 0;
+        created_b += p.getSecond() != null ? 1 : 0;
+      } catch (InterruptedException e) {
+        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
+      } catch (ExecutionException e) {
+        throw new IOException(e);
+      }
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Split storefiles for region " + this.parent + " Daugther A: " + created_a
+          + " storefiles, Daugther B: " + created_b + " storefiles.");
+    }
+    return new Pair<Integer, Integer>(created_a, created_b);
+  }
+
+  private Pair<Path, Path> splitStoreFile(final byte[] family, final StoreFile sf)
+      throws IOException {
+    HRegionFileSystem fs = this.parent.getRegionFileSystem();
+    String familyName = Bytes.toString(family);
+
+    Path path_a =
+        fs.splitStoreFile(this.hri_a, familyName, sf, this.splitrow, false,
+          this.parent.getSplitPolicy());
+    Path path_b =
+        fs.splitStoreFile(this.hri_b, familyName, sf, this.splitrow, true,
+          this.parent.getSplitPolicy());
+    return new Pair<Path,Path>(path_a, path_b);
+  }
+
+  /**
+   * Utility class used to do the file splitting / reference writing
+   * in parallel instead of sequentially.
+   */
+  class StoreFileSplitter implements Callable<Pair<Path,Path>> {
+    private final byte[] family;
+    private final StoreFile sf;
+
+    /**
+     * Constructor that takes what it needs to split
+     * @param family Family that contains the store file
+     * @param sf which file
+     */
+    public StoreFileSplitter(final byte[] family, final StoreFile sf) {
+      this.sf = sf;
+      this.family = family;
+    }
+
+    public Pair<Path,Path> call() throws IOException {
+      return splitStoreFile(family, sf);
+    }
+  }
+
+  /**
+   * @param server Hosting server instance (May be null when testing).
+   * @param services
+   * @throws IOException If thrown, rollback failed.  Take drastic action.
+   * @return True if we successfully rolled back, false if we got to the point
+   * of no return and so now need to abort the server to minimize damage.
+   */
+  @SuppressWarnings("deprecation")
+  public boolean rollback(final Server server, final RegionServerServices services)
+  throws IOException {
+    // Coprocessor callback
+    if (this.parent.getCoprocessorHost() != null) {
+      this.parent.getCoprocessorHost().preRollBackSplit();
+    }
+
+    boolean result = true;
+    ListIterator<JournalEntry> iterator =
+      this.journal.listIterator(this.journal.size());
+    // Iterate in reverse.
+    while (iterator.hasPrevious()) {
+      JournalEntry je = iterator.previous();
+
+      transition(je.getPhase(), true);
+
+      switch(je.getPhase()) {
+
+      case SET_SPLITTING:
+        if (useCoordinatedStateManager(server) && server instanceof HRegionServer) {
+          ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
+              .getSplitTransactionCoordination().clean(this.parent.getRegionInfo());
+        } else if (services != null && !useZKForAssignment
+            && !services.reportRegionStateTransition(TransitionCode.SPLIT_REVERTED,
+                parent.getRegionInfo(), hri_a, hri_b)) {
+          return false;
+        }
+        break;
+
+      case CREATE_SPLIT_DIR:
+        this.parent.writestate.writesEnabled = true;
+        this.parent.getRegionFileSystem().cleanupSplitsDir();
+        break;
+
+      case CLOSED_PARENT_REGION:
+        try {
+          // So, this returns a seqid but if we just closed and then reopened, we
+          // should be ok. On close, we flushed using sequenceid obtained from
+          // hosting regionserver so no need to propagate the sequenceid returned
+          // out of initialize below up into regionserver as we normally do.
+          // TODO: Verify.
+          this.parent.initialize();
+        } catch (IOException e) {
+          LOG.error("Failed rollbacking CLOSED_PARENT_REGION of region " +
+            this.parent.getRegionInfo().getRegionNameAsString(), e);
+          throw new RuntimeException(e);
+        }
+        break;
+
+      case STARTED_REGION_A_CREATION:
+        this.parent.getRegionFileSystem().cleanupDaughterRegion(this.hri_a);
+        break;
+
+      case STARTED_REGION_B_CREATION:
+        this.parent.getRegionFileSystem().cleanupDaughterRegion(this.hri_b);
+        break;
+
+      case OFFLINED_PARENT:
+        if (services != null) services.addToOnlineRegions(this.parent);
+        break;
+
+      case PONR:
+        // We got to the point-of-no-return so we need to just abort. Return
+        // immediately.  Do not clean up created daughter regions.  They need
+        // to be in place so we don't delete the parent region mistakenly.
+        // See HBASE-3872.
+        return false;
+
+      // Informational only cases
+      case STARTED:
+      case PREPARED:
+      case BEFORE_PRE_SPLIT_HOOK:
+      case AFTER_PRE_SPLIT_HOOK:
+      case BEFORE_POST_SPLIT_HOOK:
+      case AFTER_POST_SPLIT_HOOK:
+      case OPENED_REGION_A:
+      case OPENED_REGION_B:
+      case COMPLETED:
+        break;
+
+      default:
+        throw new RuntimeException("Unhandled journal entry: " + je);
+      }
+    }
+    // Coprocessor callback
+    if (this.parent.getCoprocessorHost() != null) {
+      this.parent.getCoprocessorHost().postRollBackSplit();
+    }
+    return result;
+  }
+
+  HRegionInfo getFirstDaughter() {
+    return hri_a;
+  }
+
+  HRegionInfo getSecondDaughter() {
+    return hri_b;
+  }
+
+  @Override
+  public List<JournalEntry> getJournal() {
+    return journal;
+  }
+
+  @Override
+  public SplitTransaction registerTransactionListener(TransactionListener listener) {
+    listeners.add(listener);
+    return this;
+  }
+
+  @Override
+  public Server getServer() {
+    return server;
+  }
+
+  @Override
+  public RegionServerServices getRegionServerServices() {
+    return rsServices;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/bf76082f/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java
index c0243c2..26d7438 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java
@@ -60,6 +60,7 @@ import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.SplitTransaction;
+import org.apache.hadoop.hbase.regionserver.SplitTransactionFactory;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.util.PairOfSameType;
@@ -498,7 +499,8 @@ public class TestCoprocessorInterface {
   private Region [] split(final Region r, final byte [] splitRow) throws IOException {
     Region[] regions = new Region[2];
 
-    SplitTransaction st = new SplitTransaction(r, splitRow);
+    SplitTransaction st = new SplitTransactionFactory(TEST_UTIL.getConfiguration())
+      .create(r, splitRow);
     int i = 0;
 
     if (!st.prepare()) {
@@ -507,8 +509,7 @@ public class TestCoprocessorInterface {
     }
     try {
       Server mockServer = Mockito.mock(Server.class);
-      when(mockServer.getConfiguration()).thenReturn(
-          TEST_UTIL.getConfiguration());
+      when(mockServer.getConfiguration()).thenReturn(TEST_UTIL.getConfiguration());
       PairOfSameType<Region> daughters = st.execute(mockServer, null);
       for (Region each_daughter: daughters) {
         regions[i] = each_daughter;

http://git-wip-us.apache.org/repos/asf/hbase/blob/bf76082f/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerObserver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerObserver.java
index 0e3738d..65ca117 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerObserver.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerObserver.java
@@ -42,7 +42,8 @@ import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.Region;
-import org.apache.hadoop.hbase.regionserver.RegionMergeTransaction;
+import org.apache.hadoop.hbase.regionserver.RegionMergeTransactionFactory;
+import org.apache.hadoop.hbase.regionserver.RegionMergeTransactionImpl;
 import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.Test;
@@ -110,7 +111,7 @@ public class TestRegionServerObserver {
   }
 
   public static class CPRegionServerObserver extends BaseRegionServerObserver {
-    private RegionMergeTransaction rmt = null;
+    private RegionMergeTransactionImpl rmt = null;
     private HRegion mergedRegion = null;
 
     private boolean preMergeCalled;
@@ -143,7 +144,8 @@ public class TestRegionServerObserver {
       HRegionServer rs = (HRegionServer) environment.getRegionServerServices();
       List<Region> onlineRegions =
           rs.getOnlineRegions(TableName.valueOf("testRegionServerObserver_2"));
-      rmt = new RegionMergeTransaction(onlineRegions.get(0), onlineRegions.get(1), true);
+      rmt = (RegionMergeTransactionImpl) new RegionMergeTransactionFactory(rs.getConfiguration())
+        .create(onlineRegions.get(0), onlineRegions.get(1), true);
       if (!rmt.prepare(rs)) {
         LOG.error("Prepare for the region merge of table "
             + onlineRegions.get(0).getTableDesc().getNameAsString()

http://git-wip-us.apache.org/repos/asf/hbase/blob/bf76082f/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java
index ac63e93..33fe65a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java
@@ -60,7 +60,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.Region;
-import org.apache.hadoop.hbase.regionserver.RegionMergeTransaction;
+import org.apache.hadoop.hbase.regionserver.RegionMergeTransactionImpl;
 import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSTableDescriptors;
@@ -243,7 +243,7 @@ public class TestMasterFailover {
     assertEquals(2, mergingRegions.size());
     HRegionInfo a = mergingRegions.get(0);
     HRegionInfo b = mergingRegions.get(1);
-    HRegionInfo newRegion = RegionMergeTransaction.getMergedRegionInfo(a, b);
+    HRegionInfo newRegion = RegionMergeTransactionImpl.getMergedRegionInfo(a, b);
     ServerName mergingServer = regionStates.getRegionServerOfRegion(a);
     ServerName serverB = regionStates.getRegionServerOfRegion(b);
     if (!serverB.equals(mergingServer)) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/bf76082f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java
index ebc74e3..8de04a2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java
@@ -110,7 +110,7 @@ public class TestEndToEndSplitTransaction {
       byte[] regionName = conn.getRegionLocator(tableName).getRegionLocation(splitRow)
           .getRegionInfo().getRegionName();
       Region region = server.getRegion(regionName);
-      SplitTransaction split = new SplitTransaction(region, splitRow);
+      SplitTransactionImpl split = new SplitTransactionImpl((HRegion) region, splitRow);
       split.prepare();
 
       // 1. phase I

http://git-wip-us.apache.org/repos/asf/hbase/blob/bf76082f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index 6cd5c81..43964a1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -2466,7 +2466,7 @@ public class TestHRegion {
    */
   HRegion[] splitRegion(final HRegion parent, final byte[] midkey) throws IOException {
     PairOfSameType<Region> result = null;
-    SplitTransaction st = new SplitTransaction(parent, midkey);
+    SplitTransactionImpl st = new SplitTransactionImpl(parent, midkey);
     // If prepare does not return true, for some reason -- logged inside in
     // the prepare call -- we are not ready to split just now. Just return.
     if (!st.prepare()) {