You are viewing a plain-text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2011/05/06 06:26:58 UTC
svn commit: r1100045 - in /hbase/trunk: CHANGES.txt
src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
Author: stack
Date: Fri May 6 04:26:58 2011
New Revision: 1100045
URL: http://svn.apache.org/viewvc?rev=1100045&view=rev
Log:
HBASE-3721 Speedup LoadIncrementalHFiles
Modified:
hbase/trunk/CHANGES.txt
hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1100045&r1=1100044&r2=1100045&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Fri May 6 04:26:58 2011
@@ -206,6 +206,7 @@ Release 0.91.0 - Unreleased
HBASE-3670 Fix error handling in get(List<Get> gets)
(Harsh J Chouraria)
HBASE-3835 Switch master and region server pages to Jamon-based templates
+ HBASE-3721 Speedup LoadIncrementalHFiles (Ted Yu)
TASKS
HBASE-3559 Move report of split to master OFF the heartbeat channel
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java?rev=1100045&r1=1100044&r2=1100045&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java Fri May 6 04:26:58 2011
@@ -21,10 +21,22 @@ package org.apache.hadoop.hbase.mapreduc
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Deque;
+import java.util.HashSet;
import java.util.LinkedList;
import java.util.Map;
-import java.util.ArrayList;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -34,10 +46,13 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.ServerCallable;
@@ -50,12 +65,11 @@ import org.apache.hadoop.hbase.io.hfile.
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import java.util.TreeMap;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
@@ -67,23 +81,19 @@ public class LoadIncrementalHFiles exten
private static Log LOG = LogFactory.getLog(LoadIncrementalHFiles.class);
private static final int TABLE_CREATE_MAX_RETRIES = 20;
private static final long TABLE_CREATE_SLEEP = 60000;
+ static AtomicLong regionCount = new AtomicLong(0);
private HBaseAdmin hbAdmin;
+ private Configuration cfg;
+ private Set<Future> futures = new HashSet<Future>();
public static String NAME = "completebulkload";
public LoadIncrementalHFiles(Configuration conf) throws Exception {
super(conf);
+ this.cfg = conf;
this.hbAdmin = new HBaseAdmin(conf);
}
- /* This constructor does not add HBase configuration.
- * Explicit addition is necessary. Do we need this constructor?
- */
- public LoadIncrementalHFiles() {
- super();
- }
-
-
private void usage() {
System.err.println("usage: " + NAME +
" /path/to/hfileoutputformat-output " +
@@ -165,13 +175,38 @@ public class LoadIncrementalHFiles exten
}
Deque<LoadQueueItem> queue = null;
+ int nrThreads = cfg.getInt("hbase.loadincremental.threads.max",
+ Runtime.getRuntime().availableProcessors());
+ ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
+ builder.setNameFormat("LoadIncrementalHFiles-%1$d");
+
+ ExecutorService pool = new ThreadPoolExecutor(nrThreads, nrThreads,
+ 60, TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>(),
+ builder.build());
+ ((ThreadPoolExecutor)pool).allowCoreThreadTimeOut(true);
try {
queue = discoverLoadQueue(hfofDir);
+ // outer loop picks up LoadQueueItem due to HFile split
while (!queue.isEmpty()) {
- LoadQueueItem item = queue.remove();
- tryLoad(item, conn, table.getTableName(), queue);
+ Pair<byte[][],byte[][]> startEndKeys = table.getStartEndKeys();
+ // inner loop groups callables
+ while (!queue.isEmpty()) {
+ LoadQueueItem item = queue.remove();
+ tryLoad(item, conn, table, queue, startEndKeys, pool);
+ }
+ }
+ for (Future<Void> future : futures) {
+ try {
+ future.get();
+ } catch (ExecutionException ee) {
+ LOG.error(ee);
+ } catch (InterruptedException ie) {
+ LOG.error(ie);
+ }
}
} finally {
+ pool.shutdown();
if (queue != null && !queue.isEmpty()) {
StringBuilder err = new StringBuilder();
err.append("-------------------------------------------------\n");
@@ -185,15 +220,48 @@ public class LoadIncrementalHFiles exten
}
}
+ // unique file name for the table
+ String getUniqueName(byte[] tableName) {
+ String name = Bytes.toStringBinary(tableName) + "," + regionCount.incrementAndGet();
+ return name;
+ }
+
+ void splitStoreFileAndRequeue(final LoadQueueItem item,
+ final Deque<LoadQueueItem> queue, final HTable table,
+ byte[] startKey, byte[] splitKey) throws IOException {
+ final Path hfilePath = item.hfilePath;
+
+ // We use a '_' prefix which is ignored when walking directory trees
+ // above.
+ final Path tmpDir = new Path(item.hfilePath.getParent(), "_tmp");
+
+ LOG.info("HFile at " + hfilePath + " no longer fits inside a single " +
+ "region. Splitting...");
+
+ String uniqueName = getUniqueName(table.getTableName());
+ HColumnDescriptor familyDesc = table.getTableDescriptor().getFamily(item.family);
+ Path botOut = new Path(tmpDir, uniqueName + ".bottom");
+ Path topOut = new Path(tmpDir, uniqueName + ".top");
+ splitStoreFile(getConf(), hfilePath, familyDesc, splitKey,
+ botOut, topOut);
+
+ // Add these back at the *front* of the queue, so there's a lower
+ // chance that the region will just split again before we get there.
+ queue.addFirst(new LoadQueueItem(item.family, botOut));
+ queue.addFirst(new LoadQueueItem(item.family, topOut));
+ LOG.info("Successfully split into new HFiles " + botOut + " and " + topOut);
+ }
+
/**
* Attempt to load the given load queue item into its target region server.
* If the hfile boundary no longer fits into a region, physically splits
* the hfile such that the new bottom half will fit, and adds the two
* resultant hfiles back into the load queue.
*/
- private void tryLoad(final LoadQueueItem item,
- HConnection conn, final byte[] table,
- final Deque<LoadQueueItem> queue)
+ private boolean tryLoad(final LoadQueueItem item,
+ final HConnection conn, final HTable table,
+ final Deque<LoadQueueItem> queue, Pair<byte[][],byte[][]> startEndKeys,
+ ExecutorService pool)
throws IOException {
final Path hfilePath = item.hfilePath;
final FileSystem fs = hfilePath.getFileSystem(getConf());
@@ -213,43 +281,44 @@ public class LoadIncrementalHFiles exten
if (first == null || last == null) {
assert first == null && last == null;
LOG.info("hfile " + hfilePath + " has no entries, skipping");
- return;
+ return false;
}
-
- // We use a '_' prefix which is ignored when walking directory trees
- // above.
- final Path tmpDir = new Path(item.hfilePath.getParent(), "_tmp");
-
- conn.getRegionServerWithRetries(
- new ServerCallable<Void>(conn, table, first) {
- @Override
- public Void call() throws Exception {
- LOG.debug("Going to connect to server " + location +
- "for row " + Bytes.toStringBinary(row));
- HRegionInfo hri = location.getRegionInfo();
- if (!hri.containsRange(first, last)) {
- LOG.info("HFile at " + hfilePath + " no longer fits inside a single " +
- "region. Splitting...");
-
- HColumnDescriptor familyDesc = hri.getTableDesc().getFamily(item.family);
- Path botOut = new Path(tmpDir, hri.getEncodedName() + ".bottom");
- Path topOut = new Path(tmpDir, hri.getEncodedName() + ".top");
- splitStoreFile(getConf(), hfilePath, familyDesc, hri.getEndKey(),
- botOut, topOut);
-
- // Add these back at the *front* of the queue, so there's a lower
- // chance that the region will just split again before we get there.
- queue.addFirst(new LoadQueueItem(item.family, botOut));
- queue.addFirst(new LoadQueueItem(item.family, topOut));
- LOG.info("Successfully split into new HFiles " + botOut + " and " + topOut);
- return null;
- }
-
- byte[] regionName = location.getRegionInfo().getRegionName();
- server.bulkLoadHFile(hfilePath.toString(), regionName, item.family);
- return null;
- }
- });
+ if (Bytes.compareTo(first, last) > 0) {
+ throw new IllegalArgumentException(
+ "Invalid range: " + Bytes.toStringBinary(first) +
+ " > " + Bytes.toStringBinary(last));
+ }
+ int idx = Arrays.binarySearch(startEndKeys.getFirst(), first, Bytes.BYTES_COMPARATOR);
+ if (idx < 0) {
+ idx = -(idx+1)-1;
+ }
+ boolean lastKeyInRange =
+ Bytes.compareTo(last, startEndKeys.getSecond()[idx]) < 0 ||
+ Bytes.equals(startEndKeys.getSecond()[idx], HConstants.EMPTY_BYTE_ARRAY);
+ if (!lastKeyInRange) {
+ splitStoreFileAndRequeue(item, queue, table,
+ startEndKeys.getFirst()[idx], startEndKeys.getSecond()[idx]);
+ return true;
+ }
+
+ final ServerCallable<Void> svrCallable = new ServerCallable<Void>(conn, table.getTableName(), first) {
+ @Override
+ public Void call() throws Exception {
+ LOG.debug("Going to connect to server " + location +
+ "for row " + Bytes.toStringBinary(row));
+
+ byte[] regionName = location.getRegionInfo().getRegionName();
+ server.bulkLoadHFile(hfilePath.toString(), regionName, item.family);
+ return null;
+ }
+ };
+ Callable<Void> callable = new Callable<Void>() {
+ public Void call() throws Exception {
+ return conn.getRegionServerWithRetries(svrCallable);
+ }
+ };
+ futures.add(pool.submit(callable));
+ return false;
}
/**
@@ -418,8 +487,8 @@ public class LoadIncrementalHFiles exten
} catch (java.net.SocketTimeoutException e) {
System.err.println("Caught Socket timeout.. Mostly caused by a slow region assignment by master!");
}
-
- HTable table = new HTable(tableName);
+
+ HTable table = new HTable(this.cfg, tableName);
HConnection conn = table.getConnection();
int ctr = 0;
@@ -441,12 +510,12 @@ public class LoadIncrementalHFiles exten
String dirPath = args[0];
String tableName = args[1];
-
+
boolean tableExists = this.doesTableExist(tableName);
if (!tableExists) this.createTable(tableName,dirPath);
-
+
Path hfofDir = new Path(dirPath);
- HTable table = new HTable(tableName);
+ HTable table = new HTable(this.cfg, tableName);
doBulkLoad(hfofDir, table);
return 0;