You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2017/05/24 07:20:39 UTC
[16/27] hbase git commit: HBASE-14614 Procedure v2 - Core Assignment
Manager (Matteo Bertozzi) Move to a new AssignmentManager,
one that describes Assignment using a State Machine built on top of
ProcedureV2 facility.
http://git-wip-us.apache.org/repos/asf/hbase/blob/5e6ec199/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java
deleted file mode 100644
index 3a2a6d7..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java
+++ /dev/null
@@ -1,268 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.master;
-
-import com.google.common.base.Preconditions;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.MetaTableAccessor;
-import org.apache.hadoop.hbase.RegionLocations;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.master.RegionState.State;
-import org.apache.hadoop.hbase.regionserver.Region;
-import org.apache.hadoop.hbase.regionserver.RegionServerServices;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.MultiHConnection;
-import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
-import org.apache.zookeeper.KeeperException;
-
-/**
- * A helper to persist region state in meta. We may change this class
- * to StateStore later if we also use it to store other states in meta
- */
-@InterfaceAudience.Private
-public class RegionStateStore {
- private static final Log LOG = LogFactory.getLog(RegionStateStore.class);
-
- /** The delimiter for meta columns for replicaIds > 0 */
- protected static final char META_REPLICA_ID_DELIMITER = '_';
-
- private volatile Region metaRegion;
- private volatile boolean initialized;
- private MultiHConnection multiHConnection;
- private final MasterServices server;
-
- /**
- * Returns the {@link ServerName} from catalog table {@link Result}
- * where the region is transitioning. It should be the same as
- * {@link MetaTableAccessor#getServerName(Result,int)} if the region is in OPEN state.
- * @param r Result to pull the transitioning server name from
- * @return A ServerName instance or {@link MetaTableAccessor#getServerName(Result,int)}
- * if the necessary fields are not found or are empty.
- */
- static ServerName getRegionServer(final Result r, int replicaId) {
- Cell cell = r.getColumnLatestCell(HConstants.CATALOG_FAMILY, getServerNameColumn(replicaId));
- if (cell == null || cell.getValueLength() == 0) {
- RegionLocations locations = MetaTableAccessor.getRegionLocations(r);
- if (locations != null) {
- HRegionLocation location = locations.getRegionLocation(replicaId);
- if (location != null) {
- return location.getServerName();
- }
- }
- return null;
- }
- return ServerName.parseServerName(Bytes.toString(cell.getValueArray(),
- cell.getValueOffset(), cell.getValueLength()));
- }
-
- private static byte[] getServerNameColumn(int replicaId) {
- return replicaId == 0
- ? HConstants.SERVERNAME_QUALIFIER
- : Bytes.toBytes(HConstants.SERVERNAME_QUALIFIER_STR + META_REPLICA_ID_DELIMITER
- + String.format(HRegionInfo.REPLICA_ID_FORMAT, replicaId));
- }
-
- /**
- * Pull the region state from a catalog table {@link Result}.
- * @param r Result to pull the region state from
- * @return the region state, or OPEN if there's no value written.
- */
- static State getRegionState(final Result r, int replicaId) {
- Cell cell = r.getColumnLatestCell(HConstants.CATALOG_FAMILY, getStateColumn(replicaId));
- if (cell == null || cell.getValueLength() == 0) return State.OPEN;
- return State.valueOf(Bytes.toString(cell.getValueArray(),
- cell.getValueOffset(), cell.getValueLength()));
- }
-
- private static byte[] getStateColumn(int replicaId) {
- return replicaId == 0
- ? HConstants.STATE_QUALIFIER
- : Bytes.toBytes(HConstants.STATE_QUALIFIER_STR + META_REPLICA_ID_DELIMITER
- + String.format(HRegionInfo.REPLICA_ID_FORMAT, replicaId));
- }
-
- /**
- * Check if we should persist a state change in meta. Generally it's
- * better to persist all state changes. However, we should not do that
- * if the region is not in meta at all. Based on the state and the
- * previous state, we can identify if a user region has an entry
- * in meta. For example, merged regions are deleted from meta;
- * New merging parents, or splitting daughters are
- * not created in meta yet.
- */
- private boolean shouldPersistStateChange(
- HRegionInfo hri, RegionState state, RegionState oldState) {
- return !hri.isMetaRegion() && !RegionStates.isOneOfStates(
- state, State.MERGING_NEW, State.SPLITTING_NEW, State.MERGED)
- && !(RegionStates.isOneOfStates(state, State.OFFLINE)
- && RegionStates.isOneOfStates(oldState, State.MERGING_NEW,
- State.SPLITTING_NEW, State.MERGED));
- }
-
- RegionStateStore(final MasterServices server) {
- this.server = server;
- initialized = false;
- }
-
- void start() throws IOException {
- if (server instanceof RegionServerServices) {
- metaRegion = ((RegionServerServices)server).getFromOnlineRegions(
- HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
- }
- // When meta is not colocated on master
- if (metaRegion == null) {
- Configuration conf = server.getConfiguration();
- // Config to determine the number of HConnections to META.
- // A single Connection should be sufficient in most cases. Only if
- // you are doing a lot of writes (>1M) to META
- // might increasing this value improve the write throughput.
- multiHConnection =
- new MultiHConnection(conf, conf.getInt("hbase.regionstatestore.meta.connection", 1));
- }
- initialized = true;
- }
-
- void stop() {
- initialized = false;
- if (multiHConnection != null) {
- multiHConnection.close();
- }
- }
-
- void updateRegionState(long openSeqNum,
- RegionState newState, RegionState oldState) {
- try {
- HRegionInfo hri = newState.getRegion();
-
- // Update meta before checking for initialization. Meta state stored in zk.
- if (hri.isMetaRegion()) {
- // persist meta state in MetaTableLocator (which in turn is zk storage currently)
- try {
- MetaTableLocator.setMetaLocation(server.getZooKeeper(),
- newState.getServerName(), hri.getReplicaId(), newState.getState());
- return; // Done
- } catch (KeeperException e) {
- throw new IOException("Failed to update meta ZNode", e);
- }
- }
-
- if (!initialized
- || !shouldPersistStateChange(hri, newState, oldState)) {
- return;
- }
-
- ServerName oldServer = oldState != null ? oldState.getServerName() : null;
- ServerName serverName = newState.getServerName();
- State state = newState.getState();
-
- int replicaId = hri.getReplicaId();
- Put metaPut = new Put(MetaTableAccessor.getMetaKeyForRegion(hri));
- StringBuilder info = new StringBuilder("Updating hbase:meta row ");
- info.append(hri.getRegionNameAsString()).append(" with state=").append(state);
- if (serverName != null && !serverName.equals(oldServer)) {
- metaPut.addImmutable(HConstants.CATALOG_FAMILY, getServerNameColumn(replicaId),
- Bytes.toBytes(serverName.getServerName()));
- info.append(", sn=").append(serverName);
- }
- if (openSeqNum >= 0) {
- Preconditions.checkArgument(state == State.OPEN
- && serverName != null, "Open region should be on a server");
- MetaTableAccessor.addLocation(metaPut, serverName, openSeqNum, -1, replicaId);
- info.append(", openSeqNum=").append(openSeqNum);
- info.append(", server=").append(serverName);
- }
- metaPut.addImmutable(HConstants.CATALOG_FAMILY, getStateColumn(replicaId),
- Bytes.toBytes(state.name()));
- LOG.info(info);
- HTableDescriptor descriptor = server.getTableDescriptors().get(hri.getTable());
- boolean serial = false;
- if (descriptor != null) {
- serial = server.getTableDescriptors().get(hri.getTable()).hasSerialReplicationScope();
- }
- boolean shouldPutBarrier = serial && state == State.OPEN;
- // Persist the state change to meta
- if (metaRegion != null) {
- try {
- // Assume meta is pinned to master.
- // At least, that's what we want.
- metaRegion.put(metaPut);
- if (shouldPutBarrier) {
- Put barrierPut = MetaTableAccessor.makeBarrierPut(hri.getEncodedNameAsBytes(),
- openSeqNum, hri.getTable().getName());
- metaRegion.put(barrierPut);
- }
- return; // Done here
- } catch (Throwable t) {
- // In unit tests, meta could be moved away by intention
- // So, the shortcut is gone. We won't try to establish the
- // shortcut any more because we prefer meta to be pinned
- // to the master
- synchronized (this) {
- if (metaRegion != null) {
- LOG.info("Meta region shortcut failed", t);
- if (multiHConnection == null) {
- multiHConnection = new MultiHConnection(server.getConfiguration(), 1);
- }
- metaRegion = null;
- }
- }
- }
- }
- // Called when meta is not on master
- List<Put> list = shouldPutBarrier ?
- Arrays.asList(metaPut, MetaTableAccessor.makeBarrierPut(hri.getEncodedNameAsBytes(),
- openSeqNum, hri.getTable().getName())) : Collections.singletonList(metaPut);
- multiHConnection.processBatchCallback(list, TableName.META_TABLE_NAME, null, null);
-
- } catch (IOException ioe) {
- LOG.error("Failed to persist region state " + newState, ioe);
- server.abort("Failed to update region location", ioe);
- }
- }
-
- void splitRegion(HRegionInfo p,
- HRegionInfo a, HRegionInfo b, ServerName sn, int regionReplication) throws IOException {
- MetaTableAccessor.splitRegion(server.getConnection(), p, a, b, sn, regionReplication,
- server.getTableDescriptors().get(p.getTable()).hasSerialReplicationScope());
- }
-
- void mergeRegions(HRegionInfo p,
- HRegionInfo a, HRegionInfo b, ServerName sn, int regionReplication) throws IOException {
- MetaTableAccessor.mergeRegions(server.getConnection(), p, a, b, sn, regionReplication,
- EnvironmentEdgeManager.currentTime(),
- server.getTableDescriptors().get(p.getTable()).hasSerialReplicationScope());
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/5e6ec199/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java
deleted file mode 100644
index dcbf5a4..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java
+++ /dev/null
@@ -1,1170 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.master;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeMap;
-import java.util.TreeSet;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.MetaTableAccessor;
-import org.apache.hadoop.hbase.ServerLoad;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.RegionReplicaUtil;
-import org.apache.hadoop.hbase.master.RegionState.State;
-import org.apache.hadoop.hbase.client.TableState;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.util.Pair;
-
-/**
- * Region state accountant. It holds the states of all regions in memory.
- * In the normal scenario, it should match the meta table and the true region states.
- *
- * This map is used by AssignmentManager to track region states.
- */
-@InterfaceAudience.Private
-public class RegionStates {
- private static final Log LOG = LogFactory.getLog(RegionStates.class);
-
- public final static RegionStateStampComparator REGION_STATE_COMPARATOR =
- new RegionStateStampComparator();
-
- // This comparator sorts the RegionStates by time stamp then Region name.
- // Comparing by timestamp alone can lead us to discard different RegionStates that happen
- // to share a timestamp.
- private static class RegionStateStampComparator implements Comparator<RegionState> {
- @Override
- public int compare(RegionState l, RegionState r) {
- return Long.compare(l.getStamp(), r.getStamp()) == 0 ?
- Bytes.compareTo(l.getRegion().getRegionName(), r.getRegion().getRegionName()) :
- Long.compare(l.getStamp(), r.getStamp());
- }
- }
-
- /**
- * Regions currently in transition.
- */
- final HashMap<String, RegionState> regionsInTransition = new HashMap<>();
-
- /**
- * Region encoded name to state map.
- * All the regions should be in this map.
- */
- private final Map<String, RegionState> regionStates = new HashMap<>();
-
- /**
- * Holds mapping of table -> region state
- */
- private final Map<TableName, Map<String, RegionState>> regionStatesTableIndex = new HashMap<>();
-
- /**
- * Server to regions assignment map.
- * Contains the set of regions currently assigned to a given server.
- */
- private final Map<ServerName, Set<HRegionInfo>> serverHoldings = new HashMap<>();
-
- /**
- * Maintains the mapping from the default region to the replica regions.
- */
- private final Map<HRegionInfo, Set<HRegionInfo>> defaultReplicaToOtherReplicas = new HashMap<>();
-
- /**
- * Region to server assignment map.
- * Contains the server a given region is currently assigned to.
- */
- private final TreeMap<HRegionInfo, ServerName> regionAssignments = new TreeMap<>();
-
- /**
- * Encoded region name to server assignment map for re-assignment
- * purpose. Contains the server a given region is last known assigned
- * to, which has not completed log splitting, so not assignable.
- * If a region is currently assigned, this server info in this
- * map should be the same as that in regionAssignments.
- * However the info in regionAssignments is cleared when the region
- * is offline while the info in lastAssignments is cleared when
- * the region is closed or the server is dead and processed.
- */
- private final HashMap<String, ServerName> lastAssignments = new HashMap<>();
-
- /**
- * Encoded region name to server assignment map for the
- * purpose of cleaning up serverHoldings when a region is online
- * on a new server. When the region is offline from the previous
- * server, we cleaned up regionAssignments so that it has the
- * latest assignment map. But we didn't clean up serverHoldings
- * to match the meta. We need this map to find out the old server
- * whose serverHoldings needs cleanup, given a moved region.
- */
- private final HashMap<String, ServerName> oldAssignments = new HashMap<>();
-
- /**
- * Map a host port pair string to the latest start code
- * of a region server which is known to be dead. It is dead
- * to us, but server manager may not know it yet.
- */
- private final HashMap<String, Long> deadServers = new HashMap<>();
-
- /**
- * Map a dead server to the time when its log split is done.
- * Since log splitting is not ordered, we have to remember
- * all processed instances. The map is cleaned up based
- * on a configured time. By default, we assume a dead
- * server should be done with log splitting in two hours.
- */
- private final HashMap<ServerName, Long> processedServers = new HashMap<>();
- private long lastProcessedServerCleanTime;
-
- private final TableStateManager tableStateManager;
- private final RegionStateStore regionStateStore;
- private final ServerManager serverManager;
- private final MasterServices server;
-
- // The maximum time to keep a log split info in region states map
- static final String LOG_SPLIT_TIME = "hbase.master.maximum.logsplit.keeptime";
- static final long DEFAULT_LOG_SPLIT_TIME = 7200000L; // 2 hours
-
- RegionStates(final MasterServices master, final TableStateManager tableStateManager,
- final ServerManager serverManager, final RegionStateStore regionStateStore) {
- this.tableStateManager = tableStateManager;
- this.regionStateStore = regionStateStore;
- this.serverManager = serverManager;
- this.server = master;
- }
-
- /**
- * @return a copy of the region assignment map
- */
- public synchronized Map<HRegionInfo, ServerName> getRegionAssignments() {
- return new TreeMap<>(regionAssignments);
- }
-
- /**
- * Return the replicas (including default) for the regions grouped by ServerName
- * @param regions the regions whose replicas (including default) are looked up
- * @return a map from each server to the list of those replicas currently assigned to it
- */
- synchronized Map<ServerName, List<HRegionInfo>> getRegionAssignments(
- Collection<HRegionInfo> regions) {
- Map<ServerName, List<HRegionInfo>> map = new HashMap<>();
- for (HRegionInfo region : regions) {
- HRegionInfo defaultReplica = RegionReplicaUtil.getRegionInfoForDefaultReplica(region);
- Set<HRegionInfo> allReplicas = defaultReplicaToOtherReplicas.get(defaultReplica);
- if (allReplicas != null) {
- for (HRegionInfo hri : allReplicas) {
- ServerName server = regionAssignments.get(hri);
- if (server != null) {
- List<HRegionInfo> regionsOnServer = map.get(server);
- if (regionsOnServer == null) {
- regionsOnServer = new ArrayList<>(1);
- map.put(server, regionsOnServer);
- }
- regionsOnServer.add(hri);
- }
- }
- }
- }
- return map;
- }
-
- public synchronized ServerName getRegionServerOfRegion(HRegionInfo hri) {
- return regionAssignments.get(hri);
- }
-
- /**
- * Get regions in transition and their states
- */
- public synchronized Set<RegionState> getRegionsInTransition() {
- return new HashSet<>(regionsInTransition.values());
- }
-
- public synchronized SortedSet<RegionState> getRegionsInTransitionOrderedByTimestamp() {
- final TreeSet<RegionState> rit = new TreeSet<>(REGION_STATE_COMPARATOR);
- for (RegionState rs: regionsInTransition.values()) {
- rit.add(rs);
- }
- return rit;
- }
-
- /**
- * Get the number of regions in transition.
- */
- public synchronized int getRegionsInTransitionCount() {
- return regionsInTransition.size();
- }
-
- /**
- * @return True if the specified region is in transition.
- */
- public synchronized boolean isRegionInTransition(final HRegionInfo hri) {
- return regionsInTransition.containsKey(hri.getEncodedName());
- }
-
- /**
- * @return True if the specified region is in transition.
- */
- public synchronized boolean isRegionInTransition(final String encodedName) {
- return regionsInTransition.containsKey(encodedName);
- }
-
- /**
- * @return True if any region in transition.
- */
- public synchronized boolean isRegionsInTransition() {
- return !regionsInTransition.isEmpty();
- }
-
- /**
- * @return True if hbase:meta table region is in transition.
- */
- public synchronized boolean isMetaRegionInTransition() {
- for (RegionState state : regionsInTransition.values()) {
- if (state.getRegion().isMetaRegion()) return true;
- }
- return false;
- }
-
- /**
- * @return True if specified region assigned, and not in transition.
- */
- public synchronized boolean isRegionOnline(final HRegionInfo hri) {
- return !isRegionInTransition(hri) && regionAssignments.containsKey(hri);
- }
-
- /**
- * @return True if specified region offline/closed, but not in transition.
- * If the region is not in the map, it is offline to us too.
- */
- public synchronized boolean isRegionOffline(final HRegionInfo hri) {
- return getRegionState(hri) == null || (!isRegionInTransition(hri)
- && isRegionInState(hri, State.OFFLINE, State.CLOSED));
- }
-
- /**
- * @return True if specified region is in one of the specified states.
- */
- public boolean isRegionInState(
- final HRegionInfo hri, final State... states) {
- return isRegionInState(hri.getEncodedName(), states);
- }
-
- /**
- * @return True if specified region is in one of the specified states.
- */
- public boolean isRegionInState(
- final String encodedName, final State... states) {
- RegionState regionState = getRegionState(encodedName);
- return isOneOfStates(regionState, states);
- }
-
- /**
- * Wait for the state map to be updated by assignment manager.
- */
- public synchronized void waitForUpdate(
- final long timeout) throws InterruptedException {
- this.wait(timeout);
- }
-
- /**
- * Get region transition state
- */
- public RegionState getRegionTransitionState(final HRegionInfo hri) {
- return getRegionTransitionState(hri.getEncodedName());
- }
-
- /**
- * Get region transition state
- */
- public synchronized RegionState
- getRegionTransitionState(final String encodedName) {
- return regionsInTransition.get(encodedName);
- }
-
- /**
- * Add a list of regions to RegionStates. If a region is split
- * and offline, its state will be SPLIT. Otherwise, its state will
- * be OFFLINE. Regions already in RegionStates will be skipped.
- */
- public void createRegionStates(
- final List<HRegionInfo> hris) {
- for (HRegionInfo hri: hris) {
- createRegionState(hri);
- }
- }
-
- /**
- * Add a region to RegionStates. If the region is split
- * and offline, its state will be SPLIT. Otherwise, its state will
- * be OFFLINE. If it is already in RegionStates, this call has
- * no effect, and the original state is returned.
- */
- public RegionState createRegionState(final HRegionInfo hri) {
- return createRegionState(hri, null, null, null);
- }
-
- /**
- * Add a region to RegionStates with the specified state.
- * If the region is already in RegionStates, this call has
- * no effect, and the original state is returned.
- *
- * @param hri the region info to create a state for
- * @param newState the state the region is set to
- * @param serverName the server the region is transitioning on
- * @param lastHost the last server that hosts the region
- * @return the current state
- */
- public synchronized RegionState createRegionState(final HRegionInfo hri,
- State newState, ServerName serverName, ServerName lastHost) {
- if (newState == null || (newState == State.OPEN && serverName == null)) {
- newState = State.OFFLINE;
- }
- if (hri.isOffline() && hri.isSplit()) {
- newState = State.SPLIT;
- serverName = null;
- }
- String encodedName = hri.getEncodedName();
- RegionState regionState = regionStates.get(encodedName);
- if (regionState != null) {
- LOG.warn("Tried to create a state for a region already in RegionStates, "
- + "used existing: " + regionState + ", ignored new: " + newState);
- } else {
- regionState = new RegionState(hri, newState, serverName);
- putRegionState(regionState);
- if (newState == State.OPEN) {
- if (!serverName.equals(lastHost)) {
- LOG.warn("Open region's last host " + lastHost
- + " should be the same as the current one " + serverName
- + ", ignored the last and used the current one");
- lastHost = serverName;
- }
- lastAssignments.put(encodedName, lastHost);
- regionAssignments.put(hri, lastHost);
- } else if (!isOneOfStates(regionState, State.MERGED, State.SPLIT, State.OFFLINE)) {
- regionsInTransition.put(encodedName, regionState);
- }
- if (lastHost != null && newState != State.SPLIT) {
- addToServerHoldings(lastHost, hri);
- if (newState != State.OPEN) {
- oldAssignments.put(encodedName, lastHost);
- }
- }
- }
- return regionState;
- }
-
- private RegionState putRegionState(RegionState regionState) {
- HRegionInfo hri = regionState.getRegion();
- String encodedName = hri.getEncodedName();
- TableName table = hri.getTable();
- RegionState oldState = regionStates.put(encodedName, regionState);
- Map<String, RegionState> map = regionStatesTableIndex.get(table);
- if (map == null) {
- map = new HashMap<>();
- regionStatesTableIndex.put(table, map);
- }
- map.put(encodedName, regionState);
- return oldState;
- }
-
- /**
- * Update a region state. It will be put in transition if not already there.
- */
- public RegionState updateRegionState(
- final HRegionInfo hri, final State state) {
- RegionState regionState = getRegionState(hri.getEncodedName());
- return updateRegionState(hri, state,
- regionState == null ? null : regionState.getServerName());
- }
-
- /**
- * Update a region state. It will be put in transition if not already there.
- */
- public RegionState updateRegionState(
- final HRegionInfo hri, final State state, final ServerName serverName) {
- return updateRegionState(hri, state, serverName, HConstants.NO_SEQNUM);
- }
-
- public void regionOnline(final HRegionInfo hri, final ServerName serverName) {
- regionOnline(hri, serverName, HConstants.NO_SEQNUM);
- }
-
- /**
- * A region is online, won't be in transition any more.
- * We can't confirm it is really online on specified region server
- * because it hasn't been put in region server's online region list yet.
- */
- public void regionOnline(final HRegionInfo hri, final ServerName serverName, long openSeqNum) {
- String encodedName = hri.getEncodedName();
- if (!serverManager.isServerOnline(serverName)) {
- // This is possible if the region server dies before master gets a
- // chance to handle ZK event in time. At this time, if the dead server
- // is already processed by SSH, we should ignore this event.
- // If not processed yet, ignore and let SSH deal with it.
- LOG.warn("Ignored, " + encodedName + " was opened on a dead server: " + serverName);
- return;
- }
- updateRegionState(hri, State.OPEN, serverName, openSeqNum);
-
- synchronized (this) {
- RegionState regionState = regionsInTransition.remove(encodedName);
- // When the region is online and removed from regionsInTransition,
- // update the RIT duration in the assignment manager metrics
- if (regionState != null && this.server.getAssignmentManager() != null) {
- long ritDuration = System.currentTimeMillis() - regionState.getStamp()
- + regionState.getRitDuration();
- this.server.getAssignmentManager().getAssignmentManagerMetrics()
- .updateRitDuration(ritDuration);
- }
- ServerName oldServerName = regionAssignments.put(hri, serverName);
- if (!serverName.equals(oldServerName)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Onlined " + hri.getShortNameToLog() + " on " + serverName);
- }
- addToServerHoldings(serverName, hri);
- addToReplicaMapping(hri);
- if (oldServerName == null) {
- oldServerName = oldAssignments.remove(encodedName);
- }
- if (oldServerName != null
- && !oldServerName.equals(serverName)
- && serverHoldings.containsKey(oldServerName)) {
- LOG.info("Offlined " + hri.getShortNameToLog() + " from " + oldServerName);
- removeFromServerHoldings(oldServerName, hri);
- }
- }
- }
- }
-
- private void addToServerHoldings(ServerName serverName, HRegionInfo hri) {
- Set<HRegionInfo> regions = serverHoldings.get(serverName);
- if (regions == null) {
- regions = new HashSet<>();
- serverHoldings.put(serverName, regions);
- }
- regions.add(hri);
- }
-
- private void addToReplicaMapping(HRegionInfo hri) {
- HRegionInfo defaultReplica = RegionReplicaUtil.getRegionInfoForDefaultReplica(hri);
- Set<HRegionInfo> replicas =
- defaultReplicaToOtherReplicas.get(defaultReplica);
- if (replicas == null) {
- replicas = new HashSet<>();
- defaultReplicaToOtherReplicas.put(defaultReplica, replicas);
- }
- replicas.add(hri);
- }
-
- private void removeFromServerHoldings(ServerName serverName, HRegionInfo hri) {
- Set<HRegionInfo> oldRegions = serverHoldings.get(serverName);
- oldRegions.remove(hri);
- if (oldRegions.isEmpty()) {
- serverHoldings.remove(serverName);
- }
- }
-
- private void removeFromReplicaMapping(HRegionInfo hri) {
- HRegionInfo defaultReplica = RegionReplicaUtil.getRegionInfoForDefaultReplica(hri);
- Set<HRegionInfo> replicas = defaultReplicaToOtherReplicas.get(defaultReplica);
- if (replicas != null) {
- replicas.remove(hri);
- if (replicas.isEmpty()) {
- defaultReplicaToOtherReplicas.remove(defaultReplica);
- }
- }
- }
-
- /**
- * A dead server's wals have been split so that all the regions
- * that used to be open on it can be safely assigned now. Mark them assignable.
- */
- public synchronized void logSplit(final ServerName serverName) {
- for (Iterator<Map.Entry<String, ServerName>> it
- = lastAssignments.entrySet().iterator(); it.hasNext();) {
- Map.Entry<String, ServerName> e = it.next();
- if (e.getValue().equals(serverName)) {
- it.remove();
- }
- }
- long now = System.currentTimeMillis();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Adding to log splitting servers " + serverName);
- }
- processedServers.put(serverName, Long.valueOf(now));
- Configuration conf = server.getConfiguration();
- long obsoleteTime = conf.getLong(LOG_SPLIT_TIME, DEFAULT_LOG_SPLIT_TIME);
- // Doesn't have to be very accurate about the clean up time
- if (now > lastProcessedServerCleanTime + obsoleteTime) {
- lastProcessedServerCleanTime = now;
- long cutoff = now - obsoleteTime;
- for (Iterator<Map.Entry<ServerName, Long>> it
- = processedServers.entrySet().iterator(); it.hasNext();) {
- Map.Entry<ServerName, Long> e = it.next();
- if (e.getValue().longValue() < cutoff) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Removed from log splitting servers " + e.getKey());
- }
- it.remove();
- }
- }
- }
- }
-
- /**
- * Log split is done for a given region, so it is assignable now.
- */
- public void logSplit(final HRegionInfo region) {
- clearLastAssignment(region);
- }
-
- public synchronized void clearLastAssignment(final HRegionInfo region) {
- lastAssignments.remove(region.getEncodedName());
- }
-
- /**
- * A region is offline, won't be in transition any more.
- */
- public void regionOffline(final HRegionInfo hri) {
- regionOffline(hri, null);
- }
-
- /**
- * A region is offline, won't be in transition any more. Its state
- * should be the specified expected state, which can only be
- * Split/Merged/Offline/null(=Offline)/SplittingNew/MergingNew.
- */
- public void regionOffline(
- final HRegionInfo hri, final State expectedState) {
- Preconditions.checkArgument(expectedState == null
- || RegionState.isUnassignable(expectedState),
- "Offlined region should not be " + expectedState);
- if (isRegionInState(hri, State.SPLITTING_NEW, State.MERGING_NEW)) {
- // Remove it from all region maps
- deleteRegion(hri);
- return;
- }
- State newState =
- expectedState == null ? State.OFFLINE : expectedState;
- updateRegionState(hri, newState);
- String encodedName = hri.getEncodedName();
- synchronized (this) {
- regionsInTransition.remove(encodedName);
- ServerName oldServerName = regionAssignments.remove(hri);
- if (oldServerName != null && serverHoldings.containsKey(oldServerName)) {
- if (newState == State.MERGED || newState == State.SPLIT
- || hri.isMetaRegion() || tableStateManager.isTableState(hri.getTable(),
- TableState.State.DISABLED, TableState.State.DISABLING)) {
- // Offline the region only if it's merged/split, or the table is disabled/disabling.
- // Otherwise, offline it from this server only when it is online on a different server.
- LOG.info("Offlined " + hri.getShortNameToLog() + " from " + oldServerName);
- removeFromServerHoldings(oldServerName, hri);
- removeFromReplicaMapping(hri);
- } else {
- // Need to remember it so that we can offline it from this
- // server when it is online on a different server.
- oldAssignments.put(encodedName, oldServerName);
- }
- }
- }
- }
-
- /**
- * A server is offline, all regions on it are dead.
- */
- public List<HRegionInfo> serverOffline(final ServerName sn) {
- // Offline all regions on this server not already in transition.
- List<HRegionInfo> rits = new ArrayList<>();
- Set<HRegionInfo> regionsToCleanIfNoMetaEntry = new HashSet<>();
- // Offline regions outside the loop and synchronized block to avoid
- // ConcurrentModificationException and deadlock in case meta is unassigned
- // but RegionStates are blocked.
- Set<HRegionInfo> regionsToOffline = new HashSet<>();
- synchronized (this) {
- Set<HRegionInfo> assignedRegions = serverHoldings.get(sn);
- if (assignedRegions == null) {
- assignedRegions = new HashSet<>();
- }
-
- for (HRegionInfo region : assignedRegions) {
- // Offline open regions, no need to offline if SPLIT/MERGED/OFFLINE
- if (isRegionOnline(region)) {
- regionsToOffline.add(region);
- } else if (isRegionInState(region, State.SPLITTING, State.MERGING)) {
- LOG.debug("Offline splitting/merging region " + getRegionState(region));
- regionsToOffline.add(region);
- }
- }
-
- for (RegionState state : regionsInTransition.values()) {
- HRegionInfo hri = state.getRegion();
- if (assignedRegions.contains(hri)) {
- // Region is open on this region server, but in transition.
- // This region must be moving away from this server, or splitting/merging.
- // SSH will handle it, either skip assigning, or re-assign.
- LOG.info("Transitioning " + state + " will be handled by ServerCrashProcedure for " + sn);
- } else if (sn.equals(state.getServerName())) {
- // Region is in transition on this region server, and this
- // region is not open on this server. So the region must be
- // moving to this server from another one (i.e. opening or
- // pending open on this server, was open on another one.
- // Offline state is also kind of pending open if the region is in
- // transition. The region could be in failed_close state too if we have
- // tried several times to open it while this region server is not reachable)
- if (isOneOfStates(state, State.OPENING, State.PENDING_OPEN,
- State.FAILED_OPEN, State.FAILED_CLOSE, State.OFFLINE)) {
- LOG.info("Found region in " + state +
- " to be reassigned by ServerCrashProcedure for " + sn);
- rits.add(hri);
- } else if (isOneOfStates(state, State.SPLITTING_NEW, State.MERGING_NEW)) {
- regionsToCleanIfNoMetaEntry.add(state.getRegion());
- } else {
- LOG.warn("THIS SHOULD NOT HAPPEN: unexpected " + state);
- }
- }
- }
- this.notifyAll();
- }
-
- for (HRegionInfo hri : regionsToOffline) {
- regionOffline(hri);
- }
-
- cleanIfNoMetaEntry(regionsToCleanIfNoMetaEntry);
- return rits;
- }
-
- /**
- * This method does an RPC to hbase:meta. Do not call this method with a lock/synchronize held.
- * @param hris The hris to check if empty in hbase:meta and if so, clean them up.
- */
- private void cleanIfNoMetaEntry(Set<HRegionInfo> hris) {
- if (hris.isEmpty()) return;
- for (HRegionInfo hri: hris) {
- try {
- // This is an RPC to the meta table. It must not be done while holding a synchronize on
- // regionstates (see the method javadoc). No progress will be made if meta is not available.
- // This is a cleanup task. Not critical.
- if (MetaTableAccessor.getRegion(server.getConnection(), hri.getEncodedNameAsBytes()) ==
- null) {
- regionOffline(hri);
- FSUtils.deleteRegionDir(server.getConfiguration(), hri);
- }
- } catch (IOException e) {
- LOG.warn("Got exception while deleting " + hri + " directories from file system.", e);
- }
- }
- }
-
- /**
- * Gets the online regions of the specified table.
- * This method looks at the in-memory state. It does not go to <code>hbase:meta</code>.
- * Only returns <em>online</em> regions. If a region on this table has been
- * closed during a disable, etc., it will not be included in the returned list.
- * So, the returned list may not necessarily be ALL regions in this table; it is
- * all the ONLINE regions in the table.
- * @param tableName
- * @return Online regions from <code>tableName</code>
- */
- public synchronized List<HRegionInfo> getRegionsOfTable(TableName tableName) {
- List<HRegionInfo> tableRegions = new ArrayList<>();
- // boundary needs to have table's name but regionID 0 so that it is sorted
- // before all table's regions.
- HRegionInfo boundary = new HRegionInfo(tableName, null, null, false, 0L);
- for (HRegionInfo hri: regionAssignments.tailMap(boundary).keySet()) {
- if(!hri.getTable().equals(tableName)) break;
- tableRegions.add(hri);
- }
- return tableRegions;
- }
-
- /**
- * Gets current state of all regions of the table.
- * This method looks at the in-memory state. It does not go to <code>hbase:meta</code>.
- * Method guaranteed to return keys for all states
- * in {@link org.apache.hadoop.hbase.master.RegionState.State}
- *
- * @param tableName
- * @return Map from each region state to the regions of <code>tableName</code> in that state
- */
- public synchronized Map<RegionState.State, List<HRegionInfo>>
- getRegionByStateOfTable(TableName tableName) {
- Map<RegionState.State, List<HRegionInfo>> tableRegions = new HashMap<>();
- for (State state : State.values()) {
- tableRegions.put(state, new ArrayList<>());
- }
- Map<String, RegionState> indexMap = regionStatesTableIndex.get(tableName);
- if (indexMap == null)
- return tableRegions;
- for (RegionState regionState : indexMap.values()) {
- tableRegions.get(regionState.getState()).add(regionState.getRegion());
- }
- return tableRegions;
- }
-
- /**
- * Wait on region to clear regions-in-transition.
- * <p>
- * If the region isn't in transition, returns immediately. Otherwise, method
- * blocks until the region is out of transition.
- */
- public synchronized void waitOnRegionToClearRegionsInTransition(
- final HRegionInfo hri) throws InterruptedException {
- if (!isRegionInTransition(hri)) return;
-
- while(!server.isStopped() && isRegionInTransition(hri)) {
- RegionState rs = getRegionState(hri);
- LOG.info("Waiting on " + rs + " to clear regions-in-transition");
- waitForUpdate(100);
- }
-
- if (server.isStopped()) {
- LOG.info("Giving up wait on region in " +
- "transition because stoppable.isStopped is set");
- }
- }
-
- /**
- * A table is deleted. Remove its regions from all internal maps.
- * We loop through all regions assuming we don't delete tables too often.
- */
- public void tableDeleted(final TableName tableName) {
- Set<HRegionInfo> regionsToDelete = new HashSet<>();
- synchronized (this) {
- for (RegionState state: regionStates.values()) {
- HRegionInfo region = state.getRegion();
- if (region.getTable().equals(tableName)) {
- regionsToDelete.add(region);
- }
- }
- }
- for (HRegionInfo region: regionsToDelete) {
- deleteRegion(region);
- }
- }
-
- /**
- * Get a copy of all regions assigned to a server
- */
- public synchronized Set<HRegionInfo> getServerRegions(ServerName serverName) {
- Set<HRegionInfo> regions = serverHoldings.get(serverName);
- if (regions == null) return null;
- return new HashSet<>(regions);
- }
-
- /**
- * Remove a region from all state maps.
- */
- @VisibleForTesting
- public synchronized void deleteRegion(final HRegionInfo hri) {
- String encodedName = hri.getEncodedName();
- regionsInTransition.remove(encodedName);
- regionStates.remove(encodedName);
- TableName table = hri.getTable();
- Map<String, RegionState> indexMap = regionStatesTableIndex.get(table);
- indexMap.remove(encodedName);
- if (indexMap.isEmpty())
- regionStatesTableIndex.remove(table);
- lastAssignments.remove(encodedName);
- ServerName sn = regionAssignments.remove(hri);
- if (sn != null) {
- Set<HRegionInfo> regions = serverHoldings.get(sn);
- regions.remove(hri);
- }
- }
-
- /**
- * Checks if a region was assigned to a server which is not online now.
- * If so, we should hold off re-assigning this region till SSH has split its WALs.
- * Once logs are split, the last assignment of this region will be reset,
- * which means a null last assignment server is ok for re-assigning.
- *
- * A region server could be dead without us knowing it yet; we may
- * falsely think it's online. Therefore, even if a server is online, we still
- * need to confirm it is reachable and has the expected start code.
- */
- synchronized boolean wasRegionOnDeadServer(final String encodedName) {
- ServerName server = lastAssignments.get(encodedName);
- return isServerDeadAndNotProcessed(server);
- }
-
- synchronized boolean isServerDeadAndNotProcessed(ServerName server) {
- if (server == null) return false;
- if (serverManager.isServerOnline(server)) {
- String hostAndPort = server.getHostAndPort();
- long startCode = server.getStartcode();
- Long deadCode = deadServers.get(hostAndPort);
- if (deadCode == null || startCode > deadCode.longValue()) {
- if (serverManager.isServerReachable(server)) {
- return false;
- }
- // The size of deadServers won't grow unbounded.
- deadServers.put(hostAndPort, Long.valueOf(startCode));
- }
- // Watch out! If the server is not dead, the region could
- // remain unassigned. That's why ServerManager#isServerReachable
- // should use some retry.
- //
- // We cache this info since it is very unlikely for that
- // instance to come back up later on. We don't want to expire
- // the server since we prefer to let it die naturally.
- LOG.warn("Couldn't reach online server " + server);
- }
- // Now, we know it's dead. Check if it's processed
- return !processedServers.containsKey(server);
- }
-
- /**
- * Get the last region server a region was on for purpose of re-assignment,
- * i.e. should the re-assignment be held back till log split is done?
- */
- synchronized ServerName getLastRegionServerOfRegion(final String encodedName) {
- return lastAssignments.get(encodedName);
- }
-
- synchronized void setLastRegionServerOfRegions(
- final ServerName serverName, final List<HRegionInfo> regionInfos) {
- for (HRegionInfo hri: regionInfos) {
- setLastRegionServerOfRegion(serverName, hri.getEncodedName());
- }
- }
-
- synchronized void setLastRegionServerOfRegion(
- final ServerName serverName, final String encodedName) {
- lastAssignments.put(encodedName, serverName);
- }
-
- synchronized boolean isRegionOnServer(
- final HRegionInfo hri, final ServerName serverName) {
- Set<HRegionInfo> regions = serverHoldings.get(serverName);
- return regions == null ? false : regions.contains(hri);
- }
-
- public void prepareAssignDaughters(HRegionInfo a, HRegionInfo b) {
- synchronized (this) {
- if (isRegionInState(a, State.SPLITTING_NEW)) {
- updateRegionState(a, State.OFFLINE, null);
- }
- if (isRegionInState(b, State.SPLITTING_NEW)) {
- updateRegionState(b, State.OFFLINE, null);
- }
- }
- }
-
- public void prepareAssignMergedRegion(HRegionInfo mergedRegion) {
- synchronized (this) {
- if (isRegionInState(mergedRegion, State.MERGING_NEW)) {
- updateRegionState(mergedRegion, State.OFFLINE, null);
- }
- }
- }
-
- void splitRegion(HRegionInfo p,
- HRegionInfo a, HRegionInfo b, ServerName sn) throws IOException {
-
- regionStateStore.splitRegion(p, a, b, sn, getRegionReplication(p));
- synchronized (this) {
- // After PONR, split is considered to be done.
- // Update server holdings to be aligned with the meta.
- Set<HRegionInfo> regions = serverHoldings.get(sn);
- if (regions == null) {
- throw new IllegalStateException(sn + " should host some regions");
- }
- regions.remove(p);
- regions.add(a);
- regions.add(b);
- }
- }
-
- void mergeRegions(HRegionInfo p,
- HRegionInfo a, HRegionInfo b, ServerName sn) throws IOException {
- regionStateStore.mergeRegions(p, a, b, sn, getRegionReplication(a));
- synchronized (this) {
- // After PONR, merge is considered to be done.
- // Update server holdings to be aligned with the meta.
- Set<HRegionInfo> regions = serverHoldings.get(sn);
- if (regions == null) {
- throw new IllegalStateException(sn + " should host some regions");
- }
- regions.remove(a);
- regions.remove(b);
- regions.add(p);
- }
- }
-
- private int getRegionReplication(HRegionInfo r) throws IOException {
- if (tableStateManager != null) {
- HTableDescriptor htd = server.getTableDescriptors().get(r.getTable());
- if (htd != null) {
- return htd.getRegionReplication();
- }
- }
- return 1;
- }
-
- /**
- * At cluster clean re/start, mark all user regions closed except those of tables
- * that are excluded, such as disabled/disabling/enabling tables. All user regions
- * and their previous locations are returned.
- */
- synchronized Map<HRegionInfo, ServerName> closeAllUserRegions(Set<TableName> excludedTables) {
- boolean noExcludeTables = excludedTables == null || excludedTables.isEmpty();
- Set<HRegionInfo> toBeClosed = new HashSet<>(regionStates.size());
- for(RegionState state: regionStates.values()) {
- HRegionInfo hri = state.getRegion();
- if (state.isSplit() || hri.isSplit()) {
- continue;
- }
- TableName tableName = hri.getTable();
- if (!TableName.META_TABLE_NAME.equals(tableName)
- && (noExcludeTables || !excludedTables.contains(tableName))) {
- toBeClosed.add(hri);
- }
- }
- Map<HRegionInfo, ServerName> allUserRegions = new HashMap<>(toBeClosed.size());
- for (HRegionInfo hri: toBeClosed) {
- RegionState regionState = updateRegionState(hri, State.CLOSED);
- allUserRegions.put(hri, regionState.getServerName());
- }
- return allUserRegions;
- }
-
- /**
- * Compute the average load across all region servers.
- * Currently, this uses a very naive computation - just uses the number of
- * regions being served, ignoring stats about number of requests.
- * @return the average load
- */
- protected synchronized double getAverageLoad() {
- int numServers = 0, totalLoad = 0;
- for (Map.Entry<ServerName, Set<HRegionInfo>> e: serverHoldings.entrySet()) {
- Set<HRegionInfo> regions = e.getValue();
- ServerName serverName = e.getKey();
- int regionCount = regions.size();
- if (serverManager.isServerOnline(serverName)) {
- totalLoad += regionCount;
- numServers++;
- }
- }
- if (numServers > 1) {
- // The master region server holds only a couple regions.
- // Don't consider this server in calculating the average load
- // if there are other region servers to avoid possible confusion.
- Set<HRegionInfo> hris = serverHoldings.get(server.getServerName());
- if (hris != null) {
- totalLoad -= hris.size();
- numServers--;
- }
- }
- return numServers == 0 ? 0.0 :
- (double)totalLoad / (double)numServers;
- }
-
- protected Map<TableName, Map<ServerName, List<HRegionInfo>>> getAssignmentsByTable() {
- return getAssignmentsByTable(false);
- }
-
- /**
- * This is an EXPENSIVE clone. Cloning though is the safest thing to do.
- * Can't let out original since it can change and at least the load balancer
- * wants to iterate this exported list. We synchronize on this object while
- * the clone is being built from serverHoldings.
- * @param forceByCluster a flag to force to aggregate the server-load to the cluster level
- * @return A clone of current assignments by table.
- */
- protected Map<TableName, Map<ServerName, List<HRegionInfo>>> getAssignmentsByTable(
- boolean forceByCluster) {
- Map<TableName, Map<ServerName, List<HRegionInfo>>> result;
- synchronized (this) {
- result = getTableRSRegionMap(server.getConfiguration().getBoolean(
- HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE,false) && !forceByCluster);
- }
- Map<ServerName, ServerLoad>
- onlineSvrs = serverManager.getOnlineServers();
- // Take care of servers w/o assignments, and remove servers in draining mode
- List<ServerName> drainingServers = this.serverManager.getDrainingServersList();
- for (Map<ServerName, List<HRegionInfo>> map: result.values()) {
- for (ServerName svr: onlineSvrs.keySet()) {
- if (!map.containsKey(svr)) {
- map.put(svr, new ArrayList<>());
- }
- }
- map.keySet().removeAll(drainingServers);
- }
- return result;
- }
-
- private Map<TableName, Map<ServerName, List<HRegionInfo>>> getTableRSRegionMap(Boolean bytable){
- Map<TableName, Map<ServerName, List<HRegionInfo>>> result = new HashMap<>();
- for (Map.Entry<ServerName, Set<HRegionInfo>> e: serverHoldings.entrySet()) {
- for (HRegionInfo hri: e.getValue()) {
- if (hri.isMetaRegion()) continue;
- TableName tablename = bytable ? hri.getTable() : HConstants.ENSEMBLE_TABLE_NAME;
- Map<ServerName, List<HRegionInfo>> svrToRegions = result.get(tablename);
- if (svrToRegions == null) {
- svrToRegions = new HashMap<>(serverHoldings.size());
- result.put(tablename, svrToRegions);
- }
- List<HRegionInfo> regions = svrToRegions.get(e.getKey());
- if (regions == null) {
- regions = new ArrayList<>();
- svrToRegions.put(e.getKey(), regions);
- }
- regions.add(hri);
- }
- }
- return result;
- }
-
- public RegionState getRegionState(final HRegionInfo hri) {
- return getRegionState(hri.getEncodedName());
- }
-
- /**
- * Returns a clone of region assignments per server
- * @return a Map of ServerName to a List of HRegionInfo's
- */
- protected synchronized Map<ServerName, List<HRegionInfo>> getRegionAssignmentsByServer() {
- Map<ServerName, List<HRegionInfo>> regionsByServer = new HashMap<>(serverHoldings.size());
- for (Map.Entry<ServerName, Set<HRegionInfo>> e: serverHoldings.entrySet()) {
- regionsByServer.put(e.getKey(), new ArrayList<>(e.getValue()));
- }
- return regionsByServer;
- }
-
- public synchronized RegionState getRegionState(final String encodedName) {
- return regionStates.get(encodedName);
- }
-
- /**
- * Get the HRegionInfo from cache, if not there, from the hbase:meta table.
- * Be careful. Does RPC. Do not hold a lock or synchronize when you call this method.
- * @param regionName
- * @return HRegionInfo for the region
- */
- @SuppressWarnings("deprecation")
- protected HRegionInfo getRegionInfo(final byte [] regionName) {
- String encodedName = HRegionInfo.encodeRegionName(regionName);
- RegionState regionState = getRegionState(encodedName);
- if (regionState != null) {
- return regionState.getRegion();
- }
-
- try {
- Pair<HRegionInfo, ServerName> p =
- MetaTableAccessor.getRegion(server.getConnection(), regionName);
- HRegionInfo hri = p == null ? null : p.getFirst();
- if (hri != null) {
- createRegionState(hri);
- }
- return hri;
- } catch (IOException e) {
- server.abort("Aborting because error occurred while reading "
- + Bytes.toStringBinary(regionName) + " from hbase:meta", e);
- return null;
- }
- }
-
- static boolean isOneOfStates(RegionState regionState, State... states) {
- State s = regionState != null ? regionState.getState() : null;
- for (State state: states) {
- if (s == state) return true;
- }
- return false;
- }
-
- /**
- * Update a region state. It will be put in transition if not already there.
- */
- private RegionState updateRegionState(final HRegionInfo hri,
- final RegionState.State state, final ServerName serverName, long openSeqNum) {
- if (state == RegionState.State.FAILED_CLOSE || state == RegionState.State.FAILED_OPEN) {
- LOG.warn("Failed to open/close " + hri.getShortNameToLog()
- + " on " + serverName + ", set to " + state);
- }
-
- String encodedName = hri.getEncodedName();
- RegionState regionState = new RegionState(
- hri, state, System.currentTimeMillis(), serverName);
- RegionState oldState = getRegionState(encodedName);
- if (!regionState.equals(oldState)) {
- LOG.info("Transition " + oldState + " to " + regionState);
- // Persist region state before updating in-memory info, if needed
- regionStateStore.updateRegionState(openSeqNum, regionState, oldState);
- }
-
- synchronized (this) {
- RegionState oldRegionState = regionsInTransition.put(encodedName, regionState);
- // When a region transitions from an old region state to a new one,
- // accumulate the RIT duration into the new region state.
- if (oldRegionState != null) {
- regionState.updateRitDuration(oldRegionState.getStamp());
- }
- putRegionState(regionState);
-
- // For these states, region should be properly closed.
- // There should be no log splitting issue.
- if ((state == State.CLOSED || state == State.MERGED
- || state == State.SPLIT) && lastAssignments.containsKey(encodedName)) {
- ServerName last = lastAssignments.get(encodedName);
- if (last.equals(serverName)) {
- lastAssignments.remove(encodedName);
- } else {
- LOG.warn(encodedName + " moved to " + state + " on "
- + serverName + ", expected " + last);
- }
- }
-
- // Once a region is opened, record its last assignment right away.
- if (serverName != null && state == State.OPEN) {
- ServerName last = lastAssignments.get(encodedName);
- if (!serverName.equals(last)) {
- lastAssignments.put(encodedName, serverName);
- if (last != null && isServerDeadAndNotProcessed(last)) {
- LOG.warn(encodedName + " moved to " + serverName
- + ", while it's previous host " + last
- + " is dead but not processed yet");
- }
- }
- }
-
- // notify the change
- this.notifyAll();
- }
- return regionState;
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/5e6ec199/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
index db0a0e5..6ae9f0f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
@@ -57,12 +57,10 @@ import org.apache.hadoop.hbase.ipc.FailedServerException;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
-import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
-import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
-import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
+import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
@@ -76,7 +74,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavor
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.RetryCounter;
@@ -314,7 +311,8 @@ public class ServerManager {
}
}
- void regionServerReport(ServerName sn,
+ @VisibleForTesting
+ public void regionServerReport(ServerName sn,
ServerLoad sl) throws YouAreDeadException {
checkIsDead(sn, "REPORT");
if (null == this.onlineServers.replace(sn, sl)) {
@@ -614,12 +612,7 @@ public class ServerManager {
return;
}
- boolean carryingMeta = master.getAssignmentManager().isCarryingMeta(serverName);
- ProcedureExecutor<MasterProcedureEnv> procExec = this.master.getMasterProcedureExecutor();
- procExec.submitProcedure(new ServerCrashProcedure(
- procExec.getEnvironment(), serverName, true, carryingMeta));
- LOG.debug("Added=" + serverName +
- " to dead servers, submitted shutdown handler to be executed meta=" + carryingMeta);
+ master.getAssignmentManager().submitServerCrash(serverName, true);
// Tell our listeners that a server was removed
if (!this.listeners.isEmpty()) {
@@ -629,6 +622,37 @@ public class ServerManager {
}
}
+ /**
+ * Sends a MERGE REGIONS RPC to the specified server to merge the specified
+ * regions.
+ * <p>
+ * A region server could reject the merge request because it does not
+ * have the specified regions.
+ * @param server server to merge regions
+ * @param region_a region to merge
+ * @param region_b region to merge
+ * @param forcible true if do a compulsory merge, otherwise we will only merge
+ * two adjacent regions
+ * @throws IOException
+ */
+ public void sendRegionsMerge(ServerName server, HRegionInfo region_a,
+ HRegionInfo region_b, boolean forcible, final User user) throws IOException {
+ if (server == null)
+ throw new NullPointerException("Passed server is null");
+ if (region_a == null || region_b == null)
+ throw new NullPointerException("Passed region is null");
+ AdminService.BlockingInterface admin = getRsAdmin(server);
+ if (admin == null) {
+ throw new IOException("Attempting to send MERGE REGIONS RPC to server "
+ + server.toString() + " for region "
+ + region_a.getRegionNameAsString() + ","
+ + region_b.getRegionNameAsString()
+ + " failed because no RPC connection found to this server");
+ }
+ HBaseRpcController controller = newRpcController();
+ ProtobufUtil.mergeRegions(controller, admin, region_a, region_b, forcible, user);
+ }
+
@VisibleForTesting
public void moveFromOnlineToDeadServers(final ServerName sn) {
synchronized (onlineServers) {
@@ -660,9 +684,7 @@ public class ServerManager {
}
this.deadservers.add(serverName);
- ProcedureExecutor<MasterProcedureEnv> procExec = this.master.getMasterProcedureExecutor();
- procExec.submitProcedure(new ServerCrashProcedure(
- procExec.getEnvironment(), serverName, shouldSplitWal, false));
+ master.getAssignmentManager().submitServerCrash(serverName, shouldSplitWal);
}
/**
@@ -748,9 +770,8 @@ public class ServerManager {
throw new IOException("Attempting to send OPEN RPC to server " + server.toString() +
" failed because no RPC connection found to this server");
}
- OpenRegionRequest request = RequestConverter.buildOpenRegionRequest(server,
- region, favoredNodes,
- (RecoveryMode.LOG_REPLAY == this.master.getMasterWalManager().getLogRecoveryMode()));
+ OpenRegionRequest request =
+ RequestConverter.buildOpenRegionRequest(server, region, favoredNodes, false);
try {
OpenRegionResponse response = admin.openRegion(null, request);
return ResponseConverter.getRegionOpeningState(response);
@@ -832,8 +853,8 @@ public class ServerManager {
" failed because no RPC connection found to this server");
}
- OpenRegionRequest request = RequestConverter.buildOpenRegionRequest(server, regionOpenInfos,
- (RecoveryMode.LOG_REPLAY == this.master.getMasterWalManager().getLogRecoveryMode()));
+ OpenRegionRequest request =
+ RequestConverter.buildOpenRegionRequest(server, regionOpenInfos, false);
try {
OpenRegionResponse response = admin.openRegion(null, request);
return ResponseConverter.getRegionOpeningStateList(response);
@@ -877,30 +898,6 @@ public class ServerManager {
}
/**
- * Sends an CLOSE RPC to the specified server to close the specified region for SPLIT.
- * <p>
- * A region server could reject the close request because it either does not
- * have the specified region or the region is being split.
- * @param server server to close a region
- * @param regionToClose the info of the region(s) to close
- * @throws IOException
- */
- public boolean sendRegionCloseForSplitOrMerge(
- final ServerName server,
- final HRegionInfo... regionToClose) throws IOException {
- if (server == null) {
- throw new NullPointerException("Passed server is null");
- }
- AdminService.BlockingInterface admin = getRsAdmin(server);
- if (admin == null) {
- throw new IOException("Attempting to send CLOSE For Split or Merge RPC to server " +
- server.toString() + " failed because no RPC connection found to this server.");
- }
- HBaseRpcController controller = newRpcController();
- return ProtobufUtil.closeRegionForSplitOrMerge(controller, admin, server, regionToClose);
- }
-
- /**
* Sends a WARMUP RPC to the specified server to warmup the specified region.
* <p>
* A region server could reject the close request because it either does not
@@ -990,7 +987,7 @@ public class ServerManager {
* @throws IOException
* @throws RetriesExhaustedException wrapping a ConnectException if failed
*/
- private AdminService.BlockingInterface getRsAdmin(final ServerName sn)
+ public AdminService.BlockingInterface getRsAdmin(final ServerName sn)
throws IOException {
AdminService.BlockingInterface admin = this.rsAdmins.get(sn);
if (admin == null) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/5e6ec199/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
index 7017d29..2fc2bbb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
@@ -710,7 +710,7 @@ public class SplitLogManager {
long now = EnvironmentEdgeManager.currentTime();
if (now > lastLog + 5000) {
lastLog = now;
- LOG.info("total tasks = " + tot + " unassigned = " + unassigned + " tasks=" + tasks);
+ LOG.info("total=" + tot + ", unassigned=" + unassigned + ", tasks=" + tasks);
}
}
if (resubmitted > 0) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/5e6ec199/hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableNamespaceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableNamespaceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableNamespaceManager.java
index 7582d42..4a2c942 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableNamespaceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableNamespaceManager.java
@@ -313,8 +313,9 @@ public class TableNamespaceManager {
}
private boolean isTableAssigned() {
- return !masterServices.getAssignmentManager()
- .getRegionStates().getRegionsOfTable(TableName.NAMESPACE_TABLE_NAME).isEmpty();
+ // TODO: we have a better way now (wait on event)
+ return masterServices.getAssignmentManager()
+ .getRegionStates().hasTableRegionStates(TableName.NAMESPACE_TABLE_NAME);
}
public void validateTableAndRegionCount(NamespaceDescriptor desc) throws IOException {
http://git-wip-us.apache.org/repos/asf/hbase/blob/5e6ec199/hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableStateManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableStateManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableStateManager.java
index 96ea036..dfc4321 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableStateManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableStateManager.java
@@ -183,8 +183,9 @@ public class TableStateManager {
@Nullable
protected TableState readMetaState(TableName tableName) throws IOException {
- if (tableName.equals(TableName.META_TABLE_NAME))
+ if (tableName.equals(TableName.META_TABLE_NAME)) {
return new TableState(tableName, TableState.State.ENABLED);
+ }
return MetaTableAccessor.getTableState(master.getConnection(), tableName);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/5e6ec199/hbase-server/src/main/java/org/apache/hadoop/hbase/master/UnAssignCallable.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/UnAssignCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/UnAssignCallable.java
deleted file mode 100644
index ccff6f0..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/UnAssignCallable.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.master;
-
-import java.util.concurrent.Callable;
-
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.HRegionInfo;
-
-/**
- * A callable object that invokes the corresponding action that needs to be
- * taken for unassignment of a region in transition. Implementing as future
- * callable we are able to act on the timeout asynchronously.
- */
-@InterfaceAudience.Private
-public class UnAssignCallable implements Callable<Object> {
- private AssignmentManager assignmentManager;
-
- private HRegionInfo hri;
-
- public UnAssignCallable(AssignmentManager assignmentManager, HRegionInfo hri) {
- this.assignmentManager = assignmentManager;
- this.hri = hri;
- }
-
- @Override
- public Object call() throws Exception {
- assignmentManager.unassign(hri);
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/5e6ec199/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignProcedure.java
new file mode 100644
index 0000000..42ece16
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignProcedure.java
@@ -0,0 +1,338 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.master.assignment;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.RetriesExhaustedException;
+import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
+import org.apache.hadoop.hbase.master.RegionState.State;
+import org.apache.hadoop.hbase.master.assignment.RegionStates.RegionStateNode;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.RegionOpenOperation;
+import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
+import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.AssignRegionStateData;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionTransitionState;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
+
+/**
+ * Procedure that describes the assignment of a single region.
+ * There can only be one RegionTransitionProcedure per region running at a time
+ * since each procedure takes a lock on the region.
+ *
+ * <p>The Assign starts by pushing the "assign" operation to the AssignmentManager
+ * and then goes into a "waiting" state.
+ * The AM will batch the "assign" requests and ask the Balancer where to put
+ * the region (the various policies will be respected: retain, round-robin, random).
+ * Once the AM and the balancer have found a place for the region the procedure
+ * will be resumed and an "open region" request will be placed in the Remote Dispatcher
+ * queue, and the procedure will once again go into a "waiting" state.
+ * The Remote Dispatcher will batch the various requests for that server and
+ * they will be sent to the RS for execution.
+ * The RS will complete the open operation by calling master.reportRegionStateTransition().
+ * The AM will intercept the transition report, and notify the procedure.
+ * The procedure will finish the assignment by publishing the new state to meta
+ * or it will retry the assignment.
+ *
+ * <p>This procedure does not roll back once it is beyond the first
+ * REGION_TRANSITION_QUEUE step; it will press on trying to assign in the face of
+ * failure. Should we ignore rollback calls to Assign/Unassign then? Or just
+ * remove rollback here?
+ */
+@InterfaceAudience.Private
+public class AssignProcedure extends RegionTransitionProcedure {
+ private static final Log LOG = LogFactory.getLog(AssignProcedure.class);
+
+ private boolean forceNewPlan = false;
+
+ /**
+ * Gets set as desired target on move, merge, etc., when we want to go to a particular server.
+ * We may not be able to respect this request but will try. When it is NOT set, then we ask
+ * the balancer to assign. This value is used below in startTransition to set regionLocation if
+ * non-null. Setting regionLocation in the regionNode is how we override the balancer's
+ * choice of destination.
+ */
+ protected volatile ServerName targetServer;
+
+ public AssignProcedure() {
+ // Required by the Procedure framework to create the procedure on replay
+ super();
+ }
+
+ public AssignProcedure(final HRegionInfo regionInfo) {
+ this(regionInfo, false);
+ }
+
+ public AssignProcedure(final HRegionInfo regionInfo, final boolean forceNewPlan) {
+ super(regionInfo);
+ this.forceNewPlan = forceNewPlan;
+ this.targetServer = null;
+ }
+
+ public AssignProcedure(final HRegionInfo regionInfo, final ServerName destinationServer) {
+ super(regionInfo);
+ this.forceNewPlan = false;
+ this.targetServer = destinationServer;
+ }
+
+ @Override
+ public TableOperationType getTableOperationType() {
+ return TableOperationType.REGION_ASSIGN;
+ }
+
+ @Override
+ protected boolean isRollbackSupported(final RegionTransitionState state) {
+ switch (state) {
+ case REGION_TRANSITION_QUEUE:
+ return true;
+ default:
+ return false;
+ }
+ }
+
+ @Override
+ public void serializeStateData(final OutputStream stream) throws IOException {
+ final AssignRegionStateData.Builder state = AssignRegionStateData.newBuilder()
+ .setTransitionState(getTransitionState())
+ .setRegionInfo(HRegionInfo.convert(getRegionInfo()));
+ if (forceNewPlan) {
+ state.setForceNewPlan(true);
+ }
+ if (this.targetServer != null) {
+ state.setTargetServer(ProtobufUtil.toServerName(this.targetServer));
+ }
+ state.build().writeDelimitedTo(stream);
+ }
+
+ @Override
+ public void deserializeStateData(final InputStream stream) throws IOException {
+ final AssignRegionStateData state = AssignRegionStateData.parseDelimitedFrom(stream);
+ setTransitionState(state.getTransitionState());
+ setRegionInfo(HRegionInfo.convert(state.getRegionInfo()));
+ forceNewPlan = state.getForceNewPlan();
+ if (state.hasTargetServer()) {
+ this.targetServer = ProtobufUtil.toServerName(state.getTargetServer());
+ }
+ }
+
+ @Override
+ protected boolean startTransition(final MasterProcedureEnv env, final RegionStateNode regionNode)
+ throws IOException {
+ // If the region is already open we can't do much...
+ if (regionNode.isInState(State.OPEN) && isServerOnline(env, regionNode)) {
+ LOG.info("Assigned, not reassigning; " + this + "; " + regionNode.toShortString());
+ return false;
+ }
+ // If the region is SPLIT, we can't assign it.
+ if (regionNode.isInState(State.SPLIT)) {
+ LOG.info("SPLIT, cannot be assigned; " + this + "; " + regionNode.toShortString());
+ return false;
+ }
+
+ // If we haven't started the operation yet, we can abort
+ if (aborted.get() && regionNode.isInState(State.CLOSED, State.OFFLINE)) {
+ if (incrementAndCheckMaxAttempts(env, regionNode)) {
+ regionNode.setState(State.FAILED_OPEN);
+ setFailure(getClass().getSimpleName(),
+ new RetriesExhaustedException("Max attempts exceeded"));
+ } else {
+ setAbortFailure(getClass().getSimpleName(), "Abort requested");
+ }
+ return false;
+ }
+
+ // Send assign (add into assign-pool). Region is now in OFFLINE state. Setting offline state
+ // scrubs what was the old region location. Setting a new regionLocation here is how we retain
+ // old assignment or specify target server if a move or merge. See
+ // AssignmentManager#processAssignQueue. Otherwise, balancer gives us location.
+ ServerName lastRegionLocation = regionNode.offline();
+ boolean retain = false;
+ if (!forceNewPlan) {
+ if (this.targetServer != null) {
+ retain = targetServer.equals(lastRegionLocation);
+ regionNode.setRegionLocation(targetServer);
+ } else {
+ if (lastRegionLocation != null) {
+ // Try and keep the location we had before we offlined.
+ retain = true;
+ regionNode.setRegionLocation(lastRegionLocation);
+ }
+ }
+ }
+ LOG.info("Start " + this + "; " + regionNode.toShortString() +
+ "; forceNewPlan=" + this.forceNewPlan +
+ ", retain=" + retain);
+ env.getAssignmentManager().queueAssign(regionNode);
+ return true;
+ }
+
+ @Override
+ protected boolean updateTransition(final MasterProcedureEnv env, final RegionStateNode regionNode)
+ throws IOException, ProcedureSuspendedException {
+ // TODO: crash if destinationServer is specified and not online
+ // which is also the case when the balancer provided us with a different location.
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Update " + this + "; " + regionNode.toShortString());
+ }
+ if (regionNode.getRegionLocation() == null) {
+ setTransitionState(RegionTransitionState.REGION_TRANSITION_QUEUE);
+ return true;
+ }
+
+ if (!isServerOnline(env, regionNode)) {
+ // TODO: is this correct? should we wait the chore/ssh?
+ LOG.info("Server not online, re-queuing " + this + "; " + regionNode.toShortString());
+ setTransitionState(RegionTransitionState.REGION_TRANSITION_QUEUE);
+ return true;
+ }
+
+ if (env.getAssignmentManager().waitServerReportEvent(regionNode.getRegionLocation(), this)) {
+ LOG.info("Early suspend! " + this + "; " + regionNode.toShortString());
+ throw new ProcedureSuspendedException();
+ }
+
+ if (regionNode.isInState(State.OPEN)) {
+ LOG.info("Already assigned: " + this + "; " + regionNode.toShortString());
+ return false;
+ }
+
+ // Transition regionNode State. Set it to OPENING. Update hbase:meta, and add
+ // region to list of regions on the target regionserver. Need to UNDO if failure!
+ env.getAssignmentManager().markRegionAsOpening(regionNode);
+
+ // TODO: Requires a migration to be open by the RS?
+ // regionNode.getFormatVersion()
+
+ if (!addToRemoteDispatcher(env, regionNode.getRegionLocation())) {
+ // Failed the dispatch BUT addToRemoteDispatcher internally does
+ // cleanup on failure -- even the undoing of markRegionAsOpening above --
+ // so nothing more to do here; in fact we need to get out of here
+ // fast since we've been put back on the scheduler.
+ }
+
+ // We always return true, even if we fail dispatch because addToRemoteDispatcher
+ // failure processing sets state back to REGION_TRANSITION_QUEUE so we try again;
+ // i.e. return true to keep the Procedure running; it has been reset to start over.
+ return true;
+ }
+
+ @Override
+ protected void finishTransition(final MasterProcedureEnv env, final RegionStateNode regionNode)
+ throws IOException {
+ env.getAssignmentManager().markRegionAsOpened(regionNode);
+ // This success may have been after we failed to open a few times. Be sure to clean up any
+ // failed open references. See #incrementAndCheckMaxAttempts and where it is called.
+ env.getAssignmentManager().getRegionStates().removeFromFailedOpen(regionNode.getRegionInfo());
+ }
+
+ @Override
+ protected void reportTransition(final MasterProcedureEnv env, final RegionStateNode regionNode,
+ final TransitionCode code, final long openSeqNum) throws UnexpectedStateException {
+ switch (code) {
+ case OPENED:
+ if (openSeqNum < 0) {
+ throw new UnexpectedStateException("Received report unexpected " + code +
+ " transition openSeqNum=" + openSeqNum + ", " + regionNode);
+ }
+ if (openSeqNum < regionNode.getOpenSeqNum()) {
+ LOG.warn("Skipping update of open seqnum with " + openSeqNum +
+ " because current seqnum=" + regionNode.getOpenSeqNum());
+ }
+ regionNode.setOpenSeqNum(openSeqNum);
+ // Leave the state here as OPENING for now. We set it to OPEN in
+ // REGION_TRANSITION_FINISH section where we do a bunch of checks.
+ // regionNode.setState(RegionState.State.OPEN, RegionState.State.OPENING);
+ setTransitionState(RegionTransitionState.REGION_TRANSITION_FINISH);
+ break;
+ case FAILED_OPEN:
+ handleFailure(env, regionNode);
+ break;
+ default:
+ throw new UnexpectedStateException("Received report unexpected " + code +
+ " transition openSeqNum=" + openSeqNum + ", " + regionNode.toShortString() +
+ ", " + this + ", expected OPENED or FAILED_OPEN.");
+ }
+ }
+
+ /**
+ * Called when dispatch or a subsequent OPEN request fails. Can be run by the
+ * inline dispatch call or later by the ServerCrashProcedure. Our state is
+ * generally OPENING. Cleanup and reset to OFFLINE and put our Procedure
+ * State back to REGION_TRANSITION_QUEUE so the Assign starts over.
+ */
+ private void handleFailure(final MasterProcedureEnv env, final RegionStateNode regionNode) {
+ if (incrementAndCheckMaxAttempts(env, regionNode)) {
+ aborted.set(true);
+ }
+ this.forceNewPlan = true;
+ this.targetServer = null;
+ regionNode.offline();
+ // We were moved to OPENING state before dispatch. Undo. It is safe to call
+ // this method because it checks for OPENING first.
+ env.getAssignmentManager().undoRegionAsOpening(regionNode);
+ setTransitionState(RegionTransitionState.REGION_TRANSITION_QUEUE);
+ }
+
+ private boolean incrementAndCheckMaxAttempts(final MasterProcedureEnv env,
+ final RegionStateNode regionNode) {
+ final int retries = env.getAssignmentManager().getRegionStates().
+ addToFailedOpen(regionNode).incrementAndGetRetries();
+ int max = env.getAssignmentManager().getAssignMaxAttempts();
+ LOG.info("Retry=" + retries + " of max=" + max + "; " +
+ this + "; " + regionNode.toShortString());
+ return retries >= max;
+ }
+
+ @Override
+ public RemoteOperation remoteCallBuild(final MasterProcedureEnv env, final ServerName serverName) {
+ assert serverName.equals(getRegionState(env).getRegionLocation());
+ return new RegionOpenOperation(this, getRegionInfo(),
+ env.getAssignmentManager().getFavoredNodes(getRegionInfo()), false);
+ }
+
+ @Override
+ protected void remoteCallFailed(final MasterProcedureEnv env, final RegionStateNode regionNode,
+ final IOException exception) {
+ handleFailure(env, regionNode);
+ }
+
+ @Override
+ public void toStringClassDetails(StringBuilder sb) {
+ super.toStringClassDetails(sb);
+ if (this.targetServer != null) sb.append(", target=").append(this.targetServer);
+ }
+
+ @Override
+ public ServerName getServer(final MasterProcedureEnv env) {
+ RegionStateNode node =
+ env.getAssignmentManager().getRegionStates().getRegionNode(this.getRegionInfo());
+ if (node == null) return null;
+ return node.getRegionLocation();
+ }
+}
\ No newline at end of file