Posted to commits@hbase.apache.org by st...@apache.org on 2011/05/06 06:26:58 UTC

svn commit: r1100045 - in /hbase/trunk: CHANGES.txt src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java

Author: stack
Date: Fri May  6 04:26:58 2011
New Revision: 1100045

URL: http://svn.apache.org/viewvc?rev=1100045&view=rev
Log:
HBASE-3721 Speedup LoadIncrementalHFiles

Modified:
    hbase/trunk/CHANGES.txt
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
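
In brief, the patch removes the per-HFile ServerCallable round trip that
previously decided on the region server whether an HFile still fit in a
single region. The loader now fetches the table's region boundaries once
per pass (HTable.getStartEndKeys()), locates the candidate region with a
binary search over the start keys, splits any HFile that crosses a region
boundary on the client side, and submits the remaining per-region
bulkLoadHFile calls to a ThreadPoolExecutor sized by
hbase.loadincremental.threads.max (default: the number of available
processors). Below is a minimal, self-contained sketch of the boundary
test, using the same utilities the patch uses; the key arrays stand in
for the result of HTable.getStartEndKeys():

    import java.util.Arrays;

    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.hbase.util.Bytes;

    // Sketch only: does an HFile spanning [first, last] fit in one region?
    public class BoundaryCheckSketch {
      static boolean fitsInOneRegion(byte[][] startKeys, byte[][] endKeys,
          byte[] first, byte[] last) {
        int idx = Arrays.binarySearch(startKeys, first, Bytes.BYTES_COMPARATOR);
        if (idx < 0) {
          // On a miss, binarySearch returns -(insertion point) - 1; the
          // region that can contain `first` starts just before that point.
          idx = -(idx + 1) - 1;
        }
        // The HFile fits if `last` sorts below the region's end key, or if
        // this is the last region (denoted by an empty end key).
        return Bytes.compareTo(last, endKeys[idx]) < 0
            || Bytes.equals(endKeys[idx], HConstants.EMPTY_BYTE_ARRAY);
      }
    }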

Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1100045&r1=1100044&r2=1100045&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Fri May  6 04:26:58 2011
@@ -206,6 +206,7 @@ Release 0.91.0 - Unreleased
    HBASE-3670  Fix error handling in get(List<Get> gets)
                (Harsh J Chouraria)
    HBASE-3835  Switch master and region server pages to Jamon-based templates
+   HBASE-3721  Speedup LoadIncrementalHFiles (Ted Yu)
 
   TASKS
    HBASE-3559  Move report of split to master OFF the heartbeat channel

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java?rev=1100045&r1=1100044&r2=1100045&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java Fri May  6 04:26:58 2011
@@ -21,10 +21,22 @@ package org.apache.hadoop.hbase.mapreduc
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Deque;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.Map;
-import java.util.ArrayList;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -34,10 +46,13 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.ServerCallable;
@@ -50,12 +65,11 @@ import org.apache.hadoop.hbase.io.hfile.
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import java.util.TreeMap;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 
 /**
@@ -67,23 +81,19 @@ public class LoadIncrementalHFiles exten
   private static Log LOG = LogFactory.getLog(LoadIncrementalHFiles.class);
   private static final int  TABLE_CREATE_MAX_RETRIES = 20;
   private static final long TABLE_CREATE_SLEEP = 60000;
+  static AtomicLong regionCount = new AtomicLong(0);
   private HBaseAdmin hbAdmin;
+  private Configuration cfg;
+  private Set<Future> futures = new HashSet<Future>();
 
   public static String NAME = "completebulkload";
 
   public LoadIncrementalHFiles(Configuration conf) throws Exception {
     super(conf);
+    this.cfg = conf;
     this.hbAdmin = new HBaseAdmin(conf);
   }
 
-  /* This constructor does not add HBase configuration. 
-   * Explicit addition is necessary. Do we need this constructor? 
-   */
-  public LoadIncrementalHFiles() {
-    super();
-  }
-
-
   private void usage() {
     System.err.println("usage: " + NAME +
         " /path/to/hfileoutputformat-output " +
@@ -165,13 +175,38 @@ public class LoadIncrementalHFiles exten
     }
 
     Deque<LoadQueueItem> queue = null;
+    int nrThreads = cfg.getInt("hbase.loadincremental.threads.max",
+        Runtime.getRuntime().availableProcessors());
+    ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
+    builder.setNameFormat("LoadIncrementalHFiles-%1$d");
+
+    ExecutorService pool = new ThreadPoolExecutor(nrThreads, nrThreads,
+        60, TimeUnit.SECONDS,
+        new LinkedBlockingQueue<Runnable>(),
+        builder.build());
+    ((ThreadPoolExecutor)pool).allowCoreThreadTimeOut(true);
     try {
       queue = discoverLoadQueue(hfofDir);
+      // outer loop picks up LoadQueueItem due to HFile split
       while (!queue.isEmpty()) {
-        LoadQueueItem item = queue.remove();
-        tryLoad(item, conn, table.getTableName(), queue);
+        Pair<byte[][],byte[][]> startEndKeys = table.getStartEndKeys();
+        // inner loop groups callables
+        while (!queue.isEmpty()) {
+          LoadQueueItem item = queue.remove();
+          tryLoad(item, conn, table, queue, startEndKeys, pool);
+        }
+      }
+      for (Future<Void> future : futures) {
+        try {
+          future.get();
+        } catch (ExecutionException ee) {
+          LOG.error(ee);
+        } catch (InterruptedException ie) {
+          LOG.error(ie);
+        }
       }
     } finally {
+      pool.shutdown();
       if (queue != null && !queue.isEmpty()) {
         StringBuilder err = new StringBuilder();
         err.append("-------------------------------------------------\n");
@@ -185,15 +220,48 @@ public class LoadIncrementalHFiles exten
     }
   }
 
+  // unique file name for the table
+  String getUniqueName(byte[] tableName) {
+    String name = Bytes.toStringBinary(tableName) + "," + regionCount.incrementAndGet();
+    return name;
+  }
+
+  void splitStoreFileAndRequeue(final LoadQueueItem item,
+      final Deque<LoadQueueItem> queue, final HTable table,
+      byte[] startKey, byte[] splitKey) throws IOException {
+    final Path hfilePath = item.hfilePath;
+
+    // We use a '_' prefix which is ignored when walking directory trees
+    // above.
+    final Path tmpDir = new Path(item.hfilePath.getParent(), "_tmp");
+
+    LOG.info("HFile at " + hfilePath + " no longer fits inside a single " +
+    "region. Splitting...");
+
+    String uniqueName = getUniqueName(table.getTableName());
+    HColumnDescriptor familyDesc = table.getTableDescriptor().getFamily(item.family);
+    Path botOut = new Path(tmpDir, uniqueName + ".bottom");
+    Path topOut = new Path(tmpDir, uniqueName + ".top");
+    splitStoreFile(getConf(), hfilePath, familyDesc, splitKey,
+        botOut, topOut);
+
+    // Add these back at the *front* of the queue, so there's a lower
+    // chance that the region will just split again before we get there.
+    queue.addFirst(new LoadQueueItem(item.family, botOut));
+    queue.addFirst(new LoadQueueItem(item.family, topOut));
+    LOG.info("Successfully split into new HFiles " + botOut + " and " + topOut);
+  }
+
   /**
    * Attempt to load the given load queue item into its target region server.
    * If the hfile boundary no longer fits into a region, physically splits
    * the hfile such that the new bottom half will fit, and adds the two
    * resultant hfiles back into the load queue.
    */
-  private void tryLoad(final LoadQueueItem item,
-      HConnection conn, final byte[] table,
-      final Deque<LoadQueueItem> queue)
+  private boolean tryLoad(final LoadQueueItem item,
+      final HConnection conn, final HTable table,
+      final Deque<LoadQueueItem> queue, Pair<byte[][],byte[][]> startEndKeys,
+      ExecutorService pool)
   throws IOException {
     final Path hfilePath = item.hfilePath;
     final FileSystem fs = hfilePath.getFileSystem(getConf());
@@ -213,43 +281,44 @@ public class LoadIncrementalHFiles exten
     if (first == null || last == null) {
       assert first == null && last == null;
       LOG.info("hfile " + hfilePath + " has no entries, skipping");
-      return;
+      return false;
     }
-
-    // We use a '_' prefix which is ignored when walking directory trees
-    // above.
-    final Path tmpDir = new Path(item.hfilePath.getParent(), "_tmp");
-
-    conn.getRegionServerWithRetries(
-      new ServerCallable<Void>(conn, table, first) {
-        @Override
-        public Void call() throws Exception {
-          LOG.debug("Going to connect to server " + location +
-              "for row " + Bytes.toStringBinary(row));
-          HRegionInfo hri = location.getRegionInfo();
-          if (!hri.containsRange(first, last)) {
-            LOG.info("HFile at " + hfilePath + " no longer fits inside a single " +
-                "region. Splitting...");
-
-            HColumnDescriptor familyDesc = hri.getTableDesc().getFamily(item.family);
-            Path botOut = new Path(tmpDir, hri.getEncodedName() + ".bottom");
-            Path topOut = new Path(tmpDir, hri.getEncodedName() + ".top");
-            splitStoreFile(getConf(), hfilePath, familyDesc, hri.getEndKey(),
-                botOut, topOut);
-
-            // Add these back at the *front* of the queue, so there's a lower
-            // chance that the region will just split again before we get there.
-            queue.addFirst(new LoadQueueItem(item.family, botOut));
-            queue.addFirst(new LoadQueueItem(item.family, topOut));
-            LOG.info("Successfully split into new HFiles " + botOut + " and " + topOut);
-            return null;
-          }
-
-          byte[] regionName = location.getRegionInfo().getRegionName();
-          server.bulkLoadHFile(hfilePath.toString(), regionName, item.family);
-          return null;
-        }
-      });
+    if (Bytes.compareTo(first, last) > 0) {
+      throw new IllegalArgumentException(
+      "Invalid range: " + Bytes.toStringBinary(first) +
+      " > " + Bytes.toStringBinary(last));
+    }
+    int idx = Arrays.binarySearch(startEndKeys.getFirst(), first, Bytes.BYTES_COMPARATOR);
+    if (idx < 0) {
+      idx = -(idx+1)-1;
+    }
+    boolean lastKeyInRange =
+      Bytes.compareTo(last, startEndKeys.getSecond()[idx]) < 0 ||
+      Bytes.equals(startEndKeys.getSecond()[idx], HConstants.EMPTY_BYTE_ARRAY);
+    if (!lastKeyInRange) {
+      splitStoreFileAndRequeue(item, queue, table,
+          startEndKeys.getFirst()[idx], startEndKeys.getSecond()[idx]);
+      return true;
+    }
+
+    final ServerCallable<Void> svrCallable = new ServerCallable<Void>(conn, table.getTableName(), first) {
+      @Override
+      public Void call() throws Exception {
+        LOG.debug("Going to connect to server " + location +
+            "for row " + Bytes.toStringBinary(row));
+
+        byte[] regionName = location.getRegionInfo().getRegionName();
+        server.bulkLoadHFile(hfilePath.toString(), regionName, item.family);
+        return null;
+      }
+    };
+    Callable<Void> callable = new Callable<Void>() {
+      public Void call() throws Exception {
+        return conn.getRegionServerWithRetries(svrCallable);
+      }
+    };
+    futures.add(pool.submit(callable));
+    return false;
   }
 
   /**
@@ -418,8 +487,8 @@ public class LoadIncrementalHFiles exten
     } catch (java.net.SocketTimeoutException e) {
       System.err.println("Caught Socket timeout.. Mostly caused by a slow region assignment by master!");
     }
-         
-    HTable table = new HTable(tableName);
+
+    HTable table = new HTable(this.cfg, tableName);
 
     HConnection conn = table.getConnection();
     int ctr = 0;
@@ -441,12 +510,12 @@ public class LoadIncrementalHFiles exten
 
     String dirPath   = args[0];
     String tableName = args[1];
-    
+
     boolean tableExists   = this.doesTableExist(tableName);
     if (!tableExists) this.createTable(tableName,dirPath);
-    
+
     Path hfofDir = new Path(dirPath);
-    HTable table = new HTable(tableName);
+    HTable table = new HTable(this.cfg, tableName);
 
     doBulkLoad(hfofDir, table);
     return 0;
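
For completeness, a minimal driver sketch for the patched tool. The
thread-count property is the one introduced above; the output path and
table name are placeholders, and a reachable HBase cluster in the client
configuration is assumed:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
    import org.apache.hadoop.util.ToolRunner;

    public class BulkLoadDriver {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Cap the loader's pool; without this it defaults to
        // Runtime.getRuntime().availableProcessors().
        conf.setInt("hbase.loadincremental.threads.max", 8);
        int rc = ToolRunner.run(conf, new LoadIncrementalHFiles(conf),
            new String[] { "/path/to/hfileoutputformat-output", "mytable" });
        System.exit(rc);
      }
    }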