You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jx...@apache.org on 2014/06/14 17:40:37 UTC

[2/5] HBASE-11059 ZK-less region assignment

http://git-wip-us.apache.org/repos/asf/hbase/blob/58549428/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
index 31fe2a0..3a87410 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
@@ -179,7 +179,8 @@ public class ServerManager {
    * is currently in startup mode. In this case, the dead server will be parked in this set
    * temporarily.
    */
-  private Map<ServerName, Boolean> requeuedDeadServers = new HashMap<ServerName, Boolean>();
+  private Map<ServerName, Boolean> requeuedDeadServers
+    = new ConcurrentHashMap<ServerName, Boolean>();
 
   /** Listeners that are called on server events. */
   private List<ServerListener> listeners = new CopyOnWriteArrayList<ServerListener>();
@@ -985,6 +986,15 @@ public class ServerManager {
   }
 
   /**
+   * During startup, if we figure it is not a failover, i.e. there are
+   * no more HLog files to split, we won't try to recover these dead servers.
+   * So we just remove them from the queue. Use caution in calling this.
+   */
+  void removeRequeuedDeadServers() {
+    requeuedDeadServers.clear();
+  }
+
+  /**
    * @return A copy of the internal map of requeuedDeadServers servers and their corresponding
    *         splitlog need flag.
    */

http://git-wip-us.apache.org/repos/asf/hbase/blob/58549428/hbase-server/src/main/java/org/apache/hadoop/hbase/master/UnAssignCallable.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/UnAssignCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/UnAssignCallable.java
new file mode 100644
index 0000000..a627548
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/UnAssignCallable.java
@@ -0,0 +1,47 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import java.util.concurrent.Callable;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.HRegionInfo;
+
+/**
+ * A callable object that invokes the corresponding action that needs to be
+ * taken for unassignment of a region in transition. Implemented as a future
+ * callable so that we are able to act on the timeout asynchronously.
+ */
+@InterfaceAudience.Private
+public class UnAssignCallable implements Callable<Object> {
+  private final AssignmentManager assignmentManager;
+
+  private final HRegionInfo hri; // the region handed to unassign() by call()
+  /** Stores the manager and region; both are fixed for the callable's lifetime. */
+  public UnAssignCallable(AssignmentManager assignmentManager, HRegionInfo hri) {
+    this.assignmentManager = assignmentManager;
+    this.hri = hri;
+  }
+  /** Calls {@code assignmentManager.unassign(hri, true)}; always returns null. */
+  @Override
+  public Object call() throws Exception {
+    assignmentManager.unassign(hri, true);
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/58549428/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java
index fcbe4f2..50e09ad 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java
@@ -21,10 +21,7 @@ package org.apache.hadoop.hbase.master.handler;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
 import java.util.Set;
 import java.util.concurrent.locks.Lock;
 
@@ -37,11 +34,11 @@ import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.catalog.CatalogTracker;
 import org.apache.hadoop.hbase.catalog.MetaReader;
-import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.executor.EventHandler;
 import org.apache.hadoop.hbase.executor.EventType;
 import org.apache.hadoop.hbase.master.AssignmentManager;
 import org.apache.hadoop.hbase.master.DeadServer;
+import org.apache.hadoop.hbase.master.MasterFileSystem;
 import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.master.RegionState;
 import org.apache.hadoop.hbase.master.RegionState.State;
@@ -49,7 +46,6 @@ import org.apache.hadoop.hbase.master.RegionStates;
 import org.apache.hadoop.hbase.master.ServerManager;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
-import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
 import org.apache.zookeeper.KeeperException;
 
@@ -160,14 +156,14 @@ public class ServerShutdownHandler extends EventHandler {
       // completed (zk is updated after edits to hbase:meta have gone in).  See
       // {@link SplitTransaction}.  We'd also have to be figure another way for
       // doing the below hbase:meta daughters fixup.
-      NavigableMap<HRegionInfo, Result> hris = null;
+      Set<HRegionInfo> hris = null;
       while (!this.server.isStopped()) {
         try {
           this.server.getCatalogTracker().waitForMeta();
           // Skip getting user regions if the server is stopped.
           if (!this.server.isStopped()) {
             hris = MetaReader.getServerUserRegions(this.server.getCatalogTracker(),
-                this.serverName);
+              this.serverName).keySet();
           }
           break;
         } catch (InterruptedException e) {
@@ -193,9 +189,8 @@ public class ServerShutdownHandler extends EventHandler {
           LOG.info("Splitting logs for " + serverName + " before assignment.");
           if (distributedLogReplay) {
             LOG.info("Mark regions in recovery before assignment.");
-            Set<ServerName> serverNames = new HashSet<ServerName>();
-            serverNames.add(serverName);
-            this.services.getMasterFileSystem().prepareLogReplay(serverNames);
+            MasterFileSystem mfs = this.services.getMasterFileSystem();
+            mfs.prepareLogReplay(serverName, hris);
           } else {
             this.services.getMasterFileSystem().splitLog(serverName);
           }
@@ -221,10 +216,9 @@ public class ServerShutdownHandler extends EventHandler {
       toAssignRegions.addAll(regionsInTransition);
 
       // Iterate regions that were on this server and assign them
-      if (hris != null) {
+      if (hris != null && !hris.isEmpty()) {
         RegionStates regionStates = am.getRegionStates();
-        for (Map.Entry<HRegionInfo, Result> e: hris.entrySet()) {
-          HRegionInfo hri = e.getKey();
+        for (HRegionInfo hri: hris) {
           if (regionsInTransition.contains(hri)) {
             continue;
           }
@@ -232,7 +226,7 @@ public class ServerShutdownHandler extends EventHandler {
           Lock lock = am.acquireRegionLock(encodedName);
           try {
             RegionState rit = regionStates.getRegionTransitionState(hri);
-            if (processDeadRegion(hri, e.getValue(), am, server.getCatalogTracker())) {
+            if (processDeadRegion(hri, am, server.getCatalogTracker())) {
               ServerName addressFromAM = regionStates.getRegionServerOfRegion(hri);
               if (addressFromAM != null && !addressFromAM.equals(this.serverName)) {
                 // If this region is in transition on the dead server, it must be
@@ -258,7 +252,7 @@ public class ServerShutdownHandler extends EventHandler {
                 }
               } else if (regionStates.isRegionInState(
                   hri, State.SPLITTING_NEW, State.MERGING_NEW)) {
-                regionStates.regionOffline(hri);
+                regionStates.updateRegionState(hri, State.OFFLINE);
               }
               toAssignRegions.add(hri);
             } else if (rit != null) {
@@ -332,13 +326,12 @@ public class ServerShutdownHandler extends EventHandler {
    * Process a dead region from a dead RS. Checks if the region is disabled or
    * disabling or if the region has a partially completed split.
    * @param hri
-   * @param result
    * @param assignmentManager
    * @param catalogTracker
    * @return Returns true if specified region should be assigned, false if not.
    * @throws IOException
    */
-  public static boolean processDeadRegion(HRegionInfo hri, Result result,
+  public static boolean processDeadRegion(HRegionInfo hri,
       AssignmentManager assignmentManager, CatalogTracker catalogTracker)
   throws IOException {
     boolean tablePresent = assignmentManager.getTableStateManager().isTablePresent(hri.getTable());

http://git-wip-us.apache.org/repos/asf/hbase/blob/58549428/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index ea6ee53..59713df 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -108,7 +108,11 @@ import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.Regio
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;
+import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionTransition;
+import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionTransition.TransitionCode;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
+import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionTransitionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionTransitionResponse;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
 import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
 import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
@@ -121,12 +125,12 @@ import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.trace.SpanReceiverHost;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CompressionTest;
+import org.apache.hadoop.hbase.util.ConfigUtil;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSTableDescriptors;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.HasThread;
 import org.apache.hadoop.hbase.util.JvmPauseMonitor;
-import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Sleeper;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.util.VersionInfo;
@@ -264,7 +268,7 @@ public class HRegionServer extends HasThread implements
   protected final int numRegionsToReport;
 
   // Stub to do region server status calls against the master.
-  private RegionServerStatusService.BlockingInterface rssStub;
+  private volatile RegionServerStatusService.BlockingInterface rssStub;
   // RPC client. Used to make the stub above that does region server status checking.
   RpcClient rpcClient;
 
@@ -393,6 +397,8 @@ public class HRegionServer extends HasThread implements
 
   protected BaseCoordinatedStateManager csm;
 
+  private final boolean useZKForAssignment;
+
   /**
    * Starts a HRegionServer at the default location.
    * @param conf
@@ -460,6 +466,8 @@ public class HRegionServer extends HasThread implements
       }
     };
 
+    useZKForAssignment = ConfigUtil.useZKForAssignment(conf);
+
     // Set 'fs.defaultFS' to match the filesystem on hbase.rootdir else
     // underlying hadoop hdfs accessors will be going against wrong filesystem
     // (unless all is set to defaults).
@@ -928,8 +936,9 @@ public class HRegionServer extends HasThread implements
   @VisibleForTesting
   protected void tryRegionServerReport(long reportStartTime, long reportEndTime)
   throws IOException {
-    if (this.rssStub == null) {
-      // the current server is stopping.
+    RegionServerStatusService.BlockingInterface rss = rssStub;
+    if (rss == null) {
+      // the current server could be stopping.
       return;
     }
     ClusterStatusProtos.ServerLoad sl = buildServerLoad(reportStartTime, reportEndTime);
@@ -939,18 +948,19 @@ public class HRegionServer extends HasThread implements
         this.serverName.getVersionedBytes());
       request.setServer(ProtobufUtil.toServerName(sn));
       request.setLoad(sl);
-      this.rssStub.regionServerReport(null, request.build());
+      rss.regionServerReport(null, request.build());
     } catch (ServiceException se) {
       IOException ioe = ProtobufUtil.getRemoteException(se);
       if (ioe instanceof YouAreDeadException) {
         // This will be caught and handled as a fatal error in run()
         throw ioe;
       }
+      if (rssStub == rss) {
+        rssStub = null;
+      }
       // Couldn't connect to the master, get location from zk and reconnect
       // Method blocks until new master is found or we are stopped
-      Pair<ServerName, RegionServerStatusService.BlockingInterface> p =
-        createRegionServerStatusStub();
-      this.rssStub = p.getSecond();
+      createRegionServerStatusStub();
     }
   }
 
@@ -1670,12 +1680,62 @@ public class HRegionServer extends HasThread implements
     // Update ZK, or META
     if (r.getRegionInfo().isMetaRegion()) {
       MetaRegionTracker.setMetaLocation(getZooKeeper(), serverName);
-    } else {
+    } else if (useZKForAssignment) {
       MetaEditor.updateRegionLocation(ct, r.getRegionInfo(),
         this.serverName, openSeqNum);
     }
+    if (!useZKForAssignment && !reportRegionTransition(
+        TransitionCode.OPENED, openSeqNum, r.getRegionInfo())) {
+      throw new IOException("Failed to report opened region to master: "
+        + r.getRegionNameAsString());
+    }
+
     LOG.debug("Finished post open deploy task for " + r.getRegionNameAsString());
+  }
+
+  @Override
+  public boolean reportRegionTransition(TransitionCode code, HRegionInfo... hris) {
+    return reportRegionTransition(code, HConstants.NO_SEQNUM, hris);
+  }
 
+  @Override
+  public boolean reportRegionTransition(
+      TransitionCode code, long openSeqNum, HRegionInfo... hris) {
+    ReportRegionTransitionRequest.Builder builder = ReportRegionTransitionRequest.newBuilder();
+    builder.setServer(ProtobufUtil.toServerName(serverName));
+    RegionTransition.Builder transition = builder.addTransitionBuilder();
+    transition.setTransitionCode(code);
+    if (code == TransitionCode.OPENED && openSeqNum >= 0) {
+      transition.setOpenSeqNum(openSeqNum);
+    }
+    for (HRegionInfo hri: hris) {
+      transition.addRegionInfo(HRegionInfo.convert(hri));
+    }
+    ReportRegionTransitionRequest request = builder.build();
+    while (keepLooping()) {
+      RegionServerStatusService.BlockingInterface rss = rssStub;
+      try {
+        if (rss == null) {
+          createRegionServerStatusStub();
+          continue;
+        }
+        ReportRegionTransitionResponse response =
+          rss.reportRegionTransition(null, request);
+        if (response.hasErrorMessage()) {
+          LOG.info("Failed to transition " + hris[0]
+            + " to " + code + ": " + response.getErrorMessage());
+          return false;
+        }
+        return true;
+      } catch (ServiceException se) {
+        IOException ioe = ProtobufUtil.getRemoteException(se);
+        LOG.info("Failed to report region transition, will retry", ioe);
+        if (rssStub == rss) {
+          rssStub = null;
+        }
+      }
+    }
+    return false;
   }
 
   @Override
@@ -1819,8 +1879,10 @@ public class HRegionServer extends HasThread implements
    *
    * @return master + port, or null if server has been stopped
    */
-  private Pair<ServerName, RegionServerStatusService.BlockingInterface>
-      createRegionServerStatusStub() {
+  private synchronized ServerName createRegionServerStatusStub() {
+    if (rssStub != null) {
+      return masterAddressTracker.getMasterAddress();
+    }
     ServerName sn = null;
     long previousLogTime = 0;
     RegionServerStatusService.BlockingInterface master = null;
@@ -1880,7 +1942,8 @@ public class HRegionServer extends HasThread implements
         Thread.currentThread().interrupt();
       }
     }
-    return new Pair<ServerName, RegionServerStatusService.BlockingInterface>(sn, intf);
+    rssStub = intf;
+    return sn;
   }
 
   /**
@@ -1899,12 +1962,9 @@ public class HRegionServer extends HasThread implements
    * @throws IOException
    */
   private RegionServerStartupResponse reportForDuty() throws IOException {
+    ServerName masterServerName = createRegionServerStatusStub();
+    if (masterServerName == null) return null;
     RegionServerStartupResponse result = null;
-    Pair<ServerName, RegionServerStatusService.BlockingInterface> p =
-      createRegionServerStatusStub();
-    this.rssStub = p.getSecond();
-    ServerName masterServerName = p.getFirst();
-    if (masterServerName == null) return result;
     try {
       rpcServices.requestCount.set(0);
       LOG.info("reportForDuty to master=" + masterServerName + " with port="

http://git-wip-us.apache.org/repos/asf/hbase/blob/58549428/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransaction.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransaction.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransaction.java
index 4c02cfd..eedba2b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransaction.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransaction.java
@@ -45,8 +45,10 @@ import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.executor.EventType;
+import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionTransition.TransitionCode;
 import org.apache.hadoop.hbase.regionserver.SplitTransaction.LoggingProgressable;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ConfigUtil;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
@@ -98,6 +100,7 @@ public class RegionMergeTransaction {
   private int znodeVersion = -1;
   // We only merge adjacent regions if forcible is false
   private final boolean forcible;
+  private boolean useZKForAssignment;
 
   /**
    * Types to add to the transaction journal. Each enum is a step in the merge
@@ -238,6 +241,8 @@ public class RegionMergeTransaction {
    */
   public HRegion execute(final Server server,
       final RegionServerServices services) throws IOException {
+    useZKForAssignment = server == null ? true :
+      ConfigUtil.useZKForAssignment(server.getConfiguration());
     if (rsCoprocessorHost == null) {
       rsCoprocessorHost = server != null ?
         ((HRegionServer) server).getRegionServerCoprocessorHost() : null;
@@ -315,7 +320,7 @@ public class RegionMergeTransaction {
     // will determine whether the region is merged or not in case of failures.
     // If it is successful, master will roll-forward, if not, master will
     // rollback
-    if (!testing) {
+    if (!testing && useZKForAssignment) {
       if (metaEntries.isEmpty()) {
         MetaEditor.mergeRegions(server.getCatalogTracker(), mergedRegion.getRegionInfo(), region_a
             .getRegionInfo(), region_b.getRegionInfo(), server.getServerName());
@@ -323,6 +328,14 @@ public class RegionMergeTransaction {
         mergeRegionsAndPutMetaEntries(server.getCatalogTracker(), mergedRegion.getRegionInfo(),
           region_a.getRegionInfo(), region_b.getRegionInfo(), server.getServerName(), metaEntries);
       }
+    } else if (services != null && !useZKForAssignment) {
+      if (!services.reportRegionTransition(TransitionCode.MERGE_PONR,
+          mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo())) {
+        // Passed PONR, let SSH clean it up
+        throw new IOException("Failed to notify master that merge passed PONR: "
+          + region_a.getRegionInfo().getRegionNameAsString() + " and "
+          + region_b.getRegionInfo().getRegionNameAsString());
+      }
     }
     return mergedRegion;
   }
@@ -352,6 +365,7 @@ public class RegionMergeTransaction {
     addLocation(putOfMerged, serverName, 1);
   }
 
+  @SuppressWarnings("deprecation")
   public Put addLocation(final Put p, final ServerName sn, long openSeqNum) {
     p.add(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER, Bytes
         .toBytes(sn.getHostAndPort()));
@@ -365,7 +379,7 @@ public class RegionMergeTransaction {
       boolean testing) throws IOException {
     // Set ephemeral MERGING znode up in zk. Mocked servers sometimes don't
     // have zookeeper so don't do zk stuff if server or zookeeper is null
-    if (server != null && server.getZooKeeper() != null) {
+    if (useZKAndZKIsSet(server)) {
       try {
         createNodeMerging(server.getZooKeeper(), this.mergedRegionInfo,
           server.getServerName(), region_a.getRegionInfo(), region_b.getRegionInfo());
@@ -373,9 +387,16 @@ public class RegionMergeTransaction {
         throw new IOException("Failed creating PENDING_MERGE znode on "
             + this.mergedRegionInfo.getRegionNameAsString(), e);
       }
+    } else if (services != null && !useZKForAssignment) {
+      if (!services.reportRegionTransition(TransitionCode.READY_TO_MERGE,
+          mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo())) {
+        throw new IOException("Failed to get ok from master to merge "
+          + region_a.getRegionInfo().getRegionNameAsString() + " and "
+          + region_b.getRegionInfo().getRegionNameAsString());
+      }
     }
     this.journal.add(JournalEntry.SET_MERGING_IN_ZK);
-    if (server != null && server.getZooKeeper() != null) {
+    if (useZKAndZKIsSet(server)) {
       // After creating the merge node, wait for master to transition it
       // from PENDING_MERGE to MERGING so that we can move on. We want master
       // knows about it and won't transition any region which is merging.
@@ -399,7 +420,7 @@ public class RegionMergeTransaction {
     // clean this up.
     mergeStoreFiles(hstoreFilesOfRegionA, hstoreFilesOfRegionB);
 
-    if (server != null && server.getZooKeeper() != null) {
+    if (server != null && useZKAndZKIsSet(server)) {
       try {
         // Do one more check on the merging znode (before it is too late) in case
         // any merging region is moved somehow. If so, the znode transition will fail.
@@ -548,7 +569,13 @@ public class RegionMergeTransaction {
 
     if (services != null) {
       try {
-        services.postOpenDeployTasks(merged, server.getCatalogTracker());
+        if (useZKForAssignment) {
+          services.postOpenDeployTasks(merged, server.getCatalogTracker());
+        } else if (!services.reportRegionTransition(TransitionCode.MERGED,
+            mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo())) {
+          throw new IOException("Failed to report merged region to master: "
+            + mergedRegionInfo.getShortNameToLog());
+        }
         services.addToOnlineRegions(merged);
       } catch (KeeperException ke) {
         throw new IOException(ke);
@@ -567,43 +594,41 @@ public class RegionMergeTransaction {
    */
   void transitionZKNode(final Server server, final RegionServerServices services,
       HRegion mergedRegion) throws IOException {
-    if (server == null || server.getZooKeeper() == null) {
-      return;
-    }
-
-    // Tell master about merge by updating zk. If we fail, abort.
-    try {
-      this.znodeVersion = transitionMergingNode(server.getZooKeeper(),
-        this.mergedRegionInfo, region_a.getRegionInfo(),
-        region_b.getRegionInfo(), server.getServerName(), this.znodeVersion,
-        RS_ZK_REGION_MERGING, RS_ZK_REGION_MERGED);
-
-      long startTime = EnvironmentEdgeManager.currentTimeMillis();
-      int spins = 0;
-      // Now wait for the master to process the merge. We know it's done
-      // when the znode is deleted. The reason we keep tickling the znode is
-      // that it's possible for the master to miss an event.
-      do {
-        if (spins % 10 == 0) {
-          LOG.debug("Still waiting on the master to process the merge for "
-              + this.mergedRegionInfo.getEncodedName() + ", waited "
-              + (EnvironmentEdgeManager.currentTimeMillis() - startTime) + "ms");
-        }
-        Thread.sleep(100);
-        // When this returns -1 it means the znode doesn't exist
+    if (useZKAndZKIsSet(server)) {
+      // Tell master about merge by updating zk. If we fail, abort.
+      try {
         this.znodeVersion = transitionMergingNode(server.getZooKeeper(),
           this.mergedRegionInfo, region_a.getRegionInfo(),
           region_b.getRegionInfo(), server.getServerName(), this.znodeVersion,
-          RS_ZK_REGION_MERGED, RS_ZK_REGION_MERGED);
-        spins++;
-      } while (this.znodeVersion != -1 && !server.isStopped()
-          && !services.isStopping());
-    } catch (Exception e) {
-      if (e instanceof InterruptedException) {
-        Thread.currentThread().interrupt();
+          RS_ZK_REGION_MERGING, RS_ZK_REGION_MERGED);
+  
+        long startTime = EnvironmentEdgeManager.currentTimeMillis();
+        int spins = 0;
+        // Now wait for the master to process the merge. We know it's done
+        // when the znode is deleted. The reason we keep tickling the znode is
+        // that it's possible for the master to miss an event.
+        do {
+          if (spins % 10 == 0) {
+            LOG.debug("Still waiting on the master to process the merge for "
+                + this.mergedRegionInfo.getEncodedName() + ", waited "
+                + (EnvironmentEdgeManager.currentTimeMillis() - startTime) + "ms");
+          }
+          Thread.sleep(100);
+          // When this returns -1 it means the znode doesn't exist
+          this.znodeVersion = transitionMergingNode(server.getZooKeeper(),
+            this.mergedRegionInfo, region_a.getRegionInfo(),
+            region_b.getRegionInfo(), server.getServerName(), this.znodeVersion,
+            RS_ZK_REGION_MERGED, RS_ZK_REGION_MERGED);
+          spins++;
+        } while (this.znodeVersion != -1 && !server.isStopped()
+            && !services.isStopping());
+      } catch (Exception e) {
+        if (e instanceof InterruptedException) {
+          Thread.currentThread().interrupt();
+        }
+        throw new IOException("Failed telling master about merge "
+            + mergedRegionInfo.getEncodedName(), e);
       }
-      throw new IOException("Failed telling master about merge "
-          + mergedRegionInfo.getEncodedName(), e);
     }
 
     if (rsCoprocessorHost != null) {
@@ -745,8 +770,12 @@ public class RegionMergeTransaction {
       switch (je) {
 
         case SET_MERGING_IN_ZK:
-          if (server != null && server.getZooKeeper() != null) {
+          if (useZKAndZKIsSet(server)) {
             cleanZK(server, this.mergedRegionInfo);
+          } else if (services != null && !useZKForAssignment
+              && !services.reportRegionTransition(TransitionCode.MERGE_REVERTED,
+                  mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo())) {
+            return false;
           }
           break;
 
@@ -822,6 +851,10 @@ public class RegionMergeTransaction {
     return this.mergesdir;
   }
 
+  private boolean useZKAndZKIsSet(final Server server) {
+    return server != null && useZKForAssignment && server.getZooKeeper() != null;
+  }
+
   private static void cleanZK(final Server server, final HRegionInfo hri) {
     try {
       // Only delete if its in expected state; could have been hijacked.

http://git-wip-us.apache.org/repos/asf/hbase/blob/58549428/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
index d50fad7..d2e43df 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.catalog.CatalogTracker;
 import org.apache.hadoop.hbase.executor.ExecutorService;
 import org.apache.hadoop.hbase.ipc.RpcServerInterface;
 import org.apache.hadoop.hbase.master.TableLockManager;
+import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionTransition.TransitionCode;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.zookeeper.KeeperException;
 
@@ -80,6 +81,16 @@ public interface RegionServerServices
   throws KeeperException, IOException;
 
   /**
+   * Notify master that a handler requests to change a region state
+   */
+  boolean reportRegionTransition(TransitionCode code, long openSeqNum, HRegionInfo... hris);
+
+  /**
+   * Notify master that a handler requests to change a region state
+   */
+  boolean reportRegionTransition(TransitionCode code, HRegionInfo... hris);
+
+  /**
    * Returns a reference to the region server's RPC server
    */
   RpcServerInterface getRpcServer();

http://git-wip-us.apache.org/repos/asf/hbase/blob/58549428/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java
index db4dad9..bd1bf3a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java
@@ -45,8 +45,10 @@ import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
 import org.apache.hadoop.hbase.coordination.SplitTransactionCoordination;
+import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionTransition.TransitionCode;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
+import org.apache.hadoop.hbase.util.ConfigUtil;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.HasThread;
 import org.apache.hadoop.hbase.util.PairOfSameType;
@@ -89,6 +91,7 @@ public class SplitTransaction {
   private HRegionInfo hri_b;
   private long fileSplitTimeout = 30000;
   public SplitTransactionCoordination.SplitTransactionDetails std;
+  boolean useZKForAssignment;
 
   /*
    * Row to split around
@@ -272,7 +275,7 @@ public class SplitTransaction {
     // will determine whether the region is split or not in case of failures.
     // If it is successful, master will roll-forward, if not, master will rollback
     // and assign the parent region.
-    if (!testing) {
+    if (!testing && useZKForAssignment) {
       if (metaEntries == null || metaEntries.isEmpty()) {
         MetaEditor.splitRegion(server.getCatalogTracker(),
             parent.getRegionInfo(), daughterRegions.getFirst().getRegionInfo(),
@@ -282,13 +285,21 @@ public class SplitTransaction {
           parent.getRegionInfo(), daughterRegions.getFirst().getRegionInfo(), daughterRegions
               .getSecond().getRegionInfo(), server.getServerName(), metaEntries);
       }
+    } else if (services != null && !useZKForAssignment) {
+      if (!services.reportRegionTransition(TransitionCode.SPLIT_PONR,
+          parent.getRegionInfo(), hri_a, hri_b)) {
+        // Passed PONR (point of no return); let SSH (ServerShutdownHandler) clean it up
+        throw new IOException("Failed to notify master that split passed PONR: "
+          + parent.getRegionInfo().getRegionNameAsString());
+      }
     }
     return daughterRegions;
   }
+
   public PairOfSameType<HRegion> stepsBeforePONR(final Server server,
       final RegionServerServices services, boolean testing) throws IOException {
 
-    if (server != null && server.getCoordinatedStateManager() != null) {
+    if (useCoordinatedStateManager(server)) {
       if (std == null) {
         std =
             ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
@@ -297,9 +308,15 @@ public class SplitTransaction {
       ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
           .getSplitTransactionCoordination().startSplitTransaction(parent, server.getServerName(),
             hri_a, hri_b);
+    } else if (services != null && !useZKForAssignment) {
+      if (!services.reportRegionTransition(TransitionCode.READY_TO_SPLIT,
+          parent.getRegionInfo(), hri_a, hri_b)) {
+        throw new IOException("Failed to get ok from master to split "
+          + parent.getRegionNameAsString());
+      }
     }
     this.journal.add(JournalEntry.SET_SPLITTING);
-    if (server != null && server.getCoordinatedStateManager() != null) {
+    if (useCoordinatedStateManager(server)) {
       ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
           .getSplitTransactionCoordination().waitForSplitTransaction(services, parent, hri_a,
             hri_b, std);
@@ -399,11 +416,19 @@ public class SplitTransaction {
       }
       if (services != null) {
         try {
-          // add 2nd daughter first (see HBASE-4335)
-          services.postOpenDeployTasks(b, server.getCatalogTracker());
+          if (useZKForAssignment) {
+            // add 2nd daughter first (see HBASE-4335)
+            services.postOpenDeployTasks(b, server.getCatalogTracker());
+          } else if (!services.reportRegionTransition(TransitionCode.SPLIT,
+              parent.getRegionInfo(), hri_a, hri_b)) {
+            throw new IOException("Failed to report split region to master: "
+              + parent.getRegionInfo().getShortNameToLog());
+          }
           // Should add it to OnlineRegions
           services.addToOnlineRegions(b);
-          services.postOpenDeployTasks(a, server.getCatalogTracker());
+          if (useZKForAssignment) {
+            services.postOpenDeployTasks(a, server.getCatalogTracker());
+          }
           services.addToOnlineRegions(a);
         } catch (KeeperException ke) {
           throw new IOException(ke);
@@ -425,7 +450,9 @@ public class SplitTransaction {
   public PairOfSameType<HRegion> execute(final Server server,
       final RegionServerServices services)
   throws IOException {
-    if (server != null && server.getCoordinatedStateManager() != null) {
+    useZKForAssignment = server == null ? true :
+      ConfigUtil.useZKForAssignment(server.getConfiguration());
+    if (useCoordinatedStateManager(server)) {
       std =
           ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
               .getSplitTransactionCoordination().getDefaultDetails();
@@ -441,7 +468,7 @@ public class SplitTransaction {
       final RegionServerServices services, PairOfSameType<HRegion> regions)
       throws IOException {
     openDaughters(server, services, regions.getFirst(), regions.getSecond());
-    if (server != null && server.getCoordinatedStateManager() != null) {
+    if (useCoordinatedStateManager(server)) {
       ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
           .getSplitTransactionCoordination().completeSplitTransaction(services, regions.getFirst(),
             regions.getSecond(), std, parent);
@@ -561,6 +588,10 @@ public class SplitTransaction {
     }
   }
 
+  private boolean useCoordinatedStateManager(final Server server) {
+    return server != null && useZKForAssignment && server.getCoordinatedStateManager() != null;
+  }
+
   private void splitStoreFiles(final Map<byte[], List<StoreFile>> hstoreFilesToSplit)
       throws IOException {
     if (hstoreFilesToSplit == null) {
@@ -676,9 +707,13 @@ public class SplitTransaction {
       switch(je) {
 
       case SET_SPLITTING:
-        if (server != null && server instanceof HRegionServer) {
+        if (useCoordinatedStateManager(server) && server instanceof HRegionServer) {
           ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
               .getSplitTransactionCoordination().clean(this.parent.getRegionInfo());
+        } else if (services != null && !useZKForAssignment
+            && !services.reportRegionTransition(TransitionCode.SPLIT_REVERTED,
+                parent.getRegionInfo(), hri_a, hri_b)) {
+          return false;
         }
         break;
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/58549428/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java
index e7d3ef8..13b0927 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java
@@ -29,8 +29,10 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.coordination.CloseRegionCoordination;
 import org.apache.hadoop.hbase.executor.EventHandler;
 import org.apache.hadoop.hbase.executor.EventType;
+import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionTransition.TransitionCode;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.util.ConfigUtil;
 
 /**
  * Handles closing of a region on a region server.
@@ -53,6 +55,7 @@ public class CloseRegionHandler extends EventHandler {
   private ServerName destination;
   private CloseRegionCoordination closeRegionCoordination;
   private CloseRegionCoordination.CloseRegionDetails closeRegionDetails;
+  private final boolean useZKForAssignment;
 
   /**
    * This method used internally by the RegionServer to close out regions.
@@ -102,6 +105,7 @@ public class CloseRegionHandler extends EventHandler {
     this.destination = destination;
     this.closeRegionCoordination = closeRegionCoordination;
     this.closeRegionDetails = crd;
+    useZKForAssignment = ConfigUtil.useZKForAssignment(server.getConfiguration());
   }
 
   public HRegionInfo getRegionInfo() {
@@ -124,7 +128,8 @@ public class CloseRegionHandler extends EventHandler {
 
       // Close the region
       try {
-        if (closeRegionCoordination.checkClosingState(regionInfo, closeRegionDetails)) {
+        if (useZKForAssignment && closeRegionCoordination.checkClosingState(
+            regionInfo, closeRegionDetails)) {
           return;
         }
 
@@ -148,8 +153,12 @@ public class CloseRegionHandler extends EventHandler {
       }
 
       this.rsServices.removeFromOnlineRegions(region, destination);
-      closeRegionCoordination.setClosedState(region, this.server.getServerName(),
-        closeRegionDetails);
+      if (!useZKForAssignment) {
+        rsServices.reportRegionTransition(TransitionCode.CLOSED, regionInfo);
+      } else {
+        closeRegionCoordination.setClosedState(region, this.server.getServerName(),
+          closeRegionDetails);
+      }
 
       // Done!  Region is closed on this RS
       LOG.debug("Closed " + region.getRegionNameAsString());

http://git-wip-us.apache.org/repos/asf/hbase/blob/58549428/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java
index 07235f5..f4c4ff9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java
@@ -27,14 +27,15 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.coordination.OpenRegionCoordination;
 import org.apache.hadoop.hbase.executor.EventHandler;
 import org.apache.hadoop.hbase.executor.EventType;
+import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionTransition.TransitionCode;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.RegionServerAccounting;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
-import org.apache.hadoop.hbase.coordination.OpenRegionCoordination;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
-
+import org.apache.hadoop.hbase.util.ConfigUtil;
 /**
  * Handles opening of a region on a region server.
  * <p>
@@ -52,6 +53,8 @@ public class OpenRegionHandler extends EventHandler {
   private OpenRegionCoordination coordination;
   private OpenRegionCoordination.OpenRegionDetails ord;
 
+  private final boolean useZKForAssignment;
+
   public OpenRegionHandler(final Server server,
       final RegionServerServices rsServices, HRegionInfo regionInfo,
       HTableDescriptor htd, OpenRegionCoordination coordination,
@@ -70,6 +73,7 @@ public class OpenRegionHandler extends EventHandler {
     this.htd = htd;
     this.coordination = coordination;
     this.ord = ord;
+    useZKForAssignment = ConfigUtil.useZKForAssignment(server.getConfiguration());
   }
 
   public HRegionInfo getRegionInfo() {
@@ -110,7 +114,8 @@ public class OpenRegionHandler extends EventHandler {
         return;
       }
 
-      if (!coordination.transitionFromOfflineToOpening(regionInfo, ord)) {
+      if (useZKForAssignment
+          && !coordination.transitionFromOfflineToOpening(regionInfo, ord)) {
         LOG.warn("Region was hijacked? Opening cancelled for encodedName=" + encodedName);
         // This is a desperate attempt: the znode is unlikely to be ours. But we can't do more.
         return;
@@ -124,7 +129,8 @@ public class OpenRegionHandler extends EventHandler {
       }
 
       boolean failed = true;
-      if (coordination.tickleOpening(ord, regionInfo, rsServices, "post_region_open")) {
+      if (!useZKForAssignment ||
+          coordination.tickleOpening(ord, regionInfo, rsServices, "post_region_open")) {
         if (updateMeta(region)) {
           failed = false;
         }
@@ -134,7 +140,8 @@ public class OpenRegionHandler extends EventHandler {
         return;
       }
 
-      if (!isRegionStillOpening() || !coordination.transitionToOpened(region, ord)) {
+      if (!isRegionStillOpening() ||
+          (useZKForAssignment && !coordination.transitionToOpened(region, ord))) {
         // If we fail to transition to opened, it's because of one of two cases:
         //    (a) we lost our ZK lease
         // OR (b) someone else opened the region before us
@@ -200,10 +207,16 @@ public class OpenRegionHandler extends EventHandler {
           cleanupFailedOpen(region);
         }
       } finally {
-        // Even if cleanupFailed open fails we need to do this transition
-        // See HBASE-7698
-        coordination.tryTransitionFromOpeningToFailedOpen(regionInfo, ord);
+        if (!useZKForAssignment) {
+          rsServices.reportRegionTransition(TransitionCode.FAILED_OPEN, regionInfo);
+        } else {
+          // Even if cleanupFailed open fails we need to do this transition
+          // See HBASE-7698
+          coordination.tryTransitionFromOpeningToFailedOpen(regionInfo, ord);
+        }
       }
+    } else if (!useZKForAssignment) {
+      rsServices.reportRegionTransition(TransitionCode.FAILED_OPEN, regionInfo);
     } else {
       // If still transition to OPENING is not done, we need to transition znode
       // to FAILED_OPEN
@@ -242,7 +255,10 @@ public class OpenRegionHandler extends EventHandler {
       if (elapsed > 120000) { // 2 minutes, no need to tickleOpening too often
         // Only tickle OPENING if postOpenDeployTasks is taking some time.
         lastUpdate = now;
-        tickleOpening = coordination.tickleOpening(ord, regionInfo, rsServices, "post_open_deploy");
+        if (useZKForAssignment) {
+          tickleOpening = coordination.tickleOpening(
+            ord, regionInfo, rsServices, "post_open_deploy");
+        }
       }
       synchronized (signaller) {
         try {
@@ -343,8 +359,16 @@ public class OpenRegionHandler extends EventHandler {
         this.rsServices,
         new CancelableProgressable() {
           public boolean progress() {
-            // if tickle failed, we need to cancel opening region.
-            return coordination.tickleOpening(ord, regionInfo, rsServices, "open_region_progress");
+            if (useZKForAssignment) {
+              // if tickle failed, we need to cancel opening region.
+              return coordination.tickleOpening(ord, regionInfo,
+                rsServices, "open_region_progress");
+            }
+            if (!isRegionStillOpening()) {
+              LOG.warn("Open region aborted since it isn't opening any more");
+              return false;
+            }
+            return true;
           }
         });
     } catch (Throwable t) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/58549428/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ConfigUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ConfigUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ConfigUtil.java
new file mode 100644
index 0000000..2183ee9
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ConfigUtil.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Configuration-related utilities, such as the switch for ZK-based region assignment.
+ */
+@InterfaceAudience.Private
+public class ConfigUtil {
+
+  public static boolean useZKForAssignment(Configuration conf) {
+    // Defaults to true. To change the default, please also update ZooKeeperWatcher.java
+    return conf.getBoolean("hbase.assignment.usezk", true);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/58549428/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
index 6d1cfc4..19a3b20 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.ipc.RpcServerInterface;
 import org.apache.hadoop.hbase.master.TableLockManager;
 import org.apache.hadoop.hbase.master.TableLockManager.NullTableLockManager;
+import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionTransition.TransitionCode;
 import org.apache.hadoop.hbase.regionserver.CompactionRequestor;
 import org.apache.hadoop.hbase.regionserver.FlushRequester;
 import org.apache.hadoop.hbase.regionserver.HRegion;
@@ -156,7 +157,7 @@ class MockRegionServerServices implements RegionServerServices {
 
   @Override
   public Configuration getConfiguration() {
-    return null;
+    return zkw == null ? null : zkw.getConfiguration();
   }
 
   @Override
@@ -225,4 +226,16 @@ class MockRegionServerServices implements RegionServerServices {
     // TODO Auto-generated method stub
     return null;
   }
+
+  @Override
+  public boolean reportRegionTransition(TransitionCode code, long openSeqNum,
+      HRegionInfo... hris) {
+    return false;
+  }
+
+  @Override
+  public boolean reportRegionTransition(TransitionCode code,
+      HRegionInfo... hris) {
+    return false;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/58549428/hbase-server/src/test/java/org/apache/hadoop/hbase/TestDrainingServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestDrainingServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestDrainingServer.java
index 777bdb1..2755735 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestDrainingServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestDrainingServer.java
@@ -80,6 +80,7 @@ public class TestDrainingServer {
   
   @BeforeClass
   public static void beforeClass() throws Exception {
+    TEST_UTIL.getConfiguration().setBoolean("hbase.assignment.usezk", true);
     TEST_UTIL.startMiniZKCluster();
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/58549428/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java
index 4e7fd23..380b337 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.master.RegionStates;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ConfigUtil;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@@ -492,7 +493,9 @@ public class TestScannersFromClientSide {
       RegionStates states = master.getAssignmentManager().getRegionStates();
       states.regionOffline(hri);
       states.updateRegionState(hri, State.OPENING);
-      ZKAssign.createNodeOffline(zkw, hri, loc.getServerName());
+      if (ConfigUtil.useZKForAssignment(TEST_UTIL.getConfiguration())) {
+        ZKAssign.createNodeOffline(zkw, hri, loc.getServerName());
+      }
       ProtobufUtil.openRegion(rs.getRSRpcServices(), rs.getServerName(), hri);
       startTime = EnvironmentEdgeManager.currentTimeMillis();
       while (true) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/58549428/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
index 0924dd2..5e7d170 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hbase.CellScannable;
 import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.CoordinatedStateManager;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
@@ -39,7 +40,6 @@ import org.apache.hadoop.hbase.catalog.CatalogTracker;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.CoordinatedStateManager;
 import org.apache.hadoop.hbase.executor.ExecutorService;
 import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
 import org.apache.hadoop.hbase.ipc.RpcServerInterface;
@@ -84,6 +84,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
+import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionTransition.TransitionCode;
 import org.apache.hadoop.hbase.regionserver.CompactionRequestor;
 import org.apache.hadoop.hbase.regionserver.FlushRequester;
 import org.apache.hadoop.hbase.regionserver.HRegion;
@@ -559,4 +560,15 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices {
   public ServerNonceManager getNonceManager() {
     return null;
   }
+
+  @Override
+  public boolean reportRegionTransition(TransitionCode code, HRegionInfo... hris) {
+    return false;
+  }
+
+  @Override
+  public boolean reportRegionTransition(TransitionCode code, long openSeqNum,
+      HRegionInfo... hris) {
+    return false;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/58549428/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java
index eabb813..0d0b789 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java
@@ -33,6 +33,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.hadoop.hbase.CellScannable;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.CoordinatedStateException;
+import org.apache.hadoop.hbase.CoordinatedStateManager;
 import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -52,7 +53,6 @@ import org.apache.hadoop.hbase.catalog.MetaMockingUtil;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HConnectionTestingUtility;
 import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.CoordinatedStateManager;
 import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
 import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
 import org.apache.hadoop.hbase.coordination.OpenRegionCoordination;
@@ -128,6 +128,7 @@ public class TestAssignmentManager {
 
   @BeforeClass
   public static void beforeClass() throws Exception {
+    HTU.getConfiguration().setBoolean("hbase.assignment.usezk", true);
     HTU.startMiniZKCluster();
   }
 
@@ -147,6 +148,7 @@ public class TestAssignmentManager {
     this.server = Mockito.mock(Server.class);
     Mockito.when(server.getServerName()).thenReturn(ServerName.valueOf("master,1,1"));
     Mockito.when(server.getConfiguration()).thenReturn(HTU.getConfiguration());
+    Mockito.when(server.getCatalogTracker()).thenReturn(null);
     this.watcher =
       new ZooKeeperWatcher(HTU.getConfiguration(), "mockedServer", this.server, true);
     Mockito.when(server.getZooKeeper()).thenReturn(this.watcher);

http://git-wip-us.apache.org/repos/asf/hbase/blob/58549428/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java
index 8e9053b..de192dc 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java
@@ -60,9 +60,11 @@ import org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ConfigUtil;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.zookeeper.KeeperException;
@@ -78,11 +80,10 @@ import org.junit.experimental.categories.Category;
 public class TestAssignmentManagerOnCluster {
   private final static byte[] FAMILY = Bytes.toBytes("FAMILY");
   private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
-  private final static Configuration conf = TEST_UTIL.getConfiguration();
+  final static Configuration conf = TEST_UTIL.getConfiguration();
   private static HBaseAdmin admin;
 
-  @BeforeClass
-  public static void setUpBeforeClass() throws Exception {
+  static void setupOnce() throws Exception {
     // Using the our load balancer to control region plans
     conf.setClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS,
       MyLoadBalancer.class, LoadBalancer.class);
@@ -95,6 +96,13 @@ public class TestAssignmentManagerOnCluster {
     admin = TEST_UTIL.getHBaseAdmin();
   }
 
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    // Use ZK for region assignment
+    conf.setBoolean("hbase.assignment.usezk", true);
+    setupOnce();
+  }
+
   @AfterClass
   public static void tearDownAfterClass() throws Exception {
     TEST_UTIL.shutdownMiniCluster();
@@ -561,16 +569,18 @@ public class TestAssignmentManagerOnCluster {
       }
       am.regionOffline(hri);
       ZooKeeperWatcher zkw = TEST_UTIL.getHBaseCluster().getMaster().getZooKeeper();
-      am.getRegionStates().updateRegionState(hri, State.OFFLINE);
-      ZKAssign.createNodeOffline(zkw, hri, destServerName);
-      ZKAssign.transitionNodeOpening(zkw, hri, destServerName);
-
-      // Wait till the event is processed and the region is in transition
-      long timeoutTime = System.currentTimeMillis() + 20000;
-      while (!am.getRegionStates().isRegionInTransition(hri)) {
-        assertTrue("Failed to process ZK opening event in time",
-          System.currentTimeMillis() < timeoutTime);
-        Thread.sleep(100);
+      am.getRegionStates().updateRegionState(hri, State.PENDING_OPEN, destServerName);
+      if (ConfigUtil.useZKForAssignment(conf)) {
+        ZKAssign.createNodeOffline(zkw, hri, destServerName);
+        ZKAssign.transitionNodeOpening(zkw, hri, destServerName);
+  
+        // Wait till the event is processed and the region is in transition
+        long timeoutTime = System.currentTimeMillis() + 20000;
+        while (!am.getRegionStates().isRegionInTransition(hri)) {
+          assertTrue("Failed to process ZK opening event in time",
+            System.currentTimeMillis() < timeoutTime);
+          Thread.sleep(100);
+        }
       }
 
       am.getTableStateManager().setTableState(table, ZooKeeperProtos.Table.State.DISABLING);
@@ -607,6 +617,10 @@ public class TestAssignmentManagerOnCluster {
       master.assignRegion(hri);
       AssignmentManager am = master.getAssignmentManager();
       assertTrue(am.waitForAssignment(hri));
+      while (!HBaseTestingUtility.getAllOnlineRegions(
+          TEST_UTIL.getHBaseCluster()).contains(hri.getRegionNameAsString())) {
+        Threads.sleep(100); // This won't take long
+      }
 
       MyRegionObserver.postCloseEnabled.set(true);
       am.unassign(hri);
@@ -704,8 +718,6 @@ public class TestAssignmentManagerOnCluster {
       ServerName serverName = master.getAssignmentManager().
         getRegionStates().getRegionServerOfRegion(hri);
       TEST_UTIL.assertRegionOnlyOnServer(hri, serverName, 200);
-      assertFalse("Region should be assigned on a new region server",
-        oldServerName.equals(serverName));
     } finally {
       MyRegionObserver.postOpenEnabled.set(false);
       TEST_UTIL.deleteTable(Bytes.toBytes(table));

http://git-wip-us.apache.org/repos/asf/hbase/blob/58549428/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java
index 1ba4e1b..de7ce1a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.ClusterStatus;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.LargeTests;
@@ -48,6 +49,8 @@ import org.apache.hadoop.hbase.RegionTransition;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableStateManager;
+import org.apache.hadoop.hbase.catalog.MetaEditor;
+import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.executor.EventType;
 import org.apache.hadoop.hbase.master.RegionState.State;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@@ -162,6 +165,7 @@ public class TestMasterFailover {
 
     // Create config to use for this cluster
     Configuration conf = HBaseConfiguration.create();
+    conf.setBoolean("hbase.assignment.usezk", true);
 
     // Start the cluster
     HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(conf);
@@ -512,6 +516,7 @@ public class TestMasterFailover {
     // Create and start the cluster
     HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
     Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setBoolean("hbase.assignment.usezk", true);
 
     conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 1);
     conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, 2);
@@ -971,6 +976,7 @@ public class TestMasterFailover {
     HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
     Configuration conf = TEST_UTIL.getConfiguration();
     conf.setInt("hbase.master.info.port", -1);
+    conf.setBoolean("hbase.assignment.usezk", true);
 
     TEST_UTIL.startMiniCluster(NUM_MASTERS, NUM_RS);
     MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
@@ -1014,6 +1020,7 @@ public class TestMasterFailover {
 
     // Create config to use for this cluster
     Configuration conf = HBaseConfiguration.create();
+    conf.setBoolean("hbase.assignment.usezk", true);
 
     // Start the cluster
     final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(conf);
@@ -1182,5 +1189,102 @@ public class TestMasterFailover {
     // Stop the cluster
     TEST_UTIL.shutdownMiniCluster();
   }
+
+  /**
+   * Test that regions in pending_open/pending_close are recovered after master failover
+   */
+  @Test (timeout=180000)
+  public void testPendingOpenOrCloseWhenMasterFailover() throws Exception {
+    final int NUM_MASTERS = 1;
+    final int NUM_RS = 1;
+
+    // Create config to use for this cluster
+    Configuration conf = HBaseConfiguration.create();
+    conf.setBoolean("hbase.assignment.usezk", false);
+
+    // Start the cluster
+    HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(conf);
+    TEST_UTIL.startMiniCluster(NUM_MASTERS, NUM_RS);
+    MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
+    log("Cluster started");
+
+    // get all the master threads
+    List<MasterThread> masterThreads = cluster.getMasterThreads();
+    assertEquals(1, masterThreads.size());
+
+    // only one master thread, let's wait for it to be initialized
+    assertTrue(cluster.waitForActiveAndReadyMaster());
+    HMaster master = masterThreads.get(0).getMaster();
+    assertTrue(master.isActiveMaster());
+    assertTrue(master.isInitialized());
+
+    // Create a table with a region online
+    HTable onlineTable = TEST_UTIL.createTable("onlineTable", "family");
+
+    // Create a table in META, so it has a region offline
+    HTableDescriptor offlineTable = new HTableDescriptor(
+      TableName.valueOf(Bytes.toBytes("offlineTable")));
+    offlineTable.addFamily(new HColumnDescriptor(Bytes.toBytes("family")));
+
+    FileSystem filesystem = FileSystem.get(conf);
+    Path rootdir = FSUtils.getRootDir(conf);
+    FSTableDescriptors fstd = new FSTableDescriptors(filesystem, rootdir);
+    fstd.createTableDescriptor(offlineTable);
+
+    HRegionInfo hriOffline = new HRegionInfo(offlineTable.getTableName(), null, null);
+    createRegion(hriOffline, rootdir, conf, offlineTable);
+    MetaEditor.addRegionToMeta(master.getCatalogTracker(), hriOffline);
+
+    log("Regions in hbase:meta and namespace have been created");
+
+    // At this point we expect only 3 regions to be assigned
+    // (catalog and namespace, plus 1 online region)
+    assertEquals(3, cluster.countServedRegions());
+    HRegionInfo hriOnline = onlineTable.getRegionLocation("").getRegionInfo();
+
+    RegionStates regionStates = master.getAssignmentManager().getRegionStates();
+    RegionStateStore stateStore = master.getAssignmentManager().getRegionStateStore();
+
+    // Put the online region in pending_close. It is actually still open.
+    // This simulates a failover that happens before the region close RPC is sent out
+    RegionState oldState = regionStates.getRegionState(hriOnline);
+    RegionState newState = new RegionState(
+      hriOnline, State.PENDING_CLOSE, oldState.getServerName());
+    stateStore.updateRegionState(HConstants.NO_SEQNUM, newState, oldState);
+
+    // Put the offline region in pending_open. It is actually not opened yet.
+    // This simulates a failover that happens before the region open RPC is sent out
+    oldState = new RegionState(hriOffline, State.OFFLINE);
+    newState = new RegionState(hriOffline, State.PENDING_OPEN, newState.getServerName());
+    stateStore.updateRegionState(HConstants.NO_SEQNUM, newState, oldState);
+
+    // Stop the master
+    log("Aborting master");
+    cluster.abortMaster(0);
+    cluster.waitOnMaster(0);
+    log("Master has aborted");
+
+    // Start up a new master
+    log("Starting up a new master");
+    master = cluster.startMaster().getMaster();
+    log("Waiting for master to be ready");
+    cluster.waitForActiveAndReadyMaster();
+    log("Master is ready");
+
+    // Wait until there are no more regions in transition
+    master.getAssignmentManager().waitUntilNoRegionsInTransition(60000);
+
+    // Get new region states since master restarted
+    regionStates = master.getAssignmentManager().getRegionStates();
+
+    // Both the pending_open and the pending_close regions should be online
+    assertTrue(regionStates.isRegionOnline(hriOffline));
+    assertTrue(regionStates.isRegionOnline(hriOnline));
+
+    log("Done with verification, shutting down cluster");
+
+    // Done, shutdown the cluster
+    TEST_UTIL.shutdownMiniCluster();
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/58549428/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestOpenedRegionHandler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestOpenedRegionHandler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestOpenedRegionHandler.java
index e91b1c2..2cae6fb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestOpenedRegionHandler.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestOpenedRegionHandler.java
@@ -69,6 +69,7 @@ public class TestOpenedRegionHandler {
   @Before
   public void setUp() throws Exception {
     conf = HBaseConfiguration.create();
+    conf.setBoolean("hbase.assignment.usezk", true);
     TEST_UTIL = HBaseTestingUtility.createLocalHTU(conf);
   }
 
@@ -84,6 +85,7 @@ public class TestOpenedRegionHandler {
     // Start the cluster
     log("Starting cluster");
     conf = HBaseConfiguration.create();
+    conf.setBoolean("hbase.assignment.usezk", true);
     resetConf = conf;
     TEST_UTIL = new HBaseTestingUtility(conf);
     TEST_UTIL.startMiniCluster(NUM_MASTERS, NUM_RS);

http://git-wip-us.apache.org/repos/asf/hbase/blob/58549428/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRestartCluster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRestartCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRestartCluster.java
index 2cda4e9..372c495 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRestartCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRestartCluster.java
@@ -58,6 +58,7 @@ public class TestRestartCluster {
 
   @Test (timeout=300000) public void testRestartClusterAfterKill()
   throws Exception {
+    UTIL.getConfiguration().setBoolean("hbase.assignment.usezk", true);
     UTIL.startMiniZKCluster();
     ZooKeeperWatcher zooKeeper =
       new ZooKeeperWatcher(UTIL.getConfiguration(), "cluster1", null, true);

http://git-wip-us.apache.org/repos/asf/hbase/blob/58549428/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestZKBasedOpenCloseRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestZKBasedOpenCloseRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestZKBasedOpenCloseRegion.java
index 021f86a..e684d1e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestZKBasedOpenCloseRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestZKBasedOpenCloseRegion.java
@@ -71,6 +71,7 @@ public class TestZKBasedOpenCloseRegion {
 
   @BeforeClass public static void beforeAllTests() throws Exception {
     Configuration c = TEST_UTIL.getConfiguration();
+    c.setBoolean("hbase.assignment.usezk", true);
     c.setBoolean("dfs.support.append", true);
     c.setInt("hbase.regionserver.info.port", 0);
     TEST_UTIL.startMiniCluster(2);

http://git-wip-us.apache.org/repos/asf/hbase/blob/58549428/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestZKLessAMOnCluster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestZKLessAMOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestZKLessAMOnCluster.java
new file mode 100644
index 0000000..83d33c5
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestZKLessAMOnCluster.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import org.apache.hadoop.hbase.MediumTests;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Runs the TestAssignmentManagerOnCluster tests with ZK-less region assignment.
+ */
+@Category(MediumTests.class)
+public class TestZKLessAMOnCluster extends TestAssignmentManagerOnCluster {
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    // Don't use ZK for region assignment
+    conf.setBoolean("hbase.assignment.usezk", false);
+    setupOnce();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TestAssignmentManagerOnCluster.tearDownAfterClass();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/58549428/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java
index 14a44fa..afab1e0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java
@@ -58,6 +58,7 @@ import org.apache.hadoop.hbase.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ConfigUtil;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.PairOfSameType;
 import org.apache.hadoop.hbase.util.StoppableImplementation;
@@ -107,6 +108,7 @@ public class TestEndToEndSplitTransaction {
         .getRegionName();
     HRegion region = server.getRegion(regionName);
     SplitTransaction split = new SplitTransaction(region, splitRow);
+    split.useZKForAssignment = ConfigUtil.useZKForAssignment(conf);
     split.prepare();
 
     // 1. phase I
@@ -139,10 +141,12 @@ public class TestEndToEndSplitTransaction {
     assertTrue(test(con, tableName, firstRow, server));
     assertTrue(test(con, tableName, lastRow, server));
 
-    // 4. phase III
-    ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
+    if (split.useZKForAssignment) {
+      // 4. phase III
+      ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
         .getSplitTransactionCoordination().completeSplitTransaction(server, regions.getFirst(),
           regions.getSecond(), split.std, region);
+    }
     assertTrue(test(con, tableName, firstRow, server));
     assertTrue(test(con, tableName, lastRow, server));
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/58549428/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java
index aac801e..cc0a123 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java
@@ -31,7 +31,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
@@ -39,6 +38,8 @@ import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.LargeTests;
 import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.UnknownRegionException;
 import org.apache.hadoop.hbase.catalog.MetaReader;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
@@ -47,14 +48,13 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.exceptions.MergeRegionException;
-import org.apache.hadoop.hbase.UnknownRegionException;
 import org.apache.hadoop.hbase.master.AssignmentManager;
 import org.apache.hadoop.hbase.master.HMaster;
-import org.apache.hadoop.hbase.master.RegionStates;
 import org.apache.hadoop.hbase.master.RegionState.State;
+import org.apache.hadoop.hbase.master.RegionStates;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.PairOfSameType;
 import org.junit.AfterClass;
@@ -86,13 +86,12 @@ public class TestRegionMergeTransactionOnCluster {
 
   private static int waitTime = 60 * 1000;
 
-  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
 
   private static HMaster master;
   private static HBaseAdmin admin;
 
-  @BeforeClass
-  public static void beforeAllTests() throws Exception {
+  static void setupOnce() throws Exception {
     // Start a cluster
     TEST_UTIL.startMiniCluster(NB_SERVERS);
     MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
@@ -101,6 +100,13 @@ public class TestRegionMergeTransactionOnCluster {
     admin = TEST_UTIL.getHBaseAdmin();
   }
 
+  @BeforeClass
+  public static void beforeAllTests() throws Exception {
+    // Use ZK for region assignment
+    TEST_UTIL.getConfiguration().setBoolean("hbase.assignment.usezk", true);
+    setupOnce();
+  }
+
   @AfterClass
   public static void afterAllTests() throws Exception {
     TEST_UTIL.shutdownMiniCluster();

http://git-wip-us.apache.org/repos/asf/hbase/blob/58549428/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java
index 10f2957..7cd7ed1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java
@@ -72,6 +72,7 @@ public class TestRegionServerNoMaster {
 
   @BeforeClass
   public static void before() throws Exception {
+    HTU.getConfiguration().setBoolean("hbase.assignment.usezk", true);
     HTU.startMiniCluster(NB_SERVERS);
     final byte[] tableName = Bytes.toBytes(TestRegionServerNoMaster.class.getSimpleName());