You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zj...@apache.org on 2013/03/24 11:26:22 UTC
svn commit: r1460306 [5/5] - in /hbase/trunk:
hbase-client/src/main/java/org/apache/hadoop/hbase/
hbase-client/src/main/java/org/apache/hadoop/hbase/catalog/
hbase-client/src/main/java/org/apache/hadoop/hbase/client/
hbase-client/src/main/java/org/apac...
Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransaction.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransaction.java?rev=1460306&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransaction.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransaction.java Sun Mar 24 10:26:21 2013
@@ -0,0 +1,770 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.RegionTransition;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.catalog.MetaEditor;
+import org.apache.hadoop.hbase.catalog.MetaReader;
+import org.apache.hadoop.hbase.executor.EventType;
+import org.apache.hadoop.hbase.regionserver.SplitTransaction.LoggingProgressable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.zookeeper.ZKAssign;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
+
+/**
+ * Executes region merge as a "transaction". It is similar with
+ * SplitTransaction. Call {@link #prepare(RegionServerServices)} to setup the
+ * transaction, {@link #execute(Server, RegionServerServices)} to run the
+ * transaction and {@link #rollback(Server, RegionServerServices)} to cleanup if
+ * execute fails.
+ *
+ * <p>
+ * Here is an example of how you would use this class:
+ *
+ * <pre>
+ * RegionMergeTransaction mt = new RegionMergeTransaction(this.conf, parent, midKey)
+ * if (!mt.prepare(services)) return;
+ * try {
+ * mt.execute(server, services);
+ * } catch (IOException ioe) {
+ * try {
+ * mt.rollback(server, services);
+ * return;
+ * } catch (RuntimeException e) {
+ * myAbortable.abort("Failed merge, abort");
+ * }
+ * }
+ * </Pre>
+ * <p>
+ * This class is not thread safe. Caller needs ensure merge is run by one thread
+ * only.
+ */
+@InterfaceAudience.Private
+public class RegionMergeTransaction {
+ private static final Log LOG = LogFactory.getLog(RegionMergeTransaction.class);
+
+ // Merged region info
+ private HRegionInfo mergedRegionInfo;
+ // region_a sorts before region_b
+ private final HRegion region_a;
+ private final HRegion region_b;
+ // merges dir is under region_a
+ private final Path mergesdir;
+ private int znodeVersion = -1;
+ // We only merge adjacent regions if forcible is false
+ private final boolean forcible;
+
+ /**
+ * Types to add to the transaction journal. Each enum is a step in the merge
+ * transaction. Used to figure how much we need to rollback.
+ */
+ enum JournalEntry {
+ /**
+ * Set region as in transition, set it into MERGING state.
+ */
+ SET_MERGING_IN_ZK,
+ /**
+ * We created the temporary merge data directory.
+ */
+ CREATED_MERGE_DIR,
+ /**
+ * Closed the merging region A.
+ */
+ CLOSED_REGION_A,
+ /**
+ * The merging region A has been taken out of the server's online regions list.
+ */
+ OFFLINED_REGION_A,
+ /**
+ * Closed the merging region B.
+ */
+ CLOSED_REGION_B,
+ /**
+ * The merging region B has been taken out of the server's online regions list.
+ */
+ OFFLINED_REGION_B,
+ /**
+ * Started in on creation of the merged region.
+ */
+ STARTED_MERGED_REGION_CREATION,
+ /**
+ * Point of no return. If we got here, then transaction is not recoverable
+ * other than by crashing out the regionserver.
+ */
+ PONR
+ }
+
+ /*
+ * Journal of how far the merge transaction has progressed.
+ */
+ private final List<JournalEntry> journal = new ArrayList<JournalEntry>();
+
+ private static IOException closedByOtherException = new IOException(
+ "Failed to close region: already closed by another thread");
+
+ /**
+ * Constructor
+ * @param a region a to merge
+ * @param b region b to merge
+ * @param forcible if false, we will only merge adjacent regions
+ */
+ public RegionMergeTransaction(final HRegion a, final HRegion b,
+ final boolean forcible) {
+ if (a.getRegionInfo().compareTo(b.getRegionInfo()) <= 0) {
+ this.region_a = a;
+ this.region_b = b;
+ } else {
+ this.region_a = b;
+ this.region_b = a;
+ }
+ this.forcible = forcible;
+ this.mergesdir = region_a.getRegionFileSystem().getMergesDir();
+ }
+
+ /**
+ * Does checks on merge inputs.
+ * @param services
+ * @return <code>true</code> if the regions are mergeable else
+ * <code>false</code> if they are not (e.g. its already closed, etc.).
+ */
+ public boolean prepare(final RegionServerServices services) {
+ if (!region_a.getTableDesc().getNameAsString()
+ .equals(region_b.getTableDesc().getNameAsString())) {
+ LOG.info("Can't merge regions " + region_a + "," + region_b
+ + " because they do not belong to the same table");
+ return false;
+ }
+ if (region_a.getRegionInfo().equals(region_b.getRegionInfo())) {
+ LOG.info("Can't merge the same region " + region_a);
+ return false;
+ }
+ if (!forcible && !HRegionInfo.areAdjacent(region_a.getRegionInfo(),
+ region_b.getRegionInfo())) {
+ String msg = "Skip merging " + this.region_a.getRegionNameAsString()
+ + " and " + this.region_b.getRegionNameAsString()
+ + ", because they are not adjacent.";
+ LOG.info(msg);
+ return false;
+ }
+ if (!this.region_a.isMergeable() || !this.region_b.isMergeable()) {
+ return false;
+ }
+ try {
+ boolean regionAHasMergeQualifier = hasMergeQualifierInMeta(services,
+ region_a.getRegionName());
+ if (regionAHasMergeQualifier ||
+ hasMergeQualifierInMeta(services, region_b.getRegionName())) {
+ LOG.debug("Region " + (regionAHasMergeQualifier ? region_a.getRegionNameAsString()
+ : region_b.getRegionNameAsString())
+ + " is not mergeable because it has merge qualifier in META");
+ return false;
+ }
+ } catch (IOException e) {
+ LOG.warn("Failed judging whether merge transaction is available for "
+ + region_a.getRegionNameAsString() + " and "
+ + region_b.getRegionNameAsString(), e);
+ return false;
+ }
+
+ // WARN: make sure there is no parent region of the two merging regions in
+ // .META. If exists, fixing up daughters would cause daughter regions(we
+ // have merged one) online again when we restart master, so we should clear
+ // the parent region to prevent the above case
+ // Since HBASE-7721, we don't need fix up daughters any more. so here do
+ // nothing
+
+ this.mergedRegionInfo = getMergedRegionInfo(region_a.getRegionInfo(),
+ region_b.getRegionInfo());
+ return true;
+ }
+
+ /**
+ * Run the transaction.
+ * @param server Hosting server instance. Can be null when testing (won't try
+ * and update in zk if a null server)
+ * @param services Used to online/offline regions.
+ * @throws IOException If thrown, transaction failed. Call
+ * {@link #rollback(Server, RegionServerServices)}
+ * @return merged region
+ * @throws IOException
+ * @see #rollback(Server, RegionServerServices)
+ */
+ public HRegion execute(final Server server,
+ final RegionServerServices services) throws IOException {
+ HRegion mergedRegion = createMergedRegion(server, services);
+ openMergedRegion(server, services, mergedRegion);
+ transitionZKNode(server, services);
+ return mergedRegion;
+ }
+
+ /**
+ * Prepare the merged region and region files.
+ * @param server Hosting server instance. Can be null when testing (won't try
+ * and update in zk if a null server)
+ * @param services Used to online/offline regions.
+ * @return merged region
+ * @throws IOException If thrown, transaction failed. Call
+ * {@link #rollback(Server, RegionServerServices)}
+ */
+ HRegion createMergedRegion(final Server server,
+ final RegionServerServices services) throws IOException {
+ LOG.info("Starting merge of " + region_a + " and "
+ + region_b.getRegionNameAsString() + ", forcible=" + forcible);
+ if ((server != null && server.isStopped())
+ || (services != null && services.isStopping())) {
+ throw new IOException("Server is stopped or stopping");
+ }
+
+ // If true, no cluster to write meta edits to or to update znodes in.
+ boolean testing = server == null ? true : server.getConfiguration()
+ .getBoolean("hbase.testing.nocluster", false);
+
+ // Set ephemeral MERGING znode up in zk. Mocked servers sometimes don't
+ // have zookeeper so don't do zk stuff if server or zookeeper is null
+ if (server != null && server.getZooKeeper() != null) {
+ try {
+ createNodeMerging(server.getZooKeeper(), this.mergedRegionInfo,
+ server.getServerName());
+ } catch (KeeperException e) {
+ throw new IOException("Failed creating MERGING znode on "
+ + this.mergedRegionInfo.getRegionNameAsString(), e);
+ }
+ }
+ this.journal.add(JournalEntry.SET_MERGING_IN_ZK);
+ if (server != null && server.getZooKeeper() != null) {
+ try {
+ // Transition node from MERGING to MERGING after creating the merge
+ // node. Master will get the callback for node change only if the
+ // transition is successful.
+ // Note that if the transition fails then the rollback will delete the
+ // created znode as the journal entry SET_MERGING_IN_ZK is added.
+ this.znodeVersion = transitionNodeMerging(server.getZooKeeper(),
+ this.mergedRegionInfo, server.getServerName(), -1);
+ } catch (KeeperException e) {
+ throw new IOException("Failed setting MERGING znode on "
+ + this.mergedRegionInfo.getRegionNameAsString(), e);
+ }
+ }
+
+ this.region_a.getRegionFileSystem().createMergesDir();
+ this.journal.add(JournalEntry.CREATED_MERGE_DIR);
+
+ Map<byte[], List<StoreFile>> hstoreFilesOfRegionA = closeAndOfflineRegion(
+ services, this.region_a, true, testing);
+ Map<byte[], List<StoreFile>> hstoreFilesOfRegionB = closeAndOfflineRegion(
+ services, this.region_b, false, testing);
+
+ assert hstoreFilesOfRegionA != null && hstoreFilesOfRegionB != null;
+
+
+ //
+ // mergeStoreFiles creates merged region dirs under the region_a merges dir
+ // Nothing to unroll here if failure -- clean up of CREATE_MERGE_DIR will
+ // clean this up.
+ mergeStoreFiles(hstoreFilesOfRegionA, hstoreFilesOfRegionB);
+
+ // Log to the journal that we are creating merged region. We could fail
+ // halfway through. If we do, we could have left
+ // stuff in fs that needs cleanup -- a storefile or two. Thats why we
+ // add entry to journal BEFORE rather than AFTER the change.
+ this.journal.add(JournalEntry.STARTED_MERGED_REGION_CREATION);
+ HRegion mergedRegion = createMergedRegionFromMerges(this.region_a,
+ this.region_b, this.mergedRegionInfo);
+
+
+ // This is the point of no return. Similar with SplitTransaction.
+ // IF we reach the PONR then subsequent failures need to crash out this
+ // regionserver
+ this.journal.add(JournalEntry.PONR);
+
+ // Add merged region and delete region_a and region_b
+ // as an atomic update. See HBASE-7721. This update to META makes the region
+ // will determine whether the region is merged or not in case of failures.
+ // If it is successful, master will roll-forward, if not, master will
+ // rollback
+ if (!testing) {
+ MetaEditor.mergeRegions(server.getCatalogTracker(),
+ mergedRegion.getRegionInfo(), region_a.getRegionInfo(),
+ region_b.getRegionInfo(), server.getServerName());
+ }
+ return mergedRegion;
+ }
+
+ /**
+ * Create a merged region from the merges directory under region a. In order
+ * to mock it for tests, place it with a new method.
+ * @param a hri of region a
+ * @param b hri of region b
+ * @param mergedRegion hri of merged region
+ * @return merged HRegion.
+ * @throws IOException
+ */
+ HRegion createMergedRegionFromMerges(final HRegion a, final HRegion b,
+ final HRegionInfo mergedRegion) throws IOException {
+ return a.createMergedRegionFromMerges(mergedRegion, b);
+ }
+
+ /**
+ * Close the merging region and offline it in regionserver
+ * @param services
+ * @param region
+ * @param isRegionA true if it is merging region a, false if it is region b
+ * @param testing true if it is testing
+ * @return a map of family name to list of store files
+ * @throws IOException
+ */
+ private Map<byte[], List<StoreFile>> closeAndOfflineRegion(
+ final RegionServerServices services, final HRegion region,
+ final boolean isRegionA, final boolean testing) throws IOException {
+ Map<byte[], List<StoreFile>> hstoreFilesToMerge = null;
+ Exception exceptionToThrow = null;
+ try {
+ hstoreFilesToMerge = region.close(false);
+ } catch (Exception e) {
+ exceptionToThrow = e;
+ }
+ if (exceptionToThrow == null && hstoreFilesToMerge == null) {
+ // The region was closed by a concurrent thread. We can't continue
+ // with the merge, instead we must just abandon the merge. If we
+ // reopen or merge this could cause problems because the region has
+ // probably already been moved to a different server, or is in the
+ // process of moving to a different server.
+ exceptionToThrow = closedByOtherException;
+ }
+ if (exceptionToThrow != closedByOtherException) {
+ this.journal.add(isRegionA ? JournalEntry.CLOSED_REGION_A
+ : JournalEntry.CLOSED_REGION_B);
+ }
+ if (exceptionToThrow != null) {
+ if (exceptionToThrow instanceof IOException)
+ throw (IOException) exceptionToThrow;
+ throw new IOException(exceptionToThrow);
+ }
+
+ if (!testing) {
+ services.removeFromOnlineRegions(region, null);
+ }
+ this.journal.add(isRegionA ? JournalEntry.OFFLINED_REGION_A
+ : JournalEntry.OFFLINED_REGION_B);
+ return hstoreFilesToMerge;
+ }
+
+ /**
+ * Get merged region info through the specified two regions
+ * @param a merging region A
+ * @param b merging region B
+ * @return the merged region info
+ */
+ public static HRegionInfo getMergedRegionInfo(final HRegionInfo a,
+ final HRegionInfo b) {
+ long rid = EnvironmentEdgeManager.currentTimeMillis();
+ // Regionid is timestamp. Merged region's id can't be less than that of
+ // merging regions else will insert at wrong location in .META.
+ if (rid < a.getRegionId() || rid < b.getRegionId()) {
+ LOG.warn("Clock skew; merging regions id are " + a.getRegionId()
+ + " and " + b.getRegionId() + ", but current time here is " + rid);
+ rid = Math.max(a.getRegionId(), b.getRegionId()) + 1;
+ }
+
+ byte[] startKey = null;
+ byte[] endKey = null;
+ if (a.compareTo(b) <= 0) {
+ startKey = a.getStartKey();
+ endKey = b.getEndKey();
+ } else {
+ startKey = b.getStartKey();
+ endKey = a.getEndKey();
+ }
+
+ // Merged region is sorted between two merging regions in META
+ HRegionInfo mergedRegionInfo = new HRegionInfo(a.getTableName(), startKey,
+ endKey, false, rid);
+ return mergedRegionInfo;
+ }
+
+ /**
+ * Perform time consuming opening of the merged region.
+ * @param server Hosting server instance. Can be null when testing (won't try
+ * and update in zk if a null server)
+ * @param services Used to online/offline regions.
+ * @param merged the merged region
+ * @throws IOException If thrown, transaction failed. Call
+ * {@link #rollback(Server, RegionServerServices)}
+ */
+ void openMergedRegion(final Server server,
+ final RegionServerServices services, HRegion merged) throws IOException {
+ boolean stopped = server != null && server.isStopped();
+ boolean stopping = services != null && services.isStopping();
+ if (stopped || stopping) {
+ LOG.info("Not opening merged region " + merged.getRegionNameAsString()
+ + " because stopping=" + stopping + ", stopped=" + stopped);
+ return;
+ }
+ HRegionInfo hri = merged.getRegionInfo();
+ LoggingProgressable reporter = server == null ? null
+ : new LoggingProgressable(hri, server.getConfiguration().getLong(
+ "hbase.regionserver.regionmerge.open.log.interval", 10000));
+ merged.openHRegion(reporter);
+
+ if (services != null) {
+ try {
+ services.postOpenDeployTasks(merged, server.getCatalogTracker());
+ services.addToOnlineRegions(merged);
+ } catch (KeeperException ke) {
+ throw new IOException(ke);
+ }
+ }
+
+ }
+
+ /**
+ * Finish off merge transaction, transition the zknode
+ * @param server Hosting server instance. Can be null when testing (won't try
+ * and update in zk if a null server)
+ * @param services Used to online/offline regions.
+ * @throws IOException If thrown, transaction failed. Call
+ * {@link #rollback(Server, RegionServerServices)}
+ */
+ void transitionZKNode(final Server server, final RegionServerServices services)
+ throws IOException {
+ if (server == null || server.getZooKeeper() == null) {
+ return;
+ }
+
+ // Tell master about merge by updating zk. If we fail, abort.
+ try {
+ this.znodeVersion = transitionNodeMerge(server.getZooKeeper(),
+ this.mergedRegionInfo, region_a.getRegionInfo(),
+ region_b.getRegionInfo(), server.getServerName(), this.znodeVersion);
+
+ long startTime = EnvironmentEdgeManager.currentTimeMillis();
+ int spins = 0;
+ // Now wait for the master to process the merge. We know it's done
+ // when the znode is deleted. The reason we keep tickling the znode is
+ // that it's possible for the master to miss an event.
+ do {
+ if (spins % 10 == 0) {
+ LOG.debug("Still waiting on the master to process the merge for "
+ + this.mergedRegionInfo.getEncodedName() + ", waited "
+ + (EnvironmentEdgeManager.currentTimeMillis() - startTime) + "ms");
+ }
+ Thread.sleep(100);
+ // When this returns -1 it means the znode doesn't exist
+ this.znodeVersion = tickleNodeMerge(server.getZooKeeper(),
+ this.mergedRegionInfo, region_a.getRegionInfo(),
+ region_b.getRegionInfo(), server.getServerName(), this.znodeVersion);
+ spins++;
+ } while (this.znodeVersion != -1 && !server.isStopped()
+ && !services.isStopping());
+ } catch (Exception e) {
+ if (e instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ }
+ throw new IOException("Failed telling master about merge "
+ + mergedRegionInfo.getEncodedName(), e);
+ }
+
+
+ // Leaving here, the mergedir with its dross will be in place but since the
+ // merge was successful, just leave it; it'll be cleaned when region_a is
+ // cleaned up by CatalogJanitor on master
+ }
+
+ /**
+ * Create reference file(s) of merging regions under the region_a merges dir
+ * @param hstoreFilesOfRegionA
+ * @param hstoreFilesOfRegionB
+ * @throws IOException
+ */
+ private void mergeStoreFiles(
+ Map<byte[], List<StoreFile>> hstoreFilesOfRegionA,
+ Map<byte[], List<StoreFile>> hstoreFilesOfRegionB)
+ throws IOException {
+ // Create reference file(s) of region A in mergdir
+ HRegionFileSystem fs_a = this.region_a.getRegionFileSystem();
+ for (Map.Entry<byte[], List<StoreFile>> entry : hstoreFilesOfRegionA
+ .entrySet()) {
+ String familyName = Bytes.toString(entry.getKey());
+ for (StoreFile storeFile : entry.getValue()) {
+ fs_a.mergeStoreFile(this.mergedRegionInfo, familyName, storeFile,
+ this.mergesdir);
+ }
+ }
+ // Create reference file(s) of region B in mergedir
+ HRegionFileSystem fs_b = this.region_b.getRegionFileSystem();
+ for (Map.Entry<byte[], List<StoreFile>> entry : hstoreFilesOfRegionB
+ .entrySet()) {
+ String familyName = Bytes.toString(entry.getKey());
+ for (StoreFile storeFile : entry.getValue()) {
+ fs_b.mergeStoreFile(this.mergedRegionInfo, familyName, storeFile,
+ this.mergesdir);
+ }
+ }
+ }
+
+ /**
+ * @param server Hosting server instance (May be null when testing).
+ * @param services Services of regionserver, used to online regions.
+ * @throws IOException If thrown, rollback failed. Take drastic action.
+ * @return True if we successfully rolled back, false if we got to the point
+ * of no return and so now need to abort the server to minimize
+ * damage.
+ */
+ public boolean rollback(final Server server,
+ final RegionServerServices services) throws IOException {
+ assert this.mergedRegionInfo != null;
+ boolean result = true;
+ ListIterator<JournalEntry> iterator = this.journal
+ .listIterator(this.journal.size());
+ // Iterate in reverse.
+ while (iterator.hasPrevious()) {
+ JournalEntry je = iterator.previous();
+ switch (je) {
+
+ case SET_MERGING_IN_ZK:
+ if (server != null && server.getZooKeeper() != null) {
+ cleanZK(server, this.mergedRegionInfo);
+ }
+ break;
+
+ case CREATED_MERGE_DIR:
+ this.region_a.writestate.writesEnabled = true;
+ this.region_b.writestate.writesEnabled = true;
+ this.region_a.getRegionFileSystem().cleanupMergesDir();
+ break;
+
+ case CLOSED_REGION_A:
+ try {
+ // So, this returns a seqid but if we just closed and then reopened,
+ // we should be ok. On close, we flushed using sequenceid obtained
+ // from hosting regionserver so no need to propagate the sequenceid
+ // returned out of initialize below up into regionserver as we
+ // normally do.
+ this.region_a.initialize();
+ } catch (IOException e) {
+ LOG.error("Failed rollbacking CLOSED_REGION_A of region "
+ + this.region_a.getRegionNameAsString(), e);
+ throw new RuntimeException(e);
+ }
+ break;
+
+ case OFFLINED_REGION_A:
+ if (services != null)
+ services.addToOnlineRegions(this.region_a);
+ break;
+
+ case CLOSED_REGION_B:
+ try {
+ this.region_b.initialize();
+ } catch (IOException e) {
+ LOG.error("Failed rollbacking CLOSED_REGION_A of region "
+ + this.region_b.getRegionNameAsString(), e);
+ throw new RuntimeException(e);
+ }
+ break;
+
+ case OFFLINED_REGION_B:
+ if (services != null)
+ services.addToOnlineRegions(this.region_b);
+ break;
+
+ case STARTED_MERGED_REGION_CREATION:
+ this.region_a.getRegionFileSystem().cleanupMergedRegion(
+ this.mergedRegionInfo);
+ break;
+
+ case PONR:
+ // We got to the point-of-no-return so we need to just abort. Return
+ // immediately. Do not clean up created merged regions.
+ return false;
+
+ default:
+ throw new RuntimeException("Unhandled journal entry: " + je);
+ }
+ }
+ return result;
+ }
+
+ HRegionInfo getMergedRegionInfo() {
+ return this.mergedRegionInfo;
+ }
+
+ // For unit testing.
+ Path getMergesDir() {
+ return this.mergesdir;
+ }
+
+ private static void cleanZK(final Server server, final HRegionInfo hri) {
+ try {
+ // Only delete if its in expected state; could have been hijacked.
+ ZKAssign.deleteNode(server.getZooKeeper(), hri.getEncodedName(),
+ EventType.RS_ZK_REGION_MERGING);
+ } catch (KeeperException.NoNodeException e) {
+ LOG.warn("Failed cleanup zk node of " + hri.getRegionNameAsString(), e);
+ } catch (KeeperException e) {
+ server.abort("Failed cleanup zk node of " + hri.getRegionNameAsString(),e);
+ }
+
+ }
+
+ /**
+ * Creates a new ephemeral node in the MERGING state for the merged region.
+ * Create it ephemeral in case regionserver dies mid-merge.
+ *
+ * <p>
+ * Does not transition nodes from other states. If a node already exists for
+ * this region, a {@link NodeExistsException} will be thrown.
+ *
+ * @param zkw zk reference
+ * @param region region to be created as offline
+ * @param serverName server event originates from
+ * @return Version of znode created.
+ * @throws KeeperException
+ * @throws IOException
+ */
+ int createNodeMerging(final ZooKeeperWatcher zkw, final HRegionInfo region,
+ final ServerName serverName) throws KeeperException, IOException {
+ LOG.debug(zkw.prefix("Creating ephemeral node for "
+ + region.getEncodedName() + " in MERGING state"));
+ RegionTransition rt = RegionTransition.createRegionTransition(
+ EventType.RS_ZK_REGION_MERGING, region.getRegionName(), serverName);
+ String node = ZKAssign.getNodeName(zkw, region.getEncodedName());
+ if (!ZKUtil.createEphemeralNodeAndWatch(zkw, node, rt.toByteArray())) {
+ throw new IOException("Failed create of ephemeral " + node);
+ }
+ // Transition node from MERGING to MERGING and pick up version so we
+ // can be sure this znode is ours; version is needed deleting.
+ return transitionNodeMerging(zkw, region, serverName, -1);
+ }
+
+ /**
+ * Transitions an existing node for the specified region which is currently in
+ * the MERGING state to be in the MERGE state. Converts the ephemeral MERGING
+ * znode to an ephemeral MERGE node. Master cleans up MERGE znode when it
+ * reads it (or if we crash, zk will clean it up).
+ *
+ * <p>
+ * Does not transition nodes from other states. If for some reason the node
+ * could not be transitioned, the method returns -1. If the transition is
+ * successful, the version of the node after transition is returned.
+ *
+ * <p>
+ * This method can fail and return false for three different reasons:
+ * <ul>
+ * <li>Node for this region does not exist</li>
+ * <li>Node for this region is not in MERGING state</li>
+ * <li>After verifying MERGING state, update fails because of wrong version
+ * (this should never actually happen since an RS only does this transition
+ * following a transition to MERGING. if two RS are conflicting, one would
+ * fail the original transition to MERGING and not this transition)</li>
+ * </ul>
+ *
+ * <p>
+ * Does not set any watches.
+ *
+ * <p>
+ * This method should only be used by a RegionServer when completing the open
+ * of merged region.
+ *
+ * @param zkw zk reference
+ * @param merged region to be transitioned to opened
+ * @param a merging region A
+ * @param b merging region B
+ * @param serverName server event originates from
+ * @param znodeVersion expected version of data before modification
+ * @return version of node after transition, -1 if unsuccessful transition
+ * @throws KeeperException if unexpected zookeeper exception
+ * @throws IOException
+ */
+ private static int transitionNodeMerge(ZooKeeperWatcher zkw,
+ HRegionInfo merged, HRegionInfo a, HRegionInfo b, ServerName serverName,
+ final int znodeVersion) throws KeeperException, IOException {
+ byte[] payload = HRegionInfo.toDelimitedByteArray(merged, a, b);
+ return ZKAssign.transitionNode(zkw, merged, serverName,
+ EventType.RS_ZK_REGION_MERGING, EventType.RS_ZK_REGION_MERGE,
+ znodeVersion, payload);
+ }
+
+ /**
+ *
+ * @param zkw zk reference
+ * @param parent region to be transitioned to merging
+ * @param serverName server event originates from
+ * @param version znode version
+ * @return version of node after transition, -1 if unsuccessful transition
+ * @throws KeeperException
+ * @throws IOException
+ */
+ int transitionNodeMerging(final ZooKeeperWatcher zkw,
+ final HRegionInfo parent, final ServerName serverName, final int version)
+ throws KeeperException, IOException {
+ return ZKAssign.transitionNode(zkw, parent, serverName,
+ EventType.RS_ZK_REGION_MERGING, EventType.RS_ZK_REGION_MERGING,
+ version);
+ }
+
+ private static int tickleNodeMerge(ZooKeeperWatcher zkw, HRegionInfo merged,
+ HRegionInfo a, HRegionInfo b, ServerName serverName,
+ final int znodeVersion) throws KeeperException, IOException {
+ byte[] payload = HRegionInfo.toDelimitedByteArray(a, b);
+ return ZKAssign.transitionNode(zkw, merged, serverName,
+ EventType.RS_ZK_REGION_MERGE, EventType.RS_ZK_REGION_MERGE,
+ znodeVersion, payload);
+ }
+
+ /**
+ * Checks if the given region has merge qualifier in .META.
+ * @param services
+ * @param regionName name of specified region
+ * @return true if the given region has merge qualifier in META.(It will be
+ * cleaned by CatalogJanitor)
+ * @throws IOException
+ */
+ boolean hasMergeQualifierInMeta(final RegionServerServices services,
+ final byte[] regionName) throws IOException {
+ // Get merge regions if it is a merged region and already has merge
+ // qualifier
+ Pair<HRegionInfo, HRegionInfo> mergeRegions = MetaReader
+ .getRegionsFromMergeQualifier(services.getCatalogTracker(), regionName);
+ if (mergeRegions != null &&
+ (mergeRegions.getFirst() != null || mergeRegions.getSecond() != null)) {
+ // It has merge qualifier
+ return true;
+ }
+ return false;
+ }
+}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java?rev=1460306&r1=1460305&r2=1460306&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java Sun Mar 24 10:26:21 2013
@@ -102,4 +102,9 @@ public interface RegionServerServices ex
* @return hbase executor service
*/
public ExecutorService getExecutorService();
+
+ /**
+ * @return The RegionServer's CatalogTracker
+ */
+ public CatalogTracker getCatalogTracker();
}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java?rev=1460306&r1=1460305&r2=1460306&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java Sun Mar 24 10:26:21 2013
@@ -514,8 +514,9 @@ public class SplitTransaction {
void openDaughterRegion(final Server server, final HRegion daughter)
throws IOException, KeeperException {
HRegionInfo hri = daughter.getRegionInfo();
- LoggingProgressable reporter = server == null? null:
- new LoggingProgressable(hri, server.getConfiguration());
+ LoggingProgressable reporter = server == null ? null
+ : new LoggingProgressable(hri, server.getConfiguration().getLong(
+ "hbase.regionserver.split.daughter.open.log.interval", 10000));
daughter.openHRegion(reporter);
}
@@ -524,10 +525,9 @@ public class SplitTransaction {
private long lastLog = -1;
private final long interval;
- LoggingProgressable(final HRegionInfo hri, final Configuration c) {
+ LoggingProgressable(final HRegionInfo hri, final long interval) {
this.hri = hri;
- this.interval = c.getLong("hbase.regionserver.split.daughter.open.log.interval",
- 10000);
+ this.interval = interval;
}
@Override
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java?rev=1460306&r1=1460305&r2=1460306&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java Sun Mar 24 10:26:21 2013
@@ -1169,6 +1169,23 @@ public class HBaseTestingUtility extends
}
/**
+ * Create a table.
+ * @param tableName
+ * @param family
+ * @param splitRows
+ * @return An HTable instance for the created table.
+ * @throws IOException
+ */
+ public HTable createTable(byte[] tableName, byte[] family, byte[][] splitRows)
+ throws IOException {
+ HTableDescriptor desc = new HTableDescriptor(tableName);
+ HColumnDescriptor hcd = new HColumnDescriptor(family);
+ desc.addFamily(hcd);
+ getHBaseAdmin().createTable(desc, splitRows);
+ return new HTable(getConfiguration(), tableName);
+ }
+
+ /**
* Drop an existing table
* @param tableName existing table
*/
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java?rev=1460306&r1=1460305&r2=1460306&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java Sun Mar 24 10:26:21 2013
@@ -29,13 +29,13 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.exceptions.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.client.AdminProtocol;
import org.apache.hadoop.hbase.client.ClientProtocol;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.exceptions.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.master.TableLockManager.NullTableLockManager;
@@ -54,6 +54,8 @@ import org.apache.hadoop.hbase.protobuf.
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileResponse;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.MergeRegionsRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.MergeRegionsResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
@@ -454,6 +456,13 @@ class MockRegionServer implements AdminP
}
@Override
+ public MergeRegionsResponse mergeRegions(RpcController controller,
+ MergeRegionsRequest request) throws ServiceException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
public CompactRegionResponse compactRegion(RpcController controller,
CompactRegionRequest request) throws ServiceException {
// TODO Auto-generated method stub
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java?rev=1460306&r1=1460305&r2=1460306&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java Sun Mar 24 10:26:21 2013
@@ -65,7 +65,7 @@ import org.apache.hadoop.hbase.regionser
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
-import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.Triple;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -325,6 +325,11 @@ public class TestCatalogJanitor {
public TableLockManager getTableLockManager() {
return null;
}
+
+ @Override
+ public void dispatchMergingRegions(HRegionInfo region_a, HRegionInfo region_b,
+ boolean forcible) throws IOException {
+ }
}
@Test
@@ -546,9 +551,11 @@ public class TestCatalogJanitor {
splita.setOffline(true); //simulate that splita goes offline when it is split
splitParents.put(splita, createResult(splita, splitaa,splitab));
+ final Map<HRegionInfo, Result> mergedRegions = new TreeMap<HRegionInfo, Result>();
CatalogJanitor janitor = spy(new CatalogJanitor(server, services));
- doReturn(new Pair<Integer, Map<HRegionInfo, Result>>(
- 10, splitParents)).when(janitor).getSplitParents();
+ doReturn(new Triple<Integer, Map<HRegionInfo, Result>, Map<HRegionInfo, Result>>(
+ 10, mergedRegions, splitParents)).when(janitor)
+ .getMergedRegionsAndSplitParents();
//create ref from splita to parent
Path splitaRef =
Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransaction.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransaction.java?rev=1460306&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransaction.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransaction.java Sun Mar 24 10:26:21 2013
@@ -0,0 +1,426 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.*;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.zookeeper.KeeperException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Test the {@link RegionMergeTransaction} class against two HRegions (as
+ * opposed to running cluster).
+ */
+@Category(SmallTests.class)
+public class TestRegionMergeTransaction {
+ private final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ private final Path testdir = TEST_UTIL.getDataTestDir(this.getClass()
+ .getName());
+ private HRegion region_a;
+ private HRegion region_b;
+ private HRegion region_c;
+ private HLog wal;
+ private FileSystem fs;
+ // Start rows of region_a,region_b,region_c
+ private static final byte[] STARTROW_A = new byte[] { 'a', 'a', 'a' };
+ private static final byte[] STARTROW_B = new byte[] { 'g', 'g', 'g' };
+ private static final byte[] STARTROW_C = new byte[] { 'w', 'w', 'w' };
+ private static final byte[] ENDROW = new byte[] { '{', '{', '{' };
+ private static final byte[] CF = HConstants.CATALOG_FAMILY;
+
+ @Before
+ public void setup() throws IOException {
+ this.fs = FileSystem.get(TEST_UTIL.getConfiguration());
+ this.fs.delete(this.testdir, true);
+ this.wal = HLogFactory.createHLog(fs, this.testdir, "logs",
+ TEST_UTIL.getConfiguration());
+ this.region_a = createRegion(this.testdir, this.wal, STARTROW_A, STARTROW_B);
+ this.region_b = createRegion(this.testdir, this.wal, STARTROW_B, STARTROW_C);
+ this.region_c = createRegion(this.testdir, this.wal, STARTROW_C, ENDROW);
+ assert region_a != null && region_b != null && region_c != null;
+ TEST_UTIL.getConfiguration().setBoolean("hbase.testing.nocluster", true);
+ }
+
+ @After
+ public void teardown() throws IOException {
+ for (HRegion region : new HRegion[] { region_a, region_b, region_c }) {
+ if (region != null && !region.isClosed()) region.close();
+ if (this.fs.exists(region.getRegionFileSystem().getRegionDir())
+ && !this.fs.delete(region.getRegionFileSystem().getRegionDir(), true)) {
+ throw new IOException("Failed deleting of "
+ + region.getRegionFileSystem().getRegionDir());
+ }
+ }
+ if (this.wal != null)
+ this.wal.closeAndDelete();
+ this.fs.delete(this.testdir, true);
+ }
+
+ /**
+ * Test straight prepare works. Tries to merge on {@link #region_a} and
+ * {@link #region_b}
+ * @throws IOException
+ */
+ @Test
+ public void testPrepare() throws IOException {
+ prepareOnGoodRegions();
+ }
+
+ private RegionMergeTransaction prepareOnGoodRegions() throws IOException {
+ RegionMergeTransaction mt = new RegionMergeTransaction(region_a, region_b,
+ false);
+ RegionMergeTransaction spyMT = Mockito.spy(mt);
+ doReturn(false).when(spyMT).hasMergeQualifierInMeta(null,
+ region_a.getRegionName());
+ doReturn(false).when(spyMT).hasMergeQualifierInMeta(null,
+ region_b.getRegionName());
+ assertTrue(spyMT.prepare(null));
+ return spyMT;
+ }
+
+ /**
+ * Test merging the same region
+ */
+ @Test
+ public void testPrepareWithSameRegion() throws IOException {
+ RegionMergeTransaction mt = new RegionMergeTransaction(this.region_a,
+ this.region_a, true);
+ assertFalse("should not merge the same region even if it is forcible ",
+ mt.prepare(null));
+ }
+
+ /**
+ * Test merging two not adjacent regions under a common merge
+ */
+ @Test
+ public void testPrepareWithRegionsNotAdjacent() throws IOException {
+ RegionMergeTransaction mt = new RegionMergeTransaction(this.region_a,
+ this.region_c, false);
+ assertFalse("should not merge two regions if they are adjacent except it is forcible",
+ mt.prepare(null));
+ }
+
+ /**
+ * Test merging two not adjacent regions under a compulsory merge
+ */
+ @Test
+ public void testPrepareWithRegionsNotAdjacentUnderCompulsory()
+ throws IOException {
+ RegionMergeTransaction mt = new RegionMergeTransaction(region_a, region_c,
+ true);
+ RegionMergeTransaction spyMT = Mockito.spy(mt);
+ doReturn(false).when(spyMT).hasMergeQualifierInMeta(null,
+ region_a.getRegionName());
+ doReturn(false).when(spyMT).hasMergeQualifierInMeta(null,
+ region_c.getRegionName());
+ assertTrue("Since focible is true, should merge two regions even if they are not adjacent",
+ spyMT.prepare(null));
+ }
+
+ /**
+ * Pass a reference store
+ */
+ @Test
+ public void testPrepareWithRegionsWithReference() throws IOException {
+ HStore storeMock = Mockito.mock(HStore.class);
+ when(storeMock.hasReferences()).thenReturn(true);
+ when(storeMock.getFamily()).thenReturn(new HColumnDescriptor("cf"));
+ when(storeMock.close()).thenReturn(ImmutableList.<StoreFile>of());
+ this.region_a.stores.put(Bytes.toBytes(""), storeMock);
+ RegionMergeTransaction mt = new RegionMergeTransaction(this.region_a,
+ this.region_b, false);
+ assertFalse(
+ "a region should not be mergeable if it has instances of store file references",
+ mt.prepare(null));
+ }
+
+ @Test
+ public void testPrepareWithClosedRegion() throws IOException {
+ this.region_a.close();
+ RegionMergeTransaction mt = new RegionMergeTransaction(this.region_a,
+ this.region_b, false);
+ assertFalse(mt.prepare(null));
+ }
+
+ /**
+ * Test merging regions which are merged regions and has reference in META all
+ * the same
+ */
+ @Test
+ public void testPrepareWithRegionsWithMergeReference() throws IOException {
+ RegionMergeTransaction mt = new RegionMergeTransaction(region_a, region_b,
+ false);
+ RegionMergeTransaction spyMT = Mockito.spy(mt);
+ doReturn(true).when(spyMT).hasMergeQualifierInMeta(null,
+ region_a.getRegionName());
+ doReturn(true).when(spyMT).hasMergeQualifierInMeta(null,
+ region_b.getRegionName());
+ assertFalse(spyMT.prepare(null));
+ }
+
+ @Test
+ public void testWholesomeMerge() throws IOException {
+ final int rowCountOfRegionA = loadRegion(this.region_a, CF, true);
+ final int rowCountOfRegionB = loadRegion(this.region_b, CF, true);
+ assertTrue(rowCountOfRegionA > 0 && rowCountOfRegionB > 0);
+ assertEquals(rowCountOfRegionA, countRows(this.region_a));
+ assertEquals(rowCountOfRegionB, countRows(this.region_b));
+
+ // Start transaction.
+ RegionMergeTransaction mt = prepareOnGoodRegions();
+
+ // Run the execute. Look at what it returns.
+ Server mockServer = Mockito.mock(Server.class);
+ when(mockServer.getConfiguration())
+ .thenReturn(TEST_UTIL.getConfiguration());
+ HRegion mergedRegion = mt.execute(mockServer, null);
+ // Do some assertions about execution.
+ assertTrue(this.fs.exists(mt.getMergesDir()));
+ // Assert region_a and region_b is closed.
+ assertTrue(region_a.isClosed());
+ assertTrue(region_b.isClosed());
+
+ // Assert mergedir is empty -- because its content will have been moved out
+ // to be under the merged region dirs.
+ assertEquals(0, this.fs.listStatus(mt.getMergesDir()).length);
+ // Check merged region have correct key span.
+ assertTrue(Bytes.equals(this.region_a.getStartKey(),
+ mergedRegion.getStartKey()));
+ assertTrue(Bytes.equals(this.region_b.getEndKey(),
+ mergedRegion.getEndKey()));
+ // Count rows. merged region are already open
+ try {
+ int mergedRegionRowCount = countRows(mergedRegion);
+ assertEquals((rowCountOfRegionA + rowCountOfRegionB),
+ mergedRegionRowCount);
+ } finally {
+ HRegion.closeHRegion(mergedRegion);
+ }
+ // Assert the write lock is no longer held on region_a and region_b
+ assertTrue(!this.region_a.lock.writeLock().isHeldByCurrentThread());
+ assertTrue(!this.region_b.lock.writeLock().isHeldByCurrentThread());
+ }
+
+ @Test
+ public void testRollback() throws IOException {
+ final int rowCountOfRegionA = loadRegion(this.region_a, CF, true);
+ final int rowCountOfRegionB = loadRegion(this.region_b, CF, true);
+ assertTrue(rowCountOfRegionA > 0 && rowCountOfRegionB > 0);
+ assertEquals(rowCountOfRegionA, countRows(this.region_a));
+ assertEquals(rowCountOfRegionB, countRows(this.region_b));
+
+ // Start transaction.
+ RegionMergeTransaction mt = prepareOnGoodRegions();
+
+ when(mt.createMergedRegionFromMerges(region_a, region_b,
+ mt.getMergedRegionInfo())).thenThrow(
+ new MockedFailedMergedRegionCreation());
+
+ // Run the execute. Look at what it returns.
+ boolean expectedException = false;
+ Server mockServer = Mockito.mock(Server.class);
+ when(mockServer.getConfiguration())
+ .thenReturn(TEST_UTIL.getConfiguration());
+ try {
+ mt.execute(mockServer, null);
+ } catch (MockedFailedMergedRegionCreation e) {
+ expectedException = true;
+ }
+ assertTrue(expectedException);
+ // Run rollback
+ assertTrue(mt.rollback(null, null));
+
+ // Assert I can scan region_a and region_b.
+ int rowCountOfRegionA2 = countRows(this.region_a);
+ assertEquals(rowCountOfRegionA, rowCountOfRegionA2);
+ int rowCountOfRegionB2 = countRows(this.region_b);
+ assertEquals(rowCountOfRegionB, rowCountOfRegionB2);
+
+ // Assert rollback cleaned up stuff in fs
+ assertTrue(!this.fs.exists(HRegion.getRegionDir(this.testdir,
+ mt.getMergedRegionInfo())));
+
+ assertTrue(!this.region_a.lock.writeLock().isHeldByCurrentThread());
+ assertTrue(!this.region_b.lock.writeLock().isHeldByCurrentThread());
+
+ // Now retry the merge but do not throw an exception this time.
+ assertTrue(mt.prepare(null));
+ HRegion mergedRegion = mt.execute(mockServer, null);
+ // Count rows. daughters are already open
+ // Count rows. merged region are already open
+ try {
+ int mergedRegionRowCount = countRows(mergedRegion);
+ assertEquals((rowCountOfRegionA + rowCountOfRegionB),
+ mergedRegionRowCount);
+ } finally {
+ HRegion.closeHRegion(mergedRegion);
+ }
+ // Assert the write lock is no longer held on region_a and region_b
+ assertTrue(!this.region_a.lock.writeLock().isHeldByCurrentThread());
+ assertTrue(!this.region_b.lock.writeLock().isHeldByCurrentThread());
+ }
+
+ @Test
+ public void testFailAfterPONR() throws IOException, KeeperException {
+ final int rowCountOfRegionA = loadRegion(this.region_a, CF, true);
+ final int rowCountOfRegionB = loadRegion(this.region_b, CF, true);
+ assertTrue(rowCountOfRegionA > 0 && rowCountOfRegionB > 0);
+ assertEquals(rowCountOfRegionA, countRows(this.region_a));
+ assertEquals(rowCountOfRegionB, countRows(this.region_b));
+
+ // Start transaction.
+ RegionMergeTransaction mt = prepareOnGoodRegions();
+ Mockito.doThrow(new MockedFailedMergedRegionOpen())
+ .when(mt)
+ .openMergedRegion((Server) Mockito.anyObject(),
+ (RegionServerServices) Mockito.anyObject(),
+ (HRegion) Mockito.anyObject());
+
+ // Run the execute. Look at what it returns.
+ boolean expectedException = false;
+ Server mockServer = Mockito.mock(Server.class);
+ when(mockServer.getConfiguration())
+ .thenReturn(TEST_UTIL.getConfiguration());
+ try {
+ mt.execute(mockServer, null);
+ } catch (MockedFailedMergedRegionOpen e) {
+ expectedException = true;
+ }
+ assertTrue(expectedException);
+ // Run rollback returns false that we should restart.
+ assertFalse(mt.rollback(null, null));
+ // Make sure that merged region is still in the filesystem, that
+ // they have not been removed; this is supposed to be the case if we go
+ // past point of no return.
+ Path tableDir = this.region_a.getRegionFileSystem().getRegionDir()
+ .getParent();
+ Path mergedRegionDir = new Path(tableDir, mt.getMergedRegionInfo()
+ .getEncodedName());
+ assertTrue(TEST_UTIL.getTestFileSystem().exists(mergedRegionDir));
+ }
+
+ /**
+ * Exception used in this class only.
+ */
+ @SuppressWarnings("serial")
+ private class MockedFailedMergedRegionCreation extends IOException {
+ }
+
+ @SuppressWarnings("serial")
+ private class MockedFailedMergedRegionOpen extends IOException {
+ }
+
+ private HRegion createRegion(final Path testdir, final HLog wal,
+ final byte[] startrow, final byte[] endrow)
+ throws IOException {
+ // Make a region with start and end keys.
+ HTableDescriptor htd = new HTableDescriptor("table");
+ HColumnDescriptor hcd = new HColumnDescriptor(CF);
+ htd.addFamily(hcd);
+ HRegionInfo hri = new HRegionInfo(htd.getName(), startrow, endrow);
+ HRegion a = HRegion.createHRegion(hri, testdir,
+ TEST_UTIL.getConfiguration(), htd);
+ HRegion.closeHRegion(a);
+ return HRegion.openHRegion(testdir, hri, htd, wal,
+ TEST_UTIL.getConfiguration());
+ }
+
+ private int countRows(final HRegion r) throws IOException {
+ int rowcount = 0;
+ InternalScanner scanner = r.getScanner(new Scan());
+ try {
+ List<KeyValue> kvs = new ArrayList<KeyValue>();
+ boolean hasNext = true;
+ while (hasNext) {
+ hasNext = scanner.next(kvs);
+ if (!kvs.isEmpty())
+ rowcount++;
+ }
+ } finally {
+ scanner.close();
+ }
+ return rowcount;
+ }
+
+ /**
+ * Load region with rows from 'aaa' to 'zzz', skip the rows which are out of
+ * range of the region
+ * @param r Region
+ * @param f Family
+ * @param flush flush the cache if true
+ * @return Count of rows loaded.
+ * @throws IOException
+ */
+ private int loadRegion(final HRegion r, final byte[] f, final boolean flush)
+ throws IOException {
+ byte[] k = new byte[3];
+ int rowCount = 0;
+ for (byte b1 = 'a'; b1 <= 'z'; b1++) {
+ for (byte b2 = 'a'; b2 <= 'z'; b2++) {
+ for (byte b3 = 'a'; b3 <= 'z'; b3++) {
+ k[0] = b1;
+ k[1] = b2;
+ k[2] = b3;
+ if (!HRegion.rowIsInRange(r.getRegionInfo(), k)) {
+ continue;
+ }
+ Put put = new Put(k);
+ put.add(f, null, k);
+ if (r.getLog() == null)
+ put.setWriteToWAL(false);
+ r.put(put);
+ rowCount++;
+ }
+ }
+ if (flush) {
+ r.flushcache();
+ }
+ }
+ return rowCount;
+ }
+
+}
Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java?rev=1460306&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java Sun Mar 24 10:26:21 2013
@@ -0,0 +1,297 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.catalog.MetaReader;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.common.base.Joiner;
+
+/**
+ * Like {@link TestRegionMergeTransaction} in that we're testing
+ * {@link RegionMergeTransaction} only the below tests are against a running
+ * cluster where {@link TestRegionMergeTransaction} is tests against bare
+ * {@link HRegion}.
+ */
+@Category(LargeTests.class)
+public class TestRegionMergeTransactionOnCluster {
+ private static final Log LOG = LogFactory
+ .getLog(TestRegionMergeTransactionOnCluster.class);
+ private static final int NB_SERVERS = 3;
+
+ private static final byte[] FAMILYNAME = Bytes.toBytes("fam");
+ private static final byte[] QUALIFIER = Bytes.toBytes("q");
+
+ private static byte[] ROW = Bytes.toBytes("testRow");
+ private static final int INITIAL_REGION_NUM = 10;
+ private static final int ROWSIZE = 200;
+ private static byte[][] ROWS = makeN(ROW, ROWSIZE);
+
+ private static int waitTime = 60 * 1000;
+
+ private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+ private static HMaster master;
+ private static HBaseAdmin admin;
+
+ @BeforeClass
+ public static void beforeAllTests() throws Exception {
+ // Start a cluster
+ TEST_UTIL.startMiniCluster(NB_SERVERS);
+ MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
+ master = cluster.getMaster();
+ master.balanceSwitch(false);
+ admin = TEST_UTIL.getHBaseAdmin();
+ }
+
+ @AfterClass
+ public static void afterAllTests() throws Exception {
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ @Test
+ public void testWholesomeMerge() throws Exception {
+ LOG.info("Starting testWholesomeMerge");
+ final byte[] tableName = Bytes.toBytes("testWholesomeMerge");
+
+ // Create table and load data.
+ HTable table = createTableAndLoadData(master, tableName);
+ // Merge 1st and 2nd region
+ mergeRegionsAndVerifyRegionNum(master, tableName, 0, 1,
+ INITIAL_REGION_NUM - 1);
+
+ // Merge 2nd and 3th region
+ mergeRegionsAndVerifyRegionNum(master, tableName, 1, 2,
+ INITIAL_REGION_NUM - 2);
+
+ verifyRowCount(table, ROWSIZE);
+
+ table.close();
+
+ }
+
+ @Test
+ public void testCleanMergeReference() throws Exception {
+ LOG.info("Starting testCleanMergeReference");
+ admin.enableCatalogJanitor(false);
+ try {
+ final byte[] tableName = Bytes.toBytes("testCleanMergeReference");
+ // Create table and load data.
+ HTable table = createTableAndLoadData(master, tableName);
+ // Merge 1st and 2nd region
+ mergeRegionsAndVerifyRegionNum(master, tableName, 0, 1,
+ INITIAL_REGION_NUM - 1);
+ verifyRowCount(table, ROWSIZE);
+ table.close();
+
+ List<Pair<HRegionInfo, ServerName>> tableRegions = MetaReader
+ .getTableRegionsAndLocations(master.getCatalogTracker(),
+ Bytes.toString(tableName));
+ HRegionInfo mergedRegionInfo = tableRegions.get(0).getFirst();
+ HTableDescriptor tableDescritor = master.getTableDescriptors().get(
+ Bytes.toString(tableName));
+ Result mergedRegionResult = MetaReader.getRegionResult(
+ master.getCatalogTracker(), mergedRegionInfo.getRegionName());
+
+ // contains merge reference in META
+ assertTrue(mergedRegionResult.getValue(HConstants.CATALOG_FAMILY,
+ HConstants.MERGEA_QUALIFIER) != null);
+ assertTrue(mergedRegionResult.getValue(HConstants.CATALOG_FAMILY,
+ HConstants.MERGEB_QUALIFIER) != null);
+
+ // merging regions' directory are in the file system all the same
+ HRegionInfo regionA = HRegionInfo.getHRegionInfo(mergedRegionResult,
+ HConstants.MERGEA_QUALIFIER);
+ HRegionInfo regionB = HRegionInfo.getHRegionInfo(mergedRegionResult,
+ HConstants.MERGEB_QUALIFIER);
+ FileSystem fs = master.getMasterFileSystem().getFileSystem();
+ Path rootDir = master.getMasterFileSystem().getRootDir();
+
+ Path tabledir = new Path(rootDir, mergedRegionInfo.getTableNameAsString());
+ Path regionAdir = new Path(tabledir, regionA.getEncodedName());
+ Path regionBdir = new Path(tabledir, regionB.getEncodedName());
+ assertTrue(fs.exists(regionAdir));
+ assertTrue(fs.exists(regionBdir));
+
+ admin.compact(mergedRegionInfo.getRegionName());
+ // wait until merged region doesn't have reference file
+ long timeout = System.currentTimeMillis() + waitTime;
+ HRegionFileSystem hrfs = new HRegionFileSystem(
+ TEST_UTIL.getConfiguration(), fs, tabledir, mergedRegionInfo);
+ while (System.currentTimeMillis() < timeout) {
+ if (!hrfs.hasReferences(tableDescritor)) {
+ break;
+ }
+ Thread.sleep(50);
+ }
+ assertFalse(hrfs.hasReferences(tableDescritor));
+
+ // run CatalogJanitor to clean merge references in META and archive the
+ // files of merging regions
+ int cleaned = admin.runCatalogScan();
+ assertTrue(cleaned > 0);
+ assertFalse(fs.exists(regionAdir));
+ assertFalse(fs.exists(regionBdir));
+
+ mergedRegionResult = MetaReader.getRegionResult(
+ master.getCatalogTracker(), mergedRegionInfo.getRegionName());
+ assertFalse(mergedRegionResult.getValue(HConstants.CATALOG_FAMILY,
+ HConstants.MERGEA_QUALIFIER) != null);
+ assertFalse(mergedRegionResult.getValue(HConstants.CATALOG_FAMILY,
+ HConstants.MERGEB_QUALIFIER) != null);
+
+ } finally {
+ admin.enableCatalogJanitor(true);
+ }
+
+
+ }
+
+ private void mergeRegionsAndVerifyRegionNum(HMaster master, byte[] tablename,
+ int regionAnum, int regionBnum, int expectedRegionNum) throws Exception {
+ requestMergeRegion(master, tablename, regionAnum, regionBnum);
+ waitAndVerifyRegionNum(master, tablename, expectedRegionNum);
+ }
+
+ private void requestMergeRegion(HMaster master, byte[] tablename,
+ int regionAnum, int regionBnum) throws Exception {
+ List<Pair<HRegionInfo, ServerName>> tableRegions = MetaReader
+ .getTableRegionsAndLocations(master.getCatalogTracker(),
+ Bytes.toString(tablename));
+ TEST_UTIL.getHBaseAdmin().mergeRegions(
+ tableRegions.get(regionAnum).getFirst().getEncodedNameAsBytes(),
+ tableRegions.get(regionBnum).getFirst().getEncodedNameAsBytes(), false);
+ }
+
+ private void waitAndVerifyRegionNum(HMaster master, byte[] tablename,
+ int expectedRegionNum) throws Exception {
+ List<Pair<HRegionInfo, ServerName>> tableRegionsInMeta;
+ List<HRegionInfo> tableRegionsInMaster;
+ long timeout = System.currentTimeMillis() + waitTime;
+ while (System.currentTimeMillis() < timeout) {
+ tableRegionsInMeta = MetaReader.getTableRegionsAndLocations(
+ master.getCatalogTracker(), Bytes.toString(tablename));
+ tableRegionsInMaster = master.getAssignmentManager().getRegionStates()
+ .getRegionsOfTable(tablename);
+ if (tableRegionsInMeta.size() == expectedRegionNum
+ && tableRegionsInMaster.size() == expectedRegionNum) {
+ break;
+ }
+ Thread.sleep(250);
+ }
+
+ tableRegionsInMeta = MetaReader.getTableRegionsAndLocations(
+ master.getCatalogTracker(), Bytes.toString(tablename));
+ LOG.info("Regions after merge:" + Joiner.on(',').join(tableRegionsInMeta));
+ assertEquals(expectedRegionNum, tableRegionsInMeta.size());
+ }
+
+ private HTable createTableAndLoadData(HMaster master, byte[] tablename)
+ throws Exception {
+ return createTableAndLoadData(master, tablename, INITIAL_REGION_NUM);
+ }
+
+ private HTable createTableAndLoadData(HMaster master, byte[] tablename,
+ int numRegions) throws Exception {
+ assertTrue("ROWSIZE must > numregions:" + numRegions, ROWSIZE > numRegions);
+ byte[][] splitRows = new byte[numRegions - 1][];
+ for (int i = 0; i < splitRows.length; i++) {
+ splitRows[i] = ROWS[(i + 1) * ROWSIZE / numRegions];
+ }
+
+ HTable table = TEST_UTIL.createTable(tablename, FAMILYNAME, splitRows);
+ loadData(table);
+ verifyRowCount(table, ROWSIZE);
+
+ // sleep here is an ugly hack to allow region transitions to finish
+ long timeout = System.currentTimeMillis() + waitTime;
+ List<Pair<HRegionInfo, ServerName>> tableRegions;
+ while (System.currentTimeMillis() < timeout) {
+ tableRegions = MetaReader.getTableRegionsAndLocations(
+ master.getCatalogTracker(), Bytes.toString(tablename));
+ if (tableRegions.size() == numRegions)
+ break;
+ Thread.sleep(250);
+ }
+
+ tableRegions = MetaReader.getTableRegionsAndLocations(
+ master.getCatalogTracker(), Bytes.toString(tablename));
+ LOG.info("Regions after load: " + Joiner.on(',').join(tableRegions));
+ assertEquals(numRegions, tableRegions.size());
+ return table;
+ }
+
+ private static byte[][] makeN(byte[] base, int n) {
+ byte[][] ret = new byte[n][];
+ for (int i = 0; i < n; i++) {
+ ret[i] = Bytes.add(base, Bytes.toBytes(String.format("%04d", i)));
+ }
+ return ret;
+ }
+
+ private void loadData(HTable table) throws IOException {
+ for (int i = 0; i < ROWSIZE; i++) {
+ Put put = new Put(ROWS[i]);
+ put.add(FAMILYNAME, QUALIFIER, Bytes.toBytes(i));
+ table.put(put);
+ }
+ }
+
+ private void verifyRowCount(HTable table, int expectedRegionNum)
+ throws IOException {
+ ResultScanner scanner = table.getScanner(new Scan());
+ int rowCount = 0;
+ while (scanner.next() != null) {
+ rowCount++;
+ }
+ assertEquals(expectedRegionNum, rowCount);
+ scanner.close();
+ }
+
+}