You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2011/10/31 18:26:43 UTC

svn commit: r1195574 - in /hbase/trunk: ./ src/main/java/org/apache/hadoop/hbase/ipc/ src/main/java/org/apache/hadoop/hbase/mapreduce/ src/main/java/org/apache/hadoop/hbase/regionserver/ src/main/resources/ src/test/java/org/apache/hadoop/hbase/mapredu...

Author: tedyu
Date: Mon Oct 31 17:26:42 2011
New Revision: 1195574

URL: http://svn.apache.org/viewvc?rev=1195574&view=rev
Log:
HBASE-4552  multi-CF bulk load is not atomic across column families (Jonathan Hsieh)

Added:
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
Modified:
    hbase/trunk/CHANGES.txt
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
    hbase/trunk/src/main/resources/hbase-default.xml
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java

Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1195574&r1=1195573&r2=1195574&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Mon Oct 31 17:26:42 2011
@@ -430,6 +430,7 @@ Release 0.92.0 - Unreleased
                to move a region
    HBASE-4613  hbase.util.Threads#threadDumpingIsAlive sleeps 1 second,
                slowing down the shutdown by 0.5s
+   HBASE-4552  multi-CF bulk load is not atomic across column families (Jonathan Hsieh)
 
   TESTS
    HBASE-4450  test for number of blocks read: to serve as baseline for expected

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java?rev=1195574&r1=1195573&r2=1195574&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java Mon Oct 31 17:26:42 2011
@@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.io.hfile.
 import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
 import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.hbase.ipc.VersionedProtocol;
 
@@ -62,7 +63,7 @@ public interface HRegionInterface extend
   // maintained a single global version number on all HBase Interfaces.  This
   // meant all HBase RPC was broke though only one of the three RPC Interfaces
   // had changed.  This has since been undone.
-  public static final long VERSION = 28L;
+  public static final long VERSION = 29L;
 
   /**
    * Get metainfo about an HRegion
@@ -323,9 +324,13 @@ public interface HRegionInterface extend
   public <R> MultiResponse multi(MultiAction<R> multi) throws IOException;
 
   /**
-   * Bulk load an HFile into an open region
+   * Atomically bulk load multiple HFiles (say from different column families)
+   * into an open region.
+   * 
+   * @param familyPaths List of (family, hfile path) pairs
+   * @param regionName name of region to load hfiles into
    */
-  public void bulkLoadHFile(String hfilePath, byte[] regionName, byte[] familyName)
+  public void bulkLoadHFiles(List<Pair<byte[], String>> familyPaths, byte[] regionName)
   throws IOException;
 
   // Master methods

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java?rev=1195574&r1=1195573&r2=1195574&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java Mon Oct 31 17:26:42 2011
@@ -21,13 +21,16 @@ package org.apache.hadoop.hbase.mapreduc
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Deque;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.Callable;
@@ -37,7 +40,6 @@ import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
@@ -62,9 +64,9 @@ import org.apache.hadoop.hbase.io.HalfSt
 import org.apache.hadoop.hbase.io.Reference;
 import org.apache.hadoop.hbase.io.Reference.Range;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
-import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -72,9 +74,11 @@ import org.apache.hadoop.hbase.util.Pair
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
-
 /**
  * Tool to load the output of HFileOutputFormat into an existing table.
  * @see #usage()
@@ -87,8 +91,6 @@ public class LoadIncrementalHFiles exten
   static AtomicLong regionCount = new AtomicLong(0);
   private HBaseAdmin hbAdmin;
   private Configuration cfg;
-  private Set<Future> futures = new HashSet<Future>();
-  private Set<Future> futuresForSplittingHFile = new HashSet<Future>();
 
   public static String NAME = "completebulkload";
 
@@ -112,7 +114,7 @@ public class LoadIncrementalHFiles exten
    * region boundary, and each part is added back into the queue.
    * The import process finishes when the queue is empty.
    */
-  private static class LoadQueueItem {
+  static class LoadQueueItem {
     final byte[] family;
     final Path hfilePath;
 
@@ -120,13 +122,17 @@ public class LoadIncrementalHFiles exten
       this.family = family;
       this.hfilePath = hfilePath;
     }
+
+    public String toString() {
+      return "family:"+ Bytes.toString(family) + " path:" + hfilePath.toString();
+    }
   }
 
   /**
    * Walk the given directory for all HFiles, and return a Queue
    * containing all such files.
    */
-  private Deque<LoadQueueItem> discoverLoadQueue(Path hfofDir)
+  private void discoverLoadQueue(Deque<LoadQueueItem> ret, Path hfofDir)
   throws IOException {
     FileSystem fs = hfofDir.getFileSystem(getConf());
 
@@ -140,7 +146,6 @@ public class LoadIncrementalHFiles exten
       throw new FileNotFoundException("No families found in " + hfofDir);
     }
 
-    Deque<LoadQueueItem> ret = new LinkedList<LoadQueueItem>();
     for (FileStatus stat : familyDirStatuses) {
       if (!stat.isDir()) {
         LOG.warn("Skipping non-directory " + stat.getPath());
@@ -156,7 +161,6 @@ public class LoadIncrementalHFiles exten
         ret.add(new LoadQueueItem(family, hfile));
       }
     }
-    return ret;
   }
 
   /**
@@ -167,10 +171,10 @@ public class LoadIncrementalHFiles exten
    * @param table the table to load into
    * @throws TableNotFoundException if table does not yet exist
    */
-  public void doBulkLoad(Path hfofDir, HTable table)
+  public void doBulkLoad(Path hfofDir, final HTable table)
     throws TableNotFoundException, IOException
   {
-    HConnection conn = table.getConnection();
+    final HConnection conn = table.getConnection();
 
     if (!conn.isTableAvailable(table.getTableName())) {
       throw new TableNotFoundException("Table " +
@@ -178,54 +182,51 @@ public class LoadIncrementalHFiles exten
           "is not currently available.");
     }
 
-    Deque<LoadQueueItem> queue = null;
+    // initialize thread pools
     int nrThreads = cfg.getInt("hbase.loadincremental.threads.max",
         Runtime.getRuntime().availableProcessors());
     ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
     builder.setNameFormat("LoadIncrementalHFiles-%1$d");
-
     ExecutorService pool = new ThreadPoolExecutor(nrThreads, nrThreads,
         60, TimeUnit.SECONDS,
         new LinkedBlockingQueue<Runnable>(),
         builder.build());
     ((ThreadPoolExecutor)pool).allowCoreThreadTimeOut(true);
+
+    // LQI queue does not need to be threadsafe -- all operations on this queue
+    // happen in this thread
+    Deque<LoadQueueItem> queue = new LinkedList<LoadQueueItem>();
     try {
-      queue = discoverLoadQueue(hfofDir);
-      // outer loop picks up LoadQueueItem due to HFile split
-      while (!queue.isEmpty() || futuresForSplittingHFile.size() > 0) {
-        Pair<byte[][],byte[][]> startEndKeys = table.getStartEndKeys();
-        // inner loop groups callables
-        while (!queue.isEmpty()) {
-          LoadQueueItem item = queue.remove();
-          tryLoad(item, conn, table, queue, startEndKeys, pool);
-        }
-        Iterator<Future> iter = futuresForSplittingHFile.iterator();
-        while (iter.hasNext()) {
-          boolean timeoutSeen = false;
-          Future future = iter.next();
-          try {
-            future.get(20, TimeUnit.MILLISECONDS);
-            break;  // we have at least two new HFiles to process
-          } catch (ExecutionException ee) {
-            LOG.error(ee);
-          } catch (InterruptedException ie) {
-            LOG.error(ie);
-          } catch (TimeoutException te) {
-              timeoutSeen = true;
-          } finally {
-            if (!timeoutSeen) iter.remove();
-          }
+      discoverLoadQueue(queue, hfofDir);
+      int count = 0;
+
+      // Assumes that region splits can happen while this occurs.
+      while (!queue.isEmpty()) {
+        // need to reload split keys each iteration.
+        final Pair<byte[][], byte[][]> startEndKeys = table.getStartEndKeys();
+        if (count != 0) {
+          LOG.info("Split occured while grouping HFiles, retry attempt " +
+              + count + " with " + queue.size() + " files remaining to load");
         }
-      }
-      for (Future<Void> future : futures) {
-        try {
-          future.get();
-        } catch (ExecutionException ee) {
-          LOG.error(ee);
-        } catch (InterruptedException ie) {
-          LOG.error(ie);
+        
+        int maxRetries = cfg.getInt("hbase.bulkload.retries.number", 10);
+        if (count >= maxRetries) {
+          LOG.error("Retry attempted " + count +  " times without completing, bailing out");
+          return;
         }
+        count++;
+
+        // Using ByteBuffer for byte[] equality semantics
+        Multimap<ByteBuffer, LoadQueueItem> regionGroups = groupOrSplitPhase(table,
+            pool, queue, startEndKeys);
+
+        bulkLoadPhase(table, conn, pool, queue, regionGroups);
+
+        // NOTE: The next iteration's split / group could happen in parallel to
+        // atomic bulkloads assuming that there are splits and no merges, and
+        // that we can atomically pull out the groups we want to retry.
       }
+
     } finally {
       pool.shutdown();
       if (queue != null && !queue.isEmpty()) {
@@ -241,15 +242,111 @@ public class LoadIncrementalHFiles exten
     }
   }
 
+  /**
+   * This takes the LQI's grouped by likely regions and attempts to bulk load
+   * them.  Any failures are re-queued for another pass with the
+   * groupOrSplitPhase.
+   */
+  private void bulkLoadPhase(final HTable table, final HConnection conn,
+      ExecutorService pool, Deque<LoadQueueItem> queue,
+      final Multimap<ByteBuffer, LoadQueueItem> regionGroups) throws IOException {
+    // atomically bulk load the groups.
+    Set<Future<List<LoadQueueItem>>> loadingFutures = new HashSet<Future<List<LoadQueueItem>>>();
+    for (Entry<ByteBuffer, ? extends Collection<LoadQueueItem>> e: regionGroups.asMap().entrySet()) {
+      final byte[] first = e.getKey().array();
+      final Collection<LoadQueueItem> lqis =  e.getValue();
+
+      final Callable<List<LoadQueueItem>> call = new Callable<List<LoadQueueItem>>() {
+        public List<LoadQueueItem> call() throws Exception {
+          List<LoadQueueItem> toRetry = tryAtomicRegionLoad(conn, table.getTableName(), first, lqis);
+          return toRetry;
+        }
+      };
+      loadingFutures.add(pool.submit(call));
+    }
+
+    // get all the results.
+    for (Future<List<LoadQueueItem>> future : loadingFutures) {
+      try {
+        List<LoadQueueItem> toRetry = future.get();
+        if (toRetry != null && toRetry.size() != 0) {
+          // LQIs that are requeued to be regrouped.
+          queue.addAll(toRetry);
+        }
+
+      } catch (ExecutionException e1) {
+        Throwable t = e1.getCause();
+        if (t instanceof IOException) {
+          LOG.error("IOException during bulk load", e1);
+          throw (IOException)t; // would have been thrown if not parallelized,
+        }
+        LOG.error("Unexpected execution exception during bulk load", e1);
+        throw new IllegalStateException(t);
+      } catch (InterruptedException e1) {
+        LOG.error("Unexpected interrupted exception during bulk load", e1);
+        throw new IllegalStateException(e1);
+      }
+    }
+  }
+
+  /**
+   * @return A Multimap<startkey, LoadQueueItem> that groups LQI by likely
+   * bulk load region targets.
+   */
+  private Multimap<ByteBuffer, LoadQueueItem> groupOrSplitPhase(final HTable table,
+      ExecutorService pool, Deque<LoadQueueItem> queue,
+      final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
+    // <region start key, LQI> needs to be synchronized only within the scope
+    // of this phase because of the puts that happen in futures.
+    Multimap<ByteBuffer, LoadQueueItem> rgs = HashMultimap.create();
+    final Multimap<ByteBuffer, LoadQueueItem> regionGroups = Multimaps.synchronizedMultimap(rgs);
+
+    // drain LQIs and figure out bulk load groups
+    Set<Future<List<LoadQueueItem>>> splittingFutures = new HashSet<Future<List<LoadQueueItem>>>();
+    while (!queue.isEmpty()) {
+      final LoadQueueItem item = queue.remove();
+      
+      final Callable<List<LoadQueueItem>> call = new Callable<List<LoadQueueItem>>() {
+        public List<LoadQueueItem> call() throws Exception {
+          List<LoadQueueItem> splits = groupOrSplit(regionGroups, item, table, startEndKeys);
+          return splits;
+        }
+      };
+      splittingFutures.add(pool.submit(call));
+    }
+    // get all the results.  All grouping and splitting must finish before
+    // we can attempt the atomic loads.
+    for (Future<List<LoadQueueItem>> lqis : splittingFutures) {
+      try {
+        List<LoadQueueItem> splits = lqis.get();
+        if (splits != null) {
+          queue.addAll(splits);
+        }
+      } catch (ExecutionException e1) {
+        Throwable t = e1.getCause();
+        if (t instanceof IOException) {
+          LOG.error("IOException during splitting", e1);
+          throw (IOException)t; // would have been thrown if not parallelized,
+        }
+        LOG.error("Unexpected execution exception during splitting", e1);
+        throw new IllegalStateException(t);
+      } catch (InterruptedException e1) {
+        LOG.error("Unexpected interrupted exception during splitting", e1);
+        throw new IllegalStateException(e1);
+      }
+    }
+    return regionGroups;
+  }
+
   // unique file name for the table
   String getUniqueName(byte[] tableName) {
     String name = Bytes.toStringBinary(tableName) + "," + regionCount.incrementAndGet();
     return name;
   }
 
-  void splitStoreFileAndRequeue(final LoadQueueItem item,
-      final Deque<LoadQueueItem> queue, final HTable table,
-      byte[] startKey, byte[] splitKey) throws IOException {
+  protected List<LoadQueueItem> splitStoreFile(final LoadQueueItem item,
+      final HTable table, byte[] startKey,
+      byte[] splitKey) throws IOException {
     final Path hfilePath = item.hfilePath;
 
     // We use a '_' prefix which is ignored when walking directory trees
@@ -268,25 +365,26 @@ public class LoadIncrementalHFiles exten
 
     // Add these back at the *front* of the queue, so there's a lower
     // chance that the region will just split again before we get there.
-    synchronized (queue) {
-      queue.addFirst(new LoadQueueItem(item.family, botOut));
-      queue.addFirst(new LoadQueueItem(item.family, topOut));
-    }
+    List<LoadQueueItem> lqis = new ArrayList<LoadQueueItem>(2);
+    lqis.add(new LoadQueueItem(item.family, botOut));
+    lqis.add(new LoadQueueItem(item.family, topOut));
+
     LOG.info("Successfully split into new HFiles " + botOut + " and " + topOut);
+    return lqis;
   }
 
   /**
-   * Attempt to load the given load queue item into its target region server.
+   * Attempt to assign the given load queue item into its target region group.
    * If the hfile boundary no longer fits into a region, physically splits
-   * the hfile such that the new bottom half will fit, and adds the two
-   * resultant hfiles back into the load queue.
+   * the hfile such that the new bottom half will fit and returns the list of
+   * LQI's corresponding to the resultant hfiles.
+   *
+   * protected for testing
    */
-  private boolean tryLoad(final LoadQueueItem item,
-      final HConnection conn, final HTable table,
-      final Deque<LoadQueueItem> queue,
-      final Pair<byte[][],byte[][]> startEndKeys,
-      ExecutorService pool)
-  throws IOException {
+  protected List<LoadQueueItem> groupOrSplit(Multimap<ByteBuffer, LoadQueueItem> regionGroups,
+      final LoadQueueItem item, final HTable table,
+      final Pair<byte[][], byte[][]> startEndKeys)
+      throws IOException {
     final Path hfilePath = item.hfilePath;
     final FileSystem fs = hfilePath.getFileSystem(getConf());
     HFile.Reader hfr = HFile.createReader(fs, hfilePath,
@@ -305,54 +403,80 @@ public class LoadIncrementalHFiles exten
         " last="  + Bytes.toStringBinary(last));
     if (first == null || last == null) {
       assert first == null && last == null;
+      // TODO what if this is due to a bad HFile?
       LOG.info("hfile " + hfilePath + " has no entries, skipping");
-      return false;
+      return null;
     }
     if (Bytes.compareTo(first, last) > 0) {
       throw new IllegalArgumentException(
       "Invalid range: " + Bytes.toStringBinary(first) +
       " > " + Bytes.toStringBinary(last));
     }
-    int idx = Arrays.binarySearch(startEndKeys.getFirst(), first, Bytes.BYTES_COMPARATOR);
+    int idx = Arrays.binarySearch(startEndKeys.getFirst(), first,
+        Bytes.BYTES_COMPARATOR);
     if (idx < 0) {
-      idx = -(idx+1)-1;
+      // not on boundary, binarySearch returns -(insertion point) - 1.
+      // Calculate the index of the region it would be in.
+      idx = -(idx + 1) - 1;
     }
     final int indexForCallable = idx;
     boolean lastKeyInRange =
       Bytes.compareTo(last, startEndKeys.getSecond()[idx]) < 0 ||
       Bytes.equals(startEndKeys.getSecond()[idx], HConstants.EMPTY_BYTE_ARRAY);
     if (!lastKeyInRange) {
-      Callable<Void> callable = new Callable<Void>() {
-        public Void call() throws Exception {
-          splitStoreFileAndRequeue(item, queue, table,
-              startEndKeys.getFirst()[indexForCallable],
-              startEndKeys.getSecond()[indexForCallable]);
-          return (Void)null;
-        }
-      };
-      futuresForSplittingHFile.add(pool.submit(callable));
+      List<LoadQueueItem> lqis = splitStoreFile(item, table,
+          startEndKeys.getFirst()[indexForCallable],
+          startEndKeys.getSecond()[indexForCallable]);
+      return lqis;
+    }
+
+    // group regions.
+    regionGroups.put(ByteBuffer.wrap(startEndKeys.getFirst()[idx]), item);
+    return null;
+  }
+
+  /**
+   * Attempts to do an atomic load of many hfiles into a region.  If it fails,
+   * it returns a list of hfiles that need to be retried.  If it is successful
+   * it will return an empty list.
+   * 
+   * NOTE: To maintain row atomicity guarantees, the region server callable
+   * should succeed atomically and fail atomically.
+   * 
+   * Protected for testing.
+   */
+  protected List<LoadQueueItem> tryAtomicRegionLoad(final HConnection conn,
+      byte[] tableName, final byte[] first, Collection<LoadQueueItem> lqis) {
 
-      return true;
+    final List<Pair<byte[], String>> famPaths =
+      new ArrayList<Pair<byte[], String>>(lqis.size());
+    for (LoadQueueItem lqi : lqis) {
+      famPaths.add(Pair.newPair(lqi.family, lqi.hfilePath.toString()));
     }
 
-    final ServerCallable<Void> svrCallable = new ServerCallable<Void>(conn, table.getTableName(), first) {
+    final ServerCallable<Void> svrCallable = new ServerCallable<Void>(conn,
+        tableName, first) {
       @Override
       public Void call() throws Exception {
-        LOG.debug("Going to connect to server " + location +
-            "for row " + Bytes.toStringBinary(row));
-
+        LOG.debug("Going to connect to server " + location + " for row "
+            + Bytes.toStringBinary(row));
         byte[] regionName = location.getRegionInfo().getRegionName();
-        server.bulkLoadHFile(hfilePath.toString(), regionName, item.family);
+        server.bulkLoadHFiles(famPaths, regionName);
         return null;
       }
     };
-    Callable<Void> callable = new Callable<Void>() {
-      public Void call() throws Exception {
-        return conn.getRegionServerWithRetries(svrCallable);
-      }
-    };
-    futures.add(pool.submit(callable));
-    return false;
+
+    List<LoadQueueItem> toRetry = new ArrayList<LoadQueueItem>();
+    try {
+      conn.getRegionServerWithRetries(svrCallable);
+    } catch (IOException e) {
+      LOG.warn("Attempt to bulk load region containing "
+          + Bytes.toStringBinary(first) + " into table "
+          + Bytes.toStringBinary(tableName)  + " with files " + lqis
+          + " failed");
+      toRetry.addAll(lqis);
+    }
+    return toRetry;
   }
 
   /**
@@ -559,7 +683,8 @@ public class LoadIncrementalHFiles exten
   }
 
   public static void main(String[] args) throws Exception {
-    ToolRunner.run(new LoadIncrementalHFiles(HBaseConfiguration.create()), args);
+    int ret = ToolRunner.run(new LoadIncrementalHFiles(HBaseConfiguration.create()), args);
+    System.exit(ret);
   }
 
 }

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1195574&r1=1195573&r2=1195574&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Mon Oct 31 17:26:42 2011
@@ -105,6 +105,7 @@ import org.apache.hadoop.hbase.util.FSUt
 import org.apache.hadoop.hbase.util.HashedBytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.io.MultipleIOException;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.StringUtils;
 import org.cliffc.high_scale_lib.Counter;
@@ -2781,23 +2782,89 @@ public class HRegion implements HeapSize
     return lid;
   }
 
-  public void bulkLoadHFile(String hfilePath, byte[] familyName)
+  /**
+   * Attempts to atomically load a group of hfiles.  This is critical for loading
+   * rows with multiple column families atomically.
+   *
+   * @param familyPaths List of Pair<byte[] column family, String hfilePath>
+   */
+  public void bulkLoadHFiles(List<Pair<byte[], String>> familyPaths)
   throws IOException {
-    startRegionOperation();
+    startBulkRegionOperation();
     this.writeRequestsCount.increment();
+    List<IOException> ioes = new ArrayList<IOException>();
+    List<Pair<byte[], String>> failures = new ArrayList<Pair<byte[], String>>();
+    boolean rangesOk = true;
     try {
-      Store store = getStore(familyName);
-      if (store == null) {
-        throw new DoNotRetryIOException(
-            "No such column family " + Bytes.toStringBinary(familyName));
+      // There possibly was a split that happened between when the split keys
+      // were gathered and before the HRegion's write lock was taken.  We need
+      // to validate the HFile region before attempting to bulk load all of them
+      for (Pair<byte[], String> p : familyPaths) {
+        byte[] familyName = p.getFirst();
+        String path = p.getSecond();
+
+        Store store = getStore(familyName);
+        if (store == null) {
+          IOException ioe = new DoNotRetryIOException(
+              "No such column family " + Bytes.toStringBinary(familyName));
+          ioes.add(ioe);
+          failures.add(p);
+        }
+
+        try {
+          store.assertBulkLoadHFileOk(new Path(path));
+        } catch (IOException ioe) {
+          rangesOk = false;
+          ioes.add(ioe);
+          failures.add(p);
+        }
+      }
+
+      if (ioes.size() != 0) {
+        // validation failed, bail out before doing anything permanent.
+        return;
+      }
+
+      for (Pair<byte[], String> p : familyPaths) {
+        byte[] familyName = p.getFirst();
+        String path = p.getSecond();
+        Store store = getStore(familyName);
+        try {
+          store.bulkLoadHFile(path);
+        } catch (IOException ioe) {
+          // a failure here causes an atomicity violation that we currently 
+          // cannot recover from since it is likely a failed hdfs operation.
+          ioes.add(ioe);
+          failures.add(p);
+          break;
+        }
       }
-      store.bulkLoadHFile(hfilePath);
     } finally {
-      closeRegionOperation();
-    }
+      closeBulkRegionOperation();
+      if (ioes.size() != 0) {
+        StringBuilder list = new StringBuilder();
+        for (Pair<byte[], String> p : failures) {
+          list.append("\n").append(Bytes.toString(p.getFirst())).append(" : ")
+            .append(p.getSecond());
+        }
 
-  }
+        if (rangesOk) {
+          // TODO Need a better story for reverting partial failures due to HDFS.
+          LOG.error("There was a partial failure due to IO.   These " +
+              "(family,hfile) pairs were not loaded: " + list);
+        } else {
+          // problem when validating
+          LOG.info("There was a recoverable bulk load failure likely due to a" +
+              " split.  These (family, HFile) pairs were not loaded: " + list);
+        }
 
+        if (ioes.size() == 1) {
+          throw ioes.get(0);
+        }
+        throw MultipleIOException.createIOException(ioes);
+      }
+    }
+  }
 
   @Override
   public boolean equals(Object o) {
@@ -4380,6 +4447,34 @@ public class HRegion implements HeapSize
   }
 
   /**
+   * This method needs to be called before any public call that reads or
+   * modifies stores in bulk. It has to be called just before a try.
+   * #closeBulkRegionOperation needs to be called in the try's finally block
+   * Acquires a writelock and checks if the region is closing or closed.
+   * @throws NotServingRegionException when the region is closing or closed
+   */
+  private void startBulkRegionOperation() throws NotServingRegionException {
+    if (this.closing.get()) {
+      throw new NotServingRegionException(regionInfo.getRegionNameAsString() +
+          " is closing");
+    }
+    lock.writeLock().lock();
+    if (this.closed.get()) {
+      lock.writeLock().unlock();
+      throw new NotServingRegionException(regionInfo.getRegionNameAsString() +
+          " is closed");
+    }
+  }
+
+  /**
+   * Releases the write lock. This needs to be called in the finally block
+   * corresponding to the try block of #startBulkRegionOperation
+   */
+  private void closeBulkRegionOperation(){
+    lock.writeLock().unlock();
+  }
+
+  /**
    * A mocked list implementaion - discards all updates.
    */
   private static final List<KeyValue> MOCKED_LIST = new AbstractList<KeyValue>() {

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1195574&r1=1195573&r2=1195574&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Mon Oct 31 17:26:42 2011
@@ -2434,12 +2434,15 @@ public class HRegionServer implements HR
     }
   }
 
+  /**
+   * Atomically bulk load several HFiles into an open region
+   */
   @Override
-  public void bulkLoadHFile(String hfilePath, byte[] regionName,
-      byte[] familyName) throws IOException {
+  public void bulkLoadHFiles(List<Pair<byte[], String>> familyPaths,
+      byte[] regionName) throws IOException {
     checkOpen();
     HRegion region = getRegion(regionName);
-    region.bulkLoadHFile(hfilePath, familyName);
+    region.bulkLoadHFiles(familyPaths);
   }
 
   Map<String, Integer> rowlocks = new ConcurrentHashMap<String, Integer>();

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1195574&r1=1195573&r2=1195574&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Mon Oct 31 17:26:42 2011
@@ -345,9 +345,12 @@ public class Store implements HeapSize {
     return this.storefiles;
   }
 
-  public void bulkLoadHFile(String srcPathStr) throws IOException {
-    Path srcPath = new Path(srcPathStr);
-
+  /**
+   * This throws a WrongRegionException if the bulkHFile does not fit in this
+   * region.
+   *
+   */
+  void assertBulkLoadHFileOk(Path srcPath) throws IOException {
     HFile.Reader reader  = null;
     try {
       LOG.info("Validating hfile at " + srcPath + " for inclusion in "
@@ -371,12 +374,21 @@ public class Store implements HeapSize {
       HRegionInfo hri = region.getRegionInfo();
       if (!hri.containsRange(firstKey, lastKey)) {
         throw new WrongRegionException(
-            "Bulk load file " + srcPathStr + " does not fit inside region "
+            "Bulk load file " + srcPath.toString() + " does not fit inside region "
             + this.region);
       }
     } finally {
       if (reader != null) reader.close();
     }
+  }
+
+  /**
+   * This method should only be called from HRegion.  It is assumed that the 
+   * ranges of values in the HFile fit within the store's assigned region.
+   * (assertBulkLoadHFileOk checks this)
+   */
+  void bulkLoadHFile(String srcPathStr) throws IOException {
+    Path srcPath = new Path(srcPathStr);
 
     // Move the file if it's on another filesystem
     FileSystem srcFs = srcPath.getFileSystem(conf);

Modified: hbase/trunk/src/main/resources/hbase-default.xml
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/resources/hbase-default.xml?rev=1195574&r1=1195573&r2=1195574&view=diff
==============================================================================
--- hbase/trunk/src/main/resources/hbase-default.xml (original)
+++ hbase/trunk/src/main/resources/hbase-default.xml Mon Oct 31 17:26:42 2011
@@ -129,6 +129,14 @@
     server, getting a cell's value, starting a row update, etc.
     Default: 10.
     </description>
+  </property> 
+  <property>
+    <name>hbase.bulkload.retries.number</name>
+    <value>10</value>
+    <description>Maximum retries.  This is the maximum number of times
+    an atomic bulk load is attempted in the face of splitting operations.
+    Default: 10.
+    </description>
   </property>
   <property>
     <name>hbase.client.scanner.caching</name>

Added: hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java?rev=1195574&view=auto
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java (added)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java Mon Oct 31 17:26:42 2011
@@ -0,0 +1,324 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableExistsException;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.ServerCallable;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.Multimap;
+
+/**
+ * Test cases for the atomic load error handling of the bulk load functionality.
+ */
+public class TestLoadIncrementalHFilesSplitRecovery {
+  final static Log LOG = LogFactory.getLog(TestHRegionServerBulkLoad.class);
+
+  private static HBaseTestingUtility util;
+
+  final static int NUM_CFS = 10;
+  final static byte[] QUAL = Bytes.toBytes("qual");
+  final static int ROWCOUNT = 100;
+
+  private final static byte[][] families = new byte[NUM_CFS][];
+  static {
+    for (int i = 0; i < NUM_CFS; i++) {
+      families[i] = Bytes.toBytes(family(i));
+    }
+  }
+
+  static byte[] rowkey(int i) {
+    return Bytes.toBytes(String.format("row_%08d", i));
+  }
+
+  static String family(int i) {
+    return String.format("family_%04d", i);
+  }
+
+  static byte[] value(int i) {
+    return Bytes.toBytes(String.format("%010d", i));
+  }
+
+  public static void buildHFiles(FileSystem fs, Path dir, int value)
+      throws IOException {
+    byte[] val = value(value);
+    for (int i = 0; i < NUM_CFS; i++) {
+      Path testIn = new Path(dir, family(i));
+
+      TestHRegionServerBulkLoad.createHFile(fs, new Path(testIn, "hfile_" + i),
+          Bytes.toBytes(family(i)), QUAL, val, ROWCOUNT);
+    }
+  }
+
+  /**
+   * Creates a table with given table name and specified number of column
+   * families if the table does not already exist.
+   */
+  private void setupTable(String table, int cfs) throws IOException {
+    try {
+      LOG.info("Creating table " + table);
+      HTableDescriptor htd = new HTableDescriptor(table);
+      for (int i = 0; i < 10; i++) {
+        htd.addFamily(new HColumnDescriptor(family(i)));
+      }
+
+      HBaseAdmin admin = util.getHBaseAdmin();
+      admin.createTable(htd);
+    } catch (TableExistsException tee) {
+      LOG.info("Table " + table + " already exists");
+    }
+  }
+
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    util = new HBaseTestingUtility();
+    util.startMiniCluster(1);
+  }
+
+  @AfterClass
+  public static void teardownCluster() throws Exception {
+    util.shutdownMiniCluster();
+  }
+
+  void assertExpectedTable(String table, int count, int value) {
+    try {
+      assertEquals(util.getHBaseAdmin().listTables(table).length, 1);
+
+      HTable t = new HTable(util.getConfiguration(), table);
+      Scan s = new Scan();
+      ResultScanner sr = t.getScanner(s);
+      int i = 0;
+      for (Result r : sr) {
+        i++;
+        for (NavigableMap<byte[], byte[]> nm : r.getNoVersionMap().values()) {
+          for (byte[] val : nm.values()) {
+            assertTrue(Bytes.equals(val, value(value)));
+          }
+        }
+
+      }
+      assertEquals(count, i);
+
+    } catch (IOException e) {
+      fail("Failed due to exception");
+    }
+  }
+
+  @Test
+  public void testBulkLoadPhaseRecovery() throws Exception {
+    String table = "bulkPhaseRetry";
+    setupTable(table, 10);
+
+    final AtomicInteger attmptedCalls = new AtomicInteger();
+    final AtomicInteger failedCalls = new AtomicInteger();
+    LoadIncrementalHFiles lih = new LoadIncrementalHFiles(
+        util.getConfiguration()) {
+
+      protected List<LoadQueueItem> tryAtomicRegionLoad(final HConnection conn,
+          byte[] tableName, final byte[] first, Collection<LoadQueueItem> lqis) {
+        int i = attmptedCalls.incrementAndGet();
+        if (i == 1) {
+          HConnection errConn = mock(HConnection.class);
+          try {
+            doThrow(new IOException("injecting bulk load error")).when(errConn)
+                .getRegionServerWithRetries((ServerCallable) anyObject());
+          } catch (Exception e) {
+            LOG.fatal("mocking cruft, should never happen", e);
+            throw new RuntimeException("mocking cruft, should never happen");
+          }
+          failedCalls.incrementAndGet();
+          return super.tryAtomicRegionLoad(errConn, tableName, first, lqis);
+        }
+
+        return super.tryAtomicRegionLoad(conn, tableName, first, lqis);
+      }
+    };
+
+    // create HFiles for different column families
+    FileSystem fs = util.getTestFileSystem();
+    Path dir = util.getDataTestDir(table);
+    buildHFiles(fs, dir, 1);
+    HTable t = new HTable(util.getConfiguration(), Bytes.toBytes(table));
+    lih.doBulkLoad(dir, t);
+
+    // check that data was loaded
+    assertEquals(attmptedCalls.get(), 2);
+    assertEquals(failedCalls.get(), 1);
+    assertExpectedTable(table, ROWCOUNT, 1);
+  }
+
+  /**
+   * This test exercises the path where there is a split after initial
+   * validation but before the atomic bulk load call. We cannot use presplitting
+   * to test this path, so we actually inject a split just before the atomic
+   * region load.
+   */
+  @Test
+  public void testSplitWhileBulkLoadPhase() throws Exception {
+    final String table = "bulkPhaseSplit";
+    setupTable(table, 10);
+    LoadIncrementalHFiles lih = new LoadIncrementalHFiles(
+        util.getConfiguration());
+
+
+    // create HFiles for different column families
+    Path dir = util.getDataTestDir(table);
+    Path bulk1 = new Path(dir, "normalBulkload");
+    FileSystem fs = util.getTestFileSystem();
+    buildHFiles(fs, bulk1, 1);
+    HTable t = new HTable(util.getConfiguration(), Bytes.toBytes(table));
+    lih.doBulkLoad(bulk1, t);
+    assertExpectedTable(table, ROWCOUNT, 1);
+
+    // Now let's cause trouble
+    final AtomicInteger attmptedCalls = new AtomicInteger();
+    LoadIncrementalHFiles lih2 = new LoadIncrementalHFiles(
+        util.getConfiguration()) {
+
+      protected List<LoadQueueItem> tryAtomicRegionLoad(final HConnection conn,
+          byte[] tableName, final byte[] first, Collection<LoadQueueItem> lqis) {
+        int i = attmptedCalls.incrementAndGet();
+        if (i == 1) {
+          // On first attempt force a split.
+          try {
+            // need to call region server to be synchronous but isn't visible.
+
+            HRegionServer hrs = util.getRSForFirstRegionInTable(Bytes
+                .toBytes(table));
+
+            HRegionInfo region = null;
+            for (HRegionInfo hri : hrs.getOnlineRegions()) {
+              if (Bytes.equals(hri.getTableName(), Bytes.toBytes(table))) {
+                // splitRegion doesn't work if startkey/endkey are null
+                hrs.splitRegion(hri, rowkey(ROWCOUNT / 2)); // hard code split
+              }
+            }
+
+            int regions;
+            do {
+              regions = 0;
+              for (HRegionInfo hri : hrs.getOnlineRegions()) {
+                if (Bytes.equals(hri.getTableName(), Bytes.toBytes(table))) {
+                  regions++;
+                }
+              }
+              if (regions != 2) {
+                LOG.info("Taking some time to complete split...");
+                Thread.sleep(250);
+              }
+            } while (regions != 2);
+          } catch (IOException e) {
+            e.printStackTrace();
+          } catch (InterruptedException e) {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+          }
+        }
+
+        return super.tryAtomicRegionLoad(conn, tableName, first, lqis);
+      }
+    };
+
+    // create HFiles for different column families
+    Path bulk2 = new Path(dir, "bulkload2");
+    buildHFiles(fs, bulk2, 2); // all values are '2'
+    lih2.doBulkLoad(bulk2, t);
+
+    // check that data was loaded
+
+    // The three expected attempts are 1) failure because need to split, 2)
+    // load of split top 3) load of split bottom
+    assertEquals(attmptedCalls.get(), 3);
+    assertExpectedTable(table, ROWCOUNT, 2);
+  }
+
+  @Test(expected = IOException.class)
+  public void testGroupOrSplitFailure() throws Exception {
+    String table = "groupOrSplitStoreFail";
+    setupTable(table, 10);
+
+    try {
+      LoadIncrementalHFiles lih = new LoadIncrementalHFiles(
+          util.getConfiguration()) {
+        int i = 0;
+
+        protected List<LoadQueueItem> groupOrSplit(
+            Multimap<ByteBuffer, LoadQueueItem> regionGroups,
+            final LoadQueueItem item, final HTable table,
+            final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
+          i++;
+
+          if (i == 5) {
+            throw new IOException("failure");
+          }
+          return super.groupOrSplit(regionGroups, item, table, startEndKeys);
+        }
+      };
+
+      Path dir = util.getDataTestDir(table);
+
+      // create HFiles for different column families
+      FileSystem fs = util.getTestFileSystem();
+      buildHFiles(fs, dir, 1);
+      HTable t = new HTable(util.getConfiguration(), Bytes.toBytes(table));
+      lih.doBulkLoad(dir, t);
+
+      // check that data was loaded
+      assertExpectedTable(table, ROWCOUNT, 1);
+
+    } finally {
+      util.shutdownMiniCluster();
+    }
+  }
+}

Added: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java?rev=1195574&view=auto
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java (added)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java Mon Oct 31 17:26:42 2011
@@ -0,0 +1,316 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
+import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
+import org.apache.hadoop.hbase.TableExistsException;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.ServerCallable;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.Compression;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Tests bulk loading of HFiles and shows the atomicity or lack of atomicity of
+ * the region server's bulkLoad functionality.
+ */
+public class TestHRegionServerBulkLoad {
+  final static Log LOG = LogFactory.getLog(TestHRegionServerBulkLoad.class);
+  private static HBaseTestingUtility UTIL = new HBaseTestingUtility();
+  private final static Configuration conf = UTIL.getConfiguration();
+  private final static byte[] QUAL = Bytes.toBytes("qual");
+  private final static int NUM_CFS = 10;
+  public static int BLOCKSIZE = 64 * 1024;
+  public static String COMPRESSION = Compression.Algorithm.NONE.getName();
+
+  private final static byte[][] families = new byte[NUM_CFS][];
+  static {
+    for (int i = 0; i < NUM_CFS; i++) {
+      families[i] = Bytes.toBytes(family(i));
+    }
+  }
+
+  static byte[] rowkey(int i) {
+    return Bytes.toBytes(String.format("row_%08d", i));
+  }
+
+  static String family(int i) {
+    return String.format("family_%04d", i);
+  }
+
+  /**
+   * Create an HFile with the given number of rows with a specified value.
+   */
+  public static void createHFile(FileSystem fs, Path path, byte[] family,
+      byte[] qualifier, byte[] value, int numRows) throws IOException {
+    HFile.Writer writer = HFile
+        .getWriterFactory(conf, new CacheConfig(conf))
+        .createWriter(fs, path, BLOCKSIZE, COMPRESSION, KeyValue.KEY_COMPARATOR);
+    long now = System.currentTimeMillis();
+    try {
+      // write one KeyValue per row, all with the same timestamp and value
+      for (int i = 0; i < numRows; i++) {
+        KeyValue kv = new KeyValue(rowkey(i), family, qualifier, now, value);
+        writer.append(kv);
+      }
+    } finally {
+      writer.close();
+    }
+  }
+
+  /**
+   * Thread that repeatedly writes one new HFile per column family and bulk
+   * loads them into the table.
+   * 
+   * Each iteration of this loads 10 hdfs files, each of which occupies 5 open
+   * file handles. So every 10 iterations (500 file handles) it does a region
+   * compaction to reduce the number of open file handles.
+   */
+  public static class AtomicHFileLoader extends RepeatingTestThread {
+    final AtomicLong numBulkLoads = new AtomicLong();
+    final AtomicLong numCompactions = new AtomicLong();
+    private String tableName;
+
+    public AtomicHFileLoader(String tableName, TestContext ctx,
+        byte targetFamilies[][]) throws IOException {
+      super(ctx);
+      this.tableName = tableName;
+    }
+
+    public void doAnAction() throws Exception {
+      long iteration = numBulkLoads.getAndIncrement();
+      Path dir =  UTIL.getDataTestDir(String.format("bulkLoad_%08d",
+          iteration));
+
+      // create HFiles for different column families
+      FileSystem fs = UTIL.getTestFileSystem();
+      byte[] val = Bytes.toBytes(String.format("%010d", iteration));
+      final List<Pair<byte[], String>> famPaths = new ArrayList<Pair<byte[], String>>(
+          NUM_CFS);
+      for (int i = 0; i < NUM_CFS; i++) {
+        Path hfile = new Path(dir, family(i));
+        byte[] fam = Bytes.toBytes(family(i));
+        createHFile(fs, hfile, fam, QUAL, val, 1000);
+        famPaths.add(new Pair<byte[], String>(fam, hfile.toString()));
+      }
+
+      // bulk load HFiles
+      HConnection conn = UTIL.getHBaseAdmin().getConnection();
+      byte[] tbl = Bytes.toBytes(tableName);
+      conn.getRegionServerWithRetries(new ServerCallable<Void>(conn, tbl, Bytes
+          .toBytes("aaa")) {
+        @Override
+        public Void call() throws Exception {
+          LOG.debug("Going to connect to server " + location + " for row "
+              + Bytes.toStringBinary(row));
+          byte[] regionName = location.getRegionInfo().getRegionName();
+          server.bulkLoadHFiles(famPaths, regionName);
+          return null;
+        }
+      });
+
+      // Periodically do compaction to reduce the number of open file handles.
+      if (numBulkLoads.get() % 10 == 0) {
+        // 10 * 50 = 500 open file handles!
+        conn.getRegionServerWithRetries(new ServerCallable<Void>(conn, tbl,
+            Bytes.toBytes("aaa")) {
+          @Override
+          public Void call() throws Exception {
+            LOG.debug("compacting " + location + " for row "
+                + Bytes.toStringBinary(row));
+            server.compactRegion(location.getRegionInfo(), true);
+            numCompactions.incrementAndGet();
+            return null;
+          }
+        });
+      }
+    }
+  }
+
+  /**
+   * Thread that does full scans of the table looking for any partially
+   * completed rows.
+   */
+  public static class AtomicScanReader extends RepeatingTestThread {
+    byte targetFamilies[][];
+    HTable table;
+    AtomicLong numScans = new AtomicLong();
+    AtomicLong numRowsScanned = new AtomicLong();
+    String TABLE_NAME;
+
+    public AtomicScanReader(String TABLE_NAME, TestContext ctx,
+        byte targetFamilies[][]) throws IOException {
+      super(ctx);
+      this.TABLE_NAME = TABLE_NAME;
+      this.targetFamilies = targetFamilies;
+      table = new HTable(conf, TABLE_NAME);
+    }
+
+    public void doAnAction() throws Exception {
+      Scan s = new Scan();
+      for (byte[] family : targetFamilies) {
+        s.addFamily(family);
+      }
+      ResultScanner scanner = table.getScanner(s);
+
+      for (Result res : scanner) {
+        byte[] lastRow = null, lastFam = null, lastQual = null;
+        byte[] gotValue = null;
+        for (byte[] family : targetFamilies) {
+          byte qualifier[] = QUAL;
+          byte thisValue[] = res.getValue(family, qualifier);
+          if (gotValue != null && thisValue != null
+              && !Bytes.equals(gotValue, thisValue)) {
+
+            StringBuilder msg = new StringBuilder();
+            msg.append("Failed on scan ").append(numScans)
+                .append(" after scanning ").append(numRowsScanned)
+                .append(" rows!\n");
+            msg.append("Current  was " + Bytes.toString(res.getRow()) + "/"
+                + Bytes.toString(family) + ":" + Bytes.toString(qualifier)
+                + " = " + Bytes.toString(thisValue) + "\n");
+            msg.append("Previous  was " + Bytes.toString(lastRow) + "/"
+                + Bytes.toString(lastFam) + ":" + Bytes.toString(lastQual)
+                + " = " + Bytes.toString(gotValue));
+            throw new RuntimeException(msg.toString());
+          }
+
+          lastFam = family;
+          lastQual = qualifier;
+          lastRow = res.getRow();
+          gotValue = thisValue;
+        }
+        numRowsScanned.getAndIncrement();
+      }
+      numScans.getAndIncrement();
+    }
+  }
+
+  /**
+   * Creates a table with given table name and specified number of column
+   * families if the table does not already exist.
+   */
+  private void setupTable(String table, int cfs) throws IOException {
+    try {
+      LOG.info("Creating table " + table);
+      HTableDescriptor htd = new HTableDescriptor(table);
+      for (int i = 0; i < 10; i++) {
+        htd.addFamily(new HColumnDescriptor(family(i)));
+      }
+
+      HBaseAdmin admin = UTIL.getHBaseAdmin();
+      admin.createTable(htd);
+    } catch (TableExistsException tee) {
+      LOG.info("Table " + table + " already exists");
+    }
+  }
+
+  /**
+   * Atomic bulk load.
+   */
+  @Test
+  public void testAtomicBulkLoad() throws Exception {
+    String TABLE_NAME = "atomicBulkLoad";
+
+    int millisToRun = 30000;
+    int numScanners = 50;
+
+    UTIL.startMiniCluster(1);
+    try {
+      runAtomicBulkloadTest(TABLE_NAME, millisToRun, numScanners);
+    } finally {
+      UTIL.shutdownMiniCluster();
+    }
+  }
+
+  void runAtomicBulkloadTest(String tableName, int millisToRun, int numScanners)
+      throws Exception {
+    setupTable(tableName, 10);
+
+    TestContext ctx = new TestContext(UTIL.getConfiguration());
+
+    AtomicHFileLoader loader = new AtomicHFileLoader(tableName, ctx, null);
+    ctx.addThread(loader);
+
+    List<AtomicScanReader> scanners = Lists.newArrayList();
+    for (int i = 0; i < numScanners; i++) {
+      AtomicScanReader scanner = new AtomicScanReader(tableName, ctx, families);
+      scanners.add(scanner);
+      ctx.addThread(scanner);
+    }
+
+    ctx.startThreads();
+    ctx.waitFor(millisToRun);
+    ctx.stop();
+
+    LOG.info("Loaders:");
+    LOG.info("  loaded " + loader.numBulkLoads.get());
+    LOG.info("  compations " + loader.numCompactions.get());
+
+    LOG.info("Scanners:");
+    for (AtomicScanReader scanner : scanners) {
+      LOG.info("  scanned " + scanner.numScans.get());
+      LOG.info("  verified " + scanner.numRowsScanned.get() + " rows");
+    }
+  }
+
+  /**
+   * Run test on an HBase instance for 5 minutes. This assumes that the table
+   * under test only has a single region.
+   */
+  public static void main(String args[]) throws Exception {
+    try {
+      Configuration c = HBaseConfiguration.create();
+      TestHRegionServerBulkLoad test = new TestHRegionServerBulkLoad();
+      test.setConf(c);
+      test.runAtomicBulkloadTest("atomicTableTest", 5 * 60 * 1000, 50);
+    } finally {
+      System.exit(0); // something hangs (believe it is lru threadpool)
+    }
+  }
+
+  private void setConf(Configuration c) {
+    UTIL = new HBaseTestingUtility(c);
+  }
+}

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java?rev=1195574&r1=1195573&r2=1195574&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java Mon Oct 31 17:26:42 2011
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertTru
 
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -42,7 +43,6 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.regionserver.FlushRequester;
@@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.security.
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdge;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Pair;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -205,7 +206,9 @@ public class TestWALReplay {
     byte [] row = Bytes.toBytes(tableNameStr);
     writer.append(new KeyValue(row, family, family, row));
     writer.close();
-    region.bulkLoadHFile(f.toString(), family);
+    List <Pair<byte[],String>>  hfs= new ArrayList<Pair<byte[],String>>(1);
+    hfs.add(Pair.newPair(family, f.toString()));
+    region.bulkLoadHFiles(hfs);
     // Add an edit so something in the WAL
     region.put((new Put(row)).add(family, family, family));
     wal.sync();