You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@hbase.apache.org by te...@apache.org on 2011/10/31 18:23:25 UTC
svn commit: r1195573 - in /hbase/branches/0.92: ./
src/main/java/org/apache/hadoop/hbase/ipc/
src/main/java/org/apache/hadoop/hbase/mapreduce/
src/main/java/org/apache/hadoop/hbase/regionserver/ src/main/resources/
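src/test/java/org/apache/hadoop/hbase/mapreduce/
src/test/java/org/apache/hadoop/hbase/regionserver/
src/test/java/org/apache/hadoop/hbase/regionserver/wal/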
Author: tedyu
Date: Mon Oct 31 17:23:24 2011
New Revision: 1195573
URL: http://svn.apache.org/viewvc?rev=1195573&view=rev
Log:
HBASE-4552 multi-CF bulk load is not atomic across column families (Jonathan Hsieh)
Added:
hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java
hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
Modified:
hbase/branches/0.92/CHANGES.txt
hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
hbase/branches/0.92/src/main/resources/hbase-default.xml
hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
Modified: hbase/branches/0.92/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/CHANGES.txt?rev=1195573&r1=1195572&r2=1195573&view=diff
==============================================================================
--- hbase/branches/0.92/CHANGES.txt (original)
+++ hbase/branches/0.92/CHANGES.txt Mon Oct 31 17:23:24 2011
@@ -396,6 +396,7 @@ Release 0.92.0 - Unreleased
to move a region
HBASE-4613 hbase.util.Threads#threadDumpingIsAlive sleeps 1 second,
slowing down the shutdown by 0.5s
+ HBASE-4552 multi-CF bulk load is not atomic across column families (Jonathan Hsieh)
TESTS
HBASE-4492 TestRollingRestart fails intermittently
Modified: hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java?rev=1195573&r1=1195572&r2=1195573&view=diff
==============================================================================
--- hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (original)
+++ hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java Mon Oct 31 17:23:24 2011
@@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.io.hfile.
import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.hbase.ipc.VersionedProtocol;
@@ -61,7 +62,7 @@ public interface HRegionInterface extend
// maintained a single global version number on all HBase Interfaces. This
// meant all HBase RPC was broke though only one of the three RPC Interfaces
// had changed. This has since been undone.
- public static final long VERSION = 28L;
+ public static final long VERSION = 29L;
/**
* Get metainfo about an HRegion
@@ -309,9 +310,13 @@ public interface HRegionInterface extend
public <R> MultiResponse multi(MultiAction<R> multi) throws IOException;
/**
- * Bulk load an HFile into an open region
+ * Atomically bulk load multiple HFiles (say from different column families)
+ * into an open region.
+ *
+ * @param familyPaths List of (family, hfile path) pairs
+ * @param regionName name of region to load hfiles into
*/
- public void bulkLoadHFile(String hfilePath, byte[] regionName, byte[] familyName)
+ public void bulkLoadHFiles(List<Pair<byte[], String>> familyPaths, byte[] regionName)
throws IOException;
// Master methods
Modified: hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java?rev=1195573&r1=1195572&r2=1195573&view=diff
==============================================================================
--- hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java (original)
+++ hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java Mon Oct 31 17:23:24 2011
@@ -21,13 +21,16 @@ package org.apache.hadoop.hbase.mapreduc
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Deque;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Callable;
@@ -37,7 +40,6 @@ import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
@@ -62,9 +64,9 @@ import org.apache.hadoop.hbase.io.HalfSt
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.io.Reference.Range;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
-import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
import org.apache.hadoop.hbase.util.Bytes;
@@ -72,9 +74,11 @@ import org.apache.hadoop.hbase.util.Pair
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
/**
* Tool to load the output of HFileOutputFormat into an existing table.
* @see #usage()
@@ -87,8 +91,6 @@ public class LoadIncrementalHFiles exten
static AtomicLong regionCount = new AtomicLong(0);
private HBaseAdmin hbAdmin;
private Configuration cfg;
- private Set<Future> futures = new HashSet<Future>();
- private Set<Future> futuresForSplittingHFile = new HashSet<Future>();
public static String NAME = "completebulkload";
@@ -112,7 +114,7 @@ public class LoadIncrementalHFiles exten
* region boundary, and each part is added back into the queue.
* The import process finishes when the queue is empty.
*/
- private static class LoadQueueItem {
+ static class LoadQueueItem {
final byte[] family;
final Path hfilePath;
@@ -120,13 +122,17 @@ public class LoadIncrementalHFiles exten
this.family = family;
this.hfilePath = hfilePath;
}
+
+ public String toString() {
+ return "family:"+ Bytes.toString(family) + " path:" + hfilePath.toString();
+ }
}
/**
* Walk the given directory for all HFiles, and return a Queue
* containing all such files.
*/
- private Deque<LoadQueueItem> discoverLoadQueue(Path hfofDir)
+ private void discoverLoadQueue(Deque<LoadQueueItem> ret, Path hfofDir)
throws IOException {
FileSystem fs = hfofDir.getFileSystem(getConf());
@@ -140,7 +146,6 @@ public class LoadIncrementalHFiles exten
throw new FileNotFoundException("No families found in " + hfofDir);
}
- Deque<LoadQueueItem> ret = new LinkedList<LoadQueueItem>();
for (FileStatus stat : familyDirStatuses) {
if (!stat.isDir()) {
LOG.warn("Skipping non-directory " + stat.getPath());
@@ -156,7 +161,6 @@ public class LoadIncrementalHFiles exten
ret.add(new LoadQueueItem(family, hfile));
}
}
- return ret;
}
/**
@@ -167,10 +171,10 @@ public class LoadIncrementalHFiles exten
* @param table the table to load into
* @throws TableNotFoundException if table does not yet exist
*/
- public void doBulkLoad(Path hfofDir, HTable table)
+ public void doBulkLoad(Path hfofDir, final HTable table)
throws TableNotFoundException, IOException
{
- HConnection conn = table.getConnection();
+ final HConnection conn = table.getConnection();
if (!conn.isTableAvailable(table.getTableName())) {
throw new TableNotFoundException("Table " +
@@ -178,54 +182,51 @@ public class LoadIncrementalHFiles exten
"is not currently available.");
}
- Deque<LoadQueueItem> queue = null;
+ // initialize thread pools
int nrThreads = cfg.getInt("hbase.loadincremental.threads.max",
Runtime.getRuntime().availableProcessors());
ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
builder.setNameFormat("LoadIncrementalHFiles-%1$d");
-
ExecutorService pool = new ThreadPoolExecutor(nrThreads, nrThreads,
60, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(),
builder.build());
((ThreadPoolExecutor)pool).allowCoreThreadTimeOut(true);
+
+ // LQI queue does not need to be threadsafe -- all operations on this queue
+ // happen in this thread
+ Deque<LoadQueueItem> queue = new LinkedList<LoadQueueItem>();
try {
- queue = discoverLoadQueue(hfofDir);
- // outer loop picks up LoadQueueItem due to HFile split
- while (!queue.isEmpty() || futuresForSplittingHFile.size() > 0) {
- Pair<byte[][],byte[][]> startEndKeys = table.getStartEndKeys();
- // inner loop groups callables
- while (!queue.isEmpty()) {
- LoadQueueItem item = queue.remove();
- tryLoad(item, conn, table, queue, startEndKeys, pool);
- }
- Iterator<Future> iter = futuresForSplittingHFile.iterator();
- while (iter.hasNext()) {
- boolean timeoutSeen = false;
- Future future = iter.next();
- try {
- future.get(20, TimeUnit.MILLISECONDS);
- break; // we have at least two new HFiles to process
- } catch (ExecutionException ee) {
- LOG.error(ee);
- } catch (InterruptedException ie) {
- LOG.error(ie);
- } catch (TimeoutException te) {
- timeoutSeen = true;
- } finally {
- if (!timeoutSeen) iter.remove();
- }
+ discoverLoadQueue(queue, hfofDir);
+ int count = 0;
+
+ // Assumes that region splits can happen while this occurs.
+ while (!queue.isEmpty()) {
+ // need to reload split keys each iteration.
+ final Pair<byte[][], byte[][]> startEndKeys = table.getStartEndKeys();
+ if (count != 0) {
+ LOG.info("Split occured while grouping HFiles, retry attempt " +
+ + count + " with " + queue.size() + " files remaining to load");
}
- }
- for (Future<Void> future : futures) {
- try {
- future.get();
- } catch (ExecutionException ee) {
- LOG.error(ee);
- } catch (InterruptedException ie) {
- LOG.error(ie);
+
+ int maxRetries = cfg.getInt("hbase.bulkload.retries.number", 10);
+ if (count >= maxRetries) {
+ LOG.error("Retry attempted " + count + " times without completing, bailing out");
+ return;
}
+ count++;
+
+ // Using ByteBuffer for byte[] equality semantics
+ Multimap<ByteBuffer, LoadQueueItem> regionGroups = groupOrSplitPhase(table,
+ pool, queue, startEndKeys);
+
+ bulkLoadPhase(table, conn, pool, queue, regionGroups);
+
+ // NOTE: The next iteration's split / group could happen in parallel to
+ // atomic bulkloads assuming that there are splits and no merges, and
+ // that we can atomically pull out the groups we want to retry.
}
+
} finally {
pool.shutdown();
if (queue != null && !queue.isEmpty()) {
@@ -241,15 +242,111 @@ public class LoadIncrementalHFiles exten
}
}
+ /**
+ * This takes the LQIs grouped by likely regions and attempts to bulk load
+ * them. Any failures are re-queued for another pass with the
+ * groupOrSplitPhase.
+ */
+ private void bulkLoadPhase(final HTable table, final HConnection conn,
+ ExecutorService pool, Deque<LoadQueueItem> queue,
+ final Multimap<ByteBuffer, LoadQueueItem> regionGroups) throws IOException {
+ // atomically bulk load the groups.
+ Set<Future<List<LoadQueueItem>>> loadingFutures = new HashSet<Future<List<LoadQueueItem>>>();
+ for (Entry<ByteBuffer, ? extends Collection<LoadQueueItem>> e: regionGroups.asMap().entrySet()) {
+ final byte[] first = e.getKey().array();
+ final Collection<LoadQueueItem> lqis = e.getValue();
+
+ final Callable<List<LoadQueueItem>> call = new Callable<List<LoadQueueItem>>() {
+ public List<LoadQueueItem> call() throws Exception {
+ List<LoadQueueItem> toRetry = tryAtomicRegionLoad(conn, table.getTableName(), first, lqis);
+ return toRetry;
+ }
+ };
+ loadingFutures.add(pool.submit(call));
+ }
+
+ // get all the results.
+ for (Future<List<LoadQueueItem>> future : loadingFutures) {
+ try {
+ List<LoadQueueItem> toRetry = future.get();
+ if (toRetry != null && toRetry.size() != 0) {
+ // LQIs that are requeued to be regrouped.
+ queue.addAll(toRetry);
+ }
+
+ } catch (ExecutionException e1) {
+ Throwable t = e1.getCause();
+ if (t instanceof IOException) {
+ LOG.error("IOException during bulk load", e1);
+ throw (IOException)t; // would have been thrown if not parallelized,
+ }
+ LOG.error("Unexpected execution exception during bulk load", e1);
+ throw new IllegalStateException(t);
+ } catch (InterruptedException e1) {
+ LOG.error("Unexpected interrupted exception during bulk load", e1);
+ throw new IllegalStateException(e1);
+ }
+ }
+ }
+
+ /**
+ * @return A Multimap<startkey, LoadQueueItem> that groups LQI by likely
+ * bulk load region targets.
+ */
+ private Multimap<ByteBuffer, LoadQueueItem> groupOrSplitPhase(final HTable table,
+ ExecutorService pool, Deque<LoadQueueItem> queue,
+ final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
+ // <region start key, LQI> map. It only needs to be synchronized within the
+ // scope of this phase because of the puts that happen in futures.
+ Multimap<ByteBuffer, LoadQueueItem> rgs = HashMultimap.create();
+ final Multimap<ByteBuffer, LoadQueueItem> regionGroups = Multimaps.synchronizedMultimap(rgs);
+
+ // drain LQIs and figure out bulk load groups
+ Set<Future<List<LoadQueueItem>>> splittingFutures = new HashSet<Future<List<LoadQueueItem>>>();
+ while (!queue.isEmpty()) {
+ final LoadQueueItem item = queue.remove();
+
+ final Callable<List<LoadQueueItem>> call = new Callable<List<LoadQueueItem>>() {
+ public List<LoadQueueItem> call() throws Exception {
+ List<LoadQueueItem> splits = groupOrSplit(regionGroups, item, table, startEndKeys);
+ return splits;
+ }
+ };
+ splittingFutures.add(pool.submit(call));
+ }
+ // get all the results. All grouping and splitting must finish before
+ // we can attempt the atomic loads.
+ for (Future<List<LoadQueueItem>> lqis : splittingFutures) {
+ try {
+ List<LoadQueueItem> splits = lqis.get();
+ if (splits != null) {
+ queue.addAll(splits);
+ }
+ } catch (ExecutionException e1) {
+ Throwable t = e1.getCause();
+ if (t instanceof IOException) {
+ LOG.error("IOException during splitting", e1);
+ throw (IOException)t; // would have been thrown if not parallelized,
+ }
+ LOG.error("Unexpected execution exception during splitting", e1);
+ throw new IllegalStateException(t);
+ } catch (InterruptedException e1) {
+ LOG.error("Unexpected interrupted exception during splitting", e1);
+ throw new IllegalStateException(e1);
+ }
+ }
+ return regionGroups;
+ }
+
// unique file name for the table
String getUniqueName(byte[] tableName) {
String name = Bytes.toStringBinary(tableName) + "," + regionCount.incrementAndGet();
return name;
}
- void splitStoreFileAndRequeue(final LoadQueueItem item,
- final Deque<LoadQueueItem> queue, final HTable table,
- byte[] startKey, byte[] splitKey) throws IOException {
+ protected List<LoadQueueItem> splitStoreFile(final LoadQueueItem item,
+ final HTable table, byte[] startKey,
+ byte[] splitKey) throws IOException {
final Path hfilePath = item.hfilePath;
// We use a '_' prefix which is ignored when walking directory trees
@@ -268,25 +365,26 @@ public class LoadIncrementalHFiles exten
// Add these back at the *front* of the queue, so there's a lower
// chance that the region will just split again before we get there.
- synchronized (queue) {
- queue.addFirst(new LoadQueueItem(item.family, botOut));
- queue.addFirst(new LoadQueueItem(item.family, topOut));
- }
+ List<LoadQueueItem> lqis = new ArrayList<LoadQueueItem>(2);
+ lqis.add(new LoadQueueItem(item.family, botOut));
+ lqis.add(new LoadQueueItem(item.family, topOut));
+
LOG.info("Successfully split into new HFiles " + botOut + " and " + topOut);
+ return lqis;
}
/**
- * Attempt to load the given load queue item into its target region server.
+ * Attempt to assign the given load queue item into its target region group.
* If the hfile boundary no longer fits into a region, physically splits
- * the hfile such that the new bottom half will fit, and adds the two
- * resultant hfiles back into the load queue.
+ * the hfile such that the new bottom half will fit and returns the list of
+ * LQI's corresponding to the resultant hfiles.
+ *
+ * protected for testing
*/
- private boolean tryLoad(final LoadQueueItem item,
- final HConnection conn, final HTable table,
- final Deque<LoadQueueItem> queue,
- final Pair<byte[][],byte[][]> startEndKeys,
- ExecutorService pool)
- throws IOException {
+ protected List<LoadQueueItem> groupOrSplit(Multimap<ByteBuffer, LoadQueueItem> regionGroups,
+ final LoadQueueItem item, final HTable table,
+ final Pair<byte[][], byte[][]> startEndKeys)
+ throws IOException {
final Path hfilePath = item.hfilePath;
final FileSystem fs = hfilePath.getFileSystem(getConf());
HFile.Reader hfr = HFile.createReader(fs, hfilePath,
@@ -305,54 +403,80 @@ public class LoadIncrementalHFiles exten
" last=" + Bytes.toStringBinary(last));
if (first == null || last == null) {
assert first == null && last == null;
+ // TODO what if this is due to a bad HFile?
LOG.info("hfile " + hfilePath + " has no entries, skipping");
- return false;
+ return null;
}
if (Bytes.compareTo(first, last) > 0) {
throw new IllegalArgumentException(
"Invalid range: " + Bytes.toStringBinary(first) +
" > " + Bytes.toStringBinary(last));
}
- int idx = Arrays.binarySearch(startEndKeys.getFirst(), first, Bytes.BYTES_COMPARATOR);
+ int idx = Arrays.binarySearch(startEndKeys.getFirst(), first,
+ Bytes.BYTES_COMPARATOR);
if (idx < 0) {
- idx = -(idx+1)-1;
+ // not on boundary, returns -(insertion point) - 1. Calculate the region
+ // it would be in.
+ idx = -(idx + 1) - 1;
}
final int indexForCallable = idx;
boolean lastKeyInRange =
Bytes.compareTo(last, startEndKeys.getSecond()[idx]) < 0 ||
Bytes.equals(startEndKeys.getSecond()[idx], HConstants.EMPTY_BYTE_ARRAY);
if (!lastKeyInRange) {
- Callable<Void> callable = new Callable<Void>() {
- public Void call() throws Exception {
- splitStoreFileAndRequeue(item, queue, table,
- startEndKeys.getFirst()[indexForCallable],
- startEndKeys.getSecond()[indexForCallable]);
- return (Void)null;
- }
- };
- futuresForSplittingHFile.add(pool.submit(callable));
+ List<LoadQueueItem> lqis = splitStoreFile(item, table,
+ startEndKeys.getFirst()[indexForCallable],
+ startEndKeys.getSecond()[indexForCallable]);
+ return lqis;
+ }
+
+ // group regions.
+ regionGroups.put(ByteBuffer.wrap(startEndKeys.getFirst()[idx]), item);
+ return null;
+ }
+
+ /**
+ * Attempts to do an atomic load of many hfiles into a region. If it fails,
+ * it returns a list of hfiles that need to be retried. If it is successful
+ * it will return an empty list.
+ *
+ * NOTE: To maintain row atomicity guarantees, the region server callable
+ * should succeed or fail atomically.
+ *
+ * Protected for testing.
+ */
+ protected List<LoadQueueItem> tryAtomicRegionLoad(final HConnection conn,
+ byte[] tableName, final byte[] first, Collection<LoadQueueItem> lqis) {
- return true;
+ final List<Pair<byte[], String>> famPaths =
+ new ArrayList<Pair<byte[], String>>(lqis.size());
+ for (LoadQueueItem lqi : lqis) {
+ famPaths.add(Pair.newPair(lqi.family, lqi.hfilePath.toString()));
}
- final ServerCallable<Void> svrCallable = new ServerCallable<Void>(conn, table.getTableName(), first) {
+ final ServerCallable<Void> svrCallable = new ServerCallable<Void>(conn,
+ tableName, first) {
@Override
public Void call() throws Exception {
- LOG.debug("Going to connect to server " + location +
- "for row " + Bytes.toStringBinary(row));
-
+ LOG.debug("Going to connect to server " + location + " for row "
+ + Bytes.toStringBinary(row));
byte[] regionName = location.getRegionInfo().getRegionName();
- server.bulkLoadHFile(hfilePath.toString(), regionName, item.family);
+ server.bulkLoadHFiles(famPaths, regionName);
return null;
}
};
- Callable<Void> callable = new Callable<Void>() {
- public Void call() throws Exception {
- return conn.getRegionServerWithRetries(svrCallable);
- }
- };
- futures.add(pool.submit(callable));
- return false;
+
+ List<LoadQueueItem> toRetry = new ArrayList<LoadQueueItem>();
+ try {
+ conn.getRegionServerWithRetries(svrCallable);
+ } catch (IOException e) {
+ LOG.warn("Attempt to bulk load region containing "
+ + Bytes.toStringBinary(first) + " into table "
+ + Bytes.toStringBinary(tableName) + " with files " + lqis
+ + " failed");
+ toRetry.addAll(lqis);
+ }
+ return toRetry;
}
/**
@@ -559,7 +683,8 @@ public class LoadIncrementalHFiles exten
}
public static void main(String[] args) throws Exception {
- ToolRunner.run(new LoadIncrementalHFiles(HBaseConfiguration.create()), args);
+ int ret = ToolRunner.run(new LoadIncrementalHFiles(HBaseConfiguration.create()), args);
+ System.exit(ret);
}
}
Modified: hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1195573&r1=1195572&r2=1195573&view=diff
==============================================================================
--- hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Mon Oct 31 17:23:24 2011
@@ -103,6 +103,7 @@ import org.apache.hadoop.hbase.util.FSUt
import org.apache.hadoop.hbase.util.HashedBytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.StringUtils;
import org.cliffc.high_scale_lib.Counter;
@@ -2605,23 +2606,89 @@ public class HRegion implements HeapSize
return lid;
}
- public void bulkLoadHFile(String hfilePath, byte[] familyName)
+ /**
+ * Attempts to atomically load a group of hfiles. This is critical for loading
+ * rows with multiple column families atomically.
+ *
+ * @param familyPaths List of Pair<byte[] column family, String hfilePath>
+ */
+ public void bulkLoadHFiles(List<Pair<byte[], String>> familyPaths)
throws IOException {
- startRegionOperation();
+ startBulkRegionOperation();
this.writeRequestsCount.increment();
+ List<IOException> ioes = new ArrayList<IOException>();
+ List<Pair<byte[], String>> failures = new ArrayList<Pair<byte[], String>>();
+ boolean rangesOk = true;
try {
- Store store = getStore(familyName);
- if (store == null) {
- throw new DoNotRetryIOException(
- "No such column family " + Bytes.toStringBinary(familyName));
+ // There possibly was a split that happened between when the split keys
+ // were gathered and before the HRegion's write lock was taken. We need
+ // to validate the HFile region before attempting to bulk load all of them.
+ for (Pair<byte[], String> p : familyPaths) {
+ byte[] familyName = p.getFirst();
+ String path = p.getSecond();
+
+ Store store = getStore(familyName);
+ if (store == null) {
+ IOException ioe = new DoNotRetryIOException(
+ "No such column family " + Bytes.toStringBinary(familyName));
+ ioes.add(ioe);
+ failures.add(p);
+ }
+
+ try {
+ store.assertBulkLoadHFileOk(new Path(path));
+ } catch (IOException ioe) {
+ rangesOk = false;
+ ioes.add(ioe);
+ failures.add(p);
+ }
+ }
+
+ if (ioes.size() != 0) {
+ // validation failed, bail out before doing anything permanent.
+ return;
+ }
+
+ for (Pair<byte[], String> p : familyPaths) {
+ byte[] familyName = p.getFirst();
+ String path = p.getSecond();
+ Store store = getStore(familyName);
+ try {
+ store.bulkLoadHFile(path);
+ } catch (IOException ioe) {
+ // a failure here causes an atomicity violation that we currently
+ // cannot recover from since it is likely a failed hdfs operation.
+ ioes.add(ioe);
+ failures.add(p);
+ break;
+ }
}
- store.bulkLoadHFile(hfilePath);
} finally {
- closeRegionOperation();
- }
+ closeBulkRegionOperation();
+ if (ioes.size() != 0) {
+ StringBuilder list = new StringBuilder();
+ for (Pair<byte[], String> p : failures) {
+ list.append("\n").append(Bytes.toString(p.getFirst())).append(" : ")
+ .append(p.getSecond());
+ }
- }
+ if (rangesOk) {
+ // TODO Need a better story for reverting partial failures due to HDFS.
+ LOG.error("There was a partial failure due to IO. These " +
+ "(family,hfile) pairs were not loaded: " + list);
+ } else {
+ // problem when validating
+ LOG.info("There was a recoverable bulk load failure likely due to a" +
+ " split. These (family, HFile) pairs were not loaded: " + list);
+ }
+ if (ioes.size() == 1) {
+ throw ioes.get(0);
+ }
+ throw MultipleIOException.createIOException(ioes);
+ }
+ }
+ }
@Override
public boolean equals(Object o) {
@@ -4053,6 +4120,34 @@ public class HRegion implements HeapSize
}
/**
+ * This method needs to be called before any public call that reads or
+ * modifies stores in bulk. It has to be called just before a try.
+ * #closeBulkRegionOperation needs to be called in the try's finally block.
+ * Acquires a write lock and checks if the region is closing or closed.
+ * @throws NotServingRegionException when the region is closing or closed
+ */
+ private void startBulkRegionOperation() throws NotServingRegionException {
+ if (this.closing.get()) {
+ throw new NotServingRegionException(regionInfo.getRegionNameAsString() +
+ " is closing");
+ }
+ lock.writeLock().lock();
+ if (this.closed.get()) {
+ lock.writeLock().unlock();
+ throw new NotServingRegionException(regionInfo.getRegionNameAsString() +
+ " is closed");
+ }
+ }
+
+ /**
+ * Releases the write lock. This needs to be called in the finally block
+ * corresponding to the try block of #startBulkRegionOperation.
+ */
+ private void closeBulkRegionOperation(){
+ lock.writeLock().unlock();
+ }
+
+ /**
* A mocked list implementaion - discards all updates.
*/
private static final List<KeyValue> MOCKED_LIST = new AbstractList<KeyValue>() {
Modified: hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1195573&r1=1195572&r2=1195573&view=diff
==============================================================================
--- hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Mon Oct 31 17:23:24 2011
@@ -2338,12 +2338,15 @@ public class HRegionServer implements HR
}
}
+ /**
+ * Atomically bulk load several HFiles into an open region
+ */
@Override
- public void bulkLoadHFile(String hfilePath, byte[] regionName,
- byte[] familyName) throws IOException {
+ public void bulkLoadHFiles(List<Pair<byte[], String>> familyPaths,
+ byte[] regionName) throws IOException {
checkOpen();
HRegion region = getRegion(regionName);
- region.bulkLoadHFile(hfilePath, familyName);
+ region.bulkLoadHFiles(familyPaths);
}
Map<String, Integer> rowlocks = new ConcurrentHashMap<String, Integer>();
Modified: hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1195573&r1=1195572&r2=1195573&view=diff
==============================================================================
--- hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Mon Oct 31 17:23:24 2011
@@ -325,9 +325,12 @@ public class Store implements HeapSize {
return this.storefiles;
}
- public void bulkLoadHFile(String srcPathStr) throws IOException {
- Path srcPath = new Path(srcPathStr);
-
+ /**
+ * This throws a WrongRegionException if the bulkHFile does not fit in this
+ * region.
+ *
+ */
+ void assertBulkLoadHFileOk(Path srcPath) throws IOException {
HFile.Reader reader = null;
try {
LOG.info("Validating hfile at " + srcPath + " for inclusion in "
@@ -351,12 +354,21 @@ public class Store implements HeapSize {
HRegionInfo hri = region.getRegionInfo();
if (!hri.containsRange(firstKey, lastKey)) {
throw new WrongRegionException(
- "Bulk load file " + srcPathStr + " does not fit inside region "
+ "Bulk load file " + srcPath.toString() + " does not fit inside region "
+ this.region);
}
} finally {
if (reader != null) reader.close();
}
+ }
+
+ /**
+ * This method should only be called from HRegion. It is assumed that the
+ * ranges of values in the HFile fit within the store's assigned region.
+ * (assertBulkLoadHFileOk checks this)
+ */
+ void bulkLoadHFile(String srcPathStr) throws IOException {
+ Path srcPath = new Path(srcPathStr);
// Move the file if it's on another filesystem
FileSystem srcFs = srcPath.getFileSystem(conf);
Modified: hbase/branches/0.92/src/main/resources/hbase-default.xml
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/src/main/resources/hbase-default.xml?rev=1195573&r1=1195572&r2=1195573&view=diff
==============================================================================
--- hbase/branches/0.92/src/main/resources/hbase-default.xml (original)
+++ hbase/branches/0.92/src/main/resources/hbase-default.xml Mon Oct 31 17:23:24 2011
@@ -129,6 +129,14 @@
server, getting a cell's value, starting a row update, etc.
Default: 10.
</description>
+ </property>
+ <property>
+ <name>hbase.bulkload.retries.number</name>
+ <value>10</value>
+ <description>Maximum retries. This is the maximum number of iterations
+ in which atomic bulk loads are attempted in the face of splitting operations.
+ Default: 10.
+ </description>
</property>
<property>
<name>hbase.client.scanner.caching</name>
Added: hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java?rev=1195573&view=auto
==============================================================================
--- hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java (added)
+++ hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java Mon Oct 31 17:23:24 2011
@@ -0,0 +1,324 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableExistsException;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.ServerCallable;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.Multimap;
+
+/**
+ * Test cases for the atomic load error handling of the bulk load functionality.
+ *
+ * All tests share one mini cluster started in {@link #setupCluster()} and shut
+ * down in {@link #teardownCluster()}; individual tests must not stop it.
+ */
+public class TestLoadIncrementalHFilesSplitRecovery {
+  final static Log LOG =
+      LogFactory.getLog(TestLoadIncrementalHFilesSplitRecovery.class);
+
+  private static HBaseTestingUtility util;
+
+  final static int NUM_CFS = 10;
+  final static byte[] QUAL = Bytes.toBytes("qual");
+  final static int ROWCOUNT = 100;
+
+  /** Row key for row {@code i}, zero-padded so keys sort numerically. */
+  static byte[] rowkey(int i) {
+    return Bytes.toBytes(String.format("row_%08d", i));
+  }
+
+  /** Name of the {@code i}-th column family. */
+  static String family(int i) {
+    return String.format("family_%04d", i);
+  }
+
+  /** Cell value written for every cell of bulk load round {@code i}. */
+  static byte[] value(int i) {
+    return Bytes.toBytes(String.format("%010d", i));
+  }
+
+  /**
+   * Writes one HFile per column family under {@code dir/<family>/}, each with
+   * {@link #ROWCOUNT} rows whose cells all hold {@code value(value)}.
+   */
+  public static void buildHFiles(FileSystem fs, Path dir, int value)
+      throws IOException {
+    byte[] val = value(value);
+    for (int i = 0; i < NUM_CFS; i++) {
+      Path testIn = new Path(dir, family(i));
+
+      TestHRegionServerBulkLoad.createHFile(fs, new Path(testIn, "hfile_" + i),
+          Bytes.toBytes(family(i)), QUAL, val, ROWCOUNT);
+    }
+  }
+
+  /**
+   * Creates a table with the given name and the first {@code cfs} column
+   * families (named by {@link #family(int)}) if the table does not already
+   * exist.
+   */
+  private void setupTable(String table, int cfs) throws IOException {
+    try {
+      LOG.info("Creating table " + table);
+      HTableDescriptor htd = new HTableDescriptor(table);
+      for (int i = 0; i < cfs; i++) {
+        htd.addFamily(new HColumnDescriptor(family(i)));
+      }
+
+      HBaseAdmin admin = util.getHBaseAdmin();
+      admin.createTable(htd);
+    } catch (TableExistsException tee) {
+      LOG.info("Table " + table + " already exists");
+    }
+  }
+
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    util = new HBaseTestingUtility();
+    util.startMiniCluster(1);
+  }
+
+  @AfterClass
+  public static void teardownCluster() throws Exception {
+    util.shutdownMiniCluster();
+  }
+
+  /**
+   * Asserts that {@code table} exists, has exactly {@code count} rows, and
+   * that every cell holds {@code value(value)}. The table handle and scanner
+   * are always closed.
+   */
+  void assertExpectedTable(String table, int count, int value) {
+    try {
+      assertEquals(1, util.getHBaseAdmin().listTables(table).length);
+
+      HTable t = new HTable(util.getConfiguration(), table);
+      try {
+        ResultScanner sr = t.getScanner(new Scan());
+        try {
+          int i = 0;
+          for (Result r : sr) {
+            i++;
+            for (NavigableMap<byte[], byte[]> nm : r.getNoVersionMap().values()) {
+              for (byte[] val : nm.values()) {
+                assertTrue(Bytes.equals(val, value(value)));
+              }
+            }
+          }
+          assertEquals(count, i);
+        } finally {
+          sr.close();
+        }
+      } finally {
+        t.close();
+      }
+    } catch (IOException e) {
+      // Keep the cause visible; fail() alone would discard the stack trace.
+      LOG.error("Failed due to exception", e);
+      fail("Failed due to exception: " + e);
+    }
+  }
+
+  /**
+   * Injects an IOException into the first atomic region load and verifies
+   * that the load is retried once and all rows end up in the table.
+   */
+  @Test
+  public void testBulkLoadPhaseRecovery() throws Exception {
+    String table = "bulkPhaseRetry";
+    setupTable(table, 10);
+
+    final AtomicInteger attemptedCalls = new AtomicInteger();
+    final AtomicInteger failedCalls = new AtomicInteger();
+    LoadIncrementalHFiles lih = new LoadIncrementalHFiles(
+        util.getConfiguration()) {
+
+      @Override
+      protected List<LoadQueueItem> tryAtomicRegionLoad(final HConnection conn,
+          byte[] tableName, final byte[] first, Collection<LoadQueueItem> lqis) {
+        int i = attemptedCalls.incrementAndGet();
+        if (i == 1) {
+          // First attempt: route the call through a connection whose
+          // getRegionServerWithRetries always throws.
+          HConnection errConn = mock(HConnection.class);
+          try {
+            doThrow(new IOException("injecting bulk load error")).when(errConn)
+                .getRegionServerWithRetries((ServerCallable) anyObject());
+          } catch (Exception e) {
+            LOG.fatal("mocking cruft, should never happen", e);
+            throw new RuntimeException("mocking cruft, should never happen");
+          }
+          failedCalls.incrementAndGet();
+          return super.tryAtomicRegionLoad(errConn, tableName, first, lqis);
+        }
+
+        return super.tryAtomicRegionLoad(conn, tableName, first, lqis);
+      }
+    };
+
+    // create HFiles for different column families
+    FileSystem fs = util.getTestFileSystem();
+    Path dir = util.getDataTestDir(table);
+    buildHFiles(fs, dir, 1);
+    HTable t = new HTable(util.getConfiguration(), Bytes.toBytes(table));
+    lih.doBulkLoad(dir, t);
+
+    // check that data was loaded
+    assertEquals(2, attemptedCalls.get());
+    assertEquals(1, failedCalls.get());
+    assertExpectedTable(table, ROWCOUNT, 1);
+  }
+
+  /**
+   * Splits every region of {@code tableName} hosted on the server carrying the
+   * table's first region at the middle row, then polls every 250ms until two
+   * regions of the table are online on that server. Errors are logged rather
+   * than thrown so the caller can continue with its load attempt.
+   */
+  private void forceSplitAndWait(byte[] tableName) {
+    try {
+      HRegionServer hrs = util.getRSForFirstRegionInTable(tableName);
+
+      for (HRegionInfo hri : hrs.getOnlineRegions()) {
+        if (Bytes.equals(hri.getTableName(), tableName)) {
+          // splitRegion doesn't work if startkey/endkey are null, so hard code
+          // the split point.
+          hrs.splitRegion(hri, rowkey(ROWCOUNT / 2));
+        }
+      }
+
+      int regions;
+      do {
+        regions = 0;
+        for (HRegionInfo hri : hrs.getOnlineRegions()) {
+          if (Bytes.equals(hri.getTableName(), tableName)) {
+            regions++;
+          }
+        }
+        if (regions != 2) {
+          LOG.info("Taking some time to complete split...");
+          Thread.sleep(250);
+        }
+      } while (regions != 2);
+    } catch (IOException e) {
+      LOG.error("Failed to force split of " + Bytes.toString(tableName), e);
+    } catch (InterruptedException e) {
+      LOG.warn("Interrupted while waiting for split to complete", e);
+      // Preserve the interrupt for whoever owns this thread.
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * This test exercises the path where there is a split after initial
+   * validation but before the atomic bulk load call. We cannot use presplitting
+   * to test this path, so we actually inject a split just before the atomic
+   * region load.
+   */
+  @Test
+  public void testSplitWhileBulkLoadPhase() throws Exception {
+    final String table = "bulkPhaseSplit";
+    final byte[] tableBytes = Bytes.toBytes(table);
+    setupTable(table, 10);
+    LoadIncrementalHFiles lih = new LoadIncrementalHFiles(
+        util.getConfiguration());
+
+    // create HFiles for different column families
+    Path dir = util.getDataTestDir(table);
+    Path bulk1 = new Path(dir, "normalBulkload");
+    FileSystem fs = util.getTestFileSystem();
+    buildHFiles(fs, bulk1, 1);
+    HTable t = new HTable(util.getConfiguration(), tableBytes);
+    lih.doBulkLoad(bulk1, t);
+    assertExpectedTable(table, ROWCOUNT, 1);
+
+    // Now let's cause trouble
+    final AtomicInteger attemptedCalls = new AtomicInteger();
+    LoadIncrementalHFiles lih2 = new LoadIncrementalHFiles(
+        util.getConfiguration()) {
+
+      @Override
+      protected List<LoadQueueItem> tryAtomicRegionLoad(final HConnection conn,
+          byte[] tableName, final byte[] first, Collection<LoadQueueItem> lqis) {
+        int i = attemptedCalls.incrementAndGet();
+        if (i == 1) {
+          // On first attempt force a split.
+          forceSplitAndWait(tableBytes);
+        }
+
+        return super.tryAtomicRegionLoad(conn, tableName, first, lqis);
+      }
+    };
+
+    // create HFiles for different column families
+    Path bulk2 = new Path(dir, "bulkload2");
+    buildHFiles(fs, bulk2, 2); // all values are '2'
+    lih2.doBulkLoad(bulk2, t);
+
+    // check that data was loaded
+
+    // The three expected attempts are 1) failure because need to split, 2)
+    // load of split top 3) load of split bottom
+    assertEquals(3, attemptedCalls.get());
+    assertExpectedTable(table, ROWCOUNT, 2);
+  }
+
+  /**
+   * Fails the fifth groupOrSplit call and expects the failure to surface from
+   * doBulkLoad as an IOException. The shared mini cluster is left running for
+   * the other tests; teardownCluster() shuts it down.
+   */
+  @Test(expected = IOException.class)
+  public void testGroupOrSplitFailure() throws Exception {
+    String table = "groupOrSplitStoreFail";
+    setupTable(table, 10);
+
+    LoadIncrementalHFiles lih = new LoadIncrementalHFiles(
+        util.getConfiguration()) {
+      int i = 0;
+
+      @Override
+      protected List<LoadQueueItem> groupOrSplit(
+          Multimap<ByteBuffer, LoadQueueItem> regionGroups,
+          final LoadQueueItem item, final HTable table,
+          final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
+        i++;
+
+        if (i == 5) {
+          throw new IOException("failure");
+        }
+        return super.groupOrSplit(regionGroups, item, table, startEndKeys);
+      }
+    };
+
+    Path dir = util.getDataTestDir(table);
+
+    // create HFiles for different column families
+    FileSystem fs = util.getTestFileSystem();
+    buildHFiles(fs, dir, 1);
+    HTable t = new HTable(util.getConfiguration(), Bytes.toBytes(table));
+    // Expected to throw the injected IOException.
+    lih.doBulkLoad(dir, t);
+  }
+}
Added: hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java?rev=1195573&view=auto
==============================================================================
--- hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java (added)
+++ hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java Mon Oct 31 17:23:24 2011
@@ -0,0 +1,316 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
+import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
+import org.apache.hadoop.hbase.TableExistsException;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.ServerCallable;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.Compression;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Tests bulk loading of HFiles and shows the atomicity or lack of atomicity of
+ * the region server's bulkLoad functionality.
+ */
+public class TestHRegionServerBulkLoad {
+  final static Log LOG = LogFactory.getLog(TestHRegionServerBulkLoad.class);
+  // Not final: setConf() replaces it, so the configuration must always be
+  // read through UTIL.getConfiguration() rather than cached in a field.
+  private static HBaseTestingUtility UTIL = new HBaseTestingUtility();
+  private final static byte[] QUAL = Bytes.toBytes("qual");
+  private final static int NUM_CFS = 10;
+  public static int BLOCKSIZE = 64 * 1024;
+  public static String COMPRESSION = Compression.Algorithm.NONE.getName();
+
+  private final static byte[][] families = new byte[NUM_CFS][];
+  static {
+    for (int i = 0; i < NUM_CFS; i++) {
+      families[i] = Bytes.toBytes(family(i));
+    }
+  }
+
+  /** Row key for row {@code i}, zero-padded so keys sort numerically. */
+  static byte[] rowkey(int i) {
+    return Bytes.toBytes(String.format("row_%08d", i));
+  }
+
+  /** Name of the {@code i}-th column family. */
+  static String family(int i) {
+    return String.format("family_%04d", i);
+  }
+
+  /**
+   * Create an HFile with the given number of rows with a specified value.
+   * Each row gets one cell {@code family:qualifier = value}, all stamped with
+   * the same timestamp.
+   */
+  public static void createHFile(FileSystem fs, Path path, byte[] family,
+      byte[] qualifier, byte[] value, int numRows) throws IOException {
+    Configuration conf = UTIL.getConfiguration();
+    HFile.Writer writer = HFile
+        .getWriterFactory(conf, new CacheConfig(conf))
+        .createWriter(fs, path, BLOCKSIZE, COMPRESSION, KeyValue.KEY_COMPARATOR);
+    long now = System.currentTimeMillis();
+    try {
+      for (int i = 0; i < numRows; i++) {
+        KeyValue kv = new KeyValue(rowkey(i), family, qualifier, now, value);
+        writer.append(kv);
+      }
+    } finally {
+      writer.close();
+    }
+  }
+
+  /**
+   * Thread that repeatedly writes one HFile per column family, all carrying
+   * the same per-iteration value, and bulk loads them with a single
+   * bulkLoadHFiles call so that readers can check the load is atomic.
+   *
+   * Each iteration loads 10 hdfs files, estimated at about 50 open file
+   * handles. So every 10 iterations (500 file handles) it requests a region
+   * compaction to reduce the number of open file handles.
+   */
+  public static class AtomicHFileLoader extends RepeatingTestThread {
+    final AtomicLong numBulkLoads = new AtomicLong();
+    final AtomicLong numCompactions = new AtomicLong();
+    private String tableName;
+
+    /**
+     * @param targetFamilies currently unused; the loader always writes all
+     *        {@code NUM_CFS} families.
+     */
+    public AtomicHFileLoader(String tableName, TestContext ctx,
+        byte targetFamilies[][]) throws IOException {
+      super(ctx);
+      this.tableName = tableName;
+    }
+
+    @Override
+    public void doAnAction() throws Exception {
+      long iteration = numBulkLoads.getAndIncrement();
+      Path dir = UTIL.getDataTestDir(String.format("bulkLoad_%08d",
+          iteration));
+
+      // create HFiles for different column families
+      FileSystem fs = UTIL.getTestFileSystem();
+      byte[] val = Bytes.toBytes(String.format("%010d", iteration));
+      final List<Pair<byte[], String>> famPaths =
+          new ArrayList<Pair<byte[], String>>(NUM_CFS);
+      for (int i = 0; i < NUM_CFS; i++) {
+        Path hfile = new Path(dir, family(i));
+        byte[] fam = Bytes.toBytes(family(i));
+        createHFile(fs, hfile, fam, QUAL, val, 1000);
+        famPaths.add(new Pair<byte[], String>(fam, hfile.toString()));
+      }
+
+      // bulk load HFiles
+      HConnection conn = UTIL.getHBaseAdmin().getConnection();
+      byte[] tbl = Bytes.toBytes(tableName);
+      conn.getRegionServerWithRetries(new ServerCallable<Void>(conn, tbl, Bytes
+          .toBytes("aaa")) {
+        @Override
+        public Void call() throws Exception {
+          LOG.debug("Going to connect to server " + location + " for row "
+              + Bytes.toStringBinary(row));
+          byte[] regionName = location.getRegionInfo().getRegionName();
+          server.bulkLoadHFiles(famPaths, regionName);
+          return null;
+        }
+      });
+
+      // Periodically do compaction to reduce the number of open file handles.
+      if (numBulkLoads.get() % 10 == 0) {
+        // 10 * 50 = 500 open file handles!
+        conn.getRegionServerWithRetries(new ServerCallable<Void>(conn, tbl,
+            Bytes.toBytes("aaa")) {
+          @Override
+          public Void call() throws Exception {
+            LOG.debug("compacting " + location + " for row "
+                + Bytes.toStringBinary(row));
+            server.compactRegion(location.getRegionInfo(), true);
+            numCompactions.incrementAndGet();
+            return null;
+          }
+        });
+      }
+    }
+  }
+
+  /**
+   * Thread that does full scans of the table looking for any partially
+   * completed rows, i.e. rows whose families carry different values.
+   */
+  public static class AtomicScanReader extends RepeatingTestThread {
+    byte targetFamilies[][];
+    HTable table;
+    AtomicLong numScans = new AtomicLong();
+    AtomicLong numRowsScanned = new AtomicLong();
+    String TABLE_NAME;
+
+    public AtomicScanReader(String TABLE_NAME, TestContext ctx,
+        byte targetFamilies[][]) throws IOException {
+      super(ctx);
+      this.TABLE_NAME = TABLE_NAME;
+      this.targetFamilies = targetFamilies;
+      table = new HTable(UTIL.getConfiguration(), TABLE_NAME);
+    }
+
+    @Override
+    public void doAnAction() throws Exception {
+      Scan s = new Scan();
+      for (byte[] family : targetFamilies) {
+        s.addFamily(family);
+      }
+      ResultScanner scanner = table.getScanner(s);
+      try {
+        for (Result res : scanner) {
+          byte[] lastRow = null, lastFam = null, lastQual = null;
+          byte[] gotValue = null;
+          for (byte[] family : targetFamilies) {
+            byte qualifier[] = QUAL;
+            byte thisValue[] = res.getValue(family, qualifier);
+            if (gotValue != null && thisValue != null
+                && !Bytes.equals(gotValue, thisValue)) {
+
+              StringBuilder msg = new StringBuilder();
+              msg.append("Failed on scan ").append(numScans)
+                  .append(" after scanning ").append(numRowsScanned)
+                  .append(" rows!\n");
+              msg.append("Current was " + Bytes.toString(res.getRow()) + "/"
+                  + Bytes.toString(family) + ":" + Bytes.toString(qualifier)
+                  + " = " + Bytes.toString(thisValue) + "\n");
+              msg.append("Previous was " + Bytes.toString(lastRow) + "/"
+                  + Bytes.toString(lastFam) + ":" + Bytes.toString(lastQual)
+                  + " = " + Bytes.toString(gotValue));
+              throw new RuntimeException(msg.toString());
+            }
+
+            lastFam = family;
+            lastQual = qualifier;
+            lastRow = res.getRow();
+            gotValue = thisValue;
+          }
+          numRowsScanned.getAndIncrement();
+        }
+      } finally {
+        // Close the scanner so repeated iterations do not accumulate open
+        // scanners.
+        scanner.close();
+      }
+      numScans.getAndIncrement();
+    }
+  }
+
+  /**
+   * Creates a table with the given name and the first {@code cfs} column
+   * families (named by {@link #family(int)}) if the table does not already
+   * exist.
+   */
+  private void setupTable(String table, int cfs) throws IOException {
+    try {
+      LOG.info("Creating table " + table);
+      HTableDescriptor htd = new HTableDescriptor(table);
+      for (int i = 0; i < cfs; i++) {
+        htd.addFamily(new HColumnDescriptor(family(i)));
+      }
+
+      HBaseAdmin admin = UTIL.getHBaseAdmin();
+      admin.createTable(htd);
+    } catch (TableExistsException tee) {
+      LOG.info("Table " + table + " already exists");
+    }
+  }
+
+  /**
+   * Atomic bulk load.
+   */
+  @Test
+  public void testAtomicBulkLoad() throws Exception {
+    String TABLE_NAME = "atomicBulkLoad";
+
+    int millisToRun = 30000;
+    int numScanners = 50;
+
+    UTIL.startMiniCluster(1);
+    try {
+      runAtomicBulkloadTest(TABLE_NAME, millisToRun, numScanners);
+    } finally {
+      UTIL.shutdownMiniCluster();
+    }
+  }
+
+  /**
+   * Runs one loader thread and {@code numScanners} scanner threads against
+   * {@code tableName} for {@code millisToRun} ms, then logs their counters.
+   */
+  void runAtomicBulkloadTest(String tableName, int millisToRun, int numScanners)
+      throws Exception {
+    setupTable(tableName, 10);
+
+    TestContext ctx = new TestContext(UTIL.getConfiguration());
+
+    AtomicHFileLoader loader = new AtomicHFileLoader(tableName, ctx, null);
+    ctx.addThread(loader);
+
+    List<AtomicScanReader> scanners = Lists.newArrayList();
+    for (int i = 0; i < numScanners; i++) {
+      AtomicScanReader scanner = new AtomicScanReader(tableName, ctx, families);
+      scanners.add(scanner);
+      ctx.addThread(scanner);
+    }
+
+    ctx.startThreads();
+    ctx.waitFor(millisToRun);
+    ctx.stop();
+
+    LOG.info("Loaders:");
+    LOG.info("  loaded " + loader.numBulkLoads.get());
+    LOG.info("  compactions " + loader.numCompactions.get());
+
+    LOG.info("Scanners:");
+    for (AtomicScanReader scanner : scanners) {
+      LOG.info("  scanned " + scanner.numScans.get());
+      LOG.info("  verified " + scanner.numRowsScanned.get() + " rows");
+    }
+  }
+
+  /**
+   * Run test on an HBase instance for 5 minutes. This assumes that the table
+   * under test only has a single region.
+   */
+  public static void main(String args[]) throws Exception {
+    try {
+      Configuration c = HBaseConfiguration.create();
+      TestHRegionServerBulkLoad test = new TestHRegionServerBulkLoad();
+      test.setConf(c);
+      test.runAtomicBulkloadTest("atomicTableTest", 5 * 60 * 1000, 50);
+    } finally {
+      System.exit(0); // something hangs (believe it is lru threadpool)
+    }
+  }
+
+  /** Replaces the shared testing utility with one built on {@code c}. */
+  private void setConf(Configuration c) {
+    UTIL = new HBaseTestingUtility(c);
+  }
+}
Modified: hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java?rev=1195573&r1=1195572&r2=1195573&view=diff
==============================================================================
--- hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java (original)
+++ hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java Mon Oct 31 17:23:24 2011
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertTru
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
@@ -42,7 +43,6 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.regionserver.FlushRequester;
@@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.security.
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdge;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Pair;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@@ -205,7 +206,9 @@ public class TestWALReplay {
byte [] row = Bytes.toBytes(tableNameStr);
writer.append(new KeyValue(row, family, family, row));
writer.close();
- region.bulkLoadHFile(f.toString(), family);
+ List <Pair<byte[],String>> hfs= new ArrayList<Pair<byte[],String>>(1);
+ hfs.add(Pair.newPair(family, f.toString()));
+ region.bulkLoadHFiles(hfs);
// Add an edit so something in the WAL
region.put((new Put(row)).add(family, family, family));
wal.sync();