You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2014/06/06 16:46:19 UTC

git commit: HBASE-10962 Decouple region opening (HM and HRS) from ZK (Mikhail Antonov)

Repository: hbase
Updated Branches:
  refs/heads/master 6ce225b1d -> 623cfa33d


HBASE-10962 Decouple region opening (HM and HRS) from ZK (Mikhail Antonov)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/623cfa33
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/623cfa33
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/623cfa33

Branch: refs/heads/master
Commit: 623cfa33d1007b36d6dc35886c02d233e91227b1
Parents: 6ce225b
Author: Michael Stack <st...@duboce.net>
Authored: Fri Jun 6 07:46:00 2014 -0700
Committer: Michael Stack <st...@duboce.net>
Committed: Fri Jun 6 07:46:00 2014 -0700

----------------------------------------------------------------------
 .../BaseCoordinatedStateManager.java            |   5 +
 .../coordination/OpenRegionCoordination.java    | 129 ++++++
 .../coordination/ZkCoordinatedStateManager.java |   7 +
 .../coordination/ZkOpenRegionCoordination.java  | 414 +++++++++++++++++++
 .../hadoop/hbase/master/AssignmentManager.java  |  48 ++-
 .../master/handler/OpenedRegionHandler.java     |  69 +---
 .../hbase/regionserver/RSRpcServices.java       |  17 +-
 .../regionserver/handler/OpenMetaHandler.java   |  11 +-
 .../regionserver/handler/OpenRegionHandler.java | 237 ++---------
 .../hbase/master/TestAssignmentManager.java     |  26 +-
 .../hbase/master/TestOpenedRegionHandler.java   |  15 +-
 .../regionserver/TestRegionServerNoMaster.java  |  29 +-
 .../handler/TestCloseRegionHandler.java         |  55 +--
 .../handler/TestOpenRegionHandler.java          |  88 +++-
 14 files changed, 811 insertions(+), 339 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/623cfa33/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java
index 5e4e53e..6cee2df 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java
@@ -62,4 +62,9 @@ public abstract class BaseCoordinatedStateManager implements CoordinatedStateMan
    * Method to retrieve coordination for closing region operations.
    */
   public abstract CloseRegionCoordination getCloseRegionCoordination();
+
+  /**
+   * Method to retrieve coordination for opening region operations.
+   */
+  public abstract OpenRegionCoordination getOpenRegionCoordination();
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/623cfa33/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/OpenRegionCoordination.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/OpenRegionCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/OpenRegionCoordination.java
new file mode 100644
index 0000000..0c6871d
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/OpenRegionCoordination.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coordination;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.master.AssignmentManager;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+
+import java.io.IOException;
+
+/**
+ * Cocoordination operations for opening regions.
+ */
+@InterfaceAudience.Private
+public interface OpenRegionCoordination {
+
+  //---------------------
+  // RS-side operations
+  //---------------------
+  /**
+   * Tries to move regions to OPENED state.
+   *
+   * @param r Region we're working on.
+   * @param ord details about region opening task
+   * @return whether transition was successful or not
+   * @throws java.io.IOException
+   */
+  boolean transitionToOpened(HRegion r, OpenRegionDetails ord) throws IOException;
+
+  /**
+   * Transitions region from offline to opening state.
+   * @param regionInfo region we're working on.
+   * @param ord details about opening task.
+   * @return true if successful, false otherwise
+   */
+  boolean transitionFromOfflineToOpening(HRegionInfo regionInfo,
+                                         OpenRegionDetails ord);
+
+  /**
+   * Heartbeats to prevent timeouts.
+   *
+   * @param ord details about opening task.
+   * @param regionInfo region we're working on.
+   * @param rsServices instance of RegionServerrServices
+   * @param context used for logging purposes only
+   * @return true if successful heartbeat, false otherwise.
+   */
+  boolean tickleOpening(OpenRegionDetails ord, HRegionInfo regionInfo,
+                        RegionServerServices rsServices, String context);
+
+  /**
+   * Tries transition region from offline to failed open.
+   * @param rsServices instance of RegionServerServices
+   * @param hri region we're working on
+   * @param ord details about region opening task
+   * @return true if successful, false otherwise
+   */
+  boolean tryTransitionFromOfflineToFailedOpen(RegionServerServices rsServices,
+                                               HRegionInfo hri, OpenRegionDetails ord);
+
+  /**
+   * Tries transition from Opening to Failed open.
+   * @param hri region we're working on
+   * @param ord details about region opening task
+   * @return true if successfu. false otherwise.
+   */
+  boolean tryTransitionFromOpeningToFailedOpen(HRegionInfo hri, OpenRegionDetails ord);
+
+  /**
+   * Construct OpenRegionDetails instance from part of protobuf request.
+   * @return instance of OpenRegionDetails.
+   */
+  OpenRegionDetails parseFromProtoRequest(AdminProtos.OpenRegionRequest.RegionOpenInfo
+                                            regionOpenInfo);
+
+  /**
+   * Get details object with params for case when we're opening on
+   * regionserver side with all "default" properties.
+   */
+  OpenRegionDetails getDetailsForNonCoordinatedOpening();
+
+  //-------------------------
+  // HMaster-side operations
+  //-------------------------
+
+  /**
+   * Commits opening operation on HM side (steps required for "commit"
+   * are determined by coordination implementation).
+   * @return true if committed successfully, false otherwise.
+   */
+  public boolean commitOpenOnMasterSide(AssignmentManager assignmentManager,
+                                        HRegionInfo regionInfo,
+                                        OpenRegionDetails ord);
+
+  /**
+   * Interface for region opening tasks. Used to carry implementation details in
+   * encapsulated way through Handlers to the coordination API.
+   */
+  static interface OpenRegionDetails {
+    /**
+     * Sets server name on which opening operation is running.
+     */
+    void setServerName(ServerName serverName);
+
+    /**
+     * @return server name on which opening op is running.
+     */
+    ServerName getServerName();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/623cfa33/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java
index dab0262..2c2fa44 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java
@@ -37,6 +37,7 @@ public class ZkCoordinatedStateManager extends BaseCoordinatedStateManager {
   protected ZooKeeperWatcher watcher;
   protected SplitTransactionCoordination splitTransactionCoordination;
   protected CloseRegionCoordination closeRegionCoordination;
+  protected OpenRegionCoordination openRegionCoordination;
 
   @Override
   public void initialize(Server server) {
@@ -45,6 +46,7 @@ public class ZkCoordinatedStateManager extends BaseCoordinatedStateManager {
 
     splitTransactionCoordination = new ZKSplitTransactionCoordination(this, watcher);
     closeRegionCoordination = new ZkCloseRegionCoordination(this, watcher);
+    openRegionCoordination = new ZkOpenRegionCoordination(this, watcher);
   }
 
   @Override
@@ -71,4 +73,9 @@ public class ZkCoordinatedStateManager extends BaseCoordinatedStateManager {
   public CloseRegionCoordination getCloseRegionCoordination() {
     return closeRegionCoordination;
   }
+
+  @Override
+  public OpenRegionCoordination getOpenRegionCoordination() {
+    return openRegionCoordination;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/623cfa33/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkOpenRegionCoordination.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkOpenRegionCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkOpenRegionCoordination.java
new file mode 100644
index 0000000..290ed09
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkOpenRegionCoordination.java
@@ -0,0 +1,414 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coordination;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.CoordinatedStateManager;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.executor.EventType;
+import org.apache.hadoop.hbase.master.AssignmentManager;
+import org.apache.hadoop.hbase.master.RegionState;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.zookeeper.ZKAssign;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+import java.io.IOException;
+
+/**
+ * ZK-based implementation of {@link OpenRegionCoordination}.
+ */
+@InterfaceAudience.Private
+public class ZkOpenRegionCoordination implements OpenRegionCoordination {
+  private static final Log LOG = LogFactory.getLog(ZkOpenRegionCoordination.class);
+
+  private CoordinatedStateManager coordination;
+  private final ZooKeeperWatcher watcher;
+
+  public ZkOpenRegionCoordination(CoordinatedStateManager coordination,
+                                  ZooKeeperWatcher watcher) {
+    this.coordination = coordination;
+    this.watcher = watcher;
+  }
+
+  //-------------------------------
+  // Region Server-side operations
+  //-------------------------------
+
+  /**
+   * @param r Region we're working on.
+   * @return whether znode is successfully transitioned to OPENED state.
+   * @throws java.io.IOException
+   */
+  @Override
+  public boolean transitionToOpened(final HRegion r, OpenRegionDetails ord) throws IOException {
+    ZkOpenRegionDetails zkOrd = (ZkOpenRegionDetails) ord;
+
+    boolean result = false;
+    HRegionInfo hri = r.getRegionInfo();
+    final String name = hri.getRegionNameAsString();
+    // Finally, Transition ZK node to OPENED
+    try {
+      if (ZKAssign.transitionNodeOpened(watcher, hri,
+        zkOrd.getServerName(), zkOrd.getVersion()) == -1) {
+        String warnMsg = "Completed the OPEN of region " + name +
+          " but when transitioning from " + " OPENING to OPENED ";
+        try {
+          String node = ZKAssign.getNodeName(watcher, hri.getEncodedName());
+          if (ZKUtil.checkExists(watcher, node) < 0) {
+            // if the znode
+            coordination.getServer().abort(warnMsg + "the znode disappeared", null);
+          } else {
+            LOG.warn(warnMsg + "got a version mismatch, someone else clashed; " +
+              "so now unassigning -- closing region on server: " + zkOrd.getServerName());
+          }
+        } catch (KeeperException ke) {
+          coordination.getServer().abort(warnMsg, ke);
+        }
+      } else {
+        LOG.debug("Transitioned " + r.getRegionInfo().getEncodedName() +
+          " to OPENED in zk on " + zkOrd.getServerName());
+        result = true;
+      }
+    } catch (KeeperException e) {
+      LOG.error("Failed transitioning node " + name +
+        " from OPENING to OPENED -- closing region", e);
+    }
+    return result;
+  }
+
+  /**
+   * Transition ZK node from OFFLINE to OPENING.
+   * @param regionInfo region info instance
+   * @param ord - instance of open region details, for ZK implementation
+   *   will include version Of OfflineNode that needs to be compared
+   *   before changing the node's state from OFFLINE
+   * @return True if successful transition.
+   */
+  @Override
+  public boolean transitionFromOfflineToOpening(HRegionInfo regionInfo,
+                                                OpenRegionDetails ord) {
+    ZkOpenRegionDetails zkOrd = (ZkOpenRegionDetails) ord;
+
+    // encoded name is used as znode encoded name in ZK
+    final String encodedName = regionInfo.getEncodedName();
+
+    // TODO: should also handle transition from CLOSED?
+    try {
+      // Initialize the znode version.
+      zkOrd.setVersion(ZKAssign.transitionNode(watcher, regionInfo,
+        zkOrd.getServerName(), EventType.M_ZK_REGION_OFFLINE,
+        EventType.RS_ZK_REGION_OPENING, zkOrd.getVersionOfOfflineNode()));
+    } catch (KeeperException e) {
+      LOG.error("Error transition from OFFLINE to OPENING for region=" +
+        encodedName, e);
+      zkOrd.setVersion(-1);
+      return false;
+    }
+    boolean b = isGoodVersion(zkOrd);
+    if (!b) {
+      LOG.warn("Failed transition from OFFLINE to OPENING for region=" +
+        encodedName);
+    }
+    return b;
+  }
+
+  /**
+   * Update our OPENING state in zookeeper.
+   * Do this so master doesn't timeout this region-in-transition.
+   * We may lose the znode ownership during the open.  Currently its
+   * too hard interrupting ongoing region open.  Just let it complete
+   * and check we still have the znode after region open.
+   *
+   * @param context Some context to add to logs if failure
+   * @return True if successful transition.
+   */
+  @Override
+  public boolean tickleOpening(OpenRegionDetails ord, HRegionInfo regionInfo,
+                               RegionServerServices rsServices, final String context) {
+    ZkOpenRegionDetails zkOrd = (ZkOpenRegionDetails) ord;
+    if (!isRegionStillOpening(regionInfo, rsServices)) {
+      LOG.warn("Open region aborted since it isn't opening any more");
+      return false;
+    }
+    // If previous checks failed... do not try again.
+    if (!isGoodVersion(zkOrd)) return false;
+    String encodedName = regionInfo.getEncodedName();
+    try {
+      zkOrd.setVersion(ZKAssign.confirmNodeOpening(watcher,
+          regionInfo, zkOrd.getServerName(), zkOrd.getVersion()));
+    } catch (KeeperException e) {
+      coordination.getServer().abort("Exception refreshing OPENING; region=" + encodedName +
+        ", context=" + context, e);
+      zkOrd.setVersion(-1);
+      return false;
+    }
+    boolean b = isGoodVersion(zkOrd);
+    if (!b) {
+      LOG.warn("Failed refreshing OPENING; region=" + encodedName +
+        ", context=" + context);
+    }
+    return b;
+  }
+
+  /**
+   * Try to transition to open.
+   *
+   * This is not guaranteed to succeed, we just do our best.
+   *
+   * @param rsServices
+   * @param hri Region we're working on.
+   * @param ord Details about region open task
+   * @return whether znode is successfully transitioned to FAILED_OPEN state.
+   */
+  @Override
+  public boolean tryTransitionFromOfflineToFailedOpen(RegionServerServices rsServices,
+                                                      final HRegionInfo hri,
+                                                      OpenRegionDetails ord) {
+    ZkOpenRegionDetails zkOrd = (ZkOpenRegionDetails) ord;
+    boolean result = false;
+    final String name = hri.getRegionNameAsString();
+    try {
+      LOG.info("Opening of region " + hri + " failed, transitioning" +
+        " from OFFLINE to FAILED_OPEN in ZK, expecting version " +
+        zkOrd.getVersionOfOfflineNode());
+      if (ZKAssign.transitionNode(
+        rsServices.getZooKeeper(), hri,
+        rsServices.getServerName(),
+        EventType.M_ZK_REGION_OFFLINE,
+        EventType.RS_ZK_REGION_FAILED_OPEN,
+        zkOrd.getVersionOfOfflineNode()) == -1) {
+        LOG.warn("Unable to mark region " + hri + " as FAILED_OPEN. " +
+          "It's likely that the master already timed out this open " +
+          "attempt, and thus another RS already has the region.");
+      } else {
+        result = true;
+      }
+    } catch (KeeperException e) {
+      LOG.error("Failed transitioning node " + name + " from OFFLINE to FAILED_OPEN", e);
+    }
+    return result;
+  }
+
+  private boolean isGoodVersion(ZkOpenRegionDetails zkOrd) {
+    return zkOrd.getVersion() != -1;
+  }
+
+  /**
+   * This is not guaranteed to succeed, we just do our best.
+   * @param hri Region we're working on.
+   * @return whether znode is successfully transitioned to FAILED_OPEN state.
+   */
+  @Override
+  public boolean tryTransitionFromOpeningToFailedOpen(final HRegionInfo hri,
+                                                      OpenRegionDetails ord) {
+    ZkOpenRegionDetails zkOrd = (ZkOpenRegionDetails) ord;
+    boolean result = false;
+    final String name = hri.getRegionNameAsString();
+    try {
+      LOG.info("Opening of region " + hri + " failed, transitioning" +
+        " from OPENING to FAILED_OPEN in ZK, expecting version " + zkOrd.getVersion());
+      if (ZKAssign.transitionNode(
+        watcher, hri,
+        zkOrd.getServerName(),
+        EventType.RS_ZK_REGION_OPENING,
+        EventType.RS_ZK_REGION_FAILED_OPEN,
+        zkOrd.getVersion()) == -1) {
+        LOG.warn("Unable to mark region " + hri + " as FAILED_OPEN. " +
+          "It's likely that the master already timed out this open " +
+          "attempt, and thus another RS already has the region.");
+      } else {
+        result = true;
+      }
+    } catch (KeeperException e) {
+      LOG.error("Failed transitioning node " + name +
+        " from OPENING to FAILED_OPEN", e);
+    }
+    return result;
+  }
+
+  /**
+   * Parse ZK-related fields from request.
+   */
+  @Override
+  public OpenRegionCoordination.OpenRegionDetails parseFromProtoRequest(
+      AdminProtos.OpenRegionRequest.RegionOpenInfo regionOpenInfo) {
+    ZkOpenRegionCoordination.ZkOpenRegionDetails zkCrd =
+      new ZkOpenRegionCoordination.ZkOpenRegionDetails();
+
+    int versionOfOfflineNode = -1;
+    if (regionOpenInfo.hasVersionOfOfflineNode()) {
+      versionOfOfflineNode = regionOpenInfo.getVersionOfOfflineNode();
+    }
+    zkCrd.setVersionOfOfflineNode(versionOfOfflineNode);
+    zkCrd.setServerName(coordination.getServer().getServerName());
+
+    return zkCrd;
+  }
+
+  /**
+   * No ZK tracking will be performed for that case.
+   * This method should be used when we want to construct CloseRegionDetails,
+   * but don't want any coordination on that (when it's initiated by regionserver),
+   * so no znode state transitions will be performed.
+   */
+  @Override
+  public OpenRegionCoordination.OpenRegionDetails getDetailsForNonCoordinatedOpening() {
+    ZkOpenRegionCoordination.ZkOpenRegionDetails zkCrd =
+      new ZkOpenRegionCoordination.ZkOpenRegionDetails();
+    zkCrd.setVersionOfOfflineNode(-1);
+    zkCrd.setServerName(coordination.getServer().getServerName());
+
+    return zkCrd;
+  }
+
+  //--------------------------
+  // HMaster-side operations
+  //--------------------------
+  @Override
+  public boolean commitOpenOnMasterSide(AssignmentManager assignmentManager,
+                                        HRegionInfo regionInfo,
+                                        OpenRegionDetails ord) {
+    boolean committedSuccessfully = true;
+
+    // Code to defend against case where we get SPLIT before region open
+    // processing completes; temporary till we make SPLITs go via zk -- 0.92.
+    RegionState regionState = assignmentManager.getRegionStates()
+      .getRegionTransitionState(regionInfo.getEncodedName());
+    boolean openedNodeDeleted = false;
+    if (regionState != null && regionState.isOpened()) {
+      openedNodeDeleted = deleteOpenedNode(regionInfo, ord);
+      if (!openedNodeDeleted) {
+        LOG.error("Znode of region " + regionInfo.getShortNameToLog() + " could not be deleted.");
+      }
+    } else {
+      LOG.warn("Skipping the onlining of " + regionInfo.getShortNameToLog() +
+        " because regions is NOT in RIT -- presuming this is because it SPLIT");
+    }
+    if (!openedNodeDeleted) {
+      if (assignmentManager.getTableStateManager().isTableState(regionInfo.getTable(),
+          ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
+        debugLog(regionInfo, "Opened region "
+          + regionInfo.getShortNameToLog() + " but "
+          + "this table is disabled, triggering close of region");
+        committedSuccessfully = false;
+      }
+    }
+
+    return committedSuccessfully;
+  }
+
+  private boolean deleteOpenedNode(HRegionInfo regionInfo, OpenRegionDetails ord) {
+    ZkOpenRegionDetails zkOrd = (ZkOpenRegionDetails) ord;
+    int expectedVersion = zkOrd.getVersion();
+
+    debugLog(regionInfo, "Handling OPENED of " +
+      regionInfo.getShortNameToLog() + " from " + zkOrd.getServerName().toString() +
+      "; deleting unassigned node");
+    try {
+      // delete the opened znode only if the version matches.
+      return ZKAssign.deleteNode(this.coordination.getServer().getZooKeeper(),
+        regionInfo.getEncodedName(), EventType.RS_ZK_REGION_OPENED, expectedVersion);
+    } catch(KeeperException.NoNodeException e){
+      // Getting no node exception here means that already the region has been opened.
+      LOG.warn("The znode of the region " + regionInfo.getShortNameToLog() +
+        " would have already been deleted");
+      return false;
+    } catch (KeeperException e) {
+      this.coordination.getServer().abort("Error deleting OPENED node in ZK (" +
+        regionInfo.getRegionNameAsString() + ")", e);
+    }
+    return false;
+  }
+
+  private void debugLog(HRegionInfo region, String string) {
+    if (region.isMetaTable()) {
+      LOG.info(string);
+    } else {
+      LOG.debug(string);
+    }
+  }
+
+  // Additional classes and helper methods
+
+  /**
+   * ZK-based implementation. Has details about whether the state transition should be
+   * reflected in ZK, as well as expected version of znode.
+   */
+  public static class ZkOpenRegionDetails implements OpenRegionCoordination.OpenRegionDetails {
+
+    // We get version of our znode at start of open process and monitor it across
+    // the total open. We'll fail the open if someone hijacks our znode; we can
+    // tell this has happened if version is not as expected.
+    private volatile int version = -1;
+
+    //version of the offline node that was set by the master
+    private volatile int versionOfOfflineNode = -1;
+
+    /**
+     * Server name the handler is running on.
+     */
+    private ServerName serverName;
+
+    public ZkOpenRegionDetails() {
+    }
+
+    public ZkOpenRegionDetails(int versionOfOfflineNode) {
+      this.versionOfOfflineNode = versionOfOfflineNode;
+    }
+
+    public int getVersionOfOfflineNode() {
+      return versionOfOfflineNode;
+    }
+
+    public void setVersionOfOfflineNode(int versionOfOfflineNode) {
+      this.versionOfOfflineNode = versionOfOfflineNode;
+    }
+
+    public int getVersion() {
+      return version;
+    }
+
+    public void setVersion(int version) {
+      this.version = version;
+    }
+
+    @Override
+    public ServerName getServerName() {
+      return serverName;
+    }
+
+    @Override
+    public void setServerName(ServerName serverName) {
+      this.serverName = serverName;
+    }
+  }
+
+  private boolean isRegionStillOpening(HRegionInfo regionInfo, RegionServerServices rsServices) {
+    byte[] encodedName = regionInfo.getEncodedNameAsBytes();
+    Boolean action = rsServices.getRegionsInTransitionInRS().get(encodedName);
+    return Boolean.TRUE.equals(action); // true means opening for RIT
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/623cfa33/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
index 1c4804d..529a333 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
@@ -58,6 +58,8 @@ import org.apache.hadoop.hbase.TableStateManager;
 import org.apache.hadoop.hbase.catalog.CatalogTracker;
 import org.apache.hadoop.hbase.catalog.MetaReader;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.coordination.OpenRegionCoordination;
+import org.apache.hadoop.hbase.coordination.ZkOpenRegionCoordination;
 import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
 import org.apache.hadoop.hbase.coordination.SplitTransactionCoordination.SplitTransactionDetails;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
@@ -75,6 +77,7 @@ import org.apache.hadoop.hbase.master.handler.DisableTableHandler;
 import org.apache.hadoop.hbase.master.handler.EnableTableHandler;
 import org.apache.hadoop.hbase.master.handler.OpenedRegionHandler;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.RegionAlreadyInTransitionException;
 import org.apache.hadoop.hbase.regionserver.RegionMergeTransaction;
 import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
@@ -577,8 +580,20 @@ public class AssignmentManager extends ZooKeeperListener {
           return false;
         }
       }
+
+      // TODO: This code is tied to ZK anyway, so for now leaving it as is,
+      // will refactor when whole region assignment will be abstracted from ZK
+      BaseCoordinatedStateManager cp =
+        (BaseCoordinatedStateManager) this.server.getCoordinatedStateManager();
+      OpenRegionCoordination openRegionCoordination = cp.getOpenRegionCoordination();
+
+      ZkOpenRegionCoordination.ZkOpenRegionDetails zkOrd =
+        new ZkOpenRegionCoordination.ZkOpenRegionDetails();
+      zkOrd.setVersion(stat.getVersion());
+      zkOrd.setServerName(cp.getServer().getServerName());
+
       return processRegionsInTransition(
-        rt, hri, stat.getVersion());
+        rt, hri, openRegionCoordination, zkOrd);
     } finally {
       lock.unlock();
     }
@@ -594,7 +609,8 @@ public class AssignmentManager extends ZooKeeperListener {
    */
   boolean processRegionsInTransition(
       final RegionTransition rt, final HRegionInfo regionInfo,
-      final int expectedVersion) throws KeeperException {
+      OpenRegionCoordination coordination,
+      final OpenRegionCoordination.OpenRegionDetails ord) throws KeeperException {
     EventType et = rt.getEventType();
     // Get ServerName.  Could not be null.
     final ServerName sn = rt.getServerName();
@@ -652,6 +668,8 @@ public class AssignmentManager extends ZooKeeperListener {
             public void process() throws IOException {
               ReentrantLock lock = locker.acquireLock(regionInfo.getEncodedName());
               try {
+                final int expectedVersion = ((ZkOpenRegionCoordination.ZkOpenRegionDetails) ord)
+                  .getVersion();
                 unassign(regionInfo, rsClosing, expectedVersion, null, true, null);
                 if (regionStates.isRegionOffline(regionInfo)) {
                   assign(regionInfo, true);
@@ -699,7 +717,7 @@ public class AssignmentManager extends ZooKeeperListener {
         // This could be done asynchronously, we would need then to acquire the lock in the
         //  handler.
         regionStates.updateRegionState(rt, State.OPEN);
-        new OpenedRegionHandler(server, this, regionInfo, sn, expectedVersion).process();
+        new OpenedRegionHandler(server, this, regionInfo, coordination, ord).process();
         break;
       case RS_ZK_REQUEST_REGION_SPLIT:
       case RS_ZK_REGION_SPLITTING:
@@ -748,10 +766,12 @@ public class AssignmentManager extends ZooKeeperListener {
    * <p>
    * This deals with skipped transitions (we got a CLOSED but didn't see CLOSING
    * yet).
-   * @param rt
-   * @param expectedVersion
+   * @param rt region transition
+   * @param coordination coordination for opening region
+   * @param ord details about opening region
    */
-  void handleRegion(final RegionTransition rt, int expectedVersion) {
+  void handleRegion(final RegionTransition rt, OpenRegionCoordination coordination,
+                    OpenRegionCoordination.OpenRegionDetails ord) {
     if (rt == null) {
       LOG.warn("Unexpected NULL input for RegionTransition rt");
       return;
@@ -931,7 +951,7 @@ public class AssignmentManager extends ZooKeeperListener {
           if (regionState != null) {
             failedOpenTracker.remove(encodedName); // reset the count, if any
             new OpenedRegionHandler(
-              server, this, regionState.getRegion(), sn, expectedVersion).process();
+              server, this, regionState.getRegion(), coordination, ord).process();
             updateOpenedRegionHandlerTracker(regionState.getRegion());
           }
           break;
@@ -1299,7 +1319,19 @@ public class AssignmentManager extends ZooKeeperListener {
             if (data == null) return;
 
             RegionTransition rt = RegionTransition.parseFrom(data);
-            handleRegion(rt, stat.getVersion());
+
+            // TODO: This code is tied to ZK anyway, so for now leaving it as is,
+            // will refactor when whole region assignment will be abstracted from ZK
+            BaseCoordinatedStateManager csm =
+              (BaseCoordinatedStateManager) server.getCoordinatedStateManager();
+            OpenRegionCoordination openRegionCoordination = csm.getOpenRegionCoordination();
+
+            ZkOpenRegionCoordination.ZkOpenRegionDetails zkOrd =
+              new ZkOpenRegionCoordination.ZkOpenRegionDetails();
+            zkOrd.setVersion(stat.getVersion());
+            zkOrd.setServerName(csm.getServer().getServerName());
+
+            handleRegion(rt, openRegionCoordination, zkOrd);
           } catch (KeeperException e) {
             server.abort("Unexpected ZK exception reading unassigned node data", e);
           } catch (DeserializationException e) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/623cfa33/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/OpenedRegionHandler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/OpenedRegionHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/OpenedRegionHandler.java
index 3457131..a2dc41b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/OpenedRegionHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/OpenedRegionHandler.java
@@ -18,21 +18,16 @@
  */
 package org.apache.hadoop.hbase.master.handler;
 
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.Server;
-import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.coordination.OpenRegionCoordination;
 import org.apache.hadoop.hbase.executor.EventHandler;
 import org.apache.hadoop.hbase.executor.EventType;
 import org.apache.hadoop.hbase.master.AssignmentManager;
-import org.apache.hadoop.hbase.master.RegionState;
-import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
-import org.apache.hadoop.hbase.zookeeper.ZKAssign;
-import org.apache.zookeeper.KeeperException;
 
 /**
  * Handles OPENED region event on Master.
@@ -42,9 +37,10 @@ public class OpenedRegionHandler extends EventHandler implements TotesHRegionInf
   private static final Log LOG = LogFactory.getLog(OpenedRegionHandler.class);
   private final AssignmentManager assignmentManager;
   private final HRegionInfo regionInfo;
-  private final ServerName sn;
   private final OpenedPriority priority;
-  private final int expectedVersion;
+
+  private OpenRegionCoordination coordination;
+  private OpenRegionCoordination.OpenRegionDetails ord;
 
   private enum OpenedPriority {
     META (1),
@@ -62,12 +58,13 @@ public class OpenedRegionHandler extends EventHandler implements TotesHRegionInf
 
   public OpenedRegionHandler(Server server,
       AssignmentManager assignmentManager, HRegionInfo regionInfo,
-      ServerName sn, int expectedVersion) {
+      OpenRegionCoordination coordination,
+      OpenRegionCoordination.OpenRegionDetails ord) {
     super(server, EventType.RS_ZK_REGION_OPENED);
     this.assignmentManager = assignmentManager;
     this.regionInfo = regionInfo;
-    this.sn = sn;
-    this.expectedVersion = expectedVersion;
+    this.coordination = coordination;
+    this.ord = ord;
     if(regionInfo.isMetaRegion()) {
       priority = OpenedPriority.META;
     } else if(regionInfo.getTable()
@@ -99,56 +96,8 @@ public class OpenedRegionHandler extends EventHandler implements TotesHRegionInf
 
   @Override
   public void process() {
-    // Code to defend against case where we get SPLIT before region open
-    // processing completes; temporary till we make SPLITs go via zk -- 0.92.
-    RegionState regionState = this.assignmentManager.getRegionStates()
-      .getRegionTransitionState(regionInfo.getEncodedName());
-    boolean openedNodeDeleted = false;
-    if (regionState != null && regionState.isOpened()) {
-      openedNodeDeleted = deleteOpenedNode(expectedVersion);
-      if (!openedNodeDeleted) {
-        LOG.error("Znode of region " + regionInfo.getShortNameToLog() + " could not be deleted.");
-      }
-    } else {
-      LOG.warn("Skipping the onlining of " + regionInfo.getShortNameToLog() +
-        " because regions is NOT in RIT -- presuming this is because it SPLIT");
-    }
-    if (!openedNodeDeleted) {
-      if (this.assignmentManager.getTableStateManager().isTableState(regionInfo.getTable(),
-        ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
-        debugLog(regionInfo, "Opened region "
-            + regionInfo.getShortNameToLog() + " but "
-            + "this table is disabled, triggering close of region");
+    if (!coordination.commitOpenOnMasterSide(assignmentManager,regionInfo, ord)) {
         assignmentManager.unassign(regionInfo);
-      }
-    }
-  }
-
-  private boolean deleteOpenedNode(int expectedVersion) {
-    debugLog(regionInfo, "Handling OPENED of " +
-      this.regionInfo.getShortNameToLog() + " from " + this.sn.toString() +
-      "; deleting unassigned node");
-    try {
-      // delete the opened znode only if the version matches.
-      return ZKAssign.deleteNode(server.getZooKeeper(),
-          regionInfo.getEncodedName(), EventType.RS_ZK_REGION_OPENED, expectedVersion);
-    } catch(KeeperException.NoNodeException e){
-      // Getting no node exception here means that already the region has been opened.
-      LOG.warn("The znode of the region " + regionInfo.getShortNameToLog() +
-        " would have already been deleted");
-      return false;
-    } catch (KeeperException e) {
-      server.abort("Error deleting OPENED node in ZK (" +
-        regionInfo.getRegionNameAsString() + ")", e);
-    }
-    return false;
-  }
-
-  private void debugLog(HRegionInfo region, String string) {
-    if (region.isMetaTable()) {
-      LOG.info(string);
-    } else {
-      LOG.debug(string);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/623cfa33/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 7e24610..9d90d1d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -141,6 +141,7 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.Re
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
 import org.apache.hadoop.hbase.regionserver.HRegion.Operation;
 import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
+import org.apache.hadoop.hbase.coordination.OpenRegionCoordination;
 import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
 import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
@@ -1188,11 +1189,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     final boolean isBulkAssign = regionCount > 1;
     for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) {
       final HRegionInfo region = HRegionInfo.convert(regionOpenInfo.getRegion());
+      OpenRegionCoordination coordination = regionServer.getCoordinatedStateManager().
+        getOpenRegionCoordination();
+      OpenRegionCoordination.OpenRegionDetails ord =
+        coordination.parseFromProtoRequest(regionOpenInfo);
 
-      int versionOfOfflineNode = -1;
-      if (regionOpenInfo.hasVersionOfOfflineNode()) {
-        versionOfOfflineNode = regionOpenInfo.getVersionOfOfflineNode();
-      }
       HTableDescriptor htd;
       try {
         final HRegion onlineRegion = regionServer.getFromOnlineRegions(region.getEncodedName());
@@ -1237,8 +1238,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
 
         if (Boolean.FALSE.equals(previous)) {
           // There is a close in progress. We need to mark this open as failed in ZK.
-          OpenRegionHandler.
-            tryTransitionFromOfflineToFailedOpen(regionServer, region, versionOfOfflineNode);
+
+          coordination.tryTransitionFromOfflineToFailedOpen(regionServer, region, ord);
 
           throw new RegionAlreadyInTransitionException("Received OPEN for the region:"
             + region.getRegionNameAsString() + " , which we are already trying to CLOSE ");
@@ -1266,12 +1267,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
           // Need to pass the expected version in the constructor.
           if (region.isMetaRegion()) {
             regionServer.service.submit(new OpenMetaHandler(
-              regionServer, regionServer, region, htd, versionOfOfflineNode));
+              regionServer, regionServer, region, htd, coordination, ord));
           } else {
             regionServer.updateRegionFavoredNodesMapping(region.getEncodedName(),
               regionOpenInfo.getFavoredNodesList());
             regionServer.service.submit(new OpenRegionHandler(
-              regionServer, regionServer, region, htd, versionOfOfflineNode));
+              regionServer, regionServer, region, htd, coordination, ord));
           }
         }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/623cfa33/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenMetaHandler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenMetaHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenMetaHandler.java
index 2278e0b..f2d5f1f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenMetaHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenMetaHandler.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.executor.EventType;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.coordination.OpenRegionCoordination;
 
 /**
  * Handles opening of a meta region on a region server.
@@ -34,13 +35,9 @@ import org.apache.hadoop.hbase.regionserver.RegionServerServices;
 public class OpenMetaHandler extends OpenRegionHandler {
   public OpenMetaHandler(final Server server,
       final RegionServerServices rsServices, HRegionInfo regionInfo,
-      final HTableDescriptor htd) {
-    this(server, rsServices, regionInfo, htd, -1);
-  }
-  public OpenMetaHandler(final Server server,
-      final RegionServerServices rsServices, HRegionInfo regionInfo,
-      final HTableDescriptor htd, int versionOfOfflineNode) {
+      final HTableDescriptor htd, OpenRegionCoordination coordination,
+      OpenRegionCoordination.OpenRegionDetails ord) {
     super(server, rsServices, regionInfo, htd, EventType.M_RS_OPEN_META,
-        versionOfOfflineNode);
+        coordination, ord);
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/623cfa33/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java
index 212a557..07235f5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java
@@ -32,10 +32,9 @@ import org.apache.hadoop.hbase.executor.EventType;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.RegionServerAccounting;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.coordination.OpenRegionCoordination;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
-import org.apache.hadoop.hbase.zookeeper.ZKAssign;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.zookeeper.KeeperException;
+
 /**
  * Handles opening of a region on a region server.
  * <p>
@@ -50,34 +49,27 @@ public class OpenRegionHandler extends EventHandler {
   private final HRegionInfo regionInfo;
   private final HTableDescriptor htd;
 
-  // We get version of our znode at start of open process and monitor it across
-  // the total open. We'll fail the open if someone hijacks our znode; we can
-  // tell this has happened if version is not as expected.
-  private volatile int version = -1;
-  //version of the offline node that was set by the master
-  private volatile int versionOfOfflineNode = -1;
+  private OpenRegionCoordination coordination;
+  private OpenRegionCoordination.OpenRegionDetails ord;
 
   public OpenRegionHandler(final Server server,
       final RegionServerServices rsServices, HRegionInfo regionInfo,
-      HTableDescriptor htd) {
-    this(server, rsServices, regionInfo, htd, EventType.M_RS_OPEN_REGION, -1);
-  }
-  public OpenRegionHandler(final Server server,
-      final RegionServerServices rsServices, HRegionInfo regionInfo,
-      HTableDescriptor htd, int versionOfOfflineNode) {
+      HTableDescriptor htd, OpenRegionCoordination coordination,
+      OpenRegionCoordination.OpenRegionDetails ord) {
     this(server, rsServices, regionInfo, htd, EventType.M_RS_OPEN_REGION,
-        versionOfOfflineNode);
+        coordination, ord);
   }
 
   protected OpenRegionHandler(final Server server,
       final RegionServerServices rsServices, final HRegionInfo regionInfo,
       final HTableDescriptor htd, EventType eventType,
-      final int versionOfOfflineNode) {
+      OpenRegionCoordination coordination, OpenRegionCoordination.OpenRegionDetails ord) {
     super(server, eventType);
     this.rsServices = rsServices;
     this.regionInfo = regionInfo;
     this.htd = htd;
-    this.versionOfOfflineNode = versionOfOfflineNode;
+    this.coordination = coordination;
+    this.ord = ord;
   }
 
   public HRegionInfo getRegionInfo() {
@@ -112,13 +104,13 @@ public class OpenRegionHandler extends EventHandler {
 
       // Check that we're still supposed to open the region and transition.
       // If fails, just return.  Someone stole the region from under us.
-      // Calling transitionZookeeperOfflineToOpening initializes this.version.
+      // Calling transitionFromOfflineToOpening initializes this.version.
       if (!isRegionStillOpening()){
         LOG.error("Region " + encodedName + " opening cancelled");
         return;
       }
 
-      if (!transitionZookeeperOfflineToOpening(encodedName, versionOfOfflineNode)) {
+      if (!coordination.transitionFromOfflineToOpening(regionInfo, ord)) {
         LOG.warn("Region was hijacked? Opening cancelled for encodedName=" + encodedName);
         // This is a desperate attempt: the znode is unlikely to be ours. But we can't do more.
         return;
@@ -132,7 +124,7 @@ public class OpenRegionHandler extends EventHandler {
       }
 
       boolean failed = true;
-      if (tickleOpening("post_region_open")) {
+      if (coordination.tickleOpening(ord, regionInfo, rsServices, "post_region_open")) {
         if (updateMeta(region)) {
           failed = false;
         }
@@ -142,8 +134,7 @@ public class OpenRegionHandler extends EventHandler {
         return;
       }
 
-
-      if (!isRegionStillOpening() || !transitionToOpened(region)) {
+      if (!isRegionStillOpening() || !coordination.transitionToOpened(region, ord)) {
         // If we fail to transition to opened, it's because of one of two cases:
         //    (a) we lost our ZK lease
         // OR (b) someone else opened the region before us
@@ -173,7 +164,7 @@ public class OpenRegionHandler extends EventHandler {
     } finally {
       // Do all clean up here
       if (!openSuccessful) {
-        doCleanUpOnFailedOpen(region, transitionedToOpening);
+        doCleanUpOnFailedOpen(region, transitionedToOpening, ord);
       }
       final Boolean current = this.rsServices.getRegionsInTransitionInRS().
           remove(this.regionInfo.getEncodedNameAsBytes());
@@ -200,7 +191,8 @@ public class OpenRegionHandler extends EventHandler {
     }
   }
 
-  private void doCleanUpOnFailedOpen(HRegion region, boolean transitionedToOpening)
+  private void doCleanUpOnFailedOpen(HRegion region, boolean transitionedToOpening,
+                                     OpenRegionCoordination.OpenRegionDetails ord)
       throws IOException {
     if (transitionedToOpening) {
       try {
@@ -210,12 +202,12 @@ public class OpenRegionHandler extends EventHandler {
       } finally {
         // Even if cleanupFailed open fails we need to do this transition
         // See HBASE-7698
-        tryTransitionFromOpeningToFailedOpen(regionInfo);
+        coordination.tryTransitionFromOpeningToFailedOpen(regionInfo, ord);
       }
     } else {
       // If still transition to OPENING is not done, we need to transition znode
       // to FAILED_OPEN
-      tryTransitionFromOfflineToFailedOpen(this.rsServices, regionInfo, versionOfOfflineNode);
+      coordination.tryTransitionFromOfflineToFailedOpen(this.rsServices, regionInfo, ord);
     }
   }
 
@@ -250,7 +242,7 @@ public class OpenRegionHandler extends EventHandler {
       if (elapsed > 120000) { // 2 minutes, no need to tickleOpening too often
         // Only tickle OPENING if postOpenDeployTasks is taking some time.
         lastUpdate = now;
-        tickleOpening = tickleOpening("post_open_deploy");
+        tickleOpening = coordination.tickleOpening(ord, regionInfo, rsServices, "post_open_deploy");
       }
       synchronized (signaller) {
         try {
@@ -314,7 +306,7 @@ public class OpenRegionHandler extends EventHandler {
       try {
         this.services.postOpenDeployTasks(this.region,
           this.server.getCatalogTracker());
-      } catch (KeeperException e) {
+      } catch (IOException e) {
         server.abort("Exception running postOpenDeployTasks; region=" +
             this.region.getRegionInfo().getEncodedName(), e);
       } catch (Throwable e) {
@@ -337,131 +329,22 @@ public class OpenRegionHandler extends EventHandler {
     }
   }
 
-
-  /**
-   * @param r Region we're working on.
-   * @return whether znode is successfully transitioned to OPENED state.
-   * @throws IOException
-   */
-  boolean transitionToOpened(final HRegion r) throws IOException {
-    boolean result = false;
-    HRegionInfo hri = r.getRegionInfo();
-    final String name = hri.getRegionNameAsString();
-    // Finally, Transition ZK node to OPENED
-    try {
-      if (ZKAssign.transitionNodeOpened(this.server.getZooKeeper(), hri,
-          this.server.getServerName(), this.version) == -1) {
-        String warnMsg = "Completed the OPEN of region " + name +
-          " but when transitioning from " + " OPENING to OPENED ";
-        try {
-          String node = ZKAssign.getNodeName(this.server.getZooKeeper(), hri.getEncodedName());
-          if (ZKUtil.checkExists(this.server.getZooKeeper(), node) < 0) {
-            // if the znode 
-            rsServices.abort(warnMsg + "the znode disappeared", null);
-          } else {
-            LOG.warn(warnMsg + "got a version mismatch, someone else clashed; " +
-          "so now unassigning -- closing region on server: " + this.server.getServerName());
-          }
-        } catch (KeeperException ke) {
-          rsServices.abort(warnMsg, ke);
-        }
-      } else {
-        LOG.debug("Transitioned " + r.getRegionInfo().getEncodedName() +
-          " to OPENED in zk on " + this.server.getServerName());
-        result = true;
-      }
-    } catch (KeeperException e) {
-      LOG.error("Failed transitioning node " + name +
-        " from OPENING to OPENED -- closing region", e);
-    }
-    return result;
-  }
-
-  /**
-   * This is not guaranteed to succeed, we just do our best.
-   * @param hri Region we're working on.
-   * @return whether znode is successfully transitioned to FAILED_OPEN state.
-   */
-  private boolean tryTransitionFromOpeningToFailedOpen(final HRegionInfo hri) {
-    boolean result = false;
-    final String name = hri.getRegionNameAsString();
-    try {
-      LOG.info("Opening of region " + hri + " failed, transitioning" +
-          " from OPENING to FAILED_OPEN in ZK, expecting version " + this.version);
-      if (ZKAssign.transitionNode(
-          this.server.getZooKeeper(), hri,
-          this.server.getServerName(),
-          EventType.RS_ZK_REGION_OPENING,
-          EventType.RS_ZK_REGION_FAILED_OPEN,
-          this.version) == -1) {
-        LOG.warn("Unable to mark region " + hri + " as FAILED_OPEN. " +
-            "It's likely that the master already timed out this open " +
-            "attempt, and thus another RS already has the region.");
-      } else {
-        result = true;
-      }
-    } catch (KeeperException e) {
-      LOG.error("Failed transitioning node " + name +
-        " from OPENING to FAILED_OPEN", e);
-    }
-    return result;
-  }
-
-  /**
-   * Try to transition to open. This function is static to make it usable before creating the
-   *  handler.
-   *
-   * This is not guaranteed to succeed, we just do our best.
-   *
-   * @param rsServices
-   * @param hri Region we're working on.
-   * @param versionOfOfflineNode version to checked.
-   * @return whether znode is successfully transitioned to FAILED_OPEN state.
-   */
-  public static boolean tryTransitionFromOfflineToFailedOpen(RegionServerServices rsServices,
-       final HRegionInfo hri, final int versionOfOfflineNode) {
-    boolean result = false;
-    final String name = hri.getRegionNameAsString();
-    try {
-      LOG.info("Opening of region " + hri + " failed, transitioning" +
-          " from OFFLINE to FAILED_OPEN in ZK, expecting version " + versionOfOfflineNode);
-      if (ZKAssign.transitionNode(
-          rsServices.getZooKeeper(), hri,
-          rsServices.getServerName(),
-          EventType.M_ZK_REGION_OFFLINE,
-          EventType.RS_ZK_REGION_FAILED_OPEN,
-          versionOfOfflineNode) == -1) {
-        LOG.warn("Unable to mark region " + hri + " as FAILED_OPEN. " +
-            "It's likely that the master already timed out this open " +
-            "attempt, and thus another RS already has the region.");
-      } else {
-        result = true;
-      }
-    } catch (KeeperException e) {
-      LOG.error("Failed transitioning node " + name + " from OFFLINE to FAILED_OPEN", e);
-    }
-    return result;
-  }
-
-
   /**
    * @return Instance of HRegion if successful open else null.
    */
   HRegion openRegion() {
     HRegion region = null;
     try {
-      // Instantiate the region.  This also periodically tickles our zk OPENING
+      // Instantiate the region.  This also periodically tickles OPENING
       // state so master doesn't timeout this region in transition.
       region = HRegion.openHRegion(this.regionInfo, this.htd,
-          this.rsServices.getWAL(this.regionInfo),
-          this.server.getConfiguration(),
-          this.rsServices,
+        this.rsServices.getWAL(this.regionInfo),
+        this.server.getConfiguration(),
+        this.rsServices,
         new CancelableProgressable() {
           public boolean progress() {
-            // We may lose the znode ownership during the open.  Currently its
-            // too hard interrupting ongoing region open.  Just let it complete
-            // and check we still have the znode after region open.
-            return tickleOpening("open_region_progress");
+            // if tickle failed, we need to cancel opening region.
+            return coordination.tickleOpening(ord, regionInfo, rsServices, "open_region_progress");
           }
         });
     } catch (Throwable t) {
@@ -495,70 +378,4 @@ public class OpenRegionHandler extends EventHandler {
     Boolean action = rsServices.getRegionsInTransitionInRS().get(encodedName);
     return Boolean.TRUE.equals(action); // true means opening for RIT
   }
-
-  /**
-   * Transition ZK node from OFFLINE to OPENING.
-   * @param encodedName Name of the znode file (Region encodedName is the znode
-   * name).
-   * @param versionOfOfflineNode - version Of OfflineNode that needs to be compared
-   * before changing the node's state from OFFLINE
-   * @return True if successful transition.
-   */
-  boolean transitionZookeeperOfflineToOpening(final String encodedName,
-      int versionOfOfflineNode) {
-    // TODO: should also handle transition from CLOSED?
-    try {
-      // Initialize the znode version.
-      this.version = ZKAssign.transitionNode(server.getZooKeeper(), regionInfo,
-          server.getServerName(), EventType.M_ZK_REGION_OFFLINE,
-          EventType.RS_ZK_REGION_OPENING, versionOfOfflineNode);
-    } catch (KeeperException e) {
-      LOG.error("Error transition from OFFLINE to OPENING for region=" +
-        encodedName, e);
-      this.version = -1;
-      return false;
-    }
-    boolean b = isGoodVersion();
-    if (!b) {
-      LOG.warn("Failed transition from OFFLINE to OPENING for region=" +
-        encodedName);
-    }
-    return b;
-  }
-
-  /**
-   * Update our OPENING state in zookeeper.
-   * Do this so master doesn't timeout this region-in-transition.
-   * @param context Some context to add to logs if failure
-   * @return True if successful transition.
-   */
-  boolean tickleOpening(final String context) {
-    if (!isRegionStillOpening()) {
-      LOG.warn("Open region aborted since it isn't opening any more");
-      return false;
-    }
-    // If previous checks failed... do not try again.
-    if (!isGoodVersion()) return false;
-    String encodedName = this.regionInfo.getEncodedName();
-    try {
-      this.version =
-        ZKAssign.confirmNodeOpening(server.getZooKeeper(),
-          this.regionInfo, this.server.getServerName(), this.version);
-    } catch (KeeperException e) {
-      server.abort("Exception refreshing OPENING; region=" + encodedName +
-        ", context=" + context, e);
-      this.version = -1;
-      return false;
-    }
-    boolean b = isGoodVersion();
-    if (!b) {
-      LOG.warn("Failed refreshing OPENING; region=" + encodedName +
-        ", context=" + context);
-    }
-    return b;
-  }
-
-  private boolean isGoodVersion() {
-    return this.version != -1;
-  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/623cfa33/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java
index c7d3f1f..f5e8952 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java
@@ -54,6 +54,9 @@ import org.apache.hadoop.hbase.client.HConnectionTestingUtility;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.CoordinatedStateManager;
 import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
+import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
+import org.apache.hadoop.hbase.coordination.OpenRegionCoordination;
+import org.apache.hadoop.hbase.coordination.ZkOpenRegionCoordination;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.executor.EventType;
 import org.apache.hadoop.hbase.executor.ExecutorService;
@@ -255,7 +258,7 @@ public class TestAssignmentManager {
   }
 
   @Test(timeout = 60000)
-  public void testBalanceOnMasterFailoverScenarioWithClosedNode() 
+  public void testBalanceOnMasterFailoverScenarioWithClosedNode()
       throws IOException, KeeperException, InterruptedException, ServiceException,
         DeserializationException, CoordinatedStateException {
     AssignmentManagerWithExtrasForTesting am =
@@ -876,9 +879,19 @@ public class TestAssignmentManager {
     am.getRegionStates().createRegionState(REGIONINFO);
     am.gate.set(false);
     CatalogTracker ct = Mockito.mock(CatalogTracker.class);
-    assertFalse(am.processRegionsInTransition(rt, REGIONINFO, version));
-    am.getTableStateManager().setTableState(REGIONINFO.getTable(),
-      Table.State.ENABLED);
+
+    BaseCoordinatedStateManager cp = new ZkCoordinatedStateManager();
+    cp.initialize(server);
+    cp.start();
+
+    OpenRegionCoordination orc = cp.getOpenRegionCoordination();
+    ZkOpenRegionCoordination.ZkOpenRegionDetails zkOrd =
+      new ZkOpenRegionCoordination.ZkOpenRegionDetails();
+    zkOrd.setServerName(server.getServerName());
+    zkOrd.setVersion(version);
+
+    assertFalse(am.processRegionsInTransition(rt, REGIONINFO, orc, zkOrd));
+    am.getTableStateManager().setTableState(REGIONINFO.getTable(), Table.State.ENABLED);
     processServerShutdownHandler(ct, am, false);
     // Waiting for the assignment to get completed.
     while (!am.gate.get()) {
@@ -1357,8 +1370,9 @@ public class TestAssignmentManager {
       this.serverManager, ct, balancer, null, null, master.getTableLockManager()) {
 
       @Override
-      void handleRegion(final RegionTransition rt, int expectedVersion) {
-        super.handleRegion(rt, expectedVersion);
+      void handleRegion(final RegionTransition rt, OpenRegionCoordination coordination,
+                        OpenRegionCoordination.OpenRegionDetails ord) {
+        super.handleRegion(rt, coordination, ord);
         if (rt != null && Bytes.equals(hri.getRegionName(),
           rt.getRegionName()) && rt.getEventType() == EventType.RS_ZK_REGION_OPENING) {
           zkEventProcessed.set(true);

http://git-wip-us.apache.org/repos/asf/hbase/blob/623cfa33/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestOpenedRegionHandler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestOpenedRegionHandler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestOpenedRegionHandler.java
index 7bd7893..e91b1c2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestOpenedRegionHandler.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestOpenedRegionHandler.java
@@ -31,6 +31,10 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
+import org.apache.hadoop.hbase.coordination.OpenRegionCoordination;
+import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
+import org.apache.hadoop.hbase.coordination.ZkOpenRegionCoordination;
 import org.apache.hadoop.hbase.executor.EventType;
 import org.apache.hadoop.hbase.master.handler.OpenedRegionHandler;
 import org.apache.hadoop.hbase.regionserver.HRegion;
@@ -137,8 +141,17 @@ public class TestOpenedRegionHandler {
       ZKUtil.getDataAndWatch(zkw, nodeName, stat);
 
       // use the version for the OpenedRegionHandler
+      BaseCoordinatedStateManager csm = new ZkCoordinatedStateManager();
+      csm.initialize(server);
+      csm.start();
+
+      OpenRegionCoordination orc = csm.getOpenRegionCoordination();
+      ZkOpenRegionCoordination.ZkOpenRegionDetails zkOrd =
+        new ZkOpenRegionCoordination.ZkOpenRegionDetails();
+      zkOrd.setServerName(server.getServerName());
+      zkOrd.setVersion(stat.getVersion());
       OpenedRegionHandler handler = new OpenedRegionHandler(server, am, region
-          .getRegionInfo(), server.getServerName(), stat.getVersion());
+          .getRegionInfo(), orc, zkOrd);
       // Once again overwrite the same znode so that the version changes.
       ZKAssign.transitionNode(zkw, region.getRegionInfo(), server
           .getServerName(), EventType.RS_ZK_REGION_OPENED,

http://git-wip-us.apache.org/repos/asf/hbase/blob/623cfa33/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java
index b7cc51a..2d94a77 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java
@@ -31,6 +31,9 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.catalog.MetaEditor;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
+import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
+import org.apache.hadoop.hbase.coordination.ZkOpenRegionCoordination;
 import org.apache.hadoop.hbase.executor.EventType;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
@@ -341,7 +344,18 @@ public class TestRegionServerNoMaster {
 
     // Let's start the open handler
     HTableDescriptor htd = getRS().tableDescriptors.get(hri.getTable());
-    getRS().service.submit(new OpenRegionHandler(getRS(), getRS(), hri, htd, 0));
+
+    BaseCoordinatedStateManager csm = new ZkCoordinatedStateManager();
+    csm.initialize(getRS());
+    csm.start();
+
+    ZkOpenRegionCoordination.ZkOpenRegionDetails zkCrd =
+      new ZkOpenRegionCoordination.ZkOpenRegionDetails();
+    zkCrd.setServerName(getRS().getServerName());
+    zkCrd.setVersionOfOfflineNode(0);
+
+    getRS().service.submit(new OpenRegionHandler(getRS(), getRS(), hri, htd,
+      csm.getOpenRegionCoordination(), zkCrd));
 
     // The open handler should have removed the region from RIT but kept the region closed
     checkRegionIsClosed();
@@ -395,7 +409,18 @@ public class TestRegionServerNoMaster {
     //  2) The region in RIT was changed.
     // The order is more or less implementation dependant.
     HTableDescriptor htd = getRS().tableDescriptors.get(hri.getTable());
-    getRS().service.submit(new OpenRegionHandler(getRS(), getRS(), hri, htd, 0));
+
+    BaseCoordinatedStateManager csm = new ZkCoordinatedStateManager();
+    csm.initialize(getRS());
+    csm.start();
+
+    ZkOpenRegionCoordination.ZkOpenRegionDetails zkCrd =
+      new ZkOpenRegionCoordination.ZkOpenRegionDetails();
+    zkCrd.setServerName(getRS().getServerName());
+    zkCrd.setVersionOfOfflineNode(0);
+
+    getRS().service.submit(new OpenRegionHandler(getRS(), getRS(), hri, htd,
+      csm.getOpenRegionCoordination(), zkCrd));
 
     // The open handler should have removed the region from RIT but kept the region closed
     checkRegionIsClosed();

http://git-wip-us.apache.org/repos/asf/hbase/blob/623cfa33/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestCloseRegionHandler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestCloseRegionHandler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestCloseRegionHandler.java
index 6793ff5..43d5875 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestCloseRegionHandler.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestCloseRegionHandler.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.MediumTests;
 import org.apache.hadoop.hbase.RegionTransition;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.coordination.OpenRegionCoordination;
 import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.executor.EventType;
@@ -138,13 +139,13 @@ public class TestCloseRegionHandler {
       HRegion.closeHRegion(region);
     }
   }
-  
+
      /**
       * Test if close region can handle ZK closing node version mismatch
       * @throws IOException
       * @throws NodeExistsException
       * @throws KeeperException
-     * @throws DeserializationException 
+     * @throws DeserializationException
       */
      @Test public void testZKClosingNodeVersionMismatch()
      throws IOException, NodeExistsException, KeeperException, DeserializationException {
@@ -153,38 +154,38 @@ public class TestCloseRegionHandler {
 
        HTableDescriptor htd = TEST_HTD;
        final HRegionInfo hri = TEST_HRI;
-   
+
+       ZkCoordinatedStateManager coordinationProvider = new ZkCoordinatedStateManager();
+       coordinationProvider.initialize(server);
+       coordinationProvider.start();
+
        // open a region first so that it can be closed later
-       OpenRegion(server, rss, htd, hri);
-   
+       OpenRegion(server, rss, htd, hri, coordinationProvider.getOpenRegionCoordination());
+
        // close the region
        // Create it CLOSING, which is what Master set before sending CLOSE RPC
        int versionOfClosingNode = ZKAssign.createNodeClosing(server.getZooKeeper(),
          hri, server.getServerName());
-   
+
        // The CloseRegionHandler will validate the expected version
        // Given it is set to invalid versionOfClosingNode+1,
        // CloseRegionHandler should be M_ZK_REGION_CLOSING
 
-       ZkCoordinatedStateManager consensusProvider = new ZkCoordinatedStateManager();
-       consensusProvider.initialize(server);
-       consensusProvider.start();
-
        ZkCloseRegionCoordination.ZkCloseRegionDetails zkCrd =
          new ZkCloseRegionCoordination.ZkCloseRegionDetails();
        zkCrd.setPublishStatusInZk(true);
        zkCrd.setExpectedVersion(versionOfClosingNode+1);
 
        CloseRegionHandler handler = new CloseRegionHandler(server, rss, hri, false,
-         consensusProvider.getCloseRegionCoordination(), zkCrd);
+         coordinationProvider.getCloseRegionCoordination(), zkCrd);
        handler.process();
-   
+
        // Handler should remain in M_ZK_REGION_CLOSING
        RegionTransition rt =
          RegionTransition.parseFrom(ZKAssign.getData(server.getZooKeeper(), hri.getEncodedName()));
        assertTrue(rt.getEventType().equals(EventType.M_ZK_REGION_CLOSING ));
      }
-  
+
      /**
       * Test if the region can be closed properly
       * @throws IOException
@@ -196,33 +197,33 @@ public class TestCloseRegionHandler {
      throws IOException, NodeExistsException, KeeperException, DeserializationException {
        final Server server = new MockServer(HTU);
        final RegionServerServices rss = HTU.createMockRegionServerService();
-   
+
        HTableDescriptor htd = TEST_HTD;
        HRegionInfo hri = TEST_HRI;
-   
+
+       ZkCoordinatedStateManager coordinationProvider = new ZkCoordinatedStateManager();
+       coordinationProvider.initialize(server);
+       coordinationProvider.start();
+
        // open a region first so that it can be closed later
-       OpenRegion(server, rss, htd, hri);
-   
+       OpenRegion(server, rss, htd, hri, coordinationProvider.getOpenRegionCoordination());
+
        // close the region
        // Create it CLOSING, which is what Master set before sending CLOSE RPC
        int versionOfClosingNode = ZKAssign.createNodeClosing(server.getZooKeeper(),
          hri, server.getServerName());
-   
+
        // The CloseRegionHandler will validate the expected version
        // Given it is set to correct versionOfClosingNode,
        // CloseRegionHandlerit should be RS_ZK_REGION_CLOSED
 
-       ZkCoordinatedStateManager consensusProvider = new ZkCoordinatedStateManager();
-       consensusProvider.initialize(server);
-       consensusProvider.start();
-
        ZkCloseRegionCoordination.ZkCloseRegionDetails zkCrd =
          new ZkCloseRegionCoordination.ZkCloseRegionDetails();
        zkCrd.setPublishStatusInZk(true);
        zkCrd.setExpectedVersion(versionOfClosingNode);
 
        CloseRegionHandler handler = new CloseRegionHandler(server, rss, hri, false,
-         consensusProvider.getCloseRegionCoordination(), zkCrd);
+         coordinationProvider.getCloseRegionCoordination(), zkCrd);
        handler.process();
        // Handler should have transitioned it to RS_ZK_REGION_CLOSED
        RegionTransition rt = RegionTransition.parseFrom(
@@ -231,11 +232,15 @@ public class TestCloseRegionHandler {
      }
 
      private void OpenRegion(Server server, RegionServerServices rss,
-         HTableDescriptor htd, HRegionInfo hri)
+         HTableDescriptor htd, HRegionInfo hri, OpenRegionCoordination coordination)
      throws IOException, NodeExistsException, KeeperException, DeserializationException {
        // Create it OFFLINE node, which is what Master set before sending OPEN RPC
        ZKAssign.createNodeOffline(server.getZooKeeper(), hri, server.getServerName());
-       OpenRegionHandler openHandler = new OpenRegionHandler(server, rss, hri, htd);
+
+       OpenRegionCoordination.OpenRegionDetails ord =
+         coordination.getDetailsForNonCoordinatedOpening();
+       OpenRegionHandler openHandler =
+         new OpenRegionHandler(server, rss, hri, htd, coordination, ord);
        rss.getRegionsInTransitionInRS().put(hri.getEncodedNameAsBytes(), Boolean.TRUE);
        openHandler.process();
        // This parse is not used?

http://git-wip-us.apache.org/repos/asf/hbase/blob/623cfa33/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestOpenRegionHandler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestOpenRegionHandler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestOpenRegionHandler.java
index 7c7cf5a..869d727 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestOpenRegionHandler.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestOpenRegionHandler.java
@@ -25,6 +25,8 @@ import java.io.IOException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
+import org.apache.hadoop.hbase.coordination.ZkOpenRegionCoordination;
 import org.apache.hadoop.hbase.executor.EventType;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
@@ -96,7 +98,16 @@ public class TestOpenRegionHandler {
             .getConfiguration(), htd);
     assertNotNull(region);
     try {
-      OpenRegionHandler handler = new OpenRegionHandler(server, rss, hri, htd) {
+      ZkCoordinatedStateManager csm = new ZkCoordinatedStateManager();
+      csm.initialize(server);
+      csm.start();
+
+      ZkOpenRegionCoordination.ZkOpenRegionDetails zkCrd =
+        new ZkOpenRegionCoordination.ZkOpenRegionDetails();
+      zkCrd.setServerName(server.getServerName());
+
+      OpenRegionHandler handler = new OpenRegionHandler(server, rss, hri,
+        htd, csm.getOpenRegionCoordination(), zkCrd) {
         HRegion openRegion() {
           // Open region first, then remove znode as though it'd been hijacked.
           HRegion region = super.openRegion();
@@ -150,10 +161,22 @@ public class TestOpenRegionHandler {
             .getConfiguration(), htd);
     assertNotNull(region);
     try {
-      OpenRegionHandler handler = new OpenRegionHandler(server, rss, hri, htd) {
-        boolean transitionToOpened(final HRegion r) throws IOException {
+
+      ZkCoordinatedStateManager csm = new ZkCoordinatedStateManager();
+      csm.initialize(server);
+      csm.start();
+
+      ZkOpenRegionCoordination.ZkOpenRegionDetails zkCrd =
+        new ZkOpenRegionCoordination.ZkOpenRegionDetails();
+      zkCrd.setServerName(server.getServerName());
+
+      ZkOpenRegionCoordination openRegionCoordination =
+        new ZkOpenRegionCoordination(csm, server.getZooKeeper()) {
+        @Override
+        public boolean transitionToOpened(final HRegion r, OpenRegionDetails ord)
+            throws IOException {
           // remove znode simulating intermittent zookeeper connection issue
-          ZooKeeperWatcher zkw = this.server.getZooKeeper();
+          ZooKeeperWatcher zkw = server.getZooKeeper();
           String node = ZKAssign.getNodeName(zkw, hri.getEncodedName());
           try {
             ZKUtil.deleteNodeFailSilent(zkw, node);
@@ -161,9 +184,12 @@ public class TestOpenRegionHandler {
             throw new RuntimeException("Ugh failed delete of " + node, e);
           }
           // then try to transition to OPENED
-          return super.transitionToOpened(r);
+          return super.transitionToOpened(r, ord);
         }
       };
+
+      OpenRegionHandler handler = new OpenRegionHandler(server, rss, hri, htd,
+        openRegionCoordination, zkCrd);
       rss.getRegionsInTransitionInRS().put(
         hri.getEncodedNameAsBytes(), Boolean.TRUE);
       // Call process without first creating OFFLINE region in zk, see if
@@ -182,7 +208,7 @@ public class TestOpenRegionHandler {
     // Region server is expected to abort due to OpenRegionHandler perceiving transitioning
     // to OPENED as failed
     // This was corresponding to the second handler.process() call above.
-    assertTrue("region server should have aborted", rss.isAborted());
+    assertTrue("region server should have aborted", server.isAborted());
   }
   
   @Test
@@ -193,9 +219,18 @@ public class TestOpenRegionHandler {
     // Create it OFFLINE, which is what it expects
     ZKAssign.createNodeOffline(server.getZooKeeper(), TEST_HRI, server.getServerName());
 
+    ZkCoordinatedStateManager csm = new ZkCoordinatedStateManager();
+    csm.initialize(server);
+    csm.start();
+
+    ZkOpenRegionCoordination.ZkOpenRegionDetails zkCrd =
+      new ZkOpenRegionCoordination.ZkOpenRegionDetails();
+    zkCrd.setServerName(server.getServerName());
+
     // Create the handler
     OpenRegionHandler handler =
-      new OpenRegionHandler(server, rsServices, TEST_HRI, TEST_HTD) {
+      new OpenRegionHandler(server, rsServices, TEST_HRI, TEST_HTD,
+        csm.getOpenRegionCoordination(), zkCrd) {
         @Override
         HRegion openRegion() {
           // Fake failure of opening a region due to an IOE, which is caught
@@ -221,8 +256,16 @@ public class TestOpenRegionHandler {
     ZKAssign.createNodeOffline(server.getZooKeeper(), TEST_HRI, server.getServerName());
 
     // Create the handler
-    OpenRegionHandler handler =
-      new OpenRegionHandler(server, rsServices, TEST_HRI, TEST_HTD) {
+    ZkCoordinatedStateManager csm = new ZkCoordinatedStateManager();
+    csm.initialize(server);
+    csm.start();
+
+    ZkOpenRegionCoordination.ZkOpenRegionDetails zkCrd =
+      new ZkOpenRegionCoordination.ZkOpenRegionDetails();
+    zkCrd.setServerName(server.getServerName());
+
+    OpenRegionHandler handler = new OpenRegionHandler(server, rsServices, TEST_HRI, TEST_HTD,
+      csm.getOpenRegionCoordination(), zkCrd) {
         @Override
         boolean updateMeta(final HRegion r) {
           // Fake failure of updating META
@@ -246,7 +289,16 @@ public class TestOpenRegionHandler {
     // Create it OFFLINE, which is what it expects
     ZKAssign.createNodeOffline(server.getZooKeeper(), TEST_HRI, server.getServerName());
     // Create the handler
-    OpenRegionHandler handler = new OpenRegionHandler(server, rsServices, TEST_HRI, TEST_HTD) {
+    ZkCoordinatedStateManager csm = new ZkCoordinatedStateManager();
+    csm.initialize(server);
+    csm.start();
+
+    ZkOpenRegionCoordination.ZkOpenRegionDetails zkCrd =
+      new ZkOpenRegionCoordination.ZkOpenRegionDetails();
+    zkCrd.setServerName(server.getServerName());
+
+    OpenRegionHandler handler = new OpenRegionHandler(server, rsServices, TEST_HRI, TEST_HTD,
+      csm.getOpenRegionCoordination(), zkCrd) {
       @Override
       boolean updateMeta(HRegion r) {
         return false;
@@ -275,13 +327,25 @@ public class TestOpenRegionHandler {
     // Create it OFFLINE, which is what it expects
     ZKAssign.createNodeOffline(server.getZooKeeper(), TEST_HRI, server.getServerName());
     // Create the handler
-    OpenRegionHandler handler = new OpenRegionHandler(server, rsServices, TEST_HRI, TEST_HTD) {
+    ZkCoordinatedStateManager csm = new ZkCoordinatedStateManager();
+    csm.initialize(server);
+    csm.start();
 
+    ZkOpenRegionCoordination.ZkOpenRegionDetails zkCrd =
+      new ZkOpenRegionCoordination.ZkOpenRegionDetails();
+    zkCrd.setServerName(server.getServerName());
+
+    ZkOpenRegionCoordination openRegionCoordination =
+      new ZkOpenRegionCoordination(csm, server.getZooKeeper()) {
       @Override
-      boolean transitionZookeeperOfflineToOpening(String encodedName, int versionOfOfflineNode) {
+      public boolean transitionFromOfflineToOpening(HRegionInfo regionInfo,
+                                                    OpenRegionDetails ord) {
         return false;
       }
     };
+
+    OpenRegionHandler handler = new OpenRegionHandler(server, rsServices, TEST_HRI, TEST_HTD,
+      openRegionCoordination, zkCrd);
     rsServices.getRegionsInTransitionInRS().put(TEST_HRI.getEncodedNameAsBytes(), Boolean.TRUE);
 
     handler.process();