You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2013/05/15 06:25:58 UTC
svn commit: r1482676 [3/5] - in /hbase/branches/0.95:
hbase-client/src/main/java/org/apache/hadoop/hbase/
hbase-client/src/main/java/org/apache/hadoop/hbase/client/
hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/
hbase-client/src/main/ja...
Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1482676&r1=1482675&r2=1482676&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Wed May 15 04:25:57 2013
@@ -95,6 +95,7 @@ import org.apache.hadoop.hbase.exception
import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
import org.apache.hadoop.hbase.exceptions.NoSuchColumnFamilyException;
import org.apache.hadoop.hbase.exceptions.NotServingRegionException;
+import org.apache.hadoop.hbase.exceptions.RegionInRecoveryException;
import org.apache.hadoop.hbase.exceptions.RegionTooBusyException;
import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
import org.apache.hadoop.hbase.exceptions.UnknownScannerException;
@@ -200,6 +201,16 @@ public class HRegion implements HeapSize
protected long completeSequenceId = -1L;
+ /**
+ * Operation enum is used in {@link HRegion#startRegionOperation} to provide operation context,
+ * so that startRegionOperation can run operation-specific checks before a region operation
+ * starts. Not every operation has to be defined here; a constant is only needed when a special
+ * check is needed for it in startRegionOperation.
+ */
+ protected enum Operation {
+ ANY, GET, PUT, DELETE, SCAN, APPEND, INCREMENT, SPLIT_REGION, MERGE_REGION
+ }
+
//////////////////////////////////////////////////////////////////////////////
// Members
//////////////////////////////////////////////////////////////////////////////
@@ -281,6 +292,11 @@ public class HRegion implements HeapSize
private final AtomicInteger minorInProgress = new AtomicInteger(0);
/**
+ * Min sequence id stored in store files of a region when opening the region
+ */
+ private long minSeqIdForLogReplay = -1;
+
+ /**
* @return The smallest mvcc readPoint across all the scanners in this
* region. Writes older than this readPoint, are included in every
* read operation.
@@ -641,6 +657,9 @@ public class HRegion implements HeapSize
long storeSeqIdForReplay = store.getMaxSequenceId(false);
maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(),
storeSeqIdForReplay);
+ if (this.minSeqIdForLogReplay == -1 || storeSeqIdForReplay < this.minSeqIdForLogReplay) {
+ this.minSeqIdForLogReplay = storeSeqIdForReplay;
+ }
// Include bulk loaded files when determining seqIdForAssignment
long storeSeqIdForAssignment = store.getMaxSequenceId(true);
if (maxSeqId == -1 || storeSeqIdForAssignment > maxSeqId) {
@@ -825,6 +844,21 @@ public class HRegion implements HeapSize
return this.closing.get();
}
+ /**
+ * Stores the given recovering flag on the region info of this region.
+ * @param newState value passed on to the region info's recovering flag
+ */
+ public void setRecovering(boolean newState) {
+ getRegionInfo().setRecovering(newState);
+ }
+
+ /**
+ * @return the recovering flag currently reported by the region info of this region
+ */
+ public boolean isRecovering() {
+ return getRegionInfo().isRecovering();
+ }
+
/** @return true if region is available (not closed and not closing) */
public boolean isAvailable() {
return !isClosed() && !isClosing();
@@ -1652,7 +1686,7 @@ public class HRegion implements HeapSize
// look across all the HStores for this region and determine what the
// closest key is across all column families, since the data may be sparse
checkRow(row, "getClosestRowBefore");
- startRegionOperation();
+ startRegionOperation(Operation.GET);
this.readRequestsCount.increment();
try {
Store store = getStore(family);
@@ -1698,7 +1732,7 @@ public class HRegion implements HeapSize
protected RegionScanner getScanner(Scan scan,
List<KeyValueScanner> additionalScanners) throws IOException {
- startRegionOperation();
+ startRegionOperation(Operation.SCAN);
try {
// Verify families are all valid
prepareScanner(scan);
@@ -1749,7 +1783,7 @@ public class HRegion implements HeapSize
throws IOException {
checkReadOnly();
checkResources();
- startRegionOperation();
+ startRegionOperation(Operation.DELETE);
this.writeRequestsCount.increment();
try {
delete.getRow();
@@ -1848,7 +1882,7 @@ public class HRegion implements HeapSize
// read lock, resources may run out. For now, the thought is that this
// will be extremely rare; we'll deal with it when it happens.
checkResources();
- startRegionOperation();
+ startRegionOperation(Operation.PUT);
this.writeRequestsCount.increment();
try {
// All edits for the given row (across all column families) must happen atomically.
@@ -1906,13 +1940,29 @@ public class HRegion implements HeapSize
*/
public OperationStatus[] batchMutate(
Pair<Mutation, Integer>[] mutationsAndLocks) throws IOException {
+ return batchMutate(mutationsAndLocks, false);
+ }
+
+ /**
+ * Perform a batch of mutations.
+ * It supports only Put and Delete mutations and will ignore other types passed.
+ * @param mutationsAndLocks the list of mutations paired with their requested lock IDs.
+ * @param isReplay true for replayed edits: skips the read-only check and pre-mutation hook
+ * @return an array of OperationStatus which internally contains the
+ * OperationStatusCode and the exceptionMessage if any.
+ * @throws IOException
+ */
+ OperationStatus[] batchMutate(Pair<Mutation, Integer>[] mutationsAndLocks, boolean isReplay)
+ throws IOException {
BatchOperationInProgress<Pair<Mutation, Integer>> batchOp =
new BatchOperationInProgress<Pair<Mutation,Integer>>(mutationsAndLocks);
boolean initialized = false;
while (!batchOp.isDone()) {
- checkReadOnly();
+ if (!isReplay) {
+ checkReadOnly();
+ }
checkResources();
long newSize;
@@ -1920,11 +1970,13 @@ public class HRegion implements HeapSize
try {
if (!initialized) {
- this.writeRequestsCount.increment();
- doPreMutationHook(batchOp);
+ if (!isReplay) {
+ this.writeRequestsCount.increment();
+ doPreMutationHook(batchOp);
+ }
initialized = true;
}
- long addedSize = doMiniBatchMutation(batchOp);
+ long addedSize = doMiniBatchMutation(batchOp, isReplay);
newSize = this.addAndGetGlobalMemstoreSize(addedSize);
} finally {
closeRegionOperation();
@@ -1935,6 +1987,7 @@ public class HRegion implements HeapSize
}
return batchOp.retCodeDetails;
}
+
private void doPreMutationHook(BatchOperationInProgress<Pair<Mutation, Integer>> batchOp)
throws IOException {
@@ -1971,10 +2024,9 @@ public class HRegion implements HeapSize
}
}
-
@SuppressWarnings("unchecked")
- private long doMiniBatchMutation(
- BatchOperationInProgress<Pair<Mutation, Integer>> batchOp) throws IOException {
+ private long doMiniBatchMutation(BatchOperationInProgress<Pair<Mutation, Integer>> batchOp,
+ boolean isInReplay) throws IOException {
// variable to note if all Put items are for the same CF -- metrics related
boolean putsCfSetConsistent = true;
@@ -1985,7 +2037,7 @@ public class HRegion implements HeapSize
//The set of columnFamilies first seen for Delete.
Set<byte[]> deletesCfSet = null;
- WALEdit walEdit = new WALEdit();
+ WALEdit walEdit = new WALEdit(isInReplay);
MultiVersionConsistencyControl.WriteEntry w = null;
long txid = 0;
boolean walSyncSuccessful = false;
@@ -2027,7 +2079,11 @@ public class HRegion implements HeapSize
try {
if (isPutMutation) {
// Check the families in the put. If bad, skip this one.
- checkFamilies(familyMap.keySet());
+ if (isInReplay) {
+ removeNonExistentColumnFamilyForReplay(familyMap);
+ } else {
+ checkFamilies(familyMap.keySet());
+ }
checkTimestamps(mutation.getFamilyMap(), now);
} else {
prepareDelete((Delete) mutation);
@@ -2086,7 +2142,7 @@ public class HRegion implements HeapSize
}
}
}
-
+
// we should record the timestamp only after we have acquired the rowLock,
// otherwise, newer puts/deletes are not guaranteed to have a newer timestamp
now = EnvironmentEdgeManager.currentTimeMillis();
@@ -2125,9 +2181,9 @@ public class HRegion implements HeapSize
w = mvcc.beginMemstoreInsert();
// calling the pre CP hook for batch mutation
- if (coprocessorHost != null) {
- MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp =
- new MiniBatchOperationInProgress<Pair<Mutation, Integer>>(batchOp.operations,
+ if (!isInReplay && coprocessorHost != null) {
+ MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp =
+ new MiniBatchOperationInProgress<Pair<Mutation, Integer>>(batchOp.operations,
batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
if (coprocessorHost.preBatchMutate(miniBatchOp)) return 0L;
}
@@ -2212,9 +2268,9 @@ public class HRegion implements HeapSize
}
walSyncSuccessful = true;
// calling the post CP hook for batch mutation
- if (coprocessorHost != null) {
- MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp =
- new MiniBatchOperationInProgress<Pair<Mutation, Integer>>(batchOp.operations,
+ if (!isInReplay && coprocessorHost != null) {
+ MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp =
+ new MiniBatchOperationInProgress<Pair<Mutation, Integer>>(batchOp.operations,
batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
coprocessorHost.postBatchMutate(miniBatchOp);
}
@@ -2231,7 +2287,7 @@ public class HRegion implements HeapSize
// STEP 9. Run coprocessor post hooks. This should be done after the wal is
// synced so that the coprocessor contract is adhered to.
// ------------------------------------
- if (coprocessorHost != null) {
+ if (!isInReplay && coprocessorHost != null) {
for (int i = firstIndex; i < lastIndexExclusive; i++) {
// only for successful puts
if (batchOp.retCodeDetails[i].getOperationStatusCode()
@@ -2670,6 +2726,30 @@ public class HRegion implements HeapSize
}
}
+ /**
+ * During replay, there could exist column families which are removed between region server
+ * failure and replay. Those families are logged and dropped from {@code familyMap} in place.
+ */
+ private void removeNonExistentColumnFamilyForReplay(
+ final Map<byte[], List<? extends Cell>> familyMap) {
+ List<byte[]> nonExistentList = null; // collected first: keySet() must not change mid-iteration
+ for (byte[] family : familyMap.keySet()) {
+ if (!this.htableDescriptor.hasFamily(family)) {
+ if (nonExistentList == null) {
+ nonExistentList = new ArrayList<byte[]>();
+ }
+ nonExistentList.add(family);
+ }
+ }
+ if (nonExistentList != null) {
+ for (byte[] family : nonExistentList) {
+ // Perhaps schema was changed between crash and replay
+ LOG.info("No family for " + Bytes.toString(family) + " omit from replay.");
+ familyMap.remove(family);
+ }
+ }
+ }
+
void checkTimestamps(final Map<byte[], List<? extends Cell>> familyMap,
long now) throws FailedSanityCheckException {
if (timestampSlop == HConstants.LATEST_TIMESTAMP) {
@@ -3524,7 +3604,7 @@ public class HRegion implements HeapSize
"after we renewed it. Could be caused by a very slow scanner " +
"or a lengthy garbage collection");
}
- startRegionOperation();
+ startRegionOperation(Operation.SCAN);
readRequestsCount.increment();
try {
@@ -4695,7 +4775,7 @@ public class HRegion implements HeapSize
checkReadOnly();
// Lock row
- startRegionOperation();
+ startRegionOperation(Operation.APPEND);
this.writeRequestsCount.increment();
WriteEntry w = null;
try {
@@ -4863,7 +4943,7 @@ public class HRegion implements HeapSize
checkReadOnly();
// Lock row
- startRegionOperation();
+ startRegionOperation(Operation.INCREMENT);
this.writeRequestsCount.increment();
WriteEntry w = null;
try {
@@ -5000,7 +5080,7 @@ public class HRegion implements HeapSize
ClassSize.OBJECT +
ClassSize.ARRAY +
38 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
- (11 * Bytes.SIZEOF_LONG) +
+ (12 * Bytes.SIZEOF_LONG) +
Bytes.SIZEOF_BOOLEAN);
public static final long DEEP_OVERHEAD = FIXED_OVERHEAD +
@@ -5280,6 +5360,37 @@ public class HRegion implements HeapSize
*/
public void startRegionOperation()
throws NotServingRegionException, RegionTooBusyException, InterruptedIOException {
+ startRegionOperation(Operation.ANY);
+ }
+
+ /**
+ * @param op The operation is about to be taken on the region
+ * @throws NotServingRegionException
+ * @throws RegionTooBusyException
+ * @throws InterruptedIOException
+ */
+ protected void startRegionOperation(Operation op) throws NotServingRegionException,
+ RegionTooBusyException, InterruptedIOException {
+ switch (op) {
+ case INCREMENT:
+ case APPEND:
+ case GET:
+ case SCAN:
+ case SPLIT_REGION:
+ case MERGE_REGION:
+ // when a region is in recovering state, no read, split or merge is allowed
+ if (this.isRecovering()) {
+ throw new RegionInRecoveryException(this.getRegionNameAsString()
+ + " is recovering");
+ }
+ break;
+ default:
+ break;
+ }
+ if (op == Operation.MERGE_REGION || op == Operation.SPLIT_REGION) {
+ // split or merge region doesn't need to check the closing/closed state or lock the region
+ return;
+ }
if (this.closing.get()) {
throw new NotServingRegionException(getRegionNameAsString() + " is closing");
}
@@ -5494,6 +5605,14 @@ public class HRegion implements HeapSize
}
/**
+ * Gets the smallest of the per-store max sequence ids recorded when this region was opened, or
+ * -1 if none was recorded. WAL Edits with smaller sequence number will be skipped from replay.
+ */
+ public long getMinSeqIdForLogReplay() {
+ return this.minSeqIdForLogReplay;
+ }
+
+ /**
* @return if a given region is in compaction now.
*/
public CompactionState getCompactionState() {
Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1482676&r1=1482675&r2=1482676&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Wed May 15 04:25:57 2013
@@ -92,6 +92,7 @@ import org.apache.hadoop.hbase.exception
import org.apache.hadoop.hbase.exceptions.NotServingRegionException;
import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
import org.apache.hadoop.hbase.exceptions.RegionAlreadyInTransitionException;
+import org.apache.hadoop.hbase.exceptions.RegionInRecoveryException;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
import org.apache.hadoop.hbase.exceptions.RegionServerRunningException;
@@ -112,6 +113,7 @@ import org.apache.hadoop.hbase.ipc.Paylo
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
+import org.apache.hadoop.hbase.master.SplitLogManager;
import org.apache.hadoop.hbase.master.TableLockManager;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
@@ -178,6 +180,7 @@ import org.apache.hadoop.hbase.protobuf.
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
+import org.apache.hadoop.hbase.regionserver.HRegion.Operation;
import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
@@ -204,6 +207,7 @@ import org.apache.hadoop.hbase.util.Vers
import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
import org.apache.hadoop.hbase.zookeeper.MetaRegionTracker;
+import org.apache.hadoop.hbase.zookeeper.RecoveringRegionWatcher;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
@@ -214,6 +218,7 @@ import org.apache.hadoop.net.DNS;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
import org.cliffc.high_scale_lib.Counter;
import com.google.protobuf.BlockingRpcChannel;
@@ -258,6 +263,10 @@ public class HRegionServer implements Cl
// catalog tracker
protected CatalogTracker catalogTracker;
+ // Watch if a region is out of recovering state from ZooKeeper
+ @SuppressWarnings("unused")
+ private RecoveringRegionWatcher recoveringRegionWatcher;
+
/**
* Go here to get table descriptors.
*/
@@ -291,6 +300,13 @@ public class HRegionServer implements Cl
*/
protected final Map<String, InetSocketAddress[]> regionFavoredNodesMap =
new ConcurrentHashMap<String, InetSocketAddress[]>();
+
+ /**
+ * Regions currently in recovering state, keyed by encoded name. Such a region accepts writes
+ * (edits from the previous failed region server) but not reads, and is also an online region.
+ */
+ protected final Map<String, HRegion> recoveringRegions = Collections
+ .synchronizedMap(new HashMap<String, HRegion>());
// Leases
protected Leases leases;
@@ -456,6 +472,9 @@ public class HRegionServer implements Cl
/** Handle all the snapshot requests to this server */
RegionServerSnapshotManager snapshotManager;
+
+ // configuration flag: whether WAL edits are replayed directly to another RS
+ private final boolean distributedLogReplay;
// Table level lock manager for locking for region operations
private TableLockManager tableLockManager;
@@ -547,6 +566,9 @@ public class HRegionServer implements Cl
}
};
this.rsHost = new RegionServerCoprocessorHost(this, this.conf);
+
+ this.distributedLogReplay = this.conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY,
+ HConstants.DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG);
}
/**
@@ -671,6 +693,11 @@ public class HRegionServer implements Cl
}
this.tableLockManager = TableLockManager.createTableLockManager(conf, zooKeeper,
new ServerName(isa.getHostName(), isa.getPort(), startcode));
+
+ // register watcher for recovering regions
+ if(this.distributedLogReplay) {
+ this.recoveringRegionWatcher = new RecoveringRegionWatcher(this.zooKeeper, this);
+ }
}
/**
@@ -1515,8 +1542,7 @@ public class HRegionServer implements Cl
this.rpcServer.start();
// Create the log splitting worker and start it
- this.splitLogWorker = new SplitLogWorker(this.zooKeeper,
- this.getConfiguration(), this.getServerName(), this);
+ this.splitLogWorker = new SplitLogWorker(this.zooKeeper, this.getConfiguration(), this, this);
splitLogWorker.start();
}
@@ -1641,6 +1667,10 @@ public class HRegionServer implements Cl
LOG.error("No sequence number found when opening " + r.getRegionNameAsString());
openSeqNum = 0;
}
+
+ // Update flushed sequence id of a recovering region in ZK
+ updateRecoveringRegionLastFlushedSequenceId(r);
+
// Update ZK, or META
if (r.getRegionInfo().isMetaRegion()) {
MetaRegionTracker.setMetaLocation(getZooKeeper(),
@@ -1884,14 +1914,13 @@ public class HRegionServer implements Cl
public long getLastSequenceId(byte[] region) {
Long lastFlushedSequenceId = -1l;
try {
- GetLastFlushedSequenceIdRequest req =
- RequestConverter.buildGetLastFlushedSequenceIdRequest(region);
+ GetLastFlushedSequenceIdRequest req = RequestConverter
+ .buildGetLastFlushedSequenceIdRequest(region);
lastFlushedSequenceId = rssStub.getLastFlushedSequenceId(null, req)
- .getLastFlushedSequenceId();
+ .getLastFlushedSequenceId();
} catch (ServiceException e) {
lastFlushedSequenceId = -1l;
- LOG.warn("Unable to connect to the master to check " +
- "the last flushed sequence id", e);
+ LOG.warn("Unable to connect to the master to check " + "the last flushed sequence id", e);
}
return lastFlushedSequenceId;
}
@@ -1965,6 +1994,10 @@ public class HRegionServer implements Cl
return this.stopping;
}
+ public Map<String, HRegion> getRecoveringRegions() {
+ return recoveringRegions; // the live synchronized map itself, not a snapshot
+ }
+
/**
*
* @return the configuration
@@ -2651,10 +2684,12 @@ public class HRegionServer implements Cl
try {
requestCount.increment();
HRegion region = getRegion(request.getRegion());
+
GetResponse.Builder builder = GetResponse.newBuilder();
ClientProtos.Get get = request.getGet();
Boolean existence = null;
Result r = null;
+
if (request.getClosestRowBefore()) {
if (get.getColumnCount() != 1) {
throw new DoNotRetryIOException(
@@ -3006,7 +3041,7 @@ public class HRegionServer implements Cl
}
List<KeyValue> values = new ArrayList<KeyValue>();
MultiVersionConsistencyControl.setThreadReadPoint(scanner.getMvccReadPoint());
- region.startRegionOperation();
+ region.startRegionOperation(Operation.SCAN);
try {
int i = 0;
synchronized(scanner) {
@@ -3450,6 +3485,10 @@ public class HRegionServer implements Cl
removeFromMovedRegions(region.getEncodedName());
if (previous == null) {
+ // check if the region to be opened is marked in recovering state in ZK
+ if (isRegionMarkedRecoveringInZK(region.getEncodedName())) {
+ this.recoveringRegions.put(region.getEncodedName(), null);
+ }
// If there is no action in progress, we can submit a specific handler.
// Need to pass the expected version in the constructor.
if (region.isMetaRegion()) {
@@ -3465,6 +3504,9 @@ public class HRegionServer implements Cl
builder.addOpeningState(RegionOpeningState.OPENED);
+ } catch (KeeperException zooKeeperEx) {
+ LOG.error("Can't retrieve recovering state from zookeeper", zooKeeperEx);
+ throw new ServiceException(zooKeeperEx);
} catch (IOException ie) {
LOG.warn("Failed opening region " + region.getRegionNameAsString(), ie);
if (isBulkAssign) {
@@ -3589,6 +3631,7 @@ public class HRegionServer implements Cl
checkOpen();
requestCount.increment();
HRegion region = getRegion(request.getRegion());
+ region.startRegionOperation(Operation.SPLIT_REGION);
LOG.info("Splitting " + region.getRegionNameAsString());
region.flushcache();
byte[] splitPoint = null;
@@ -3621,6 +3664,8 @@ public class HRegionServer implements Cl
HRegion regionA = getRegion(request.getRegionA());
HRegion regionB = getRegion(request.getRegionB());
boolean forcible = request.getForcible();
+ regionA.startRegionOperation(Operation.MERGE_REGION);
+ regionB.startRegionOperation(Operation.MERGE_REGION);
LOG.info("Receiving merging request for " + regionA + ", " + regionB
+ ",forcible=" + forcible);
regionA.flushcache();
@@ -3714,8 +3759,57 @@ public class HRegionServer implements Cl
}
/**
+ * Replay the given changes when distributedLogReplay WAL edits from a failed RS. The guarantee is
+ * that the given mutations will be durable on the receiving RS if this method returns without any
+ * exception. Only PUT and DELETE mutations are accepted; any other action fails the request.
+ * @param rpcc the RPC controller; may be null, in which case no cell scanner is used
+ * @param request the request
+ * @return the response assembled in the builder handed to doBatchOp
+ * @throws ServiceException wrapping any IOException raised while validating or replaying
+ */
+ @Override
+ @QosPriority(priority = HConstants.REPLAY_QOS)
+ public MultiResponse replay(final RpcController rpcc, final MultiRequest request)
+ throws ServiceException {
+ long before = EnvironmentEdgeManager.currentTimeMillis();
+ PayloadCarryingRpcController controller = (PayloadCarryingRpcController) rpcc;
+ CellScanner cellScanner = null;
+ if (controller != null) {
+ cellScanner = controller.cellScanner();
+ // Clear scanner so we are not holding on to reference across call.
+ controller.setCellScanner(null);
+ }
+ try {
+ checkOpen();
+ HRegion region = getRegion(request.getRegion());
+ MultiResponse.Builder builder = MultiResponse.newBuilder();
+ List<MutationProto> mutates = new ArrayList<MutationProto>();
+ // Validate every action first: a single bad action rejects the request before any replay.
+ for (ClientProtos.MultiAction actionUnion : request.getActionList()) {
+ if (!actionUnion.hasMutation()) {
+ LOG.warn("Error: invalid action: " + actionUnion + ". " + "it must be a Mutation.");
+ throw new DoNotRetryIOException("Invalid action, " + "it must be a Mutation.");
+ }
+ MutationProto mutate = actionUnion.getMutation();
+ MutationType type = mutate.getMutateType();
+ if (type != MutationType.PUT && type != MutationType.DELETE) {
+ throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
+ }
+ mutates.add(mutate);
+ }
+ if (!mutates.isEmpty()) {
+ doBatchOp(builder, region, mutates, cellScanner, true);
+ }
+ return builder.build();
+ } catch (IOException ie) {
+ throw new ServiceException(ie);
+ } finally {
+ metricsRegionServer.updateReplay(EnvironmentEdgeManager.currentTimeMillis() - before);
+ }
+ }
+
+ /**
* Roll the WAL writer of the region server.
- *
* @param controller the RPC controller
* @param request the request
* @throws ServiceException
@@ -3843,13 +3937,21 @@ public class HRegionServer implements Cl
/**
* Execute a list of Put/Delete mutations.
+ */
+ protected void doBatchOp(final MultiResponse.Builder builder,
+ final HRegion region, final List<MutationProto> mutates, final CellScanner cells) {
+ doBatchOp(builder, region, mutates, cells, false); // false: not a replay of WAL edits
+ }
+
+ /**
+ * Execute a list of Put/Delete mutations.
*
* @param builder
* @param region
* @param mutations
*/
protected void doBatchOp(final MultiResponse.Builder builder, final HRegion region,
- final List<MutationProto> mutations, final CellScanner cells) {
+ final List<MutationProto> mutations, final CellScanner cells, boolean isReplay) {
@SuppressWarnings("unchecked")
Pair<Mutation, Integer>[] mutationsWithLocks = new Pair[mutations.size()];
long before = EnvironmentEdgeManager.currentTimeMillis();
@@ -3877,7 +3979,7 @@ public class HRegionServer implements Cl
cacheFlusher.reclaimMemStoreMemory();
}
- OperationStatus codes[] = region.batchMutate(mutationsWithLocks);
+ OperationStatus codes[] = region.batchMutate(mutationsWithLocks, isReplay);
for (i = 0; i < codes.length; i++) {
switch (codes[i].getOperationStatusCode()) {
case BAD_FAMILY:
@@ -4097,4 +4199,91 @@ public class HRegionServer implements Cl
public CompactSplitThread getCompactSplitThread() {
return this.compactSplitThread;
}
+
+ /**
+ * Tells whether the region is marked as recovering in ZooKeeper, that is whether reading the
+ * znode /hbase/recovering-regions/[region encoded name] yields data. The read goes through
+ * ZKUtil.getDataAndWatch, which is expected to leave a watcher on that znode as well.
+ * @param regionEncodedName region encode name
+ * @return true when data was read for the region's znode under recoveringRegionsZNode
+ * @throws KeeperException if the ZooKeeper read fails
+ */
+ private boolean isRegionMarkedRecoveringInZK(String regionEncodedName) throws KeeperException {
+ String znode =
+ ZKUtil.joinZNode(this.zooKeeper.recoveringRegionsZNode, regionEncodedName);
+
+ // A null payload is taken to mean that the region carries no recovering marker;
+ // anything else counts as "marked recovering".
+ byte[] payload = ZKUtil.getDataAndWatch(this.zooKeeper, znode);
+
+ return payload != null;
+ }
+
+ /**
+ * A helper function to store the last flushed sequence Id with the previous failed RS for a
+ * recovering region. The Id is used to skip wal edits which are flushed. Since the flushed
+ * sequence id is only valid for each RS, we associate the Id with corresponding failed RS.
+ * Nothing happens unless the region is recovering. The region-level znode is only rewritten
+ * when it holds no data or a smaller id; the last failed RS's znode, if found, is always rewritten.
+ * @param r region whose min sequence id for log replay is recorded in ZooKeeper
+ * @throws KeeperException if a ZooKeeper operation fails
+ * @throws IOException declared by the signature; the throwing call is not evident from here
+ */
+ private void updateRecoveringRegionLastFlushedSequenceId(HRegion r) throws KeeperException,
+ IOException {
+ if (!r.isRecovering()) {
+ // non-recovering regions have nothing to record
+ return;
+ }
+
+ String encodedName = r.getRegionInfo().getEncodedName();
+ ZooKeeperWatcher watcher = getZooKeeper();
+ String failedServer = this.getLastFailedRSFromZK(encodedName);
+ long replaySeqId = r.getMinSeqIdForLogReplay();
+ String regionZNode = ZKUtil.joinZNode(this.zooKeeper.recoveringRegionsZNode, encodedName);
+ // recovering-region level
+ byte[] recorded = ZKUtil.getData(watcher, regionZNode);
+ boolean stale = recorded == null
+ || SplitLogManager.parseLastFlushedSequenceIdFrom(recorded) < replaySeqId;
+ if (stale) {
+ ZKUtil.setData(watcher, regionZNode, ZKUtil.positionToByteArray(replaySeqId));
+ }
+ if (failedServer == null) {
+ LOG.warn("Can't find failed region server for recovering region " + encodedName);
+ return;
+ }
+ // one level deeper for failed RS
+ String failedServerZNode = ZKUtil.joinZNode(regionZNode, failedServer);
+ ZKUtil.setData(watcher, failedServerZNode, ZKUtil.positionToByteArray(replaySeqId));
+ LOG.debug("Update last flushed sequence id of region " + encodedName + " for "
+ + failedServer);
+ }
+
+ /**
+ * Return the last failed RS name under /hbase/recovering-regions/encodedRegionName, picked as
+ * the child znode with the largest creation zxid.
+ * @param encodedRegionName encoded name of the recovering region
+ * @return name of that child znode, or null when no child qualifies (e.g. there are none)
+ * @throws KeeperException if a ZooKeeper operation fails
+ */
+ private String getLastFailedRSFromZK(String encodedRegionName) throws KeeperException {
+ ZooKeeperWatcher watcher = this.getZooKeeper();
+ String regionZNode = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, encodedRegionName);
+ List<String> candidates = ZKUtil.listChildrenNoWatch(watcher, regionZNode);
+ if (candidates == null) {
+ return null;
+ }
+ String latest = null;
+ long latestCzxid = 0;
+ for (String candidate : candidates) {
+ Stat stat = new Stat();
+ ZKUtil.getDataNoWatch(watcher, ZKUtil.joinZNode(regionZNode, candidate), stat);
+ long czxid = stat.getCzxid();
+ if (czxid > latestCzxid) {
+ latestCzxid = czxid;
+ latest = candidate;
+ }
+ }
+ return latest;
+ }
}
Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LastSequenceId.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LastSequenceId.java?rev=1482676&r1=1482675&r2=1482676&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LastSequenceId.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LastSequenceId.java Wed May 15 04:25:57 2013
@@ -26,8 +26,8 @@ import org.apache.hadoop.classification.
@InterfaceAudience.Private
public interface LastSequenceId {
/**
- * @param regionname
- * @return Last flushed sequence Id for regionname
+ * @param regionName Encoded region name
+ * @return Last flushed sequence Id for regionName or -1 if it can't be determined
*/
- public long getLastSequenceId(byte[] regionname);
+ public long getLastSequenceId(byte[] regionName);
}
Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java?rev=1482676&r1=1482675&r2=1482676&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java Wed May 15 04:25:57 2013
@@ -87,4 +87,8 @@ public class MetricsRegionServer {
}
serverSource.updateAppend(t);
}
+
+ public void updateReplay(long t) {
+ serverSource.updateReplay(t); // handed to the metrics source unchanged
+ }
}
Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java?rev=1482676&r1=1482675&r2=1482676&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java Wed May 15 04:25:57 2013
@@ -19,6 +19,7 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
+import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -108,4 +109,9 @@ public interface RegionServerServices ex
* @return The RegionServer's CatalogTracker
*/
public CatalogTracker getCatalogTracker();
+
+ /**
+ * @return map of recovering regions on the hosting region server, keyed by encoded region name
+ */
+ public Map<String, HRegion> getRecoveringRegions();
}
Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java?rev=1482676&r1=1482675&r2=1482676&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java Wed May 15 04:25:57 2013
@@ -20,7 +20,11 @@ package org.apache.hadoop.hbase.regionse
import java.io.IOException;
import java.io.InterruptedIOException;
+import java.net.ConnectException;
+import java.net.SocketTimeoutException;
+import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
@@ -29,10 +33,12 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SplitLogCounters;
import org.apache.hadoop.hbase.SplitLogTask;
+import org.apache.hadoop.hbase.client.RetriesExhaustedException;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.exceptions.NotServingRegionException;
import org.apache.hadoop.hbase.master.SplitLogManager;
import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
import org.apache.hadoop.hbase.util.CancelableProgressable;
@@ -70,6 +76,7 @@ import org.apache.zookeeper.data.Stat;
@InterfaceAudience.Private
public class SplitLogWorker extends ZooKeeperListener implements Runnable {
private static final Log LOG = LogFactory.getLog(SplitLogWorker.class);
+ private static final int checkInterval = 5000; // 5 seconds
Thread worker;
private final ServerName serverName;
@@ -83,20 +90,30 @@ public class SplitLogWorker extends ZooK
private final Object grabTaskLock = new Object();
private boolean workerInGrabTask = false;
private final int report_period;
+ private RegionServerServices server = null;
public SplitLogWorker(ZooKeeperWatcher watcher, Configuration conf,
- ServerName serverName, TaskExecutor splitTaskExecutor) {
+ RegionServerServices server, TaskExecutor splitTaskExecutor) {
+ super(watcher);
+ this.server = server;
+ this.serverName = server.getServerName();
+ this.splitTaskExecutor = splitTaskExecutor;
+ report_period = conf.getInt("hbase.splitlog.report.period",
+ conf.getInt("hbase.splitlog.manager.timeout", SplitLogManager.DEFAULT_TIMEOUT) / 3);
+ }
+
+ public SplitLogWorker(ZooKeeperWatcher watcher, Configuration conf, ServerName serverName,
+ TaskExecutor splitTaskExecutor) {
super(watcher);
this.serverName = serverName;
this.splitTaskExecutor = splitTaskExecutor;
report_period = conf.getInt("hbase.splitlog.report.period",
- conf.getInt("hbase.splitlog.manager.timeout",
- SplitLogManager.DEFAULT_TIMEOUT) / 2);
+ conf.getInt("hbase.splitlog.manager.timeout", SplitLogManager.DEFAULT_TIMEOUT) / 3);
}
- public SplitLogWorker(ZooKeeperWatcher watcher, final Configuration conf,
- final ServerName serverName, final LastSequenceId sequenceIdChecker) {
- this(watcher, conf, serverName, new TaskExecutor () {
+ public SplitLogWorker(final ZooKeeperWatcher watcher, final Configuration conf,
+ RegionServerServices server, final LastSequenceId sequenceIdChecker) {
+ this(watcher, conf, server, new TaskExecutor() {
@Override
public Status exec(String filename, CancelableProgressable p) {
Path rootdir;
@@ -113,7 +130,7 @@ public class SplitLogWorker extends ZooK
// encountered a bad non-retry-able persistent error.
try {
if (!HLogSplitter.splitLogFile(rootdir, fs.getFileStatus(new Path(rootdir, filename)),
- fs, conf, p, sequenceIdChecker)) {
+ fs, conf, p, sequenceIdChecker, watcher)) {
return Status.PREEMPTED;
}
} catch (InterruptedIOException iioe) {
@@ -121,9 +138,18 @@ public class SplitLogWorker extends ZooK
return Status.RESIGNED;
} catch (IOException e) {
Throwable cause = e.getCause();
- if (cause instanceof InterruptedException) {
+ if (e instanceof RetriesExhaustedException && (cause instanceof NotServingRegionException
+ || cause instanceof ConnectException
+ || cause instanceof SocketTimeoutException)) {
+ LOG.warn("log replaying of " + filename + " can't connect to the target regionserver, "
+ + "resigning", e);
+ return Status.RESIGNED;
+ } else if (cause instanceof InterruptedException) {
LOG.warn("log splitting of " + filename + " interrupted, resigning", e);
return Status.RESIGNED;
+ } else if(cause instanceof KeeperException) {
+ LOG.warn("log splitting of " + filename + " hit ZooKeeper issue, resigning", e);
+ return Status.RESIGNED;
}
LOG.warn("log splitting of " + filename + " failed, returning error", e);
return Status.ERR;
@@ -204,7 +230,39 @@ public class SplitLogWorker extends ZooK
synchronized (taskReadyLock) {
while (seq_start == taskReadySeq) {
try {
- taskReadyLock.wait();
+ taskReadyLock.wait(checkInterval);
+ if (this.server != null) {
+ // check to see if we have stale recovering regions in our internal memory state
+ Map<String, HRegion> recoveringRegions = this.server.getRecoveringRegions();
+ if (!recoveringRegions.isEmpty()) {
+ // Make a local copy to prevent ConcurrentModificationException when other threads
+ // modify recoveringRegions
+ List<String> tmpCopy = new ArrayList<String>(recoveringRegions.keySet());
+ for (String region : tmpCopy) {
+ String nodePath = ZKUtil.joinZNode(this.watcher.recoveringRegionsZNode, region);
+ try {
+ if (ZKUtil.checkExists(this.watcher, nodePath) == -1) {
+ HRegion r = recoveringRegions.remove(region);
+ if (r != null) {
+ r.setRecovering(false);
+ }
+ LOG.debug("Mark recovering region:" + region + " up.");
+ } else {
+ // This check is a defensive (or redundant) mechanism that keeps stale recovering
+ // regions out of our internal RS memory state when zookeeper (the source of truth)
+ // says otherwise. We stop at the first region whose znode still exists: stale
+ // entries should not occur at all in the normal case, so checking the first one
+ // is good enough.
+ break;
+ }
+ } catch (KeeperException e) {
+ // ignore zookeeper error
+ LOG.debug("Got a zookeeper when trying to open a recovering region", e);
+ break;
+ }
+ }
+ }
+ }
} catch (InterruptedException e) {
LOG.info("SplitLogWorker interrupted while waiting for task," +
" exiting: " + e.toString() + (exitWorker ? "" :
@@ -214,6 +272,7 @@ public class SplitLogWorker extends ZooK
}
}
}
+
}
}
@@ -463,9 +522,6 @@ public class SplitLogWorker extends ZooK
}
}
-
-
-
@Override
public void nodeDataChanged(String path) {
// there will be a self generated dataChanged event every time attemptToOwnTask()
@@ -510,7 +566,6 @@ public class SplitLogWorker extends ZooK
return childrenPaths;
}
-
@Override
public void nodeChildrenChanged(String path) {
if(path.equals(watcher.splitLogZNode)) {
Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java?rev=1482676&r1=1482675&r2=1482676&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java Wed May 15 04:25:57 2013
@@ -19,6 +19,7 @@
package org.apache.hadoop.hbase.regionserver.handler;
import java.io.IOException;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
@@ -137,6 +138,16 @@ public class OpenRegionHandler extends E
if (region == null) {
return;
}
+
+ // check whether the current region needs to be put in the recovering state
+ region.setRecovering(false);
+ Map<String, HRegion> recoveringRegions = this.rsServices.getRecoveringRegions();
+ if (recoveringRegions != null && !recoveringRegions.isEmpty()
+ && recoveringRegions.containsKey(region.getRegionInfo().getEncodedName())) {
+ region.setRecovering(true);
+ recoveringRegions.put(region.getRegionInfo().getEncodedName(), region);
+ }
+
boolean failed = true;
if (tickleOpening("post_region_open")) {
if (updateMeta(region)) {
Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java?rev=1482676&r1=1482675&r2=1482676&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java Wed May 15 04:25:57 2013
@@ -1200,6 +1200,10 @@ class FSHLog implements HLog, Syncable {
long now = EnvironmentEdgeManager.currentTimeMillis();
// coprocessor hook:
if (!coprocessorHost.preWALWrite(info, logKey, logEdit)) {
+ if (logEdit.isReplay()) {
+ // set replication scope to null so that this edit won't be replicated
+ logKey.setScopes(null);
+ }
// write to our buffer for the Hlog file.
logSyncer.append(new FSHLog.Entry(logKey, logEdit));
}