You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2017/04/30 22:14:25 UTC
[15/24] hbase git commit: HBASE-14614 Procedure v2 - Core Assignment
Manager (Matteo Bertozzi) Move to a new AssignmentManager,
one that describes Assignment using a State Machine built on top of
ProcedureV2 facility.
http://git-wip-us.apache.org/repos/asf/hbase/blob/ccbc9ec2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/BulkAssigner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/BulkAssigner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/BulkAssigner.java
deleted file mode 100644
index 929cd4e..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/BulkAssigner.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.master;
-
-import java.io.IOException;
-import java.lang.Thread.UncaughtExceptionHandler;
-import java.util.concurrent.Executors;
-
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.Server;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-/**
- * Base class used bulk assigning and unassigning regions.
- * Encapsulates a fixed size thread pool of executors to run assignment/unassignment.
- * Implement {@link #populatePool(java.util.concurrent.ExecutorService)} and
- * {@link #waitUntilDone(long)}. The default implementation of
- * the {@link #getUncaughtExceptionHandler()} is to abort the hosting
- * Server.
- */
-@InterfaceAudience.Private
-public abstract class BulkAssigner {
- protected final Server server;
-
- /**
- * @param server An instance of Server
- */
- public BulkAssigner(final Server server) {
- this.server = server;
- }
-
- /**
- * @return What to use for a thread prefix when executor runs.
- */
- protected String getThreadNamePrefix() {
- return this.server.getServerName() + "-" + this.getClass().getName();
- }
-
- protected UncaughtExceptionHandler getUncaughtExceptionHandler() {
- return new UncaughtExceptionHandler() {
- @Override
- public void uncaughtException(Thread t, Throwable e) {
- // Abort if exception of any kind.
- server.abort("Uncaught exception in " + t.getName(), e);
- }
- };
- }
-
- protected int getThreadCount() {
- return this.server.getConfiguration().
- getInt("hbase.bulk.assignment.threadpool.size", 20);
- }
-
- protected long getTimeoutOnRIT() {
- return this.server.getConfiguration().
- getLong("hbase.bulk.assignment.waiton.empty.rit", 5 * 60 * 1000);
- }
-
- protected abstract void populatePool(
- final java.util.concurrent.ExecutorService pool) throws IOException;
-
- public boolean bulkAssign() throws InterruptedException, IOException {
- return bulkAssign(true);
- }
-
- /**
- * Run the bulk assign.
- *
- * @param sync
- * Whether to assign synchronously.
- * @throws InterruptedException
- * @return True if done.
- * @throws IOException
- */
- public boolean bulkAssign(boolean sync) throws InterruptedException,
- IOException {
- boolean result = false;
- ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
- builder.setDaemon(true);
- builder.setNameFormat(getThreadNamePrefix() + "-%1$d");
- builder.setUncaughtExceptionHandler(getUncaughtExceptionHandler());
- int threadCount = getThreadCount();
- java.util.concurrent.ExecutorService pool =
- Executors.newFixedThreadPool(threadCount, builder.build());
- try {
- populatePool(pool);
- // How long to wait on empty regions-in-transition. If we timeout, the
- // RIT monitor should do fixup.
- if (sync) result = waitUntilDone(getTimeoutOnRIT());
- } finally {
- // We're done with the pool. It'll exit when its done all in queue.
- pool.shutdown();
- }
- return result;
- }
-
- /**
- * Wait until bulk assign is done.
- * @param timeout How long to wait.
- * @throws InterruptedException
- * @return True if the condition we were waiting on happened.
- */
- protected abstract boolean waitUntilDone(final long timeout)
- throws InterruptedException;
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ccbc9ec2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/BulkReOpen.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/BulkReOpen.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/BulkReOpen.java
deleted file mode 100644
index d8c511e..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/BulkReOpen.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.master;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.Server;
-import org.apache.hadoop.hbase.ServerName;
-
-/**
- * Performs bulk reopen of the list of regions provided to it.
- */
-@InterfaceAudience.Private
-public class BulkReOpen extends BulkAssigner {
- private final Map<ServerName, List<HRegionInfo>> rsToRegions;
- private final AssignmentManager assignmentManager;
- private static final Log LOG = LogFactory.getLog(BulkReOpen.class);
-
- public BulkReOpen(final Server server,
- final Map<ServerName, List<HRegionInfo>> serverToRegions,
- final AssignmentManager am) {
- super(server);
- this.assignmentManager = am;
- this.rsToRegions = serverToRegions;
- }
-
- /**
- * Unassign all regions, so that they go through the regular region
- * assignment flow (in assignment manager) and are re-opened.
- */
- @Override
- protected void populatePool(ExecutorService pool) {
- LOG.debug("Creating threads for each region server ");
- for (Map.Entry<ServerName, List<HRegionInfo>> e : rsToRegions
- .entrySet()) {
- final List<HRegionInfo> hris = e.getValue();
- // add plans for the regions that need to be reopened
- Map<String, RegionPlan> plans = new HashMap<>();
- for (HRegionInfo hri : hris) {
- RegionPlan reOpenPlan = assignmentManager.getRegionReopenPlan(hri);
- plans.put(hri.getEncodedName(), reOpenPlan);
- }
- assignmentManager.addPlans(plans);
- pool.execute(new Runnable() {
- public void run() {
- try {
- unassign(hris);
- } catch (Throwable t) {
- LOG.warn("Failed bulking re-open " + hris.size()
- + " region(s)", t);
- }
- }
- });
- }
- }
-
- /**
- * Reopen the regions asynchronously, so always returns true immediately.
- * @return true
- */
- @Override
- protected boolean waitUntilDone(long timeout) {
- return true;
- }
-
- /**
- * Configuration knobs "hbase.bulk.reopen.threadpool.size" number of regions
- * that can be reopened concurrently. The maximum number of threads the master
- * creates is never more than the number of region servers.
- * If configuration is not defined it defaults to 20
- */
- protected int getThreadCount() {
- int defaultThreadCount = super.getThreadCount();
- return this.server.getConfiguration().getInt(
- "hbase.bulk.reopen.threadpool.size", defaultThreadCount);
- }
-
- public boolean bulkReOpen() throws InterruptedException, IOException {
- return bulkAssign();
- }
-
- /**
- * Unassign the list of regions. Configuration knobs:
- * hbase.bulk.waitbetween.reopen indicates the number of milliseconds to
- * wait before unassigning another region from this region server
- *
- * @param regions
- * @throws InterruptedException
- */
- private void unassign(
- List<HRegionInfo> regions) throws InterruptedException {
- int waitTime = this.server.getConfiguration().getInt(
- "hbase.bulk.waitbetween.reopen", 0);
- RegionStates regionStates = assignmentManager.getRegionStates();
- for (HRegionInfo region : regions) {
- if (server.isStopped()) {
- return;
- }
- if (regionStates.isRegionInTransition(region)) {
- continue;
- }
- assignmentManager.unassign(region);
- while (regionStates.isRegionInTransition(region)
- && !server.isStopped()) {
- regionStates.waitForUpdate(100);
- }
- if (waitTime > 0 && !server.isStopped()) {
- Thread.sleep(waitTime);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ccbc9ec2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java
index affd44c..6e727f6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.backup.HFileArchiver;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.favored.FavoredNodesManager;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.util.Bytes;
@@ -112,10 +113,13 @@ public class CatalogJanitor extends ScheduledChore {
&& !this.services.isInMaintenanceMode()
&& am != null
&& am.isFailoverCleanupDone()
- && am.getRegionStates().getRegionsInTransition().isEmpty()) {
+ && !am.hasRegionsInTransition()) {
scan();
} else {
- LOG.warn("CatalogJanitor disabled! Not running scan.");
+ LOG.warn("CatalogJanitor disabled! enabled=" + this.enabled.get() +
+ ", maintenanceMode=" + this.services.isInMaintenanceMode() +
+ ", am=" + am + ", failoverCleanupDone=" + (am != null && am.isFailoverCleanupDone()) +
+ ", hasRIT=" + (am != null && am.hasRegionsInTransition()));
}
} catch (IOException e) {
LOG.warn("Failed scan of catalog table", e);
@@ -167,6 +171,7 @@ public class CatalogJanitor extends ScheduledChore {
// Another table, stop scanning
return false;
}
+ if (LOG.isTraceEnabled()) LOG.trace("" + info + " IS-SPLIT_PARENT=" + info.isSplitParent());
if (info.isSplitParent()) splitParents.put(info, r);
if (r.getValue(HConstants.CATALOG_FAMILY, HConstants.MERGEA_QUALIFIER) != null) {
mergedRegions.put(info, r);
@@ -347,8 +352,7 @@ public class CatalogJanitor extends ScheduledChore {
// Check whether it is a merged region and not clean reference
// No necessary to check MERGEB_QUALIFIER because these two qualifiers will
// be inserted/deleted together
- if (rowContent.getValue(HConstants.CATALOG_FAMILY,
- HConstants.MERGEA_QUALIFIER) != null) {
+ if (rowContent.getValue(HConstants.CATALOG_FAMILY, HConstants.MERGEA_QUALIFIER) != null) {
// wait cleaning merge region first
return result;
}
@@ -362,6 +366,12 @@ public class CatalogJanitor extends ScheduledChore {
FileSystem fs = this.services.getMasterFileSystem().getFileSystem();
if (LOG.isTraceEnabled()) LOG.trace("Archiving parent region: " + parent);
HFileArchiver.archiveRegion(this.services.getConfiguration(), fs, parent);
+ AssignmentManager am = this.services.getAssignmentManager();
+ if (am != null) {
+ if (am.getRegionStates() != null) {
+ am.getRegionStates().deleteRegion(parent);
+ }
+ }
MetaTableAccessor.deleteRegion(this.connection, parent);
services.getServerManager().removeRegion(parent);
FavoredNodesManager fnm = this.services.getFavoredNodesManager();
@@ -469,4 +479,4 @@ public class CatalogJanitor extends ScheduledChore {
return cleanMergeRegion(region, mergeRegions.getFirst(),
mergeRegions.getSecond());
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ccbc9ec2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/DeadServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/DeadServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/DeadServer.java
index faceba2..34a7633 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/DeadServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/DeadServer.java
@@ -61,7 +61,7 @@ public class DeadServer {
/**
* Whether a dead server is being processed currently.
*/
- private boolean processing = false;
+ private volatile boolean processing = false;
/**
* A dead server that comes back alive has a different start code. The new start code should be
@@ -123,14 +123,14 @@ public class DeadServer {
* @param sn ServerName for the dead server.
*/
public synchronized void notifyServer(ServerName sn) {
- if (LOG.isDebugEnabled()) { LOG.debug("Started processing " + sn); }
+ if (LOG.isTraceEnabled()) { LOG.trace("Started processing " + sn); }
processing = true;
numProcessing++;
}
public synchronized void finish(ServerName sn) {
numProcessing--;
- if (LOG.isDebugEnabled()) LOG.debug("Finished " + sn + "; numProcessing=" + numProcessing);
+ if (LOG.isTraceEnabled()) LOG.trace("Finished " + sn + "; numProcessing=" + numProcessing);
assert numProcessing >= 0: "Number of dead servers in processing should always be non-negative";
http://git-wip-us.apache.org/repos/asf/hbase/blob/ccbc9ec2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java
deleted file mode 100644
index fc3607f..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.master;
-
-import java.lang.Thread.UncaughtExceptionHandler;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.Server;
-import org.apache.hadoop.hbase.ServerName;
-
-/**
- * Run bulk assign. Does one RCP per regionserver passing a
- * batch of regions using {@link GeneralBulkAssigner.SingleServerBulkAssigner}.
- */
-@InterfaceAudience.Private
-public class GeneralBulkAssigner extends BulkAssigner {
- private static final Log LOG = LogFactory.getLog(GeneralBulkAssigner.class);
-
- private Map<ServerName, List<HRegionInfo>> failedPlans = new ConcurrentHashMap<>();
- private ExecutorService pool;
-
- final Map<ServerName, List<HRegionInfo>> bulkPlan;
- final AssignmentManager assignmentManager;
- final boolean waitTillAllAssigned;
-
- public GeneralBulkAssigner(final Server server,
- final Map<ServerName, List<HRegionInfo>> bulkPlan,
- final AssignmentManager am, final boolean waitTillAllAssigned) {
- super(server);
- this.bulkPlan = bulkPlan;
- this.assignmentManager = am;
- this.waitTillAllAssigned = waitTillAllAssigned;
- }
-
- @Override
- protected String getThreadNamePrefix() {
- return this.server.getServerName() + "-GeneralBulkAssigner";
- }
-
- @Override
- protected void populatePool(ExecutorService pool) {
- this.pool = pool; // shut it down later in case some assigner hangs
- for (Map.Entry<ServerName, List<HRegionInfo>> e: this.bulkPlan.entrySet()) {
- pool.execute(new SingleServerBulkAssigner(e.getKey(), e.getValue(),
- this.assignmentManager, this.failedPlans));
- }
- }
-
- /**
- *
- * @param timeout How long to wait.
- * @return true if done.
- */
- @Override
- protected boolean waitUntilDone(final long timeout)
- throws InterruptedException {
- Set<HRegionInfo> regionSet = new HashSet<>();
- for (List<HRegionInfo> regionList : bulkPlan.values()) {
- regionSet.addAll(regionList);
- }
-
- pool.shutdown(); // no more task allowed
- int serverCount = bulkPlan.size();
- int regionCount = regionSet.size();
- long startTime = System.currentTimeMillis();
- long rpcWaitTime = startTime + timeout;
- while (!server.isStopped() && !pool.isTerminated()
- && rpcWaitTime > System.currentTimeMillis()) {
- if (failedPlans.isEmpty()) {
- pool.awaitTermination(100, TimeUnit.MILLISECONDS);
- } else {
- reassignFailedPlans();
- }
- }
- if (!pool.isTerminated()) {
- LOG.warn("bulk assigner is still running after "
- + (System.currentTimeMillis() - startTime) + "ms, shut it down now");
- // some assigner hangs, can't wait any more, shutdown the pool now
- List<Runnable> notStarted = pool.shutdownNow();
- if (notStarted != null && !notStarted.isEmpty()) {
- server.abort("some single server assigner hasn't started yet"
- + " when the bulk assigner timed out", null);
- return false;
- }
- }
-
- int reassigningRegions = 0;
- if (!failedPlans.isEmpty() && !server.isStopped()) {
- reassigningRegions = reassignFailedPlans();
- }
- assignmentManager.waitForAssignment(regionSet, waitTillAllAssigned,
- reassigningRegions, Math.max(System.currentTimeMillis(), rpcWaitTime));
-
- if (LOG.isDebugEnabled()) {
- long elapsedTime = System.currentTimeMillis() - startTime;
- String status = "successfully";
- if (!regionSet.isEmpty()) {
- status = "with " + regionSet.size() + " regions still in transition";
- }
- LOG.debug("bulk assigning total " + regionCount + " regions to "
- + serverCount + " servers, took " + elapsedTime + "ms, " + status);
- }
- return regionSet.isEmpty();
- }
-
- @Override
- protected long getTimeoutOnRIT() {
- // Guess timeout. Multiply the max number of regions on a server
- // by how long we think one region takes opening.
- Configuration conf = server.getConfiguration();
- long perRegionOpenTimeGuesstimate =
- conf.getLong("hbase.bulk.assignment.perregion.open.time", 1000);
- int maxRegionsPerServer = 1;
- for (List<HRegionInfo> regionList : bulkPlan.values()) {
- int size = regionList.size();
- if (size > maxRegionsPerServer) {
- maxRegionsPerServer = size;
- }
- }
- long timeout = perRegionOpenTimeGuesstimate * maxRegionsPerServer
- + conf.getLong("hbase.regionserver.rpc.startup.waittime", 60000)
- + conf.getLong("hbase.bulk.assignment.perregionserver.rpc.waittime",
- 30000) * bulkPlan.size();
- LOG.debug("Timeout-on-RIT=" + timeout);
- return timeout;
- }
-
- @Override
- protected UncaughtExceptionHandler getUncaughtExceptionHandler() {
- return new UncaughtExceptionHandler() {
- @Override
- public void uncaughtException(Thread t, Throwable e) {
- LOG.warn("Assigning regions in " + t.getName(), e);
- }
- };
- }
-
- private int reassignFailedPlans() {
- List<HRegionInfo> reassigningRegions = new ArrayList<>();
- for (Map.Entry<ServerName, List<HRegionInfo>> e : failedPlans.entrySet()) {
- LOG.info("Failed assigning " + e.getValue().size()
- + " regions to server " + e.getKey() + ", reassigning them");
- reassigningRegions.addAll(failedPlans.remove(e.getKey()));
- }
- RegionStates regionStates = assignmentManager.getRegionStates();
- for (HRegionInfo region : reassigningRegions) {
- if (!regionStates.isRegionOnline(region)) {
- assignmentManager.invokeAssign(region);
- }
- }
- return reassigningRegions.size();
- }
-
- /**
- * Manage bulk assigning to a server.
- */
- static class SingleServerBulkAssigner implements Runnable {
- private final ServerName regionserver;
- private final List<HRegionInfo> regions;
- private final AssignmentManager assignmentManager;
- private final Map<ServerName, List<HRegionInfo>> failedPlans;
-
- SingleServerBulkAssigner(final ServerName regionserver,
- final List<HRegionInfo> regions, final AssignmentManager am,
- final Map<ServerName, List<HRegionInfo>> failedPlans) {
- this.regionserver = regionserver;
- this.regions = regions;
- this.assignmentManager = am;
- this.failedPlans = failedPlans;
- }
-
- @Override
- public void run() {
- try {
- if (!assignmentManager.assign(regionserver, regions)) {
- failedPlans.put(regionserver, regions);
- }
- } catch (Throwable t) {
- LOG.warn("Failed bulking assigning " + regions.size()
- + " region(s) to " + regionserver.getServerName()
- + ", and continue to bulk assign others", t);
- failedPlans.put(regionserver, regions);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ccbc9ec2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index e4ba285..8af14c1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -36,6 +36,8 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@@ -65,7 +67,6 @@ import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.PleaseHoldException;
import org.apache.hadoop.hbase.ProcedureInfo;
-import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.ServerLoad;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableDescriptors;
@@ -89,6 +90,10 @@ import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.master.MasterRpcServices.BalanceSwitchMode;
+import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
+import org.apache.hadoop.hbase.master.assignment.MergeTableRegionsProcedure;
+import org.apache.hadoop.hbase.master.assignment.RegionStates;
+import org.apache.hadoop.hbase.master.assignment.RegionStates.RegionStateNode;
import org.apache.hadoop.hbase.master.balancer.BalancerChore;
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore;
@@ -109,16 +114,15 @@ import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure;
import org.apache.hadoop.hbase.master.procedure.DeleteColumnFamilyProcedure;
import org.apache.hadoop.hbase.master.procedure.DeleteTableProcedure;
import org.apache.hadoop.hbase.master.procedure.DisableTableProcedure;
+import org.apache.hadoop.hbase.master.procedure.DispatchMergingRegionsProcedure;
import org.apache.hadoop.hbase.master.procedure.EnableTableProcedure;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
-import org.apache.hadoop.hbase.master.procedure.MergeTableRegionsProcedure;
import org.apache.hadoop.hbase.master.procedure.ModifyColumnFamilyProcedure;
import org.apache.hadoop.hbase.master.procedure.ModifyTableProcedure;
import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
-import org.apache.hadoop.hbase.master.procedure.SplitTableRegionProcedure;
import org.apache.hadoop.hbase.master.procedure.TruncateTableProcedure;
import org.apache.hadoop.hbase.master.replication.ReplicationManager;
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
@@ -333,7 +337,6 @@ public class HMaster extends HRegionServer implements MasterServices {
private RegionNormalizerChore normalizerChore;
private ClusterStatusChore clusterStatusChore;
private ClusterStatusPublisher clusterStatusPublisherChore = null;
- private PeriodicDoMetrics periodicDoMetricsChore = null;
CatalogJanitor catalogJanitorChore;
private ReplicationMetaCleaner replicationMetaCleaner;
@@ -432,19 +435,6 @@ public class HMaster extends HRegionServer implements MasterServices {
}
}
- private static class PeriodicDoMetrics extends ScheduledChore {
- private final HMaster server;
- public PeriodicDoMetrics(int doMetricsInterval, final HMaster server) {
- super(server.getServerName() + "-DoMetricsChore", server, doMetricsInterval);
- this.server = server;
- }
-
- @Override
- protected void chore() {
- server.doMetrics();
- }
- }
-
/**
* Initializes the HMaster. The steps are as follows:
* <p>
@@ -647,20 +637,6 @@ public class HMaster extends HRegionServer implements MasterServices {
return MasterDumpServlet.class;
}
- /**
- * Emit the HMaster metrics, such as region in transition metrics.
- * Surrounding in a try block just to be sure metrics doesn't abort HMaster.
- */
- private void doMetrics() {
- try {
- if (assignmentManager != null) {
- assignmentManager.updateRegionsInTransitionMetrics();
- }
- } catch (Throwable e) {
- LOG.error("Couldn't update metrics: " + e.getMessage());
- }
- }
-
MetricsMaster getMasterMetrics() {
return metricsMaster;
}
@@ -683,8 +659,9 @@ public class HMaster extends HRegionServer implements MasterServices {
this.splitOrMergeTracker = new SplitOrMergeTracker(zooKeeper, conf, this);
this.splitOrMergeTracker.start();
- this.assignmentManager = new AssignmentManager(this, serverManager,
- this.balancer, this.service, this.metricsMaster, tableStateManager);
+ // Create Assignment Manager
+ this.assignmentManager = new AssignmentManager(this);
+ this.assignmentManager.start();
this.replicationManager = new ReplicationManager(conf, zooKeeper, this);
@@ -870,10 +847,6 @@ public class HMaster extends HRegionServer implements MasterServices {
this.catalogJanitorChore = new CatalogJanitor(this);
getChoreService().scheduleChore(catalogJanitorChore);
- // Do Metrics periodically
- periodicDoMetricsChore = new PeriodicDoMetrics(msgInterval, this);
- getChoreService().scheduleChore(periodicDoMetricsChore);
-
status.setStatus("Starting cluster schema service");
initClusterSchemaService();
@@ -886,7 +859,8 @@ public class HMaster extends HRegionServer implements MasterServices {
}
status.markComplete("Initialization successful");
- LOG.info("Master has completed initialization");
+ LOG.info(String.format("Master has completed initialization %.3fsec",
+ (System.currentTimeMillis() - masterActiveTime) / 1000.0f));
configurationManager.registerObserver(this.balancer);
configurationManager.registerObserver(this.hfileCleaner);
@@ -965,8 +939,8 @@ public class HMaster extends HRegionServer implements MasterServices {
// Check zk for region servers that are up but didn't register
for (ServerName sn: this.regionServerTracker.getOnlineServers()) {
// The isServerOnline check is opportunistic, correctness is handled inside
- if (!this.serverManager.isServerOnline(sn)
- && serverManager.checkAndRecordNewServer(sn, ServerLoad.EMPTY_SERVERLOAD)) {
+ if (!this.serverManager.isServerOnline(sn) &&
+ serverManager.checkAndRecordNewServer(sn, ServerLoad.EMPTY_SERVERLOAD)) {
LOG.info("Registered server found up in zk but who has not yet reported in: " + sn);
}
}
@@ -1093,12 +1067,6 @@ public class HMaster extends HRegionServer implements MasterServices {
}
@Override
- protected void sendShutdownInterrupt() {
- super.sendShutdownInterrupt();
- stopProcedureExecutor();
- }
-
- @Override
protected void stopServiceThreads() {
if (masterJettyServer != null) {
LOG.info("Stopping master jetty server");
@@ -1120,15 +1088,20 @@ public class HMaster extends HRegionServer implements MasterServices {
if (LOG.isDebugEnabled()) {
LOG.debug("Stopping service threads");
}
+
// Clean up and close up shop
if (this.logCleaner != null) this.logCleaner.cancel(true);
if (this.hfileCleaner != null) this.hfileCleaner.cancel(true);
if (this.replicationZKNodeCleanerChore != null) this.replicationZKNodeCleanerChore.cancel(true);
if (this.replicationMetaCleaner != null) this.replicationMetaCleaner.cancel(true);
if (this.quotaManager != null) this.quotaManager.stop();
+
if (this.activeMasterManager != null) this.activeMasterManager.stop();
if (this.serverManager != null) this.serverManager.stop();
if (this.assignmentManager != null) this.assignmentManager.stop();
+
+ stopProcedureExecutor();
+
if (this.walManager != null) this.walManager.stop();
if (this.fileSystemManager != null) this.fileSystemManager.stop();
if (this.mpmHost != null) this.mpmHost.stop("server shutting down.");
@@ -1154,16 +1127,20 @@ public class HMaster extends HRegionServer implements MasterServices {
MasterProcedureConstants.DEFAULT_EXECUTOR_ABORT_ON_CORRUPTION);
procedureStore.start(numThreads);
procedureExecutor.start(numThreads, abortOnCorruption);
+ procEnv.getRemoteDispatcher().start();
}
private void stopProcedureExecutor() {
if (procedureExecutor != null) {
configurationManager.deregisterObserver(procedureExecutor.getEnvironment());
+ procedureExecutor.getEnvironment().getRemoteDispatcher().stop();
procedureExecutor.stop();
+ procedureExecutor = null;
}
if (procedureStore != null) {
procedureStore.stop(isAborted());
+ procedureStore = null;
}
}
@@ -1192,10 +1169,6 @@ public class HMaster extends HRegionServer implements MasterServices {
if (this.mobCompactThread != null) {
this.mobCompactThread.close();
}
-
- if (this.periodicDoMetricsChore != null) {
- periodicDoMetricsChore.cancel();
- }
}
/**
@@ -1253,7 +1226,7 @@ public class HMaster extends HRegionServer implements MasterServices {
// Sleep to next balance plan start time
// But if there are zero regions in transition, it can skip sleep to speed up.
while (!interrupted && System.currentTimeMillis() < nextBalanceStartTime
- && this.assignmentManager.getRegionStates().getRegionsInTransitionCount() != 0) {
+ && this.assignmentManager.getRegionStates().hasRegionsInTransition()) {
try {
Thread.sleep(100);
} catch (InterruptedException ie) {
@@ -1264,7 +1237,7 @@ public class HMaster extends HRegionServer implements MasterServices {
// Throttling by max number regions in transition
while (!interrupted
&& maxRegionsInTransition > 0
- && this.assignmentManager.getRegionStates().getRegionsInTransitionCount()
+ && this.assignmentManager.getRegionStates().getRegionsInTransition().size()
>= maxRegionsInTransition && System.currentTimeMillis() <= cutoffTime) {
try {
// sleep if the number of regions in transition exceeds the limit
@@ -1297,13 +1270,12 @@ public class HMaster extends HRegionServer implements MasterServices {
synchronized (this.balancer) {
// If balance not true, don't run balancer.
if (!this.loadBalancerTracker.isBalancerOn()) return false;
- // Only allow one balance run at at time.
- if (this.assignmentManager.getRegionStates().isRegionsInTransition()) {
- Set<RegionState> regionsInTransition =
- this.assignmentManager.getRegionStates().getRegionsInTransition();
+ // Only allow one balance run at a time.
+ if (this.assignmentManager.hasRegionsInTransition()) {
+ List<RegionStateNode> regionsInTransition = assignmentManager.getRegionsInTransition();
// if hbase:meta region is in transition, result of assignment cannot be recorded
// ignore the force flag in that case
- boolean metaInTransition = assignmentManager.getRegionStates().isMetaRegionInTransition();
+ boolean metaInTransition = assignmentManager.isMetaRegionInTransition();
String prefix = force && !metaInTransition ? "R" : "Not r";
LOG.debug(prefix + "unning balancer because " + regionsInTransition.size() +
" region(s) in transition: " + org.apache.commons.lang.StringUtils.
@@ -1336,7 +1308,7 @@ public class HMaster extends HRegionServer implements MasterServices {
//Give the balancer the current cluster state.
this.balancer.setClusterStatus(getClusterStatus());
this.balancer.setClusterLoad(
- this.assignmentManager.getRegionStates().getAssignmentsByTable(true));
+ this.assignmentManager.getRegionStates().getAssignmentsByTable());
for (Entry<TableName, Map<ServerName, List<HRegionInfo>>> e : assignmentsByTable.entrySet()) {
List<RegionPlan> partialPlans = this.balancer.balanceCluster(e.getKey(), e.getValue());
@@ -1355,7 +1327,7 @@ public class HMaster extends HRegionServer implements MasterServices {
for (RegionPlan plan: plans) {
LOG.info("balance " + plan);
//TODO: bulk assign
- this.assignmentManager.balance(plan);
+ this.assignmentManager.moveAsync(plan);
rpCount++;
balanceThrottling(balanceStartTime + rpCount * balanceInterval, maxRegionsInTransition,
@@ -1471,6 +1443,59 @@ public class HMaster extends HRegionServer implements MasterServices {
}
@Override
+ public long dispatchMergingRegions(
+ final HRegionInfo regionInfoA,
+ final HRegionInfo regionInfoB,
+ final boolean forcible,
+ final long nonceGroup,
+ final long nonce) throws IOException {
+ checkInitialized();
+
+ TableName tableName = regionInfoA.getTable();
+ if (tableName == null || regionInfoB.getTable() == null) {
+ throw new UnknownRegionException ("Can't merge regions without table associated");
+ }
+
+ if (!tableName.equals(regionInfoB.getTable())) {
+ throw new IOException ("Cannot merge regions from two different tables");
+ }
+
+ if (regionInfoA.compareTo(regionInfoB) == 0) {
+ throw new MergeRegionException(
+ "Cannot merge a region to itself " + regionInfoA + ", " + regionInfoB);
+ }
+
+ final HRegionInfo [] regionsToMerge = new HRegionInfo[2];
+ regionsToMerge [0] = regionInfoA;
+ regionsToMerge [1] = regionInfoB;
+
+ return MasterProcedureUtil.submitProcedure(
+ new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
+ @Override
+ protected void run() throws IOException {
+ MasterCoprocessorHost mcph = getMaster().getMasterCoprocessorHost();
+ if (mcph != null) {
+ mcph.preDispatchMerge(regionInfoA, regionInfoB);
+ }
+
+ LOG.info(getClientIdAuditPrefix() + " Dispatch merge regions " +
+ regionsToMerge[0].getEncodedName() + " and " + regionsToMerge[1].getEncodedName());
+
+ submitProcedure(new DispatchMergingRegionsProcedure(
+ procedureExecutor.getEnvironment(), tableName, regionsToMerge, forcible));
+ if (mcph != null) {
+ mcph.postDispatchMerge(regionInfoA, regionInfoB);
+ }
+ }
+
+ @Override
+ protected String getDescription() {
+ return "DispatchMergingRegionsProcedure";
+ }
+ });
+ }
+
+ @Override
public long mergeRegions(
final HRegionInfo[] regionsToMerge,
final boolean forcible,
@@ -1513,40 +1538,38 @@ public class HMaster extends HRegionServer implements MasterServices {
@Override
protected String getDescription() {
- return "DisableTableProcedure";
+ return "MergeTableProcedure";
}
});
}
@Override
- public long splitRegion(
- final HRegionInfo regionInfo,
- final byte[] splitRow,
- final long nonceGroup,
- final long nonce) throws IOException {
+ public long splitRegion(final HRegionInfo regionInfo, final byte[] splitRow,
+ final long nonceGroup, final long nonce)
+ throws IOException {
checkInitialized();
-
return MasterProcedureUtil.submitProcedure(
new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
@Override
protected void run() throws IOException {
getMaster().getMasterCoprocessorHost().preSplitRegion(regionInfo.getTable(), splitRow);
-
- LOG.info(getClientIdAuditPrefix() + " Split region " + regionInfo);
+ LOG.info(getClientIdAuditPrefix() + " split " + regionInfo.getRegionNameAsString());
// Execute the operation asynchronously
- submitProcedure(new SplitTableRegionProcedure(procedureExecutor.getEnvironment(),
- regionInfo, splitRow));
+ submitProcedure(getAssignmentManager().createSplitProcedure(regionInfo, splitRow));
}
@Override
protected String getDescription() {
- return "DisableTableProcedure";
+ return "SplitTableProcedure";
}
});
}
- @VisibleForTesting // Public so can be accessed by tests.
+ // Public so can be accessed by tests. Blocks until move is done.
+ // Replace with an async implementation from which you can get
+ // a success/failure result.
+ @VisibleForTesting
public void move(final byte[] encodedRegionName,
final byte[] destServerName) throws HBaseIOException {
RegionState regionState = assignmentManager.getRegionStates().
@@ -1597,6 +1620,8 @@ public class HMaster extends HRegionServer implements MasterServices {
// Now we can do the move
RegionPlan rp = new RegionPlan(hri, regionState.getServerName(), dest);
+ assert rp.getDestination() != null: rp.toString() + " " + dest;
+ assert rp.getSource() != null: rp.toString();
try {
checkInitialized();
@@ -1605,13 +1630,20 @@ public class HMaster extends HRegionServer implements MasterServices {
return;
}
}
- // warmup the region on the destination before initiating the move. this call
+ // Warmup the region on the destination before initiating the move. this call
// is synchronous and takes some time. doing it before the source region gets
// closed
serverManager.sendRegionWarmup(rp.getDestination(), hri);
LOG.info(getClientIdAuditPrefix() + " move " + rp + ", running balancer");
- this.assignmentManager.balance(rp);
+ Future<byte []> future = this.assignmentManager.moveAsync(rp);
+ try {
+ // Is this going to work? Will we throw exception on error?
+ // TODO: CompletableFuture rather than this stunted Future.
+ future.get();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new HBaseIOException(e);
+ }
if (this.cpHost != null) {
this.cpHost.postMove(hri, rp.getSource(), rp.getDestination());
}
@@ -1950,7 +1982,7 @@ public class HMaster extends HRegionServer implements MasterServices {
status.cleanup();
}
}
- }, getServerName().toShortString() + ".activeMasterManager"));
+ }, getServerName().toShortString() + ".masterManager"));
}
private void checkCompression(final HTableDescriptor htd)
@@ -2382,8 +2414,9 @@ public class HMaster extends HRegionServer implements MasterServices {
String clusterId = fileSystemManager != null ?
fileSystemManager.getClusterId().toString() : null;
- Set<RegionState> regionsInTransition = assignmentManager != null ?
- assignmentManager.getRegionStates().getRegionsInTransition() : null;
+ List<RegionState> regionsInTransition = assignmentManager != null ?
+ assignmentManager.getRegionStates().getRegionsStateInTransition() : null;
+
String[] coprocessors = cpHost != null ? getMasterCoprocessors() : null;
boolean balancerOn = loadBalancerTracker != null ?
loadBalancerTracker.isBalancerOn() : false;
@@ -2701,7 +2734,7 @@ public class HMaster extends HRegionServer implements MasterServices {
* @see org.apache.hadoop.hbase.master.HMasterCommandLine
*/
public static void main(String [] args) {
- LOG.info("***** STARTING service '" + HMaster.class.getSimpleName() + "' *****");
+ LOG.info("STARTING service '" + HMaster.class.getSimpleName() + "'");
VersionInfo.logVersion();
new HMasterCommandLine(HMaster.class).doMain(args);
}
@@ -3146,6 +3179,7 @@ public class HMaster extends HRegionServer implements MasterServices {
* @param switchType see {@link org.apache.hadoop.hbase.client.MasterSwitchType}
* @return The state of the switch
*/
+ @Override
public boolean isSplitOrMergeEnabled(MasterSwitchType switchType) {
if (null == splitOrMergeTracker || isInMaintenanceMode()) {
return false;
http://git-wip-us.apache.org/repos/asf/hbase/blob/ccbc9ec2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java
index 01540b7..0e86925 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java
@@ -45,7 +45,7 @@ import edu.umd.cs.findbugs.annotations.Nullable;
* locations for all Regions in a cluster.
*
* <p>This classes produces plans for the
- * {@link org.apache.hadoop.hbase.master.AssignmentManager}
+ * {@link org.apache.hadoop.hbase.master.assignment.AssignmentManager}
* to execute.
*/
@InterfaceAudience.Private
http://git-wip-us.apache.org/repos/asf/hbase/blob/ccbc9ec2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
index 2f5e66e..3ecc83d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
@@ -810,6 +810,28 @@ public class MasterCoprocessorHost
});
}
+ public void preDispatchMerge(final HRegionInfo regionInfoA, final HRegionInfo regionInfoB)
+ throws IOException {
+ execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ @Override
+ public void call(MasterObserver oserver, ObserverContext<MasterCoprocessorEnvironment> ctx)
+ throws IOException {
+ oserver.preDispatchMerge(ctx, regionInfoA, regionInfoB);
+ }
+ });
+ }
+
+ public void postDispatchMerge(final HRegionInfo regionInfoA, final HRegionInfo regionInfoB)
+ throws IOException {
+ execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ @Override
+ public void call(MasterObserver oserver, ObserverContext<MasterCoprocessorEnvironment> ctx)
+ throws IOException {
+ oserver.postDispatchMerge(ctx, regionInfoA, regionInfoB);
+ }
+ });
+ }
+
public void preMergeRegions(final HRegionInfo[] regionsToMerge)
throws IOException {
execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
http://git-wip-us.apache.org/repos/asf/hbase/blob/ccbc9ec2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterDumpServlet.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterDumpServlet.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterDumpServlet.java
index a921ab5..a48444c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterDumpServlet.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterDumpServlet.java
@@ -24,7 +24,6 @@ import java.io.PrintStream;
import java.io.PrintWriter;
import java.util.Date;
import java.util.Map;
-import java.util.Set;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@@ -33,6 +32,8 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ServerLoad;
import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
+import org.apache.hadoop.hbase.master.assignment.RegionStates.RegionStateNode;
import org.apache.hadoop.hbase.monitoring.LogMonitoring;
import org.apache.hadoop.hbase.monitoring.StateDumpServlet;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
@@ -117,9 +118,8 @@ public class MasterDumpServlet extends StateDumpServlet {
return;
}
- Set<RegionState> regionsInTransition = am.getRegionStates().getRegionsInTransition();
- for (RegionState rs : regionsInTransition) {
- String rid = rs.getRegion().getRegionNameAsString();
+ for (RegionStateNode rs : am.getRegionsInTransition()) {
+ String rid = rs.getRegionInfo().getEncodedName();
out.println("Region " + rid + ": " + rs.toDescriptiveString());
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ccbc9ec2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterMetaBootstrap.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterMetaBootstrap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterMetaBootstrap.java
index 1988e2d..049e659 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterMetaBootstrap.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterMetaBootstrap.java
@@ -19,7 +19,6 @@
package org.apache.hadoop.hbase.master;
import java.io.IOException;
-import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -33,8 +32,8 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.TableState;
+import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@@ -108,14 +107,7 @@ public class MasterMetaBootstrap {
}
private void splitMetaLogBeforeAssignment(ServerName currentMetaServer) throws IOException {
- if (RecoveryMode.LOG_REPLAY == master.getMasterWalManager().getLogRecoveryMode()) {
- // In log replay mode, we mark hbase:meta region as recovering in ZK
- master.getMasterWalManager().prepareLogReplay(currentMetaServer,
- Collections.<HRegionInfo>singleton(HRegionInfo.FIRST_META_REGIONINFO));
- } else {
- // In recovered.edits mode: create recovered edits file for hbase:meta server
- master.getMasterWalManager().splitMetaLog(currentMetaServer);
- }
+ master.getMasterWalManager().splitMetaLog(currentMetaServer);
}
private void unassignExcessMetaReplica(int numMetaReplicasConfigured) {
@@ -151,7 +143,9 @@ public class MasterMetaBootstrap {
// Work on meta region
int assigned = 0;
- long timeout = master.getConfiguration().getLong("hbase.catalog.verification.timeout", 1000);
+ // TODO: Unimplemented
+ // long timeout =
+ // master.getConfiguration().getLong("hbase.catalog.verification.timeout", 1000);
if (replicaId == HRegionInfo.DEFAULT_REPLICA_ID) {
status.setStatus("Assigning hbase:meta region");
} else {
@@ -160,37 +154,10 @@ public class MasterMetaBootstrap {
// Get current meta state from zk.
RegionState metaState = MetaTableLocator.getMetaRegionState(master.getZooKeeper(), replicaId);
- HRegionInfo hri = RegionReplicaUtil.getRegionInfoForReplica(HRegionInfo.FIRST_META_REGIONINFO,
- replicaId);
- RegionStates regionStates = assignmentManager.getRegionStates();
- regionStates.createRegionState(hri, metaState.getState(),
- metaState.getServerName(), null);
-
- if (!metaState.isOpened() || !master.getMetaTableLocator().verifyMetaRegionLocation(
- master.getClusterConnection(), master.getZooKeeper(), timeout, replicaId)) {
- ServerName currentMetaServer = metaState.getServerName();
- if (master.getServerManager().isServerOnline(currentMetaServer)) {
- if (replicaId == HRegionInfo.DEFAULT_REPLICA_ID) {
- LOG.info("Meta was in transition on " + currentMetaServer);
- } else {
- LOG.info("Meta with replicaId " + replicaId + " was in transition on " +
- currentMetaServer);
- }
- assignmentManager.processRegionsInTransition(Collections.singletonList(metaState));
- } else {
- if (currentMetaServer != null) {
- if (replicaId == HRegionInfo.DEFAULT_REPLICA_ID) {
- splitMetaLogBeforeAssignment(currentMetaServer);
- regionStates.logSplit(HRegionInfo.FIRST_META_REGIONINFO);
- previouslyFailedMetaRSs.add(currentMetaServer);
- }
- }
- LOG.info("Re-assigning hbase:meta with replicaId, " + replicaId +
- " it was on " + currentMetaServer);
- assignmentManager.assignMeta(hri);
- }
- assigned++;
- }
+ LOG.debug("meta state from zookeeper: " + metaState);
+ HRegionInfo hri = RegionReplicaUtil.getRegionInfoForReplica(
+ HRegionInfo.FIRST_META_REGIONINFO, replicaId);
+ assignmentManager.assignMeta(hri, metaState.getServerName());
if (replicaId == HRegionInfo.DEFAULT_REPLICA_ID) {
// TODO: should we prevent from using state manager before meta was initialized?
@@ -199,14 +166,6 @@ public class MasterMetaBootstrap {
.setTableState(TableName.META_TABLE_NAME, TableState.State.ENABLED);
}
- if ((RecoveryMode.LOG_REPLAY == master.getMasterWalManager().getLogRecoveryMode())
- && (!previouslyFailedMetaRSs.isEmpty())) {
- // replay WAL edits mode need new hbase:meta RS is assigned firstly
- status.setStatus("replaying log for Meta Region");
- master.getMasterWalManager().splitMetaLog(previouslyFailedMetaRSs);
- }
-
- assignmentManager.setEnabledTable(TableName.META_TABLE_NAME);
master.getTableStateManager().start();
// Make sure a hbase:meta location is set. We need to enable SSH here since
@@ -214,7 +173,7 @@ public class MasterMetaBootstrap {
// by SSH so that system tables can be assigned.
// No need to wait for meta is assigned = 0 when meta is just verified.
if (replicaId == HRegionInfo.DEFAULT_REPLICA_ID) enableCrashedServerProcessing(assigned != 0);
- LOG.info("hbase:meta with replicaId " + replicaId + " assigned=" + assigned + ", location="
+ LOG.info("hbase:meta with replicaId " + replicaId + ", location="
+ master.getMetaTableLocator().getMetaRegionLocation(master.getZooKeeper(), replicaId));
status.setStatus("META assigned.");
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ccbc9ec2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
index 40c4a71..302464e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
@@ -34,7 +34,6 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
-import org.apache.hadoop.hbase.PleaseHoldException;
import org.apache.hadoop.hbase.ProcedureInfo;
import org.apache.hadoop.hbase.ServerLoad;
import org.apache.hadoop.hbase.ServerName;
@@ -43,6 +42,7 @@ import org.apache.hadoop.hbase.UnknownRegionException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.MasterSwitchType;
import org.apache.hadoop.hbase.client.TableState;
+import org.apache.hadoop.hbase.client.VersionInfoUtil;
import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper;
import org.apache.hadoop.hbase.errorhandling.ForeignException;
import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
@@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.ipc.PriorityFunction;
import org.apache.hadoop.hbase.ipc.QosPriority;
import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
+import org.apache.hadoop.hbase.master.assignment.RegionStates;
import org.apache.hadoop.hbase.master.locking.LockProcedure;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil.NonceProcedureRunnable;
@@ -78,7 +79,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.*;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockHeartbeatRequest;
@@ -95,13 +95,10 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProto
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.SplitTableRegionRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.SplitTableRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest;
@@ -268,7 +265,11 @@ public class MasterRpcServices extends RSRpcServices
ClusterStatusProtos.ServerLoad sl = request.getLoad();
ServerName serverName = ProtobufUtil.toServerName(request.getServer());
ServerLoad oldLoad = master.getServerManager().getLoad(serverName);
- master.getServerManager().regionServerReport(serverName, new ServerLoad(sl));
+ ServerLoad newLoad = new ServerLoad(sl);
+ master.getServerManager().regionServerReport(serverName, newLoad);
+ int version = VersionInfoUtil.getCurrentClientVersionNumber();
+ master.getAssignmentManager().reportOnlineRegions(serverName,
+ version, newLoad.getRegionsLoad().keySet());
if (sl != null && master.metricsMaster != null) {
// Up our metrics.
master.metricsMaster.incrementRequests(sl.getTotalNumberOfRequests()
@@ -341,25 +342,25 @@ public class MasterRpcServices extends RSRpcServices
public AssignRegionResponse assignRegion(RpcController controller,
AssignRegionRequest req) throws ServiceException {
try {
- final byte [] regionName = req.getRegion().getValue().toByteArray();
- RegionSpecifierType type = req.getRegion().getType();
- AssignRegionResponse arr = AssignRegionResponse.newBuilder().build();
-
master.checkInitialized();
+
+ final RegionSpecifierType type = req.getRegion().getType();
if (type != RegionSpecifierType.REGION_NAME) {
LOG.warn("assignRegion specifier type: expected: " + RegionSpecifierType.REGION_NAME
+ " actual: " + type);
}
- RegionStates regionStates = master.getAssignmentManager().getRegionStates();
- HRegionInfo regionInfo = regionStates.getRegionInfo(regionName);
- if (regionInfo == null) throw new UnknownRegionException(Bytes.toString(regionName));
+
+ final byte[] regionName = req.getRegion().getValue().toByteArray();
+ final HRegionInfo regionInfo = master.getAssignmentManager().getRegionInfo(regionName);
+ if (regionInfo == null) throw new UnknownRegionException(Bytes.toStringBinary(regionName));
+
+ final AssignRegionResponse arr = AssignRegionResponse.newBuilder().build();
if (master.cpHost != null) {
if (master.cpHost.preAssign(regionInfo)) {
return arr;
}
}
- LOG.info(master.getClientIdAuditPrefix()
- + " assign " + regionInfo.getRegionNameAsString());
+ LOG.info(master.getClientIdAuditPrefix() + " assign " + regionInfo.getRegionNameAsString());
master.getAssignmentManager().assign(regionInfo, true);
if (master.cpHost != null) {
master.cpHost.postAssign(regionInfo);
@@ -370,6 +371,7 @@ public class MasterRpcServices extends RSRpcServices
}
}
+
@Override
public BalanceResponse balance(RpcController controller,
BalanceRequest request) throws ServiceException {
@@ -589,8 +591,7 @@ public class MasterRpcServices extends RSRpcServices
}
@Override
- public SplitTableRegionResponse splitRegion(
- final RpcController controller,
+ public SplitTableRegionResponse splitRegion(final RpcController controller,
final SplitTableRegionRequest request) throws ServiceException {
try {
long procId = master.splitRegion(
@@ -1177,24 +1178,24 @@ public class MasterRpcServices extends RSRpcServices
@Override
public OfflineRegionResponse offlineRegion(RpcController controller,
OfflineRegionRequest request) throws ServiceException {
- final byte [] regionName = request.getRegion().getValue().toByteArray();
- RegionSpecifierType type = request.getRegion().getType();
- if (type != RegionSpecifierType.REGION_NAME) {
- LOG.warn("moveRegion specifier type: expected: " + RegionSpecifierType.REGION_NAME
- + " actual: " + type);
- }
-
try {
master.checkInitialized();
- Pair<HRegionInfo, ServerName> pair =
- MetaTableAccessor.getRegion(master.getConnection(), regionName);
- if (pair == null) throw new UnknownRegionException(Bytes.toStringBinary(regionName));
- HRegionInfo hri = pair.getFirst();
+
+ final RegionSpecifierType type = request.getRegion().getType();
+ if (type != RegionSpecifierType.REGION_NAME) {
+ LOG.warn("moveRegion specifier type: expected: " + RegionSpecifierType.REGION_NAME
+ + " actual: " + type);
+ }
+
+ final byte[] regionName = request.getRegion().getValue().toByteArray();
+ final HRegionInfo hri = master.getAssignmentManager().getRegionInfo(regionName);
+ if (hri == null) throw new UnknownRegionException(Bytes.toStringBinary(regionName));
+
if (master.cpHost != null) {
master.cpHost.preRegionOffline(hri);
}
LOG.info(master.getClientIdAuditPrefix() + " offline " + hri.getRegionNameAsString());
- master.getAssignmentManager().regionOffline(hri);
+ master.getAssignmentManager().offlineRegion(hri);
if (master.cpHost != null) {
master.cpHost.postRegionOffline(hri);
}
@@ -1379,26 +1380,7 @@ public class MasterRpcServices extends RSRpcServices
ReportRegionStateTransitionRequest req) throws ServiceException {
try {
master.checkServiceStarted();
- RegionStateTransition rt = req.getTransition(0);
- RegionStates regionStates = master.getAssignmentManager().getRegionStates();
- for (RegionInfo ri : rt.getRegionInfoList()) {
- TableName tableName = ProtobufUtil.toTableName(ri.getTableName());
- if (!(TableName.META_TABLE_NAME.equals(tableName)
- && regionStates.getRegionState(HRegionInfo.FIRST_META_REGIONINFO) != null)
- && !master.getAssignmentManager().isFailoverCleanupDone()) {
- // Meta region is assigned before master finishes the
- // failover cleanup. So no need this check for it
- throw new PleaseHoldException("Master is rebuilding user regions");
- }
- }
- ServerName sn = ProtobufUtil.toServerName(req.getServer());
- String error = master.getAssignmentManager().onRegionTransition(sn, rt);
- ReportRegionStateTransitionResponse.Builder rrtr =
- ReportRegionStateTransitionResponse.newBuilder();
- if (error != null) {
- rrtr.setErrorMessage(error);
- }
- return rrtr.build();
+ return master.getAssignmentManager().reportRegionStateTransition(req);
} catch (IOException ioe) {
throw new ServiceException(ioe);
}
@@ -1901,4 +1883,34 @@ public class MasterRpcServices extends RSRpcServices
throw new ServiceException(e);
}
}
-}
+
+ @Override
+ public DispatchMergingRegionsResponse dispatchMergingRegions(RpcController controller,
+ DispatchMergingRegionsRequest request) throws ServiceException {
+ final byte[] encodedNameOfRegionA = request.getRegionA().getValue().toByteArray();
+ final byte[] encodedNameOfRegionB = request.getRegionB().getValue().toByteArray();
+ if (request.getRegionA().getType() != RegionSpecifierType.ENCODED_REGION_NAME ||
+ request.getRegionB().getType() != RegionSpecifierType.ENCODED_REGION_NAME) {
+ LOG.warn("mergeRegions specifier type: expected: " + RegionSpecifierType.ENCODED_REGION_NAME +
+ " actual: region_a=" +
+ request.getRegionA().getType() + ", region_b=" +
+ request.getRegionB().getType());
+ }
+ RegionStates regionStates = master.getAssignmentManager().getRegionStates();
+ RegionState regionStateA = regionStates.getRegionState(Bytes.toString(encodedNameOfRegionA));
+ RegionState regionStateB = regionStates.getRegionState(Bytes.toString(encodedNameOfRegionB));
+ if (regionStateA == null || regionStateB == null) {
+ throw new ServiceException(new UnknownRegionException(
+ Bytes.toStringBinary(regionStateA == null? encodedNameOfRegionA: encodedNameOfRegionB)));
+ }
+ final HRegionInfo regionInfoA = regionStateA.getRegion();
+ final HRegionInfo regionInfoB = regionStateB.getRegion();
+ try {
+ long procId = master.dispatchMergingRegions(regionInfoA, regionInfoB, request.getForcible(),
+ request.getNonceGroup(), request.getNonce());
+ return DispatchMergingRegionsResponse.newBuilder().setProcId(procId).build();
+ } catch (IOException ioe) {
+ throw new ServiceException(ioe);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/ccbc9ec2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
index 4924d72..fd17e6f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
@@ -32,7 +32,9 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotDisabledException;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.MasterSwitchType;
import org.apache.hadoop.hbase.executor.ExecutorService;
+import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.master.locking.LockManager;
import org.apache.hadoop.hbase.favored.FavoredNodesManager;
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer;
@@ -266,6 +268,23 @@ public interface MasterServices extends Server {
throws IOException;
/**
+ * Merge two regions. The real implementation is on the regionserver; the
+ * master just moves the regions together and sends a MERGE RPC to the regionserver
+ * @param region_a region to merge
+ * @param region_b region to merge
+ * @param forcible true if performing a compulsory merge; otherwise only
+ * two adjacent regions will be merged
+ * @return procedure Id
+ * @throws IOException
+ */
+ long dispatchMergingRegions(
+ final HRegionInfo region_a,
+ final HRegionInfo region_b,
+ final boolean forcible,
+ final long nonceGroup,
+ final long nonce) throws IOException;
+
+ /**
* Merge regions in a table.
* @param regionsToMerge daughter regions to merge
* @param forcible whether to force to merge even two regions are not adjacent
@@ -401,6 +420,8 @@ public interface MasterServices extends Server {
*/
boolean isStopping();
+ boolean isSplitOrMergeEnabled(MasterSwitchType switchType);
+
/**
* @return Favored Nodes Manager
*/
http://git-wip-us.apache.org/repos/asf/hbase/blob/ccbc9ec2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java
index 105fa29..928702e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java
@@ -18,8 +18,6 @@
*/
package org.apache.hadoop.hbase.master;
-import com.google.common.annotations.VisibleForTesting;
-
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
@@ -41,12 +39,13 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WALSplitter;
+import com.google.common.annotations.VisibleForTesting;
+
/**
* This class abstracts a bunch of operations the HMaster needs
* when splitting log files e.g. finding log files, dirs etc.
@@ -332,16 +331,4 @@ public class MasterWalManager {
}
}
}
-
- /**
- * The function is used in SSH to set recovery mode based on configuration after all outstanding
- * log split tasks drained.
- */
- public void setLogRecoveryMode() throws IOException {
- this.splitLogManager.setRecoveryMode(false);
- }
-
- public RecoveryMode getLogRecoveryMode() {
- return this.splitLogManager.getRecoveryMode();
- }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ccbc9ec2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetricsAssignmentManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetricsAssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetricsAssignmentManager.java
index 40e79ae..c7ce9a9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetricsAssignmentManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetricsAssignmentManager.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.master;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
public class MetricsAssignmentManager {
-
private final MetricsAssignmentManagerSource assignmentManagerSource;
public MetricsAssignmentManager() {
@@ -33,19 +32,11 @@ public class MetricsAssignmentManager {
return assignmentManagerSource;
}
- public void updateAssignmentTime(long time) {
- assignmentManagerSource.updateAssignmentTime(time);
- }
-
- public void updateBulkAssignTime(long time) {
- assignmentManagerSource.updateBulkAssignTime(time);
- }
-
/**
* set new value for number of regions in transition.
* @param ritCount
*/
- public void updateRITCount(int ritCount) {
+ public void updateRITCount(final int ritCount) {
assignmentManagerSource.setRIT(ritCount);
}
@@ -54,14 +45,15 @@ public class MetricsAssignmentManager {
* as defined by the property rit.metrics.threshold.time.
* @param ritCountOverThreshold
*/
- public void updateRITCountOverThreshold(int ritCountOverThreshold) {
+ public void updateRITCountOverThreshold(final int ritCountOverThreshold) {
assignmentManagerSource.setRITCountOverThreshold(ritCountOverThreshold);
}
+
/**
* update the timestamp for oldest region in transition metrics.
* @param timestamp
*/
- public void updateRITOldestAge(long timestamp) {
+ public void updateRITOldestAge(final long timestamp) {
assignmentManagerSource.setRITOldestAge(timestamp);
}
@@ -72,4 +64,27 @@ public class MetricsAssignmentManager {
public void updateRitDuration(long duration) {
assignmentManagerSource.updateRitDuration(duration);
}
+
+ /**
+ * Increment the count of assignment operations (assign/unassign).
+ */
+ public void incrementOperationCounter() {
+ assignmentManagerSource.incrementOperationCounter();
+ }
+
+ /**
+ * Add the time taken to perform the last assign operation
+ * @param time
+ */
+ public void updateAssignTime(final long time) {
+ assignmentManagerSource.updateAssignTime(time);
+ }
+
+ /**
+ * Add the time taken to perform the last unassign operation
+ * @param time
+ */
+ public void updateUnassignTime(final long time) {
+ assignmentManagerSource.updateUnassignTime(time);
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ccbc9ec2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/NoSuchProcedureException.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/NoSuchProcedureException.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/NoSuchProcedureException.java
new file mode 100644
index 0000000..e119e88
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/NoSuchProcedureException.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+@InterfaceAudience.Private
+// Based on HBaseIOE rather than PE because easier to integrate when an IOE.
+public class NoSuchProcedureException extends HBaseIOException {
+ public NoSuchProcedureException() {
+ super();
+ }
+
+ public NoSuchProcedureException(String s) {
+ super(s);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/ccbc9ec2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionPlan.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionPlan.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionPlan.java
index cd6b313..17eb346 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionPlan.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionPlan.java
@@ -135,8 +135,8 @@ public class RegionPlan implements Comparable<RegionPlan> {
@Override
public String toString() {
- return "hri=" + this.hri.getRegionNameAsString() + ", src=" +
+ return "hri=" + this.hri.getRegionNameAsString() + ", source=" +
(this.source == null? "": this.source.toString()) +
- ", dest=" + (this.dest == null? "": this.dest.toString());
+ ", destination=" + (this.dest == null? "": this.dest.toString());
}
}