Posted to commits@hbase.apache.org by st...@apache.org on 2017/05/23 18:37:46 UTC

[09/28] hbase git commit: HBASE-18087 Fix unit tests in TestTableFavoredNodes

http://git-wip-us.apache.org/repos/asf/hbase/blob/4143c017/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
deleted file mode 100644
index 7791ea7..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
+++ /dev/null
@@ -1,695 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver;
-
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executors;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.RejectedExecutionHandler;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.conf.ConfigurationManager;
-import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
-import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
-import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
-import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
-import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.util.StealJobQueue;
-import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.util.StringUtils;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-
-/**
- * Compact region on request and then run split if appropriate
- */
-@InterfaceAudience.Private
-public class CompactSplitThread implements CompactionRequestor, PropagatingConfigurationObserver {
-  private static final Log LOG = LogFactory.getLog(CompactSplitThread.class);
-
-  // Configuration key for the large compaction threads.
-  public final static String LARGE_COMPACTION_THREADS =
-      "hbase.regionserver.thread.compaction.large";
-  public final static int LARGE_COMPACTION_THREADS_DEFAULT = 1;
-  
-  // Configuration key for the small compaction threads.
-  public final static String SMALL_COMPACTION_THREADS =
-      "hbase.regionserver.thread.compaction.small";
-  public final static int SMALL_COMPACTION_THREADS_DEFAULT = 1;
-  
-  // Configuration key for split threads
-  public final static String SPLIT_THREADS = "hbase.regionserver.thread.split";
-  public final static int SPLIT_THREADS_DEFAULT = 1;
-
-  public static final String REGION_SERVER_REGION_SPLIT_LIMIT =
-      "hbase.regionserver.regionSplitLimit";
-  public static final int DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT = 1000;
-
-  private final HRegionServer server;
-  private final Configuration conf;
-
-  private final ThreadPoolExecutor longCompactions;
-  private final ThreadPoolExecutor shortCompactions;
-  private final ThreadPoolExecutor splits;
-
-  private volatile ThroughputController compactionThroughputController;
-
-  /**
-   * Splitting should not take place if the total number of regions exceeds this.
-   * This is not a hard limit on the number of regions, but a guideline to
-   * stop splitting once the number of online regions is greater than this.
-   */
-  private int regionSplitLimit;
-
-  /** @param server */
-  CompactSplitThread(HRegionServer server) {
-    super();
-    this.server = server;
-    this.conf = server.getConfiguration();
-    this.regionSplitLimit = conf.getInt(REGION_SERVER_REGION_SPLIT_LIMIT,
-        DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT);
-
-    int largeThreads = Math.max(1, conf.getInt(
-        LARGE_COMPACTION_THREADS, LARGE_COMPACTION_THREADS_DEFAULT));
-    int smallThreads = conf.getInt(
-        SMALL_COMPACTION_THREADS, SMALL_COMPACTION_THREADS_DEFAULT);
-
-    int splitThreads = conf.getInt(SPLIT_THREADS, SPLIT_THREADS_DEFAULT);
-
-    // sanity check: both compaction pools must be configured with at least one thread
-    Preconditions.checkArgument(largeThreads > 0 && smallThreads > 0);
-
-    final String n = Thread.currentThread().getName();
-
-    StealJobQueue<Runnable> stealJobQueue = new StealJobQueue<>();
-    this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads,
-        60, TimeUnit.SECONDS, stealJobQueue,
-        new ThreadFactory() {
-          @Override
-          public Thread newThread(Runnable r) {
-            String name = n + "-longCompactions-" + System.currentTimeMillis();
-            return new Thread(r, name);
-          }
-      });
-    this.longCompactions.setRejectedExecutionHandler(new Rejection());
-    this.longCompactions.prestartAllCoreThreads();
-    this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads,
-        60, TimeUnit.SECONDS, stealJobQueue.getStealFromQueue(),
-        new ThreadFactory() {
-          @Override
-          public Thread newThread(Runnable r) {
-            String name = n + "-shortCompactions-" + System.currentTimeMillis();
-            return new Thread(r, name);
-          }
-      });
-    this.shortCompactions
-        .setRejectedExecutionHandler(new Rejection());
-    this.splits = (ThreadPoolExecutor)
-        Executors.newFixedThreadPool(splitThreads,
-            new ThreadFactory() {
-          @Override
-          public Thread newThread(Runnable r) {
-            String name = n + "-splits-" + System.currentTimeMillis();
-            return new Thread(r, name);
-          }
-      });
-
-    // compaction throughput controller
-    this.compactionThroughputController =
-        CompactionThroughputControllerFactory.create(server, conf);
-  }
-
-  @Override
-  public String toString() {
-    return "compaction_queue=("
-        + longCompactions.getQueue().size() + ":"
-        + shortCompactions.getQueue().size() + ")"
-        + ", split_queue=" + splits.getQueue().size();
-  }
-  
-  public String dumpQueue() {
-    StringBuffer queueLists = new StringBuffer();
-    queueLists.append("Compaction/Split Queue dump:\n");
-    queueLists.append("  LargeCompation Queue:\n");
-    BlockingQueue<Runnable> lq = longCompactions.getQueue();
-    Iterator<Runnable> it = lq.iterator();
-    while (it.hasNext()) {
-      queueLists.append("    " + it.next().toString());
-      queueLists.append("\n");
-    }
-
-    if (shortCompactions != null) {
-      queueLists.append("\n");
-      queueLists.append("  SmallCompation Queue:\n");
-      lq = shortCompactions.getQueue();
-      it = lq.iterator();
-      while (it.hasNext()) {
-        queueLists.append("    " + it.next().toString());
-        queueLists.append("\n");
-      }
-    }
-
-    queueLists.append("\n");
-    queueLists.append("  Split Queue:\n");
-    lq = splits.getQueue();
-    it = lq.iterator();
-    while (it.hasNext()) {
-      queueLists.append("    " + it.next().toString());
-      queueLists.append("\n");
-    }
-
-    return queueLists.toString();
-  }
-
-  public synchronized boolean requestSplit(final Region r) {
-    // don't split regions that are blocking
-    if (shouldSplitRegion() && ((HRegion)r).getCompactPriority() >= Store.PRIORITY_USER) {
-      byte[] midKey = ((HRegion)r).checkSplit();
-      if (midKey != null) {
-        requestSplit(r, midKey);
-        return true;
-      }
-    }
-    return false;
-  }
-
-  public synchronized void requestSplit(final Region r, byte[] midKey) {
-    requestSplit(r, midKey, null);
-  }
-
-  /*
-   * The User parameter allows the split thread to assume the correct user identity
-   */
-  public synchronized void requestSplit(final Region r, byte[] midKey, User user) {
-    if (midKey == null) {
-      LOG.debug("Region " + r.getRegionInfo().getRegionNameAsString() +
-        " not splittable because midkey=null");
-      if (((HRegion)r).shouldForceSplit()) {
-        ((HRegion)r).clearSplit();
-      }
-      return;
-    }
-    try {
-      this.splits.execute(new SplitRequest(r, midKey, this.server, user));
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Split requested for " + r + ".  " + this);
-      }
-    } catch (RejectedExecutionException ree) {
-      LOG.info("Could not execute split for " + r, ree);
-    }
-  }
-
-  @Override
-  public synchronized List<CompactionRequest> requestCompaction(final Region r, final String why)
-      throws IOException {
-    return requestCompaction(r, why, null);
-  }
-
-  @Override
-  public synchronized List<CompactionRequest> requestCompaction(final Region r, final String why,
-      List<Pair<CompactionRequest, Store>> requests) throws IOException {
-    return requestCompaction(r, why, Store.NO_PRIORITY, requests, null);
-  }
-
-  @Override
-  public synchronized CompactionRequest requestCompaction(final Region r, final Store s,
-      final String why, CompactionRequest request) throws IOException {
-    return requestCompaction(r, s, why, Store.NO_PRIORITY, request, null);
-  }
-
-  @Override
-  public synchronized List<CompactionRequest> requestCompaction(final Region r, final String why,
-      int p, List<Pair<CompactionRequest, Store>> requests, User user) throws IOException {
-    return requestCompactionInternal(r, why, p, requests, true, user);
-  }
-
-  private List<CompactionRequest> requestCompactionInternal(final Region r, final String why,
-      int p, List<Pair<CompactionRequest, Store>> requests, boolean selectNow, User user)
-          throws IOException {
-    // not a special compaction request, so make our own list
-    List<CompactionRequest> ret = null;
-    if (requests == null) {
-      ret = selectNow ? new ArrayList<>(r.getStores().size()) : null;
-      for (Store s : r.getStores()) {
-        CompactionRequest cr = requestCompactionInternal(r, s, why, p, null, selectNow, user);
-        if (selectNow) ret.add(cr);
-      }
-    } else {
-      Preconditions.checkArgument(selectNow); // only system requests have selectNow == false
-      ret = new ArrayList<>(requests.size());
-      for (Pair<CompactionRequest, Store> pair : requests) {
-        ret.add(requestCompaction(r, pair.getSecond(), why, p, pair.getFirst(), user));
-      }
-    }
-    return ret;
-  }
-
-  public CompactionRequest requestCompaction(final Region r, final Store s,
-      final String why, int priority, CompactionRequest request, User user) throws IOException {
-    return requestCompactionInternal(r, s, why, priority, request, true, user);
-  }
-
-  public synchronized void requestSystemCompaction(
-      final Region r, final String why) throws IOException {
-    requestCompactionInternal(r, why, Store.NO_PRIORITY, null, false, null);
-  }
-
-  public void requestSystemCompaction(
-      final Region r, final Store s, final String why) throws IOException {
-    requestCompactionInternal(r, s, why, Store.NO_PRIORITY, null, false, null);
-  }
-
-  /**
-   * @param r region store belongs to
-   * @param s Store to request compaction on
-   * @param why Why compaction requested -- used in debug messages
-   * @param priority override the default priority (NO_PRIORITY == decide)
-   * @param request custom compaction request. Can be <tt>null</tt> in which case a simple
-   *          compaction will be used.
-   */
-  private synchronized CompactionRequest requestCompactionInternal(final Region r, final Store s,
-      final String why, int priority, CompactionRequest request, boolean selectNow, User user)
-          throws IOException {
-    if (this.server.isStopped()
-        || (r.getTableDesc() != null && !r.getTableDesc().isCompactionEnabled())) {
-      return null;
-    }
-
-    CompactionContext compaction = null;
-    if (selectNow) {
-      compaction = selectCompaction(r, s, priority, request, user);
-      if (compaction == null) return null; // message logged inside
-    }
-
-    final RegionServerSpaceQuotaManager spaceQuotaManager =
-      this.server.getRegionServerSpaceQuotaManager();
-    if (spaceQuotaManager != null && spaceQuotaManager.areCompactionsDisabled(
-        r.getTableDesc().getTableName())) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Ignoring compaction request for " + r + " as an active space quota violation "
-            + " policy disallows compactions.");
-      }
-      return null;
-    }
-
-    // We assume that most compactions are small. So, put system compactions into small
-    // pool; we will do selection there, and move to large pool if necessary.
-    ThreadPoolExecutor pool = (selectNow && s.throttleCompaction(compaction.getRequest().getSize()))
-      ? longCompactions : shortCompactions;
-    pool.execute(new CompactionRunner(s, r, compaction, pool, user));
-    if (LOG.isDebugEnabled()) {
-      String type = (pool == shortCompactions) ? "Small " : "Large ";
-      LOG.debug(type + "Compaction requested: " + (selectNow ? compaction.toString() : "system")
-          + (why != null && !why.isEmpty() ? "; Because: " + why : "") + "; " + this);
-    }
-    return selectNow ? compaction.getRequest() : null;
-  }
-
-  private CompactionContext selectCompaction(final Region r, final Store s,
-      int priority, CompactionRequest request, User user) throws IOException {
-    CompactionContext compaction = s.requestCompaction(priority, request, user);
-    if (compaction == null) {
-      if(LOG.isDebugEnabled() && r.getRegionInfo() != null) {
-        LOG.debug("Not compacting " + r.getRegionInfo().getRegionNameAsString() +
-            " because compaction request was cancelled");
-      }
-      return null;
-    }
-    assert compaction.hasSelection();
-    if (priority != Store.NO_PRIORITY) {
-      compaction.getRequest().setPriority(priority);
-    }
-    return compaction;
-  }
-
-  /**
-   * Only interrupt once it's done with a run through the work loop.
-   */
-  void interruptIfNecessary() {
-    splits.shutdown();
-    longCompactions.shutdown();
-    shortCompactions.shutdown();
-  }
-
-  private void waitFor(ThreadPoolExecutor t, String name) {
-    boolean done = false;
-    while (!done) {
-      try {
-        done = t.awaitTermination(60, TimeUnit.SECONDS);
-        LOG.info("Waiting for " + name + " to finish...");
-        if (!done) {
-          t.shutdownNow();
-        }
-      } catch (InterruptedException ie) {
-        LOG.warn("Interrupted waiting for " + name + " to finish...");
-      }
-    }
-  }
-
-  void join() {
-    waitFor(splits, "Split Thread");
-    waitFor(longCompactions, "Large Compaction Thread");
-    waitFor(shortCompactions, "Small Compaction Thread");
-  }
-
-  /**
-   * Returns the combined current size of the queues of regions waiting to
-   * be compacted.
-   *
-   * @return The current total size of the compaction queues.
-   */
-  public int getCompactionQueueSize() {
-    return longCompactions.getQueue().size() + shortCompactions.getQueue().size();
-  }
-
-  public int getLargeCompactionQueueSize() {
-    return longCompactions.getQueue().size();
-  }
-
-
-  public int getSmallCompactionQueueSize() {
-    return shortCompactions.getQueue().size();
-  }
-
-  public int getSplitQueueSize() {
-    return splits.getQueue().size();
-  }
-
-  private boolean shouldSplitRegion() {
-    if (server.getNumberOfOnlineRegions() > 0.9 * regionSplitLimit) {
-      LOG.warn("Total number of regions is approaching the upper limit " + regionSplitLimit + ". "
-          + "Please consider taking a look at http://hbase.apache.org/book.html#ops.regionmgt");
-    }
-    return (regionSplitLimit > server.getNumberOfOnlineRegions());
-  }
-
-  /**
-   * @return the regionSplitLimit
-   */
-  public int getRegionSplitLimit() {
-    return this.regionSplitLimit;
-  }
-
-  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="EQ_COMPARETO_USE_OBJECT_EQUALS",
-      justification="Contrived use of compareTo")
-  private class CompactionRunner implements Runnable, Comparable<CompactionRunner> {
-    private final Store store;
-    private final HRegion region;
-    private CompactionContext compaction;
-    private int queuedPriority;
-    private ThreadPoolExecutor parent;
-    private User user;
-    private long time;
-
-    public CompactionRunner(Store store, Region region,
-        CompactionContext compaction, ThreadPoolExecutor parent, User user) {
-      super();
-      this.store = store;
-      this.region = (HRegion)region;
-      this.compaction = compaction;
-      this.queuedPriority = (this.compaction == null)
-          ? store.getCompactPriority() : compaction.getRequest().getPriority();
-      this.parent = parent;
-      this.user = user;
-      this.time =  System.currentTimeMillis();
-    }
-
-    @Override
-    public String toString() {
-      return (this.compaction != null) ? ("Request = " + compaction.getRequest())
-          : ("regionName = " + region.toString() + ", storeName = " + store.toString() +
-             ", priority = " + queuedPriority + ", time = " + time);
-    }
-
-    private void doCompaction(User user) {
-      // Common case - system compaction without a file selection. Select now.
-      if (this.compaction == null) {
-        int oldPriority = this.queuedPriority;
-        this.queuedPriority = this.store.getCompactPriority();
-        if (this.queuedPriority > oldPriority) {
-          // Store priority decreased while we were in queue (due to some other compaction?),
-          // requeue with new priority to avoid blocking potential higher priorities.
-          this.parent.execute(this);
-          return;
-        }
-        try {
-          this.compaction = selectCompaction(this.region, this.store, queuedPriority, null, user);
-        } catch (IOException ex) {
-          LOG.error("Compaction selection failed " + this, ex);
-          server.checkFileSystem();
-          return;
-        }
-        if (this.compaction == null) return; // nothing to do
-        // Now see if we are in correct pool for the size; if not, go to the correct one.
-        // We might end up waiting for a while, so cancel the selection.
-        assert this.compaction.hasSelection();
-        ThreadPoolExecutor pool = store.throttleCompaction(
-            compaction.getRequest().getSize()) ? longCompactions : shortCompactions;
-
-        // Long compaction pool can process small job
-        // Short compaction pool should not process large job
-        if (this.parent == shortCompactions && pool == longCompactions) {
-          this.store.cancelRequestedCompaction(this.compaction);
-          this.compaction = null;
-          this.parent = pool;
-          this.parent.execute(this);
-          return;
-        }
-      }
-      // Finally we can compact something.
-      assert this.compaction != null;
-
-      this.compaction.getRequest().beforeExecute();
-      try {
-        // Note: please don't put single-compaction logic here;
-        //       put it into region/store/etc. This is CST logic.
-        long start = EnvironmentEdgeManager.currentTime();
-        boolean completed =
-            region.compact(compaction, store, compactionThroughputController, user);
-        long now = EnvironmentEdgeManager.currentTime();
-        LOG.info(((completed) ? "Completed" : "Aborted") + " compaction: " +
-              this + "; duration=" + StringUtils.formatTimeDiff(now, start));
-        if (completed) {
-          // degenerate case: blocked regions require recursive enqueues
-          if (store.getCompactPriority() <= 0) {
-            requestSystemCompaction(region, store, "Recursive enqueue");
-          } else {
-            // see if the compaction has caused us to exceed max region size
-            requestSplit(region);
-          }
-        }
-      } catch (IOException ex) {
-        IOException remoteEx =
-            ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex;
-        LOG.error("Compaction failed " + this, remoteEx);
-        if (remoteEx != ex) {
-          LOG.info("Compaction failed at original callstack: " + formatStackTrace(ex));
-        }
-        region.reportCompactionRequestFailure();
-        server.checkFileSystem();
-      } catch (Exception ex) {
-        LOG.error("Compaction failed " + this, ex);
-        region.reportCompactionRequestFailure();
-        server.checkFileSystem();
-      } finally {
-        LOG.debug("CompactSplitThread Status: " + CompactSplitThread.this);
-      }
-      this.compaction.getRequest().afterExecute();
-    }
-
-    @Override
-    public void run() {
-      Preconditions.checkNotNull(server);
-      if (server.isStopped()
-          || (region.getTableDesc() != null && !region.getTableDesc().isCompactionEnabled())) {
-        return;
-      }
-      doCompaction(user);
-    }
-
-    private String formatStackTrace(Exception ex) {
-      StringWriter sw = new StringWriter();
-      PrintWriter pw = new PrintWriter(sw);
-      ex.printStackTrace(pw);
-      pw.flush();
-      return sw.toString();
-    }
-
-    @Override
-    public int compareTo(CompactionRunner o) {
-      // Only compare the underlying request (if any), for queue sorting purposes.
-      int compareVal = queuedPriority - o.queuedPriority; // compare priority
-      if (compareVal != 0) return compareVal;
-      CompactionContext tc = this.compaction, oc = o.compaction;
-      // Sort pre-selected (user?) compactions before system ones with equal priority.
-      return (tc == null) ? ((oc == null) ? 0 : 1)
-          : ((oc == null) ? -1 : tc.getRequest().compareTo(oc.getRequest()));
-    }
-  }
-
-  /**
-   * Cleanup class to use when rejecting a compaction request from the queue.
-   */
-  private static class Rejection implements RejectedExecutionHandler {
-    @Override
-    public void rejectedExecution(Runnable runnable, ThreadPoolExecutor pool) {
-      if (runnable instanceof CompactionRunner) {
-        CompactionRunner runner = (CompactionRunner)runnable;
-        LOG.debug("Compaction Rejected: " + runner);
-        runner.store.cancelRequestedCompaction(runner.compaction);
-      }
-    }
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public void onConfigurationChange(Configuration newConf) {
-    // Check if number of large / small compaction threads has changed, and then
-    // adjust the core pool size of the thread pools, by using the
-    // setCorePoolSize() method. According to the javadocs, it is safe to
-    // change the core pool size on-the-fly. We need to reset the maximum
-    // pool size, as well.
-    int largeThreads = Math.max(1, newConf.getInt(
-            LARGE_COMPACTION_THREADS,
-            LARGE_COMPACTION_THREADS_DEFAULT));
-    if (this.longCompactions.getCorePoolSize() != largeThreads) {
-      LOG.info("Changing the value of " + LARGE_COMPACTION_THREADS +
-              " from " + this.longCompactions.getCorePoolSize() + " to " +
-              largeThreads);
-      if(this.longCompactions.getCorePoolSize() < largeThreads) {
-        this.longCompactions.setMaximumPoolSize(largeThreads);
-        this.longCompactions.setCorePoolSize(largeThreads);
-      } else {
-        this.longCompactions.setCorePoolSize(largeThreads);
-        this.longCompactions.setMaximumPoolSize(largeThreads);
-      }
-    }
-
-    int smallThreads = newConf.getInt(SMALL_COMPACTION_THREADS,
-            SMALL_COMPACTION_THREADS_DEFAULT);
-    if (this.shortCompactions.getCorePoolSize() != smallThreads) {
-      LOG.info("Changing the value of " + SMALL_COMPACTION_THREADS +
-                " from " + this.shortCompactions.getCorePoolSize() + " to " +
-                smallThreads);
-      if(this.shortCompactions.getCorePoolSize() < smallThreads) {
-        this.shortCompactions.setMaximumPoolSize(smallThreads);
-        this.shortCompactions.setCorePoolSize(smallThreads);
-      } else {
-        this.shortCompactions.setCorePoolSize(smallThreads);
-        this.shortCompactions.setMaximumPoolSize(smallThreads);
-      }
-    }
-
-    int splitThreads = newConf.getInt(SPLIT_THREADS,
-            SPLIT_THREADS_DEFAULT);
-    if (this.splits.getCorePoolSize() != splitThreads) {
-      LOG.info("Changing the value of " + SPLIT_THREADS +
-                " from " + this.splits.getCorePoolSize() + " to " +
-                splitThreads);
-      if(this.splits.getCorePoolSize() < splitThreads) {
-        this.splits.setMaximumPoolSize(splitThreads);
-        this.splits.setCorePoolSize(splitThreads);
-      } else {
-        this.splits.setCorePoolSize(splitThreads);
-        this.splits.setMaximumPoolSize(splitThreads);
-      }
-    }
-
-    ThroughputController old = this.compactionThroughputController;
-    if (old != null) {
-      old.stop("configuration change");
-    }
-    this.compactionThroughputController =
-        CompactionThroughputControllerFactory.create(server, newConf);
-
-    // We change this atomically here instead of reloading the config so that
-    // upstream is the only one with the flexibility to reload the config.
-    this.conf.reloadConfiguration();
-  }
-
-  protected int getSmallCompactionThreadNum() {
-    return this.shortCompactions.getCorePoolSize();
-  }
-
-  protected int getLargeCompactionThreadNum() {
-    return this.longCompactions.getCorePoolSize();
-  }
-
-  protected int getSplitThreadNum() {
-    return this.splits.getCorePoolSize();
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public void registerChildren(ConfigurationManager manager) {
-    // No children to register.
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public void deregisterChildren(ConfigurationManager manager) {
-    // No children to deregister.
-  }
-
-  @VisibleForTesting
-  public ThroughputController getCompactionThroughputController() {
-    return compactionThroughputController;
-  }
-
-  @VisibleForTesting
-  /**
-   * Shut down the long compaction thread pool.
-   * Should only be used in unit tests, to prevent the long compaction thread
-   * pool from stealing jobs from the short compaction queue.
-   */
-  void shutdownLongCompactions() {
-    this.longCompactions.shutdown();
-  }
-
-  public void clearLongCompactionsQueue() {
-    longCompactions.getQueue().clear();
-  }
-
-  public void clearShortCompactionsQueue() {
-    shortCompactions.getQueue().clear();
-  }
-}
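
The class removed above (renamed to CompactSplit elsewhere in this commit) wires
the large and small compaction pools together through a StealJobQueue: the long
pool consumes its own queue, but may pull queued jobs from the short pool's
queue when it would otherwise idle. Below is a minimal sketch of that
preference order using only java.util.concurrent; it is not the HBase
StealJobQueue (which coordinates both queues under a shared lock), and the
class name is invented for illustration.

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.PriorityBlockingQueue;
    import java.util.concurrent.TimeUnit;

    // Sketch only: prefer our own jobs, then steal from the paired queue,
    // then block on our own queue. Elements must be Comparable, as
    // CompactionRunner is. (A full version would override take() the same way.)
    class StealingQueueSketch<T> extends PriorityBlockingQueue<T> {
      private final BlockingQueue<T> stealFrom = new PriorityBlockingQueue<>();

      BlockingQueue<T> getStealFromQueue() {
        return stealFrom;
      }

      @Override
      public T poll(long timeout, TimeUnit unit) throws InterruptedException {
        T job = super.poll();       // our own work first
        if (job == null) {
          job = stealFrom.poll();   // otherwise steal a queued short-pool job
        }
        return job != null ? job : super.poll(timeout, unit);
      }
    }

The onConfigurationChange() method above also resizes the pools in a deliberate
order: ThreadPoolExecutor.setMaximumPoolSize() rejects a maximum below the
current core size, so a grow raises the maximum first while a shrink lowers the
core first. A standalone sketch (hypothetical names):

    import java.util.concurrent.SynchronousQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    final class PoolResize {
      // Resize core and max together while keeping core <= max at each step.
      static void resize(ThreadPoolExecutor pool, int threads) {
        if (pool.getCorePoolSize() < threads) {
          pool.setMaximumPoolSize(threads);  // grow: raise max first
          pool.setCorePoolSize(threads);
        } else {
          pool.setCorePoolSize(threads);     // shrink: lower core first
          pool.setMaximumPoolSize(threads);
        }
      }

      public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1,
            60, TimeUnit.SECONDS, new SynchronousQueue<>());
        resize(pool, 4);  // safe on the fly, per the ThreadPoolExecutor javadocs
        System.out.println(pool.getCorePoolSize() + "/" + pool.getMaximumPoolSize());
        pool.shutdown();
      }
    }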

http://git-wip-us.apache.org/repos/asf/hbase/blob/4143c017/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactedHFilesDischarger.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactedHFilesDischarger.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactedHFilesDischarger.java
index 2773e00..6b8948b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactedHFilesDischarger.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactedHFilesDischarger.java
@@ -34,8 +34,8 @@ import com.google.common.annotations.VisibleForTesting;
 
 /**
  * A chore service that periodically cleans up the compacted files when there are no active readers
- * using those compacted files and also helps in clearing the block cache with these compacted
- * file entries
+ * using those compacted files and also helps in clearing the block cache of these compacted
+ * file entries.
  */
 @InterfaceAudience.Private
 public class CompactedHFilesDischarger extends ScheduledChore {
@@ -71,45 +71,56 @@ public class CompactedHFilesDischarger extends ScheduledChore {
     this.useExecutor = useExecutor;
   }
 
+  /**
+   * CompactedHFilesDischarger runs asynchronously by default using the hosting
+   * RegionServer's Executor. In tests it can be useful to force a synchronous
+   * cleanup. Use this method to disable the executor before you call run.
+   * @return The old setting for <code>useExecutor</code>
+   */
+  @VisibleForTesting
+  boolean setUseExecutor(final boolean useExecutor) {
+    boolean oldSetting = this.useExecutor;
+    this.useExecutor = useExecutor;
+    return oldSetting;
+  }
+
   @Override
   public void chore() {
     // Noop if rss is null. This never happens in normal operation; it only
     // occurs when a test case does not spin up a cluster.
     if (regionServerServices == null) return;
     List<Region> onlineRegions = regionServerServices.getOnlineRegions();
-    if (onlineRegions != null) {
-      for (Region region : onlineRegions) {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace(
-              "Started the compacted hfiles cleaner for the region " + region.getRegionInfo());
-        }
-        for (Store store : region.getStores()) {
-          try {
-            if (useExecutor && regionServerServices != null) {
-              CompactedHFilesDischargeHandler handler = new CompactedHFilesDischargeHandler(
-                  (Server) regionServerServices, EventType.RS_COMPACTED_FILES_DISCHARGER,
-                  (HStore) store);
-              regionServerServices.getExecutorService().submit(handler);
-            } else {
-              // call synchronously if the RegionServerServices are not
-              // available
-              store.closeAndArchiveCompactedFiles();
-            }
-            if (LOG.isTraceEnabled()) {
-              LOG.trace("Completed archiving the compacted files for the region "
-                  + region.getRegionInfo() + " under the store " + store.getColumnFamilyName());
-            }
-          } catch (Exception e) {
-            LOG.error("Exception while trying to close and archive the compacted store "
-                + "files of the store  " + store.getColumnFamilyName() + " in the" + " region "
-                + region.getRegionInfo(), e);
+    if (onlineRegions == null) return;
+    for (Region region : onlineRegions) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Started compacted hfiles cleaner on " + region.getRegionInfo());
+      }
+      for (Store store : region.getStores()) {
+        try {
+          if (useExecutor && regionServerServices != null) {
+            CompactedHFilesDischargeHandler handler = new CompactedHFilesDischargeHandler(
+                (Server) regionServerServices, EventType.RS_COMPACTED_FILES_DISCHARGER,
+                (HStore) store);
+            regionServerServices.getExecutorService().submit(handler);
+          } else {
+            // call synchronously if the RegionServerServices are not
+            // available
+            store.closeAndArchiveCompactedFiles();
           }
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("Completed archiving the compacted files for the region "
+                + region.getRegionInfo() + " under the store " + store.getColumnFamilyName());
+          }
+        } catch (Exception e) {
+          LOG.error("Exception while trying to close and archive the compacted store "
+              + "files of the store  " + store.getColumnFamilyName() + " in the" + " region "
+              + region.getRegionInfo(), e);
         }
-        if (LOG.isTraceEnabled()) {
-          LOG.trace(
-              "Completed the compacted hfiles cleaner for the region " + region.getRegionInfo());
-        }
+      }
+      if (LOG.isTraceEnabled()) {
+        LOG.trace(
+            "Completed the compacted hfiles cleaner for the region " + region.getRegionInfo());
       }
     }
   }
-}
+}
\ No newline at end of file
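
The new setUseExecutor() hook lets a test make the chore synchronous. A
hypothetical test-side helper (it must live in the same package, since
setUseExecutor() is package-private; the method name is invented) might be:

    // Run one synchronous pass of the discharger: archive compacted files
    // inline on this thread instead of handing the work to the
    // RegionServer's executor, so assertions can follow immediately.
    static void dischargeSynchronously(CompactedHFilesDischarger discharger) {
      boolean previous = discharger.setUseExecutor(false);
      try {
        discharger.chore();  // chore() is public; runs the cleanup in place
      } finally {
        discharger.setUseExecutor(previous);  // restore async behavior
      }
    }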

http://git-wip-us.apache.org/repos/asf/hbase/blob/4143c017/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index a4a7537..8cc9cd7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -1390,14 +1390,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     return !isClosed() && !isClosing();
   }
 
-  /** @return true if region is splittable */
+  @Override
   public boolean isSplittable() {
-    return isAvailable() && !hasReferences();
+    boolean result = isAvailable() && !hasReferences();
+    LOG.info("ASKED IF SPLITTABLE " + result, new Throwable("LOGGING"));
+    return result;
   }
 
-  /**
-   * @return true if region is mergeable
-   */
+  @Override
   public boolean isMergeable() {
     if (!isAvailable()) {
       LOG.debug("Region " + this
@@ -5086,11 +5086,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     }
   }
 
-  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NN_NAKED_NOTIFY",
-      justification = "Notify is about post replay. Intentional")
   @Override
   public boolean refreshStoreFiles() throws IOException {
-    if (ServerRegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) {
+    return refreshStoreFiles(false);
+  }
+
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NN_NAKED_NOTIFY",
+      justification = "Notify is about post replay. Intentional")
+  protected boolean refreshStoreFiles(boolean force) throws IOException {
+    if (!force && ServerRegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) {
       return false; // if primary nothing to do
     }
 
@@ -5848,7 +5852,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       try {
         for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) {
           Store store = stores.get(entry.getKey());
-          KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt);
+          KeyValueScanner scanner;
+          try {
+            scanner = store.getScanner(scan, entry.getValue(), this.readPt);
+          } catch (FileNotFoundException e) {
+            throw handleFileNotFound(e);
+          }
           instantiatedScanners.add(scanner);
           if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
               || this.filter.isFamilyEssential(entry.getKey())) {
@@ -5872,19 +5881,20 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       }
     }
 
-    private void handleFileNotFound(Throwable fnfe) {
+    private FileNotFoundException handleFileNotFound(FileNotFoundException fnfe) {
       // Try reopening the region since we have lost some storefiles.
       // See HBASE-17712 for more details.
-      LOG.warn("A store file got lost, so close and reopen region", fnfe);
+      LOG.warn("Store file is lost; close and reopen region", fnfe);
       if (regionUnassigner != null) {
         regionUnassigner.unassign();
       }
+      return fnfe;
     }
 
     private IOException handleException(List<KeyValueScanner> instantiatedScanners,
         Throwable t) {
       if (t instanceof FileNotFoundException) {
-        handleFileNotFound(t);
+        handleFileNotFound((FileNotFoundException)t);
       }
       // remove scanner read point before throwing the exception
       scannerReadPoints.remove(this);
@@ -6030,29 +6040,33 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       boolean tmpKeepProgress = scannerContext.getKeepProgress();
       // Scanning between column families and thus the scope is between cells
       LimitScope limitScope = LimitScope.BETWEEN_CELLS;
-      do {
-        // We want to maintain any progress that is made towards the limits while scanning across
-        // different column families. To do this, we toggle the keep progress flag on during calls
-        // to the StoreScanner to ensure that any progress made thus far is not wiped away.
-        scannerContext.setKeepProgress(true);
-        heap.next(results, scannerContext);
-        scannerContext.setKeepProgress(tmpKeepProgress);
-
-        nextKv = heap.peek();
-        moreCellsInRow = moreCellsInRow(nextKv, currentRowCell);
-        if (!moreCellsInRow) incrementCountOfRowsScannedMetric(scannerContext);
-        if (moreCellsInRow && scannerContext.checkBatchLimit(limitScope)) {
-          return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues();
-        } else if (scannerContext.checkSizeLimit(limitScope)) {
-          ScannerContext.NextState state =
-              moreCellsInRow ? NextState.SIZE_LIMIT_REACHED_MID_ROW : NextState.SIZE_LIMIT_REACHED;
-          return scannerContext.setScannerState(state).hasMoreValues();
-        } else if (scannerContext.checkTimeLimit(limitScope)) {
-          ScannerContext.NextState state =
-              moreCellsInRow ? NextState.TIME_LIMIT_REACHED_MID_ROW : NextState.TIME_LIMIT_REACHED;
-          return scannerContext.setScannerState(state).hasMoreValues();
-        }
-      } while (moreCellsInRow);
+      try {
+        do {
+          // We want to maintain any progress that is made towards the limits while scanning across
+          // different column families. To do this, we toggle the keep progress flag on during calls
+          // to the StoreScanner to ensure that any progress made thus far is not wiped away.
+          scannerContext.setKeepProgress(true);
+          heap.next(results, scannerContext);
+          scannerContext.setKeepProgress(tmpKeepProgress);
+
+          nextKv = heap.peek();
+          moreCellsInRow = moreCellsInRow(nextKv, currentRowCell);
+          if (!moreCellsInRow) incrementCountOfRowsScannedMetric(scannerContext);
+          if (moreCellsInRow && scannerContext.checkBatchLimit(limitScope)) {
+            return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues();
+          } else if (scannerContext.checkSizeLimit(limitScope)) {
+            ScannerContext.NextState state =
+                moreCellsInRow ? NextState.SIZE_LIMIT_REACHED_MID_ROW : NextState.SIZE_LIMIT_REACHED;
+            return scannerContext.setScannerState(state).hasMoreValues();
+          } else if (scannerContext.checkTimeLimit(limitScope)) {
+            ScannerContext.NextState state =
+                moreCellsInRow ? NextState.TIME_LIMIT_REACHED_MID_ROW : NextState.TIME_LIMIT_REACHED;
+            return scannerContext.setScannerState(state).hasMoreValues();
+          }
+        } while (moreCellsInRow);
+      } catch (FileNotFoundException e) {
+        throw handleFileNotFound(e);
+      }
       return nextKv != null;
     }
 
@@ -6401,8 +6415,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           result = this.joinedHeap.requestSeek(kv, true, true) || result;
         }
       } catch (FileNotFoundException e) {
-        handleFileNotFound(e);
-        throw e;
+        throw handleFileNotFound(e);
       } finally {
         closeRegionOperation();
       }
@@ -7787,6 +7800,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       return null;
     }
 
+    // Can't split a region that is closing.
+    if (this.isClosing()) {
+      return null;
+    }
+
     if (!splitPolicy.shouldSplit()) {
       return null;
     }
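
A small pattern worth noting in the hunks above: handleFileNotFound() now
returns the exception it was given, so every call site can be written as
"throw handleFileNotFound(e)", which both runs the recovery side effects and
lets the compiler see the branch as terminating. A self-contained sketch of
the idiom (all names invented):

    import java.io.FileNotFoundException;

    final class HandleAndRethrow {
      // Perform the side effects (logging, scheduling a region reopen) and
      // hand the exception back so callers can rethrow in one statement.
      static FileNotFoundException handle(FileNotFoundException fnfe) {
        System.err.println("Store file is lost; close and reopen region: " + fnfe);
        return fnfe;
      }

      static void read() throws FileNotFoundException {
        try {
          throw new FileNotFoundException("missing hfile");  // stand-in for a scanner open
        } catch (FileNotFoundException e) {
          throw handle(e);  // log, trigger recovery, and propagate at once
        }
      }
    }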

http://git-wip-us.apache.org/repos/asf/hbase/blob/4143c017/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
index 014427d..59a0fe5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
@@ -318,13 +318,15 @@ public class HRegionFileSystem {
    * @throws IOException
    */
   public boolean hasReferences(final String familyName) throws IOException {
-    FileStatus[] files = FSUtils.listStatus(fs, getStoreDir(familyName));
+    Path storeDir = getStoreDir(familyName);
+    FileStatus[] files = FSUtils.listStatus(fs, storeDir);
     if (files != null) {
       for(FileStatus stat: files) {
         if(stat.isDirectory()) {
           continue;
         }
         if(StoreFileInfo.isReference(stat.getPath())) {
+          if (LOG.isTraceEnabled()) LOG.trace("Reference " + stat.getPath());
           return true;
         }
       }
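
hasReferences() answers whether a store still carries reference files, i.e.
the half-file pointers a split leaves behind; a region holding any of them
cannot be split again until compaction rewrites them. A hypothetical caller
(familyNames and regionFs are assumed to be in scope, and the enclosing
method is assumed to declare IOException) might gate a split like this:

    // A region with any reference file in any store is not yet splittable.
    boolean hasRefs = false;
    for (String familyName : familyNames) {
      if (regionFs.hasReferences(familyName)) {  // throws IOException
        hasRefs = true;
        break;
      }
    }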

http://git-wip-us.apache.org/repos/asf/hbase/blob/4143c017/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 3ca061a..9315b0a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -86,7 +86,6 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionUtils;
-import org.apache.hadoop.hbase.client.NonceGenerator;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
 import org.apache.hadoop.hbase.client.locking.EntityLock;
@@ -170,8 +169,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProto
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.SplitTableRegionRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.SplitTableRegionResponse;
 import org.apache.hadoop.hbase.trace.SpanReceiverHost;
 import org.apache.hadoop.hbase.util.Addressing;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -179,7 +176,6 @@ import org.apache.hadoop.hbase.util.CompressionTest;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSTableDescriptors;
 import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
 import org.apache.hadoop.hbase.util.HasThread;
 import org.apache.hadoop.hbase.util.JSONBean;
 import org.apache.hadoop.hbase.util.JvmPauseMonitor;
@@ -208,21 +204,23 @@ import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.NoNodeException;
 import org.apache.zookeeper.data.Stat;
 
-import sun.misc.Signal;
-import sun.misc.SignalHandler;
-
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
 
+import sun.misc.Signal;
+import sun.misc.SignalHandler;
+
 /**
  * HRegionServer makes a set of HRegions available to clients. It checks in with
  * the HMaster. There are many HRegionServers in a single HBase deployment.
  */
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
-@SuppressWarnings({ "deprecation", "restriction" })
+@SuppressWarnings({ "deprecation"})
 public class HRegionServer extends HasThread implements
     RegionServerServices, LastSequenceId, ConfigurationObserver {
+  // Time to pause if master says 'please hold'. Make configurable if needed.
+  private static final int INIT_PAUSE_TIME_MS = 1000;
 
   public static final String REGION_LOCK_AWAIT_TIME_SEC =
       "hbase.regionserver.region.lock.await.time.sec";
@@ -283,7 +281,7 @@ public class HRegionServer extends HasThread implements
   protected ReplicationSinkService replicationSinkHandler;
 
   // Compactions
-  public CompactSplitThread compactSplitThread;
+  public CompactSplit compactSplitThread;
 
   /**
    * Map of regions currently being served by this region server. Key is the
@@ -514,7 +512,8 @@ public class HRegionServer extends HasThread implements
    */
   protected final ConfigurationManager configurationManager;
 
-  private CompactedHFilesDischarger compactedFileDischarger;
+  @VisibleForTesting
+  CompactedHFilesDischarger compactedFileDischarger;
 
   private volatile ThroughputController flushThroughputController;
 
@@ -914,7 +913,7 @@ public class HRegionServer extends HasThread implements
     this.cacheFlusher = new MemStoreFlusher(conf, this);
 
     // Compaction thread
-    this.compactSplitThread = new CompactSplitThread(this);
+    this.compactSplitThread = new CompactSplit(this);
 
     // Background thread to check for compactions; needed if region has not gotten updates
     // in a while. It will take care of not checking too frequently on store-by-store basis.
@@ -1432,7 +1431,7 @@ public class HRegionServer extends HasThread implements
             // Only print out regions still closing if a small number else will
             // swamp the log.
             if (count < 10 && LOG.isDebugEnabled()) {
-              LOG.debug(this.onlineRegions);
+              LOG.debug("Online Regions=" + this.onlineRegions);
             }
           }
         }
@@ -1779,7 +1778,7 @@ public class HRegionServer extends HasThread implements
     final static int RANGE_OF_DELAY = 5 * 60 * 1000; // 5 min in milliseconds
     final static int MIN_DELAY_TIME = 0; // millisec
     public PeriodicMemstoreFlusher(int cacheFlushInterval, final HRegionServer server) {
-      super(server.getServerName() + "-MemstoreFlusherChore", server, cacheFlushInterval);
+      super("MemstoreFlusherChore", server, cacheFlushInterval);
       this.server = server;
     }
 
@@ -2192,6 +2191,8 @@ public class HRegionServer extends HasThread implements
       transition.addRegionInfo(HRegionInfo.convert(hri));
     }
     ReportRegionStateTransitionRequest request = builder.build();
+    int tries = 0;
+    long pauseTime = INIT_PAUSE_TIME_MS;
     while (keepLooping()) {
       RegionServerStatusService.BlockingInterface rss = rssStub;
       try {
@@ -2202,95 +2203,40 @@ public class HRegionServer extends HasThread implements
         ReportRegionStateTransitionResponse response =
           rss.reportRegionStateTransition(null, request);
         if (response.hasErrorMessage()) {
-          LOG.info("Failed to transition " + hris[0]
+          LOG.info("Failed transition " + hris[0]
             + " to " + code + ": " + response.getErrorMessage());
           return false;
         }
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("TRANSITION REPORTED " + request);
+        }
         return true;
       } catch (ServiceException se) {
         IOException ioe = ProtobufUtil.getRemoteException(se);
-        LOG.info("Failed to report region transition, will retry", ioe);
-        if (rssStub == rss) {
-          rssStub = null;
-        }
-      }
-    }
-    return false;
-  }
-
-  @Override
-  public long requestRegionSplit(final HRegionInfo regionInfo, final byte[] splitRow) {
-    NonceGenerator ng = clusterConnection.getNonceGenerator();
-    final long nonceGroup = ng.getNonceGroup();
-    final long nonce = ng.newNonce();
-    long procId = -1;
-    SplitTableRegionRequest request =
-        RequestConverter.buildSplitTableRegionRequest(regionInfo, splitRow, nonceGroup, nonce);
-
-    while (keepLooping()) {
-      RegionServerStatusService.BlockingInterface rss = rssStub;
-      try {
-        if (rss == null) {
-          createRegionServerStatusStub();
-          continue;
-        }
-        SplitTableRegionResponse response = rss.splitRegion(null, request);
-
-        //TODO: should we limit the retry number before quitting?
-        if (response == null || (procId = response.getProcId()) == -1) {
-          LOG.warn("Failed to split " + regionInfo + " retrying...");
-          continue;
+        boolean pause = ioe instanceof ServerNotRunningYetException ||
+            ioe instanceof PleaseHoldException;
+        if (pause) {
+          // Do backoff else we flood the Master with requests.
+          pauseTime = ConnectionUtils.getPauseTime(pauseTime, tries);
+        } else {
+          pauseTime = INIT_PAUSE_TIME_MS; // Reset.
         }
-
-        break;
-      } catch (ServiceException se) {
-        // TODO: retry or just fail
-        IOException ioe = ProtobufUtil.getRemoteException(se);
-        LOG.info("Failed to split region, will retry", ioe);
+        LOG.info("Failed report of region transition; retry (#" + tries + ")" +
+            (pause ?
+                " after " + pauseTime + "ms delay (Master is coming online...)." :
+                " immediately."),
+            ioe);
+        if (pause) Threads.sleep(pauseTime);
+        tries++;
         if (rssStub == rss) {
           rssStub = null;
         }
       }
     }
-    return procId;
-  }
-
-  @Override
-  public boolean isProcedureFinished(final long procId) throws IOException {
-    GetProcedureResultRequest request =
-        GetProcedureResultRequest.newBuilder().setProcId(procId).build();
-
-    while (keepLooping()) {
-      RegionServerStatusService.BlockingInterface rss = rssStub;
-      try {
-        if (rss == null) {
-          createRegionServerStatusStub();
-          continue;
-        }
-        // TODO: find a way to get proc result
-        GetProcedureResultResponse response = rss.getProcedureResult(null, request);
-
-        if (response == null) {
-          LOG.warn("Failed to get procedure (id=" + procId + ") status.");
-          return false;
-        } else if (response.getState() == GetProcedureResultResponse.State.RUNNING) {
-          return false;
-        } else if (response.hasException()) {
-          // Procedure failed.
-          throw ForeignExceptionUtil.toIOException(response.getException());
-        }
-        // Procedure completes successfully
-        break;
-      } catch (ServiceException se) {
-        // TODO: retry or just fail
-        IOException ioe = ProtobufUtil.getRemoteException(se);
-        LOG.warn("Failed to get split region procedure result.  Retrying", ioe);
-        if (rssStub == rss) {
-          rssStub = null;
-        }
-      }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("TRANSITION NOT REPORTED " + request);
     }
-    return true;
+    return false;
   }
 
   /**
@@ -2981,7 +2927,7 @@ public class HRegionServer extends HasThread implements
    * @see org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine
    */
   public static void main(String[] args) throws Exception {
-    LOG.info("***** STARTING service '" + HRegionServer.class.getSimpleName() + "' *****");
+    LOG.info("STARTING service '" + HRegionServer.class.getSimpleName());
     VersionInfo.logVersion();
     Configuration conf = HBaseConfiguration.create();
     @SuppressWarnings("unchecked")
@@ -3286,7 +3232,7 @@ public class HRegionServer extends HasThread implements
         throw new RegionOpeningException("Region " + regionNameStr +
           " is opening on " + this.serverName);
       }
-      throw new NotServingRegionException("Region " + regionNameStr +
+      throw new NotServingRegionException("" + regionNameStr +
         " is not online on " + this.serverName);
     }
     return region;
@@ -3404,7 +3350,7 @@ public class HRegionServer extends HasThread implements
   }
 
   // This map contains all the regions that we closed for a move.
-  //  We add the time it was moved as we don't want to keep too old information
+  // We add the time it was moved as we don't want to keep too old information
   protected Map<String, MovedRegionInfo> movedRegions =
       new ConcurrentHashMap<>(3000);
 
@@ -3516,9 +3462,9 @@ public class HRegionServer extends HasThread implements
   }
 
   /**
-   * @return the underlying {@link CompactSplitThread} for the servers
+   * @return the underlying {@link CompactSplit} for the servers
    */
-  public CompactSplitThread getCompactSplitThread() {
+  public CompactSplit getCompactSplitThread() {
     return this.compactSplitThread;
   }
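
The reworked reportRegionStateTransition() above backs off exponentially only
while the Master answers with ServerNotRunningYetException or
PleaseHoldException, and otherwise retries immediately with the pause reset.
A minimal standalone sketch of that loop, using plain Java in place of
ConnectionUtils.getPauseTime (all names are stand-ins):

    import java.util.function.BooleanSupplier;

    final class ReportWithBackoff {
      static boolean report(BooleanSupplier reportOnce, BooleanSupplier masterStarting,
          int maxTries) throws InterruptedException {
        long pauseMs = 1000L;  // mirrors INIT_PAUSE_TIME_MS
        for (int tries = 0; tries < maxTries; tries++) {
          if (reportOnce.getAsBoolean()) {
            return true;  // transition reported
          }
          if (masterStarting.getAsBoolean()) {
            Thread.sleep(pauseMs);                     // Master coming online: wait
            pauseMs = Math.min(pauseMs * 2, 60_000L);  // exponential, capped
          } else {
            pauseMs = 1000L;  // other failure: reset and retry without delay
          }
        }
        return false;
      }
    }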
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/4143c017/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index b3ca94d..ed19dc9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -18,8 +18,6 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
-import com.google.common.annotations.VisibleForTesting;
-
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InterruptedIOException;
@@ -27,8 +25,17 @@ import java.net.BindException;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -72,6 +79,7 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.VersionInfoUtil;
 import org.apache.hadoop.hbase.conf.ConfigurationObserver;
 import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
+import org.apache.hadoop.hbase.exceptions.MergeRegionException;
 import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
 import org.apache.hadoop.hbase.exceptions.ScannerResetException;
 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
@@ -119,12 +127,12 @@ import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionForSplitOrMergeRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionForSplitOrMergeResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
@@ -137,6 +145,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerIn
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.MergeRegionsRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.MergeRegionsResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse;
@@ -194,7 +204,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MapReduceProtos.ScanMet
 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse.TableQuotaSnapshot;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceViolationPolicy;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
@@ -212,6 +221,8 @@ import org.apache.hadoop.hbase.wal.WALSplitter;
 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
 import org.apache.zookeeper.KeeperException;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * Implements the regionserver RPC services.
  */
@@ -1465,36 +1476,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     }
   }
 
-  @Override
-  @QosPriority(priority=HConstants.ADMIN_QOS)
-  public CloseRegionForSplitOrMergeResponse closeRegionForSplitOrMerge(
-      final RpcController controller,
-      final CloseRegionForSplitOrMergeRequest request) throws ServiceException {
-    try {
-      checkOpen();
-
-      List<String> encodedRegionNameList = new ArrayList<>();
-      for(int i = 0; i < request.getRegionCount(); i++) {
-        final String encodedRegionName = ProtobufUtil.getRegionEncodedName(request.getRegion(i));
-
-        // Can be null if we're calling close on a region that's not online
-        final Region targetRegion = regionServer.getFromOnlineRegions(encodedRegionName);
-        if ((targetRegion != null) && (targetRegion.getCoprocessorHost() != null)) {
-          targetRegion.getCoprocessorHost().preClose(false);
-          encodedRegionNameList.add(encodedRegionName);
-        }
-      }
-      requestCount.increment();
-      LOG.info("Close and offline " + encodedRegionNameList + " regions.");
-      boolean closed = regionServer.closeAndOfflineRegionForSplitOrMerge(encodedRegionNameList);
-      CloseRegionForSplitOrMergeResponse.Builder builder =
-          CloseRegionForSplitOrMergeResponse.newBuilder().setClosed(closed);
-      return builder.build();
-    } catch (IOException ie) {
-      throw new ServiceException(ie);
-    }
-  }
-
   /**
    * Compact a region on the region server.
    *
@@ -1645,6 +1626,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
       if (request.hasCompactionState() && request.getCompactionState()) {
         builder.setCompactionState(region.getCompactionState());
       }
+      builder.setSplittable(region.isSplittable());
+      builder.setMergeable(region.isMergeable());
       builder.setIsRecovering(region.isRecovering());
       return builder.build();
     } catch (IOException ie) {
@@ -1855,8 +1838,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
           // The region is already online. This should not happen any more.
           String error = "Received OPEN for the region:"
             + region.getRegionNameAsString() + ", which is already online";
-          regionServer.abort(error);
-          throw new IOException(error);
+          LOG.warn(error);
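+          // Previously we aborted the server on a duplicate OPEN; the duplicate
+          // is now tolerated and simply reported back to the master as OPENED.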
+          //regionServer.abort(error);
+          //throw new IOException(error);
+          builder.addOpeningState(RegionOpeningState.OPENED);
+          continue;
         }
         LOG.info("Open " + region.getRegionNameAsString());
 
@@ -3396,4 +3382,62 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
       throw new ServiceException(e);
     }
   }
+
+  @Override
+  public ExecuteProceduresResponse executeProcedures(RpcController controller,
+       ExecuteProceduresRequest request) throws ServiceException {
+    ExecuteProceduresResponse.Builder builder = ExecuteProceduresResponse.newBuilder();
+    if (request.getOpenRegionCount() > 0) {
+      for (OpenRegionRequest req: request.getOpenRegionList()) {
+        builder.addOpenRegion(openRegion(controller, req));
+      }
+    }
+    if (request.getCloseRegionCount() > 0) {
+      for (CloseRegionRequest req: request.getCloseRegionList()) {
+        builder.addCloseRegion(closeRegion(controller, req));
+      }
+    }
+    return builder.build();
+  }
+
+  /**
+   * Merge regions on the region server.
+   *
+   * @param controller the RPC controller
+   * @param request the request
+   * @return merge regions response
+   * @throws ServiceException
+   */
+  @Override
+  @QosPriority(priority = HConstants.ADMIN_QOS)
+  // UNUSED AS OF AMv2 PURGE!
+  public MergeRegionsResponse mergeRegions(final RpcController controller,
+      final MergeRegionsRequest request) throws ServiceException {
+    try {
+      checkOpen();
+      requestCount.increment();
+      Region regionA = getRegion(request.getRegionA());
+      Region regionB = getRegion(request.getRegionB());
+      boolean forcible = request.getForcible();
+      long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;
+      regionA.startRegionOperation(Operation.MERGE_REGION);
+      regionB.startRegionOperation(Operation.MERGE_REGION);
+      if (regionA.getRegionInfo().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID ||
+          regionB.getRegionInfo().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) {
+        throw new ServiceException(new MergeRegionException("Can't merge non-default replicas"));
+      }
+      LOG.info("Receiving merging request for  " + regionA + ", " + regionB
+          + ",forcible=" + forcible);
+      regionA.flush(true);
+      regionB.flush(true);
+      regionServer.compactSplitThread.requestRegionsMerge(regionA, regionB, forcible,
+          masterSystemTime, RpcServer.getRequestUser());
+      return MergeRegionsResponse.newBuilder().build();
+    } catch (DroppedSnapshotException ex) {
+      regionServer.abort("Replay of WAL required. Forcing server shutdown", ex);
+      throw new ServiceException(ex);
+    } catch (IOException ie) {
+      throw new ServiceException(ie);
+    }
+  }
 }
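executeProcedures above just fans out to the existing openRegion/closeRegion handlers, so a caller batches work by packing sub-requests into one envelope. A hedged sketch of building such a request; openReq and closeReq are pre-built messages, and the addOpenRegion/addCloseRegion builder methods are assumed to be the protobuf-generated counterparts of the getOpenRegionList/getCloseRegionList accessors used above:

    ExecuteProceduresRequest request = ExecuteProceduresRequest.newBuilder()
        .addOpenRegion(openReq)    // an OpenRegionRequest
        .addCloseRegion(closeReq)  // a CloseRegionRequest
        .build();
    // stub is an AdminService blocking stub; the response carries one
    // OpenRegionResponse/CloseRegionResponse per sub-request, in order.
    ExecuteProceduresResponse response = stub.executeProcedures(controller, request);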

http://git-wip-us.apache.org/repos/asf/hbase/blob/4143c017/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
index 295b825..6c4eca9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
@@ -96,6 +96,14 @@ public interface Region extends ConfigurationObserver {
   /** @return True if region is read only */
   boolean isReadOnly();
 
+  /** @return true if region is splittable */
+  boolean isSplittable();
+
+  /**
+   * @return true if region is mergeable
+   */
+  boolean isMergeable();
+
   /**
    * Return the list of Stores managed by this region
    * <p>Use with caution.  Exposed for use of fixup utilities.
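The two new predicates let callers test eligibility up front instead of failing inside a split or merge procedure. A minimal sketch, assuming two online Region handles:

    // Only ask the master to merge when both regions say they are mergeable
    // (a closing region, for example, should answer false).
    if (regionA.isMergeable() && regionB.isMergeable()) {
      // submit the merge request ...
    }
    if (regionA.isSplittable()) {
      // or request a split of regionA instead ...
    }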

http://git-wip-us.apache.org/repos/asf/hbase/blob/4143c017/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java
new file mode 100644
index 0000000..e95932b
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java
@@ -0,0 +1,108 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.security.PrivilegedAction;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Handles processing region merges. Put in a queue, owned by HRegionServer.
+ */
+// UNUSED: REMOVE!!!
+@InterfaceAudience.Private
+class RegionMergeRequest implements Runnable {
+  private static final Log LOG = LogFactory.getLog(RegionMergeRequest.class);
+  private final HRegionInfo region_a;
+  private final HRegionInfo region_b;
+  private final HRegionServer server;
+  private final boolean forcible;
+  private final User user;
+
+  RegionMergeRequest(Region a, Region b, HRegionServer hrs, boolean forcible,
+      long masterSystemTime, User user) {
+    Preconditions.checkNotNull(hrs);
+    this.region_a = a.getRegionInfo();
+    this.region_b = b.getRegionInfo();
+    this.server = hrs;
+    this.forcible = forcible;
+    this.user = user;
+  }
+
+  @Override
+  public String toString() {
+    return "MergeRequest,regions:" + region_a + ", " + region_b + ", forcible="
+        + forcible;
+  }
+
+  private void doMerge() {
+    //server.metricsRegionServer.incrMergeRequest();
+
+    if (user != null && user.getUGI() != null) {
+      user.getUGI().doAs(new PrivilegedAction<Void>() {
+        @Override
+        public Void run() {
+          requestRegionMerge();
+          return null;
+        }
+      });
+    } else {
+      requestRegionMerge();
+    }
+  }
+
+  private void requestRegionMerge() {
+    final TableName table = region_a.getTable();
+    if (!table.equals(region_b.getTable())) {
+      LOG.error("Can't merge regions from two different tables: " + region_a + ", " + region_b);
+      return;
+    }
+
+    // TODO: fake merged region for compat with the report protocol
+    final HRegionInfo merged = new HRegionInfo(table);
+
+    // Send the merge request to the master. The master will do the validation.
+    // The two parent regions will be unassigned and the merged region will be assigned.
+    // The merged HRegionInfo may not reflect the region that will actually be created;
+    // it is built just to pass the table information to reportRegionStateTransition().
+    if (!server.reportRegionStateTransition(TransitionCode.READY_TO_MERGE, merged, region_a, region_b)) {
+      LOG.error("Unable to ask master to merge: " + region_a + ", " + region_b);
+    }
+  }
+
+  @Override
+  public void run() {
+    if (this.server.isStopping() || this.server.isStopped()) {
+      LOG.debug("Skipping merge because server is stopping="
+          + this.server.isStopping() + " or stopped=" + this.server.isStopped());
+      return;
+    }
+
+    doMerge();
+  }
+}
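Both this class and SplitRequest run the master callback under the requesting user's UGI when one is available, so the transition is attributed to that user; otherwise they call straight through. The pattern in isolation, with work() as a hypothetical stand-in for the actual callback:

    if (user != null && user.getUGI() != null) {
      user.getUGI().doAs(new PrivilegedAction<Void>() {
        @Override
        public Void run() {
          work();       // hypothetical: requestRegionMerge()/requestRegionSplit()
          return null;
        }
      });
    } else {
      work();
    }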

http://git-wip-us.apache.org/repos/asf/hbase/blob/4143c017/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
index 54aeaa6..5afa652 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
@@ -183,16 +183,6 @@ public interface RegionServerServices extends OnlineRegions, FavoredNodesForRegi
   boolean reportRegionStateTransition(TransitionCode code, HRegionInfo... hris);
 
   /**
-   * Notify master that a region wants to be splitted.
-   */
-  long requestRegionSplit(final HRegionInfo regionInfo, final byte[] splitRow);
-
-  /**
-   * Check with master whether a procedure is completed (either succeed or fail)
-   */
-  boolean isProcedureFinished(final long procId) throws IOException;
-
-  /**
    * Returns a reference to the region server's RPC server
    */
   RpcServerInterface getRpcServer();
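Callers of the two removed methods migrate to reportRegionStateTransition, which hands control to the master-side procedure instead of polling a procedure id. A hedged sketch of the replacement, where parentInfo, daughterA, and daughterB are hypothetical HRegionInfo values:

    // Old (removed): long procId = services.requestRegionSplit(parentInfo, splitRow);
    //                while (!services.isProcedureFinished(procId)) { /* sleep */ }
    // New: report readiness; the master drives the split procedure from here.
    boolean reported = services.reportRegionStateTransition(
        TransitionCode.READY_TO_SPLIT, parentInfo, daughterA, daughterB);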

http://git-wip-us.apache.org/repos/asf/hbase/blob/4143c017/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionUnassigner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionUnassigner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionUnassigner.java
index b347b4b..8eb78a2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionUnassigner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionUnassigner.java
@@ -48,8 +48,7 @@ class RegionUnassigner {
       return;
     }
     unassigning = true;
-    new Thread("Unassign-" + regionInfo) {
-
+    new Thread("RegionUnassigner." + regionInfo.getEncodedName()) {
       @Override
       public void run() {
         LOG.info("Unassign " + regionInfo.getRegionNameAsString());
@@ -65,4 +64,4 @@ class RegionUnassigner {
       }
     }.start();
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/4143c017/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java
index eb9811d..bd59c53 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java
@@ -18,16 +18,16 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
-import java.io.IOException;
 import java.security.PrivilegedAction;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.ipc.RemoteException;
 
 import com.google.common.base.Preconditions;
 
@@ -37,14 +37,14 @@ import com.google.common.base.Preconditions;
 @InterfaceAudience.Private
 class SplitRequest implements Runnable {
   private static final Log LOG = LogFactory.getLog(SplitRequest.class);
-  private final HRegion parent;
+  private final HRegionInfo parent;
   private final byte[] midKey;
   private final HRegionServer server;
   private final User user;
 
   SplitRequest(Region region, byte[] midKey, HRegionServer hrs, User user) {
     Preconditions.checkNotNull(hrs);
-    this.parent = (HRegion)region;
+    this.parent = region.getRegionInfo();
     this.midKey = midKey;
     this.server = hrs;
     this.user = user;
@@ -56,67 +56,30 @@ class SplitRequest implements Runnable {
   }
 
   private void doSplitting() {
-    boolean success = false;
     server.metricsRegionServer.incrSplitRequest();
-    long startTime = EnvironmentEdgeManager.currentTime();
-
-    try {
-      long procId;
-      if (user != null && user.getUGI() != null) {
-        procId = user.getUGI().doAs (new PrivilegedAction<Long>() {
-          @Override
-          public Long run() {
-            try {
-              return server.requestRegionSplit(parent.getRegionInfo(), midKey);
-            } catch (Exception e) {
-              LOG.error("Failed to complete region split ", e);
-            }
-            return (long)-1;
-          }
-        });
-      } else {
-        procId = server.requestRegionSplit(parent.getRegionInfo(), midKey);
-      }
-
-      if (procId != -1) {
-        // wait for the split to complete or get interrupted.  If the split completes successfully,
-        // the procedure will return true; if the split fails, the procedure would throw exception.
-        //
-        try {
-          while (!(success = server.isProcedureFinished(procId))) {
-            try {
-              Thread.sleep(1000);
-            } catch (InterruptedException e) {
-              LOG.warn("Split region " + parent + " is still in progress.  Not waiting...");
-              break;
-            }
-          }
-        } catch (IOException e) {
-          LOG.error("Split region " + parent + " failed.", e);
+    if (user != null && user.getUGI() != null) {
+      user.getUGI().doAs(new PrivilegedAction<Void>() {
+        @Override
+        public Void run() {
+          requestRegionSplit();
+          return null;
         }
-      } else {
-        LOG.error("Fail to split region " + parent);
-      }
-    } finally {
-      if (this.parent.getCoprocessorHost() != null) {
-        try {
-          this.parent.getCoprocessorHost().postCompleteSplit();
-        } catch (IOException io) {
-          LOG.error("Split failed " + this,
-            io instanceof RemoteException ? ((RemoteException) io).unwrapRemoteException() : io);
-        }
-      }
-
-      // Update regionserver metrics with the split transaction total running time
-      server.metricsRegionServer.updateSplitTime(EnvironmentEdgeManager.currentTime() - startTime);
-
-      if (parent.shouldForceSplit()) {
-        parent.clearSplit();
-      }
+      });
+    } else {
+      requestRegionSplit();
+    }
+  }
 
-      if (success) {
-        server.metricsRegionServer.incrSplitSuccess();
-      }
+  private void requestRegionSplit() {
+    final TableName table = parent.getTable();
+    final HRegionInfo hri_a = new HRegionInfo(table, parent.getStartKey(), midKey);
+    final HRegionInfo hri_b = new HRegionInfo(table, midKey, parent.getEndKey());
+    // Send the split request to the master. The master will do the validation on the split key.
+    // The parent region will be unassigned and the two new regions will be assigned.
+    // The hri_a and hri_b objects may not reflect the regions that will actually be created;
+    // they are built just to pass the information to reportRegionStateTransition().
+    if (!server.reportRegionStateTransition(TransitionCode.READY_TO_SPLIT, parent, hri_a, hri_b)) {
+      LOG.error("Unable to ask master to split " + parent.getRegionNameAsString());
     }
   }
 
@@ -130,4 +93,4 @@ class SplitRequest implements Runnable {
 
     doSplitting();
   }
-}
+}
\ No newline at end of file
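The daughter pair handed to the master simply partitions the parent's key range at the split key. Worked out standalone with hypothetical keys, for a parent spanning [a, z) split at m:

    TableName table = TableName.valueOf("t1");   // hypothetical table
    byte[] start = Bytes.toBytes("a");
    byte[] mid   = Bytes.toBytes("m");           // the chosen split key
    byte[] end   = Bytes.toBytes("z");
    HRegionInfo daughterA = new HRegionInfo(table, start, mid);  // keys [a, m)
    HRegionInfo daughterB = new HRegionInfo(table, mid, end);    // keys [m, z)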

http://git-wip-us.apache.org/repos/asf/hbase/blob/4143c017/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java
index 5ff7a1e..3ecc750 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java
@@ -125,4 +125,4 @@ public class CloseRegionHandler extends EventHandler {
         remove(this.regionInfo.getEncodedNameAsBytes(), Boolean.FALSE);
     }
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/4143c017/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
index dca02e4..f1e42a6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
@@ -559,7 +559,7 @@ public class HBaseFsck extends Configured implements Closeable {
     errors.print("Number of requests: " + status.getRequestsCount());
     errors.print("Number of regions: " + status.getRegionsCount());
 
-    Set<RegionState> rits = status.getRegionsInTransition();
+    List<RegionState> rits = status.getRegionsInTransition();
     errors.print("Number of regions in transition: " + rits.size());
     if (details) {
       for (RegionState state: rits) {
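getRegionsInTransition() now returns a List rather than a Set, so loops like the one above compile unchanged; only the declared type moves. A caller sketch, assuming a ClusterStatus named status as above:

    List<RegionState> rits = status.getRegionsInTransition();  // was Set<RegionState>
    System.out.println("Number of regions in transition: " + rits.size());
    for (RegionState state : rits) {
      System.out.println(state);
    }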

http://git-wip-us.apache.org/repos/asf/hbase/blob/4143c017/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java
index d7749c2..8ea7012 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java
@@ -41,7 +41,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.master.AssignmentManager;
+import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
 
 /**
  * Utility methods for interacting with the regions.
@@ -223,7 +223,7 @@ public abstract class ModifyRegionUtils {
   static ThreadPoolExecutor getRegionOpenAndInitThreadPool(final Configuration conf,
       final String threadNamePrefix, int regionNumber) {
     int maxThreads = Math.min(regionNumber, conf.getInt(
-        "hbase.hregion.open.and.init.threads.max", 10));
+        "hbase.hregion.open.and.init.threads.max", 16));
     ThreadPoolExecutor regionOpenAndInitThreadPool = Threads
     .getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
         new ThreadFactory() {
@@ -236,24 +236,4 @@ public abstract class ModifyRegionUtils {
         });
     return regionOpenAndInitThreadPool;
   }
-
-  /**
-   * Triggers a bulk assignment of the specified regions
-   *
-   * @param assignmentManager the Assignment Manger
-   * @param regionInfos the list of regions to assign
-   * @throws IOException if an error occurred during the assignment
-   */
-  public static void assignRegions(final AssignmentManager assignmentManager,
-      final List<HRegionInfo> regionInfos) throws IOException {
-    try {
-      assignmentManager.getRegionStates().createRegionStates(regionInfos);
-      assignmentManager.assign(regionInfos);
-    } catch (InterruptedException e) {
-      LOG.error("Caught " + e + " during round-robin assignment");
-      InterruptedIOException ie = new InterruptedIOException(e.getMessage());
-      ie.initCause(e);
-      throw ie;
-    }
-  }
 }
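The default ceiling for the open-and-init pool moves from 10 to 16; the effective pool size is still min(regionNumber, ceiling). Operators can keep overriding the same property, e.g. (a configuration sketch):

    Configuration conf = HBaseConfiguration.create();
    // Effective pool size = min(number of regions, this ceiling); default is now 16.
    conf.setInt("hbase.hregion.open.and.init.threads.max", 32);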

http://git-wip-us.apache.org/repos/asf/hbase/blob/4143c017/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
index e8069ec..517a0cd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
@@ -289,8 +289,8 @@ public class WALSplitter {
     this.fileBeingSplit = logfile;
     try {
       long logLength = logfile.getLen();
-      LOG.info("Splitting wal: " + logPath + ", length=" + logLength);
-      LOG.info("DistributedLogReplay = " + this.distributedLogReplay);
+      LOG.info("Splitting WAL=" + logPath + ", length=" + logLength +
+          ", distributedLogReplay=" + this.distributedLogReplay);
       status.setStatus("Opening log file");
       if (reporter != null && !reporter.progress()) {
         progress_failed = true;
@@ -298,7 +298,7 @@ public class WALSplitter {
       }
       in = getReader(logfile, skipErrors, reporter);
       if (in == null) {
-        LOG.warn("Nothing to split in log file " + logPath);
+        LOG.warn("Nothing to split in WAL=" + logPath);
         return true;
       }
       int numOpenedFilesBeforeReporting = conf.getInt("hbase.splitlog.report.openedfiles", 3);
@@ -377,7 +377,7 @@ public class WALSplitter {
       iie.initCause(ie);
       throw iie;
     } catch (CorruptedLogFileException e) {
-      LOG.warn("Could not parse, corrupted log file " + logPath, e);
+      LOG.warn("Could not parse, corrupted WAL=" + logPath, e);
       if (this.csm != null) {
         // Some tests pass in a csm of null.
         this.csm.getSplitLogWorkerCoordination().markCorrupted(rootDir,
@@ -397,7 +397,7 @@ public class WALSplitter {
           in.close();
         }
       } catch (IOException exception) {
-        LOG.warn("Could not close wal reader: " + exception.getMessage());
+        LOG.warn("Could not close WAL reader: " + exception.getMessage());
         LOG.debug("exception details", exception);
       }
       try {
@@ -1595,8 +1595,10 @@ public class WALSplitter {
           if (wap == null) {
             wap = getWriterAndPath(logEntry);
             if (wap == null) {
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("getWriterAndPath decided we don't need to write edits for " + logEntry);
+              if (LOG.isTraceEnabled()) {
+                // This log spews the full edit and can be massive in the log. Enable it only
+                // when debugging WAL lost-edit issues.
+                LOG.trace("getWriterAndPath decided we don't need to write edits for " + logEntry);
               }
               return;
             }
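With the full-edit message demoted to TRACE it is silent under default logging; to see it while chasing lost-edit problems, raise the WALSplitter logger to TRACE. A log4j 1.x sketch, assuming log4j is the commons-logging backend in use:

    // Equivalent to log4j.logger.org.apache.hadoop.hbase.wal.WALSplitter=TRACE
    org.apache.log4j.Logger.getLogger(WALSplitter.class)
        .setLevel(org.apache.log4j.Level.TRACE);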

http://git-wip-us.apache.org/repos/asf/hbase/blob/4143c017/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/RegionServerTracker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/RegionServerTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/RegionServerTracker.java
index 69cd233..a6a5c17 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/RegionServerTracker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/RegionServerTracker.java
@@ -88,8 +88,8 @@ public class RegionServerTracker extends ZooKeeperListener {
               int magicLen = ProtobufUtil.lengthOfPBMagic();
               ProtobufUtil.mergeFrom(rsInfoBuilder, data, magicLen, data.length - magicLen);
             }
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Added tracking of RS " + nodePath);
+            if (LOG.isTraceEnabled()) {
+              LOG.trace("Added tracking of RS " + nodePath);
             }
           } catch (KeeperException e) {
             LOG.warn("Get Rs info port from ephemeral node", e);

http://git-wip-us.apache.org/repos/asf/hbase/blob/4143c017/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index afc070d..b527195 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.fail;
 
 import java.io.File;
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.io.OutputStream;
 import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
@@ -46,7 +47,10 @@ import java.util.Random;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.commons.io.FileUtils;
@@ -86,10 +90,10 @@ import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.ipc.RpcServerInterface;
 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
 import org.apache.hadoop.hbase.mapreduce.MapreduceTestingShim;
-import org.apache.hadoop.hbase.master.AssignmentManager;
 import org.apache.hadoop.hbase.master.HMaster;
-import org.apache.hadoop.hbase.master.RegionStates;
 import org.apache.hadoop.hbase.master.ServerManager;
+import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
+import org.apache.hadoop.hbase.master.assignment.RegionStates;
 import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.regionserver.ChunkCreator;
 import org.apache.hadoop.hbase.regionserver.HRegion;
@@ -3323,13 +3327,14 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
   public void moveRegionAndWait(HRegionInfo destRegion, ServerName destServer)
       throws InterruptedException, IOException {
     HMaster master = getMiniHBaseCluster().getMaster();
-    getHBaseAdmin().move(destRegion.getEncodedNameAsBytes(),
+    // TODO: Here we start the move. The move can take a while.
+    getAdmin().move(destRegion.getEncodedNameAsBytes(),
         Bytes.toBytes(destServer.getServerName()));
     while (true) {
       ServerName serverName = master.getAssignmentManager().getRegionStates()
           .getRegionServerOfRegion(destRegion);
       if (serverName != null && serverName.equals(destServer)) {
-        assertRegionOnServer(destRegion, serverName, 200);
+        assertRegionOnServer(destRegion, serverName, 2000);
         break;
       }
       Thread.sleep(10);
@@ -3994,8 +3999,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
         if (master == null) return false;
         AssignmentManager am = master.getAssignmentManager();
         if (am == null) return false;
-        final RegionStates regionStates = am.getRegionStates();
-        return !regionStates.isRegionsInTransition();
+        return !am.hasRegionsInTransition();
       }
     };
   }
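Tests that previously went through getRegionStates() now ask the AssignmentManager directly. A hedged Java 8 sketch of waiting for a quiet cluster, assuming a TEST_UTIL HBaseTestingUtility with a running mini cluster:

    HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster();
    // Waiter.Predicate has a single evaluate() method, so a lambda works here.
    TEST_UTIL.waitFor(60000,
        () -> !master.getAssignmentManager().hasRegionsInTransition());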