You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2017/05/23 18:37:46 UTC
[09/28] hbase git commit: HBASE-18087 Fix unit tests in
TestTableFavoredNodes
http://git-wip-us.apache.org/repos/asf/hbase/blob/4143c017/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
deleted file mode 100644
index 7791ea7..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
+++ /dev/null
@@ -1,695 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver;
-
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executors;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.RejectedExecutionHandler;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.conf.ConfigurationManager;
-import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
-import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
-import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
-import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
-import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.util.StealJobQueue;
-import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.util.StringUtils;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-
-/**
- * Compact region on request and then run split if appropriate
- */
-@InterfaceAudience.Private
-public class CompactSplitThread implements CompactionRequestor, PropagatingConfigurationObserver {
- private static final Log LOG = LogFactory.getLog(CompactSplitThread.class);
-
- // Configuration key for the large compaction threads.
- public final static String LARGE_COMPACTION_THREADS =
- "hbase.regionserver.thread.compaction.large";
- public final static int LARGE_COMPACTION_THREADS_DEFAULT = 1;
-
- // Configuration key for the small compaction threads.
- public final static String SMALL_COMPACTION_THREADS =
- "hbase.regionserver.thread.compaction.small";
- public final static int SMALL_COMPACTION_THREADS_DEFAULT = 1;
-
- // Configuration key for split threads
- public final static String SPLIT_THREADS = "hbase.regionserver.thread.split";
- public final static int SPLIT_THREADS_DEFAULT = 1;
-
- public static final String REGION_SERVER_REGION_SPLIT_LIMIT =
- "hbase.regionserver.regionSplitLimit";
- public static final int DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT= 1000;
-
- private final HRegionServer server;
- private final Configuration conf;
-
- private final ThreadPoolExecutor longCompactions;
- private final ThreadPoolExecutor shortCompactions;
- private final ThreadPoolExecutor splits;
-
- private volatile ThroughputController compactionThroughputController;
-
- /**
- * Splitting should not take place if the total number of regions exceed this.
- * This is not a hard limit to the number of regions but it is a guideline to
- * stop splitting after number of online regions is greater than this.
- */
- private int regionSplitLimit;
-
- /** @param server */
- CompactSplitThread(HRegionServer server) {
- super();
- this.server = server;
- this.conf = server.getConfiguration();
- this.regionSplitLimit = conf.getInt(REGION_SERVER_REGION_SPLIT_LIMIT,
- DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT);
-
- int largeThreads = Math.max(1, conf.getInt(
- LARGE_COMPACTION_THREADS, LARGE_COMPACTION_THREADS_DEFAULT));
- int smallThreads = conf.getInt(
- SMALL_COMPACTION_THREADS, SMALL_COMPACTION_THREADS_DEFAULT);
-
- int splitThreads = conf.getInt(SPLIT_THREADS, SPLIT_THREADS_DEFAULT);
-
- // if we have throttle threads, make sure the user also specified size
- Preconditions.checkArgument(largeThreads > 0 && smallThreads > 0);
-
- final String n = Thread.currentThread().getName();
-
- StealJobQueue<Runnable> stealJobQueue = new StealJobQueue<>();
- this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads,
- 60, TimeUnit.SECONDS, stealJobQueue,
- new ThreadFactory() {
- @Override
- public Thread newThread(Runnable r) {
- String name = n + "-longCompactions-" + System.currentTimeMillis();
- return new Thread(r, name);
- }
- });
- this.longCompactions.setRejectedExecutionHandler(new Rejection());
- this.longCompactions.prestartAllCoreThreads();
- this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads,
- 60, TimeUnit.SECONDS, stealJobQueue.getStealFromQueue(),
- new ThreadFactory() {
- @Override
- public Thread newThread(Runnable r) {
- String name = n + "-shortCompactions-" + System.currentTimeMillis();
- return new Thread(r, name);
- }
- });
- this.shortCompactions
- .setRejectedExecutionHandler(new Rejection());
- this.splits = (ThreadPoolExecutor)
- Executors.newFixedThreadPool(splitThreads,
- new ThreadFactory() {
- @Override
- public Thread newThread(Runnable r) {
- String name = n + "-splits-" + System.currentTimeMillis();
- return new Thread(r, name);
- }
- });
-
- // compaction throughput controller
- this.compactionThroughputController =
- CompactionThroughputControllerFactory.create(server, conf);
- }
-
- @Override
- public String toString() {
- return "compaction_queue=("
- + longCompactions.getQueue().size() + ":"
- + shortCompactions.getQueue().size() + ")"
- + ", split_queue=" + splits.getQueue().size();
- }
-
- public String dumpQueue() {
- StringBuffer queueLists = new StringBuffer();
- queueLists.append("Compaction/Split Queue dump:\n");
- queueLists.append(" LargeCompation Queue:\n");
- BlockingQueue<Runnable> lq = longCompactions.getQueue();
- Iterator<Runnable> it = lq.iterator();
- while (it.hasNext()) {
- queueLists.append(" " + it.next().toString());
- queueLists.append("\n");
- }
-
- if (shortCompactions != null) {
- queueLists.append("\n");
- queueLists.append(" SmallCompation Queue:\n");
- lq = shortCompactions.getQueue();
- it = lq.iterator();
- while (it.hasNext()) {
- queueLists.append(" " + it.next().toString());
- queueLists.append("\n");
- }
- }
-
- queueLists.append("\n");
- queueLists.append(" Split Queue:\n");
- lq = splits.getQueue();
- it = lq.iterator();
- while (it.hasNext()) {
- queueLists.append(" " + it.next().toString());
- queueLists.append("\n");
- }
-
- return queueLists.toString();
- }
-
- public synchronized boolean requestSplit(final Region r) {
- // don't split regions that are blocking
- if (shouldSplitRegion() && ((HRegion)r).getCompactPriority() >= Store.PRIORITY_USER) {
- byte[] midKey = ((HRegion)r).checkSplit();
- if (midKey != null) {
- requestSplit(r, midKey);
- return true;
- }
- }
- return false;
- }
-
- public synchronized void requestSplit(final Region r, byte[] midKey) {
- requestSplit(r, midKey, null);
- }
-
- /*
- * The User parameter allows the split thread to assume the correct user identity
- */
- public synchronized void requestSplit(final Region r, byte[] midKey, User user) {
- if (midKey == null) {
- LOG.debug("Region " + r.getRegionInfo().getRegionNameAsString() +
- " not splittable because midkey=null");
- if (((HRegion)r).shouldForceSplit()) {
- ((HRegion)r).clearSplit();
- }
- return;
- }
- try {
- this.splits.execute(new SplitRequest(r, midKey, this.server, user));
- if (LOG.isDebugEnabled()) {
- LOG.debug("Split requested for " + r + ". " + this);
- }
- } catch (RejectedExecutionException ree) {
- LOG.info("Could not execute split for " + r, ree);
- }
- }
-
- @Override
- public synchronized List<CompactionRequest> requestCompaction(final Region r, final String why)
- throws IOException {
- return requestCompaction(r, why, null);
- }
-
- @Override
- public synchronized List<CompactionRequest> requestCompaction(final Region r, final String why,
- List<Pair<CompactionRequest, Store>> requests) throws IOException {
- return requestCompaction(r, why, Store.NO_PRIORITY, requests, null);
- }
-
- @Override
- public synchronized CompactionRequest requestCompaction(final Region r, final Store s,
- final String why, CompactionRequest request) throws IOException {
- return requestCompaction(r, s, why, Store.NO_PRIORITY, request, null);
- }
-
- @Override
- public synchronized List<CompactionRequest> requestCompaction(final Region r, final String why,
- int p, List<Pair<CompactionRequest, Store>> requests, User user) throws IOException {
- return requestCompactionInternal(r, why, p, requests, true, user);
- }
-
- private List<CompactionRequest> requestCompactionInternal(final Region r, final String why,
- int p, List<Pair<CompactionRequest, Store>> requests, boolean selectNow, User user)
- throws IOException {
- // not a special compaction request, so make our own list
- List<CompactionRequest> ret = null;
- if (requests == null) {
- ret = selectNow ? new ArrayList<>(r.getStores().size()) : null;
- for (Store s : r.getStores()) {
- CompactionRequest cr = requestCompactionInternal(r, s, why, p, null, selectNow, user);
- if (selectNow) ret.add(cr);
- }
- } else {
- Preconditions.checkArgument(selectNow); // only system requests have selectNow == false
- ret = new ArrayList<>(requests.size());
- for (Pair<CompactionRequest, Store> pair : requests) {
- ret.add(requestCompaction(r, pair.getSecond(), why, p, pair.getFirst(), user));
- }
- }
- return ret;
- }
-
- public CompactionRequest requestCompaction(final Region r, final Store s,
- final String why, int priority, CompactionRequest request, User user) throws IOException {
- return requestCompactionInternal(r, s, why, priority, request, true, user);
- }
-
- public synchronized void requestSystemCompaction(
- final Region r, final String why) throws IOException {
- requestCompactionInternal(r, why, Store.NO_PRIORITY, null, false, null);
- }
-
- public void requestSystemCompaction(
- final Region r, final Store s, final String why) throws IOException {
- requestCompactionInternal(r, s, why, Store.NO_PRIORITY, null, false, null);
- }
-
- /**
- * @param r region store belongs to
- * @param s Store to request compaction on
- * @param why Why compaction requested -- used in debug messages
- * @param priority override the default priority (NO_PRIORITY == decide)
- * @param request custom compaction request. Can be <tt>null</tt> in which case a simple
- * compaction will be used.
- */
- private synchronized CompactionRequest requestCompactionInternal(final Region r, final Store s,
- final String why, int priority, CompactionRequest request, boolean selectNow, User user)
- throws IOException {
- if (this.server.isStopped()
- || (r.getTableDesc() != null && !r.getTableDesc().isCompactionEnabled())) {
- return null;
- }
-
- CompactionContext compaction = null;
- if (selectNow) {
- compaction = selectCompaction(r, s, priority, request, user);
- if (compaction == null) return null; // message logged inside
- }
-
- final RegionServerSpaceQuotaManager spaceQuotaManager =
- this.server.getRegionServerSpaceQuotaManager();
- if (spaceQuotaManager != null && spaceQuotaManager.areCompactionsDisabled(
- r.getTableDesc().getTableName())) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Ignoring compaction request for " + r + " as an active space quota violation "
- + " policy disallows compactions.");
- }
- return null;
- }
-
- // We assume that most compactions are small. So, put system compactions into small
- // pool; we will do selection there, and move to large pool if necessary.
- ThreadPoolExecutor pool = (selectNow && s.throttleCompaction(compaction.getRequest().getSize()))
- ? longCompactions : shortCompactions;
- pool.execute(new CompactionRunner(s, r, compaction, pool, user));
- if (LOG.isDebugEnabled()) {
- String type = (pool == shortCompactions) ? "Small " : "Large ";
- LOG.debug(type + "Compaction requested: " + (selectNow ? compaction.toString() : "system")
- + (why != null && !why.isEmpty() ? "; Because: " + why : "") + "; " + this);
- }
- return selectNow ? compaction.getRequest() : null;
- }
-
- private CompactionContext selectCompaction(final Region r, final Store s,
- int priority, CompactionRequest request, User user) throws IOException {
- CompactionContext compaction = s.requestCompaction(priority, request, user);
- if (compaction == null) {
- if(LOG.isDebugEnabled() && r.getRegionInfo() != null) {
- LOG.debug("Not compacting " + r.getRegionInfo().getRegionNameAsString() +
- " because compaction request was cancelled");
- }
- return null;
- }
- assert compaction.hasSelection();
- if (priority != Store.NO_PRIORITY) {
- compaction.getRequest().setPriority(priority);
- }
- return compaction;
- }
-
- /**
- * Only interrupt once it's done with a run through the work loop.
- */
- void interruptIfNecessary() {
- splits.shutdown();
- longCompactions.shutdown();
- shortCompactions.shutdown();
- }
-
- private void waitFor(ThreadPoolExecutor t, String name) {
- boolean done = false;
- while (!done) {
- try {
- done = t.awaitTermination(60, TimeUnit.SECONDS);
- LOG.info("Waiting for " + name + " to finish...");
- if (!done) {
- t.shutdownNow();
- }
- } catch (InterruptedException ie) {
- LOG.warn("Interrupted waiting for " + name + " to finish...");
- }
- }
- }
-
- void join() {
- waitFor(splits, "Split Thread");
- waitFor(longCompactions, "Large Compaction Thread");
- waitFor(shortCompactions, "Small Compaction Thread");
- }
-
- /**
- * Returns the current size of the queue containing regions that are
- * processed.
- *
- * @return The current size of the regions queue.
- */
- public int getCompactionQueueSize() {
- return longCompactions.getQueue().size() + shortCompactions.getQueue().size();
- }
-
- public int getLargeCompactionQueueSize() {
- return longCompactions.getQueue().size();
- }
-
-
- public int getSmallCompactionQueueSize() {
- return shortCompactions.getQueue().size();
- }
-
- public int getSplitQueueSize() {
- return splits.getQueue().size();
- }
-
- private boolean shouldSplitRegion() {
- if(server.getNumberOfOnlineRegions() > 0.9*regionSplitLimit) {
- LOG.warn("Total number of regions is approaching the upper limit " + regionSplitLimit + ". "
- + "Please consider taking a look at http://hbase.apache.org/book.html#ops.regionmgt");
- }
- return (regionSplitLimit > server.getNumberOfOnlineRegions());
- }
-
- /**
- * @return the regionSplitLimit
- */
- public int getRegionSplitLimit() {
- return this.regionSplitLimit;
- }
-
- @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="EQ_COMPARETO_USE_OBJECT_EQUALS",
- justification="Contrived use of compareTo")
- private class CompactionRunner implements Runnable, Comparable<CompactionRunner> {
- private final Store store;
- private final HRegion region;
- private CompactionContext compaction;
- private int queuedPriority;
- private ThreadPoolExecutor parent;
- private User user;
- private long time;
-
- public CompactionRunner(Store store, Region region,
- CompactionContext compaction, ThreadPoolExecutor parent, User user) {
- super();
- this.store = store;
- this.region = (HRegion)region;
- this.compaction = compaction;
- this.queuedPriority = (this.compaction == null)
- ? store.getCompactPriority() : compaction.getRequest().getPriority();
- this.parent = parent;
- this.user = user;
- this.time = System.currentTimeMillis();
- }
-
- @Override
- public String toString() {
- return (this.compaction != null) ? ("Request = " + compaction.getRequest())
- : ("regionName = " + region.toString() + ", storeName = " + store.toString() +
- ", priority = " + queuedPriority + ", time = " + time);
- }
-
- private void doCompaction(User user) {
- // Common case - system compaction without a file selection. Select now.
- if (this.compaction == null) {
- int oldPriority = this.queuedPriority;
- this.queuedPriority = this.store.getCompactPriority();
- if (this.queuedPriority > oldPriority) {
- // Store priority decreased while we were in queue (due to some other compaction?),
- // requeue with new priority to avoid blocking potential higher priorities.
- this.parent.execute(this);
- return;
- }
- try {
- this.compaction = selectCompaction(this.region, this.store, queuedPriority, null, user);
- } catch (IOException ex) {
- LOG.error("Compaction selection failed " + this, ex);
- server.checkFileSystem();
- return;
- }
- if (this.compaction == null) return; // nothing to do
- // Now see if we are in correct pool for the size; if not, go to the correct one.
- // We might end up waiting for a while, so cancel the selection.
- assert this.compaction.hasSelection();
- ThreadPoolExecutor pool = store.throttleCompaction(
- compaction.getRequest().getSize()) ? longCompactions : shortCompactions;
-
- // Long compaction pool can process small job
- // Short compaction pool should not process large job
- if (this.parent == shortCompactions && pool == longCompactions) {
- this.store.cancelRequestedCompaction(this.compaction);
- this.compaction = null;
- this.parent = pool;
- this.parent.execute(this);
- return;
- }
- }
- // Finally we can compact something.
- assert this.compaction != null;
-
- this.compaction.getRequest().beforeExecute();
- try {
- // Note: please don't put single-compaction logic here;
- // put it into region/store/etc. This is CST logic.
- long start = EnvironmentEdgeManager.currentTime();
- boolean completed =
- region.compact(compaction, store, compactionThroughputController, user);
- long now = EnvironmentEdgeManager.currentTime();
- LOG.info(((completed) ? "Completed" : "Aborted") + " compaction: " +
- this + "; duration=" + StringUtils.formatTimeDiff(now, start));
- if (completed) {
- // degenerate case: blocked regions require recursive enqueues
- if (store.getCompactPriority() <= 0) {
- requestSystemCompaction(region, store, "Recursive enqueue");
- } else {
- // see if the compaction has caused us to exceed max region size
- requestSplit(region);
- }
- }
- } catch (IOException ex) {
- IOException remoteEx =
- ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex;
- LOG.error("Compaction failed " + this, remoteEx);
- if (remoteEx != ex) {
- LOG.info("Compaction failed at original callstack: " + formatStackTrace(ex));
- }
- region.reportCompactionRequestFailure();
- server.checkFileSystem();
- } catch (Exception ex) {
- LOG.error("Compaction failed " + this, ex);
- region.reportCompactionRequestFailure();
- server.checkFileSystem();
- } finally {
- LOG.debug("CompactSplitThread Status: " + CompactSplitThread.this);
- }
- this.compaction.getRequest().afterExecute();
- }
-
- @Override
- public void run() {
- Preconditions.checkNotNull(server);
- if (server.isStopped()
- || (region.getTableDesc() != null && !region.getTableDesc().isCompactionEnabled())) {
- return;
- }
- doCompaction(user);
- }
-
- private String formatStackTrace(Exception ex) {
- StringWriter sw = new StringWriter();
- PrintWriter pw = new PrintWriter(sw);
- ex.printStackTrace(pw);
- pw.flush();
- return sw.toString();
- }
-
- @Override
- public int compareTo(CompactionRunner o) {
- // Only compare the underlying request (if any), for queue sorting purposes.
- int compareVal = queuedPriority - o.queuedPriority; // compare priority
- if (compareVal != 0) return compareVal;
- CompactionContext tc = this.compaction, oc = o.compaction;
- // Sort pre-selected (user?) compactions before system ones with equal priority.
- return (tc == null) ? ((oc == null) ? 0 : 1)
- : ((oc == null) ? -1 : tc.getRequest().compareTo(oc.getRequest()));
- }
- }
-
- /**
- * Cleanup class to use when rejecting a compaction request from the queue.
- */
- private static class Rejection implements RejectedExecutionHandler {
- @Override
- public void rejectedExecution(Runnable runnable, ThreadPoolExecutor pool) {
- if (runnable instanceof CompactionRunner) {
- CompactionRunner runner = (CompactionRunner)runnable;
- LOG.debug("Compaction Rejected: " + runner);
- runner.store.cancelRequestedCompaction(runner.compaction);
- }
- }
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void onConfigurationChange(Configuration newConf) {
- // Check if number of large / small compaction threads has changed, and then
- // adjust the core pool size of the thread pools, by using the
- // setCorePoolSize() method. According to the javadocs, it is safe to
- // change the core pool size on-the-fly. We need to reset the maximum
- // pool size, as well.
- int largeThreads = Math.max(1, newConf.getInt(
- LARGE_COMPACTION_THREADS,
- LARGE_COMPACTION_THREADS_DEFAULT));
- if (this.longCompactions.getCorePoolSize() != largeThreads) {
- LOG.info("Changing the value of " + LARGE_COMPACTION_THREADS +
- " from " + this.longCompactions.getCorePoolSize() + " to " +
- largeThreads);
- if(this.longCompactions.getCorePoolSize() < largeThreads) {
- this.longCompactions.setMaximumPoolSize(largeThreads);
- this.longCompactions.setCorePoolSize(largeThreads);
- } else {
- this.longCompactions.setCorePoolSize(largeThreads);
- this.longCompactions.setMaximumPoolSize(largeThreads);
- }
- }
-
- int smallThreads = newConf.getInt(SMALL_COMPACTION_THREADS,
- SMALL_COMPACTION_THREADS_DEFAULT);
- if (this.shortCompactions.getCorePoolSize() != smallThreads) {
- LOG.info("Changing the value of " + SMALL_COMPACTION_THREADS +
- " from " + this.shortCompactions.getCorePoolSize() + " to " +
- smallThreads);
- if(this.shortCompactions.getCorePoolSize() < smallThreads) {
- this.shortCompactions.setMaximumPoolSize(smallThreads);
- this.shortCompactions.setCorePoolSize(smallThreads);
- } else {
- this.shortCompactions.setCorePoolSize(smallThreads);
- this.shortCompactions.setMaximumPoolSize(smallThreads);
- }
- }
-
- int splitThreads = newConf.getInt(SPLIT_THREADS,
- SPLIT_THREADS_DEFAULT);
- if (this.splits.getCorePoolSize() != splitThreads) {
- LOG.info("Changing the value of " + SPLIT_THREADS +
- " from " + this.splits.getCorePoolSize() + " to " +
- splitThreads);
- if(this.splits.getCorePoolSize() < splitThreads) {
- this.splits.setMaximumPoolSize(splitThreads);
- this.splits.setCorePoolSize(splitThreads);
- } else {
- this.splits.setCorePoolSize(splitThreads);
- this.splits.setMaximumPoolSize(splitThreads);
- }
- }
-
- ThroughputController old = this.compactionThroughputController;
- if (old != null) {
- old.stop("configuration change");
- }
- this.compactionThroughputController =
- CompactionThroughputControllerFactory.create(server, newConf);
-
- // We change this atomically here instead of reloading the config in order that upstream
- // would be the only one with the flexibility to reload the config.
- this.conf.reloadConfiguration();
- }
-
- protected int getSmallCompactionThreadNum() {
- return this.shortCompactions.getCorePoolSize();
- }
-
- protected int getLargeCompactionThreadNum() {
- return this.longCompactions.getCorePoolSize();
- }
-
- protected int getSplitThreadNum() {
- return this.splits.getCorePoolSize();
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void registerChildren(ConfigurationManager manager) {
- // No children to register.
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void deregisterChildren(ConfigurationManager manager) {
- // No children to register
- }
-
- @VisibleForTesting
- public ThroughputController getCompactionThroughputController() {
- return compactionThroughputController;
- }
-
- @VisibleForTesting
- /**
- * Shutdown the long compaction thread pool.
- * Should only be used in unit test to prevent long compaction thread pool from stealing job
- * from short compaction queue
- */
- void shutdownLongCompactions(){
- this.longCompactions.shutdown();
- }
-
- public void clearLongCompactionsQueue() {
- longCompactions.getQueue().clear();
- }
-
- public void clearShortCompactionsQueue() {
- shortCompactions.getQueue().clear();
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/4143c017/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactedHFilesDischarger.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactedHFilesDischarger.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactedHFilesDischarger.java
index 2773e00..6b8948b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactedHFilesDischarger.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactedHFilesDischarger.java
@@ -34,8 +34,8 @@ import com.google.common.annotations.VisibleForTesting;
/**
* A chore service that periodically cleans up the compacted files when there are no active readers
- * using those compacted files and also helps in clearing the block cache with these compacted
- * file entries
+ * using those compacted files and also helps in clearing the block cache of these compacted
+ * file entries.
*/
@InterfaceAudience.Private
public class CompactedHFilesDischarger extends ScheduledChore {
@@ -71,45 +71,56 @@ public class CompactedHFilesDischarger extends ScheduledChore {
this.useExecutor = useExecutor;
}
+ /**
+ * CompactedHFilesDischarger runs asynchronously by default using the hosting
+ * RegionServer's Executor. In tests it can be useful to force a synchronous
+ * cleanup. Use this method to set no-executor before you call run.
+ * @return The old setting for <code>useExecutor</code>
+ */
+ @VisibleForTesting
+ boolean setUseExecutor(final boolean useExecutor) {
+ boolean oldSetting = this.useExecutor;
+ this.useExecutor = useExecutor;
+ return oldSetting;
+ }
+
@Override
public void chore() {
// Noop if rss is null. This will never happen in a normal condition except for cases
// when the test case is not spinning up a cluster
if (regionServerServices == null) return;
List<Region> onlineRegions = regionServerServices.getOnlineRegions();
- if (onlineRegions != null) {
- for (Region region : onlineRegions) {
- if (LOG.isTraceEnabled()) {
- LOG.trace(
- "Started the compacted hfiles cleaner for the region " + region.getRegionInfo());
- }
- for (Store store : region.getStores()) {
- try {
- if (useExecutor && regionServerServices != null) {
- CompactedHFilesDischargeHandler handler = new CompactedHFilesDischargeHandler(
- (Server) regionServerServices, EventType.RS_COMPACTED_FILES_DISCHARGER,
- (HStore) store);
- regionServerServices.getExecutorService().submit(handler);
- } else {
- // call synchronously if the RegionServerServices are not
- // available
- store.closeAndArchiveCompactedFiles();
- }
- if (LOG.isTraceEnabled()) {
- LOG.trace("Completed archiving the compacted files for the region "
- + region.getRegionInfo() + " under the store " + store.getColumnFamilyName());
- }
- } catch (Exception e) {
- LOG.error("Exception while trying to close and archive the compacted store "
- + "files of the store " + store.getColumnFamilyName() + " in the" + " region "
- + region.getRegionInfo(), e);
+ if (onlineRegions == null) return;
+ for (Region region : onlineRegions) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Started compacted hfiles cleaner on " + region.getRegionInfo());
+ }
+ for (Store store : region.getStores()) {
+ try {
+ if (useExecutor && regionServerServices != null) {
+ CompactedHFilesDischargeHandler handler = new CompactedHFilesDischargeHandler(
+ (Server) regionServerServices, EventType.RS_COMPACTED_FILES_DISCHARGER,
+ (HStore) store);
+ regionServerServices.getExecutorService().submit(handler);
+ } else {
+ // call synchronously if the RegionServerServices are not
+ // available
+ store.closeAndArchiveCompactedFiles();
}
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Completed archiving the compacted files for the region "
+ + region.getRegionInfo() + " under the store " + store.getColumnFamilyName());
+ }
+ } catch (Exception e) {
+ LOG.error("Exception while trying to close and archive the compacted store "
+ + "files of the store " + store.getColumnFamilyName() + " in the" + " region "
+ + region.getRegionInfo(), e);
}
- if (LOG.isTraceEnabled()) {
- LOG.trace(
- "Completed the compacted hfiles cleaner for the region " + region.getRegionInfo());
- }
+ }
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(
+ "Completed the compacted hfiles cleaner for the region " + region.getRegionInfo());
}
}
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/4143c017/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index a4a7537..8cc9cd7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -1390,14 +1390,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return !isClosed() && !isClosing();
}
- /** @return true if region is splittable */
+ @Override
public boolean isSplittable() {
- return isAvailable() && !hasReferences();
+ boolean result = isAvailable() && !hasReferences();
+ LOG.info("ASKED IF SPLITTABLE " + result, new Throwable("LOGGING"));
+ return result;
}
- /**
- * @return true if region is mergeable
- */
+ @Override
public boolean isMergeable() {
if (!isAvailable()) {
LOG.debug("Region " + this
@@ -5086,11 +5086,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
}
- @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NN_NAKED_NOTIFY",
- justification = "Notify is about post replay. Intentional")
@Override
public boolean refreshStoreFiles() throws IOException {
- if (ServerRegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) {
+ return refreshStoreFiles(false);
+ }
+
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NN_NAKED_NOTIFY",
+ justification = "Notify is about post replay. Intentional")
+ protected boolean refreshStoreFiles(boolean force) throws IOException {
+ if (!force && ServerRegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) {
return false; // if primary nothing to do
}
@@ -5848,7 +5852,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
try {
for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) {
Store store = stores.get(entry.getKey());
- KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt);
+ KeyValueScanner scanner;
+ try {
+ scanner = store.getScanner(scan, entry.getValue(), this.readPt);
+ } catch (FileNotFoundException e) {
+ throw handleFileNotFound(e);
+ }
instantiatedScanners.add(scanner);
if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
|| this.filter.isFamilyEssential(entry.getKey())) {
@@ -5872,19 +5881,20 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
}
- private void handleFileNotFound(Throwable fnfe) {
+ private FileNotFoundException handleFileNotFound(FileNotFoundException fnfe) {
// Try reopening the region since we have lost some storefiles.
// See HBASE-17712 for more details.
- LOG.warn("A store file got lost, so close and reopen region", fnfe);
+ LOG.warn("Store file is lost; close and reopen region", fnfe);
if (regionUnassigner != null) {
regionUnassigner.unassign();
}
+ return fnfe;
}
private IOException handleException(List<KeyValueScanner> instantiatedScanners,
Throwable t) {
if (t instanceof FileNotFoundException) {
- handleFileNotFound(t);
+ handleFileNotFound((FileNotFoundException)t);
}
// remove scaner read point before throw the exception
scannerReadPoints.remove(this);
@@ -6030,29 +6040,33 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
boolean tmpKeepProgress = scannerContext.getKeepProgress();
// Scanning between column families and thus the scope is between cells
LimitScope limitScope = LimitScope.BETWEEN_CELLS;
- do {
- // We want to maintain any progress that is made towards the limits while scanning across
- // different column families. To do this, we toggle the keep progress flag on during calls
- // to the StoreScanner to ensure that any progress made thus far is not wiped away.
- scannerContext.setKeepProgress(true);
- heap.next(results, scannerContext);
- scannerContext.setKeepProgress(tmpKeepProgress);
-
- nextKv = heap.peek();
- moreCellsInRow = moreCellsInRow(nextKv, currentRowCell);
- if (!moreCellsInRow) incrementCountOfRowsScannedMetric(scannerContext);
- if (moreCellsInRow && scannerContext.checkBatchLimit(limitScope)) {
- return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues();
- } else if (scannerContext.checkSizeLimit(limitScope)) {
- ScannerContext.NextState state =
- moreCellsInRow ? NextState.SIZE_LIMIT_REACHED_MID_ROW : NextState.SIZE_LIMIT_REACHED;
- return scannerContext.setScannerState(state).hasMoreValues();
- } else if (scannerContext.checkTimeLimit(limitScope)) {
- ScannerContext.NextState state =
- moreCellsInRow ? NextState.TIME_LIMIT_REACHED_MID_ROW : NextState.TIME_LIMIT_REACHED;
- return scannerContext.setScannerState(state).hasMoreValues();
- }
- } while (moreCellsInRow);
+ try {
+ do {
+ // We want to maintain any progress that is made towards the limits while scanning across
+ // different column families. To do this, we toggle the keep progress flag on during calls
+ // to the StoreScanner to ensure that any progress made thus far is not wiped away.
+ scannerContext.setKeepProgress(true);
+ heap.next(results, scannerContext);
+ scannerContext.setKeepProgress(tmpKeepProgress);
+
+ nextKv = heap.peek();
+ moreCellsInRow = moreCellsInRow(nextKv, currentRowCell);
+ if (!moreCellsInRow) incrementCountOfRowsScannedMetric(scannerContext);
+ if (moreCellsInRow && scannerContext.checkBatchLimit(limitScope)) {
+ return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues();
+ } else if (scannerContext.checkSizeLimit(limitScope)) {
+ ScannerContext.NextState state =
+ moreCellsInRow ? NextState.SIZE_LIMIT_REACHED_MID_ROW : NextState.SIZE_LIMIT_REACHED;
+ return scannerContext.setScannerState(state).hasMoreValues();
+ } else if (scannerContext.checkTimeLimit(limitScope)) {
+ ScannerContext.NextState state =
+ moreCellsInRow ? NextState.TIME_LIMIT_REACHED_MID_ROW : NextState.TIME_LIMIT_REACHED;
+ return scannerContext.setScannerState(state).hasMoreValues();
+ }
+ } while (moreCellsInRow);
+ } catch (FileNotFoundException e) {
+ throw handleFileNotFound(e);
+ }
return nextKv != null;
}
@@ -6401,8 +6415,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
result = this.joinedHeap.requestSeek(kv, true, true) || result;
}
} catch (FileNotFoundException e) {
- handleFileNotFound(e);
- throw e;
+ throw handleFileNotFound(e);
} finally {
closeRegionOperation();
}
@@ -7787,6 +7800,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return null;
}
+ // Can't split a region that is closing.
+ if (this.isClosing()) {
+ return null;
+ }
+
if (!splitPolicy.shouldSplit()) {
return null;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/4143c017/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
index 014427d..59a0fe5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
@@ -318,13 +318,15 @@ public class HRegionFileSystem {
* @throws IOException
*/
public boolean hasReferences(final String familyName) throws IOException {
- FileStatus[] files = FSUtils.listStatus(fs, getStoreDir(familyName));
+ Path storeDir = getStoreDir(familyName);
+ FileStatus[] files = FSUtils.listStatus(fs, storeDir);
if (files != null) {
for(FileStatus stat: files) {
if(stat.isDirectory()) {
continue;
}
if(StoreFileInfo.isReference(stat.getPath())) {
+ if (LOG.isTraceEnabled()) LOG.trace("Reference " + stat.getPath());
return true;
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/4143c017/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 3ca061a..9315b0a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -86,7 +86,6 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionUtils;
-import org.apache.hadoop.hbase.client.NonceGenerator;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.client.locking.EntityLock;
@@ -170,8 +169,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProto
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.SplitTableRegionRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.SplitTableRegionResponse;
import org.apache.hadoop.hbase.trace.SpanReceiverHost;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.Bytes;
@@ -179,7 +176,6 @@ import org.apache.hadoop.hbase.util.CompressionTest;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.hbase.util.JSONBean;
import org.apache.hadoop.hbase.util.JvmPauseMonitor;
@@ -208,21 +204,23 @@ import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.data.Stat;
-import sun.misc.Signal;
-import sun.misc.SignalHandler;
-
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
+import sun.misc.Signal;
+import sun.misc.SignalHandler;
+
/**
* HRegionServer makes a set of HRegions available to clients. It checks in with
* the HMaster. There are many HRegionServers in a single HBase deployment.
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
-@SuppressWarnings({ "deprecation", "restriction" })
+@SuppressWarnings({ "deprecation"})
public class HRegionServer extends HasThread implements
RegionServerServices, LastSequenceId, ConfigurationObserver {
+ // Time to pause if master says 'please hold'. Make configurable if needed.
+ private static final int INIT_PAUSE_TIME_MS = 1000;
public static final String REGION_LOCK_AWAIT_TIME_SEC =
"hbase.regionserver.region.lock.await.time.sec";
@@ -283,7 +281,7 @@ public class HRegionServer extends HasThread implements
protected ReplicationSinkService replicationSinkHandler;
// Compactions
- public CompactSplitThread compactSplitThread;
+ public CompactSplit compactSplitThread;
/**
* Map of regions currently being served by this region server. Key is the
@@ -514,7 +512,8 @@ public class HRegionServer extends HasThread implements
*/
protected final ConfigurationManager configurationManager;
- private CompactedHFilesDischarger compactedFileDischarger;
+ @VisibleForTesting
+ CompactedHFilesDischarger compactedFileDischarger;
private volatile ThroughputController flushThroughputController;
@@ -914,7 +913,7 @@ public class HRegionServer extends HasThread implements
this.cacheFlusher = new MemStoreFlusher(conf, this);
// Compaction thread
- this.compactSplitThread = new CompactSplitThread(this);
+ this.compactSplitThread = new CompactSplit(this);
// Background thread to check for compactions; needed if region has not gotten updates
// in a while. It will take care of not checking too frequently on store-by-store basis.
@@ -1432,7 +1431,7 @@ public class HRegionServer extends HasThread implements
// Only print out regions still closing if a small number else will
// swamp the log.
if (count < 10 && LOG.isDebugEnabled()) {
- LOG.debug(this.onlineRegions);
+ LOG.debug("Online Regions=" + this.onlineRegions);
}
}
}
@@ -1779,7 +1778,7 @@ public class HRegionServer extends HasThread implements
final static int RANGE_OF_DELAY = 5 * 60 * 1000; // 5 min in milliseconds
final static int MIN_DELAY_TIME = 0; // millisec
public PeriodicMemstoreFlusher(int cacheFlushInterval, final HRegionServer server) {
- super(server.getServerName() + "-MemstoreFlusherChore", server, cacheFlushInterval);
+ super("MemstoreFlusherChore", server, cacheFlushInterval);
this.server = server;
}
@@ -2192,6 +2191,8 @@ public class HRegionServer extends HasThread implements
transition.addRegionInfo(HRegionInfo.convert(hri));
}
ReportRegionStateTransitionRequest request = builder.build();
+ int tries = 0;
+ long pauseTime = INIT_PAUSE_TIME_MS;
while (keepLooping()) {
RegionServerStatusService.BlockingInterface rss = rssStub;
try {
@@ -2202,95 +2203,40 @@ public class HRegionServer extends HasThread implements
ReportRegionStateTransitionResponse response =
rss.reportRegionStateTransition(null, request);
if (response.hasErrorMessage()) {
- LOG.info("Failed to transition " + hris[0]
+ LOG.info("Failed transition " + hris[0]
+ " to " + code + ": " + response.getErrorMessage());
return false;
}
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("TRANSITION REPORTED " + request);
+ }
return true;
} catch (ServiceException se) {
IOException ioe = ProtobufUtil.getRemoteException(se);
- LOG.info("Failed to report region transition, will retry", ioe);
- if (rssStub == rss) {
- rssStub = null;
- }
- }
- }
- return false;
- }
-
- @Override
- public long requestRegionSplit(final HRegionInfo regionInfo, final byte[] splitRow) {
- NonceGenerator ng = clusterConnection.getNonceGenerator();
- final long nonceGroup = ng.getNonceGroup();
- final long nonce = ng.newNonce();
- long procId = -1;
- SplitTableRegionRequest request =
- RequestConverter.buildSplitTableRegionRequest(regionInfo, splitRow, nonceGroup, nonce);
-
- while (keepLooping()) {
- RegionServerStatusService.BlockingInterface rss = rssStub;
- try {
- if (rss == null) {
- createRegionServerStatusStub();
- continue;
- }
- SplitTableRegionResponse response = rss.splitRegion(null, request);
-
- //TODO: should we limit the retry number before quitting?
- if (response == null || (procId = response.getProcId()) == -1) {
- LOG.warn("Failed to split " + regionInfo + " retrying...");
- continue;
+ boolean pause = ioe instanceof ServerNotRunningYetException ||
+ ioe instanceof PleaseHoldException;
+ if (pause) {
+ // Do backoff else we flood the Master with requests.
+ pauseTime = ConnectionUtils.getPauseTime(pauseTime, tries);
+ } else {
+ pauseTime = INIT_PAUSE_TIME_MS; // Reset.
}
-
- break;
- } catch (ServiceException se) {
- // TODO: retry or just fail
- IOException ioe = ProtobufUtil.getRemoteException(se);
- LOG.info("Failed to split region, will retry", ioe);
+ LOG.info("Failed report of region transition; retry (#" + tries + ")" +
+ (pause?
+ " after " + pauseTime + "ms delay (Master is coming online...).":
+ " immediately."),
+ ioe);
+ if (pause) Threads.sleep(pauseTime);
+ tries++;
if (rssStub == rss) {
rssStub = null;
}
}
}
- return procId;
- }
-
- @Override
- public boolean isProcedureFinished(final long procId) throws IOException {
- GetProcedureResultRequest request =
- GetProcedureResultRequest.newBuilder().setProcId(procId).build();
-
- while (keepLooping()) {
- RegionServerStatusService.BlockingInterface rss = rssStub;
- try {
- if (rss == null) {
- createRegionServerStatusStub();
- continue;
- }
- // TODO: find a way to get proc result
- GetProcedureResultResponse response = rss.getProcedureResult(null, request);
-
- if (response == null) {
- LOG.warn("Failed to get procedure (id=" + procId + ") status.");
- return false;
- } else if (response.getState() == GetProcedureResultResponse.State.RUNNING) {
- return false;
- } else if (response.hasException()) {
- // Procedure failed.
- throw ForeignExceptionUtil.toIOException(response.getException());
- }
- // Procedure completes successfully
- break;
- } catch (ServiceException se) {
- // TODO: retry or just fail
- IOException ioe = ProtobufUtil.getRemoteException(se);
- LOG.warn("Failed to get split region procedure result. Retrying", ioe);
- if (rssStub == rss) {
- rssStub = null;
- }
- }
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("TRANSITION NOT REPORTED " + request);
}
- return true;
+ return false;
}
/**
@@ -2981,7 +2927,7 @@ public class HRegionServer extends HasThread implements
* @see org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine
*/
public static void main(String[] args) throws Exception {
- LOG.info("***** STARTING service '" + HRegionServer.class.getSimpleName() + "' *****");
+ LOG.info("STARTING service '" + HRegionServer.class.getSimpleName());
VersionInfo.logVersion();
Configuration conf = HBaseConfiguration.create();
@SuppressWarnings("unchecked")
@@ -3286,7 +3232,7 @@ public class HRegionServer extends HasThread implements
throw new RegionOpeningException("Region " + regionNameStr +
" is opening on " + this.serverName);
}
- throw new NotServingRegionException("Region " + regionNameStr +
+ throw new NotServingRegionException("" + regionNameStr +
" is not online on " + this.serverName);
}
return region;
@@ -3404,7 +3350,7 @@ public class HRegionServer extends HasThread implements
}
// This map will contains all the regions that we closed for a move.
- // We add the time it was moved as we don't want to keep too old information
+ // We add the time it was moved as we don't want to keep too old information
protected Map<String, MovedRegionInfo> movedRegions =
new ConcurrentHashMap<>(3000);
@@ -3516,9 +3462,9 @@ public class HRegionServer extends HasThread implements
}
/**
- * @return the underlying {@link CompactSplitThread} for the servers
+ * @return the underlying {@link CompactSplit} for the servers
*/
- public CompactSplitThread getCompactSplitThread() {
+ public CompactSplit getCompactSplitThread() {
return this.compactSplitThread;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/4143c017/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index b3ca94d..ed19dc9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -18,8 +18,6 @@
*/
package org.apache.hadoop.hbase.regionserver;
-import com.google.common.annotations.VisibleForTesting;
-
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
@@ -27,8 +25,17 @@ import java.net.BindException;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -72,6 +79,7 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.VersionInfoUtil;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
+import org.apache.hadoop.hbase.exceptions.MergeRegionException;
import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
import org.apache.hadoop.hbase.exceptions.ScannerResetException;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
@@ -119,12 +127,12 @@ import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionForSplitOrMergeRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionForSplitOrMergeResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
@@ -137,6 +145,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerIn
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.MergeRegionsRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.MergeRegionsResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse;
@@ -194,7 +204,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MapReduceProtos.ScanMet
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse.TableQuotaSnapshot;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceViolationPolicy;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
@@ -212,6 +221,8 @@ import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.zookeeper.KeeperException;
+import com.google.common.annotations.VisibleForTesting;
+
/**
* Implements the regionserver RPC services.
*/
@@ -1465,36 +1476,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
}
- @Override
- @QosPriority(priority=HConstants.ADMIN_QOS)
- public CloseRegionForSplitOrMergeResponse closeRegionForSplitOrMerge(
- final RpcController controller,
- final CloseRegionForSplitOrMergeRequest request) throws ServiceException {
- try {
- checkOpen();
-
- List<String> encodedRegionNameList = new ArrayList<>();
- for(int i = 0; i < request.getRegionCount(); i++) {
- final String encodedRegionName = ProtobufUtil.getRegionEncodedName(request.getRegion(i));
-
- // Can be null if we're calling close on a region that's not online
- final Region targetRegion = regionServer.getFromOnlineRegions(encodedRegionName);
- if ((targetRegion != null) && (targetRegion.getCoprocessorHost() != null)) {
- targetRegion.getCoprocessorHost().preClose(false);
- encodedRegionNameList.add(encodedRegionName);
- }
- }
- requestCount.increment();
- LOG.info("Close and offline " + encodedRegionNameList + " regions.");
- boolean closed = regionServer.closeAndOfflineRegionForSplitOrMerge(encodedRegionNameList);
- CloseRegionForSplitOrMergeResponse.Builder builder =
- CloseRegionForSplitOrMergeResponse.newBuilder().setClosed(closed);
- return builder.build();
- } catch (IOException ie) {
- throw new ServiceException(ie);
- }
- }
-
/**
* Compact a region on the region server.
*
@@ -1645,6 +1626,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
if (request.hasCompactionState() && request.getCompactionState()) {
builder.setCompactionState(region.getCompactionState());
}
+ builder.setSplittable(region.isSplittable());
+ builder.setMergeable(region.isMergeable());
builder.setIsRecovering(region.isRecovering());
return builder.build();
} catch (IOException ie) {
@@ -1855,8 +1838,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
// The region is already online. This should not happen any more.
String error = "Received OPEN for the region:"
+ region.getRegionNameAsString() + ", which is already online";
- regionServer.abort(error);
- throw new IOException(error);
+ LOG.warn(error);
+ //regionServer.abort(error);
+ //throw new IOException(error);
+ builder.addOpeningState(RegionOpeningState.OPENED);
+ continue;
}
LOG.info("Open " + region.getRegionNameAsString());
@@ -3396,4 +3382,62 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
throw new ServiceException(e);
}
}
+
+ @Override
+ public ExecuteProceduresResponse executeProcedures(RpcController controller,
+ ExecuteProceduresRequest request) throws ServiceException {
+ ExecuteProceduresResponse.Builder builder = ExecuteProceduresResponse.newBuilder();
+ if (request.getOpenRegionCount() > 0) {
+ for (OpenRegionRequest req: request.getOpenRegionList()) {
+ builder.addOpenRegion(openRegion(controller, req));
+ }
+ }
+ if (request.getCloseRegionCount() > 0) {
+ for (CloseRegionRequest req: request.getCloseRegionList()) {
+ builder.addCloseRegion(closeRegion(controller, req));
+ }
+ }
+ return builder.build();
+ }
+
+ /**
+ * Merge regions on the region server.
+ *
+ * @param controller the RPC controller
+ * @param request the request
+ * @return merge regions response
+ * @throws ServiceException
+ */
+ @Override
+ @QosPriority(priority = HConstants.ADMIN_QOS)
+ // UNUSED AS OF AMv2 PURGE!
+ public MergeRegionsResponse mergeRegions(final RpcController controller,
+ final MergeRegionsRequest request) throws ServiceException {
+ try {
+ checkOpen();
+ requestCount.increment();
+ Region regionA = getRegion(request.getRegionA());
+ Region regionB = getRegion(request.getRegionB());
+ boolean forcible = request.getForcible();
+ long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;
+ regionA.startRegionOperation(Operation.MERGE_REGION);
+ regionB.startRegionOperation(Operation.MERGE_REGION);
+ if (regionA.getRegionInfo().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID ||
+ regionB.getRegionInfo().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) {
+ throw new ServiceException(new MergeRegionException("Can't merge non-default replicas"));
+ }
+ LOG.info("Receiving merging request for " + regionA + ", " + regionB
+ + ",forcible=" + forcible);
+ regionA.flush(true);
+ regionB.flush(true);
+ regionServer.compactSplitThread.requestRegionsMerge(regionA, regionB, forcible,
+ masterSystemTime, RpcServer.getRequestUser());
+ return MergeRegionsResponse.newBuilder().build();
+ } catch (DroppedSnapshotException ex) {
+ regionServer.abort("Replay of WAL required. Forcing server shutdown", ex);
+ throw new ServiceException(ex);
+ } catch (IOException ie) {
+ throw new ServiceException(ie);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/4143c017/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
index 295b825..6c4eca9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
@@ -96,6 +96,14 @@ public interface Region extends ConfigurationObserver {
/** @return True if region is read only */
boolean isReadOnly();
+ /** @return true if region is splittable */
+ boolean isSplittable();
+
+ /**
+ * @return true if region is mergeable
+ */
+ boolean isMergeable();
+
/**
* Return the list of Stores managed by this region
* <p>Use with caution. Exposed for use of fixup utilities.
http://git-wip-us.apache.org/repos/asf/hbase/blob/4143c017/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java
new file mode 100644
index 0000000..e95932b
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java
@@ -0,0 +1,108 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.security.PrivilegedAction;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Handles processing region merges. Put in a queue, owned by HRegionServer.
+ */
+// UNUSED: REMOVE!!!
+@InterfaceAudience.Private
+class RegionMergeRequest implements Runnable {
+ private static final Log LOG = LogFactory.getLog(RegionMergeRequest.class);
+ private final HRegionInfo region_a;
+ private final HRegionInfo region_b;
+ private final HRegionServer server;
+ private final boolean forcible;
+ private final User user;
+
+ RegionMergeRequest(Region a, Region b, HRegionServer hrs, boolean forcible,
+ long masterSystemTime, User user) {
+ Preconditions.checkNotNull(hrs);
+ this.region_a = a.getRegionInfo();
+ this.region_b = b.getRegionInfo();
+ this.server = hrs;
+ this.forcible = forcible;
+ this.user = user;
+ }
+
+ @Override
+ public String toString() {
+ return "MergeRequest,regions:" + region_a + ", " + region_b + ", forcible="
+ + forcible;
+ }
+
+ private void doMerge() {
+ boolean success = false;
+ //server.metricsRegionServer.incrMergeRequest();
+
+ if (user != null && user.getUGI() != null) {
+ user.getUGI().doAs (new PrivilegedAction<Void>() {
+ @Override
+ public Void run() {
+ requestRegionMerge();
+ return null;
+ }
+ });
+ } else {
+ requestRegionMerge();
+ }
+ }
+
+ private void requestRegionMerge() {
+ final TableName table = region_a.getTable();
+ if (!table.equals(region_b.getTable())) {
+ LOG.error("Can't merge regions from two different tables: " + region_a + ", " + region_b);
+ return;
+ }
+
+ // TODO: fake merged region for compat with the report protocol
+ final HRegionInfo merged = new HRegionInfo(table);
+
+ // Send the merge request to the master; the master will do the validation of the merge.
+ // The two parent regions will be unassigned and the merged region will be assigned.
+ // The region_a and region_b objects may not reflect the regions that will be created; those
+ // objects are created just to pass the information to reportRegionStateTransition().
+ if (!server.reportRegionStateTransition(TransitionCode.READY_TO_MERGE, merged, region_a, region_b)) {
+ LOG.error("Unable to ask master to merge: " + region_a + ", " + region_b);
+ }
+ }
+
+ @Override
+ public void run() {
+ if (this.server.isStopping() || this.server.isStopped()) {
+ LOG.debug("Skipping merge because server is stopping="
+ + this.server.isStopping() + " or stopped=" + this.server.isStopped());
+ return;
+ }
+
+ doMerge();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/4143c017/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
index 54aeaa6..5afa652 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
@@ -183,16 +183,6 @@ public interface RegionServerServices extends OnlineRegions, FavoredNodesForRegi
boolean reportRegionStateTransition(TransitionCode code, HRegionInfo... hris);
/**
- * Notify master that a region wants to be splitted.
- */
- long requestRegionSplit(final HRegionInfo regionInfo, final byte[] splitRow);
-
- /**
- * Check with master whether a procedure is completed (either succeed or fail)
- */
- boolean isProcedureFinished(final long procId) throws IOException;
-
- /**
* Returns a reference to the region server's RPC server
*/
RpcServerInterface getRpcServer();
http://git-wip-us.apache.org/repos/asf/hbase/blob/4143c017/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionUnassigner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionUnassigner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionUnassigner.java
index b347b4b..8eb78a2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionUnassigner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionUnassigner.java
@@ -48,8 +48,7 @@ class RegionUnassigner {
return;
}
unassigning = true;
- new Thread("Unassign-" + regionInfo) {
-
+ new Thread("RegionUnassigner." + regionInfo.getEncodedName()) {
@Override
public void run() {
LOG.info("Unassign " + regionInfo.getRegionNameAsString());
@@ -65,4 +64,4 @@ class RegionUnassigner {
}
}.start();
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/4143c017/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java
index eb9811d..bd59c53 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java
@@ -18,16 +18,16 @@
*/
package org.apache.hadoop.hbase.regionserver;
-import java.io.IOException;
import java.security.PrivilegedAction;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.ipc.RemoteException;
import com.google.common.base.Preconditions;
@@ -37,14 +37,14 @@ import com.google.common.base.Preconditions;
@InterfaceAudience.Private
class SplitRequest implements Runnable {
private static final Log LOG = LogFactory.getLog(SplitRequest.class);
- private final HRegion parent;
+ private final HRegionInfo parent;
private final byte[] midKey;
private final HRegionServer server;
private final User user;
SplitRequest(Region region, byte[] midKey, HRegionServer hrs, User user) {
Preconditions.checkNotNull(hrs);
- this.parent = (HRegion)region;
+ this.parent = region.getRegionInfo();
this.midKey = midKey;
this.server = hrs;
this.user = user;
@@ -56,67 +56,30 @@ class SplitRequest implements Runnable {
}
private void doSplitting() {
- boolean success = false;
server.metricsRegionServer.incrSplitRequest();
- long startTime = EnvironmentEdgeManager.currentTime();
-
- try {
- long procId;
- if (user != null && user.getUGI() != null) {
- procId = user.getUGI().doAs (new PrivilegedAction<Long>() {
- @Override
- public Long run() {
- try {
- return server.requestRegionSplit(parent.getRegionInfo(), midKey);
- } catch (Exception e) {
- LOG.error("Failed to complete region split ", e);
- }
- return (long)-1;
- }
- });
- } else {
- procId = server.requestRegionSplit(parent.getRegionInfo(), midKey);
- }
-
- if (procId != -1) {
- // wait for the split to complete or get interrupted. If the split completes successfully,
- // the procedure will return true; if the split fails, the procedure would throw exception.
- //
- try {
- while (!(success = server.isProcedureFinished(procId))) {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- LOG.warn("Split region " + parent + " is still in progress. Not waiting...");
- break;
- }
- }
- } catch (IOException e) {
- LOG.error("Split region " + parent + " failed.", e);
+ if (user != null && user.getUGI() != null) {
+ user.getUGI().doAs (new PrivilegedAction<Void>() {
+ @Override
+ public Void run() {
+ requestRegionSplit();
+ return null;
}
- } else {
- LOG.error("Fail to split region " + parent);
- }
- } finally {
- if (this.parent.getCoprocessorHost() != null) {
- try {
- this.parent.getCoprocessorHost().postCompleteSplit();
- } catch (IOException io) {
- LOG.error("Split failed " + this,
- io instanceof RemoteException ? ((RemoteException) io).unwrapRemoteException() : io);
- }
- }
-
- // Update regionserver metrics with the split transaction total running time
- server.metricsRegionServer.updateSplitTime(EnvironmentEdgeManager.currentTime() - startTime);
-
- if (parent.shouldForceSplit()) {
- parent.clearSplit();
- }
+ });
+ } else {
+ requestRegionSplit();
+ }
+ }
- if (success) {
- server.metricsRegionServer.incrSplitSuccess();
- }
+ private void requestRegionSplit() {
+ final TableName table = parent.getTable();
+ final HRegionInfo hri_a = new HRegionInfo(table, parent.getStartKey(), midKey);
+ final HRegionInfo hri_b = new HRegionInfo(table, midKey, parent.getEndKey());
+ // Send the split request to the master. The master will do the validation on the split-key.
+ // The parent region will be unassigned and the two new regions will be assigned.
+ // hri_a and hri_b objects may not reflect the regions that will be created, those objects
+ // are created just to pass the information to the reportRegionStateTransition().
+ if (!server.reportRegionStateTransition(TransitionCode.READY_TO_SPLIT, parent, hri_a, hri_b)) {
+ LOG.error("Unable to ask master to split " + parent.getRegionNameAsString());
}
}
@@ -130,4 +93,4 @@ class SplitRequest implements Runnable {
doSplitting();
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/4143c017/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java
index 5ff7a1e..3ecc750 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java
@@ -125,4 +125,4 @@ public class CloseRegionHandler extends EventHandler {
remove(this.regionInfo.getEncodedNameAsBytes(), Boolean.FALSE);
}
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/4143c017/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
index dca02e4..f1e42a6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
@@ -559,7 +559,7 @@ public class HBaseFsck extends Configured implements Closeable {
errors.print("Number of requests: " + status.getRequestsCount());
errors.print("Number of regions: " + status.getRegionsCount());
- Set<RegionState> rits = status.getRegionsInTransition();
+ List<RegionState> rits = status.getRegionsInTransition();
errors.print("Number of regions in transition: " + rits.size());
if (details) {
for (RegionState state: rits) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/4143c017/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java
index d7749c2..8ea7012 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java
@@ -41,7 +41,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.master.AssignmentManager;
+import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
/**
* Utility methods for interacting with the regions.
@@ -223,7 +223,7 @@ public abstract class ModifyRegionUtils {
static ThreadPoolExecutor getRegionOpenAndInitThreadPool(final Configuration conf,
final String threadNamePrefix, int regionNumber) {
int maxThreads = Math.min(regionNumber, conf.getInt(
- "hbase.hregion.open.and.init.threads.max", 10));
+ "hbase.hregion.open.and.init.threads.max", 16));
ThreadPoolExecutor regionOpenAndInitThreadPool = Threads
.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
new ThreadFactory() {
@@ -236,24 +236,4 @@ public abstract class ModifyRegionUtils {
});
return regionOpenAndInitThreadPool;
}
-
- /**
- * Triggers a bulk assignment of the specified regions
- *
- * @param assignmentManager the Assignment Manger
- * @param regionInfos the list of regions to assign
- * @throws IOException if an error occurred during the assignment
- */
- public static void assignRegions(final AssignmentManager assignmentManager,
- final List<HRegionInfo> regionInfos) throws IOException {
- try {
- assignmentManager.getRegionStates().createRegionStates(regionInfos);
- assignmentManager.assign(regionInfos);
- } catch (InterruptedException e) {
- LOG.error("Caught " + e + " during round-robin assignment");
- InterruptedIOException ie = new InterruptedIOException(e.getMessage());
- ie.initCause(e);
- throw ie;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/4143c017/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
index e8069ec..517a0cd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
@@ -289,8 +289,8 @@ public class WALSplitter {
this.fileBeingSplit = logfile;
try {
long logLength = logfile.getLen();
- LOG.info("Splitting wal: " + logPath + ", length=" + logLength);
- LOG.info("DistributedLogReplay = " + this.distributedLogReplay);
+ LOG.info("Splitting WAL=" + logPath + ", length=" + logLength +
+ ", distributedLogReplay=" + this.distributedLogReplay);
status.setStatus("Opening log file");
if (reporter != null && !reporter.progress()) {
progress_failed = true;
@@ -298,7 +298,7 @@ public class WALSplitter {
}
in = getReader(logfile, skipErrors, reporter);
if (in == null) {
- LOG.warn("Nothing to split in log file " + logPath);
+ LOG.warn("Nothing to split in WAL=" + logPath);
return true;
}
int numOpenedFilesBeforeReporting = conf.getInt("hbase.splitlog.report.openedfiles", 3);
@@ -377,7 +377,7 @@ public class WALSplitter {
iie.initCause(ie);
throw iie;
} catch (CorruptedLogFileException e) {
- LOG.warn("Could not parse, corrupted log file " + logPath, e);
+ LOG.warn("Could not parse, corrupted WAL=" + logPath, e);
if (this.csm != null) {
// Some tests pass in a csm of null.
this.csm.getSplitLogWorkerCoordination().markCorrupted(rootDir,
@@ -397,7 +397,7 @@ public class WALSplitter {
in.close();
}
} catch (IOException exception) {
- LOG.warn("Could not close wal reader: " + exception.getMessage());
+ LOG.warn("Could not close WAL reader: " + exception.getMessage());
LOG.debug("exception details", exception);
}
try {
@@ -1595,8 +1595,10 @@ public class WALSplitter {
if (wap == null) {
wap = getWriterAndPath(logEntry);
if (wap == null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("getWriterAndPath decided we don't need to write edits for " + logEntry);
+ if (LOG.isTraceEnabled()) {
+ // This log spews the full edit. Can be massive in the log. Enable only debugging
+ // WAL lost edit issues.
+ LOG.trace("getWriterAndPath decided we don't need to write edits for " + logEntry);
}
return;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/4143c017/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/RegionServerTracker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/RegionServerTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/RegionServerTracker.java
index 69cd233..a6a5c17 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/RegionServerTracker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/RegionServerTracker.java
@@ -88,8 +88,8 @@ public class RegionServerTracker extends ZooKeeperListener {
int magicLen = ProtobufUtil.lengthOfPBMagic();
ProtobufUtil.mergeFrom(rsInfoBuilder, data, magicLen, data.length - magicLen);
}
- if (LOG.isDebugEnabled()) {
- LOG.debug("Added tracking of RS " + nodePath);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Added tracking of RS " + nodePath);
}
} catch (KeeperException e) {
LOG.warn("Get Rs info port from ephemeral node", e);
http://git-wip-us.apache.org/repos/asf/hbase/blob/4143c017/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index afc070d..b527195 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.fail;
import java.io.File;
import java.io.IOException;
+import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
@@ -46,7 +47,10 @@ import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.io.FileUtils;
@@ -86,10 +90,10 @@ import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.mapreduce.MapreduceTestingShim;
-import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.master.HMaster;
-import org.apache.hadoop.hbase.master.RegionStates;
import org.apache.hadoop.hbase.master.ServerManager;
+import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
+import org.apache.hadoop.hbase.master.assignment.RegionStates;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.ChunkCreator;
import org.apache.hadoop.hbase.regionserver.HRegion;
@@ -3323,13 +3327,14 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
public void moveRegionAndWait(HRegionInfo destRegion, ServerName destServer)
throws InterruptedException, IOException {
HMaster master = getMiniHBaseCluster().getMaster();
- getHBaseAdmin().move(destRegion.getEncodedNameAsBytes(),
+ // TODO: Here we start the move. The move can take a while.
+ getAdmin().move(destRegion.getEncodedNameAsBytes(),
Bytes.toBytes(destServer.getServerName()));
while (true) {
ServerName serverName = master.getAssignmentManager().getRegionStates()
.getRegionServerOfRegion(destRegion);
if (serverName != null && serverName.equals(destServer)) {
- assertRegionOnServer(destRegion, serverName, 200);
+ assertRegionOnServer(destRegion, serverName, 2000);
break;
}
Thread.sleep(10);
@@ -3994,8 +3999,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
if (master == null) return false;
AssignmentManager am = master.getAssignmentManager();
if (am == null) return false;
- final RegionStates regionStates = am.getRegionStates();
- return !regionStates.isRegionsInTransition();
+ return !am.hasRegionsInTransition();
}
};
}