You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2017/05/27 20:18:08 UTC
[16/29] hbase git commit: HBASE-14614 Procedure v2 - Core Assignment
Manager (Matteo Bertozzi) Move to a new AssignmentManager,
one that describes Assignment using a State Machine built on top of
ProcedureV2 facility.
http://git-wip-us.apache.org/repos/asf/hbase/blob/657a5d46/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java
new file mode 100644
index 0000000..2b1de9d
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java
@@ -0,0 +1,776 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.master.assignment;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MetaMutationAnnotation;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.UnknownRegionException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.MasterSwitchType;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.RegionReplicaUtil;
+import org.apache.hadoop.hbase.exceptions.MergeRegionException;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.master.CatalogJanitor;
+import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
+import org.apache.hadoop.hbase.master.MasterFileSystem;
+import org.apache.hadoop.hbase.master.RegionState;
+import org.apache.hadoop.hbase.master.procedure.AbstractStateMachineTableProcedure;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
+import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
+import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
+import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MergeTableRegionsState;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.lmax.disruptor.YieldingWaitStrategy;
+
+/**
+ * The procedure to Merge a region in a table.
+ * This procedure takes an exclusive table lock since it is working over multiple regions.
+ * It holds the lock for the life of the procedure.
+ */
+@InterfaceAudience.Private
+public class MergeTableRegionsProcedure
+    extends AbstractStateMachineTableProcedure<MergeTableRegionsState> {
+  private static final Log LOG = LogFactory.getLog(MergeTableRegionsProcedure.class);
+  // Lazily-cached result of LOG.isTraceEnabled(); null until first use (see isTraceEnabled()).
+  private Boolean traceEnabled;
+  // True while this procedure holds the region locks taken in acquireLock(); volatile because
+  // hasLock() may be read from a different thread than the one that set it.
+  private volatile boolean lock = false;
+  // Cached location of regionsToMerge[0]; in-memory only, may be null after master restart
+  // (see getServerName()).
+  private ServerName regionLocation;
+  // The regions being merged (exactly two today); set at construction or on deserialization.
+  private HRegionInfo[] regionsToMerge;
+  // Region info of the resulting merged region, derived from regionsToMerge.
+  private HRegionInfo mergedRegion;
+  // If true, allow merging regions that are not adjacent.
+  private boolean forcible;
+
+  public MergeTableRegionsProcedure() {
+    // Required by the Procedure framework to create the procedure on replay
+  }
+
+  /**
+   * Convenience constructor: non-forcible merge of two regions.
+   */
+  public MergeTableRegionsProcedure(final MasterProcedureEnv env,
+      final HRegionInfo regionToMergeA, final HRegionInfo regionToMergeB) throws IOException {
+    this(env, regionToMergeA, regionToMergeB, false);
+  }
+
+  /**
+   * Convenience constructor: wraps the two regions into an array.
+   */
+  public MergeTableRegionsProcedure(final MasterProcedureEnv env,
+      final HRegionInfo regionToMergeA, final HRegionInfo regionToMergeB,
+      final boolean forcible) throws MergeRegionException {
+    this(env, new HRegionInfo[] {regionToMergeA, regionToMergeB}, forcible);
+  }
+
+  /**
+   * Main constructor. Validates the regions up front and precomputes the merged region info.
+   * @param regionsToMerge exactly two regions of the same table
+   * @param forcible if true, merge even when the regions are not adjacent
+   * @throws MergeRegionException if the regions fail validation
+   */
+  public MergeTableRegionsProcedure(final MasterProcedureEnv env,
+      final HRegionInfo[] regionsToMerge, final boolean forcible)
+      throws MergeRegionException {
+    super(env);
+
+    // Check daughter regions and make sure that we have valid daughter regions
+    // before doing the real work.
+    checkRegionsToMerge(regionsToMerge, forcible);
+
+    // WARN: make sure there is no parent region of the two merging regions in
+    // hbase:meta If exists, fixing up daughters would cause daughter regions(we
+    // have merged one) online again when we restart master, so we should clear
+    // the parent region to prevent the above case
+    // Since HBASE-7721, we don't need fix up daughters any more. so here do nothing
+    this.regionsToMerge = regionsToMerge;
+    this.mergedRegion = createMergedRegionInfo(regionsToMerge);
+    this.forcible = forcible;
+  }
+
+ private static void checkRegionsToMerge(final HRegionInfo[] regionsToMerge,
+ final boolean forcible) throws MergeRegionException {
+ // For now, we only merge 2 regions.
+ // It could be extended to more than 2 regions in the future.
+ if (regionsToMerge == null || regionsToMerge.length != 2) {
+ throw new MergeRegionException("Expected to merge 2 regions, got: " +
+ Arrays.toString(regionsToMerge));
+ }
+
+ checkRegionsToMerge(regionsToMerge[0], regionsToMerge[1], forcible);
+ }
+
+ private static void checkRegionsToMerge(final HRegionInfo regionToMergeA,
+ final HRegionInfo regionToMergeB, final boolean forcible) throws MergeRegionException {
+ if (!regionToMergeA.getTable().equals(regionToMergeB.getTable())) {
+ throw new MergeRegionException("Can't merge regions from two different tables: " +
+ regionToMergeA + ", " + regionToMergeB);
+ }
+
+ if (regionToMergeA.getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID ||
+ regionToMergeB.getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) {
+ throw new MergeRegionException("Can't merge non-default replicas");
+ }
+
+ if (!HRegionInfo.areAdjacent(regionToMergeA, regionToMergeB)) {
+ String msg = "Unable to merge not adjacent regions " + regionToMergeA.getShortNameToLog() +
+ ", " + regionToMergeB.getShortNameToLog() + " where forcible = " + forcible;
+ LOG.warn(msg);
+ if (!forcible) {
+ throw new MergeRegionException(msg);
+ }
+ }
+ }
+
+  // Convenience overload for the two-region array form.
+  private static HRegionInfo createMergedRegionInfo(final HRegionInfo[] regionsToMerge) {
+    return createMergedRegionInfo(regionsToMerge[0], regionsToMerge[1]);
+  }
+
+  /**
+   * Create merged region info through the specified two regions.
+   * The resulting region spans [min(startKeys), max(endKeys)] of the two inputs.
+   */
+  private static HRegionInfo createMergedRegionInfo(final HRegionInfo regionToMergeA,
+      final HRegionInfo regionToMergeB) {
+    // Choose the smaller as start key
+    final byte[] startKey;
+    if (regionToMergeA.compareTo(regionToMergeB) <= 0) {
+      startKey = regionToMergeA.getStartKey();
+    } else {
+      startKey = regionToMergeB.getStartKey();
+    }
+
+    // Choose the bigger as end key. An empty end key means "end of table" and so
+    // always wins; otherwise take the byte-wise larger of the two end keys.
+    final byte[] endKey;
+    if (Bytes.equals(regionToMergeA.getEndKey(), HConstants.EMPTY_BYTE_ARRAY)
+        || (!Bytes.equals(regionToMergeB.getEndKey(), HConstants.EMPTY_BYTE_ARRAY)
+          && Bytes.compareTo(regionToMergeA.getEndKey(), regionToMergeB.getEndKey()) > 0)) {
+      endKey = regionToMergeA.getEndKey();
+    } else {
+      endKey = regionToMergeB.getEndKey();
+    }
+
+    // Merged region is sorted between two merging regions in META
+    final long rid = getMergedRegionIdTimestamp(regionToMergeA, regionToMergeB);
+    return new HRegionInfo(regionToMergeA.getTable(), startKey, endKey, false, rid);
+  }
+
+  /**
+   * Pick a region id (timestamp) for the merged region that sorts after both parents.
+   */
+  private static long getMergedRegionIdTimestamp(final HRegionInfo regionToMergeA,
+      final HRegionInfo regionToMergeB) {
+    long rid = EnvironmentEdgeManager.currentTime();
+    // Regionid is timestamp. Merged region's id can't be less than that of
+    // merging regions else will insert at wrong location in hbase:meta (See HBASE-710).
+    if (rid < regionToMergeA.getRegionId() || rid < regionToMergeB.getRegionId()) {
+      LOG.warn("Clock skew; merging regions id are " + regionToMergeA.getRegionId()
+          + " and " + regionToMergeB.getRegionId() + ", but current time here is " + rid);
+      rid = Math.max(regionToMergeA.getRegionId(), regionToMergeB.getRegionId()) + 1;
+    }
+    return rid;
+  }
+
+  /**
+   * Drive the merge state machine one step forward. States run in order:
+   * PREPARE -> PRE_MERGE_OPERATION -> SET_MERGING_TABLE_STATE -> CLOSE_REGIONS ->
+   * CREATE_MERGED_REGION -> PRE_MERGE_COMMIT_OPERATION -> UPDATE_META ->
+   * POST_MERGE_COMMIT_OPERATION -> OPEN_MERGED_REGION -> POST_OPERATION.
+   * Any IOException short of completion fails the procedure (no retry here);
+   * rollback is then driven by rollbackState().
+   */
+  @Override
+  protected Flow executeFromState(
+      final MasterProcedureEnv env,
+      final MergeTableRegionsState state)
+      throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(this + " execute state=" + state);
+    }
+    try {
+      switch (state) {
+      case MERGE_TABLE_REGIONS_PREPARE:
+        // prepareMergeRegion returns false only after having already marked the
+        // procedure failed (e.g. merge switch off, region not mergeable).
+        if (!prepareMergeRegion(env)) {
+          assert isFailed() : "Merge region should have an exception here";
+          return Flow.NO_MORE_STATE;
+        }
+        setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_PRE_MERGE_OPERATION);
+        break;
+      case MERGE_TABLE_REGIONS_PRE_MERGE_OPERATION:
+        preMergeRegions(env);
+        setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_SET_MERGING_TABLE_STATE);
+        break;
+      case MERGE_TABLE_REGIONS_SET_MERGING_TABLE_STATE:
+        setRegionStateToMerging(env);
+        setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_CLOSE_REGIONS);
+        break;
+      case MERGE_TABLE_REGIONS_CLOSE_REGIONS:
+        // Close every replica of both merging regions via child unassign procedures.
+        addChildProcedure(createUnassignProcedures(env, getRegionReplication(env)));
+        setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_CREATE_MERGED_REGION);
+        break;
+      case MERGE_TABLE_REGIONS_CREATE_MERGED_REGION:
+        createMergedRegion(env);
+        setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_PRE_MERGE_COMMIT_OPERATION);
+        break;
+      case MERGE_TABLE_REGIONS_PRE_MERGE_COMMIT_OPERATION:
+        preMergeRegionsCommit(env);
+        setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_UPDATE_META);
+        break;
+      case MERGE_TABLE_REGIONS_UPDATE_META:
+        // Point of no return: past this state rollback is unsupported
+        // (see isRollbackSupported()).
+        updateMetaForMergedRegions(env);
+        setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_POST_MERGE_COMMIT_OPERATION);
+        break;
+      case MERGE_TABLE_REGIONS_POST_MERGE_COMMIT_OPERATION:
+        postMergeRegionsCommit(env);
+        setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_OPEN_MERGED_REGION);
+        break;
+      case MERGE_TABLE_REGIONS_OPEN_MERGED_REGION:
+        addChildProcedure(createAssignProcedures(env, getRegionReplication(env)));
+        setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_POST_OPERATION);
+        break;
+      case MERGE_TABLE_REGIONS_POST_OPERATION:
+        postCompletedMergeRegions(env);
+        return Flow.NO_MORE_STATE;
+      default:
+        throw new UnsupportedOperationException(this + " unhandled state=" + state);
+      }
+    } catch (IOException e) {
+      LOG.warn("Error trying to merge regions " + HRegionInfo.getShortNameToLog(regionsToMerge) +
+        " in the table " + getTableName() + " (in state=" + state + ")", e);
+
+      setFailure("master-merge-regions", e);
+    }
+    return Flow.HAS_MORE_STATE;
+  }
+
+  /**
+   * Undo one step of the merge state machine. States at or past UPDATE_META are
+   * past the point of no return and throw UnsupportedOperationException instead
+   * of rolling back; earlier states undo their forward action (or are no-ops).
+   * A thrown exception here causes the rollback step to be retried.
+   */
+  @Override
+  protected void rollbackState(
+      final MasterProcedureEnv env,
+      final MergeTableRegionsState state) throws IOException, InterruptedException {
+    if (isTraceEnabled()) {
+      LOG.trace(this + " rollback state=" + state);
+    }
+
+    try {
+      switch (state) {
+      case MERGE_TABLE_REGIONS_POST_OPERATION:
+      case MERGE_TABLE_REGIONS_OPEN_MERGED_REGION:
+      case MERGE_TABLE_REGIONS_POST_MERGE_COMMIT_OPERATION:
+      case MERGE_TABLE_REGIONS_UPDATE_META:
+        String msg = this + " We are in the " + state + " state."
+            + " It is complicated to rollback the merge operation that region server is working on."
+            + " Rollback is not supported and we should let the merge operation to complete";
+        LOG.warn(msg);
+        // PONR
+        throw new UnsupportedOperationException(this + " unhandled state=" + state);
+      case MERGE_TABLE_REGIONS_PRE_MERGE_COMMIT_OPERATION:
+        break;
+      case MERGE_TABLE_REGIONS_CREATE_MERGED_REGION:
+        // Remove the half-built merged region directory from the filesystem.
+        cleanupMergedRegion(env);
+        break;
+      case MERGE_TABLE_REGIONS_CLOSE_REGIONS:
+        // Re-open the regions we closed, on the same server if possible.
+        rollbackCloseRegionsForMerge(env);
+        break;
+      case MERGE_TABLE_REGIONS_SET_MERGING_TABLE_STATE:
+        setRegionStateToRevertMerging(env);
+        break;
+      case MERGE_TABLE_REGIONS_PRE_MERGE_OPERATION:
+        postRollBackMergeRegions(env);
+        break;
+      case MERGE_TABLE_REGIONS_MOVE_REGION_TO_SAME_RS:
+        // NOTE(review): this state is handled here but never set by executeFromState
+        // in this version — presumably kept for compatibility; confirm.
+        break; // nothing to rollback
+      case MERGE_TABLE_REGIONS_PREPARE:
+        break;
+      default:
+        throw new UnsupportedOperationException(this + " unhandled state=" + state);
+      }
+    } catch (Exception e) {
+      // This will be retried. Unless there is a bug in the code,
+      // this should be just a "temporary error" (e.g. network down)
+      LOG.warn("Failed rollback attempt step " + state + " for merging the regions "
+          + HRegionInfo.getShortNameToLog(regionsToMerge) + " in table " + getTableName(), e);
+      throw e;
+    }
+  }
+
+  /*
+   * Check whether we are in the state that can be rollback.
+   * Everything from UPDATE_META onwards is past the point of no return.
+   */
+  @Override
+  protected boolean isRollbackSupported(final MergeTableRegionsState state) {
+    switch (state) {
+    case MERGE_TABLE_REGIONS_POST_OPERATION:
+    case MERGE_TABLE_REGIONS_OPEN_MERGED_REGION:
+    case MERGE_TABLE_REGIONS_POST_MERGE_COMMIT_OPERATION:
+    case MERGE_TABLE_REGIONS_UPDATE_META:
+      // It is not safe to rollback if we reach to these states.
+      return false;
+    default:
+      break;
+    }
+    return true;
+  }
+
+  // Map a persisted numeric state id back to the enum (used on replay).
+  @Override
+  protected MergeTableRegionsState getState(final int stateId) {
+    return MergeTableRegionsState.forNumber(stateId);
+  }
+
+  // Map the enum state to its persisted numeric id.
+  @Override
+  protected int getStateId(final MergeTableRegionsState state) {
+    return state.getNumber();
+  }
+
+  // First state of the merge state machine.
+  @Override
+  protected MergeTableRegionsState getInitialState() {
+    return MergeTableRegionsState.MERGE_TABLE_REGIONS_PREPARE;
+  }
+
+  /**
+   * Persist user info, the merged region, the merging regions and the forcible
+   * flag as a delimited MergeTableRegionsStateData protobuf message.
+   */
+  @Override
+  public void serializeStateData(final OutputStream stream) throws IOException {
+    super.serializeStateData(stream);
+
+    final MasterProcedureProtos.MergeTableRegionsStateData.Builder mergeTableRegionsMsg =
+        MasterProcedureProtos.MergeTableRegionsStateData.newBuilder()
+        .setUserInfo(MasterProcedureUtil.toProtoUserInfo(getUser()))
+        .setMergedRegionInfo(HRegionInfo.convert(mergedRegion))
+        .setForcible(forcible);
+    for (int i = 0; i < regionsToMerge.length; ++i) {
+      mergeTableRegionsMsg.addRegionInfo(HRegionInfo.convert(regionsToMerge[i]));
+    }
+    mergeTableRegionsMsg.build().writeDelimitedTo(stream);
+  }
+
+  /**
+   * Restore the fields written by serializeStateData (inverse operation, used on replay).
+   */
+  @Override
+  public void deserializeStateData(final InputStream stream) throws IOException {
+    super.deserializeStateData(stream);
+
+    final MasterProcedureProtos.MergeTableRegionsStateData mergeTableRegionsMsg =
+        MasterProcedureProtos.MergeTableRegionsStateData.parseDelimitedFrom(stream);
+    setUser(MasterProcedureUtil.toUserInfo(mergeTableRegionsMsg.getUserInfo()));
+
+    // We only ever serialize two regions today; assert (not enforce) on read-back.
+    assert(mergeTableRegionsMsg.getRegionInfoCount() == 2);
+    regionsToMerge = new HRegionInfo[mergeTableRegionsMsg.getRegionInfoCount()];
+    for (int i = 0; i < regionsToMerge.length; i++) {
+      regionsToMerge[i] = HRegionInfo.convert(mergeTableRegionsMsg.getRegionInfo(i));
+    }
+
+    mergedRegion = HRegionInfo.convert(mergeTableRegionsMsg.getMergedRegionInfo());
+  }
+
+  /**
+   * Append "<SimpleName> table=<t>, regions=<r>, forcibly=<f>" to the builder.
+   */
+  @Override
+  public void toStringClassDetails(StringBuilder sb) {
+    sb.append(getClass().getSimpleName())
+      .append(" table=")
+      .append(getTableName())
+      .append(", regions=")
+      .append(HRegionInfo.getShortNameToLog(regionsToMerge))
+      .append(", forcibly=")
+      .append(forcible);
+  }
+
+  /**
+   * Take locks on the merged region plus both merge parents. Returns
+   * LOCK_EVENT_WAIT (and queues this procedure to be woken) if the master is
+   * still initializing or any region lock is unavailable; LOCK_ACQUIRED otherwise.
+   */
+  @Override
+  protected LockState acquireLock(final MasterProcedureEnv env) {
+    if (env.waitInitialized(this)) return LockState.LOCK_EVENT_WAIT;
+    if (env.getProcedureScheduler().waitRegions(this, getTableName(),
+        mergedRegion, regionsToMerge[0], regionsToMerge[1])) {
+      try {
+        LOG.debug(LockState.LOCK_EVENT_WAIT + " " + env.getProcedureScheduler().dumpLocks());
+      } catch (IOException e) {
+        // Lock-dump is best-effort debug output only; log rather than spill to
+        // stderr (replaces a leftover auto-generated printStackTrace()).
+        LOG.warn("Failed dumping procedure scheduler locks", e);
+      }
+      return LockState.LOCK_EVENT_WAIT;
+    }
+    this.lock = true;
+    return LockState.LOCK_ACQUIRED;
+  }
+
+  // Release the region locks taken in acquireLock() and wake waiters.
+  @Override
+  protected void releaseLock(final MasterProcedureEnv env) {
+    this.lock = false;
+    env.getProcedureScheduler().wakeRegions(this, getTableName(),
+      mergedRegion, regionsToMerge[0], regionsToMerge[1]);
+  }
+
+  // Keep the lock for the whole life of the procedure (see class javadoc).
+  @Override
+  protected boolean holdLock(MasterProcedureEnv env) {
+    return true;
+  }
+
+  @Override
+  protected boolean hasLock(MasterProcedureEnv env) {
+    return this.lock;
+  }
+
+  // Both merging regions and the merged region belong to the same table.
+  @Override
+  public TableName getTableName() {
+    return mergedRegion.getTable();
+  }
+
+  @Override
+  public TableOperationType getTableOperationType() {
+    return TableOperationType.REGION_MERGE;
+  }
+
+  /**
+   * Prepare merge and do some check.
+   * Verifies neither region still carries a merge qualifier in hbase:meta, both
+   * are known and open, the merge switch is on, and the hosting regionservers
+   * report both regions as mergeable.
+   * @param env MasterProcedureEnv
+   * @return true if the merge may proceed; false if the procedure was marked
+   *         failed (caller must then stop the state machine)
+   * @throws IOException if a precondition check itself fails hard
+   */
+  private boolean prepareMergeRegion(final MasterProcedureEnv env) throws IOException {
+    // Note: the following logic assumes that we only have 2 regions to merge. In the future,
+    // if we want to extend to more than 2 regions, the code needs to modify a little bit.
+    //
+    // A region that still has a merge qualifier is itself the result of a merge
+    // whose references have not yet been cleaned up; it cannot be merged again yet.
+    CatalogJanitor catalogJanitor = env.getMasterServices().getCatalogJanitor();
+    boolean regionAHasMergeQualifier = !catalogJanitor.cleanMergeQualifier(regionsToMerge[0]);
+    if (regionAHasMergeQualifier
+        || !catalogJanitor.cleanMergeQualifier(regionsToMerge[1])) {
+      String msg = "Skip merging regions " + HRegionInfo.getShortNameToLog(regionsToMerge) +
+        ", because region "
+        + (regionAHasMergeQualifier ? regionsToMerge[0].getEncodedName() : regionsToMerge[1]
+          .getEncodedName()) + " has merge qualifier";
+      LOG.warn(msg);
+      throw new MergeRegionException(msg);
+    }
+
+    RegionStates regionStates = env.getAssignmentManager().getRegionStates();
+    RegionState regionStateA = regionStates.getRegionState(regionsToMerge[0].getEncodedName());
+    RegionState regionStateB = regionStates.getRegionState(regionsToMerge[1].getEncodedName());
+    if (regionStateA == null || regionStateB == null) {
+      throw new UnknownRegionException(
+        regionStateA == null ?
+          regionsToMerge[0].getEncodedName() : regionsToMerge[1].getEncodedName());
+    }
+
+    if (!regionStateA.isOpened() || !regionStateB.isOpened()) {
+      throw new MergeRegionException(
+        "Unable to merge regions not online " + regionStateA + ", " + regionStateB);
+    }
+
+    // Merge switch off is a soft failure: mark the procedure failed, don't throw.
+    if (!env.getMasterServices().isSplitOrMergeEnabled(MasterSwitchType.MERGE)) {
+      String regionsStr = Arrays.deepToString(regionsToMerge);
+      LOG.warn("merge switch is off! skip merge of " + regionsStr);
+      super.setFailure(getClass().getSimpleName(),
+          new IOException("Merge of " + regionsStr + " failed because merge switch is off"));
+      return false;
+    }
+
+
+    // Ask the remote regionserver if regions are mergeable. If we get an IOE, report it
+    // along w/ the failure so can see why we are not mergeable at this time.
+    IOException mergeableCheckIOE = null;
+    boolean mergeable = false;
+    RegionState current = regionStateA;
+    try {
+      mergeable = isMergeable(env, current);
+    } catch (IOException e) {
+      mergeableCheckIOE = e;
+    }
+    // Only bother asking about region B if region A came back mergeable.
+    if (mergeable && mergeableCheckIOE == null) {
+      current = regionStateB;
+      try {
+        mergeable = isMergeable(env, current);
+      } catch (IOException e) {
+        mergeableCheckIOE = e;
+      }
+    }
+    if (!mergeable) {
+      // 'current' names whichever region failed the check; chain the remote IOE as cause.
+      IOException e = new IOException(current.getRegion().getShortNameToLog() + " NOT mergeable");
+      if (mergeableCheckIOE != null) e.initCause(mergeableCheckIOE);
+      super.setFailure(getClass().getSimpleName(), e);
+      return false;
+    }
+
+    return true;
+  }
+
+  /**
+   * Ask the regionserver hosting the given region whether it may be merged.
+   * @return true if the hosting server reports the region as mergeable
+   */
+  private boolean isMergeable(final MasterProcedureEnv env, final RegionState rs)
+      throws IOException {
+    GetRegionInfoResponse response =
+      Util.getRegionInfoResponse(env, rs.getServerName(), rs.getRegion());
+    // Was consulting hasSplittable()/getSplittable(), which answers the wrong
+    // question for a merge; use the mergeable flag of the response instead.
+    return response.hasMergeable() && response.getMergeable();
+  }
+
+  /**
+   * Pre merge region action: run the preMergeRegionsAction coprocessor hook and
+   * notify the quota manager of the upcoming merged region.
+   * @param env MasterProcedureEnv
+   * @throws IOException if a coprocessor elects to bypass the merge
+   **/
+  private void preMergeRegions(final MasterProcedureEnv env) throws IOException {
+    final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
+    if (cpHost != null) {
+      // A true return from the hook means a coprocessor vetoed the merge.
+      boolean ret = cpHost.preMergeRegionsAction(regionsToMerge, getUser());
+      if (ret) {
+        throw new IOException(
+          "Coprocessor bypassing regions " + HRegionInfo.getShortNameToLog(regionsToMerge) +
+          " merge.");
+      }
+    }
+    // TODO: Clean up split and merge. Currently all over the place.
+    env.getMasterServices().getMasterQuotaManager().onRegionMerged(this.mergedRegion);
+  }
+
+  /**
+   * Action after rollback a merge table regions action: run the
+   * postRollBackMergeRegionsAction coprocessor hook.
+   * @param env MasterProcedureEnv
+   * @throws IOException on coprocessor failure
+   */
+  private void postRollBackMergeRegions(final MasterProcedureEnv env) throws IOException {
+    final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
+    if (cpHost != null) {
+      cpHost.postRollBackMergeRegionsAction(regionsToMerge, getUser());
+    }
+  }
+
+  /**
+   * Set the region states to MERGING state.
+   * NOTE(review): currently a no-op stub — the transition code is commented out;
+   * presumably to be wired up when region-state transitions land. Confirm.
+   * @param env MasterProcedureEnv
+   * @throws IOException
+   */
+  public void setRegionStateToMerging(final MasterProcedureEnv env) throws IOException {
+    //transition.setTransitionCode(TransitionCode.READY_TO_MERGE);
+  }
+
+  /**
+   * Rollback the region state change.
+   * NOTE(review): currently a no-op stub, mirroring setRegionStateToMerging above.
+   * @param env MasterProcedureEnv
+   * @throws IOException
+   */
+  private void setRegionStateToRevertMerging(final MasterProcedureEnv env) throws IOException {
+    //transition.setTransitionCode(TransitionCode.MERGE_REVERTED);
+  }
+
+  /**
+   * Create merged region on the filesystem: make a merges dir under region A,
+   * write reference files to the store files of both parents into it, then
+   * commit it as the merged region's directory.
+   * @param env MasterProcedureEnv
+   * @throws IOException
+   */
+  private void createMergedRegion(final MasterProcedureEnv env) throws IOException {
+    final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
+    final Path tabledir = FSUtils.getTableDir(mfs.getRootDir(), regionsToMerge[0].getTable());
+    final FileSystem fs = mfs.getFileSystem();
+    HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem(
+      env.getMasterConfiguration(), fs, tabledir, regionsToMerge[0], false);
+    // The merges dir lives under region A; both parents' references go into it.
+    regionFs.createMergesDir();
+
+    mergeStoreFiles(env, regionFs, regionFs.getMergesDir());
+    HRegionFileSystem regionFs2 = HRegionFileSystem.openRegionFromFileSystem(
+      env.getMasterConfiguration(), fs, tabledir, regionsToMerge[1], false);
+    mergeStoreFiles(env, regionFs2, regionFs.getMergesDir());
+
+    regionFs.commitMergedRegion(mergedRegion);
+  }
+
+  /**
+   * Create reference file(s) of merging regions under the merges directory.
+   * One reference is written per store file per column family.
+   * @param env MasterProcedureEnv
+   * @param regionFs region file system
+   * @param mergedDir the temp directory of merged region
+   * @throws IOException
+   */
+  private void mergeStoreFiles(
+      final MasterProcedureEnv env, final HRegionFileSystem regionFs, final Path mergedDir)
+      throws IOException {
+    final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
+    final Configuration conf = env.getMasterConfiguration();
+    final HTableDescriptor htd = env.getMasterServices().getTableDescriptors().get(getTableName());
+
+    for (String family: regionFs.getFamilies()) {
+      final HColumnDescriptor hcd = htd.getFamily(family.getBytes());
+      final Collection<StoreFileInfo> storeFiles = regionFs.getStoreFiles(family);
+
+      if (storeFiles != null && storeFiles.size() > 0) {
+        final CacheConfig cacheConf = new CacheConfig(conf, hcd);
+        for (StoreFileInfo storeFileInfo: storeFiles) {
+          // Create reference file(s) of the region in mergedDir
+          regionFs.mergeStoreFile(
+            mergedRegion,
+            family,
+            new StoreFile(
+              mfs.getFileSystem(), storeFileInfo, conf, cacheConf, hcd.getBloomFilterType()),
+            mergedDir);
+        }
+      }
+    }
+  }
+
+  /**
+   * Clean up merged region: delete the on-disk merged-region artifacts created
+   * by createMergedRegion (rollback of CREATE_MERGED_REGION).
+   * @param env MasterProcedureEnv
+   * @throws IOException
+   */
+  private void cleanupMergedRegion(final MasterProcedureEnv env) throws IOException {
+    final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
+    final Path tabledir = FSUtils.getTableDir(mfs.getRootDir(), regionsToMerge[0].getTable());
+    final FileSystem fs = mfs.getFileSystem();
+    HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem(
+      env.getMasterConfiguration(), fs, tabledir, regionsToMerge[0], false);
+    regionFs.cleanupMergedRegion(mergedRegion);
+  }
+
+  /**
+   * Rollback close regions: re-assign every replica of both merging regions,
+   * targeting the server they were on (may be null after master restart, in
+   * which case the balancer picks a server).
+   * @param env MasterProcedureEnv
+   **/
+  private void rollbackCloseRegionsForMerge(final MasterProcedureEnv env) throws IOException {
+    // Check whether the region is closed; if so, open it in the same server
+    final int regionReplication = getRegionReplication(env);
+    final ServerName serverName = getServerName(env);
+
+    final AssignProcedure[] procs =
+        new AssignProcedure[regionsToMerge.length * regionReplication];
+    int procsIdx = 0;
+    for (int i = 0; i < regionsToMerge.length; ++i) {
+      for (int j = 0; j < regionReplication; ++j) {
+        final HRegionInfo hri = RegionReplicaUtil.getRegionInfoForReplica(regionsToMerge[i], j);
+        procs[procsIdx++] = env.getAssignmentManager().createAssignProcedure(hri, serverName);
+      }
+    }
+    // Submitted directly (not as child procedures) since we are rolling back.
+    env.getMasterServices().getMasterProcedureExecutor().submitProcedures(procs);
+  }
+
+  /**
+   * Build one UnassignProcedure per replica of each merging region
+   * (regionsToMerge.length * regionReplication procedures in total).
+   */
+  private UnassignProcedure[] createUnassignProcedures(final MasterProcedureEnv env,
+      final int regionReplication) {
+    final UnassignProcedure[] procs =
+      new UnassignProcedure[regionsToMerge.length * regionReplication];
+    int procsIdx = 0;
+    for (HRegionInfo regionToMerge : regionsToMerge) {
+      for (int replicaId = 0; replicaId < regionReplication; ++replicaId) {
+        final HRegionInfo hri =
+          RegionReplicaUtil.getRegionInfoForReplica(regionToMerge, replicaId);
+        procs[procsIdx++] = env.getAssignmentManager().createUnassignProcedure(hri, null, true);
+      }
+    }
+    return procs;
+  }
+
+  /**
+   * Build one AssignProcedure per replica of the merged region, targeting the
+   * server that hosted the first merge parent (may be null; see getServerName()).
+   */
+  private AssignProcedure[] createAssignProcedures(final MasterProcedureEnv env,
+      final int regionReplication) {
+    final ServerName targetServer = getServerName(env);
+    final AssignProcedure[] procs = new AssignProcedure[regionReplication];
+    for (int replicaId = 0; replicaId < procs.length; ++replicaId) {
+      procs[replicaId] = env.getAssignmentManager().createAssignProcedure(
+        RegionReplicaUtil.getRegionInfoForReplica(mergedRegion, replicaId), targetServer);
+    }
+    return procs;
+  }
+
+  // Number of replicas configured for this table, per its table descriptor.
+  private int getRegionReplication(final MasterProcedureEnv env) throws IOException {
+    final HTableDescriptor htd = env.getMasterServices().getTableDescriptors().get(getTableName());
+    return htd.getRegionReplication();
+  }
+
+  /**
+   * Pre merge commit action: run the preMergeRegionsCommit coprocessor hook and
+   * validate any meta mutations it contributed. (Original javadoc said "Post
+   * merge region action" — this is the pre-commit hook.)
+   * @param env MasterProcedureEnv
+   * @throws IOException if a coprocessor bypasses the merge or contributes an
+   *         unparsable meta mutation
+   **/
+  private void preMergeRegionsCommit(final MasterProcedureEnv env) throws IOException {
+    final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
+    if (cpHost != null) {
+      @MetaMutationAnnotation
+      final List<Mutation> metaEntries = new ArrayList<Mutation>();
+      boolean ret = cpHost.preMergeRegionsCommit(regionsToMerge, metaEntries, getUser());
+
+      if (ret) {
+        throw new IOException(
+          "Coprocessor bypassing regions " + HRegionInfo.getShortNameToLog(regionsToMerge) +
+          " merge.");
+      }
+      // Coprocessor-supplied mutations must target hbase:meta rows; a row key
+      // that does not parse as a region name indicates a misbehaving coprocessor.
+      try {
+        for (Mutation p : metaEntries) {
+          HRegionInfo.parseRegionName(p.getRow());
+        }
+      } catch (IOException e) {
+        LOG.error("Row key of mutation from coprocessor is not parsable as region name."
+          + "Mutations from coprocessor should only be for hbase:meta table.", e);
+        throw e;
+      }
+    }
+  }
+
+  /**
+   * Add merged region to META and delete original regions.
+   * This is the point of no return for the merge.
+   */
+  private void updateMetaForMergedRegions(final MasterProcedureEnv env)
+      throws IOException, ProcedureYieldException {
+    final ServerName serverName = getServerName(env);
+    env.getAssignmentManager().markRegionAsMerged(mergedRegion, serverName,
+      regionsToMerge[0], regionsToMerge[1]);
+  }
+
+  /**
+   * Post merge commit action: run the postMergeRegionsCommit coprocessor hook
+   * (after META has been updated).
+   * @param env MasterProcedureEnv
+   **/
+  private void postMergeRegionsCommit(final MasterProcedureEnv env) throws IOException {
+    final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
+    if (cpHost != null) {
+      cpHost.postMergeRegionsCommit(regionsToMerge, mergedRegion, getUser());
+    }
+  }
+
+  /**
+   * Completed merge action: run the postCompletedMergeRegionsAction coprocessor
+   * hook (final state of the procedure).
+   * @param env MasterProcedureEnv
+   **/
+  private void postCompletedMergeRegions(final MasterProcedureEnv env) throws IOException {
+    final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
+    if (cpHost != null) {
+      cpHost.postCompletedMergeRegionsAction(regionsToMerge, mergedRegion, getUser());
+    }
+  }
+
+  /**
+   * The procedure could be restarted from a different machine. If the variable is null, we need to
+   * retrieve it: looks up the server currently hosting the first merge parent.
+   * @param env MasterProcedureEnv
+   * @return serverName, possibly null (callers must tolerate null and fall back
+   *         to the balancer)
+   */
+  private ServerName getServerName(final MasterProcedureEnv env) {
+    if (regionLocation == null) {
+      regionLocation = env.getAssignmentManager().getRegionStates().
+          getRegionServerOfRegion(regionsToMerge[0]);
+      // May still be null here but return null and let caller deal.
+      // Means we lost the in-memory-only location. We are in recovery
+      // or so. The caller should be able to deal w/ a null ServerName.
+      // Let them go to the Balancer to find one to use instead.
+    }
+    return regionLocation;
+  }
+
+  /**
+   * Cached LOG.isTraceEnabled(). The procedure may resume on a different master
+   * after a restart, so the cached value starts out null and is computed lazily.
+   * @return whether trace logging is enabled
+   */
+  private Boolean isTraceEnabled() {
+    if (this.traceEnabled == null) {
+      this.traceEnabled = LOG.isTraceEnabled();
+    }
+    return this.traceEnabled;
+  }
+
+  /**
+   * @return The merged region. May be null if called too early (before
+   *         construction/deserialization completed) or if we failed.
+   */
+  @VisibleForTesting
+  public HRegionInfo getMergedRegion() {
+    return this.mergedRegion;
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/657a5d46/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MoveRegionProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MoveRegionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MoveRegionProcedure.java
new file mode 100644
index 0000000..d8c1b7d
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MoveRegionProcedure.java
@@ -0,0 +1,145 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.master.assignment;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.master.RegionPlan;
+import org.apache.hadoop.hbase.master.procedure.AbstractStateMachineRegionProcedure;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MoveRegionState;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MoveRegionStateData;
+
+/**
+ * Procedure that implements a RegionPlan.
+ * It first runs an unassign subprocedure followed
+ * by an assign subprocedure. It takes a lock on the region being moved.
+ * It holds the lock for the life of the procedure.
+ */
+@InterfaceAudience.Private
+public class MoveRegionProcedure extends AbstractStateMachineRegionProcedure<MoveRegionState> {
+ private static final Log LOG = LogFactory.getLog(MoveRegionProcedure.class);
+ // The move to execute: region plus source and destination servers.
+ // Rebuilt from serialized state in deserializeStateData on replay.
+ private RegionPlan plan;
+
+ public MoveRegionProcedure() {
+ // Required by the Procedure framework to create the procedure on replay
+ super();
+ }
+
+ /**
+ * @param env master procedure environment (used by the superclass for region locking)
+ * @param plan the move to execute; destination must be non-null
+ */
+ public MoveRegionProcedure(final MasterProcedureEnv env, final RegionPlan plan) {
+ super(env, plan.getRegionInfo());
+ // NOTE(review): assert only fires with -ea; a null destination would otherwise surface
+ // later as an NPE in serializeStateData. Consider an explicit argument check.
+ assert plan.getDestination() != null: plan.toString();
+ this.plan = plan;
+ }
+
+ /**
+ * Two-step state machine: unassign from the source server, then assign to the destination.
+ * Each step is delegated to a child procedure.
+ */
+ @Override
+ protected Flow executeFromState(final MasterProcedureEnv env, final MoveRegionState state)
+ throws InterruptedException {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(this + " execute state=" + state);
+ }
+ switch (state) {
+ case MOVE_REGION_UNASSIGN:
+ // 'true' here is the force flag passed to UnassignProcedure.
+ addChildProcedure(new UnassignProcedure(plan.getRegionInfo(), plan.getSource(), true));
+ setNextState(MoveRegionState.MOVE_REGION_ASSIGN);
+ break;
+ case MOVE_REGION_ASSIGN:
+ addChildProcedure(new AssignProcedure(plan.getRegionInfo(), plan.getDestination()));
+ return Flow.NO_MORE_STATE;
+ default:
+ throw new UnsupportedOperationException("unhandled state=" + state);
+ }
+ return Flow.HAS_MORE_STATE;
+ }
+
+ @Override
+ protected void rollbackState(final MasterProcedureEnv env, final MoveRegionState state)
+ throws IOException {
+ // no-op
+ // NOTE(review): rollback is intentionally empty here; presumably the child
+ // unassign/assign procedures handle their own recovery -- confirm.
+ }
+
+ /** This procedure is not abortable once started. */
+ @Override
+ public boolean abort(final MasterProcedureEnv env) {
+ return false;
+ }
+
+ @Override
+ public void toStringClassDetails(final StringBuilder sb) {
+ sb.append(getClass().getSimpleName());
+ sb.append(" ");
+ sb.append(plan);
+ }
+
+ @Override
+ protected MoveRegionState getInitialState() {
+ return MoveRegionState.MOVE_REGION_UNASSIGN;
+ }
+
+ @Override
+ protected int getStateId(final MoveRegionState state) {
+ return state.getNumber();
+ }
+
+ @Override
+ protected MoveRegionState getState(final int stateId) {
+ return MoveRegionState.valueOf(stateId);
+ }
+
+ @Override
+ public TableName getTableName() {
+ return plan.getRegionInfo().getTable();
+ }
+
+ @Override
+ public TableOperationType getTableOperationType() {
+ return TableOperationType.REGION_EDIT;
+ }
+
+ /** Persists source and destination servers; the region itself is saved by the super class. */
+ @Override
+ protected void serializeStateData(final OutputStream stream) throws IOException {
+ super.serializeStateData(stream);
+
+ final MoveRegionStateData.Builder state = MoveRegionStateData.newBuilder()
+ // No need to serialize the HRegionInfo. The super class has the region.
+ .setSourceServer(ProtobufUtil.toServerName(plan.getSource()))
+ .setDestinationServer(ProtobufUtil.toServerName(plan.getDestination()));
+ state.build().writeDelimitedTo(stream);
+ }
+
+ /** Rebuilds the RegionPlan from the serialized servers plus the super class's region. */
+ @Override
+ protected void deserializeStateData(final InputStream stream) throws IOException {
+ super.deserializeStateData(stream);
+
+ final MoveRegionStateData state = MoveRegionStateData.parseDelimitedFrom(stream);
+ final HRegionInfo regionInfo = getRegion(); // Get it from super class deserialization.
+ final ServerName sourceServer = ProtobufUtil.toServerName(state.getSourceServer());
+ final ServerName destinationServer = ProtobufUtil.toServerName(state.getDestinationServer());
+ this.plan = new RegionPlan(regionInfo, sourceServer, destinationServer);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/657a5d46/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java
new file mode 100644
index 0000000..21e0d9c
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java
@@ -0,0 +1,327 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.master.assignment;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.RegionState;
+import org.apache.hadoop.hbase.master.RegionState.State;
+import org.apache.hadoop.hbase.procedure2.util.StringUtils;
+import org.apache.hadoop.hbase.util.MultiHConnection;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
+import org.apache.zookeeper.KeeperException;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Store Region State to hbase:meta table.
+ */
+@InterfaceAudience.Private
+public class RegionStateStore {
+ private static final Log LOG = LogFactory.getLog(RegionStateStore.class);
+
+ /** The delimiter for meta columns for replicaIds > 0 */
+ protected static final char META_REPLICA_ID_DELIMITER = '_';
+
+ private final MasterServices master;
+
+ // Lazily created (see updateRegionLocation) connection used to write hbase:meta.
+ private MultiHConnection multiHConnection;
+
+ public RegionStateStore(final MasterServices master) {
+ this.master = master;
+ }
+
+ public void start() throws IOException {
+ }
+
+ /** Closes and releases the lazily-created meta connection, if any. */
+ public void stop() {
+ if (multiHConnection != null) {
+ multiHConnection.close();
+ multiHConnection = null;
+ }
+ }
+
+ /** Callback invoked once per region (replica) found while scanning hbase:meta. */
+ public interface RegionStateVisitor {
+ void visitRegionState(HRegionInfo regionInfo, State state,
+ ServerName regionLocation, ServerName lastHost, long openSeqNum);
+ }
+
+ /**
+ * Full-scans the region rows of hbase:meta, invoking the visitor for every region replica found.
+ * @param visitor receives one callback per region replica
+ * @throws IOException if the meta scan fails
+ */
+ public void visitMeta(final RegionStateVisitor visitor) throws IOException {
+ MetaTableAccessor.fullScanRegions(master.getConnection(), new MetaTableAccessor.Visitor() {
+ final boolean isDebugEnabled = LOG.isDebugEnabled();
+
+ @Override
+ public boolean visit(final Result r) throws IOException {
+ if (r != null && !r.isEmpty()) {
+ // NOTE(review): elsewhere this code uses EnvironmentEdgeManager.currentTime()
+ // (already imported); consider using it here too for testability.
+ long st = System.currentTimeMillis();
+ visitMetaEntry(visitor, r);
+ long et = System.currentTimeMillis();
+ // NOTE(review): per-row INFO logging of a full meta scan is noisy; consider trace.
+ LOG.info("[T] LOAD META PERF " + StringUtils.humanTimeDiff(et - st));
+ } else if (isDebugEnabled) {
+ LOG.debug("NULL result from meta - ignoring but this is strange.");
+ }
+ return true;
+ }
+ });
+ }
+
+ /**
+ * Decodes one hbase:meta Result into per-replica (regionInfo, state, location, lastHost)
+ * tuples and hands each to the visitor. Rows without locations are skipped silently.
+ */
+ private void visitMetaEntry(final RegionStateVisitor visitor, final Result result)
+ throws IOException {
+ final RegionLocations rl = MetaTableAccessor.getRegionLocations(result);
+ if (rl == null) return;
+
+ final HRegionLocation[] locations = rl.getRegionLocations();
+ if (locations == null) return;
+
+ for (int i = 0; i < locations.length; ++i) {
+ final HRegionLocation hrl = locations[i];
+ if (hrl == null) continue;
+
+ final HRegionInfo regionInfo = hrl.getRegionInfo();
+ if (regionInfo == null) continue;
+
+ final int replicaId = regionInfo.getReplicaId();
+ final State state = getRegionState(result, replicaId);
+
+ final ServerName lastHost = hrl.getServerName();
+ final ServerName regionLocation = getRegionServer(result, replicaId);
+ // openSeqNum is not read back from meta here; callers see -1 (unknown).
+ final long openSeqNum = -1;
+
+ // TODO: move under trace, now is visible for debugging
+ LOG.info(String.format("Load hbase:meta entry region=%s regionState=%s lastHost=%s regionLocation=%s",
+ regionInfo, state, lastHost, regionLocation));
+
+ visitor.visitRegionState(regionInfo, state, regionLocation, lastHost, openSeqNum);
+ }
+ }
+
+ /**
+ * Persists a region's state/location. Meta region location goes to ZooKeeper;
+ * user region state goes to the hbase:meta table.
+ */
+ public void updateRegionLocation(final HRegionInfo regionInfo, final State state,
+ final ServerName regionLocation, final ServerName lastHost, final long openSeqNum,
+ final long pid)
+ throws IOException {
+ if (regionInfo.isMetaRegion()) {
+ updateMetaLocation(regionInfo, regionLocation);
+ } else {
+ updateUserRegionLocation(regionInfo, state, regionLocation, lastHost, openSeqNum, pid);
+ }
+ }
+
+ /** Convenience overload taking RegionState objects; oldState supplies the lastHost. */
+ public void updateRegionState(final long openSeqNum, final long pid,
+ final RegionState newState, final RegionState oldState) throws IOException {
+ updateRegionLocation(newState.getRegion(), newState.getState(), newState.getServerName(),
+ oldState != null ? oldState.getServerName() : null, openSeqNum, pid);
+ }
+
+ /** Records the hbase:meta region location in ZooKeeper (always as OPEN). */
+ protected void updateMetaLocation(final HRegionInfo regionInfo, final ServerName serverName)
+ throws IOException {
+ try {
+ MetaTableLocator.setMetaLocation(master.getZooKeeper(), serverName,
+ regionInfo.getReplicaId(), State.OPEN);
+ } catch (KeeperException e) {
+ throw new IOException(e);
+ }
+ }
+
+ /**
+ * Builds and writes the hbase:meta Put for a user region: region info, state column,
+ * and -- when known -- the server/seqnum columns. Adds a replication barrier Put when the
+ * table has serial replication scope and the region is OPEN.
+ */
+ protected void updateUserRegionLocation(final HRegionInfo regionInfo, final State state,
+ final ServerName regionLocation, final ServerName lastHost, final long openSeqNum,
+ final long pid)
+ throws IOException {
+ final int replicaId = regionInfo.getReplicaId();
+ final Put put = new Put(MetaTableAccessor.getMetaKeyForRegion(regionInfo));
+ MetaTableAccessor.addRegionInfo(put, regionInfo);
+ final StringBuilder info = new StringBuilder("pid=" + pid + " updating hbase:meta row=");
+ info.append(regionInfo.getRegionNameAsString()).append(", regionState=").append(state);
+ if (openSeqNum >= 0) {
+ Preconditions.checkArgument(state == State.OPEN && regionLocation != null,
+ "Open region should be on a server");
+ MetaTableAccessor.addLocation(put, regionLocation, openSeqNum, -1, replicaId);
+ info.append(", openSeqNum=").append(openSeqNum);
+ info.append(", regionLocation=").append(regionLocation);
+ } else if (regionLocation != null && !regionLocation.equals(lastHost)) {
+ // Ideally, if no regionLocation, write null to the hbase:meta but this will confuse clients
+ // currently; they want a server to hit. TODO: Make clients wait if no location.
+ put.addImmutable(HConstants.CATALOG_FAMILY, getServerNameColumn(replicaId),
+ Bytes.toBytes(regionLocation.getServerName()));
+ info.append(", regionLocation=").append(regionLocation);
+ }
+ put.addImmutable(HConstants.CATALOG_FAMILY, getStateColumn(replicaId),
+ Bytes.toBytes(state.name()));
+ LOG.info(info);
+
+ final boolean serialReplication = hasSerialReplicationScope(regionInfo.getTable());
+ if (serialReplication && state == State.OPEN) {
+ Put barrierPut = MetaTableAccessor.makeBarrierPut(regionInfo.getEncodedNameAsBytes(),
+ openSeqNum, regionInfo.getTable().getName());
+ updateRegionLocation(regionInfo, state, put, barrierPut);
+ } else {
+ updateRegionLocation(regionInfo, state, put);
+ }
+ }
+
+ /**
+ * Applies the given Put(s) to hbase:meta, lazily creating the connection on first use.
+ * On failure the master is aborted: meta must reflect the in-memory state.
+ */
+ protected void updateRegionLocation(final HRegionInfo regionInfo, final State state,
+ final Put... put) throws IOException {
+ // Lazy, synchronized init; reads of multiHConnection below happen on the same path.
+ synchronized (this) {
+ if (multiHConnection == null) {
+ multiHConnection = new MultiHConnection(master.getConfiguration(), 1);
+ }
+ }
+
+ try {
+ multiHConnection.processBatchCallback(Arrays.asList(put), TableName.META_TABLE_NAME, null, null);
+ } catch (IOException e) {
+ // TODO: Revist!!!! Means that if a server is loaded, then we will abort our host!
+ // In tests we abort the Master!
+ String msg = String.format("FAILED persisting region=%s state=%s",
+ regionInfo.getShortNameToLog(), state);
+ LOG.error(msg, e);
+ master.abort(msg, e);
+ throw e;
+ }
+ }
+
+ // ============================================================================================
+ // Update Region Splitting State helpers
+ // ============================================================================================
+ /** Writes the split of {@code parent} into daughters {@code hriA}/{@code hriB} to hbase:meta. */
+ public void splitRegion(final HRegionInfo parent, final HRegionInfo hriA,
+ final HRegionInfo hriB, final ServerName serverName) throws IOException {
+ final HTableDescriptor htd = getTableDescriptor(parent.getTable());
+ MetaTableAccessor.splitRegion(master.getConnection(), parent, hriA, hriB, serverName,
+ getRegionReplication(htd), hasSerialReplicationScope(htd));
+ }
+
+ // ============================================================================================
+ // Update Region Merging State helpers
+ // ============================================================================================
+ /** Writes the merge of {@code hriA}/{@code hriB} into {@code parent} to hbase:meta. */
+ public void mergeRegions(final HRegionInfo parent, final HRegionInfo hriA,
+ final HRegionInfo hriB, final ServerName serverName) throws IOException {
+ final HTableDescriptor htd = getTableDescriptor(parent.getTable());
+ MetaTableAccessor.mergeRegions(master.getConnection(), parent, hriA, hriB, serverName,
+ getRegionReplication(htd), EnvironmentEdgeManager.currentTime(),
+ hasSerialReplicationScope(htd));
+ }
+
+ // ============================================================================================
+ // Delete Region State helpers
+ // ============================================================================================
+ public void deleteRegion(final HRegionInfo regionInfo) throws IOException {
+ deleteRegions(Collections.singletonList(regionInfo));
+ }
+
+ public void deleteRegions(final List<HRegionInfo> regions) throws IOException {
+ MetaTableAccessor.deleteRegions(master.getConnection(), regions);
+ }
+
+ // ==========================================================================
+ // Table Descriptors helpers
+ // ==========================================================================
+ private boolean hasSerialReplicationScope(final TableName tableName) throws IOException {
+ return hasSerialReplicationScope(getTableDescriptor(tableName));
+ }
+
+ // Missing descriptor is treated as "no serial replication".
+ private boolean hasSerialReplicationScope(final HTableDescriptor htd) {
+ return (htd != null)? htd.hasSerialReplicationScope(): false;
+ }
+
+ // Missing descriptor defaults to a single replica.
+ private int getRegionReplication(final HTableDescriptor htd) {
+ return (htd != null) ? htd.getRegionReplication() : 1;
+ }
+
+ private HTableDescriptor getTableDescriptor(final TableName tableName) throws IOException {
+ return master.getTableDescriptors().get(tableName);
+ }
+
+ // ==========================================================================
+ // Server Name
+ // ==========================================================================
+
+ /**
+ * Returns the {@link ServerName} from catalog table {@link Result}
+ * where the region is transitioning. It should be the same as
+ * {@link MetaTableAccessor#getServerName(Result,int)} if the server is at OPEN state.
+ * Falls back to the RegionLocations-derived server when the dedicated column is absent.
+ * @param r Result to pull the transitioning server name from
+ * @return A ServerName instance or {@link MetaTableAccessor#getServerName(Result,int)}
+ * if necessary fields not found or empty.
+ */
+ static ServerName getRegionServer(final Result r, int replicaId) {
+ final Cell cell = r.getColumnLatestCell(HConstants.CATALOG_FAMILY,
+ getServerNameColumn(replicaId));
+ if (cell == null || cell.getValueLength() == 0) {
+ RegionLocations locations = MetaTableAccessor.getRegionLocations(r);
+ if (locations != null) {
+ HRegionLocation location = locations.getRegionLocation(replicaId);
+ if (location != null) {
+ return location.getServerName();
+ }
+ }
+ return null;
+ }
+ return ServerName.parseServerName(Bytes.toString(cell.getValueArray(),
+ cell.getValueOffset(), cell.getValueLength()));
+ }
+
+ // Replica 0 uses the plain qualifier; other replicas get a '_NNNN' suffix.
+ private static byte[] getServerNameColumn(int replicaId) {
+ return replicaId == 0
+ ? HConstants.SERVERNAME_QUALIFIER
+ : Bytes.toBytes(HConstants.SERVERNAME_QUALIFIER_STR + META_REPLICA_ID_DELIMITER
+ + String.format(HRegionInfo.REPLICA_ID_FORMAT, replicaId));
+ }
+
+ // ==========================================================================
+ // Region State
+ // ==========================================================================
+
+ /**
+ * Pull the region state from a catalog table {@link Result}.
+ * @param r Result to pull the region state from
+ * @return the region state, or OPENING if there's no value written.
+ */
+ protected State getRegionState(final Result r, int replicaId) {
+ Cell cell = r.getColumnLatestCell(HConstants.CATALOG_FAMILY, getStateColumn(replicaId));
+ if (cell == null || cell.getValueLength() == 0) return State.OPENING;
+ return State.valueOf(Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
+ }
+
+ // Replica 0 uses the plain qualifier; other replicas get a '_NNNN' suffix.
+ private static byte[] getStateColumn(int replicaId) {
+ return replicaId == 0
+ ? HConstants.STATE_QUALIFIER
+ : Bytes.toBytes(HConstants.STATE_QUALIFIER_STR + META_REPLICA_ID_DELIMITER
+ + String.format(HRegionInfo.REPLICA_ID_FORMAT, replicaId));
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/657a5d46/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStates.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStates.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStates.java
new file mode 100644
index 0000000..082e171
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStates.java
@@ -0,0 +1,969 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.master.assignment;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
+import org.apache.hadoop.hbase.master.RegionState;
+import org.apache.hadoop.hbase.master.RegionState.State;
+import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * RegionStates contains a set of Maps that describes the in-memory state of the AM, with
+ * the regions available in the system, the region in transition, the offline regions and
+ * the servers holding regions.
+ */
+@InterfaceAudience.Private
+public class RegionStates {
+ private static final Log LOG = LogFactory.getLog(RegionStates.class);
+
+ /** States a region may legitimately be in when an open (assign) is requested. */
+ protected static final State[] STATES_EXPECTED_ON_OPEN = new State[] {
+ State.OFFLINE, State.CLOSED, // disable/offline
+ State.SPLITTING, State.SPLIT, // ServerCrashProcedure
+ State.OPENING, State.FAILED_OPEN, // already in-progress (retrying)
+ };
+
+ /** States a region may legitimately be in when a close (unassign) is requested. */
+ protected static final State[] STATES_EXPECTED_ON_CLOSE = new State[] {
+ State.SPLITTING, State.SPLIT, // ServerCrashProcedure
+ State.OPEN, // enabled/open
+ State.CLOSING // already in-progress (retrying)
+ };
+
+ /** ProcedureEvent keyed by region; procedures park on it while the region is busy. */
+ private static class AssignmentProcedureEvent extends ProcedureEvent<HRegionInfo> {
+ public AssignmentProcedureEvent(final HRegionInfo regionInfo) {
+ super(regionInfo);
+ }
+ }
+
+ /** ProcedureEvent keyed by server; used to wait on a regionserver's status report. */
+ private static class ServerReportEvent extends ProcedureEvent<ServerName> {
+ public ServerReportEvent(final ServerName serverName) {
+ super(serverName);
+ }
+ }
+
+ /**
+ * Current Region State.
+ * In-memory only. Not persisted.
+ */
+ // Mutable/Immutable? Changes have to be synchronized or not?
+ // Data members are volatile which seems to say multi-threaded access is fine.
+ // In the below we do check and set but the check state could change before
+ // we do the set because no synchronization....which seems dodgy. Clear up
+ // understanding here... how many threads accessing? Do locks make it so one
+ // thread at a time working on a single Region's RegionStateNode? Lets presume
+ // so for now. Odd is that elsewhere in this RegionStates, we synchronize on
+ // the RegionStateNode instance. TODO.
+ public static class RegionStateNode implements Comparable<RegionStateNode> {
+ private final HRegionInfo regionInfo;
+ // Event other procedures wait on while this region is in transition.
+ private final ProcedureEvent<?> event;
+
+ // The single RegionTransitionProcedure currently "owning" this region, if any.
+ private volatile RegionTransitionProcedure procedure = null;
+ private volatile ServerName regionLocation = null;
+ private volatile ServerName lastHost = null;
+ /**
+ * A Region-in-Transition (RIT) moves through states.
+ * See {@link State} for complete list. A Region that
+ * is opened moves from OFFLINE => OPENING => OPENED.
+ */
+ private volatile State state = State.OFFLINE;
+
+ /**
+ * Updated on every call to {@link #setRegionLocation(ServerName)}
+ * or {@link #setState(State, State...)}.
+ */
+ private volatile long lastUpdate = 0;
+
+ private volatile long openSeqNum = HConstants.NO_SEQNUM;
+
+ public RegionStateNode(final HRegionInfo regionInfo) {
+ this.regionInfo = regionInfo;
+ this.event = new AssignmentProcedureEvent(regionInfo);
+ }
+
+ /**
+ * Moves to {@code update} only if the current state is one of {@code expected}
+ * (an empty/null expected list always matches).
+ * @return true if the state was changed
+ */
+ public boolean setState(final State update, final State... expected) {
+ final boolean expectedState = isInState(expected);
+ if (expectedState) {
+ this.state = update;
+ this.lastUpdate = EnvironmentEdgeManager.currentTime();
+ }
+ return expectedState;
+ }
+
+ /**
+ * Put region into OFFLINE mode (set state and clear location).
+ * @return Last recorded server deploy
+ */
+ public ServerName offline() {
+ setState(State.OFFLINE);
+ return setRegionLocation(null);
+ }
+
+ /**
+ * Set new {@link State} but only if currently in <code>expected</code> State
+ * (if not, throw {@link UnexpectedStateException}.
+ */
+ public State transitionState(final State update, final State... expected)
+ throws UnexpectedStateException {
+ if (!setState(update, expected)) {
+ throw new UnexpectedStateException("Expected " + Arrays.toString(expected) +
+ " so could move to " + update + " but current state=" + getState());
+ }
+ return update;
+ }
+
+ // Vacuously true when expected is null/empty.
+ public boolean isInState(final State... expected) {
+ if (expected != null && expected.length > 0) {
+ boolean expectedState = false;
+ for (int i = 0; i < expected.length; ++i) {
+ expectedState |= (getState() == expected[i]);
+ }
+ return expectedState;
+ }
+ return true;
+ }
+
+ // "Stuck" = failed to open but a procedure is still attached.
+ public boolean isStuck() {
+ return isInState(State.FAILED_OPEN) && getProcedure() != null;
+ }
+
+ public boolean isInTransition() {
+ return getProcedure() != null;
+ }
+
+ // While in transition, report the owning procedure's update time instead of our own.
+ public long getLastUpdate() {
+ return procedure != null ? procedure.getLastUpdate() : lastUpdate;
+ }
+
+ public void setLastHost(final ServerName serverName) {
+ this.lastHost = serverName;
+ }
+
+ public void setOpenSeqNum(final long seqId) {
+ this.openSeqNum = seqId;
+ }
+
+
+ /**
+ * Records the new location (null to clear) and bumps lastUpdate.
+ * @return the previous location
+ */
+ public ServerName setRegionLocation(final ServerName serverName) {
+ ServerName lastRegionLocation = this.regionLocation;
+ if (LOG.isTraceEnabled() && serverName == null) {
+ LOG.trace("Tracking when we are set to null " + this, new Throwable("TRACE"));
+ }
+ this.regionLocation = serverName;
+ this.lastUpdate = EnvironmentEdgeManager.currentTime();
+ return lastRegionLocation;
+ }
+
+ /**
+ * Attaches {@code proc} as the owning procedure.
+ * @return false if a DIFFERENT procedure already owns this region
+ */
+ public boolean setProcedure(final RegionTransitionProcedure proc) {
+ if (this.procedure != null && this.procedure != proc) {
+ return false;
+ }
+ this.procedure = proc;
+ return true;
+ }
+
+ /**
+ * Detaches {@code proc}; only the owning procedure may detach itself.
+ * @return false if a DIFFERENT procedure owns this region
+ */
+ public boolean unsetProcedure(final RegionTransitionProcedure proc) {
+ if (this.procedure != null && this.procedure != proc) {
+ return false;
+ }
+ this.procedure = null;
+ return true;
+ }
+
+ public RegionTransitionProcedure getProcedure() {
+ return procedure;
+ }
+
+ public ProcedureEvent<?> getProcedureEvent() {
+ return event;
+ }
+
+ public HRegionInfo getRegionInfo() {
+ return regionInfo;
+ }
+
+ public TableName getTable() {
+ return getRegionInfo().getTable();
+ }
+
+ public boolean isSystemTable() {
+ return getTable().isSystemTable();
+ }
+
+ public ServerName getLastHost() {
+ return lastHost;
+ }
+
+ public ServerName getRegionLocation() {
+ return regionLocation;
+ }
+
+ public State getState() {
+ return state;
+ }
+
+ public long getOpenSeqNum() {
+ return openSeqNum;
+ }
+
+ public int getFormatVersion() {
+ // we don't have any format for now
+ // it should probably be in regionInfo.getFormatVersion()
+ return 0;
+ }
+
+ @Override
+ public int compareTo(final RegionStateNode other) {
+ // NOTE: HRegionInfo sort by table first, so we are relying on that.
+ // we have a TestRegionState#testOrderedByTable() that check for that.
+ return getRegionInfo().compareTo(other.getRegionInfo());
+ }
+
+ @Override
+ public int hashCode() {
+ return getRegionInfo().hashCode();
+ }
+
+ @Override
+ public boolean equals(final Object other) {
+ if (this == other) return true;
+ if (!(other instanceof RegionStateNode)) return false;
+ return compareTo((RegionStateNode)other) == 0;
+ }
+
+ @Override
+ public String toString() {
+ return toDescriptiveString();
+ }
+
+ public String toShortString() {
+ // rit= is the current Region-In-Transition State -- see State enum.
+ return String.format("rit=%s, location=%s", getState(), getRegionLocation());
+ }
+
+ public String toDescriptiveString() {
+ return String.format("%s, table=%s, region=%s",
+ toShortString(), getTable(), getRegionInfo().getEncodedName());
+ }
+ }
+
+ // This comparator sorts the RegionStates by time stamp then Region name.
+ // Comparing by timestamp alone can lead us to discard different RegionStates that happen
+ // to share a timestamp.
+ private static class RegionStateStampComparator implements Comparator<RegionState> {
+ @Override
+ public int compare(final RegionState l, final RegionState r) {
+ // Region name breaks stamp ties so no two distinct regions compare equal.
+ int stampCmp = Long.compare(l.getStamp(), r.getStamp());
+ return stampCmp != 0 ? stampCmp : l.getRegion().compareTo(r.getRegion());
+ }
+ }
+
+ /** Lifecycle of a regionserver as tracked by the assignment manager. */
+ public enum ServerState { ONLINE, SPLITTING, OFFLINE }
+
+ /** In-memory state for one regionserver: its regions, report event and liveness state. */
+ public static class ServerStateNode implements Comparable<ServerStateNode> {
+ private final ServerReportEvent reportEvent;
+
+ private final Set<RegionStateNode> regions;
+ private final ServerName serverName;
+
+ private volatile ServerState state = ServerState.ONLINE;
+ private volatile int versionNumber = 0;
+
+ public ServerStateNode(final ServerName serverName) {
+ this.serverName = serverName;
+ this.regions = new HashSet<RegionStateNode>();
+ this.reportEvent = new ServerReportEvent(serverName);
+ }
+
+ public ServerName getServerName() {
+ return serverName;
+ }
+
+ public ServerState getState() {
+ return state;
+ }
+
+ public int getVersionNumber() {
+ return versionNumber;
+ }
+
+ public ProcedureEvent<?> getReportEvent() {
+ return reportEvent;
+ }
+
+ // NOTE(review): unlike RegionStateNode.isInState, a null/empty expected array here
+ // yields false rather than true -- confirm the asymmetry is intentional.
+ public boolean isInState(final ServerState... expected) {
+ boolean expectedState = false;
+ if (expected != null) {
+ for (int i = 0; i < expected.length; ++i) {
+ expectedState |= (state == expected[i]);
+ }
+ }
+ return expectedState;
+ }
+
+ public void setState(final ServerState state) {
+ this.state = state;
+ }
+
+ public void setVersionNumber(final int versionNumber) {
+ this.versionNumber = versionNumber;
+ }
+
+ // NOTE(review): returns the internal mutable HashSet; callers must not mutate it and
+ // iteration is not protected against concurrent addRegion/removeRegion.
+ public Set<RegionStateNode> getRegions() {
+ return regions;
+ }
+
+ public int getRegionCount() {
+ return regions.size();
+ }
+
+ /** @return snapshot copy of the HRegionInfo of every region on this server */
+ public ArrayList<HRegionInfo> getRegionInfoList() {
+ ArrayList<HRegionInfo> hris = new ArrayList<HRegionInfo>(regions.size());
+ for (RegionStateNode region: regions) {
+ hris.add(region.getRegionInfo());
+ }
+ return hris;
+ }
+
+ public void addRegion(final RegionStateNode regionNode) {
+ this.regions.add(regionNode);
+ }
+
+ public void removeRegion(final RegionStateNode regionNode) {
+ this.regions.remove(regionNode);
+ }
+
+ @Override
+ public int compareTo(final ServerStateNode other) {
+ return getServerName().compareTo(other.getServerName());
+ }
+
+ @Override
+ public int hashCode() {
+ return getServerName().hashCode();
+ }
+
+ @Override
+ public boolean equals(final Object other) {
+ if (this == other) return true;
+ if (!(other instanceof ServerStateNode)) return false;
+ return compareTo((ServerStateNode)other) == 0;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("ServerStateNode(%s)", getServerName());
+ }
+ }
+
+ public final static RegionStateStampComparator REGION_STATE_STAMP_COMPARATOR =
+ new RegionStateStampComparator();
+
+ // TODO: Replace the ConcurrentSkipListMaps
+ /**
+ * RegionName -- i.e. HRegionInfo.getRegionName() -- as bytes to {@link RegionStateNode}
+ */
+ private final ConcurrentSkipListMap<byte[], RegionStateNode> regionsMap =
+ new ConcurrentSkipListMap<byte[], RegionStateNode>(Bytes.BYTES_COMPARATOR);
+
+ /** Regions currently in transition, keyed by HRegionInfo. */
+ private final ConcurrentSkipListMap<HRegionInfo, RegionStateNode> regionInTransition =
+ new ConcurrentSkipListMap<HRegionInfo, RegionStateNode>();
+
+ /**
+ * Regions marked as offline on a read of hbase:meta. Unused or at least, once
+ * offlined, regions have no means of coming on line again. TODO.
+ */
+ private final ConcurrentSkipListMap<HRegionInfo, RegionStateNode> regionOffline =
+ new ConcurrentSkipListMap<HRegionInfo, RegionStateNode>();
+
+ /** Regions whose open attempt failed, keyed by region name bytes. */
+ private final ConcurrentSkipListMap<byte[], RegionFailedOpen> regionFailedOpen =
+ new ConcurrentSkipListMap<byte[], RegionFailedOpen>(Bytes.BYTES_COMPARATOR);
+
+ /** Per-regionserver state, keyed by server name. */
+ private final ConcurrentHashMap<ServerName, ServerStateNode> serverMap =
+ new ConcurrentHashMap<ServerName, ServerStateNode>();
+
+ public RegionStates() { }
+
+ public void clear() {
+ regionsMap.clear();
+ regionInTransition.clear();
+ regionOffline.clear();
+ serverMap.clear();
+ }
+
+ // ==========================================================================
+ // RegionStateNode helpers
+ // ==========================================================================
+ protected RegionStateNode createRegionNode(final HRegionInfo regionInfo) {
+ RegionStateNode newNode = new RegionStateNode(regionInfo);
+ RegionStateNode oldNode = regionsMap.putIfAbsent(regionInfo.getRegionName(), newNode);
+ return oldNode != null ? oldNode : newNode;
+ }
+
+ protected RegionStateNode getOrCreateRegionNode(final HRegionInfo regionInfo) {
+ RegionStateNode node = regionsMap.get(regionInfo.getRegionName());
+ return node != null ? node : createRegionNode(regionInfo);
+ }
+
  /** @return the node tracked under {@code regionName} bytes, or null if unknown. */
  RegionStateNode getRegionNodeFromName(final byte[] regionName) {
    return regionsMap.get(regionName);
  }

  /** @return the node for {@code regionInfo}, or null if the region is unknown. */
  protected RegionStateNode getRegionNode(final HRegionInfo regionInfo) {
    return getRegionNodeFromName(regionInfo.getRegionName());
  }
+
+ RegionStateNode getRegionNodeFromEncodedName(final String encodedRegionName) {
+ // TODO: Need a map <encodedName, ...> but it is just dispatch merge...
+ for (RegionStateNode node: regionsMap.values()) {
+ if (node.getRegionInfo().getEncodedName().equals(encodedRegionName)) {
+ return node;
+ }
+ }
+ return null;
+ }
+
+ public void deleteRegion(final HRegionInfo regionInfo) {
+ regionsMap.remove(regionInfo.getRegionName());
+ // Remove from the offline regions map too if there.
+ if (this.regionOffline.containsKey(regionInfo)) {
+ if (LOG.isTraceEnabled()) LOG.trace("Removing from regionOffline Map: " + regionInfo);
+ this.regionOffline.remove(regionInfo);
+ }
+ }
+
+ ArrayList<RegionStateNode> getTableRegionStateNodes(final TableName tableName) {
+ final ArrayList<RegionStateNode> regions = new ArrayList<RegionStateNode>();
+ for (RegionStateNode node: regionsMap.tailMap(tableName.getName()).values()) {
+ if (!node.getTable().equals(tableName)) break;
+ regions.add(node);
+ }
+ return regions;
+ }
+
+ ArrayList<RegionState> getTableRegionStates(final TableName tableName) {
+ final ArrayList<RegionState> regions = new ArrayList<RegionState>();
+ for (RegionStateNode node: regionsMap.tailMap(tableName.getName()).values()) {
+ if (!node.getTable().equals(tableName)) break;
+ regions.add(createRegionState(node));
+ }
+ return regions;
+ }
+
+ ArrayList<HRegionInfo> getTableRegionsInfo(final TableName tableName) {
+ final ArrayList<HRegionInfo> regions = new ArrayList<HRegionInfo>();
+ for (RegionStateNode node: regionsMap.tailMap(tableName.getName()).values()) {
+ if (!node.getTable().equals(tableName)) break;
+ regions.add(node.getRegionInfo());
+ }
+ return regions;
+ }
+
+ Collection<RegionStateNode> getRegionNodes() {
+ return regionsMap.values();
+ }
+
+ public ArrayList<RegionState> getRegionStates() {
+ final ArrayList<RegionState> regions = new ArrayList<RegionState>(regionsMap.size());
+ for (RegionStateNode node: regionsMap.values()) {
+ regions.add(createRegionState(node));
+ }
+ return regions;
+ }
+
+ // ==========================================================================
+ // RegionState helpers
+ // ==========================================================================
+ public RegionState getRegionState(final HRegionInfo regionInfo) {
+ return createRegionState(getRegionNode(regionInfo));
+ }
+
+ public RegionState getRegionState(final String encodedRegionName) {
+ return createRegionState(getRegionNodeFromEncodedName(encodedRegionName));
+ }
+
+ private RegionState createRegionState(final RegionStateNode node) {
+ return node == null ? null :
+ new RegionState(node.getRegionInfo(), node.getState(),
+ node.getLastUpdate(), node.getRegionLocation());
+ }
+
+ // ============================================================================================
+ // TODO: helpers
+ // ============================================================================================
+ public boolean hasTableRegionStates(final TableName tableName) {
+ // TODO
+ return !getTableRegionStates(tableName).isEmpty();
+ }
+
+ public List<HRegionInfo> getRegionsOfTable(final TableName table) {
+ return getRegionsOfTable(table, false);
+ }
+
+ List<HRegionInfo> getRegionsOfTable(final TableName table, final boolean offline) {
+ final ArrayList<RegionStateNode> nodes = getTableRegionStateNodes(table);
+ final ArrayList<HRegionInfo> hris = new ArrayList<HRegionInfo>(nodes.size());
+ for (RegionStateNode node: nodes) {
+ if (include(node, offline)) hris.add(node.getRegionInfo());
+ }
+ return hris;
+ }
+
+ /**
+ * Utility. Whether to include region in list of regions. Default is to
+ * weed out split and offline regions.
+ * @return True if we should include the <code>node</code> (do not include
+ * if split or offline unless <code>offline</code> is set to true.
+ */
+ boolean include(final RegionStateNode node, final boolean offline) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("WORKING ON " + node + " " + node.getRegionInfo());
+ }
+ if (node.isInState(State.SPLIT)) return false;
+ if (node.isInState(State.OFFLINE) && !offline) return false;
+ final HRegionInfo hri = node.getRegionInfo();
+ return (!hri.isOffline() && !hri.isSplit()) ||
+ ((hri.isOffline() || hri.isSplit()) && offline);
+ }
+
  /**
   * Returns the set of regions hosted by the specified server
   * @param serverName the server we are interested in
   * @return set of HRegionInfo hosted by the specified server
   */
  public List<HRegionInfo> getServerRegionInfoSet(final ServerName serverName) {
    final ServerStateNode serverInfo = getServerNode(serverName);
    if (serverInfo == null) return Collections.emptyList();

    // Copy under the node's monitor so the caller gets a consistent snapshot.
    synchronized (serverInfo) {
      return serverInfo.getRegionInfoList();
    }
  }
+
+ // ============================================================================================
+ // TODO: split helpers
+ // ============================================================================================
  /** Marks the server as SPLITTING (i.e. its WALs are being split). */
  public void logSplit(final ServerName serverName) {
    final ServerStateNode serverNode = getOrCreateServer(serverName);
    synchronized (serverNode) {
      serverNode.setState(ServerState.SPLITTING);
      /* THIS HAS TO BE WRONG. THIS IS SPLITTING OF REGION, NOT SPLITTING WALs.
      for (RegionStateNode regionNode: serverNode.getRegions()) {
        synchronized (regionNode) {
          // TODO: Abort procedure if present
          regionNode.setState(State.SPLITTING);
        }
      }*/
    }
  }

  // NOTE(review): getRegionNode() returns null for an unknown region, which
  // would NPE on the synchronized below -- confirm callers only pass known regions.
  public void logSplit(final HRegionInfo regionInfo) {
    final RegionStateNode regionNode = getRegionNode(regionInfo);
    synchronized (regionNode) {
      regionNode.setState(State.SPLIT);
    }
  }
+
  /** Test helper: force-set the state of {@code regionInfo}, creating its node if absent. */
  @VisibleForTesting
  public void updateRegionState(final HRegionInfo regionInfo, final State state) {
    final RegionStateNode regionNode = getOrCreateRegionNode(regionInfo);
    synchronized (regionNode) {
      regionNode.setState(state);
    }
  }
+
+ // ============================================================================================
+ // TODO:
+ // ============================================================================================
+ public List<HRegionInfo> getAssignedRegions() {
+ final List<HRegionInfo> result = new ArrayList<HRegionInfo>();
+ for (RegionStateNode node: regionsMap.values()) {
+ if (!node.isInTransition()) {
+ result.add(node.getRegionInfo());
+ }
+ }
+ return result;
+ }
+
+ public boolean isRegionInState(final HRegionInfo regionInfo, final State... state) {
+ final RegionStateNode region = getRegionNode(regionInfo);
+ if (region != null) {
+ synchronized (region) {
+ return region.isInState(state);
+ }
+ }
+ return false;
+ }
+
+ public boolean isRegionOnline(final HRegionInfo regionInfo) {
+ return isRegionInState(regionInfo, State.OPEN);
+ }
+
+ /**
+ * @return True if region is offline (In OFFLINE or CLOSED state).
+ */
+ public boolean isRegionOffline(final HRegionInfo regionInfo) {
+ return isRegionInState(regionInfo, State.OFFLINE, State.CLOSED);
+ }
+
+ public Map<ServerName, List<HRegionInfo>> getSnapShotOfAssignment(
+ final Collection<HRegionInfo> regions) {
+ final Map<ServerName, List<HRegionInfo>> result = new HashMap<ServerName, List<HRegionInfo>>();
+ for (HRegionInfo hri: regions) {
+ final RegionStateNode node = getRegionNode(hri);
+ if (node == null) continue;
+
+ // TODO: State.OPEN
+ final ServerName serverName = node.getRegionLocation();
+ if (serverName == null) continue;
+
+ List<HRegionInfo> serverRegions = result.get(serverName);
+ if (serverRegions == null) {
+ serverRegions = new ArrayList<HRegionInfo>();
+ result.put(serverName, serverRegions);
+ }
+
+ serverRegions.add(node.getRegionInfo());
+ }
+ return result;
+ }
+
+ public Map<HRegionInfo, ServerName> getRegionAssignments() {
+ final HashMap<HRegionInfo, ServerName> assignments = new HashMap<HRegionInfo, ServerName>();
+ for (RegionStateNode node: regionsMap.values()) {
+ assignments.put(node.getRegionInfo(), node.getRegionLocation());
+ }
+ return assignments;
+ }
+
+ public Map<RegionState.State, List<HRegionInfo>> getRegionByStateOfTable(TableName tableName) {
+ final State[] states = State.values();
+ final Map<RegionState.State, List<HRegionInfo>> tableRegions =
+ new HashMap<State, List<HRegionInfo>>(states.length);
+ for (int i = 0; i < states.length; ++i) {
+ tableRegions.put(states[i], new ArrayList<HRegionInfo>());
+ }
+
+ for (RegionStateNode node: regionsMap.values()) {
+ tableRegions.get(node.getState()).add(node.getRegionInfo());
+ }
+ return tableRegions;
+ }
+
+ public ServerName getRegionServerOfRegion(final HRegionInfo regionInfo) {
+ final RegionStateNode region = getRegionNode(regionInfo);
+ if (region != null) {
+ synchronized (region) {
+ ServerName server = region.getRegionLocation();
+ return server != null ? server : region.getLastHost();
+ }
+ }
+ return null;
+ }
+
  /**
   * This is an EXPENSIVE clone. Cloning though is the safest thing to do.
   * Can't let out original since it can change and at least the load balancer
   * wants to iterate this exported list. We need to synchronize on regions
   * since all access to this.servers is under a lock on this.regions.
   * @param forceByCluster a flag to force to aggregate the server-load to the cluster level
   * @return A clone of current assignments by table.
   */
  public Map<TableName, Map<ServerName, List<HRegionInfo>>> getAssignmentsByTable(
      final boolean forceByCluster) {
    if (!forceByCluster) return getAssignmentsByTable();

    // Cluster-level aggregation: one entry per server, regardless of table.
    final HashMap<ServerName, List<HRegionInfo>> ensemble =
      new HashMap<ServerName, List<HRegionInfo>>(serverMap.size());
    for (ServerStateNode serverNode: serverMap.values()) {
      ensemble.put(serverNode.getServerName(), serverNode.getRegionInfoList());
    }

    // TODO: can we use Collections.singletonMap(HConstants.ENSEMBLE_TABLE_NAME, ensemble)?
    final Map<TableName, Map<ServerName, List<HRegionInfo>>> result =
      new HashMap<TableName, Map<ServerName, List<HRegionInfo>>>(1);
    result.put(HConstants.ENSEMBLE_TABLE_NAME, ensemble);
    return result;
  }
+
+ public Map<TableName, Map<ServerName, List<HRegionInfo>>> getAssignmentsByTable() {
+ final Map<TableName, Map<ServerName, List<HRegionInfo>>> result = new HashMap<>();
+ for (RegionStateNode node: regionsMap.values()) {
+ Map<ServerName, List<HRegionInfo>> tableResult = result.get(node.getTable());
+ if (tableResult == null) {
+ tableResult = new HashMap<ServerName, List<HRegionInfo>>();
+ result.put(node.getTable(), tableResult);
+ }
+
+ final ServerName serverName = node.getRegionLocation();
+ if (serverName == null) {
+ LOG.info("Skipping, no server for " + node);
+ continue;
+ }
+ List<HRegionInfo> serverResult = tableResult.get(serverName);
+ if (serverResult == null) {
+ serverResult = new ArrayList<HRegionInfo>();
+ tableResult.put(serverName, serverResult);
+ }
+
+ serverResult.add(node.getRegionInfo());
+ }
+ return result;
+ }
+
+ // ==========================================================================
+ // Region in transition helpers
+ // ==========================================================================
  /**
   * Registers {@code regionNode} as in-transition, optionally attaching the
   * owning procedure first.
   * @return false if a different procedure is already attached to the node.
   */
  protected boolean addRegionInTransition(final RegionStateNode regionNode,
      final RegionTransitionProcedure procedure) {
    if (procedure != null && !regionNode.setProcedure(procedure)) return false;

    regionInTransition.put(regionNode.getRegionInfo(), regionNode);
    return true;
  }

  /** Removes {@code regionNode} from the in-transition map and detaches its procedure. */
  protected void removeRegionInTransition(final RegionStateNode regionNode,
      final RegionTransitionProcedure procedure) {
    regionInTransition.remove(regionNode.getRegionInfo());
    regionNode.unsetProcedure(procedure);
  }
+
+ public boolean hasRegionsInTransition() {
+ return !regionInTransition.isEmpty();
+ }
+
+ public boolean isRegionInTransition(final HRegionInfo regionInfo) {
+ final RegionStateNode node = regionInTransition.get(regionInfo);
+ return node != null ? node.isInTransition() : false;
+ }
+
+ /**
+ * @return If a procedure-in-transition for <code>hri</code>, return it else null.
+ */
+ public RegionTransitionProcedure getRegionTransitionProcedure(final HRegionInfo hri) {
+ RegionStateNode node = regionInTransition.get(hri);
+ if (node == null) return null;
+ return node.getProcedure();
+ }
+
  /**
   * @return a RegionState snapshot for {@code hri} if it is currently
   *     in-transition, else null.
   */
  public RegionState getRegionTransitionState(final HRegionInfo hri) {
    RegionStateNode node = regionInTransition.get(hri);
    if (node == null) return null;

    // Snapshot under the node's monitor so state/location/timestamp are consistent.
    synchronized (node) {
      return node.isInTransition() ? createRegionState(node) : null;
    }
  }
+
+ public List<RegionStateNode> getRegionsInTransition() {
+ return new ArrayList<RegionStateNode>(regionInTransition.values());
+ }
+
+ /**
+ * Get the number of regions in transition.
+ */
+ public int getRegionsInTransitionCount() {
+ return regionInTransition.size();
+ }
+
+ public List<RegionState> getRegionsStateInTransition() {
+ final List<RegionState> rit = new ArrayList<RegionState>(regionInTransition.size());
+ for (RegionStateNode node: regionInTransition.values()) {
+ rit.add(createRegionState(node));
+ }
+ return rit;
+ }
+
+ public SortedSet<RegionState> getRegionsInTransitionOrderedByTimestamp() {
+ final SortedSet<RegionState> rit = new TreeSet<RegionState>(REGION_STATE_STAMP_COMPARATOR);
+ for (RegionStateNode node: regionInTransition.values()) {
+ rit.add(createRegionState(node));
+ }
+ return rit;
+ }
+
  // ==========================================================================
  //  Region offline helpers
  // ==========================================================================
  // TODO: Populated when we read meta but regions never make it out of here.
  /** Records {@code regionNode} as offline. See class TODO: entries are never cleared. */
  public void addToOfflineRegions(final RegionStateNode regionNode) {
    LOG.info("Added to offline, CURRENTLY NEVER CLEARED!!! " + regionNode);
    regionOffline.put(regionNode.getRegionInfo(), regionNode);
  }

  // TODO: Unused.
  /** Drops {@code regionInfo} from the offline map (currently has no callers). */
  public void removeFromOfflineRegions(final HRegionInfo regionInfo) {
    regionOffline.remove(regionInfo);
  }
+
+ // ==========================================================================
+ // Region FAIL_OPEN helpers
+ // ==========================================================================
+ public static final class RegionFailedOpen {
+ private final RegionStateNode regionNode;
+
+ private volatile Exception exception = null;
+ private volatile int retries = 0;
+
+ public RegionFailedOpen(final RegionStateNode regionNode) {
+ this.regionNode = regionNode;
+ }
+
+ public RegionStateNode getRegionNode() {
+ return regionNode;
+ }
+
+ public HRegionInfo getRegionInfo() {
+ return regionNode.getRegionInfo();
+ }
+
+ public int incrementAndGetRetries() {
+ return ++this.retries;
+ }
+
+ public int getRetries() {
+ return retries;
+ }
+
+ public void setException(final Exception exception) {
+ this.exception = exception;
+ }
+
+ public Exception getException() {
+ return this.exception;
+ }
+ }
+
+ public RegionFailedOpen addToFailedOpen(final RegionStateNode regionNode) {
+ final byte[] key = regionNode.getRegionInfo().getRegionName();
+ RegionFailedOpen node = regionFailedOpen.get(key);
+ if (node == null) {
+ RegionFailedOpen newNode = new RegionFailedOpen(regionNode);
+ RegionFailedOpen oldNode = regionFailedOpen.putIfAbsent(key, newNode);
+ node = oldNode != null ? oldNode : newNode;
+ }
+ return node;
+ }
+
+ public RegionFailedOpen getFailedOpen(final HRegionInfo regionInfo) {
+ return regionFailedOpen.get(regionInfo.getRegionName());
+ }
+
+ public void removeFromFailedOpen(final HRegionInfo regionInfo) {
+ regionFailedOpen.remove(regionInfo.getRegionName());
+ }
+
+ public List<RegionState> getRegionFailedOpen() {
+ if (regionFailedOpen.isEmpty()) return Collections.emptyList();
+
+ ArrayList<RegionState> regions = new ArrayList<RegionState>(regionFailedOpen.size());
+ for (RegionFailedOpen r: regionFailedOpen.values()) {
+ regions.add(createRegionState(r.getRegionNode()));
+ }
+ return regions;
+ }
+
+ // ==========================================================================
+ // Servers
+ // ==========================================================================
+ public ServerStateNode getOrCreateServer(final ServerName serverName) {
+ ServerStateNode node = serverMap.get(serverName);
+ if (node == null) {
+ node = new ServerStateNode(serverName);
+ ServerStateNode oldNode = serverMap.putIfAbsent(serverName, node);
+ node = oldNode != null ? oldNode : node;
+ }
+ return node;
+ }
+
+ public void removeServer(final ServerName serverName) {
+ serverMap.remove(serverName);
+ }
+
+ protected ServerStateNode getServerNode(final ServerName serverName) {
+ return serverMap.get(serverName);
+ }
+
+ public double getAverageLoad() {
+ int numServers = 0;
+ int totalLoad = 0;
+ for (ServerStateNode node: serverMap.values()) {
+ totalLoad += node.getRegionCount();
+ numServers++;
+ }
+ return numServers == 0 ? 0.0: (double)totalLoad / (double)numServers;
+ }
+
  /** Adds {@code regionNode} to the server's hosted set, creating the server node if absent. */
  public ServerStateNode addRegionToServer(final ServerName serverName,
      final RegionStateNode regionNode) {
    ServerStateNode serverNode = getOrCreateServer(serverName);
    serverNode.addRegion(regionNode);
    return serverNode;
  }

  // NOTE(review): the remove path also uses getOrCreateServer(), so removing a
  // region from an unknown server CREATES an empty server node -- confirm this
  // is intended rather than getServerNode() + null-check.
  public ServerStateNode removeRegionFromServer(final ServerName serverName,
      final RegionStateNode regionNode) {
    ServerStateNode serverNode = getOrCreateServer(serverName);
    serverNode.removeRegion(regionNode);
    return serverNode;
  }
+
+ // ==========================================================================
+ // ToString helpers
+ // ==========================================================================
+ public static String regionNamesToString(final Collection<byte[]> regions) {
+ final StringBuilder sb = new StringBuilder();
+ final Iterator<byte[]> it = regions.iterator();
+ sb.append("[");
+ if (it.hasNext()) {
+ sb.append(Bytes.toStringBinary(it.next()));
+ while (it.hasNext()) {
+ sb.append(", ");
+ sb.append(Bytes.toStringBinary(it.next()));
+ }
+ }
+ sb.append("]");
+ return sb.toString();
+ }
+}