You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2017/05/25 06:32:15 UTC
[11/27] hbase git commit: Revert "HBASE-14614 Procedure v2 - Core
Assignment Manager (Matteo Bertozzi)" Revert a mistaken commit!!!
http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MergeTableRegionsProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MergeTableRegionsProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MergeTableRegionsProcedure.java
new file mode 100644
index 0000000..3600fe0
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MergeTableRegionsProcedure.java
@@ -0,0 +1,906 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.master.procedure;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InterruptedIOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MetaMutationAnnotation;
+import org.apache.hadoop.hbase.RegionLoad;
+import org.apache.hadoop.hbase.ServerLoad;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.UnknownRegionException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.exceptions.MergeRegionException;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.master.AssignmentManager;
+import org.apache.hadoop.hbase.master.CatalogJanitor;
+import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
+import org.apache.hadoop.hbase.master.MasterFileSystem;
+import org.apache.hadoop.hbase.master.RegionPlan;
+import org.apache.hadoop.hbase.master.RegionState;
+import org.apache.hadoop.hbase.master.RegionStates;
+import org.apache.hadoop.hbase.master.ServerManager;
+import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MergeTableRegionsState;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
+
+/**
+ * The procedure to merge two regions of a table.
+ */
+@InterfaceAudience.Private
+public class MergeTableRegionsProcedure
+ extends AbstractStateMachineTableProcedure<MergeTableRegionsState> {
+ private static final Log LOG = LogFactory.getLog(MergeTableRegionsProcedure.class);
+
+ // Cached result of LOG.isTraceEnabled(); null until isTraceEnabled() fills it in.
+ private Boolean traceEnabled;
+ // Resolved lazily from the procedure environment by getAssignmentManager().
+ private AssignmentManager assignmentManager;
+ // Timeout used while waiting for the region move; -1 means "not read from configuration yet".
+ // Presumably milliseconds (it is compared against EnvironmentEdgeManager.currentTime() deltas).
+ private int timeout;
+ // Server hosting regionsToMerge[0]; set by isRegionsOnTheSameServer() or getServerName().
+ private ServerName regionLocation;
+ // Cached, human-readable list of the full region names (built on first use).
+ private String regionsToMergeListFullName;
+ // Cached, human-readable list of the encoded region names (built on first use).
+ private String regionsToMergeListEncodedName;
+
+ // The regions being merged; the rest of the class assumes exactly two entries.
+ private HRegionInfo [] regionsToMerge;
+ // Descriptor of the region that results from the merge; built by setupMergedRegionInfo().
+ private HRegionInfo mergedRegionInfo;
+ // When true, merging non-adjacent regions is allowed (see checkDaughterRegions()).
+ private boolean forcible;
+
+ /**
+  * No-argument constructor. Every lazily resolved helper field is left unset so that the
+  * corresponding getter retrieves it on first use.
+  */
+ public MergeTableRegionsProcedure() {
+   // Lazily resolved helpers start out unset.
+   this.regionsToMergeListEncodedName = null;
+   this.regionsToMergeListFullName = null;
+   this.regionLocation = null;
+   this.assignmentManager = null;
+   // -1 marks the timeout as "not read from configuration yet".
+   this.timeout = -1;
+   this.traceEnabled = isTraceEnabled();
+ }
+
+ /**
+  * Creates a procedure that merges the given regions.
+  * @param env MasterProcedureEnv
+  * @param regionsToMerge the regions to merge; exactly two regions of the same table
+  * @param forcible whether non-adjacent regions may be merged
+  * @throws IOException if the regions are rejected by checkDaughterRegions()
+  */
+ public MergeTableRegionsProcedure(
+     final MasterProcedureEnv env,
+     final HRegionInfo[] regionsToMerge,
+     final boolean forcible) throws IOException {
+   super(env);
+   this.traceEnabled = isTraceEnabled();
+   this.assignmentManager = getAssignmentManager(env);
+   // For now, we only merge 2 regions. It could be extended to more than 2 regions in
+   // the future.
+   assert(regionsToMerge.length == 2);
+   // Compare the table names by value; reference identity is not the intended check.
+   assert(regionsToMerge[0].getTable().equals(regionsToMerge[1].getTable()));
+   this.regionsToMerge = regionsToMerge;
+   this.forcible = forcible;
+
+   this.timeout = -1;
+   this.regionsToMergeListFullName = getRegionsToMergeListFullNameString();
+   this.regionsToMergeListEncodedName = getRegionsToMergeListEncodedNameString();
+
+   // Check daughter regions and make sure that we have valid daughter regions before
+   // doing the real work.
+   checkDaughterRegions();
+   // WARN: make sure there is no parent region of the two merging regions in
+   // hbase:meta If exists, fixing up daughters would cause daughter regions(we
+   // have merged one) online again when we restart master, so we should clear
+   // the parent region to prevent the above case
+   // Since HBASE-7721, we don't need fix up daughters any more. so here do
+   // nothing
+   setupMergedRegionInfo();
+ }
+
+ /**
+  * Runs the step that belongs to the given state and selects the state to run next.
+  * An IOException raised by a step is logged and recorded through setFailure().
+  * @param env MasterProcedureEnv
+  * @param state the state to execute
+  * @return Flow.NO_MORE_STATE after the post-operation step, Flow.HAS_MORE_STATE otherwise
+  */
+ @Override
+ protected Flow executeFromState(
+ final MasterProcedureEnv env,
+ final MergeTableRegionsState state) throws InterruptedException {
+ if (isTraceEnabled()) {
+ LOG.trace(this + " execute state=" + state);
+ }
+
+ try {
+ switch (state) {
+ case MERGE_TABLE_REGIONS_PREPARE:
+ prepareMergeRegion(env);
+ setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_MOVE_REGION_TO_SAME_RS);
+ break;
+ case MERGE_TABLE_REGIONS_MOVE_REGION_TO_SAME_RS:
+ if (MoveRegionsToSameRS(env)) {
+ setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_PRE_MERGE_OPERATION);
+ } else {
+ // The regions could not be co-located: skip straight to the final state.
+ LOG.info("Cancel merging regions " + getRegionsToMergeListFullNameString()
+ + ", because can't move them to the same RS");
+ setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_POST_OPERATION);
+ }
+ break;
+ case MERGE_TABLE_REGIONS_PRE_MERGE_OPERATION:
+ preMergeRegions(env);
+ setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_SET_MERGING_TABLE_STATE);
+ break;
+ case MERGE_TABLE_REGIONS_SET_MERGING_TABLE_STATE:
+ setRegionStateToMerging(env);
+ setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_CLOSE_REGIONS);
+ break;
+ case MERGE_TABLE_REGIONS_CLOSE_REGIONS:
+ closeRegionsForMerge(env);
+ setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_CREATE_MERGED_REGION);
+ break;
+ case MERGE_TABLE_REGIONS_CREATE_MERGED_REGION:
+ createMergedRegion(env);
+ setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_PRE_MERGE_COMMIT_OPERATION);
+ break;
+ case MERGE_TABLE_REGIONS_PRE_MERGE_COMMIT_OPERATION:
+ preMergeRegionsCommit(env);
+ setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_UPDATE_META);
+ break;
+ case MERGE_TABLE_REGIONS_UPDATE_META:
+ // From this state on, isRollbackSupported() reports false.
+ updateMetaForMergedRegions(env);
+ setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_POST_MERGE_COMMIT_OPERATION);
+ break;
+ case MERGE_TABLE_REGIONS_POST_MERGE_COMMIT_OPERATION:
+ postMergeRegionsCommit(env);
+ setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_OPEN_MERGED_REGION);
+ break;
+ case MERGE_TABLE_REGIONS_OPEN_MERGED_REGION:
+ openMergedRegions(env);
+ setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_POST_OPERATION);
+ break;
+ case MERGE_TABLE_REGIONS_POST_OPERATION:
+ postCompletedMergeRegions(env);
+ return Flow.NO_MORE_STATE;
+ default:
+ throw new UnsupportedOperationException(this + " unhandled state=" + state);
+ }
+ } catch (IOException e) {
+ LOG.warn("Error trying to merge regions " + getRegionsToMergeListFullNameString() +
+ " in the table " + getTableName() + " (in state=" + state + ")", e);
+
+ // Record the failure on the procedure instead of propagating the exception.
+ setFailure("master-merge-regions", e);
+ }
+ return Flow.HAS_MORE_STATE;
+ }
+
+ /**
+  * Undoes the step that belongs to the given state.
+  * @param env MasterProcedureEnv
+  * @param state the state to roll back
+  * @throws IOException if an undo step fails; the failure is logged and rethrown
+  * @throws InterruptedException declared for the undo steps
+  * @throws UnsupportedOperationException for the meta-update state and every later state
+  */
+ @Override
+ protected void rollbackState(
+ final MasterProcedureEnv env,
+ final MergeTableRegionsState state) throws IOException, InterruptedException {
+ if (isTraceEnabled()) {
+ LOG.trace(this + " rollback state=" + state);
+ }
+
+ try {
+ switch (state) {
+ case MERGE_TABLE_REGIONS_POST_OPERATION:
+ case MERGE_TABLE_REGIONS_OPEN_MERGED_REGION:
+ case MERGE_TABLE_REGIONS_POST_MERGE_COMMIT_OPERATION:
+ case MERGE_TABLE_REGIONS_UPDATE_META:
+ String msg = this + " We are in the " + state + " state."
+ + " It is complicated to rollback the merge operation that region server is working on."
+ + " Rollback is not supported and we should let the merge operation to complete";
+ LOG.warn(msg);
+ // PONR
+ // Note: the exception below carries a generic text; msg is only written to the log.
+ throw new UnsupportedOperationException(this + " unhandled state=" + state);
+ case MERGE_TABLE_REGIONS_PRE_MERGE_COMMIT_OPERATION:
+ break;
+ case MERGE_TABLE_REGIONS_CREATE_MERGED_REGION:
+ cleanupMergedRegion(env);
+ break;
+ case MERGE_TABLE_REGIONS_CLOSE_REGIONS:
+ rollbackCloseRegionsForMerge(env);
+ break;
+ case MERGE_TABLE_REGIONS_SET_MERGING_TABLE_STATE:
+ setRegionStateToRevertMerging(env);
+ break;
+ case MERGE_TABLE_REGIONS_PRE_MERGE_OPERATION:
+ postRollBackMergeRegions(env);
+ break;
+ case MERGE_TABLE_REGIONS_MOVE_REGION_TO_SAME_RS:
+ break; // nothing to rollback
+ case MERGE_TABLE_REGIONS_PREPARE:
+ break; // nothing to rollback
+ default:
+ throw new UnsupportedOperationException(this + " unhandled state=" + state);
+ }
+ } catch (Exception e) {
+ // This will be retried. Unless there is a bug in the code,
+ // this should be just a "temporary error" (e.g. network down)
+ LOG.warn("Failed rollback attempt step " + state + " for merging the regions "
+ + getRegionsToMergeListFullNameString() + " in table " + getTableName(), e);
+ throw e;
+ }
+ }
+
+ /**
+  * Tells whether the procedure can still be rolled back from the given state.
+  * @param state the state to check
+  * @return false for the meta-update state and every state that follows it, true otherwise
+  */
+ @Override
+ protected boolean isRollbackSupported(final MergeTableRegionsState state) {
+   switch (state) {
+     case MERGE_TABLE_REGIONS_UPDATE_META:
+     case MERGE_TABLE_REGIONS_POST_MERGE_COMMIT_OPERATION:
+     case MERGE_TABLE_REGIONS_OPEN_MERGED_REGION:
+     case MERGE_TABLE_REGIONS_POST_OPERATION:
+       // Once any of these states is reached it is no longer safe to roll back.
+       return false;
+     default:
+       return true;
+   }
+ }
+
+ /**
+  * Maps a numeric state id to its MergeTableRegionsState via forNumber().
+  * @param stateId numeric id of the state
+  * @return the value returned by MergeTableRegionsState.forNumber(stateId)
+  */
+ @Override
+ protected MergeTableRegionsState getState(final int stateId) {
+ return MergeTableRegionsState.forNumber(stateId);
+ }
+
+ /**
+  * Maps a MergeTableRegionsState to its numeric id.
+  * @param state the state to convert
+  * @return the number reported by the state
+  */
+ @Override
+ protected int getStateId(final MergeTableRegionsState state) {
+ return state.getNumber();
+ }
+
+ /**
+  * @return the state the procedure starts in, MERGE_TABLE_REGIONS_PREPARE
+  */
+ @Override
+ protected MergeTableRegionsState getInitialState() {
+ return MergeTableRegionsState.MERGE_TABLE_REGIONS_PREPARE;
+ }
+
+ /**
+  * Writes the procedure state: the parent class data followed by one delimited
+  * MergeTableRegionsStateData message holding the user, the merged region, the forcible flag
+  * and the regions to merge.
+  * @param stream the stream to write to
+  * @throws IOException if writing fails
+  */
+ @Override
+ public void serializeStateData(final OutputStream stream) throws IOException {
+   super.serializeStateData(stream);
+
+   final MasterProcedureProtos.MergeTableRegionsStateData.Builder builder =
+       MasterProcedureProtos.MergeTableRegionsStateData.newBuilder();
+   builder.setUserInfo(MasterProcedureUtil.toProtoUserInfo(getUser()));
+   builder.setMergedRegionInfo(HRegionInfo.convert(mergedRegionInfo));
+   builder.setForcible(forcible);
+   // The regions are written in array order.
+   for (int idx = 0; idx < regionsToMerge.length; idx++) {
+     builder.addRegionInfo(HRegionInfo.convert(regionsToMerge[idx]));
+   }
+   builder.build().writeDelimitedTo(stream);
+ }
+
+ /**
+  * Reads back the state written by serializeStateData(): the parent class data, the user,
+  * the regions to merge, the merged region and the forcible flag.
+  * @param stream the stream to read from
+  * @throws IOException if reading or parsing fails
+  */
+ @Override
+ public void deserializeStateData(final InputStream stream) throws IOException {
+   super.deserializeStateData(stream);
+
+   MasterProcedureProtos.MergeTableRegionsStateData mergeTableRegionsMsg =
+       MasterProcedureProtos.MergeTableRegionsStateData.parseDelimitedFrom(stream);
+   setUser(MasterProcedureUtil.toUserInfo(mergeTableRegionsMsg.getUserInfo()));
+
+   assert(mergeTableRegionsMsg.getRegionInfoCount() == 2);
+   regionsToMerge = new HRegionInfo[mergeTableRegionsMsg.getRegionInfoCount()];
+   for (int i = 0; i < regionsToMerge.length; i++) {
+     regionsToMerge[i] = HRegionInfo.convert(mergeTableRegionsMsg.getRegionInfo(i));
+   }
+
+   mergedRegionInfo = HRegionInfo.convert(mergeTableRegionsMsg.getMergedRegionInfo());
+   // Restore the flag that serializeStateData() writes; without this a reloaded procedure
+   // would always carry forcible == false.
+   forcible = mergeTableRegionsMsg.getForcible();
+ }
+
+ /**
+  * Appends the class name, the table, the regions to merge and the forcible flag.
+  * @param sb the builder to append to
+  */
+ @Override
+ public void toStringClassDetails(StringBuilder sb) {
+   sb.append(getClass().getSimpleName())
+       .append(" (table=").append(getTableName())
+       .append(" regions=").append(getRegionsToMergeListFullNameString())
+       .append(" forcible=").append(forcible)
+       .append(")");
+ }
+
+ @Override
+ protected LockState acquireLock(final MasterProcedureEnv env) {
+ if (env.waitInitialized(this)) {
+ return LockState.LOCK_EVENT_WAIT;
+ }
+ return env.getProcedureScheduler().waitRegions(this, getTableName(),
+ regionsToMerge[0], regionsToMerge[1])?
+ LockState.LOCK_EVENT_WAIT: LockState.LOCK_ACQUIRED;
+ }
+
+ /**
+  * Calls wakeRegions() on the procedure scheduler for the table and the two merging
+  * regions; the counterpart of the waitRegions() call made in acquireLock().
+  * @param env MasterProcedureEnv
+  */
+ @Override
+ protected void releaseLock(final MasterProcedureEnv env) {
+ env.getProcedureScheduler().wakeRegions(this, getTableName(),
+ regionsToMerge[0], regionsToMerge[1]);
+ }
+
+ /**
+  * @return the table of regionsToMerge[0]
+  */
+ @Override
+ public TableName getTableName() {
+ return regionsToMerge[0].getTable();
+ }
+
+ /**
+  * @return TableOperationType.MERGE
+  */
+ @Override
+ public TableOperationType getTableOperationType() {
+ return TableOperationType.MERGE;
+ }
+
+ /**
+ * check daughter regions
+ * @throws IOException
+ */
+ private void checkDaughterRegions() throws IOException {
+ // Note: the following logic assumes that we only have 2 regions to merge. In the future,
+ // if we want to extend to more than 2 regions, the code needs to modify a little bit.
+ //
+ if (regionsToMerge[0].getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID ||
+ regionsToMerge[1].getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) {
+ throw new MergeRegionException("Can't merge non-default replicas");
+ }
+
+ if (!HRegionInfo.areAdjacent(regionsToMerge[0], regionsToMerge[1])) {
+ String msg = "Trying to merge non-adjacent regions "
+ + getRegionsToMergeListFullNameString() + " where forcible = " + forcible;
+ LOG.warn(msg);
+ if (!forcible) {
+ throw new DoNotRetryIOException(msg);
+ }
+ }
+ }
+
+ /**
+ * Prepare merge and do some check
+ * @param env MasterProcedureEnv
+ * @throws IOException
+ */
+ private void prepareMergeRegion(final MasterProcedureEnv env) throws IOException {
+ // Note: the following logic assumes that we only have 2 regions to merge. In the future,
+ // if we want to extend to more than 2 regions, the code needs to modify a little bit.
+ //
+ CatalogJanitor catalogJanitor = env.getMasterServices().getCatalogJanitor();
+ boolean regionAHasMergeQualifier = !catalogJanitor.cleanMergeQualifier(regionsToMerge[0]);
+ if (regionAHasMergeQualifier
+ || !catalogJanitor.cleanMergeQualifier(regionsToMerge[1])) {
+ String msg = "Skip merging regions " + getRegionsToMergeListFullNameString()
+ + ", because region "
+ + (regionAHasMergeQualifier ? regionsToMerge[0].getEncodedName() : regionsToMerge[1]
+ .getEncodedName()) + " has merge qualifier";
+ LOG.warn(msg);
+ throw new MergeRegionException(msg);
+ }
+
+ RegionStates regionStates = getAssignmentManager(env).getRegionStates();
+ RegionState regionStateA = regionStates.getRegionState(regionsToMerge[0].getEncodedName());
+ RegionState regionStateB = regionStates.getRegionState(regionsToMerge[1].getEncodedName());
+ if (regionStateA == null || regionStateB == null) {
+ throw new UnknownRegionException(
+ regionStateA == null ?
+ regionsToMerge[0].getEncodedName() : regionsToMerge[1].getEncodedName());
+ }
+
+ if (!regionStateA.isOpened() || !regionStateB.isOpened()) {
+ throw new MergeRegionException(
+ "Unable to merge regions not online " + regionStateA + ", " + regionStateB);
+ }
+ }
+
+ /**
+ * Create merged region info through the specified two regions
+ */
+ private void setupMergedRegionInfo() {
+ long rid = EnvironmentEdgeManager.currentTime();
+ // Regionid is timestamp. Merged region's id can't be less than that of
+ // merging regions else will insert at wrong location in hbase:meta
+ if (rid < regionsToMerge[0].getRegionId() || rid < regionsToMerge[1].getRegionId()) {
+ LOG.warn("Clock skew; merging regions id are " + regionsToMerge[0].getRegionId()
+ + " and " + regionsToMerge[1].getRegionId() + ", but current time here is " + rid);
+ rid = Math.max(regionsToMerge[0].getRegionId(), regionsToMerge[1].getRegionId()) + 1;
+ }
+
+ byte[] startKey = null;
+ byte[] endKey = null;
+ // Choose the smaller as start key
+ if (regionsToMerge[0].compareTo(regionsToMerge[1]) <= 0) {
+ startKey = regionsToMerge[0].getStartKey();
+ } else {
+ startKey = regionsToMerge[1].getStartKey();
+ }
+ // Choose the bigger as end key
+ if (Bytes.equals(regionsToMerge[0].getEndKey(), HConstants.EMPTY_BYTE_ARRAY)
+ || (!Bytes.equals(regionsToMerge[1].getEndKey(), HConstants.EMPTY_BYTE_ARRAY)
+ && Bytes.compareTo(regionsToMerge[0].getEndKey(), regionsToMerge[1].getEndKey()) > 0)) {
+ endKey = regionsToMerge[0].getEndKey();
+ } else {
+ endKey = regionsToMerge[1].getEndKey();
+ }
+
+ // Merged region is sorted between two merging regions in META
+ mergedRegionInfo = new HRegionInfo(getTableName(), startKey, endKey, false, rid);
+ }
+
+ /**
+ * Makes sure both merging regions are hosted by the same region server, moving
+ * regionsToMerge[1] to the server of regionsToMerge[0] when they are not. The two array
+ * entries are swapped first when the first region has the lower request count, so the
+ * region with fewer requests is the one that gets moved.
+ * @param env MasterProcedureEnv
+ * @return whether target regions hosted by the same RS
+ * @throws IOException if a region is not online, or if the wait is interrupted
+ */
+ private boolean MoveRegionsToSameRS(final MasterProcedureEnv env) throws IOException {
+ // Make sure regions are on the same regionserver before send merge
+ // regions request to region server.
+ //
+ // isRegionsOnTheSameServer() also sets regionLocation to the server of regionsToMerge[0].
+ boolean onSameRS = isRegionsOnTheSameServer(env);
+ if (!onSameRS) {
+ // Note: the following logic assumes that we only have 2 regions to merge. In the future,
+ // if we want to extend to more than 2 regions, the code needs to modify a little bit.
+ //
+ RegionStates regionStates = getAssignmentManager(env).getRegionStates();
+ ServerName regionLocation2 = regionStates.getRegionServerOfRegion(regionsToMerge[1]);
+
+ RegionLoad loadOfRegionA = getRegionLoad(env, regionLocation, regionsToMerge[0]);
+ RegionLoad loadOfRegionB = getRegionLoad(env, regionLocation2, regionsToMerge[1]);
+ if (loadOfRegionA != null && loadOfRegionB != null
+ && loadOfRegionA.getRequestsCount() < loadOfRegionB.getRequestsCount()) {
+ // switch regionsToMerge[0] and regionsToMerge[1]
+ HRegionInfo tmpRegion = this.regionsToMerge[0];
+ this.regionsToMerge[0] = this.regionsToMerge[1];
+ this.regionsToMerge[1] = tmpRegion;
+ ServerName tmpLocation = regionLocation;
+ regionLocation = regionLocation2;
+ regionLocation2 = tmpLocation;
+ }
+
+ long startTime = EnvironmentEdgeManager.currentTime();
+
+ // Plan: move regionsToMerge[1] from its current server to the server of regionsToMerge[0].
+ RegionPlan regionPlan = new RegionPlan(regionsToMerge[1], regionLocation2, regionLocation);
+ LOG.info("Moving regions to same server for merge: " + regionPlan.toString());
+ getAssignmentManager(env).balance(regionPlan);
+ // Poll every 20 ms until the move is done, the region leaves transition, or the
+ // timeout from getTimeout() has elapsed.
+ do {
+ try {
+ Thread.sleep(20);
+ // Make sure check RIT first, then get region location, otherwise
+ // we would make a wrong result if region is online between getting
+ // region location and checking RIT
+ boolean isRIT = regionStates.isRegionInTransition(regionsToMerge[1]);
+ regionLocation2 = regionStates.getRegionServerOfRegion(regionsToMerge[1]);
+ onSameRS = regionLocation.equals(regionLocation2);
+ if (onSameRS || !isRIT) {
+ // Regions are on the same RS, or regionsToMerge[1] is not in
+ // RegionInTransition any more
+ break;
+ }
+ } catch (InterruptedException e) {
+ // NOTE(review): the thread's interrupt status is not restored before rethrowing;
+ // confirm this is intended.
+ InterruptedIOException iioe = new InterruptedIOException();
+ iioe.initCause(e);
+ throw iioe;
+ }
+ } while ((EnvironmentEdgeManager.currentTime() - startTime) <= getTimeout(env));
+ }
+ return onSameRS;
+ }
+
+ /**
+  * Invokes the coprocessor pre-merge hook, if a coprocessor host is present.
+  * @param env MasterProcedureEnv
+  * @throws IOException if the hook asks to bypass the merge
+  **/
+ private void preMergeRegions(final MasterProcedureEnv env) throws IOException {
+   final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
+   if (cpHost == null) {
+     return;
+   }
+   // A true return value from the hook means the merge must not proceed.
+   if (cpHost.preMergeRegionsAction(regionsToMerge, getUser())) {
+     throw new IOException(
+         "Coprocessor bypassing regions " + getRegionsToMergeListFullNameString() + " merge.");
+   }
+ }
+
+ /**
+  * Invokes the coprocessor hook that runs after a merge has been rolled back, if a
+  * coprocessor host is present.
+  * @param env MasterProcedureEnv
+  * @throws IOException propagated from the hook
+  */
+ private void postRollBackMergeRegions(final MasterProcedureEnv env) throws IOException {
+   final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
+   if (cpHost == null) {
+     return;
+   }
+   cpHost.postRollBackMergeRegionsAction(regionsToMerge, getUser());
+ }
+
+ /**
+ * Set the region states to MERGING state
+ * @param env MasterProcedureEnv
+ * @throws IOException
+ */
+ public void setRegionStateToMerging(final MasterProcedureEnv env) throws IOException {
+ RegionStateTransition.Builder transition = RegionStateTransition.newBuilder();
+ transition.setTransitionCode(TransitionCode.READY_TO_MERGE);
+ transition.addRegionInfo(HRegionInfo.convert(mergedRegionInfo));
+ transition.addRegionInfo(HRegionInfo.convert(regionsToMerge[0]));
+ transition.addRegionInfo(HRegionInfo.convert(regionsToMerge[1]));
+ if (env.getMasterServices().getAssignmentManager().onRegionTransition(
+ getServerName(env), transition.build()) != null) {
+ throw new IOException("Failed to update region state to MERGING for "
+ + getRegionsToMergeListFullNameString());
+ }
+ }
+
+ /**
+ * Rollback the region state change
+ * @param env MasterProcedureEnv
+ * @throws IOException
+ */
+ private void setRegionStateToRevertMerging(final MasterProcedureEnv env) throws IOException {
+ RegionStateTransition.Builder transition = RegionStateTransition.newBuilder();
+ transition.setTransitionCode(TransitionCode.MERGE_REVERTED);
+ transition.addRegionInfo(HRegionInfo.convert(mergedRegionInfo));
+ transition.addRegionInfo(HRegionInfo.convert(regionsToMerge[0]));
+ transition.addRegionInfo(HRegionInfo.convert(regionsToMerge[1]));
+ String msg = env.getMasterServices().getAssignmentManager().onRegionTransition(
+ getServerName(env), transition.build());
+ if (msg != null) {
+ // If daughter regions are online, the msg is coming from RPC retry. Ignore it.
+ RegionStates regionStates = getAssignmentManager(env).getRegionStates();
+ if (!regionStates.isRegionOnline(regionsToMerge[0]) ||
+ !regionStates.isRegionOnline(regionsToMerge[1])) {
+ throw new IOException("Failed to update region state for "
+ + getRegionsToMergeListFullNameString()
+ + " as part of operation for reverting merge. Error message: " + msg);
+ }
+ }
+ }
+
+ /**
+  * Creates the merged region on the file system: creates the merges directory under the
+  * first region, merges the store files of both regions into it, then commits the merged
+  * region.
+  * @param env MasterProcedureEnv
+  * @throws IOException if a file system operation fails
+  */
+ private void createMergedRegion(final MasterProcedureEnv env) throws IOException {
+   final MasterFileSystem masterFs = env.getMasterServices().getMasterFileSystem();
+   final Path tableDir =
+       FSUtils.getTableDir(masterFs.getRootDir(), regionsToMerge[0].getTable());
+   final FileSystem fileSystem = masterFs.getFileSystem();
+
+   // The merges directory lives under the first region.
+   final HRegionFileSystem firstRegionFs = HRegionFileSystem.openRegionFromFileSystem(
+       env.getMasterConfiguration(), fileSystem, tableDir, regionsToMerge[0], false);
+   firstRegionFs.createMergesDir();
+   mergeStoreFiles(env, firstRegionFs, firstRegionFs.getMergesDir());
+
+   // The second region's files go into the same merges directory.
+   final HRegionFileSystem secondRegionFs = HRegionFileSystem.openRegionFromFileSystem(
+       env.getMasterConfiguration(), fileSystem, tableDir, regionsToMerge[1], false);
+   mergeStoreFiles(env, secondRegionFs, firstRegionFs.getMergesDir());
+
+   firstRegionFs.commitMergedRegion(mergedRegionInfo);
+ }
+
+ /**
+  * Create reference file(s) of merging regions under the merges directory
+  * @param env MasterProcedureEnv
+  * @param regionFs region file system
+  * @param mergedDir the temp directory of merged region
+  * @throws IOException if the store files cannot be listed or referenced
+  */
+ private void mergeStoreFiles(
+     final MasterProcedureEnv env, final HRegionFileSystem regionFs, final Path mergedDir)
+     throws IOException {
+   final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
+   final Configuration conf = env.getMasterConfiguration();
+   final HTableDescriptor htd = env.getMasterServices().getTableDescriptors().get(getTableName());
+
+   for (String family: regionFs.getFamilies()) {
+     // Bytes.toBytes() instead of String.getBytes(): the latter depends on the platform
+     // default charset, so the family lookup could miss on a non-UTF-8 JVM.
+     final HColumnDescriptor hcd = htd.getFamily(Bytes.toBytes(family));
+     final Collection<StoreFileInfo> storeFiles = regionFs.getStoreFiles(family);
+
+     if (storeFiles != null && storeFiles.size() > 0) {
+       final CacheConfig cacheConf = new CacheConfig(conf, hcd);
+       for (StoreFileInfo storeFileInfo: storeFiles) {
+         // Create reference file(s) of the region in mergedDir
+         regionFs.mergeStoreFile(mergedRegionInfo, family, new StoreFile(mfs.getFileSystem(),
+             storeFileInfo, conf, cacheConf, hcd.getBloomFilterType(), true),
+             mergedDir);
+       }
+     }
+   }
+ }
+
+ /**
+  * Removes the merged region from the file system; used when rolling back
+  * createMergedRegion().
+  * @param env MasterProcedureEnv
+  * @throws IOException if a file system operation fails
+  */
+ private void cleanupMergedRegion(final MasterProcedureEnv env) throws IOException {
+   final MasterFileSystem masterFs = env.getMasterServices().getMasterFileSystem();
+   final Path tableDir =
+       FSUtils.getTableDir(masterFs.getRootDir(), regionsToMerge[0].getTable());
+   final FileSystem fileSystem = masterFs.getFileSystem();
+   // The cleanup goes through the first region's file system.
+   final HRegionFileSystem firstRegionFs = HRegionFileSystem.openRegionFromFileSystem(
+       env.getMasterConfiguration(), fileSystem, tableDir, regionsToMerge[0], false);
+   firstRegionFs.cleanupMergedRegion(mergedRegionInfo);
+ }
+
+ /**
+ * RPC to region server that host the regions to merge, ask for close these regions
+ * @param env MasterProcedureEnv
+ * @throws IOException
+ */
+ private void closeRegionsForMerge(final MasterProcedureEnv env) throws IOException {
+ boolean success = env.getMasterServices().getServerManager().sendRegionCloseForSplitOrMerge(
+ getServerName(env), regionsToMerge[0], regionsToMerge[1]);
+ if (!success) {
+ throw new IOException("Close regions " + getRegionsToMergeListFullNameString()
+ + " for merging failed. Check region server log for more details.");
+ }
+ }
+
+ /**
+ * Rollback close regions
+ * @param env MasterProcedureEnv
+ **/
+ private void rollbackCloseRegionsForMerge(final MasterProcedureEnv env) throws IOException {
+ // Check whether the region is closed; if so, open it in the same server
+ RegionStates regionStates = getAssignmentManager(env).getRegionStates();
+ for(int i = 1; i < regionsToMerge.length; i++) {
+ RegionState state = regionStates.getRegionState(regionsToMerge[i]);
+ if (state != null && (state.isClosing() || state.isClosed())) {
+ env.getMasterServices().getServerManager().sendRegionOpen(
+ getServerName(env),
+ regionsToMerge[i],
+ ServerName.EMPTY_SERVER_LIST);
+ }
+ }
+ }
+
+ /**
+  * Invokes the coprocessor hook that runs before the merge is committed, if a coprocessor
+  * host is present, and checks that the row key of every mutation it supplied parses as a
+  * region name.
+  * @param env MasterProcedureEnv
+  * @throws IOException if the hook asks to bypass the merge, or a row key does not parse
+  **/
+ private void preMergeRegionsCommit(final MasterProcedureEnv env) throws IOException {
+   final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
+   if (cpHost == null) {
+     return;
+   }
+
+   @MetaMutationAnnotation
+   final List<Mutation> metaEntries = new ArrayList<>();
+   // A true return value from the hook means the merge must not proceed.
+   if (cpHost.preMergeRegionsCommit(regionsToMerge, metaEntries, getUser())) {
+     throw new IOException(
+         "Coprocessor bypassing regions " + getRegionsToMergeListFullNameString() + " merge.");
+   }
+
+   try {
+     // Parsing is done only for validation; the parsed result is discarded.
+     for (Mutation mutation : metaEntries) {
+       HRegionInfo.parseRegionName(mutation.getRow());
+     }
+   } catch (IOException e) {
+     LOG.error("Row key of mutation from coprocessor is not parsable as region name."
+         + "Mutations from coprocessor should only be for hbase:meta table.", e);
+     throw e;
+   }
+ }
+
+ /**
+ * Add merged region to META and delete original regions.
+ * @param env MasterProcedureEnv
+ * @throws IOException
+ */
+ private void updateMetaForMergedRegions(final MasterProcedureEnv env) throws IOException {
+ RegionStateTransition.Builder transition = RegionStateTransition.newBuilder();
+ transition.setTransitionCode(TransitionCode.MERGE_PONR);
+ transition.addRegionInfo(HRegionInfo.convert(mergedRegionInfo));
+ transition.addRegionInfo(HRegionInfo.convert(regionsToMerge[0]));
+ transition.addRegionInfo(HRegionInfo.convert(regionsToMerge[1]));
+ // Add merged region and delete original regions
+ // as an atomic update. See HBASE-7721. This update to hbase:meta makes the region
+ // will determine whether the region is merged or not in case of failures.
+ if (env.getMasterServices().getAssignmentManager().onRegionTransition(
+ getServerName(env), transition.build()) != null) {
+ throw new IOException("Failed to update meta to add merged region that merges "
+ + getRegionsToMergeListFullNameString());
+ }
+ }
+
+ /**
+  * Invokes the coprocessor hook that runs after the merge has been committed, if a
+  * coprocessor host is present.
+  * @param env MasterProcedureEnv
+  * @throws IOException propagated from the hook
+  **/
+ private void postMergeRegionsCommit(final MasterProcedureEnv env) throws IOException {
+   final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
+   if (cpHost == null) {
+     return;
+   }
+   cpHost.postMergeRegionsCommit(regionsToMerge, mergedRegionInfo, getUser());
+ }
+
+ /**
+ * Assign merged region
+ * @param env MasterProcedureEnv
+ * @throws IOException
+ * @throws InterruptedException
+ **/
+ private void openMergedRegions(final MasterProcedureEnv env)
+ throws IOException, InterruptedException {
+ // Check whether the merged region is already opened; if so,
+ // this is retry and we should just ignore.
+ RegionState regionState =
+ getAssignmentManager(env).getRegionStates().getRegionState(mergedRegionInfo);
+ if (regionState != null && regionState.isOpened()) {
+ LOG.info("Skip opening merged region " + mergedRegionInfo.getRegionNameAsString()
+ + " as it is already opened.");
+ return;
+ }
+
+ // TODO: The new AM should provide an API to force assign the merged region to the same RS
+ // as daughter regions; if the RS is unavailable, then assign to a different RS.
+ env.getMasterServices().getAssignmentManager().assignMergedRegion(
+ mergedRegionInfo, regionsToMerge[0], regionsToMerge[1]);
+ }
+
+ /**
+  * Invokes the coprocessor hook that runs once the merge procedure has completed, if a
+  * coprocessor host is present.
+  * @param env MasterProcedureEnv
+  * @throws IOException propagated from the hook
+  **/
+ private void postCompletedMergeRegions(final MasterProcedureEnv env) throws IOException {
+   final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
+   if (cpHost == null) {
+     return;
+   }
+   cpHost.postCompletedMergeRegionsAction(regionsToMerge, mergedRegionInfo, getUser());
+ }
+
+ private RegionLoad getRegionLoad(
+ final MasterProcedureEnv env,
+ final ServerName sn,
+ final HRegionInfo hri) {
+ ServerManager serverManager = env.getMasterServices().getServerManager();
+ ServerLoad load = serverManager.getLoad(sn);
+ if (load != null) {
+ Map<byte[], RegionLoad> regionsLoad = load.getRegionsLoad();
+ if (regionsLoad != null) {
+ return regionsLoad.get(hri.getRegionName());
+ }
+ }
+ return null;
+ }
+
+ /**
+ * The procedure could be restarted from a different machine. If the variable is null, we need to
+ * retrieve it.
+ * @param env MasterProcedureEnv
+ * @return whether target regions hosted by the same RS
+ */
+ private boolean isRegionsOnTheSameServer(final MasterProcedureEnv env) throws IOException{
+ Boolean onSameRS = true;
+ int i = 0;
+ RegionStates regionStates = getAssignmentManager(env).getRegionStates();
+ regionLocation = regionStates.getRegionServerOfRegion(regionsToMerge[i]);
+ if (regionLocation != null) {
+ for(i = 1; i < regionsToMerge.length; i++) {
+ ServerName regionLocation2 = regionStates.getRegionServerOfRegion(regionsToMerge[i]);
+ if (regionLocation2 != null) {
+ if (onSameRS) {
+ onSameRS = regionLocation.equals(regionLocation2);
+ }
+ } else {
+ // At least one region is not online, merge will fail, no need to continue.
+ break;
+ }
+ }
+ if (i == regionsToMerge.length) {
+ // Finish checking all regions, return the result;
+ return onSameRS;
+ }
+ }
+
+ // If reaching here, at least one region is not online.
+ String msg = "Skip merging regions " + getRegionsToMergeListFullNameString() +
+ ", because region " + regionsToMerge[i].getEncodedName() + " is not online now.";
+ LOG.warn(msg);
+ throw new IOException(msg);
+ }
+
+ /**
+ * Returns the assignment manager, fetching it from the master services on first use and
+ * caching it in the assignmentManager field. The procedure could be restarted from a
+ * different machine, in which case the field starts out null and is retrieved here.
+ * @param env MasterProcedureEnv
+ * @return assignmentManager
+ */
+ private AssignmentManager getAssignmentManager(final MasterProcedureEnv env) {
+ if (assignmentManager == null) {
+ assignmentManager = env.getMasterServices().getAssignmentManager();
+ }
+ return assignmentManager;
+ }
+
+ /**
+ * Returns the timeout used while waiting for the region move, reading it from
+ * "hbase.master.regionmerge.timeout" on first use and caching it in the timeout field.
+ * The procedure could be restarted from a different machine, in which case the field
+ * still holds -1 and the value is retrieved here.
+ * @param env MasterProcedureEnv
+ * @return timeout value
+ */
+ private int getTimeout(final MasterProcedureEnv env) {
+ // -1 is the "not loaded yet" marker set by the constructors.
+ if (timeout == -1) {
+ // Default: 60 * 1000 per region to merge.
+ timeout = env.getMasterConfiguration().getInt(
+ "hbase.master.regionmerge.timeout", regionsToMerge.length * 60 * 1000);
+ }
+ return timeout;
+ }
+
+ /**
+ * Returns the server hosting regionsToMerge[0], looking it up in the region states on
+ * first use and caching it in the regionLocation field. The procedure could be restarted
+ * from a different machine, in which case the field starts out null and is retrieved here.
+ * @param env MasterProcedureEnv
+ * @return serverName
+ */
+ private ServerName getServerName(final MasterProcedureEnv env) {
+ if (regionLocation == null) {
+ regionLocation =
+ getAssignmentManager(env).getRegionStates().getRegionServerOfRegion(regionsToMerge[0]);
+ }
+ return regionLocation;
+ }
+
+ /**
+ * The procedure could be restarted from a different machine. If the variable is null, we need to
+ * retrieve it.
+ * @param fullName whether return only encoded name
+ * @return region names in a list
+ */
+ private String getRegionsToMergeListFullNameString() {
+ if (regionsToMergeListFullName == null) {
+ StringBuilder sb = new StringBuilder("[");
+ int i = 0;
+ while(i < regionsToMerge.length - 1) {
+ sb.append(regionsToMerge[i].getRegionNameAsString() + ", ");
+ i++;
+ }
+ sb.append(regionsToMerge[i].getRegionNameAsString() + " ]");
+ regionsToMergeListFullName = sb.toString();
+ }
+ return regionsToMergeListFullName;
+ }
+
+ /**
+ * The procedure could be restarted from a different machine. If the variable is null, we need to
+ * retrieve it.
+ * @return encoded region names
+ */
+ private String getRegionsToMergeListEncodedNameString() {
+ if (regionsToMergeListEncodedName == null) {
+ StringBuilder sb = new StringBuilder("[");
+ int i = 0;
+ while(i < regionsToMerge.length - 1) {
+ sb.append(regionsToMerge[i].getEncodedName() + ", ");
+ i++;
+ }
+ sb.append(regionsToMerge[i].getEncodedName() + " ]");
+ regionsToMergeListEncodedName = sb.toString();
+ }
+ return regionsToMergeListEncodedName;
+ }
+
+ /**
+ * Returns whether trace logging is enabled, asking the logger on first use and caching the
+ * answer in the traceEnabled field. The procedure could be restarted from a different
+ * machine, in which case the field starts out null and is retrieved here.
+ * @return traceEnabled
+ */
+ private Boolean isTraceEnabled() {
+ if (traceEnabled == null) {
+ traceEnabled = LOG.isTraceEnabled();
+ }
+ return traceEnabled;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyColumnFamilyProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyColumnFamilyProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyColumnFamilyProcedure.java
index 622c19f..52bb4d5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyColumnFamilyProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyColumnFamilyProcedure.java
@@ -21,14 +21,17 @@ package org.apache.hadoop.hbase.master.procedure;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.InvalidFamilyOperationException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
@@ -94,9 +97,7 @@ public class ModifyColumnFamilyProcedure
setNextState(ModifyColumnFamilyState.MODIFY_COLUMN_FAMILY_REOPEN_ALL_REGIONS);
break;
case MODIFY_COLUMN_FAMILY_REOPEN_ALL_REGIONS:
- if (env.getAssignmentManager().isTableEnabled(getTableName())) {
- addChildProcedure(env.getAssignmentManager().createReopenProcedures(getTableName()));
- }
+ reOpenAllRegionsIfTableIsOnline(env);
return Flow.NO_MORE_STATE;
default:
throw new UnsupportedOperationException(this + " unhandled state=" + state);
@@ -264,8 +265,7 @@ public class ModifyColumnFamilyProcedure
env.getMasterServices().getTableDescriptors().add(unmodifiedHTableDescriptor);
// Make sure regions are opened after table descriptor is updated.
- //reOpenAllRegionsIfTableIsOnline(env);
- // TODO: NUKE ROLLBACK!!!!
+ reOpenAllRegionsIfTableIsOnline(env);
}
/**
@@ -281,6 +281,26 @@ public class ModifyColumnFamilyProcedure
}
/**
+ * Last action from the procedure - executed when online schema change is supported.
+ * @param env MasterProcedureEnv
+ * @throws IOException
+ */
+ private void reOpenAllRegionsIfTableIsOnline(final MasterProcedureEnv env) throws IOException {
+   // This operation only runs when the table is enabled; otherwise there is nothing to reopen.
+   if (!env.getMasterServices().getTableStateManager()
+       .isTableState(getTableName(), TableState.State.ENABLED)) {
+     return;
+   }
+
+   // Read the table's regions from meta and ask the helper to reopen all of them.
+   List<HRegionInfo> regionInfoList = ProcedureSyncWait.getRegionsFromMeta(env, getTableName());
+   if (MasterDDLOperationHelper.reOpenAllRegions(env, getTableName(), regionInfoList)) {
+     // This is the modify-column-family procedure, so the message names that operation.
+     LOG.info("Completed modify column family operation on table " + getTableName());
+   } else {
+     // A failed reopen is only logged; it is not propagated to the caller.
+     LOG.warn("Error on reopening the regions on table " + getTableName());
+   }
+ }
+
+ /**
* The procedure could be restarted from a different machine. If the variable is null, we need to
* retrieve it.
* @return traceEnabled
http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java
index 20a6a03..6a70f62 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java
@@ -120,10 +120,7 @@ public class ModifyTableProcedure
setNextState(ModifyTableState.MODIFY_TABLE_REOPEN_ALL_REGIONS);
break;
case MODIFY_TABLE_REOPEN_ALL_REGIONS:
- if (env.getAssignmentManager().isTableEnabled(getTableName())) {
- addChildProcedure(env.getAssignmentManager()
- .createReopenProcedures(getRegionInfoList(env)));
- }
+ reOpenAllRegionsIfTableIsOnline(env);
return Flow.NO_MORE_STATE;
default:
throw new UnsupportedOperationException("unhandled state=" + state);
@@ -302,8 +299,7 @@ public class ModifyTableProcedure
deleteFromFs(env, modifiedHTableDescriptor, unmodifiedHTableDescriptor);
// Make sure regions are opened after table descriptor is updated.
- //reOpenAllRegionsIfTableIsOnline(env);
- // TODO: NUKE ROLLBACK!!!!
+ reOpenAllRegionsIfTableIsOnline(env);
}
/**
@@ -378,6 +374,25 @@ public class ModifyTableProcedure
}
/**
+ * Last action from the procedure - executed when online schema change is supported.
+ * @param env MasterProcedureEnv
+ * @throws IOException
+ */
+ private void reOpenAllRegionsIfTableIsOnline(final MasterProcedureEnv env) throws IOException {
+ // This operation only run when the table is enabled.
+ if (!env.getMasterServices().getTableStateManager()
+ .isTableState(getTableName(), TableState.State.ENABLED)) {
+ return;
+ }
+
+ if (MasterDDLOperationHelper.reOpenAllRegions(env, getTableName(), getRegionInfoList(env))) {
+ LOG.info("Completed modify table operation on table " + getTableName());
+ } else {
+ LOG.warn("Error on reopening the regions on table " + getTableName());
+ }
+ }
+
+ /**
* The procedure could be restarted from a different machine. If the variable is null, we need to
* retrieve it.
* @return traceEnabled whether the trace is enabled
@@ -415,8 +430,7 @@ public class ModifyTableProcedure
private List<HRegionInfo> getRegionInfoList(final MasterProcedureEnv env) throws IOException {
if (regionInfoList == null) {
- regionInfoList = env.getAssignmentManager().getRegionStates()
- .getRegionsOfTable(getTableName());
+ regionInfoList = ProcedureSyncWait.getRegionsFromMeta(env, getTableName());
}
return regionInfoList;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java
index 5199bf8..3777c79 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java
@@ -21,26 +21,30 @@ package org.apache.hadoop.hbase.master.procedure;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.List;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CoordinatedStateException;
import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException;
import org.apache.hadoop.hbase.ProcedureInfo;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
-import org.apache.hadoop.hbase.master.assignment.RegionStates;
+import org.apache.hadoop.hbase.master.AssignmentManager;
+import org.apache.hadoop.hbase.master.RegionState.State;
+import org.apache.hadoop.hbase.master.RegionStates;
+import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.quotas.MasterQuotaManager;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
/**
* Helper to synchronously wait on conditions.
@@ -60,93 +64,19 @@ public final class ProcedureSyncWait {
T evaluate() throws IOException;
}
- private static class ProcedureFuture implements Future<byte[]> {
- private final ProcedureExecutor<MasterProcedureEnv> procExec;
- private final long procId;
-
- private boolean hasResult = false;
- private byte[] result = null;
-
- public ProcedureFuture(ProcedureExecutor<MasterProcedureEnv> procExec, long procId) {
- this.procExec = procExec;
- this.procId = procId;
- }
-
- @Override
- public boolean cancel(boolean mayInterruptIfRunning) { return false; }
-
- @Override
- public boolean isCancelled() { return false; }
-
- @Override
- public boolean isDone() { return hasResult; }
-
- @Override
- public byte[] get() throws InterruptedException, ExecutionException {
- if (hasResult) return result;
- try {
- return waitForProcedureToComplete(procExec, procId, Long.MAX_VALUE);
- } catch (Exception e) {
- throw new ExecutionException(e);
- }
- }
-
- @Override
- public byte[] get(long timeout, TimeUnit unit)
- throws InterruptedException, ExecutionException, TimeoutException {
- if (hasResult) return result;
- try {
- result = waitForProcedureToComplete(procExec, procId, unit.toMillis(timeout));
- hasResult = true;
- return result;
- } catch (TimeoutIOException e) {
- throw new TimeoutException(e.getMessage());
- } catch (Exception e) {
- throw new ExecutionException(e);
- }
- }
- }
-
- public static Future<byte[]> submitProcedure(final ProcedureExecutor<MasterProcedureEnv> procExec,
- final Procedure proc) {
- if (proc.isInitializing()) {
- procExec.submitProcedure(proc);
- }
- return new ProcedureFuture(procExec, proc.getProcId());
- }
-
public static byte[] submitAndWaitProcedure(ProcedureExecutor<MasterProcedureEnv> procExec,
final Procedure proc) throws IOException {
- if (proc.isInitializing()) {
- procExec.submitProcedure(proc);
- }
- return waitForProcedureToCompleteIOE(procExec, proc.getProcId(), Long.MAX_VALUE);
+ long procId = procExec.submitProcedure(proc);
+ return waitForProcedureToComplete(procExec, procId);
}
- public static byte[] waitForProcedureToCompleteIOE(
- final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId, final long timeout)
- throws IOException {
- try {
- return waitForProcedureToComplete(procExec, procId, timeout);
- } catch (IOException e) {
- throw e;
- } catch (Exception e) {
- throw new IOException(e);
+ private static byte[] waitForProcedureToComplete(ProcedureExecutor<MasterProcedureEnv> procExec,
+ final long procId) throws IOException {
+ while (!procExec.isFinished(procId) && procExec.isRunning()) {
+ // TODO: add a config to make it tunable
+ // Dev Consideration: are we waiting forever, or we can set up some timeout value?
+ Threads.sleepWithoutInterrupt(250);
}
- }
-
- public static byte[] waitForProcedureToComplete(
- final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId, final long timeout)
- throws IOException {
- waitFor(procExec.getEnvironment(), "pid=" + procId,
- new ProcedureSyncWait.Predicate<Boolean>() {
- @Override
- public Boolean evaluate() throws IOException {
- return !procExec.isRunning() || procExec.isFinished(procId);
- }
- }
- );
-
ProcedureInfo result = procExec.getResult(procId);
if (result != null) {
if (result.isFailed()) {
@@ -156,7 +86,7 @@ public final class ProcedureSyncWait {
return result.getResult();
} else {
if (procExec.isRunning()) {
- throw new IOException("pid= " + procId + "not found");
+ throw new IOException("Procedure " + procId + "not found");
} else {
throw new IOException("The Master is Aborting");
}
@@ -174,7 +104,6 @@ public final class ProcedureSyncWait {
public static <T> T waitFor(MasterProcedureEnv env, long waitTime, long waitingTimeForEvents,
String purpose, Predicate<T> predicate) throws IOException {
final long done = EnvironmentEdgeManager.currentTime() + waitTime;
- boolean logged = false;
do {
T result = predicate.evaluate();
if (result != null && !result.equals(Boolean.FALSE)) {
@@ -186,12 +115,7 @@ public final class ProcedureSyncWait {
LOG.warn("Interrupted while sleeping, waiting on " + purpose);
throw (InterruptedIOException)new InterruptedIOException().initCause(e);
}
- if (LOG.isTraceEnabled()) {
- LOG.trace("waitFor " + purpose);
- } else {
- if (!logged) LOG.debug("waitFor " + purpose);
- }
- logged = true;
+ LOG.debug("Waiting on " + purpose);
} while (EnvironmentEdgeManager.currentTime() < done && env.isRunning());
throw new TimeoutIOException("Timed out while waiting on " + purpose);
@@ -209,14 +133,44 @@ public final class ProcedureSyncWait {
}
}
+ protected static void waitRegionServers(final MasterProcedureEnv env) throws IOException {
+ final ServerManager sm = env.getMasterServices().getServerManager();
+ ProcedureSyncWait.waitFor(env, "server to assign region(s)",
+ new ProcedureSyncWait.Predicate<Boolean>() {
+ @Override
+ public Boolean evaluate() throws IOException {
+ List<ServerName> servers = sm.createDestinationServersList();
+ return servers != null && !servers.isEmpty();
+ }
+ });
+ }
+
+ protected static List<HRegionInfo> getRegionsFromMeta(final MasterProcedureEnv env,
+ final TableName tableName) throws IOException {
+ return ProcedureSyncWait.waitFor(env, "regions of table=" + tableName + " from meta",
+ new ProcedureSyncWait.Predicate<List<HRegionInfo>>() {
+ @Override
+ public List<HRegionInfo> evaluate() throws IOException {
+ if (TableName.META_TABLE_NAME.equals(tableName)) {
+ return new MetaTableLocator().getMetaRegions(env.getMasterServices().getZooKeeper());
+ }
+ return MetaTableAccessor.getTableRegions(env.getMasterServices().getConnection(),tableName);
+ }
+ });
+ }
+
protected static void waitRegionInTransition(final MasterProcedureEnv env,
final List<HRegionInfo> regions) throws IOException, CoordinatedStateException {
- final RegionStates states = env.getAssignmentManager().getRegionStates();
+ final AssignmentManager am = env.getMasterServices().getAssignmentManager();
+ final RegionStates states = am.getRegionStates();
for (final HRegionInfo region : regions) {
ProcedureSyncWait.waitFor(env, "regions " + region.getRegionNameAsString() + " in transition",
new ProcedureSyncWait.Predicate<Boolean>() {
@Override
public Boolean evaluate() throws IOException {
+ if (states.isRegionInState(region, State.FAILED_OPEN)) {
+ am.regionOffline(region);
+ }
return !states.isRegionInTransition(region);
}
});
http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java
deleted file mode 100644
index 887e272..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java
+++ /dev/null
@@ -1,541 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.master.procedure;
-
-import com.google.common.collect.ArrayListMultimap;
-
-import java.io.IOException;
-import java.net.SocketTimeoutException;
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
-import org.apache.hadoop.hbase.master.MasterServices;
-import org.apache.hadoop.hbase.master.ServerListener;
-import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-
-/**
- * A remote procecdure dispatcher for regionservers.
- */
-public class RSProcedureDispatcher
- extends RemoteProcedureDispatcher<MasterProcedureEnv, ServerName>
- implements ServerListener {
- private static final Log LOG = LogFactory.getLog(RSProcedureDispatcher.class);
-
- public static final String RS_RPC_STARTUP_WAIT_TIME_CONF_KEY =
- "hbase.regionserver.rpc.startup.waittime";
- private static final int DEFAULT_RS_RPC_STARTUP_WAIT_TIME = 60000;
-
- private static final int RS_VERSION_WITH_EXEC_PROCS = 0x0201000; // 2.1
-
- protected final MasterServices master;
- protected final long rsStartupWaitTime;
-
- public RSProcedureDispatcher(final MasterServices master) {
- super(master.getConfiguration());
-
- this.master = master;
- this.rsStartupWaitTime = master.getConfiguration().getLong(
- RS_RPC_STARTUP_WAIT_TIME_CONF_KEY, DEFAULT_RS_RPC_STARTUP_WAIT_TIME);
- }
-
- @Override
- public boolean start() {
- if (!super.start()) {
- return false;
- }
-
- master.getServerManager().registerListener(this);
- for (ServerName serverName: master.getServerManager().getOnlineServersList()) {
- addNode(serverName);
- }
- return true;
- }
-
- @Override
- public boolean stop() {
- if (!super.stop()) {
- return false;
- }
-
- master.getServerManager().unregisterListener(this);
- return true;
- }
-
- @Override
- protected void remoteDispatch(final ServerName serverName,
- final Set<RemoteProcedure> operations) {
- final int rsVersion = master.getAssignmentManager().getServerVersion(serverName);
- if (rsVersion >= RS_VERSION_WITH_EXEC_PROCS) {
- LOG.info(String.format(
- "Using procedure batch rpc execution for serverName=%s version=%s",
- serverName, rsVersion));
- submitTask(new ExecuteProceduresRemoteCall(serverName, operations));
- } else {
- LOG.info(String.format(
- "Fallback to compat rpc execution for serverName=%s version=%s",
- serverName, rsVersion));
- submitTask(new CompatRemoteProcedureResolver(serverName, operations));
- }
- }
-
- protected void abortPendingOperations(final ServerName serverName,
- final Set<RemoteProcedure> operations) {
- // TODO: Replace with a ServerNotOnlineException()
- final IOException e = new DoNotRetryIOException("server not online " + serverName);
- final MasterProcedureEnv env = master.getMasterProcedureExecutor().getEnvironment();
- for (RemoteProcedure proc: operations) {
- proc.remoteCallFailed(env, serverName, e);
- }
- }
-
- public void serverAdded(final ServerName serverName) {
- addNode(serverName);
- }
-
- public void serverRemoved(final ServerName serverName) {
- removeNode(serverName);
- }
-
- /**
- * Base remote call
- */
- protected abstract class AbstractRSRemoteCall implements Callable<Void> {
- private final ServerName serverName;
-
- private int numberOfAttemptsSoFar = 0;
- private long maxWaitTime = -1;
-
- public AbstractRSRemoteCall(final ServerName serverName) {
- this.serverName = serverName;
- }
-
- public abstract Void call();
-
- protected AdminService.BlockingInterface getRsAdmin() throws IOException {
- final AdminService.BlockingInterface admin = master.getServerManager().getRsAdmin(serverName);
- if (admin == null) {
- throw new IOException("Attempting to send OPEN RPC to server " + getServerName() +
- " failed because no RPC connection found to this server");
- }
- return admin;
- }
-
- protected ServerName getServerName() {
- return serverName;
- }
-
- protected boolean scheduleForRetry(final IOException e) {
- // Should we wait a little before retrying? If the server is starting it's yes.
- final boolean hold = (e instanceof ServerNotRunningYetException);
- if (hold) {
- LOG.warn(String.format("waiting a little before trying on the same server=%s try=%d",
- serverName, numberOfAttemptsSoFar), e);
- long now = EnvironmentEdgeManager.currentTime();
- if (now < getMaxWaitTime()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(String.format("server is not yet up; waiting up to %dms",
- (getMaxWaitTime() - now)), e);
- }
- submitTask(this, 100, TimeUnit.MILLISECONDS);
- return true;
- }
-
- LOG.warn(String.format("server %s is not up for a while; try a new one", serverName), e);
- return false;
- }
-
- // In case socket is timed out and the region server is still online,
- // the openRegion RPC could have been accepted by the server and
- // just the response didn't go through. So we will retry to
- // open the region on the same server.
- final boolean retry = !hold && (e instanceof SocketTimeoutException
- && master.getServerManager().isServerOnline(serverName));
- if (retry) {
- // we want to retry as many times as needed as long as the RS is not dead.
- if (LOG.isDebugEnabled()) {
- LOG.debug(String.format("Retrying to same RegionServer %s because: %s",
- serverName, e.getMessage()), e);
- }
- submitTask(this);
- return true;
- }
-
- // trying to send the request elsewhere instead
- LOG.warn(String.format("the request should be tried elsewhere instead; server=%s try=%d",
- serverName, numberOfAttemptsSoFar), e);
- return false;
- }
-
- private long getMaxWaitTime() {
- if (this.maxWaitTime < 0) {
- // This is the max attempts, not retries, so it should be at least 1.
- this.maxWaitTime = EnvironmentEdgeManager.currentTime() + rsStartupWaitTime;
- }
- return this.maxWaitTime;
- }
-
- protected IOException unwrapException(IOException e) {
- if (e instanceof RemoteException) {
- e = ((RemoteException)e).unwrapRemoteException();
- }
- return e;
- }
- }
-
- private interface RemoteProcedureResolver {
- void dispatchOpenRequests(MasterProcedureEnv env, List<RegionOpenOperation> operations);
- void dispatchCloseRequests(MasterProcedureEnv env, List<RegionCloseOperation> operations);
- }
-
- public void splitAndResolveOperation(final ServerName serverName,
- final Set<RemoteProcedure> operations, final RemoteProcedureResolver resolver) {
- final MasterProcedureEnv env = master.getMasterProcedureExecutor().getEnvironment();
- final ArrayListMultimap<Class<?>, RemoteOperation> reqsByType =
- buildAndGroupRequestByType(env, serverName, operations);
-
- final List<RegionOpenOperation> openOps = fetchType(reqsByType, RegionOpenOperation.class);
- if (!openOps.isEmpty()) resolver.dispatchOpenRequests(env, openOps);
-
- final List<RegionCloseOperation> closeOps = fetchType(reqsByType, RegionCloseOperation.class);
- if (!closeOps.isEmpty()) resolver.dispatchCloseRequests(env, closeOps);
-
- if (!reqsByType.isEmpty()) {
- LOG.warn("unknown request type in the queue: " + reqsByType);
- }
- }
-
- // ==========================================================================
- // Compatibility calls
- // ==========================================================================
- protected class ExecuteProceduresRemoteCall extends AbstractRSRemoteCall
- implements RemoteProcedureResolver {
- private final Set<RemoteProcedure> operations;
-
- private ExecuteProceduresRequest.Builder request = null;
-
- public ExecuteProceduresRemoteCall(final ServerName serverName,
- final Set<RemoteProcedure> operations) {
- super(serverName);
- this.operations = operations;
- }
-
- public Void call() {
- final MasterProcedureEnv env = master.getMasterProcedureExecutor().getEnvironment();
-
- request = ExecuteProceduresRequest.newBuilder();
- splitAndResolveOperation(getServerName(), operations, this);
-
- try {
- final ExecuteProceduresResponse response = sendRequest(getServerName(), request.build());
- remoteCallCompleted(env, response);
- } catch (IOException e) {
- e = unwrapException(e);
- // TODO: In the future some operation may want to bail out early.
- // TODO: How many times should we retry (use numberOfAttemptsSoFar)
- if (!scheduleForRetry(e)) {
- remoteCallFailed(env, e);
- }
- }
- return null;
- }
-
- public void dispatchOpenRequests(final MasterProcedureEnv env,
- final List<RegionOpenOperation> operations) {
- request.addOpenRegion(buildOpenRegionRequest(env, getServerName(), operations));
- }
-
- public void dispatchCloseRequests(final MasterProcedureEnv env,
- final List<RegionCloseOperation> operations) {
- for (RegionCloseOperation op: operations) {
- request.addCloseRegion(op.buildCloseRegionRequest(getServerName()));
- }
- }
-
- protected ExecuteProceduresResponse sendRequest(final ServerName serverName,
- final ExecuteProceduresRequest request) throws IOException {
- try {
- return getRsAdmin().executeProcedures(null, request);
- } catch (ServiceException se) {
- throw ProtobufUtil.getRemoteException(se);
- }
- }
-
-
- private void remoteCallCompleted(final MasterProcedureEnv env,
- final ExecuteProceduresResponse response) {
- /*
- for (RemoteProcedure proc: operations) {
- proc.remoteCallCompleted(env, getServerName(), response);
- }*/
- }
-
- private void remoteCallFailed(final MasterProcedureEnv env, final IOException e) {
- for (RemoteProcedure proc: operations) {
- proc.remoteCallFailed(env, getServerName(), e);
- }
- }
- }
-
- // ==========================================================================
- // Compatibility calls
- // Since we don't have a "batch proc-exec" request on the target RS
- // we have to chunk the requests by type and dispatch the specific request.
- // ==========================================================================
- private static OpenRegionRequest buildOpenRegionRequest(final MasterProcedureEnv env,
- final ServerName serverName, final List<RegionOpenOperation> operations) {
- final OpenRegionRequest.Builder builder = OpenRegionRequest.newBuilder();
- builder.setServerStartCode(serverName.getStartcode());
- builder.setMasterSystemTime(EnvironmentEdgeManager.currentTime());
- for (RegionOpenOperation op: operations) {
- builder.addOpenInfo(op.buildRegionOpenInfoRequest(env));
- }
- return builder.build();
- }
-
- private final class OpenRegionRemoteCall extends AbstractRSRemoteCall {
- private final List<RegionOpenOperation> operations;
-
- public OpenRegionRemoteCall(final ServerName serverName,
- final List<RegionOpenOperation> operations) {
- super(serverName);
- this.operations = operations;
- }
-
- @Override
- public Void call() {
- final MasterProcedureEnv env = master.getMasterProcedureExecutor().getEnvironment();
- final OpenRegionRequest request = buildOpenRegionRequest(env, getServerName(), operations);
-
- try {
- OpenRegionResponse response = sendRequest(getServerName(), request);
- remoteCallCompleted(env, response);
- } catch (IOException e) {
- e = unwrapException(e);
- // TODO: In the future some operation may want to bail out early.
- // TODO: How many times should we retry (use numberOfAttemptsSoFar)
- if (!scheduleForRetry(e)) {
- remoteCallFailed(env, e);
- }
- }
- return null;
- }
-
- private OpenRegionResponse sendRequest(final ServerName serverName,
- final OpenRegionRequest request) throws IOException {
- try {
- return getRsAdmin().openRegion(null, request);
- } catch (ServiceException se) {
- throw ProtobufUtil.getRemoteException(se);
- }
- }
-
- private void remoteCallCompleted(final MasterProcedureEnv env,
- final OpenRegionResponse response) {
- int index = 0;
- for (RegionOpenOperation op: operations) {
- OpenRegionResponse.RegionOpeningState state = response.getOpeningState(index++);
- op.setFailedOpen(state == OpenRegionResponse.RegionOpeningState.FAILED_OPENING);
- op.getRemoteProcedure().remoteCallCompleted(env, getServerName(), op);
- }
- }
-
- private void remoteCallFailed(final MasterProcedureEnv env, final IOException e) {
- for (RegionOpenOperation op: operations) {
- op.getRemoteProcedure().remoteCallFailed(env, getServerName(), e);
- }
- }
- }
-
- private final class CloseRegionRemoteCall extends AbstractRSRemoteCall {
- private final RegionCloseOperation operation;
-
- public CloseRegionRemoteCall(final ServerName serverName,
- final RegionCloseOperation operation) {
- super(serverName);
- this.operation = operation;
- }
-
- @Override
- public Void call() {
- final MasterProcedureEnv env = master.getMasterProcedureExecutor().getEnvironment();
- final CloseRegionRequest request = operation.buildCloseRegionRequest(getServerName());
- try {
- CloseRegionResponse response = sendRequest(getServerName(), request);
- remoteCallCompleted(env, response);
- } catch (IOException e) {
- e = unwrapException(e);
- // TODO: In the future some operation may want to bail out early.
- // TODO: How many times should we retry (use numberOfAttemptsSoFar)
- if (!scheduleForRetry(e)) {
- remoteCallFailed(env, e);
- }
- }
- return null;
- }
-
- private CloseRegionResponse sendRequest(final ServerName serverName,
- final CloseRegionRequest request) throws IOException {
- try {
- return getRsAdmin().closeRegion(null, request);
- } catch (ServiceException se) {
- throw ProtobufUtil.getRemoteException(se);
- }
- }
-
- private void remoteCallCompleted(final MasterProcedureEnv env,
- final CloseRegionResponse response) {
- operation.setClosed(response.getClosed());
- operation.getRemoteProcedure().remoteCallCompleted(env, getServerName(), operation);
- }
-
- private void remoteCallFailed(final MasterProcedureEnv env, final IOException e) {
- operation.getRemoteProcedure().remoteCallFailed(env, getServerName(), e);
- }
- }
-
- protected class CompatRemoteProcedureResolver implements Callable<Void>, RemoteProcedureResolver {
- private final Set<RemoteProcedure> operations;
- private final ServerName serverName;
-
- public CompatRemoteProcedureResolver(final ServerName serverName,
- final Set<RemoteProcedure> operations) {
- this.serverName = serverName;
- this.operations = operations;
- }
-
- @Override
- public Void call() {
- splitAndResolveOperation(serverName, operations, this);
- return null;
- }
-
- public void dispatchOpenRequests(final MasterProcedureEnv env,
- final List<RegionOpenOperation> operations) {
- submitTask(new OpenRegionRemoteCall(serverName, operations));
- }
-
- public void dispatchCloseRequests(final MasterProcedureEnv env,
- final List<RegionCloseOperation> operations) {
- for (RegionCloseOperation op: operations) {
- submitTask(new CloseRegionRemoteCall(serverName, op));
- }
- }
- }
-
- // ==========================================================================
- // RPC Messages
- // - ServerOperation: refreshConfig, grant, revoke, ...
- // - RegionOperation: open, close, flush, snapshot, ...
- // ==========================================================================
- public static abstract class ServerOperation extends RemoteOperation {
- protected ServerOperation(final RemoteProcedure remoteProcedure) {
- super(remoteProcedure);
- }
- }
-
- public static abstract class RegionOperation extends RemoteOperation {
- private final HRegionInfo regionInfo;
-
- protected RegionOperation(final RemoteProcedure remoteProcedure,
- final HRegionInfo regionInfo) {
- super(remoteProcedure);
- this.regionInfo = regionInfo;
- }
-
- public HRegionInfo getRegionInfo() {
- return this.regionInfo;
- }
- }
-
- public static class RegionOpenOperation extends RegionOperation {
- private final List<ServerName> favoredNodes;
- private final boolean openForReplay;
- private boolean failedOpen;
-
- public RegionOpenOperation(final RemoteProcedure remoteProcedure,
- final HRegionInfo regionInfo, final List<ServerName> favoredNodes,
- final boolean openForReplay) {
- super(remoteProcedure, regionInfo);
- this.favoredNodes = favoredNodes;
- this.openForReplay = openForReplay;
- }
-
- protected void setFailedOpen(final boolean failedOpen) {
- this.failedOpen = failedOpen;
- }
-
- public boolean isFailedOpen() {
- return failedOpen;
- }
-
- public OpenRegionRequest.RegionOpenInfo buildRegionOpenInfoRequest(
- final MasterProcedureEnv env) {
- return RequestConverter.buildRegionOpenInfo(getRegionInfo(),
- env.getAssignmentManager().getFavoredNodes(getRegionInfo()), false);
- }
- }
-
- public static class RegionCloseOperation extends RegionOperation {
- private final ServerName destinationServer;
- private boolean closed = false;
-
- public RegionCloseOperation(final RemoteProcedure remoteProcedure,
- final HRegionInfo regionInfo, final ServerName destinationServer) {
- super(remoteProcedure, regionInfo);
- this.destinationServer = destinationServer;
- }
-
- public ServerName getDestinationServer() {
- return destinationServer;
- }
-
- protected void setClosed(final boolean closed) {
- this.closed = closed;
- }
-
- public boolean isClosed() {
- return closed;
- }
-
- public CloseRegionRequest buildCloseRegionRequest(final ServerName serverName) {
- return ProtobufUtil.buildCloseRegionRequest(serverName,
- getRegionInfo().getRegionName(), getDestinationServer());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RestoreSnapshotProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RestoreSnapshotProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RestoreSnapshotProcedure.java
index cfd9df9..21709f8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RestoreSnapshotProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RestoreSnapshotProcedure.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.errorhandling.ForeignException;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.master.MetricsSnapshot;
+import org.apache.hadoop.hbase.master.RegionStates;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
@@ -415,7 +416,17 @@ public class RestoreSnapshotProcedure
try {
Connection conn = env.getMasterServices().getConnection();
- // 1. Prepare to restore
+ // 1. Forces all the RegionStates to be offline
+ //
+ // The AssignmentManager keeps all the region states around
+ // with no possibility to remove them, until the master is restarted.
+ // This means that a region marked as SPLIT before the restore will never be assigned again.
+ // To avoid keeping those states around, all the regions are switched to the OFFLINE state,
+ // which is the same state that the regions will be in after a delete table.
+ forceRegionsOffline(env, regionsToAdd);
+ forceRegionsOffline(env, regionsToRestore);
+ forceRegionsOffline(env, regionsToRemove);
+
getMonitorStatus().setStatus("Preparing to restore each region");
// 2. Applies changes to hbase:meta
@@ -485,6 +496,20 @@ public class RestoreSnapshotProcedure
}
/**
+ * Make sure that the region states of the listed regions are in the OFFLINE state.
+ * @param env MasterProcedureEnv
+ * @param hris region info list
+ **/
+ private void forceRegionsOffline(final MasterProcedureEnv env, final List<HRegionInfo> hris) {
+ RegionStates states = env.getMasterServices().getAssignmentManager().getRegionStates();
+ if (hris != null) {
+ for (HRegionInfo hri: hris) {
+ states.regionOffline(hri);
+ }
+ }
+ }
+
+ /**
* The procedure could be restarted from a different machine. If the variable is null, we need to
* retrieve it.
* @return traceEnabled
http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashException.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashException.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashException.java
deleted file mode 100644
index ca351f6..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashException.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.master.procedure;
-
-import org.apache.hadoop.hbase.HBaseIOException;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-
-/**
- * Passed as Exception by {@link ServerCrashProcedure}
- * notifying on-going RIT that server has failed.
- */
-@InterfaceAudience.Private
-@SuppressWarnings("serial")
-public class ServerCrashException extends HBaseIOException {
- private final long procId;
- private final ServerName serverName;
-
- /**
- * @param serverName The server that crashed.
- */
- public ServerCrashException(long procId, ServerName serverName) {
- this.procId = procId;
- this.serverName = serverName;
- }
-
- @Override
- public String getMessage() {
- return "ServerCrashProcedure pid=" + this.procId + ", server=" + this.serverName;
- }
-}
\ No newline at end of file