You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2014/08/26 17:15:20 UTC

[02/12] HBASE-11546 Backport ZK-less region assignment to 0.98 (Virag Kothari) [1/8]

http://git-wip-us.apache.org/repos/asf/hbase/blob/e6ffa86e/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java
index 8a9e9c7..ed1003d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java
@@ -21,27 +21,25 @@ package org.apache.hadoop.hbase.master.handler;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
 import java.util.Set;
 import java.util.concurrent.locks.Lock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.catalog.CatalogTracker;
 import org.apache.hadoop.hbase.catalog.MetaReader;
-import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.executor.EventHandler;
 import org.apache.hadoop.hbase.executor.EventType;
 import org.apache.hadoop.hbase.master.AssignmentManager;
 import org.apache.hadoop.hbase.master.DeadServer;
+import org.apache.hadoop.hbase.master.MasterFileSystem;
 import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.master.RegionState;
 import org.apache.hadoop.hbase.master.RegionState.State;
@@ -163,14 +161,14 @@ public class ServerShutdownHandler extends EventHandler {
       // completed (zk is updated after edits to hbase:meta have gone in).  See
       // {@link SplitTransaction}.  We'd also have to be figure another way for
       // doing the below hbase:meta daughters fixup.
-      NavigableMap<HRegionInfo, Result> hris = null;
+      Set<HRegionInfo> hris = null;
       while (!this.server.isStopped()) {
         try {
           this.server.getCatalogTracker().waitForMeta();
           // Skip getting user regions if the server is stopped.
           if (!this.server.isStopped()) {
             hris = MetaReader.getServerUserRegions(this.server.getCatalogTracker(),
-                this.serverName);
+              this.serverName).keySet();
           }
           break;
         } catch (InterruptedException e) {
@@ -196,9 +194,8 @@ public class ServerShutdownHandler extends EventHandler {
           LOG.info("Splitting logs for " + serverName + " before assignment.");
           if (distributedLogReplay) {
             LOG.info("Mark regions in recovery before assignment.");
-            Set<ServerName> serverNames = new HashSet<ServerName>();
-            serverNames.add(serverName);
-            this.services.getMasterFileSystem().prepareLogReplay(serverNames);
+            MasterFileSystem mfs = this.services.getMasterFileSystem();
+            mfs.prepareLogReplay(serverName, hris);
           } else {
             this.services.getMasterFileSystem().splitLog(serverName);
           }
@@ -224,10 +221,9 @@ public class ServerShutdownHandler extends EventHandler {
       toAssignRegions.addAll(regionsInTransition);
 
       // Iterate regions that were on this server and assign them
-      if (hris != null) {
+      if (hris != null && !hris.isEmpty()) {
         RegionStates regionStates = am.getRegionStates();
-        for (Map.Entry<HRegionInfo, Result> e: hris.entrySet()) {
-          HRegionInfo hri = e.getKey();
+        for (HRegionInfo hri: hris) {
           if (regionsInTransition.contains(hri)) {
             continue;
           }
@@ -235,7 +231,7 @@ public class ServerShutdownHandler extends EventHandler {
           Lock lock = am.acquireRegionLock(encodedName);
           try {
             RegionState rit = regionStates.getRegionTransitionState(hri);
-            if (processDeadRegion(hri, e.getValue(), am, server.getCatalogTracker())) {
+            if (processDeadRegion(hri, am, server.getCatalogTracker())) {
               ServerName addressFromAM = regionStates.getRegionServerOfRegion(hri);
               if (addressFromAM != null && !addressFromAM.equals(this.serverName)) {
                 // If this region is in transition on the dead server, it must be
@@ -261,7 +257,7 @@ public class ServerShutdownHandler extends EventHandler {
                 }
               } else if (regionStates.isRegionInState(
                   hri, State.SPLITTING_NEW, State.MERGING_NEW)) {
-                regionStates.regionOffline(hri);
+                regionStates.updateRegionState(hri, State.OFFLINE);
               }
               toAssignRegions.add(hri);
             } else if (rit != null) {
@@ -334,13 +330,12 @@ public class ServerShutdownHandler extends EventHandler {
    * Process a dead region from a dead RS. Checks if the region is disabled or
    * disabling or if the region has a partially completed split.
    * @param hri
-   * @param result
    * @param assignmentManager
    * @param catalogTracker
    * @return Returns true if specified region should be assigned, false if not.
    * @throws IOException
    */
-  public static boolean processDeadRegion(HRegionInfo hri, Result result,
+  public static boolean processDeadRegion(HRegionInfo hri,
       AssignmentManager assignmentManager, CatalogTracker catalogTracker)
   throws IOException {
     boolean tablePresent = assignmentManager.getZKTable().isTablePresent(hri.getTable());

http://git-wip-us.apache.org/repos/asf/hbase/blob/e6ffa86e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index f681a72..f904238 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -191,7 +191,11 @@ import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.Regio
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;
+import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionTransition;
+import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionTransition.TransitionCode;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
+import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionTransitionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionTransitionResponse;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
 import org.apache.hadoop.hbase.regionserver.HRegion.Operation;
 import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
@@ -211,6 +215,7 @@ import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.trace.SpanReceiverHost;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CompressionTest;
+import org.apache.hadoop.hbase.util.ConfigUtil;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSTableDescriptors;
 import org.apache.hadoop.hbase.util.FSUtils;
@@ -378,7 +383,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
   protected final int numRegionsToReport;
 
   // Stub to do region server status calls against the master.
-  private RegionServerStatusService.BlockingInterface rssStub;
+  private volatile RegionServerStatusService.BlockingInterface rssStub;
   // RPC client. Used to make the stub above that does region server status checking.
   RpcClient rpcClient;
 
@@ -500,6 +505,11 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
   // Table level lock manager for locking for region operations
   private TableLockManager tableLockManager;
 
+  private final boolean useZKForAssignment;
+
+  // Used for 11059
+  private ServerName serverName;
+
   /**
    * Nonce manager. Nonces are used to make operations like increment and append idempotent
    * in the case where client doesn't receive the response from a successful operation and
@@ -597,6 +607,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
     } catch (IllegalAccessException e) {
       throw new IllegalArgumentException(e);
     }
+    
     this.rpcServer = new RpcServer(this, name, getServices(),
       /*HBaseRPCErrorHandler.class, OnlineRegions.class},*/
       initialIsa, // BindAddress is IP we got for this server.
@@ -605,9 +616,11 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
 
     // Set our address.
     this.isa = this.rpcServer.getListenerAddress();
-
+    
     this.rpcServer.setErrorHandler(this);
     this.startcode = System.currentTimeMillis();
+    serverName = ServerName.valueOf(isa.getHostName(), isa.getPort(), startcode);
+    useZKForAssignment = ConfigUtil.useZKForAssignment(conf);
 
     // login the zookeeper client principal (if using security)
     ZKUtil.loginClient(this.conf, "hbase.zookeeper.client.keytab.file",
@@ -1055,8 +1068,9 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
   @VisibleForTesting
   protected void tryRegionServerReport(long reportStartTime, long reportEndTime)
   throws IOException {
-    if (this.rssStub == null) {
-      // the current server is stopping.
+    RegionServerStatusService.BlockingInterface rss = rssStub;
+    if (rss == null) {
+      // the current server could be stopping.
       return;
     }
     ClusterStatusProtos.ServerLoad sl = buildServerLoad(reportStartTime, reportEndTime);
@@ -1066,18 +1080,19 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
         this.serverNameFromMasterPOV.getVersionedBytes());
       request.setServer(ProtobufUtil.toServerName(sn));
       request.setLoad(sl);
-      this.rssStub.regionServerReport(null, request.build());
+      rss.regionServerReport(null, request.build());
     } catch (ServiceException se) {
       IOException ioe = ProtobufUtil.getRemoteException(se);
       if (ioe instanceof YouAreDeadException) {
         // This will be caught and handled as a fatal error in run()
         throw ioe;
       }
+      if (rssStub == rss) {
+        rssStub = null;
+      }
       // Couldn't connect to the master, get location from zk and reconnect
       // Method blocks until new master is found or we are stopped
-      Pair<ServerName, RegionServerStatusService.BlockingInterface> p =
-        createRegionServerStatusStub();
-      this.rssStub = p.getSecond();
+      createRegionServerStatusStub();
     }
   }
 
@@ -1799,10 +1814,16 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
     if (r.getRegionInfo().isMetaRegion()) {
       MetaRegionTracker.setMetaLocation(getZooKeeper(),
           this.serverNameFromMasterPOV);
-    } else {
+    } else if (useZKForAssignment) {
       MetaEditor.updateRegionLocation(ct, r.getRegionInfo(),
         this.serverNameFromMasterPOV, openSeqNum);
     }
+    if (!useZKForAssignment
+        && !reportRegionTransition(TransitionCode.OPENED, openSeqNum, r.getRegionInfo())) {
+      throw new IOException("Failed to report opened region to master: "
+          + r.getRegionNameAsString());
+    }
+
     LOG.info("Finished post open deploy task for " + r.getRegionNameAsString());
 
   }
@@ -1941,6 +1962,49 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
     return replicationSinkHandler;
   }
 
+  @Override
+  public boolean reportRegionTransition(TransitionCode code, HRegionInfo... hris) {
+    return reportRegionTransition(code, HConstants.NO_SEQNUM, hris);
+  }
+
+  @Override
+  public boolean reportRegionTransition(TransitionCode code, long openSeqNum, HRegionInfo... hris) {
+    ReportRegionTransitionRequest.Builder builder = ReportRegionTransitionRequest.newBuilder();
+    builder.setServer(ProtobufUtil.toServerName(serverName));
+    RegionTransition.Builder transition = builder.addTransitionBuilder();
+    transition.setTransitionCode(code);
+    if (code == TransitionCode.OPENED && openSeqNum >= 0) {
+      transition.setOpenSeqNum(openSeqNum);
+    }
+    for (HRegionInfo hri : hris) {
+      transition.addRegionInfo(HRegionInfo.convert(hri));
+    }
+    ReportRegionTransitionRequest request = builder.build();
+    while (keepLooping()) {
+      RegionServerStatusService.BlockingInterface rss = rssStub;
+      try {
+        if (rss == null) {
+          createRegionServerStatusStub();
+          continue;
+        }
+        ReportRegionTransitionResponse response = rss.reportRegionTransition(null, request);
+        if (response.hasErrorMessage()) {
+          LOG.info("Failed to transition " + hris[0] + " to " + code + ": "
+              + response.getErrorMessage());
+          return false;
+        }
+        return true;
+      } catch (ServiceException se) {
+        IOException ioe = ProtobufUtil.getRemoteException(se);
+        LOG.info("Failed to report region transition, will retry", ioe);
+        if (rssStub == rss) {
+          rssStub = null;
+        }
+      }
+    }
+    return false;
+  }
+
   /**
    * Get the current master from ZooKeeper and open the RPC connection to it.
    *
@@ -1949,8 +2013,11 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
    *
    * @return master + port, or null if server has been stopped
    */
-  private Pair<ServerName, RegionServerStatusService.BlockingInterface>
+  private synchronized ServerName
   createRegionServerStatusStub() {
+    if (rssStub != null) {
+      return masterAddressTracker.getMasterAddress();
+    }
     ServerName sn = null;
     long previousLogTime = 0;
     RegionServerStatusService.BlockingInterface master = null;
@@ -1997,7 +2064,8 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
         }
       }
     }
-    return new Pair<ServerName, RegionServerStatusService.BlockingInterface>(sn, intf);
+    rssStub = intf;
+    return sn;
   }
 
   /**
@@ -2016,12 +2084,9 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
    * @throws IOException
    */
   private RegionServerStartupResponse reportForDuty() throws IOException {
+    ServerName masterServerName = createRegionServerStatusStub();
+    if (masterServerName == null) return null;
     RegionServerStartupResponse result = null;
-    Pair<ServerName, RegionServerStatusService.BlockingInterface> p =
-      createRegionServerStatusStub();
-    this.rssStub = p.getSecond();
-    ServerName masterServerName = p.getFirst();
-    if (masterServerName == null) return result;
     try {
       this.requestCount.set(0);
       LOG.info("reportForDuty to master=" + masterServerName + " with port=" + this.isa.getPort() +

http://git-wip-us.apache.org/repos/asf/hbase/blob/e6ffa86e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransaction.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransaction.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransaction.java
index 287ffa1..6593560 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransaction.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransaction.java
@@ -46,8 +46,10 @@ import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.executor.EventType;
+import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionTransition.TransitionCode;
 import org.apache.hadoop.hbase.regionserver.SplitTransaction.LoggingProgressable;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ConfigUtil;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
@@ -99,6 +101,7 @@ public class RegionMergeTransaction {
   private int znodeVersion = -1;
   // We only merge adjacent regions if forcible is false
   private final boolean forcible;
+  private boolean useZKForAssignment;
 
   /**
    * Types to add to the transaction journal. Each enum is a step in the merge
@@ -239,6 +242,8 @@ public class RegionMergeTransaction {
    */
   public HRegion execute(final Server server,
       final RegionServerServices services) throws IOException {
+    useZKForAssignment = server == null ? true :
+      ConfigUtil.useZKForAssignment(server.getConfiguration());
     if (rsCoprocessorHost == null) {
       rsCoprocessorHost = server != null ? ((HRegionServer) server).getCoprocessorHost() : null;
     }
@@ -315,7 +320,7 @@ public class RegionMergeTransaction {
     // will determine whether the region is merged or not in case of failures.
     // If it is successful, master will roll-forward, if not, master will
     // rollback
-    if (!testing) {
+    if (!testing && useZKForAssignment) {
       if (metaEntries.isEmpty()) {
         MetaEditor.mergeRegions(server.getCatalogTracker(), mergedRegion.getRegionInfo(), region_a
             .getRegionInfo(), region_b.getRegionInfo(), server.getServerName());
@@ -323,6 +328,14 @@ public class RegionMergeTransaction {
         mergeRegionsAndPutMetaEntries(server.getCatalogTracker(), mergedRegion.getRegionInfo(),
           region_a.getRegionInfo(), region_b.getRegionInfo(), server.getServerName(), metaEntries);
       }
+    } else if (services != null && !useZKForAssignment) {
+      if (!services.reportRegionTransition(TransitionCode.MERGE_PONR,
+          mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo())) {
+        // Passed PONR, let SSH clean it up
+        throw new IOException("Failed to notify master that merge passed PONR: "
+          + region_a.getRegionInfo().getRegionNameAsString() + " and "
+          + region_b.getRegionInfo().getRegionNameAsString());
+      }
     }
     return mergedRegion;
   }
@@ -352,6 +365,7 @@ public class RegionMergeTransaction {
     addLocation(putOfMerged, serverName, 1);
   }
 
+  @SuppressWarnings("deprecation")
   public Put addLocation(final Put p, final ServerName sn, long openSeqNum) {
     p.add(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER, Bytes
         .toBytes(sn.getHostAndPort()));
@@ -365,7 +379,7 @@ public class RegionMergeTransaction {
       boolean testing) throws IOException {
     // Set ephemeral MERGING znode up in zk. Mocked servers sometimes don't
     // have zookeeper so don't do zk stuff if server or zookeeper is null
-    if (server != null && server.getZooKeeper() != null) {
+    if (useZKAndZKIsSet(server)) {
       try {
         createNodeMerging(server.getZooKeeper(), this.mergedRegionInfo,
           server.getServerName(), region_a.getRegionInfo(), region_b.getRegionInfo());
@@ -373,9 +387,16 @@ public class RegionMergeTransaction {
         throw new IOException("Failed creating PENDING_MERGE znode on "
             + this.mergedRegionInfo.getRegionNameAsString(), e);
       }
+    } else if (services != null && !useZKForAssignment) {
+      if (!services.reportRegionTransition(TransitionCode.READY_TO_MERGE,
+          mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo())) {
+        throw new IOException("Failed to get ok from master to merge "
+          + region_a.getRegionInfo().getRegionNameAsString() + " and "
+          + region_b.getRegionInfo().getRegionNameAsString());
+      }
     }
     this.journal.add(JournalEntry.SET_MERGING_IN_ZK);
-    if (server != null && server.getZooKeeper() != null) {
+    if (useZKAndZKIsSet(server)) {
       // After creating the merge node, wait for master to transition it
       // from PENDING_MERGE to MERGING so that we can move on. We want master
       // knows about it and won't transition any region which is merging.
@@ -399,7 +420,7 @@ public class RegionMergeTransaction {
     // clean this up.
     mergeStoreFiles(hstoreFilesOfRegionA, hstoreFilesOfRegionB);
 
-    if (server != null && server.getZooKeeper() != null) {
+    if (server != null && useZKAndZKIsSet(server)) {
       try {
         // Do one more check on the merging znode (before it is too late) in case
         // any merging region is moved somehow. If so, the znode transition will fail.
@@ -548,7 +569,13 @@ public class RegionMergeTransaction {
 
     if (services != null) {
       try {
-        services.postOpenDeployTasks(merged, server.getCatalogTracker());
+        if (useZKForAssignment) {
+          services.postOpenDeployTasks(merged, server.getCatalogTracker());
+        } else if (!services.reportRegionTransition(TransitionCode.MERGED,
+            mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo())) {
+          throw new IOException("Failed to report merged region to master: "
+            + mergedRegionInfo.getShortNameToLog());
+        }
         services.addToOnlineRegions(merged);
       } catch (KeeperException ke) {
         throw new IOException(ke);
@@ -567,43 +594,41 @@ public class RegionMergeTransaction {
    */
   void transitionZKNode(final Server server, final RegionServerServices services,
       HRegion mergedRegion) throws IOException {
-    if (server == null || server.getZooKeeper() == null) {
-      return;
-    }
-
-    // Tell master about merge by updating zk. If we fail, abort.
-    try {
-      this.znodeVersion = transitionMergingNode(server.getZooKeeper(),
-        this.mergedRegionInfo, region_a.getRegionInfo(),
-        region_b.getRegionInfo(), server.getServerName(), this.znodeVersion,
-        RS_ZK_REGION_MERGING, RS_ZK_REGION_MERGED);
-
-      long startTime = EnvironmentEdgeManager.currentTimeMillis();
-      int spins = 0;
-      // Now wait for the master to process the merge. We know it's done
-      // when the znode is deleted. The reason we keep tickling the znode is
-      // that it's possible for the master to miss an event.
-      do {
-        if (spins % 10 == 0) {
-          LOG.debug("Still waiting on the master to process the merge for "
-              + this.mergedRegionInfo.getEncodedName() + ", waited "
-              + (EnvironmentEdgeManager.currentTimeMillis() - startTime) + "ms");
-        }
-        Thread.sleep(100);
-        // When this returns -1 it means the znode doesn't exist
+    if (useZKAndZKIsSet(server)) {
+      // Tell master about merge by updating zk. If we fail, abort.
+      try {
         this.znodeVersion = transitionMergingNode(server.getZooKeeper(),
           this.mergedRegionInfo, region_a.getRegionInfo(),
           region_b.getRegionInfo(), server.getServerName(), this.znodeVersion,
-          RS_ZK_REGION_MERGED, RS_ZK_REGION_MERGED);
-        spins++;
-      } while (this.znodeVersion != -1 && !server.isStopped()
-          && !services.isStopping());
-    } catch (Exception e) {
-      if (e instanceof InterruptedException) {
-        Thread.currentThread().interrupt();
+          RS_ZK_REGION_MERGING, RS_ZK_REGION_MERGED);
+  
+        long startTime = EnvironmentEdgeManager.currentTimeMillis();
+        int spins = 0;
+        // Now wait for the master to process the merge. We know it's done
+        // when the znode is deleted. The reason we keep tickling the znode is
+        // that it's possible for the master to miss an event.
+        do {
+          if (spins % 10 == 0) {
+            LOG.debug("Still waiting on the master to process the merge for "
+                + this.mergedRegionInfo.getEncodedName() + ", waited "
+                + (EnvironmentEdgeManager.currentTimeMillis() - startTime) + "ms");
+          }
+          Thread.sleep(100);
+          // When this returns -1 it means the znode doesn't exist
+          this.znodeVersion = transitionMergingNode(server.getZooKeeper(),
+            this.mergedRegionInfo, region_a.getRegionInfo(),
+            region_b.getRegionInfo(), server.getServerName(), this.znodeVersion,
+            RS_ZK_REGION_MERGED, RS_ZK_REGION_MERGED);
+          spins++;
+        } while (this.znodeVersion != -1 && !server.isStopped()
+            && !services.isStopping());
+      } catch (Exception e) {
+        if (e instanceof InterruptedException) {
+          Thread.currentThread().interrupt();
+        }
+        throw new IOException("Failed telling master about merge "
+            + mergedRegionInfo.getEncodedName(), e);
       }
-      throw new IOException("Failed telling master about merge "
-          + mergedRegionInfo.getEncodedName(), e);
     }
 
     if (rsCoprocessorHost != null) {
@@ -745,8 +770,12 @@ public class RegionMergeTransaction {
       switch (je) {
 
         case SET_MERGING_IN_ZK:
-          if (server != null && server.getZooKeeper() != null) {
+          if (useZKAndZKIsSet(server)) {
             cleanZK(server, this.mergedRegionInfo);
+          } else if (services != null && !useZKForAssignment
+              && !services.reportRegionTransition(TransitionCode.MERGE_REVERTED,
+                  mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo())) {
+            return false;
           }
           break;
 
@@ -822,6 +851,10 @@ public class RegionMergeTransaction {
     return this.mergesdir;
   }
 
+  private boolean useZKAndZKIsSet(final Server server) {
+    return server != null && useZKForAssignment && server.getZooKeeper() != null;
+  }
+
   private static void cleanZK(final Server server, final HRegionInfo hri) {
     try {
       // Only delete if its in expected state; could have been hijacked.

http://git-wip-us.apache.org/repos/asf/hbase/blob/e6ffa86e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
index 2be10cd..de109f3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.executor.ExecutorService;
 import org.apache.hadoop.hbase.ipc.PriorityFunction;
 import org.apache.hadoop.hbase.ipc.RpcServerInterface;
 import org.apache.hadoop.hbase.master.TableLockManager;
+import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionTransition.TransitionCode;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.zookeeper.KeeperException;
 
@@ -81,6 +82,16 @@ public interface RegionServerServices
   throws KeeperException, IOException;
 
   /**
+   * Notify master that a handler requests to change a region state
+   */
+  boolean reportRegionTransition(TransitionCode code, long openSeqNum, HRegionInfo... hris);
+
+  /**
+   * Notify master that a handler requests to change a region state
+   */
+  boolean reportRegionTransition(TransitionCode code, HRegionInfo... hris);
+
+  /**
    * Returns a reference to the region server's RPC server
    */
   RpcServerInterface getRpcServer();

http://git-wip-us.apache.org/repos/asf/hbase/blob/e6ffa86e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java
index 3394ccd..50e716c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java
@@ -49,8 +49,10 @@ import org.apache.hadoop.hbase.catalog.MetaEditor;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.executor.EventType;
+import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionTransition.TransitionCode;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
+import org.apache.hadoop.hbase.util.ConfigUtil;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.HasThread;
 import org.apache.hadoop.hbase.util.PairOfSameType;
@@ -98,6 +100,7 @@ public class SplitTransaction {
   private HRegionInfo hri_b;
   private long fileSplitTimeout = 30000;
   private int znodeVersion = -1;
+  boolean useZKForAssignment;
 
   /*
    * Row to split around
@@ -281,15 +284,22 @@ public class SplitTransaction {
     // will determine whether the region is split or not in case of failures.
     // If it is successful, master will roll-forward, if not, master will rollback
     // and assign the parent region.
-    if (!testing) {
+    if (!testing && useZKForAssignment) {
       if (metaEntries == null || metaEntries.isEmpty()) {
-        MetaEditor.splitRegion(server.getCatalogTracker(),
-            parent.getRegionInfo(), daughterRegions.getFirst().getRegionInfo(),
-            daughterRegions.getSecond().getRegionInfo(), server.getServerName());
+        MetaEditor.splitRegion(server.getCatalogTracker(), parent.getRegionInfo(), daughterRegions
+            .getFirst().getRegionInfo(), daughterRegions.getSecond().getRegionInfo(), server
+            .getServerName());
       } else {
-        offlineParentInMetaAndputMetaEntries(server.getCatalogTracker(),
-          parent.getRegionInfo(), daughterRegions.getFirst().getRegionInfo(), daughterRegions
-              .getSecond().getRegionInfo(), server.getServerName(), metaEntries);
+        offlineParentInMetaAndputMetaEntries(server.getCatalogTracker(), parent.getRegionInfo(),
+          daughterRegions.getFirst().getRegionInfo(), daughterRegions.getSecond().getRegionInfo(),
+          server.getServerName(), metaEntries);
+      }
+    } else if (services != null && !useZKForAssignment) {
+      if (!services.reportRegionTransition(TransitionCode.SPLIT_PONR, parent.getRegionInfo(),
+        hri_a, hri_b)) {
+        // Passed PONR, let SSH clean it up
+        throw new IOException("Failed to notify master that split passed PONR: "
+            + parent.getRegionInfo().getRegionNameAsString());
       }
     }
     return daughterRegions;
@@ -299,7 +309,7 @@ public class SplitTransaction {
       final RegionServerServices services, boolean testing) throws IOException {
     // Set ephemeral SPLITTING znode up in zk.  Mocked servers sometimes don't
     // have zookeeper so don't do zk stuff if server or zookeeper is null
-    if (server != null && server.getZooKeeper() != null) {
+    if (server != null && server.getZooKeeper() != null && useZKForAssignment) {
       try {
         createNodeSplitting(server.getZooKeeper(),
           parent.getRegionInfo(), server.getServerName(), hri_a, hri_b);
@@ -307,9 +317,15 @@ public class SplitTransaction {
         throw new IOException("Failed creating PENDING_SPLIT znode on " +
           this.parent.getRegionNameAsString(), e);
       }
+    } else if (services != null && !useZKForAssignment) {
+      if (!services.reportRegionTransition(TransitionCode.READY_TO_SPLIT,
+        parent.getRegionInfo(), hri_a, hri_b)) {
+        throw new IOException("Failed to get ok from master to split "
+            + parent.getRegionNameAsString());
+      }
     }
     this.journal.add(JournalEntry.SET_SPLITTING_IN_ZK);
-    if (server != null && server.getZooKeeper() != null) {
+    if (server != null && server.getZooKeeper() != null && useZKForAssignment) {
       // After creating the split node, wait for master to transition it
       // from PENDING_SPLIT to SPLITTING so that we can move on. We want master
       // knows about it and won't transition any region which is splitting.
@@ -411,11 +427,19 @@ public class SplitTransaction {
       }
       if (services != null) {
         try {
-          // add 2nd daughter first (see HBASE-4335)
-          services.postOpenDeployTasks(b, server.getCatalogTracker());
+          if (useZKForAssignment) {
+            // add 2nd daughter first (see HBASE-4335)
+            services.postOpenDeployTasks(b, server.getCatalogTracker());
+          } else if (!services.reportRegionTransition(TransitionCode.SPLIT,
+              parent.getRegionInfo(), hri_a, hri_b)) {
+            throw new IOException("Failed to report split region to master: "
+              + parent.getRegionInfo().getShortNameToLog());
+          }
           // Should add it to OnlineRegions
           services.addToOnlineRegions(b);
-          services.postOpenDeployTasks(a, server.getCatalogTracker());
+          if (useZKForAssignment) {
+            services.postOpenDeployTasks(a, server.getCatalogTracker());
+          }
           services.addToOnlineRegions(a);
         } catch (KeeperException ke) {
           throw new IOException(ke);
@@ -471,10 +495,7 @@ public class SplitTransaction {
       }
     }
 
-    // Coprocessor callback
-    if (this.parent.getCoprocessorHost() != null) {
-      this.parent.getCoprocessorHost().postSplit(a,b);
-    }
+    
 
     // Leaving here, the splitdir with its dross will be in place but since the
     // split was successful, just leave it; it'll be cleaned when parent is
@@ -565,6 +586,8 @@ public class SplitTransaction {
   public PairOfSameType<HRegion> execute(final Server server,
       final RegionServerServices services)
   throws IOException {
+    useZKForAssignment =
+        server == null ? true : ConfigUtil.useZKForAssignment(server.getConfiguration());
     PairOfSameType<HRegion> regions = createDaughters(server, services);
     if (this.parent.getCoprocessorHost() != null) {
       this.parent.getCoprocessorHost().preSplitAfterPONR();
@@ -576,7 +599,13 @@ public class SplitTransaction {
       final RegionServerServices services, PairOfSameType<HRegion> regions)
       throws IOException {
     openDaughters(server, services, regions.getFirst(), regions.getSecond());
-    transitionZKNode(server, services, regions.getFirst(), regions.getSecond());
+    if (server != null && server.getZooKeeper() != null && useZKForAssignment) {
+      transitionZKNode(server, services, regions.getFirst(), regions.getSecond());
+    }
+    // Coprocessor callback
+    if (this.parent.getCoprocessorHost() != null) {
+      this.parent.getCoprocessorHost().postSplit(regions.getFirst(), regions.getSecond());
+    }
     return regions;
   }
 
@@ -801,8 +830,13 @@ public class SplitTransaction {
       switch(je) {
 
       case SET_SPLITTING_IN_ZK:
-        if (server != null && server.getZooKeeper() != null) {
+        if (server != null && server.getZooKeeper() != null && useZKForAssignment) {
           cleanZK(server, this.parent.getRegionInfo());
+        } else if (services != null
+            && !useZKForAssignment
+            && !services.reportRegionTransition(TransitionCode.SPLIT_REVERTED,
+              parent.getRegionInfo(), hri_a, hri_b)) {
+          return false;
         }
         break;
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/e6ffa86e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java
index 54c77c7..80b8d66 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java
@@ -28,8 +28,10 @@ import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.executor.EventHandler;
 import org.apache.hadoop.hbase.executor.EventType;
+import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionTransition.TransitionCode;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.util.ConfigUtil;
 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
 import org.apache.zookeeper.KeeperException;
 
@@ -62,6 +64,7 @@ public class CloseRegionHandler extends EventHandler {
   // CLOSING.
   private final boolean zk;
   private ServerName destination;
+  private final boolean useZKForAssignment;
 
   // This is executed after receiving an CLOSE RPC from the master.
   public CloseRegionHandler(final Server server,
@@ -100,9 +103,8 @@ public class CloseRegionHandler extends EventHandler {
     this(server, rsServices, regionInfo, abort, zk, versionOfClosingNode, eventType, null);
   }
 
-    protected CloseRegionHandler(final Server server,
-      final RegionServerServices rsServices, HRegionInfo regionInfo,
-      boolean abort, final boolean zk, final int versionOfClosingNode,
+  protected CloseRegionHandler(final Server server, final RegionServerServices rsServices,
+      HRegionInfo regionInfo, boolean abort, final boolean zk, final int versionOfClosingNode,
       EventType eventType, ServerName destination) {
     super(server, eventType);
     this.server = server;
@@ -112,6 +114,7 @@ public class CloseRegionHandler extends EventHandler {
     this.zk = zk;
     this.expectedVersion = versionOfClosingNode;
     this.destination = destination;
+    useZKForAssignment = ConfigUtil.useZKForAssignment(server.getConfiguration());
   }
 
   public HRegionInfo getRegionInfo() {
@@ -137,7 +140,8 @@ public class CloseRegionHandler extends EventHandler {
 
       // Close the region
       try {
-        if (zk && !ZKAssign.checkClosingState(server.getZooKeeper(), regionInfo, expectedVersion)){
+        if (zk && useZKForAssignment
+            && !ZKAssign.checkClosingState(server.getZooKeeper(), regionInfo, expectedVersion)) {
           // bad znode state
           return; // We're node deleting the znode, but it's not ours...
         }
@@ -162,16 +166,18 @@ public class CloseRegionHandler extends EventHandler {
       }
 
       this.rsServices.removeFromOnlineRegions(region, destination);
-
-      if (this.zk) {
-        if (setClosedState(this.expectedVersion, region)) {
-          LOG.debug("Set closed state in zk for " + name + " on " + this.server.getServerName());
-        } else {
-          LOG.debug("Set closed state in zk UNSUCCESSFUL for " + name + " on " +
-            this.server.getServerName());
+      if (!useZKForAssignment) {
+        rsServices.reportRegionTransition(TransitionCode.CLOSED, regionInfo);
+      } else {
+        if (this.zk) {
+          if (setClosedState(this.expectedVersion, region)) {
+            LOG.debug("Set closed state in zk for " + name + " on " + this.server.getServerName());
+          } else {
+            LOG.debug("Set closed state in zk UNSUCCESSFUL for " + name + " on "
+                + this.server.getServerName());
+          }
         }
       }
-
       // Done!  Region is closed on this RS
       LOG.debug("Closed " + region.getRegionNameAsString());
     } finally {

http://git-wip-us.apache.org/repos/asf/hbase/blob/e6ffa86e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java
index fb689f9..ec78aa4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java
@@ -30,10 +30,12 @@ import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.executor.EventHandler;
 import org.apache.hadoop.hbase.executor.EventType;
 import org.apache.hadoop.hbase.master.AssignmentManager;
+import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionTransition.TransitionCode;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.RegionServerAccounting;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
+import org.apache.hadoop.hbase.util.ConfigUtil;
 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.zookeeper.KeeperException;
@@ -61,6 +63,8 @@ public class OpenRegionHandler extends EventHandler {
   //version of the offline node that was set by the master
   private volatile int versionOfOfflineNode = -1;
 
+  private final boolean useZKForAssignment;
+
   public OpenRegionHandler(final Server server,
       final RegionServerServices rsServices, HRegionInfo regionInfo,
       HTableDescriptor htd) {
@@ -88,6 +92,7 @@ public class OpenRegionHandler extends EventHandler {
     assignmentTimeout = this.server.getConfiguration().
       getInt(AssignmentManager.ASSIGNMENT_TIMEOUT,
         AssignmentManager.DEFAULT_ASSIGNMENT_TIMEOUT_DEFAULT);
+    useZKForAssignment = ConfigUtil.useZKForAssignment(server.getConfiguration());
   }
 
   public HRegionInfo getRegionInfo() {
@@ -128,7 +133,8 @@ public class OpenRegionHandler extends EventHandler {
         return;
       }
 
-      if (!transitionZookeeperOfflineToOpening(encodedName, versionOfOfflineNode)) {
+      if (useZKForAssignment
+          && !transitionZookeeperOfflineToOpening(encodedName, versionOfOfflineNode)) {
         LOG.warn("Region was hijacked? Opening cancelled for encodedName=" + encodedName);
         // This is a desperate attempt: the znode is unlikely to be ours. But we can't do more.
         return;
@@ -142,7 +148,7 @@ public class OpenRegionHandler extends EventHandler {
       }
 
       boolean failed = true;
-      if (tickleOpening("post_region_open")) {
+      if (!useZKForAssignment || tickleOpening("post_region_open")) {
         if (updateMeta(region)) {
           failed = false;
         }
@@ -153,7 +159,7 @@ public class OpenRegionHandler extends EventHandler {
       }
 
 
-      if (!isRegionStillOpening() || !transitionToOpened(region)) {
+      if (!isRegionStillOpening() || (useZKForAssignment && !transitionToOpened(region))) {
         // If we fail to transition to opened, it's because of one of two cases:
         //    (a) we lost our ZK lease
         // OR (b) someone else opened the region before us
@@ -218,10 +224,16 @@ public class OpenRegionHandler extends EventHandler {
           cleanupFailedOpen(region);
         }
       } finally {
+        if (!useZKForAssignment) {
+          rsServices.reportRegionTransition(TransitionCode.FAILED_OPEN, regionInfo);
+        } else {
         // Even if cleanupFailed open fails we need to do this transition
         // See HBASE-7698
         tryTransitionFromOpeningToFailedOpen(regionInfo);
+        }
       }
+    } else if (!useZKForAssignment) {
+      rsServices.reportRegionTransition(TransitionCode.FAILED_OPEN, regionInfo);
     } else {
       // If still transition to OPENING is not done, we need to transition znode
       // to FAILED_OPEN
@@ -262,7 +274,9 @@ public class OpenRegionHandler extends EventHandler {
       if (elapsed > period) {
         // Only tickle OPENING if postOpenDeployTasks is taking some time.
         lastUpdate = now;
-        tickleOpening = tickleOpening("post_open_deploy");
+        if (useZKForAssignment) {
+          tickleOpening = tickleOpening("post_open_deploy");
+        }
       }
       synchronized (signaller) {
         try {
@@ -467,12 +481,20 @@ public class OpenRegionHandler extends EventHandler {
           this.server.getConfiguration(),
           this.rsServices,
         new CancelableProgressable() {
-          public boolean progress() {
-            // We may lose the znode ownership during the open.  Currently its
-            // too hard interrupting ongoing region open.  Just let it complete
-            // and check we still have the znode after region open.
-            return tickleOpening("open_region_progress");
-          }
+              public boolean progress() {
+                if (useZKForAssignment) {
+                  // We may lose the znode ownership during the open. Currently its
+                  // too hard interrupting ongoing region open. Just let it complete
+                  // and check we still have the znode after region open.
+                  // if tickle failed, we need to cancel opening region.
+                  return tickleOpening("open_region_progress");
+                }
+                if (!isRegionStillOpening()) {
+                  LOG.warn("Open region aborted since it isn't opening any more");
+                  return false;
+                }
+                return true;
+              }
         });
     } catch (Throwable t) {
       // We failed open. Our caller will see the 'null' return value

http://git-wip-us.apache.org/repos/asf/hbase/blob/e6ffa86e/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ConfigUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ConfigUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ConfigUtil.java
new file mode 100644
index 0000000..2183ee9
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ConfigUtil.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Some configuration related utilities
+ */
+@InterfaceAudience.Private
+public class ConfigUtil {
+
+  public static boolean useZKForAssignment(Configuration conf) {
+    // To change the default, please also update ZooKeeperWatcher.java
+    return conf.getBoolean("hbase.assignment.usezk", true);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/e6ffa86e/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
index 50ad030..e399ad5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.ipc.RpcServerInterface;
 import org.apache.hadoop.hbase.master.TableLockManager;
 import org.apache.hadoop.hbase.master.TableLockManager.NullTableLockManager;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
+import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionTransition.TransitionCode;
 import org.apache.hadoop.hbase.regionserver.CompactionRequestor;
 import org.apache.hadoop.hbase.regionserver.FlushRequester;
 import org.apache.hadoop.hbase.regionserver.HRegion;
@@ -153,7 +154,7 @@ class MockRegionServerServices implements RegionServerServices {
 
   @Override
   public Configuration getConfiguration() {
-    return null;
+    return zkw == null ? null : zkw.getConfiguration();
   }
 
   @Override
@@ -227,4 +228,16 @@ class MockRegionServerServices implements RegionServerServices {
     // TODO Auto-generated method stub
     return null;
   }
+
+  @Override
+  public boolean reportRegionTransition(TransitionCode code, long openSeqNum,
+      HRegionInfo... hris) {
+    return false;
+  }
+
+  @Override
+  public boolean reportRegionTransition(TransitionCode code,
+      HRegionInfo... hris) {
+    return false;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e6ffa86e/hbase-server/src/test/java/org/apache/hadoop/hbase/TestDrainingServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestDrainingServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestDrainingServer.java
index 9a51729..1a74c31 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestDrainingServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestDrainingServer.java
@@ -79,6 +79,7 @@ public class TestDrainingServer {
   
   @BeforeClass
   public static void beforeClass() throws Exception {
+    TEST_UTIL.getConfiguration().setBoolean("hbase.assignment.usezk", true);
     TEST_UTIL.startMiniZKCluster();
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/e6ffa86e/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java
index 46b02a0..46a4d79 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.master.RegionStates;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ConfigUtil;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@@ -491,7 +492,9 @@ public class TestScannersFromClientSide {
       RegionStates states = master.getAssignmentManager().getRegionStates();
       states.regionOffline(hri);
       states.updateRegionState(hri, State.OPENING);
-      ZKAssign.createNodeOffline(zkw, hri, loc.getServerName());
+      if (ConfigUtil.useZKForAssignment(TEST_UTIL.getConfiguration())) {
+        ZKAssign.createNodeOffline(zkw, hri, loc.getServerName());
+      }
       ProtobufUtil.openRegion(rs, rs.getServerName(), hri);
       startTime = EnvironmentEdgeManager.currentTimeMillis();
       while (true) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/e6ffa86e/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
index 45b9885..db7a75b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
@@ -85,6 +85,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
+import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionTransition.TransitionCode;
 import org.apache.hadoop.hbase.regionserver.CompactionRequestor;
 import org.apache.hadoop.hbase.regionserver.FlushRequester;
 import org.apache.hadoop.hbase.regionserver.HRegion;
@@ -560,4 +561,15 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices {
   public ServerNonceManager getNonceManager() {
     return null;
   }
+
+  @Override
+  public boolean reportRegionTransition(TransitionCode code, HRegionInfo... hris) {
+    return false;
+  }
+
+  @Override
+  public boolean reportRegionTransition(TransitionCode code, long openSeqNum,
+      HRegionInfo... hris) {
+    return false;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e6ffa86e/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java
index 1ba8eac..09ac83d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java
@@ -120,6 +120,7 @@ public class TestAssignmentManager {
 
   @BeforeClass
   public static void beforeClass() throws Exception {
+    HTU.getConfiguration().setBoolean("hbase.assignment.usezk", true);
     HTU.startMiniZKCluster();
   }
 
@@ -139,6 +140,7 @@ public class TestAssignmentManager {
     this.server = Mockito.mock(Server.class);
     Mockito.when(server.getServerName()).thenReturn(ServerName.valueOf("master,1,1"));
     Mockito.when(server.getConfiguration()).thenReturn(HTU.getConfiguration());
+    Mockito.when(server.getCatalogTracker()).thenReturn(null);
     this.watcher =
       new ZooKeeperWatcher(HTU.getConfiguration(), "mockedServer", this.server, true);
     Mockito.when(server.getZooKeeper()).thenReturn(this.watcher);

http://git-wip-us.apache.org/repos/asf/hbase/blob/e6ffa86e/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java
index 8ffc9fa..eb15240 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java
@@ -58,6 +58,7 @@ import org.apache.hadoop.hbase.master.RegionState.State;
 import org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ConfigUtil;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
@@ -75,11 +76,10 @@ import org.junit.experimental.categories.Category;
 public class TestAssignmentManagerOnCluster {
   private final static byte[] FAMILY = Bytes.toBytes("FAMILY");
   private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
-  private final static Configuration conf = TEST_UTIL.getConfiguration();
+  final static Configuration conf = TEST_UTIL.getConfiguration();
   private static HBaseAdmin admin;
 
-  @BeforeClass
-  public static void setUpBeforeClass() throws Exception {
+  static void setupOnce() throws Exception {
     // Using the our load balancer to control region plans
     conf.setClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS,
       MyLoadBalancer.class, LoadBalancer.class);
@@ -92,6 +92,13 @@ public class TestAssignmentManagerOnCluster {
     admin = TEST_UTIL.getHBaseAdmin();
   }
 
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    // Use ZK for region assignment
+    conf.setBoolean("hbase.assignment.usezk", true);
+    setupOnce();
+  }
+
   @AfterClass
   public static void tearDownAfterClass() throws Exception {
     TEST_UTIL.shutdownMiniCluster();
@@ -554,16 +561,18 @@ public class TestAssignmentManagerOnCluster {
       }
       am.regionOffline(hri);
       ZooKeeperWatcher zkw = TEST_UTIL.getHBaseCluster().getMaster().getZooKeeper();
-      am.getRegionStates().updateRegionState(hri, State.OFFLINE);
-      ZKAssign.createNodeOffline(zkw, hri, destServerName);
-      ZKAssign.transitionNodeOpening(zkw, hri, destServerName);
-
-      // Wait till the event is processed and the region is in transition
-      long timeoutTime = System.currentTimeMillis() + 20000;
-      while (!am.getRegionStates().isRegionInTransition(hri)) {
-        assertTrue("Failed to process ZK opening event in time",
-          System.currentTimeMillis() < timeoutTime);
-        Thread.sleep(100);
+      am.getRegionStates().updateRegionState(hri, State.PENDING_OPEN, destServerName);
+      if (ConfigUtil.useZKForAssignment(conf)) {
+        ZKAssign.createNodeOffline(zkw, hri, destServerName);
+        ZKAssign.transitionNodeOpening(zkw, hri, destServerName);
+  
+        // Wait till the event is processed and the region is in transition
+        long timeoutTime = System.currentTimeMillis() + 20000;
+        while (!am.getRegionStates().isRegionInTransition(hri)) {
+          assertTrue("Failed to process ZK opening event in time",
+            System.currentTimeMillis() < timeoutTime);
+          Thread.sleep(100);
+        }
       }
 
       am.getZKTable().setDisablingTable(table);
@@ -697,8 +706,6 @@ public class TestAssignmentManagerOnCluster {
       ServerName serverName = master.getAssignmentManager().
         getRegionStates().getRegionServerOfRegion(hri);
       TEST_UTIL.assertRegionOnlyOnServer(hri, serverName, 200);
-      assertFalse("Region should be assigned on a new region server",
-        oldServerName.equals(serverName));
     } finally {
       MyRegionObserver.postOpenEnabled.set(false);
       TEST_UTIL.deleteTable(Bytes.toBytes(table));

http://git-wip-us.apache.org/repos/asf/hbase/blob/e6ffa86e/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java
index 07be41d..7a786f3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.ClusterStatus;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.LargeTests;
@@ -47,6 +48,8 @@ import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.RegionTransition;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.catalog.MetaEditor;
+import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.executor.EventType;
 import org.apache.hadoop.hbase.master.RegionState.State;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@@ -159,6 +162,7 @@ public class TestMasterFailover {
 
     // Create config to use for this cluster
     Configuration conf = HBaseConfiguration.create();
+    conf.setBoolean("hbase.assignment.usezk", true);
 
     // Start the cluster
     HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(conf);
@@ -494,6 +498,7 @@ public class TestMasterFailover {
     // Create and start the cluster
     HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
     Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setBoolean("hbase.assignment.usezk", true);
 
     conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 1);
     conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, 2);
@@ -932,6 +937,7 @@ public class TestMasterFailover {
     HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
     Configuration conf = TEST_UTIL.getConfiguration();
     conf.setInt("hbase.master.info.port", -1);
+    conf.setBoolean("hbase.assignment.usezk", true);
 
     TEST_UTIL.startMiniCluster(NUM_MASTERS, NUM_RS);
     MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
@@ -984,6 +990,7 @@ public class TestMasterFailover {
 
     // Create config to use for this cluster
     Configuration conf = HBaseConfiguration.create();
+    conf.setBoolean("hbase.assignment.usezk", true);
 
     // Start the cluster
     final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(conf);
@@ -1152,5 +1159,101 @@ public class TestMasterFailover {
     // Stop the cluster
     TEST_UTIL.shutdownMiniCluster();
   }
+
+  /**
+   * Test region in pending_open/close when master failover
+   */
+  @Test (timeout=180000)
+  public void testPendingOpenOrCloseWhenMasterFailover() throws Exception {
+    final int NUM_MASTERS = 1;
+    final int NUM_RS = 1;
+
+    // Create config to use for this cluster
+    Configuration conf = HBaseConfiguration.create();
+    conf.setBoolean("hbase.assignment.usezk", false);
+
+    // Start the cluster
+    HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(conf);
+    TEST_UTIL.startMiniCluster(NUM_MASTERS, NUM_RS);
+    MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
+    log("Cluster started");
+
+    // get all the master threads
+    List<MasterThread> masterThreads = cluster.getMasterThreads();
+    assertEquals(1, masterThreads.size());
+
+    // only one master thread, let's wait for it to be initialized
+    assertTrue(cluster.waitForActiveAndReadyMaster());
+    HMaster master = masterThreads.get(0).getMaster();
+    assertTrue(master.isActiveMaster());
+    assertTrue(master.isInitialized());
+
+    // Create a table with a region online
+    HTable onlineTable = TEST_UTIL.createTable("onlineTable", "family");
+
+    // Create a table in META, so it has a region offline
+    HTableDescriptor offlineTable = new HTableDescriptor(
+      TableName.valueOf(Bytes.toBytes("offlineTable")));
+    offlineTable.addFamily(new HColumnDescriptor(Bytes.toBytes("family")));
+
+    FileSystem filesystem = FileSystem.get(conf);
+    Path rootdir = FSUtils.getRootDir(conf);
+    FSTableDescriptors fstd = new FSTableDescriptors(filesystem, rootdir);
+    fstd.createTableDescriptor(offlineTable);
+
+    HRegionInfo hriOffline = new HRegionInfo(offlineTable.getTableName(), null, null);
+    createRegion(hriOffline, rootdir, conf, offlineTable);
+    MetaEditor.addRegionToMeta(master.getCatalogTracker(), hriOffline);
+
+    log("Regions in hbase:meta and namespace have been created");
+
+    // at this point we only expect 3 regions to be assigned out
+    // (catalogs and namespace, + 1 online region)
+    assertEquals(3, cluster.countServedRegions());
+    HRegionInfo hriOnline = onlineTable.getRegionLocation("").getRegionInfo();
+
+    RegionStates regionStates = master.getAssignmentManager().getRegionStates();
+    RegionStateStore stateStore = master.getAssignmentManager().getRegionStateStore();
+
+    // Put the online region in pending_close. It is actually already opened.
+    // This is to simulate that the region close RPC is not sent out before failover
+    RegionState oldState = regionStates.getRegionState(hriOnline);
+    RegionState newState = new RegionState(hriOnline, State.PENDING_CLOSE, oldState.getServerName());
+    stateStore.updateRegionState(HConstants.NO_SEQNUM, newState, oldState);
+
+    // Put the offline region in pending_open. It is actually not opened yet.
+    // This is to simulate that the region open RPC is not sent out before failover
+    oldState = new RegionState(hriOffline, State.OFFLINE);
+    newState = new RegionState(hriOffline, State.PENDING_OPEN, newState.getServerName());
+    stateStore.updateRegionState(HConstants.NO_SEQNUM, newState, oldState);
+
+    // Stop the master
+    log("Aborting master");
+    cluster.abortMaster(0);
+    cluster.waitOnMaster(0);
+    log("Master has aborted");
+
+    // Start up a new master
+    log("Starting up a new master");
+    master = cluster.startMaster().getMaster();
+    log("Waiting for master to be ready");
+    cluster.waitForActiveAndReadyMaster();
+    log("Master is ready");
+
+    // Wait till no region in transition any more
+    master.getAssignmentManager().waitUntilNoRegionsInTransition(60000);
+
+    // Get new region states since master restarted
+    regionStates = master.getAssignmentManager().getRegionStates();
+
+    // Both pending_open (RPC sent/not yet) regions should be online
+    assertTrue(regionStates.isRegionOnline(hriOffline));
+    assertTrue(regionStates.isRegionOnline(hriOnline));
+
+    log("Done with verification, shutting down cluster");
+
+    // Done, shutdown the cluster
+    TEST_UTIL.shutdownMiniCluster();
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/e6ffa86e/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestOpenedRegionHandler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestOpenedRegionHandler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestOpenedRegionHandler.java
index f68c015..da1df20 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestOpenedRegionHandler.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestOpenedRegionHandler.java
@@ -65,6 +65,7 @@ public class TestOpenedRegionHandler {
   @Before
   public void setUp() throws Exception {
     conf = HBaseConfiguration.create();
+    conf.setBoolean("hbase.assignment.usezk", true);
     TEST_UTIL = HBaseTestingUtility.createLocalHTU(conf);
   }
 
@@ -80,6 +81,7 @@ public class TestOpenedRegionHandler {
     // Start the cluster
     log("Starting cluster");
     conf = HBaseConfiguration.create();
+    conf.setBoolean("hbase.assignment.usezk", true);
     resetConf = conf;
     conf.setInt("hbase.master.assignment.timeoutmonitor.period", 2000);
     conf.setInt("hbase.master.assignment.timeoutmonitor.timeout", 5000);

http://git-wip-us.apache.org/repos/asf/hbase/blob/e6ffa86e/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRestartCluster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRestartCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRestartCluster.java
index 2cda4e9..372c495 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRestartCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRestartCluster.java
@@ -58,6 +58,7 @@ public class TestRestartCluster {
 
   @Test (timeout=300000) public void testRestartClusterAfterKill()
   throws Exception {
+    UTIL.getConfiguration().setBoolean("hbase.assignment.usezk", true);
     UTIL.startMiniZKCluster();
     ZooKeeperWatcher zooKeeper =
       new ZooKeeperWatcher(UTIL.getConfiguration(), "cluster1", null, true);

http://git-wip-us.apache.org/repos/asf/hbase/blob/e6ffa86e/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestZKBasedOpenCloseRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestZKBasedOpenCloseRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestZKBasedOpenCloseRegion.java
index a923d49..d3bad28 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestZKBasedOpenCloseRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestZKBasedOpenCloseRegion.java
@@ -71,6 +71,7 @@ public class TestZKBasedOpenCloseRegion {
 
   @BeforeClass public static void beforeAllTests() throws Exception {
     Configuration c = TEST_UTIL.getConfiguration();
+    c.setBoolean("hbase.assignment.usezk", true);
     c.setBoolean("dfs.support.append", true);
     c.setInt("hbase.regionserver.info.port", 0);
     TEST_UTIL.startMiniCluster(2);

http://git-wip-us.apache.org/repos/asf/hbase/blob/e6ffa86e/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestZKLessAMOnCluster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestZKLessAMOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestZKLessAMOnCluster.java
new file mode 100644
index 0000000..83d33c5
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestZKLessAMOnCluster.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import org.apache.hadoop.hbase.MediumTests;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+/**
+ * This tests AssignmentManager with a testing cluster.
+ */
+@Category(MediumTests.class)
+public class TestZKLessAMOnCluster extends TestAssignmentManagerOnCluster {
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    // Don't use ZK for region assignment
+    conf.setBoolean("hbase.assignment.usezk", false);
+    setupOnce();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TestAssignmentManagerOnCluster.tearDownAfterClass();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/e6ffa86e/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java
index 25b6a59..f2788fd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ConfigUtil;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.PairOfSameType;
 import org.apache.hadoop.hbase.util.StoppableImplementation;
@@ -105,6 +106,7 @@ public class TestEndToEndSplitTransaction {
         .getRegionName();
     HRegion region = server.getRegion(regionName);
     SplitTransaction split = new SplitTransaction(region, splitRow);
+    split.useZKForAssignment = ConfigUtil.useZKForAssignment(conf);
     split.prepare();
 
     // 1. phase I
@@ -138,8 +140,9 @@ public class TestEndToEndSplitTransaction {
     assertTrue(test(con, tableName, lastRow, server));
 
     // 4. phase III
-    split.transitionZKNode(server, server, regions.getFirst(),
-        regions.getSecond());
+    if (split.useZKForAssignment) {
+      split.transitionZKNode(server, server, regions.getFirst(), regions.getSecond());
+    }
     assertTrue(test(con, tableName, firstRow, server));
     assertTrue(test(con, tableName, lastRow, server));
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e6ffa86e/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java
index aac801e..cc0a123 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java
@@ -31,7 +31,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
@@ -39,6 +38,8 @@ import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.LargeTests;
 import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.UnknownRegionException;
 import org.apache.hadoop.hbase.catalog.MetaReader;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
@@ -47,14 +48,13 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.exceptions.MergeRegionException;
-import org.apache.hadoop.hbase.UnknownRegionException;
 import org.apache.hadoop.hbase.master.AssignmentManager;
 import org.apache.hadoop.hbase.master.HMaster;
-import org.apache.hadoop.hbase.master.RegionStates;
 import org.apache.hadoop.hbase.master.RegionState.State;
+import org.apache.hadoop.hbase.master.RegionStates;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.PairOfSameType;
 import org.junit.AfterClass;
@@ -86,13 +86,12 @@ public class TestRegionMergeTransactionOnCluster {
 
   private static int waitTime = 60 * 1000;
 
-  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
 
   private static HMaster master;
   private static HBaseAdmin admin;
 
-  @BeforeClass
-  public static void beforeAllTests() throws Exception {
+  static void setupOnce() throws Exception {
     // Start a cluster
     TEST_UTIL.startMiniCluster(NB_SERVERS);
     MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
@@ -101,6 +100,13 @@ public class TestRegionMergeTransactionOnCluster {
     admin = TEST_UTIL.getHBaseAdmin();
   }
 
+  @BeforeClass
+  public static void beforeAllTests() throws Exception {
+    // Use ZK for region assignment
+    TEST_UTIL.getConfiguration().setBoolean("hbase.assignment.usezk", true);
+    setupOnce();
+  }
+
   @AfterClass
   public static void afterAllTests() throws Exception {
     TEST_UTIL.shutdownMiniCluster();

http://git-wip-us.apache.org/repos/asf/hbase/blob/e6ffa86e/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java
index 8ffc719..799a277 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java
@@ -65,6 +65,7 @@ public class TestRegionServerNoMaster {
 
   @BeforeClass
   public static void before() throws Exception {
+    HTU.getConfiguration().setBoolean("hbase.assignment.usezk", true);
     HTU.startMiniCluster(NB_SERVERS);
     final byte[] tableName = Bytes.toBytes(TestRegionServerNoMaster.class.getSimpleName());
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/e6ffa86e/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
index af15d78..b547011 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
@@ -114,19 +114,24 @@ public class TestSplitTransactionOnCluster {
   private static volatile boolean secondSplit = false;
   private static volatile boolean callRollBack = false;
   private static volatile boolean firstSplitCompleted = false;
+  private static boolean useZKForAssignment = true;
 
-  private static final HBaseTestingUtility TESTING_UTIL =
+  static final HBaseTestingUtility TESTING_UTIL =
     new HBaseTestingUtility();
 
-  @BeforeClass public static void before() throws Exception {
+  static void setupOnce() throws Exception {
     TESTING_UTIL.getConfiguration().setInt("hbase.balancer.period", 60000);
-    // Needed because some tests have splits happening on RS that are killed
-    // We don't want to wait 3min for the master to figure it out
-    TESTING_UTIL.getConfiguration().setInt(
-        "hbase.master.assignment.timeoutmonitor.timeout", 4000);
+    useZKForAssignment =
+        TESTING_UTIL.getConfiguration().getBoolean("hbase.assignment.usezk", false);
     TESTING_UTIL.startMiniCluster(NB_SERVERS);
   }
 
+  @BeforeClass public static void before() throws Exception {
+    // Use ZK for region assignment
+    TESTING_UTIL.getConfiguration().setBoolean("hbase.assignment.usezk", true);
+    setupOnce();
+  }
+
   @AfterClass public static void after() throws Exception {
     TESTING_UTIL.shutdownMiniCluster();
   }
@@ -173,6 +178,12 @@ public class TestSplitTransactionOnCluster {
   public void testShouldFailSplitIfZNodeDoesNotExistDueToPrevRollBack() throws Exception {
     final TableName tableName =
         TableName.valueOf("testShouldFailSplitIfZNodeDoesNotExistDueToPrevRollBack");
+
+    if (!useZKForAssignment) {
+      // This test doesn't apply if not using ZK for assignment
+      return;
+    }
+
     try {
       // Create table then get the single region for our new table.
       HTable t = createTableAndWait(tableName.getName(), Bytes.toBytes("cf"));
@@ -349,42 +360,46 @@ public class TestSplitTransactionOnCluster {
       AssignmentManager.TEST_SKIP_SPLIT_HANDLING = true;
       // Now try splitting and it should work.
       split(hri, server, regionCount);
-      // Get daughters
-      List<HRegion> daughters = checkAndGetDaughters(tableName);
-      // Assert the ephemeral node is up in zk.
+        // Assert the ephemeral node is up in zk.
       String path = ZKAssign.getNodeName(TESTING_UTIL.getZooKeeperWatcher(),
         hri.getEncodedName());
       RegionTransition rt = null;
       Stat stats = null;
-      // Wait till the znode moved to SPLIT
-      for (int i=0; i<100; i++) {
-        stats = TESTING_UTIL.getZooKeeperWatcher().getRecoverableZooKeeper().exists(path, false);
-        rt = RegionTransition.parseFrom(ZKAssign.getData(TESTING_UTIL.getZooKeeperWatcher(),
-          hri.getEncodedName()));
-        if (rt.getEventType().equals(EventType.RS_ZK_REGION_SPLIT)) break;
-        Thread.sleep(100);
+      List<HRegion> daughters = null;
+      if (useZKForAssignment) {
+        daughters = checkAndGetDaughters(tableName);
+
+        // Wait till the znode moved to SPLIT
+        for (int i=0; i<100; i++) {
+          stats = TESTING_UTIL.getZooKeeperWatcher().getRecoverableZooKeeper().exists(path, false);
+          rt = RegionTransition.parseFrom(ZKAssign.getData(TESTING_UTIL.getZooKeeperWatcher(),
+            hri.getEncodedName()));
+          if (rt.getEventType().equals(EventType.RS_ZK_REGION_SPLIT)) break;
+          Thread.sleep(100);
+        }
+        LOG.info("EPHEMERAL NODE BEFORE SERVER ABORT, path=" + path + ", stats=" + stats);
+        assertTrue(rt != null && rt.getEventType().equals(EventType.RS_ZK_REGION_SPLIT));
+        // Now crash the server
+        cluster.abortRegionServer(tableRegionIndex);
       }
-      LOG.info("EPHEMERAL NODE BEFORE SERVER ABORT, path=" + path + ", stats=" + stats);
-      assertTrue(rt != null && rt.getEventType().equals(EventType.RS_ZK_REGION_SPLIT));
-      // Now crash the server
-      cluster.abortRegionServer(tableRegionIndex);
       waitUntilRegionServerDead();
-      awaitDaughters(tableName, daughters.size());
+      awaitDaughters(tableName, 2);
+      if (useZKForAssignment) {
+        regions = cluster.getRegions(tableName);
+        for (HRegion r: regions) {
+          assertTrue(daughters.contains(r));
+        }
 
-      // Assert daughters are online.
-      regions = cluster.getRegions(tableName);
-      for (HRegion r: regions) {
-        assertTrue(daughters.contains(r));
-      }
-      // Finally assert that the ephemeral SPLIT znode was cleaned up.
-      for (int i=0; i<100; i++) {
-        // wait a bit (10s max) for the node to disappear
-        stats = TESTING_UTIL.getZooKeeperWatcher().getRecoverableZooKeeper().exists(path, false);
-        if (stats == null) break;
-        Thread.sleep(100);
+        // Finally assert that the ephemeral SPLIT znode was cleaned up.
+        for (int i=0; i<100; i++) {
+          // wait a bit (10s max) for the node to disappear
+          stats = TESTING_UTIL.getZooKeeperWatcher().getRecoverableZooKeeper().exists(path, false);
+          if (stats == null) break;
+          Thread.sleep(100);
+        }
+        LOG.info("EPHEMERAL NODE AFTER SERVER ABORT, path=" + path + ", stats=" + stats);
+        assertTrue(stats == null);
       }
-      LOG.info("EPHEMERAL NODE AFTER SERVER ABORT, path=" + path + ", stats=" + stats);
-      assertTrue(stats == null);
     } finally {
       // Set this flag back.
       AssignmentManager.TEST_SKIP_SPLIT_HANDLING = false;
@@ -407,6 +422,8 @@ public class TestSplitTransactionOnCluster {
 
     int tableRegionIndex = ensureTableRegionNotOnSameServerAsMeta(admin, hri);
 
+    RegionStates regionStates = cluster.getMaster().getAssignmentManager().getRegionStates();
+
     // Turn off balancer so it doesn't cut in and mess up our placements.
     this.admin.setBalancerRunning(false, true);
     // Turn off the meta scanner so it don't remove parent on us.
@@ -421,8 +438,12 @@ public class TestSplitTransactionOnCluster {
       // Insert into zk a blocking znode, a znode of same name as region
       // so it gets in way of our splitting.
       ServerName fakedServer = ServerName.valueOf("any.old.server", 1234, -1);
-      ZKAssign.createNodeClosing(TESTING_UTIL.getZooKeeperWatcher(),
-        hri, fakedServer);
+      if (useZKForAssignment) {
+        ZKAssign.createNodeClosing(TESTING_UTIL.getZooKeeperWatcher(),
+          hri, fakedServer);
+      } else {
+        regionStates.updateRegionState(hri, RegionState.State.CLOSING);
+      }
       // Now try splitting.... should fail.  And each should successfully
       // rollback.
       this.admin.split(hri.getRegionNameAsString());
@@ -433,9 +454,13 @@ public class TestSplitTransactionOnCluster {
         Thread.sleep(100);
         assertEquals(regionCount, ProtobufUtil.getOnlineRegions(server).size());
       }
-      // Now clear the zknode
-      ZKAssign.deleteClosingNode(TESTING_UTIL.getZooKeeperWatcher(),
-        hri, fakedServer);
+      if (useZKForAssignment) {
+        // Now clear the zknode
+        ZKAssign.deleteClosingNode(TESTING_UTIL.getZooKeeperWatcher(),
+          hri, fakedServer);
+      } else {
+        regionStates.regionOnline(hri, server.getServerName());
+      }
       // Now try splitting and it should work.
       split(hri, server, regionCount);
       // Get daughters
@@ -627,6 +652,11 @@ public class TestSplitTransactionOnCluster {
       KeeperException, DeserializationException, ServiceException {
     final byte[] tableName = Bytes.toBytes("testMasterRestartWhenSplittingIsPartial");
 
+    if (!useZKForAssignment) {
+      // This test doesn't apply if not using ZK for assignment
+      return;
+    }
+
     // Create table then get the single region for our new table.
     HTable t = createTableAndWait(tableName, HConstants.CATALOG_FAMILY);
     List<HRegion> regions = cluster.getRegions(tableName);
@@ -786,7 +816,7 @@ public class TestSplitTransactionOnCluster {
    * @throws InterruptedException
    * @throws KeeperException
    */
-  @Test
+  @Test(timeout = 60000)
   public void testSplitBeforeSettingSplittingInZK() throws Exception,
       InterruptedException, KeeperException {
     testSplitBeforeSettingSplittingInZKInternals();
@@ -857,7 +887,7 @@ public class TestSplitTransactionOnCluster {
    * If a table has regions that have no store files in a region, they should split successfully
    * into two regions with no store files.
    */
-  @Test
+  @Test(timeout = 60000)
   public void testSplitRegionWithNoStoreFiles()
       throws Exception {
     final TableName tableName =
@@ -1120,13 +1150,21 @@ public class TestSplitTransactionOnCluster {
   private void split(final HRegionInfo hri, final HRegionServer server, final int regionCount)
       throws IOException, InterruptedException {
     this.admin.split(hri.getRegionNameAsString());
-    for (int i = 0; ProtobufUtil.getOnlineRegions(server).size() <= regionCount && i < 300; i++) {
-      LOG.debug("Waiting on region to split");
-      Thread.sleep(100);
-    }
+    try {
+      for (int i = 0; ProtobufUtil.getOnlineRegions(server).size() <= regionCount && i < 300; i++) {
+        LOG.debug("Waiting on region to split");
+        Thread.sleep(100);
+      }
 
-    assertFalse("Waited too long for split",
+      assertFalse("Waited too long for split",
         ProtobufUtil.getOnlineRegions(server).size() <= regionCount);
+    } catch (RegionServerStoppedException e) {
+      if (useZKForAssignment) {
+        // If not using ZK for assignment, the exception may be expected.
+        LOG.error(e);
+        throw e;
+      }
+    }
   }
 
   /**