You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2017/05/06 03:35:18 UTC
[11/25] hbase git commit: HBASE-14614 Procedure v2 - Core Assignment
Manager (Matteo Bertozzi) Move to a new AssignmentManager,
one that describes Assignment using a State Machine built on top of
ProcedureV2 facility.
http://git-wip-us.apache.org/repos/asf/hbase/blob/ea447378/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MergeTableRegionsProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MergeTableRegionsProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MergeTableRegionsProcedure.java
deleted file mode 100644
index 3600fe0..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MergeTableRegionsProcedure.java
+++ /dev/null
@@ -1,906 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.master.procedure;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InterruptedIOException;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.MetaMutationAnnotation;
-import org.apache.hadoop.hbase.RegionLoad;
-import org.apache.hadoop.hbase.ServerLoad;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.UnknownRegionException;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.exceptions.MergeRegionException;
-import org.apache.hadoop.hbase.io.hfile.CacheConfig;
-import org.apache.hadoop.hbase.master.AssignmentManager;
-import org.apache.hadoop.hbase.master.CatalogJanitor;
-import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
-import org.apache.hadoop.hbase.master.MasterFileSystem;
-import org.apache.hadoop.hbase.master.RegionPlan;
-import org.apache.hadoop.hbase.master.RegionState;
-import org.apache.hadoop.hbase.master.RegionStates;
-import org.apache.hadoop.hbase.master.ServerManager;
-import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
-import org.apache.hadoop.hbase.regionserver.StoreFile;
-import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MergeTableRegionsState;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.FSUtils;
-
-/**
- * The procedure to Merge a region in a table.
- */
-@InterfaceAudience.Private
-public class MergeTableRegionsProcedure
- extends AbstractStateMachineTableProcedure<MergeTableRegionsState> {
- private static final Log LOG = LogFactory.getLog(MergeTableRegionsProcedure.class);
-
- private Boolean traceEnabled;
- private AssignmentManager assignmentManager;
- private int timeout;
- private ServerName regionLocation;
- private String regionsToMergeListFullName;
- private String regionsToMergeListEncodedName;
-
- private HRegionInfo [] regionsToMerge;
- private HRegionInfo mergedRegionInfo;
- private boolean forcible;
-
- public MergeTableRegionsProcedure() {
- this.traceEnabled = isTraceEnabled();
- this.assignmentManager = null;
- this.timeout = -1;
- this.regionLocation = null;
- this.regionsToMergeListFullName = null;
- this.regionsToMergeListEncodedName = null;
- }
-
- public MergeTableRegionsProcedure(
- final MasterProcedureEnv env,
- final HRegionInfo[] regionsToMerge,
- final boolean forcible) throws IOException {
- super(env);
- this.traceEnabled = isTraceEnabled();
- this.assignmentManager = getAssignmentManager(env);
- // For now, we only merge 2 regions. It could be extended to more than 2 regions in
- // the future.
- assert(regionsToMerge.length == 2);
- assert(regionsToMerge[0].getTable() == regionsToMerge[1].getTable());
- this.regionsToMerge = regionsToMerge;
- this.forcible = forcible;
-
- this.timeout = -1;
- this.regionsToMergeListFullName = getRegionsToMergeListFullNameString();
- this.regionsToMergeListEncodedName = getRegionsToMergeListEncodedNameString();
-
- // Check daughter regions and make sure that we have valid daughter regions before
- // doing the real work.
- checkDaughterRegions();
- // WARN: make sure there is no parent region of the two merging regions in
- // hbase:meta If exists, fixing up daughters would cause daughter regions(we
- // have merged one) online again when we restart master, so we should clear
- // the parent region to prevent the above case
- // Since HBASE-7721, we don't need fix up daughters any more. so here do
- // nothing
- setupMergedRegionInfo();
- }
-
- @Override
- protected Flow executeFromState(
- final MasterProcedureEnv env,
- final MergeTableRegionsState state) throws InterruptedException {
- if (isTraceEnabled()) {
- LOG.trace(this + " execute state=" + state);
- }
-
- try {
- switch (state) {
- case MERGE_TABLE_REGIONS_PREPARE:
- prepareMergeRegion(env);
- setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_MOVE_REGION_TO_SAME_RS);
- break;
- case MERGE_TABLE_REGIONS_MOVE_REGION_TO_SAME_RS:
- if (MoveRegionsToSameRS(env)) {
- setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_PRE_MERGE_OPERATION);
- } else {
- LOG.info("Cancel merging regions " + getRegionsToMergeListFullNameString()
- + ", because can't move them to the same RS");
- setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_POST_OPERATION);
- }
- break;
- case MERGE_TABLE_REGIONS_PRE_MERGE_OPERATION:
- preMergeRegions(env);
- setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_SET_MERGING_TABLE_STATE);
- break;
- case MERGE_TABLE_REGIONS_SET_MERGING_TABLE_STATE:
- setRegionStateToMerging(env);
- setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_CLOSE_REGIONS);
- break;
- case MERGE_TABLE_REGIONS_CLOSE_REGIONS:
- closeRegionsForMerge(env);
- setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_CREATE_MERGED_REGION);
- break;
- case MERGE_TABLE_REGIONS_CREATE_MERGED_REGION:
- createMergedRegion(env);
- setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_PRE_MERGE_COMMIT_OPERATION);
- break;
- case MERGE_TABLE_REGIONS_PRE_MERGE_COMMIT_OPERATION:
- preMergeRegionsCommit(env);
- setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_UPDATE_META);
- break;
- case MERGE_TABLE_REGIONS_UPDATE_META:
- updateMetaForMergedRegions(env);
- setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_POST_MERGE_COMMIT_OPERATION);
- break;
- case MERGE_TABLE_REGIONS_POST_MERGE_COMMIT_OPERATION:
- postMergeRegionsCommit(env);
- setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_OPEN_MERGED_REGION);
- break;
- case MERGE_TABLE_REGIONS_OPEN_MERGED_REGION:
- openMergedRegions(env);
- setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_POST_OPERATION);
- break;
- case MERGE_TABLE_REGIONS_POST_OPERATION:
- postCompletedMergeRegions(env);
- return Flow.NO_MORE_STATE;
- default:
- throw new UnsupportedOperationException(this + " unhandled state=" + state);
- }
- } catch (IOException e) {
- LOG.warn("Error trying to merge regions " + getRegionsToMergeListFullNameString() +
- " in the table " + getTableName() + " (in state=" + state + ")", e);
-
- setFailure("master-merge-regions", e);
- }
- return Flow.HAS_MORE_STATE;
- }
-
- @Override
- protected void rollbackState(
- final MasterProcedureEnv env,
- final MergeTableRegionsState state) throws IOException, InterruptedException {
- if (isTraceEnabled()) {
- LOG.trace(this + " rollback state=" + state);
- }
-
- try {
- switch (state) {
- case MERGE_TABLE_REGIONS_POST_OPERATION:
- case MERGE_TABLE_REGIONS_OPEN_MERGED_REGION:
- case MERGE_TABLE_REGIONS_POST_MERGE_COMMIT_OPERATION:
- case MERGE_TABLE_REGIONS_UPDATE_META:
- String msg = this + " We are in the " + state + " state."
- + " It is complicated to rollback the merge operation that region server is working on."
- + " Rollback is not supported and we should let the merge operation to complete";
- LOG.warn(msg);
- // PONR
- throw new UnsupportedOperationException(this + " unhandled state=" + state);
- case MERGE_TABLE_REGIONS_PRE_MERGE_COMMIT_OPERATION:
- break;
- case MERGE_TABLE_REGIONS_CREATE_MERGED_REGION:
- cleanupMergedRegion(env);
- break;
- case MERGE_TABLE_REGIONS_CLOSE_REGIONS:
- rollbackCloseRegionsForMerge(env);
- break;
- case MERGE_TABLE_REGIONS_SET_MERGING_TABLE_STATE:
- setRegionStateToRevertMerging(env);
- break;
- case MERGE_TABLE_REGIONS_PRE_MERGE_OPERATION:
- postRollBackMergeRegions(env);
- break;
- case MERGE_TABLE_REGIONS_MOVE_REGION_TO_SAME_RS:
- break; // nothing to rollback
- case MERGE_TABLE_REGIONS_PREPARE:
- break; // nothing to rollback
- default:
- throw new UnsupportedOperationException(this + " unhandled state=" + state);
- }
- } catch (Exception e) {
- // This will be retried. Unless there is a bug in the code,
- // this should be just a "temporary error" (e.g. network down)
- LOG.warn("Failed rollback attempt step " + state + " for merging the regions "
- + getRegionsToMergeListFullNameString() + " in table " + getTableName(), e);
- throw e;
- }
- }
-
- /*
- * Check whether we are in the state that can be rollback
- */
- @Override
- protected boolean isRollbackSupported(final MergeTableRegionsState state) {
- switch (state) {
- case MERGE_TABLE_REGIONS_POST_OPERATION:
- case MERGE_TABLE_REGIONS_OPEN_MERGED_REGION:
- case MERGE_TABLE_REGIONS_POST_MERGE_COMMIT_OPERATION:
- case MERGE_TABLE_REGIONS_UPDATE_META:
- // It is not safe to rollback if we reach to these states.
- return false;
- default:
- break;
- }
- return true;
- }
-
- @Override
- protected MergeTableRegionsState getState(final int stateId) {
- return MergeTableRegionsState.forNumber(stateId);
- }
-
- @Override
- protected int getStateId(final MergeTableRegionsState state) {
- return state.getNumber();
- }
-
- @Override
- protected MergeTableRegionsState getInitialState() {
- return MergeTableRegionsState.MERGE_TABLE_REGIONS_PREPARE;
- }
-
- @Override
- public void serializeStateData(final OutputStream stream) throws IOException {
- super.serializeStateData(stream);
-
- MasterProcedureProtos.MergeTableRegionsStateData.Builder mergeTableRegionsMsg =
- MasterProcedureProtos.MergeTableRegionsStateData.newBuilder()
- .setUserInfo(MasterProcedureUtil.toProtoUserInfo(getUser()))
- .setMergedRegionInfo(HRegionInfo.convert(mergedRegionInfo))
- .setForcible(forcible);
- for (HRegionInfo hri: regionsToMerge) {
- mergeTableRegionsMsg.addRegionInfo(HRegionInfo.convert(hri));
- }
- mergeTableRegionsMsg.build().writeDelimitedTo(stream);
- }
-
- @Override
- public void deserializeStateData(final InputStream stream) throws IOException {
- super.deserializeStateData(stream);
-
- MasterProcedureProtos.MergeTableRegionsStateData mergeTableRegionsMsg =
- MasterProcedureProtos.MergeTableRegionsStateData.parseDelimitedFrom(stream);
- setUser(MasterProcedureUtil.toUserInfo(mergeTableRegionsMsg.getUserInfo()));
-
- assert(mergeTableRegionsMsg.getRegionInfoCount() == 2);
- regionsToMerge = new HRegionInfo[mergeTableRegionsMsg.getRegionInfoCount()];
- for (int i = 0; i < regionsToMerge.length; i++) {
- regionsToMerge[i] = HRegionInfo.convert(mergeTableRegionsMsg.getRegionInfo(i));
- }
-
- mergedRegionInfo = HRegionInfo.convert(mergeTableRegionsMsg.getMergedRegionInfo());
- }
-
- @Override
- public void toStringClassDetails(StringBuilder sb) {
- sb.append(getClass().getSimpleName());
- sb.append(" (table=");
- sb.append(getTableName());
- sb.append(" regions=");
- sb.append(getRegionsToMergeListFullNameString());
- sb.append(" forcible=");
- sb.append(forcible);
- sb.append(")");
- }
-
- @Override
- protected LockState acquireLock(final MasterProcedureEnv env) {
- if (env.waitInitialized(this)) {
- return LockState.LOCK_EVENT_WAIT;
- }
- return env.getProcedureScheduler().waitRegions(this, getTableName(),
- regionsToMerge[0], regionsToMerge[1])?
- LockState.LOCK_EVENT_WAIT: LockState.LOCK_ACQUIRED;
- }
-
- @Override
- protected void releaseLock(final MasterProcedureEnv env) {
- env.getProcedureScheduler().wakeRegions(this, getTableName(),
- regionsToMerge[0], regionsToMerge[1]);
- }
-
- @Override
- public TableName getTableName() {
- return regionsToMerge[0].getTable();
- }
-
- @Override
- public TableOperationType getTableOperationType() {
- return TableOperationType.MERGE;
- }
-
- /**
- * check daughter regions
- * @throws IOException
- */
- private void checkDaughterRegions() throws IOException {
- // Note: the following logic assumes that we only have 2 regions to merge. In the future,
- // if we want to extend to more than 2 regions, the code needs to modify a little bit.
- //
- if (regionsToMerge[0].getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID ||
- regionsToMerge[1].getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) {
- throw new MergeRegionException("Can't merge non-default replicas");
- }
-
- if (!HRegionInfo.areAdjacent(regionsToMerge[0], regionsToMerge[1])) {
- String msg = "Trying to merge non-adjacent regions "
- + getRegionsToMergeListFullNameString() + " where forcible = " + forcible;
- LOG.warn(msg);
- if (!forcible) {
- throw new DoNotRetryIOException(msg);
- }
- }
- }
-
- /**
- * Prepare merge and do some check
- * @param env MasterProcedureEnv
- * @throws IOException
- */
- private void prepareMergeRegion(final MasterProcedureEnv env) throws IOException {
- // Note: the following logic assumes that we only have 2 regions to merge. In the future,
- // if we want to extend to more than 2 regions, the code needs to modify a little bit.
- //
- CatalogJanitor catalogJanitor = env.getMasterServices().getCatalogJanitor();
- boolean regionAHasMergeQualifier = !catalogJanitor.cleanMergeQualifier(regionsToMerge[0]);
- if (regionAHasMergeQualifier
- || !catalogJanitor.cleanMergeQualifier(regionsToMerge[1])) {
- String msg = "Skip merging regions " + getRegionsToMergeListFullNameString()
- + ", because region "
- + (regionAHasMergeQualifier ? regionsToMerge[0].getEncodedName() : regionsToMerge[1]
- .getEncodedName()) + " has merge qualifier";
- LOG.warn(msg);
- throw new MergeRegionException(msg);
- }
-
- RegionStates regionStates = getAssignmentManager(env).getRegionStates();
- RegionState regionStateA = regionStates.getRegionState(regionsToMerge[0].getEncodedName());
- RegionState regionStateB = regionStates.getRegionState(regionsToMerge[1].getEncodedName());
- if (regionStateA == null || regionStateB == null) {
- throw new UnknownRegionException(
- regionStateA == null ?
- regionsToMerge[0].getEncodedName() : regionsToMerge[1].getEncodedName());
- }
-
- if (!regionStateA.isOpened() || !regionStateB.isOpened()) {
- throw new MergeRegionException(
- "Unable to merge regions not online " + regionStateA + ", " + regionStateB);
- }
- }
-
- /**
- * Create merged region info through the specified two regions
- */
- private void setupMergedRegionInfo() {
- long rid = EnvironmentEdgeManager.currentTime();
- // Regionid is timestamp. Merged region's id can't be less than that of
- // merging regions else will insert at wrong location in hbase:meta
- if (rid < regionsToMerge[0].getRegionId() || rid < regionsToMerge[1].getRegionId()) {
- LOG.warn("Clock skew; merging regions id are " + regionsToMerge[0].getRegionId()
- + " and " + regionsToMerge[1].getRegionId() + ", but current time here is " + rid);
- rid = Math.max(regionsToMerge[0].getRegionId(), regionsToMerge[1].getRegionId()) + 1;
- }
-
- byte[] startKey = null;
- byte[] endKey = null;
- // Choose the smaller as start key
- if (regionsToMerge[0].compareTo(regionsToMerge[1]) <= 0) {
- startKey = regionsToMerge[0].getStartKey();
- } else {
- startKey = regionsToMerge[1].getStartKey();
- }
- // Choose the bigger as end key
- if (Bytes.equals(regionsToMerge[0].getEndKey(), HConstants.EMPTY_BYTE_ARRAY)
- || (!Bytes.equals(regionsToMerge[1].getEndKey(), HConstants.EMPTY_BYTE_ARRAY)
- && Bytes.compareTo(regionsToMerge[0].getEndKey(), regionsToMerge[1].getEndKey()) > 0)) {
- endKey = regionsToMerge[0].getEndKey();
- } else {
- endKey = regionsToMerge[1].getEndKey();
- }
-
- // Merged region is sorted between two merging regions in META
- mergedRegionInfo = new HRegionInfo(getTableName(), startKey, endKey, false, rid);
- }
-
- /**
- * Move all regions to the same region server
- * @param env MasterProcedureEnv
- * @return whether target regions hosted by the same RS
- * @throws IOException
- */
- private boolean MoveRegionsToSameRS(final MasterProcedureEnv env) throws IOException {
- // Make sure regions are on the same regionserver before send merge
- // regions request to region server.
- //
- boolean onSameRS = isRegionsOnTheSameServer(env);
- if (!onSameRS) {
- // Note: the following logic assumes that we only have 2 regions to merge. In the future,
- // if we want to extend to more than 2 regions, the code needs to modify a little bit.
- //
- RegionStates regionStates = getAssignmentManager(env).getRegionStates();
- ServerName regionLocation2 = regionStates.getRegionServerOfRegion(regionsToMerge[1]);
-
- RegionLoad loadOfRegionA = getRegionLoad(env, regionLocation, regionsToMerge[0]);
- RegionLoad loadOfRegionB = getRegionLoad(env, regionLocation2, regionsToMerge[1]);
- if (loadOfRegionA != null && loadOfRegionB != null
- && loadOfRegionA.getRequestsCount() < loadOfRegionB.getRequestsCount()) {
- // switch regionsToMerge[0] and regionsToMerge[1]
- HRegionInfo tmpRegion = this.regionsToMerge[0];
- this.regionsToMerge[0] = this.regionsToMerge[1];
- this.regionsToMerge[1] = tmpRegion;
- ServerName tmpLocation = regionLocation;
- regionLocation = regionLocation2;
- regionLocation2 = tmpLocation;
- }
-
- long startTime = EnvironmentEdgeManager.currentTime();
-
- RegionPlan regionPlan = new RegionPlan(regionsToMerge[1], regionLocation2, regionLocation);
- LOG.info("Moving regions to same server for merge: " + regionPlan.toString());
- getAssignmentManager(env).balance(regionPlan);
- do {
- try {
- Thread.sleep(20);
- // Make sure check RIT first, then get region location, otherwise
- // we would make a wrong result if region is online between getting
- // region location and checking RIT
- boolean isRIT = regionStates.isRegionInTransition(regionsToMerge[1]);
- regionLocation2 = regionStates.getRegionServerOfRegion(regionsToMerge[1]);
- onSameRS = regionLocation.equals(regionLocation2);
- if (onSameRS || !isRIT) {
- // Regions are on the same RS, or regionsToMerge[1] is not in
- // RegionInTransition any more
- break;
- }
- } catch (InterruptedException e) {
- InterruptedIOException iioe = new InterruptedIOException();
- iioe.initCause(e);
- throw iioe;
- }
- } while ((EnvironmentEdgeManager.currentTime() - startTime) <= getTimeout(env));
- }
- return onSameRS;
- }
-
- /**
- * Pre merge region action
- * @param env MasterProcedureEnv
- **/
- private void preMergeRegions(final MasterProcedureEnv env) throws IOException {
- final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
- if (cpHost != null) {
- boolean ret = cpHost.preMergeRegionsAction(regionsToMerge, getUser());
- if (ret) {
- throw new IOException(
- "Coprocessor bypassing regions " + getRegionsToMergeListFullNameString() + " merge.");
- }
- }
- }
-
- /**
- * Action after rollback a merge table regions action.
- * @param env MasterProcedureEnv
- * @throws IOException
- */
- private void postRollBackMergeRegions(final MasterProcedureEnv env) throws IOException {
- final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
- if (cpHost != null) {
- cpHost.postRollBackMergeRegionsAction(regionsToMerge, getUser());
- }
- }
-
- /**
- * Set the region states to MERGING state
- * @param env MasterProcedureEnv
- * @throws IOException
- */
- public void setRegionStateToMerging(final MasterProcedureEnv env) throws IOException {
- RegionStateTransition.Builder transition = RegionStateTransition.newBuilder();
- transition.setTransitionCode(TransitionCode.READY_TO_MERGE);
- transition.addRegionInfo(HRegionInfo.convert(mergedRegionInfo));
- transition.addRegionInfo(HRegionInfo.convert(regionsToMerge[0]));
- transition.addRegionInfo(HRegionInfo.convert(regionsToMerge[1]));
- if (env.getMasterServices().getAssignmentManager().onRegionTransition(
- getServerName(env), transition.build()) != null) {
- throw new IOException("Failed to update region state to MERGING for "
- + getRegionsToMergeListFullNameString());
- }
- }
-
- /**
- * Rollback the region state change
- * @param env MasterProcedureEnv
- * @throws IOException
- */
- private void setRegionStateToRevertMerging(final MasterProcedureEnv env) throws IOException {
- RegionStateTransition.Builder transition = RegionStateTransition.newBuilder();
- transition.setTransitionCode(TransitionCode.MERGE_REVERTED);
- transition.addRegionInfo(HRegionInfo.convert(mergedRegionInfo));
- transition.addRegionInfo(HRegionInfo.convert(regionsToMerge[0]));
- transition.addRegionInfo(HRegionInfo.convert(regionsToMerge[1]));
- String msg = env.getMasterServices().getAssignmentManager().onRegionTransition(
- getServerName(env), transition.build());
- if (msg != null) {
- // If daughter regions are online, the msg is coming from RPC retry. Ignore it.
- RegionStates regionStates = getAssignmentManager(env).getRegionStates();
- if (!regionStates.isRegionOnline(regionsToMerge[0]) ||
- !regionStates.isRegionOnline(regionsToMerge[1])) {
- throw new IOException("Failed to update region state for "
- + getRegionsToMergeListFullNameString()
- + " as part of operation for reverting merge. Error message: " + msg);
- }
- }
- }
-
- /**
- * Create merged region
- * @param env MasterProcedureEnv
- * @throws IOException
- */
- private void createMergedRegion(final MasterProcedureEnv env) throws IOException {
- final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
- final Path tabledir = FSUtils.getTableDir(mfs.getRootDir(), regionsToMerge[0].getTable());
- final FileSystem fs = mfs.getFileSystem();
- HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem(
- env.getMasterConfiguration(), fs, tabledir, regionsToMerge[0], false);
- regionFs.createMergesDir();
-
- mergeStoreFiles(env, regionFs, regionFs.getMergesDir());
- HRegionFileSystem regionFs2 = HRegionFileSystem.openRegionFromFileSystem(
- env.getMasterConfiguration(), fs, tabledir, regionsToMerge[1], false);
- mergeStoreFiles(env, regionFs2, regionFs.getMergesDir());
-
- regionFs.commitMergedRegion(mergedRegionInfo);
- }
-
- /**
- * Create reference file(s) of merging regions under the merges directory
- * @param env MasterProcedureEnv
- * @param regionFs region file system
- * @param mergedDir the temp directory of merged region
- * @throws IOException
- */
- private void mergeStoreFiles(
- final MasterProcedureEnv env, final HRegionFileSystem regionFs, final Path mergedDir)
- throws IOException {
- final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
- final Configuration conf = env.getMasterConfiguration();
- final HTableDescriptor htd = env.getMasterServices().getTableDescriptors().get(getTableName());
-
- for (String family: regionFs.getFamilies()) {
- final HColumnDescriptor hcd = htd.getFamily(family.getBytes());
- final Collection<StoreFileInfo> storeFiles = regionFs.getStoreFiles(family);
-
- if (storeFiles != null && storeFiles.size() > 0) {
- final CacheConfig cacheConf = new CacheConfig(conf, hcd);
- for (StoreFileInfo storeFileInfo: storeFiles) {
- // Create reference file(s) of the region in mergedDir
- regionFs.mergeStoreFile(mergedRegionInfo, family, new StoreFile(mfs.getFileSystem(),
- storeFileInfo, conf, cacheConf, hcd.getBloomFilterType(), true),
- mergedDir);
- }
- }
- }
- }
-
- /**
- * Clean up merged region
- * @param env MasterProcedureEnv
- * @throws IOException
- */
- private void cleanupMergedRegion(final MasterProcedureEnv env) throws IOException {
- final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
- final Path tabledir = FSUtils.getTableDir(mfs.getRootDir(), regionsToMerge[0].getTable());
- final FileSystem fs = mfs.getFileSystem();
- HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem(
- env.getMasterConfiguration(), fs, tabledir, regionsToMerge[0], false);
- regionFs.cleanupMergedRegion(mergedRegionInfo);
- }
-
- /**
- * RPC to region server that host the regions to merge, ask for close these regions
- * @param env MasterProcedureEnv
- * @throws IOException
- */
- private void closeRegionsForMerge(final MasterProcedureEnv env) throws IOException {
- boolean success = env.getMasterServices().getServerManager().sendRegionCloseForSplitOrMerge(
- getServerName(env), regionsToMerge[0], regionsToMerge[1]);
- if (!success) {
- throw new IOException("Close regions " + getRegionsToMergeListFullNameString()
- + " for merging failed. Check region server log for more details.");
- }
- }
-
- /**
- * Rollback close regions
- * @param env MasterProcedureEnv
- **/
- private void rollbackCloseRegionsForMerge(final MasterProcedureEnv env) throws IOException {
- // Check whether the region is closed; if so, open it in the same server
- RegionStates regionStates = getAssignmentManager(env).getRegionStates();
- for(int i = 1; i < regionsToMerge.length; i++) {
- RegionState state = regionStates.getRegionState(regionsToMerge[i]);
- if (state != null && (state.isClosing() || state.isClosed())) {
- env.getMasterServices().getServerManager().sendRegionOpen(
- getServerName(env),
- regionsToMerge[i],
- ServerName.EMPTY_SERVER_LIST);
- }
- }
- }
-
- /**
- * Post merge region action
- * @param env MasterProcedureEnv
- **/
- private void preMergeRegionsCommit(final MasterProcedureEnv env) throws IOException {
- final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
- if (cpHost != null) {
- @MetaMutationAnnotation
- final List<Mutation> metaEntries = new ArrayList<>();
- boolean ret = cpHost.preMergeRegionsCommit(regionsToMerge, metaEntries, getUser());
-
- if (ret) {
- throw new IOException(
- "Coprocessor bypassing regions " + getRegionsToMergeListFullNameString() + " merge.");
- }
- try {
- for (Mutation p : metaEntries) {
- HRegionInfo.parseRegionName(p.getRow());
- }
- } catch (IOException e) {
- LOG.error("Row key of mutation from coprocessor is not parsable as region name."
- + "Mutations from coprocessor should only be for hbase:meta table.", e);
- throw e;
- }
- }
- }
-
- /**
- * Add merged region to META and delete original regions.
- * @param env MasterProcedureEnv
- * @throws IOException
- */
- private void updateMetaForMergedRegions(final MasterProcedureEnv env) throws IOException {
- RegionStateTransition.Builder transition = RegionStateTransition.newBuilder();
- transition.setTransitionCode(TransitionCode.MERGE_PONR);
- transition.addRegionInfo(HRegionInfo.convert(mergedRegionInfo));
- transition.addRegionInfo(HRegionInfo.convert(regionsToMerge[0]));
- transition.addRegionInfo(HRegionInfo.convert(regionsToMerge[1]));
- // Add merged region and delete original regions
- // as an atomic update. See HBASE-7721. This update to hbase:meta makes the region
- // will determine whether the region is merged or not in case of failures.
- if (env.getMasterServices().getAssignmentManager().onRegionTransition(
- getServerName(env), transition.build()) != null) {
- throw new IOException("Failed to update meta to add merged region that merges "
- + getRegionsToMergeListFullNameString());
- }
- }
-
- /**
- * Post merge region action
- * @param env MasterProcedureEnv
- **/
- private void postMergeRegionsCommit(final MasterProcedureEnv env) throws IOException {
- final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
- if (cpHost != null) {
- cpHost.postMergeRegionsCommit(regionsToMerge, mergedRegionInfo, getUser());
- }
- }
-
- /**
- * Assign merged region
- * @param env MasterProcedureEnv
- * @throws IOException
- * @throws InterruptedException
- **/
- private void openMergedRegions(final MasterProcedureEnv env)
- throws IOException, InterruptedException {
- // Check whether the merged region is already opened; if so,
- // this is retry and we should just ignore.
- RegionState regionState =
- getAssignmentManager(env).getRegionStates().getRegionState(mergedRegionInfo);
- if (regionState != null && regionState.isOpened()) {
- LOG.info("Skip opening merged region " + mergedRegionInfo.getRegionNameAsString()
- + " as it is already opened.");
- return;
- }
-
- // TODO: The new AM should provide an API to force assign the merged region to the same RS
- // as daughter regions; if the RS is unavailable, then assign to a different RS.
- env.getMasterServices().getAssignmentManager().assignMergedRegion(
- mergedRegionInfo, regionsToMerge[0], regionsToMerge[1]);
- }
-
- /**
- * Post merge region action
- * @param env MasterProcedureEnv
- **/
- private void postCompletedMergeRegions(final MasterProcedureEnv env) throws IOException {
- final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
- if (cpHost != null) {
- cpHost.postCompletedMergeRegionsAction(regionsToMerge, mergedRegionInfo, getUser());
- }
- }
-
- private RegionLoad getRegionLoad(
- final MasterProcedureEnv env,
- final ServerName sn,
- final HRegionInfo hri) {
- ServerManager serverManager = env.getMasterServices().getServerManager();
- ServerLoad load = serverManager.getLoad(sn);
- if (load != null) {
- Map<byte[], RegionLoad> regionsLoad = load.getRegionsLoad();
- if (regionsLoad != null) {
- return regionsLoad.get(hri.getRegionName());
- }
- }
- return null;
- }
-
- /**
- * The procedure could be restarted from a different machine. If the variable is null, we need to
- * retrieve it.
- * @param env MasterProcedureEnv
- * @return whether target regions hosted by the same RS
- */
- private boolean isRegionsOnTheSameServer(final MasterProcedureEnv env) throws IOException{
- Boolean onSameRS = true;
- int i = 0;
- RegionStates regionStates = getAssignmentManager(env).getRegionStates();
- regionLocation = regionStates.getRegionServerOfRegion(regionsToMerge[i]);
- if (regionLocation != null) {
- for(i = 1; i < regionsToMerge.length; i++) {
- ServerName regionLocation2 = regionStates.getRegionServerOfRegion(regionsToMerge[i]);
- if (regionLocation2 != null) {
- if (onSameRS) {
- onSameRS = regionLocation.equals(regionLocation2);
- }
- } else {
- // At least one region is not online, merge will fail, no need to continue.
- break;
- }
- }
- if (i == regionsToMerge.length) {
- // Finish checking all regions, return the result;
- return onSameRS;
- }
- }
-
- // If reaching here, at least one region is not online.
- String msg = "Skip merging regions " + getRegionsToMergeListFullNameString() +
- ", because region " + regionsToMerge[i].getEncodedName() + " is not online now.";
- LOG.warn(msg);
- throw new IOException(msg);
- }
-
- /**
- * The procedure could be restarted from a different machine. If the variable is null, we need to
- * retrieve it.
- * @param env MasterProcedureEnv
- * @return assignmentManager
- */
- private AssignmentManager getAssignmentManager(final MasterProcedureEnv env) {
- if (assignmentManager == null) {
- assignmentManager = env.getMasterServices().getAssignmentManager();
- }
- return assignmentManager;
- }
-
- /**
- * The procedure could be restarted from a different machine. If the variable is null, we need to
- * retrieve it.
- * @param env MasterProcedureEnv
- * @return timeout value
- */
- private int getTimeout(final MasterProcedureEnv env) {
- if (timeout == -1) {
- timeout = env.getMasterConfiguration().getInt(
- "hbase.master.regionmerge.timeout", regionsToMerge.length * 60 * 1000);
- }
- return timeout;
- }
-
- /**
- * The procedure could be restarted from a different machine. If the variable is null, we need to
- * retrieve it.
- * @param env MasterProcedureEnv
- * @return serverName
- */
- private ServerName getServerName(final MasterProcedureEnv env) {
- if (regionLocation == null) {
- regionLocation =
- getAssignmentManager(env).getRegionStates().getRegionServerOfRegion(regionsToMerge[0]);
- }
- return regionLocation;
- }
-
- /**
- * The procedure could be restarted from a different machine. If the variable is null, we need to
- * retrieve it.
- * @param fullName whether return only encoded name
- * @return region names in a list
- */
- private String getRegionsToMergeListFullNameString() {
- if (regionsToMergeListFullName == null) {
- StringBuilder sb = new StringBuilder("[");
- int i = 0;
- while(i < regionsToMerge.length - 1) {
- sb.append(regionsToMerge[i].getRegionNameAsString() + ", ");
- i++;
- }
- sb.append(regionsToMerge[i].getRegionNameAsString() + " ]");
- regionsToMergeListFullName = sb.toString();
- }
- return regionsToMergeListFullName;
- }
-
- /**
- * The procedure could be restarted from a different machine. If the variable is null, we need to
- * retrieve it.
- * @return encoded region names
- */
- private String getRegionsToMergeListEncodedNameString() {
- if (regionsToMergeListEncodedName == null) {
- StringBuilder sb = new StringBuilder("[");
- int i = 0;
- while(i < regionsToMerge.length - 1) {
- sb.append(regionsToMerge[i].getEncodedName() + ", ");
- i++;
- }
- sb.append(regionsToMerge[i].getEncodedName() + " ]");
- regionsToMergeListEncodedName = sb.toString();
- }
- return regionsToMergeListEncodedName;
- }
-
- /**
- * The procedure could be restarted from a different machine. If the variable is null, we need to
- * retrieve it.
- * @return traceEnabled
- */
- private Boolean isTraceEnabled() {
- if (traceEnabled == null) {
- traceEnabled = LOG.isTraceEnabled();
- }
- return traceEnabled;
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ea447378/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyColumnFamilyProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyColumnFamilyProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyColumnFamilyProcedure.java
index 52bb4d5..622c19f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyColumnFamilyProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyColumnFamilyProcedure.java
@@ -21,17 +21,14 @@ package org.apache.hadoop.hbase.master.procedure;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.InvalidFamilyOperationException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
@@ -97,7 +94,9 @@ public class ModifyColumnFamilyProcedure
setNextState(ModifyColumnFamilyState.MODIFY_COLUMN_FAMILY_REOPEN_ALL_REGIONS);
break;
case MODIFY_COLUMN_FAMILY_REOPEN_ALL_REGIONS:
- reOpenAllRegionsIfTableIsOnline(env);
+ if (env.getAssignmentManager().isTableEnabled(getTableName())) {
+ addChildProcedure(env.getAssignmentManager().createReopenProcedures(getTableName()));
+ }
return Flow.NO_MORE_STATE;
default:
throw new UnsupportedOperationException(this + " unhandled state=" + state);
@@ -265,7 +264,8 @@ public class ModifyColumnFamilyProcedure
env.getMasterServices().getTableDescriptors().add(unmodifiedHTableDescriptor);
// Make sure regions are opened after table descriptor is updated.
- reOpenAllRegionsIfTableIsOnline(env);
+ //reOpenAllRegionsIfTableIsOnline(env);
+ // TODO: NUKE ROLLBACK!!!!
}
/**
@@ -281,26 +281,6 @@ public class ModifyColumnFamilyProcedure
}
/**
- * Last action from the procedure - executed when online schema change is supported.
- * @param env MasterProcedureEnv
- * @throws IOException
- */
- private void reOpenAllRegionsIfTableIsOnline(final MasterProcedureEnv env) throws IOException {
- // This operation only run when the table is enabled.
- if (!env.getMasterServices().getTableStateManager()
- .isTableState(getTableName(), TableState.State.ENABLED)) {
- return;
- }
-
- List<HRegionInfo> regionInfoList = ProcedureSyncWait.getRegionsFromMeta(env, getTableName());
- if (MasterDDLOperationHelper.reOpenAllRegions(env, getTableName(), regionInfoList)) {
- LOG.info("Completed add column family operation on table " + getTableName());
- } else {
- LOG.warn("Error on reopening the regions on table " + getTableName());
- }
- }
-
- /**
* The procedure could be restarted from a different machine. If the variable is null, we need to
* retrieve it.
* @return traceEnabled
http://git-wip-us.apache.org/repos/asf/hbase/blob/ea447378/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java
index 6a70f62..20a6a03 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java
@@ -120,7 +120,10 @@ public class ModifyTableProcedure
setNextState(ModifyTableState.MODIFY_TABLE_REOPEN_ALL_REGIONS);
break;
case MODIFY_TABLE_REOPEN_ALL_REGIONS:
- reOpenAllRegionsIfTableIsOnline(env);
+ if (env.getAssignmentManager().isTableEnabled(getTableName())) {
+ addChildProcedure(env.getAssignmentManager()
+ .createReopenProcedures(getRegionInfoList(env)));
+ }
return Flow.NO_MORE_STATE;
default:
throw new UnsupportedOperationException("unhandled state=" + state);
@@ -299,7 +302,8 @@ public class ModifyTableProcedure
deleteFromFs(env, modifiedHTableDescriptor, unmodifiedHTableDescriptor);
// Make sure regions are opened after table descriptor is updated.
- reOpenAllRegionsIfTableIsOnline(env);
+ //reOpenAllRegionsIfTableIsOnline(env);
+ // TODO: NUKE ROLLBACK!!!!
}
/**
@@ -374,25 +378,6 @@ public class ModifyTableProcedure
}
/**
- * Last action from the procedure - executed when online schema change is supported.
- * @param env MasterProcedureEnv
- * @throws IOException
- */
- private void reOpenAllRegionsIfTableIsOnline(final MasterProcedureEnv env) throws IOException {
- // This operation only run when the table is enabled.
- if (!env.getMasterServices().getTableStateManager()
- .isTableState(getTableName(), TableState.State.ENABLED)) {
- return;
- }
-
- if (MasterDDLOperationHelper.reOpenAllRegions(env, getTableName(), getRegionInfoList(env))) {
- LOG.info("Completed modify table operation on table " + getTableName());
- } else {
- LOG.warn("Error on reopening the regions on table " + getTableName());
- }
- }
-
- /**
* The procedure could be restarted from a different machine. If the variable is null, we need to
* retrieve it.
* @return traceEnabled whether the trace is enabled
@@ -430,7 +415,8 @@ public class ModifyTableProcedure
private List<HRegionInfo> getRegionInfoList(final MasterProcedureEnv env) throws IOException {
if (regionInfoList == null) {
- regionInfoList = ProcedureSyncWait.getRegionsFromMeta(env, getTableName());
+ regionInfoList = env.getAssignmentManager().getRegionStates()
+ .getRegionsOfTable(getTableName());
}
return regionInfoList;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ea447378/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java
index 3777c79..21bd6c8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java
@@ -21,30 +21,26 @@ package org.apache.hadoop.hbase.master.procedure;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CoordinatedStateException;
import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException;
import org.apache.hadoop.hbase.ProcedureInfo;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
-import org.apache.hadoop.hbase.master.AssignmentManager;
-import org.apache.hadoop.hbase.master.RegionState.State;
-import org.apache.hadoop.hbase.master.RegionStates;
-import org.apache.hadoop.hbase.master.ServerManager;
+import org.apache.hadoop.hbase.master.assignment.RegionStates;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.quotas.MasterQuotaManager;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
/**
* Helper to synchronously wait on conditions.
@@ -64,19 +60,93 @@ public final class ProcedureSyncWait {
T evaluate() throws IOException;
}
+  /**
+   * {@link Future} over the outcome of the procedure identified by {@code procId}.
+   * Waiting is delegated to {@code waitForProcedureToComplete}. The first wait that
+   * succeeds caches its result, so {@link #isDone()} turns true and later {@code get}
+   * calls return immediately. Cancellation is not supported.
+   */
+  private static class ProcedureFuture implements Future<byte[]> {
+    private final ProcedureExecutor<MasterProcedureEnv> procExec;
+    private final long procId;
+
+    // hasResult is tracked separately from result so that a completed wait is
+    // remembered whatever value it produced.
+    private boolean hasResult = false;
+    private byte[] result = null;
+
+    public ProcedureFuture(ProcedureExecutor<MasterProcedureEnv> procExec, long procId) {
+      this.procExec = procExec;
+      this.procId = procId;
+    }
+
+    /** Cancellation is not supported. @return always false */
+    @Override
+    public boolean cancel(boolean mayInterruptIfRunning) { return false; }
+
+    /** @return always false, since {@link #cancel(boolean)} never cancels */
+    @Override
+    public boolean isCancelled() { return false; }
+
+    /** @return true once one of the {@code get} methods has obtained the result */
+    @Override
+    public boolean isDone() { return hasResult; }
+
+    /**
+     * Waits for the procedure using {@code Long.MAX_VALUE} as the timeout argument.
+     * @return the result handed back by {@code waitForProcedureToComplete}
+     * @throws ExecutionException wrapping any exception raised while waiting
+     */
+    @Override
+    public byte[] get() throws InterruptedException, ExecutionException {
+      if (hasResult) return result;
+      try {
+        // Cache the outcome exactly as the timed variant does; without this isDone()
+        // stayed false after a successful get() and every call waited again.
+        result = waitForProcedureToComplete(procExec, procId, Long.MAX_VALUE);
+        hasResult = true;
+        return result;
+      } catch (Exception e) {
+        throw new ExecutionException(e);
+      }
+    }
+
+    /**
+     * Waits for the procedure, passing {@code timeout} converted to milliseconds.
+     * @return the result handed back by {@code waitForProcedureToComplete}
+     * @throws TimeoutException if the wait raised a {@link TimeoutIOException}
+     * @throws ExecutionException wrapping any other exception raised while waiting
+     */
+    @Override
+    public byte[] get(long timeout, TimeUnit unit)
+        throws InterruptedException, ExecutionException, TimeoutException {
+      if (hasResult) return result;
+      try {
+        result = waitForProcedureToComplete(procExec, procId, unit.toMillis(timeout));
+        hasResult = true;
+        return result;
+      } catch (TimeoutIOException e) {
+        throw new TimeoutException(e.getMessage());
+      } catch (Exception e) {
+        throw new ExecutionException(e);
+      }
+    }
+  }
+
+  /**
+   * Hands {@code proc} to the executor, unless it is no longer in the initializing
+   * state, and returns a {@link Future} that waits on its procedure id.
+   * @param procExec executor that runs the procedure
+   * @param proc procedure to submit
+   * @return a future bound to {@code proc.getProcId()}
+   */
+  public static Future<byte[]> submitProcedure(final ProcedureExecutor<MasterProcedureEnv> procExec,
+      final Procedure proc) {
+    final boolean notSubmittedYet = proc.isInitializing();
+    if (notSubmittedYet) {
+      procExec.submitProcedure(proc);
+    }
+    final long submittedId = proc.getProcId();
+    return new ProcedureFuture(procExec, submittedId);
+  }
+
public static byte[] submitAndWaitProcedure(ProcedureExecutor<MasterProcedureEnv> procExec,
final Procedure proc) throws IOException {
- long procId = procExec.submitProcedure(proc);
- return waitForProcedureToComplete(procExec, procId);
+ if (proc.isInitializing()) {
+ procExec.submitProcedure(proc);
+ }
+ return waitForProcedureToCompleteIOE(procExec, proc.getProcId(), Long.MAX_VALUE);
}
- private static byte[] waitForProcedureToComplete(ProcedureExecutor<MasterProcedureEnv> procExec,
- final long procId) throws IOException {
- while (!procExec.isFinished(procId) && procExec.isRunning()) {
- // TODO: add a config to make it tunable
- // Dev Consideration: are we waiting forever, or we can set up some timeout value?
- Threads.sleepWithoutInterrupt(250);
+ public static byte[] waitForProcedureToCompleteIOE(
+ final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId, final long timeout)
+ throws IOException {
+ try {
+ return waitForProcedureToComplete(procExec, procId, timeout);
+ } catch (IOException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new IOException(e);
}
+ }
+
+ public static byte[] waitForProcedureToComplete(
+ final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId, final long timeout)
+ throws IOException {
+ waitFor(procExec.getEnvironment(), "procId=" + procId,
+ new ProcedureSyncWait.Predicate<Boolean>() {
+ @Override
+ public Boolean evaluate() throws IOException {
+ return !procExec.isRunning() || procExec.isFinished(procId);
+ }
+ }
+ );
+
ProcedureInfo result = procExec.getResult(procId);
if (result != null) {
if (result.isFailed()) {
@@ -104,6 +174,7 @@ public final class ProcedureSyncWait {
public static <T> T waitFor(MasterProcedureEnv env, long waitTime, long waitingTimeForEvents,
String purpose, Predicate<T> predicate) throws IOException {
final long done = EnvironmentEdgeManager.currentTime() + waitTime;
+ boolean logged = false;
do {
T result = predicate.evaluate();
if (result != null && !result.equals(Boolean.FALSE)) {
@@ -115,7 +186,12 @@ public final class ProcedureSyncWait {
LOG.warn("Interrupted while sleeping, waiting on " + purpose);
throw (InterruptedIOException)new InterruptedIOException().initCause(e);
}
- LOG.debug("Waiting on " + purpose);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("waitFor " + purpose);
+ } else {
+ if (!logged) LOG.debug("waitFor " + purpose);
+ }
+ logged = true;
} while (EnvironmentEdgeManager.currentTime() < done && env.isRunning());
throw new TimeoutIOException("Timed out while waiting on " + purpose);
@@ -133,44 +209,14 @@ public final class ProcedureSyncWait {
}
}
- protected static void waitRegionServers(final MasterProcedureEnv env) throws IOException {
- final ServerManager sm = env.getMasterServices().getServerManager();
- ProcedureSyncWait.waitFor(env, "server to assign region(s)",
- new ProcedureSyncWait.Predicate<Boolean>() {
- @Override
- public Boolean evaluate() throws IOException {
- List<ServerName> servers = sm.createDestinationServersList();
- return servers != null && !servers.isEmpty();
- }
- });
- }
-
- protected static List<HRegionInfo> getRegionsFromMeta(final MasterProcedureEnv env,
- final TableName tableName) throws IOException {
- return ProcedureSyncWait.waitFor(env, "regions of table=" + tableName + " from meta",
- new ProcedureSyncWait.Predicate<List<HRegionInfo>>() {
- @Override
- public List<HRegionInfo> evaluate() throws IOException {
- if (TableName.META_TABLE_NAME.equals(tableName)) {
- return new MetaTableLocator().getMetaRegions(env.getMasterServices().getZooKeeper());
- }
- return MetaTableAccessor.getTableRegions(env.getMasterServices().getConnection(),tableName);
- }
- });
- }
-
protected static void waitRegionInTransition(final MasterProcedureEnv env,
final List<HRegionInfo> regions) throws IOException, CoordinatedStateException {
- final AssignmentManager am = env.getMasterServices().getAssignmentManager();
- final RegionStates states = am.getRegionStates();
+ final RegionStates states = env.getAssignmentManager().getRegionStates();
for (final HRegionInfo region : regions) {
ProcedureSyncWait.waitFor(env, "regions " + region.getRegionNameAsString() + " in transition",
new ProcedureSyncWait.Predicate<Boolean>() {
@Override
public Boolean evaluate() throws IOException {
- if (states.isRegionInState(region, State.FAILED_OPEN)) {
- am.regionOffline(region);
- }
return !states.isRegionInTransition(region);
}
});
http://git-wip-us.apache.org/repos/asf/hbase/blob/ea447378/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java
new file mode 100644
index 0000000..887e272
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java
@@ -0,0 +1,541 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.master.procedure;
+
+import com.google.common.collect.ArrayListMultimap;
+
+import java.io.IOException;
+import java.net.SocketTimeoutException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.ServerListener;
+import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+/**
+ * A remote procedure dispatcher for regionservers.
+ */
+public class RSProcedureDispatcher
+ extends RemoteProcedureDispatcher<MasterProcedureEnv, ServerName>
+ implements ServerListener {
+ private static final Log LOG = LogFactory.getLog(RSProcedureDispatcher.class);
+
+ public static final String RS_RPC_STARTUP_WAIT_TIME_CONF_KEY =
+ "hbase.regionserver.rpc.startup.waittime";
+ private static final int DEFAULT_RS_RPC_STARTUP_WAIT_TIME = 60000;
+
+ private static final int RS_VERSION_WITH_EXEC_PROCS = 0x0201000; // 2.1
+
+ protected final MasterServices master;
+ protected final long rsStartupWaitTime;
+
+ /**
+ * Creates a dispatcher backed by the given master. The time to keep waiting on a
+ * regionserver that is still starting is read from
+ * {@link #RS_RPC_STARTUP_WAIT_TIME_CONF_KEY} (default 60000 ms).
+ * @param master master services supplying the configuration and the server manager
+ */
+ public RSProcedureDispatcher(final MasterServices master) {
+ super(master.getConfiguration());
+
+ this.master = master;
+ this.rsStartupWaitTime = master.getConfiguration().getLong(
+ RS_RPC_STARTUP_WAIT_TIME_CONF_KEY, DEFAULT_RS_RPC_STARTUP_WAIT_TIME);
+ }
+
+  /**
+   * Starts the base dispatcher, then registers this instance as a server listener
+   * and adds a node for every server currently reported online.
+   * @return false if the base dispatcher did not start, true otherwise
+   */
+  @Override
+  public boolean start() {
+    if (super.start()) {
+      master.getServerManager().registerListener(this);
+      for (ServerName onlineServer: master.getServerManager().getOnlineServersList()) {
+        addNode(onlineServer);
+      }
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Stops the base dispatcher and, when that succeeds, unregisters this instance
+   * from the server manager's listeners.
+   * @return false if the base dispatcher did not stop, true otherwise
+   */
+  @Override
+  public boolean stop() {
+    if (super.stop()) {
+      master.getServerManager().unregisterListener(this);
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Submits the queued operations for {@code serverName}. Servers whose reported
+   * version is at least {@code RS_VERSION_WITH_EXEC_PROCS} get a single batch call;
+   * older ones go through the compat resolver.
+   * @param serverName target regionserver
+   * @param operations remote procedures queued for that server
+   */
+  @Override
+  protected void remoteDispatch(final ServerName serverName,
+      final Set<RemoteProcedure> operations) {
+    final int rsVersion = master.getAssignmentManager().getServerVersion(serverName);
+    final boolean supportsBatchExec = rsVersion >= RS_VERSION_WITH_EXEC_PROCS;
+    if (!supportsBatchExec) {
+      LOG.info(String.format(
+        "Fallback to compat rpc execution for serverName=%s version=%s",
+        serverName, rsVersion));
+      submitTask(new CompatRemoteProcedureResolver(serverName, operations));
+      return;
+    }
+
+    LOG.info(String.format(
+      "Using procedure batch rpc execution for serverName=%s version=%s",
+      serverName, rsVersion));
+    submitTask(new ExecuteProceduresRemoteCall(serverName, operations));
+  }
+
+  /**
+   * Fails every pending operation for {@code serverName} with a
+   * {@link DoNotRetryIOException} saying the server is not online.
+   * @param serverName server the operations were queued for
+   * @param operations remote procedures to notify of the failure
+   */
+  protected void abortPendingOperations(final ServerName serverName,
+      final Set<RemoteProcedure> operations) {
+    // TODO: Replace with a ServerNotOnlineException()
+    final IOException notOnline = new DoNotRetryIOException("server not online " + serverName);
+    final MasterProcedureEnv procEnv = master.getMasterProcedureExecutor().getEnvironment();
+    for (RemoteProcedure pending: operations) {
+      pending.remoteCallFailed(procEnv, serverName, notOnline);
+    }
+  }
+
+ /**
+ * Server-listener callback: adds a dispatcher node for the new server.
+ * @param serverName server that was added
+ */
+ public void serverAdded(final ServerName serverName) {
+ addNode(serverName);
+ }
+
+ /**
+ * Server-listener callback: removes the dispatcher node of the given server.
+ * @param serverName server that was removed
+ */
+ public void serverRemoved(final ServerName serverName) {
+ removeNode(serverName);
+ }
+
+ /**
+ * Base remote call
+ */
+  /**
+   * Base remote call. Holds the target server and the retry policy shared by the
+   * concrete calls of this dispatcher.
+   */
+  protected abstract class AbstractRSRemoteCall implements Callable<Void> {
+    private final ServerName serverName;
+
+    // Failures handed to scheduleForRetry() so far; only used in log messages.
+    private int numberOfAttemptsSoFar = 0;
+    // Absolute deadline (ms) for waiting on a starting server; -1 means not computed yet.
+    private long maxWaitTime = -1;
+
+    public AbstractRSRemoteCall(final ServerName serverName) {
+      this.serverName = serverName;
+    }
+
+    @Override
+    public abstract Void call();
+
+    /**
+     * @return the admin stub for the target server
+     * @throws IOException if the server manager has no RPC connection to the server
+     */
+    protected AdminService.BlockingInterface getRsAdmin() throws IOException {
+      final AdminService.BlockingInterface admin = master.getServerManager().getRsAdmin(serverName);
+      if (admin == null) {
+        // NOTE(review): message says "OPEN" although this helper also serves the close
+        // and batch calls; text left untouched in case something matches on it.
+        throw new IOException("Attempting to send OPEN RPC to server " + getServerName() +
+          " failed because no RPC connection found to this server");
+      }
+      return admin;
+    }
+
+    protected ServerName getServerName() {
+      return serverName;
+    }
+
+    /**
+     * Decides whether a failed call is resubmitted to the same server.
+     * @param e the (unwrapped) failure of the last attempt
+     * @return true if this call was resubmitted, false if the caller must report the failure
+     */
+    protected boolean scheduleForRetry(final IOException e) {
+      // Count this failure; previously the counter was never updated and the logs
+      // below always reported try=0.
+      numberOfAttemptsSoFar++;
+
+      // Should we wait a little before retrying? If the server is starting it's yes.
+      if (e instanceof ServerNotRunningYetException) {
+        LOG.warn(String.format("waiting a little before trying on the same server=%s try=%d",
+          serverName, numberOfAttemptsSoFar), e);
+        long now = EnvironmentEdgeManager.currentTime();
+        if (now < getMaxWaitTime()) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(String.format("server is not yet up; waiting up to %dms",
+              (getMaxWaitTime() - now)), e);
+          }
+          submitTask(this, 100, TimeUnit.MILLISECONDS);
+          return true;
+        }
+
+        LOG.warn(String.format("server %s is not up for a while; try a new one", serverName), e);
+        return false;
+      }
+
+      // In case socket is timed out and the region server is still online,
+      // the openRegion RPC could have been accepted by the server and
+      // just the response didn't go through. So we will retry to
+      // open the region on the same server.
+      final boolean retry = e instanceof SocketTimeoutException
+        && master.getServerManager().isServerOnline(serverName);
+      if (retry) {
+        // we want to retry as many times as needed as long as the RS is not dead.
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(String.format("Retrying to same RegionServer %s because: %s",
+            serverName, e.getMessage()), e);
+        }
+        submitTask(this);
+        return true;
+      }
+
+      // trying to send the request elsewhere instead
+      LOG.warn(String.format("the request should be tried elsewhere instead; server=%s try=%d",
+        serverName, numberOfAttemptsSoFar), e);
+      return false;
+    }
+
+    private long getMaxWaitTime() {
+      if (this.maxWaitTime < 0) {
+        // Fix the deadline on first use: the startup wait window starts at the first
+        // "server not running yet" failure and lasts rsStartupWaitTime ms.
+        this.maxWaitTime = EnvironmentEdgeManager.currentTime() + rsStartupWaitTime;
+      }
+      return this.maxWaitTime;
+    }
+
+    /** @return the wrapped exception if {@code e} is a {@link RemoteException}, else {@code e} */
+    protected IOException unwrapException(IOException e) {
+      if (e instanceof RemoteException) {
+        e = ((RemoteException)e).unwrapRemoteException();
+      }
+      return e;
+    }
+  }
+
+ /**
+ * Callback used by splitAndResolveOperation: receives the open and the close
+ * operations of one server, each group in a separate call.
+ */
+ private interface RemoteProcedureResolver {
+ void dispatchOpenRequests(MasterProcedureEnv env, List<RegionOpenOperation> operations);
+ void dispatchCloseRequests(MasterProcedureEnv env, List<RegionCloseOperation> operations);
+ }
+
+ /**
+ * Groups the operations queued for {@code serverName} by request type and hands the
+ * open and close groups (when not empty) to {@code resolver}; anything left is logged.
+ * @param serverName server the operations are meant for
+ * @param operations remote procedures to turn into requests
+ * @param resolver receiver of the grouped open/close operations
+ */
+ public void splitAndResolveOperation(final ServerName serverName,
+ final Set<RemoteProcedure> operations, final RemoteProcedureResolver resolver) {
+ final MasterProcedureEnv env = master.getMasterProcedureExecutor().getEnvironment();
+ final ArrayListMultimap<Class<?>, RemoteOperation> reqsByType =
+ buildAndGroupRequestByType(env, serverName, operations);
+
+ // NOTE(review): presumably fetchType removes the returned entries from reqsByType,
+ // which is what makes the leftover check below meaningful - confirm in the base class.
+ final List<RegionOpenOperation> openOps = fetchType(reqsByType, RegionOpenOperation.class);
+ if (!openOps.isEmpty()) resolver.dispatchOpenRequests(env, openOps);
+
+ final List<RegionCloseOperation> closeOps = fetchType(reqsByType, RegionCloseOperation.class);
+ if (!closeOps.isEmpty()) resolver.dispatchCloseRequests(env, closeOps);
+
+ if (!reqsByType.isEmpty()) {
+ LOG.warn("unknown request type in the queue: " + reqsByType);
+ }
+ }
+
+ // ==========================================================================
+ // Batch call: all the operations of a server go out in one ExecuteProcedures RPC
+ // ==========================================================================
+ /**
+ * Remote call that packs the operations queued for one server into a single
+ * ExecuteProceduresRequest. It acts as its own resolver: the dispatch callbacks
+ * append to the request builder instead of sending anything.
+ */
+ protected class ExecuteProceduresRemoteCall extends AbstractRSRemoteCall
+ implements RemoteProcedureResolver {
+ private final Set<RemoteProcedure> operations;
+
+ // Rebuilt at the start of every call(), so a resubmitted call starts from an empty request.
+ private ExecuteProceduresRequest.Builder request = null;
+
+ public ExecuteProceduresRemoteCall(final ServerName serverName,
+ final Set<RemoteProcedure> operations) {
+ super(serverName);
+ this.operations = operations;
+ }
+
+ public Void call() {
+ final MasterProcedureEnv env = master.getMasterProcedureExecutor().getEnvironment();
+
+ request = ExecuteProceduresRequest.newBuilder();
+ // Fills 'request' through dispatchOpenRequests/dispatchCloseRequests below.
+ splitAndResolveOperation(getServerName(), operations, this);
+
+ try {
+ final ExecuteProceduresResponse response = sendRequest(getServerName(), request.build());
+ remoteCallCompleted(env, response);
+ } catch (IOException e) {
+ e = unwrapException(e);
+ // TODO: In the future some operation may want to bail out early.
+ // TODO: How many times should we retry (use numberOfAttemptsSoFar)
+ if (!scheduleForRetry(e)) {
+ remoteCallFailed(env, e);
+ }
+ }
+ return null;
+ }
+
+ public void dispatchOpenRequests(final MasterProcedureEnv env,
+ final List<RegionOpenOperation> operations) {
+ request.addOpenRegion(buildOpenRegionRequest(env, getServerName(), operations));
+ }
+
+ public void dispatchCloseRequests(final MasterProcedureEnv env,
+ final List<RegionCloseOperation> operations) {
+ for (RegionCloseOperation op: operations) {
+ request.addCloseRegion(op.buildCloseRegionRequest(getServerName()));
+ }
+ }
+
+ // The serverName argument is not used; the stub comes from getRsAdmin().
+ protected ExecuteProceduresResponse sendRequest(final ServerName serverName,
+ final ExecuteProceduresRequest request) throws IOException {
+ try {
+ return getRsAdmin().executeProcedures(null, request);
+ } catch (ServiceException se) {
+ throw ProtobufUtil.getRemoteException(se);
+ }
+ }
+
+
+ // NOTE(review): completion handling is disabled (commented out), so a successful
+ // batch call currently notifies nobody.
+ private void remoteCallCompleted(final MasterProcedureEnv env,
+ final ExecuteProceduresResponse response) {
+ /*
+ for (RemoteProcedure proc: operations) {
+ proc.remoteCallCompleted(env, getServerName(), response);
+ }*/
+ }
+
+ private void remoteCallFailed(final MasterProcedureEnv env, final IOException e) {
+ for (RemoteProcedure proc: operations) {
+ proc.remoteCallFailed(env, getServerName(), e);
+ }
+ }
+ }
+
+ // ==========================================================================
+ // Compatibility calls
+ // Since we don't have a "batch proc-exec" request on the target RS
+ // we have to chunk the requests by type and dispatch the specific request.
+ // ==========================================================================
+  /**
+   * Builds one OpenRegionRequest for {@code serverName} holding an open-info entry
+   * per operation, stamped with the server start code and the master's current time.
+   */
+  private static OpenRegionRequest buildOpenRegionRequest(final MasterProcedureEnv env,
+      final ServerName serverName, final List<RegionOpenOperation> operations) {
+    final OpenRegionRequest.Builder openRequest = OpenRegionRequest.newBuilder();
+    final long startCode = serverName.getStartcode();
+    openRequest.setServerStartCode(startCode);
+    final long masterTime = EnvironmentEdgeManager.currentTime();
+    openRequest.setMasterSystemTime(masterTime);
+    for (RegionOpenOperation openOp: operations) {
+      openRequest.addOpenInfo(openOp.buildRegionOpenInfoRequest(env));
+    }
+    return openRequest.build();
+  }
+
+  /**
+   * Compat call that sends one OpenRegion RPC carrying all the open operations of a
+   * server and reports the per-region outcome back to each remote procedure.
+   */
+  private final class OpenRegionRemoteCall extends AbstractRSRemoteCall {
+    private final List<RegionOpenOperation> operations;
+
+    public OpenRegionRemoteCall(final ServerName serverName,
+        final List<RegionOpenOperation> operations) {
+      super(serverName);
+      this.operations = operations;
+    }
+
+    @Override
+    public Void call() {
+      final MasterProcedureEnv procEnv = master.getMasterProcedureExecutor().getEnvironment();
+      final OpenRegionRequest openRequest =
+        buildOpenRegionRequest(procEnv, getServerName(), operations);
+
+      try {
+        final OpenRegionResponse openResponse = sendRequest(getServerName(), openRequest);
+        remoteCallCompleted(procEnv, openResponse);
+      } catch (IOException ioe) {
+        final IOException cause = unwrapException(ioe);
+        // TODO: In the future some operation may want to bail out early.
+        // TODO: How many times should we retry (use numberOfAttemptsSoFar)
+        final boolean rescheduled = scheduleForRetry(cause);
+        if (!rescheduled) {
+          remoteCallFailed(procEnv, cause);
+        }
+      }
+      return null;
+    }
+
+    // The serverName argument is not used; the stub comes from getRsAdmin().
+    private OpenRegionResponse sendRequest(final ServerName serverName,
+        final OpenRegionRequest request) throws IOException {
+      try {
+        return getRsAdmin().openRegion(null, request);
+      } catch (ServiceException se) {
+        throw ProtobufUtil.getRemoteException(se);
+      }
+    }
+
+    // Opening states are read positionally, in the same order the operations were sent.
+    private void remoteCallCompleted(final MasterProcedureEnv env,
+        final OpenRegionResponse response) {
+      int position = 0;
+      for (RegionOpenOperation openOp: operations) {
+        final OpenRegionResponse.RegionOpeningState openingState =
+          response.getOpeningState(position);
+        position++;
+        final boolean failed =
+          openingState == OpenRegionResponse.RegionOpeningState.FAILED_OPENING;
+        openOp.setFailedOpen(failed);
+        openOp.getRemoteProcedure().remoteCallCompleted(env, getServerName(), openOp);
+      }
+    }
+
+    private void remoteCallFailed(final MasterProcedureEnv env, final IOException e) {
+      for (RegionOpenOperation openOp: operations) {
+        openOp.getRemoteProcedure().remoteCallFailed(env, getServerName(), e);
+      }
+    }
+  }
+
+  /**
+   * Compat call that sends a CloseRegion RPC for a single close operation and
+   * reports the outcome back to its remote procedure.
+   */
+  private final class CloseRegionRemoteCall extends AbstractRSRemoteCall {
+    private final RegionCloseOperation operation;
+
+    public CloseRegionRemoteCall(final ServerName serverName,
+        final RegionCloseOperation operation) {
+      super(serverName);
+      this.operation = operation;
+    }
+
+    @Override
+    public Void call() {
+      final MasterProcedureEnv procEnv = master.getMasterProcedureExecutor().getEnvironment();
+      final CloseRegionRequest closeRequest = operation.buildCloseRegionRequest(getServerName());
+      try {
+        final CloseRegionResponse closeResponse = sendRequest(getServerName(), closeRequest);
+        remoteCallCompleted(procEnv, closeResponse);
+      } catch (IOException ioe) {
+        final IOException cause = unwrapException(ioe);
+        // TODO: In the future some operation may want to bail out early.
+        // TODO: How many times should we retry (use numberOfAttemptsSoFar)
+        final boolean rescheduled = scheduleForRetry(cause);
+        if (!rescheduled) {
+          remoteCallFailed(procEnv, cause);
+        }
+      }
+      return null;
+    }
+
+    // The serverName argument is not used; the stub comes from getRsAdmin().
+    private CloseRegionResponse sendRequest(final ServerName serverName,
+        final CloseRegionRequest request) throws IOException {
+      try {
+        return getRsAdmin().closeRegion(null, request);
+      } catch (ServiceException se) {
+        throw ProtobufUtil.getRemoteException(se);
+      }
+    }
+
+    private void remoteCallCompleted(final MasterProcedureEnv env,
+        final CloseRegionResponse response) {
+      final boolean regionClosed = response.getClosed();
+      operation.setClosed(regionClosed);
+      operation.getRemoteProcedure().remoteCallCompleted(env, getServerName(), operation);
+    }
+
+    private void remoteCallFailed(final MasterProcedureEnv env, final IOException e) {
+      operation.getRemoteProcedure().remoteCallFailed(env, getServerName(), e);
+    }
+  }
+
+  /**
+   * Compat task: splits the operations of a server by type and submits one
+   * OpenRegionRemoteCall for all the opens and one CloseRegionRemoteCall per close.
+   */
+  protected class CompatRemoteProcedureResolver implements Callable<Void>, RemoteProcedureResolver {
+    private final Set<RemoteProcedure> operations;
+    private final ServerName serverName;
+
+    public CompatRemoteProcedureResolver(final ServerName serverName,
+        final Set<RemoteProcedure> operations) {
+      this.serverName = serverName;
+      this.operations = operations;
+    }
+
+    @Override
+    public Void call() {
+      splitAndResolveOperation(serverName, operations, this);
+      return null;
+    }
+
+    @Override
+    public void dispatchOpenRequests(final MasterProcedureEnv env,
+        final List<RegionOpenOperation> operations) {
+      final OpenRegionRemoteCall openCall = new OpenRegionRemoteCall(serverName, operations);
+      submitTask(openCall);
+    }
+
+    @Override
+    public void dispatchCloseRequests(final MasterProcedureEnv env,
+        final List<RegionCloseOperation> operations) {
+      for (RegionCloseOperation closeOp: operations) {
+        final CloseRegionRemoteCall closeCall = new CloseRegionRemoteCall(serverName, closeOp);
+        submitTask(closeCall);
+      }
+    }
+  }
+
+ // ==========================================================================
+ // RPC Messages
+ // - ServerOperation: refreshConfig, grant, revoke, ...
+ // - RegionOperation: open, close, flush, snapshot, ...
+ // ==========================================================================
+ /** Base type for remote operations addressed to a server as a whole. */
+ public static abstract class ServerOperation extends RemoteOperation {
+ protected ServerOperation(final RemoteProcedure remoteProcedure) {
+ super(remoteProcedure);
+ }
+ }
+
+ /** Base type for remote operations that target one region. */
+ public static abstract class RegionOperation extends RemoteOperation {
+ private final HRegionInfo regionInfo;
+
+ protected RegionOperation(final RemoteProcedure remoteProcedure,
+ final HRegionInfo regionInfo) {
+ super(remoteProcedure);
+ this.regionInfo = regionInfo;
+ }
+
+ /** @return the region this operation applies to */
+ public HRegionInfo getRegionInfo() {
+ return this.regionInfo;
+ }
+ }
+
+ /** Request to open a region; also carries back whether the open was reported as failed. */
+ public static class RegionOpenOperation extends RegionOperation {
+ // NOTE(review): favoredNodes and openForReplay are stored but never read in this class;
+ // buildRegionOpenInfoRequest() asks the assignment manager and passes 'false' instead.
+ private final List<ServerName> favoredNodes;
+ private final boolean openForReplay;
+ // Set by the dispatcher from the regionserver's response.
+ private boolean failedOpen;
+
+ public RegionOpenOperation(final RemoteProcedure remoteProcedure,
+ final HRegionInfo regionInfo, final List<ServerName> favoredNodes,
+ final boolean openForReplay) {
+ super(remoteProcedure, regionInfo);
+ this.favoredNodes = favoredNodes;
+ this.openForReplay = openForReplay;
+ }
+
+ protected void setFailedOpen(final boolean failedOpen) {
+ this.failedOpen = failedOpen;
+ }
+
+ public boolean isFailedOpen() {
+ return failedOpen;
+ }
+
+ /**
+ * @param env procedure environment giving access to the assignment manager
+ * @return the open-info entry for this region, using the favored nodes currently
+ * known to the assignment manager
+ */
+ public OpenRegionRequest.RegionOpenInfo buildRegionOpenInfoRequest(
+ final MasterProcedureEnv env) {
+ return RequestConverter.buildRegionOpenInfo(getRegionInfo(),
+ env.getAssignmentManager().getFavoredNodes(getRegionInfo()), false);
+ }
+ }
+
+ /** Request to close a region; also carries back the 'closed' flag of the response. */
+ public static class RegionCloseOperation extends RegionOperation {
+ private final ServerName destinationServer;
+ // Set by the dispatcher from the regionserver's response.
+ private boolean closed = false;
+
+ public RegionCloseOperation(final RemoteProcedure remoteProcedure,
+ final HRegionInfo regionInfo, final ServerName destinationServer) {
+ super(remoteProcedure, regionInfo);
+ this.destinationServer = destinationServer;
+ }
+
+ public ServerName getDestinationServer() {
+ return destinationServer;
+ }
+
+ protected void setClosed(final boolean closed) {
+ this.closed = closed;
+ }
+
+ public boolean isClosed() {
+ return closed;
+ }
+
+ /**
+ * @param serverName server the close request is sent to
+ * @return a close request for this region that also carries the destination server
+ */
+ public CloseRegionRequest buildCloseRegionRequest(final ServerName serverName) {
+ return ProtobufUtil.buildCloseRegionRequest(serverName,
+ getRegionInfo().getRegionName(), getDestinationServer());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ea447378/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RestoreSnapshotProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RestoreSnapshotProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RestoreSnapshotProcedure.java
index f8c9d8f..2281eb8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RestoreSnapshotProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RestoreSnapshotProcedure.java
@@ -43,7 +43,6 @@ import org.apache.hadoop.hbase.errorhandling.ForeignException;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.master.MetricsSnapshot;
-import org.apache.hadoop.hbase.master.RegionStates;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
@@ -405,17 +404,7 @@ public class RestoreSnapshotProcedure
try {
Connection conn = env.getMasterServices().getConnection();
- // 1. Forces all the RegionStates to be offline
- //
- // The AssignmentManager keeps all the region states around
- // with no possibility to remove them, until the master is restarted.
- // This means that a region marked as SPLIT before the restore will never be assigned again.
- // To avoid having all states around all the regions are switched to the OFFLINE state,
- // which is the same state that the regions will be after a delete table.
- forceRegionsOffline(env, regionsToAdd);
- forceRegionsOffline(env, regionsToRestore);
- forceRegionsOffline(env, regionsToRemove);
-
+ // 1. Prepare to restore
getMonitorStatus().setStatus("Preparing to restore each region");
// 2. Applies changes to hbase:meta
@@ -475,20 +464,6 @@ public class RestoreSnapshotProcedure
}
/**
- * Make sure that region states of the region list is in OFFLINE state.
- * @param env MasterProcedureEnv
- * @param hris region info list
- **/
- private void forceRegionsOffline(final MasterProcedureEnv env, final List<HRegionInfo> hris) {
- RegionStates states = env.getMasterServices().getAssignmentManager().getRegionStates();
- if (hris != null) {
- for (HRegionInfo hri: hris) {
- states.regionOffline(hri);
- }
- }
- }
-
- /**
* The procedure could be restarted from a different machine. If the variable is null, we need to
* retrieve it.
* @return traceEnabled
http://git-wip-us.apache.org/repos/asf/hbase/blob/ea447378/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashException.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashException.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashException.java
new file mode 100644
index 0000000..dd1874b
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashException.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.procedure;
+
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Passed as the exception by {@link ServerCrashProcedure}
+ * to notify on-going RIT that the server has failed.
+ * The exception message carries the id of the procedure it was created with.
+ */
+@InterfaceAudience.Private
+@SuppressWarnings("serial")
+public class ServerCrashException extends HBaseIOException {
+ private final long procId;
+
+ /**
+ * @param procId The procedure id reported as {@code pid} in the exception message.
+ */
+ public ServerCrashException(long procId) {
+ this.procId = procId;
+ }
+
+ /**
+ * @return a fixed prefix followed by the procedure id given at construction
+ */
+ @Override
+ public String getMessage() {
+ return "Caused by ServerCrashProcedure pid=" + this.procId;
+ }
+}
\ No newline at end of file