You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2017/05/30 23:51:22 UTC
[11/26] hbase git commit: HBASE-14614 Procedure v2 - Core Assignment
Manager (Matteo Bertozzi) Move to a new AssignmentManager,
one that describes Assignment using a State Machine built on top of
ProcedureV2 facility.
http://git-wip-us.apache.org/repos/asf/hbase/blob/b2925bc0/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java
index ced7abc..c3900dd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java
@@ -37,7 +37,6 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.TableState;
-import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
@@ -107,10 +106,12 @@ public class CreateTableProcedure
setNextState(CreateTableState.CREATE_TABLE_ASSIGN_REGIONS);
break;
case CREATE_TABLE_ASSIGN_REGIONS:
- assignRegions(env, getTableName(), newRegions);
+ setEnablingState(env, getTableName());
+ addChildProcedure(env.getAssignmentManager().createAssignProcedures(newRegions));
setNextState(CreateTableState.CREATE_TABLE_UPDATE_DESC_CACHE);
break;
case CREATE_TABLE_UPDATE_DESC_CACHE:
+ setEnabledState(env, getTableName());
updateTableDescCache(env, getTableName());
setNextState(CreateTableState.CREATE_TABLE_POST_OPERATION);
break;
@@ -333,21 +334,21 @@ public class CreateTableProcedure
protected static List<HRegionInfo> addTableToMeta(final MasterProcedureEnv env,
final HTableDescriptor hTableDescriptor,
final List<HRegionInfo> regions) throws IOException {
- if (regions != null && regions.size() > 0) {
- ProcedureSyncWait.waitMetaRegions(env);
+ assert (regions != null && regions.size() > 0) : "expected at least 1 region, got " + regions;
- // Add regions to META
- addRegionsToMeta(env, hTableDescriptor, regions);
- // Add replicas if needed
- List<HRegionInfo> newRegions = addReplicas(env, hTableDescriptor, regions);
+ ProcedureSyncWait.waitMetaRegions(env);
- // Setup replication for region replicas if needed
- if (hTableDescriptor.getRegionReplication() > 1) {
- ServerRegionReplicaUtil.setupRegionReplicaReplication(env.getMasterConfiguration());
- }
- return newRegions;
+ // Add replicas if needed
+ List<HRegionInfo> newRegions = addReplicas(env, hTableDescriptor, regions);
+
+ // Add regions to META
+ addRegionsToMeta(env, hTableDescriptor, newRegions);
+
+ // Setup replication for region replicas if needed
+ if (hTableDescriptor.getRegionReplication() > 1) {
+ ServerRegionReplicaUtil.setupRegionReplicaReplication(env.getMasterConfiguration());
}
- return regions;
+ return newRegions;
}
/**
@@ -374,18 +375,16 @@ public class CreateTableProcedure
return hRegionInfos;
}
- protected static void assignRegions(final MasterProcedureEnv env,
- final TableName tableName, final List<HRegionInfo> regions) throws IOException {
- ProcedureSyncWait.waitRegionServers(env);
+ protected static void setEnablingState(final MasterProcedureEnv env, final TableName tableName)
+ throws IOException {
// Mark the table as Enabling
env.getMasterServices().getTableStateManager()
.setTableState(tableName, TableState.State.ENABLING);
+ }
- // Trigger immediate assignment of the regions in round-robin fashion
- final AssignmentManager assignmentManager = env.getMasterServices().getAssignmentManager();
- ModifyRegionUtils.assignRegions(assignmentManager, regions);
-
+ protected static void setEnabledState(final MasterProcedureEnv env, final TableName tableName)
+ throws IOException {
// Enable table
env.getMasterServices().getTableStateManager()
.setTableState(tableName, TableState.State.ENABLED);
http://git-wip-us.apache.org/repos/asf/hbase/blob/b2925bc0/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteColumnFamilyProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteColumnFamilyProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteColumnFamilyProcedure.java
index 096172a..78bd715 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteColumnFamilyProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteColumnFamilyProcedure.java
@@ -30,7 +30,6 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.InvalidFamilyOperationException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
@@ -106,7 +105,10 @@ public class DeleteColumnFamilyProcedure
setNextState(DeleteColumnFamilyState.DELETE_COLUMN_FAMILY_REOPEN_ALL_REGIONS);
break;
case DELETE_COLUMN_FAMILY_REOPEN_ALL_REGIONS:
- reOpenAllRegionsIfTableIsOnline(env);
+ if (env.getAssignmentManager().isTableEnabled(getTableName())) {
+ addChildProcedure(env.getAssignmentManager()
+ .createReopenProcedures(getRegionInfoList(env)));
+ }
return Flow.NO_MORE_STATE;
default:
throw new UnsupportedOperationException(this + " unhandled state=" + state);
@@ -292,7 +294,8 @@ public class DeleteColumnFamilyProcedure
env.getMasterServices().getTableDescriptors().add(unmodifiedHTableDescriptor);
// Make sure regions are opened after table descriptor is updated.
- reOpenAllRegionsIfTableIsOnline(env);
+ //reOpenAllRegionsIfTableIsOnline(env);
+ // TODO: NUKE ROLLBACK!!!!
}
/**
@@ -316,25 +319,6 @@ public class DeleteColumnFamilyProcedure
}
/**
- * Last action from the procedure - executed when online schema change is supported.
- * @param env MasterProcedureEnv
- * @throws IOException
- */
- private void reOpenAllRegionsIfTableIsOnline(final MasterProcedureEnv env) throws IOException {
- // This operation only run when the table is enabled.
- if (!env.getMasterServices().getTableStateManager()
- .isTableState(getTableName(), TableState.State.ENABLED)) {
- return;
- }
-
- if (MasterDDLOperationHelper.reOpenAllRegions(env, getTableName(), getRegionInfoList(env))) {
- LOG.info("Completed delete column family operation on table " + getTableName());
- } else {
- LOG.warn("Error on reopening the regions on table " + getTableName());
- }
- }
-
- /**
* The procedure could be restarted from a different machine. If the variable is null, we need to
* retrieve it.
* @return traceEnabled
@@ -376,7 +360,8 @@ public class DeleteColumnFamilyProcedure
private List<HRegionInfo> getRegionInfoList(final MasterProcedureEnv env) throws IOException {
if (regionInfoList == null) {
- regionInfoList = ProcedureSyncWait.getRegionsFromMeta(env, getTableName());
+ regionInfoList = env.getAssignmentManager().getRegionStates()
+ .getRegionsOfTable(getTableName());
}
return regionInfoList;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/b2925bc0/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java
index bda68eb..04dfc60 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java
@@ -44,7 +44,6 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.exceptions.HBaseException;
import org.apache.hadoop.hbase.favored.FavoredNodesManager;
-import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.mob.MobConstants;
@@ -97,8 +96,8 @@ public class DeleteTableProcedure
}
// TODO: Move out... in the acquireLock()
- LOG.debug("waiting for '" + getTableName() + "' regions in transition");
- regions = ProcedureSyncWait.getRegionsFromMeta(env, getTableName());
+ LOG.debug("Waiting for '" + getTableName() + "' regions in transition");
+ regions = env.getAssignmentManager().getRegionStates().getRegionsOfTable(getTableName());
assert regions != null && !regions.isEmpty() : "unexpected 0 regions";
ProcedureSyncWait.waitRegionInTransition(env, regions);
@@ -350,8 +349,7 @@ public class DeleteTableProcedure
final TableName tableName) throws IOException {
Connection connection = env.getMasterServices().getConnection();
Scan tableScan = MetaTableAccessor.getScanForTableName(connection, tableName);
- try (Table metaTable =
- connection.getTable(TableName.META_TABLE_NAME)) {
+ try (Table metaTable = connection.getTable(TableName.META_TABLE_NAME)) {
List<Delete> deletes = new ArrayList<>();
try (ResultScanner resScanner = metaTable.getScanner(tableScan)) {
for (Result result : resScanner) {
@@ -385,11 +383,9 @@ public class DeleteTableProcedure
protected static void deleteAssignmentState(final MasterProcedureEnv env,
final TableName tableName) throws IOException {
- final AssignmentManager am = env.getMasterServices().getAssignmentManager();
-
// Clean up regions of the table in RegionStates.
LOG.debug("Removing '" + tableName + "' from region states.");
- am.getRegionStates().tableDeleted(tableName);
+ env.getMasterServices().getAssignmentManager().deleteTable(tableName);
// If entry for this table states, remove it.
LOG.debug("Marking '" + tableName + "' as deleted.");
http://git-wip-us.apache.org/repos/asf/hbase/blob/b2925bc0/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java
index b53ce45..409ca26 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java
@@ -21,12 +21,9 @@ package org.apache.hadoop.hbase.master.procedure;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotEnabledException;
@@ -34,17 +31,11 @@ import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.constraint.ConstraintException;
-import org.apache.hadoop.hbase.master.AssignmentManager;
-import org.apache.hadoop.hbase.master.BulkAssigner;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
-import org.apache.hadoop.hbase.master.RegionState;
-import org.apache.hadoop.hbase.master.RegionStates;
import org.apache.hadoop.hbase.master.TableStateManager;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.DisableTableState;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.htrace.Trace;
@InterfaceAudience.Private
public class DisableTableProcedure
@@ -116,12 +107,8 @@ public class DisableTableProcedure
setNextState(DisableTableState.DISABLE_TABLE_MARK_REGIONS_OFFLINE);
break;
case DISABLE_TABLE_MARK_REGIONS_OFFLINE:
- if (markRegionsOffline(env, tableName, true) ==
- MarkRegionOfflineOpResult.MARK_ALL_REGIONS_OFFLINE_SUCCESSFUL) {
- setNextState(DisableTableState.DISABLE_TABLE_SET_DISABLED_TABLE_STATE);
- } else {
- LOG.trace("Retrying later to disable the missing regions");
- }
+ addChildProcedure(env.getAssignmentManager().createUnassignProcedures(tableName));
+ setNextState(DisableTableState.DISABLE_TABLE_SET_DISABLED_TABLE_STATE);
break;
case DISABLE_TABLE_SET_DISABLED_TABLE_STATE:
setTableStateToDisabled(env, tableName);
@@ -249,7 +236,7 @@ public class DisableTableProcedure
// set the state later on). A quick state check should be enough for us to move forward.
TableStateManager tsm = env.getMasterServices().getTableStateManager();
TableState.State state = tsm.getTableState(tableName);
- if(!state.equals(TableState.State.ENABLED)){
+ if (!state.equals(TableState.State.ENABLED)){
LOG.info("Table " + tableName + " isn't enabled;is "+state.name()+"; skipping disable");
setFailure("master-disable-table", new TableNotEnabledException(
tableName+" state is "+state.name()));
@@ -290,83 +277,6 @@ public class DisableTableProcedure
}
/**
- * Mark regions of the table offline with retries
- * @param env MasterProcedureEnv
- * @param tableName the target table
- * @param retryRequired whether to retry if the first run failed
- * @return whether the operation is fully completed or being interrupted.
- * @throws IOException
- */
- protected static MarkRegionOfflineOpResult markRegionsOffline(
- final MasterProcedureEnv env,
- final TableName tableName,
- final Boolean retryRequired) throws IOException {
- // Dev consideration: add a config to control max number of retry. For now, it is hard coded.
- int maxTry = (retryRequired ? 10 : 1);
- MarkRegionOfflineOpResult operationResult =
- MarkRegionOfflineOpResult.BULK_ASSIGN_REGIONS_FAILED;
- do {
- try {
- operationResult = markRegionsOffline(env, tableName);
- if (operationResult == MarkRegionOfflineOpResult.MARK_ALL_REGIONS_OFFLINE_SUCCESSFUL) {
- break;
- }
- maxTry--;
- } catch (Exception e) {
- LOG.warn("Received exception while marking regions online. tries left: " + maxTry, e);
- maxTry--;
- if (maxTry > 0) {
- continue; // we still have some retry left, try again.
- }
- throw e;
- }
- } while (maxTry > 0);
-
- if (operationResult != MarkRegionOfflineOpResult.MARK_ALL_REGIONS_OFFLINE_SUCCESSFUL) {
- LOG.warn("Some or all regions of the Table '" + tableName + "' were still online");
- }
-
- return operationResult;
- }
-
- /**
- * Mark regions of the table offline
- * @param env MasterProcedureEnv
- * @param tableName the target table
- * @return whether the operation is fully completed or being interrupted.
- * @throws IOException
- */
- private static MarkRegionOfflineOpResult markRegionsOffline(
- final MasterProcedureEnv env,
- final TableName tableName) throws IOException {
- // Get list of online regions that are of this table. Regions that are
- // already closed will not be included in this list; i.e. the returned
- // list is not ALL regions in a table, its all online regions according
- // to the in-memory state on this master.
- MarkRegionOfflineOpResult operationResult =
- MarkRegionOfflineOpResult.MARK_ALL_REGIONS_OFFLINE_SUCCESSFUL;
- final List<HRegionInfo> regions =
- env.getMasterServices().getAssignmentManager().getRegionStates()
- .getRegionsOfTable(tableName);
- if (regions.size() > 0) {
- LOG.info("Offlining " + regions.size() + " regions.");
-
- BulkDisabler bd = new BulkDisabler(env, tableName, regions);
- try {
- if (!bd.bulkAssign()) {
- operationResult = MarkRegionOfflineOpResult.BULK_ASSIGN_REGIONS_FAILED;
- }
- } catch (InterruptedException e) {
- LOG.warn("Disable was interrupted");
- // Preserve the interrupt.
- Thread.currentThread().interrupt();
- operationResult = MarkRegionOfflineOpResult.MARK_ALL_REGIONS_OFFLINE_INTERRUPTED;
- }
- }
- return operationResult;
- }
-
- /**
* Mark table state to Disabled
* @param env MasterProcedureEnv
* @throws IOException
@@ -428,64 +338,4 @@ public class DisableTableProcedure
}
}
}
-
- /**
- * Run bulk disable.
- */
- private static class BulkDisabler extends BulkAssigner {
- private final AssignmentManager assignmentManager;
- private final List<HRegionInfo> regions;
- private final TableName tableName;
- private final int waitingTimeForEvents;
-
- public BulkDisabler(final MasterProcedureEnv env, final TableName tableName,
- final List<HRegionInfo> regions) {
- super(env.getMasterServices());
- this.assignmentManager = env.getMasterServices().getAssignmentManager();
- this.tableName = tableName;
- this.regions = regions;
- this.waitingTimeForEvents =
- env.getMasterServices().getConfiguration()
- .getInt("hbase.master.event.waiting.time", 1000);
- }
-
- @Override
- protected void populatePool(ExecutorService pool) {
- RegionStates regionStates = assignmentManager.getRegionStates();
- for (final HRegionInfo region : regions) {
- if (regionStates.isRegionInTransition(region)
- && !regionStates.isRegionInState(region, RegionState.State.FAILED_CLOSE)) {
- continue;
- }
- pool.execute(Trace.wrap("DisableTableHandler.BulkDisabler", new Runnable() {
- @Override
- public void run() {
- assignmentManager.unassign(region);
- }
- }));
- }
- }
-
- @Override
- protected boolean waitUntilDone(long timeout) throws InterruptedException {
- long startTime = EnvironmentEdgeManager.currentTime();
- long remaining = timeout;
- List<HRegionInfo> regions = null;
- long lastLogTime = startTime;
- while (!server.isStopped() && remaining > 0) {
- Thread.sleep(waitingTimeForEvents);
- regions = assignmentManager.getRegionStates().getRegionsOfTable(tableName);
- long now = EnvironmentEdgeManager.currentTime();
- // Don't log more than once every ten seconds. Its obnoxious. And only log table regions
- // if we are waiting a while for them to go down...
- if (LOG.isDebugEnabled() && ((now - lastLogTime) > 10000)) {
- lastLogTime = now;
- LOG.debug("Disable waiting until done; " + remaining + " ms remaining; " + regions);
- }
- if (regions.isEmpty()) break;
- remaining = timeout - (now - startTime);
- }
- return regions != null && regions.isEmpty();
- }
- }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/b2925bc0/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DispatchMergingRegionsProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DispatchMergingRegionsProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DispatchMergingRegionsProcedure.java
new file mode 100644
index 0000000..15ed429
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DispatchMergingRegionsProcedure.java
@@ -0,0 +1,584 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.master.procedure;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InterruptedIOException;
+import java.io.OutputStream;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.RegionLoad;
+import org.apache.hadoop.hbase.ServerLoad;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.UnknownRegionException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.exceptions.MergeRegionException;
+import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
+import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
+import org.apache.hadoop.hbase.master.assignment.RegionStates;
+import org.apache.hadoop.hbase.master.CatalogJanitor;
+import org.apache.hadoop.hbase.master.RegionPlan;
+import org.apache.hadoop.hbase.master.RegionState;
+import org.apache.hadoop.hbase.master.ServerManager;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.DispatchMergingRegionsState;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+/**
+ * The procedure to Merge a region in a table.
+ */
+@InterfaceAudience.Private
+public class DispatchMergingRegionsProcedure
+ extends AbstractStateMachineTableProcedure<DispatchMergingRegionsState> {
+ private static final Log LOG = LogFactory.getLog(DispatchMergingRegionsProcedure.class);
+
+ private final AtomicBoolean aborted = new AtomicBoolean(false);
+ private Boolean traceEnabled;
+ private AssignmentManager assignmentManager;
+ private int timeout;
+ private ServerName regionLocation;
+ private String regionsToMergeListFullName;
+ private String regionsToMergeListEncodedName;
+
+ private TableName tableName;
+ private HRegionInfo [] regionsToMerge;
+ private boolean forcible;
+
+ public DispatchMergingRegionsProcedure() {
+ this.traceEnabled = isTraceEnabled();
+ this.assignmentManager = null;
+ this.timeout = -1;
+ this.regionLocation = null;
+ this.regionsToMergeListFullName = null;
+ this.regionsToMergeListEncodedName = null;
+ }
+
+ public DispatchMergingRegionsProcedure(
+ final MasterProcedureEnv env,
+ final TableName tableName,
+ final HRegionInfo [] regionsToMerge,
+ final boolean forcible) {
+ super(env);
+ this.traceEnabled = isTraceEnabled();
+ this.assignmentManager = getAssignmentManager(env);
+ this.tableName = tableName;
+ // For now, we only merge 2 regions. It could be extended to more than 2 regions in
+ // the future.
+ assert(regionsToMerge.length == 2);
+ this.regionsToMerge = regionsToMerge;
+ this.forcible = forcible;
+
+ this.timeout = -1;
+ this.regionsToMergeListFullName = getRegionsToMergeListFullNameString();
+ this.regionsToMergeListEncodedName = getRegionsToMergeListEncodedNameString();
+ }
+
+ @Override
+ protected Flow executeFromState(
+ final MasterProcedureEnv env,
+ final DispatchMergingRegionsState state) throws InterruptedException {
+ if (isTraceEnabled()) {
+ LOG.trace(this + " execute state=" + state);
+ }
+
+ try {
+ switch (state) {
+ case DISPATCH_MERGING_REGIONS_PREPARE:
+ prepareMergeRegion(env);
+ setNextState(DispatchMergingRegionsState.DISPATCH_MERGING_REGIONS_PRE_OPERATION);
+ break;
+ case DISPATCH_MERGING_REGIONS_PRE_OPERATION:
+ //Unused for now - reserve to add preMerge coprocessor in the future
+ setNextState(DispatchMergingRegionsState.DISPATCH_MERGING_REGIONS_MOVE_REGION_TO_SAME_RS);
+ break;
+ case DISPATCH_MERGING_REGIONS_MOVE_REGION_TO_SAME_RS:
+ if (MoveRegionsToSameRS(env)) {
+ setNextState(DispatchMergingRegionsState.DISPATCH_MERGING_REGIONS_DO_MERGE_IN_RS);
+ } else {
+ LOG.info("Cancel merging regions " + getRegionsToMergeListFullNameString()
+ + ", because can't move them to the same RS");
+ setNextState(DispatchMergingRegionsState.DISPATCH_MERGING_REGIONS_POST_OPERATION);
+ }
+ break;
+ case DISPATCH_MERGING_REGIONS_DO_MERGE_IN_RS:
+ doMergeInRS(env);
+ setNextState(DispatchMergingRegionsState.DISPATCH_MERGING_REGIONS_POST_OPERATION);
+ break;
+ case DISPATCH_MERGING_REGIONS_POST_OPERATION:
+ //Unused for now - reserve to add postCompletedMerge coprocessor in the future
+ return Flow.NO_MORE_STATE;
+ default:
+ throw new UnsupportedOperationException(this + " unhandled state=" + state);
+ }
+ } catch (IOException e) {
+ LOG.warn("Error trying to merge regions " + getRegionsToMergeListFullNameString() +
+ " in the table " + tableName + " (in state=" + state + ")", e);
+
+ setFailure("master-merge-regions", e);
+ }
+ return Flow.HAS_MORE_STATE;
+ }
+
+ @Override
+ protected void rollbackState(
+ final MasterProcedureEnv env,
+ final DispatchMergingRegionsState state) throws IOException, InterruptedException {
+ if (isTraceEnabled()) {
+ LOG.trace(this + " rollback state=" + state);
+ }
+
+ try {
+ switch (state) {
+ case DISPATCH_MERGING_REGIONS_POST_OPERATION:
+ case DISPATCH_MERGING_REGIONS_DO_MERGE_IN_RS:
+ String msg = this + " We are in the " + state + " state."
+ + " It is complicated to rollback the merge operation that region server is working on."
+ + " Rollback is not supported and we should let the merge operation to complete";
+ LOG.warn(msg);
+ // PONR
+ throw new UnsupportedOperationException(this + " unhandled state=" + state);
+ case DISPATCH_MERGING_REGIONS_MOVE_REGION_TO_SAME_RS:
+ break; // nothing to rollback
+ case DISPATCH_MERGING_REGIONS_PRE_OPERATION:
+ break; // nothing to rollback
+ case DISPATCH_MERGING_REGIONS_PREPARE:
+ break; // nothing to rollback
+ default:
+ throw new UnsupportedOperationException(this + " unhandled state=" + state);
+ }
+ } catch (Exception e) {
+ // This will be retried. Unless there is a bug in the code,
+ // this should be just a "temporary error" (e.g. network down)
+ LOG.warn("Failed rollback attempt step " + state + " for merging the regions "
+ + getRegionsToMergeListFullNameString() + " in table " + tableName, e);
+ throw e;
+ }
+ }
+
+ @Override
+ protected DispatchMergingRegionsState getState(final int stateId) {
+ return DispatchMergingRegionsState.valueOf(stateId);
+ }
+
+ @Override
+ protected int getStateId(final DispatchMergingRegionsState state) {
+ return state.getNumber();
+ }
+
+ @Override
+ protected DispatchMergingRegionsState getInitialState() {
+ return DispatchMergingRegionsState.DISPATCH_MERGING_REGIONS_PREPARE;
+ }
+
+ /*
+ * Check whether we are in the state that can be rollback
+ */
+ @Override
+ protected boolean isRollbackSupported(final DispatchMergingRegionsState state) {
+ switch (state) {
+ case DISPATCH_MERGING_REGIONS_POST_OPERATION:
+ case DISPATCH_MERGING_REGIONS_DO_MERGE_IN_RS:
+ // It is not safe to rollback if we reach to these states.
+ return false;
+ default:
+ break;
+ }
+ return true;
+ }
+
+ @Override
+ public void serializeStateData(final OutputStream stream) throws IOException {
+ super.serializeStateData(stream);
+
+ MasterProcedureProtos.DispatchMergingRegionsStateData.Builder dispatchMergingRegionsMsg =
+ MasterProcedureProtos.DispatchMergingRegionsStateData.newBuilder()
+ .setUserInfo(MasterProcedureUtil.toProtoUserInfo(getUser()))
+ .setTableName(ProtobufUtil.toProtoTableName(tableName))
+ .setForcible(forcible);
+ for (HRegionInfo hri: regionsToMerge) {
+ dispatchMergingRegionsMsg.addRegionInfo(HRegionInfo.convert(hri));
+ }
+ dispatchMergingRegionsMsg.build().writeDelimitedTo(stream);
+ }
+
+ @Override
+ public void deserializeStateData(final InputStream stream) throws IOException {
+ super.deserializeStateData(stream);
+
+ MasterProcedureProtos.DispatchMergingRegionsStateData dispatchMergingRegionsMsg =
+ MasterProcedureProtos.DispatchMergingRegionsStateData.parseDelimitedFrom(stream);
+ setUser(MasterProcedureUtil.toUserInfo(dispatchMergingRegionsMsg.getUserInfo()));
+ tableName = ProtobufUtil.toTableName(dispatchMergingRegionsMsg.getTableName());
+
+ assert(dispatchMergingRegionsMsg.getRegionInfoCount() == 2);
+ regionsToMerge = new HRegionInfo[dispatchMergingRegionsMsg.getRegionInfoCount()];
+ for (int i = 0; i < regionsToMerge.length; i++) {
+ regionsToMerge[i] = HRegionInfo.convert(dispatchMergingRegionsMsg.getRegionInfo(i));
+ }
+ }
+
+ @Override
+ public void toStringClassDetails(StringBuilder sb) {
+ sb.append(getClass().getSimpleName());
+ sb.append(" (table=");
+ sb.append(tableName);
+ sb.append(" regions=");
+ sb.append(getRegionsToMergeListFullNameString());
+ sb.append(" forcible=");
+ sb.append(forcible);
+ sb.append(")");
+ }
+
+ @Override
+ protected LockState acquireLock(final MasterProcedureEnv env) {
+ if (!getTableName().isSystemTable() && env.waitInitialized(this)) {
+ return LockState.LOCK_EVENT_WAIT;
+ }
+ if (env.getProcedureScheduler().waitRegions(this, getTableName(), regionsToMerge)) {
+ return LockState.LOCK_EVENT_WAIT;
+ }
+ return LockState.LOCK_ACQUIRED;
+ }
+
+ @Override
+ protected void releaseLock(final MasterProcedureEnv env) {
+ env.getProcedureScheduler().wakeRegions(this, getTableName(), regionsToMerge[0], regionsToMerge[1]);
+ }
+
+ @Override
+ public TableName getTableName() {
+ return tableName;
+ }
+
+ @Override
+ public TableOperationType getTableOperationType() {
+ return TableOperationType.REGION_MERGE;
+ }
+
+ /**
+ * Prepare merge and do some check
+ * @param env MasterProcedureEnv
+ * @throws IOException
+ */
+ private void prepareMergeRegion(final MasterProcedureEnv env) throws IOException {
+ // Note: the following logic assumes that we only have 2 regions to merge. In the future,
+ // if we want to extend to more than 2 regions, the code needs to modify a little bit.
+ //
+ CatalogJanitor catalogJanitor = env.getMasterServices().getCatalogJanitor();
+ boolean regionAHasMergeQualifier = !catalogJanitor.cleanMergeQualifier(regionsToMerge[0]);
+ if (regionAHasMergeQualifier
+ || !catalogJanitor.cleanMergeQualifier(regionsToMerge[1])) {
+ String msg = "Skip merging regions " + regionsToMerge[0].getRegionNameAsString()
+ + ", " + regionsToMerge[1].getRegionNameAsString() + ", because region "
+ + (regionAHasMergeQualifier ? regionsToMerge[0].getEncodedName() : regionsToMerge[1]
+ .getEncodedName()) + " has merge qualifier";
+ LOG.info(msg);
+ throw new MergeRegionException(msg);
+ }
+
+ RegionStates regionStates = getAssignmentManager(env).getRegionStates();
+ RegionState regionStateA = regionStates.getRegionState(regionsToMerge[0].getEncodedName());
+ RegionState regionStateB = regionStates.getRegionState(regionsToMerge[1].getEncodedName());
+ if (regionStateA == null || regionStateB == null) {
+ throw new UnknownRegionException(
+ regionStateA == null ?
+ regionsToMerge[0].getEncodedName() : regionsToMerge[1].getEncodedName());
+ }
+
+ if (!regionStateA.isOpened() || !regionStateB.isOpened()) {
+ throw new MergeRegionException(
+ "Unable to merge regions not online " + regionStateA + ", " + regionStateB);
+ }
+
+ if (regionsToMerge[0].getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID ||
+ regionsToMerge[1].getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) {
+ throw new MergeRegionException("Can't merge non-default replicas");
+ }
+
+ if (!forcible && !HRegionInfo.areAdjacent(regionsToMerge[0], regionsToMerge[1])) {
+ throw new MergeRegionException(
+ "Unable to merge not adjacent regions "
+ + regionsToMerge[0].getRegionNameAsString() + ", "
+ + regionsToMerge[1].getRegionNameAsString()
+ + " where forcible = " + forcible);
+ }
+ }
+
+ /**
+ * Move all regions to the same region server
+ * @param env MasterProcedureEnv
+ * @return whether the target regions are hosted by the same RS
+ * @throws IOException
+ */
+ private boolean MoveRegionsToSameRS(final MasterProcedureEnv env) throws IOException {
+ // Make sure regions are on the same regionserver before send merge
+ // regions request to region server.
+ //
+ boolean onSameRS = isRegionsOnTheSameServer(env);
+ if (!onSameRS) {
+ // Note: the following logic assumes that we only have 2 regions to merge. In the future,
+ // if we want to extend to more than 2 regions, the code needs to be modified a little bit.
+ //
+ RegionStates regionStates = getAssignmentManager(env).getRegionStates();
+ ServerName regionLocation2 = regionStates.getRegionServerOfRegion(regionsToMerge[1]);
+
+ RegionLoad loadOfRegionA = getRegionLoad(env, regionLocation, regionsToMerge[0]);
+ RegionLoad loadOfRegionB = getRegionLoad(env, regionLocation2, regionsToMerge[1]);
+ if (loadOfRegionA != null && loadOfRegionB != null
+ && loadOfRegionA.getRequestsCount() < loadOfRegionB.getRequestsCount()) {
+ // switch regionsToMerge[0] and regionsToMerge[1]
+ HRegionInfo tmpRegion = this.regionsToMerge[0];
+ this.regionsToMerge[0] = this.regionsToMerge[1];
+ this.regionsToMerge[1] = tmpRegion;
+ ServerName tmpLocation = regionLocation;
+ regionLocation = regionLocation2;
+ regionLocation2 = tmpLocation;
+ }
+
+ long startTime = EnvironmentEdgeManager.currentTime();
+
+ RegionPlan regionPlan = new RegionPlan(regionsToMerge[1], regionLocation2, regionLocation);
+ LOG.info("Moving regions to same server for merge: " + regionPlan.toString());
+ getAssignmentManager(env).moveAsync(regionPlan);
+ do {
+ try {
+ Thread.sleep(20);
+ // Check RIT first, then get the region location; otherwise
+ // we could get a wrong result if the region comes online between getting
+ // the region location and checking RIT
+ boolean isRIT = regionStates.isRegionInTransition(regionsToMerge[1]);
+ regionLocation2 = regionStates.getRegionServerOfRegion(regionsToMerge[1]);
+ onSameRS = regionLocation.equals(regionLocation2);
+ if (onSameRS || !isRIT) {
+ // Regions are on the same RS, or regionsToMerge[1] is not in
+ // RegionInTransition any more
+ break;
+ }
+ } catch (InterruptedException e) {
+ InterruptedIOException iioe = new InterruptedIOException();
+ iioe.initCause(e);
+ throw iioe;
+ }
+ } while ((EnvironmentEdgeManager.currentTime() - startTime) <= getTimeout(env));
+ }
+ return onSameRS;
+ }
+
+ /**
+ * Do the real merge operation in the region server that hosts regions
+ * @param env MasterProcedureEnv
+ * @throws IOException
+ */
+ private void doMergeInRS(final MasterProcedureEnv env) throws IOException {
+ long duration = 0;
+ long startTime = EnvironmentEdgeManager.currentTime();
+ do {
+ try {
+ if (getServerName(env) == null) {
+ // The merge has probably already happened. Check.
+ RegionState regionState = getAssignmentManager(env).getRegionStates().getRegionState(
+ regionsToMerge[0].getEncodedName());
+ if (regionState.isMerging() || regionState.isMerged()) {
+ LOG.info("Merge regions " + getRegionsToMergeListEncodedNameString() +
+ " is in progress or completed. No need to send a new request.");
+ } else {
+ LOG.warn("Cannot sending merge to hosting server of the regions " +
+ getRegionsToMergeListEncodedNameString() + " as the server is unknown");
+ }
+ return;
+ }
+ // TODO: the following RPC call is not idempotent. Multiple calls (eg. after master
+ // failover, re-execute this step) could result in some exception thrown that does not
+ // paint the correct picture. This behavior is on-par with old releases. Improvement
+ // could happen in the future.
+ env.getMasterServices().getServerManager().sendRegionsMerge(
+ getServerName(env),
+ regionsToMerge[0],
+ regionsToMerge[1],
+ forcible,
+ getUser());
+ LOG.info("Sent merge to server " + getServerName(env) + " for region " +
+ getRegionsToMergeListEncodedNameString() + ", forcible=" + forcible);
+ return;
+ } catch (RegionOpeningException roe) {
+ // Do a retry since region should be online on RS immediately
+ LOG.warn("Failed mergering regions in " + getServerName(env) + ", retrying...", roe);
+ } catch (Exception ie) {
+ LOG.warn("Failed sending merge to " + getServerName(env) + " for regions " +
+ getRegionsToMergeListEncodedNameString() + ", forcible=" + forcible, ie);
+ return;
+ }
+ } while ((duration = EnvironmentEdgeManager.currentTime() - startTime) <= getTimeout(env));
+
+ // If we reach here, it means that we timed out.
+ String msg = "Failed sending merge to " + getServerName(env) + " after " + duration + "ms";
+ LOG.warn(msg);
+ throw new IOException(msg);
+ }
+
+ private RegionLoad getRegionLoad(
+ final MasterProcedureEnv env,
+ final ServerName sn,
+ final HRegionInfo hri) {
+ ServerManager serverManager = env.getMasterServices().getServerManager();
+ ServerLoad load = serverManager.getLoad(sn);
+ if (load != null) {
+ Map<byte[], RegionLoad> regionsLoad = load.getRegionsLoad();
+ if (regionsLoad != null) {
+ return regionsLoad.get(hri.getRegionName());
+ }
+ }
+ return null;
+ }
+
+ /**
+ * The procedure could be restarted from a different machine. If the variable is null, we need to
+ * retrieve it.
+ * @param env MasterProcedureEnv
+ * @return whether the target regions are hosted by the same RS
+ */
+ private boolean isRegionsOnTheSameServer(final MasterProcedureEnv env) throws IOException{
+ Boolean onSameRS = true;
+ int i = 0;
+ RegionStates regionStates = getAssignmentManager(env).getRegionStates();
+ regionLocation = regionStates.getRegionServerOfRegion(regionsToMerge[i]);
+ if (regionLocation != null) {
+ for(i = 1; i < regionsToMerge.length; i++) {
+ ServerName regionLocation2 = regionStates.getRegionServerOfRegion(regionsToMerge[i]);
+ if (regionLocation2 != null) {
+ if (onSameRS) {
+ onSameRS = regionLocation.equals(regionLocation2);
+ }
+ } else {
+ // At least one region is not online, merge will fail, no need to continue.
+ break;
+ }
+ }
+ if (i == regionsToMerge.length) {
+ // Finish checking all regions, return the result;
+ return onSameRS;
+ }
+ }
+
+ // If reaching here, at least one region is not online.
+ String msg = "Skip merging regions " + getRegionsToMergeListFullNameString() +
+ ", because region " + regionsToMerge[i].getEncodedName() + " is not online now.";
+ LOG.warn(msg);
+ throw new IOException(msg);
+ }
+
+ /**
+ * The procedure could be restarted from a different machine. If the variable is null, we need to
+ * retrieve it.
+ * @param env MasterProcedureEnv
+ * @return assignmentManager
+ */
+ private AssignmentManager getAssignmentManager(final MasterProcedureEnv env) {
+ if (assignmentManager == null) {
+ assignmentManager = env.getMasterServices().getAssignmentManager();
+ }
+ return assignmentManager;
+ }
+
+ /**
+ * The procedure could be restarted from a different machine. If the variable is null, we need to
+ * retrieve it.
+ * @param env MasterProcedureEnv
+ * @return timeout value
+ */
+ private int getTimeout(final MasterProcedureEnv env) {
+ if (timeout == -1) {
+ timeout = env.getMasterConfiguration().getInt(
+ "hbase.master.regionmerge.timeout", regionsToMerge.length * 60 * 1000);
+ }
+ return timeout;
+ }
+
+ /**
+ * The procedure could be restarted from a different machine. If the variable is null, we need to
+ * retrieve it.
+ * @param env MasterProcedureEnv
+ * @return serverName
+ */
+ private ServerName getServerName(final MasterProcedureEnv env) {
+ if (regionLocation == null) {
+ regionLocation =
+ getAssignmentManager(env).getRegionStates().getRegionServerOfRegion(regionsToMerge[0]);
+ }
+ return regionLocation;
+ }
+
+ /**
+ * The procedure could be restarted from a different machine. If the variable is null, we need to
+ * retrieve it.
+ * @return the full region names in a list-formatted string
+ */
+ private String getRegionsToMergeListFullNameString() {
+ if (regionsToMergeListFullName == null) {
+ StringBuilder sb = new StringBuilder("[");
+ int i = 0;
+ while(i < regionsToMerge.length - 1) {
+ sb.append(regionsToMerge[i].getRegionNameAsString() + ", ");
+ i++;
+ }
+ sb.append(regionsToMerge[i].getRegionNameAsString() + " ]");
+ regionsToMergeListFullName = sb.toString();
+ }
+ return regionsToMergeListFullName;
+ }
+
+ /**
+ * The procedure could be restarted from a different machine. If the variable is null, we need to
+ * retrieve it.
+ * @return encoded region names
+ */
+ private String getRegionsToMergeListEncodedNameString() {
+ if (regionsToMergeListEncodedName == null) {
+ StringBuilder sb = new StringBuilder("[");
+ int i = 0;
+ while(i < regionsToMerge.length - 1) {
+ sb.append(regionsToMerge[i].getEncodedName() + ", ");
+ i++;
+ }
+ sb.append(regionsToMerge[i].getEncodedName() + " ]");
+ regionsToMergeListEncodedName = sb.toString();
+ }
+ return regionsToMergeListEncodedName;
+ }
+
+ /**
+ * The procedure could be restarted from a different machine. If the variable is null, we need to
+ * retrieve it.
+ * @return traceEnabled
+ */
+ private Boolean isTraceEnabled() {
+ if (traceEnabled == null) {
+ traceEnabled = LOG.isTraceEnabled();
+ }
+ return traceEnabled;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/b2925bc0/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/EnableTableProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/EnableTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/EnableTableProcedure.java
index 4d67edd..4f4b5b1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/EnableTableProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/EnableTableProcedure.java
@@ -21,34 +21,20 @@ package org.apache.hadoop.hbase.master.procedure;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.MetaTableAccessor;
-import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotDisabledException;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.TableState;
-import org.apache.hadoop.hbase.master.AssignmentManager;
-import org.apache.hadoop.hbase.master.BulkAssigner;
-import org.apache.hadoop.hbase.master.GeneralBulkAssigner;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
-import org.apache.hadoop.hbase.master.MasterServices;
-import org.apache.hadoop.hbase.master.RegionStates;
-import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.master.TableStateManager;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.EnableTableState;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
@InterfaceAudience.Private
public class EnableTableProcedure
@@ -114,7 +100,7 @@ public class EnableTableProcedure
setNextState(EnableTableState.ENABLE_TABLE_MARK_REGIONS_ONLINE);
break;
case ENABLE_TABLE_MARK_REGIONS_ONLINE:
- markRegionsOnline(env, tableName, true);
+ addChildProcedure(env.getAssignmentManager().createAssignProcedures(tableName));
setNextState(EnableTableState.ENABLE_TABLE_SET_ENABLED_TABLE_STATE);
break;
case ENABLE_TABLE_SET_ENABLED_TABLE_STATE:
@@ -287,137 +273,6 @@ public class EnableTableProcedure
}
/**
- * Mark offline regions of the table online with retry
- * @param env MasterProcedureEnv
- * @param tableName the target table
- * @param retryRequired whether to retry if the first run failed
- * @throws IOException
- */
- protected static void markRegionsOnline(
- final MasterProcedureEnv env,
- final TableName tableName,
- final Boolean retryRequired) throws IOException {
- // This is best effort approach to make all regions of a table online. If we fail to do
- // that, it is ok that the table has some offline regions; user can fix it manually.
-
- // Dev consideration: add a config to control max number of retry. For now, it is hard coded.
- int maxTry = (retryRequired ? 10 : 1);
- boolean done = false;
-
- do {
- try {
- done = markRegionsOnline(env, tableName);
- if (done) {
- break;
- }
- maxTry--;
- } catch (Exception e) {
- LOG.warn("Received exception while marking regions online. tries left: " + maxTry, e);
- maxTry--;
- if (maxTry > 0) {
- continue; // we still have some retry left, try again.
- }
- throw e;
- }
- } while (maxTry > 0);
-
- if (!done) {
- LOG.warn("Some or all regions of the Table '" + tableName + "' were offline");
- }
- }
-
- /**
- * Mark offline regions of the table online
- * @param env MasterProcedureEnv
- * @param tableName the target table
- * @return whether the operation is fully completed or being interrupted.
- * @throws IOException
- */
- private static boolean markRegionsOnline(final MasterProcedureEnv env, final TableName tableName)
- throws IOException {
- final AssignmentManager assignmentManager = env.getMasterServices().getAssignmentManager();
- final MasterServices masterServices = env.getMasterServices();
- final ServerManager serverManager = masterServices.getServerManager();
- boolean done = false;
- // Get the regions of this table. We're done when all listed
- // tables are onlined.
- List<Pair<HRegionInfo, ServerName>> tableRegionsAndLocations;
-
- if (TableName.META_TABLE_NAME.equals(tableName)) {
- tableRegionsAndLocations =
- new MetaTableLocator().getMetaRegionsAndLocations(masterServices.getZooKeeper());
- } else {
- tableRegionsAndLocations =
- MetaTableAccessor.getTableRegionsAndLocations(masterServices.getConnection(), tableName);
- }
-
- int countOfRegionsInTable = tableRegionsAndLocations.size();
- Map<HRegionInfo, ServerName> regionsToAssign =
- regionsToAssignWithServerName(env, tableRegionsAndLocations);
-
- // need to potentially create some regions for the replicas
- List<HRegionInfo> unrecordedReplicas =
- AssignmentManager.replicaRegionsNotRecordedInMeta(new HashSet<>(
- regionsToAssign.keySet()), masterServices);
- Map<ServerName, List<HRegionInfo>> srvToUnassignedRegs =
- assignmentManager.getBalancer().roundRobinAssignment(unrecordedReplicas,
- serverManager.getOnlineServersList());
- if (srvToUnassignedRegs != null) {
- for (Map.Entry<ServerName, List<HRegionInfo>> entry : srvToUnassignedRegs.entrySet()) {
- for (HRegionInfo h : entry.getValue()) {
- regionsToAssign.put(h, entry.getKey());
- }
- }
- }
-
- int offlineRegionsCount = regionsToAssign.size();
-
- LOG.info("Table '" + tableName + "' has " + countOfRegionsInTable + " regions, of which "
- + offlineRegionsCount + " are offline.");
- if (offlineRegionsCount == 0) {
- return true;
- }
-
- List<ServerName> onlineServers = serverManager.createDestinationServersList();
- Map<ServerName, List<HRegionInfo>> bulkPlan =
- env.getMasterServices().getAssignmentManager().getBalancer()
- .retainAssignment(regionsToAssign, onlineServers);
- if (bulkPlan != null) {
- LOG.info("Bulk assigning " + offlineRegionsCount + " region(s) across " + bulkPlan.size()
- + " server(s), retainAssignment=true");
-
- BulkAssigner ba = new GeneralBulkAssigner(masterServices, bulkPlan, assignmentManager, true);
- try {
- if (ba.bulkAssign()) {
- done = true;
- }
- } catch (InterruptedException e) {
- LOG.warn("Enable operation was interrupted when enabling table '" + tableName + "'");
- // Preserve the interrupt.
- Thread.currentThread().interrupt();
- }
- } else {
- LOG.info("Balancer was unable to find suitable servers for table " + tableName
- + ", leaving unassigned");
- }
- return done;
- }
-
- /**
- * Mark regions of the table offline during recovery
- * @param env MasterProcedureEnv
- */
- private void markRegionsOfflineDuringRecovery(final MasterProcedureEnv env) {
- try {
- // This is a best effort attempt. We will move on even it does not succeed. We will retry
- // several times until we giving up.
- DisableTableProcedure.markRegionsOffline(env, tableName, true);
- } catch (Exception e) {
- LOG.debug("Failed to offline all regions of table " + tableName + ". Ignoring", e);
- }
- }
-
- /**
* Mark table state to Enabled
* @param env MasterProcedureEnv
* @throws IOException
@@ -457,31 +312,6 @@ public class EnableTableProcedure
}
/**
- * @param regionsInMeta
- * @return List of regions neither in transition nor assigned.
- * @throws IOException
- */
- private static Map<HRegionInfo, ServerName> regionsToAssignWithServerName(
- final MasterProcedureEnv env,
- final List<Pair<HRegionInfo, ServerName>> regionsInMeta) throws IOException {
- Map<HRegionInfo, ServerName> regionsToAssign = new HashMap<>(regionsInMeta.size());
- RegionStates regionStates = env.getMasterServices().getAssignmentManager().getRegionStates();
- for (Pair<HRegionInfo, ServerName> regionLocation : regionsInMeta) {
- HRegionInfo hri = regionLocation.getFirst();
- ServerName sn = regionLocation.getSecond();
- if (regionStates.isRegionOffline(hri)) {
- regionsToAssign.put(hri, sn);
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Skipping assign for the region " + hri + " during enable table "
- + hri.getTable() + " because its already in tranition or assigned.");
- }
- }
- }
- return regionsToAssign;
- }
-
- /**
* Coprocessor Action.
* @param env MasterProcedureEnv
* @param state the procedure state
http://git-wip-us.apache.org/repos/asf/hbase/blob/b2925bc0/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterDDLOperationHelper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterDDLOperationHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterDDLOperationHelper.java
index 4b9a7ab..31d05a7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterDDLOperationHelper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterDDLOperationHelper.java
@@ -19,32 +19,19 @@
package org.apache.hadoop.hbase.master.procedure;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.LinkedList;
import java.util.List;
-import java.util.NavigableMap;
-import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.RegionLocator;
-import org.apache.hadoop.hbase.master.AssignmentManager;
-import org.apache.hadoop.hbase.master.BulkReOpen;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.mob.MobConstants;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.util.Bytes;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
/**
* Helper class for schema change procedures
*/
@@ -60,16 +47,13 @@ public final class MasterDDLOperationHelper {
public static void deleteColumnFamilyFromFileSystem(
final MasterProcedureEnv env,
final TableName tableName,
- List<HRegionInfo> regionInfoList,
+ final List<HRegionInfo> regionInfoList,
final byte[] familyName,
- boolean hasMob) throws IOException {
+ final boolean hasMob) throws IOException {
final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
if (LOG.isDebugEnabled()) {
LOG.debug("Removing family=" + Bytes.toString(familyName) + " from table=" + tableName);
}
- if (regionInfoList == null) {
- regionInfoList = ProcedureSyncWait.getRegionsFromMeta(env, tableName);
- }
for (HRegionInfo hri : regionInfoList) {
// Delete the family directory in FS for all the regions one by one
mfs.deleteFamilyFromFS(hri, familyName);
@@ -81,77 +65,4 @@ public final class MasterDDLOperationHelper {
mfs.deleteFamilyFromFS(mobRootDir, mobRegionInfo, familyName);
}
}
-
- /**
- * Reopen all regions from a table after a schema change operation.
- **/
- public static boolean reOpenAllRegions(
- final MasterProcedureEnv env,
- final TableName tableName,
- final List<HRegionInfo> regionInfoList) throws IOException {
- boolean done = false;
- LOG.info("Bucketing regions by region server...");
- List<HRegionLocation> regionLocations = null;
- Connection connection = env.getMasterServices().getConnection();
- try (RegionLocator locator = connection.getRegionLocator(tableName)) {
- regionLocations = locator.getAllRegionLocations();
- }
- // Convert List<HRegionLocation> to Map<HRegionInfo, ServerName>.
- NavigableMap<HRegionInfo, ServerName> hri2Sn = new TreeMap<>();
- for (HRegionLocation location : regionLocations) {
- hri2Sn.put(location.getRegionInfo(), location.getServerName());
- }
- TreeMap<ServerName, List<HRegionInfo>> serverToRegions = Maps.newTreeMap();
- List<HRegionInfo> reRegions = new ArrayList<>();
- for (HRegionInfo hri : regionInfoList) {
- ServerName sn = hri2Sn.get(hri);
- // Skip the offlined split parent region
- // See HBASE-4578 for more information.
- if (null == sn) {
- LOG.info("Skip " + hri);
- continue;
- }
- if (!serverToRegions.containsKey(sn)) {
- LinkedList<HRegionInfo> hriList = Lists.newLinkedList();
- serverToRegions.put(sn, hriList);
- }
- reRegions.add(hri);
- serverToRegions.get(sn).add(hri);
- }
-
- LOG.info("Reopening " + reRegions.size() + " regions on " + serverToRegions.size()
- + " region servers.");
- AssignmentManager am = env.getMasterServices().getAssignmentManager();
- am.setRegionsToReopen(reRegions);
- BulkReOpen bulkReopen = new BulkReOpen(env.getMasterServices(), serverToRegions, am);
- while (true) {
- try {
- if (bulkReopen.bulkReOpen()) {
- done = true;
- break;
- } else {
- LOG.warn("Timeout before reopening all regions");
- }
- } catch (InterruptedException e) {
- LOG.warn("Reopen was interrupted");
- // Preserve the interrupt.
- Thread.currentThread().interrupt();
- break;
- }
- }
- return done;
- }
-
- /**
- * Get the region info list of a table from meta if it is not already known by the caller.
- **/
- public static List<HRegionInfo> getRegionInfoList(
- final MasterProcedureEnv env,
- final TableName tableName,
- List<HRegionInfo> regionInfoList) throws IOException {
- if (regionInfoList == null) {
- regionInfoList = ProcedureSyncWait.getRegionsFromMeta(env, tableName);
- }
- return regionInfoList;
- }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/b2925bc0/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureConstants.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureConstants.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureConstants.java
index c21137d..f815bea 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureConstants.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureConstants.java
@@ -29,7 +29,7 @@ public final class MasterProcedureConstants {
/** Number of threads used by the procedure executor */
public static final String MASTER_PROCEDURE_THREADS = "hbase.master.procedure.threads";
- public static final int DEFAULT_MIN_MASTER_PROCEDURE_THREADS = 4;
+ public static final int DEFAULT_MIN_MASTER_PROCEDURE_THREADS = 16;
/**
* Procedure replay sanity check. In case a WAL is missing or unreadable we
http://git-wip-us.apache.org/repos/asf/hbase/blob/b2925bc0/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java
index 2cd5b08..0f1c40f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
@@ -93,12 +94,19 @@ public class MasterProcedureEnv implements ConfigurationObserver {
}
}
+ private final RSProcedureDispatcher remoteDispatcher;
private final MasterProcedureScheduler procSched;
private final MasterServices master;
public MasterProcedureEnv(final MasterServices master) {
+ this(master, new RSProcedureDispatcher(master));
+ }
+
+ public MasterProcedureEnv(final MasterServices master,
+ final RSProcedureDispatcher remoteDispatcher) {
this.master = master;
this.procSched = new MasterProcedureScheduler(master.getConfiguration());
+ this.remoteDispatcher = remoteDispatcher;
}
public User getRequestUser() {
@@ -117,6 +125,10 @@ public class MasterProcedureEnv implements ConfigurationObserver {
return master.getConfiguration();
}
+ public AssignmentManager getAssignmentManager() {
+ return master.getAssignmentManager();
+ }
+
public MasterCoprocessorHost getMasterCoprocessorHost() {
return master.getMasterCoprocessorHost();
}
@@ -125,7 +137,12 @@ public class MasterProcedureEnv implements ConfigurationObserver {
return procSched;
}
+ public RSProcedureDispatcher getRemoteDispatcher() {
+ return remoteDispatcher;
+ }
+
public boolean isRunning() {
+ if (this.master == null || this.master.getMasterProcedureExecutor() == null) return false;
return master.getMasterProcedureExecutor().isRunning();
}
@@ -134,11 +151,18 @@ public class MasterProcedureEnv implements ConfigurationObserver {
}
public boolean waitInitialized(Procedure proc) {
- return procSched.waitEvent(((HMaster)master).getInitializedEvent(), proc);
+ return procSched.waitEvent(master.getInitializedEvent(), proc);
}
public boolean waitServerCrashProcessingEnabled(Procedure proc) {
- return procSched.waitEvent(((HMaster)master).getServerCrashProcessingEnabledEvent(), proc);
+ if (master instanceof HMaster) {
+ return procSched.waitEvent(((HMaster)master).getServerCrashProcessingEnabledEvent(), proc);
+ }
+ return false;
+ }
+
+ public boolean waitFailoverCleanup(Procedure proc) {
+ return procSched.waitEvent(master.getAssignmentManager().getFailoverCleanupEvent(), proc);
}
public void setEventReady(ProcedureEvent event, boolean isReady) {
@@ -153,4 +177,4 @@ public class MasterProcedureEnv implements ConfigurationObserver {
public void onConfigurationChange(Configuration conf) {
master.getMasterProcedureExecutor().refreshConfiguration(conf);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/b2925bc0/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
index 15b557a..1410748 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
@@ -598,11 +598,13 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
return false;
// region operations are using the shared-lock on the table
// and then they will grab an xlock on the region.
- case SPLIT:
- case MERGE:
- case ASSIGN:
- case UNASSIGN:
+ case REGION_SPLIT:
+ case REGION_MERGE:
+ case REGION_ASSIGN:
+ case REGION_UNASSIGN:
case REGION_EDIT:
+ case REGION_GC:
+ case MERGED_REGIONS_GC:
return false;
default:
break;
@@ -815,7 +817,11 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
boolean hasLock = true;
final LockAndQueue[] regionLocks = new LockAndQueue[regionInfo.length];
for (int i = 0; i < regionInfo.length; ++i) {
- assert regionInfo[i].getTable().equals(table);
+ LOG.info(procedure + " " + table + " " + regionInfo[i].getRegionNameAsString());
+ assert table != null;
+ assert regionInfo[i] != null;
+ assert regionInfo[i].getTable() != null;
+ assert regionInfo[i].getTable().equals(table): regionInfo[i] + " " + procedure;
assert i == 0 || regionInfo[i] != regionInfo[i - 1] : "duplicate region: " + regionInfo[i];
regionLocks[i] = locking.getRegionLock(regionInfo[i].getEncodedName());
@@ -1254,7 +1260,12 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
*/
@VisibleForTesting
public String dumpLocks() throws IOException {
- // TODO: Refactor so we stream out locks for case when millions; i.e. take a PrintWriter
- return this.locking.toString();
+ schedLock();
+ try {
+ // TODO: Refactor so we stream out locks for case when millions; i.e. take a PrintWriter
+ return this.locking.toString();
+ } finally {
+ schedUnlock();
+ }
}
}