You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2017/05/23 07:36:23 UTC
[11/50] [abbrv] hbase git commit: HBASE-14614 Procedure v2 - Core
Assignment Manager (Matteo Bertozzi) Move to a new AssignmentManager,
one that describes Assignment using a State Machine built on top of
ProcedureV2 facility.
http://git-wip-us.apache.org/repos/asf/hbase/blob/f56592fd/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java
new file mode 100644
index 0000000..cd8b858
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java
@@ -0,0 +1,348 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.master.assignment;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
+import org.apache.hadoop.hbase.master.assignment.RegionStates.RegionStateNode;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
+import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation;
+import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionTransitionState;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
+
+/**
+ * Base class for the Assign and Unassign Procedure.
+ * There can only be one RegionTransitionProcedure per region running at a time
+ * since each procedure takes a lock on the region (see MasterProcedureScheduler).
+ *
+ * <p>This procedure is asynchronous and responds to external events.
+ * The AssignmentManager will notify this procedure when the RS completes
+ * the operation and reports the transitioned state
+ * (see the Assign and Unassign class for more details).
+ * <p>Procedures move from the REGION_TRANSITION_QUEUE state when they are
+ * first submitted, to the REGION_TRANSITION_DISPATCH state when the request
+ * to remote server is done. They end in the REGION_TRANSITION_FINISH state.
+ */
+@InterfaceAudience.Private
+public abstract class RegionTransitionProcedure
+ extends Procedure<MasterProcedureEnv>
+ implements TableProcedureInterface,
+ RemoteProcedure<MasterProcedureEnv, ServerName> {
+ private static final Log LOG = LogFactory.getLog(RegionTransitionProcedure.class);
+
+ protected final AtomicBoolean aborted = new AtomicBoolean(false);
+
+ private RegionTransitionState transitionState =
+ RegionTransitionState.REGION_TRANSITION_QUEUE;
+ private HRegionInfo regionInfo;
+ private volatile boolean lock = false;
+ // Server we assign or unassign from -- the target.
+ protected volatile ServerName server;
+
/**
 * No-arg constructor.
 */
public RegionTransitionProcedure() {
  // Required by the Procedure framework to create the procedure on replay
  super();
}

/**
 * @param regionInfo the region this transition procedure (assign/unassign) operates on.
 */
public RegionTransitionProcedure(final HRegionInfo regionInfo) {
  this.regionInfo = regionInfo;
}
+
/** @return the region being transitioned; may be null until set by construction or replay. */
public HRegionInfo getRegionInfo() {
  return regionInfo;
}

protected void setRegionInfo(final HRegionInfo regionInfo) {
  // Setter is for deserialization.
  this.regionInfo = regionInfo;
}
+
+ @Override
+ public TableName getTableName() {
+ HRegionInfo hri = getRegionInfo();
+ return hri != null? hri.getTable(): null;
+ }
+
/** @return true if this procedure is transitioning the hbase:meta region. */
public boolean isMeta() {
  return TableName.isMetaTableName(getTableName());
}
+
+ @Override
+ public void toStringClassDetails(final StringBuilder sb) {
+ sb.append(getClass().getSimpleName());
+ sb.append(" table=");
+ sb.append(getTableName());
+ sb.append(", region=");
+ sb.append(getRegionInfo() == null? null: getRegionInfo().getEncodedName());
+ sb.append(", server=");
+ sb.append(getServer());
+ }
+
/**
 * Looks up (creating if absent) the in-memory state node for this procedure's region.
 */
public RegionStateNode getRegionState(final MasterProcedureEnv env) {
  return env.getAssignmentManager().getRegionStates().
    getOrCreateRegionNode(getRegionInfo());
}

// Current step of the transition state machine (QUEUE -> DISPATCH -> FINISH);
// see execute() for how the states are driven.
protected void setTransitionState(final RegionTransitionState state) {
  this.transitionState = state;
}

protected RegionTransitionState getTransitionState() {
  return transitionState;
}
+
/**
 * Invoked from the REGION_TRANSITION_QUEUE state in execute().
 * @return false if the operation is already done or aborted (check getException()).
 */
protected abstract boolean startTransition(MasterProcedureEnv env, RegionStateNode regionNode)
  throws IOException, ProcedureSuspendedException;
/**
 * Invoked from the REGION_TRANSITION_DISPATCH state in execute() to send the
 * request to the target server.
 * @return false if the operation is already done or aborted (check getException()).
 */
protected abstract boolean updateTransition(MasterProcedureEnv env, RegionStateNode regionNode)
  throws IOException, ProcedureSuspendedException;
/**
 * Invoked from the REGION_TRANSITION_FINISH state in execute() to complete the transition.
 */
protected abstract void finishTransition(MasterProcedureEnv env, RegionStateNode regionNode)
  throws IOException, ProcedureSuspendedException;

/**
 * Invoked when the regionserver reports the transitioned state for this region.
 */
protected abstract void reportTransition(MasterProcedureEnv env,
  RegionStateNode regionNode, TransitionCode code, long seqId) throws UnexpectedStateException;

/** Builds the remote open/close operation to dispatch to the given server. */
public abstract RemoteOperation remoteCallBuild(MasterProcedureEnv env, ServerName serverName);
/** Invoked when the remote call to the regionserver failed; see remoteCallFailed(env, serverName, exception). */
protected abstract void remoteCallFailed(MasterProcedureEnv env,
  RegionStateNode regionNode, IOException exception);
+
@Override
public void remoteCallCompleted(final MasterProcedureEnv env,
    final ServerName serverName, final RemoteOperation response) {
  // Intentionally a no-op: the dispatch response is ignored; the regionserver's
  // reportTransition() callback is what moves this procedure forward.
  // TODO(review): confirm the response can always be ignored.
}
+
/**
 * Callback from the remote dispatcher when the RPC to {@code serverName} failed.
 * Delegates cleanup to the subclass hook and then wakes this procedure's event
 * (it was suspended in addToRemoteDispatcher()).
 */
@Override
public void remoteCallFailed(final MasterProcedureEnv env,
    final ServerName serverName, final IOException exception) {
  final RegionStateNode regionNode = getRegionState(env);
  assert serverName.equals(regionNode.getRegionLocation());
  // Prefer the exception message; fall back to the class name when there is none.
  String msg = exception.getMessage() == null? exception.getClass().getSimpleName():
    exception.getMessage();
  LOG.warn("Failed " + this + "; " + regionNode.toShortString() + "; exception=" + msg);
  remoteCallFailed(env, regionNode, exception);
  env.getProcedureScheduler().wakeEvent(regionNode.getProcedureEvent());
}
+
/**
 * Suspends this procedure's event and queues the open/close operation on the
 * dispatch queue of {@code targetServer}. If queueing fails, remoteCallFailed()
 * is invoked to clean up and re-wake the event.
 */
protected void addToRemoteDispatcher(final MasterProcedureEnv env,
    final ServerName targetServer) {
  assert targetServer.equals(getRegionState(env).getRegionLocation()) :
    "targetServer=" + targetServer + " getRegionLocation=" +
    getRegionState(env).getRegionLocation(); // TODO

  LOG.info("Dispatch " + this + "; " + getRegionState(env).toShortString());

  // Add the open/close region operation to the server dispatch queue.
  // The pending open/close will be dispatched to the server together with the other
  // pending operations (if any) for that server.
  env.getProcedureScheduler().suspendEvent(getRegionState(env).getProcedureEvent());

  if (!env.getRemoteDispatcher().addOperationToNode(targetServer, this)) {
    // Failed add of operation. Call remoteCallFailed to do proper clean up since
    // this thread was suspended above. If we unsuspend a suspended worker, there
    // is danger two workers could end up processing a single Procedure instance.
    remoteCallFailed(env, targetServer,
      new FailedRemoteDispatchException(this + " to " + targetServer));
  }
}
+
/**
 * Handles a transition report from {@code serverName}: validates that it comes from
 * the server the region is located on, delegates to the subclass hook, and then
 * wakes this procedure's event so it gets rescheduled.
 * @throws UnexpectedStateException if the report comes from an unexpected server
 */
protected void reportTransition(final MasterProcedureEnv env, final ServerName serverName,
    final TransitionCode code, final long seqId) throws UnexpectedStateException {
  final RegionStateNode regionNode = getRegionState(env);
  if (!serverName.equals(regionNode.getRegionLocation())) {
    if (isMeta() && regionNode.getRegionLocation() == null) {
      // Meta may report before a location was recorded; accept the reporter as location.
      regionNode.setRegionLocation(serverName);
    } else {
      throw new UnexpectedStateException(String.format(
        "reported unexpected transition state=%s from server=%s on region=%s, expected server=%s",
        code, serverName, regionNode.getRegionInfo(), regionNode.getRegionLocation()));
    }
  }

  reportTransition(env, regionNode, code, seqId);

  // NOTE: This call actually adds this procedure back on the scheduler.
  // This makes it so that this procedure may be picked up by another
  // worker even though another worker may currently be running this
  // procedure. TODO.
  env.getProcedureScheduler().wakeEvent(regionNode.getProcedureEvent());
}
+
// Convenience overloads: liveness of the region's current server / of a given server.
protected boolean isServerOnline(final MasterProcedureEnv env, final RegionStateNode regionNode) {
  return isServerOnline(env, regionNode.getRegionLocation());
}

protected boolean isServerOnline(final MasterProcedureEnv env, final ServerName serverName) {
  return env.getMasterServices().getServerManager().isServerOnline(serverName);
}
+
/**
 * Drives the QUEUE -> DISPATCH -> FINISH state machine. First registers this
 * procedure as the region's procedure-in-transition and aborts if another
 * procedure already owns the region. IOExceptions from the transition hooks are
 * treated as retryable: the procedure is simply returned to be run again.
 */
@Override
protected Procedure[] execute(final MasterProcedureEnv env) throws ProcedureSuspendedException {
  final AssignmentManager am = env.getAssignmentManager();
  final RegionStateNode regionNode = getRegionState(env);
  if (LOG.isDebugEnabled()) {
    LOG.debug("" + transitionState + " " + this + "; " + regionNode.toShortString());
  }
  if (!am.addRegionInTransition(regionNode, this)) {
    String msg = String.format(
      "There is already another procedure running on this region this=%s owner=%s",
      this, regionNode.getProcedure());
    LOG.warn(msg + " " + this + "; " + regionNode.toShortString());
    setAbortFailure(getClass().getSimpleName(), msg);
    return null;
  }
  try {
    boolean retry;
    do {
      retry = false;
      switch (transitionState) {
        case REGION_TRANSITION_QUEUE:
          // 1. push into the AM queue for balancer policy
          if (!startTransition(env, regionNode)) {
            // The operation figured it is done or it aborted; check getException()
            am.removeRegionInTransition(getRegionState(env), this);
            return null;
          }
          transitionState = RegionTransitionState.REGION_TRANSITION_DISPATCH;
          // Suspend until the remote side (or a failure path) wakes the event.
          if (env.getProcedureScheduler().waitEvent(regionNode.getProcedureEvent(), this)) {
            throw new ProcedureSuspendedException();
          }
          break;

        case REGION_TRANSITION_DISPATCH:
          // 2. send the request to the target server
          if (!updateTransition(env, regionNode)) {
            // The operation figured it is done or it aborted; check getException()
            am.removeRegionInTransition(regionNode, this);
            return null;
          }
          // updateTransition may have advanced the state; loop instead of suspending.
          if (transitionState != RegionTransitionState.REGION_TRANSITION_DISPATCH) {
            retry = true;
            break;
          }
          if (env.getProcedureScheduler().waitEvent(regionNode.getProcedureEvent(), this)) {
            throw new ProcedureSuspendedException();
          }
          break;

        case REGION_TRANSITION_FINISH:
          // 3. wait assignment response. completion/failure
          finishTransition(env, regionNode);
          am.removeRegionInTransition(regionNode, this);
          return null;
      }
    } while (retry);
  } catch (IOException e) {
    // Retryable: returning {this} below re-runs the procedure from the current state.
    LOG.warn("Retryable error trying to transition: " +
      this + "; " + regionNode.toShortString(), e);
  }

  return new Procedure[] {this};
}
+
/**
 * Rollback is only possible while still in a state isRollbackSupported() allows;
 * otherwise assignment cannot be rolled back and this throws.
 */
@Override
protected void rollback(final MasterProcedureEnv env) {
  if (isRollbackSupported(transitionState)) {
    // Nothing done up to this point. abort safely.
    // This should happen when something like disableTable() is triggered.
    env.getAssignmentManager().removeRegionInTransition(getRegionState(env), this);
    return;
  }

  // There is no rollback for assignment unless we cancel the operation by
  // dropping/disabling the table.
  throw new UnsupportedOperationException("Unhandled state " + transitionState +
    "; there is no rollback for assignment unless we cancel the operation by " +
    "dropping/disabling the table");
}

/**
 * @return true if this procedure can still be rolled back / aborted from the
 *         given state (see rollback() and abort()).
 */
protected abstract boolean isRollbackSupported(final RegionTransitionState state);
+
+ @Override
+ protected boolean abort(final MasterProcedureEnv env) {
+ if (isRollbackSupported(transitionState)) {
+ aborted.set(true);
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ protected LockState acquireLock(final MasterProcedureEnv env) {
+ // Unless we are assigning meta, wait for meta to be available and loaded.
+ if (!isMeta() && (env.waitFailoverCleanup(this) ||
+ env.getAssignmentManager().waitMetaInitialized(this, getRegionInfo()))) {
+ return LockState.LOCK_EVENT_WAIT;
+ }
+
+ // TODO: Revisit this and move it to the executor
+ if (env.getProcedureScheduler().waitRegion(this, getRegionInfo())) {
+ try {
+ LOG.debug(LockState.LOCK_EVENT_WAIT + " pid=" + getProcId() + " " +
+ env.getProcedureScheduler().dumpLocks());
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ return LockState.LOCK_EVENT_WAIT;
+ }
+ this.lock = true;
+ return LockState.LOCK_ACQUIRED;
+ }
+
/** Releases the region lock and wakes procedures waiting on this region. */
@Override
protected void releaseLock(final MasterProcedureEnv env) {
  env.getProcedureScheduler().wakeRegion(this, getRegionInfo());
  lock = false;
}
+
// The region lock is held across suspensions for the whole life of the procedure.
@Override
protected boolean holdLock(final MasterProcedureEnv env) {
  return true;
}

@Override
protected boolean hasLock(final MasterProcedureEnv env) {
  return lock;
}
+
@Override
protected boolean shouldWaitClientAck(MasterProcedureEnv env) {
  // The operation is triggered internally on the server;
  // the client does not know about this procedure.
  return false;
}
+
/** @return the server we are assigning to or unassigning from -- the target. */
public ServerName getServer() {
  return this.server;
}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/f56592fd/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java
new file mode 100644
index 0000000..4ed9cb3
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java
@@ -0,0 +1,742 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.master.assignment;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InterruptedIOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.MasterSwitchType;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.RegionReplicaUtil;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
+import org.apache.hadoop.hbase.master.MasterFileSystem;
+import org.apache.hadoop.hbase.master.RegionState.State;
+import org.apache.hadoop.hbase.master.assignment.RegionStates.RegionStateNode;
+import org.apache.hadoop.hbase.master.procedure.AbstractStateMachineTableProcedure;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
+import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SplitTableRegionState;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.Threads;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * The procedure to split a region in a table.
+ */
+@InterfaceAudience.Private
+public class SplitTableRegionProcedure
+ extends AbstractStateMachineTableProcedure<SplitTableRegionState> {
+ private static final Log LOG = LogFactory.getLog(SplitTableRegionProcedure.class);
+
+ private Boolean traceEnabled = null;
+
+ private volatile boolean lock = false;
+
+ private HRegionInfo parentHRI;
+ private HRegionInfo daughter_1_HRI;
+ private HRegionInfo daughter_2_HRI;
+
public SplitTableRegionProcedure() {
  // Required by the Procedure framework to create the procedure on replay
}

/**
 * @param regionToSplit parent region to be split
 * @param splitRow split point; becomes the end key of daughter A and start key of daughter B
 * @throws IOException if the split row is invalid for this region (see checkSplitRow)
 */
public SplitTableRegionProcedure(final MasterProcedureEnv env,
    final HRegionInfo regionToSplit, final byte[] splitRow) throws IOException {
  super(env);

  checkSplitRow(regionToSplit, splitRow);

  this.parentHRI = regionToSplit;

  final TableName table = regionToSplit.getTable();
  // Both daughters get the same region id (a timestamp) derived from the parent.
  final long rid = getDaughterRegionIdTimestamp(regionToSplit);
  this.daughter_1_HRI = new HRegionInfo(table, regionToSplit.getStartKey(), splitRow, false, rid);
  this.daughter_2_HRI = new HRegionInfo(table, splitRow, regionToSplit.getEndKey(), false, rid);
}
+
// Convenience overload that attributes the failure to this procedure's class name.
protected void setFailure(Throwable cause) {
  super.setFailure(getClass().getSimpleName(), cause);
}
+
+ private static void checkSplitRow(final HRegionInfo regionToSplit, final byte[] splitRow)
+ throws IOException {
+ if (splitRow == null || splitRow.length == 0) {
+ throw new DoNotRetryIOException("Split row cannot be null");
+ }
+
+ if (Bytes.equals(regionToSplit.getStartKey(), splitRow)) {
+ throw new DoNotRetryIOException(
+ "Split row is equal to startkey: " + Bytes.toStringBinary(splitRow));
+ }
+
+ if (!regionToSplit.containsRow(splitRow)) {
+ throw new DoNotRetryIOException(
+ "Split row is not inside region key range splitKey:" + Bytes.toStringBinary(splitRow) +
+ " region: " + regionToSplit);
+ }
+ }
+
/**
 * Calculate daughter regionid to use.
 * @param hri Parent {@link HRegionInfo}
 * @return Daughter region id (timestamp) to use; always >= the parent's region id.
 */
private static long getDaughterRegionIdTimestamp(final HRegionInfo hri) {
  long rid = EnvironmentEdgeManager.currentTime();
  // Regionid is timestamp. Can't be less than that of parent else will insert
  // at wrong location in hbase:meta (See HBASE-710).
  if (rid < hri.getRegionId()) {
    LOG.warn("Clock skew; parent regions id is " + hri.getRegionId() +
      " but current time here is " + rid);
    rid = hri.getRegionId() + 1;
  }
  return rid;
}
+
/**
 * Drives the split state machine: prepare -> pre-op -> close parent -> create
 * daughter regions -> pre-PONR -> meta update (point of no return) -> post-PONR
 * -> open daughters -> post-op. On IOException, states past the point of no
 * return are retried (warn only); earlier states fail the procedure.
 */
@Override
protected Flow executeFromState(final MasterProcedureEnv env, final SplitTableRegionState state)
    throws InterruptedException {
  if (isTraceEnabled()) {
    LOG.trace(this + " execute state=" + state);
  }

  try {
    switch (state) {
      case SPLIT_TABLE_REGION_PREPARE:
        if (prepareSplitRegion(env)) {
          setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_PRE_OPERATION);
          break;
        } else {
          assert isFailed() : "split region should have an exception here";
          return Flow.NO_MORE_STATE;
        }
      case SPLIT_TABLE_REGION_PRE_OPERATION:
        preSplitRegion(env);
        setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_CLOSE_PARENT_REGION);
        break;
      case SPLIT_TABLE_REGION_CLOSE_PARENT_REGION:
        // Unassign the parent (all replicas) via child procedures.
        addChildProcedure(createUnassignProcedures(env, getRegionReplication(env)));
        setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_CREATE_DAUGHTER_REGIONS);
        break;
      case SPLIT_TABLE_REGION_CREATE_DAUGHTER_REGIONS:
        createDaughterRegions(env);
        setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_PRE_OPERATION_BEFORE_PONR);
        break;
      case SPLIT_TABLE_REGION_PRE_OPERATION_BEFORE_PONR:
        preSplitRegionBeforePONR(env);
        setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_UPDATE_META);
        break;
      case SPLIT_TABLE_REGION_UPDATE_META:
        // Point of no return: after meta is updated there is no rollback.
        updateMetaForDaughterRegions(env);
        setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_PRE_OPERATION_AFTER_PONR);
        break;
      case SPLIT_TABLE_REGION_PRE_OPERATION_AFTER_PONR:
        preSplitRegionAfterPONR(env);
        setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_OPEN_CHILD_REGIONS);
        break;
      case SPLIT_TABLE_REGION_OPEN_CHILD_REGIONS:
        addChildProcedure(createAssignProcedures(env, getRegionReplication(env)));
        setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_POST_OPERATION);
        break;
      case SPLIT_TABLE_REGION_POST_OPERATION:
        postSplitRegion(env);
        return Flow.NO_MORE_STATE;
      default:
        throw new UnsupportedOperationException(this + " unhandled state=" + state);
    }
  } catch (IOException e) {
    String msg = "Error trying to split region " + parentHRI.getEncodedName() + " in the table "
      + getTableName() + " (in state=" + state + ")";
    if (!isRollbackSupported(state)) {
      // We reach a state that cannot be rolled back. We just need to keep retry.
      LOG.warn(msg, e);
    } else {
      LOG.error(msg, e);
      setFailure(e);
    }
  }
  return Flow.HAS_MORE_STATE;
}
+
/**
 * Undoes the work of a state during rollback. States at or past the meta update
 * (the point of no return) cannot be rolled back and throw.
 * @throws IOException on a (retryable) rollback failure
 */
@Override
protected void rollbackState(final MasterProcedureEnv env, final SplitTableRegionState state)
    throws IOException, InterruptedException {
  if (isTraceEnabled()) {
    LOG.trace(this + " rollback state=" + state);
  }

  try {
    switch (state) {
      case SPLIT_TABLE_REGION_POST_OPERATION:
      case SPLIT_TABLE_REGION_OPEN_CHILD_REGIONS:
      case SPLIT_TABLE_REGION_PRE_OPERATION_AFTER_PONR:
      case SPLIT_TABLE_REGION_UPDATE_META:
        // PONR
        throw new UnsupportedOperationException(this + " unhandled state=" + state);
      case SPLIT_TABLE_REGION_PRE_OPERATION_BEFORE_PONR:
        break;
      case SPLIT_TABLE_REGION_CREATE_DAUGHTER_REGIONS:
        // Doing nothing, as re-open parent region would clean up daughter region directories.
        break;
      case SPLIT_TABLE_REGION_CLOSE_PARENT_REGION:
        openParentRegion(env);
        break;
      case SPLIT_TABLE_REGION_PRE_OPERATION:
        postRollBackSplitRegion(env);
        break;
      case SPLIT_TABLE_REGION_PREPARE:
        break; // nothing to do
      default:
        throw new UnsupportedOperationException(this + " unhandled state=" + state);
    }
  } catch (IOException e) {
    // This will be retried. Unless there is a bug in the code,
    // this should be just a "temporary error" (e.g. network down)
    LOG.warn("Failed rollback attempt step " + state + " for splitting the region "
      + parentHRI.getEncodedName() + " in table " + getTableName(), e);
    throw e;
  }
}
+
/*
 * Check whether we are in a state that can be rolled back.
 * States at or after the meta update (point of no return) cannot be.
 */
@Override
protected boolean isRollbackSupported(final SplitTableRegionState state) {
  switch (state) {
    case SPLIT_TABLE_REGION_POST_OPERATION:
    case SPLIT_TABLE_REGION_OPEN_CHILD_REGIONS:
    case SPLIT_TABLE_REGION_PRE_OPERATION_AFTER_PONR:
    case SPLIT_TABLE_REGION_UPDATE_META:
      // It is not safe to rollback if we reach to these states.
      return false;
    default:
      break;
  }
  return true;
}
+
// Protobuf enum <-> state-id plumbing required by the StateMachineProcedure framework.
@Override
protected SplitTableRegionState getState(final int stateId) {
  return SplitTableRegionState.forNumber(stateId);
}

@Override
protected int getStateId(final SplitTableRegionState state) {
  return state.getNumber();
}

@Override
protected SplitTableRegionState getInitialState() {
  return SplitTableRegionState.SPLIT_TABLE_REGION_PREPARE;
}
+
/**
 * Serializes user info, parent region and the two daughter regions as a
 * delimited SplitTableRegionStateData protobuf message.
 */
@Override
public void serializeStateData(final OutputStream stream) throws IOException {
  super.serializeStateData(stream);

  final MasterProcedureProtos.SplitTableRegionStateData.Builder splitTableRegionMsg =
    MasterProcedureProtos.SplitTableRegionStateData.newBuilder()
      .setUserInfo(MasterProcedureUtil.toProtoUserInfo(getUser()))
      .setParentRegionInfo(HRegionInfo.convert(parentHRI))
      .addChildRegionInfo(HRegionInfo.convert(daughter_1_HRI))
      .addChildRegionInfo(HRegionInfo.convert(daughter_2_HRI));
  splitTableRegionMsg.build().writeDelimitedTo(stream);
}

/**
 * Restores the state written by serializeStateData(); expects exactly two child regions.
 */
@Override
public void deserializeStateData(final InputStream stream) throws IOException {
  super.deserializeStateData(stream);

  final MasterProcedureProtos.SplitTableRegionStateData splitTableRegionsMsg =
    MasterProcedureProtos.SplitTableRegionStateData.parseDelimitedFrom(stream);
  setUser(MasterProcedureUtil.toUserInfo(splitTableRegionsMsg.getUserInfo()));
  parentHRI = HRegionInfo.convert(splitTableRegionsMsg.getParentRegionInfo());
  assert(splitTableRegionsMsg.getChildRegionInfoCount() == 2);
  daughter_1_HRI = HRegionInfo.convert(splitTableRegionsMsg.getChildRegionInfo(0));
  daughter_2_HRI = HRegionInfo.convert(splitTableRegionsMsg.getChildRegionInfo(1));
}
+
+ @Override
+ public void toStringClassDetails(StringBuilder sb) {
+ sb.append(getClass().getSimpleName());
+ sb.append(" table=");
+ sb.append(getTableName());
+ sb.append(", parent=");
+ sb.append(parentHRI.getShortNameToLog());
+ sb.append(", daughterA=");
+ sb.append(daughter_1_HRI.getShortNameToLog());
+ sb.append(", daughterB=");
+ sb.append(daughter_2_HRI.getShortNameToLog());
+ }
+
+ @Override
+ protected LockState acquireLock(final MasterProcedureEnv env) {
+ if (env.waitInitialized(this)) return LockState.LOCK_EVENT_WAIT;
+
+ if (env.getProcedureScheduler().waitRegions(this, getTableName(), parentHRI)) {
+ try {
+ LOG.debug(LockState.LOCK_EVENT_WAIT + " " + env.getProcedureScheduler().dumpLocks());
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ return LockState.LOCK_EVENT_WAIT;
+ }
+ this.lock = true;
+ return LockState.LOCK_ACQUIRED;
+ }
+
/** Releases the parent-region lock and wakes procedures waiting on it. */
@Override
protected void releaseLock(final MasterProcedureEnv env) {
  this.lock = false;
  env.getProcedureScheduler().wakeRegions(this, getTableName(), parentHRI);
}
+
@Override
public TableName getTableName() {
  return parentHRI.getTable();
}

@Override
public TableOperationType getTableOperationType() {
  return TableOperationType.SPLIT;
}

/** @return the split point, i.e. the start key of the second daughter region. */
private byte[] getSplitRow() {
  return daughter_2_HRI.getStartKey();
}
+
+ private static State [] EXPECTED_SPLIT_STATES = new State [] {State.OPEN, State.CLOSED};
/**
 * Prepare to Split region.
 * @param env MasterProcedureEnv
 * @return true if the split can proceed; false (with failure set on this procedure)
 *         if the parent is in an unexpected state, already split/offline, or the
 *         split switch is off.
 * @throws IOException
 */
@VisibleForTesting
public boolean prepareSplitRegion(final MasterProcedureEnv env) throws IOException {
  // Check whether the region is splittable
  RegionStateNode node = env.getAssignmentManager().getRegionStates().getRegionNode(parentHRI);
  if (node != null) {
    parentHRI = node.getRegionInfo();

    // expected parent to be online or closed
    if (!node.isInState(EXPECTED_SPLIT_STATES)) {
      setFailure(new IOException("Split " + parentHRI.getRegionNameAsString() +
        " FAILED because state=" + node.getState() + "; expected " +
        Arrays.toString(EXPECTED_SPLIT_STATES)));
      return false;
    }

    // lookup the parent HRI state from the AM, which has the latest updated info.
    if (parentHRI.isSplit() || parentHRI.isOffline()) {
      setFailure(new IOException("Split " + parentHRI.getRegionNameAsString() + " FAILED because " +
        "offline/split already."));
      return false;
    }
  }

  // since we have the lock and the master is coordinating the operation
  // we are always able to split the region
  if (!env.getMasterServices().isSplitOrMergeEnabled(MasterSwitchType.SPLIT)) {
    LOG.warn("split switch is off! skip split of " + parentHRI);
    setFailure(new IOException("Split region " + parentHRI.getRegionNameAsString() +
      " failed due to split switch off"));
    return false;
  }
  return true;
}
+
/**
 * Action before splitting region in a table: runs the preSplitRegionAction coprocessor hook.
 * @param env MasterProcedureEnv
 * @throws IOException
 * @throws InterruptedException
 */
private void preSplitRegion(final MasterProcedureEnv env)
    throws IOException, InterruptedException {
  final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
  if (cpHost != null) {
    cpHost.preSplitRegionAction(getTableName(), getSplitRow(), getUser());
  }
}
+
/**
 * Action after rollback a split table region action: runs the coprocessor hook.
 * @param env MasterProcedureEnv
 * @throws IOException
 */
private void postRollBackSplitRegion(final MasterProcedureEnv env) throws IOException {
  final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
  if (cpHost != null) {
    cpHost.postRollBackSplitRegionAction(getUser());
  }
}
+
/**
 * Rollback close parent region: re-assigns the parent (and all its replicas)
 * back onto the server it was on.
 * @param env MasterProcedureEnv
 **/
private void openParentRegion(final MasterProcedureEnv env) throws IOException {
  // Check whether the region is closed; if so, open it in the same server
  final int regionReplication = getRegionReplication(env);
  final ServerName serverName = getParentRegionServerName(env);

  final AssignProcedure[] procs = new AssignProcedure[regionReplication];
  for (int i = 0; i < regionReplication; ++i) {
    final HRegionInfo hri = RegionReplicaUtil.getRegionInfoForReplica(parentHRI, i);
    procs[i] = env.getAssignmentManager().createAssignProcedure(hri, serverName);
  }
  env.getMasterServices().getMasterProcedureExecutor().submitProcedures(procs);
}
+
/**
 * Create daughter regions: splits the parent's store files under a temporary
 * .splits directory, verifies the expected reference-file counts, then commits
 * both daughter region directories.
 * @param env MasterProcedureEnv
 * @throws IOException
 */
@VisibleForTesting
public void createDaughterRegions(final MasterProcedureEnv env) throws IOException {
  final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
  final Path tabledir = FSUtils.getTableDir(mfs.getRootDir(), parentHRI.getTable());
  final FileSystem fs = mfs.getFileSystem();
  HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem(
    env.getMasterConfiguration(), fs, tabledir, parentHRI, false);
  regionFs.createSplitsDir();

  // Pair of (daughter A refs, daughter B refs) produced by splitting the store files.
  Pair<Integer, Integer> expectedReferences = splitStoreFiles(env, regionFs);

  assertReferenceFileCount(fs, expectedReferences.getFirst(),
    regionFs.getSplitsDir(daughter_1_HRI));
  //Move the files from the temporary .splits to the final /table/region directory
  regionFs.commitDaughterRegion(daughter_1_HRI);
  assertReferenceFileCount(fs, expectedReferences.getFirst(),
    new Path(tabledir, daughter_1_HRI.getEncodedName()));

  assertReferenceFileCount(fs, expectedReferences.getSecond(),
    regionFs.getSplitsDir(daughter_2_HRI));
  regionFs.commitDaughterRegion(daughter_2_HRI);
  assertReferenceFileCount(fs, expectedReferences.getSecond(),
    new Path(tabledir, daughter_2_HRI.getEncodedName()));
}
+
+ /**
+ * Splits, in parallel, every store file of the parent region into reference
+ * files for the two daughter regions.
+ * @param env MasterProcedureEnv
+ * @param regionFs file system abstraction for the parent region
+ * @return a pair with the number of reference files created for daughter A
+ * (first) and daughter B (second)
+ * @throws IOException if the split times out, a splitter task fails, or the
+ * table descriptor cannot be read
+ */
+ private Pair<Integer, Integer> splitStoreFiles(
+ final MasterProcedureEnv env,
+ final HRegionFileSystem regionFs) throws IOException {
+ final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
+ final Configuration conf = env.getMasterConfiguration();
+
+ // The following code sets up a thread pool executor with as many slots as
+ // there's files to split. It then fires up everything, waits for
+ // completion and finally checks for any exception
+ //
+ // Note: splitStoreFiles creates daughter region dirs under the parent splits dir
+ // Nothing to unroll here if failure -- re-run createSplitsDir will
+ // clean this up.
+ int nbFiles = 0;
+ for (String family: regionFs.getFamilies()) {
+ final Collection<StoreFileInfo> storeFiles = regionFs.getStoreFiles(family);
+ if (storeFiles != null) {
+ nbFiles += storeFiles.size();
+ }
+ }
+ if (nbFiles == 0) {
+ // no file needs to be split.
+ return new Pair<Integer, Integer>(0,0);
+ }
+ // Max #threads is the smaller of the number of storefiles or the default max determined above.
+ int maxThreads = Math.min(
+ conf.getInt(HConstants.REGION_SPLIT_THREADS_MAX,
+ conf.getInt(HStore.BLOCKING_STOREFILES_KEY, HStore.DEFAULT_BLOCKING_STOREFILE_COUNT)),
+ nbFiles);
+ LOG.info("Preparing to split " + nbFiles + " storefiles for region " + parentHRI +
+ " using " + maxThreads + " threads");
+ final ExecutorService threadPool = Executors.newFixedThreadPool(
+ maxThreads, Threads.getNamedThreadFactory("StoreFileSplitter-%1$d"));
+ final List<Future<Pair<Path,Path>>> futures = new ArrayList<Future<Pair<Path,Path>>>(nbFiles);
+
+ // Split each store file: one StoreFileSplitter task per file.
+ final HTableDescriptor htd = env.getMasterServices().getTableDescriptors().get(getTableName());
+ for (String family: regionFs.getFamilies()) {
+ final HColumnDescriptor hcd = htd.getFamily(family.getBytes());
+ final Collection<StoreFileInfo> storeFiles = regionFs.getStoreFiles(family);
+ if (storeFiles != null && storeFiles.size() > 0) {
+ final CacheConfig cacheConf = new CacheConfig(conf, hcd);
+ for (StoreFileInfo storeFileInfo: storeFiles) {
+ StoreFileSplitter sfs = new StoreFileSplitter(
+ regionFs,
+ family.getBytes(),
+ new StoreFile(
+ mfs.getFileSystem(), storeFileInfo, conf, cacheConf, hcd.getBloomFilterType()));
+ futures.add(threadPool.submit(sfs));
+ }
+ }
+ }
+ // Shutdown the pool: no new tasks, already-submitted ones keep running.
+ threadPool.shutdown();
+
+ // Wait for all the tasks to finish
+ long fileSplitTimeout = conf.getLong("hbase.master.fileSplitTimeout", 30000);
+ try {
+ boolean stillRunning = !threadPool.awaitTermination(fileSplitTimeout, TimeUnit.MILLISECONDS);
+ if (stillRunning) {
+ threadPool.shutdownNow();
+ // wait for the thread to shutdown completely.
+ while (!threadPool.isTerminated()) {
+ Thread.sleep(50);
+ }
+ throw new IOException("Took too long to split the" +
+ " files and create the references, aborting split");
+ }
+ } catch (InterruptedException e) {
+ // Surface interruption as an InterruptedIOException, preserving the cause.
+ throw (InterruptedIOException)new InterruptedIOException().initCause(e);
+ }
+
+ int daughterA = 0;
+ int daughterB = 0;
+ // Look for any exception; count the reference files actually written.
+ for (Future<Pair<Path, Path>> future : futures) {
+ try {
+ Pair<Path, Path> p = future.get();
+ // A null path means no reference file was needed on that side.
+ daughterA += p.getFirst() != null ? 1 : 0;
+ daughterB += p.getSecond() != null ? 1 : 0;
+ } catch (InterruptedException e) {
+ throw (InterruptedIOException) new InterruptedIOException().initCause(e);
+ } catch (ExecutionException e) {
+ throw new IOException(e);
+ }
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Split storefiles for region " + parentHRI + " Daughter A: " + daughterA
+ + " storefiles, Daughter B: " + daughterB + " storefiles.");
+ }
+ return new Pair<Integer, Integer>(daughterA, daughterB);
+ }
+
+ private void assertReferenceFileCount(final FileSystem fs, final int expectedReferenceFileCount,
+ final Path dir) throws IOException {
+ if (expectedReferenceFileCount != 0 &&
+ expectedReferenceFileCount != FSUtils.getRegionReferenceFileCount(fs, dir)) {
+ throw new IOException("Failing split. Expected reference file count isn't equal.");
+ }
+ }
+
+ /**
+ * Splits a single store file by writing one reference file into each
+ * daughter region's directory, split at the row returned by getSplitRow().
+ * @param regionFs parent region file system
+ * @param family column family the store file belongs to
+ * @param sf the store file to split
+ * @return pair of reference file paths (daughter 1 first, daughter 2 second);
+ * an element may be null when no reference was written for that side
+ * @throws IOException if writing a reference file fails
+ */
+ private Pair<Path, Path> splitStoreFile(final HRegionFileSystem regionFs,
+ final byte[] family, final StoreFile sf) throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Splitting started for store file: " + sf.getPath() + " for region: " + parentHRI);
+ }
+
+ final byte[] splitRow = getSplitRow();
+ final String familyName = Bytes.toString(family);
+ // The boolean flag (false then true) presumably selects the bottom vs top
+ // half for each daughter -- confirm against HRegionFileSystem.splitStoreFile.
+ final Path path_first =
+ regionFs.splitStoreFile(this.daughter_1_HRI, familyName, sf, splitRow, false, null);
+ final Path path_second =
+ regionFs.splitStoreFile(this.daughter_2_HRI, familyName, sf, splitRow, true, null);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Splitting complete for store file: " + sf.getPath() + " for region: " + parentHRI);
+ }
+ return new Pair<Path,Path>(path_first, path_second);
+ }
+
+ /**
+ * Utility class used to do the file splitting / reference writing
+ * in parallel instead of sequentially.
+ */
+ private class StoreFileSplitter implements Callable<Pair<Path,Path>> {
+ private final HRegionFileSystem regionFs;
+ private final byte[] family;
+ private final StoreFile sf;
+
+ /**
+ * Constructor that takes what it needs to split
+ * @param regionFs the file system
+ * @param family Family that contains the store file
+ * @param sf which file
+ */
+ public StoreFileSplitter(final HRegionFileSystem regionFs, final byte[] family,
+ final StoreFile sf) {
+ this.regionFs = regionFs;
+ this.family = family;
+ this.sf = sf;
+ }
+
+ /**
+ * Splits the wrapped store file into the two daughter regions.
+ * @return pair of reference file paths (daughter 1 first, daughter 2
+ * second); either element may be null if no reference was written
+ */
+ @Override
+ public Pair<Path,Path> call() throws IOException {
+ return splitStoreFile(regionFs, family, sf);
+ }
+ }
+
+ /**
+ * Pre split region actions before the Point-of-No-Return step. Runs the
+ * coprocessor preSplitBeforePONR hook and validates any meta mutations the
+ * coprocessor handed back.
+ * @param env MasterProcedureEnv
+ * @throws IOException if the coprocessor bypasses the split or hands back a
+ * mutation whose row is not a parsable region name
+ * @throws InterruptedException if interrupted while running the hook
+ **/
+ private void preSplitRegionBeforePONR(final MasterProcedureEnv env)
+ throws IOException, InterruptedException {
+ final List<Mutation> metaEntries = new ArrayList<Mutation>();
+ final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
+ if (cpHost != null) {
+ // A 'true' return means a coprocessor bypassed the split; fail the procedure.
+ if (cpHost.preSplitBeforePONRAction(getSplitRow(), metaEntries, getUser())) {
+ throw new IOException("Coprocessor bypassing region " +
+ parentHRI.getRegionNameAsString() + " split.");
+ }
+ try {
+ // Mutations supplied by the coprocessor must target hbase:meta rows.
+ for (Mutation p : metaEntries) {
+ HRegionInfo.parseRegionName(p.getRow());
+ }
+ } catch (IOException e) {
+ LOG.error("Row key of mutation from coprocessor is not parsable as region name."
+ + "Mutations from coprocessor should only for hbase:meta table.");
+ throw e;
+ }
+ }
+ }
+
+ /**
+ * Add daughter regions to META: marks the parent as split (recording its
+ * current server) and publishes both daughters via the assignment manager.
+ * @param env MasterProcedureEnv
+ * @throws IOException if the meta update fails
+ */
+ private void updateMetaForDaughterRegions(final MasterProcedureEnv env) throws IOException {
+ env.getAssignmentManager().markRegionAsSplit(parentHRI, getParentRegionServerName(env),
+ daughter_1_HRI, daughter_2_HRI);
+ }
+
+ /**
+ * Pre split region actions after the Point-of-No-Return step: runs the
+ * coprocessor preSplitAfterPONR hook, if any coprocessors are installed.
+ * @param env MasterProcedureEnv
+ * @throws IOException if the coprocessor hook fails
+ * @throws InterruptedException if interrupted while running the hook
+ **/
+ private void preSplitRegionAfterPONR(final MasterProcedureEnv env)
+ throws IOException, InterruptedException {
+ final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
+ if (cpHost != null) {
+ cpHost.preSplitAfterPONRAction(getUser());
+ }
+ }
+
+ /**
+ * Post split region actions: notifies coprocessors of the completed split,
+ * passing both daughter regions.
+ * @param env MasterProcedureEnv
+ * @throws IOException if the coprocessor hook fails
+ **/
+ private void postSplitRegion(final MasterProcedureEnv env) throws IOException {
+ final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
+ if (cpHost != null) {
+ cpHost.postCompletedSplitRegionAction(daughter_1_HRI, daughter_2_HRI, getUser());
+ }
+ }
+
+ /**
+ * @return the server the assignment manager's region states currently
+ * records as hosting the parent region
+ */
+ private ServerName getParentRegionServerName(final MasterProcedureEnv env) {
+ return env.getMasterServices().getAssignmentManager()
+ .getRegionStates().getRegionServerOfRegion(parentHRI);
+ }
+
+ /**
+ * Creates one UnassignProcedure per replica of the parent region.
+ * @param env MasterProcedureEnv
+ * @param regionReplication number of replicas of the parent region
+ * @return one unassign procedure per replica id
+ */
+ private UnassignProcedure[] createUnassignProcedures(final MasterProcedureEnv env,
+ final int regionReplication) {
+ final UnassignProcedure[] procs = new UnassignProcedure[regionReplication];
+ for (int i = 0; i < procs.length; ++i) {
+ final HRegionInfo hri = RegionReplicaUtil.getRegionInfoForReplica(parentHRI, i);
+ // null destination; the trailing 'true' presumably forces the unassign --
+ // confirm against AssignmentManager.createUnassignProcedure.
+ procs[i] = env.getAssignmentManager().createUnassignProcedure(hri, null, true);
+ }
+ return procs;
+ }
+
+ /**
+ * Creates the assign procedures for both daughter regions (and all their
+ * replicas), targeting the server that hosted the parent region.
+ * @param env MasterProcedureEnv
+ * @param regionReplication number of replicas per daughter
+ * @return 2 * regionReplication assign procedures, daughter 1's replicas
+ * first, then daughter 2's
+ */
+ private AssignProcedure[] createAssignProcedures(final MasterProcedureEnv env,
+ final int regionReplication) {
+ final ServerName targetServer = getParentRegionServerName(env);
+ final AssignProcedure[] procs = new AssignProcedure[regionReplication * 2];
+ int procsIdx = 0;
+ for (int i = 0; i < regionReplication; ++i) {
+ final HRegionInfo hri = RegionReplicaUtil.getRegionInfoForReplica(daughter_1_HRI, i);
+ procs[procsIdx++] = env.getAssignmentManager().createAssignProcedure(hri, targetServer);
+ }
+ for (int i = 0; i < regionReplication; ++i) {
+ final HRegionInfo hri = RegionReplicaUtil.getRegionInfoForReplica(daughter_2_HRI, i);
+ procs[procsIdx++] = env.getAssignmentManager().createAssignProcedure(hri, targetServer);
+ }
+ return procs;
+ }
+
+ /**
+ * @return the region replication count read from the table descriptor
+ * @throws IOException if the table descriptor cannot be loaded
+ */
+ private int getRegionReplication(final MasterProcedureEnv env) throws IOException {
+ final HTableDescriptor htd = env.getMasterServices().getTableDescriptors().get(getTableName());
+ return htd.getRegionReplication();
+ }
+
+ /**
+ * The procedure could be restarted from a different machine. If the cached
+ * value is null (e.g. right after deserialization), re-query the logger.
+ * @return whether trace logging is enabled
+ */
+ private boolean isTraceEnabled() {
+ if (traceEnabled == null) {
+ traceEnabled = LOG.isTraceEnabled();
+ }
+ return traceEnabled;
+ }
+
+ /**
+ * Hold the lock across the whole life of the procedure instead of
+ * re-acquiring it at each step.
+ */
+ @Override
+ protected boolean holdLock(final MasterProcedureEnv env) {
+ return true;
+ }
+
+ /**
+ * @return whether this procedure currently holds its lock, as tracked by
+ * the {@code lock} flag (set during lock acquire/release elsewhere)
+ */
+ @Override
+ protected boolean hasLock(final MasterProcedureEnv env) {
+ return lock;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/f56592fd/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/UnassignProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/UnassignProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/UnassignProcedure.java
new file mode 100644
index 0000000..b910d6f
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/UnassignProcedure.java
@@ -0,0 +1,224 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.master.assignment;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
+import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
+import org.apache.hadoop.hbase.master.assignment.RegionStates.RegionStateNode;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.master.procedure.ServerCrashException;
+import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.RegionCloseOperation;
+import org.apache.hadoop.hbase.master.RegionState.State;
+import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionTransitionState;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.UnassignRegionStateData;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
+import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException;
+import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
+
+
+/**
+ * Procedure that describes the unassignment of a single region.
+ * There can only be one RegionTransitionProcedure per region running at the time,
+ * since each procedure takes a lock on the region.
+ *
+ * <p>The Unassign starts by placing a "close region" request in the Remote Dispatcher
+ * queue, and the procedure will then go into a "waiting state".
+ * The Remote Dispatcher will batch the various requests for that server and
+ * they will be sent to the RS for execution.
+ * The RS will complete the close operation by calling master.reportRegionStateTransition().
+ * The AM will intercept the transition report, and notify the procedure.
+ * The procedure will finish the unassign by publishing its new state on meta
+ * or it will retry the unassign.
+ */
+@InterfaceAudience.Private
+public class UnassignProcedure extends RegionTransitionProcedure {
+ private static final Log LOG = LogFactory.getLog(UnassignProcedure.class);
+
+ // Set when the target server is known to have crashed (see remoteCallFailed);
+ // updateTransition then treats the unassign as already complete.
+ private final AtomicBoolean serverCrashed = new AtomicBoolean(false);
+
+ // TODO: should this be in a reassign procedure?
+ // ...and keep unassign for 'disable' case?
+ private boolean force;
+
+ public UnassignProcedure() {
+ // Required by the Procedure framework to create the procedure on replay
+ super();
+ }
+
+ public UnassignProcedure(final HRegionInfo regionInfo,
+ final ServerName server, final boolean force) {
+ super(regionInfo);
+ this.server = server;
+ this.force = force;
+
+ // we don't need REGION_TRANSITION_QUEUE, we jump directly to sending the request
+ setTransitionState(RegionTransitionState.REGION_TRANSITION_DISPATCH);
+ }
+
+ @Override
+ public TableOperationType getTableOperationType() {
+ return TableOperationType.UNASSIGN;
+ }
+
+ /**
+ * Rollback is only possible before the close request has been executed by
+ * the regionserver, i.e. in the queue and dispatch states.
+ */
+ @Override
+ protected boolean isRollbackSupported(final RegionTransitionState state) {
+ switch (state) {
+ case REGION_TRANSITION_QUEUE:
+ case REGION_TRANSITION_DISPATCH:
+ return true;
+ default:
+ return false;
+ }
+ }
+
+ /**
+ * Persists the procedure state (transition state, server, region info and
+ * force flag) as a delimited UnassignRegionStateData protobuf message.
+ */
+ @Override
+ public void serializeStateData(final OutputStream stream) throws IOException {
+ UnassignRegionStateData.Builder state = UnassignRegionStateData.newBuilder()
+ .setTransitionState(getTransitionState())
+ .setDestinationServer(ProtobufUtil.toServerName(server))
+ .setRegionInfo(HRegionInfo.convert(getRegionInfo()));
+ if (force) {
+ // only written when set; readers fall back to the protobuf default
+ state.setForce(true);
+ }
+ state.build().writeDelimitedTo(stream);
+ }
+
+ /**
+ * Restores the procedure state written by serializeStateData.
+ */
+ @Override
+ public void deserializeStateData(final InputStream stream) throws IOException {
+ final UnassignRegionStateData state = UnassignRegionStateData.parseDelimitedFrom(stream);
+ setTransitionState(state.getTransitionState());
+ setRegionInfo(HRegionInfo.convert(state.getRegionInfo()));
+ force = state.getForce();
+ if (state.hasDestinationServer()) {
+ server = ProtobufUtil.toServerName(state.getDestinationServer());
+ }
+ }
+
+ @Override
+ protected boolean startTransition(final MasterProcedureEnv env, final RegionStateNode regionNode) {
+ // nothing to do here. we skip the step in the constructor
+ // by jumping to REGION_TRANSITION_DISPATCH
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Dispatch step: marks the region as closing and queues the close request
+ * on the remote dispatcher.
+ * @return true if a close request was dispatched; false when there is
+ * nothing to do (region already closed/offline, server down, or an
+ * abort was requested before the operation started)
+ */
+ @Override
+ protected boolean updateTransition(final MasterProcedureEnv env, final RegionStateNode regionNode)
+ throws IOException {
+ // if the region is already closed or offline we can't do much...
+ if (regionNode.isInState(State.CLOSED, State.OFFLINE)) {
+ LOG.info("Not unassigned " + this + "; " + regionNode.toShortString());
+ return false;
+ }
+
+ // if the server is down, mark the operation as complete
+ if (serverCrashed.get() || !isServerOnline(env, regionNode)) {
+ LOG.info("Server already down: " + this + "; " + regionNode.toShortString());
+ return false;
+ }
+
+ // if we haven't started the operation yet, we can abort
+ if (aborted.get() && regionNode.isInState(State.OPEN)) {
+ setAbortFailure(getClass().getSimpleName(), "abort requested");
+ return false;
+ }
+
+ // Mark the region as closing
+ env.getAssignmentManager().markRegionAsClosing(regionNode);
+
+ // Add the close region operation the the server dispatch queue.
+ // The pending close will be dispatched to the server together with the other
+ // pending operation for that server.
+ addToRemoteDispatcher(env, regionNode.getRegionLocation());
+ return true;
+ }
+
+ @Override
+ protected void finishTransition(final MasterProcedureEnv env, final RegionStateNode regionNode)
+ throws IOException {
+ env.getAssignmentManager().markRegionAsClosed(regionNode);
+ }
+
+ @Override
+ public RemoteOperation remoteCallBuild(final MasterProcedureEnv env, final ServerName serverName) {
+ assert serverName.equals(getRegionState(env).getRegionLocation());
+ return new RegionCloseOperation(this, getRegionInfo(), server);
+ }
+
+ /**
+ * Handles the transition the regionserver reported back; only CLOSED is
+ * expected for an unassign.
+ * @throws UnexpectedStateException if the RS reports anything other than CLOSED
+ */
+ @Override
+ protected void reportTransition(final MasterProcedureEnv env, final RegionStateNode regionNode,
+ final TransitionCode code, final long seqId) throws UnexpectedStateException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Received report " + code + " " + this + "; " + regionNode.toShortString());
+ }
+ switch (code) {
+ case CLOSED:
+ setTransitionState(RegionTransitionState.REGION_TRANSITION_FINISH);
+ break;
+ default:
+ throw new UnexpectedStateException(String.format(
+ "Received report unexpected transition state=%s for region=%s server=%s, expected CLOSED.",
+ code, regionNode.getRegionInfo(), regionNode.getRegionLocation()));
+ }
+ }
+
+ /**
+ * Decides how to proceed when the remote close request fails: finish,
+ * retry, or expire the server, depending on the exception type.
+ */
+ @Override
+ protected void remoteCallFailed(final MasterProcedureEnv env, final RegionStateNode regionNode,
+ final IOException exception) {
+ // TODO: Is there on-going rpc to cleanup?
+ if (exception instanceof ServerCrashException) {
+ // This exception comes from ServerCrashProcedure after log splitting.
+ // It is ok to let this procedure go on to complete close now.
+ // This will release lock on this region so the subsequent assign
+ // can succeed.
+ setTransitionState(RegionTransitionState.REGION_TRANSITION_FINISH);
+ } else if (exception instanceof RegionServerAbortedException ||
+ exception instanceof RegionServerStoppedException ||
+ exception instanceof ServerNotRunningYetException) {
+ // TODO
+ // RS is aborting, we cannot offline the region since the region may need to do WAL
+ // recovery. Until we see the RS expiration, we should retry.
+ LOG.info("Ignoring; waiting on ServerCrashProcedure", exception);
+ // serverCrashed.set(true);
+ } else if (exception instanceof NotServingRegionException) {
+ LOG.info("IS THIS OK? ANY LOGS TO REPLAY; ACTING AS THOUGH ALL GOOD " + regionNode, exception);
+ setTransitionState(RegionTransitionState.REGION_TRANSITION_FINISH);
+ } else {
+ // TODO: kill the server in case we get an exception we are not able to handle
+ LOG.warn("Killing server; unexpected exception; " +
+ this + "; " + regionNode.toShortString() +
+ " exception=" + exception);
+ env.getMasterServices().getServerManager().expireServer(regionNode.getRegionLocation());
+ serverCrashed.set(true);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/f56592fd/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
index 6410375..6a6c899 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
@@ -1,4 +1,4 @@
-/**
+ /**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -62,9 +62,11 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
/**
- * The base class for load balancers. It provides functions used by
- * {@link org.apache.hadoop.hbase.master.AssignmentManager} to assign regions in the edge cases.
- * It doesn't provide an implementation of the actual balancing algorithm.
+ * The base class for load balancers. It provides the functions used by
+ * {@link org.apache.hadoop.hbase.master.assignment.AssignmentManager} to assign regions
+ * in the edge cases. It doesn't provide an implementation of the
+ * actual balancing algorithm.
+ *
*/
public abstract class BaseLoadBalancer implements LoadBalancer {
protected static final int MIN_SERVER_BALANCE = 2;
@@ -202,7 +204,15 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
// Use servername and port as there can be dead servers in this list. We want everything with
// a matching hostname and port to have the same index.
for (ServerName sn : clusterState.keySet()) {
- if (serversToIndex.get(sn.getHostAndPort()) == null) {
+ if (sn == null) {
+ LOG.warn("TODO: Enable TRACE on BaseLoadBalancer. Empty servername); " +
+ "skipping; unassigned regions?");
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("EMPTY SERVERNAME " + clusterState.toString());
+ }
+ continue;
+ }
+ if (serversToIndex.get(sn.getAddress().toString()) == null) {
serversToIndex.put(sn.getHostAndPort(), numServers++);
}
if (!hostsToIndex.containsKey(sn.getHostname())) {
@@ -257,6 +267,10 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
int tableIndex = 0, regionIndex = 0, regionPerServerIndex = 0;
for (Entry<ServerName, List<HRegionInfo>> entry : clusterState.entrySet()) {
+ if (entry.getKey() == null) {
+ LOG.warn("SERVERNAME IS NULL, skipping " + entry.getValue());
+ continue;
+ }
int serverIndex = serversToIndex.get(entry.getKey().getHostAndPort());
// keep the servername if this is the first server name for this hostname
@@ -899,8 +913,11 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
}
}
if (leastLoadedServerIndex != -1) {
- LOG.debug("Pick the least loaded server " + servers[leastLoadedServerIndex].getHostname()
- + " with better locality for region " + regions[region]);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Pick the least loaded server " +
+ servers[leastLoadedServerIndex].getHostname() +
+ " with better locality for region " + regions[region].getShortNameToLog());
+ }
}
return leastLoadedServerIndex;
} else {
http://git-wip-us.apache.org/repos/asf/hbase/blob/f56592fd/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredStochasticBalancer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredStochasticBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredStochasticBalancer.java
index fd98c9c..6d2a404 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredStochasticBalancer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredStochasticBalancer.java
@@ -709,7 +709,12 @@ public class FavoredStochasticBalancer extends StochasticLoadBalancer implements
// No favored nodes, lets unassign.
LOG.warn("Region not on favored nodes, unassign. Region: " + hri
+ " current: " + current + " favored nodes: " + favoredNodes);
- this.services.getAssignmentManager().unassign(hri);
+ try {
+ this.services.getAssignmentManager().unassign(hri);
+ } catch (IOException e) {
+ LOG.warn("Failed unassign", e);
+ continue;
+ }
RegionPlan rp = new RegionPlan(hri, null, null);
regionPlans.add(rp);
misplacedRegions++;
http://git-wip-us.apache.org/repos/asf/hbase/blob/f56592fd/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java
index f7e166d..907e745 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java
@@ -23,7 +23,6 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
-import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
@@ -39,9 +38,8 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.master.AssignmentManager;
+import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.master.MasterServices;
-import org.apache.hadoop.hbase.master.RegionStates;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -149,19 +147,15 @@ class RegionLocationFinder {
if (services == null) {
return false;
}
- AssignmentManager am = services.getAssignmentManager();
+ final AssignmentManager am = services.getAssignmentManager();
if (am == null) {
return false;
}
- RegionStates regionStates = am.getRegionStates();
- if (regionStates == null) {
- return false;
- }
- Set<HRegionInfo> regions = regionStates.getRegionAssignments().keySet();
+ // TODO: Should this refresh all the regions or only the ones assigned?
boolean includesUserTables = false;
- for (final HRegionInfo hri : regions) {
+ for (final HRegionInfo hri : am.getAssignedRegions()) {
cache.refresh(hri);
includesUserTables = includesUserTables || !hri.isSystemTable();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/f56592fd/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/SimpleLoadBalancer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/SimpleLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/SimpleLoadBalancer.java
index 7e8d696..818156d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/SimpleLoadBalancer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/SimpleLoadBalancer.java
@@ -20,28 +20,27 @@ package org.apache.hadoop.hbase.master.balancer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Random;
import java.util.TreeMap;
-import java.util.Comparator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.master.RegionPlan;
+import org.apache.hadoop.hbase.util.Pair;
import com.google.common.collect.MinMaxPriorityQueue;
-import org.apache.hadoop.hbase.util.Pair;
/**
* Makes decisions about the placement and movement of Regions across
@@ -54,7 +53,7 @@ import org.apache.hadoop.hbase.util.Pair;
* locations for all Regions in a cluster.
*
* <p>This classes produces plans for the
- * {@link org.apache.hadoop.hbase.master.AssignmentManager} to execute.
+ * {@link org.apache.hadoop.hbase.master.assignment.AssignmentManager} to execute.
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class SimpleLoadBalancer extends BaseLoadBalancer {
http://git-wip-us.apache.org/repos/asf/hbase/blob/f56592fd/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
index 53db1f2..4b96bc6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
@@ -293,9 +293,11 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
if (total <= 0 || sumMultiplier <= 0
|| (sumMultiplier > 0 && (total / sumMultiplier) < minCostNeedBalance)) {
- LOG.info("Skipping load balancing because balanced cluster; " + "total cost is " + total
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Skipping load balancing because balanced cluster; " + "total cost is " + total
+ ", sum multiplier is " + sumMultiplier + " min cost which need balance is "
+ minCostNeedBalance);
+ }
return false;
}
return true;
@@ -1153,11 +1155,11 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
stats = new double[cluster.numServers];
}
- for (int i =0; i < cluster.numServers; i++) {
+ for (int i = 0; i < cluster.numServers; i++) {
stats[i] = 0;
for (int regionIdx : cluster.regionsPerServer[i]) {
if (regionIdx == cluster.regionIndexToPrimaryIndex[regionIdx]) {
- stats[i] ++;
+ stats[i]++;
}
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/f56592fd/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineNamespaceProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineNamespaceProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineNamespaceProcedure.java
index 03fdaef..6ebadb4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineNamespaceProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineNamespaceProcedure.java
@@ -52,9 +52,8 @@ public abstract class AbstractStateMachineNamespaceProcedure<TState>
@Override
public void toStringClassDetails(final StringBuilder sb) {
sb.append(getClass().getSimpleName());
- sb.append(" (namespace=");
+ sb.append(", namespace=");
sb.append(getNamespaceName());
- sb.append(")");
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/f56592fd/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AddColumnFamilyProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AddColumnFamilyProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AddColumnFamilyProcedure.java
index 7bb2887..34c1853 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AddColumnFamilyProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AddColumnFamilyProcedure.java
@@ -31,7 +31,6 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.InvalidFamilyOperationException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
@@ -100,7 +99,10 @@ public class AddColumnFamilyProcedure
setNextState(AddColumnFamilyState.ADD_COLUMN_FAMILY_REOPEN_ALL_REGIONS);
break;
case ADD_COLUMN_FAMILY_REOPEN_ALL_REGIONS:
- reOpenAllRegionsIfTableIsOnline(env);
+ if (env.getAssignmentManager().isTableEnabled(getTableName())) {
+ addChildProcedure(env.getAssignmentManager()
+ .createReopenProcedures(getRegionInfoList(env)));
+ }
return Flow.NO_MORE_STATE;
default:
throw new UnsupportedOperationException(this + " unhandled state=" + state);
@@ -285,7 +287,8 @@ public class AddColumnFamilyProcedure
env.getMasterServices().getTableDescriptors().add(unmodifiedHTableDescriptor);
// Make sure regions are opened after table descriptor is updated.
- reOpenAllRegionsIfTableIsOnline(env);
+ //reOpenAllRegionsIfTableIsOnline(env);
+ // TODO: NUKE ROLLBACK!!!!
}
}
@@ -302,25 +305,6 @@ public class AddColumnFamilyProcedure
}
/**
- * Last action from the procedure - executed when online schema change is supported.
- * @param env MasterProcedureEnv
- * @throws IOException
- */
- private void reOpenAllRegionsIfTableIsOnline(final MasterProcedureEnv env) throws IOException {
- // This operation only run when the table is enabled.
- if (!env.getMasterServices().getTableStateManager()
- .isTableState(getTableName(), TableState.State.ENABLED)) {
- return;
- }
-
- if (MasterDDLOperationHelper.reOpenAllRegions(env, getTableName(), getRegionInfoList(env))) {
- LOG.info("Completed add column family operation on table " + getTableName());
- } else {
- LOG.warn("Error on reopening the regions on table " + getTableName());
- }
- }
-
- /**
* The procedure could be restarted from a different machine. If the variable is null, we need to
* retrieve it.
* @return traceEnabled
@@ -362,7 +346,8 @@ public class AddColumnFamilyProcedure
private List<HRegionInfo> getRegionInfoList(final MasterProcedureEnv env) throws IOException {
if (regionInfoList == null) {
- regionInfoList = ProcedureSyncWait.getRegionsFromMeta(env, getTableName());
+ regionInfoList = env.getAssignmentManager().getRegionStates()
+ .getRegionsOfTable(getTableName());
}
return regionInfoList;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/f56592fd/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CloneSnapshotProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CloneSnapshotProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CloneSnapshotProcedure.java
index 683d840..c1d0326 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CloneSnapshotProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CloneSnapshotProcedure.java
@@ -149,10 +149,12 @@ public class CloneSnapshotProcedure
setNextState(CloneSnapshotState.CLONE_SNAPSHOT_ASSIGN_REGIONS);
break;
case CLONE_SNAPSHOT_ASSIGN_REGIONS:
- CreateTableProcedure.assignRegions(env, getTableName(), newRegions);
+ CreateTableProcedure.setEnablingState(env, getTableName());
+ addChildProcedure(env.getAssignmentManager().createAssignProcedures(newRegions));
setNextState(CloneSnapshotState.CLONE_SNAPSHOT_UPDATE_DESC_CACHE);
break;
case CLONE_SNAPSHOT_UPDATE_DESC_CACHE:
+ CreateTableProcedure.setEnabledState(env, getTableName());
CreateTableProcedure.updateTableDescCache(env, getTableName());
setNextState(CloneSnapshotState.CLONE_SNAPHOST_RESTORE_ACL);
break;
http://git-wip-us.apache.org/repos/asf/hbase/blob/f56592fd/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java
index ced7abc..c3900dd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java
@@ -37,7 +37,6 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.TableState;
-import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
@@ -107,10 +106,12 @@ public class CreateTableProcedure
setNextState(CreateTableState.CREATE_TABLE_ASSIGN_REGIONS);
break;
case CREATE_TABLE_ASSIGN_REGIONS:
- assignRegions(env, getTableName(), newRegions);
+ setEnablingState(env, getTableName());
+ addChildProcedure(env.getAssignmentManager().createAssignProcedures(newRegions));
setNextState(CreateTableState.CREATE_TABLE_UPDATE_DESC_CACHE);
break;
case CREATE_TABLE_UPDATE_DESC_CACHE:
+ setEnabledState(env, getTableName());
updateTableDescCache(env, getTableName());
setNextState(CreateTableState.CREATE_TABLE_POST_OPERATION);
break;
@@ -333,21 +334,21 @@ public class CreateTableProcedure
protected static List<HRegionInfo> addTableToMeta(final MasterProcedureEnv env,
final HTableDescriptor hTableDescriptor,
final List<HRegionInfo> regions) throws IOException {
- if (regions != null && regions.size() > 0) {
- ProcedureSyncWait.waitMetaRegions(env);
+ assert (regions != null && regions.size() > 0) : "expected at least 1 region, got " + regions;
- // Add regions to META
- addRegionsToMeta(env, hTableDescriptor, regions);
- // Add replicas if needed
- List<HRegionInfo> newRegions = addReplicas(env, hTableDescriptor, regions);
+ ProcedureSyncWait.waitMetaRegions(env);
- // Setup replication for region replicas if needed
- if (hTableDescriptor.getRegionReplication() > 1) {
- ServerRegionReplicaUtil.setupRegionReplicaReplication(env.getMasterConfiguration());
- }
- return newRegions;
+ // Add replicas if needed
+ List<HRegionInfo> newRegions = addReplicas(env, hTableDescriptor, regions);
+
+ // Add regions to META
+ addRegionsToMeta(env, hTableDescriptor, newRegions);
+
+ // Setup replication for region replicas if needed
+ if (hTableDescriptor.getRegionReplication() > 1) {
+ ServerRegionReplicaUtil.setupRegionReplicaReplication(env.getMasterConfiguration());
}
- return regions;
+ return newRegions;
}
/**
@@ -374,18 +375,16 @@ public class CreateTableProcedure
return hRegionInfos;
}
- protected static void assignRegions(final MasterProcedureEnv env,
- final TableName tableName, final List<HRegionInfo> regions) throws IOException {
- ProcedureSyncWait.waitRegionServers(env);
+ protected static void setEnablingState(final MasterProcedureEnv env, final TableName tableName)
+ throws IOException {
// Mark the table as Enabling
env.getMasterServices().getTableStateManager()
.setTableState(tableName, TableState.State.ENABLING);
+ }
- // Trigger immediate assignment of the regions in round-robin fashion
- final AssignmentManager assignmentManager = env.getMasterServices().getAssignmentManager();
- ModifyRegionUtils.assignRegions(assignmentManager, regions);
-
+ protected static void setEnabledState(final MasterProcedureEnv env, final TableName tableName)
+ throws IOException {
// Enable table
env.getMasterServices().getTableStateManager()
.setTableState(tableName, TableState.State.ENABLED);
http://git-wip-us.apache.org/repos/asf/hbase/blob/f56592fd/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteColumnFamilyProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteColumnFamilyProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteColumnFamilyProcedure.java
index 096172a..78bd715 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteColumnFamilyProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteColumnFamilyProcedure.java
@@ -30,7 +30,6 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.InvalidFamilyOperationException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
@@ -106,7 +105,10 @@ public class DeleteColumnFamilyProcedure
setNextState(DeleteColumnFamilyState.DELETE_COLUMN_FAMILY_REOPEN_ALL_REGIONS);
break;
case DELETE_COLUMN_FAMILY_REOPEN_ALL_REGIONS:
- reOpenAllRegionsIfTableIsOnline(env);
+ if (env.getAssignmentManager().isTableEnabled(getTableName())) {
+ addChildProcedure(env.getAssignmentManager()
+ .createReopenProcedures(getRegionInfoList(env)));
+ }
return Flow.NO_MORE_STATE;
default:
throw new UnsupportedOperationException(this + " unhandled state=" + state);
@@ -292,7 +294,8 @@ public class DeleteColumnFamilyProcedure
env.getMasterServices().getTableDescriptors().add(unmodifiedHTableDescriptor);
// Make sure regions are opened after table descriptor is updated.
- reOpenAllRegionsIfTableIsOnline(env);
+ //reOpenAllRegionsIfTableIsOnline(env);
+ // TODO: NUKE ROLLBACK!!!!
}
/**
@@ -316,25 +319,6 @@ public class DeleteColumnFamilyProcedure
}
/**
- * Last action from the procedure - executed when online schema change is supported.
- * @param env MasterProcedureEnv
- * @throws IOException
- */
- private void reOpenAllRegionsIfTableIsOnline(final MasterProcedureEnv env) throws IOException {
- // This operation only run when the table is enabled.
- if (!env.getMasterServices().getTableStateManager()
- .isTableState(getTableName(), TableState.State.ENABLED)) {
- return;
- }
-
- if (MasterDDLOperationHelper.reOpenAllRegions(env, getTableName(), getRegionInfoList(env))) {
- LOG.info("Completed delete column family operation on table " + getTableName());
- } else {
- LOG.warn("Error on reopening the regions on table " + getTableName());
- }
- }
-
- /**
* The procedure could be restarted from a different machine. If the variable is null, we need to
* retrieve it.
* @return traceEnabled
@@ -376,7 +360,8 @@ public class DeleteColumnFamilyProcedure
private List<HRegionInfo> getRegionInfoList(final MasterProcedureEnv env) throws IOException {
if (regionInfoList == null) {
- regionInfoList = ProcedureSyncWait.getRegionsFromMeta(env, getTableName());
+ regionInfoList = env.getAssignmentManager().getRegionStates()
+ .getRegionsOfTable(getTableName());
}
return regionInfoList;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/f56592fd/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java
index bda68eb..04dfc60 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java
@@ -44,7 +44,6 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.exceptions.HBaseException;
import org.apache.hadoop.hbase.favored.FavoredNodesManager;
-import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.mob.MobConstants;
@@ -97,8 +96,8 @@ public class DeleteTableProcedure
}
// TODO: Move out... in the acquireLock()
- LOG.debug("waiting for '" + getTableName() + "' regions in transition");
- regions = ProcedureSyncWait.getRegionsFromMeta(env, getTableName());
+ LOG.debug("Waiting for '" + getTableName() + "' regions in transition");
+ regions = env.getAssignmentManager().getRegionStates().getRegionsOfTable(getTableName());
assert regions != null && !regions.isEmpty() : "unexpected 0 regions";
ProcedureSyncWait.waitRegionInTransition(env, regions);
@@ -350,8 +349,7 @@ public class DeleteTableProcedure
final TableName tableName) throws IOException {
Connection connection = env.getMasterServices().getConnection();
Scan tableScan = MetaTableAccessor.getScanForTableName(connection, tableName);
- try (Table metaTable =
- connection.getTable(TableName.META_TABLE_NAME)) {
+ try (Table metaTable = connection.getTable(TableName.META_TABLE_NAME)) {
List<Delete> deletes = new ArrayList<>();
try (ResultScanner resScanner = metaTable.getScanner(tableScan)) {
for (Result result : resScanner) {
@@ -385,11 +383,9 @@ public class DeleteTableProcedure
protected static void deleteAssignmentState(final MasterProcedureEnv env,
final TableName tableName) throws IOException {
- final AssignmentManager am = env.getMasterServices().getAssignmentManager();
-
// Clean up regions of the table in RegionStates.
LOG.debug("Removing '" + tableName + "' from region states.");
- am.getRegionStates().tableDeleted(tableName);
+ env.getMasterServices().getAssignmentManager().deleteTable(tableName);
// If entry for this table states, remove it.
LOG.debug("Marking '" + tableName + "' as deleted.");