You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2017/05/23 18:37:52 UTC
[15/28] hbase git commit: HBASE-18087 Fix unit tests in
TestTableFavoredNodes
http://git-wip-us.apache.org/repos/asf/hbase/blob/4143c017/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/GCRegionProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/GCRegionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/GCRegionProcedure.java
new file mode 100644
index 0000000..05766f7
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/GCRegionProcedure.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.assignment;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.backup.HFileArchiver;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.favored.FavoredNodesManager;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.procedure.AbstractStateMachineRegionProcedure;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
+import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.GCRegionState;
+
+import com.google.common.collect.Lists;
+
+/**
+ * GC a Region that is no longer in use. It has been split or merged away.
+ * Caller determines if it is GC time. This Procedure does not check.
+ * <p>This is a Region StateMachine Procedure. We take a read lock on the Table and then
+ * exclusive on the Region.
+ */
+@InterfaceAudience.Private
+public class GCRegionProcedure extends AbstractStateMachineRegionProcedure<GCRegionState> {
+ private static final Log LOG = LogFactory.getLog(GCRegionProcedure.class);
+
+ public GCRegionProcedure(final MasterProcedureEnv env, final HRegionInfo hri) {
+ super(env, hri);
+ }
+
+ public GCRegionProcedure() {
+ // Required by the Procedure framework to create the procedure on replay
+ super();
+ }
+
+ @Override
+ public TableOperationType getTableOperationType() {
+ return TableOperationType.REGION_GC;
+ }
+
+ @Override
+ protected Flow executeFromState(MasterProcedureEnv env, GCRegionState state)
+ throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(this + " execute state=" + state);
+ }
+ MasterServices masterServices = env.getMasterServices();
+ try {
+ switch (state) {
+ case GC_REGION_PREPARE:
+ // Nothing to do to prepare.
+ setNextState(GCRegionState.GC_REGION_ARCHIVE);
+ break;
+ case GC_REGION_ARCHIVE:
+ FileSystem fs = masterServices.getMasterFileSystem().getFileSystem();
+ if (HFileArchiver.exists(masterServices.getConfiguration(), fs, getRegion())) {
+ if (LOG.isDebugEnabled()) LOG.debug("Archiving region=" + getRegion().getShortNameToLog());
+ HFileArchiver.archiveRegion(masterServices.getConfiguration(), fs, getRegion());
+ }
+ setNextState(GCRegionState.GC_REGION_PURGE_METADATA);
+ break;
+ case GC_REGION_PURGE_METADATA:
+ // TODO: Purge metadata before removing from HDFS? This ordering is copied
+ // from CatalogJanitor.
+ AssignmentManager am = masterServices.getAssignmentManager();
+ if (am != null) {
+ if (am.getRegionStates() != null) {
+ am.getRegionStates().deleteRegion(getRegion());
+ }
+ }
+ MetaTableAccessor.deleteRegion(masterServices.getConnection(), getRegion());
+ masterServices.getServerManager().removeRegion(getRegion());
+ FavoredNodesManager fnm = masterServices.getFavoredNodesManager();
+ if (fnm != null) {
+ fnm.deleteFavoredNodesForRegions(Lists.newArrayList(getRegion()));
+ }
+ return Flow.NO_MORE_STATE;
+ default:
+ throw new UnsupportedOperationException(this + " unhandled state=" + state);
+ }
+ } catch (IOException ioe) {
+ // TODO: This is going to spew log?
+ LOG.warn("Error trying to GC " + getRegion().getShortNameToLog() + "; retrying...", ioe);
+ }
+ return Flow.HAS_MORE_STATE;
+ }
+
+ @Override
+ protected void rollbackState(MasterProcedureEnv env, GCRegionState state) throws IOException, InterruptedException {
+ // no-op
+ }
+
+ @Override
+ protected GCRegionState getState(int stateId) {
+ return GCRegionState.forNumber(stateId);
+ }
+
+ @Override
+ protected int getStateId(GCRegionState state) {
+ return state.getNumber();
+ }
+
+ @Override
+ protected GCRegionState getInitialState() {
+ return GCRegionState.GC_REGION_PREPARE;
+ }
+
+ @Override
+ protected void serializeStateData(OutputStream stream) throws IOException {
+ super.serializeStateData(stream);
+ final MasterProcedureProtos.GCRegionStateData.Builder msg =
+ MasterProcedureProtos.GCRegionStateData.newBuilder()
+ .setRegionInfo(HRegionInfo.convert(getRegion()));
+ msg.build().writeDelimitedTo(stream);
+ }
+
+ @Override
+ protected void deserializeStateData(InputStream stream) throws IOException {
+ super.deserializeStateData(stream);
+ final MasterProcedureProtos.GCRegionStateData msg =
+ MasterProcedureProtos.GCRegionStateData.parseDelimitedFrom(stream);
+ setRegion(HRegionInfo.convert(msg.getRegionInfo()));
+ }
+
+ @Override
+ protected org.apache.hadoop.hbase.procedure2.Procedure.LockState acquireLock(MasterProcedureEnv env) {
+ return super.acquireLock(env);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/4143c017/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java
new file mode 100644
index 0000000..2b1de9d
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java
@@ -0,0 +1,776 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.master.assignment;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MetaMutationAnnotation;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.UnknownRegionException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.MasterSwitchType;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.RegionReplicaUtil;
+import org.apache.hadoop.hbase.exceptions.MergeRegionException;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.master.CatalogJanitor;
+import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
+import org.apache.hadoop.hbase.master.MasterFileSystem;
+import org.apache.hadoop.hbase.master.RegionState;
+import org.apache.hadoop.hbase.master.procedure.AbstractStateMachineTableProcedure;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
+import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
+import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
+import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MergeTableRegionsState;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.lmax.disruptor.YieldingWaitStrategy;
+
+/**
+ * The procedure to Merge a region in a table.
+ * This procedure takes an exclusive table lock since it is working over multiple regions.
+ * It holds the lock for the life of the procedure.
+ */
+@InterfaceAudience.Private
+public class MergeTableRegionsProcedure
+ extends AbstractStateMachineTableProcedure<MergeTableRegionsState> {
+ private static final Log LOG = LogFactory.getLog(MergeTableRegionsProcedure.class);
+ private Boolean traceEnabled;
+ private volatile boolean lock = false;
+ private ServerName regionLocation;
+ private HRegionInfo[] regionsToMerge;
+ private HRegionInfo mergedRegion;
+ private boolean forcible;
+
  /**
   * No-arg constructor; required by the Procedure framework to re-create the
   * procedure on replay (state is restored via {@code deserializeStateData}).
   */
  public MergeTableRegionsProcedure() {
    // Required by the Procedure framework to create the procedure on replay
  }
+
  /**
   * Non-forcible merge of two regions (they must be adjacent).
   * @param env the master procedure environment
   * @param regionToMergeA first region to merge
   * @param regionToMergeB second region to merge
   * @throws IOException if the regions are not valid to merge together
   */
  public MergeTableRegionsProcedure(final MasterProcedureEnv env,
      final HRegionInfo regionToMergeA, final HRegionInfo regionToMergeB) throws IOException {
    this(env, regionToMergeA, regionToMergeB, false);
  }
+
  /**
   * Merge two regions, optionally forcing a merge of non-adjacent regions.
   * @param env the master procedure environment
   * @param regionToMergeA first region to merge
   * @param regionToMergeB second region to merge
   * @param forcible if true, allow merging regions that are not adjacent
   * @throws MergeRegionException if the regions are not valid to merge together
   */
  public MergeTableRegionsProcedure(final MasterProcedureEnv env,
      final HRegionInfo regionToMergeA, final HRegionInfo regionToMergeB,
      final boolean forcible) throws MergeRegionException {
    this(env, new HRegionInfo[] {regionToMergeA, regionToMergeB}, forcible);
  }
+
  /**
   * Validates the regions and pre-computes the resulting merged region info.
   * @param env the master procedure environment
   * @param regionsToMerge exactly two regions to merge (validated here)
   * @param forcible if true, allow merging regions that are not adjacent
   * @throws MergeRegionException if the regions are not valid to merge together
   */
  public MergeTableRegionsProcedure(final MasterProcedureEnv env,
      final HRegionInfo[] regionsToMerge, final boolean forcible)
      throws MergeRegionException {
    super(env);

    // Check daughter regions and make sure that we have valid daughter regions
    // before doing the real work.
    checkRegionsToMerge(regionsToMerge, forcible);

    // WARN: make sure there is no parent region of the two merging regions in
    // hbase:meta If exists, fixing up daughters would cause daughter regions(we
    // have merged one) online again when we restart master, so we should clear
    // the parent region to prevent the above case
    // Since HBASE-7721, we don't need fix up daughters any more. so here do nothing
    this.regionsToMerge = regionsToMerge;
    this.mergedRegion = createMergedRegionInfo(regionsToMerge);
    this.forcible = forcible;
  }
+
+ private static void checkRegionsToMerge(final HRegionInfo[] regionsToMerge,
+ final boolean forcible) throws MergeRegionException {
+ // For now, we only merge 2 regions.
+ // It could be extended to more than 2 regions in the future.
+ if (regionsToMerge == null || regionsToMerge.length != 2) {
+ throw new MergeRegionException("Expected to merge 2 regions, got: " +
+ Arrays.toString(regionsToMerge));
+ }
+
+ checkRegionsToMerge(regionsToMerge[0], regionsToMerge[1], forcible);
+ }
+
+ private static void checkRegionsToMerge(final HRegionInfo regionToMergeA,
+ final HRegionInfo regionToMergeB, final boolean forcible) throws MergeRegionException {
+ if (!regionToMergeA.getTable().equals(regionToMergeB.getTable())) {
+ throw new MergeRegionException("Can't merge regions from two different tables: " +
+ regionToMergeA + ", " + regionToMergeB);
+ }
+
+ if (regionToMergeA.getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID ||
+ regionToMergeB.getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) {
+ throw new MergeRegionException("Can't merge non-default replicas");
+ }
+
+ if (!HRegionInfo.areAdjacent(regionToMergeA, regionToMergeB)) {
+ String msg = "Unable to merge not adjacent regions " + regionToMergeA.getShortNameToLog() +
+ ", " + regionToMergeB.getShortNameToLog() + " where forcible = " + forcible;
+ LOG.warn(msg);
+ if (!forcible) {
+ throw new MergeRegionException(msg);
+ }
+ }
+ }
+
  /**
   * Convenience overload: build the merged region info from the two regions in the array.
   */
  private static HRegionInfo createMergedRegionInfo(final HRegionInfo[] regionsToMerge) {
    return createMergedRegionInfo(regionsToMerge[0], regionsToMerge[1]);
  }
+
+ /**
+ * Create merged region info through the specified two regions
+ */
+ private static HRegionInfo createMergedRegionInfo(final HRegionInfo regionToMergeA,
+ final HRegionInfo regionToMergeB) {
+ // Choose the smaller as start key
+ final byte[] startKey;
+ if (regionToMergeA.compareTo(regionToMergeB) <= 0) {
+ startKey = regionToMergeA.getStartKey();
+ } else {
+ startKey = regionToMergeB.getStartKey();
+ }
+
+ // Choose the bigger as end key
+ final byte[] endKey;
+ if (Bytes.equals(regionToMergeA.getEndKey(), HConstants.EMPTY_BYTE_ARRAY)
+ || (!Bytes.equals(regionToMergeB.getEndKey(), HConstants.EMPTY_BYTE_ARRAY)
+ && Bytes.compareTo(regionToMergeA.getEndKey(), regionToMergeB.getEndKey()) > 0)) {
+ endKey = regionToMergeA.getEndKey();
+ } else {
+ endKey = regionToMergeB.getEndKey();
+ }
+
+ // Merged region is sorted between two merging regions in META
+ final long rid = getMergedRegionIdTimestamp(regionToMergeA, regionToMergeB);
+ return new HRegionInfo(regionToMergeA.getTable(), startKey, endKey, false, rid);
+ }
+
+ private static long getMergedRegionIdTimestamp(final HRegionInfo regionToMergeA,
+ final HRegionInfo regionToMergeB) {
+ long rid = EnvironmentEdgeManager.currentTime();
+ // Regionid is timestamp. Merged region's id can't be less than that of
+ // merging regions else will insert at wrong location in hbase:meta (See HBASE-710).
+ if (rid < regionToMergeA.getRegionId() || rid < regionToMergeB.getRegionId()) {
+ LOG.warn("Clock skew; merging regions id are " + regionToMergeA.getRegionId()
+ + " and " + regionToMergeB.getRegionId() + ", but current time here is " + rid);
+ rid = Math.max(regionToMergeA.getRegionId(), regionToMergeB.getRegionId()) + 1;
+ }
+ return rid;
+ }
+
  /**
   * Drive the merge state machine forward one step. On IOException the failure is
   * recorded via setFailure (merge gives up rather than retrying). Steps: prepare and
   * validate, run pre-merge coprocessors, mark regions merging, close (unassign) the
   * parents, build the merged region on the filesystem, run pre-commit coprocessors,
   * update hbase:meta, run post-commit coprocessors, open (assign) the merged region,
   * then post-operation coprocessors.
   */
  @Override
  protected Flow executeFromState(
      final MasterProcedureEnv env,
      final MergeTableRegionsState state)
      throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
    if (LOG.isDebugEnabled()) {
      LOG.debug(this + " execute state=" + state);
    }
    try {
      switch (state) {
      case MERGE_TABLE_REGIONS_PREPARE:
        if (!prepareMergeRegion(env)) {
          // prepareMergeRegion records the failure itself when it returns false.
          assert isFailed() : "Merge region should have an exception here";
          return Flow.NO_MORE_STATE;
        }
        setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_PRE_MERGE_OPERATION);
        break;
      case MERGE_TABLE_REGIONS_PRE_MERGE_OPERATION:
        preMergeRegions(env);
        setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_SET_MERGING_TABLE_STATE);
        break;
      case MERGE_TABLE_REGIONS_SET_MERGING_TABLE_STATE:
        setRegionStateToMerging(env);
        setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_CLOSE_REGIONS);
        break;
      case MERGE_TABLE_REGIONS_CLOSE_REGIONS:
        // Unassign all replicas of both parents via child procedures.
        addChildProcedure(createUnassignProcedures(env, getRegionReplication(env)));
        setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_CREATE_MERGED_REGION);
        break;
      case MERGE_TABLE_REGIONS_CREATE_MERGED_REGION:
        createMergedRegion(env);
        setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_PRE_MERGE_COMMIT_OPERATION);
        break;
      case MERGE_TABLE_REGIONS_PRE_MERGE_COMMIT_OPERATION:
        preMergeRegionsCommit(env);
        setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_UPDATE_META);
        break;
      case MERGE_TABLE_REGIONS_UPDATE_META:
        // Point of no return: after META is updated, rollback is unsupported.
        updateMetaForMergedRegions(env);
        setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_POST_MERGE_COMMIT_OPERATION);
        break;
      case MERGE_TABLE_REGIONS_POST_MERGE_COMMIT_OPERATION:
        postMergeRegionsCommit(env);
        setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_OPEN_MERGED_REGION);
        break;
      case MERGE_TABLE_REGIONS_OPEN_MERGED_REGION:
        addChildProcedure(createAssignProcedures(env, getRegionReplication(env)));
        setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_POST_OPERATION);
        break;
      case MERGE_TABLE_REGIONS_POST_OPERATION:
        postCompletedMergeRegions(env);
        return Flow.NO_MORE_STATE;
      default:
        throw new UnsupportedOperationException(this + " unhandled state=" + state);
      }
    } catch (IOException e) {
      LOG.warn("Error trying to merge regions " + HRegionInfo.getShortNameToLog(regionsToMerge) +
        " in the table " + getTableName() + " (in state=" + state + ")", e);

      setFailure("master-merge-regions", e);
    }
    return Flow.HAS_MORE_STATE;
  }
+
  /**
   * Undo one step of the merge. States at or past META update cannot be rolled back
   * (point of no return) and throw UnsupportedOperationException. Any other exception
   * is logged and rethrown so the framework retries the rollback (expected to be a
   * temporary error such as the network being down).
   */
  @Override
  protected void rollbackState(
      final MasterProcedureEnv env,
      final MergeTableRegionsState state) throws IOException, InterruptedException {
    if (isTraceEnabled()) {
      LOG.trace(this + " rollback state=" + state);
    }

    try {
      switch (state) {
      case MERGE_TABLE_REGIONS_POST_OPERATION:
      case MERGE_TABLE_REGIONS_OPEN_MERGED_REGION:
      case MERGE_TABLE_REGIONS_POST_MERGE_COMMIT_OPERATION:
      case MERGE_TABLE_REGIONS_UPDATE_META:
        String msg = this + " We are in the " + state + " state."
            + " It is complicated to rollback the merge operation that region server is working on."
            + " Rollback is not supported and we should let the merge operation to complete";
        LOG.warn(msg);
        // PONR
        throw new UnsupportedOperationException(this + " unhandled state=" + state);
      case MERGE_TABLE_REGIONS_PRE_MERGE_COMMIT_OPERATION:
        break;
      case MERGE_TABLE_REGIONS_CREATE_MERGED_REGION:
        cleanupMergedRegion(env);
        break;
      case MERGE_TABLE_REGIONS_CLOSE_REGIONS:
        rollbackCloseRegionsForMerge(env);
        break;
      case MERGE_TABLE_REGIONS_SET_MERGING_TABLE_STATE:
        setRegionStateToRevertMerging(env);
        break;
      case MERGE_TABLE_REGIONS_PRE_MERGE_OPERATION:
        postRollBackMergeRegions(env);
        break;
      case MERGE_TABLE_REGIONS_MOVE_REGION_TO_SAME_RS:
        break; // nothing to rollback
      case MERGE_TABLE_REGIONS_PREPARE:
        break;
      default:
        throw new UnsupportedOperationException(this + " unhandled state=" + state);
      }
    } catch (Exception e) {
      // This will be retried. Unless there is a bug in the code,
      // this should be just a "temporary error" (e.g. network down)
      LOG.warn("Failed rollback attempt step " + state + " for merging the regions "
          + HRegionInfo.getShortNameToLog(regionsToMerge) + " in table " + getTableName(), e);
      throw e;
    }
  }
+
+ /*
+ * Check whether we are in the state that can be rollback
+ */
+ @Override
+ protected boolean isRollbackSupported(final MergeTableRegionsState state) {
+ switch (state) {
+ case MERGE_TABLE_REGIONS_POST_OPERATION:
+ case MERGE_TABLE_REGIONS_OPEN_MERGED_REGION:
+ case MERGE_TABLE_REGIONS_POST_MERGE_COMMIT_OPERATION:
+ case MERGE_TABLE_REGIONS_UPDATE_META:
+ // It is not safe to rollback if we reach to these states.
+ return false;
+ default:
+ break;
+ }
+ return true;
+ }
+
  /** Map a persisted numeric state id back to the enum value. */
  @Override
  protected MergeTableRegionsState getState(final int stateId) {
    return MergeTableRegionsState.forNumber(stateId);
  }
+
  /** Map a state enum value to its numeric id for persistence. */
  @Override
  protected int getStateId(final MergeTableRegionsState state) {
    return state.getNumber();
  }
+
  /** The state the procedure starts from. */
  @Override
  protected MergeTableRegionsState getInitialState() {
    return MergeTableRegionsState.MERGE_TABLE_REGIONS_PREPARE;
  }
+
  /**
   * Persist procedure state: user info, the pre-computed merged region, the forcible
   * flag, and both parent regions, written delimited to the stream after the
   * superclass data.
   */
  @Override
  public void serializeStateData(final OutputStream stream) throws IOException {
    super.serializeStateData(stream);

    final MasterProcedureProtos.MergeTableRegionsStateData.Builder mergeTableRegionsMsg =
        MasterProcedureProtos.MergeTableRegionsStateData.newBuilder()
        .setUserInfo(MasterProcedureUtil.toProtoUserInfo(getUser()))
        .setMergedRegionInfo(HRegionInfo.convert(mergedRegion))
        .setForcible(forcible);
    for (int i = 0; i < regionsToMerge.length; ++i) {
      mergeTableRegionsMsg.addRegionInfo(HRegionInfo.convert(regionsToMerge[i]));
    }
    mergeTableRegionsMsg.build().writeDelimitedTo(stream);
  }
+
+ @Override
+ public void deserializeStateData(final InputStream stream) throws IOException {
+ super.deserializeStateData(stream);
+
+ final MasterProcedureProtos.MergeTableRegionsStateData mergeTableRegionsMsg =
+ MasterProcedureProtos.MergeTableRegionsStateData.parseDelimitedFrom(stream);
+ setUser(MasterProcedureUtil.toUserInfo(mergeTableRegionsMsg.getUserInfo()));
+
+ assert(mergeTableRegionsMsg.getRegionInfoCount() == 2);
+ regionsToMerge = new HRegionInfo[mergeTableRegionsMsg.getRegionInfoCount()];
+ for (int i = 0; i < regionsToMerge.length; i++) {
+ regionsToMerge[i] = HRegionInfo.convert(mergeTableRegionsMsg.getRegionInfo(i));
+ }
+
+ mergedRegion = HRegionInfo.convert(mergeTableRegionsMsg.getMergedRegionInfo());
+ }
+
+ @Override
+ public void toStringClassDetails(StringBuilder sb) {
+ sb.append(getClass().getSimpleName());
+ sb.append(" table=");
+ sb.append(getTableName());
+ sb.append(", regions=");
+ sb.append(HRegionInfo.getShortNameToLog(regionsToMerge));
+ sb.append(", forcibly=");
+ sb.append(forcible);
+ }
+
+ @Override
+ protected LockState acquireLock(final MasterProcedureEnv env) {
+ if (env.waitInitialized(this)) return LockState.LOCK_EVENT_WAIT;
+ if (env.getProcedureScheduler().waitRegions(this, getTableName(),
+ mergedRegion, regionsToMerge[0], regionsToMerge[1])) {
+ try {
+ LOG.debug(LockState.LOCK_EVENT_WAIT + " " + env.getProcedureScheduler().dumpLocks());
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ return LockState.LOCK_EVENT_WAIT;
+ }
+ this.lock = true;
+ return LockState.LOCK_ACQUIRED;
+ }
+
  /** Release the region locks taken in {@code acquireLock} and wake waiters. */
  @Override
  protected void releaseLock(final MasterProcedureEnv env) {
    this.lock = false;
    env.getProcedureScheduler().wakeRegions(this, getTableName(),
      mergedRegion, regionsToMerge[0], regionsToMerge[1]);
  }
+
  /** The lock is held for the whole life of the procedure (see class comment). */
  @Override
  protected boolean holdLock(MasterProcedureEnv env) {
    return true;
  }
+
  /** Whether acquireLock succeeded and releaseLock has not yet run. */
  @Override
  protected boolean hasLock(MasterProcedureEnv env) {
    return this.lock;
  }
+
  /** Table being operated on; taken from the merged region's info. */
  @Override
  public TableName getTableName() {
    return mergedRegion.getTable();
  }
+
  /** This procedure is a region-merge table operation. */
  @Override
  public TableOperationType getTableOperationType() {
    return TableOperationType.REGION_MERGE;
  }
+
  /**
   * Prepare merge and do some check.
   * Verifies neither parent still carries a merge qualifier in hbase:meta, that both
   * are known and open, that the merge switch is on, and that the hosting
   * regionservers report both regions as mergeable.
   * @param env MasterProcedureEnv
   * @return true if the merge may proceed; false if the failure was recorded via
   *         setFailure (caller then ends the procedure)
   * @throws IOException if a parent still has a merge qualifier, is unknown, or is
   *         not online
   */
  private boolean prepareMergeRegion(final MasterProcedureEnv env) throws IOException {
    // Note: the following logic assumes that we only have 2 regions to merge. In the future,
    // if we want to extend to more than 2 regions, the code needs to modify a little bit.
    //
    CatalogJanitor catalogJanitor = env.getMasterServices().getCatalogJanitor();
    boolean regionAHasMergeQualifier = !catalogJanitor.cleanMergeQualifier(regionsToMerge[0]);
    if (regionAHasMergeQualifier
        || !catalogJanitor.cleanMergeQualifier(regionsToMerge[1])) {
      String msg = "Skip merging regions " + HRegionInfo.getShortNameToLog(regionsToMerge) +
        ", because region "
        + (regionAHasMergeQualifier ? regionsToMerge[0].getEncodedName() : regionsToMerge[1]
          .getEncodedName()) + " has merge qualifier";
      LOG.warn(msg);
      throw new MergeRegionException(msg);
    }

    RegionStates regionStates = env.getAssignmentManager().getRegionStates();
    RegionState regionStateA = regionStates.getRegionState(regionsToMerge[0].getEncodedName());
    RegionState regionStateB = regionStates.getRegionState(regionsToMerge[1].getEncodedName());
    if (regionStateA == null || regionStateB == null) {
      throw new UnknownRegionException(
        regionStateA == null ?
          regionsToMerge[0].getEncodedName() : regionsToMerge[1].getEncodedName());
    }

    if (!regionStateA.isOpened() || !regionStateB.isOpened()) {
      throw new MergeRegionException(
        "Unable to merge regions not online " + regionStateA + ", " + regionStateB);
    }

    if (!env.getMasterServices().isSplitOrMergeEnabled(MasterSwitchType.MERGE)) {
      String regionsStr = Arrays.deepToString(regionsToMerge);
      LOG.warn("merge switch is off! skip merge of " + regionsStr);
      super.setFailure(getClass().getSimpleName(),
        new IOException("Merge of " + regionsStr + " failed because merge switch is off"));
      return false;
    }

    // Ask the remote regionserver if regions are mergeable. If we get an IOE, report it
    // along w/ the failure so can see why we are not mergeable at this time.
    IOException mergeableCheckIOE = null;
    boolean mergeable = false;
    RegionState current = regionStateA;
    try {
      mergeable = isMergeable(env, current);
    } catch (IOException e) {
      mergeableCheckIOE = e;
    }
    if (mergeable && mergeableCheckIOE == null) {
      // Only bother asking about the second region if the first said yes.
      current = regionStateB;
      try {
        mergeable = isMergeable(env, current);
      } catch (IOException e) {
        mergeableCheckIOE = e;
      }
    }
    if (!mergeable) {
      IOException e = new IOException(current.getRegion().getShortNameToLog() + " NOT mergeable");
      if (mergeableCheckIOE != null) e.initCause(mergeableCheckIOE);
      super.setFailure(getClass().getSimpleName(), e);
      return false;
    }

    return true;
  }
+
+ private boolean isMergeable(final MasterProcedureEnv env, final RegionState rs)
+ throws IOException {
+ GetRegionInfoResponse response =
+ Util.getRegionInfoResponse(env, rs.getServerName(), rs.getRegion());
+ return response.hasSplittable() && response.getSplittable();
+ }
+
  /**
   * Pre merge region action: run the preMergeRegionsAction coprocessor hook and
   * notify the quota manager.
   * @param env MasterProcedureEnv
   * @throws IOException if a coprocessor bypasses the merge (surfaced as a failure)
   **/
  private void preMergeRegions(final MasterProcedureEnv env) throws IOException {
    final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
    if (cpHost != null) {
      // A true return means some coprocessor asked to bypass the merge.
      boolean ret = cpHost.preMergeRegionsAction(regionsToMerge, getUser());
      if (ret) {
        throw new IOException(
          "Coprocessor bypassing regions " + HRegionInfo.getShortNameToLog(regionsToMerge) +
          " merge.");
      }
    }
    // TODO: Clean up split and merge. Currently all over the place.
    env.getMasterServices().getMasterQuotaManager().onRegionMerged(this.mergedRegion);
  }
+
  /**
   * Action after rollback a merge table regions action: run the corresponding
   * coprocessor hook.
   * @param env MasterProcedureEnv
   * @throws IOException propagated from the coprocessor host
   */
  private void postRollBackMergeRegions(final MasterProcedureEnv env) throws IOException {
    final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
    if (cpHost != null) {
      cpHost.postRollBackMergeRegionsAction(regionsToMerge, getUser());
    }
  }
+
  /**
   * Set the region states to MERGING state.
   * NOTE(review): currently a stub -- the transition below is commented out, so this
   * step performs no state change yet; confirm whether the MERGING transition is
   * handled elsewhere before relying on it.
   * @param env MasterProcedureEnv
   * @throws IOException declared for the eventual implementation
   */
  public void setRegionStateToMerging(final MasterProcedureEnv env) throws IOException {
    //transition.setTransitionCode(TransitionCode.READY_TO_MERGE);
  }
+
  /**
   * Rollback the region state change made by {@code setRegionStateToMerging}.
   * NOTE(review): currently a stub -- the transition below is commented out, so this
   * rollback performs no state change yet.
   * @param env MasterProcedureEnv
   * @throws IOException declared for the eventual implementation
   */
  private void setRegionStateToRevertMerging(final MasterProcedureEnv env) throws IOException {
    //transition.setTransitionCode(TransitionCode.MERGE_REVERTED);
  }
+
  /**
   * Create merged region on the filesystem: open both parents' region filesystems,
   * write reference files for every store file of each parent into the merges
   * directory of parent A, then commit the merged region directory.
   * @param env MasterProcedureEnv
   * @throws IOException on any filesystem failure
   */
  private void createMergedRegion(final MasterProcedureEnv env) throws IOException {
    final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
    final Path tabledir = FSUtils.getTableDir(mfs.getRootDir(), regionsToMerge[0].getTable());
    final FileSystem fs = mfs.getFileSystem();
    HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem(
      env.getMasterConfiguration(), fs, tabledir, regionsToMerge[0], false);
    regionFs.createMergesDir();

    // References for parent A, then parent B, both landing in A's merges dir.
    mergeStoreFiles(env, regionFs, regionFs.getMergesDir());
    HRegionFileSystem regionFs2 = HRegionFileSystem.openRegionFromFileSystem(
      env.getMasterConfiguration(), fs, tabledir, regionsToMerge[1], false);
    mergeStoreFiles(env, regionFs2, regionFs.getMergesDir());

    regionFs.commitMergedRegion(mergedRegion);
  }
+
  /**
   * Create reference file(s) of merging regions under the merges directory.
   * For every column family of the given region, write one reference per store file
   * into {@code mergedDir}.
   * @param env MasterProcedureEnv
   * @param regionFs region file system of the parent region being referenced
   * @param mergedDir the temp directory of merged region
   * @throws IOException on any filesystem failure
   */
  private void mergeStoreFiles(
      final MasterProcedureEnv env, final HRegionFileSystem regionFs, final Path mergedDir)
      throws IOException {
    final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
    final Configuration conf = env.getMasterConfiguration();
    final HTableDescriptor htd = env.getMasterServices().getTableDescriptors().get(getTableName());

    for (String family: regionFs.getFamilies()) {
      final HColumnDescriptor hcd = htd.getFamily(family.getBytes());
      final Collection<StoreFileInfo> storeFiles = regionFs.getStoreFiles(family);

      if (storeFiles != null && storeFiles.size() > 0) {
        final CacheConfig cacheConf = new CacheConfig(conf, hcd);
        for (StoreFileInfo storeFileInfo: storeFiles) {
          // Create reference file(s) of the region in mergedDir
          regionFs.mergeStoreFile(
            mergedRegion,
            family,
            new StoreFile(
              mfs.getFileSystem(), storeFileInfo, conf, cacheConf, hcd.getBloomFilterType()),
            mergedDir);
        }
      }
    }
  }
+
  /**
   * Clean up merged region on rollback: remove the merged-region directory created
   * under parent A's region filesystem.
   * @param env MasterProcedureEnv
   * @throws IOException on any filesystem failure
   */
  private void cleanupMergedRegion(final MasterProcedureEnv env) throws IOException {
    final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
    final Path tabledir = FSUtils.getTableDir(mfs.getRootDir(), regionsToMerge[0].getTable());
    final FileSystem fs = mfs.getFileSystem();
    HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem(
      env.getMasterConfiguration(), fs, tabledir, regionsToMerge[0], false);
    regionFs.cleanupMergedRegion(mergedRegion);
  }
+
+ /**
+ * Rollback close regions: re-submits assign procedures so that the regions
+ * unassigned for the merge (including all their replicas) are opened again,
+ * preferably on the server they were on before.
+ * @param env MasterProcedureEnv
+ * @throws IOException if region replication cannot be read from the table descriptor
+ **/
+ private void rollbackCloseRegionsForMerge(final MasterProcedureEnv env) throws IOException {
+ // Check whether the region is closed; if so, open it in the same server
+ final int regionReplication = getRegionReplication(env);
+ final ServerName serverName = getServerName(env);
+
+ final AssignProcedure[] procs =
+ new AssignProcedure[regionsToMerge.length * regionReplication];
+ int procsIdx = 0;
+ for (int i = 0; i < regionsToMerge.length; ++i) {
+ for (int j = 0; j < regionReplication; ++j) {
+ // One assign per replica of each region that was being merged.
+ final HRegionInfo hri = RegionReplicaUtil.getRegionInfoForReplica(regionsToMerge[i], j);
+ procs[procsIdx++] = env.getAssignmentManager().createAssignProcedure(hri, serverName);
+ }
+ }
+ env.getMasterServices().getMasterProcedureExecutor().submitProcedures(procs);
+ }
+
+ /**
+ * Build an UnassignProcedure for every replica of every region being merged.
+ * NOTE(review): the (null, true) arguments to createUnassignProcedure are
+ * presumed to mean "no destination, force" — confirm against AssignmentManager.
+ */
+ private UnassignProcedure[] createUnassignProcedures(final MasterProcedureEnv env,
+ final int regionReplication) {
+ final UnassignProcedure[] unassigns =
+ new UnassignProcedure[regionsToMerge.length * regionReplication];
+ int index = 0;
+ for (HRegionInfo region : regionsToMerge) {
+ for (int replicaId = 0; replicaId < regionReplication; ++replicaId) {
+ final HRegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(region, replicaId);
+ unassigns[index++] = env.getAssignmentManager().createUnassignProcedure(replica, null, true);
+ }
+ }
+ return unassigns;
+ }
+
+ /**
+ * Build an AssignProcedure for the merged region and each of its replicas,
+ * all targeted at the server that hosted the first parent region.
+ */
+ private AssignProcedure[] createAssignProcedures(final MasterProcedureEnv env,
+ final int regionReplication) {
+ final ServerName preferredServer = getServerName(env);
+ final AssignProcedure[] assigns = new AssignProcedure[regionReplication];
+ for (int replicaId = 0; replicaId < assigns.length; ++replicaId) {
+ final HRegionInfo replica =
+ RegionReplicaUtil.getRegionInfoForReplica(mergedRegion, replicaId);
+ assigns[replicaId] = env.getAssignmentManager().createAssignProcedure(replica, preferredServer);
+ }
+ return assigns;
+ }
+
+ /**
+ * @param env MasterProcedureEnv
+ * @return the region replication count configured on this table's descriptor
+ * @throws IOException if the table descriptor cannot be loaded
+ */
+ private int getRegionReplication(final MasterProcedureEnv env) throws IOException {
+ final HTableDescriptor htd = env.getMasterServices().getTableDescriptors().get(getTableName());
+ return htd.getRegionReplication();
+ }
+
+ /**
+ * Pre merge region action: invokes the coprocessor preMergeRegionsCommit hook
+ * before the merge is committed to hbase:meta, and validates any meta
+ * mutations the coprocessors supplied.
+ * (Original javadoc said "Post merge region action" — wrong for a pre-commit hook.)
+ * @param env MasterProcedureEnv
+ * @throws IOException if a coprocessor bypasses the merge or supplies a
+ * mutation whose row key is not a parsable region name
+ **/
+ private void preMergeRegionsCommit(final MasterProcedureEnv env) throws IOException {
+ final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
+ if (cpHost != null) {
+ @MetaMutationAnnotation
+ final List<Mutation> metaEntries = new ArrayList<Mutation>();
+ boolean ret = cpHost.preMergeRegionsCommit(regionsToMerge, metaEntries, getUser());
+
+ if (ret) {
+ // A 'true' return means a coprocessor bypassed the operation; surface it as failure.
+ throw new IOException(
+ "Coprocessor bypassing regions " + HRegionInfo.getShortNameToLog(regionsToMerge) +
+ " merge.");
+ }
+ try {
+ // Coprocessor-supplied mutations must target hbase:meta region rows only.
+ for (Mutation p : metaEntries) {
+ HRegionInfo.parseRegionName(p.getRow());
+ }
+ } catch (IOException e) {
+ LOG.error("Row key of mutation from coprocessor is not parsable as region name."
+ + "Mutations from coprocessor should only be for hbase:meta table.", e);
+ throw e;
+ }
+ }
+ }
+
+ /**
+ * Add merged region to META and delete original regions.
+ * Delegates to the AssignmentManager which records the merged region and
+ * removes both parents from hbase:meta.
+ * @param env MasterProcedureEnv
+ */
+ private void updateMetaForMergedRegions(final MasterProcedureEnv env)
+ throws IOException, ProcedureYieldException {
+ final ServerName serverName = getServerName(env);
+ env.getAssignmentManager().markRegionAsMerged(mergedRegion, serverName,
+ regionsToMerge[0], regionsToMerge[1]);
+ }
+
+ /**
+ * Post merge region action: invokes the coprocessor postMergeRegionsCommit
+ * hook after the merge has been committed to hbase:meta.
+ * @param env MasterProcedureEnv
+ **/
+ private void postMergeRegionsCommit(final MasterProcedureEnv env) throws IOException {
+ final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
+ if (cpHost != null) {
+ cpHost.postMergeRegionsCommit(regionsToMerge, mergedRegion, getUser());
+ }
+ }
+
+ /**
+ * Post merge region action: invokes the coprocessor
+ * postCompletedMergeRegionsAction hook once the merge procedure has completed.
+ * @param env MasterProcedureEnv
+ **/
+ private void postCompletedMergeRegions(final MasterProcedureEnv env) throws IOException {
+ final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
+ if (cpHost != null) {
+ cpHost.postCompletedMergeRegionsAction(regionsToMerge, mergedRegion, getUser());
+ }
+ }
+
+ /**
+ * The procedure could be restarted from a different machine. If the variable is null, we need to
+ * retrieve it.
+ * @param env MasterProcedureEnv
+ * @return serverName hosting the first region to merge; may be null when the
+ * in-memory-only location was lost (e.g. during recovery) — callers must cope
+ */
+ private ServerName getServerName(final MasterProcedureEnv env) {
+ if (regionLocation == null) {
+ regionLocation = env.getAssignmentManager().getRegionStates().
+ getRegionServerOfRegion(regionsToMerge[0]);
+ // May still be null here but return null and let caller deal.
+ // Means we lost the in-memory-only location. We are in recovery
+ // or so. The caller should be able to deal w/ a null ServerName.
+ // Let them go to the Balancer to find one to use instead.
+ }
+ return regionLocation;
+ }
+
+ /**
+ * The procedure could be restarted from a different machine. If the variable is null, we need to
+ * retrieve it.
+ * @return traceEnabled, lazily cached from the logger on first use
+ */
+ private Boolean isTraceEnabled() {
+ if (traceEnabled == null) {
+ traceEnabled = LOG.isTraceEnabled();
+ }
+ return traceEnabled;
+ }
+
+ /**
+ * @return The merged region. May be null if called too early or we failed.
+ */
+ @VisibleForTesting
+ public HRegionInfo getMergedRegion() {
+ return this.mergedRegion;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/4143c017/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MoveRegionProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MoveRegionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MoveRegionProcedure.java
new file mode 100644
index 0000000..f998af8
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MoveRegionProcedure.java
@@ -0,0 +1,145 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.master.assignment;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.master.RegionPlan;
+import org.apache.hadoop.hbase.master.procedure.AbstractStateMachineRegionProcedure;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MoveRegionState;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MoveRegionStateData;
+
+/**
+ * Procedure that implements a RegionPlan.
+ * It first runs an unassign subprocedure followed
+ * by an assign subprocedure. It takes a lock on the region being moved.
+ * It holds the lock for the life of the procedure.
+ */
+@InterfaceAudience.Private
+public class MoveRegionProcedure extends AbstractStateMachineRegionProcedure<MoveRegionState> {
+ private static final Log LOG = LogFactory.getLog(MoveRegionProcedure.class);
+ // The plan (region, source server, destination server) this procedure executes.
+ private RegionPlan plan;
+
+ public MoveRegionProcedure() {
+ // Required by the Procedure framework to create the procedure on replay
+ super();
+ }
+
+ public MoveRegionProcedure(final MasterProcedureEnv env, final RegionPlan plan) {
+ super(env, plan.getRegionInfo());
+ // A move must always carry an explicit destination.
+ assert plan.getDestination() != null: plan.toString();
+ this.plan = plan;
+ }
+
+ @Override
+ protected Flow executeFromState(final MasterProcedureEnv env, final MoveRegionState state)
+ throws InterruptedException {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(this + " execute state=" + state);
+ }
+ switch (state) {
+ case MOVE_REGION_UNASSIGN:
+ // Step 1: close the region on its current (source) server.
+ addChildProcedure(new UnassignProcedure(plan.getRegionInfo(), plan.getSource(), true));
+ setNextState(MoveRegionState.MOVE_REGION_ASSIGN);
+ break;
+ case MOVE_REGION_ASSIGN:
+ // Step 2: open the region on the destination server; then we are done.
+ addChildProcedure(new AssignProcedure(plan.getRegionInfo(), plan.getDestination()));
+ return Flow.NO_MORE_STATE;
+ default:
+ throw new UnsupportedOperationException("unhandled state=" + state);
+ }
+ return Flow.HAS_MORE_STATE;
+ }
+
+ @Override
+ protected void rollbackState(final MasterProcedureEnv env, final MoveRegionState state)
+ throws IOException {
+ // no-op
+ // NOTE(review): nothing is undone here; a partially-executed move appears to
+ // rely on the child unassign/assign procedures' own handling — confirm.
+ }
+
+ @Override
+ public boolean abort(final MasterProcedureEnv env) {
+ // An in-flight move never acknowledges an abort request.
+ return false;
+ }
+
+ @Override
+ public void toStringClassDetails(final StringBuilder sb) {
+ sb.append(getClass().getSimpleName());
+ sb.append(" ");
+ sb.append(plan);
+ }
+
+ @Override
+ protected MoveRegionState getInitialState() {
+ return MoveRegionState.MOVE_REGION_UNASSIGN;
+ }
+
+ @Override
+ protected int getStateId(final MoveRegionState state) {
+ return state.getNumber();
+ }
+
+ @Override
+ protected MoveRegionState getState(final int stateId) {
+ return MoveRegionState.valueOf(stateId);
+ }
+
+ @Override
+ public TableName getTableName() {
+ return plan.getRegionInfo().getTable();
+ }
+
+ @Override
+ public TableOperationType getTableOperationType() {
+ return TableOperationType.REGION_EDIT;
+ }
+
+ @Override
+ protected void serializeStateData(final OutputStream stream) throws IOException {
+ super.serializeStateData(stream);
+
+ // Persist the whole RegionPlan so a restarted master can resume the move.
+ final MoveRegionStateData.Builder state = MoveRegionStateData.newBuilder()
+ .setRegionInfo(HRegionInfo.convert(plan.getRegionInfo()))
+ .setSourceServer(ProtobufUtil.toServerName(plan.getSource()))
+ .setDestinationServer(ProtobufUtil.toServerName(plan.getDestination()));
+ state.build().writeDelimitedTo(stream);
+ }
+
+ @Override
+ protected void deserializeStateData(final InputStream stream) throws IOException {
+ super.deserializeStateData(stream);
+
+ // Rebuild the RegionPlan written by serializeStateData.
+ final MoveRegionStateData state = MoveRegionStateData.parseDelimitedFrom(stream);
+ final HRegionInfo regionInfo = HRegionInfo.convert(state.getRegionInfo());
+ final ServerName sourceServer = ProtobufUtil.toServerName(state.getSourceServer());
+ final ServerName destinationServer = ProtobufUtil.toServerName(state.getDestinationServer());
+ this.plan = new RegionPlan(regionInfo, sourceServer, destinationServer);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/4143c017/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java
new file mode 100644
index 0000000..21e0d9c
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java
@@ -0,0 +1,327 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.master.assignment;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.RegionState;
+import org.apache.hadoop.hbase.master.RegionState.State;
+import org.apache.hadoop.hbase.procedure2.util.StringUtils;
+import org.apache.hadoop.hbase.util.MultiHConnection;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
+import org.apache.zookeeper.KeeperException;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Store Region State to hbase:meta table.
+ */
+@InterfaceAudience.Private
+public class RegionStateStore {
+ private static final Log LOG = LogFactory.getLog(RegionStateStore.class);
+
+ /** The delimiter for meta columns for replicaIds > 0 */
+ protected static final char META_REPLICA_ID_DELIMITER = '_';
+
+ private final MasterServices master;
+
+ private MultiHConnection multiHConnection;
+
+ /** @param master master services used to reach hbase:meta and ZooKeeper. */
+ public RegionStateStore(final MasterServices master) {
+ this.master = master;
+ }
+
+ public void start() throws IOException {
+ // Nothing to do yet; the meta connection is created lazily on first update.
+ }
+
+ public void stop() {
+ // Release the lazily-created meta connection, if any.
+ // NOTE(review): not synchronized — a concurrent updateRegionLocation could
+ // race with this close; confirm callers quiesce updates before stop().
+ if (multiHConnection != null) {
+ multiHConnection.close();
+ multiHConnection = null;
+ }
+ }
+
+ /** Callback invoked once per region replica found while scanning hbase:meta. */
+ public interface RegionStateVisitor {
+ void visitRegionState(HRegionInfo regionInfo, State state,
+ ServerName regionLocation, ServerName lastHost, long openSeqNum);
+ }
+
+ /**
+ * Full-scan hbase:meta and hand every region row to the given visitor.
+ * @param visitor callback receiving each region's state and locations
+ * @throws IOException if the meta scan fails
+ */
+ public void visitMeta(final RegionStateVisitor visitor) throws IOException {
+ MetaTableAccessor.fullScanRegions(master.getConnection(), new MetaTableAccessor.Visitor() {
+ final boolean isDebugEnabled = LOG.isDebugEnabled();
+
+ @Override
+ public boolean visit(final Result r) throws IOException {
+ if (r != null && !r.isEmpty()) {
+ // Per-row timing is trace-only: emitting it at INFO for every region
+ // floods the master log during startup meta loads, and the timestamps
+ // are not worth computing when nobody is listening.
+ if (LOG.isTraceEnabled()) {
+ long st = System.currentTimeMillis();
+ visitMetaEntry(visitor, r);
+ long et = System.currentTimeMillis();
+ LOG.trace("[T] LOAD META PERF " + StringUtils.humanTimeDiff(et - st));
+ } else {
+ visitMetaEntry(visitor, r);
+ }
+ } else if (isDebugEnabled) {
+ LOG.debug("NULL result from meta - ignoring but this is strange.");
+ }
+ return true;
+ }
+ });
+ }
+
+ /**
+ * Decode one hbase:meta {@link Result} row and invoke the visitor once per
+ * replica location found in it; rows without locations are skipped silently.
+ */
+ private void visitMetaEntry(final RegionStateVisitor visitor, final Result result)
+ throws IOException {
+ final RegionLocations rl = MetaTableAccessor.getRegionLocations(result);
+ if (rl == null) return;
+
+ final HRegionLocation[] locations = rl.getRegionLocations();
+ if (locations == null) return;
+
+ for (int i = 0; i < locations.length; ++i) {
+ final HRegionLocation hrl = locations[i];
+ if (hrl == null) continue;
+
+ final HRegionInfo regionInfo = hrl.getRegionInfo();
+ if (regionInfo == null) continue;
+
+ final int replicaId = regionInfo.getReplicaId();
+ final State state = getRegionState(result, replicaId);
+
+ final ServerName lastHost = hrl.getServerName();
+ final ServerName regionLocation = getRegionServer(result, replicaId);
+ // Open sequence number is not decoded here; -1 is a placeholder.
+ final long openSeqNum = -1;
+
+ // TODO: move under trace, now is visible for debugging
+ LOG.info(String.format("Load hbase:meta entry region=%s regionState=%s lastHost=%s regionLocation=%s",
+ regionInfo, state, lastHost, regionLocation));
+
+ visitor.visitRegionState(regionInfo, state, regionLocation, lastHost, openSeqNum);
+ }
+ }
+
+ /**
+ * Persist a region's state/location: the meta region's location goes to
+ * ZooKeeper, user region locations go to the hbase:meta table.
+ */
+ public void updateRegionLocation(final HRegionInfo regionInfo, final State state,
+ final ServerName regionLocation, final ServerName lastHost, final long openSeqNum,
+ final long pid)
+ throws IOException {
+ if (regionInfo.isMetaRegion()) {
+ updateMetaLocation(regionInfo, regionLocation);
+ } else {
+ updateUserRegionLocation(regionInfo, state, regionLocation, lastHost, openSeqNum, pid);
+ }
+ }
+
+ /**
+ * Convenience overload: persist a {@link RegionState} transition, using the
+ * old state's server (if any) as the last known host.
+ */
+ public void updateRegionState(final long openSeqNum, final long pid,
+ final RegionState newState, final RegionState oldState) throws IOException {
+ updateRegionLocation(newState.getRegion(), newState.getState(), newState.getServerName(),
+ oldState != null ? oldState.getServerName() : null, openSeqNum, pid);
+ }
+
+ /**
+ * Record the meta region's location in ZooKeeper, marking it OPEN on serverName.
+ * @throws IOException wrapping any KeeperException raised by ZooKeeper
+ */
+ protected void updateMetaLocation(final HRegionInfo regionInfo, final ServerName serverName)
+ throws IOException {
+ try {
+ MetaTableLocator.setMetaLocation(master.getZooKeeper(), serverName,
+ regionInfo.getReplicaId(), State.OPEN);
+ } catch (KeeperException e) {
+ throw new IOException(e);
+ }
+ }
+
+ /**
+ * Build and write the hbase:meta Put for a user region's state change.
+ * Always writes the region info and state columns; additionally writes full
+ * location columns when a valid openSeqNum is supplied (region is OPEN), or
+ * just the server-name column when the location changed without an open.
+ * Also writes a replication barrier for tables with serial replication scope.
+ */
+ protected void updateUserRegionLocation(final HRegionInfo regionInfo, final State state,
+ final ServerName regionLocation, final ServerName lastHost, final long openSeqNum,
+ final long pid)
+ throws IOException {
+ final int replicaId = regionInfo.getReplicaId();
+ final Put put = new Put(MetaTableAccessor.getMetaKeyForRegion(regionInfo));
+ MetaTableAccessor.addRegionInfo(put, regionInfo);
+ final StringBuilder info = new StringBuilder("pid=" + pid + " updating hbase:meta row=");
+ info.append(regionInfo.getRegionNameAsString()).append(", regionState=").append(state);
+ if (openSeqNum >= 0) {
+ // A valid open sequence number is only legal for an OPEN region with a location.
+ Preconditions.checkArgument(state == State.OPEN && regionLocation != null,
+ "Open region should be on a server");
+ MetaTableAccessor.addLocation(put, regionLocation, openSeqNum, -1, replicaId);
+ info.append(", openSeqNum=").append(openSeqNum);
+ info.append(", regionLocation=").append(regionLocation);
+ } else if (regionLocation != null && !regionLocation.equals(lastHost)) {
+ // Ideally, if no regionLocation, write null to the hbase:meta but this will confuse clients
+ // currently; they want a server to hit. TODO: Make clients wait if no location.
+ put.addImmutable(HConstants.CATALOG_FAMILY, getServerNameColumn(replicaId),
+ Bytes.toBytes(regionLocation.getServerName()));
+ info.append(", regionLocation=").append(regionLocation);
+ }
+ put.addImmutable(HConstants.CATALOG_FAMILY, getStateColumn(replicaId),
+ Bytes.toBytes(state.name()));
+ LOG.info(info);
+
+ final boolean serialReplication = hasSerialReplicationScope(regionInfo.getTable());
+ if (serialReplication && state == State.OPEN) {
+ // Serial replication requires a barrier recording the open sequence id.
+ Put barrierPut = MetaTableAccessor.makeBarrierPut(regionInfo.getEncodedNameAsBytes(),
+ openSeqNum, regionInfo.getTable().getName());
+ updateRegionLocation(regionInfo, state, put, barrierPut);
+ } else {
+ updateRegionLocation(regionInfo, state, put);
+ }
+ }
+
+ /**
+ * Apply the given Puts to hbase:meta via a lazily-created connection.
+ * On failure the master is aborted (meta must stay consistent) and the
+ * exception is rethrown to the caller.
+ */
+ protected void updateRegionLocation(final HRegionInfo regionInfo, final State state,
+ final Put... put) throws IOException {
+ synchronized (this) {
+ // Lazy init; torn down in stop().
+ if (multiHConnection == null) {
+ multiHConnection = new MultiHConnection(master.getConfiguration(), 1);
+ }
+ }
+
+ try {
+ multiHConnection.processBatchCallback(Arrays.asList(put), TableName.META_TABLE_NAME, null, null);
+ } catch (IOException e) {
+ // TODO: Revist!!!! Means that if a server is loaded, then we will abort our host!
+ // In tests we abort the Master!
+ String msg = String.format("FAILED persisting region=%s state=%s",
+ regionInfo.getShortNameToLog(), state);
+ LOG.error(msg, e);
+ master.abort(msg, e);
+ throw e;
+ }
+ }
+
+ // ============================================================================================
+ // Update Region Splitting State helpers
+ // ============================================================================================
+ /** Record a completed split of {@code parent} into {@code hriA}/{@code hriB} in hbase:meta. */
+ public void splitRegion(final HRegionInfo parent, final HRegionInfo hriA,
+ final HRegionInfo hriB, final ServerName serverName) throws IOException {
+ final HTableDescriptor htd = getTableDescriptor(parent.getTable());
+ MetaTableAccessor.splitRegion(master.getConnection(), parent, hriA, hriB, serverName,
+ getRegionReplication(htd), hasSerialReplicationScope(htd));
+ }
+
+ // ============================================================================================
+ // Update Region Merging State helpers
+ // ============================================================================================
+ /** Record a completed merge of {@code hriA}/{@code hriB} into {@code parent} in hbase:meta. */
+ public void mergeRegions(final HRegionInfo parent, final HRegionInfo hriA,
+ final HRegionInfo hriB, final ServerName serverName) throws IOException {
+ final HTableDescriptor htd = getTableDescriptor(parent.getTable());
+ MetaTableAccessor.mergeRegions(master.getConnection(), parent, hriA, hriB, serverName,
+ getRegionReplication(htd), EnvironmentEdgeManager.currentTime(),
+ hasSerialReplicationScope(htd));
+ }
+
+ // ============================================================================================
+ // Delete Region State helpers
+ // ============================================================================================
+ /** Delete a single region's row from hbase:meta. */
+ public void deleteRegion(final HRegionInfo regionInfo) throws IOException {
+ deleteRegions(Collections.singletonList(regionInfo));
+ }
+
+ /** Delete the given regions' rows from hbase:meta. */
+ public void deleteRegions(final List<HRegionInfo> regions) throws IOException {
+ MetaTableAccessor.deleteRegions(master.getConnection(), regions);
+ }
+
+ // ==========================================================================
+ // Table Descriptors helpers
+ // ==========================================================================
+ /** @return true if the named table's descriptor declares serial replication scope. */
+ private boolean hasSerialReplicationScope(final TableName tableName) throws IOException {
+ return hasSerialReplicationScope(getTableDescriptor(tableName));
+ }
+
+ /** Null-safe check for serial replication scope on a table descriptor. */
+ private boolean hasSerialReplicationScope(final HTableDescriptor htd) {
+ return htd != null && htd.hasSerialReplicationScope();
+ }
+
+ /** Null-safe region replication count; defaults to 1 when no descriptor is available. */
+ private int getRegionReplication(final HTableDescriptor htd) {
+ return (htd == null) ? 1 : htd.getRegionReplication();
+ }
+
+ /** Load the table descriptor for {@code tableName} from the master's descriptor store. */
+ private HTableDescriptor getTableDescriptor(final TableName tableName) throws IOException {
+ return master.getTableDescriptors().get(tableName);
+ }
+
+ // ==========================================================================
+ // Server Name
+ // ==========================================================================
+
+ /**
+ * Returns the {@link ServerName} from catalog table {@link Result}
+ * where the region is transitioning. It should be the same as
+ * {@link MetaTableAccessor#getServerName(Result,int)} if the server is at OPEN state.
+ * @param r Result to pull the transitioning server name from
+ * @return A ServerName instance or {@link MetaTableAccessor#getServerName(Result,int)}
+ * if necessary fields not found or empty.
+ */
+ static ServerName getRegionServer(final Result r, int replicaId) {
+ final Cell cell = r.getColumnLatestCell(HConstants.CATALOG_FAMILY,
+ getServerNameColumn(replicaId));
+ if (cell == null || cell.getValueLength() == 0) {
+ // No explicit transitioning-server column: fall back to the plain
+ // region-location columns decoded by MetaTableAccessor.
+ RegionLocations locations = MetaTableAccessor.getRegionLocations(r);
+ if (locations != null) {
+ HRegionLocation location = locations.getRegionLocation(replicaId);
+ if (location != null) {
+ return location.getServerName();
+ }
+ }
+ return null;
+ }
+ return ServerName.parseServerName(Bytes.toString(cell.getValueArray(),
+ cell.getValueOffset(), cell.getValueLength()));
+ }
+
+ /** Column qualifier carrying the transitioning server name for the given replica. */
+ private static byte[] getServerNameColumn(int replicaId) {
+ return replicaId == 0
+ ? HConstants.SERVERNAME_QUALIFIER
+ : Bytes.toBytes(HConstants.SERVERNAME_QUALIFIER_STR + META_REPLICA_ID_DELIMITER
+ + String.format(HRegionInfo.REPLICA_ID_FORMAT, replicaId));
+ }
+
+ // ==========================================================================
+ // Region State
+ // ==========================================================================
+
+ /**
+ * Pull the region state from a catalog table {@link Result}.
+ * @param r Result to pull the region state from
+ * @return the region state, or OPENING if there's no state value written.
+ * (Original javadoc claimed OPEN, but the code returns State.OPENING
+ * for a missing/empty state column.)
+ */
+ protected State getRegionState(final Result r, int replicaId) {
+ Cell cell = r.getColumnLatestCell(HConstants.CATALOG_FAMILY, getStateColumn(replicaId));
+ if (cell == null || cell.getValueLength() == 0) return State.OPENING;
+ return State.valueOf(Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
+ }
+
+ /** Column qualifier carrying the region state for the given replica. */
+ private static byte[] getStateColumn(int replicaId) {
+ return replicaId == 0
+ ? HConstants.STATE_QUALIFIER
+ : Bytes.toBytes(HConstants.STATE_QUALIFIER_STR + META_REPLICA_ID_DELIMITER
+ + String.format(HRegionInfo.REPLICA_ID_FORMAT, replicaId));
+ }
+}