You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2017/05/23 18:37:49 UTC
[12/28] hbase git commit: HBASE-18087 Fix unit tests in TestTableFavoredNodes
http://git-wip-us.apache.org/repos/asf/hbase/blob/4143c017/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/EnableTableProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/EnableTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/EnableTableProcedure.java
index 4d67edd..4f4b5b1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/EnableTableProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/EnableTableProcedure.java
@@ -21,34 +21,20 @@ package org.apache.hadoop.hbase.master.procedure;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.MetaTableAccessor;
-import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotDisabledException;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.TableState;
-import org.apache.hadoop.hbase.master.AssignmentManager;
-import org.apache.hadoop.hbase.master.BulkAssigner;
-import org.apache.hadoop.hbase.master.GeneralBulkAssigner;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
-import org.apache.hadoop.hbase.master.MasterServices;
-import org.apache.hadoop.hbase.master.RegionStates;
-import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.master.TableStateManager;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.EnableTableState;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
@InterfaceAudience.Private
public class EnableTableProcedure
@@ -114,7 +100,7 @@ public class EnableTableProcedure
setNextState(EnableTableState.ENABLE_TABLE_MARK_REGIONS_ONLINE);
break;
case ENABLE_TABLE_MARK_REGIONS_ONLINE:
- markRegionsOnline(env, tableName, true);
+ addChildProcedure(env.getAssignmentManager().createAssignProcedures(tableName));
setNextState(EnableTableState.ENABLE_TABLE_SET_ENABLED_TABLE_STATE);
break;
case ENABLE_TABLE_SET_ENABLED_TABLE_STATE:
@@ -287,137 +273,6 @@ public class EnableTableProcedure
}
/**
- * Mark offline regions of the table online with retry
- * @param env MasterProcedureEnv
- * @param tableName the target table
- * @param retryRequired whether to retry if the first run failed
- * @throws IOException
- */
- protected static void markRegionsOnline(
- final MasterProcedureEnv env,
- final TableName tableName,
- final Boolean retryRequired) throws IOException {
- // This is best effort approach to make all regions of a table online. If we fail to do
- // that, it is ok that the table has some offline regions; user can fix it manually.
-
- // Dev consideration: add a config to control max number of retry. For now, it is hard coded.
- int maxTry = (retryRequired ? 10 : 1);
- boolean done = false;
-
- do {
- try {
- done = markRegionsOnline(env, tableName);
- if (done) {
- break;
- }
- maxTry--;
- } catch (Exception e) {
- LOG.warn("Received exception while marking regions online. tries left: " + maxTry, e);
- maxTry--;
- if (maxTry > 0) {
- continue; // we still have some retry left, try again.
- }
- throw e;
- }
- } while (maxTry > 0);
-
- if (!done) {
- LOG.warn("Some or all regions of the Table '" + tableName + "' were offline");
- }
- }
-
- /**
- * Mark offline regions of the table online
- * @param env MasterProcedureEnv
- * @param tableName the target table
- * @return whether the operation is fully completed or being interrupted.
- * @throws IOException
- */
- private static boolean markRegionsOnline(final MasterProcedureEnv env, final TableName tableName)
- throws IOException {
- final AssignmentManager assignmentManager = env.getMasterServices().getAssignmentManager();
- final MasterServices masterServices = env.getMasterServices();
- final ServerManager serverManager = masterServices.getServerManager();
- boolean done = false;
- // Get the regions of this table. We're done when all listed
- // tables are onlined.
- List<Pair<HRegionInfo, ServerName>> tableRegionsAndLocations;
-
- if (TableName.META_TABLE_NAME.equals(tableName)) {
- tableRegionsAndLocations =
- new MetaTableLocator().getMetaRegionsAndLocations(masterServices.getZooKeeper());
- } else {
- tableRegionsAndLocations =
- MetaTableAccessor.getTableRegionsAndLocations(masterServices.getConnection(), tableName);
- }
-
- int countOfRegionsInTable = tableRegionsAndLocations.size();
- Map<HRegionInfo, ServerName> regionsToAssign =
- regionsToAssignWithServerName(env, tableRegionsAndLocations);
-
- // need to potentially create some regions for the replicas
- List<HRegionInfo> unrecordedReplicas =
- AssignmentManager.replicaRegionsNotRecordedInMeta(new HashSet<>(
- regionsToAssign.keySet()), masterServices);
- Map<ServerName, List<HRegionInfo>> srvToUnassignedRegs =
- assignmentManager.getBalancer().roundRobinAssignment(unrecordedReplicas,
- serverManager.getOnlineServersList());
- if (srvToUnassignedRegs != null) {
- for (Map.Entry<ServerName, List<HRegionInfo>> entry : srvToUnassignedRegs.entrySet()) {
- for (HRegionInfo h : entry.getValue()) {
- regionsToAssign.put(h, entry.getKey());
- }
- }
- }
-
- int offlineRegionsCount = regionsToAssign.size();
-
- LOG.info("Table '" + tableName + "' has " + countOfRegionsInTable + " regions, of which "
- + offlineRegionsCount + " are offline.");
- if (offlineRegionsCount == 0) {
- return true;
- }
-
- List<ServerName> onlineServers = serverManager.createDestinationServersList();
- Map<ServerName, List<HRegionInfo>> bulkPlan =
- env.getMasterServices().getAssignmentManager().getBalancer()
- .retainAssignment(regionsToAssign, onlineServers);
- if (bulkPlan != null) {
- LOG.info("Bulk assigning " + offlineRegionsCount + " region(s) across " + bulkPlan.size()
- + " server(s), retainAssignment=true");
-
- BulkAssigner ba = new GeneralBulkAssigner(masterServices, bulkPlan, assignmentManager, true);
- try {
- if (ba.bulkAssign()) {
- done = true;
- }
- } catch (InterruptedException e) {
- LOG.warn("Enable operation was interrupted when enabling table '" + tableName + "'");
- // Preserve the interrupt.
- Thread.currentThread().interrupt();
- }
- } else {
- LOG.info("Balancer was unable to find suitable servers for table " + tableName
- + ", leaving unassigned");
- }
- return done;
- }
-
- /**
- * Mark regions of the table offline during recovery
- * @param env MasterProcedureEnv
- */
- private void markRegionsOfflineDuringRecovery(final MasterProcedureEnv env) {
- try {
- // This is a best effort attempt. We will move on even it does not succeed. We will retry
- // several times until we giving up.
- DisableTableProcedure.markRegionsOffline(env, tableName, true);
- } catch (Exception e) {
- LOG.debug("Failed to offline all regions of table " + tableName + ". Ignoring", e);
- }
- }
-
- /**
* Mark table state to Enabled
* @param env MasterProcedureEnv
* @throws IOException
@@ -457,31 +312,6 @@ public class EnableTableProcedure
}
/**
- * @param regionsInMeta
- * @return List of regions neither in transition nor assigned.
- * @throws IOException
- */
- private static Map<HRegionInfo, ServerName> regionsToAssignWithServerName(
- final MasterProcedureEnv env,
- final List<Pair<HRegionInfo, ServerName>> regionsInMeta) throws IOException {
- Map<HRegionInfo, ServerName> regionsToAssign = new HashMap<>(regionsInMeta.size());
- RegionStates regionStates = env.getMasterServices().getAssignmentManager().getRegionStates();
- for (Pair<HRegionInfo, ServerName> regionLocation : regionsInMeta) {
- HRegionInfo hri = regionLocation.getFirst();
- ServerName sn = regionLocation.getSecond();
- if (regionStates.isRegionOffline(hri)) {
- regionsToAssign.put(hri, sn);
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Skipping assign for the region " + hri + " during enable table "
- + hri.getTable() + " because its already in tranition or assigned.");
- }
- }
- }
- return regionsToAssign;
- }
-
- /**
* Coprocessor Action.
* @param env MasterProcedureEnv
* @param state the procedure state
http://git-wip-us.apache.org/repos/asf/hbase/blob/4143c017/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterDDLOperationHelper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterDDLOperationHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterDDLOperationHelper.java
index 4b9a7ab..31d05a7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterDDLOperationHelper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterDDLOperationHelper.java
@@ -19,32 +19,19 @@
package org.apache.hadoop.hbase.master.procedure;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.LinkedList;
import java.util.List;
-import java.util.NavigableMap;
-import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.RegionLocator;
-import org.apache.hadoop.hbase.master.AssignmentManager;
-import org.apache.hadoop.hbase.master.BulkReOpen;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.mob.MobConstants;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.util.Bytes;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
/**
* Helper class for schema change procedures
*/
@@ -60,16 +47,13 @@ public final class MasterDDLOperationHelper {
public static void deleteColumnFamilyFromFileSystem(
final MasterProcedureEnv env,
final TableName tableName,
- List<HRegionInfo> regionInfoList,
+ final List<HRegionInfo> regionInfoList,
final byte[] familyName,
- boolean hasMob) throws IOException {
+ final boolean hasMob) throws IOException {
final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
if (LOG.isDebugEnabled()) {
LOG.debug("Removing family=" + Bytes.toString(familyName) + " from table=" + tableName);
}
- if (regionInfoList == null) {
- regionInfoList = ProcedureSyncWait.getRegionsFromMeta(env, tableName);
- }
for (HRegionInfo hri : regionInfoList) {
// Delete the family directory in FS for all the regions one by one
mfs.deleteFamilyFromFS(hri, familyName);
@@ -81,77 +65,4 @@ public final class MasterDDLOperationHelper {
mfs.deleteFamilyFromFS(mobRootDir, mobRegionInfo, familyName);
}
}
-
- /**
- * Reopen all regions from a table after a schema change operation.
- **/
- public static boolean reOpenAllRegions(
- final MasterProcedureEnv env,
- final TableName tableName,
- final List<HRegionInfo> regionInfoList) throws IOException {
- boolean done = false;
- LOG.info("Bucketing regions by region server...");
- List<HRegionLocation> regionLocations = null;
- Connection connection = env.getMasterServices().getConnection();
- try (RegionLocator locator = connection.getRegionLocator(tableName)) {
- regionLocations = locator.getAllRegionLocations();
- }
- // Convert List<HRegionLocation> to Map<HRegionInfo, ServerName>.
- NavigableMap<HRegionInfo, ServerName> hri2Sn = new TreeMap<>();
- for (HRegionLocation location : regionLocations) {
- hri2Sn.put(location.getRegionInfo(), location.getServerName());
- }
- TreeMap<ServerName, List<HRegionInfo>> serverToRegions = Maps.newTreeMap();
- List<HRegionInfo> reRegions = new ArrayList<>();
- for (HRegionInfo hri : regionInfoList) {
- ServerName sn = hri2Sn.get(hri);
- // Skip the offlined split parent region
- // See HBASE-4578 for more information.
- if (null == sn) {
- LOG.info("Skip " + hri);
- continue;
- }
- if (!serverToRegions.containsKey(sn)) {
- LinkedList<HRegionInfo> hriList = Lists.newLinkedList();
- serverToRegions.put(sn, hriList);
- }
- reRegions.add(hri);
- serverToRegions.get(sn).add(hri);
- }
-
- LOG.info("Reopening " + reRegions.size() + " regions on " + serverToRegions.size()
- + " region servers.");
- AssignmentManager am = env.getMasterServices().getAssignmentManager();
- am.setRegionsToReopen(reRegions);
- BulkReOpen bulkReopen = new BulkReOpen(env.getMasterServices(), serverToRegions, am);
- while (true) {
- try {
- if (bulkReopen.bulkReOpen()) {
- done = true;
- break;
- } else {
- LOG.warn("Timeout before reopening all regions");
- }
- } catch (InterruptedException e) {
- LOG.warn("Reopen was interrupted");
- // Preserve the interrupt.
- Thread.currentThread().interrupt();
- break;
- }
- }
- return done;
- }
-
- /**
- * Get the region info list of a table from meta if it is not already known by the caller.
- **/
- public static List<HRegionInfo> getRegionInfoList(
- final MasterProcedureEnv env,
- final TableName tableName,
- List<HRegionInfo> regionInfoList) throws IOException {
- if (regionInfoList == null) {
- regionInfoList = ProcedureSyncWait.getRegionsFromMeta(env, tableName);
- }
- return regionInfoList;
- }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/4143c017/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureConstants.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureConstants.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureConstants.java
index c21137d..f815bea 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureConstants.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureConstants.java
@@ -29,7 +29,7 @@ public final class MasterProcedureConstants {
/** Number of threads used by the procedure executor */
public static final String MASTER_PROCEDURE_THREADS = "hbase.master.procedure.threads";
- public static final int DEFAULT_MIN_MASTER_PROCEDURE_THREADS = 4;
+ public static final int DEFAULT_MIN_MASTER_PROCEDURE_THREADS = 16;
/**
* Procedure replay sanity check. In case a WAL is missing or unreadable we
http://git-wip-us.apache.org/repos/asf/hbase/blob/4143c017/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java
index 2cd5b08..0f1c40f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
@@ -93,12 +94,19 @@ public class MasterProcedureEnv implements ConfigurationObserver {
}
}
+ private final RSProcedureDispatcher remoteDispatcher;
private final MasterProcedureScheduler procSched;
private final MasterServices master;
public MasterProcedureEnv(final MasterServices master) {
+ this(master, new RSProcedureDispatcher(master));
+ }
+
+ public MasterProcedureEnv(final MasterServices master,
+ final RSProcedureDispatcher remoteDispatcher) {
this.master = master;
this.procSched = new MasterProcedureScheduler(master.getConfiguration());
+ this.remoteDispatcher = remoteDispatcher;
}
public User getRequestUser() {
@@ -117,6 +125,10 @@ public class MasterProcedureEnv implements ConfigurationObserver {
return master.getConfiguration();
}
+ public AssignmentManager getAssignmentManager() {
+ return master.getAssignmentManager();
+ }
+
public MasterCoprocessorHost getMasterCoprocessorHost() {
return master.getMasterCoprocessorHost();
}
@@ -125,7 +137,12 @@ public class MasterProcedureEnv implements ConfigurationObserver {
return procSched;
}
+ public RSProcedureDispatcher getRemoteDispatcher() {
+ return remoteDispatcher;
+ }
+
public boolean isRunning() {
+ if (this.master == null || this.master.getMasterProcedureExecutor() == null) return false;
return master.getMasterProcedureExecutor().isRunning();
}
@@ -134,11 +151,18 @@ public class MasterProcedureEnv implements ConfigurationObserver {
}
public boolean waitInitialized(Procedure proc) {
- return procSched.waitEvent(((HMaster)master).getInitializedEvent(), proc);
+ return procSched.waitEvent(master.getInitializedEvent(), proc);
}
public boolean waitServerCrashProcessingEnabled(Procedure proc) {
- return procSched.waitEvent(((HMaster)master).getServerCrashProcessingEnabledEvent(), proc);
+ if (master instanceof HMaster) {
+ return procSched.waitEvent(((HMaster)master).getServerCrashProcessingEnabledEvent(), proc);
+ }
+ return false;
+ }
+
+ public boolean waitFailoverCleanup(Procedure proc) {
+ return procSched.waitEvent(master.getAssignmentManager().getFailoverCleanupEvent(), proc);
}
public void setEventReady(ProcedureEvent event, boolean isReady) {
@@ -153,4 +177,4 @@ public class MasterProcedureEnv implements ConfigurationObserver {
public void onConfigurationChange(Configuration conf) {
master.getMasterProcedureExecutor().refreshConfiguration(conf);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/4143c017/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
index b0baf85..61e984c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
@@ -572,11 +572,13 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
return false;
// region operations are using the shared-lock on the table
// and then they will grab an xlock on the region.
- case SPLIT:
- case MERGE:
- case ASSIGN:
- case UNASSIGN:
+ case REGION_SPLIT:
+ case REGION_MERGE:
+ case REGION_ASSIGN:
+ case REGION_UNASSIGN:
case REGION_EDIT:
+ case REGION_GC:
+ case MERGED_REGIONS_GC:
return false;
default:
break;
@@ -1200,7 +1202,12 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
*/
@VisibleForTesting
public String dumpLocks() throws IOException {
- // TODO: Refactor so we stream out locks for case when millions; i.e. take a PrintWriter
- return this.locking.toString();
+ schedLock();
+ try {
+ // TODO: Refactor so we stream out locks for case when millions; i.e. take a PrintWriter
+ return this.locking.toString();
+ } finally {
+ schedUnlock();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/4143c017/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MergeTableRegionsProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MergeTableRegionsProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MergeTableRegionsProcedure.java
deleted file mode 100644
index 3600fe0..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MergeTableRegionsProcedure.java
+++ /dev/null
@@ -1,906 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.master.procedure;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InterruptedIOException;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.MetaMutationAnnotation;
-import org.apache.hadoop.hbase.RegionLoad;
-import org.apache.hadoop.hbase.ServerLoad;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.UnknownRegionException;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.exceptions.MergeRegionException;
-import org.apache.hadoop.hbase.io.hfile.CacheConfig;
-import org.apache.hadoop.hbase.master.AssignmentManager;
-import org.apache.hadoop.hbase.master.CatalogJanitor;
-import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
-import org.apache.hadoop.hbase.master.MasterFileSystem;
-import org.apache.hadoop.hbase.master.RegionPlan;
-import org.apache.hadoop.hbase.master.RegionState;
-import org.apache.hadoop.hbase.master.RegionStates;
-import org.apache.hadoop.hbase.master.ServerManager;
-import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
-import org.apache.hadoop.hbase.regionserver.StoreFile;
-import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MergeTableRegionsState;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.FSUtils;
-
-/**
- * The procedure to Merge a region in a table.
- */
-@InterfaceAudience.Private
-public class MergeTableRegionsProcedure
- extends AbstractStateMachineTableProcedure<MergeTableRegionsState> {
- private static final Log LOG = LogFactory.getLog(MergeTableRegionsProcedure.class);
-
- private Boolean traceEnabled;
- private AssignmentManager assignmentManager;
- private int timeout;
- private ServerName regionLocation;
- private String regionsToMergeListFullName;
- private String regionsToMergeListEncodedName;
-
- private HRegionInfo [] regionsToMerge;
- private HRegionInfo mergedRegionInfo;
- private boolean forcible;
-
- public MergeTableRegionsProcedure() {
- this.traceEnabled = isTraceEnabled();
- this.assignmentManager = null;
- this.timeout = -1;
- this.regionLocation = null;
- this.regionsToMergeListFullName = null;
- this.regionsToMergeListEncodedName = null;
- }
-
- public MergeTableRegionsProcedure(
- final MasterProcedureEnv env,
- final HRegionInfo[] regionsToMerge,
- final boolean forcible) throws IOException {
- super(env);
- this.traceEnabled = isTraceEnabled();
- this.assignmentManager = getAssignmentManager(env);
- // For now, we only merge 2 regions. It could be extended to more than 2 regions in
- // the future.
- assert(regionsToMerge.length == 2);
- assert(regionsToMerge[0].getTable() == regionsToMerge[1].getTable());
- this.regionsToMerge = regionsToMerge;
- this.forcible = forcible;
-
- this.timeout = -1;
- this.regionsToMergeListFullName = getRegionsToMergeListFullNameString();
- this.regionsToMergeListEncodedName = getRegionsToMergeListEncodedNameString();
-
- // Check daughter regions and make sure that we have valid daughter regions before
- // doing the real work.
- checkDaughterRegions();
- // WARN: make sure there is no parent region of the two merging regions in
- // hbase:meta If exists, fixing up daughters would cause daughter regions(we
- // have merged one) online again when we restart master, so we should clear
- // the parent region to prevent the above case
- // Since HBASE-7721, we don't need fix up daughters any more. so here do
- // nothing
- setupMergedRegionInfo();
- }
-
- @Override
- protected Flow executeFromState(
- final MasterProcedureEnv env,
- final MergeTableRegionsState state) throws InterruptedException {
- if (isTraceEnabled()) {
- LOG.trace(this + " execute state=" + state);
- }
-
- try {
- switch (state) {
- case MERGE_TABLE_REGIONS_PREPARE:
- prepareMergeRegion(env);
- setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_MOVE_REGION_TO_SAME_RS);
- break;
- case MERGE_TABLE_REGIONS_MOVE_REGION_TO_SAME_RS:
- if (MoveRegionsToSameRS(env)) {
- setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_PRE_MERGE_OPERATION);
- } else {
- LOG.info("Cancel merging regions " + getRegionsToMergeListFullNameString()
- + ", because can't move them to the same RS");
- setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_POST_OPERATION);
- }
- break;
- case MERGE_TABLE_REGIONS_PRE_MERGE_OPERATION:
- preMergeRegions(env);
- setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_SET_MERGING_TABLE_STATE);
- break;
- case MERGE_TABLE_REGIONS_SET_MERGING_TABLE_STATE:
- setRegionStateToMerging(env);
- setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_CLOSE_REGIONS);
- break;
- case MERGE_TABLE_REGIONS_CLOSE_REGIONS:
- closeRegionsForMerge(env);
- setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_CREATE_MERGED_REGION);
- break;
- case MERGE_TABLE_REGIONS_CREATE_MERGED_REGION:
- createMergedRegion(env);
- setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_PRE_MERGE_COMMIT_OPERATION);
- break;
- case MERGE_TABLE_REGIONS_PRE_MERGE_COMMIT_OPERATION:
- preMergeRegionsCommit(env);
- setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_UPDATE_META);
- break;
- case MERGE_TABLE_REGIONS_UPDATE_META:
- updateMetaForMergedRegions(env);
- setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_POST_MERGE_COMMIT_OPERATION);
- break;
- case MERGE_TABLE_REGIONS_POST_MERGE_COMMIT_OPERATION:
- postMergeRegionsCommit(env);
- setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_OPEN_MERGED_REGION);
- break;
- case MERGE_TABLE_REGIONS_OPEN_MERGED_REGION:
- openMergedRegions(env);
- setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_POST_OPERATION);
- break;
- case MERGE_TABLE_REGIONS_POST_OPERATION:
- postCompletedMergeRegions(env);
- return Flow.NO_MORE_STATE;
- default:
- throw new UnsupportedOperationException(this + " unhandled state=" + state);
- }
- } catch (IOException e) {
- LOG.warn("Error trying to merge regions " + getRegionsToMergeListFullNameString() +
- " in the table " + getTableName() + " (in state=" + state + ")", e);
-
- setFailure("master-merge-regions", e);
- }
- return Flow.HAS_MORE_STATE;
- }
-
- @Override
- protected void rollbackState(
- final MasterProcedureEnv env,
- final MergeTableRegionsState state) throws IOException, InterruptedException {
- if (isTraceEnabled()) {
- LOG.trace(this + " rollback state=" + state);
- }
-
- try {
- switch (state) {
- case MERGE_TABLE_REGIONS_POST_OPERATION:
- case MERGE_TABLE_REGIONS_OPEN_MERGED_REGION:
- case MERGE_TABLE_REGIONS_POST_MERGE_COMMIT_OPERATION:
- case MERGE_TABLE_REGIONS_UPDATE_META:
- String msg = this + " We are in the " + state + " state."
- + " It is complicated to rollback the merge operation that region server is working on."
- + " Rollback is not supported and we should let the merge operation to complete";
- LOG.warn(msg);
- // PONR
- throw new UnsupportedOperationException(this + " unhandled state=" + state);
- case MERGE_TABLE_REGIONS_PRE_MERGE_COMMIT_OPERATION:
- break;
- case MERGE_TABLE_REGIONS_CREATE_MERGED_REGION:
- cleanupMergedRegion(env);
- break;
- case MERGE_TABLE_REGIONS_CLOSE_REGIONS:
- rollbackCloseRegionsForMerge(env);
- break;
- case MERGE_TABLE_REGIONS_SET_MERGING_TABLE_STATE:
- setRegionStateToRevertMerging(env);
- break;
- case MERGE_TABLE_REGIONS_PRE_MERGE_OPERATION:
- postRollBackMergeRegions(env);
- break;
- case MERGE_TABLE_REGIONS_MOVE_REGION_TO_SAME_RS:
- break; // nothing to rollback
- case MERGE_TABLE_REGIONS_PREPARE:
- break; // nothing to rollback
- default:
- throw new UnsupportedOperationException(this + " unhandled state=" + state);
- }
- } catch (Exception e) {
- // This will be retried. Unless there is a bug in the code,
- // this should be just a "temporary error" (e.g. network down)
- LOG.warn("Failed rollback attempt step " + state + " for merging the regions "
- + getRegionsToMergeListFullNameString() + " in table " + getTableName(), e);
- throw e;
- }
- }
-
- /*
- * Check whether we are in the state that can be rollback
- */
- @Override
- protected boolean isRollbackSupported(final MergeTableRegionsState state) {
- switch (state) {
- case MERGE_TABLE_REGIONS_POST_OPERATION:
- case MERGE_TABLE_REGIONS_OPEN_MERGED_REGION:
- case MERGE_TABLE_REGIONS_POST_MERGE_COMMIT_OPERATION:
- case MERGE_TABLE_REGIONS_UPDATE_META:
- // It is not safe to rollback if we reach to these states.
- return false;
- default:
- break;
- }
- return true;
- }
-
- @Override
- protected MergeTableRegionsState getState(final int stateId) {
- return MergeTableRegionsState.forNumber(stateId);
- }
-
- @Override
- protected int getStateId(final MergeTableRegionsState state) {
- return state.getNumber();
- }
-
- @Override
- protected MergeTableRegionsState getInitialState() {
- return MergeTableRegionsState.MERGE_TABLE_REGIONS_PREPARE;
- }
-
- @Override
- public void serializeStateData(final OutputStream stream) throws IOException {
- super.serializeStateData(stream);
-
- MasterProcedureProtos.MergeTableRegionsStateData.Builder mergeTableRegionsMsg =
- MasterProcedureProtos.MergeTableRegionsStateData.newBuilder()
- .setUserInfo(MasterProcedureUtil.toProtoUserInfo(getUser()))
- .setMergedRegionInfo(HRegionInfo.convert(mergedRegionInfo))
- .setForcible(forcible);
- for (HRegionInfo hri: regionsToMerge) {
- mergeTableRegionsMsg.addRegionInfo(HRegionInfo.convert(hri));
- }
- mergeTableRegionsMsg.build().writeDelimitedTo(stream);
- }
-
- @Override
- public void deserializeStateData(final InputStream stream) throws IOException {
- super.deserializeStateData(stream);
-
- MasterProcedureProtos.MergeTableRegionsStateData mergeTableRegionsMsg =
- MasterProcedureProtos.MergeTableRegionsStateData.parseDelimitedFrom(stream);
- setUser(MasterProcedureUtil.toUserInfo(mergeTableRegionsMsg.getUserInfo()));
-
- assert(mergeTableRegionsMsg.getRegionInfoCount() == 2);
- regionsToMerge = new HRegionInfo[mergeTableRegionsMsg.getRegionInfoCount()];
- for (int i = 0; i < regionsToMerge.length; i++) {
- regionsToMerge[i] = HRegionInfo.convert(mergeTableRegionsMsg.getRegionInfo(i));
- }
-
- mergedRegionInfo = HRegionInfo.convert(mergeTableRegionsMsg.getMergedRegionInfo());
- }
-
- @Override
- public void toStringClassDetails(StringBuilder sb) {
- sb.append(getClass().getSimpleName());
- sb.append(" (table=");
- sb.append(getTableName());
- sb.append(" regions=");
- sb.append(getRegionsToMergeListFullNameString());
- sb.append(" forcible=");
- sb.append(forcible);
- sb.append(")");
- }
-
- @Override
- protected LockState acquireLock(final MasterProcedureEnv env) {
- if (env.waitInitialized(this)) {
- return LockState.LOCK_EVENT_WAIT;
- }
- return env.getProcedureScheduler().waitRegions(this, getTableName(),
- regionsToMerge[0], regionsToMerge[1])?
- LockState.LOCK_EVENT_WAIT: LockState.LOCK_ACQUIRED;
- }
-
- @Override
- protected void releaseLock(final MasterProcedureEnv env) {
- env.getProcedureScheduler().wakeRegions(this, getTableName(),
- regionsToMerge[0], regionsToMerge[1]);
- }
-
- @Override
- public TableName getTableName() {
- return regionsToMerge[0].getTable();
- }
-
- @Override
- public TableOperationType getTableOperationType() {
- return TableOperationType.MERGE;
- }
-
- /**
- * check daughter regions
- * @throws IOException
- */
- private void checkDaughterRegions() throws IOException {
- // Note: the following logic assumes that we only have 2 regions to merge. In the future,
- // if we want to extend to more than 2 regions, the code needs to modify a little bit.
- //
- if (regionsToMerge[0].getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID ||
- regionsToMerge[1].getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) {
- throw new MergeRegionException("Can't merge non-default replicas");
- }
-
- if (!HRegionInfo.areAdjacent(regionsToMerge[0], regionsToMerge[1])) {
- String msg = "Trying to merge non-adjacent regions "
- + getRegionsToMergeListFullNameString() + " where forcible = " + forcible;
- LOG.warn(msg);
- if (!forcible) {
- throw new DoNotRetryIOException(msg);
- }
- }
- }
-
- /**
- * Prepare merge and do some check
- * @param env MasterProcedureEnv
- * @throws IOException
- */
- private void prepareMergeRegion(final MasterProcedureEnv env) throws IOException {
- // Note: the following logic assumes that we only have 2 regions to merge. In the future,
- // if we want to extend to more than 2 regions, the code needs to modify a little bit.
- //
- CatalogJanitor catalogJanitor = env.getMasterServices().getCatalogJanitor();
- boolean regionAHasMergeQualifier = !catalogJanitor.cleanMergeQualifier(regionsToMerge[0]);
- if (regionAHasMergeQualifier
- || !catalogJanitor.cleanMergeQualifier(regionsToMerge[1])) {
- String msg = "Skip merging regions " + getRegionsToMergeListFullNameString()
- + ", because region "
- + (regionAHasMergeQualifier ? regionsToMerge[0].getEncodedName() : regionsToMerge[1]
- .getEncodedName()) + " has merge qualifier";
- LOG.warn(msg);
- throw new MergeRegionException(msg);
- }
-
- RegionStates regionStates = getAssignmentManager(env).getRegionStates();
- RegionState regionStateA = regionStates.getRegionState(regionsToMerge[0].getEncodedName());
- RegionState regionStateB = regionStates.getRegionState(regionsToMerge[1].getEncodedName());
- if (regionStateA == null || regionStateB == null) {
- throw new UnknownRegionException(
- regionStateA == null ?
- regionsToMerge[0].getEncodedName() : regionsToMerge[1].getEncodedName());
- }
-
- if (!regionStateA.isOpened() || !regionStateB.isOpened()) {
- throw new MergeRegionException(
- "Unable to merge regions not online " + regionStateA + ", " + regionStateB);
- }
- }
-
- /**
- * Create merged region info through the specified two regions
- */
- private void setupMergedRegionInfo() {
- long rid = EnvironmentEdgeManager.currentTime();
- // Regionid is timestamp. Merged region's id can't be less than that of
- // merging regions else will insert at wrong location in hbase:meta
- if (rid < regionsToMerge[0].getRegionId() || rid < regionsToMerge[1].getRegionId()) {
- LOG.warn("Clock skew; merging regions id are " + regionsToMerge[0].getRegionId()
- + " and " + regionsToMerge[1].getRegionId() + ", but current time here is " + rid);
- rid = Math.max(regionsToMerge[0].getRegionId(), regionsToMerge[1].getRegionId()) + 1;
- }
-
- byte[] startKey = null;
- byte[] endKey = null;
- // Choose the smaller as start key
- if (regionsToMerge[0].compareTo(regionsToMerge[1]) <= 0) {
- startKey = regionsToMerge[0].getStartKey();
- } else {
- startKey = regionsToMerge[1].getStartKey();
- }
- // Choose the bigger as end key
- if (Bytes.equals(regionsToMerge[0].getEndKey(), HConstants.EMPTY_BYTE_ARRAY)
- || (!Bytes.equals(regionsToMerge[1].getEndKey(), HConstants.EMPTY_BYTE_ARRAY)
- && Bytes.compareTo(regionsToMerge[0].getEndKey(), regionsToMerge[1].getEndKey()) > 0)) {
- endKey = regionsToMerge[0].getEndKey();
- } else {
- endKey = regionsToMerge[1].getEndKey();
- }
-
- // Merged region is sorted between two merging regions in META
- mergedRegionInfo = new HRegionInfo(getTableName(), startKey, endKey, false, rid);
- }
-
- /**
- * Move all regions to the same region server
- * @param env MasterProcedureEnv
- * @return whether target regions hosted by the same RS
- * @throws IOException
- */
- private boolean MoveRegionsToSameRS(final MasterProcedureEnv env) throws IOException {
- // Make sure regions are on the same regionserver before send merge
- // regions request to region server.
- //
- boolean onSameRS = isRegionsOnTheSameServer(env);
- if (!onSameRS) {
- // Note: the following logic assumes that we only have 2 regions to merge. In the future,
- // if we want to extend to more than 2 regions, the code needs to modify a little bit.
- //
- RegionStates regionStates = getAssignmentManager(env).getRegionStates();
- ServerName regionLocation2 = regionStates.getRegionServerOfRegion(regionsToMerge[1]);
-
- RegionLoad loadOfRegionA = getRegionLoad(env, regionLocation, regionsToMerge[0]);
- RegionLoad loadOfRegionB = getRegionLoad(env, regionLocation2, regionsToMerge[1]);
- if (loadOfRegionA != null && loadOfRegionB != null
- && loadOfRegionA.getRequestsCount() < loadOfRegionB.getRequestsCount()) {
- // switch regionsToMerge[0] and regionsToMerge[1]
- HRegionInfo tmpRegion = this.regionsToMerge[0];
- this.regionsToMerge[0] = this.regionsToMerge[1];
- this.regionsToMerge[1] = tmpRegion;
- ServerName tmpLocation = regionLocation;
- regionLocation = regionLocation2;
- regionLocation2 = tmpLocation;
- }
-
- long startTime = EnvironmentEdgeManager.currentTime();
-
- RegionPlan regionPlan = new RegionPlan(regionsToMerge[1], regionLocation2, regionLocation);
- LOG.info("Moving regions to same server for merge: " + regionPlan.toString());
- getAssignmentManager(env).balance(regionPlan);
- do {
- try {
- Thread.sleep(20);
- // Make sure check RIT first, then get region location, otherwise
- // we would make a wrong result if region is online between getting
- // region location and checking RIT
- boolean isRIT = regionStates.isRegionInTransition(regionsToMerge[1]);
- regionLocation2 = regionStates.getRegionServerOfRegion(regionsToMerge[1]);
- onSameRS = regionLocation.equals(regionLocation2);
- if (onSameRS || !isRIT) {
- // Regions are on the same RS, or regionsToMerge[1] is not in
- // RegionInTransition any more
- break;
- }
- } catch (InterruptedException e) {
- InterruptedIOException iioe = new InterruptedIOException();
- iioe.initCause(e);
- throw iioe;
- }
- } while ((EnvironmentEdgeManager.currentTime() - startTime) <= getTimeout(env));
- }
- return onSameRS;
- }
-
- /**
- * Pre merge region action
- * @param env MasterProcedureEnv
- **/
- private void preMergeRegions(final MasterProcedureEnv env) throws IOException {
- final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
- if (cpHost != null) {
- boolean ret = cpHost.preMergeRegionsAction(regionsToMerge, getUser());
- if (ret) {
- throw new IOException(
- "Coprocessor bypassing regions " + getRegionsToMergeListFullNameString() + " merge.");
- }
- }
- }
-
- /**
- * Action after rollback a merge table regions action.
- * @param env MasterProcedureEnv
- * @throws IOException
- */
- private void postRollBackMergeRegions(final MasterProcedureEnv env) throws IOException {
- final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
- if (cpHost != null) {
- cpHost.postRollBackMergeRegionsAction(regionsToMerge, getUser());
- }
- }
-
- /**
- * Set the region states to MERGING state
- * @param env MasterProcedureEnv
- * @throws IOException
- */
- public void setRegionStateToMerging(final MasterProcedureEnv env) throws IOException {
- RegionStateTransition.Builder transition = RegionStateTransition.newBuilder();
- transition.setTransitionCode(TransitionCode.READY_TO_MERGE);
- transition.addRegionInfo(HRegionInfo.convert(mergedRegionInfo));
- transition.addRegionInfo(HRegionInfo.convert(regionsToMerge[0]));
- transition.addRegionInfo(HRegionInfo.convert(regionsToMerge[1]));
- if (env.getMasterServices().getAssignmentManager().onRegionTransition(
- getServerName(env), transition.build()) != null) {
- throw new IOException("Failed to update region state to MERGING for "
- + getRegionsToMergeListFullNameString());
- }
- }
-
- /**
- * Rollback the region state change
- * @param env MasterProcedureEnv
- * @throws IOException
- */
- private void setRegionStateToRevertMerging(final MasterProcedureEnv env) throws IOException {
- RegionStateTransition.Builder transition = RegionStateTransition.newBuilder();
- transition.setTransitionCode(TransitionCode.MERGE_REVERTED);
- transition.addRegionInfo(HRegionInfo.convert(mergedRegionInfo));
- transition.addRegionInfo(HRegionInfo.convert(regionsToMerge[0]));
- transition.addRegionInfo(HRegionInfo.convert(regionsToMerge[1]));
- String msg = env.getMasterServices().getAssignmentManager().onRegionTransition(
- getServerName(env), transition.build());
- if (msg != null) {
- // If daughter regions are online, the msg is coming from RPC retry. Ignore it.
- RegionStates regionStates = getAssignmentManager(env).getRegionStates();
- if (!regionStates.isRegionOnline(regionsToMerge[0]) ||
- !regionStates.isRegionOnline(regionsToMerge[1])) {
- throw new IOException("Failed to update region state for "
- + getRegionsToMergeListFullNameString()
- + " as part of operation for reverting merge. Error message: " + msg);
- }
- }
- }
-
- /**
- * Create merged region
- * @param env MasterProcedureEnv
- * @throws IOException
- */
- private void createMergedRegion(final MasterProcedureEnv env) throws IOException {
- final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
- final Path tabledir = FSUtils.getTableDir(mfs.getRootDir(), regionsToMerge[0].getTable());
- final FileSystem fs = mfs.getFileSystem();
- HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem(
- env.getMasterConfiguration(), fs, tabledir, regionsToMerge[0], false);
- regionFs.createMergesDir();
-
- mergeStoreFiles(env, regionFs, regionFs.getMergesDir());
- HRegionFileSystem regionFs2 = HRegionFileSystem.openRegionFromFileSystem(
- env.getMasterConfiguration(), fs, tabledir, regionsToMerge[1], false);
- mergeStoreFiles(env, regionFs2, regionFs.getMergesDir());
-
- regionFs.commitMergedRegion(mergedRegionInfo);
- }
-
- /**
- * Create reference file(s) of merging regions under the merges directory
- * @param env MasterProcedureEnv
- * @param regionFs region file system
- * @param mergedDir the temp directory of merged region
- * @throws IOException
- */
- private void mergeStoreFiles(
- final MasterProcedureEnv env, final HRegionFileSystem regionFs, final Path mergedDir)
- throws IOException {
- final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
- final Configuration conf = env.getMasterConfiguration();
- final HTableDescriptor htd = env.getMasterServices().getTableDescriptors().get(getTableName());
-
- for (String family: regionFs.getFamilies()) {
- final HColumnDescriptor hcd = htd.getFamily(family.getBytes());
- final Collection<StoreFileInfo> storeFiles = regionFs.getStoreFiles(family);
-
- if (storeFiles != null && storeFiles.size() > 0) {
- final CacheConfig cacheConf = new CacheConfig(conf, hcd);
- for (StoreFileInfo storeFileInfo: storeFiles) {
- // Create reference file(s) of the region in mergedDir
- regionFs.mergeStoreFile(mergedRegionInfo, family, new StoreFile(mfs.getFileSystem(),
- storeFileInfo, conf, cacheConf, hcd.getBloomFilterType(), true),
- mergedDir);
- }
- }
- }
- }
-
- /**
- * Clean up merged region
- * @param env MasterProcedureEnv
- * @throws IOException
- */
- private void cleanupMergedRegion(final MasterProcedureEnv env) throws IOException {
- final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
- final Path tabledir = FSUtils.getTableDir(mfs.getRootDir(), regionsToMerge[0].getTable());
- final FileSystem fs = mfs.getFileSystem();
- HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem(
- env.getMasterConfiguration(), fs, tabledir, regionsToMerge[0], false);
- regionFs.cleanupMergedRegion(mergedRegionInfo);
- }
-
- /**
- * RPC to region server that host the regions to merge, ask for close these regions
- * @param env MasterProcedureEnv
- * @throws IOException
- */
- private void closeRegionsForMerge(final MasterProcedureEnv env) throws IOException {
- boolean success = env.getMasterServices().getServerManager().sendRegionCloseForSplitOrMerge(
- getServerName(env), regionsToMerge[0], regionsToMerge[1]);
- if (!success) {
- throw new IOException("Close regions " + getRegionsToMergeListFullNameString()
- + " for merging failed. Check region server log for more details.");
- }
- }
-
- /**
- * Rollback close regions
- * @param env MasterProcedureEnv
- **/
- private void rollbackCloseRegionsForMerge(final MasterProcedureEnv env) throws IOException {
- // Check whether the region is closed; if so, open it in the same server
- RegionStates regionStates = getAssignmentManager(env).getRegionStates();
- for(int i = 1; i < regionsToMerge.length; i++) {
- RegionState state = regionStates.getRegionState(regionsToMerge[i]);
- if (state != null && (state.isClosing() || state.isClosed())) {
- env.getMasterServices().getServerManager().sendRegionOpen(
- getServerName(env),
- regionsToMerge[i],
- ServerName.EMPTY_SERVER_LIST);
- }
- }
- }
-
- /**
- * Post merge region action
- * @param env MasterProcedureEnv
- **/
- private void preMergeRegionsCommit(final MasterProcedureEnv env) throws IOException {
- final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
- if (cpHost != null) {
- @MetaMutationAnnotation
- final List<Mutation> metaEntries = new ArrayList<>();
- boolean ret = cpHost.preMergeRegionsCommit(regionsToMerge, metaEntries, getUser());
-
- if (ret) {
- throw new IOException(
- "Coprocessor bypassing regions " + getRegionsToMergeListFullNameString() + " merge.");
- }
- try {
- for (Mutation p : metaEntries) {
- HRegionInfo.parseRegionName(p.getRow());
- }
- } catch (IOException e) {
- LOG.error("Row key of mutation from coprocessor is not parsable as region name."
- + "Mutations from coprocessor should only be for hbase:meta table.", e);
- throw e;
- }
- }
- }
-
- /**
- * Add merged region to META and delete original regions.
- * @param env MasterProcedureEnv
- * @throws IOException
- */
- private void updateMetaForMergedRegions(final MasterProcedureEnv env) throws IOException {
- RegionStateTransition.Builder transition = RegionStateTransition.newBuilder();
- transition.setTransitionCode(TransitionCode.MERGE_PONR);
- transition.addRegionInfo(HRegionInfo.convert(mergedRegionInfo));
- transition.addRegionInfo(HRegionInfo.convert(regionsToMerge[0]));
- transition.addRegionInfo(HRegionInfo.convert(regionsToMerge[1]));
- // Add merged region and delete original regions
- // as an atomic update. See HBASE-7721. This update to hbase:meta makes the region
- // will determine whether the region is merged or not in case of failures.
- if (env.getMasterServices().getAssignmentManager().onRegionTransition(
- getServerName(env), transition.build()) != null) {
- throw new IOException("Failed to update meta to add merged region that merges "
- + getRegionsToMergeListFullNameString());
- }
- }
-
- /**
- * Post merge region action
- * @param env MasterProcedureEnv
- **/
- private void postMergeRegionsCommit(final MasterProcedureEnv env) throws IOException {
- final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
- if (cpHost != null) {
- cpHost.postMergeRegionsCommit(regionsToMerge, mergedRegionInfo, getUser());
- }
- }
-
- /**
- * Assign merged region
- * @param env MasterProcedureEnv
- * @throws IOException
- * @throws InterruptedException
- **/
- private void openMergedRegions(final MasterProcedureEnv env)
- throws IOException, InterruptedException {
- // Check whether the merged region is already opened; if so,
- // this is retry and we should just ignore.
- RegionState regionState =
- getAssignmentManager(env).getRegionStates().getRegionState(mergedRegionInfo);
- if (regionState != null && regionState.isOpened()) {
- LOG.info("Skip opening merged region " + mergedRegionInfo.getRegionNameAsString()
- + " as it is already opened.");
- return;
- }
-
- // TODO: The new AM should provide an API to force assign the merged region to the same RS
- // as daughter regions; if the RS is unavailable, then assign to a different RS.
- env.getMasterServices().getAssignmentManager().assignMergedRegion(
- mergedRegionInfo, regionsToMerge[0], regionsToMerge[1]);
- }
-
- /**
- * Post merge region action
- * @param env MasterProcedureEnv
- **/
- private void postCompletedMergeRegions(final MasterProcedureEnv env) throws IOException {
- final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
- if (cpHost != null) {
- cpHost.postCompletedMergeRegionsAction(regionsToMerge, mergedRegionInfo, getUser());
- }
- }
-
- private RegionLoad getRegionLoad(
- final MasterProcedureEnv env,
- final ServerName sn,
- final HRegionInfo hri) {
- ServerManager serverManager = env.getMasterServices().getServerManager();
- ServerLoad load = serverManager.getLoad(sn);
- if (load != null) {
- Map<byte[], RegionLoad> regionsLoad = load.getRegionsLoad();
- if (regionsLoad != null) {
- return regionsLoad.get(hri.getRegionName());
- }
- }
- return null;
- }
-
- /**
- * The procedure could be restarted from a different machine. If the variable is null, we need to
- * retrieve it.
- * @param env MasterProcedureEnv
- * @return whether target regions hosted by the same RS
- */
- private boolean isRegionsOnTheSameServer(final MasterProcedureEnv env) throws IOException{
- Boolean onSameRS = true;
- int i = 0;
- RegionStates regionStates = getAssignmentManager(env).getRegionStates();
- regionLocation = regionStates.getRegionServerOfRegion(regionsToMerge[i]);
- if (regionLocation != null) {
- for(i = 1; i < regionsToMerge.length; i++) {
- ServerName regionLocation2 = regionStates.getRegionServerOfRegion(regionsToMerge[i]);
- if (regionLocation2 != null) {
- if (onSameRS) {
- onSameRS = regionLocation.equals(regionLocation2);
- }
- } else {
- // At least one region is not online, merge will fail, no need to continue.
- break;
- }
- }
- if (i == regionsToMerge.length) {
- // Finish checking all regions, return the result;
- return onSameRS;
- }
- }
-
- // If reaching here, at least one region is not online.
- String msg = "Skip merging regions " + getRegionsToMergeListFullNameString() +
- ", because region " + regionsToMerge[i].getEncodedName() + " is not online now.";
- LOG.warn(msg);
- throw new IOException(msg);
- }
-
- /**
- * The procedure could be restarted from a different machine. If the variable is null, we need to
- * retrieve it.
- * @param env MasterProcedureEnv
- * @return assignmentManager
- */
- private AssignmentManager getAssignmentManager(final MasterProcedureEnv env) {
- if (assignmentManager == null) {
- assignmentManager = env.getMasterServices().getAssignmentManager();
- }
- return assignmentManager;
- }
-
- /**
- * The procedure could be restarted from a different machine. If the variable is null, we need to
- * retrieve it.
- * @param env MasterProcedureEnv
- * @return timeout value
- */
- private int getTimeout(final MasterProcedureEnv env) {
- if (timeout == -1) {
- timeout = env.getMasterConfiguration().getInt(
- "hbase.master.regionmerge.timeout", regionsToMerge.length * 60 * 1000);
- }
- return timeout;
- }
-
- /**
- * The procedure could be restarted from a different machine. If the variable is null, we need to
- * retrieve it.
- * @param env MasterProcedureEnv
- * @return serverName
- */
- private ServerName getServerName(final MasterProcedureEnv env) {
- if (regionLocation == null) {
- regionLocation =
- getAssignmentManager(env).getRegionStates().getRegionServerOfRegion(regionsToMerge[0]);
- }
- return regionLocation;
- }
-
- /**
- * The procedure could be restarted from a different machine. If the variable is null, we need to
- * retrieve it.
- * @param fullName whether return only encoded name
- * @return region names in a list
- */
- private String getRegionsToMergeListFullNameString() {
- if (regionsToMergeListFullName == null) {
- StringBuilder sb = new StringBuilder("[");
- int i = 0;
- while(i < regionsToMerge.length - 1) {
- sb.append(regionsToMerge[i].getRegionNameAsString() + ", ");
- i++;
- }
- sb.append(regionsToMerge[i].getRegionNameAsString() + " ]");
- regionsToMergeListFullName = sb.toString();
- }
- return regionsToMergeListFullName;
- }
-
- /**
- * The procedure could be restarted from a different machine. If the variable is null, we need to
- * retrieve it.
- * @return encoded region names
- */
- private String getRegionsToMergeListEncodedNameString() {
- if (regionsToMergeListEncodedName == null) {
- StringBuilder sb = new StringBuilder("[");
- int i = 0;
- while(i < regionsToMerge.length - 1) {
- sb.append(regionsToMerge[i].getEncodedName() + ", ");
- i++;
- }
- sb.append(regionsToMerge[i].getEncodedName() + " ]");
- regionsToMergeListEncodedName = sb.toString();
- }
- return regionsToMergeListEncodedName;
- }
-
- /**
- * The procedure could be restarted from a different machine. If the variable is null, we need to
- * retrieve it.
- * @return traceEnabled
- */
- private Boolean isTraceEnabled() {
- if (traceEnabled == null) {
- traceEnabled = LOG.isTraceEnabled();
- }
- return traceEnabled;
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/4143c017/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyColumnFamilyProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyColumnFamilyProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyColumnFamilyProcedure.java
index 52bb4d5..622c19f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyColumnFamilyProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyColumnFamilyProcedure.java
@@ -21,17 +21,14 @@ package org.apache.hadoop.hbase.master.procedure;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.InvalidFamilyOperationException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
@@ -97,7 +94,9 @@ public class ModifyColumnFamilyProcedure
setNextState(ModifyColumnFamilyState.MODIFY_COLUMN_FAMILY_REOPEN_ALL_REGIONS);
break;
case MODIFY_COLUMN_FAMILY_REOPEN_ALL_REGIONS:
- reOpenAllRegionsIfTableIsOnline(env);
+ if (env.getAssignmentManager().isTableEnabled(getTableName())) {
+ addChildProcedure(env.getAssignmentManager().createReopenProcedures(getTableName()));
+ }
return Flow.NO_MORE_STATE;
default:
throw new UnsupportedOperationException(this + " unhandled state=" + state);
@@ -265,7 +264,8 @@ public class ModifyColumnFamilyProcedure
env.getMasterServices().getTableDescriptors().add(unmodifiedHTableDescriptor);
// Make sure regions are opened after table descriptor is updated.
- reOpenAllRegionsIfTableIsOnline(env);
+ //reOpenAllRegionsIfTableIsOnline(env);
+ // TODO: NUKE ROLLBACK!!!!
}
/**
@@ -281,26 +281,6 @@ public class ModifyColumnFamilyProcedure
}
/**
- * Last action from the procedure - executed when online schema change is supported.
- * @param env MasterProcedureEnv
- * @throws IOException
- */
- private void reOpenAllRegionsIfTableIsOnline(final MasterProcedureEnv env) throws IOException {
- // This operation only run when the table is enabled.
- if (!env.getMasterServices().getTableStateManager()
- .isTableState(getTableName(), TableState.State.ENABLED)) {
- return;
- }
-
- List<HRegionInfo> regionInfoList = ProcedureSyncWait.getRegionsFromMeta(env, getTableName());
- if (MasterDDLOperationHelper.reOpenAllRegions(env, getTableName(), regionInfoList)) {
- LOG.info("Completed add column family operation on table " + getTableName());
- } else {
- LOG.warn("Error on reopening the regions on table " + getTableName());
- }
- }
-
- /**
* The procedure could be restarted from a different machine. If the variable is null, we need to
* retrieve it.
* @return traceEnabled
http://git-wip-us.apache.org/repos/asf/hbase/blob/4143c017/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java
index 6a70f62..20a6a03 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java
@@ -120,7 +120,10 @@ public class ModifyTableProcedure
setNextState(ModifyTableState.MODIFY_TABLE_REOPEN_ALL_REGIONS);
break;
case MODIFY_TABLE_REOPEN_ALL_REGIONS:
- reOpenAllRegionsIfTableIsOnline(env);
+ if (env.getAssignmentManager().isTableEnabled(getTableName())) {
+ addChildProcedure(env.getAssignmentManager()
+ .createReopenProcedures(getRegionInfoList(env)));
+ }
return Flow.NO_MORE_STATE;
default:
throw new UnsupportedOperationException("unhandled state=" + state);
@@ -299,7 +302,8 @@ public class ModifyTableProcedure
deleteFromFs(env, modifiedHTableDescriptor, unmodifiedHTableDescriptor);
// Make sure regions are opened after table descriptor is updated.
- reOpenAllRegionsIfTableIsOnline(env);
+ //reOpenAllRegionsIfTableIsOnline(env);
+ // TODO: NUKE ROLLBACK!!!!
}
/**
@@ -374,25 +378,6 @@ public class ModifyTableProcedure
}
/**
- * Last action from the procedure - executed when online schema change is supported.
- * @param env MasterProcedureEnv
- * @throws IOException
- */
- private void reOpenAllRegionsIfTableIsOnline(final MasterProcedureEnv env) throws IOException {
- // This operation only run when the table is enabled.
- if (!env.getMasterServices().getTableStateManager()
- .isTableState(getTableName(), TableState.State.ENABLED)) {
- return;
- }
-
- if (MasterDDLOperationHelper.reOpenAllRegions(env, getTableName(), getRegionInfoList(env))) {
- LOG.info("Completed modify table operation on table " + getTableName());
- } else {
- LOG.warn("Error on reopening the regions on table " + getTableName());
- }
- }
-
- /**
* The procedure could be restarted from a different machine. If the variable is null, we need to
* retrieve it.
* @return traceEnabled whether the trace is enabled
@@ -430,7 +415,8 @@ public class ModifyTableProcedure
private List<HRegionInfo> getRegionInfoList(final MasterProcedureEnv env) throws IOException {
if (regionInfoList == null) {
- regionInfoList = ProcedureSyncWait.getRegionsFromMeta(env, getTableName());
+ regionInfoList = env.getAssignmentManager().getRegionStates()
+ .getRegionsOfTable(getTableName());
}
return regionInfoList;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/4143c017/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java
index 3777c79..5199bf8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java
@@ -21,30 +21,26 @@ package org.apache.hadoop.hbase.master.procedure;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CoordinatedStateException;
import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException;
import org.apache.hadoop.hbase.ProcedureInfo;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
-import org.apache.hadoop.hbase.master.AssignmentManager;
-import org.apache.hadoop.hbase.master.RegionState.State;
-import org.apache.hadoop.hbase.master.RegionStates;
-import org.apache.hadoop.hbase.master.ServerManager;
+import org.apache.hadoop.hbase.master.assignment.RegionStates;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.quotas.MasterQuotaManager;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
/**
* Helper to synchronously wait on conditions.
@@ -64,19 +60,93 @@ public final class ProcedureSyncWait {
T evaluate() throws IOException;
}
+  /**
+   * {@link Future} view over a procedure identified by {@code procId} on a
+   * {@link ProcedureExecutor}. Both {@code get} variants block in
+   * {@code waitForProcedureToComplete} and remember the returned bytes, so a later call
+   * returns immediately and {@link #isDone()} reports {@code true}.
+   * Cancellation is not supported. The cached state is accessed without any synchronization.
+   */
+  private static class ProcedureFuture implements Future<byte[]> {
+    private final ProcedureExecutor<MasterProcedureEnv> procExec;
+    private final long procId;
+
+    // Set once a wait has returned successfully; result may legitimately be null after that.
+    private boolean hasResult = false;
+    private byte[] result = null;
+
+    /**
+     * @param procExec executor that is asked for the procedure's completion and result
+     * @param procId id of the procedure to wait on
+     */
+    public ProcedureFuture(ProcedureExecutor<MasterProcedureEnv> procExec, long procId) {
+      this.procExec = procExec;
+      this.procId = procId;
+    }
+
+    /** Cancellation is not supported; always returns {@code false}. */
+    @Override
+    public boolean cancel(boolean mayInterruptIfRunning) { return false; }
+
+    /** Always {@code false}, because {@link #cancel(boolean)} never cancels. */
+    @Override
+    public boolean isCancelled() { return false; }
+
+    /** @return {@code true} once a {@code get} call has successfully obtained the result */
+    @Override
+    public boolean isDone() { return hasResult; }
+
+    /**
+     * Waits for the procedure with a timeout argument of {@code Long.MAX_VALUE}.
+     *
+     * @return the bytes returned by {@code waitForProcedureToComplete}
+     * @throws ExecutionException wrapping any exception raised while waiting
+     */
+    @Override
+    public byte[] get() throws InterruptedException, ExecutionException {
+      if (hasResult) return result;
+      try {
+        // Record the outcome exactly like get(long, TimeUnit) does; otherwise isDone() would
+        // stay false and every later get() would wait on the executor again.
+        result = waitForProcedureToComplete(procExec, procId, Long.MAX_VALUE);
+        hasResult = true;
+        return result;
+      } catch (Exception e) {
+        throw new ExecutionException(e);
+      }
+    }
+
+    /**
+     * Waits for the procedure, passing {@code unit.toMillis(timeout)} as the timeout argument.
+     * NOTE(review): the waitForProcedureToComplete shown in this patch calls waitFor without
+     * forwarding that argument, so the bound may not actually be enforced -- confirm.
+     *
+     * @return the bytes returned by {@code waitForProcedureToComplete}
+     * @throws TimeoutException if the wait fails with a {@link TimeoutIOException}
+     * @throws ExecutionException wrapping any other exception raised while waiting
+     */
+    @Override
+    public byte[] get(long timeout, TimeUnit unit)
+        throws InterruptedException, ExecutionException, TimeoutException {
+      if (hasResult) return result;
+      try {
+        result = waitForProcedureToComplete(procExec, procId, unit.toMillis(timeout));
+        hasResult = true;
+        return result;
+      } catch (TimeoutIOException e) {
+        // Chain the original exception so its stack trace is not lost in the translation.
+        throw (TimeoutException) new TimeoutException(e.getMessage()).initCause(e);
+      } catch (Exception e) {
+        throw new ExecutionException(e);
+      }
+    }
+  }
+
+  /**
+   * Hands {@code proc} to {@code procExec} when {@code proc.isInitializing()} reports true,
+   * then returns a {@link Future} bound to {@code proc.getProcId()}.
+   *
+   * @param procExec executor that runs the procedure and is later asked for its result
+   * @param proc procedure to submit (skipped when it is not in the initializing state)
+   * @return a future whose {@code get} blocks until the procedure completes
+   */
+  public static Future<byte[]> submitProcedure(
+      final ProcedureExecutor<MasterProcedureEnv> procExec, final Procedure proc) {
+    final boolean needsSubmit = proc.isInitializing();
+    if (needsSubmit) {
+      procExec.submitProcedure(proc);
+    }
+    // Read the id only after the optional submit above, as the original did.
+    final long submittedId = proc.getProcId();
+    return new ProcedureFuture(procExec, submittedId);
+  }
+
+ /**
+ * Submits {@code proc} to {@code procExec} if {@code proc.isInitializing()} is true, then
+ * blocks in {@code waitForProcedureToCompleteIOE} with a timeout argument of
+ * {@code Long.MAX_VALUE} and returns the bytes it yields.
+ * @param procExec executor that runs the procedure
+ * @param proc procedure to submit and wait on
+ * @return the result bytes returned by {@code waitForProcedureToCompleteIOE}
+ * @throws IOException if the wait fails
+ */
public static byte[] submitAndWaitProcedure(ProcedureExecutor<MasterProcedureEnv> procExec,
final Procedure proc) throws IOException {
- long procId = procExec.submitProcedure(proc);
- return waitForProcedureToComplete(procExec, procId);
+ // NOTE(review): presumably isInitializing() is true only before the first submit, so an
+ // already-submitted procedure is just waited on -- confirm against Procedure.
+ if (proc.isInitializing()) {
+ procExec.submitProcedure(proc);
+ }
+ return waitForProcedureToCompleteIOE(procExec, proc.getProcId(), Long.MAX_VALUE);
}
- private static byte[] waitForProcedureToComplete(ProcedureExecutor<MasterProcedureEnv> procExec,
- final long procId) throws IOException {
- while (!procExec.isFinished(procId) && procExec.isRunning()) {
- // TODO: add a config to make it tunable
- // Dev Consideration: are we waiting forever, or we can set up some timeout value?
- Threads.sleepWithoutInterrupt(250);
+ /**
+ * Delegates to {@code waitForProcedureToComplete} and guarantees that any failure surfaces
+ * as an {@link IOException}.
+ * @param procExec executor that is asked for the procedure's completion and result
+ * @param procId id of the procedure to wait on
+ * @param timeout timeout value handed through to {@code waitForProcedureToComplete}
+ * @return the result bytes returned by {@code waitForProcedureToComplete}
+ * @throws IOException the original IOException, or a new one wrapping any other exception
+ */
+ public static byte[] waitForProcedureToCompleteIOE(
+ final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId, final long timeout)
+ throws IOException {
+ try {
+ return waitForProcedureToComplete(procExec, procId, timeout);
+ } catch (IOException e) {
+ // Already the advertised type: rethrow unchanged instead of wrapping it again.
+ throw e;
+ } catch (Exception e) {
+ // Anything else (e.g. an unchecked exception) is wrapped so callers only see IOException.
+ throw new IOException(e);
}
+ }
+
+ public static byte[] waitForProcedureToComplete(
+ final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId, final long timeout)
+ throws IOException {
+ waitFor(procExec.getEnvironment(), "pid=" + procId,
+ new ProcedureSyncWait.Predicate<Boolean>() {
+ @Override
+ public Boolean evaluate() throws IOException {
+ return !procExec.isRunning() || procExec.isFinished(procId);
+ }
+ }
+ );
+
ProcedureInfo result = procExec.getResult(procId);
if (result != null) {
if (result.isFailed()) {
@@ -86,7 +156,7 @@ public final class ProcedureSyncWait {
return result.getResult();
} else {
if (procExec.isRunning()) {
- throw new IOException("Procedure " + procId + "not found");
+ throw new IOException("pid= " + procId + "not found");
} else {
throw new IOException("The Master is Aborting");
}
@@ -104,6 +174,7 @@ public final class ProcedureSyncWait {
public static <T> T waitFor(MasterProcedureEnv env, long waitTime, long waitingTimeForEvents,
String purpose, Predicate<T> predicate) throws IOException {
final long done = EnvironmentEdgeManager.currentTime() + waitTime;
+ boolean logged = false;
do {
T result = predicate.evaluate();
if (result != null && !result.equals(Boolean.FALSE)) {
@@ -115,7 +186,12 @@ public final class ProcedureSyncWait {
LOG.warn("Interrupted while sleeping, waiting on " + purpose);
throw (InterruptedIOException)new InterruptedIOException().initCause(e);
}
- LOG.debug("Waiting on " + purpose);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("waitFor " + purpose);
+ } else {
+ if (!logged) LOG.debug("waitFor " + purpose);
+ }
+ logged = true;
} while (EnvironmentEdgeManager.currentTime() < done && env.isRunning());
throw new TimeoutIOException("Timed out while waiting on " + purpose);
@@ -133,44 +209,14 @@ public final class ProcedureSyncWait {
}
}
- protected static void waitRegionServers(final MasterProcedureEnv env) throws IOException {
- final ServerManager sm = env.getMasterServices().getServerManager();
- ProcedureSyncWait.waitFor(env, "server to assign region(s)",
- new ProcedureSyncWait.Predicate<Boolean>() {
- @Override
- public Boolean evaluate() throws IOException {
- List<ServerName> servers = sm.createDestinationServersList();
- return servers != null && !servers.isEmpty();
- }
- });
- }
-
- protected static List<HRegionInfo> getRegionsFromMeta(final MasterProcedureEnv env,
- final TableName tableName) throws IOException {
- return ProcedureSyncWait.waitFor(env, "regions of table=" + tableName + " from meta",
- new ProcedureSyncWait.Predicate<List<HRegionInfo>>() {
- @Override
- public List<HRegionInfo> evaluate() throws IOException {
- if (TableName.META_TABLE_NAME.equals(tableName)) {
- return new MetaTableLocator().getMetaRegions(env.getMasterServices().getZooKeeper());
- }
- return MetaTableAccessor.getTableRegions(env.getMasterServices().getConnection(),tableName);
- }
- });
- }
-
protected static void waitRegionInTransition(final MasterProcedureEnv env,
final List<HRegionInfo> regions) throws IOException, CoordinatedStateException {
- final AssignmentManager am = env.getMasterServices().getAssignmentManager();
- final RegionStates states = am.getRegionStates();
+ final RegionStates states = env.getAssignmentManager().getRegionStates();
for (final HRegionInfo region : regions) {
ProcedureSyncWait.waitFor(env, "regions " + region.getRegionNameAsString() + " in transition",
new ProcedureSyncWait.Predicate<Boolean>() {
@Override
public Boolean evaluate() throws IOException {
- if (states.isRegionInState(region, State.FAILED_OPEN)) {
- am.regionOffline(region);
- }
return !states.isRegionInTransition(region);
}
});